Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

ストリーム:順に現れる Future

この章の前の方にある 「メッセージ受け渡し」 節で、async チャネルのレシーバーをどのように使ったかを思い出してください。非同期の recv メソッドは、時間の経過とともに一連のアイテムを生成します。これは、 ストリーム として知られる、はるかに一般的なパターンの一例です。多くの概念は自然に ストリームとして表現できます。たとえば、キュー内で利用可能になるアイテム、 完全なデータセットがコンピューターのメモリに収まりきらないときに ファイルシステムから段階的に取り出されるデータのチャンク、あるいは 時間の経過とともにネットワーク越しに到着するデータです。 ストリームは Future なので、ほかの種類の Future と一緒に使い、 興味深い方法で組み合わせることができます。たとえば、イベントをまとめて ネットワーク呼び出しが多くなりすぎるのを避けたり、長時間実行される一連の 操作にタイムアウトを設定したり、不要な処理を避けるために ユーザーインターフェイスのイベントをスロットリングしたりできます。

第 13 章で、「Iterator トレイトと next メソッド」 節で Iterator トレイトを見たときにも、一連のアイテムを扱いました。ただし、イテレータと async チャネルのレシーバーの間には 2 つの違いがあります。1 つ目の違いは時間です。 イテレータは同期的ですが、チャネルのレシーバーは非同期です。2 つ目の違い は API です。Iterator を直接扱うときは、その同期的な next メソッドを呼び出します。特に trpl::Receiver ストリームでは、代わりに 非同期の recv メソッドを呼び出しました。そのほかの点では、これらの API は非常によく似ており、 その類似性は偶然ではありません。ストリームは、反復処理の非同期版の ようなものです。ただし、trpl::Receiver は具体的にはメッセージを受信するのを 待ちますが、汎用のストリーム API はそれよりずっと広範です。 Iterator と同じように次のアイテムを提供しますが、それを非同期に行います。

Rust におけるイテレータとストリームの類似性は、実際には 任意のイテレータからストリームを作成できることを意味します。イテレータと同様に、 ストリームでもその next メソッドを呼び出し、その出力を await して扱えます。これはリスト 17-21 に示すとおりですが、このコードはまだコンパイルできません。

extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

まず数値の配列から始め、それをイテレータに変換してから map を呼び出し、すべての値を 2 倍にします。次に、 trpl::stream_from_iter 関数を使って、そのイテレータを ストリームに変換します。その後、while let ループを使って、 到着した順にストリーム内のアイテムを処理します。

残念ながら、このコードを実行しようとするとコンパイルできず、 代わりに利用可能な next メソッドがないと報告されます。

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

この出力が説明しているように、このコンパイラエラーの原因は、next メソッドを使えるようにするために適切なトレイトをスコープに入れる必要があることです。 ここまでの説明から、そのトレイトは Stream だと考えるのがもっともですが、 実際には StreamExt です。extension の略である Ext は、 あるトレイトを別のトレイトで拡張することを表す、 Rust コミュニティで一般的なパターンです。

Stream トレイトは、実質的に Iterator トレイトと Future トレイトを組み合わせた低レベルのインターフェイスを定義します。 StreamExtStream の上に、より高レベルな API 群を提供します。 これには next メソッドに加えて、Iterator トレイトが提供するものに似たほかのユーティリティメソッドも含まれます。StreamStreamExt はまだ Rust の標準ライブラリの一部ではありませんが、 エコシステム内のほとんどのクレートでは、これと似た定義を使っています。

このコンパイラエラーを修正するには、リスト 17-22 のように trpl::StreamExt に対する use 文を追加します。

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --snip--
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

これらの要素をすべて組み合わせると、このコードは期待どおりに動作します。さらに、 StreamExt がスコープに入ったので、イテレータのときと同じように、 そのすべてのユーティリティメソッドを使えます。