もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ その6 - Dependencies and umbrella projects

  • KVアプリケーションは完成したので次にそれを実装するTCPサーバーを作成する。その前にMixDependencyについて理解する。

External dependencies

  • 外部依存はビジネスドメインに縛られない。例えば今のKVアプリケーションのためにHTTP APIが必要であれば外部依存するパッケージとしてPlugを使う。
  • パッケージはHex Package Manager に登録されている。依存関係は mix.exsdeps関数に定義する。
def deps do
  # 0.5.x で最新のバージョンを取得
  [{:plug, "~> 0.5.0"}]
end
  • Hexで管理されているパッケージは安定版だが、開発中のものを使いたいときはgit経由でも書くことができる。
def deps do
  [{:plug, git: "git://github.com/elixir-lang/plug.git"}]
end
  • 依存パッケージを追加してコンパイルするとMixmix.lockを生成する。これは依存関係にあるパッケージのどのバージョンを使ったかが記録されている。

Internal dependencies

  • 内部依存はプロジェクト特有のもので、プロジェクト外では意味をなさずプライベートにしておきたいもので、内部依存のアプリケーションを使いたい場合は、git経由か、umbrella projectsという方法がある。umbrella projectsを使うと一つのプロジェクトの中に複数のアプリケーションをホストでき、単一のリポジトリで管理できるので通常はこちらを使う方が管理しやすい。これまでに作ったKVアプリケーションを含んだプロジェクト kv_umbrella を作成する。

Umbrella projects

  • 複数のアプリケーションを管理する kv_umbrella を新しく作成する。mix newのときに--umbrellaオプションを付与する。
mix new kv_umbrella --umbrella
  • 実行するとコンソールに表示される内容が --umbrellaなしでmix newしたときと変わっているはずだ。mix.exsのコードもまた変わっている。--umbrellaオプションを指定したことでこのアプリケーションがumbrellaとして動作するようになった。
  • 次にkv_umbrella/apps/に移動してkv_serverアプリケーションを作成する。こちらは作成の際に--supオプションをつける。これはMixが自動でsupervision treeを生成してくれるようになる。
mix new kv_server --module KVServer --sup
  • KVServerのmix.exsを見てみよう。
defmodule KVServer.Mixfile do
  use Mix.Project

  def project do
    [app: :kv_server,
     version: "0.0.1",
     deps_path: "../../deps",
     lockfile: "../../mix.lock",
     elixir: "~> 1.0",
     deps: deps]
  end

  def application do
    [applications: [:logger],
     mod: {KVServer, []}]
  end

  defp deps do
    []
  end
end
  • deps_pathlockfileが追加されているが、これはappsの下でmix newをしたことでMixが自動で全体の構造を解釈して追加している。このアプリケーションが外部依存するパッケージはkv_umbrellaと共有されることになった。
  • また、application/0の中も変わって、mod: {KVServer, []}が追加されている。これは--supフラグを追加したことによるもので、KVServerが全体のアプリケーションのsupervision treeを開始することを意味する。
  • lib/kv_server.exの中を見ると前の章で作成したsupervisorの実装が自動で組み込まれていることがわかる。あとはこれに監督されるアプリケーションを追加していく。

In umbrella dependencies

  • apps/kv_server/mix.exsを開いてkvを追加する。
defp deps do
  # kvがkv_serverの中で使えるようになる
  [{:kv, in_umbrella: true}]
end
  • depsは依存関係を解決するだけなのでKVアプリケーションが開始されるわけではない。applicaiton/0に追加してKVアプリケーションが開始されるようにする。
def application do
  # kv_serverが開始されれば`kv`も開始されるようになる
  [applications: [:logger, :kv],
   mod: {KVServer, []}]
end
  • 次にappsの下に今まで作成したKVアプリケーションを移動する。KVのmix.exsもKVServerと同様に以下のコードを追加しておく。
deps_path: "../../deps",
lockfile: "../../mix.lock",
  • ここまで出来たらkv_umbrellaのrootに移動してmix testを実行してみる。2つのアプリケーションのテストが走るはず。
  • このようにUmbrella Projectはアプリケーションを管理するのに便利である。管理されるアプリケーションは切り離されていて個別の設定が定義できる。個別に開発することもできるし、リストを明示的にして一緒に使うこともできる。

Elixir 入門 ~ MIX AND OTP ~ その5 - ETS

  • キャッシュ機構であるErlang Term Storageについて学ぶ。

ETS as a cache

  • ETSは etsモジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected])
