- Bucketのプロセスを監視するプロセスをGenServerを使って作成する
- GenServerはElixirとOTPでサーバー機能を実装するための抽象化された仕組みである。
Our first GenServer
- GenServerは
Client API
と Server Callback
の2つの部品が実装される。
- ClientとServerはそれぞれ別のプロセスで動いていて、Clientを通過したメッセージはServerのCallback関数に渡される
lib/kv/registry.ex
を作成してGenServerを実装してみる。各関数の意味はコード内にコメントで書いておく。
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(opts \\ []) do
# 3つの引数をpassingする新しいGenServerをスタートする
# arg1 は server callbackが実装されているモジュールで `__MODULE__` は現在のモジュールを指す
# arg2 は 初期設定でこの場合はatom
# arg3 は オプションのリスト
GenServer.start_link(__MODULE__, :ok, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
# call/2 は serverからresponseが返るrequest
# serverに渡す命令は tupleにして先頭をserverへの命令を意味するatom をつけることが多い
GenServer.call(server, {:lookup, name})
end
@doc """
Ensures there is a bucket associated to the given `name` in `server`.
"""
def create(server, name) do
# cast/2 はserverからresponseが帰らないrequest
GenServer.cast(server, {:create, name})
end
## Server Callbacks
def init(:ok) do
# initはstart_link/3のコールバックで引数を受け取ってstateを返している。この場合はHashDict
{:ok, HashDict.new}
end
def handle_call({:lookup, name}, _from, names) do
# handle_callはcallのコールバック関数で先頭のatomでパターンマッチをかけているので`:lookup`が命令になる
# 引数の先頭のtupleはrequestから受け取った値
# _fromはリクエスト元の情報
# namesは現在のサーバーの情報
# callのReturnのフォーマットは下記の通りで、先頭のatomは`:reply`、Clientに返す値、Serverの情報となる
{:reply, HashDict.fetch(names, name), names}
end
def handle_cast({:create, name}, names) do
# handle_castはcastのコールバック関数でcallと同様に先頭のatomのパターンマッチで命令を判別する
# 引数の先頭のtupleはrequestから受け取った値
# namesは現在のサーバーの情報
# castのReturnのフォーマットは下記の通りで、先頭のatomは`:noreply`、Serverの情報となる
if HashDict.has_key?(names, name) do
{:noreply, names}
else
{:ok, bucket} = KV.Bucket.start_link()
{:noreply, HashDict.put(names, name, bucket)}
end
end
end
Testing a GenServer
- Agentとは別にGenServerのテストを書く。
defmodule KV.RegistryTest do
use ExUnit.Case, async: true
setup do
# レジストリを起動
{:ok, registry} = KV.Registry.start_link
{:ok, registry: registry}
end
test "spawns buckets", %{registry: registry} do
# 未登録のBucketはエラーとなること
assert KV.Registry.lookup(registry, "shopping") == :error
# Bucketを新しく登録してBucketを取得できること
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
# 登録したBucketにKey-Valueを登録できること
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
end
- テストの中でstart_linkでレジストリのプロセスが起動するが、テストが完了したタイミングで
:shutdown
を受け取るので明示的に書く必要はない。
- プロセスを止めるテストを作るにはGenServerのコールバックを追加してやる。
@doc """
Stops the registry.
"""
def stop(server) do
GenServer.call(server, :stop)
end
## Server Callbacks
def handle_call(:stop, _from, state) do
{:stop, :normal, :ok, state}
end
- "spawns buckets" テストで最後にレジストリを止めるテストを追加
# レジストリが正しく止まること
assert KV.Registry.stop(registry) == :ok
The need for monitoring
- Bucketのプロセスが止まったりクラッシュした場合を考える。次の失敗するテストのコードをテストに追加し、走らせてみる。
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
assert KV.Registry.lookup(registry, "shopping") == :error
end
- 取り出そうとするBucketのプロセスはすでに終了しているのでエラーとなる。これを解消するためにはレジストリはすべてのBucektのプロセスを監視する必要がある。
- レジストリはBucketが終了した場合、その通知を受けとっているので、そのコールバックでHashDictを掃除してやればよい。レジストリのコールバックを以下のように書き換えてみる
## Server callbacks
def init(:ok) do
# 単に新しいHashDictを返していたところをnamesとrefsのtupleを返すようにする
names = HashDict.new
refs = HashDict.new
{:ok, {names, refs}}
end
def handle_call({:lookup, name}, _from, {names, _} = state) do
{:reply, HashDict.fetch(names, name), state}
end
def handle_call(:stop, _from, state) do
{:stop, :normal, :ok, state}
end
# namesとrefsを受け取るように修正
def handle_cast({:create, name}, {names, refs}) do
if HashDict.has_key?(names, name) do
{:noreply, {names, refs}}
else
{:ok, pid} = KV.Bucket.start_link()
# refsに作成したBucketのPIDを登録する
ref = Process.monitor(pid)
refs = HashDict.put(refs, ref, name)
names = HashDict.put(names, name, pid)
{:noreply, {names, refs}}
end
end
# bucketのプロセスがダウンした時のコールバック
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
# refsから対応するBucketの名前を取り出しつつ削除
{name, refs} = HashDict.pop(refs, ref)
# namesから削除
names = HashDict.delete(names, name)
{:noreply, {names, refs}}
end
# すべてのイベントをキャッチするコールバック
def handle_info(_msg, state) do
# 特に何もせず、stateだけを回す
{:noreply, state}
end
call, cast or info?
- レジストリの例で call、cast、infoの3種類のコールバックを使った。それぞれどのような場合に使い分けるか考えてみる。
- handle_call/3
- これは同期的なリクエストの時に使うべきで、サーバからの応答を待つ基本的なコールバックである。
- handle_cast/2
- これは非同期なリクエストの時に使うべきでレスポンスが不要な場合に使う。
- handle_info/2
- 上記以外の一般的なメッセージを受け取る時に使う。
- send/2を経て送られたあらゆるメッセージがこのコールバックにたどり着く。catch-allをしかけなくてもunhandleなメッセージはここで捉えることができる。
- なので
handle_call/3
と handle_cast/2
はそれほど悩む必要がない。GenServer APIを経てリクエストされるだけのなので、仮に未知のメッセージが送られた場合は開発者の間違いに起因するものだろう。