もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ 実際にOTPを使ってプロセス管理をする

  • ElixirのGetting StartもMix/OTPが終わり、次に進むのですが、復習がてら前にElixirで書いたコードをMix and OTPの章でやったことを使って改善してみます。
  • 元ネタはQiitaの自分の投稿 ElixirでSlackのbotを作ってHerokuで動かしてみる - Qiita
    • いまいちQiita とはてブの使い分けが明確になってないけど、今のとこ、完全に自分用のメモとかQiitaに上げるまでもないかなと思ったものははてブに書いてます。

やりたいこと

  • とにかくElixirでなんか作ろうと思って書いたのが上の記事ですが、書いていた時に微妙だなーと思ってた点が2つあります。
    1. アプリケーションの起動が微妙。わざわざboot用のファイルを作ってmix run で指定するなんてことはしなくてもできるはず。
    2. Slackからメッセージを受け取り解析してメッセージを送り返すという動作を一つのプロセスで回しているので、botに重いことをさせようとすると、全体が遅れてしまう。プロセスを分離してbotの行動の一つ一つは別プロセスで動かしたい。

改善してみる

  • コードはこちらにあがっています。

rei-m/magic_bot · GitHub

アプリケーションの起動まわり

  • mix.exs を以下のように編集してapplication起動時のモジュールを指定して、Applicationビヘイビアを実装したMagicBotモジュールを作成してそこに起動用のコードを書けば解決。
  def application do
    [applications: [:logger, :slack],
    mod: {MagicBot, []}]
  end
  • これでmix run --no-halt でアプリケーションが起動するようになりました。

Supervision Tree を使ってプロセスを分離する

  • 今、プロセス1本で動いてるところをこういうSupervision Treeに変える。

  • アプリケーションのsupervisor

    • Slackのソケットを張り続けるプロセス
    • Botに実行させたいコマンドを管理するsupervisor
      • 各コマンド。命令を受け取った時点でsupevisor経由で動的に作成される。
  • Slackからメッセージを受け取ったらパースしてsupervisor経由でやりたいことを実行させるという感じですね。

作ってみた

  • まずはアプリケーションの起点となるモジュールで全体を管理するSupervisorを起動

lib/magic_bot.ex

defmodule MagicBot do

  use Application

  def start(_type, _args) do
    MagicBot.Supervisor.start_link
  end

end
  • 次にBotのsupervisor

lib/magic_bot/supervisor.ex

defmodule MagicBot.Supervisor do

  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  # Define alias of process name
  @bot_name MagicBot.Bot
  @action_sup_name BotAction.Supervisor

  def init(:ok) do

    # Get API Token of Slack.
    api_key = case System.get_env("MAGICBOT_API_KEY") do
      nil -> Application.get_env(:MagicBot, :api_key)
      s -> s
    end

    # Make child process
    children = [
      supervisor(BotAction.Supervisor, [[name: @action_sup_name]]),
      worker(MagicBot.Bot, [api_key, [name: @bot_name, sup_action: @action_sup_name]])
    ]

    # 戦略は `one_for_one`でSlackとアクションのsupervisorを起動
    supervise(children, strategy: :one_for_one)
  end

end
  • MagicBot.Supervisorに管理されるSlackのコネクションを張り続けるモジュール

lib/magic_bot/bot.ex

defmodule MagicBot.Bot do

  use Slack

  def handle_connect(slack, state) do
    # Slackとの接続成功時に呼ばれるコールバック
    IO.puts "Connected as #{slack.me.name}"
    {:ok, state}
  end

  def handle_message(message = %{type: "message", text: _}, slack, state) do
    # Slackからメッセージを受け取った時に呼ばれるコールバック
    trigger = String.split(message.text, ~r{ | })
    
    case String.starts_with?(message.text, "<@#{slack.me.id}>: ") do
      # @bot名 ~ できたら :respond を渡してactionのプロセスを開始
      true -> BotAction.Supervisor.start_action(state[:sup_action], :respond, Enum.fetch!(trigger, 1), message, slack)
      # それ以外は :hear を渡して actionのプロセスを開始
      false -> BotAction.Supervisor.start_action(state[:sup_action], :hear, hd(trigger), message, slack)
    end
    {:ok, state}
  end

  def handle_message(_message, _slack, state) do
    {:ok, state}
  end
end
  • Botの行動を管理するSupervisor。

lib/bot_action/supervisor.ex

defmodule BotAction.Supervisor do

  def start_link(opts \\ []) do
    Task.Supervisor.start_link(opts)
  end

  def start_action(supervisor, command, trigger, message, slack) do
    # Slackのプロセスから呼ばれ、このSupervisor配下に新しいプロセスを作成する。
    # Task.Supervisorの子プロセスの戦略は `simple_one_for_one` になり、動的に追加できる。
    # クラッシュ時は再起動されない
    Task.Supervisor.start_child(supervisor, fn ->
      case command do
        :respond -> BotAction.Action.respond(trigger, message, slack)
        :hear -> BotAction.Action.hear(trigger, message, slack)
      end
    end)
  end

end
  • Botの行動を管理するモジュール。Botに何かさせたい時はこのモジュールに追加していく。

lib/bot_action/action.ex

