Elixir 入門 ~ MIX AND OTP ~ その9 - Distributed tasks and configuration
- 最後に
routingの機能をKVアプリケーションに追加する。routing tableはこのようになる。
[{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
Our first distributed code
- VMに名前をつけて開始する。
iex --sname fooでREPLを起動する。 - すると以下のように
node名@computer-nameがpromptに表示される
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help) iex(foo@your_hostname)>
- この中でモジュールを定義する。
iex(foo@your_hostname)> defmodule Hello do ...> def world, do: IO.puts "hello world" ...> end iex(foo@your_hostname)> Hello.world hello world :ok
- 次にfooを生かしたまま、別のnode名で新しくREPLを立ち上げて、先ほどの
Hello.worldを呼ぶ
iex --sname bar
iex(bar@your_hostname)> Hello.world
** (UndefinedFunctionError) undefined function: Hello.world/0 (module Hello is not available)
Hello.world()
- 当然、barには定義されていないので返ってこない。ここで
Node.spawn_linkを使ってfooの新しいプロセスを作ることができる。
iex(bar@your_hostname)> Node.spawn_link :"foo@ your_hostname", fn -> Hello.world end hello world #PID<8897.78.0>
- hello world` が 表示された。他のnodeのプロセスを生んでそのpidを返したことがわかる。次にそのpidを使ってメッセージの送受信をする。
iex(bar@your_hostname)> pid = Node.spawn_link :"foo@your_hostname", fn ->
...> receive do
...> {:ping, client} -> send client, :pong
...> end
...> end
#PID<8897.86.0>
iex(bar@your_hostname)> send pid, {:ping, self}
{:ping, #PID<0.65.0>}
iex(bar@your_hostname)> flush
:pong
:ok
- foo nodeに :ping を受け取って :pong を返す関数を定義したプロセスを作った。そしてbarから fooに :ping を送って :pong が帰ってきたことがわかる。ただ、このやり方は
supervison treeの管理からは外れるので避けたい。他の方法も探してみよう。
async/await
- Elixirは
async/awaitパターンも提供している。asyncで先に計算してawaitであとで結果を取り出すというようなことが可能。
iex> task = Task.async(fn ->
...> :timer.sleep(5000)
...> "hogehoge"
...> end)
%Task{pid: #PID<0.82.0>, ref: #Reference<0.0.2.103>}
iex> Task.await(task)
"hogehoge"
- Task.supervisorの下で使うことも可能。その場合は
Task.Supervisor.start_child/2の代わりにTask.Supervisor.async/2でプロセスを作ってやればよい。結果の取り出しはTask.await/2で可能
Distributed tasks
- distributeされたtaskはsuperviseされたtaskとほとんど同じで、違いは
supervisorにtaskを作成するときにnode名を渡しているだけである。lib/kv/supervisor.exを開いてTask Supervisorを追加しよう。
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),
- 保存したらKVアプリケーションのルートで
iex --sname foo -S mixとiex --sname bar -S mixをそれぞれ立ち上げる。 - 片方からもう片方にnode名を返すタスクを作って
async/awaitでnode名が返ることを確認する。
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...> {:ok, node()}
...> end
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...> (bar@your_hostname)> {:ok, node()}
...> (bar@your_hostname)> end
%Task{pid: #PID<12875.125.0>, ref: #Reference<0.0.7.9>}
iex(bar@your_hostname)> Task.await(task)
{:ok, :"foo@your_hostname"}
Routing layer
lib/kv/router.exを作ってroutingの機能を実装する。- 以降のコードの
your_hostnameのあたりはサンプルを動かす実行環境に合わせて修正
defmodule KV.Router do @doc """ Dispatch the given `mod`, `fun`, `args` request to the appropriate node based on the `bucket`. """ def route(bucket, mod, fun, args) do # Get the first byte of the binary first = :binary.first(bucket) # Try to find an entry in the table or raise entry = Enum.find(table, fn {enum, node} -> first in enum end) || no_entry_error(bucket) # If the entry node is the current node if elem(entry, 1) == node() do apply(mod, fun, args) else sup = {KV.RouterTasks, elem(entry, 1)} Task.Supervisor.async(sup, fn -> KV.Router.route(bucket, mod, fun, args) end) |> Task.await() end end defp no_entry_error(bucket) do raise "could not find entry for #{inspect bucket} in table #{inspect table}" end @doc """ The routing table. """ def table do # Replace computer-name with your local machine name. [{?a..?m, :"foo@your_hostname"}, {?n..?z, :"bar@your_hostname"}] end end
- テストも
test/kv/router_test.exsを追加する。
defmodule KV.RouterTest do
use ExUnit.Case, async: true
# 対応するnodeがrouterから返ってくるか
test "route requests across nodes" do
assert KV.Router.route("hello", Kernel, :node, []) ==
:"foo@your_hostname"
assert KV.Router.route("world", Kernel, :node, []) ==
:"bar@your_hostname"
end
test "raises on unknown entries" do
assert_raise RuntimeError, ~r/could not find entry/, fn ->
KV.Router.route(<<0>>, Kernel, :node, [])
end
end
end
- 作成したらテストを走らせるために事前に
iex --sname bar -S mixで bar node を作成し、その後にelixir --sname foo -S mix testで foo node でテストが実行される。通ればOK。
Test filters and tags
- テストは通ったが
mix testだけでは走ることができない複雑な構造になってしまった。 tagを使おう。test/kv/router_test.exsを開いてテストにtagをつけてみる。
# @tag distributed: true と書いても同じ @tag :distributed test "route requests across nodes" do
- 次に
test/test_helper.exsを開いて、Nodeが生きている場合のみdistributed tagが付いているテストが実行されるようにする。
exclude = if Node.alive?, do: [], else: [distributed: true] ExUnit.start(exclude: exclude)
mix testを走らせると、route requests across nodesテストが対象から外れて実行されなくなったはずだ。tagを使ったmix testのオプションとして以下がある。mix test --include tag: include オプションで指定されたtagはテスト対象に含まれるmix test --exclude tag: exclude オプションで指定されたtagはテスト対象から除外されるmix test --only tag: only オプションで指定されたtagはそのtagがついたテストのみ実行される
- 今回のケースで
distributed tagがついたケースだけを実行させたい場合はelixir --sname foo -S mix test --only distributedで実行される。もちろん、この場合はテストで使うnodeが動いていなければテストは通らない。
Application environment and configuration
Routing tableをKV..Routerモジュールにハードコードしたが、これはうまくない。developとproductionで異なるものを使いたいときもあるだろう。application environmentを使う。apps/kv/mix.exsを開いてapplication/0を次のように書き直す。
def application do
[applications: [],
env: [routing_table: []],
mod: {KV, []}]
end
- 新たに
:envkeyを追加した。env:で設定されたkey-valueがapplication environmentのデフォルト値となる。KV.Router.table/0' を書き換えてapplication environmentからRouting table`の設定を読み込むようにする。
@doc """ The routing table. """ def table do Application.get_env(:kv, :routing_table) end
Application.get_env/2で指定したapplication environmentの値を取ることができる。次に実際に値を設定する。設定する場所は決まっていてconfig/config.exsに書く。routing_tableの設定を書こう。
# Replace computer-name with your local machine nodes.
config :kv, :routing_table,
[{?a..?m, :"foo@your_hostname"},
{?n..?z, :"bar@your_hostname"}]
config/config.exsはアプリケーションごとに持っていて共有されない。共有したい場合は他のアプリケーションのconfigをimportすることができる。例えばkv_umbrellaのconfig.exsと共有したい場合は次のようにimportする。
import_config "../apps/kv/config/config.exs"
- 実際にはumbrellaオプションをつけて作成したアプリケーションは自動的に
apps配下のすべてのconfigをimportするようになっているのでこのコードを書く必要はない。
さいごに
- 課題っぽいのが与えられているので解いてみる
- 内部のハードコードされているポートを
application environmentから取るようにする。 kv_serverがローカルのKV.Registryからbucketを作っているところをroutingを使うようにする。テストも直す。
- 内部のハードコードされているポートを
- 最後はやっつけ仕事になってしまった。。。
とりあえず、今までのコード
elixir_training/kv_umbrella at master · rei-m/elixir_training · GitHub