メッセージパッシングでスレッド間のデータを受け渡す
安全な並行性を確保するためのアプローチとして、ますます人気が高まって いるのがメッセージパッシングです。これは、スレッドやアクターがデータを 含むメッセージを互いに送り合うことで通信する方式です。この考え方は、 Go 言語のドキュメント の次の標語によく表れています: 「メモリを共有して通信するな。代わりに、通信することでメモリを共有せよ。」
Rust の標準ライブラリは、メッセージ送信による並行性を実現するための チャネル実装を提供しています。チャネル とは、データをあるスレッドから 別のスレッドへ送るための一般的なプログラミング概念です。
プログラミングにおけるチャネルは、小川や川のように一方向に流れる水路の ようなものだと考えることができます。アヒルのおもちゃのようなものを川に 入れると、それは下流へ流れていき、水路の終点にたどり着きます。
チャネルは二つの部分から成ります。送信側と受信側です。送信側は、川の 上流でアヒルのおもちゃを川に入れる場所にあたり、受信側は、そのアヒルの おもちゃが下流で行き着く場所にあたります。コードの一方では、送信したい データとともに送信側のメソッドを呼び出し、もう一方では、受信側に届く メッセージを確認します。送信側または受信側のどちらかがドロップされると、 そのチャネルは 閉じられた と見なされます。
ここでは、値を生成してチャネルに送る 1 つのスレッドと、その値を受信して 表示する別のスレッドを持つプログラムを段階的に作っていきます。この機能を 説明するために、チャネルを使ってスレッド間で単純な値を送ります。この手法に 慣れたら、チャットシステムや、多数のスレッドが計算の一部を実行してその部分を 結果を集約する 1 つのスレッドに送るようなシステムなど、互いに通信する必要が あるあらゆるスレッドでチャネルを使えるようになります。
まず、リスト 16-6 ではチャネルを作成しますが、まだそれでは何もしません。 なお、現時点ではこれはまだコンパイルできません。Rust には、どの型の値を チャネルで送りたいのかが分からないためです。
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
mpsc::channel 関数を使って新しいチャネルを作成します。mpsc は
multiple producer, single consumer の略です。要するに、Rust の標準ライブラリ
におけるチャネル実装では、1 つのチャネルに、値を生成する複数の 送信 側を
持てますが、それらの値を消費する 受信 側は 1 つだけです。複数の小川が
合流して 1 本の大きな川になるところを想像してください。どの小川に流したものも、
最後には 1 本の川に流れ着きます。ここではまず 1 つのプロデューサーから
始めますが、この例が動くようになったら複数のプロデューサーを追加します。
mpsc::channel 関数はタプルを返します。その第 1 要素が送信側、つまり
トランスミッターであり、第 2 要素が受信側、つまりレシーバーです。略語
tx と rx は、多くの分野でそれぞれ transmitter と receiver を表すものとして
伝統的に使われているため、それぞれの端を示すように変数名をそのように
しています。ここでは、タプルを分解するパターンを伴う let 文を使っています。
let 文におけるパターンの使い方と分配束縛については第 19 章で説明します。
今は、このように let 文を使うことで、mpsc::channel が返すタプルの各要素を
便利に取り出せると知っておいてください。
次に、送信側を spawn したスレッドに移し、そのスレッドから 1 つの文字列を 送るようにして、生成したスレッドがメインスレッドと通信するようにしましょう。 これは、上流でアヒルのおもちゃを川に入れること、あるいはあるスレッドから 別のスレッドへチャットメッセージを送ることに似ています。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
ここでも、新しいスレッドを作成するために thread::spawn を使い、続いて move
を使って tx をクロージャへ移動し、生成したスレッドが tx を所有するように
しています。生成したスレッドは、チャネルを通じてメッセージを送るために、
送信側を所有している必要があります。
送信側には、送信したい値を受け取る send メソッドがあります。send
メソッドは Result<T, E> 型を返すため、受信側がすでにドロップされていて
値の送り先がない場合、送信操作はエラーを返します。この例では、エラー時に
パニックするために unwrap を呼び出しています。しかし実際のアプリケーション
では、これを適切に処理することになります。適切なエラーハンドリングの戦略を
復習するには、第 9 章に戻ってください。
リスト 16-8 では、メインスレッドで受信側から値を取得します。これは、川の 終わりで水の中からアヒルのおもちゃを取り出すこと、あるいはチャット メッセージを受け取ることに似ています。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
受信側には、便利なメソッドが 2 つあります。recv と try_recv です。ここで
使っているのは receive の略である recv で、これはメインスレッドの実行を
ブロックし、チャネルに値が送られるまで待機します。値が送信されると、
recv はそれを Result<T, E> で返します。送信側が閉じられると、recv は、
それ以上値が来ないことを示すためにエラーを返します。
try_recv メソッドはブロックしません。その代わり、即座に Result<T, E> を
返します。利用可能なメッセージがあればそれを保持した Ok 値を返し、今回
メッセージがなければ Err 値を返します。try_recv は、このスレッドに、
メッセージを待っている間にほかにする仕事がある場合に便利です。一定間隔で
try_recv を呼び出し、メッセージがあればそれを処理し、そうでなければ
少しのあいだ別の仕事をしてから再び確認する、というループを書くことができます。
この例では単純化のために recv を使っています。メインスレッドには、
メッセージを待つ以外にする仕事がないため、メインスレッドをブロックするのは
適切です。
リスト 16-8 のコードを実行すると、メインスレッドから表示された次の値が 見えるはずです。
Got: hi
完璧です!
チャネルを通じて所有権を移動する
所有権のルールは、メッセージ送信において重要な役割を果たします。なぜなら、それによって安全な並行コードを書けるようになるからです。Rustプログラム全体を通して所有権を意識する利点は、並行プログラミングにおけるエラーを防げることです。チャネルと所有権がどのように連携して問題を防ぐのかを示すために、実験をしてみましょう。チャネルに送信した 後で、生成されたスレッド内で val の値を使おうとしてみます。このコードが許可されない理由を確認するために、リスト16-9のコードをコンパイルしてみてください。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
ここでは、tx.send を通じて val をチャネルに送信した後で、それを表示しようとしています。これを許可するのはよくありません。いったん値が別のスレッドへ送信されると、そのスレッドが、こちらが再びその値を使おうとする前に、それを変更したりドロップしたりできてしまうからです。場合によっては、他方のスレッドによる変更が、一貫性のないデータや存在しないデータを原因として、エラーや予期しない結果を引き起こす可能性があります。しかし、リスト16-9のコードをコンパイルしようとすると、Rustはエラーを出してくれます。
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:27
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
この並行性に関するミスは、コンパイル時エラーを引き起こしました。send 関数はその引数の所有権を受け取り、値がムーブされると受信側がその所有権を取得します。これにより、送信後に誤ってその値を再び使うことが防がれます。所有権システムが、すべて問題ないことを検査してくれるのです。
複数の値を送信する
リスト16-8のコードはコンパイルも実行もできましたが、2つの別々のスレッドがチャネルを介して互いにやり取りしていることは、はっきりとは示していませんでした。
リスト16-10では、リスト16-8のコードが並行して動いていることを示すための変更をいくつか加えました。生成されたスレッドは今度は複数のメッセージを送信し、各メッセージの間で1秒停止します。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
今回は、生成されたスレッドがメインスレッドへ送りたい文字列のベクタを持っています。それらを反復処理して、それぞれを個別に送信し、1秒の Duration 値を指定して thread::sleep 関数を呼び出すことで、各送信の間に一時停止を入れます。
メインスレッドでは、もはや recv 関数を明示的に呼び出していません。代わりに、rx をイテレータとして扱っています。受信した各値について、それを表示しています。チャネルが閉じられると、反復は終了します。
リスト16-10のコードを実行すると、各行の間に1秒の間隔を空けて、次のような出力が表示されるはずです。
Got: hi
Got: from
Got: the
Got: thread
メインスレッドの for ループには一時停止や遅延を行うコードがないので、メインスレッドが生成されたスレッドから値を受け取るのを待っていることがわかります。
複数のプロデューサを作成する
先ほど、mpsc は 複数のプロデューサ、単一のコンシューマ の頭字語だと述べました。では mpsc を実際に使って、リスト16-10のコードを拡張し、すべて同じ受信側に値を送る複数のスレッドを作成してみましょう。これは、リスト16-11に示すように送信側をクローンすることで実現できます。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
今回は、最初の生成されたスレッドを作成する前に、送信側に対して clone を呼び出します。これにより、最初の生成されたスレッドに渡せる新しい送信側が得られます。元の送信側は2つ目の生成されたスレッドに渡します。これで、同じ1つの受信側にそれぞれ異なるメッセージを送る2つのスレッドができます。
このコードを実行すると、出力はおおよそ次のようになります。
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
システムによっては、値が別の順序で表示されるかもしれません。これこそが、並行性を興味深くもあり難しくもしている点です。thread::sleep にさまざまな値を与えて異なるスレッドで試してみると、実行ごとの非決定性はさらに高まり、毎回異なる出力が生まれます。
チャネルがどのように動作するかを見てきたので、次は別の並行性の手法を見ていきましょう。