- キャッシュ機構である
Erlang Term Storage
について学ぶ。
ETS as a cache
- ETSは
ets
モジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected]) 8211 iex(2)> :ets.insert(table, {"foo", self}) true iex(3)> :ets.lookup(table, "foo") [{"foo", #PID<0.59.0>}]
- ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では
:buckets_registry
というテーブルにテーブルの型とアクセスルールを与えている。:set
はキーの重複を許さないという意味、:protected
はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。 [:set, :protected]
はデフォルト値なので指定しなかった場合は自動で設定される。lib/kv/registry.ex
を開いてETSを組み込んでみよう
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(table, event_manager, buckets, opts \\ []) do # 1. ETSのテーブルのプロセスをstart_linkに追加 GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts) end @doc """ Looks up the bucket pid for `name` stored in `table`. Returns `{:ok, pid}` if a bucket exists, `:error` otherwise. """ def lookup(table, name) do # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す case :ets.lookup(table, name) do [{^name, bucket}] -> {:ok, bucket} [] -> :error end end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server callbacks def init({table, events, buckets}) do # 3. HashDictからETS tableに変更 ets = :ets.new(table, [:named_table, read_concurrency: true]) refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end # 4. handle_call Callbackは消しておく def handle_cast({:create, name}, state) do # 5. HashDictの代わりにETS Tableに対して Read/writeする case lookup(state.names, name) do {:ok, _pid} -> {:noreply, state} :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do # 6. BucketのプロセスがダウンしたらETS Tableから消す {name, refs} = HashDict.pop(state.refs, ref) :ets.delete(state.names, name) GenEvent.sync_notify(state.events, {:exit, name, pid}) {:noreply, %{state | refs: refs}} end def handle_info(_msg, state) do {:noreply, state} end end
- この修正でテストも壊れるので直しておく
setup do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry, ets: :registry_table} end
- 各テストの引数を
%{registry: registry, ets: ets}
とし、setupで追加したテーブル名を受け取れるようにしておく。 - データを取り出す先が
ETS
になったので、KV.Registry.lookup(registry, ...)
をKV.Registry.lookup(ets, ...)
に変更。 - まだ失敗する。。。
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
ここで落ちる。- lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
- 再び
lib/kv/registry.ex
を修正する。
def create(server, name) do # cast から call に GenServer.call(server, {:create, name}) end # handle_callに。callになったので _fromを追加。使わないけど def handle_call({:create, name}, _from, state) do case lookup(state.names, name) do {:ok, pid} -> # returnが必要なのでbucketのpidを返す。 {:reply, pid, state} # Reply with pid :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) # returnが必要なのでbucket のpidを返す。 {:reply, pid, %{state | refs: refs}} # Reply with pid end end
- これで
mix test
を実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。 - 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
- さきほどと違い、プロセスの停止のイベントを
handle_info
で拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Manager
の通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(ets, "shopping") Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。 assert KV.Registry.lookup(ets, "shopping") == :error end
- これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので
liv/kv/supervisor.ex
を編集してETS
のプロセスを追加する。
@manager_name KV.EventManager @registry_name KV.Registry @ets_registry_name KV.Registry @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@ets_registry_name, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
ETS as persistent storage
- これまでに
ETS
のプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETS
のプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registry
のプロセスが終了すれば自動的にETS
のプロセスも終了する。 では、
ETS
をregistry
のプロセスから切り離したらどうなるか。bucket
の情報がETS
に保存されているのであればregistry
が死んだとしてもETS
からbucket
のプロセスをregistry
のsupervisor
に復帰させることができる。liv/kv/supervisor.ex
のinitを以下のように編集する。
def init(:ok) do # init時にetsを開始する。 ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}]) # テーブル名ではなくetsのプロセスを渡す。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
- 合わせて
lib/kv/registry.ex
のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
- 最後にテストを直す。setup時に
ETS
を開始して開始済みのプロセスをregistry
に渡してやるようにする。ここまででmix test
をして通ることを確認する。
setup do ets = :ets.new(:registry_table, [:set, :public]) registry = start_registry(ets) {:ok, registry: registry, ets: ets} end defp start_registry(ets) do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(ets, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) registry end
- では最後のテストケースを追加する。
ETS
のプロセスをregistry
から分離したことでregistry
が死んでもETS
は死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do # とりあえずbucketを作成 bucket = KV.Registry.create(registry, "shopping") # registryのプロセスを終了する Process.unlink(registry) Process.exit(registry, :shutdown) # 生き残っているetsを使ってregisryを起動しなおす start_registry(ets) # 最初に作成したbucketが取れることを確認 assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket} # bucketのプロセスを終了 Process.exit(bucket, :shutdown) # ETSからの削除されていることを確認 assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(ets, "shopping") == :error end
- これを通すための修正を
lib/kv/registry.ex
に入れる。
def init({ets, events, buckets}) do # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す refs = :ets.foldl(fn {name, pid}, acc -> HashDict.put(acc, Process.monitor(pid), name) end, HashDict.new, ets) {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end