Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

select!

futures::select マクロは複数の Future を同時に実行し、 いずれかの Future が完了した時点ですぐに応答できるようにします。

#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

上の関数は t1t2 の両方を並行して実行します。 t1 または t2 のいずれかが終了すると、対応するハンドラーが println! を呼び出し、 残りのタスクを完了させることなく関数は終了します。

select の基本構文は <pattern> = <expression> => <code>, であり、 select の対象にしたい Future の数だけ繰り返します。

default => ...complete => ...

selectdefault ブランチと complete ブランチもサポートしています。

default ブランチは、select の対象になっている Future のどれもまだ完了していない場合に実行されます。 したがって、default ブランチを持つ select は常に即座に返ります。 他の Future のどれも準備できていなければ、default が実行されるためです。

complete ブランチは、select の対象になっているすべての Future が完了し、 それ以上進捗しなくなった場合を処理するために使用できます。 これは、select! をループで使うときに便利なことがよくあります。

#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Unpin および FusedFuture との相互作用

上の最初の例で気づいたかもしれませんが、 2 つの async fn が返す Future に対して .fuse() を呼び出し、 さらに pin_mut でピン留めする必要がありました。 これらの呼び出しはいずれも必要です。 なぜなら、select で使用する Future は Unpin トレイトと FusedFuture トレイトの両方を実装していなければならないためです。

select で使用される Future は値として受け取られるのではなく、 可変参照として受け取られるため、Unpin が必要です。 Future の所有権を取得しないことで、未完了の Future は select の呼び出し後に再び使用できます。

同様に、FusedFuture トレイトが必要なのは、 select が完了後の Future を poll してはならないためです。 FusedFuture は、自身が完了したかどうかを追跡する Future によって実装されます。 これにより、ループ内で select を使用し、 まだ完了していない Future だけを poll することが可能になります。 これは上の例で確認できます。ループの 2 回目では、 a_fut または b_fut が完了しています。 future::ready が返す Future は FusedFuture を実装しているため、 それを再び poll しないよう select に伝えることができます。

Stream には、対応する FusedStream トレイトがあることに注意してください。 このトレイトを実装している Stream、または .fuse() を使ってラップされた Stream は、 .next() / .try_next() コンビネータから FusedFuture な Future を yield します。

#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

FuseFuturesUnordered を使った select ループ内の並行タスク

少し見つけにくいものの便利な関数の 1 つに Fuse::terminated() があります。 これは、すでに終了している空の Future を構築し、 後から実行する必要のある Future で埋めることを可能にします。

これは、select ループ中に実行する必要があるものの、 その select ループ自体の内部で作成されるタスクがある場合に便利です。

.select_next_some() 関数を使っている点に注意してください。 これは select と組み合わせて使用でき、 Stream から返される Some(_) 値に対してのみブランチを実行し、 None を無視できます。

#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived -- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

同じ Future の多数のコピーを同時に実行する必要がある場合は、 FuturesUnordered 型を使用します。 次の例は上の例に似ていますが、新しい Future が作成されたときにそれらを中止するのではなく、 run_on_new_num_fut の各コピーを最後まで実行します。 また、run_on_new_num_fut が返す値も出力します。

#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived -- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}