defmodule BotAction.Action do

  def hear("lgtm", message, slack) do
    HTTPoison.start
    case URI.encode("http://www.lgtm.in/g") |> HTTPoison.get do
      {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> body
        |> Floki.find("#imageUrl")
        |> Floki.attribute("value")
        |> hd
        |> send_message(message, slack)
      {_, _} -> nil
    end
  end

  def hear(_, _, _) do end

  def respond("エリクサーほしい?", message, slack) do
     send_message("エリクサーちょうだい!\nhttp://img.yaplog.jp/img/01/pc/2/5/2/25253/1/1354.jpg", message, slack)
  end

  def respond(_, _, _) do end

  defp send_message(text, message, slack) do
    Slack.send_message(text, message.channel, slack)
  end

end
  • これで完成。

意図した通りに動くか確認

  • lib/bot_action/action.exに以下の関数を追加してみる。
  def hear("test1", message, slack) do
    # 単純にメッセージを返す
    send_message("test1", message, slack)
  end

  def hear("test2", message, slack) do
    # 5秒待ってからメッセージを返す
    :timer.sleep(5000)
    send_message("test2", message, slack)
  end

  def hear("test3", message, slack) do
    # 意図的に例外を起こす
    raise "oops"
    send_message("test3", message, slack)
  end
  • botを起動してSlackに"test2"を流したあとに"test1"を流す。

f:id:Rei19:20150921140421p:plain

  • test1はtest2のsleepに待たされることなく、即座にメッセージが返ってきたので、意図した通りに動いていてますね。

  • 次に"test2"を流したあとに"test3"を流して"test3"のプロセスをクラッシュさせてみます。

f:id:Rei19:20150921140720p:plain

  • test2はtest3のクラッシュの影響を受けずにメッセージを返していますね。test3のプロセスはメッセージを送る前にraiseしているのでメッセージは返ってきません。

  • という感じでOTPを使ってプロセス管理するとなかなかElixirっぽいコードになったかなという印象ですね。

Elixir 入門 ~ MIX AND OTP ~ その8 - Docs, tests and pipelines

  • TCP経由で受け取ったメッセージをパースしてKVアプリケーションに送る処理を実装する。

Doctests

  • doctestを使ってパースする関数を実装する。これはドキュメントからテストを作成することができ、ドキュメント内の正確なサンプルコードの提供を助けてくれる。lib/kv_server/command.exを新しく作ってみよう。
defmodule KVServer.Command do
  @doc ~S"""
  Parses the given `line` into a command.

  ## Examples

      iex> KVServer.Command.parse "CREATE shopping\r\n"
      {:ok, {:create, "shopping"}}

  """
  def parse(line) do
    :not_implemented
  end
end
  • doctestiex>のラインから始まる、スペース4文字分インデントが下がっているブロックのところで、コマンドと、その次の行にコマンドが返すであろう値を書く
  • doctestを走らせてみよう。新しくテスト用のtest/kv_server/command_test.exsを作成してmix testする。
defmodule KVServer.CommandTest do
  use ExUnit.Case, async: true
  doctest KVServer.Command
end
  • 実行すると以下のログが出て、doctestが実行されていることがわかる。
     test/kv_server/command_test.exs:3
     Doctest failed
     code: KVServer.Command.parse "CREATE shopping\r\n" === {:ok, {:create, "shopping"}}
     lhs:  :not_implemented
     stacktrace:
       lib/kv_server/command.ex:11: KVServer.Command (module)
  • ではparse/1を実装してテストが通るようにしよう。stlingを受け取ってパターンマッチで"CREATE"だったら返すようにする。直した上でテストを走らせる。今度は通るはず。
def parse(line) do
  case String.split(line) do
    ["CREATE", bucket] -> {:ok, {:create, bucket}}
  end
end
  • 他のテストも追加していこう。
    iex> KVServer.Command.parse "CREATE shopping\r\n"
    {:ok, {:create, "shopping"}}

    iex> KVServer.Command.parse "CREATE  shopping  \r\n"
    {:ok, {:create, "shopping"}}

    iex> KVServer.Command.parse "PUT shopping milk 1\r\n"
    {:ok, {:put, "shopping", "milk", "1"}}

    iex> KVServer.Command.parse "GET shopping milk\r\n"
    {:ok, {:get, "shopping", "milk"}}

    iex> KVServer.Command.parse "DELETE shopping eggs\r\n"
    {:ok, {:delete, "shopping", "eggs"}}

Unknown commands or commands with the wrong number of
arguments return an error:

    iex> KVServer.Command.parse "UNKNOWN shopping eggs\r\n"
    {:error, :unknown_command}

    iex> KVServer.Command.parse "GET shopping\r\n"
    {:error, :unknown_command}
  • そして実装も追加。
  def parse(line) do
    case String.split(line) do
      ["CREATE", bucket] -> {:ok, {:create, bucket}}
      ["GET", bucket, key] -> {:ok, {:get, bucket, key}}
      ["PUT", bucket, key, value] -> {:ok, {:put, bucket, key, value}}
      ["DELETE", bucket, key] -> {:ok, {:delete, bucket, key}}
      _ -> {:error, :unknown_command}
    end
  end

Pipelines

  • kv_server.exに作成したコマンドを組み込んでいこう。その前にcommand.exに以下の関数を追加する。
  @doc """
  Runs the given command.
  """
  def run(command) do
    {:ok, "OK\r\n"}
  end
  • そしてkv_server.exでクライアントからメッセージを受け取った際にコマンドを実行するようにする。
defp serve(socket) do
  # 受け取ったメッセージを元にコマンドを実行する
  msg =
    case read_line(socket) do
      {:ok, data} ->
        case KVServer.Command.parse(data) do
          {:ok, command} ->
            KVServer.Command.run(command)
          {:error, _} = err ->
            err
        end
      {:error, _} = err ->
        err
    end

  # コマンドの結果を出力
  write_line(socket, msg)
  serve(socket)
end

defp read_line(socket) do
  :gen_tcp.recv(socket, 0)
end

defp write_line(socket, msg) do
  :gen_tcp.send(socket, format_msg(msg))
end

defp format_msg({:ok, text}), do: text
defp format_msg({:error, :unknown_command}), do: "UNKNOWN COMMAND\r\n"
defp format_msg({:error, _}), do: "ERROR\r\n"
  • ここまでできたら再びmix run --no-haltでサーバーを起動してtelnetで接続しよう。CREATEコマンドを打ったらOKが、未定義のコマンドを打ったらUNKNOWN COMMANDが返ってくるはず
CREATE shopping
OK
HELLO
UNKNOWN COMMAND
  • ここで、ふたたびserveを見るとネストが深く見辛いコードになっているように感じる。できれば |> でつなげて書きたいが、コマンドが返す値は:okを含んだtupleかerrorを含んだtupleが返る可能性があるので返り値をうまくさばけない。{:ok, _}が返ってくる限り、値をパイプで私続けるような機能が欲しい。
  • elixir-pipesというモジュールがまさにあてはまる。mix.exsを開いて追加しよう。
def application do
  [applications: [:logger, :pipe, :kv],
   mod: {KVServer, []}]
end

defp deps do
  [{:kv, in_umbrella: true},
   {:pipe, github: "batate/elixir-pipes"}]
end
  • 保存したらmix deps.getして依存モジュールを取得しよう。そしてこれを使うことでserve/1は以下のように書き直すことができる。
defp serve(socket) do
  import Pipe

  # pipe_matching/3は x, {:ok, x} で {:ok, value} が与えられている間はvalueを次の関数に渡し続けるように命令している
  # マッチしない場合はマッチしなかった値を返す。
  msg =
    pipe_matching x, {:ok, x},
      read_line(socket)
      |> KVServer.Command.parse()
      |> KVServer.Command.run()

  write_line(socket, msg)
  serve(socket)
end

Running commands

  • 最後にKVServer.Command.run/1を実装してKVアプリケーションに命令を渡すようにしよう。
@doc """
Runs the given command.
"""
def run(command)

def run({:create, bucket}) do
  KV.Registry.create(KV.Registry, bucket)
  {:ok, "OK\r\n"}
end

def run({:get, bucket, key}) do
  lookup bucket, fn pid ->
    value = KV.Bucket.get(pid, key)
    {:ok, "#{value}\r\nOK\r\n"}
  end
end

def run({:put, bucket, key, value}) do
  lookup bucket, fn pid ->
    KV.Bucket.put(pid, key, value)
    {:ok, "OK\r\n"}
  end
end

def run({:delete, bucket, key}) do
  lookup bucket, fn pid ->
    KV.Bucket.delete(pid, key)
    {:ok, "OK\r\n"}
  end
end

defp lookup(bucket, callback) do
  case KV.Registry.lookup(KV.Registry, bucket) do
    {:ok, pid} -> callback.(pid)
    :error -> {:error, :not_found}
  end
end
  • 受け取った命令に応じてbucketを操作するように直した。lookupを見てみよう。bucketが見つからなかったら{:error, :not_found}を返すようにしているが、ユーザーに表示する場合もNot Foundであることを伝えるようにしたほうがよい。KV.Serverformat_msg/1 のパターンを追加する。
defp format_msg({:error, :not_found}), do: "NOT FOUND\r\n"
  • これでサーバーはほとんど完成した。最後にテストを追加しよう。
defmodule KVServerTest do
  use ExUnit.Case

  setup do
    Logger.remove_backend(:console)
    # KVアプリケーションを止めてstateをリセットした上で再開
    Application.stop(:kv)
    :ok = Application.start(:kv)
    Logger.add_backend(:console, flush: true)
    :ok
  end

  setup do
    # クライアントからの接続を作成
    opts = [:binary, packet: :line, active: false]
    {:ok, socket} = :gen_tcp.connect('localhost', 4040, opts)
    {:ok, socket: socket}
  end

  test "server interaction", %{socket: socket} do
    assert send_and_recv(socket, "UNKNOWN shopping\r\n") ==
           "UNKNOWN COMMAND\r\n"

    assert send_and_recv(socket, "GET shopping eggs\r\n") ==
           "NOT FOUND\r\n"

    assert send_and_recv(socket, "CREATE shopping\r\n") ==
           "OK\r\n"

    assert send_and_recv(socket, "PUT shopping eggs 3\r\n") ==
           "OK\r\n"

    # GET returns two lines
    assert send_and_recv(socket, "GET shopping eggs\r\n") == "3\r\n"
    assert send_and_recv(socket, "") == "OK\r\n"

    assert send_and_recv(socket, "DELETE shopping eggs\r\n") ==
           "OK\r\n"

    # GET returns two lines
    assert send_and_recv(socket, "GET shopping eggs\r\n") == "\r\n"
    assert send_and_recv(socket, "") == "OK\r\n"
  end

  defp send_and_recv(socket, command) do
    :ok = :gen_tcp.send(socket, command)
    {:ok, data} = :gen_tcp.recv(socket, 0, 1000)
    data
  end
end
  • mix testを走らせて通れば完成だ。

Elixir 入門 ~ MIX AND OTP ~ その7 - Task and gen-tcp

  • Erlang:gen_tcp モジュールについて学ぶ。

Echo server

  • まずはEcho Serverを作成することから始める。TCP Serverは以下のStepを実行する
  • 利用可能なPortを開いてListenする
  • そのPortでクライアントからの接続を待ち、受け入れる
  • クライアントからの要求を解析し、レスポンスを返す。
  • KVServerアプリケーションにこれらの機能を組み込んでみる。lib/kv_server.exを開いて以下のように編集する。
# 指定されたportでListenを開始する
def accept(port) do
  # The options below mean:
  #
  # 1. `:binary` - receives data as binaries (instead of lists)
  # 2. `packet: :line` - receives data line by line
  # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
  # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
  #
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false, reuseaddr: true])
  IO.puts "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

