読者です 読者をやめる 読者になる 読者になる

もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ その4 - GenEvent

  • 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。

Event managers

  • iex -S mix でセッションを新しく初め、GenEvent API を触ってみる。
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
  • sync_notifynotifyでmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:okが返るだけで特になにもおきない。
  • 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do
...>   use GenEvent
...>   def handle_event(event, parent) do
...>     send parent, event
...>     {:ok, parent}
...>   end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
  • Forwarderというハンドラを作成し、GenEvent.add_handler/3で今のプロセスで動くhandlerとして追加した。
  • Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
  • sync_notify/2 はリクエストに対し同期的に動き、notify/2 はリクエストに対し非同期的に動く。 これらはGenServercallcastによく似ている

Registry events

  • registryにEventManagerの仕組みを実装する。まずは test/kv/registry_test.exsに期待する動作を実装する。
  # イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。
  # Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ
  defmodule Forwarder do
    use GenEvent

    def handle_event(event, parent) do
      send parent, event
      {:ok, parent}
    end
  end

  # setup時にadd_mon_handler/3でForwarderをセットする形に変更
  setup do
    {:ok, manager} = GenEvent.start_link
    {:ok, registry} = KV.Registry.start_link(manager)

    GenEvent.add_mon_handler(manager, Forwarder, self())
    {:ok, registry: registry}
  end

  # ハンドラーのテストケースを追加
  test "sends events on create and crash", %{registry: registry} do

    # bucketをcreateした時に子プロセスからイベントを受け取っているか。
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    # ^ は bucketのpidとマッチさせるために使っている
    assert_receive {:create, "shopping", ^bucket}

    # bucketが破棄された時に子プロセスからイベントを受け取っているか。
    Agent.stop(bucket)
    assert_receive {:exit, "shopping", ^bucket}
  end
  • 次にこのテストを通すための実装をlib/kv/registry.exに追加・変更する
  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(event_manager, opts \\ []) do
    # GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する
    GenServer.start_link(__MODULE__, event_manager, opts)
  end

  ## Server callbacks

  def init(events) do
    # callbackの引数の数が増えたのでstateをtupleからmapに
    names = HashDict.new
    refs  = HashDict.new
    {:ok, %{names: names, refs: refs, events: events}}
  end

  def handle_call({:lookup, name}, _from, state) do
   # initでstateがmapになったので対応 
   {:reply, HashDict.fetch(state.names, name), state}
  end

  def handle_cast({:create, name}, state) do
   # initでstateがmapになったので対応 
    if HashDict.get(state.names, name) do
      {:noreply, state}
    else
      {:ok, pid} = KV.Bucket.start_link()
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      names = HashDict.put(state.names, name, pid)
      # EventManagerにCreateイベントを通知する
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # stateを更新
      {:noreply, %{state | names: names, refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    {name, refs} = HashDict.pop(state.refs, ref)
    names = HashDict.delete(state.names, name)
    # EventManagerにExitイベントを通知する
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    # stateを更新
    {:noreply, %{state | names: names, refs: refs}}
  end

Event streams

  • 最後にStreamを扱うGenEventについて紹介する
  • EventManagerを作成し、spwan_linkでGenEvent.stream/1を呼び続けるプロセスを作成する
  • notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}

iex> spawn_link fn ->
...>   for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
#PID<0.97.0>
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok