もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ その3 - GenServer

  • Bucketのプロセスを監視するプロセスをGenServerを使って作成する
  • GenServerはElixirとOTPでサーバー機能を実装するための抽象化された仕組みである。

Our first GenServer

  • GenServerは Client APIServer Callback の2つの部品が実装される。
  • ClientとServerはそれぞれ別のプロセスで動いていて、Clientを通過したメッセージはServerのCallback関数に渡される
  • lib/kv/registry.ex を作成してGenServerを実装してみる。各関数の意味はコード内にコメントで書いておく。
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts \\ []) do
    # 3つの引数をpassingする新しいGenServerをスタートする
    # arg1 は server callbackが実装されているモジュールで `__MODULE__` は現在のモジュールを指す
    # arg2 は 初期設定でこの場合はatom
    # arg3 は オプションのリスト
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # call/2 は serverからresponseが返るrequest
    # serverに渡す命令は tupleにして先頭をserverへの命令を意味するatom をつけることが多い
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated to the given `name` in `server`.
  """
  def create(server, name) do
    # cast/2 はserverからresponseが帰らないrequest
    GenServer.cast(server, {:create, name})
  end

  ## Server Callbacks

  def init(:ok) do
    # initはstart_link/3のコールバックで引数を受け取ってstateを返している。この場合はHashDict
    {:ok, HashDict.new}
  end

  def handle_call({:lookup, name}, _from, names) do
    # handle_callはcallのコールバック関数で先頭のatomでパターンマッチをかけているので`:lookup`が命令になる
    # 引数の先頭のtupleはrequestから受け取った値
    # _fromはリクエスト元の情報
    # namesは現在のサーバーの情報
    
    # callのReturnのフォーマットは下記の通りで、先頭のatomは`:reply`、Clientに返す値、Serverの情報となる
    {:reply, HashDict.fetch(names, name), names}
  end

  def handle_cast({:create, name}, names) do
    # handle_castはcastのコールバック関数でcallと同様に先頭のatomのパターンマッチで命令を判別する

    # 引数の先頭のtupleはrequestから受け取った値
    # namesは現在のサーバーの情報

    # castのReturnのフォーマットは下記の通りで、先頭のatomは`:noreply`、Serverの情報となる
    if HashDict.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link()
      {:noreply, HashDict.put(names, name, bucket)}
    end
  end
end
  • コールバックのフォーマットは他にもあり、先頭のatomで動きが変わる。詳しくはDocsのGenServer

Testing a GenServer

  • Agentとは別にGenServerのテストを書く。
defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    # レジストリを起動
    {:ok, registry} = KV.Registry.start_link
    {:ok, registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    # 未登録のBucketはエラーとなること
    assert KV.Registry.lookup(registry, "shopping") == :error

    # Bucketを新しく登録してBucketを取得できること
    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # 登録したBucketにKey-Valueを登録できること
    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end
  • テストの中でstart_linkでレジストリのプロセスが起動するが、テストが完了したタイミングで:shutdownを受け取るので明示的に書く必要はない。
  • プロセスを止めるテストを作るにはGenServerのコールバックを追加してやる。
  @doc """
  Stops the registry.
  """
  def stop(server) do
    GenServer.call(server, :stop)
  end

  ## Server Callbacks

  def handle_call(:stop, _from, state) do
    {:stop, :normal, :ok, state}
  end
  • "spawns buckets" テストで最後にレジストリを止めるテストを追加
    # レジストリが正しく止まること
    assert KV.Registry.stop(registry) == :ok

The need for monitoring

  • Bucketのプロセスが止まったりクラッシュした場合を考える。次の失敗するテストのコードをテストに追加し、走らせてみる。
  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)
    assert KV.Registry.lookup(registry, "shopping") == :error
  end
  • 取り出そうとするBucketのプロセスはすでに終了しているのでエラーとなる。これを解消するためにはレジストリはすべてのBucektのプロセスを監視する必要がある。
  • レジストリはBucketが終了した場合、その通知を受けとっているので、そのコールバックでHashDictを掃除してやればよい。レジストリのコールバックを以下のように書き換えてみる
 ## Server callbacks

  def init(:ok) do
    # 単に新しいHashDictを返していたところをnamesとrefsのtupleを返すようにする
    names = HashDict.new
    refs  = HashDict.new
    {:ok, {names, refs}}
  end

  def handle_call({:lookup, name}, _from, {names, _} = state) do
    {:reply, HashDict.fetch(names, name), state}
  end

  def handle_call(:stop, _from, state) do
    {:stop, :normal, :ok, state}
  end

  # namesとrefsを受け取るように修正
  def handle_cast({:create, name}, {names, refs}) do
    if HashDict.has_key?(names, name) do
      {:noreply, {names, refs}}
    else
      {:ok, pid} = KV.Bucket.start_link()
      # refsに作成したBucketのPIDを登録する
      ref = Process.monitor(pid)
      refs = HashDict.put(refs, ref, name)
      names = HashDict.put(names, name, pid)
      {:noreply, {names, refs}}
    end
  end

   # bucketのプロセスがダウンした時のコールバック
  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # refsから対応するBucketの名前を取り出しつつ削除
    {name, refs} = HashDict.pop(refs, ref)
    # namesから削除
    names = HashDict.delete(names, name)
    {:noreply, {names, refs}}
  end

  # すべてのイベントをキャッチするコールバック
  def handle_info(_msg, state) do
    # 特に何もせず、stateだけを回す
    {:noreply, state}
  end

call, cast or info?

  • レジストリの例で call、cast、infoの3種類のコールバックを使った。それぞれどのような場合に使い分けるか考えてみる。
  • handle_call/3
    • これは同期的なリクエストの時に使うべきで、サーバからの応答を待つ基本的なコールバックである。
  • handle_cast/2
    • これは非同期なリクエストの時に使うべきでレスポンスが不要な場合に使う。
  • handle_info/2
    • 上記以外の一般的なメッセージを受け取る時に使う。
    • send/2を経て送られたあらゆるメッセージがこのコールバックにたどり着く。catch-allをしかけなくてもunhandleなメッセージはここで捉えることができる。
    • なのでhandle_call/3handle_cast/2 はそれほど悩む必要がない。GenServer APIを経てリクエストされるだけのなので、仮に未知のメッセージが送られた場合は開発者の間違いに起因するものだろう。