Elixir 入門 ~ MIX AND OTP ~ その6 - Dependencies and umbrella projects
- KVアプリケーションは完成したので次にそれを実装するTCPサーバーを作成する。その前に
Mix
のDependency
について理解する。
External dependencies
- 外部依存はビジネスドメインに縛られない。例えば今のKVアプリケーションのためにHTTP APIが必要であれば外部依存するパッケージとして
Plug
を使う。 - パッケージは
Hex Package Manager
に登録されている。依存関係はmix.exs
のdeps
関数に定義する。
def deps do # 0.5.x で最新のバージョンを取得 [{:plug, "~> 0.5.0"}] end
- Hexで管理されているパッケージは安定版だが、開発中のものを使いたいときはgit経由でも書くことができる。
def deps do [{:plug, git: "git://github.com/elixir-lang/plug.git"}] end
- 依存パッケージを追加してコンパイルすると
Mix
はmix.lock
を生成する。これは依存関係にあるパッケージのどのバージョンを使ったかが記録されている。
Internal dependencies
- 内部依存はプロジェクト特有のもので、プロジェクト外では意味をなさずプライベートにしておきたいもので、内部依存のアプリケーションを使いたい場合は、git経由か、
umbrella projects
という方法がある。umbrella projects
を使うと一つのプロジェクトの中に複数のアプリケーションをホストでき、単一のリポジトリで管理できるので通常はこちらを使う方が管理しやすい。これまでに作ったKVアプリケーションを含んだプロジェクトkv_umbrella
を作成する。
Umbrella projects
- 複数のアプリケーションを管理する
kv_umbrella
を新しく作成する。mix new
のときに--umbrella
オプションを付与する。
mix new kv_umbrella --umbrella
- 実行するとコンソールに表示される内容が
--umbrella
なしでmix new
したときと変わっているはずだ。mix.exs
のコードもまた変わっている。--umbrella
オプションを指定したことでこのアプリケーションがumbrella
として動作するようになった。 - 次に
kv_umbrella/apps/
に移動してkv_server
アプリケーションを作成する。こちらは作成の際に--sup
オプションをつける。これはMix
が自動でsupervision tree
を生成してくれるようになる。
mix new kv_server --module KVServer --sup
- KVServerの
mix.exs
を見てみよう。
defmodule KVServer.Mixfile do use Mix.Project def project do [app: :kv_server, version: "0.0.1", deps_path: "../../deps", lockfile: "../../mix.lock", elixir: "~> 1.0", deps: deps] end def application do [applications: [:logger], mod: {KVServer, []}] end defp deps do [] end end
deps_path
とlockfile
が追加されているが、これはappsの下でmix new
をしたことでMix
が自動で全体の構造を解釈して追加している。このアプリケーションが外部依存するパッケージはkv_umbrella
と共有されることになった。- また、
application/0
の中も変わって、mod: {KVServer, []}
が追加されている。これは--sup
フラグを追加したことによるもので、KVServer
が全体のアプリケーションのsupervision tree
を開始することを意味する。 lib/kv_server.ex
の中を見ると前の章で作成したsupervisor
の実装が自動で組み込まれていることがわかる。あとはこれに監督されるアプリケーションを追加していく。
In umbrella dependencies
apps/kv_server/mix.exs
を開いてkv
を追加する。
defp deps do # kvがkv_serverの中で使えるようになる [{:kv, in_umbrella: true}] end
deps
は依存関係を解決するだけなのでKVアプリケーションが開始されるわけではない。applicaiton/0
に追加してKVアプリケーションが開始されるようにする。
def application do # kv_serverが開始されれば`kv`も開始されるようになる [applications: [:logger, :kv], mod: {KVServer, []}] end
- 次に
apps
の下に今まで作成したKVアプリケーションを移動する。KVのmix.exs
もKVServerと同様に以下のコードを追加しておく。
deps_path: "../../deps", lockfile: "../../mix.lock",
- ここまで出来たら
kv_umbrella
のrootに移動してmix test
を実行してみる。2つのアプリケーションのテストが走るはず。 - このように
Umbrella Project
はアプリケーションを管理するのに便利である。管理されるアプリケーションは切り離されていて個別の設定が定義できる。個別に開発することもできるし、リストを明示的にして一緒に使うこともできる。
Elixir 入門 ~ MIX AND OTP ~ その5 - ETS
- キャッシュ機構である
Erlang Term Storage
について学ぶ。
ETS as a cache
- ETSは
ets
モジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected]) 8211 iex(2)> :ets.insert(table, {"foo", self}) true iex(3)> :ets.lookup(table, "foo") [{"foo", #PID<0.59.0>}]
- ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では
:buckets_registry
というテーブルにテーブルの型とアクセスルールを与えている。:set
はキーの重複を許さないという意味、:protected
はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。 [:set, :protected]
はデフォルト値なので指定しなかった場合は自動で設定される。lib/kv/registry.ex
を開いてETSを組み込んでみよう
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(table, event_manager, buckets, opts \\ []) do # 1. ETSのテーブルのプロセスをstart_linkに追加 GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts) end @doc """ Looks up the bucket pid for `name` stored in `table`. Returns `{:ok, pid}` if a bucket exists, `:error` otherwise. """ def lookup(table, name) do # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す case :ets.lookup(table, name) do [{^name, bucket}] -> {:ok, bucket} [] -> :error end end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server callbacks def init({table, events, buckets}) do # 3. HashDictからETS tableに変更 ets = :ets.new(table, [:named_table, read_concurrency: true]) refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end # 4. handle_call Callbackは消しておく def handle_cast({:create, name}, state) do # 5. HashDictの代わりにETS Tableに対して Read/writeする case lookup(state.names, name) do {:ok, _pid} -> {:noreply, state} :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do # 6. BucketのプロセスがダウンしたらETS Tableから消す {name, refs} = HashDict.pop(state.refs, ref) :ets.delete(state.names, name) GenEvent.sync_notify(state.events, {:exit, name, pid}) {:noreply, %{state | refs: refs}} end def handle_info(_msg, state) do {:noreply, state} end end
- この修正でテストも壊れるので直しておく
setup do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry, ets: :registry_table} end
- 各テストの引数を
%{registry: registry, ets: ets}
とし、setupで追加したテーブル名を受け取れるようにしておく。 - データを取り出す先が
ETS
になったので、KV.Registry.lookup(registry, ...)
をKV.Registry.lookup(ets, ...)
に変更。 - まだ失敗する。。。
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
ここで落ちる。- lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
- 再び
lib/kv/registry.ex
を修正する。
def create(server, name) do # cast から call に GenServer.call(server, {:create, name}) end # handle_callに。callになったので _fromを追加。使わないけど def handle_call({:create, name}, _from, state) do case lookup(state.names, name) do {:ok, pid} -> # returnが必要なのでbucketのpidを返す。 {:reply, pid, state} # Reply with pid :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) # returnが必要なのでbucket のpidを返す。 {:reply, pid, %{state | refs: refs}} # Reply with pid end end
- これで
mix test
を実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。 - 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
- さきほどと違い、プロセスの停止のイベントを
handle_info
で拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Manager
の通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(ets, "shopping") Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。 assert KV.Registry.lookup(ets, "shopping") == :error end
- これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので
liv/kv/supervisor.ex
を編集してETS
のプロセスを追加する。
@manager_name KV.EventManager @registry_name KV.Registry @ets_registry_name KV.Registry @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@ets_registry_name, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
ETS as persistent storage
- これまでに
ETS
のプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETS
のプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registry
のプロセスが終了すれば自動的にETS
のプロセスも終了する。 では、
ETS
をregistry
のプロセスから切り離したらどうなるか。bucket
の情報がETS
に保存されているのであればregistry
が死んだとしてもETS
からbucket
のプロセスをregistry
のsupervisor
に復帰させることができる。liv/kv/supervisor.ex
のinitを以下のように編集する。
def init(:ok) do # init時にetsを開始する。 ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}]) # テーブル名ではなくetsのプロセスを渡す。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
- 合わせて
lib/kv/registry.ex
のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
- 最後にテストを直す。setup時に
ETS
を開始して開始済みのプロセスをregistry
に渡してやるようにする。ここまででmix test
をして通ることを確認する。
setup do ets = :ets.new(:registry_table, [:set, :public]) registry = start_registry(ets) {:ok, registry: registry, ets: ets} end defp start_registry(ets) do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(ets, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) registry end
- では最後のテストケースを追加する。
ETS
のプロセスをregistry
から分離したことでregistry
が死んでもETS
は死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do # とりあえずbucketを作成 bucket = KV.Registry.create(registry, "shopping") # registryのプロセスを終了する Process.unlink(registry) Process.exit(registry, :shutdown) # 生き残っているetsを使ってregisryを起動しなおす start_registry(ets) # 最初に作成したbucketが取れることを確認 assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket} # bucketのプロセスを終了 Process.exit(bucket, :shutdown) # ETSからの削除されていることを確認 assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(ets, "shopping") == :error end
- これを通すための修正を
lib/kv/registry.ex
に入れる。
def init({ets, events, buckets}) do # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す refs = :ets.foldl(fn {name, pid}, acc -> HashDict.put(acc, Process.monitor(pid), name) end, HashDict.new, ets) {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
Elixir 入門 ~ MIX AND OTP ~ その4 - Supervisor and Application
- この章ではElixirのポリシーである“fail fast” と “let it crash”を実現している
supervisor
について学ぶ。
Our first supervisor
Supervisor behaviour
を使ってsupervisor
を実装したモジュールlib/kv/supervisor.ex
を作成する。
defmodule KV.Supervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, :ok) end # attributeで子プロセス名を定義 @manager_name KV.EventManager @registry_name KV.Registry # GenServerと同様にstart_linkで呼ばれるコールバック def init(:ok) do # `supervisor`は`event manager` と`registry`の2つの子プロセスを管理している。 # 子プロセスは他のプロセスがpidを知らなくてもアクセスできるように別名をつけておく。例えば子プロセスがクラッシュして立ち上げ直した場合にpidは変わるが別名をつけておけばそのままアクセスできる children = [ worker(GenEvent, [[name: @manager_name]]), worker(KV.Registry, [@manager_name, [name: @registry_name]]) ] # strategyは `:one_for_one` を指定してプロセスの管理を開始。この戦略はもし、子プロセスが死んだら新しいプロセスを作るというもの。 supervise(children, strategy: :one_for_one) end end
- ファイルを作成したら
iex -S mix
で確認する
iex> KV.Supervisor.start_link {:ok, #PID<0.100.0>} iex> KV.Registry.create(KV.Registry, "shopping") :ok iex> KV.Registry.lookup(KV.Registry, "shopping") {:ok, #PID<0.104.0>}
supervisor
を開始すると同時にevent manager
とregistry
のプロセスが同時に開始された。なのでsuoervisor.start_link
のあとにbucketのcreateやlookupができている。
Understanding applications
- いま、
.app
ファイルは_build/dev/lib/kv/ebin/kv.ap
に見つけることができる。 - このファイルは
Erlang
のシンタックスで書かれており、Erlang
のカーネルやElixir
自身、mix.exs
で定義した依存関係にあるモジュールやそのバージョンなどの情報が含まれている。 mix.exs
のapplication/0
で返された値を使って.app
をカスタマイズすることもできる。
Starting applications
iex -S mix run --no-start
を使ってアプリケーションの開始・停止を手動で行ってみる。
iex(1)> Application.start(:kv) :ok iex(2)> Application.stop(:kv) :ok 14:19:09.629 [info] Application kv exited: :stopped iex(3)> Application.stop(:logger) :ok =INFO REPORT==== 14-Sep-2015::14:19:16 === application: logger exited: stopped type: temporary iex(4)> Application.start(:kv) {:error, {:not_started, :logger}}
- 最初の
start
はiex起動時に--no-start
オプションがないと二重に起動しているという旨のエラーが返る。 - 4行目の
start
が失敗するのは依存関係にあるlogger
が3行目で止められているから。このような場合はApplication.ensure_all_started/1
を使えば丸ごと立ち上げることができる。
The application callback
- アプリケーションは起動時にcallbackを指定できる。callbackの戻り値は
{:ok, pid}
で、pidはsupervisorのpidでなければいけない。 - callbackを実装するために
mix.exs
を開いて以下のように編集する。
def application do [applications: [], mod: {KV, []}] end
:mod
オプションがcallback時に起動するモジュールを指しており、そのモジュールはApplication behaviour
を実装していなければいけない。:mod
にKV
を指定したので次にKV
にApplication behaviour
を実装する。lib/kv.ex
を開き、以下のように編集する。
defmodule KV do use Application # Application behaviourを実装するためには start/2 を実装する必要がある。 def start(_type, _args) do KV.Supervisor.start_link end # stop/1 を定義して停止の際にカスタムを入れることもできる end
- ここまでできたら
iex -S mix
を起動する。start
ですでにsupervisor
のプロセスが開始されているのでKV.Supervisor.start_link
をたたかなくてもすでにbucket
の操作ができるようになっているはず。
Projects or applications?
- Mixは
project
とApplication
を区別する。mix.exs
の内容に基づき、我々は:kv application
を定義したMix Project
を持っていると言える。あとの章ではどのようなApplication
も定義しないProject
がでてくる。 - チュートリアルの中で
Project
について話すときはMix
について考えるべきである。Mix
はProject
を管理するツールで、どのようにProject
を編集し、テストし、関連したApplication
を開始するかを知っている。 Application
ではOTP
について考えるべきである。Application
はランタイムで開始して終了するエンティティでmix help compile.app
でdef application
のオプションについて学ぶことができる。
Simple one for one supervisors
bucket
のプロセスとregistry
のプロセスはリンクしていてbucket
のクラッシュはregistry
のクラッシュと同意義である。registry
がクラッシュした場合はsupervisor
がregistry
を復活させることが保証されているが、bucket
はすべてなくなってしまう。bucket
がクラッシュしてもregistry
は生き続けるようにしてみよう。- 次のテストを追加する。
test "removes bucket on crash", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # Kill the bucket and wait for the notification Process.exit(bucket, :shutdown) assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(registry, "shopping") == :error end
- “removes bucket on exit” テストと似ているが、
Agent.stop/1
の代わりにshutdown
の命令を送ってbucket
のプロセス自体を破棄している。bucket
のプロセスはregistry
のプロセスとリンクしているのでこの行為はregistry
のプロセスを破棄することに等しく、すなわちテストも失敗する。 - この問題を解決するために新しい戦略
:simple_one_for_one
を持ったsupervisor
を定義する。このsupervisor
はすべてのbucket
を生み出し、管理する役目を負う。lib/kv/bucket/supervisor.ex
を新しく作成する。
defmodule KV.Bucket.Supervisor do use Supervisor def start_link(opts \\ []) do Supervisor.start_link(__MODULE__, :ok, opts) end # 受け取ったsupervisorの子プロセスとしてbucketを開始する関数 # KV.Bucket.start_linkの代わりに呼び出すようになる def start_bucket(supervisor) do Supervisor.start_child(supervisor, []) end def init(:ok) do # restart: :temporaryはbucketが死んでも自動で再開しないことを明示している。 # bucketはregistryを通してのみ管理されるようにする = start_bucket をでしか開始されない children = [ worker(KV.Bucket, [], restart: :temporary) ] supervise(children, strategy: :simple_one_for_one) end end
iex -S mix
でBucket.Supervisor
の動きを確認する。
iex(1)> {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, #PID<0.90.0>} iex(2)> {:ok, bucket} = KV.Bucket.Supervisor.start_bucket(sup) {:ok, #PID<0.92.0>} iex(3)> KV.Bucket.put(bucket, "eggs", 3) :ok iex(4)> KV.Bucket.get(bucket, "eggs") 3
- 次に
test/kv/registry_test.exs
のsetupを編集して期待する動作を書く
setup do # Bucketのsupervisorを開始(今回追加) {:ok, sup} = KV.Bucket.Supervisor.start_link # Event Managerを起動 {:ok, manager} = GenEvent.start_link # EventManagerとBucketのsupervisorをレジストリに渡して起動(bucketのsupervisorのpidをargmentsに追加) {:ok, registry} = KV.Registry.start_link(manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry} end
- 合わせて
KV.Registry
をテストが通るように編集
## Client API @doc """ Starts the registry. """ def start_link(event_manager, buckets, opts \\ []) do # 1 bucketのsupervisorのプロセスを受け取れるようにする GenServer.start_link(__MODULE__, {event_manager, buckets}, opts) end
## Server callbacks def init({events, buckets}) do names = HashDict.new refs = HashDict.new # 2 start_linkで追加したbucketのsupervisorのプロセスをstateに追加する。 {:ok, %{names: names, refs: refs, events: events, buckets: buckets}} end def handle_cast({:create, name}, state) do if HashDict.get(state.names, name) do {:noreply, state} else # 3 bucketのsupervisor経由で新しいbucketが作成されるようにする {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) names = HashDict.put(state.names, name, pid) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | names: names, refs: refs}} end end
- ここまで追加できたらtestを通してみる。正しく動くようになっているはず。
Supervision trees
Bucket
のsupervisor
をアプリケーションで使うためには、supervisor
を監督するsupervisor
を作りsupervision trees
を形成する必要がある。lib/kv/supervisor.ex
を編集する。
@manager_name KV.EventManager @registry_name KV.Registry # bucketのsupervisorの別名を追加 @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do # childrenにbucketのsupervisorを追加し、registryのworkerのargumentsにbucketのsupervisorのpid(別名)を追加 # bucketのsupervisorはregistryのworkerより前に定義しなければいけない。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@manager_name, @bucket_sup_name, [name: @registry_name]]) ] # この時点での :one_for_one 戦略は正しい。registryがクラッシュしたらbucketのsupervisorも死ぬべきだし、その逆も同じ。 supervise(children, strategy: :one_for_one) end
Elixir 入門 ~ MIX AND OTP ~ その4 - GenEvent
- 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。
Event managers
iex -S mix
でセッションを新しく初め、GenEvent API
を触ってみる。
iex> {:ok, manager} = GenEvent.start_link {:ok, #PID<0.83.0>} iex> GenEvent.sync_notify(manager, :hello) :ok iex> GenEvent.notify(manager, :world) :ok
sync_notify
とnotify
でmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:ok
が返るだけで特になにもおきない。- 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do ...> use GenEvent ...> def handle_event(event, parent) do ...> send parent, event ...> {:ok, parent} ...> end ...> end iex> GenEvent.add_handler(manager, Forwarder, self()) :ok iex> GenEvent.sync_notify(manager, {:hello, :world}) :ok iex> flush {:hello, :world} :ok
- Forwarderというハンドラを作成し、
GenEvent.add_handler/3
で今のプロセスで動くhandlerとして追加した。 - Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
sync_notify/2
はリクエストに対し同期的に動き、notify/2
はリクエストに対し非同期的に動く。 これらはGenServer
のcall
とcast
によく似ている
Registry events
- registryにEventManagerの仕組みを実装する。まずは
test/kv/registry_test.exs
に期待する動作を実装する。
# イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。 # Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ defmodule Forwarder do use GenEvent def handle_event(event, parent) do send parent, event {:ok, parent} end end # setup時にadd_mon_handler/3でForwarderをセットする形に変更 setup do {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(manager) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry} end # ハンドラーのテストケースを追加 test "sends events on create and crash", %{registry: registry} do # bucketをcreateした時に子プロセスからイベントを受け取っているか。 KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # ^ は bucketのpidとマッチさせるために使っている assert_receive {:create, "shopping", ^bucket} # bucketが破棄された時に子プロセスからイベントを受け取っているか。 Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} end
- 次にこのテストを通すための実装を
lib/kv/registry.ex
に追加・変更する
## Client API @doc """ Starts the registry. """ def start_link(event_manager, opts \\ []) do # GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する GenServer.start_link(__MODULE__, event_manager, opts) end ## Server callbacks def init(events) do # callbackの引数の数が増えたのでstateをtupleからmapに names = HashDict.new refs = HashDict.new {:ok, %{names: names, refs: refs, events: events}} end def handle_call({:lookup, name}, _from, state) do # initでstateがmapになったので対応 {:reply, HashDict.fetch(state.names, name), state} end def handle_cast({:create, name}, state) do # initでstateがmapになったので対応 if HashDict.get(state.names, name) do {:noreply, state} else {:ok, pid} = KV.Bucket.start_link() ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) names = HashDict.put(state.names, name, pid) # EventManagerにCreateイベントを通知する GenEvent.sync_notify(state.events, {:create, name, pid}) # stateを更新 {:noreply, %{state | names: names, refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do {name, refs} = HashDict.pop(state.refs, ref) names = HashDict.delete(state.names, name) # EventManagerにExitイベントを通知する GenEvent.sync_notify(state.events, {:exit, name, pid}) # stateを更新 {:noreply, %{state | names: names, refs: refs}} end
Event streams
- 最後にStreamを扱うGenEventについて紹介する
- EventManagerを作成し、spwan_linkで
GenEvent.stream/1
を呼び続けるプロセスを作成する - notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link {:ok, #PID<0.83.0>} iex> spawn_link fn -> ...> for x <- GenEvent.stream(manager), do: IO.inspect(x) ...> end #PID<0.97.0> iex> GenEvent.notify(manager, {:hello, :world}) {:hello, :world} :ok
Elixir 入門 ~ MIX AND OTP ~ その3 - GenServer
- Bucketのプロセスを監視するプロセスをGenServerを使って作成する
- GenServerはElixirとOTPでサーバー機能を実装するための抽象化された仕組みである。
Our first GenServer
- GenServerは
Client API
とServer Callback
の2つの部品が実装される。 - ClientとServerはそれぞれ別のプロセスで動いていて、Clientを通過したメッセージはServerのCallback関数に渡される
lib/kv/registry.ex
を作成してGenServerを実装してみる。各関数の意味はコード内にコメントで書いておく。
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(opts \\ []) do # 3つの引数をpassingする新しいGenServerをスタートする # arg1 は server callbackが実装されているモジュールで `__MODULE__` は現在のモジュールを指す # arg2 は 初期設定でこの場合はatom # arg3 は オプションのリスト GenServer.start_link(__MODULE__, :ok, opts) end @doc """ Looks up the bucket pid for `name` stored in `server`. Returns `{:ok, pid}` if the bucket exists, `:error` otherwise. """ def lookup(server, name) do # call/2 は serverからresponseが返るrequest # serverに渡す命令は tupleにして先頭をserverへの命令を意味するatom をつけることが多い GenServer.call(server, {:lookup, name}) end @doc """ Ensures there is a bucket associated to the given `name` in `server`. """ def create(server, name) do # cast/2 はserverからresponseが帰らないrequest GenServer.cast(server, {:create, name}) end ## Server Callbacks def init(:ok) do # initはstart_link/3のコールバックで引数を受け取ってstateを返している。この場合はHashDict {:ok, HashDict.new} end def handle_call({:lookup, name}, _from, names) do # handle_callはcallのコールバック関数で先頭のatomでパターンマッチをかけているので`:lookup`が命令になる # 引数の先頭のtupleはrequestから受け取った値 # _fromはリクエスト元の情報 # namesは現在のサーバーの情報 # callのReturnのフォーマットは下記の通りで、先頭のatomは`:reply`、Clientに返す値、Serverの情報となる {:reply, HashDict.fetch(names, name), names} end def handle_cast({:create, name}, names) do # handle_castはcastのコールバック関数でcallと同様に先頭のatomのパターンマッチで命令を判別する # 引数の先頭のtupleはrequestから受け取った値 # namesは現在のサーバーの情報 # castのReturnのフォーマットは下記の通りで、先頭のatomは`:noreply`、Serverの情報となる if HashDict.has_key?(names, name) do {:noreply, names} else {:ok, bucket} = KV.Bucket.start_link() {:noreply, HashDict.put(names, name, bucket)} end end end
- コールバックのフォーマットは他にもあり、先頭のatomで動きが変わる。詳しくはDocsのGenServerを
Testing a GenServer
- Agentとは別にGenServerのテストを書く。
defmodule KV.RegistryTest do use ExUnit.Case, async: true setup do # レジストリを起動 {:ok, registry} = KV.Registry.start_link {:ok, registry: registry} end test "spawns buckets", %{registry: registry} do # 未登録のBucketはエラーとなること assert KV.Registry.lookup(registry, "shopping") == :error # Bucketを新しく登録してBucketを取得できること KV.Registry.create(registry, "shopping") assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # 登録したBucketにKey-Valueを登録できること KV.Bucket.put(bucket, "milk", 1) assert KV.Bucket.get(bucket, "milk") == 1 end end
- テストの中でstart_linkでレジストリのプロセスが起動するが、テストが完了したタイミングで
:shutdown
を受け取るので明示的に書く必要はない。 - プロセスを止めるテストを作るにはGenServerのコールバックを追加してやる。
@doc """ Stops the registry. """ def stop(server) do GenServer.call(server, :stop) end ## Server Callbacks def handle_call(:stop, _from, state) do {:stop, :normal, :ok, state} end
- "spawns buckets" テストで最後にレジストリを止めるテストを追加
# レジストリが正しく止まること assert KV.Registry.stop(registry) == :ok
The need for monitoring
- Bucketのプロセスが止まったりクラッシュした場合を考える。次の失敗するテストのコードをテストに追加し、走らせてみる。
test "removes buckets on exit", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") Agent.stop(bucket) assert KV.Registry.lookup(registry, "shopping") == :error end
- 取り出そうとするBucketのプロセスはすでに終了しているのでエラーとなる。これを解消するためにはレジストリはすべてのBucektのプロセスを監視する必要がある。
- レジストリはBucketが終了した場合、その通知を受けとっているので、そのコールバックでHashDictを掃除してやればよい。レジストリのコールバックを以下のように書き換えてみる
## Server callbacks def init(:ok) do # 単に新しいHashDictを返していたところをnamesとrefsのtupleを返すようにする names = HashDict.new refs = HashDict.new {:ok, {names, refs}} end def handle_call({:lookup, name}, _from, {names, _} = state) do {:reply, HashDict.fetch(names, name), state} end def handle_call(:stop, _from, state) do {:stop, :normal, :ok, state} end # namesとrefsを受け取るように修正 def handle_cast({:create, name}, {names, refs}) do if HashDict.has_key?(names, name) do {:noreply, {names, refs}} else {:ok, pid} = KV.Bucket.start_link() # refsに作成したBucketのPIDを登録する ref = Process.monitor(pid) refs = HashDict.put(refs, ref, name) names = HashDict.put(names, name, pid) {:noreply, {names, refs}} end end # bucketのプロセスがダウンした時のコールバック def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do # refsから対応するBucketの名前を取り出しつつ削除 {name, refs} = HashDict.pop(refs, ref) # namesから削除 names = HashDict.delete(names, name) {:noreply, {names, refs}} end # すべてのイベントをキャッチするコールバック def handle_info(_msg, state) do # 特に何もせず、stateだけを回す {:noreply, state} end
call, cast or info?
- レジストリの例で call、cast、infoの3種類のコールバックを使った。それぞれどのような場合に使い分けるか考えてみる。
- handle_call/3
- これは同期的なリクエストの時に使うべきで、サーバからの応答を待つ基本的なコールバックである。
- handle_cast/2
- これは非同期なリクエストの時に使うべきでレスポンスが不要な場合に使う。
- handle_info/2
- 上記以外の一般的なメッセージを受け取る時に使う。
- send/2を経て送られたあらゆるメッセージがこのコールバックにたどり着く。catch-allをしかけなくてもunhandleなメッセージはここで捉えることができる。
- なので
handle_call/3
とhandle_cast/2
はそれほど悩む必要がない。GenServer APIを経てリクエストされるだけのなので、仮に未知のメッセージが送られた場合は開発者の間違いに起因するものだろう。
Elixir 入門 ~ MIX AND OTP ~ その2 - Agent
- この章では
KV.Bucket
というモジュールを作成する。このモジュールはkey-value形式でデータを保存でき、複数のプロセスからの読み書きをできるように実装する。
The trouble with state
- Elixirはshared nothingでimmutableな言語である。もし状態を持つ何かを作成して複数のプロセスが保存したり読み込んだりしたい場合、Getting StartではProcessを永続化する例をみた。ElixirはErlang OTPを使うことでstateの管理を抽象化したモジュールを提供している。チュートリアルではこれらを一つずつ見ていく。
- Agent
- GenServer
- GenEvent
- Task
Agents
- Agentはシンプルにstateを管理することができ、単純にプロセスでstateを持ち続けたいだけであればとても使いやすい。iexを起動して試してみる。
# kv ディレクトリの下で $ iex -S mix
# start_linkで空のListを持ったagentのプロセスが開始される iex(1)> {:ok, agent} = Agent.start_link fn -> [] end {:ok, #PID<0.94.0>} # AgentのPIDを指定して中で持っているlistの先頭に"eggs"を追加する iex(2)> Agent.update(agent, fn list -> ["eggs"|list] end) :ok # 取り出すと追加した "eggs" が返る iex(3)> Agent.get(agent, fn list -> list end) ["eggs"] # stopに渡すとAgentのプロセスは終了する iex(4)> Agent.stop(agent) :ok # プロセスは死んでいるのでListは取り出せない iex(5)> Agent.get(agent, fn list -> list end) ** (exit) exited in: GenServer.call(#PID<0.94.0>, {:get, #Function<6.54118792/1 in :erl_eval.expr/5>}, 5000) ** (EXIT) no process (elixir) lib/gen_server.ex:356: GenServer.call/3
- 次にAgentを
KV.Bucket
に組み込んでいく。その前にtest/kv/bucket_test.exs
を作成してBucketのテストを書く。 - Bucketの機能としてはKeyを指定してBucketから値を取り出す機能とBucketにKeyを指定してValueを保存する機能を実装するので、それを検証するテストを書けばよい。
defmodule KV.BucketTest do # asyncオプションは他のテストと並行で動くという指定。 # ファイルへの書き込みやDBへの書き込みなど、競合する可能性がある場合は指定しない use ExUnit.Case, async: true test "stores values by key" do {:ok, bucket} = KV.Bucket.start_link # 存在しないkeyを指定したらnilが返ること assert KV.Bucket.get(bucket, "milk") == nil # keyを指定してvalueを保存して値を取り出せること KV.Bucket.put(bucket, "milk", 3) assert KV.Bucket.get(bucket, "milk") == 3 end end
- 次にAgentを実装したBucketを
lib/kv/bucket.ex
として作成する - Agentに保存する形式はMapではなくHashDictを使う。Mapは大量のKeyを扱う場合にパフォーマンスが悪い。
&HashDict.get(&1, key)
は&
をつけることでHashDict.get/3をキャプチャしている。
defmodule KV.Bucket do @doc """ Starts a new bucket. """ def start_link do Agent.start_link(fn -> HashDict.new end) end @doc """ Gets a value from the `bucket` by `key`. """ def get(bucket, key) do Agent.get(bucket, &HashDict.get(&1, key)) end @doc """ Puts the `value` for the given `key` in the `bucket`. """ def put(bucket, key, value) do Agent.update(bucket, &HashDict.put(&1, key, value)) end end
- ここまででデータを保存するBucketとそのテストができているので
mix test
を走らせてみる。通るようになっているはず。
ExUnit callbacks
- 次に進む前にExUnitのCallbackについて触れる。Bucketのテストには必ずプロセスが開始されていなければならない。テストの中では
{:ok, bucket} = KV.Bucket.start_link
の部分だが、これを毎回書くのはしんどいので、ExUnitはテスト前に必ず実行される処理を書くためのsetup
というMacroを用意している。テストを以下のように書き直してみる。
defmodule KV.BucketTest do # asyncオプションは他のテストと並行で動くという指定。 # ファイルへの書き込みやDBへの書き込みなど、競合する可能性がある場合は指定しない use ExUnit.Case, async: true # setup Macroは各テストの前に必ず実行される setup do {:ok, bucket} = KV.Bucket.start_link {:ok, bucket: bucket} end test "stores values by key", %{bucket: bucket} do # 存在しないkeyを指定したらnilが返ること assert KV.Bucket.get(bucket, "milk") == nil # keyを指定してvalueを保存して値を取り出せること KV.Bucket.put(bucket, "milk", 3) assert KV.Bucket.get(bucket, "milk") == 3 end end
- テストの前にsetupが走り、開始された状態のbucketをテストが受け取るようになった。これでテストごとにstart_linkを呼ぶ必要が無くなった。このように繰り返し必要になる前処理はsetupで書いておくとよい。
- setup以外のコールバックは ExUnit v1.0.5 Documentation を参照
Other agent actions
- Bucketには更新以外にも削除の機能が必要なので、Bucketのdeleteを
Agent.get_and_update/2
を使って実装する。 - HashDict.pop/2を使っているのでkeyを削除すると同時にその時の値が返る。
@doc """ Deletes `key` from `bucket`. Returns the current value of `key`, if `key` exists. """ def delete(bucket, key) do Agent.get_and_update(bucket, &HashDict.pop(&1, key)) end
- テストも書いておく
test "deletes values key", %{bucket: bucket} do # 存在しないキーを削除しようとしたらnilが返ること assert KV.Bucket.delete(bucket, "cheese") == nil # 値を保存 KV.Bucket.put(bucket, "cheese", 1) # 指定したkeyで値を削除できること。削除の際、今持っている値が返ること assert KV.Bucket.delete(bucket, "cheese") == 1 assert KV.Bucket.get(bucket, "cheese") == nil end
Client/Server in agents
- 次の章に行く前に今書いたBucketの課題について考えてみる。Bucketをサーバーとみたてると、複数クライアントからアクセスが来た場合、仮にkey-valueの操作に時間がかかる場合、クライアントの待ち時間が発生する可能性がある。delete関数を以下のように拡張してみる。
def delete(bucket, key) do :timer.sleep(1000) # puts client to sleep Agent.get_and_update(bucket, fn dict -> :timer.sleep(1000) # puts server to sleep HashDict.pop(dict, key) end) end
- こうするとdeleteのたびに2000milsecの待ち時間が発生し、アクセスの数だけ待ち時間が増え続けることになる。
- 次の章で
GenServers
を使い、サーバーとクライアントをより明確に分離してみる。
Elixir 入門 ~ MIX AND OTP ~ その1 - Introduction to Mix
- 実際にMixとOTPを使ったアプリケーションを作成していく。
- ↓のようにKey-Valueなデータを扱うアプリを作る
CREATE shopping OK PUT shopping milk 1 OK PUT shopping eggs 3 OK GET shopping milk 1 OK DELETE shopping eggs OK
- アプリケーションを作るために以下の3つのtoolを使う
Our first project
mix new
を使ってアプリケーションを作成していく。- mixは
mix help
でhelpを見ることができる。オンライン上のドキュメントは Mix v1.0.5 Documentation --module
オプションは作成されるmodule名を明示的に指定する。指定しないと自動でキャメルケースになる。この場合はKv
$ mix new kv --module KV
- 実行するとkvディレクトリが作成され、その中にいくつかのファイルが作成される。次に各ファイルを見ていく。
Project compilation
- mix.exs というファイルが作成されている。これはプロジェクトの設定や依存関係を管理する。
defmodule KV.Mixfile do use Mix.Project # projectという関数はアプリケーションの名前とバージョンを定義する def project do [app: :kv, version: "0.0.1", deps: deps] end # mix compile 時にここに書かれたアプリケーションが参照されて.appファイルに書き込まれる def application do [applications: [:logger]] end # deps は依存関係にあるhexのライブラリ群をListで書く。後述。 defp deps do [] end end
- ではコンパイルをしてみる
$ cd kv $ mix compile
- すると_buildディレクトリの下に kv.appが作成される
- パスは
_build/dev/lib/kv/ebin/kv.app
- デフォルトで登録されているloggerが出力されているのがわかる。他にもバージョンやアプリケーション名なども。詳しくは以降の章で。
{application,kv, [{registered,[]}, {description,"kv"}, {applications,[kernel,stdlib,elixir,logger]}, {vsn,"0.0.1"}, {modules,['Elixir.KV']}]}.%
- いったんプロジェクトをコンパイルした後にアプリケーションのルートでオプションをつけてREPLを起動すればアプリケーションのモジュールをロードした状態でREPLが起動する
iex -S mix
Running tests
- mix new でアプリケーションを作成するとテスト用のモジュールも同時に作成される。テスト用のモジュールはtestディレクトリ配下にlibと同じ構成で作成する。今回は
test/kv_test.exs
が作成されている。 - testファイルはコンパイルの必要がないので
.exs
ファイルで作成される。 - モジュール名は対象のモジュール名 + Test と定義し、Testing APIを使うためにExUnit.Case を実装し、test/2 macroを使ってテストを書く。
defmodule KVTest do use ExUnit.Case test "the truth" do assert 1 + 1 == 2 end end
- また、testディレクトリに
test_helper.exs
が作成されている。これはTestFrameworkを柔軟にセットアップするためのものでmix test
でテストを走らせる際、必ず必要となる。 mix test
でテストが走るが、テストの前にアプリケーションは再コンパイルされる。mix test [ファイルパス]
で走らせるテストを指定することもできる。kv_test.exs
の テストを変更して、testがfailになった場合の表示を確認しておく。
> mix test 1) test the truth (KVTest) test/kv_test.exs:4 Assertion with == failed code: 1 + 1 == 3 lhs: 2 rhs: 3 stacktrace: test/kv_test.exs:5 Finished in 0.03 seconds (0.03s on load, 0.00s on tests) 1 tests, 1 failures Randomized with seed 439788
Environments
- Mixは環境の考え方をサポートしている
:dev
: Mix taskがデフォルトで走るときの環境:test
:mix test
時の環境:prod
: プロダクションにアプリケーションを配置するときの環境
- 環境ごとに異なる値を使いたいときは
Mix.Env
機能を使う。Mix.envには現在使用している環境がatomで格納されている。
# 自動生成されたmix.exsを見てみると Mix.env == :prod というコードがある。 # Mix.env には atom でどの動いている環境が入っていることが分かる。 def project do [app: :kv, version: "0.0.1", elixir: "~> 1.0", build_embedded: Mix.env == :prod, start_permanent: Mix.env == :prod, deps: deps] end