select!
futures::select マクロは複数の Future を同時に実行し、
いずれかの Future が完了した時点ですぐに応答できるようにします。
#![allow(unused)]
fn main() {
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1, t2);
select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
}
上の関数は t1 と t2 の両方を並行して実行します。
t1 または t2 のいずれかが終了すると、対応するハンドラーが println! を呼び出し、
残りのタスクを完了させることなく関数は終了します。
select の基本構文は <pattern> = <expression> => <code>, であり、
select の対象にしたい Future の数だけ繰り返します。
default => ... と complete => ...
select は default ブランチと complete ブランチもサポートしています。
default ブランチは、select の対象になっている Future のどれもまだ完了していない場合に実行されます。
したがって、default ブランチを持つ select は常に即座に返ります。
他の Future のどれも準備できていなければ、default が実行されるためです。
complete ブランチは、select の対象になっているすべての Future が完了し、
それ以上進捗しなくなった場合を処理するために使用できます。
これは、select! をループで使うときに便利なことがよくあります。
#![allow(unused)]
fn main() {
use futures::{future, select};
async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // never runs (futures are ready, then complete)
};
}
assert_eq!(total, 10);
}
}
Unpin および FusedFuture との相互作用
上の最初の例で気づいたかもしれませんが、
2 つの async fn が返す Future に対して .fuse() を呼び出し、
さらに pin_mut でピン留めする必要がありました。
これらの呼び出しはいずれも必要です。
なぜなら、select で使用する Future は Unpin トレイトと
FusedFuture トレイトの両方を実装していなければならないためです。
select で使用される Future は値として受け取られるのではなく、
可変参照として受け取られるため、Unpin が必要です。
Future の所有権を取得しないことで、未完了の Future は
select の呼び出し後に再び使用できます。
同様に、FusedFuture トレイトが必要なのは、
select が完了後の Future を poll してはならないためです。
FusedFuture は、自身が完了したかどうかを追跡する Future によって実装されます。
これにより、ループ内で select を使用し、
まだ完了していない Future だけを poll することが可能になります。
これは上の例で確認できます。ループの 2 回目では、
a_fut または b_fut が完了しています。
future::ready が返す Future は FusedFuture を実装しているため、
それを再び poll しないよう select に伝えることができます。
Stream には、対応する FusedStream トレイトがあることに注意してください。
このトレイトを実装している Stream、または .fuse() を使ってラップされた Stream は、
.next() / .try_next() コンビネータから FusedFuture な Future を yield します。
#![allow(unused)]
fn main() {
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
}
Fuse と FuturesUnordered を使った select ループ内の並行タスク
少し見つけにくいものの便利な関数の 1 つに Fuse::terminated() があります。
これは、すでに終了している空の Future を構築し、
後から実行する必要のある Future で埋めることを可能にします。
これは、select ループ中に実行する必要があるものの、
その select ループ自体の内部で作成されるタスクがある場合に便利です。
.select_next_some() 関数を使っている点に注意してください。
これは select と組み合わせて使用でき、
Stream から返される Some(_) 値に対してのみブランチを実行し、
None を無視できます。
#![allow(unused)]
fn main() {
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) { /* ... */ }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`,
// dropping the old one.
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// Run the `run_on_new_num_fut`
() = run_on_new_num_fut => {},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
}
同じ Future の多数のコピーを同時に実行する必要がある場合は、
FuturesUnordered 型を使用します。
次の例は上の例に似ていますが、新しい Future が作成されたときにそれらを中止するのではなく、
run_on_new_num_fut の各コピーを最後まで実行します。
また、run_on_new_num_fut が返す値も出力します。
#![allow(unused)]
fn main() {
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`.
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// Run the `run_on_new_num_futs` and check if any have completed
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
}