Elixir 入門 ~ META-PROGRAMMING IN ELIXIR ~ その1 - Quote and unquote
Quoting
- Elixir のプログラムは
tuple
を伴った3つの要素に置き換えることができる。例えばsum(1, 2, 3)
があった場合は以下のように書き換えることができる。 - 以下、自分の環境だと
sum
なんかおらん!と言われるのでrem
とか適当なビルトイン関数に置き換えて試すといいかと。
{:sum, [], [1, 2, 3]}
- quote マクロを使うと式の構造を見ることができる
iex> quote do: sum(1, 2, 3) {:sum, [], [1, 2, 3]}
iex> quote do: 1 + 2 {:+, [context: Elixir, import: Kernel], [1, 2]}
map
も同様に確認できる。
iex> quote do: %{1 => 2} {:%{}, [], [{1, 2}]}
- 変数も同様
iex> quote do: x {:x, [], Elixir}
Macro.toString/1
を使えば式を文字列で取得できる。
iex> Macro.to_string(quote do: sum(1, 2 + 3, 4)) "sum(1, 2 + 3, 4)"
Unquoting
Quote
で式の構造を見ることができるが、場合によっては展開して欲しくない部分もあるかもしれない。以下の例を見てみる。
iex> number = 13 iex> Macro.to_string(quote do: 11 + number) "11 + number"
- numberがそのままstringとして扱われていて、欲しい結果ではない。numberに値を注入するためには
unquote
を使う。
iex> number = 13 iex> Macro.to_string(quote do: 11 + unquote(number)) "11 + 13"
- さきほどとは異なり、
unquote
で囲んだnumberは束縛されている値が表示された。 unquote
は関数名にも適用できる。
iex> fun = :hello iex> Macro.to_string(quote do: unquote(fun)(:world)) "hello(:world)"
fun
はhello/1
で展開された。- Listの中に新しい要素を追加したい場合を考えてみる。
iex> inner = [3, 4, 5] iex> Macro.to_string(quote do: [1, 2, unquote(inner), 6]) "[1, 2, [3, 4, 5], 6]"
- これは期待した結果ではない。このような場合は
unquote_splicing
を使うとよい。
iex> inner = [3, 4, 5] iex> Macro.to_string(quote do: [1, 2, unquote_splicing(inner), 6]) "[1, 2, 3, 4, 5, 6]"
Unquoting
はコードの中に別のコードを埋め込むことができ、マクロを書くときにとても便利なので覚えておこう。
Escaping
Macro.escape/1
を使っても展開できる。
iex> map = %{hello: :world} iex> Macro.escape(map) {:%{}, [], [hello: :world]}
- マクロは
quoted expressions
を受け取りquoted expressions
を返す必要がある。大事なのはexpression
とvalue
を区別することである。integerやatom、stringといったようなvalueはquoted expressions
が自分自身の値だが、mapのような値は変換される必要がある。 - ちょっと訳が怪しいけど。。。おそらく以下のように
quote
をかけるとquoted expressions
が返るものと自身の値が返るもので違いがあるので気をつけろということかな。
iex> quote do: %{1=> 2} {:%{}, [], [{1, 2}]} iex> quote do: :hello :hello
Elixir 入門 ~ MIX AND OTP ~ その9 - Distributed tasks and configuration
- 最後に
routing
の機能をKVアプリケーションに追加する。routing table
はこのようになる。
[{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
Our first distributed code
- VMに名前をつけて開始する。
iex --sname foo
でREPLを起動する。 - すると以下のように
node名@computer-name
がpromptに表示される
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help) iex(foo@your_hostname)>
- この中でモジュールを定義する。
iex(foo@your_hostname)> defmodule Hello do ...> def world, do: IO.puts "hello world" ...> end iex(foo@your_hostname)> Hello.world hello world :ok
- 次にfooを生かしたまま、別のnode名で新しくREPLを立ち上げて、先ほどの
Hello.world
を呼ぶ
iex --sname bar
iex(bar@your_hostname)> Hello.world ** (UndefinedFunctionError) undefined function: Hello.world/0 (module Hello is not available) Hello.world()
- 当然、barには定義されていないので返ってこない。ここで
Node.spawn_link
を使ってfoo
の新しいプロセスを作ることができる。
iex(bar@your_hostname)> Node.spawn_link :"foo@ your_hostname", fn -> Hello.world end hello world #PID<8897.78.0>
- hello world` が 表示された。他のnodeのプロセスを生んでそのpidを返したことがわかる。次にそのpidを使ってメッセージの送受信をする。
iex(bar@your_hostname)> pid = Node.spawn_link :"foo@your_hostname", fn -> ...> receive do ...> {:ping, client} -> send client, :pong ...> end ...> end #PID<8897.86.0> iex(bar@your_hostname)> send pid, {:ping, self} {:ping, #PID<0.65.0>} iex(bar@your_hostname)> flush :pong :ok
- foo nodeに :ping を受け取って :pong を返す関数を定義したプロセスを作った。そしてbarから fooに :ping を送って :pong が帰ってきたことがわかる。ただ、このやり方は
supervison tree
の管理からは外れるので避けたい。他の方法も探してみよう。
async/await
- Elixirは
async/await
パターンも提供している。asyncで先に計算してawaitであとで結果を取り出すというようなことが可能。
iex> task = Task.async(fn -> ...> :timer.sleep(5000) ...> "hogehoge" ...> end) %Task{pid: #PID<0.82.0>, ref: #Reference<0.0.2.103>} iex> Task.await(task) "hogehoge"
- Task.supervisorの下で使うことも可能。その場合は
Task.Supervisor.start_child/2
の代わりにTask.Supervisor.async/2
でプロセスを作ってやればよい。結果の取り出しはTask.await/2
で可能
Distributed tasks
- distributeされたtaskはsuperviseされたtaskとほとんど同じで、違いは
supervisor
にtask
を作成するときにnode名を渡しているだけである。lib/kv/supervisor.ex
を開いてTask Supervisor
を追加しよう。
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),
- 保存したらKVアプリケーションのルートで
iex --sname foo -S mix
とiex --sname bar -S mix
をそれぞれ立ち上げる。 - 片方からもう片方にnode名を返すタスクを作って
async/await
でnode名が返ることを確認する。
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn -> ...> {:ok, node()} ...> end iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn -> ...> (bar@your_hostname)> {:ok, node()} ...> (bar@your_hostname)> end %Task{pid: #PID<12875.125.0>, ref: #Reference<0.0.7.9>} iex(bar@your_hostname)> Task.await(task) {:ok, :"foo@your_hostname"}
Routing layer
lib/kv/router.ex
を作ってroutingの機能を実装する。- 以降のコードの
your_hostname
のあたりはサンプルを動かす実行環境に合わせて修正
defmodule KV.Router do @doc """ Dispatch the given `mod`, `fun`, `args` request to the appropriate node based on the `bucket`. """ def route(bucket, mod, fun, args) do # Get the first byte of the binary first = :binary.first(bucket) # Try to find an entry in the table or raise entry = Enum.find(table, fn {enum, node} -> first in enum end) || no_entry_error(bucket) # If the entry node is the current node if elem(entry, 1) == node() do apply(mod, fun, args) else sup = {KV.RouterTasks, elem(entry, 1)} Task.Supervisor.async(sup, fn -> KV.Router.route(bucket, mod, fun, args) end) |> Task.await() end end defp no_entry_error(bucket) do raise "could not find entry for #{inspect bucket} in table #{inspect table}" end @doc """ The routing table. """ def table do # Replace computer-name with your local machine name. [{?a..?m, :"foo@your_hostname"}, {?n..?z, :"bar@your_hostname"}] end end
- テストも
test/kv/router_test.exs
を追加する。
defmodule KV.RouterTest do use ExUnit.Case, async: true # 対応するnodeがrouterから返ってくるか test "route requests across nodes" do assert KV.Router.route("hello", Kernel, :node, []) == :"foo@your_hostname" assert KV.Router.route("world", Kernel, :node, []) == :"bar@your_hostname" end test "raises on unknown entries" do assert_raise RuntimeError, ~r/could not find entry/, fn -> KV.Router.route(<<0>>, Kernel, :node, []) end end end
- 作成したらテストを走らせるために事前に
iex --sname bar -S mix
で bar node を作成し、その後にelixir --sname foo -S mix test
で foo node でテストが実行される。通ればOK。
Test filters and tags
- テストは通ったが
mix test
だけでは走ることができない複雑な構造になってしまった。 tag
を使おう。test/kv/router_test.exs
を開いてテストにtag
をつけてみる。
# @tag distributed: true と書いても同じ @tag :distributed test "route requests across nodes" do
- 次に
test/test_helper.exs
を開いて、Nodeが生きている場合のみdistributed tag
が付いているテストが実行されるようにする。
exclude = if Node.alive?, do: [], else: [distributed: true] ExUnit.start(exclude: exclude)
mix test
を走らせると、route requests across nodes
テストが対象から外れて実行されなくなったはずだ。tag
を使ったmix test
のオプションとして以下がある。mix test --include tag
: include オプションで指定されたtag
はテスト対象に含まれるmix test --exclude tag
: exclude オプションで指定されたtag
はテスト対象から除外されるmix test --only tag
: only オプションで指定されたtag
はそのtag
がついたテストのみ実行される
- 今回のケースで
distributed tag
がついたケースだけを実行させたい場合はelixir --sname foo -S mix test --only distributed
で実行される。もちろん、この場合はテストで使うnodeが動いていなければテストは通らない。
Application environment and configuration
Routing table
をKV..Router
モジュールにハードコードしたが、これはうまくない。developとproductionで異なるものを使いたいときもあるだろう。application environment
を使う。apps/kv/mix.exs
を開いてapplication/0
を次のように書き直す。
def application do [applications: [], env: [routing_table: []], mod: {KV, []}] end
- 新たに
:env
keyを追加した。env:
で設定されたkey-valueがapplication environment
のデフォルト値となる。KV.Router.table/0' を書き換えて
application environmentから
Routing table`の設定を読み込むようにする。
@doc """ The routing table. """ def table do Application.get_env(:kv, :routing_table) end
Application.get_env/2
で指定したapplication environment
の値を取ることができる。次に実際に値を設定する。設定する場所は決まっていてconfig/config.exs
に書く。routing_table
の設定を書こう。
# Replace computer-name with your local machine nodes. config :kv, :routing_table, [{?a..?m, :"foo@your_hostname"}, {?n..?z, :"bar@your_hostname"}]
config/config.exs
はアプリケーションごとに持っていて共有されない。共有したい場合は他のアプリケーションのconfig
をimport
することができる。例えばkv_umbrella
のconfig.exs
と共有したい場合は次のようにimportする。
import_config "../apps/kv/config/config.exs"
- 実際にはumbrellaオプションをつけて作成したアプリケーションは自動的に
apps
配下のすべてのconfigをimportするようになっているのでこのコードを書く必要はない。
さいごに
- 課題っぽいのが与えられているので解いてみる
- 内部のハードコードされているポートを
application environment
から取るようにする。 kv_server
がローカルのKV.Registry
からbucketを作っているところをrouting
を使うようにする。テストも直す。
- 内部のハードコードされているポートを
- 最後はやっつけ仕事になってしまった。。。
とりあえず、今までのコード
elixir_training/kv_umbrella at master · rei-m/elixir_training · GitHub
Elixir 入門 ~ MIX AND OTP ~ 実際にOTPを使ってプロセス管理をする
- ElixirのGetting StartもMix/OTPが終わり、次に進むのですが、復習がてら前にElixirで書いたコードを
Mix and OTP
の章でやったことを使って改善してみます。 - 元ネタはQiitaの自分の投稿 ElixirでSlackのbotを作ってHerokuで動かしてみる - Qiita
やりたいこと
- とにかくElixirでなんか作ろうと思って書いたのが上の記事ですが、書いていた時に微妙だなーと思ってた点が2つあります。
改善してみる
- コードはこちらにあがっています。
アプリケーションの起動まわり
mix.exs
を以下のように編集してapplication起動時のモジュールを指定して、Application
ビヘイビアを実装したMagicBot
モジュールを作成してそこに起動用のコードを書けば解決。
def application do [applications: [:logger, :slack], mod: {MagicBot, []}] end
- これで
mix run --no-halt
でアプリケーションが起動するようになりました。
Supervision Tree を使ってプロセスを分離する
今、プロセス1本で動いてるところをこういう
Supervision Tree
に変える。アプリケーションの
supervisor
Slack
のソケットを張り続けるプロセス- Botに実行させたいコマンドを管理する
supervisor
- 各コマンド。命令を受け取った時点で
supevisor
経由で動的に作成される。
- 各コマンド。命令を受け取った時点で
- Slackからメッセージを受け取ったらパースして
supervisor
経由でやりたいことを実行させるという感じですね。
作ってみた
- まずはアプリケーションの起点となるモジュールで全体を管理するSupervisorを起動
lib/magic_bot.ex
defmodule MagicBot do use Application def start(_type, _args) do MagicBot.Supervisor.start_link end end
- 次にBotのsupervisor
lib/magic_bot/supervisor.ex
defmodule MagicBot.Supervisor do use Supervisor def start_link(opts \\ []) do Supervisor.start_link(__MODULE__, :ok, opts) end # Define alias of process name @bot_name MagicBot.Bot @action_sup_name BotAction.Supervisor def init(:ok) do # Get API Token of Slack. api_key = case System.get_env("MAGICBOT_API_KEY") do nil -> Application.get_env(:MagicBot, :api_key) s -> s end # Make child process children = [ supervisor(BotAction.Supervisor, [[name: @action_sup_name]]), worker(MagicBot.Bot, [api_key, [name: @bot_name, sup_action: @action_sup_name]]) ] # 戦略は `one_for_one`でSlackとアクションのsupervisorを起動 supervise(children, strategy: :one_for_one) end end
- MagicBot.Supervisorに管理されるSlackのコネクションを張り続けるモジュール
defmodule MagicBot.Bot do use Slack def handle_connect(slack, state) do # Slackとの接続成功時に呼ばれるコールバック IO.puts "Connected as #{slack.me.name}" {:ok, state} end def handle_message(message = %{type: "message", text: _}, slack, state) do # Slackからメッセージを受け取った時に呼ばれるコールバック trigger = String.split(message.text, ~r{ | }) case String.starts_with?(message.text, "<@#{slack.me.id}>: ") do # @bot名 ~ できたら :respond を渡してactionのプロセスを開始 true -> BotAction.Supervisor.start_action(state[:sup_action], :respond, Enum.fetch!(trigger, 1), message, slack) # それ以外は :hear を渡して actionのプロセスを開始 false -> BotAction.Supervisor.start_action(state[:sup_action], :hear, hd(trigger), message, slack) end {:ok, state} end def handle_message(_message, _slack, state) do {:ok, state} end end
- Botの行動を管理するSupervisor。
lib/bot_action/supervisor.ex
defmodule BotAction.Supervisor do def start_link(opts \\ []) do Task.Supervisor.start_link(opts) end def start_action(supervisor, command, trigger, message, slack) do # Slackのプロセスから呼ばれ、このSupervisor配下に新しいプロセスを作成する。 # Task.Supervisorの子プロセスの戦略は `simple_one_for_one` になり、動的に追加できる。 # クラッシュ時は再起動されない Task.Supervisor.start_child(supervisor, fn -> case command do :respond -> BotAction.Action.respond(trigger, message, slack) :hear -> BotAction.Action.hear(trigger, message, slack) end end) end end
lib/bot_action/action.ex
defmodule BotAction.Action do def hear("lgtm", message, slack) do HTTPoison.start case URI.encode("http://www.lgtm.in/g") |> HTTPoison.get do {:ok, %HTTPoison.Response{status_code: 200, body: body}} -> body |> Floki.find("#imageUrl") |> Floki.attribute("value") |> hd |> send_message(message, slack) {_, _} -> nil end end def hear(_, _, _) do end def respond("エリクサーほしい?", message, slack) do send_message("エリクサーちょうだい!\nhttp://img.yaplog.jp/img/01/pc/2/5/2/25253/1/1354.jpg", message, slack) end def respond(_, _, _) do end defp send_message(text, message, slack) do Slack.send_message(text, message.channel, slack) end end
- これで完成。
意図した通りに動くか確認
- lib/bot_action/action.exに以下の関数を追加してみる。
def hear("test1", message, slack) do # 単純にメッセージを返す send_message("test1", message, slack) end def hear("test2", message, slack) do # 5秒待ってからメッセージを返す :timer.sleep(5000) send_message("test2", message, slack) end def hear("test3", message, slack) do # 意図的に例外を起こす raise "oops" send_message("test3", message, slack) end
- botを起動してSlackに"test2"を流したあとに"test1"を流す。
test1はtest2のsleepに待たされることなく、即座にメッセージが返ってきたので、意図した通りに動いていてますね。
次に"test2"を流したあとに"test3"を流して"test3"のプロセスをクラッシュさせてみます。
test2はtest3のクラッシュの影響を受けずにメッセージを返していますね。test3のプロセスはメッセージを送る前に
raise
しているのでメッセージは返ってきません。という感じで
OTP
を使ってプロセス管理するとなかなかElixirっぽいコードになったかなという印象ですね。
Elixir 入門 ~ MIX AND OTP ~ その8 - Docs, tests and pipelines
- TCP経由で受け取ったメッセージをパースしてKVアプリケーションに送る処理を実装する。
Doctests
doctest
を使ってパースする関数を実装する。これはドキュメントからテストを作成することができ、ドキュメント内の正確なサンプルコードの提供を助けてくれる。lib/kv_server/command.ex
を新しく作ってみよう。
defmodule KVServer.Command do @doc ~S""" Parses the given `line` into a command. ## Examples iex> KVServer.Command.parse "CREATE shopping\r\n" {:ok, {:create, "shopping"}} """ def parse(line) do :not_implemented end end
doctest
はiex>
のラインから始まる、スペース4文字分インデントが下がっているブロックのところで、コマンドと、その次の行にコマンドが返すであろう値を書くdoctest
を走らせてみよう。新しくテスト用のtest/kv_server/command_test.exs
を作成してmix test
する。
defmodule KVServer.CommandTest do use ExUnit.Case, async: true doctest KVServer.Command end
- 実行すると以下のログが出て、
doctest
が実行されていることがわかる。
test/kv_server/command_test.exs:3 Doctest failed code: KVServer.Command.parse "CREATE shopping\r\n" === {:ok, {:create, "shopping"}} lhs: :not_implemented stacktrace: lib/kv_server/command.ex:11: KVServer.Command (module)
- では
parse/1
を実装してテストが通るようにしよう。stlingを受け取ってパターンマッチで"CREATE"だったら返すようにする。直した上でテストを走らせる。今度は通るはず。
def parse(line) do case String.split(line) do ["CREATE", bucket] -> {:ok, {:create, bucket}} end end
- 他のテストも追加していこう。
iex> KVServer.Command.parse "CREATE shopping\r\n" {:ok, {:create, "shopping"}} iex> KVServer.Command.parse "CREATE shopping \r\n" {:ok, {:create, "shopping"}} iex> KVServer.Command.parse "PUT shopping milk 1\r\n" {:ok, {:put, "shopping", "milk", "1"}} iex> KVServer.Command.parse "GET shopping milk\r\n" {:ok, {:get, "shopping", "milk"}} iex> KVServer.Command.parse "DELETE shopping eggs\r\n" {:ok, {:delete, "shopping", "eggs"}} Unknown commands or commands with the wrong number of arguments return an error: iex> KVServer.Command.parse "UNKNOWN shopping eggs\r\n" {:error, :unknown_command} iex> KVServer.Command.parse "GET shopping\r\n" {:error, :unknown_command}
- そして実装も追加。
def parse(line) do case String.split(line) do ["CREATE", bucket] -> {:ok, {:create, bucket}} ["GET", bucket, key] -> {:ok, {:get, bucket, key}} ["PUT", bucket, key, value] -> {:ok, {:put, bucket, key, value}} ["DELETE", bucket, key] -> {:ok, {:delete, bucket, key}} _ -> {:error, :unknown_command} end end
Pipelines
kv_server.ex
に作成したコマンドを組み込んでいこう。その前にcommand.ex
に以下の関数を追加する。
@doc """ Runs the given command. """ def run(command) do {:ok, "OK\r\n"} end
- そして
kv_server.ex
でクライアントからメッセージを受け取った際にコマンドを実行するようにする。
defp serve(socket) do # 受け取ったメッセージを元にコマンドを実行する msg = case read_line(socket) do {:ok, data} -> case KVServer.Command.parse(data) do {:ok, command} -> KVServer.Command.run(command) {:error, _} = err -> err end {:error, _} = err -> err end # コマンドの結果を出力 write_line(socket, msg) serve(socket) end defp read_line(socket) do :gen_tcp.recv(socket, 0) end defp write_line(socket, msg) do :gen_tcp.send(socket, format_msg(msg)) end defp format_msg({:ok, text}), do: text defp format_msg({:error, :unknown_command}), do: "UNKNOWN COMMAND\r\n" defp format_msg({:error, _}), do: "ERROR\r\n"
- ここまでできたら再び
mix run --no-halt
でサーバーを起動してtelnetで接続しよう。CREATEコマンドを打ったらOKが、未定義のコマンドを打ったらUNKNOWN COMMANDが返ってくるはず
CREATE shopping OK HELLO UNKNOWN COMMAND
- ここで、ふたたびserveを見るとネストが深く見辛いコードになっているように感じる。できれば
|>
でつなげて書きたいが、コマンドが返す値は:ok
を含んだtupleかerror
を含んだtupleが返る可能性があるので返り値をうまくさばけない。{:ok, _}
が返ってくる限り、値をパイプで私続けるような機能が欲しい。 elixir-pipes
というモジュールがまさにあてはまる。mix.exs
を開いて追加しよう。
def application do [applications: [:logger, :pipe, :kv], mod: {KVServer, []}] end defp deps do [{:kv, in_umbrella: true}, {:pipe, github: "batate/elixir-pipes"}] end
- 保存したら
mix deps.get
して依存モジュールを取得しよう。そしてこれを使うことでserve/1
は以下のように書き直すことができる。
defp serve(socket) do import Pipe # pipe_matching/3は x, {:ok, x} で {:ok, value} が与えられている間はvalueを次の関数に渡し続けるように命令している # マッチしない場合はマッチしなかった値を返す。 msg = pipe_matching x, {:ok, x}, read_line(socket) |> KVServer.Command.parse() |> KVServer.Command.run() write_line(socket, msg) serve(socket) end
Running commands
- 最後に
KVServer.Command.run/1
を実装してKVアプリケーションに命令を渡すようにしよう。
@doc """ Runs the given command. """ def run(command) def run({:create, bucket}) do KV.Registry.create(KV.Registry, bucket) {:ok, "OK\r\n"} end def run({:get, bucket, key}) do lookup bucket, fn pid -> value = KV.Bucket.get(pid, key) {:ok, "#{value}\r\nOK\r\n"} end end def run({:put, bucket, key, value}) do lookup bucket, fn pid -> KV.Bucket.put(pid, key, value) {:ok, "OK\r\n"} end end def run({:delete, bucket, key}) do lookup bucket, fn pid -> KV.Bucket.delete(pid, key) {:ok, "OK\r\n"} end end defp lookup(bucket, callback) do case KV.Registry.lookup(KV.Registry, bucket) do {:ok, pid} -> callback.(pid) :error -> {:error, :not_found} end end
- 受け取った命令に応じてbucketを操作するように直した。
lookup
を見てみよう。bucketが見つからなかったら{:error, :not_found}
を返すようにしているが、ユーザーに表示する場合もNot Foundであることを伝えるようにしたほうがよい。KV.Server
のformat_msg/1
のパターンを追加する。
defp format_msg({:error, :not_found}), do: "NOT FOUND\r\n"
- これでサーバーはほとんど完成した。最後にテストを追加しよう。
defmodule KVServerTest do use ExUnit.Case setup do Logger.remove_backend(:console) # KVアプリケーションを止めてstateをリセットした上で再開 Application.stop(:kv) :ok = Application.start(:kv) Logger.add_backend(:console, flush: true) :ok end setup do # クライアントからの接続を作成 opts = [:binary, packet: :line, active: false] {:ok, socket} = :gen_tcp.connect('localhost', 4040, opts) {:ok, socket: socket} end test "server interaction", %{socket: socket} do assert send_and_recv(socket, "UNKNOWN shopping\r\n") == "UNKNOWN COMMAND\r\n" assert send_and_recv(socket, "GET shopping eggs\r\n") == "NOT FOUND\r\n" assert send_and_recv(socket, "CREATE shopping\r\n") == "OK\r\n" assert send_and_recv(socket, "PUT shopping eggs 3\r\n") == "OK\r\n" # GET returns two lines assert send_and_recv(socket, "GET shopping eggs\r\n") == "3\r\n" assert send_and_recv(socket, "") == "OK\r\n" assert send_and_recv(socket, "DELETE shopping eggs\r\n") == "OK\r\n" # GET returns two lines assert send_and_recv(socket, "GET shopping eggs\r\n") == "\r\n" assert send_and_recv(socket, "") == "OK\r\n" end defp send_and_recv(socket, command) do :ok = :gen_tcp.send(socket, command) {:ok, data} = :gen_tcp.recv(socket, 0, 1000) data end end
mix test
を走らせて通れば完成だ。
Elixir 入門 ~ MIX AND OTP ~ その7 - Task and gen-tcp
- Erlangの
:gen_tcp
モジュールについて学ぶ。
Echo server
- まずはEcho Serverを作成することから始める。TCP Serverは以下のStepを実行する
- 利用可能なPortを開いてListenする
- そのPortでクライアントからの接続を待ち、受け入れる
- クライアントからの要求を解析し、レスポンスを返す。
- KVServerアプリケーションにこれらの機能を組み込んでみる。
lib/kv_server.ex
を開いて以下のように編集する。
# 指定されたportでListenを開始する def accept(port) do # The options below mean: # # 1. `:binary` - receives data as binaries (instead of lists) # 2. `packet: :line` - receives data line by line # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes # {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) IO.puts "Accepting connections on port #{port}" loop_acceptor(socket) end # socketを受け続けるループ。 defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) serve(client) loop_acceptor(socket) end # socketを受け取っったときの処理 defp serve(socket) do socket |> read_line() |> write_line(socket) serve(socket) end # socketを読み込む defp read_line(socket) do {:ok, data} = :gen_tcp.recv(socket, 0) data end # TCPを通してResponseを返す defp write_line(line, socket) do :gen_tcp.send(socket, line) end
iex -S mix
からREPLの中でListenを開始してみる
iex> KVServer.accept(4040) Accepting connections on port 4040
- 開始されたらTerminalからtelnetでローカルホストの4040ポートに接続して、適当な文字を送ってみる
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world !! hello world !!
- 送った
hello world !!
がそのまま返されていることがわかる。 - telnetの接続を終了させると以下のようなエラーがサーバー側に表示される。
iex(1)> KVServer.accept(4040) Accepting connections on port 4040 ** (MatchError) no match of right hand side value: {:error, :closed} (kv_server) lib/kv_server.ex:50: KVServer.read_line/1 (kv_server) lib/kv_server.ex:43: KVServer.serve/1 (kv_server) lib/kv_server.ex:37: KVServer.loop_acceptor/1
- これは
gen_tcp.recv/2
でsocketを受け取ることを期待しているのに接続が閉じられたためで、その場合のハンドルが足りていない。そして、例外が発生したと同時にサーバーが終了して再起動しないことがわかる。これらを解決するためにはsupervision tree
の元でプロセスを動かす必要があることに気づくだろう。
Tasks
- Taskモジュールを使ってKVServerを
supervision tree
の監督下の元で動かす。再びlib/kv_server.ex
を編集する。
def start(_type, _args) do import Supervisor.Spec children = [ worker(Task, [KVServer, :accept, [4040]]) ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
- これでサーバーは
supervision tree
の一部となったのでmix run --no-halt
でアプリケーションを動かしてみよう。--no-halt
はアプリケーションのプロセスを止めないで起動するオプションである。 - サーバーが立ち上がるはずなので同じくtelnetでローカルホストの4040ポートに接続してメッセージを送ってみる。問題なくメッセージは帰ってくるはずだ。
- さきほどと同じようにtelnetを終了してみる。そうすると今度はエラーメッセージは表示されるが、サーバー自体は終了しない。これは
one_for_one
戦略に従いsupervisor
が終了したサーバーのプロセスを再起動しているため。 - では、別のターミナルで2個目のコネクションを張って同じくメッセージを送ってみる。すると接続はできるがレスポンスは返ってこない。今の作りでは接続中のクライアントがすでに存在していると新しいクライアントは受け入れられていないことがわかる。
Task supervisor
- 複数のコネクションを処理するためにはコネクションを受け入れるプロセスとコネクションを処理するプロセスを分離する必要がある。
- 再び
start/2
を編集する。
def start(_type, _args) do import Supervisor.Spec # TaskのSupervisorをsupervision treeに追加 children = [ supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]), worker(Task, [KVServer, :accept, [4040]]) ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
TaskSupervisor
をsupervision tree
の下に追加した。次にコネクションを処理するプロセスをTaskSupervisor
の下に作るようにloop_acceptor/1
を変更する。
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) :ok = :gen_tcp.controlling_process(client, pid) loop_acceptor(socket) end
Elixir 入門 ~ MIX AND OTP ~ その6 - Dependencies and umbrella projects
- KVアプリケーションは完成したので次にそれを実装するTCPサーバーを作成する。その前に
Mix
のDependency
について理解する。
External dependencies
- 外部依存はビジネスドメインに縛られない。例えば今のKVアプリケーションのためにHTTP APIが必要であれば外部依存するパッケージとして
Plug
を使う。 - パッケージは
Hex Package Manager
に登録されている。依存関係はmix.exs
のdeps
関数に定義する。
def deps do # 0.5.x で最新のバージョンを取得 [{:plug, "~> 0.5.0"}] end
- Hexで管理されているパッケージは安定版だが、開発中のものを使いたいときはgit経由でも書くことができる。
def deps do [{:plug, git: "git://github.com/elixir-lang/plug.git"}] end
- 依存パッケージを追加してコンパイルすると
Mix
はmix.lock
を生成する。これは依存関係にあるパッケージのどのバージョンを使ったかが記録されている。
Internal dependencies
- 内部依存はプロジェクト特有のもので、プロジェクト外では意味をなさずプライベートにしておきたいもので、内部依存のアプリケーションを使いたい場合は、git経由か、
umbrella projects
という方法がある。umbrella projects
を使うと一つのプロジェクトの中に複数のアプリケーションをホストでき、単一のリポジトリで管理できるので通常はこちらを使う方が管理しやすい。これまでに作ったKVアプリケーションを含んだプロジェクトkv_umbrella
を作成する。
Umbrella projects
- 複数のアプリケーションを管理する
kv_umbrella
を新しく作成する。mix new
のときに--umbrella
オプションを付与する。
mix new kv_umbrella --umbrella
- 実行するとコンソールに表示される内容が
--umbrella
なしでmix new
したときと変わっているはずだ。mix.exs
のコードもまた変わっている。--umbrella
オプションを指定したことでこのアプリケーションがumbrella
として動作するようになった。 - 次に
kv_umbrella/apps/
に移動してkv_server
アプリケーションを作成する。こちらは作成の際に--sup
オプションをつける。これはMix
が自動でsupervision tree
を生成してくれるようになる。
mix new kv_server --module KVServer --sup
- KVServerの
mix.exs
を見てみよう。
defmodule KVServer.Mixfile do use Mix.Project def project do [app: :kv_server, version: "0.0.1", deps_path: "../../deps", lockfile: "../../mix.lock", elixir: "~> 1.0", deps: deps] end def application do [applications: [:logger], mod: {KVServer, []}] end defp deps do [] end end
deps_path
とlockfile
が追加されているが、これはappsの下でmix new
をしたことでMix
が自動で全体の構造を解釈して追加している。このアプリケーションが外部依存するパッケージはkv_umbrella
と共有されることになった。- また、
application/0
の中も変わって、mod: {KVServer, []}
が追加されている。これは--sup
フラグを追加したことによるもので、KVServer
が全体のアプリケーションのsupervision tree
を開始することを意味する。 lib/kv_server.ex
の中を見ると前の章で作成したsupervisor
の実装が自動で組み込まれていることがわかる。あとはこれに監督されるアプリケーションを追加していく。
In umbrella dependencies
apps/kv_server/mix.exs
を開いてkv
を追加する。
defp deps do # kvがkv_serverの中で使えるようになる [{:kv, in_umbrella: true}] end
deps
は依存関係を解決するだけなのでKVアプリケーションが開始されるわけではない。applicaiton/0
に追加してKVアプリケーションが開始されるようにする。
def application do # kv_serverが開始されれば`kv`も開始されるようになる [applications: [:logger, :kv], mod: {KVServer, []}] end
- 次に
apps
の下に今まで作成したKVアプリケーションを移動する。KVのmix.exs
もKVServerと同様に以下のコードを追加しておく。
deps_path: "../../deps", lockfile: "../../mix.lock",
- ここまで出来たら
kv_umbrella
のrootに移動してmix test
を実行してみる。2つのアプリケーションのテストが走るはず。 - このように
Umbrella Project
はアプリケーションを管理するのに便利である。管理されるアプリケーションは切り離されていて個別の設定が定義できる。個別に開発することもできるし、リストを明示的にして一緒に使うこともできる。
Elixir 入門 ~ MIX AND OTP ~ その5 - ETS
- キャッシュ機構である
Erlang Term Storage
について学ぶ。
ETS as a cache
- ETSは
ets
モジュールを経由してメモリ上のETSテーブルにデータを保存できる仕組み
iex(1)> table = :ets.new(:buckets_registry, [:set, :protected]) 8211 iex(2)> :ets.insert(table, {"foo", self}) true iex(3)> :ets.lookup(table, "foo") [{"foo", #PID<0.59.0>}]
- ETS tableを作成する際は、テーブル名とオプションの2つの引数を与える。上記の例では
:buckets_registry
というテーブルにテーブルの型とアクセスルールを与えている。:set
はキーの重複を許さないという意味、:protected
はテーブルを作成したプロセスのみが書き込める、ただし読み込みはすべてのプロセスから可能、という意味である。 [:set, :protected]
はデフォルト値なので指定しなかった場合は自動で設定される。lib/kv/registry.ex
を開いてETSを組み込んでみよう
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(table, event_manager, buckets, opts \\ []) do # 1. ETSのテーブルのプロセスをstart_linkに追加 GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts) end @doc """ Looks up the bucket pid for `name` stored in `table`. Returns `{:ok, pid}` if a bucket exists, `:error` otherwise. """ def lookup(table, name) do # 2. tableにbucketが存在していたら:okを含んだtupleをなければ:errorを返す case :ets.lookup(table, name) do [{^name, bucket}] -> {:ok, bucket} [] -> :error end end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server callbacks def init({table, events, buckets}) do # 3. HashDictからETS tableに変更 ets = :ets.new(table, [:named_table, read_concurrency: true]) refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end # 4. handle_call Callbackは消しておく def handle_cast({:create, name}, state) do # 5. HashDictの代わりにETS Tableに対して Read/writeする case lookup(state.names, name) do {:ok, _pid} -> {:noreply, state} :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) {:noreply, %{state | refs: refs}} end end def handle_info({:DOWN, ref, :process, pid, _reason}, state) do # 6. BucketのプロセスがダウンしたらETS Tableから消す {name, refs} = HashDict.pop(state.refs, ref) :ets.delete(state.names, name) GenEvent.sync_notify(state.events, {:exit, name, pid}) {:noreply, %{state | refs: refs}} end def handle_info(_msg, state) do {:noreply, state} end end
- この修正でテストも壊れるので直しておく
setup do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) {:ok, registry: registry, ets: :registry_table} end
- 各テストの引数を
%{registry: registry, ets: ets}
とし、setupで追加したテーブル名を受け取れるようにしておく。 - データを取り出す先が
ETS
になったので、KV.Registry.lookup(registry, ...)
をKV.Registry.lookup(ets, ...)
に変更。 - まだ失敗する。。。
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
ここで落ちる。- lookupをよく見るとcastでコールバックを呼んでいる。castは非同期なのでcreateの完了をまたずして次に進んで取り出そうとしている。
- 再び
lib/kv/registry.ex
を修正する。
def create(server, name) do # cast から call に GenServer.call(server, {:create, name}) end # handle_callに。callになったので _fromを追加。使わないけど def handle_call({:create, name}, _from, state) do case lookup(state.names, name) do {:ok, pid} -> # returnが必要なのでbucketのpidを返す。 {:reply, pid, state} # Reply with pid :error -> {:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets) ref = Process.monitor(pid) refs = HashDict.put(state.refs, ref, name) :ets.insert(state.names, {name, pid}) GenEvent.sync_notify(state.events, {:create, name, pid}) # returnが必要なのでbucket のpidを返す。 {:reply, pid, %{state | refs: refs}} # Reply with pid end end
- これで
mix test
を実行すると、、、期待していたところは通るが、"removes buckets on exit" のケースが通らなくなる。 - 上と同じ理屈でプロセスを止めても、ETSから削除するのを待たずにassertをかけているので、無いはずのものがあるという状態になる。
- さきほどと違い、プロセスの停止のイベントを
handle_info
で拾って削除しているので同期的に待つことができない。代わりに削除するときにEvent Manager
の通知を送っているのでそれを利用してテストを修正する。
test "removes buckets on exit", %{registry: registry, ets: ets} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(ets, "shopping") Agent.stop(bucket) assert_receive {:exit, "shopping", ^bucket} # event managerから :exitを受け取るまで待つ。 assert KV.Registry.lookup(ets, "shopping") == :error end
- これでようやくテストが通るようになる。これでアプリケーションに登録する準備ができたので
liv/kv/supervisor.ex
を編集してETS
のプロセスを追加する。
@manager_name KV.EventManager @registry_name KV.Registry @ets_registry_name KV.Registry @bucket_sup_name KV.Bucket.Supervisor def init(:ok) do children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [@ets_registry_name, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
ETS as persistent storage
- これまでに
ETS
のプロセスは明示的に開始したが、閉じるときは特に何もしていない。ETS
のプロセスは開始したプロセスにリンクしているので、上記のコードで言えば、registry
のプロセスが終了すれば自動的にETS
のプロセスも終了する。 では、
ETS
をregistry
のプロセスから切り離したらどうなるか。bucket
の情報がETS
に保存されているのであればregistry
が死んだとしてもETS
からbucket
のプロセスをregistry
のsupervisor
に復帰させることができる。liv/kv/supervisor.ex
のinitを以下のように編集する。
def init(:ok) do # init時にetsを開始する。 ets = :ets.new(@ets_registry_name, [:set, :public, :named_table, {:read_concurrency, true}]) # テーブル名ではなくetsのプロセスを渡す。 children = [ worker(GenEvent, [[name: @manager_name]]), supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]), worker(KV.Registry, [ets, @manager_name, @bucket_sup_name, [name: @registry_name]]) ] supervise(children, strategy: :one_for_one) end
- 合わせて
lib/kv/registry.ex
のinitも編集する。今まではここでetsを開始していたがすでに開始済みのプロセスを受け取る。
def init({ets, events, buckets}) do refs = HashDict.new {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end
- 最後にテストを直す。setup時に
ETS
を開始して開始済みのプロセスをregistry
に渡してやるようにする。ここまででmix test
をして通ることを確認する。
setup do ets = :ets.new(:registry_table, [:set, :public]) registry = start_registry(ets) {:ok, registry: registry, ets: ets} end defp start_registry(ets) do {:ok, sup} = KV.Bucket.Supervisor.start_link {:ok, manager} = GenEvent.start_link {:ok, registry} = KV.Registry.start_link(ets, manager, sup) GenEvent.add_mon_handler(manager, Forwarder, self()) registry end
- では最後のテストケースを追加する。
ETS
のプロセスをregistry
から分離したことでregistry
が死んでもETS
は死ななくなった。これを検証するテストケースを追加する。
test "monitors existing entries", %{registry: registry, ets: ets} do # とりあえずbucketを作成 bucket = KV.Registry.create(registry, "shopping") # registryのプロセスを終了する Process.unlink(registry) Process.exit(registry, :shutdown) # 生き残っているetsを使ってregisryを起動しなおす start_registry(ets) # 最初に作成したbucketが取れることを確認 assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket} # bucketのプロセスを終了 Process.exit(bucket, :shutdown) # ETSからの削除されていることを確認 assert_receive {:exit, "shopping", ^bucket} assert KV.Registry.lookup(ets, "shopping") == :error end
- これを通すための修正を
lib/kv/registry.ex
に入れる。
def init({ets, events, buckets}) do # initでetsに保存されているbucketをrefsに詰め直す。空であれば単にHashDictを返す refs = :ets.foldl(fn {name, pid}, acc -> HashDict.put(acc, Process.monitor(pid), name) end, HashDict.new, ets) {:ok, %{names: ets, refs: refs, events: events, buckets: buckets}} end