Erlang 基礎ポイント4 - 並行処理1

今日はErlangで並行処理プログラムを作成する方法を勉強します。

並行処理

並行処理とは、一つのプログラムの中で複数の処理を同時に実行させることです。
WindowsLinuxなどのOSでは、複数のプログラムが複数のプロセスとして同時に実行されます。
Apache TomcatなどのAPサーバの内部では、複数の処理単位(リクエスト、サーバ)が複数のスレッドとして同時に実行されます。

Erlangでは、複数の処理が複数のプロセスとして実行されます。
しかし、名前はプロセスですが、ErlangのプロセスはOSのプロセスとは違います
あくまでErlang内に存在するプロセスなのです。
これからErlangの並行処理に登場するプロセスは、Erlangプロセスと意識するようにしましょう。

基本

Erlangで作成する並行処理プログラムはJavaなどの言語で記述するスレッド基盤のプログラムとは記述方法が違います。
が、プログラム(又はメインスレッド・メインプロセス)の実行中にいきなり並行に何か(Erlangプロセス)を起動するということは同じです。

Erlangで並行処理プログラムを書くのはとても簡単ですが、それでも掘り下げると色々出てきます。
ここでは最小限の説明だけをします。後はマニュアルや他のウェブサイトを参考して、知識を拡張していけばいいと思います。

まずは、サンプルプログラムを分析し、その後、並行処理プログラムの作成方法を整理してみましょう。

% concurrent.erl
-module(concurrent).
-export([start/0]).
start() -> 
  % A
  spawn(fun() -> echo(1) end).

echo(NoMsgCnt) ->
  % B
  receive
    shutdown ->
      io:format("Shutdown echo~n");
    {one, Msg} ->
      io:format("One Msg ~p~n", [Msg]),
      echo(NoMsgCnt);
    {two, Msg} ->
      io:format("Two Msg ~p~n", [Msg]),
      echo(NoMsgCnt);
    Unknown ->
      io:format("Unknown Msg ~p~n", [Unknown]),
      echo(NoMsgCnt)
  after
  % C
    7000 ->
      io:format("No Msg ~p~n",[NoMsgCnt]),
      echo(NoMsgCnt + 1)
  end.

まず、start関数内のコメントAを見てみましょう。
spawn関数が見えますね。
spawn関数を呼び出すと、引数に指定した関数が処理内容として新しいErlangプロセスが生成及び実行されます
新しいErlangプロセスが生成されると、spawn関数を実行したErlangプロセスと新しいErlangプロセスが同時、つまり、並行で
実行されるようになります。
spawn関数はErlangプロセスのID(PID)を返します。PIDは後ほど、Erlangプロセスと通信する時に必要です。

spawn関数に指定した関数の内部ではecho関数を呼び出しています。
echo関数の内部を見てみましょう。新しいErlangの文法が出てきました。
receiveafterですね。
基本的にspawn関数に指定する関数の内容は何でもいいです。
非同期で何かの処理を分散して行うように、新しいErlangプロセスを実行した後、そのErlangプロセスを忘れてもいい場面もあるでしょうが、
自分を実行した親Erlangプロセスはもちろんのこと、他のErlangプロセスと通信する必要がある場面がもっと多いと思います。

プロセス間の通信のため、Javaでしたら、同期しながら、変数を値を調べるとか...色々と頭が痛くなりますが、
(concurrentパッケージにあるクラスを使えば大体解決できますが、Erlangよりは確かに冗長です)
Erlangでは非常に単純にErlangプロセス間の通信が実現できます。

新しいErlangプロセスが立ち上がると、そのErlangプロセスのためのメールボックスが用意されます。
形こそ、hotmailgmailのようなメールとは違いますが、概念的には同じです。
receive文が実行されると、メールボックスにメッセージが入ってくるまで待機(ブロッキング)状態になります。
メッセージが同時に多数届いた場合、それぞれのメッセージが非同期で処理され、処理が終わったメッセージは削除されます。
メッセージはreceive内のパターンと照合され、マッチしたパターンの処理内容が実行されます。
もし、どのパターンにもマッチしなかった場合、何も実行されません。

afterには一定の時間がすぎるまでメッセージが届いていない場合、つまり、タイムアウトした場合、
処理される内容が置かれます。単位はmsです。
上記には7000とあるので、7秒間、メッセージが届かない場合、No Msg Xと表示されます。

receive内の各パターン及びafterの処理内容を見ると、処理内容の最後に再帰呼出(echo関数の呼出)をしているのが分かります。
これはまた次のメッセージを待つためです。再帰呼出をしないと、そのままErlangプロセスが終了してしまいますので、ご注意下さい。
shutdownパターンには再帰呼出がないことから、Erlangプロセスを終了するということが分かります。
パターンにあるアトムがshutdownですから、意図的ですね。

これでErlangで並行処理を記述することができました。