# socketを受け続けるループ。
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

# socketを受け取っったときの処理
defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

# socketを読み込む
defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

# TCPを通してResponseを返す
defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end
  • iex -S mixからREPLの中でListenを開始してみる
iex> KVServer.accept(4040)
Accepting connections on port 4040
  • 開始されたらTerminalからtelnetでローカルホストの4040ポートに接続して、適当な文字を送ってみる
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world !!
hello world !!
  • 送ったhello world !! がそのまま返されていることがわかる。
  • telnetの接続を終了させると以下のようなエラーがサーバー側に表示される。
iex(1)> KVServer.accept(4040)
Accepting connections on port 4040
** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:50: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:43: KVServer.serve/1
    (kv_server) lib/kv_server.ex:37: KVServer.loop_acceptor/1
  • これは gen_tcp.recv/2でsocketを受け取ることを期待しているのに接続が閉じられたためで、その場合のハンドルが足りていない。そして、例外が発生したと同時にサーバーが終了して再起動しないことがわかる。これらを解決するためにはsupervision treeの元でプロセスを動かす必要があることに気づくだろう。

Tasks

  • Taskモジュールを使ってKVServerをsupervision treeの監督下の元で動かす。再びlib/kv_server.exを編集する。
def start(_type, _args) do
  import Supervisor.Spec

  children = [
    worker(Task, [KVServer, :accept, [4040]])
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end
  • これでサーバーはsupervision treeの一部となったのでmix run --no-haltでアプリケーションを動かしてみよう。--no-haltはアプリケーションのプロセスを止めないで起動するオプションである。
  • サーバーが立ち上がるはずなので同じくtelnetでローカルホストの4040ポートに接続してメッセージを送ってみる。問題なくメッセージは帰ってくるはずだ。
  • さきほどと同じようにtelnetを終了してみる。そうすると今度はエラーメッセージは表示されるが、サーバー自体は終了しない。これはone_for_one戦略に従いsupervisorが終了したサーバーのプロセスを再起動しているため。
  • では、別のターミナルで2個目のコネクションを張って同じくメッセージを送ってみる。すると接続はできるがレスポンスは返ってこない。今の作りでは接続中のクライアントがすでに存在していると新しいクライアントは受け入れられていないことがわかる。

Task supervisor

  • 複数のコネクションを処理するためにはコネクションを受け入れるプロセスとコネクションを処理するプロセスを分離する必要がある。
  • 再びstart/2を編集する。
  def start(_type, _args) do
    import Supervisor.Spec

    # TaskのSupervisorをsupervision treeに追加
    children = [
      supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
      worker(Task, [KVServer, :accept, [4040]])
    ]

    # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end
  • TaskSupervisorsupervision treeの下に追加した。次にコネクションを処理するプロセスをTaskSupervisorの下に作るようにloop_acceptor/1を変更する。
defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end
  • :ok = :gen_tcp.controlling_process(client, pid)が追加された。これは子供のプロセスにクライアントからのソケットの"controlling process"を処理させる。これをしないとソケットはacceptするプロセスに縛られるため、クラッシュするとすべての子プロセスが死ぬ。
  • mix run --no-haltでサーバーを開始して複数のクライアントから接続できるようになっているのを確認しよう。また、複数接続した後でどれかのコネクションを終了させても他の接続は保たれているのも確認できる。

Elixir 入門 ~ MIX AND OTP ~ その6 - Dependencies and umbrella projects

  • KVアプリケーションは完成したので次にそれを実装するTCPサーバーを作成する。その前にMixDependencyについて理解する。

External dependencies

  • 外部依存はビジネスドメインに縛られない。例えば今のKVアプリケーションのためにHTTP APIが必要であれば外部依存するパッケージとしてPlugを使う。
  • パッケージはHex Package Manager に登録されている。依存関係は mix.exsdeps関数に定義する。
def deps do
  # 0.5.x で最新のバージョンを取得
  [{:plug, "~> 0.5.0"}]
end
  • Hexで管理されているパッケージは安定版だが、開発中のものを使いたいときはgit経由でも書くことができる。
def deps do
  [{:plug, git: "git://github.com/elixir-lang/plug.git"}]
end
  • 依存パッケージを追加してコンパイルするとMixmix.lockを生成する。これは依存関係にあるパッケージのどのバージョンを使ったかが記録されている。

Internal dependencies

  • 内部依存はプロジェクト特有のもので、プロジェクト外では意味をなさずプライベートにしておきたいもので、内部依存のアプリケーションを使いたい場合は、git経由か、umbrella projectsという方法がある。umbrella projectsを使うと一つのプロジェクトの中に複数のアプリケーションをホストでき、単一のリポジトリで管理できるので通常はこちらを使う方が管理しやすい。これまでに作ったKVアプリケーションを含んだプロジェクト kv_umbrella を作成する。

Umbrella projects

  • 複数のアプリケーションを管理する kv_umbrella を新しく作成する。mix newのときに--umbrellaオプションを付与する。
mix new kv_umbrella --umbrella
  • 実行するとコンソールに表示される内容が --umbrellaなしでmix newしたときと変わっているはずだ。mix.exsのコードもまた変わっている。--umbrellaオプションを指定したことでこのアプリケーションがumbrellaとして動作するようになった。
  • 次にkv_umbrella/apps/に移動してkv_serverアプリケーションを作成する。こちらは作成の際に--supオプションをつける。これはMixが自動でsupervision treeを生成してくれるようになる。
mix new kv_server --module KVServer --sup
  • KVServerのmix.exsを見てみよう。
defmodule KVServer.Mixfile do
  use Mix.Project

  def project do
    [app: :kv_server,
     version: "0.0.1",
     deps_path: "../../deps",
     lockfile: "../../mix.lock",
     elixir: "~> 1.0",
     deps: deps]
  end

  def application do
    [applications: [:logger],
     mod: {KVServer, []}]
  end

  defp deps do
    []
  end
end
  • deps_pathlockfileが追加されているが、これはappsの下でmix newをしたことでMixが自動で全体の構造を解釈して追加している。このアプリケーションが外部依存するパッケージはkv_umbrellaと共有されることになった。
  • また、application/0の中も変わって、mod: {KVServer, []}が追加されている。これは--supフラグを追加したことによるもので、KVServerが全体のアプリケーションのsupervision treeを開始することを意味する。
  • lib/kv_server.exの中を見ると前の章で作成したsupervisorの実装が自動で組み込まれていることがわかる。あとはこれに監督されるアプリケーションを追加していく。

In umbrella dependencies

  • apps/kv_server/mix.exsを開いてkvを追加する。
defp deps do
  # kvがkv_serverの中で使えるようになる
  [{:kv, in_umbrella: true}]
end
  • depsは依存関係を解決するだけなのでKVアプリケーションが開始されるわけではない。applicaiton/0に追加してKVアプリケーションが開始されるようにする。
def application do
  # kv_serverが開始されれば`kv`も開始されるようになる
  [applications: [:logger, :kv],
   mod: {KVServer, []}]
end
  • 次にappsの下に今まで作成したKVアプリケーションを移動する。KVのmix.exsもKVServerと同様に以下のコードを追加しておく。
deps_path: "../../deps",
lockfile: "../../mix.lock",
  • ここまで出来たらkv_umbrellaのrootに移動してmix testを実行してみる。2つのアプリケーションのテストが走るはず。
  • このようにUmbrella Projectはアプリケーションを管理するのに便利である。管理されるアプリケーションは切り離されていて個別の設定が定義できる。個別に開発することもできるし、リストを明示的にして一緒に使うこともできる。

Elixir 入門 ~ MIX AND OTP ~ その5 - ETS

  • キャッシュ機構であるErlang Term Storageについて学ぶ。

ETS as a cache

  • ETSは etsモジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected])
