

Elixir 入門 ~ MIX AND OTP ~ その9 - Distributed tasks and configuration

  • 最後にroutingの機能をKVアプリケーションに追加する。routing table はこのようになる。
[{?a..?m, :"foo@computer-name"},
 {?n..?z, :"bar@computer-name"}]

Our first distributed code

  • VMに名前をつけて開始する。iex --sname fooでREPLを起動する。
  • すると以下のように node名@computer-name がpromptに表示される
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
  • この中でモジュールを定義する。
iex(foo@your_hostname)> defmodule Hello do
...>  def world, do: IO.puts "hello world"
...> end

iex(foo@your_hostname)> Hello.world
hello world
  • 次にfooを生かしたまま、別のnode名で新しくREPLを立ち上げて、先ほどのHello.worldを呼ぶ
iex --sname bar
iex(bar@your_hostname)> Hello.world
** (UndefinedFunctionError) undefined function: Hello.world/0 (module Hello is not available)
  • 当然、barには定義されていないので返ってこない。ここで Node.spawn_link を使ってfooの新しいプロセスを作ることができる。
iex(bar@your_hostname)> Node.spawn_link :"foo@ your_hostname", fn -> Hello.world end
hello world
  • hello world` が 表示された。他のnodeのプロセスを生んでそのpidを返したことがわかる。次にそのpidを使ってメッセージの送受信をする。
iex(bar@your_hostname)> pid = Node.spawn_link :"foo@your_hostname", fn ->
...>   receive do
...>     {:ping, client} -> send client, :pong
...>   end
...> end
iex(bar@your_hostname)> send pid, {:ping, self}
{:ping, #PID<0.65.0>}
iex(bar@your_hostname)> flush
  • foo nodeに :ping を受け取って :pong を返す関数を定義したプロセスを作った。そしてbarから fooに :ping を送って :pong が帰ってきたことがわかる。ただ、このやり方はsupervison tree の管理からは外れるので避けたい。他の方法も探してみよう。


  • Elixirは async/await パターンも提供している。asyncで先に計算してawaitであとで結果を取り出すというようなことが可能。
iex> task = Task.async(fn ->
...> :timer.sleep(5000)
...> "hogehoge"
...> end)
%Task{pid: #PID<0.82.0>, ref: #Reference<>}
iex>  Task.await(task)
  • Task.supervisorの下で使うことも可能。その場合はTask.Supervisor.start_child/2の代わりにTask.Supervisor.async/2でプロセスを作ってやればよい。結果の取り出しはTask.await/2で可能

Distributed tasks

  • distributeされたtaskはsuperviseされたtaskとほとんど同じで、違いはsupervisortaskを作成するときにnode名を渡しているだけである。lib/kv/supervisor.exを開いてTask Supervisorを追加しよう。
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),
  • 保存したらKVアプリケーションのルートで iex --sname foo -S mixiex --sname bar -S mix をそれぞれ立ち上げる。
  • 片方からもう片方にnode名を返すタスクを作ってasync/awaitでnode名が返ることを確認する。
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...>   {:ok, node()}
...> end
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...> (bar@your_hostname)> {:ok, node()}
...> (bar@your_hostname)> end
%Task{pid: #PID<12875.125.0>, ref: #Reference<>}
iex(bar@your_hostname)> Task.await(task)
{:ok, :"foo@your_hostname"}

Routing layer

  • lib/kv/router.ex を作ってroutingの機能を実装する。
  • 以降のコードのyour_hostnameのあたりはサンプルを動かす実行環境に合わせて修正
defmodule KV.Router do
  @doc """
  Dispatch the given `mod`, `fun`, `args` request
  to the appropriate node based on the `bucket`.
  def route(bucket, mod, fun, args) do
    # Get the first byte of the binary
    first = :binary.first(bucket)

    # Try to find an entry in the table or raise
    entry =
      Enum.find(table, fn {enum, node} ->
        first in enum
      end) || no_entry_error(bucket)

    # If the entry node is the current node
    if elem(entry, 1) == node() do
      apply(mod, fun, args)
      sup = {KV.RouterTasks, elem(entry, 1)}
      Task.Supervisor.async(sup, fn ->
        KV.Router.route(bucket, mod, fun, args)
      end) |> Task.await()

  defp no_entry_error(bucket) do
    raise "could not find entry for #{inspect bucket} in table #{inspect table}"

  @doc """
  The routing table.
  def table do
    # Replace computer-name with your local machine name.
    [{?a..?m, :"foo@your_hostname"},
     {?n..?z, :"bar@your_hostname"}]
  • テストもtest/kv/router_test.exs を追加する。
defmodule KV.RouterTest do
  use ExUnit.Case, async: true

  # 対応するnodeがrouterから返ってくるか
  test "route requests across nodes" do
    assert KV.Router.route("hello", Kernel, :node, []) ==
    assert KV.Router.route("world", Kernel, :node, []) ==

  test "raises on unknown entries" do
    assert_raise RuntimeError, ~r/could not find entry/, fn ->
      KV.Router.route(<<0>>, Kernel, :node, [])
  • 作成したらテストを走らせるために事前に iex --sname bar -S mix で bar node を作成し、その後に elixir --sname foo -S mix test で foo node でテストが実行される。通ればOK。

Test filters and tags

  • テストは通ったが mix testだけでは走ることができない複雑な構造になってしまった。
  • tagを使おう。test/kv/router_test.exsを開いてテストにtagをつけてみる。
# @tag distributed: true と書いても同じ
@tag :distributed
test "route requests across nodes" do
  • 次に test/test_helper.exs を開いて、Nodeが生きている場合のみdistributed tag が付いているテストが実行されるようにする。
exclude =  if Node.alive?, do: [], else: [distributed: true]

ExUnit.start(exclude: exclude)
  • mix testを走らせると、route requests across nodes テストが対象から外れて実行されなくなったはずだ。
  • tagを使ったmix testのオプションとして以下がある。
    • mix test --include tag : include オプションで指定されたtagはテスト対象に含まれる
    • mix test --exclude tag : exclude オプションで指定されたtagはテスト対象から除外される
    • mix test --only tag : only オプションで指定されたtagはそのtagがついたテストのみ実行される
  • 今回のケースでdistributed tagがついたケースだけを実行させたい場合は elixir --sname foo -S mix test --only distributed で実行される。もちろん、この場合はテストで使うnodeが動いていなければテストは通らない。

Application environment and configuration

  • Routing tableKV..Routerモジュールにハードコードしたが、これはうまくない。developとproductionで異なるものを使いたいときもあるだろう。
  • application environmentを使う。apps/kv/mix.exsを開いてapplication/0を次のように書き直す。
def application do
  [applications: [],
   env: [routing_table: []],
   mod: {KV, []}]
  • 新たに:env keyを追加した。env:で設定されたkey-valueがapplication environmentのデフォルト値となる。KV.Router.table/0' を書き換えてapplication environmentからRouting table`の設定を読み込むようにする。
@doc """
The routing table.
def table do
  Application.get_env(:kv, :routing_table)
  • Application.get_env/2 で指定したapplication environmentの値を取ることができる。次に実際に値を設定する。設定する場所は決まっていてconfig/config.exsに書く。routing_tableの設定を書こう。
# Replace computer-name with your local machine nodes.
config :kv, :routing_table,
       [{?a..?m, :"foo@your_hostname"},
        {?n..?z, :"bar@your_hostname"}]
  • config/config.exsはアプリケーションごとに持っていて共有されない。共有したい場合は他のアプリケーションのconfigimportすることができる。例えばkv_umbrellaconfig.exsと共有したい場合は次のようにimportする。
import_config "../apps/kv/config/config.exs"
  • 実際にはumbrellaオプションをつけて作成したアプリケーションは自動的にapps配下のすべてのconfigをimportするようになっているのでこのコードを書く必要はない。


  • 課題っぽいのが与えられているので解いてみる
    • 内部のハードコードされているポートをapplication environmentから取るようにする。
    • kv_serverがローカルのKV.Registryからbucketを作っているところをroutingを使うようにする。テストも直す。
  • 最後はやっつけ仕事になってしまった。。。


