ストリーム:順に現れる Future
この章の前の方にある
「メッセージ受け渡し」 節で、async
チャネルのレシーバーをどのように使ったかを思い出してください。非同期の
recv メソッドは、時間の経過とともに一連のアイテムを生成します。これは、
ストリーム として知られる、はるかに一般的なパターンの一例です。多くの概念は自然に
ストリームとして表現できます。たとえば、キュー内で利用可能になるアイテム、
完全なデータセットがコンピューターのメモリに収まりきらないときに
ファイルシステムから段階的に取り出されるデータのチャンク、あるいは
時間の経過とともにネットワーク越しに到着するデータです。
ストリームは Future なので、ほかの種類の Future と一緒に使い、
興味深い方法で組み合わせることができます。たとえば、イベントをまとめて
ネットワーク呼び出しが多くなりすぎるのを避けたり、長時間実行される一連の
操作にタイムアウトを設定したり、不要な処理を避けるために
ユーザーインターフェイスのイベントをスロットリングしたりできます。
第 13 章で、「Iterator トレイトと next メソッド」 節で Iterator
トレイトを見たときにも、一連のアイテムを扱いました。ただし、イテレータと
async チャネルのレシーバーの間には 2 つの違いがあります。1 つ目の違いは時間です。
イテレータは同期的ですが、チャネルのレシーバーは非同期です。2 つ目の違い
は API です。Iterator を直接扱うときは、その同期的な
next メソッドを呼び出します。特に trpl::Receiver ストリームでは、代わりに
非同期の recv メソッドを呼び出しました。そのほかの点では、これらの API は非常によく似ており、
その類似性は偶然ではありません。ストリームは、反復処理の非同期版の
ようなものです。ただし、trpl::Receiver は具体的にはメッセージを受信するのを
待ちますが、汎用のストリーム API はそれよりずっと広範です。
Iterator と同じように次のアイテムを提供しますが、それを非同期に行います。
Rust におけるイテレータとストリームの類似性は、実際には
任意のイテレータからストリームを作成できることを意味します。イテレータと同様に、
ストリームでもその next メソッドを呼び出し、その出力を await して扱えます。これはリスト
17-21 に示すとおりですが、このコードはまだコンパイルできません。
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
まず数値の配列から始め、それをイテレータに変換してから
map を呼び出し、すべての値を 2 倍にします。次に、
trpl::stream_from_iter 関数を使って、そのイテレータを
ストリームに変換します。その後、while let ループを使って、
到着した順にストリーム内のアイテムを処理します。
残念ながら、このコードを実行しようとするとコンパイルできず、
代わりに利用可能な next メソッドがないと報告されます。
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
この出力が説明しているように、このコンパイラエラーの原因は、next
メソッドを使えるようにするために適切なトレイトをスコープに入れる必要があることです。
ここまでの説明から、そのトレイトは Stream だと考えるのがもっともですが、
実際には StreamExt です。extension の略である Ext は、
あるトレイトを別のトレイトで拡張することを表す、
Rust コミュニティで一般的なパターンです。
Stream トレイトは、実質的に Iterator トレイトと Future
トレイトを組み合わせた低レベルのインターフェイスを定義します。
StreamExt は Stream の上に、より高レベルな API 群を提供します。
これには next メソッドに加えて、Iterator
トレイトが提供するものに似たほかのユーティリティメソッドも含まれます。Stream と
StreamExt はまだ Rust の標準ライブラリの一部ではありませんが、
エコシステム内のほとんどのクレートでは、これと似た定義を使っています。
このコンパイラエラーを修正するには、リスト 17-22 のように
trpl::StreamExt に対する use 文を追加します。
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
これらの要素をすべて組み合わせると、このコードは期待どおりに動作します。さらに、
StreamExt がスコープに入ったので、イテレータのときと同じように、
そのすべてのユーティリティメソッドを使えます。