Elixir 入門 ~ MIX AND OTP ~ 実際にOTPを使ってプロセス管理をする
- ElixirのGetting StartもMix/OTPが終わり、次に進むのですが、復習がてら前にElixirで書いたコードを
Mix and OTP
の章でやったことを使って改善してみます。 - 元ネタはQiitaの自分の投稿 ElixirでSlackのbotを作ってHerokuで動かしてみる - Qiita
やりたいこと
- とにかくElixirでなんか作ろうと思って書いたのが上の記事ですが、書いていた時に微妙だなーと思ってた点が2つあります。
改善してみる
- コードはこちらにあがっています。
アプリケーションの起動まわり
mix.exs
を以下のように編集してapplication起動時のモジュールを指定して、Application
ビヘイビアを実装したMagicBot
モジュールを作成してそこに起動用のコードを書けば解決。
def application do [applications: [:logger, :slack], mod: {MagicBot, []}] end
- これで
mix run --no-halt
でアプリケーションが起動するようになりました。
Supervision Tree を使ってプロセスを分離する
今、プロセス1本で動いてるところをこういう
Supervision Tree
に変える。アプリケーションの
supervisor
Slack
のソケットを張り続けるプロセス- Botに実行させたいコマンドを管理する
supervisor
- 各コマンド。命令を受け取った時点で
supevisor
経由で動的に作成される。
- 各コマンド。命令を受け取った時点で
- Slackからメッセージを受け取ったらパースして
supervisor
経由でやりたいことを実行させるという感じですね。
作ってみた
- まずはアプリケーションの起点となるモジュールで全体を管理するSupervisorを起動
lib/magic_bot.ex
defmodule MagicBot do use Application def start(_type, _args) do MagicBot.Supervisor.start_link end end
- 次にBotのsupervisor
lib/magic_bot/supervisor.ex
defmodule MagicBot.Supervisor do use Supervisor def start_link(opts \\ []) do Supervisor.start_link(__MODULE__, :ok, opts) end # Define alias of process name @bot_name MagicBot.Bot @action_sup_name BotAction.Supervisor def init(:ok) do # Get API Token of Slack. api_key = case System.get_env("MAGICBOT_API_KEY") do nil -> Application.get_env(:MagicBot, :api_key) s -> s end # Make child process children = [ supervisor(BotAction.Supervisor, [[name: @action_sup_name]]), worker(MagicBot.Bot, [api_key, [name: @bot_name, sup_action: @action_sup_name]]) ] # 戦略は `one_for_one`でSlackとアクションのsupervisorを起動 supervise(children, strategy: :one_for_one) end end
- MagicBot.Supervisorに管理されるSlackのコネクションを張り続けるモジュール
defmodule MagicBot.Bot do use Slack def handle_connect(slack, state) do # Slackとの接続成功時に呼ばれるコールバック IO.puts "Connected as #{slack.me.name}" {:ok, state} end def handle_message(message = %{type: "message", text: _}, slack, state) do # Slackからメッセージを受け取った時に呼ばれるコールバック trigger = String.split(message.text, ~r{ | }) case String.starts_with?(message.text, "<@#{slack.me.id}>: ") do # @bot名 ~ できたら :respond を渡してactionのプロセスを開始 true -> BotAction.Supervisor.start_action(state[:sup_action], :respond, Enum.fetch!(trigger, 1), message, slack) # それ以外は :hear を渡して actionのプロセスを開始 false -> BotAction.Supervisor.start_action(state[:sup_action], :hear, hd(trigger), message, slack) end {:ok, state} end def handle_message(_message, _slack, state) do {:ok, state} end end
- Botの行動を管理するSupervisor。
lib/bot_action/supervisor.ex
defmodule BotAction.Supervisor do def start_link(opts \\ []) do Task.Supervisor.start_link(opts) end def start_action(supervisor, command, trigger, message, slack) do # Slackのプロセスから呼ばれ、このSupervisor配下に新しいプロセスを作成する。 # Task.Supervisorの子プロセスの戦略は `simple_one_for_one` になり、動的に追加できる。 # クラッシュ時は再起動されない Task.Supervisor.start_child(supervisor, fn -> case command do :respond -> BotAction.Action.respond(trigger, message, slack) :hear -> BotAction.Action.hear(trigger, message, slack) end end) end end
lib/bot_action/action.ex
defmodule BotAction.Action do def hear("lgtm", message, slack) do HTTPoison.start case URI.encode("http://www.lgtm.in/g") |> HTTPoison.get do {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> body |> Floki.find("#imageUrl") |> Floki.attribute("value") |> hd |> send_message(message, slack) {_, _} -> nil end end def hear(_, _, _) do end def respond("エリクサーほしい?", message, slack) do send_message("エリクサーちょうだい!\nhttp://img.yaplog.jp/img/01/pc/2/5/2/25253/1/1354.jpg", message, slack) end def respond(_, _, _) do end defp send_message(text, message, slack) do Slack.send_message(text, message.channel, slack) end end
- これで完成。
意図した通りに動くか確認
- lib/bot_action/action.exに以下の関数を追加してみる。
def hear("test1", message, slack) do # 単純にメッセージを返す send_message("test1", message, slack) end def hear("test2", message, slack) do # 5秒待ってからメッセージを返す :timer.sleep(5000) send_message("test2", message, slack) end def hear("test3", message, slack) do # 意図的に例外を起こす raise "oops" send_message("test3", message, slack) end
- botを起動してSlackに"test2"を流したあとに"test1"を流す。
test1はtest2のsleepに待たされることなく、即座にメッセージが返ってきたので、意図した通りに動いていてますね。
次に"test2"を流したあとに"test3"を流して"test3"のプロセスをクラッシュさせてみます。
test2はtest3のクラッシュの影響を受けずにメッセージを返していますね。test3のプロセスはメッセージを送る前に
raise
しているのでメッセージは返ってきません。という感じで
OTP
を使ってプロセス管理するとなかなかElixirっぽいコードになったかなという印象ですね。
Elixir 入門 ~ MIX AND OTP ~ その8 - Docs, tests and pipelines
- TCP経由で受け取ったメッセージをパースしてKVアプリケーションに送る処理を実装する。
Doctests
doctest
を使ってパースする関数を実装する。これはドキュメントからテストを作成することができ、ドキュメント内の正確なサンプルコードの提供を助けてくれる。lib/kv_server/command.ex
を新しく作ってみよう。
defmodule KVServer.Command do @doc ~S""" Parses the given `line` into a command. ## Examples iex> KVServer.Command.parse "CREATE shopping\r\n" {:ok, {:create, "shopping"}} """ def parse(line) do :not_implemented end end
doctest
はiex>
のラインから始まる、スペース4文字分インデントが下がっているブロックのところで、コマンドと、その次の行にコマンドが返すであろう値を書くdoctest
を走らせてみよう。新しくテスト用のtest/kv_server/command_test.exs
を作成してmix test
する。
defmodule KVServer.CommandTest do use ExUnit.Case, async: true doctest KVServer.Command end
- 実行すると以下のログが出て、
doctest
が実行されていることがわかる。
test/kv_server/command_test.exs:3 Doctest failed code: KVServer.Command.parse "CREATE shopping\r\n" === {:ok, {:create, "shopping"}} lhs: :not_implemented stacktrace: lib/kv_server/command.ex:11: KVServer.Command (module)
- では
parse/1
を実装してテストが通るようにしよう。stlingを受け取ってパターンマッチで"CREATE"だったら返すようにする。直した上でテストを走らせる。今度は通るはず。
def parse(line) do case String.split(line) do ["CREATE", bucket] -> {:ok, {:create, bucket}} end end
- 他のテストも追加していこう。
iex> KVServer.Command.parse "CREATE shopping\r\n" {:ok, {:create, "shopping"}} iex> KVServer.Command.parse "CREATE shopping \r\n" {:ok, {:create, "shopping"}} iex> KVServer.Command.parse "PUT shopping milk 1\r\n" {:ok, {:put, "shopping", "milk", "1"}} iex> KVServer.Command.parse "GET shopping milk\r\n" {:ok, {:get, "shopping", "milk"}} iex> KVServer.Command.parse "DELETE shopping eggs\r\n" {:ok, {:delete, "shopping", "eggs"}} Unknown commands or commands with the wrong number of arguments return an error: iex> KVServer.Command.parse "UNKNOWN shopping eggs\r\n" {:error, :unknown_command} iex> KVServer.Command.parse "GET shopping\r\n" {:error, :unknown_command}
- そして実装も追加。
def parse(line) do case String.split(line) do ["CREATE", bucket] -> {:ok, {:create, bucket}} ["GET", bucket, key] -> {:ok, {:get, bucket, key}} ["PUT", bucket, key, value] -> {:ok, {:put, bucket, key, value}} ["DELETE", bucket, key] -> {:ok, {:delete, bucket, key}} _ -> {:error, :unknown_command} end end
Pipelines
kv_server.ex
に作成したコマンドを組み込んでいこう。その前にcommand.ex
に以下の関数を追加する。
@doc """ Runs the given command. """ def run(command) do {:ok, "OK\r\n"} end
- そして
kv_server.ex
でクライアントからメッセージを受け取った際にコマンドを実行するようにする。
defp serve(socket) do # 受け取ったメッセージを元にコマンドを実行する msg = case read_line(socket) do {:ok, data} -> case KVServer.Command.parse(data) do {:ok, command} -> KVServer.Command.run(command) {:error, _} = err -> err end {:error, _} = err -> err end # コマンドの結果を出力 write_line(socket, msg) serve(socket) end defp read_line(socket) do :gen_tcp.recv(socket, 0) end defp write_line(socket, msg) do :gen_tcp.send(socket, format_msg(msg)) end defp format_msg({:ok, text}), do: text defp format_msg({:error, :unknown_command}), do: "UNKNOWN COMMAND\r\n" defp format_msg({:error, _}), do: "ERROR\r\n"
- ここまでできたら再び
mix run --no-halt
でサーバーを起動してtelnetで接続しよう。CREATEコマンドを打ったらOKが、未定義のコマンドを打ったらUNKNOWN COMMANDが返ってくるはず
CREATE shopping OK HELLO UNKNOWN COMMAND
- ここで、ふたたびserveを見るとネストが深く見辛いコードになっているように感じる。できれば
|>
でつなげて書きたいが、コマンドが返す値は:ok
を含んだtupleかerror
を含んだtupleが返る可能性があるので返り値をうまくさばけない。{:ok, _}
が返ってくる限り、値をパイプで私続けるような機能が欲しい。 elixir-pipes
というモジュールがまさにあてはまる。mix.exs
を開いて追加しよう。
def application do [applications: [:logger, :pipe, :kv], mod: {KVServer, []}] end defp deps do [{:kv, in_umbrella: true}, {:pipe, github: "batate/elixir-pipes"}] end
- 保存したら
mix deps.get
して依存モジュールを取得しよう。そしてこれを使うことでserve/1
は以下のように書き直すことができる。
defp serve(socket) do import Pipe # pipe_matching/3は x, {:ok, x} で {:ok, value} が与えられている間はvalueを次の関数に渡し続けるように命令している # マッチしない場合はマッチしなかった値を返す。 msg = pipe_matching x, {:ok, x}, read_line(socket) |> KVServer.Command.parse() |> KVServer.Command.run() write_line(socket, msg) serve(socket) end
Running commands
- 最後に
KVServer.Command.run/1
を実装してKVアプリケーションに命令を渡すようにしよう。
@doc """ Runs the given command. """ def run(command) def run({:create, bucket}) do KV.Registry.create(KV.Registry, bucket) {:ok, "OK\r\n"} end def run({:get, bucket, key}) do lookup bucket, fn pid -> value = KV.Bucket.get(pid, key) {:ok, "#{value}\r\nOK\r\n"} end end def run({:put, bucket, key, value}) do lookup bucket, fn pid -> KV.Bucket.put(pid, key, value) {:ok, "OK\r\n"} end end def run({:delete, bucket, key}) do lookup bucket, fn pid -> KV.Bucket.delete(pid, key) {:ok, "OK\r\n"} end end defp lookup(bucket, callback) do case KV.Registry.lookup(KV.Registry, bucket) do {:ok, pid} -> callback.(pid) :error -> {:error, :not_found} end end
- 受け取った命令に応じてbucketを操作するように直した。
lookup
を見てみよう。bucketが見つからなかったら{:error, :not_found}
を返すようにしているが、ユーザーに表示する場合もNot Foundであることを伝えるようにしたほうがよい。KV.Server
のformat_msg/1
のパターンを追加する。
defp format_msg({:error, :not_found}), do: "NOT FOUND\r\n"
- これでサーバーはほとんど完成した。最後にテストを追加しよう。
defmodule KVServerTest do use ExUnit.Case setup do Logger.remove_backend(:console) # KVアプリケーションを止めてstateをリセットした上で再開 Application.stop(:kv) :ok = Application.start(:kv) Logger.add_backend(:console, flush: true) :ok end setup do # クライアントからの接続を作成 opts = [:binary, packet: :line, active: false] {:ok, socket} = :gen_tcp.connect('localhost', 4040, opts) {:ok, socket: socket} end test "server interaction", %{socket: socket} do assert send_and_recv(socket, "UNKNOWN shopping\r\n") == "UNKNOWN COMMAND\r\n" assert send_and_recv(socket, "GET shopping eggs\r\n") == "NOT FOUND\r\n" assert send_and_recv(socket, "CREATE shopping\r\n") == "OK\r\n" assert send_and_recv(socket, "PUT shopping eggs 3\r\n") == "OK\r\n" # GET returns two lines assert send_and_recv(socket, "GET shopping eggs\r\n") == "3\r\n" assert send_and_recv(socket, "") == "OK\r\n" assert send_and_recv(socket, "DELETE shopping eggs\r\n") == "OK\r\n" # GET returns two lines assert send_and_recv(socket, "GET shopping eggs\r\n") == "\r\n" assert send_and_recv(socket, "") == "OK\r\n" end defp send_and_recv(socket, command) do :ok = :gen_tcp.send(socket, command) {:ok, data} = :gen_tcp.recv(socket, 0, 1000) data end end
mix test
を走らせて通れば完成だ。
Elixir 入門 ~ MIX AND OTP ~ その7 - Task and gen-tcp
- Erlangの
:gen_tcp
モジュールについて学ぶ。
Echo server
- まずはEcho Serverを作成することから始める。TCP Serverは以下のStepを実行する
- 利用可能なPortを開いてListenする
- そのPortでクライアントからの接続を待ち、受け入れる
- クライアントからの要求を解析し、レスポンスを返す。
- KVServerアプリケーションにこれらの機能を組み込んでみる。
lib/kv_server.ex
を開いて以下のように編集する。
# 指定されたportでListenを開始する def accept(port) do # The options below mean: # # 1. `:binary` - receives data as binaries (instead of lists) # 2. `packet: :line` - receives data line by line # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes # {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) IO.puts "Accepting connections on port #{port}" loop_acceptor(socket) end # socketを受け続けるループ。 defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) serve(client) loop_acceptor(socket) end # socketを受け取っったときの処理 defp serve(socket) do socket |> read_line() |> write_line(socket) serve(socket) end # socketを読み込む defp read_line(socket) do {:ok, data} = :gen_tcp.recv(socket, 0) data end # TCPを通してResponseを返す defp write_line(line, socket) do :gen_tcp.send(socket, line) end
iex -S mix
からREPLの中でListenを開始してみる
iex> KVServer.accept(4040) Accepting connections on port 4040
- 開始されたらTerminalからtelnetでローカルホストの4040ポートに接続して、適当な文字を送ってみる
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world !! hello world !!
- 送った
hello world !!
がそのまま返されていることがわかる。 - telnetの接続を終了させると以下のようなエラーがサーバー側に表示される。
iex(1)> KVServer.accept(4040) Accepting connections on port 4040 ** (MatchError) no match of right hand side value: {:error, :closed} (kv_server) lib/kv_server.ex:50: KVServer.read_line/1 (kv_server) lib/kv_server.ex:43: KVServer.serve/1 (kv_server) lib/kv_server.ex:37: KVServer.loop_acceptor/1
- これは
gen_tcp.recv/2
でsocketを受け取ることを期待しているのに接続が閉じられたためで、その場合のハンドルが足りていない。そして、例外が発生したと同時にサーバーが終了して再起動しないことがわかる。これらを解決するためにはsupervision tree
の元でプロセスを動かす必要があることに気づくだろう。
Tasks
- Taskモジュールを使ってKVServerを
supervision tree
の監督下の元で動かす。再びlib/kv_server.ex
を編集する。
def start(_type, _args) do import Supervisor.Spec children = [ worker(Task, [KVServer, :accept, [4040]]) ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
- これでサーバーは
supervision tree
の一部となったのでmix run --no-halt
でアプリケーションを動かしてみよう。--no-halt
はアプリケーションのプロセスを止めないで起動するオプションである。 - サーバーが立ち上がるはずなので同じくtelnetでローカルホストの4040ポートに接続してメッセージを送ってみる。問題なくメッセージは帰ってくるはずだ。
- さきほどと同じようにtelnetを終了してみる。そうすると今度はエラーメッセージは表示されるが、サーバー自体は終了しない。これは
one_for_one
戦略に従いsupervisor
が終了したサーバーのプロセスを再起動しているため。 - では、別のターミナルで2個目のコネクションを張って同じくメッセージを送ってみる。すると接続はできるがレスポンスは返ってこない。今の作りでは接続中のクライアントがすでに存在していると新しいクライアントは受け入れられていないことがわかる。
Task supervisor
- 複数のコネクションを処理するためにはコネクションを受け入れるプロセスとコネクションを処理するプロセスを分離する必要がある。
- 再び
start/2
を編集する。
def start(_type, _args) do import Supervisor.Spec # TaskのSupervisorをsupervision treeに追加 children = [ supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]), worker(Task, [KVServer, :accept, [4040]]) ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
TaskSupervisor
をsupervision tree
の下に追加した。次にコネクションを処理するプロセスをTaskSupervisor
の下に作るようにloop_acceptor/1
を変更する。
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket) end
Elixir 入門 ~ MIX AND OTP ~ その6 - Dependencies and umbrella projects
- KVアプリケーションは完成したので次にそれを実装するTCPサーバーを作成する。その前に
Mix
のDependency
について理解する。
External dependencies
- 外部依存はビジネスドメインに縛られない。例えば今のKVアプリケーションのためにHTTP APIが必要であれば外部依存するパッケージとして
Plug
を使う。 - パッケージは
Hex Package Manager
に登録されている。依存関係はmix.exs
のdeps
関数に定義する。
def deps do # 0.5.x で最新のバージョンを取得 [{:plug, "~> 0.5.0"}] end
- Hexで管理されているパッケージは安定版だが、開発中のものを使いたいときはgit経由でも書くことができる。
def deps do [{:plug, git: "git://github.com/elixir-lang/plug.git"}] end
- 依存パッケージを追加してコンパイルすると
Mix
はmix.lock
を生成する。これは依存関係にあるパッケージのどのバージョンを使ったかが記録されている。
Internal dependencies
- 内部依存はプロジェクト特有のもので、プロジェクト外では意味をなさずプライベートにしておきたいもので、内部依存のアプリケーションを使いたい場合は、git経由か、
umbrella projects
という方法がある。umbrella projects
を使うと一つのプロジェクトの中に複数のアプリケーションをホストでき、単一のリポジトリで管理できるので通常はこちらを使う方が管理しやすい。これまでに作ったKVアプリケーションを含んだプロジェクトkv_umbrella
を作成する。
Umbrella projects
- 複数のアプリケーションを管理する
kv_umbrella
を新しく作成する。mix new
のときに--umbrella
オプションを付与する。
mix new kv_umbrella --umbrella
- 実行するとコンソールに表示される内容が
--umbrella
なしでmix new
したときと変わっているはずだ。mix.exs
のコードもまた変わっている。--umbrella
オプションを指定したことでこのアプリケーションがumbrella
として動作するようになった。 - 次に
kv_umbrella/apps/
に移動してkv_server
アプリケーションを作成する。こちらは作成の際に--sup
オプションをつける。これはMix
が自動でsupervision tree
を生成してくれるようになる。
mix new kv_server --module KVServer --sup
- KVServerの
mix.exs
を見てみよう。
defmodule KVServer.Mixfile do use Mix.Project def project do [app: :kv_server, version: "0.0.1", deps_path: "../../deps", lockfile: "../../mix.lock", elixir: "~> 1.0", deps: deps] end def application do [applications: [:logger], mod: {KVServer, []}] end defp deps do [] end end
deps_path
とlockfile
が追加されているが、これはappsの下でmix new
をしたことでMix
が自動で全体の構造を解釈して追加している。このアプリケーションが外部依存するパッケージはkv_umbrella
と共有されることになった。- また、
application/0
の中も変わって、mod: {KVServer, []}
が追加されている。これは--sup
フラグを追加したことによるもので、KVServer
が全体のアプリケーションのsupervision tree
を開始することを意味する。 lib/kv_server.ex
の中を見ると前の章で作成したsupervisor
の実装が自動で組み込まれていることがわかる。あとはこれに監督されるアプリケーションを追加していく。
In umbrella dependencies
apps/kv_server/mix.exs
を開いてkv
を追加する。
defp deps do # kvがkv_serverの中で使えるようになる [{:kv, in_umbrella: true}] end
deps
は依存関係を解決するだけなのでKVアプリケーションが開始されるわけではない。applicaiton/0
に追加してKVアプリケーションが開始されるようにする。
def application do # kv_serverが開始されれば`kv`も開始されるようになる [applications: [:logger, :kv], mod: {KVServer, []}] end
- 次に
apps
の下に今まで作成したKVアプリケーションを移動する。KVのmix.exs
もKVServerと同様に以下のコードを追加しておく。
deps_path: "../../deps", lockfile: "../../mix.lock",
- ここまで出来たら
kv_umbrella
のrootに移動してmix test
を実行してみる。2つのアプリケーションのテストが走るはず。 - このように
Umbrella Project
はアプリケーションを管理するのに便利である。管理されるアプリケーションは切り離されていて個別の設定が定義できる。個別に開発することもできるし、リストを明示的にして一緒に使うこともできる。
Elixir 入門 ~ MIX AND OTP ~ その5 - ETS
- キャッシュ機構である
Erlang Term Storage
について学ぶ。
ETS as a cache
- ETSは
ets
モジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected]) 8211 iex(2)> :ets.insert(table, {"foo", self}) true iex(3)> :ets.lookup(table, "foo") [{"foo", #PID<0.59.0>}]
- ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では
:buckets_registry
というテーブルにテーブルの型とアクセスルールを与えている。:set
はキーの重複を許さないという意味、:protected
はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。 [:set, :protected]
はデフォルト値なので指定しなかった場合は自動で設定される。lib/kv/registry.ex
を開いてETSを組み込んでみよう
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(table, event_manager, buckets, opts \\ []) do # 1. ETSのテーブルのプロセスをstart_linkに追加 GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts) end @doc """ Looks up the bucket pid for `name` stored in `table`. Returns `{:ok, pid}` if a bucket exists, `:error` otherwise. """ def lookup(table, name) do # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す case :ets.lookup(table, name) do [{^name, bucket}] -> {:ok, bucket} [] -> :error end end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server callbacks def init({table, events, buckets}) do # 3. HashDictからETS tableに変更 ets = :ets.new(table, [:named_table, read_concurrency: true]) refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end # 4. handle_call Callbackは消しておく def handle_cast({:create, name}, state) do # 5. HashDictの代わりにETS Tableに対して Read/writeする case lookup(state.names, name) do {:ok, _pid} -> {:noreply, state} :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do # 6. BucketのプロセスがダウンしたらETS Tableから消す {name, refs} = HashDict.pop(state.refs, ref) :ets.delete(state.names, name) GenEvent.sync_notify(state.events, {:exit, name, pid}) {:noreply, %{state | refs: refs}} end def handle_info(_msg, state) do {:noreply, state} end end
- この修正でテストも壊れるので直しておく
setup do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry, ets: :registry_table} end
- 各テストの引数を
%{registry: registry, ets: ets}
とし、setupで追加したテーブル名を受け取れるようにしておく。 - データを取り出す先が
ETS
になったので、KV.Registry.lookup(registry, ...)
をKV.Registry.lookup(ets, ...)
に変更。 - まだ失敗する。。。
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
ここで落ちる。- lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
- 再び
lib/kv/registry.ex
を修正する。
def create(server, name) do # cast から call に GenServer.call(server, {:create, name}) end # handle_callに。callになったので _fromを追加。使わないけど def handle_call({:create, name}, _from, state) do case lookup(state.names, name) do {:ok, pid} -> # returnが必要なのでbucketのpidを返す。 {:reply, pid, state} # Reply with pid :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) # returnが必要なのでbucket のpidを返す。 {:reply, pid, %{state | refs: refs}} # Reply with pid end end
- これで
mix test
を実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。 - 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
- さきほどと違い、プロセスの停止のイベントを
handle_info
で拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Manager
の通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(ets, "shopping") Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。 assert KV.Registry.lookup(ets, "shopping") == :error end
- これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので
liv/kv/supervisor.ex
を編集してETS
のプロセスを追加する。
@manager_name KV.EventManager @registry_name KV.Registry @ets_registry_name KV.Registry @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@ets_registry_name, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
ETS as persistent storage
- これまでに
ETS
のプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETS
のプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registry
のプロセスが終了すれば自動的にETS
のプロセスも終了する。 では、
ETS
をregistry
のプロセスから切り離したらどうなるか。bucket
の情報がETS
に保存されているのであればregistry
が死んだとしてもETS
からbucket
のプロセスをregistry
のsupervisor
に復帰させることができる。liv/kv/supervisor.ex
のinitを以下のように編集する。
def init(:ok) do # init時にetsを開始する。 ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}]) # テーブル名ではなくetsのプロセスを渡す。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
- 合わせて
lib/kv/registry.ex
のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
- 最後にテストを直す。setup時に
ETS
を開始して開始済みのプロセスをregistry
に渡してやるようにする。ここまででmix test
をして通ることを確認する。
setup do ets = :ets.new(:registry_table, [:set, :public]) registry = start_registry(ets) {:ok, registry: registry, ets: ets} end defp start_registry(ets) do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(ets, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) registry end
- では最後のテストケースを追加する。
ETS
のプロセスをregistry
から分離したことでregistry
が死んでもETS
は死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do # とりあえずbucketを作成 bucket = KV.Registry.create(registry, "shopping") # registryのプロセスを終了する Process.unlink(registry) Process.exit(registry, :shutdown) # 生き残っているetsを使ってregisryを起動しなおす start_registry(ets) # 最初に作成したbucketが取れることを確認 assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket} # bucketのプロセスを終了 Process.exit(bucket, :shutdown) # ETSからの削除されていることを確認 assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(ets, "shopping") == :error end
- これを通すための修正を
lib/kv/registry.ex
に入れる。
def init({ets, events, buckets}) do # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す refs = :ets.foldl(fn {name, pid}, acc -> HashDict.put(acc, Process.monitor(pid), name) end, HashDict.new, ets) {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
Elixir 入門 ~ MIX AND OTP ~ その4 - Supervisor and Application
- この章ではElixirのポリシーである“fail fast” と “let it crash”を実現している
supervisor
について学ぶ。
Our first supervisor
Supervisor behaviour
を使ってsupervisor
を実装したモジュールlib/kv/supervisor.ex
を作成する。
defmodule KV.Supervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, :ok) end # attributeで子プロセス名を定義 @manager_name KV.EventManager @registry_name KV.Registry # GenServerと同様にstart_linkで呼ばれるコールバック def init(:ok) do # `supervisor`は`event manager` と`registry`の2つの子プロセスを管理している。 # 子プロセスは他のプロセスがpidを知らなくてもアクセスできるように別名をつけておく。例えば子プロセスがクラッシュして立ち上げ直した場合にpidは変わるが別名をつけておけばそのままアクセスできる children = [ worker(GenEvent, [[name: @manager_name]]), worker(KV.Registry, [@manager_name, [name: @registry_name]]) ] # strategyは `:one_for_one` を指定してプロセスの管理を開始。この戦略はもし、子プロセスが死んだら新しいプロセスを作るというもの。 supervise(children, strategy: :one_for_one) end end
- ファイルを作成したら
iex -S mix
で確認する
iex> KV.Supervisor.start_link {:ok, #PID<0.100.0>} iex> KV.Registry.create(KV.Registry, "shopping") :ok iex> KV.Registry.lookup(KV.Registry, "shopping") {:ok, #PID<0.104.0>}
supervisor
を開始すると同時にevent manager
とregistry
のプロセスが同時に開始された。なのでsuoervisor.start_link
のあとにbucketのcreateやlookupができている。
Understanding applications
- いま、
.app
ファイルは_build/dev/lib/kv/ebin/kv.ap
に見つけることができる。 - このファイルは
Erlang
のシンタックスで書かれており、Erlang
のカーネルやElixir
自身、mix.exs
で定義した依存関係にあるモジュールやそのバージョンなどの情報が含まれている。 mix.exs
のapplication/0
で返された値を使って.app
をカスタマイズすることもできる。
Starting applications
iex -S mix run --no-start
を使ってアプリケーションの開始・停止を手動で行ってみる。
iex(1)> Application.start(:kv) :ok iex(2)> Application.stop(:kv) :ok 14:19:09.629 [info] Application kv exited: :stopped iex(3)> Application.stop(:logger) :ok =INFO REPORT==== 14-Sep-2015::14:19:16 === application: logger exited: stopped type: temporary iex(4)> Application.start(:kv) {:error, {:not_started, :logger}}
- 最初の
start
はiex起動時に--no-start
オプションがないと二重に起動しているという旨のエラーが返る。 - 4行目の
start
が失敗するのは依存関係にあるlogger
が3行目で止められているから。このような場合はApplication.ensure_all_started/1
を使えば丸ごと立ち上げることができる。
The application callback
- アプリケーションは起動時にcallbackを指定できる。callbackの戻り値は
{:ok, pid}
で、pidはsupervisorのpidでなければいけない。 - callbackを実装するために
mix.exs
を開いて以下のように編集する。
def application do [applications: [], mod: {KV, []}] end
:mod
オプションがcallback時に起動するモジュールを指しており、そのモジュールはApplication behaviour
を実装していなければいけない。:mod
にKV
を指定したので次にKV
にApplication behaviour
を実装する。lib/kv.ex
を開き、以下のように編集する。
defmodule KV do use Application # Application behaviourを実装するためには start/2 を実装する必要がある。 def start(_type, _args) do KV.Supervisor.start_link end # stop/1 を定義して停止の際にカスタムを入れることもできる end
- ここまでできたら
iex -S mix
を起動する。start
ですでにsupervisor
のプロセスが開始されているのでKV.Supervisor.start_link
をたたかなくてもすでにbucket
の操作ができるようになっているはず。
Projects or applications?
- Mixは
project
とApplication
を区別する。mix.exs
の内容に基づき、我々は:kv application
を定義したMix Project
を持っていると言える。あとの章ではどのようなApplication
も定義しないProject
がでてくる。 - チュートリアルの中で
Project
について話すときはMix
について考えるべきである。Mix
はProject
を管理するツールで、どのようにProject
を編集し、テストし、関連したApplication
を開始するかを知っている。 Application
ではOTP
について考えるべきである。Application
はランタイムで開始して終了するエンティティでmix help compile.app
でdef application
のオプションについて学ぶことができる。
Simple one for one supervisors
bucket
のプロセスとregistry
のプロセスはリンクしていてbucket
のクラッシュはregistry
のクラッシュと同意義である。registry
がクラッシュした場合はsupervisor
がregistry
を復活させることが保証されているが、bucket
はすべてなくなってしまう。bucket
がクラッシュしてもregistry
は生き続けるようにしてみよう。- 次のテストを追加する。
test "removes bucket on crash", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # Kill the bucket and wait for the notification Process.exit(bucket, :shutdown) assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(registry, "shopping") == :error end
- “removes bucket on exit” テストと似ているが、
Agent.stop/1
の代わりにshutdown
の命令を送ってbucket
のプロセス自体を破棄している。bucket
のプロセスはregistry
のプロセスとリンクしているのでこの行為はregistry
のプロセスを破棄することに等しく、すなわちテストも失敗する。 - この問題を解決するために新しい戦略
:simple_one_for_one
を持ったsupervisor
を定義する。このsupervisor
はすべてのbucket
を生み出し、管理する役目を負う。lib/kv/bucket/supervisor.ex
を新しく作成する。
defmodule KV.Bucket.Supervisor do use Supervisor def start_link(opts \\ []) do Supervisor.start_link(__MODULE__, :ok, opts) end # 受け取ったsupervisorの子プロセスとしてbucketを開始する関数 # KV.Bucket.start_linkの代わりに呼び出すようになる def start_bucket(supervisor) do Supervisor.start_child(supervisor, []) end def init(:ok) do # restart: :temporaryはbucketが死んでも自動で再開しないことを明示している。 # bucketはregistryを通してのみ管理されるようにする = start_bucket をでしか開始されない children = [ worker(KV.Bucket, [], restart: :temporary) ] supervise(children, strategy: :simple_one_for_one) end end
iex -S mix
でBucket.Supervisor
の動きを確認する。
iex(1)> {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, #PID<0.90.0>} iex(2)> {:ok, bucket} = KV.Bucket.Supervisor.start_bucket(sup) {:ok, #PID<0.92.0>} iex(3)> KV.Bucket.put(bucket, "eggs", 3) :ok iex(4)> KV.Bucket.get(bucket, "eggs") 3
- 次に
test/kv/registry_test.exs
のsetupを編集して期待する動作を書く
setup do # Bucketのsupervisorを開始(今回追加) {:ok, sup} = KV.Bucket.Supervisor.start_link # Event Managerを起動 {:ok, manager} = GenEvent.start_link # EventManagerとBucketのsupervisorをレジストリに渡して起動(bucketのsupervisorのpidをargmentsに追加) {:ok, registry} = KV.Registry.start_link(manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry} end
- 合わせて
KV.Registry
をテストが通るように編集
## Client API @doc """ Starts the registry. """ def start_link(event_manager, buckets, opts \\ []) do # 1 bucketのsupervisorのプロセスを受け取れるようにする GenServer.start_link(__MODULE__, {event_manager, buckets}, opts) end
## Server callbacks def init({events, buckets}) do names = HashDict.new refs = HashDict.new # 2 start_linkで追加したbucketのsupervisorのプロセスをstateに追加する。 {:ok, %{names: names, refs: refs, events: events, buckets: buckets}} end def handle_cast({:create, name}, state) do if HashDict.get(state.names, name) do {:noreply, state} else # 3 bucketのsupervisor経由で新しいbucketが作成されるようにする {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) names = HashDict.put(state.names, name, pid) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | names: names, refs: refs}} end end
- ここまで追加できたらtestを通してみる。正しく動くようになっているはず。
Supervision trees
Bucket
のsupervisor
をアプリケーションで使うためには、supervisor
を監督するsupervisor
を作りsupervision trees
を形成する必要がある。lib/kv/supervisor.ex
を編集する。
@manager_name KV.EventManager @registry_name KV.Registry # bucketのsupervisorの別名を追加 @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do # childrenにbucketのsupervisorを追加し、registryのworkerのargumentsにbucketのsupervisorのpid(別名)を追加 # bucketのsupervisorはregistryのworkerより前に定義しなければいけない。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@manager_name, @bucket_sup_name, [name: @registry_name]]) ] # この時点での :one_for_one 戦略は正しい。registryがクラッシュしたらbucketのsupervisorも死ぬべきだし、その逆も同じ。 supervise(children, strategy: :one_for_one) end
Elixir 入門 ~ MIX AND OTP ~ その4 - GenEvent
- 複数のhandlerにイベントをpublishできるGenEventについて学ぶ。
Event managers
iex -S mix
でセッションを新しく初め、GenEvent API
を触ってみる。
iex> {:ok, manager} = GenEvent.start_link {:ok, #PID<0.83.0>} iex> GenEvent.sync_notify(manager, :hello) :ok iex> GenEvent.notify(manager, :world) :ok
sync_notify
とnotify
でmanagerにメッセージを送信しているが、handlerが実装されていないのでpublishが成功したという意味の:ok
が返るだけで特になにもおきない。- 次に通知を受け取るhandlerを作成する.
iex> defmodule Forwarder do ...> use GenEvent ...> def handle_event(event, parent) do ...> send parent, event ...> {:ok, parent} ...> end ...> end iex> GenEvent.add_handler(manager, Forwarder, self()) :ok iex> GenEvent.sync_notify(manager, {:hello, :world}) :ok iex> flush {:hello, :world} :ok
- Forwarderというハンドラを作成し、
GenEvent.add_handler/3
で今のプロセスで動くhandlerとして追加した。 - Forwarderは単に受け取ったメッセージを親プロセスに送り返すだけのhandlerなのでflushでメッセージを解放してやると送り返されていることがわかる。
sync_notify/2
はリクエストに対し同期的に動き、notify/2
はリクエストに対し非同期的に動く。 これらはGenServer
のcall
とcast
によく似ている
Registry events
- registryにEventManagerの仕組みを実装する。まずは
test/kv/registry_test.exs
に期待する動作を実装する。
# イベントハンドラ、EventMangaerから通知を受け取った時に動く動作を書く。 # Forwarderモジュールはイベントを受け取ったら親にイベントを送り返すだけのハンドラ defmodule Forwarder do use GenEvent def handle_event(event, parent) do send parent, event {:ok, parent} end end # setup時にadd_mon_handler/3でForwarderをセットする形に変更 setup do {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(manager) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry} end # ハンドラーのテストケースを追加 test "sends events on create and crash", %{registry: registry} do # bucketをcreateした時に子プロセスからイベントを受け取っているか。 KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # ^ は bucketのpidとマッチさせるために使っている assert_receive {:create, "shopping", ^bucket} # bucketが破棄された時に子プロセスからイベントを受け取っているか。 Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} end
- 次にこのテストを通すための実装を
lib/kv/registry.ex
に追加・変更する
## Client API @doc """ Starts the registry. """ def start_link(event_manager, opts \\ []) do # GenEvent.start_linkで取得したEventMangerを受け取れるように引数を追加する GenServer.start_link(__MODULE__, event_manager, opts) end ## Server callbacks def init(events) do # callbackの引数の数が増えたのでstateをtupleからmapに names = HashDict.new refs = HashDict.new {:ok, %{names: names, refs: refs, events: events}} end def handle_call({:lookup, name}, _from, state) do # initでstateがmapになったので対応 {:reply, HashDict.fetch(state.names, name), state} end def handle_cast({:create, name}, state) do # initでstateがmapになったので対応 if HashDict.get(state.names, name) do {:noreply, state} else {:ok, pid} = KV.Bucket.start_link() ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) names = HashDict.put(state.names, name, pid) # EventManagerにCreateイベントを通知する GenEvent.sync_notify(state.events, {:create, name, pid}) # stateを更新 {:noreply, %{state | names: names, refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do {name, refs} = HashDict.pop(state.refs, ref) names = HashDict.delete(state.names, name) # EventManagerにExitイベントを通知する GenEvent.sync_notify(state.events, {:exit, name, pid}) # stateを更新 {:noreply, %{state | names: names, refs: refs}} end
Event streams
- 最後にStreamを扱うGenEventについて紹介する
- EventManagerを作成し、spwan_linkで
GenEvent.stream/1
を呼び続けるプロセスを作成する - notifyでメッセージを作成したEventManagerに送るとコールバックで渡したメッセージが出力されるという例
iex> {:ok, manager} = GenEvent.start_link {:ok, #PID<0.83.0>} iex> spawn_link fn -> ...> for x <- GenEvent.stream(manager), do: IO.inspect(x) ...> end #PID<0.97.0> iex> GenEvent.notify(manager, {:hello, :world}) {:hello, :world} :ok