読者です 読者をやめる 読者になる 読者になる

もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ その5 - ETS

Elixir OTP Mix
  • キャッシュ機構であるErlang Term Storageについて学ぶ。

ETS as a cache

  • ETSは etsモジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected])
8211
iex(2)> :ets.insert(table, {"foo", self})
true
iex(3)> :ets.lookup(table, "foo")
[{"foo", #PID<0.59.0>}]
  • ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では:buckets_registryというテーブルにテーブルの型とアクセスルールを与えている。:setはキーの重複を許さないという意味、:protected はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。
  • [:set, :protected] はデフォルト値なので指定しなかった場合は自動で設定される。
  • lib/kv/registry.exを開いてETSを組み込んでみよう
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(table, event_manager, buckets, opts \\ []) do
    # 1. ETSのテーブルのプロセスをstart_linkに追加
    GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `table`.

  Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
  """
  def lookup(table, name) do
    # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す
    case :ets.lookup(table, name) do
      [{^name, bucket}] -> {:ok, bucket}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  def init({table, events, buckets}) do
    # 3. HashDictからETS tableに変更
    ets  = :ets.new(table, [:named_table, read_concurrency: true])
    refs = HashDict.new
    {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
  end

  # 4. handle_call Callbackは消しておく

  def handle_cast({:create, name}, state) do
    # 5. HashDictの代わりにETS Tableに対して Read/writeする
    case lookup(state.names, name) do
      {:ok, _pid} ->
        {:noreply, state}
      :error ->
        {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
        ref = Process.monitor(pid)
        refs = HashDict.put(state.refs, ref, name)
        :ets.insert(state.names, {name, pid})
        GenEvent.sync_notify(state.events, {:create, name, pid})
        {:noreply, %{state | refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    # 6. BucketのプロセスがダウンしたらETS Tableから消す
    {name, refs} = HashDict.pop(state.refs, ref)
    :ets.delete(state.names, name)
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    {:noreply, %{state | refs: refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
  • この修正でテストも壊れるので直しておく
setup do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  {:ok, registry: registry, ets: :registry_table}
end
  • 各テストの引数を %{registry: registry, ets: ets}とし、setupで追加したテーブル名を受け取れるようにしておく。
  • データを取り出す先が ETS になったので、KV.Registry.lookup(registry, ...)KV.Registry.lookup(ets, ...)に変更。
  • まだ失敗する。。。 assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping") ここで落ちる。
    • lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
  • 再び lib/kv/registry.ex を修正する。
def create(server, name) do
  # cast から call に
  GenServer.call(server, {:create, name})
end

# handle_callに。callになったので _fromを追加。使わないけど
def handle_call({:create, name}, _from, state) do
  case lookup(state.names, name) do
    {:ok, pid} ->
      # returnが必要なのでbucketのpidを返す。
      {:reply, pid, state} # Reply with pid
    :error ->
      {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      :ets.insert(state.names, {name, pid})
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # returnが必要なのでbucket のpidを返す。
      {:reply, pid, %{state | refs: refs}} # Reply with pid
  end
end
  • これでmix testを実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。
  • 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
  • さきほどと違い、プロセスの停止のイベントをhandle_infoで拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Managerの通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
  Agent.stop(bucket)
  assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので liv/kv/supervisor.exを編集してETSのプロセスを追加する。
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor

def init(:ok) do
  children = [
    worker(GenEvent, [[name: @manager_name]]),
    supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
    worker(KV.Registry, [@ets_registry_name, @manager_name,
                         @bucket_sup_name, [name: @registry_name]])
  ]

  supervise(children, strategy: :one_for_one)
end

ETS as persistent storage

  • これまでに ETSのプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETSのプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registryのプロセスが終了すれば自動的にETSのプロセスも終了する。
  • では、ETSregistryのプロセスから切り離したらどうなるか。bucketの情報がETSに保存されているのであればregistryが死んだとしてもETSからbucketのプロセスをregistrysupervisorに復帰させることができる。

  • liv/kv/supervisor.ex のinitを以下のように編集する。

  def init(:ok) do
    # init時にetsを開始する。
    ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}])

    # テーブル名ではなくetsのプロセスを渡す。
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
      worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]])
    ]

    supervise(children, strategy: :one_for_one)
  end
  • 合わせてlib/kv/registry.ex のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do
  refs = HashDict.new
  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
  • 最後にテストを直す。setup時にETSを開始して開始済みのプロセスをregistryに渡してやるようにする。ここまででmix testをして通ることを確認する。
setup do
  ets = :ets.new(:registry_table, [:set, :public])
  registry = start_registry(ets)
  {:ok, registry: registry, ets: ets}
end

defp start_registry(ets) do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(ets, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  registry
end
  • では最後のテストケースを追加する。ETSのプロセスをregistryから分離したことでregistryが死んでもETSは死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do
  # とりあえずbucketを作成
  bucket = KV.Registry.create(registry, "shopping")

  # registryのプロセスを終了する
  Process.unlink(registry)
  Process.exit(registry, :shutdown)

  # 生き残っているetsを使ってregisryを起動しなおす
  start_registry(ets)

  # 最初に作成したbucketが取れることを確認
  assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}

  # bucketのプロセスを終了
  Process.exit(bucket, :shutdown)

  # ETSからの削除されていることを確認
  assert_receive {:exit, "shopping", ^bucket}
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これを通すための修正をlib/kv/registry.ex に入れる。
def init({ets, events, buckets}) do
  # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す
  refs = :ets.foldl(fn {name, pid}, acc ->
    HashDict.put(acc, Process.monitor(pid), name)
  end, HashDict.new, ets)

  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end