構造化並行性
プログラムが複数のタスクを同時に実行する場合、それらを追跡し、 いつ終了したかを把握し、結果を集め、何か問題が起きたときにクリーンアップする方法が必要です。 このセクションでは、その方法を示します。
JoinSet
JoinSet は、同時に実行されるノンブロッキングタスクのグループを保持するコンテナです。
これにタスクを追加し、その後、それらが1つずつ終了するのを待つことができます。タスクが終了したら、
その結果を取得できます。すべてのタスクが完了する前に JoinSet を破棄すると、
未完了のタスクはすべて自動的にキャンセルされます。
この例では、5つのジョブが作成されて JoinSet に追加されます。各ジョブはそれぞれ単独で作業を行い、 プログラムはそのすべてが完了するまで待ってから先へ進みます。
use tokio::task::JoinSet;
struct Job {
id: usize,
data: String,
}
impl Job {
fn new(id: usize) -> Self {
let data = "d".repeat(id);
Self { id, data }
}
async fn process(&mut self) {
// ネットワーク経由でさらにデータを取得するなど、何らかの処理を行う
self.data = self.data.replace("d", "x");
println!("Id {} -> {}", self.id, self.data);
}
}
#[tokio::main]
async fn main() {
let mut set = JoinSet::new();
const JOB_COUNT: usize = 5;
let jobs = (0..JOB_COUNT).map(Job::new).collect::<Vec<Job>>();
for mut job in jobs {
set.spawn(async move { job.process().await });
}
let mut complete = 0usize;
while set.join_next().await.is_some() { // 各ジョブの終了を待つ
complete += 1;
}
assert_eq!(complete, JOB_COUNT);
}
macrosおよびrt機能を有効にして、Cargo.tomlにtokioを追加してください。[dependencies] tokio = { version = "*", features = ["macros", "rt"] }