- 最後に
routing
の機能をKVアプリケーションに追加する。routing table
はこのようになる。
[{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
Our first distributed code
- VMに名前をつけて開始する。
iex --sname foo
でREPLを起動する。 - すると以下のように
node名@computer-name
がpromptに表示される
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help) iex(foo@your_hostname)>
- この中でモジュールを定義する。
iex(foo@your_hostname)> defmodule Hello do ...> def world, do: IO.puts "hello world" ...> end iex(foo@your_hostname)> Hello.world hello world :ok
- 次にfooを生かしたまま、別のnode名で新しくREPLを立ち上げて、先ほどの
Hello.world
を呼ぶ
iex --sname bar
iex(bar@your_hostname)> Hello.world ** (UndefinedFunctionError) undefined function: Hello.world/0 (module Hello is not available) Hello.world()
- 当然、barには定義されていないので返ってこない。ここで
Node.spawn_link
を使ってfoo
の新しいプロセスを作ることができる。
iex(bar@your_hostname)> Node.spawn_link :"foo@ your_hostname", fn -> Hello.world end hello world #PID<8897.78.0>
- hello world` が 表示された。他のnodeのプロセスを生んでそのpidを返したことがわかる。次にそのpidを使ってメッセージの送受信をする。
iex(bar@your_hostname)> pid = Node.spawn_link :"foo@your_hostname", fn -> ...> receive do ...> {:ping, client} -> send client, :pong ...> end ...> end #PID<8897.86.0> iex(bar@your_hostname)> send pid, {:ping, self} {:ping, #PID<0.65.0>} iex(bar@your_hostname)> flush :pong :ok
- foo nodeに :ping を受け取って :pong を返す関数を定義したプロセスを作った。そしてbarから fooに :ping を送って :pong が帰ってきたことがわかる。ただ、このやり方は
supervison tree
の管理からは外れるので避けたい。他の方法も探してみよう。
async/await
- Elixirは
async/await
パターンも提供している。asyncで先に計算してawaitであとで結果を取り出すというようなことが可能。
iex> task = Task.async(fn -> ...> :timer.sleep(5000) ...> "hogehoge" ...> end) %Task{pid: #PID<0.82.0>, ref: #Reference<0.0.2.103>} iex> Task.await(task) "hogehoge"
- Task.supervisorの下で使うことも可能。その場合は
Task.Supervisor.start_child/2
の代わりにTask.Supervisor.async/2
でプロセスを作ってやればよい。結果の取り出しはTask.await/2
で可能
Distributed tasks
- distributeされたtaskはsuperviseされたtaskとほとんど同じで、違いは
supervisor
にtask
を作成するときにnode名を渡しているだけである。lib/kv/supervisor.ex
を開いてTask Supervisor
を追加しよう。
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),
- 保存したらKVアプリケーションのルートで
iex --sname foo -S mix
とiex --sname bar -S mix
をそれぞれ立ち上げる。 - 片方からもう片方にnode名を返すタスクを作って
async/await
でnode名が返ることを確認する。
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn -> ...> {:ok, node()} ...> end iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn -> ...> (bar@your_hostname)> {:ok, node()} ...> (bar@your_hostname)> end %Task{pid: #PID<12875.125.0>, ref: #Reference<0.0.7.9>} iex(bar@your_hostname)> Task.await(task) {:ok, :"foo@your_hostname"}
Routing layer
lib/kv/router.ex
を作ってroutingの機能を実装する。- 以降のコードの
your_hostname
のあたりはサンプルを動かす実行環境に合わせて修正
defmodule KV.Router do @doc """ Dispatch the given `mod`, `fun`, `args` request to the appropriate node based on the `bucket`. """ def route(bucket, mod, fun, args) do # Get the first byte of the binary first = :binary.first(bucket) # Try to find an entry in the table or raise entry = Enum.find(table, fn {enum, node} -> first in enum end) || no_entry_error(bucket) # If the entry node is the current node if elem(entry, 1) == node() do apply(mod, fun, args) else sup = {KV.RouterTasks, elem(entry, 1)} Task.Supervisor.async(sup, fn -> KV.Router.route(bucket, mod, fun, args) end) |> Task.await() end end defp no_entry_error(bucket) do raise "could not find entry for #{inspect bucket} in table #{inspect table}" end @doc """ The routing table. """ def table do # Replace computer-name with your local machine name. [{?a..?m, :"foo@your_hostname"}, {?n..?z, :"bar@your_hostname"}] end end
- テストも
test/kv/router_test.exs
を追加する。
defmodule KV.RouterTest do use ExUnit.Case, async: true # 対応するnodeがrouterから返ってくるか test "route requests across nodes" do assert KV.Router.route("hello", Kernel, :node, []) == :"foo@your_hostname" assert KV.Router.route("world", Kernel, :node, []) == :"bar@your_hostname" end test "raises on unknown entries" do assert_raise RuntimeError, ~r/could not find entry/, fn -> KV.Router.route(<<0>>, Kernel, :node, []) end end end
- 作成したらテストを走らせるために事前に
iex --sname bar -S mix
で bar node を作成し、その後にelixir --sname foo -S mix test
で foo node でテストが実行される。通ればOK。
Test filters and tags
- テストは通ったが
mix test
だけでは走ることができない複雑な構造になってしまった。 tag
を使おう。test/kv/router_test.exs
を開いてテストにtag
をつけてみる。
# @tag distributed: true と書いても同じ @tag :distributed test "route requests across nodes" do
- 次に
test/test_helper.exs
を開いて、Nodeが生きている場合のみdistributed tag
が付いているテストが実行されるようにする。
exclude = if Node.alive?, do: [], else: [distributed: true] ExUnit.start(exclude: exclude)
mix test
を走らせると、route requests across nodes
テストが対象から外れて実行されなくなったはずだ。tag
を使ったmix test
のオプションとして以下がある。mix test --include tag
: include オプションで指定されたtag
はテスト対象に含まれるmix test --exclude tag
: exclude オプションで指定されたtag
はテスト対象から除外されるmix test --only tag
: only オプションで指定されたtag
はそのtag
がついたテストのみ実行される
- 今回のケースで
distributed tag
がついたケースだけを実行させたい場合はelixir --sname foo -S mix test --only distributed
で実行される。もちろん、この場合はテストで使うnodeが動いていなければテストは通らない。
Application environment and configuration
Routing table
をKV..Router
モジュールにハードコードしたが、これはうまくない。developとproductionで異なるものを使いたいときもあるだろう。application environment
を使う。apps/kv/mix.exs
を開いてapplication/0
を次のように書き直す。
def application do [applications: [], env: [routing_table: []], mod: {KV, []}] end
- 新たに
:env
keyを追加した。env:
で設定されたkey-valueがapplication environment
のデフォルト値となる。KV.Router.table/0' を書き換えて
application environmentから
Routing table`の設定を読み込むようにする。
@doc """ The routing table. """ def table do Application.get_env(:kv, :routing_table) end
Application.get_env/2
で指定したapplication environment
の値を取ることができる。次に実際に値を設定する。設定する場所は決まっていてconfig/config.exs
に書く。routing_table
の設定を書こう。
# Replace computer-name with your local machine nodes. config :kv, :routing_table, [{?a..?m, :"foo@your_hostname"}, {?n..?z, :"bar@your_hostname"}]
config/config.exs
はアプリケーションごとに持っていて共有されない。共有したい場合は他のアプリケーションのconfig
をimport
することができる。例えばkv_umbrella
のconfig.exs
と共有したい場合は次のようにimportする。
import_config "../apps/kv/config/config.exs"
- 実際にはumbrellaオプションをつけて作成したアプリケーションは自動的に
apps
配下のすべてのconfigをimportするようになっているのでこのコードを書く必要はない。
さいごに
- 課題っぽいのが与えられているので解いてみる
- 内部のハードコードされているポートを
application environment
から取るようにする。 kv_server
がローカルのKV.Registry
からbucketを作っているところをrouting
を使うようにする。テストも直す。
- 内部のハードコードされているポートを
- 最後はやっつけ仕事になってしまった。。。
とりあえず、今までのコード
elixir_training/kv_umbrella at master · rei-m/elixir_training · GitHub