8211
iex(2)> :ets.insert(table, {"foo", self})
true
iex(3)> :ets.lookup(table, "foo")
[{"foo", #PID<0.59.0>}]
  • ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では:buckets_registryというテーブルにテーブルの型とアクセスルールを与えている。:setはキーの重複を許さないという意味、:protected はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。
  • [:set, :protected] はデフォルト値なので指定しなかった場合は自動で設定される。
  • lib/kv/registry.exを開いてETSを組み込んでみよう
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(table, event_manager, buckets, opts \\ []) do
    # 1. ETSのテーブルのプロセスをstart_linkに追加
    GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `table`.

  Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
  """
  def lookup(table, name) do
    # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す
    case :ets.lookup(table, name) do
      [{^name, bucket}] -> {:ok, bucket}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  def init({table, events, buckets}) do
    # 3. HashDictからETS tableに変更
    ets  = :ets.new(table, [:named_table, read_concurrency: true])
    refs = HashDict.new
    {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
  end

  # 4. handle_call Callbackは消しておく

  def handle_cast({:create, name}, state) do
    # 5. HashDictの代わりにETS Tableに対して Read/writeする
    case lookup(state.names, name) do
      {:ok, _pid} ->
        {:noreply, state}
      :error ->
        {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
        ref = Process.monitor(pid)
        refs = HashDict.put(state.refs, ref, name)
        :ets.insert(state.names, {name, pid})
        GenEvent.sync_notify(state.events, {:create, name, pid})
        {:noreply, %{state | refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    # 6. BucketのプロセスがダウンしたらETS Tableから消す
    {name, refs} = HashDict.pop(state.refs, ref)
    :ets.delete(state.names, name)
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    {:noreply, %{state | refs: refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
  • この修正でテストも壊れるので直しておく
setup do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  {:ok, registry: registry, ets: :registry_table}
end
  • 各テストの引数を %{registry: registry, ets: ets}とし、setupで追加したテーブル名を受け取れるようにしておく。
  • データを取り出す先が ETS になったので、KV.Registry.lookup(registry, ...)KV.Registry.lookup(ets, ...)に変更。
  • まだ失敗する。。。 assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping") ここで落ちる。
    • lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
  • 再び lib/kv/registry.ex を修正する。
def create(server, name) do
  # cast から call に
  GenServer.call(server, {:create, name})
end

# handle_callに。callになったので _fromを追加。使わないけど
def handle_call({:create, name}, _from, state) do
  case lookup(state.names, name) do
    {:ok, pid} ->
      # returnが必要なのでbucketのpidを返す。
      {:reply, pid, state} # Reply with pid
    :error ->
      {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      :ets.insert(state.names, {name, pid})
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # returnが必要なのでbucket のpidを返す。
      {:reply, pid, %{state | refs: refs}} # Reply with pid
  end
end
  • これでmix testを実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。
  • 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
  • さきほどと違い、プロセスの停止のイベントをhandle_infoで拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Managerの通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
  Agent.stop(bucket)
  assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので liv/kv/supervisor.exを編集してETSのプロセスを追加する。
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor

def init(:ok) do
  children = [
    worker(GenEvent, [[name: @manager_name]]),
    supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
    worker(KV.Registry, [@ets_registry_name, @manager_name,
                         @bucket_sup_name, [name: @registry_name]])
  ]

  supervise(children, strategy: :one_for_one)
end

ETS as persistent storage

  • これまでに ETSのプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETSのプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registryのプロセスが終了すれば自動的にETSのプロセスも終了する。
  • では、ETSregistryのプロセスから切り離したらどうなるか。bucketの情報がETSに保存されているのであればregistryが死んだとしてもETSからbucketのプロセスをregistrysupervisorに復帰させることができる。

  • liv/kv/supervisor.ex のinitを以下のように編集する。

  def init(:ok) do
    # init時にetsを開始する。
    ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}])

    # テーブル名ではなくetsのプロセスを渡す。
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
      worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]])
    ]

    supervise(children, strategy: :one_for_one)
  end
  • 合わせてlib/kv/registry.ex のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do
  refs = HashDict.new
  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
  • 最後にテストを直す。setup時にETSを開始して開始済みのプロセスをregistryに渡してやるようにする。ここまででmix testをして通ることを確認する。
setup do
  ets = :ets.new(:registry_table, [:set, :public])
  registry = start_registry(ets)
  {:ok, registry: registry, ets: ets}
end

defp start_registry(ets) do
  {:ok, sup} = KV.Bucket.Supervisor.start_link
  {:ok, manager} = GenEvent.start_link
  {:ok, registry} = KV.Registry.start_link(ets, manager, sup)

  GenEvent.add_mon_handler(manager, Forwarder, self())
  registry
end
  • では最後のテストケースを追加する。ETSのプロセスをregistryから分離したことでregistryが死んでもETSは死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do
  # とりあえずbucketを作成
  bucket = KV.Registry.create(registry, "shopping")

  # registryのプロセスを終了する
  Process.unlink(registry)
  Process.exit(registry, :shutdown)

  # 生き残っているetsを使ってregisryを起動しなおす
  start_registry(ets)

  # 最初に作成したbucketが取れることを確認
  assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}

  # bucketのプロセスを終了
  Process.exit(bucket, :shutdown)

  # ETSからの削除されていることを確認
  assert_receive {:exit, "shopping", ^bucket}
  assert KV.Registry.lookup(ets, "shopping") == :error
end
  • これを通すための修正をlib/kv/registry.ex に入れる。
def init({ets, events, buckets}) do
  # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す
  refs = :ets.foldl(fn {name, pid}, acc ->
    HashDict.put(acc, Process.monitor(pid), name)
  end, HashDict.new, ets)

  {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end

Elixir 入門 ~ MIX AND OTP ~ その4 - Supervisor and Application

  • この章ではElixirのポリシーである“fail fast” と “let it crash”を実現しているsupervisorについて学ぶ。

Our first supervisor

  • Supervisor behaviour を使ってsupervisorを実装したモジュール lib/kv/supervisor.ex を作成する。
defmodule KV.Supervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  # attributeで子プロセス名を定義
  @manager_name KV.EventManager
  @registry_name KV.Registry

  # GenServerと同様にstart_linkで呼ばれるコールバック
  def init(:ok) do
    # `supervisor`は`event manager` と`registry`の2つの子プロセスを管理している。
    # 子プロセスは他のプロセスがpidを知らなくてもアクセスできるように別名をつけておく。例えば子プロセスがクラッシュして立ち上げ直した場合にpidは変わるが別名をつけておけばそのままアクセスできる
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      worker(KV.Registry, [@manager_name, [name: @registry_name]])
    ]

    # strategyは `:one_for_one` を指定してプロセスの管理を開始。この戦略はもし、子プロセスが死んだら新しいプロセスを作るというもの。
    supervise(children, strategy: :one_for_one)
  end
end
  • ファイルを作成したらiex -S mixで確認する
iex> KV.Supervisor.start_link
{:ok, #PID<0.100.0>}
iex> KV.Registry.create(KV.Registry, "shopping")
:ok
iex> KV.Registry.lookup(KV.Registry, "shopping")
{:ok, #PID<0.104.0>}
  • supervisorを開始すると同時に event managerregistry のプロセスが同時に開始された。なのでsuoervisor.start_linkのあとにbucketのcreateやlookupができている。

Understanding applications

  • いま、.appファイルは_build/dev/lib/kv/ebin/kv.ap に見つけることができる。
  • このファイルはErlangシンタックスで書かれており、ErlangカーネルElixir自身、mix.exsで定義した依存関係にあるモジュールやそのバージョンなどの情報が含まれている。
  • mix.exsapplication/0で返された値を使って.appをカスタマイズすることもできる。

Starting applications

  • iex -S mix run --no-start を使ってアプリケーションの開始・停止を手動で行ってみる。
iex(1)> Application.start(:kv)
:ok
iex(2)> Application.stop(:kv)
:ok

14:19:09.629 [info]  Application kv exited: :stopped
iex(3)> Application.stop(:logger)
:ok

=INFO REPORT==== 14-Sep-2015::14:19:16 ===
    application: logger
    exited: stopped
    type: temporary
iex(4)> Application.start(:kv)
{:error, {:not_started, :logger}}
  • 最初のstartはiex起動時に --no-startオプションがないと二重に起動しているという旨のエラーが返る。
  • 4行目のstartが失敗するのは依存関係にあるloggerが3行目で止められているから。このような場合は Application.ensure_all_started/1を使えば丸ごと立ち上げることができる。

The application callback

  • アプリケーションは起動時にcallbackを指定できる。callbackの戻り値は {:ok, pid}で、pidはsupervisorのpidでなければいけない。
  • callbackを実装するためにmix.exsを開いて以下のように編集する。
def application do
  [applications: [],
   mod: {KV, []}]
end
  • :modオプションがcallback時に起動するモジュールを指しており、そのモジュールは Application behaviour を実装していなければいけない。:modKV を指定したので次に KVApplication behaviour を実装する。lib/kv.exを開き、以下のように編集する。
defmodule KV do
  use Application

  # Application behaviourを実装するためには start/2 を実装する必要がある。
  def start(_type, _args) do
    KV.Supervisor.start_link
  end

  #  stop/1 を定義して停止の際にカスタムを入れることもできる
end
  • ここまでできたら iex -S mix を起動する。startですでにsupervisorのプロセスが開始されているのでKV.Supervisor.start_link をたたかなくてもすでにbucketの操作ができるようになっているはず。

Projects or applications?

  • MixはprojectApplication を区別する。mix.exsの内容に基づき、我々は :kv applicationを定義したMix Project を持っていると言える。あとの章ではどのような Applicationも定義しない Projectがでてくる。
  • チュートリアルの中で Project について話すときはMixについて考えるべきである。MixProjectを管理するツールで、どのようにProjectを編集し、テストし、関連した Applicationを開始するかを知っている。
  • ApplicationではOTPについて考えるべきである。Applicationはランタイムで開始して終了するエンティティで mix help compile.appdef application のオプションについて学ぶことができる。

Simple one for one supervisors

  • bucketのプロセスとregistryのプロセスはリンクしていてbucketのクラッシュはregistryのクラッシュと同意義である。registryがクラッシュした場合はsupervisorregistryを復活させることが保証されているが、bucketはすべてなくなってしまう。bucketがクラッシュしてもregistryは生き続けるようにしてみよう。
  • 次のテストを追加する。
  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Kill the bucket and wait for the notification
    Process.exit(bucket, :shutdown)
    assert_receive {:exit, "shopping", ^bucket}
    assert KV.Registry.lookup(registry, "shopping") == :error
  end
  • “removes bucket on exit” テストと似ているが、Agent.stop/1 の代わりにshutdownの命令を送ってbucketのプロセス自体を破棄している。bucketのプロセスはregistryのプロセスとリンクしているのでこの行為はregistryのプロセスを破棄することに等しく、すなわちテストも失敗する。
  • この問題を解決するために新しい戦略 :simple_one_for_one を持ったsupervisorを定義する。このsupervisorはすべてのbucketを生み出し、管理する役目を負う。lib/kv/bucket/supervisor.exを新しく作成する。
defmodule KV.Bucket.Supervisor do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  # 受け取ったsupervisorの子プロセスとしてbucketを開始する関数
  # KV.Bucket.start_linkの代わりに呼び出すようになる
  def start_bucket(supervisor) do
    Supervisor.start_child(supervisor, [])
  end

  def init(:ok) do
    # restart: :temporaryはbucketが死んでも自動で再開しないことを明示している。
    # bucketはregistryを通してのみ管理されるようにする = start_bucket をでしか開始されない
    children = [
      worker(KV.Bucket, [], restart: :temporary)
    ]

    supervise(children, strategy: :simple_one_for_one)
  end
end
  • iex -S mixBucket.Supervisorの動きを確認する。
iex(1)> {:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, #PID<0.90.0>}
iex(2)> {:ok, bucket} = KV.Bucket.Supervisor.start_bucket(sup)
{:ok, #PID<0.92.0>}
iex(3)> KV.Bucket.put(bucket, "eggs", 3)
:ok
iex(4)> KV.Bucket.get(bucket, "eggs")
3
  • 次に test/kv/registry_test.exs のsetupを編集して期待する動作を書く
  setup do
    # Bucketのsupervisorを開始(今回追加)
    {:ok, sup} = KV.Bucket.Supervisor.start_link
    # Event Managerを起動
    {:ok, manager} = GenEvent.start_link
    # EventManagerとBucketのsupervisorをレジストリに渡して起動(bucketのsupervisorのpidをargmentsに追加)
    {:ok, registry} = KV.Registry.start_link(manager, sup)

    GenEvent.add_mon_handler(manager, Forwarder, self())
    {:ok, registry: registry}
  end
  • 合わせて KV.Registry をテストが通るように編集
 ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(event_manager, buckets, opts \\ []) do
    # 1 bucketのsupervisorのプロセスを受け取れるようにする
    GenServer.start_link(__MODULE__, {event_manager, buckets}, opts)
  end
  ## Server callbacks

  def init({events, buckets}) do
    names = HashDict.new
    refs  = HashDict.new
    # 2 start_linkで追加したbucketのsupervisorのプロセスをstateに追加する。
    {:ok, %{names: names, refs: refs, events: events, buckets: buckets}}
  end

  def handle_cast({:create, name}, state) do
    if HashDict.get(state.names, name) do
      {:noreply, state}
    else
      # 3 bucketのsupervisor経由で新しいbucketが作成されるようにする
      {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      names = HashDict.put(state.names, name, pid)
      GenEvent.sync_notify(state.events, {:create, name, pid})
      {:noreply, %{state | names: names, refs: refs}}
    end
  end
  • ここまで追加できたらtestを通してみる。正しく動くようになっているはず。

Supervision trees

  • Bucketsupervisorをアプリケーションで使うためには、supervisorを監督するsupervisorを作りsupervision treesを形成する必要がある。lib/kv/supervisor.exを編集する。
  @manager_name KV.EventManager
  @registry_name KV.Registry
  # bucketのsupervisorの別名を追加
  @bucket_sup_name KV.Bucket.Supervisor

  def init(:ok) do
    # childrenにbucketのsupervisorを追加し、registryのworkerのargumentsにbucketのsupervisorのpid(別名)を追加
    # bucketのsupervisorはregistryのworkerより前に定義しなければいけない。
    children = [
      worker(GenEvent, [[name: @manager_name]]),
      supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
      worker(KV.Registry, [@manager_name, @bucket_sup_name, [name: @registry_name]])
    ]

    # この時点での :one_for_one 戦略は正しい。registryがクラッシュしたらbucketのsupervisorも死ぬべきだし、その逆も同じ。
    supervise(children, strategy: :one_for_one)
  end

Elixir 入門 ~ MIX AND OTP ~ その4 - GenEvent

  • 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。

Event managers

  • iex -S mix でセッションを新しく初め、GenEvent API を触ってみる。
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
  • sync_notifynotifyでmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:okが返るだけで特になにもおきない。
  • 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do
...>   use GenEvent
...>   def handle_event(event, parent) do
...>     send parent, event
...>     {:ok, parent}
...>   end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
  • Forwarderというハンドラを作成し、GenEvent.add_handler/3で今のプロセスで動くhandlerとして追加した。
  • Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
  • sync_notify/2 はリクエストに対し同期的に動き、notify/2 はリクエストに対し非同期的に動く。 これらはGenServercallcastによく似ている

Registry events

  • registryにEventManagerの仕組みを実装する。まずは test/kv/registry_test.exsに期待する動作を実装する。
  # イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。
  # Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ
  defmodule Forwarder do
    use GenEvent

    def handle_event(event, parent) do
      send parent, event
      {:ok, parent}
    end
  end

  # setup時にadd_mon_handler/3でForwarderをセットする形に変更
  setup do
    {:ok, manager} = GenEvent.start_link
    {:ok, registry} = KV.Registry.start_link(manager)

    GenEvent.add_mon_handler(manager, Forwarder, self())
    {:ok, registry: registry}
  end

  # ハンドラーのテストケースを追加
  test "sends events on create and crash", %{registry: registry} do

    # bucketをcreateした時に子プロセスからイベントを受け取っているか。
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    # ^ は bucketのpidとマッチさせるために使っている
    assert_receive {:create, "shopping", ^bucket}

    # bucketが破棄された時に子プロセスからイベントを受け取っているか。
    Agent.stop(bucket)
    assert_receive {:exit, "shopping", ^bucket}
  end
  • 次にこのテストを通すための実装をlib/kv/registry.exに追加・変更する
  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(event_manager, opts \\ []) do
    # GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する
    GenServer.start_link(__MODULE__, event_manager, opts)
  end

  ## Server callbacks

  def init(events) do
    # callbackの引数の数が増えたのでstateをtupleからmapに
    names = HashDict.new
    refs  = HashDict.new
    {:ok, %{names: names, refs: refs, events: events}}
  end

  def handle_call({:lookup, name}, _from, state) do
   # initでstateがmapになったので対応 
   {:reply, HashDict.fetch(state.names, name), state}
  end

  def handle_cast({:create, name}, state) do
   # initでstateがmapになったので対応 
    if HashDict.get(state.names, name) do
      {:noreply, state}
    else
      {:ok, pid} = KV.Bucket.start_link()
      ref = Process.monitor(pid)
      refs = HashDict.put(state.refs, ref, name)
      names = HashDict.put(state.names, name, pid)
      # EventManagerにCreateイベントを通知する
      GenEvent.sync_notify(state.events, {:create, name, pid})
      # stateを更新
      {:noreply, %{state | names: names, refs: refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
    {name, refs} = HashDict.pop(state.refs, ref)
    names = HashDict.delete(state.names, name)
    # EventManagerにExitイベントを通知する
    GenEvent.sync_notify(state.events, {:exit, name, pid})
    # stateを更新
    {:noreply, %{state | names: names, refs: refs}}
  end

Event streams

  • 最後にStreamを扱うGenEventについて紹介する
  • EventManagerを作成し、spwan_linkでGenEvent.stream/1を呼び続けるプロセスを作成する
  • notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}

iex> spawn_link fn ->
...>   for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
#PID<0.97.0>
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok

Elixir 入門 ~ MIX AND OTP ~ その3 - GenServer

  • Bucketのプロセスを監視するプロセスをGenServerを使って作成する
  • GenServerはElixirとOTPでサーバー機能を実装するための抽象化された仕組みである。

Our first GenServer

  • GenServerは Client APIServer Callback の2つの部品が実装される。
  • ClientとServerはそれぞれ別のプロセスで動いていて、Clientを通過したメッセージはServerのCallback関数に渡される
  • lib/kv/registry.ex を作成してGenServerを実装してみる。各関数の意味はコード内にコメントで書いておく。
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts \\ []) do
    # 3つの引数をpassingする新しいGenServerをスタートする
    # arg1 は server callbackが実装されているモジュールで `__MODULE__` は現在のモジュールを指す
    # arg2 は 初期設定でこの場合はatom
    # arg3 は オプションのリスト
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # call/2 は serverからresponseが返るrequest
    # serverに渡す命令は tupleにして先頭をserverへの命令を意味するatom をつけることが多い
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated to the given `name` in `server`.
  """
  def create(server, name) do
    # cast/2 はserverからresponseが帰らないrequest
    GenServer.cast(server, {:create, name})
  end

  ## Server Callbacks

  def init(:ok) do
    # initはstart_link/3のコールバックで引数を受け取ってstateを返している。この場合はHashDict
    {:ok, HashDict.new}
  end

  def handle_call({:lookup, name}, _from, names) do
    # handle_callはcallのコールバック関数で先頭のatomでパターンマッチをかけているので`:lookup`が命令になる
    # 引数の先頭のtupleはrequestから受け取った値
    # _fromはリクエスト元の情報
    # namesは現在のサーバーの情報
    
    # callのReturnのフォーマットは下記の通りで、先頭のatomは`:reply`、Clientに返す値、Serverの情報となる
    {:reply, HashDict.fetch(names, name), names}
  end

  def handle_cast({:create, name}, names) do
    # handle_castはcastのコールバック関数でcallと同様に先頭のatomのパターンマッチで命令を判別する

    # 引数の先頭のtupleはrequestから受け取った値
    # namesは現在のサーバーの情報

    # castのReturnのフォーマットは下記の通りで、先頭のatomは`:noreply`、Serverの情報となる
    if HashDict.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link()
      {:noreply, HashDict.put(names, name, bucket)}
    end
  end
end
  • コールバックのフォーマットは他にもあり、先頭のatomで動きが変わる。詳しくはDocsのGenServer

Testing a GenServer

  • Agentとは別にGenServerのテストを書く。
defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    # レジストリを起動
    {:ok, registry} = KV.Registry.start_link
    {:ok, registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    # 未登録のBucketはエラーとなること
    assert KV.Registry.lookup(registry, "shopping") == :error

    # Bucketを新しく登録してBucketを取得できること
    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # 登録したBucketにKey-Valueを登録できること
    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end
  • テストの中でstart_linkでレジストリのプロセスが起動するが、テストが完了したタイミングで:shutdownを受け取るので明示的に書く必要はない。
  • プロセスを止めるテストを作るにはGenServerのコールバックを追加してやる。
  @doc """
  Stops the registry.
  """
  def stop(server) do
    GenServer.call(server, :stop)
  end

  ## Server Callbacks

  def handle_call(:stop, _from, state) do
    {:stop, :normal, :ok, state}
  end
  • "spawns buckets" テストで最後にレジストリを止めるテストを追加
    # レジストリが正しく止まること
    assert KV.Registry.stop(registry) == :ok

The need for monitoring

  • Bucketのプロセスが止まったりクラッシュした場合を考える。次の失敗するテストのコードをテストに追加し、走らせてみる。
  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)
    assert KV.Registry.lookup(registry, "shopping") == :error
  end
  • 取り出そうとするBucketのプロセスはすでに終了しているのでエラーとなる。これを解消するためにはレジストリはすべてのBucektのプロセスを監視する必要がある。
  • レジストリはBucketが終了した場合、その通知を受けとっているので、そのコールバックでHashDictを掃除してやればよい。レジストリのコールバックを以下のように書き換えてみる
 ## Server callbacks

  def init(:ok) do
    # 単に新しいHashDictを返していたところをnamesとrefsのtupleを返すようにする
    names = HashDict.new
    refs  = HashDict.new
    {:ok, {names, refs}}
  end

  def handle_call({:lookup, name}, _from, {names, _} = state) do
    {:reply, HashDict.fetch(names, name), state}
  end

  def handle_call(:stop, _from, state) do
    {:stop, :normal, :ok, state}
  end

  # namesとrefsを受け取るように修正
  def handle_cast({:create, name}, {names, refs}) do
    if HashDict.has_key?(names, name) do
      {:noreply, {names, refs}}
    else
      {:ok, pid} = KV.Bucket.start_link()
      # refsに作成したBucketのPIDを登録する
      ref = Process.monitor(pid)
      refs = HashDict.put(refs, ref, name)
      names = HashDict.put(names, name, pid)
      {:noreply, {names, refs}}
    end
  end

   # bucketのプロセスがダウンした時のコールバック
  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # refsから対応するBucketの名前を取り出しつつ削除
    {name, refs} = HashDict.pop(refs, ref)
    # namesから削除
    names = HashDict.delete(names, name)
    {:noreply, {names, refs}}
  end

  # すべてのイベントをキャッチするコールバック
  def handle_info(_msg, state) do
    # 特に何もせず、stateだけを回す
    {:noreply, state}
  end

call, cast or info?

  • レジストリの例で call、cast、infoの3種類のコールバックを使った。それぞれどのような場合に使い分けるか考えてみる。
  • handle_call/3
    • これは同期的なリクエストの時に使うべきで、サーバからの応答を待つ基本的なコールバックである。
  • handle_cast/2
    • これは非同期なリクエストの時に使うべきでレスポンスが不要な場合に使う。
  • handle_info/2
    • 上記以外の一般的なメッセージを受け取る時に使う。
    • send/2を経て送られたあらゆるメッセージがこのコールバックにたどり着く。catch-allをしかけなくてもunhandleなメッセージはここで捉えることができる。
    • なのでhandle_call/3handle_cast/2 はそれほど悩む必要がない。GenServer APIを経てリクエストされるだけのなので、仮に未知のメッセージが送られた場合は開発者の間違いに起因するものだろう。

Elixir 入門 ~ MIX AND OTP ~ その2 - Agent

  • この章では KV.Bucket というモジュールを作成する。このモジュールはkey-value形式でデータを保存でき、複数のプロセスからの読み書きをできるように実装する。

The trouble with state

  • Elixirはshared nothingでimmutableな言語である。もし状態を持つ何かを作成して複数のプロセスが保存したり読み込んだりしたい場合、Getting StartではProcessを永続化する例をみた。ElixirはErlang OTPを使うことでstateの管理を抽象化したモジュールを提供している。チュートリアルではこれらを一つずつ見ていく。
    • Agent
    • GenServer
    • GenEvent
    • Task

Agents

  • Agentはシンプルにstateを管理することができ、単純にプロセスでstateを持ち続けたいだけであればとても使いやすい。iexを起動して試してみる。
# kv ディレクトリの下で
$ iex -S mix
# start_linkで空のListを持ったagentのプロセスが開始される
iex(1)> {:ok, agent} = Agent.start_link fn -> [] end
{:ok, #PID<0.94.0>}

# AgentのPIDを指定して中で持っているlistの先頭に"eggs"を追加する
iex(2)> Agent.update(agent, fn list -> ["eggs"|list] end)
:ok

# 取り出すと追加した "eggs" が返る
iex(3)> Agent.get(agent, fn list -> list end)
["eggs"]

# stopに渡すとAgentのプロセスは終了する
iex(4)> Agent.stop(agent)
:ok

# プロセスは死んでいるのでListは取り出せない
iex(5)> Agent.get(agent, fn list -> list end)
** (exit) exited in: GenServer.call(#PID<0.94.0>, {:get, #Function<6.54118792/1 in :erl_eval.expr/5>}, 5000)
    ** (EXIT) no process
    (elixir) lib/gen_server.ex:356: GenServer.call/3
  • 次にAgentを KV.Bucket に組み込んでいく。その前にtest/kv/bucket_test.exs を作成してBucketのテストを書く。
  • Bucketの機能としてはKeyを指定してBucketから値を取り出す機能とBucketにKeyを指定してValueを保存する機能を実装するので、それを検証するテストを書けばよい。
defmodule KV.BucketTest do

  # asyncオプションは他のテストと並行で動くという指定。
  # ファイルへの書き込みやDBへの書き込みなど、競合する可能性がある場合は指定しない
  use ExUnit.Case, async: true

  test "stores values by key" do
    {:ok, bucket} = KV.Bucket.start_link

    # 存在しないkeyを指定したらnilが返ること
    assert KV.Bucket.get(bucket, "milk") == nil

    # keyを指定してvalueを保存して値を取り出せること
    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end
  • 次にAgentを実装したBucketを lib/kv/bucket.ex として作成する
  • Agentに保存する形式はMapではなくHashDictを使う。Mapは大量のKeyを扱う場合にパフォーマンスが悪い。
  • &HashDict.get(&1, key)& をつけることでHashDict.get/3をキャプチャしている。
defmodule KV.Bucket do
  @doc """
  Starts a new bucket.
  """
  def start_link do
    Agent.start_link(fn -> HashDict.new end)
  end

  @doc """
  Gets a value from the `bucket` by `key`.
  """
  def get(bucket, key) do
    Agent.get(bucket, &HashDict.get(&1, key))
  end

  @doc """
  Puts the `value` for the given `key` in the `bucket`.
  """
  def put(bucket, key, value) do
    Agent.update(bucket, &HashDict.put(&1, key, value))
  end
end
  • ここまででデータを保存するBucketとそのテストができているので mix test を走らせてみる。通るようになっているはず。

ExUnit callbacks

  • 次に進む前にExUnitのCallbackについて触れる。Bucketのテストには必ずプロセスが開始されていなければならない。テストの中では {:ok, bucket} = KV.Bucket.start_link の部分だが、これを毎回書くのはしんどいので、ExUnitはテスト前に必ず実行される処理を書くためのsetupというMacroを用意している。テストを以下のように書き直してみる。
defmodule KV.BucketTest do
  # asyncオプションは他のテストと並行で動くという指定。
  # ファイルへの書き込みやDBへの書き込みなど、競合する可能性がある場合は指定しない
  use ExUnit.Case, async: true

  # setup Macroは各テストの前に必ず実行される
  setup do
    {:ok, bucket} = KV.Bucket.start_link
    {:ok, bucket: bucket}
  end

  test "stores values by key", %{bucket: bucket} do
    # 存在しないkeyを指定したらnilが返ること
    assert KV.Bucket.get(bucket, "milk") == nil

    # keyを指定してvalueを保存して値を取り出せること
    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end
  • テストの前にsetupが走り、開始された状態のbucketをテストが受け取るようになった。これでテストごとにstart_linkを呼ぶ必要が無くなった。このように繰り返し必要になる前処理はsetupで書いておくとよい。
  • setup以外のコールバックは ExUnit v1.0.5 Documentation を参照

Other agent actions

  • Bucketには更新以外にも削除の機能が必要なので、BucketのdeleteをAgent.get_and_update/2を使って実装する。
  • HashDict.pop/2を使っているのでkeyを削除すると同時にその時の値が返る。
@doc """
Deletes `key` from `bucket`.

Returns the current value of `key`, if `key` exists.
"""
def delete(bucket, key) do
  Agent.get_and_update(bucket, &HashDict.pop(&1, key))
end
  • テストも書いておく
  test "deletes values key", %{bucket: bucket} do

    # 存在しないキーを削除しようとしたらnilが返ること
    assert KV.Bucket.delete(bucket, "cheese") == nil

    # 値を保存
    KV.Bucket.put(bucket, "cheese", 1)

    # 指定したkeyで値を削除できること。削除の際、今持っている値が返ること
    assert KV.Bucket.delete(bucket, "cheese") == 1
    assert KV.Bucket.get(bucket, "cheese") == nil
  end

Client/Server in agents

  • 次の章に行く前に今書いたBucketの課題について考えてみる。Bucketをサーバーとみたてると、複数クライアントからアクセスが来た場合、仮にkey-valueの操作に時間がかかる場合、クライアントの待ち時間が発生する可能性がある。delete関数を以下のように拡張してみる。
def delete(bucket, key) do
  :timer.sleep(1000) # puts client to sleep
  Agent.get_and_update(bucket, fn dict ->
    :timer.sleep(1000) # puts server to sleep
    HashDict.pop(dict, key)
  end)
end
  • こうするとdeleteのたびに2000milsecの待ち時間が発生し、アクセスの数だけ待ち時間が増え続けることになる。
  • 次の章で GenServers を使い、サーバーとクライアントをより明確に分離してみる。

Elixir 入門 ~ MIX AND OTP ~ その1 - Introduction to Mix

  • 実際にMixとOTPを使ったアプリケーションを作成していく。
  • ↓のようにKey-Valueなデータを扱うアプリを作る
CREATE shopping
OK

PUT shopping milk 1
OK

PUT shopping eggs 3
OK

GET shopping milk
1
OK

DELETE shopping eggs
OK
  • アプリケーションを作るために以下の3つのtoolを使う
    • OTP : Open Telecom Platform といい、Erlangから受け継いでいるライブラリ。フォールトトレトラントなシステムを作るために作成された。
    • Mix : ビルドツール。アプリケーションのコンパイルや依存関係の管理など様々な用途で使う
    • ExUnit : ElixirのUnitTest用

Our first project

  • mix new を使ってアプリケーションを作成していく。
  • mixはmix help でhelpを見ることができる。オンライン上のドキュメントは Mix v1.0.5 Documentation
  • --module オプションは作成されるmodule名を明示的に指定する。指定しないと自動でキャメルケースになる。この場合はKv
$ mix new kv --module KV
  • 実行するとkvディレクトリが作成され、その中にいくつかのファイルが作成される。次に各ファイルを見ていく。

Project compilation

  • mix.exs というファイルが作成されている。これはプロジェクトの設定や依存関係を管理する。
defmodule KV.Mixfile do
  use Mix.Project

  # projectという関数はアプリケーションの名前とバージョンを定義する
  def project do
    [app: :kv,
     version: "0.0.1",
     deps: deps]
  end

  # mix compile 時にここに書かれたアプリケーションが参照されて.appファイルに書き込まれる
  def application do
    [applications: [:logger]]
  end

  # deps は依存関係にあるhexのライブラリ群をListで書く。後述。
  defp deps do
    []
  end
end
$ cd kv
$ mix compile
  • すると_buildディレクトリの下に kv.appが作成される
  • パスは _build/dev/lib/kv/ebin/kv.app
  • デフォルトで登録されているloggerが出力されているのがわかる。他にもバージョンやアプリケーション名なども。詳しくは以降の章で。
{application,kv,
             [{registered,[]},
              {description,"kv"},
              {applications,[kernel,stdlib,elixir,logger]},
              {vsn,"0.0.1"},
              {modules,['Elixir.KV']}]}.% 
  • いったんプロジェクトをコンパイルした後にアプリケーションのルートでオプションをつけてREPLを起動すればアプリケーションのモジュールをロードした状態でREPLが起動する
iex -S mix

Running tests

  • mix new でアプリケーションを作成するとテスト用のモジュールも同時に作成される。テスト用のモジュールはtestディレクトリ配下にlibと同じ構成で作成する。今回は test/kv_test.exsが作成されている。
  • testファイルはコンパイルの必要がないので .exs ファイルで作成される。
  • モジュール名は対象のモジュール名 + Test と定義し、Testing APIを使うためにExUnit.Case を実装し、test/2 macroを使ってテストを書く。
defmodule KVTest do
  use ExUnit.Case

  test "the truth" do
    assert 1 + 1 == 2
  end
end
  • また、testディレクトリに test_helper.exs が作成されている。これはTestFrameworkを柔軟にセットアップするためのもので mix test でテストを走らせる際、必ず必要となる。
  • mix test でテストが走るが、テストの前にアプリケーションは再コンパイルされる。
  • mix test [ファイルパス] で走らせるテストを指定することもできる。
  • kv_test.exs の テストを変更して、testがfailになった場合の表示を確認しておく。
> mix test


  1) test the truth (KVTest)
     test/kv_test.exs:4
     Assertion with == failed
     code: 1 + 1 == 3
     lhs:  2
     rhs:  3
     stacktrace:
       test/kv_test.exs:5



Finished in 0.03 seconds (0.03s on load, 0.00s on tests)
1 tests, 1 failures

Randomized with seed 439788

Environments

  • Mixは環境の考え方をサポートしている
    • :dev : Mix taskがデフォルトで走るときの環境
    • :test : mix test 時の環境
    • :prod : プロダクションにアプリケーションを配置するときの環境
  • 環境ごとに異なる値を使いたいときはMix.Env機能を使う。Mix.envには現在使用している環境がatomで格納されている。
  # 自動生成されたmix.exsを見てみると Mix.env == :prod というコードがある。
  # Mix.env には atom でどの動いている環境が入っていることが分かる。
  def project do
    [app: :kv,
     version: "0.0.1",
     elixir: "~> 1.0",
     build_embedded: Mix.env == :prod,
     start_permanent: Mix.env == :prod,
     deps: deps]
  end