Elixir 入門 ~ MIX AND OTP ~ その5 - ETS
- キャッシュ機構である
Erlang Term Storageについて学ぶ。
ETS as a cache
- ETSは
etsモジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected])
8211
iex(2)> :ets.insert(table, {"foo", self})
true
iex(3)> :ets.lookup(table, "foo")
[{"foo", #PID<0.59.0>}]
- ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では
:buckets_registryというテーブルにテーブルの型とアクセスルールを与えている。:setはキーの重複を許さないという意味、:protectedはテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。 [:set, :protected]はデフォルト値なので指定しなかった場合は自動で設定される。lib/kv/registry.exを開いてETSを組み込んでみよう
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(table, event_manager, buckets, opts \\ []) do
# 1. ETSのテーブルのプロセスをstart_linkに追加
GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `table`.
Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
"""
def lookup(table, name) do
# 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す
case :ets.lookup(table, name) do
[{^name, bucket}] -> {:ok, bucket}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
def init({table, events, buckets}) do
# 3. HashDictからETS tableに変更
ets = :ets.new(table, [:named_table, read_concurrency: true])
refs = HashDict.new
{:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
# 4. handle_call Callbackは消しておく
def handle_cast({:create, name}, state) do
# 5. HashDictの代わりにETS Tableに対して Read/writeする
case lookup(state.names, name) do
{:ok, _pid} ->
{:noreply, state}
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
{:noreply, %{state | refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
# 6. BucketのプロセスがダウンしたらETS Tableから消す
{name, refs} = HashDict.pop(state.refs, ref)
:ets.delete(state.names, name)
GenEvent.sync_notify(state.events, {:exit, name, pid})
{:noreply, %{state | refs: refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
- この修正でテストも壊れるので直しておく
setup do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry, ets: :registry_table}
end
- 各テストの引数を
%{registry: registry, ets: ets}とし、setupで追加したテーブル名を受け取れるようにしておく。 - データを取り出す先が
ETSになったので、KV.Registry.lookup(registry, ...)をKV.Registry.lookup(ets, ...)に変更。 - まだ失敗する。。。
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")ここで落ちる。- lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
- 再び
lib/kv/registry.exを修正する。
def create(server, name) do
# cast から call に
GenServer.call(server, {:create, name})
end
# handle_callに。callになったので _fromを追加。使わないけど
def handle_call({:create, name}, _from, state) do
case lookup(state.names, name) do
{:ok, pid} ->
# returnが必要なのでbucketのpidを返す。
{:reply, pid, state} # Reply with pid
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
# returnが必要なのでbucket のpidを返す。
{:reply, pid, %{state | refs: refs}} # Reply with pid
end
end
- これで
mix testを実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。 - 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
- さきほどと違い、プロセスの停止のイベントを
handle_infoで拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Managerの通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。
assert KV.Registry.lookup(ets, "shopping") == :error
end
- これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので
liv/kv/supervisor.exを編集してETSのプロセスを追加する。
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor
def init(:ok) do
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [@ets_registry_name, @manager_name,
@bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
ETS as persistent storage
- これまでに
ETSのプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETSのプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registryのプロセスが終了すれば自動的にETSのプロセスも終了する。 では、
ETSをregistryのプロセスから切り離したらどうなるか。bucketの情報がETSに保存されているのであればregistryが死んだとしてもETSからbucketのプロセスをregistryのsupervisorに復帰させることができる。liv/kv/supervisor.exのinitを以下のように編集する。
def init(:ok) do
# init時にetsを開始する。
ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}])
# テーブル名ではなくetsのプロセスを渡す。
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
- 合わせて
lib/kv/registry.exのinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do
refs = HashDict.new
{:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
- 最後にテストを直す。setup時に
ETSを開始して開始済みのプロセスをregistryに渡してやるようにする。ここまででmix testをして通ることを確認する。
setup do
ets = :ets.new(:registry_table, [:set, :public])
registry = start_registry(ets)
{:ok, registry: registry, ets: ets}
end
defp start_registry(ets) do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(ets, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
registry
end
- では最後のテストケースを追加する。
ETSのプロセスをregistryから分離したことでregistryが死んでもETSは死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do
# とりあえずbucketを作成
bucket = KV.Registry.create(registry, "shopping")
# registryのプロセスを終了する
Process.unlink(registry)
Process.exit(registry, :shutdown)
# 生き残っているetsを使ってregisryを起動しなおす
start_registry(ets)
# 最初に作成したbucketが取れることを確認
assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}
# bucketのプロセスを終了
Process.exit(bucket, :shutdown)
# ETSからの削除されていることを確認
assert_receive {:exit, "shopping", ^bucket}
assert KV.Registry.lookup(ets, "shopping") == :error
end
- これを通すための修正を
lib/kv/registry.exに入れる。
def init({ets, events, buckets}) do
# initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す
refs = :ets.foldl(fn {name, pid}, acc ->
HashDict.put(acc, Process.monitor(pid), name)
end, HashDict.new, ets)
{:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end