次は並行処理を実行し、Erlangプロセスと通信する方法を勉強しましょう。
以下は上記のconcurrentモジュールをREPL環境でテストしたものです。

1> c(concurrent).
{ok,concurrent}
2> P = concurrent:start().
<0.39.0>
No Msg 1          
3> P ! {one, "Hello"}.
One Msg "Hello"
{one,"Hello"}
4> P ! {two, "Hello"}.
Two Msg "Hello"
{two,"Hello"}
5> P ! {three, "Hello"}.
Unknown Msg {three,"Hello"}
{three,"Hello"}
No Msg 2                
No Msg 3        
6> P ! shutdown.
Shutdown echo
shutdown
7> 

concurrentモジュールをコンパイルし、start関数を呼び出しました。
戻り値のPIDを変数Pに代入しました。
PはPIDですが、Erlangプロセスへの電話だと考えればいいです。
Pへの連絡はどうすればいいか? ビックリマーク(!)を使えばいいです。

PID ! メッセージ

です。これがErlangプロセスにメッセージを送る方法です。簡単でしょう?
これでメッセージが該当PIDのErlangプロセスに送られると、
メッセージとErlangプロセスのreceive内にあるパターンとの照合が行われ、
マッチしたパターンの処理内容が実行されます。

整理してみましょう。

% 新しいErlangプロセスの生成は
PID = spawn(fun() -> xxx(引数...) end)

% Erlangプロセスでメッセージを受け取るには
xxx(引数...) ->
  receive
    パターン1 ->
      処理1,
      ...
      処理x,
      xxx(引数...); % xxx()を再帰呼出しないと、終了してしまう。
    ...
    パターンx ->
      処理1,
      ...
      処理x,
      xxx(引数...) % 最後のパターンにはセミコロンを書かない
  after % afterは省略可能
    タイムアウト値(ms) ->
      処理
     xxx(引数...)
  end.

% Erlangプロセスと通信するためには
PID ! メッセージ 

さらに

これで、Erlangで動く並行処理プログラムを書けるようになりましたが、
ちょっと足りない部分があります。
それぞれのErlangプロセスとやり取りするため、メッセージを送る時にspawn関数から返されるPidが必要ですが、
Pidをいちいち覚えておくのは面倒です。
そこで、ErlangにはPidを意味ある名前で登録できるようにする関数が用意されています。
その関数でPidを名前に登録しておき、後でその名前でErlangプロセスにメッセージを送ればいいです。

使い方は簡単です。以下のようにすればいいです。

register(名前を表すアトム, Pid)

反対に登録した名前を解除する関数もあります。

unregister(名前を表すアトム)

簡単ですね。

以下は基本で示したサンプルプログラムをregisterを使うように修正し、REPL環境で試したものです。

% concurrent2.erl
-module(concurrent2).
-export([start/0]).
start() -> 
  P = spawn(fun() -> echo(1) end),
  register(test_echo, P).

echo(NoMsgCnt) ->
  receive
    shutdown ->
      io:format("Shutdown echo~n");
    {one, Msg} ->
      io:format("One Msg ~p~n", [Msg]),
      echo(NoMsgCnt);
    {two, Msg} ->
      io:format("Two Msg ~p~n", [Msg]),
      echo(NoMsgCnt);
    Unknown ->
      io:format("Unknown Msg ~p~n", [Unknown]),
      echo(NoMsgCnt)
  after
    7000 ->
      io:format("No Msg ~p~n",[NoMsgCnt]),
      echo(NoMsgCnt + 1)
  end.

% REPL環境でテスト。
1> c(concurrent2).
{ok,concurrent2}
2> concurrent2:start().
true
No Msg 1        
No Msg 2        
No Msg 3                
3> test_echo ! {one, "Hello"}.
One Msg "Hello"
{one,"Hello"}
4> test_echo ! {two, "Hello"}.
Two Msg "Hello"
{two,"Hello"}
5> test_echo ! {three, "Hello"}.
Unknown Msg {three,"Hello"}
{three,"Hello"}
No Msg 4                        
6> test_echo ! shutdown.
Shutdown echo
shutdown
7> 

PIDでなく、test_echoという分かりやすい名前でErlangプロセスと通信できていますね。

Erlangプロセスは他の言語のスレッドやOSのプロセスより早くて効率がいいです。
数万・数十万個のプロセスも問題なく処理できるそうだから、すごいですね。
自分でErlangの威力が直接感じられるプログラムが書けたらいいですね。
イデアが浮かんだら、ぜひやってみたいと思います。
ちなみにErlangでネットワークプログラムを作ると、NodeJSと比べてどうでしょうか。
メリットが少し似ているようが気がします。ググれば出るのかな...

今日はここまでにします。
元々今日でErlang 基礎ポイントを最後にしようと思いましたが、
並行処理での例外処理について意外と書くことが多くて、
次の記事に続けて書くことにします。
最後になるかどうかは分かりません。これはあくまで個人的な理由で書くものですから...笑