読者です 読者をやめる 読者になる 読者になる

もやもやエンジニア

IT系のネタで思ったことや技術系のネタを備忘録的に綴っていきます。フロント率高め。

Elixir 入門 ~ MIX AND OTP ~ その9 - Distributed tasks and configuration

Elixir OTP Mix
  • 最後にroutingの機能をKVアプリケーションに追加する。routing table はこのようになる。
[{?a..?m, :"foo@computer-name"},
 {?n..?z, :"bar@computer-name"}]

Our first distributed code

  • VMに名前をつけて開始する。iex --sname fooでREPLを起動する。
  • すると以下のように node名@computer-name がpromptに表示される
Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@your_hostname)>
  • この中でモジュールを定義する。
iex(foo@your_hostname)> defmodule Hello do
...>  def world, do: IO.puts "hello world"
...> end

iex(foo@your_hostname)> Hello.world
hello world
:ok
  • 次にfooを生かしたまま、別のnode名で新しくREPLを立ち上げて、先ほどのHello.worldを呼ぶ
iex --sname bar
iex(bar@your_hostname)> Hello.world
** (UndefinedFunctionError) undefined function: Hello.world/0 (module Hello is not available)
    Hello.world()
  • 当然、barには定義されていないので返ってこない。ここで Node.spawn_link を使ってfooの新しいプロセスを作ることができる。
iex(bar@your_hostname)> Node.spawn_link :"foo@ your_hostname", fn -> Hello.world end
hello world
#PID<8897.78.0>
  • hello world` が 表示された。他のnodeのプロセスを生んでそのpidを返したことがわかる。次にそのpidを使ってメッセージの送受信をする。
iex(bar@your_hostname)> pid = Node.spawn_link :"foo@your_hostname", fn ->
...>   receive do
...>     {:ping, client} -> send client, :pong
...>   end
...> end
#PID<8897.86.0>
iex(bar@your_hostname)> send pid, {:ping, self}
{:ping, #PID<0.65.0>}
iex(bar@your_hostname)> flush
:pong
:ok
  • foo nodeに :ping を受け取って :pong を返す関数を定義したプロセスを作った。そしてbarから fooに :ping を送って :pong が帰ってきたことがわかる。ただ、このやり方はsupervison tree の管理からは外れるので避けたい。他の方法も探してみよう。

async/await

  • Elixirは async/await パターンも提供している。asyncで先に計算してawaitであとで結果を取り出すというようなことが可能。
iex> task = Task.async(fn ->
...> :timer.sleep(5000)
...> "hogehoge"
...> end)
%Task{pid: #PID<0.82.0>, ref: #Reference<0.0.2.103>}
iex>  Task.await(task)
"hogehoge"
  • Task.supervisorの下で使うことも可能。その場合はTask.Supervisor.start_child/2の代わりにTask.Supervisor.async/2でプロセスを作ってやればよい。結果の取り出しはTask.await/2で可能

Distributed tasks

  • distributeされたtaskはsuperviseされたtaskとほとんど同じで、違いはsupervisortaskを作成するときにnode名を渡しているだけである。lib/kv/supervisor.exを開いてTask Supervisorを追加しよう。
supervisor(Task.Supervisor, [[name: KV.RouterTasks]]),
  • 保存したらKVアプリケーションのルートで iex --sname foo -S mixiex --sname bar -S mix をそれぞれ立ち上げる。
  • 片方からもう片方にnode名を返すタスクを作ってasync/awaitでnode名が返ることを確認する。
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...>   {:ok, node()}
...> end
iex(bar@your_hostname)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@your_hostname"}, fn ->
...> (bar@your_hostname)> {:ok, node()}
...> (bar@your_hostname)> end
%Task{pid: #PID<12875.125.0>, ref: #Reference<0.0.7.9>}
iex(bar@your_hostname)> Task.await(task)
{:ok, :"foo@your_hostname"}

Routing layer

  • lib/kv/router.ex を作ってroutingの機能を実装する。
  • 以降のコードのyour_hostnameのあたりはサンプルを動かす実行環境に合わせて修正
defmodule KV.Router do
  @doc """
  Dispatch the given `mod`, `fun`, `args` request
  to the appropriate node based on the `bucket`.
  """
  def route(bucket, mod, fun, args) do
    # Get the first byte of the binary
    first = :binary.first(bucket)

    # Try to find an entry in the table or raise
    entry =
      Enum.find(table, fn {enum, node} ->
        first in enum
      end) || no_entry_error(bucket)

    # If the entry node is the current node
    if elem(entry, 1) == node() do
      apply(mod, fun, args)
    else
      sup = {KV.RouterTasks, elem(entry, 1)}
      Task.Supervisor.async(sup, fn ->
        KV.Router.route(bucket, mod, fun, args)
      end) |> Task.await()
    end
  end

  defp no_entry_error(bucket) do
    raise "could not find entry for #{inspect bucket} in table #{inspect table}"
  end

  @doc """
  The routing table.
  """
  def table do
    # Replace computer-name with your local machine name.
    [{?a..?m, :"foo@your_hostname"},
     {?n..?z, :"bar@your_hostname"}]
  end
end
  • テストもtest/kv/router_test.exs を追加する。
defmodule KV.RouterTest do
  use ExUnit.Case, async: true

  # 対応するnodeがrouterから返ってくるか
  test "route requests across nodes" do
    assert KV.Router.route("hello", Kernel, :node, []) ==
           :"foo@your_hostname"
    assert KV.Router.route("world", Kernel, :node, []) ==
           :"bar@your_hostname"
  end

  test "raises on unknown entries" do
    assert_raise RuntimeError, ~r/could not find entry/, fn ->
      KV.Router.route(<<0>>, Kernel, :node, [])
    end
  end
end
  • 作成したらテストを走らせるために事前に iex --sname bar -S mix で bar node を作成し、その後に elixir --sname foo -S mix test で foo node でテストが実行される。通ればOK。

Test filters and tags

  • テストは通ったが mix testだけでは走ることができない複雑な構造になってしまった。
  • tagを使おう。test/kv/router_test.exsを開いてテストにtagをつけてみる。
# @tag distributed: true と書いても同じ
@tag :distributed
test "route requests across nodes" do
  • 次に test/test_helper.exs を開いて、Nodeが生きている場合のみdistributed tag が付いているテストが実行されるようにする。
exclude =  if Node.alive?, do: [], else: [distributed: true]

ExUnit.start(exclude: exclude)
  • mix testを走らせると、route requests across nodes テストが対象から外れて実行されなくなったはずだ。
  • tagを使ったmix testのオプションとして以下がある。
    • mix test --include tag : include オプションで指定されたtagはテスト対象に含まれる
    • mix test --exclude tag : exclude オプションで指定されたtagはテスト対象から除外される
    • mix test --only tag : only オプションで指定されたtagはそのtagがついたテストのみ実行される
  • 今回のケースでdistributed tagがついたケースだけを実行させたい場合は elixir --sname foo -S mix test --only distributed で実行される。もちろん、この場合はテストで使うnodeが動いていなければテストは通らない。

Application environment and configuration

  • Routing tableKV..Routerモジュールにハードコードしたが、これはうまくない。developとproductionで異なるものを使いたいときもあるだろう。
  • application environmentを使う。apps/kv/mix.exsを開いてapplication/0を次のように書き直す。
def application do
  [applications: [],
   env: [routing_table: []],
   mod: {KV, []}]
end
  • 新たに:env keyを追加した。env:で設定されたkey-valueがapplication environmentのデフォルト値となる。KV.Router.table/0' を書き換えてapplication environmentからRouting table`の設定を読み込むようにする。
@doc """
The routing table.
"""
def table do
  Application.get_env(:kv, :routing_table)
end
  • Application.get_env/2 で指定したapplication environmentの値を取ることができる。次に実際に値を設定する。設定する場所は決まっていてconfig/config.exsに書く。routing_tableの設定を書こう。
# Replace computer-name with your local machine nodes.
config :kv, :routing_table,
       [{?a..?m, :"foo@your_hostname"},
        {?n..?z, :"bar@your_hostname"}]
  • config/config.exsはアプリケーションごとに持っていて共有されない。共有したい場合は他のアプリケーションのconfigimportすることができる。例えばkv_umbrellaconfig.exsと共有したい場合は次のようにimportする。
import_config "../apps/kv/config/config.exs"
  • 実際にはumbrellaオプションをつけて作成したアプリケーションは自動的にapps配下のすべてのconfigをimportするようになっているのでこのコードを書く必要はない。

さいごに

  • 課題っぽいのが与えられているので解いてみる
    • 内部のハードコードされているポートをapplication environmentから取るようにする。
    • kv_serverがローカルのKV.Registryからbucketを作っているところをroutingを使うようにする。テストも直す。
  • 最後はやっつけ仕事になってしまった。。。

とりあえず、今までのコード

elixir_training/kv_umbrella at master · rei-m/elixir_training · GitHub