Asyncのトレイトを詳しく見る
この章を通して、Future、Stream、StreamExt
トレイトをさまざまな形で使ってきました。ただしこれまでは、それらがどのように動作するのか、あるいはどのように組み合わさるのかという詳細には、あまり深く立ち入りませんでした。日々の Rust の作業では、たいていそれで十分です。しかし場合によっては、これらのトレイトについてもう少し詳しく理解する必要がある場面に出会います。その際には、Pin 型や Unpin トレイトもあわせて理解する必要があります。この節では、そのような場面で役立つだけの内容を掘り下げます。ただし、本当に深い説明はほかのドキュメントに譲ることにします。
Future トレイト
まずは、Future トレイトがどのように動作するのかを、もう少し詳しく見ていきましょう。Rust では次のように定義されています。
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}
このトレイト定義には、新しい型がいくつも登場し、これまで見たことのない構文も含まれています。そこで、この定義をひとつずつ見ていきましょう。
まず、Future の関連型 Output は、その future が最終的に何に解決されるかを示します。これは、Iterator トレイトにおける関連型 Item に相当します。
次に、Future には poll メソッドがあります。このメソッドは、self パラメータとして特別な Pin 参照と、Context 型への可変参照を受け取り、Poll<Self::Output> を返します。Pin と Context については、すぐ後で詳しく説明します。ひとまず今は、このメソッドが返す Poll 型に注目しましょう。
#![allow(unused)]
fn main() {
pub enum Poll<T> {
Ready(T),
Pending,
}
}
この Poll 型は Option に似ています。値を持つバリアント Ready(T) と、値を持たない Pending があります。ただし、Poll の意味は Option とはかなり異なります。Pending バリアントは、その future にまだやるべき仕事が残っていることを示しており、そのため呼び出し側はあとで再度確認する必要があります。一方 Ready バリアントは、その Future が作業を完了し、T の値が利用可能になったことを示します。
注:
pollを直接呼び出す必要があることはめったにありませんが、もし必要になった場合は、ほとんどの future ではReadyを返したあとに呼び出し側が再度pollを呼んではならない、という点に注意してください。多くの future は、準備完了後に再度 poll されると panic します。再度 poll しても安全な futures については、そのことがドキュメントに明示されています。これはIterator::nextの振る舞いに似ています。
await を使うコードを見るとき、Rust はその裏側で、それを poll を呼び出すコードにコンパイルしています。1 つの URL に対して解決後にページタイトルを出力したリスト 17-4 を振り返ると、Rust はそれをだいたい次のようなコードにコンパイルします(ただし正確に同じではありません)。
match page_title(url).poll() {
Ready(page_title) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// しかし、ここには何が入るのでしょうか?
}
}
future がまだ Pending のとき、私たちは何をすべきでしょうか。future が最終的に ready になるまで、何度も何度も何度も試せる何らかの方法が必要です。言い換えれば、ループが必要です。
let mut page_title_fut = page_title(url);
loop {
match page_title_fut.poll() {
Ready(value) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// 続ける
}
}
}
しかし、もし Rust が本当にそのコードそのままにコンパイルしていたなら、すべての await はブロッキングになってしまいます。これは、私たちが目指していたものとまさに逆です。そこで Rust は、このループが制御を何か別のものに渡せるようにします。その何かは、この future の作業を一時停止して別の future に取り組み、その後でまたこの future を確認できます。これまで見てきたように、その「何か」とは async ランタイムであり、このスケジューリングと調停の処理はその主要な仕事の 1 つです。
「メッセージパッシングを使って2つのタスク間でデータを送信する」節では、
rx.recv を待機することについて説明しました。recv 呼び出しは future を返し、その future を await すると poll が行われます。チャネルが閉じたとき、Some(message) または None のどちらかで準備が整うまで、ランタイムがその future を一時停止することを確認しました。Future トレイト、特に Future::poll についてより深く理解すると、それがどのように動くのかが分かります。その future が Poll::Pending を返したとき、ランタイムはまだ ready ではないことを認識します。逆に、poll が Poll::Ready(Some(message)) または
Poll::Ready(None) を返したとき、ランタイムはその future が ready になったことを認識し、処理を先へ進めます。
ランタイムがそれをどのように実現しているかという正確な詳細は、この本の範囲を超えます。しかし重要なのは、future の基本的な仕組みを理解することです。つまり、ランタイムは自分が担当する各 future を poll し、まだ ready でなければその future を再び休止状態に戻します。
Pin 型と Unpin トレイト
リスト 17-13 では、trpl::join! マクロを使って 3 つの
future を await しました。しかし、実行時まで数が分からない複数の futures を含む、ベクタのようなコレクションを持つことはよくあります。そこで、リスト
17-13 を、3 つの futures をベクタに入れ、代わりに trpl::join_all 関数を呼び出すリスト 17-23 のコードに変更してみましょう。ただし、これはまだコンパイルできません。
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
各 future を Box に入れて トレイトオブジェクト にしています。これは第 12 章の「run からエラーを返す」節で行ったのと同じです。(トレイトオブジェクトについては第 18 章で詳しく扱います。)トレイトオブジェクトを使うことで、これらの型が生成するそれぞれの無名の future を同じ型として扱えます。なぜなら、それらはすべて Future トレイトを実装しているからです。
これは意外に思えるかもしれません。というのも、どの async ブロックも何も返さないので、それぞれは Future<Output = ()> を生成するからです。ただし、Future はトレイトであること、そしてコンパイラは、出力型が同じであっても async
ブロックごとに固有の enum を生成することを思い出してください。手で書いた 2 つの異なる構造体を 1 つの Vec に入れられないのと同じように、コンパイラ生成の enum どうしを混在させることもできません。
その後、future のコレクションを trpl::join_all 関数に渡して、その結果を await します。しかし、これはコンパイルできません。以下がエラーメッセージの関連部分です。
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
このエラーメッセージの注記は、値を ピン留め するために pin! マクロを使うべきだと伝えています。これは、値をメモリ内で移動しないことを保証する Pin 型の中に入れる、という意味です。エラーメッセージがピン留めが必要だと言っているのは、dyn Future<Output = ()> が Unpin トレイトを実装する必要があるのに、現時点では実装していないためです。
trpl::join_all 関数は JoinAll という構造体を返します。この構造体は型 F に対してジェネリックであり、その F には Future トレイトを実装するという制約があります。await を使って future を直接 await すると、その future は暗黙的にピン留めされます。これが、future を await したいあらゆる場面で pin! を使う必要がない理由です。
しかし、ここでは future を直接 await しているわけではありません。代わりに、future のコレクションを join_all 関数に渡して、新しい future である JoinAll を構築しています。join_all のシグネチャでは、コレクション内の各要素の型がすべて Future トレイトを実装していることが必要であり、Box<T> が Future を実装するのは、それが包む T が Unpin トレイトを実装する future である場合に限られます。
これは一度に理解するにはなかなか多いですね! 本当に理解するために、Future トレイトが実際にどのように動作しているのか、特にピン留めまわりについて、もう少し踏み込んで見ていきましょう。もう一度 Future トレイトの定義を見てください。
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
// 必須メソッド
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
}
cx パラメータとその Context 型は、ランタイムが遅延性を保ちながら、いつ任意の future を確認すべきかを実際に知る仕組みの鍵です。繰り返しますが、その仕組みの詳細はこの章の範囲を超えており、通常は独自の Future 実装を書くときにだけ考えれば十分です。ここでは代わりに self の型に注目します。self に型注釈が付いたメソッドを見るのはこれが初めてです。self に対する型注釈は、ほかの関数パラメータに対する型注釈と似たように機能しますが、2つの重要な違いがあります。
- そのメソッドを呼び出すために
selfがどの型でなければならないかを Rust に伝えます。 - ただし、任意の型にできるわけではありません。そのメソッドが実装されている型、その型への参照またはスマートポインタ、あるいはその型への参照を包んだ
Pinに制限されます。
この構文については 第18章 でさらに見ていきます。今のところは、future を poll してそれが Pending か Ready(Output) かを確認したいなら、その型への Pin で包まれた可変参照が必要だと知っておけば十分です。
Pin は、&、&mut、Box、Rc のようなポインタ風の型を包むラッパーです。(厳密には、Pin は Deref または DerefMut トレイトを実装する型に対して機能しますが、これは実質的には参照とスマートポインタだけを扱うのと同じです。)Pin 自体はポインタではなく、Rc や Arc が参照カウントに関して持つような独自の振る舞いもありません。これは純粋に、コンパイラがポインタの使い方に関する制約を強制するために使える道具です。
await が poll の呼び出しによって実装されていることを思い出すと、先ほど見たエラーメッセージの説明が少し見えてきますが、あのメッセージで言及されていたのは Pin ではなく Unpin でした。では、Pin は Unpin と正確にはどう関係していて、なぜ Future は poll を呼び出すために self が Pin 型に入っている必要があるのでしょうか。
この章の前の方で見たように、future の中にある一連の await ポイントは状態機械にコンパイルされ、コンパイラはその状態機械が借用や所有権を含む Rust の通常の安全性ルールすべてに従うようにします。それを実現するために、Rust はある await ポイントと次の await ポイント、あるいは async ブロックの終わりとの間でどのデータが必要になるかを調べます。そして、コンパイル後の状態機械に対応するバリアントを作成します。各バリアントは、ソースコードのその区間で使われるデータに必要なアクセス権を受け取ります。それは、そのデータの所有権を取得する場合もあれば、そのデータへの可変参照または不変参照を取得する場合もあります。
ここまでは順調です。ある async ブロックにおける所有権や参照について何か間違えれば、借用チェッカーが教えてくれます。そのブロックに対応する future を移動したくなると――たとえば、それを Vec に移動して join_all に渡すような場合には――話はもっと厄介になります。
future を移動するということは――join_all でイテレータとして使うためにデータ構造に push する場合でも、関数からそれを返す場合でも――実際には Rust が私たちのために作った状態機械を移動するということです。そして、Rust におけるほとんどのほかの型とは異なり、Rust が async ブロックのために作る future では、図17-4 の簡略図に示すように、各バリアントのフィールドの中に自分自身への参照が含まれることがあります。
ただし、デフォルトでは、自分自身への参照を持つオブジェクトはどれも移動すると安全ではありません。参照は常に、それが指す対象の実際のメモリアドレスを指すからです(図17-5 を参照)。データ構造そのものを移動すると、それらの内部参照は古い場所を指したままになります。しかし、そのメモリ位置はもはや無効です。まず第一に、データ構造に変更を加えても、その値は更新されません。さらに、もっと重要なこととして、コンピュータはそのメモリをほかの目的のために自由に再利用できるようになります! その結果、あとでまったく無関係なデータを読んでしまうおそれがあります。
Pin はこれを土台にして、私たちが必要とするまさにその保証を与えて
くれます。値へのポインタを Pin でラップしてその値を ピン留め すると、
その値はもはや移動できません。したがって、Pin<Box<SomeType>> がある
場合、実際にピン留めされるのは SomeType の値であり、Box ポインタ
ではありません。図 17-6 はこの過程を示しています。
実際には、Box ポインタ自体はなお自由に動かせます。思い出して
ください。重要なのは、最終的に参照されているデータがその場にとどまる
ようにすることです。図 17-7 のように、ポインタが動いても、
それが指しているデータ が同じ場所にあるなら、問題が起こる可能性は
ありません。(練習として、これらの型のドキュメントと std::pin
モジュールのドキュメントを読み、Box をラップする Pin でこれを
どう実現するか考えてみてください。)重要なのは、自己参照型そのものは
依然としてピン留めされているため移動できない、という点です。
しかし、たとえ Pin ポインタの背後にあったとしても、ほとんどの型は
移動してまったく安全です。ピン留めについて考える必要があるのは、値が
内部参照を持つ場合だけです。数値や真偽値のようなプリミティブ値は安全
です。明らかに内部参照を持たないからです。
Rust で通常扱うほとんどの型も同様です。たとえば Vec は、心配せずに
移動できます。ここまで見てきたことを踏まえると、Pin<Vec<String>> が
ある場合、Vec<String> にはほかの参照がなければ常に安全に移動できる
にもかかわらず、Pin が提供する安全ではあるものの制約の強い API を
通してすべてを行わなければなりません。このような場合には値を移動しても
よいとコンパイラに伝える方法が必要であり、そこで Unpin の出番です。
Unpin は、16 章で見た Send トレイトや Sync トレイトと同様の
マーカートレイトであり、それ自体には何の機能もありません。マーカー
トレイトは、あるトレイトを実装した型を特定の文脈で安全に使えることを
コンパイラに伝えるためだけに存在します。Unpin は、ある型について、
その値を安全に移動できるかどうかに関して特別な保証を維持する必要が
ない ことをコンパイラに知らせます。
Send や Sync の場合と同様に、コンパイラは、安全だと証明できる
すべての型に対して自動的に Unpin を実装します。ここでも Send や
Sync と同じく、型に Unpin が 実装されない 場合が特別なケース
です。その表記は impl !Unpin for SomeType であり、
ここで SomeType は、その型へのポインタが Pin で
使われるときに安全であるために、それらの保証を 守る必要がある 型の
名前です。
言い換えると、Pin と Unpin の関係について覚えておくべきことは
2 つあります。1 つ目は、Unpin が「通常」のケースであり、!Unpin が
特別なケースだということです。2 つ目は、型が Unpin と !Unpin の
どちらを実装しているかが意味を持つのは、Pin<&mut
SomeType> のような、その型へのピン留めされたポインタを
使うとき だけ だということです。
これを具体的にするために、String を考えてみましょう。String には
長さと、それを構成する Unicode 文字があります。図 17-8 にあるように、
String を Pin でラップできます。しかし、String は Rust の
ほとんどのほかの型と同様に、自動的に Unpin を実装します。
その結果、String が代わりに !Unpin を実装していたなら不正になる
ようなこと、たとえば図 17-9 のようにメモリ内のまったく同じ場所で
1 つの文字列を別の文字列に置き換えることができます。これは Pin の
契約に違反しません。String には、移動することを安全でなくする内部
参照がないからです。まさにそれが、!Unpin ではなく Unpin を実装
している理由です。
これで、先ほどリスト17-23にあったその join_all 呼び出しで報告された
エラーを理解するのに十分な知識が身に付きました。もともと、async ブロックが生成した
future を Vec<Box<dyn Future<Output = ()>>> に移動しようとしましたが、これまでに見たように、
それらの future は内部参照を持つ可能性があるため、自動的には
Unpin を実装しません。いったんそれらを pin すれば、生成された Pin 型を
Vec に渡せます。そうすれば、future の基になるデータは 移動されない と
確信できます。リスト17-24は、3つの future のそれぞれを定義している箇所で
pin! マクロを呼び出し、トレイトオブジェクト型を調整することで、コードをどのように修正するかを示しています。
extern crate trpl; // required for mdbook test
use std::pin::{Pin, pin};
// --snip--
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = pin!(async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
});
let rx_fut = pin!(async {
// --snip--
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
let tx_fut = pin!(async move {
// --snip--
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
});
let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
この例はこれでコンパイルも実行もでき、実行時にベクタへ future を追加したり削除したりして、それらをすべて join できます。
Pin と Unpin が主に重要になるのは、日常的な Rust コードを書くときというより、
より低レベルなライブラリを構築するときや、ランタイムそのものを実装するときです。
とはいえ、エラーメッセージの中でこれらのトレイトを見かけたときには、これで
自分のコードをどう修正すればよいか、よりよく分かるようになったはずです。
注:
PinとUnpinのこの組み合わせにより、そうでなければ 自己参照であるために実装が難しい、複雑な型の一群を Rust で安全に 実装できるようになります。今日ではPinを必要とする型は async Rust で最もよく見られますが、ときどき他の文脈で目にすることもあります。
PinとUnpinがどのように動作するかの詳細や、それらが守ることを求められる 規則については、std::pinの API ドキュメントで広範に扱われているので、 さらに学びたいならそこから始めるのがよいでしょう。さらに詳しく内部でどのように動作しているのかを理解したい場合は、 Asynchronous Programming in Rust の第2章と 第4章 を参照してください。
Stream トレイト
これで Future、Pin、Unpin の各トレイトをより深く理解できたので、
次は Stream トレイトに目を向けましょう。この章の前の方で学んだように、
ストリームは非同期イテレータに似ています。ただし、Iterator と
Future と違って、本稿執筆時点では Stream は標準ライブラリには定義されていません。しかし、
エコシステム全体で使われている futures クレート由来の非常に一般的な定義が
存在します。
Stream トレイトがそれらをどのように組み合わせるのかを見る前に、
Iterator と Future の各トレイトの定義を確認しましょう。Iterator からは、
シーケンスという考え方を得ます。つまり、その next メソッドは
Option<Self::Item> を返します。Future からは、時間の経過に伴う準備完了という考え方を得ます。
つまり、その poll メソッドは Poll<Self::Output> を返します。時間の経過とともに
準備が整う要素のシーケンスを表現するために、これらの性質を組み合わせた
Stream トレイトを次のように定義します。
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}
}
Stream トレイトは、ストリームが生成する要素の型として Item という
関連型を定義します。これは Iterator に似ており、要素は 0 個の場合もあれば多数ある場合もあります。これに対して Future では、たとえ単位型 () であっても、Output は常に 1 つだけです。
Stream は、それらの要素を取得するためのメソッドも定義します。これを poll_next と
呼ぶのは、Future::poll と同じように poll し、Iterator::next と同じように
要素のシーケンスを生成することを明確にするためです。その戻り値の型は
Poll と Option を組み合わせたものです。外側の型が Poll なのは、
future と同じく、準備ができているかを確認する必要があるからです。内側の型が Option
なのは、イテレータと同じく、まだ要素があるかどうかを示す必要が
あるからです。
この定義に非常に近いものが、いずれ Rust の 標準ライブラリの一部になる可能性が高いでしょう。それまでは、これはほとんどのランタイムの ツールキットの一部なので、これを前提にしてかまいませんし、これから説明する内容は一般に当てはまるはずです。
ただし、「ストリーム: 連なった Future」 節で見た例では、poll_next も Stream も使わず、
代わりに next と StreamExt を使いました。もちろん、
自分たちで Stream の状態機械を手書きすれば、poll_next API を
直接使って処理することもできます。ちょうど、future をその poll
メソッド経由で直接扱うこともできるのと同じです。しかし、
await を使う方がずっと扱いやすく、StreamExt トレイトはそのための next
メソッドを提供してくれます。
#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};
trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
trait StreamExt: Stream {
async fn next(&mut self) -> Option<Self::Item>
where
Self: Unpin;
// other methods...
}
}
注: この章の前の方で実際に使用した定義は、これとは少し 異なっています。というのも、トレイトで async 関数を使うことをまだ サポートしていなかった Rust のバージョンにも対応しているからです。その結果、 次のようになっています。
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;この
Next型はFutureを実装するstructで、selfへの参照の ライフタイムをNext<'_, Self>で表せるようにするため、このメソッドでawaitが機能します。
StreamExt トレイトには、ストリームで使える興味深いメソッドがすべて
含まれています。Stream を実装するすべての型に対して StreamExt は
自動的に実装されますが、これらのトレイトが別々に定義されているのは、
基盤となるトレイトに影響を与えずにコミュニティが利便性 API を改善していけるようにするためです。
trpl クレートで使っている版の StreamExt では、このトレイトは next
メソッドを定義するだけでなく、Stream::poll_next の呼び出しに関わる詳細を
正しく処理する next のデフォルト実装も提供しています。これは、
自分でストリーミングデータ型を書く必要がある場合でも、実装しなければならないのは Stream だけでよく、
そのデータ型を使う人は誰でも自動的に StreamExt とそのメソッドを利用できる、
ということです。
これらのトレイトに関する低レベルな詳細については、ここまでにします。締めくくりとして、 future(ストリームを含む)、タスク、スレッドがそれぞれどのように 結び付くのかを考えてみましょう!