- 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。
Event managers
iex -S mix
でセッションを新しく初め、GenEvent API
を触ってみる。
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
sync_notify
とnotify
でmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:ok
が返るだけで特になにもおきない。
- 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do
...> use GenEvent
...> def handle_event(event, parent) do
...> send parent, event
...> {:ok, parent}
...> end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
- Forwarderというハンドラを作成し、
GenEvent.add_handler/3
で今のプロセスで動くhandlerとして追加した。
- Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
sync_notify/2
はリクエストに対し同期的に動き、notify/2
はリクエストに対し非同期的に動く。 これらはGenServer
のcall
とcast
によく似ている
Registry events
- registryにEventManagerの仕組みを実装する。まずは
test/kv/registry_test.exs
に期待する動作を実装する。
# イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。
# Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ
defmodule Forwarder do
use GenEvent
def handle_event(event, parent) do
send parent, event
{:ok, parent}
end
end
# setup時にadd_mon_handler/3でForwarderをセットする形に変更
setup do
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(manager)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry}
end
# ハンドラーのテストケースを追加
test "sends events on create and crash", %{registry: registry} do
# bucketをcreateした時に子プロセスからイベントを受け取っているか。
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
# ^ は bucketのpidとマッチさせるために使っている
assert_receive {:create, "shopping", ^bucket}
# bucketが破棄された時に子プロセスからイベントを受け取っているか。
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket}
end
- 次にこのテストを通すための実装を
lib/kv/registry.ex
に追加・変更する
## Client API
@doc """
Starts the registry.
"""
def start_link(event_manager, opts \\ []) do
# GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する
GenServer.start_link(__MODULE__, event_manager, opts)
end
## Server callbacks
def init(events) do
# callbackの引数の数が増えたのでstateをtupleからmapに
names = HashDict.new
refs = HashDict.new
{:ok, %{names: names, refs: refs, events: events}}
end
def handle_call({:lookup, name}, _from, state) do
# initでstateがmapになったので対応
{:reply, HashDict.fetch(state.names, name), state}
end
def handle_cast({:create, name}, state) do
# initでstateがmapになったので対応
if HashDict.get(state.names, name) do
{:noreply, state}
else
{:ok, pid} = KV.Bucket.start_link()
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
names = HashDict.put(state.names, name, pid)
# EventManagerにCreateイベントを通知する
GenEvent.sync_notify(state.events, {:create, name, pid})
# stateを更新
{:noreply, %{state | names: names, refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
{name, refs} = HashDict.pop(state.refs, ref)
names = HashDict.delete(state.names, name)
# EventManagerにExitイベントを通知する
GenEvent.sync_notify(state.events, {:exit, name, pid})
# stateを更新
{:noreply, %{state | names: names, refs: refs}}
end
Event streams
- 最後にStreamを扱うGenEventについて紹介する
- EventManagerを作成し、spwan_linkで
GenEvent.stream/1
を呼び続けるプロセスを作成する
- notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> spawn_link fn ->
...> for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
#PID<0.97.0>
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok