メッセージパッシング
プログラムで複数のタスクが同時に実行されているとき、それらはときどき互いにやり取りする必要が あります。チャネルはそのための仕組みで、あるタスクがメッセージを送り、別のタスクがそれを受け取ります。 パイプのようなものだと考えてください。片方の端に何かを入れると、もう片方の端から出てきます。
有界チャネル
bounded channel には、一度に保持できるメッセージ数の上限があります。チャネルがいっぱいになると、
送信側は、別のメッセージを送信できるだけの空きができるまで待機しなければなりません。
これは現実の郵便受けのようなものだと考えてください。固定数の手紙しか入れられません。いっぱいの場合、 郵便配達員は、誰かが中身を取り出して空けるまで、次の手紙を投函するのを待たなければなりません。
この例では、2 つの書店が容量 5 のチャネルを通して本を送信します。 本棚は、送られてくるものを何でも受け取ります。
use std::io;
use tokio::sync::mpsc::channel;
struct Book {
title: &'static str,
}
impl Book {
fn new(title: &'static str) -> Self {
Self { title }
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
let (book_sender, mut book_receiver) = channel(5);
// 各書店は、それぞれ送信側のコピーを受け取る。
let store_one_book_sender = book_sender.clone();
let book_store_one = tokio::task::spawn(async move {
if let Err(err) = store_one_book_sender
.send(Book::new("Shawshank Redemption"))
.await
{
eprintln!("Failed to send book from store one: {}", err);
}
});
let book_store_two = tokio::task::spawn(async move {
if let Err(err) = book_sender.send(Book::new("Secret Recipe")).await {
eprintln!("Failed to send book from store two: {}", err);
}
});
let mut shelf: Vec<Book> = Vec::new();
// 両方の書店が送信を終えるまで、届いた本をすべて集める。
while let Some(new_book) = book_receiver.recv().await {
shelf.push(new_book);
}
book_store_one.await?;
book_store_two.await?;
for book in &shelf {
println!("Title: {}", book.title);
}
Ok(())
}
Cargo.tomlにtokioを追加し、macrosとsync機能を有効にしてください。[dependencies] tokio = { version = "*", features = ["macros", "sync"] }
無制限チャネル
unbounded channel には、保持できるメッセージ数の上限がありません。送信側は待つ必要がなく、
すでにどれだけメッセージがたまっていても、いつでもメッセージを送り込めます。
デジタルな受信箱のようなものだと考えてください。新しいメッセージが届くたびに、 それは増え続けます。上限はありませんが、 メッセージが読み出されるよりも速いペースでたまっていくと、プログラムはますます多くのメモリを使用します。 受信側がまだ開いている限り、無制限チャネルへの送信は常に成功します。 受信側が遅い場合、メッセージは単にキューにたまり、待機します。
この例では、2 人がチャネルを通じてメッセージを送信します。受信箱が、届いたものを 集めます。
use std::io;
use tokio::sync::mpsc::unbounded_channel;
struct Message {
from: &'static str,
text: &'static str,
}
impl Message {
fn new(from: &'static str, text: &'static str) -> Self {
Self { from, text }
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
let (message_sender, mut message_receiver) = unbounded_channel();
let alice_message_sender = message_sender.clone();
let person_one = tokio::task::spawn(async move {
if let Err(err) = alice_message_sender.send(Message::new("Alice", "Meeting postponed")) {
eprintln!("Failed to send message from Alice: {}", err);
}
});
let person_two = tokio::task::spawn(async move {
if let Err(err) = message_sender.send(Message::new("Bob", "Secret Leaked")) {
eprintln!("Failed to send message from Bob: {}", err);
}
});
let mut inbox: Vec<Message> = Vec::new();
while let Some(new_book) = message_receiver.recv().await {
inbox.push(new_book);
}
person_one.await?;
person_two.await?;
for msg in &inbox {
println!("{} says: {}", msg.from, msg.text);
}
Ok(())
}
Cargo.tomlにtokioを追加し、macrosとsyncfeature を有効にします。[dependencies] tokio = { version = "*", features = ["macros", "sync"] }