Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

イテレーションと並行性

同期的な Iterator と同様に、Stream 内の値をイテレートして処理する方法は数多くあります。mapfilterfold のようなコンビネータースタイルのメソッドや、それらのエラー時に早期終了する仲間である try_maptry_filtertry_fold があります。

残念ながら、for ループは Stream では使用できませんが、命令型スタイルのコードでは、while letnext/try_next 関数を使用できます。

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

しかし、単に一度に 1 つの要素を処理しているだけでは、並行性の機会を逃している可能性があります。そもそも、それこそが非同期コードを書いている理由です。ストリームから複数の項目を並行して処理するには、for_each_concurrent メソッドと try_for_each_concurrent メソッドを使用します。

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}