8211
iex(2)> :ets.insert(table, {"foo", self})
true
iex(3)> :ets.lookup(table, "foo")
[{"foo", #PID<0.59.0>}]
  • ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では:buckets_registryというテーブルにテーブルの型とアクセスルールを与えている。:setはキーの重複を許さないという意味、:protected はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。
  • [:set, :protected] はデフォルト値なので指定しなかった場合は自動で設定される。
  • lib/kv/registry.exを開いてETSを組み込んでみよう
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(table, event_manager, buckets, opts \\ []) do
    # 1. ETSのテーブルのプロセスをstart_linkに追加
    GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `table`.

  Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
  """
  def lookup(table, name) do
    # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す
    case :ets.lookup(table, name) do
      [{^name, bucket}] -> {:ok, bucket}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  def init({table, events, buckets}) do
    # 3. HashDictからETS tableに変更
    ets  = :ets.new(table, [:named_table, read_concurrency: true])
    refs = HashDict.new
    {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
  end

  # 4. handle_call Callbackは消しておく

  def handle_cast({:create, name}, state) do
    # 5. HashDictの代わりにETS Tableに対して Read/writeする
    case lookup(state.names, name) do
      {:ok, _pid} ->
        {:noreply, state}
      :error ->
        {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
        ref = Process.monitor(pid)
        refs = HashDict.put(state.refs, ref, name)
        :ets.insert(state.names, {name, pid})
        GenEvent.sync_notify(state.events, {:create, name, pid})
        {:noreply, %{state | refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    # 6. BucketのプロセスがダウンしたらETS Tableから消す
    {name, refs} = HashDict.pop(state.refs, ref)
    :ets.delete(state.names, name)
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    {:noreply, %{state | refs: refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
  • この修正でテストも壊れるので直しておく
setup do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  {:ok, registry: registry, ets: :registry_table}
end
  • 各テストの引数を %{registry: registry, ets: ets}とし、setupで追加したテーブル名を受け取れるようにしておく。
  • データを取り出す先が ETS になったので、KV.Registry.lookup(registry, ...)KV.Registry.lookup(ets, ...)に変更。
  • まだ失敗する。。。 assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping") ここで落ちる。
    • lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
  • 再び lib/kv/registry.ex を修正する。
def create(server, name) do
  # cast から call に
  GenServer.call(server, {:create, name})
end

# handle_callに。callになったので _fromを追加。使わないけど
def handle_call({:create, name}, _from, state) do
  case lookup(state.names, name) do
    {:ok, pid} ->
      # returnが必要なのでbucketのpidを返す。
      {:reply, pid, state} # Reply with pid
    :error ->
      {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      :ets.insert(state.names, {name, pid})
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # returnが必要なのでbucket のpidを返す。
      {:reply, pid, %{state | refs: refs}} # Reply with pid
  end
end
  • これでmix testを実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。
  • 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
  • さきほどと違い、プロセスの停止のイベントをhandle_infoで拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Managerの通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
  Agent.stop(bucket)
  assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので liv/kv/supervisor.exを編集してETSのプロセスを追加する。
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor

def init(:ok) do
  children = [
    worker(GenEvent, [[name: @manager_name]]),
    supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
    worker(KV.Registry, [@ets_registry_name, @manager_name,
                         @bucket_sup_name, [name: @registry_name]])
  ]

  supervise(children, strategy: :one_for_one)
end

ETS as persistent storage

  • これまでに ETSのプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETSのプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registryのプロセスが終了すれば自動的にETSのプロセスも終了する。
  • では、ETSregistryのプロセスから切り離したらどうなるか。bucketの情報がETSに保存されているのであればregistryが死んだとしてもETSからbucketのプロセスをregistrysupervisorに復帰させることができる。

  • liv/kv/supervisor.ex のinitを以下のように編集する。

  def init(:ok) do
    # init時にetsを開始する。
    ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}])

    # テーブル名ではなくetsのプロセスを渡す。
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
      worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]])
    ]

    supervise(children, strategy: :one_for_one)
  end
  • 合わせてlib/kv/registry.ex のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do
  refs = HashDict.new
  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
  • 最後にテストを直す。setup時にETSを開始して開始済みのプロセスをregistryに渡してやるようにする。ここまででmix testをして通ることを確認する。
setup do
  ets = :ets.new(:registry_table, [:set, :public])
  registry = start_registry(ets)
  {:ok, registry: registry, ets: ets}
end

defp start_registry(ets) do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(ets, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  registry
end
  • では最後のテストケースを追加する。ETSのプロセスをregistryから分離したことでregistryが死んでもETSは死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do
  # とりあえずbucketを作成
  bucket = KV.Registry.create(registry, "shopping")

  # registryのプロセスを終了する
  Process.unlink(registry)
  Process.exit(registry, :shutdown)

  # 生き残っているetsを使ってregisryを起動しなおす
  start_registry(ets)

  # 最初に作成したbucketが取れることを確認
  assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}

  # bucketのプロセスを終了
  Process.exit(bucket, :shutdown)

  # ETSからの削除されていることを確認
  assert_receive {:exit, "shopping", ^bucket}
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これを通すための修正をlib/kv/registry.ex に入れる。
def init({ets, events, buckets}) do
  # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す
  refs = :ets.foldl(fn {name, pid}, acc ->
    HashDict.put(acc, Process.monitor(pid), name)
  end, HashDict.new, ets)

  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end

Elixir 入門 ~ MIX AND OTP ~ その4 - Supervisor and Application

  • この章ではElixirのポリシーである“fail fast” と “let it crash”を実現しているsupervisorについて学ぶ。

Our first supervisor

  • Supervisor behaviour を使ってsupervisorを実装したモジュール lib/kv/supervisor.ex を作成する。
defmodule KV.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  # attributeで子プロセス名を定義
  @manager_name KV.EventManager
  @registry_name KV.Registry

  # GenServerと同様にstart_linkで呼ばれるコールバック
  def init(:ok) do
    # `supervisor`は`event manager` と`registry`の2つの子プロセスを管理している。
    # 子プロセスは他のプロセスがpidを知らなくてもアクセスできるように別名をつけておく。例えば子プロセスがクラッシュして立ち上げ直した場合にpidは変わるが別名をつけておけばそのままアクセスできる
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      worker(KV.Registry, [@manager_name, [name: @registry_name]])
    ]

    # strategyは `:one_for_one` を指定してプロセスの管理を開始。この戦略はもし、子プロセスが死んだら新しいプロセスを作るというもの。
    supervise(children, strategy: :one_for_one)
  end
end
  • ファイルを作成したらiex -S mixで確認する
iex> KV.Supervisor.start_link
{:ok, #PID<0.100.0>}
iex> KV.Registry.create(KV.Registry, "shopping")
:ok
iex> KV.Registry.lookup(KV.Registry, "shopping")
{:ok, #PID<0.104.0>}
  • supervisorを開始すると同時に event managerregistry のプロセスが同時に開始された。なのでsuoervisor.start_linkのあとにbucketのcreateやlookupができている。

Understanding applications

  • いま、.appファイルは_build/dev/lib/kv/ebin/kv.ap に見つけることができる。
  • このファイルはErlangシンタックスで書かれており、ErlangカーネルElixir自身、mix.exsで定義した依存関係にあるモジュールやそのバージョンなどの情報が含まれている。
  • mix.exsapplication/0で返された値を使って.appをカスタマイズすることもできる。

Starting applications

  • iex -S mix run --no-start を使ってアプリケーションの開始・停止を手動で行ってみる。
iex(1)> Application.start(:kv)
:ok
iex(2)> Application.stop(:kv)
:ok

14:19:09.629 [info]  Application kv exited: :stopped
iex(3)> Application.stop(:logger)
:ok

=INFO REPORT==== 14-Sep-2015::14:19:16 ===
    application: logger
    exited: stopped
    type: temporary
iex(4)> Application.start(:kv)
{:error, {:not_started, :logger}}
  • 最初のstartはiex起動時に --no-startオプションがないと二重に起動しているという旨のエラーが返る。
  • 4行目のstartが失敗するのは依存関係にあるloggerが3行目で止められているから。このような場合は Application.ensure_all_started/1を使えば丸ごと立ち上げることができる。

The application callback

  • アプリケーションは起動時にcallbackを指定できる。callbackの戻り値は {:ok, pid}で、pidはsupervisorのpidでなければいけない。
  • callbackを実装するためにmix.exsを開いて以下のように編集する。
def application do
  [applications: [],
   mod: {KV, []}]
end
  • :modオプションがcallback時に起動するモジュールを指しており、そのモジュールは Application behaviour を実装していなければいけない。:modKV を指定したので次に KVApplication behaviour を実装する。lib/kv.exを開き、以下のように編集する。
defmodule KV do
  use Application

  # Application behaviourを実装するためには start/2 を実装する必要がある。
  def start(_type, _args) do
    KV.Supervisor.start_link
  end

  #  stop/1 を定義して停止の際にカスタムを入れることもできる
end
  • ここまでできたら iex -S mix を起動する。startですでにsupervisorのプロセスが開始されているのでKV.Supervisor.start_link をたたかなくてもすでにbucketの操作ができるようになっているはず。

Projects or applications?

  • MixはprojectApplication を区別する。mix.exsの内容に基づき、我々は :kv applicationを定義したMix Project を持っていると言える。あとの章ではどのような Applicationも定義しない Projectがでてくる。
  • チュートリアルの中で Project について話すときはMixについて考えるべきである。MixProjectを管理するツールで、どのようにProjectを編集し、テストし、関連した Applicationを開始するかを知っている。
  • ApplicationではOTPについて考えるべきである。Applicationはランタイムで開始して終了するエンティティで mix help compile.appdef application のオプションについて学ぶことができる。

Simple one for one supervisors

  • bucketのプロセスとregistryのプロセスはリンクしていてbucketのクラッシュはregistryのクラッシュと同意義である。registryがクラッシュした場合はsupervisorregistryを復活させることが保証されているが、bucketはすべてなくなってしまう。bucketがクラッシュしてもregistryは生き続けるようにしてみよう。
  • 次のテストを追加する。
  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Kill the bucket and wait for the notification
    Process.exit(bucket, :shutdown)
    assert_receive {:exit, "shopping", ^bucket}
    assert KV.Registry.lookup(registry, "shopping") == :error
  end
  • “removes bucket on exit” テストと似ているが、Agent.stop/1 の代わりにshutdownの命令を送ってbucketのプロセス自体を破棄している。bucketのプロセスはregistryのプロセスとリンクしているのでこの行為はregistryのプロセスを破棄することに等しく、すなわちテストも失敗する。
  • この問題を解決するために新しい戦略 :simple_one_for_one を持ったsupervisorを定義する。このsupervisorはすべてのbucketを生み出し、管理する役目を負う。lib/kv/bucket/supervisor.exを新しく作成する。
defmodule KV.Bucket.Supervisor do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  # 受け取ったsupervisorの子プロセスとしてbucketを開始する関数
  # KV.Bucket.start_linkの代わりに呼び出すようになる
  def start_bucket(supervisor) do
    Supervisor.start_child(supervisor, [])
  end

  def init(:ok) do
    # restart: :temporaryはbucketが死んでも自動で再開しないことを明示している。
    # bucketはregistryを通してのみ管理されるようにする = start_bucket をでしか開始されない
    children = [
      worker(KV.Bucket, [], restart: :temporary)
    ]

    supervise(children, strategy: :simple_one_for_one)
  end
end
  • iex -S mixBucket.Supervisorの動きを確認する。
iex(1)> {:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, #PID<0.90.0>}
iex(2)> {:ok, bucket} = KV.Bucket.Supervisor.start_bucket(sup)
{:ok, #PID<0.92.0>}
iex(3)> KV.Bucket.put(bucket, "eggs", 3)
:ok
iex(4)> KV.Bucket.get(bucket, "eggs")
3
  • 次に test/kv/registry_test.exs のsetupを編集して期待する動作を書く
  setup do
    # Bucketのsupervisorを開始(今回追加)
    {:ok, sup} = KV.Bucket.Supervisor.start_link
    # Event Managerを起動
    {:ok, manager} = GenEvent.start_link
    # EventManagerとBucketのsupervisorをレジストリに渡して起動(bucketのsupervisorのpidをargmentsに追加)
    {:ok, registry} = KV.Registry.start_link(manager, sup)

    GenEvent.add_mon_handler(manager, Forwarder, self())
    {:ok, registry: registry}
  end
  • 合わせて KV.Registry をテストが通るように編集
 ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(event_manager, buckets, opts \\ []) do
    # 1 bucketのsupervisorのプロセスを受け取れるようにする
    GenServer.start_link(__MODULE__, {event_manager, buckets}, opts)
  end
  ## Server callbacks

  def init({events, buckets}) do
    names = HashDict.new
    refs  = HashDict.new
    # 2 start_linkで追加したbucketのsupervisorのプロセスをstateに追加する。
    {:ok, %{names: names, refs: refs, events: events, buckets: buckets}}
  end

  def handle_cast({:create, name}, state) do
    if HashDict.get(state.names, name) do
      {:noreply, state}
    else
      # 3 bucketのsupervisor経由で新しいbucketが作成されるようにする
      {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      names = HashDict.put(state.names, name, pid)
      GenEvent.sync_notify(state.events, {:create, name, pid})
      {:noreply, %{state | names: names, refs: refs}}
    end
  end
  • ここまで追加できたらtestを通してみる。正しく動くようになっているはず。

Supervision trees

  • Bucketsupervisorをアプリケーションで使うためには、supervisorを監督するsupervisorを作りsupervision treesを形成する必要がある。lib/kv/supervisor.exを編集する。
  @manager_name KV.EventManager
  @registry_name KV.Registry
  # bucketのsupervisorの別名を追加
  @bucket_sup_name KV.Bucket.Supervisor

  def init(:ok) do
    # childrenにbucketのsupervisorを追加し、registryのworkerのargumentsにbucketのsupervisorのpid(別名)を追加
    # bucketのsupervisorはregistryのworkerより前に定義しなければいけない。
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
      worker(KV.Registry, [@manager_name, @bucket_sup_name, [name: @registry_name]])
    ]

    # この時点での :one_for_one 戦略は正しい。registryがクラッシュしたらbucketのsupervisorも死ぬべきだし、その逆も同じ。
    supervise(children, strategy: :one_for_one)
  end

Elixir 入門 ~ MIX AND OTP ~ その4 - GenEvent

  • 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。

Event managers

  • iex -S mix でセッションを新しく初め、GenEvent API を触ってみる。
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
  • sync_notifynotifyでmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:okが返るだけで特になにもおきない。
  • 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do
...>   use GenEvent
...>   def handle_event(event, parent) do
...>     send parent, event
...>     {:ok, parent}
...>   end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
  • Forwarderというハンドラを作成し、GenEvent.add_handler/3で今のプロセスで動くhandlerとして追加した。
  • Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
  • sync_notify/2 はリクエストに対し同期的に動き、notify/2 はリクエストに対し非同期的に動く。 これらはGenServercallcastによく似ている

Registry events

  • registryにEventManagerの仕組みを実装する。まずは test/kv/registry_test.exsに期待する動作を実装する。
  # イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。
  # Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ
  defmodule Forwarder do
    use GenEvent

    def handle_event(event, parent) do
      send parent, event
      {:ok, parent}
    end
  end

  # setup時にadd_mon_handler/3でForwarderをセットする形に変更
  setup do
    {:ok, manager} = GenEvent.start_link
    {:ok, registry} = KV.Registry.start_link(manager)

    GenEvent.add_mon_handler(manager, Forwarder, self())
    {:ok, registry: registry}
  end

  # ハンドラーのテストケースを追加
  test "sends events on create and crash", %{registry: registry} do

    # bucketをcreateした時に子プロセスからイベントを受け取っているか。
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    # ^ は bucketのpidとマッチさせるために使っている
    assert_receive {:create, "shopping", ^bucket}

    # bucketが破棄された時に子プロセスからイベントを受け取っているか。
    Agent.stop(bucket)
    assert_receive {:exit, "shopping", ^bucket}
  end
  • 次にこのテストを通すための実装をlib/kv/registry.exに追加・変更する
  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(event_manager, opts \\ []) do
    # GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する
    GenServer.start_link(__MODULE__, event_manager, opts)
  end

  ## Server callbacks

  def init(events) do
    # callbackの引数の数が増えたのでstateをtupleからmapに
    names = HashDict.new
    refs  = HashDict.new
    {:ok, %{names: names, refs: refs, events: events}}
  end

  def handle_call({:lookup, name}, _from, state) do
   # initでstateがmapになったので対応 
   {:reply, HashDict.fetch(state.names, name), state}
  end

  def handle_cast({:create, name}, state) do
   # initでstateがmapになったので対応 
    if HashDict.get(state.names, name) do
      {:noreply, state}
    else
      {:ok, pid} = KV.Bucket.start_link()
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      names = HashDict.put(state.names, name, pid)
      # EventManagerにCreateイベントを通知する
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # stateを更新
      {:noreply, %{state | names: names, refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    {name, refs} = HashDict.pop(state.refs, ref)
    names = HashDict.delete(state.names, name)
    # EventManagerにExitイベントを通知する
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    # stateを更新
    {:noreply, %{state | names: names, refs: refs}}
  end

Event streams

  • 最後にStreamを扱うGenEventについて紹介する
  • EventManagerを作成し、spwan_linkでGenEvent.stream/1を呼び続けるプロセスを作成する
  • notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}

iex> spawn_link fn ->
...>   for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
#PID<0.97.0>
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok