注記: このガイドは、長い間あまり作業されていなかった後、現在書き直しが進められています。作業中であり、未整備の部分が多く、既存の内容もやや粗い状態です。
はじめに
この本は、Rust における非同期プログラミングのガイドです。最初の一歩を踏み出し、より高度なトピックについてさらに知るための手助けとなるように設計されています。非同期プログラミングの経験(Rust でも他の言語でも)は前提としていませんが、すでに Rust には慣れていることを前提としています。Rust について学びたい場合は、The Rust Programming Language から始めるとよいでしょう。
この本には主に 2 つの部があります。第 1 部は初心者向けのガイドで、順番に読むことを想定しており、まったくの初心者から中級レベルまで進めるように設計されています。第 2 部は、より高度なトピックに関する独立した章の集まりです。第 1 部を読み終えた後、またはすでに async Rust の経験がある場合に役立つはずです。
この本は複数の方法で読み進めることができます。
- 最初から最後まで順番に読むことができます。これは、async Rust を初めて学ぶ人に推奨される進め方であり、少なくとも本の第 1 部についてはそうです。
- Web ページの左側に概要目次があります。
- 広いトピックについて情報が欲しい場合は、トピック索引から始めるとよいでしょう。
- 特定のトピックに関するすべての議論を探したい場合は、詳細索引から始めるとよいでしょう。
- あなたの疑問が FAQ で答えられているかどうか確認できます。
非同期プログラミングとは何か、なぜそれを行うのか?
並行プログラミングでは、プログラムは同時に複数のことを行います(または、少なくともそう見えます)。スレッドを使ったプログラミングは、並行プログラミングの一形態です。スレッド内のコードは逐次的なスタイルで書かれ、オペレーティングシステムがスレッドを並行に実行します。非同期プログラミングでは、並行性は完全にプログラム内で発生します(オペレーティングシステムは関与しません)。非同期ランタイム(Rust では単なる別のクレートです)が、プログラマーが await キーワードを使って明示的に制御を譲ることと連携して、非同期タスクを管理します。
オペレーティングシステムが関与しないため、非同期の世界におけるコンテキストスイッチは非常に高速です。さらに、非同期タスクはオペレーティングシステムのスレッドよりもメモリのオーバーヘッドがはるかに小さくなります。このため、非常に多くの並行タスクを処理する必要があり、それらのタスクが多くの時間を待機(たとえば、クライアントからの応答や IO の待機)に費やすシステムには、非同期プログラミングが適しています。また、メモリ量が非常に限られており、スレッドを提供するオペレーティングシステムを持たないマイクロコントローラーにも、非同期プログラミングは適しています。
非同期プログラミングは、タスクをどのように実行するか(並列性と並行性のレベル、制御フロー、スケジューリングなど)について、プログラマーに細かな制御も提供します。つまり、多くの用途において、非同期プログラミングは表現力が高く、かつ使いやすいものになり得ます。特に、Rust における非同期プログラミングには、キャンセルという強力な概念があり、多くの異なる形態の並行性(spawn とそのバリエーション、join、select、for_each_concurrent などを含む構文を使って表現される)をサポートしています。これらにより、タイムアウト、一時停止、スロットリングといった概念の、合成可能で再利用可能な実装が可能になります。
Hello, world!
async Rust がどのようなものかを少し感じてもらうために、ここに「hello, world」の例を示します。並行性はなく、非同期であることを実際に活かしているわけではありません。非同期関数を定義して使用し、“hello, world!” を出力します。
// Define an async function.
async fn say_hello() {
println!("hello, world!");
}
#[tokio::main] // Boilerplate which lets us write `async fn main`, we'll explain it later.
async fn main() {
// Call an async function and await its result.
say_hello().await;
}
詳しいことは後で説明します。今のところは、async fn を使って非同期関数を定義し、.await を使ってそれを呼び出している点に注目してください。Rust の非同期関数は、await されない限り何もしません1。
この本のすべての例と同様に、完全な例(たとえば Cargo.toml を含む)を確認したい場合や、自分のローカル環境で実行したい場合は、この本の GitHub リポジトリで見つけることができます。例: examples/hello-world。
Async Rust の開発
Rust の async 機能はしばらく前から開発されていますが、言語の「完成した」部分ではありません。Async Rust(少なくとも stable コンパイラーと標準ライブラリで利用できる部分)は信頼性が高く、高性能です。最大規模のテック企業における最も要求の厳しい状況の一部で、本番環境で使用されています。しかし、未整備の部分や粗い部分(信頼性ではなく使いやすさの意味で粗い部分)もあります。async Rust を学ぶ過程で、こうした部分のいくつかに出くわす可能性があります。未整備の部分のほとんどには回避策があり、それらはこの本で扱います。
現在、非同期イテレーター(ストリームとしても知られています)を扱う部分で、多くのユーザーが粗い部分を見つけています。トレイトにおける async の一部の使い方は、まだ十分にはサポートされていません。非同期破棄については、よい解決策がありません。
Async Rust は現在も活発に取り組まれています。開発の動向を追いたい場合は、Async Working Group のホームページを確認できます。そこにはロードマップも含まれています。または、Rust Project 内の async プロジェクト目標を読むこともできます。
Rust はオープンソースプロジェクトです。async Rust の開発に貢献したい場合は、メインの Rust リポジトリにあるコントリビューション用ドキュメントから始めてください。
-
これは実際には悪い例です。なぜなら、
printlnはブロッキング IO であり、一般に非同期関数内でブロッキング IO を行うのは良くない考えだからです。ブロッキング IO が何であるかはchapter TODOで、非同期関数内でブロッキング IO を行うべきではない理由はchapter TODOで説明します。 ↩
ナビゲーション
TODO ナビゲーションの概要
トピック索引
並行性と並列性
正確性と安全性
- キャンセル
パフォーマンス
テスト
索引
-
非同期/
async -
Futureトレイト
-
マルチタスク
-
テスト
-
トレイト
- async
Future
第1部: Rustにおける非同期プログラミングのガイド
本書のこの部は、async Rustのチュートリアル形式のガイドです。Rustにおける非同期プログラミングの初心者を対象としています。他の言語で非同期プログラミングを経験したことがあるかどうかにかかわらず役立つはずです。経験がある場合は、最初のセクションを飛ばすか、復習としてざっと読むとよいでしょう。また、この他の言語における非同期との比較も、早めに読んでおくとよいかもしれません。
中核概念
まず、プロセス、スレッド、非同期タスクを使った並行プログラミングのさまざまなモデルについて説明します。最初の章では、Rustの非同期モデルの重要な部分を扱います。その後、asyncとawaitのプログラミングパラダイムを導入する第2章で、非同期プログラミングの詳細に入ります。次の章では、さらにいくつかの非同期プログラミングの概念を扱います。
非同期プログラミングの主な動機の1つは、より高性能なI/Oです。これは次の章で扱います。また、同じ章でブロッキングについても詳しく扱います。ブロッキングは非同期プログラミングにおける大きな危険要因であり、同期的に待機する操作(多くの場合I/O)によって、スレッドの進行が妨げられることを指します。
非同期プログラミングのもう1つの動機は、並行コードの抽象化と合成のための新しいモデルを可能にすることです。それを扱った後、並行タスク間の同期に進みます。
非同期プログラミングのためのツールに関する章があります。
最後のいくつかの章では、より専門的なトピックを扱います。まずは非同期の破棄とクリーンアップから始めます(これはよくある要件ですが、現時点では優れた組み込みの解決策がないため、やや専門的なトピックです)。
このガイドの次の2つの章では、非同期プログラミングの2つの基本的な構成要素であるFutureとランタイムについて詳しく説明します。
最後に、タイマーとシグナル処理、および非同期イテレーター(別名ストリーム)を扱います。後者は、非同期イベントのシーケンスを扱う方法です(Futureや非同期関数を使って表現される個々の非同期イベントと比較してください)。これは言語が活発に開発されている領域であり、やや粗削りなところがあります。
並行プログラミング
この章の目的は、async の並行性がどのように機能し、スレッドによる並行性とどのように異なるのかについて、大まかなイメージを持ってもらうことです。実践的な詳細に入る前に、何が起きているのかについて適切なメンタルモデルを持つことが重要だと私は考えていますが、まず実際のコードを見たいタイプの人であれば、次の 1、2 章を読んでからこの章に戻ってくるのもよいでしょう。
まず動機から始め、次に逐次プログラミング、スレッドまたはプロセスを使ったプログラミング、そしてasync プログラミングを扱います。この章は、並行性と並列性に関するセクションで終わります。
ユーザーは、コンピューターに複数のことをさせたいと考えます。ときには、ユーザーはそれらのことを同時に行いたいと考えます(たとえば、エディターで入力しながら同時に音楽アプリを聴くなど)。ときには、複数のタスクを同時に行うほうが効率的です(たとえば、大きなファイルをダウンロードしている間にエディターで作業を進めるなど)。ときには、複数のユーザーが同時に 1 台のコンピューターを使いたい場合もあります(たとえば、複数のクライアントがサーバーに接続している場合など)。
より低レベルの例を挙げると、音楽プログラムは、ユーザーがユーザーインターフェイス(UI)を操作している間も音楽を再生し続ける必要があるかもしれません。「音楽を再生し続ける」ためには、サーバーから音楽データをストリーミングし、そのデータをある形式から別の形式へ処理し、処理済みのデータをオペレーティングシステム(OS)経由でコンピューターのオーディオシステムに送る必要があるかもしれません。ユーザーに対しては、ユーザーの指示に応じてサーバーにデータやコマンドを送受信する必要があるかもしれませんし、音楽を再生しているサブシステムにシグナルを送る必要があるかもしれません(たとえば、ユーザーがトラックを変更したり一時停止したりした場合)。また、グラフィカル表示を更新する必要があるかもしれません(たとえば、ボタンを強調表示したりトラック名を変更したりするなど)。そして、上記すべてを行っている間も、マウスカーソルやテキスト入力の応答性を保たなければなりません。
複数のことを同時に行うこと(またはそう見えること)は、並行性と呼ばれます。プログラムは(OS と連携して)自身の並行性を管理しなければならず、その方法は数多くあります。この章ではそのいくつかを説明しますが、まずは完全に逐次的なコード、つまり並行性がまったくないコードから始めます。
逐次実行
ほとんどのプログラミング言語(Rust を含む)におけるデフォルトの実行モードは、逐次実行です。
do_a_thing();
println!("hello!");
do_another_thing();
各文は、次の文が開始される前に完了します1。それらの文の間では何も起こりません2。これは些細なことに聞こえるかもしれませんが、コードについて推論するうえで非常に有用な性質です。しかし、それは同時に、多くの時間を無駄にしていることも意味します。上の例では、println!("hello!") が起こるのを待っている間に、do_another_thing() を実行できたかもしれません。場合によっては、3 つの文すべてを同時に実行できたかもしれません。
IO3 が発生するときはいつでも(println! を使って出力することは IO です。OS への呼び出しを通じてコンソールにテキストを出力しているためです)、プログラムは次の文を実行する前に IO が完了する4のを待ちます。実行を続ける前に IO の完了を待つことは、プログラムが他の進行を行うのをブロックします。ブロッキング IO は、利用、実装、推論が最も簡単な種類の IO ですが、同時に最も効率の低いものでもあります。逐次的な世界では、IO の完了を待っている間、プログラムは何もできません。
プロセスとスレッド
プロセスとスレッドは、並行性を提供するためにオペレーティングシステムによって提供される概念です。実行可能ファイルごとに 1 つのプロセスがあるため、複数のプロセスをサポートするということは、コンピューターが複数のプログラム5を並行して実行できるということを意味します。1 つのプロセスには複数のスレッドを持たせることができるため、プロセスの内部にも並行性が存在できます。
プロセスとスレッドの扱われ方には、多くの小さな違いがあります。最も重要な違いは、メモリはスレッド間では共有されますが、プロセス間では共有されない6という点です。つまり、プロセス間の通信は、何らかのメッセージパッシングによって行われます。これは、別々のコンピューターで実行されているプログラム間で通信するのに似ています。プログラムの観点では、単一のプロセスがその世界全体です。新しいプロセスを作成することは、新しいプログラムを実行することを意味します。一方で、新しいスレッドを作成することは、単にプログラムの通常の実行の一部です。
プロセスとスレッドのこうした違いのため、プログラマーにとってそれらは非常に異なるものに感じられます。しかし OS の観点では、それらは非常によく似ており、ここでは単一の概念であるかのようにその性質を説明します。ここではスレッドについて話しますが、特に断らない限り、それは「スレッドまたはプロセス」を意味すると理解してください。 OS はスレッドのスケジューリングを担当します。つまり、スレッドをいつ実行し、どのくらいの時間実行するかを決定します。最近のコンピューターの多くは複数のコアを備えているため、文字どおり同時に複数のスレッドを実行できます。しかし、コア数よりもはるかに多くのスレッドが存在することは一般的なので、OS は各スレッドを短い時間だけ実行し、その後それを一時停止して、別のスレッドをしばらく実行します7。複数のスレッドがこのような形で単一のコア上で実行される場合、それはインターリーブまたはタイムスライシングと呼ばれます。OS がスレッドの実行をいつ一時停止するかを選択するため、これはプリエンプティブマルチタスクと呼ばれます(ここでのマルチタスクとは、単に複数のスレッドを同時に実行することを意味します)。OS はスレッドの実行をプリエンプトします(より詳しく言えば、OS は実行をプリエンプティブに一時停止します。プリエンプティブである理由は、最初のスレッドが本来なら一時停止する前に、別のスレッドのための時間を作るために OS がそのスレッドを一時停止し、2 つ目のスレッドが実行できないことが問題になる前に実行できるようにするからです)。
IO についてもう一度見てみましょう。スレッドが IO を待ってブロックすると何が起こるでしょうか?スレッドを備えたシステムでは、OS はそのスレッドを一時停止し(いずれにせよ待つだけなので)、IO が完了したときに再び起こします8。スケジューリングアルゴリズムによっては、IO が完了してから OS が IO を待っているスレッドを起こすまでに少し時間がかかる場合があります。これは、OS が他のスレッドにいくらか作業を進めさせるために待つことがあるためです。これにより、物事ははるかに効率的になります。あるスレッドが IO を待っている間、別のスレッド(またはマルチタスクにより、より可能性が高いのは多数のスレッド)が進捗できます。しかし、IO を行っているスレッドの視点では、物事は依然として逐次的です。そのスレッドは次の操作を開始する前に IO が完了するのを待ちます。
スレッドは、通常はタイムアウト付きで sleep 関数を呼び出すことにより、自分自身を一時停止することも選択できます。この場合、OS はスレッド自身の要求によってスレッドを一時停止します。プリエンプションや IO による一時停止と同様に、OS は後で(タイムアウト後に)スレッドを再び起こして実行を継続させます。
OS が(何らかの理由で)あるスレッドを一時停止して別のスレッドを開始することは、コンテキストスイッチと呼ばれます。切り替えられるコンテキストには、レジスタ、オペレーティングシステムの記録、多くのキャッシュの内容が含まれます。これは決して些細な作業量ではありません。OS への制御の移行とスレッドへの復帰、そして古くなったキャッシュを扱うコストと合わせて、コンテキストスイッチはコストの高い操作です。
最後に、一部のハードウェアや OS はプロセスやスレッドをサポートしていないことに注意してください。これは組み込みの世界でより起こりやすいことです。
非同期プログラミング
非同期プログラミングは、スレッドによる並行処理と同じ高レベルの目標(同時に多くのことを行う)を持つ一種の並行処理ですが、実装は異なります。非同期の並行処理とスレッドによる並行処理の 2 つの大きな違いは、非同期の並行処理は OS の助けを借りずに完全にプログラム内で管理されること9、そしてマルチタスクがプリエンプティブではなく協調的であること10です(これについてはすぐに説明します)。非同期の並行処理には多くの異なるモデルがあります。これらについてはこのガイドの後半で比較しますが、今は Rust のモデルだけに焦点を当てます。
スレッドと区別するために、非同期の並行処理における一連の実行をタスクと呼ぶことにします(これらはグリーンスレッドとも呼ばれますが、これはプリエンプティブスケジューリングやタスクごとに 1 つのスタックを持つといった実装詳細の含意を伴うことがあります)。タスクが実行され、スケジュールされ、メモリ内で表現される方法はスレッドとは大きく異なりますが、高レベルの直感としては、タスクを、OS ではなく完全にプログラム内で管理されるスレッドのようなものと考えると役に立つことがあります。
非同期システムにも、次にどのタスクを実行するかを決定するスケジューラーは存在します(これはプログラムの一部であり、OS の一部ではありません)。しかし、スケジューラーはタスクをプリエンプトできません。代わりに、タスクは自発的に制御を手放し、別のタスクがスケジュールされることを許可しなければなりません。タスクは(制御を手放すことで)協調しなければならないため、これは協調的マルチタスクと呼ばれます。
プリエンプティブではなく協調的マルチタスクを使用することには、多くの影響があります。
- 制御が譲られる可能性のある地点の間では、コードが逐次的に実行されることを保証できます。予期せず一時停止されることはありません。
- タスクが yield ポイントの間で長い時間を費やす場合(たとえば、ブロッキング IO を行ったり、長時間実行される計算を実行したりする場合)、他のタスクは進捗できません。
- スケジューラーの実装ははるかに単純であり、スケジューリング(およびコンテキストスイッチ)のオーバーヘッドは少なくなります。
非同期の並行処理は、スレッドによる並行処理よりもはるかに効率的です。メモリのオーバーヘッドははるかに低く、コンテキストスイッチはずっと低コストな操作です。OS へ制御を渡してプログラムへ戻す必要がなく、切り替えるデータもはるかに少ないからです。しかし、それでもキャッシュへの影響がいくらか生じることがあります。OS のキャッシュ、たとえば TLB は変更する必要がないものの、タスクはメモリの異なる部分を操作する可能性が高いため、新しくスケジュールされたタスクに必要なデータがメモリキャッシュにない場合があります。
非同期 IO はブロッキング IO の代替です(ノンブロッキング IO と呼ばれることもあります)。非同期 IO は非同期の並行処理と直接結び付いているわけではありませんが、この 2 つは一緒に使われることがよくあります。非同期 IO では、プログラムは 1 つのシステムコールで IO を開始し、その後 IO が完了したかどうかを確認するか、完了時に通知を受け取ることができます。つまり、IO が行われている間、プログラムは他の作業を自由に進められます。Rust では、非同期 IO の仕組みは非同期ランタイムによって処理されます(スケジューラーもランタイムの一部です。ランタイムについてはこの本の後半でより詳しく説明しますが、本質的には、ランタイムは基本的な非同期関連のことの一部を処理するライブラリにすぎません)。 システム全体の観点から見ると、スレッドを使った並行システムにおけるブロッキング IO と、async 並行システムにおけるノンブロッキング IO は似ています。どちらの場合も、IO には時間がかかり、IO が行われている間に他の作業が実行されます。
- スレッドの場合、IO を行うスレッドは OS に IO を要求し、そのスレッドは OS によって一時停止され、他のスレッドが作業を実行します。そして IO が完了すると、OS はそのスレッドを起こし、IO の結果を使って実行を継続できるようにします。
- async の場合、IO を行うタスクはランタイムに IO を要求し、ランタイムは OS に IO を要求しますが、OS はランタイムに制御を返します。ランタイムは IO タスクを一時停止し、他のタスクをスケジュールして作業を実行させます。IO が完了すると、ランタイムは IO タスクを起こし、IO の結果を使って実行を継続できるようにします。
async IO を使う利点は、オーバーヘッドがはるかに低いため、システムがスレッドよりも桁違いに多くのタスクをサポートできることです。そのため、async 並行性は、多数のユーザーを持ち、IO 待ちに多くの時間を費やすタスクに特に適しています(待ち時間がそれほど多くなく、代わりに CPU バウンドな作業を大量に行う場合は、ボトルネックが CPU とメモリリソースになるため、低オーバーヘッドであることによる利点はそれほど大きくありません)。
スレッドと async は相互排他的ではありません。多くのプログラムは両方を使用します。一部のプログラムには、スレッドを使って実装する方が適している部分と、async を使って実装する方が適している部分があります。たとえば、データベースサーバーはクライアントとのネットワーク通信を管理するために async の手法を使い、データに対する計算には OS スレッドを使うことがあります。あるいは、プログラムが async 並行性だけを使って書かれていても、ランタイムが複数のスレッド上でタスクを実行することがあります。これは、プログラムが複数の CPU コアを利用するために必要です。本書の後のいくつかの箇所で、スレッドと async タスクが交差する部分を扱います。
並行性と並列性
ここまでは並行性(同時に多くのことを行う、または行っているように見えること)について話してきました。また、並列性(文字どおり同時に多くのことを行うことを可能にする複数の CPU コアの存在)にも触れてきました。これらの用語は同じ意味で使われることもありますが、異なる概念です。このセクションでは、これらの用語とその違いを正確に定義してみます。説明のために簡単な疑似コードを使います。
1 つのタスクが多数のサブタスクに分割されていると想像してください。
task1 {
subTask1-1()
subTask1-2()
...
subTask1-100()
}
このような疑似コードを実行するプロセッサのふりをしてみましょう。実行する明らかな方法は、まず subTask1-1 を実行し、次に subTask1-2 を実行し、すべてのサブタスクが完了するまで同じように続けることです。これは逐次実行です。
次に、複数のタスクを考えてみましょう。それらをどのように実行できるでしょうか。1 つのタスクを開始し、タスク全体が完了するまですべてのサブタスクを実行してから、次のタスクに取りかかることができます。2 つのタスクは逐次的に実行されています(また、各タスク内のサブタスクも逐次的に実行されています)。サブタスクだけを見ると、次のように実行することになります。
subTask1-1()
subTask1-2()
...
subTask1-100()
subTask2-1()
subTask2-2()
...
subTask2-100()
あるいは、subTask1 を実行してから task1 を脇に置き(どこまで進んだかを覚えておき)、次のタスクを取り上げてその最初のサブタスクを実行し、その後 task1 に戻ってサブタスクを実行することもできます。2 つのタスクはインターリーブされており、これを 2 つのタスクの並行実行と呼びます。次のようになります。
subTask1-1()
subTask2-1()
subTask1-2()
subTask2-2()
...
subTask1-100()
subTask2-100()
あるタスクが別のタスクの結果や副作用を観測できない限り、そのタスクの観点からは、サブタスクは依然として逐次的に実行されています。
2 つのタスクに限定する理由はありません。任意の数のタスクを、任意の順序でインターリーブできます。
どれだけ並行性を追加しても、処理全体の完了にかかる時間は同じであることに注意してください(実際には、それらの間でコンテキストスイッチを行うオーバーヘッドにより、並行性が増えるほど時間が長くなる可能性があります)。ただし、ある特定のサブタスクについては、完全に逐次的に実行する場合よりも早く完了するかもしれません(ユーザーにとっては、より応答性が高いように感じられるかもしれません)。
次に、タスクを処理しているのが自分だけではなく、手伝ってくれるプロセッサの仲間がいると想像してください。同時にタスクに取り組み、作業をより速く完了できます! これが並列実行です(これは並行でもあります)。サブタスクは次のように実行されるかもしれません。
Processor 1 Processor 2
============== ==============
subTask1-1() subTask2-1()
subTask1-2() subTask2-2()
... ...
subTask1-100() subTask2-100()
プロセッサが 2 つより多い場合は、さらに多くのタスクを並列に処理できます。また、各プロセッサ上でタスクをインターリーブしたり、プロセッサ間でタスクを共有したりすることもできます。
実際のコードでは、物事は少し複雑です。一部のサブタスク(たとえば IO)は、プロセッサが能動的に関与する必要はなく、開始して、しばらく後に結果を回収するだけで済みます。また、一部のサブタスクは、進捗するために別のタスクのサブタスクの結果(または副作用)を必要とする場合があります(同期)。これら 2 つのシナリオはいずれも、タスクを並行に実行できる有効な方法を制限します。そして、それに何らかの公平性の概念を確保することが加わるため、スケジューリングが重要になります。
ふざけた例はもう十分なので、きちんと定義してみましょう
並行性は計算の順序に関するものであり、並列性は実行の形態に関するものです。
2 つの計算があるとき、一方が他方より前に起こることを観測できる場合、それらは逐次的(つまり並行ではない)であると言います。一方が他方より前に起こることを観測できない(または、それが重要でない)場合、それらは並行であると言います。
2 つの計算は、文字どおり同時に起こっている場合に並列に起こっています。並列性はリソースとして考えることができます。利用可能な並列性が多いほど、一定の時間内により多くの計算を行うことができます(計算が同じ速度で行われると仮定した場合)。並列性を増やさずにシステムの並行性を高めても、それによってシステムが速くなることは決してありません(ただし、システムの応答性を高めることはでき、そうでなければ実用的でない最適化を実装可能にすることはあります)。 言い換えると、2つの計算は一方が他方の後に起こる場合(並行でも並列でもない)もあれば、それらの実行が単一のCPUコア上でインターリーブされる場合(並行だが並列ではない)もあり、あるいは2つのコア上で同時に実行される場合(並行かつ並列)もあります11。
もう1つ有用な捉え方12は、並行性はコードを構成する方法であり、並列性はリソースである、というものです。これは強力な主張です!並行性がコードの実行ではなくコードの構成に関するものだという点は重要です。なぜなら、プロセッサの観点から見ると、並列性を伴わない並行性は単に存在しないからです。これは async の並行性にとって特に重要です。なぜなら、それは完全にユーザー側のコードで実装されているからです。つまり、それはコードを構成すること「だけ」に関するものだというだけでなく、ソースコードを読むだけでそれを簡単に自分で確かめられます。並列性がリソースであるという点も有用です。なぜなら、並列性と性能にとって重要なのはプロセッサコアの数だけであり、並行性に関してコードがどのように構成されているか(たとえばスレッドがいくつあるか)ではない、ということを思い出させてくれるからです。
スレッドベースのシステムと async システムはどちらも、並行性と並列性の両方を提供できます。どちらの場合も、並行性はコード(スレッドまたはタスクの生成)によって制御され、並列性はスケジューラによって制御されます。スケジューラは、スレッドではOSの一部(OSのAPIによって設定される)であり、async ではランタイムライブラリの一部(ランタイムの選択、ランタイムの実装方法、およびランタイムがクライアントコードに提供するオプションによって設定される)です。ただし、慣習と一般的なデフォルトに起因する実践上の違いがあります。スレッドベースのシステムでは、各並行スレッドは、可能な限り多くの並列性を使って並列に実行されます。async システムには強いデフォルトはありません。すべてのタスクを単一のスレッドで実行するシステムもあれば、複数のタスクを単一のスレッドに割り当て、そのスレッドを1つのコアに固定するシステムもあります(そのため、タスクのグループは並列に実行されますが、グループ内では各タスクは並行に実行され、同じグループ内の他のタスクと並列に実行されることはありません)。また、タスクが制限付きまたは制限なしで並列に実行される場合もあります。このガイドの第1部では、主に最後のモデルをサポートする Tokio ランタイムを使用します。つまり、並列性に関する振る舞いは、スレッドを使った並行性の場合と似ています。さらに、async Rust には、ランタイムに依存せず、並行性は明示的にサポートするが並列性はサポートしない機能があることも見ていきます。
まとめ
- 実行モデルは数多くあります。ここでは、逐次実行、スレッドとプロセス、非同期プログラミングについて説明しました。
- スレッドは、OSによって提供され(かつスケジュールされ)る抽象化です。通常、プリエンプティブマルチタスクを伴い、デフォルトで並列であり、管理とコンテキストスイッチングにはかなり高いオーバーヘッドがあります。
- 非同期プログラミングは、ユーザー空間のランタイムによって管理されます。マルチタスクは協調的です。スレッドよりもオーバーヘッドは低いですが、第一級のスレッドではなく、異なるプログラミングプリミティブ(
asyncとawait、および Future)を使用するため、スレッドを使ったプログラミングとは少し感覚が異なります。
- 並行性と並列性は異なる概念ですが、密接に関連しています。
- 並行性は計算の順序に関するものです(操作の実行順序を観測できない場合、それらの操作は並行です)。
- 並列性は複数のプロセッサ上で計算することに関するものです(操作が文字どおり同時に起こっている場合、それらの操作は並列です)。
- OSスレッドと非同期プログラミングはいずれも、並行性と並列性を提供します。非同期プログラミングはさらに、多くのオペレーティングシステムのスレッドAPIには含まれない、柔軟または細粒度の並行性のための構成要素も提供できます。
-
これは厳密には正しくありません。現代のコンパイラーや CPU はコードを再構成し、好きな順序で実行します。逐次的な文は、多くの異なる方法で重なり合っている可能性があります。しかし、これはプログラム自身やそのユーザーから決して観測可能であってはなりません。 ↩
-
これも正しくありません。あるプログラムが完全に逐次的であっても、他のプログラムが同時に実行されている可能性があります。これについては次のセクションで詳しく説明します。 ↩
-
IO は input/output の略です。これは、プログラムからプログラム外部の世界へのあらゆる通信を意味します。ディスクやネットワークへの読み書き、端末への書き込み、キーボードやマウスからのユーザー入力の取得、OS やシステム内で実行されている別のプログラムとの通信などが含まれます。並行性の文脈で IO が興味深いのは、プログラムが内部で行うほぼあらゆるタスクよりも、発生するまでに何桁も長い時間がかかるためです。これは通常、多くの待ち時間を意味し、その待ち時間は他の作業を行う機会になります。 ↩
-
IO が正確にいつ完了するかは、実際にはかなり複雑です。プログラムの観点では、1 回の IO 呼び出しは、OS から制御が戻されたときに完了します。これは通常、データが何らかのハードウェアや別のプログラムに送信されたことを示しますが、そのデータが実際にディスクへ書き込まれた、またはユーザーに表示された、などを必ずしも意味するわけではありません。それには、ハードウェアでの追加作業、キャッシュの定期的なフラッシュ、あるいは別のプログラムがデータを読み取ることが必要な場合があります。ほとんどの場合、これについて心配する必要はありませんが、知っておくとよいでしょう。 ↩
-
ユーザーの視点では、単一のプログラムに複数のプロセスが含まれる場合がありますが、OS の視点では各プロセスは別個のプログラムです。 ↩
-
一部の OS はプロセス間でのメモリ共有をサポートしていますが、それを使うには特別な扱いが必要で、ほとんどのメモリは共有されません。 ↩
-
OS がどのスレッドをどのくらいの時間(そしてどのコアで)実行するかを正確にどのように選ぶかは、スケジューリングの重要な部分です。高レベルの戦略と、それらの戦略を設定するためのオプションの両方に、多くの選択肢があります。ここで適切な選択を行うことは良好なパフォーマンスにとって非常に重要ですが、複雑であり、ここでは掘り下げません。 ↩
-
もう 1 つの選択肢として、スレッドが IO の終了まで単にループで回り続けることでビジーウェイトすることがあります。これは、他のスレッドが実行できなくなるためあまり効率的ではなく、ほとんどの最近のシステムでは一般的ではありません。ロックの実装や非常に単純な組み込みシステムで見かけることがあるかもしれません。 ↩
-
最初は、プログラムに単一のスレッドしかないと仮定して説明を始めますが、後でそれを拡張します。システム上ではおそらく他のプロセスも実行されていますが、それらは async 並行性の仕組みに実際には影響しません。 ↩
-
プログラム内で(OS を使わずに)管理される並行性を持ちながら、スレッド間の協調に依存するのではなくプリエンプティブスケジューラーを使うプログラミング言語(あるいはライブラリさえ)もいくつかあります。Go はよく知られた例です。これらのシステムでは
asyncやawaitの表記は必要ありませんが、他の言語や OS との相互運用がはるかに難しくなることや、ランタイムが重量級になることなど、別の欠点があります。Rust のごく初期のバージョンにはそのようなシステムがありましたが、1.0 までにはその痕跡は残っていませんでした。 ↩ -
計算は並列だが並行ではない、ということはあり得るのでしょうか?ある意味ではそうですが、実際にはそうとは言えません。2つのタスク(aとb)があり、それぞれが1つのサブタスク(それぞれaとbに属する1と2)で構成されていると想像してください。同期を使うことで、サブタスク1が完了するまでサブタスク2を開始できず、タスクaはサブタスク2が完了するまで完了できないものとします。ここで、aとbは異なるプロセッサ上で実行されます。タスクをブラックボックスとして見るなら、それらは並列に実行されていると言えますが、ある意味では、それらの順序は完全に決定されているため、並行ではありません。しかし、サブタスクを見ると、それらは並列でも並行でもないことがわかります。 ↩
-
これはAaron Turonによるものだと思っており、Rustの標準ライブラリの設計の一部にも反映されています。たとえば、available_parallelism 関数です。 ↩
async と await
この章では、Rust で非同期プログラミングを始め、async キーワードと await キーワードを紹介します。
async は関数(および後で扱う trait など、その他の item)に付けるアノテーションです。await は式で使われる演算子です。しかし、これらのキーワードに入る前に、Rust における非同期プログラミングの中核となる概念をいくつか押さえる必要があります。これは前章での議論を受けたもので、ここでは物事を Rust プログラミングに直接結び付けて説明します。
Rust の非同期概念
ランタイム
非同期タスクは管理され、スケジュールされる必要があります。通常、利用可能なコア数よりもタスク数の方が多いため、すべてを同時に実行することはできません。あるタスクの実行が止まったら、別のタスクを選んで実行しなければなりません。タスクが IO や何らかのイベントを待っている場合、そのタスクはスケジュールされるべきではありませんが、それが完了したらスケジュールされるべきです。そのためには OS とやり取りし、IO 作業を管理する必要があります。
多くのプログラミング言語はランタイムを提供しています。一般に、このランタイムは非同期タスクの管理よりもはるかに多くのことを行います。メモリを管理する(ガベージコレクションを含む)こともあれば、例外処理で役割を持つこと、OS 上の抽象化レイヤーを提供すること、あるいは完全な仮想マシンであることさえあります。Rust は低レベル言語であり、ランタイムのオーバーヘッドを最小限にすることを目指しています。そのため、非同期ランタイムのスコープは、多くの他言語のランタイムよりもはるかに限定されています。また、非同期ランタイムを設計・実装する方法は多数あるため、Rust は 1 つを提供するのではなく、要件に応じて選択できるようにしています。これは、非同期プログラミングを始めるには追加の手順が必要になることを意味します。
タスクを実行してスケジュールするだけでなく、ランタイムは非同期 IO を管理するために OS とやり取りしなければなりません。また、タスクにタイマー機能を提供する必要もあります(これは IO 管理と交差します)。ランタイムがどのように構成されるべきかについて厳密な規則はありませんが、いくつかの用語と責任分担は一般的です。
- reactor、event loop、または driver(同等の用語): IO とタイマーイベントをディスパッチし、OS とやり取りし、実行を前進させる最も低レベルの駆動を行います。
- scheduler: タスクがいつ、どの OS スレッド上で実行できるかを決定します。
- executor または runtime: reactor と scheduler を組み合わせたもので、非同期タスクを実行するためのユーザー向け API です。runtime は機能全体のライブラリを意味する場合にも使われます(たとえば、
Runtime型で表される Tokio executor だけでなく、Tokio crate に含まれるすべてのもの)。
上で説明した executor に加えて、ランタイム crate には通常、多くのユーティリティ trait や関数が含まれています。これらには、trait(たとえば AsyncRead)や IO の実装、ネットワーキングやファイルシステムへのアクセスなどの一般的な IO タスクのための機能、ロック、チャネル、その他の同期プリミティブ、タイミング用ユーティリティ、OS と連携するためのユーティリティ(たとえばシグナル処理)、future や stream(非同期イテレータ)を扱うためのユーティリティ関数、監視や観測のためのツールなどが含まれる場合があります。このガイドでは、それらの多くを扱います。
選択できる非同期ランタイムは多数あります。スケジューリングポリシーが大きく異なるものや、特定のタスクやドメイン向けに最適化されているものもあります。このガイドの大部分では、Tokio ランタイムを使用します。これは汎用ランタイムであり、エコシステムで最も人気のあるランタイムです。入門にも本番環境での作業にも優れた選択肢です。状況によっては、別のランタイムを使うことで、より高いパフォーマンスを得られたり、より単純なコードを書けたりする場合があります。このガイドの後半では、利用可能な他のランタイムのいくつかと、なぜそれを選ぶのか、あるいは独自に書くのかについて説明します。
できるだけ早く動かし始めるには、少しのボイラープレートだけが必要です。Tokio crate を Cargo.toml の依存関係として含める必要があります(他の crate と同じです)。
[dependencies]
tokio = { version = "1", features = ["full"] }
そして、main 関数に tokio::main アノテーションを使い、非同期関数にできるようにします(Rust では通常これは許可されていません)。
#[tokio::main]
async fn main() { ... }
これで完了です!非同期コードを書く準備ができました!
#[tokio::main] アノテーションは Tokio ランタイムを初期化し、main 内のコードを実行するための非同期タスクを開始します。このガイドの後半では、このアノテーションが何をしているのか、そしてこれを使わずに非同期コードを使う方法(より柔軟性が得られます)について、より詳しく説明します。
Futures-rs とエコシステム
TODO コンテキストと歴史、futures-rs が何のためのものか - よく使われていたが、おそらく今は必要ない、Tokio や他のランタイムとの重複(時には微妙な意味論上の違いを伴う)、なぜ必要になるか(future を直接扱う、特に独自に書く場合、stream、いくつかのユーティリティ)
その他のエコシステム関連 - Yosh の crate、代替ランタイム、実験的なもの、その他?
Future とタスク
Rust における非同期並行処理の基本単位は future です。future は、‘Future’ trait を実装する、ごく普通の Rust オブジェクト(通常は struct または enum)です。future は遅延された計算を表します。つまり、将来のある時点で準備が整う計算です。
このガイドでは future について多く説明しますが、最初はあまり気にしすぎずに始めるのが最も簡単です。次のいくつかのセクションでは future にかなり触れますが、実際に定義したり直接使ったりするのは後になります。future の重要な側面の 1 つは、それらを組み合わせて新しい、より「大きな」future を作れることです(それらをどのように組み合わせられるかについては、後でさらに詳しく説明します)。
前章とこの章では、「非同期タスク」という用語を非形式的な形でかなり使ってきました。この用語は、実行の論理的な並び、つまりスレッドに似ているが OS によって外部から管理されるのではなく、プログラム内で管理されるもの、という意味で使ってきました。タスクという観点で考えることはしばしば有用ですが、Rust 自体にはタスクという概念はなく、この用語は異なる意味で使われます!これは混乱を招きます!さらに悪いことに、ランタイムにはタスクの概念があり、ランタイムごとにタスクの概念が少しずつ異なります。
ここから先では、タスクに関する用語を正確に使うようにします。単に「task」と言う場合、他のタスクと並行して発生し得る計算の並びという抽象概念を意味します。「async task」はまったく同じ意味で使いますが、OS スレッドとして実装されたタスクとの対比として使います。「runtime’s task」は、ランタイムが想定する何らかの種類のタスクを意味し、「tokio task」(またはその他の特定のランタイム)は Tokio におけるタスクの考え方を意味します。
Rust における非同期タスクは単なる future です(通常は、多くの他の future を組み合わせて作られた「大きな」future です)。言い換えると、タスクとは実行される future です。ただし、future がランタイムのタスクではなく「実行」される場合があります。この種の future は直感的には task ですが、runtime’s task ではありません。その例に到達したときに、これをさらに詳しく説明します。
非同期関数
async キーワードは関数宣言に付ける修飾子です。たとえば、pub async fn send_to_server(...) のように書けます。async 関数とは、単に async キーワードを使って宣言された関数のことです。これは、非同期に実行できる関数であること、言い換えれば、呼び出し元が何か他のことを行う前にその関数の完了を待たないことを選択できるという意味です。
より機械的に言うと、async 関数が呼び出されたとき、その本体は通常の関数の場合のようには実行されません。代わりに、関数本体とその引数が future にパッケージ化され、実際の結果の代わりに返されます。呼び出し元はその future をどう扱うかを決めることができます(呼び出し元が結果を「すぐに」欲しい場合は、その future を await します。次のセクションを参照してください)。
async 関数の内部では、コードは通常どおり逐次的に実行されます1。async であることによる違いはありません。async 関数から同期関数を呼び出すことができ、実行は通常どおり進みます。async 関数の内部で追加でできることの 1 つは、await を使って他の async 関数(または future)を待機することです。これにより、別のタスクが実行できるように制御が明け渡される場合があります。
await
future は、将来のある時点で準備が整う計算であると前述しました。その計算の結果を得るには、await キーワードを使います。結果がすぐに準備できている場合、または待たずに計算できる場合、await は単にその計算を行って結果を生成します。しかし、結果がまだ準備できていない場合、await はスケジューラーに制御を渡し、別のタスクが進行できるようにします(これは前の章で触れた協調的マルチタスクです)。
Rust では、await を使う構文は some_future.await です。つまり、. 演算子とともに使われる後置キーワードです。そのため、メソッド呼び出しやフィールドアクセスのチェーンの中で使いやすくなっています。これは、Python や JavaScript のように、await some_function() のような式の前に置く前置演算子として await を使う言語とは対照的です。
後置 await がしばしばより使いやすい理由を見るために、ネットワークリクエストを行う async 関数を呼び出し、レスポンスのステータスコードにアクセスしたいとします。前置 await 構文では、fetch() の前に await を付け、? でエラーを伝播するために式を括弧で囲み、その後でステータスコードにアクセスする必要があります。たとえば (await fetch())?.status_code のようになります。後置構文では、fetch().await?.status_code と書けます。これは、より長いチェーンで特に役立ちます。たとえば、2 つの前置 await を含む式は (await (await fetch())?.json())?.data のようになりますが、後置の同等の式は fetch().await?.json().await?.data であり、より自然に読めます。
それでは、実際に async と await がどのように使われるかを見てみましょう。次の関数を考えてください。
#![allow(unused)]
fn main() {
// async 関数ですが、何かを待つ必要はありません。
async fn add(a: u32, b: u32) -> u32 {
a + b
}
async fn wait_to_add(a: u32, b: u32) -> u32 {
sleep(1000).await;
a + b
}
}
add(15, 3).await を呼び出すと、結果 18 がすぐに返されます。wait_to_add(15, 3).await を呼び出すと、最終的には同じ答えが得られますが、待っている間に別のタスクが実行される機会を得ます。
この単純な例では、sleep の呼び出しは、結果を待たなければならない長時間実行される何らかのタスクの代役です。これは通常、結果が外部ソースから読み取られたデータであるか、外部の宛先への書き込みが成功したことの確認であるような IO 操作です。読み取りは let data = read(...).await? のようになります。この場合、await によって現在のタスクは読み取りが行われている間待機します。読み取りが完了すると、そのタスクは再開されます(読み取りタスクが待機している間、他のタスクが作業を進めることができます)。読み取りの結果は、正常に読み取られたデータか、エラー(? によって処理されます)です。
.await を使わずに add、wait_to_add、または read を呼び出しても、答えは何も得られないことに注意してください!
どういうことでしょうか?
async 関数を呼び出すと future が返されます。関数内のコードがすぐに実行されるわけではありません。さらに、future は await されるまで何の作業も行いません2。これは、async 関数が future を返し、その future がすぐに実行を開始する一部の他の言語とは対照的です。
これは Rust における async プログラミングの重要な点です。しばらくすると自然に身につきますが、初心者、特に他の言語で async プログラミングの経験がある人は、ここでつまずくことがよくあります。
Rust の future に関する重要な直感は、それらが不活性なオブジェクトであるということです。何らかの作業を進めるには、外部の力(通常は async ランタイム)によって駆動されなければなりません。
ここまで await をかなり操作的に説明してきました(future を実行し、結果を生成するものとして)。しかし前の章では async タスクと並行性について話しました。await はそのメンタルモデルにどのように当てはまるのでしょうか? まず、純粋な逐次コードを考えてみましょう。論理的には、関数を呼び出すことは、単にその関数内のコードを実行することです(変数への何らかの代入を伴います)。言い換えれば、現在のタスクは、その関数によって定義される次の「かたまり」のコードを実行し続けます。同様に、async コンテキストでは、非 async 関数を呼び出すと、単にその関数で実行が継続されます。async 関数を呼び出すと、実行すべきコードは見つかりますが、それは実行されません。await は、現在のタスクの実行を継続する演算子であり、現在のタスクが今は継続できない場合には、別のタスクに継続する機会を与える演算子です。
await は async コンテキストの内部でのみ使用できます。現時点では、これは async 関数の内部を意味します(後で、より多くの種類の async コンテキストを見ていきます)。その理由を理解するには、await がランタイムに制御を渡し、別のタスクが実行できるようにする可能性があることを思い出してください。制御を渡す先となるランタイムが存在するのは async コンテキストの中だけです。今のところ、ランタイムは async 関数の中でだけアクセス可能なグローバル変数のようなものだと想像してかまいません。実際にどのように動作するかは後で説明します。
最後に、await についてもう 1 つ別の見方を示します。先ほど、future は組み合わせて「より大きな」future を作ることができると述べました。async 関数は future を定義する方法の 1 つであり、await は future を組み合わせる方法の 1 つです。future に対して await を使うと、その future は、それが使われている async 関数によって生成される future に組み込まれます。この見方や、future を組み合わせる他の方法については、後でより詳しく説明します。
async/await の例
まずは、私たちの「hello, world!」の例を再訪するところから始めましょう。
// Define an async function.
async fn say_hello() {
println!("hello, world!");
}
#[tokio::main] // Boilerplate which lets us write `async fn main`, we'll explain it later.
async fn main() {
// Call an async function and await its result.
say_hello().await;
}
これで、main の周囲にあるボイラープレートを認識できるはずです。これは Tokio ランタイムを初期化し、async な main 関数を実行するための最初のタスクを作成するためのものです。
say_hello は async 関数です。これを呼び出すときは、現在のタスクの一部として実行するために、その呼び出しに続けて .await を付ける必要があります。.await を取り除くと、プログラムを実行しても何も起こらないことに注意してください!say_hello を呼び出すと future が返されますが、それは実行されないため、println は呼び出されません(少なくともコンパイラは警告してくれます)。
これは、Tokio tutorial から取った、もう少し現実的な例です。
#[tokio::main]
async fn main() -> Result<()> {
// mini-redis アドレスへの接続を開きます。
let mut client = client::connect("127.0.0.1:6379").await?;
// キー "hello" に値 "world" を設定します
client.set("hello", "world".into()).await?;
// キー "hello" を取得します
let result = client.get("hello").await?;
println!("got value from the server; result={:?}", result);
Ok(())
}
このコードは少し面白くなっていますが、本質的には同じことをしています。async 関数を呼び出し、その結果を実行するために await しています。今回はエラーハンドリングに ? を使用しています。これは同期 Rust の場合とまったく同じように機能します。
ここまで並行性、並列性、非同期性についていろいろ話してきましたが、これら 2 つの例はいずれも 100% 逐次的です。async 関数を呼び出して await するだけでは、await しているタスクが待機している間にスケジュールできる他のタスクがない限り、並行性は一切導入されません。これを自分たちで確認するために、もう 1 つ単純な(ただし作為的な)例を見てみましょう。
use std::io::{stdout, Write};
use tokio::time::{sleep, Duration};
async fn say_hello() {
print!("hello, ");
// Flush stdout so we see the effect of the above `print` immediately.
stdout().flush().unwrap();
}
async fn say_world() {
println!("world!");
}
#[tokio::main]
async fn main() {
say_hello().await;
// An async sleep function, puts the current task to sleep for 1s.
sleep(Duration::from_millis(1000)).await;
say_world().await;
}
“hello” と “world” を表示する間に、現在のタスクを 1 秒間スリープさせます3。プログラムを実行すると何が起こるかを観察してください。“hello” が表示され、1 秒間何も起こらず、その後 “world” が表示されます。これは、単一のタスクの実行が純粋に逐次的だからです。もし並行性があれば、その 1 秒の仮眠は、“world” を表示するような他の作業を行う絶好の機会になるでしょう。次のセクションで、その方法を見ていきます。
タスクを生成する
ここまで、async と await を async タスクでコードを実行する方法として説明してきました。また、await は IO やその他のイベントを待っている間、現在のタスクをスリープさせることができる、と述べました。そのとき、別のタスクが実行できますが、それらの他のタスクはどのように生まれるのでしょうか?新しいタスクを生成するために std::thread::spawn を使うのと同じように、新しい async タスクを生成するために tokio::spawn を使うことができます。spawn は Rust の標準ライブラリではなく、ランタイムである Tokio の関数であることに注意してください。なぜなら、タスクは純粋にランタイム上の概念だからです。
spawn を使って async 関数を別のタスク上で実行する小さな例を示します。
use tokio::{spawn, time::{sleep, Duration}};
async fn say_hello() {
// Wait for a while before printing to make it a more interesting race.
sleep(Duration::from_millis(100)).await;
println!("hello");
}
async fn say_world() {
sleep(Duration::from_millis(100)).await;
println!("world!");
}
#[tokio::main]
async fn main() {
spawn(say_hello());
spawn(say_world());
// Wait for a while to give the tasks time to run.
sleep(Duration::from_millis(1000)).await;
}
前の例と同様に、“hello” と “world!” を表示する 2 つの関数があります。しかし今回は、逐次的ではなく並行に(そして並列に)実行します。プログラムを何度か実行すると、文字列がどちらの順序でも表示されるはずです。あるときは “hello” が先、あるときは “world!” が先です。古典的な並行レースです!
ここで何が起こっているのかを詳しく見ていきましょう。関係している概念は 3 つあります。future、タスク、スレッドです。spawn 関数は future(思い出してください、これは多数のより小さな future から構成されている場合があります)を受け取り、それを新しい Tokio タスクとして実行します。タスクは、Tokio ランタイムがスケジュールし管理する概念です(個々の future ではありません)。Tokio(デフォルト設定の場合)はマルチスレッドランタイムです。つまり、新しいタスクを生成すると、そのタスクは生成元のタスクとは異なる OS スレッドで実行される可能性があります(同じスレッドで実行される場合もありますし、あるスレッドで開始されてから後で別のスレッドに移動される場合もあります)。
したがって、future がタスクとして生成されると、それは生成元のタスクや他のタスクと並行に実行されます。また、異なるスレッドにスケジュールされれば、それらのタスクと並列に実行されることもあります。
まとめると、Rust で 2 つの文を続けて書いた場合、それらは(async コードであるかどうかにかかわらず)逐次的に実行されます。await を書いても、逐次的な文の並行性は変わりません。たとえば、foo(); bar(); は厳密に逐次的です。foo が呼び出され、その後 bar が呼び出されます。これは foo と bar が async 関数であるかどうかに関係なく成り立ちます。foo().await; bar().await; も厳密に逐次的です。foo が完全に評価され、その後 bar が完全に評価されます。どちらの場合も、別のスレッドがその逐次実行に割り込むように実行される可能性があります。また 2 つ目の場合は、await ポイントで別の async タスクが割り込むように実行される可能性があります。しかし、どちらの場合も、2 つの文は互いの関係において逐次的に実行されます。
thread::spawn または tokio::spawn のいずれかを使用すると、並行性と、場合によっては並列性が導入されます。前者ではスレッド間で、後者ではタスク間で導入されます。
このガイドの後半では、future を並行に実行するものの、決して並列には実行しないケースを見ていきます。
タスクを join する
生成したタスクの実行結果を取得したい場合、生成元のタスクはその終了を待って結果を使用できます。これをタスクを join すると呼びます(スレッドの join と類似しており、join のための API も似ています)。
タスクが生成されると、spawn 関数は JoinHandle を返します。タスクに独自に実行させたいだけであれば、JoinHandle は破棄できます(JoinHandle を drop しても、生成されたタスクには影響しません)。しかし、生成元のタスクが、生成されたタスクの完了を待ってから結果を使用したい場合は、JoinHandle を await することでそれを行えます。
例として、‘Hello, world!’ の例をもう一度見直してみましょう。
use tokio::{spawn, time::{sleep, Duration}};
async fn say_hello() {
// Wait for a while before printing to make it a more interesting race.
sleep(Duration::from_millis(100)).await;
println!("hello");
}
async fn say_world() {
sleep(Duration::from_millis(100)).await;
println!("world");
}
#[tokio::main]
async fn main() {
let handle1 = spawn(say_hello());
let handle2 = spawn(say_world());
let _ = handle1.await;
let _ = handle2.await;
println!("!");
}
コードは前回と似ていますが、単に spawn を呼び出す代わりに、返された JoinHandle を保存し、後でそれらを await しています。main 関数を終了する前にそれらのタスクの完了を待っているため、main 内の sleep はもう必要ありません。
spawn された2つのタスクは依然として並行に実行されています。プログラムを何度か実行すれば、両方の順序を確認できるはずです。ただし、await された join ハンドルは並行性に対する制限になります。最後の感嘆符(‘!’)は必ず最後に出力されます(await に対する println!("!"); の位置を動かして試してみることができます。観察可能な効果を得るには、おそらく sleep 時間も変更する必要があるでしょう)。
最初の spawn の JoinHandle を保存して後で await するのではなく、すぐに await した場合(つまり、spawn(say_hello()).await; と書いた場合)、‘hello’ future を実行するための別のタスクは spawn されますが、spawn した側のタスクは他のことを行う前に、それが完了するまで待機することになります。言い換えると、並行性が生じる可能性はありません!このようなことはほとんど望ましくありません(spawn する意味があるでしょうか?単に逐次コードを書けばよいのです)。
JoinHandle
JoinHandle について、もう少し詳しく簡単に見ていきます。JoinHandle を await できるという事実は、JoinHandle 自体が future であることを示す手がかりです。spawn は async 関数ではなく、future(JoinHandle)を返す通常の関数です。future を返す前に(タスクをスケジュールするための)処理をいくらか行います(async future とは異なります)。そのため、spawn を await する必要はありません。JoinHandle を await すると、spawn されたタスクが完了するまで待機し、その後で結果を返します。上の例では結果はなく、単にタスクが完了するのを待っていただけです。JoinHandle はジェネリック型であり、その型パラメーターは spawn されたタスクが返す型です。上の例では型は JoinHandle<()> になります。String を結果として返す future であれば、型が JoinHandle<String> の JoinHandle が生成されます。
JoinHandle を await すると Result が返されます(上の例で let _ = ... を使ったのはそのためで、未使用の Result に関する警告を避けています)。spawn されたタスクが正常に完了した場合、タスクの結果は Ok バリアントに入ります。タスクが panic した、または abort された場合(キャンセルの一種)、結果は JoinError のドキュメント を含む Err になります。プロジェクトで abort によるキャンセルを使用していない場合、JoinHandle.await の結果を unwrap するのは妥当なアプローチです。これは、spawn されたタスクから spawn したタスクへ panic を実質的に伝播させることになるためです。
-
他のスレッドと同様に、async 関数が実行されているスレッドはオペレーティングシステムによってプリエンプトされ、一時停止されることがあります。その間に別のスレッドが作業を進められるようになります。ただし、関数の観点からは、他のスレッドによって変更された可能性のあるデータ(現在のスレッドが一時停止されていなくても、並行して実行されている別のスレッドによって変更されていた可能性のあるデータ)を調べない限り、これは観測できません。 ↩
-
または poll されることです。これは
awaitよりも低レベルの操作であり、awaitを使うときに舞台裏で行われます。future について詳しく話すときに、poll についても後で説明します。 ↩ -
ここでは async sleep 関数を使用していることに注意してください。もし std の
sleepを使用すると、スレッド全体をスリープさせることになります。このおもちゃの例では違いはありませんが、実際のプログラムでは、その時間中にそのスレッド上で他のタスクをスケジュールできないことを意味します。これは非常に悪いことです。 ↩
async/await のさらに詳しいトピック
ユニットテスト
async コードをユニットテストするにはどうすればよいでしょうか?問題は、await できるのは async コンテキストの内部からだけであり、Rust のユニットテストは async ではないという点です。幸いなことに、ほとんどのランタイムは async main 用のものと似た、テスト用の便利な属性を提供しています。Tokio を使う場合は、次のようになります。
#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_something() {
// ここにテストを書きます。必要なだけ `await` を含めて構いません。
}
}
テストを設定する方法は多数あります。詳細についてはドキュメントを参照してください。
async コードのテストには、さらに高度なトピックがいくつかあります(たとえば、競合状態、デッドロックなどのテスト)。このガイドの後の箇所で、その一部を取り上げます。
ブロッキングとキャンセル
async Rust でプログラミングする際には、ブロッキングとキャンセルを念頭に置くことが重要です。これらの概念は特定の機能や関数に局所化されたものではなく、正しいコードを書くために理解しなければならない、システム全体に遍在する性質です。
ブロッキング IO
スレッド(ここで話しているのは OS スレッドであり、async タスクではないことに注意してください)が進行できないとき、そのスレッドはブロックされていると言います。通常、それは OS がそのスレッドの代わりにタスク(通常は I/O)を完了するのを待っているためです。重要なのは、スレッドがブロックされている間、OS は他のスレッドが進行できるように、そのスレッドをスケジュールしないことを知っているという点です。これはマルチスレッドプログラムでは問題ありません。ブロックされたスレッドが待機している間に、他のスレッドが進行できるためです。しかし、async プログラムでは、同じ OS スレッド上でスケジュールされるべき他のタスクがありますが、OS はそれらを認識しておらず、スレッド全体を待機させ続けます。つまり、単一のタスクが I/O の完了を待つ(これは問題ありません)のではなく、多くのタスクが待たなければならない(これは問題です)ということです。
非ブロッキング/async I/O については、まもなく説明します。今のところは、非ブロッキング I/O とは async ランタイムが認識している I/O であり、そのため現在のタスクだけが待機し、スレッド自体はブロックされない、ということだけを知っておいてください。async タスクからは非ブロッキング I/O のみを使用し、ブロッキング I/O(Rust の標準ライブラリで提供される唯一の種類)を決して使用しないことが非常に重要です。
ブロッキング計算
計算を行うことによってスレッドをブロックすることもできます(OS が関与しないため、これはブロッキング I/O とまったく同じではありませんが、効果は似ています)。ランタイムに制御を譲らずに長時間実行される計算(ブロッキング I/O の有無にかかわらず)がある場合、そのタスクは、ランタイムのスケジューラが他のタスクをスケジュールする機会をまったく与えません。async プログラミングは協調的マルチタスクを使用することを思い出してください。ここではタスクが協調していないため、他のタスクは作業を進める機会を得られません。これを緩和する方法については後で説明します。
スレッド全体をブロックする方法は他にも多数あり、このガイドではブロッキングについて何度か再び取り上げます。
キャンセル
キャンセルとは、future(またはタスク)の実行を停止することを意味します。Rust では(そして他の多くの async/await システムとは対照的に)、future は外部の力(async ランタイムなど)によって前に進められなければならないため、future がもはや前に進められなければ、それ以上実行されません。future がドロップされると(future は単なる普通の Rust オブジェクトであることを思い出してください)、それ以上進行できなくなり、キャンセルされます。
キャンセルはいくつかの方法で開始できます。
- future を単にドロップする(それを所有している場合)。
- タスクの ‘JoinHandle’(または
AbortHandle)でabortを呼び出す。 CancellationTokenを介する(キャンセルされる future がトークンに気づき、協調的に自分自身をキャンセルする必要があります)。selectのような関数やマクロによって暗黙的に行う。
中央の 2 つは Tokio 固有のものですが、ほとんどのランタイムは同様の機能を提供しています。CancellationToken を使用するには、キャンセルされる future の協力が必要ですが、その他の方法では必要ありません。これらの他の場合では、キャンセルされた future はキャンセルの通知を受け取らず、(デストラクタ以外に)クリーンアップする機会もありません。future がキャンセルトークンを持っている場合でも、キャンセルトークンをトリガーしない他の方法でキャンセルされる可能性があることに注意してください。
async コード(async 関数、ブロック、future など)を書くという観点では、そのコードは任意の await(マクロ内の隠れたものを含む)で実行を停止し、二度と再開しない可能性があります。コードが正しい(具体的にはキャンセル安全である)ためには、正常に完了する場合でも、任意の await ポイントで終了する場合でも、正しく動作しなければなりません1。
#![allow(unused)]
fn main() {
async fn some_function(input: Option<Input>) {
let Some(input) = input else {
return; // ここで終了する可能性があります(`return`)。
};
let x = foo(input)?; // ここで終了する可能性があります(`?`)。
let y = bar(x).await; // ここで終了する可能性があります(`await`)。
// ...
// ここで終了する可能性があります(暗黙の return)。
}
}
これがどのように問題になるかの例として、async 関数がデータを内部バッファに読み込み、その後で次のデータを await する場合があります。データの読み込みが破壊的である場合(つまり、元のソースから再読み込みできない場合)に async 関数がキャンセルされると、内部バッファはドロップされ、その中のデータは失われます。future をキャンセルすること、future を再起動すること、または同じデータに触れる新しい future を開始することによって、future とそれが触れるデータがどのような影響を受けるかを考慮することが重要です。
このガイドでは、キャンセルとキャンセル安全性について何度か再び取り上げます。また、リファレンスセクションにはこのトピックに関する章が丸ごとあります。
async ブロック
通常のブロック({ ... })は、ソース内でコードをまとめ、名前のカプセル化スコープを作成します。実行時には、ブロックは順番に実行され、最後の式の値(または末尾の式がない場合はユニット型(()))に評価されます。
async 関数と同様に、async ブロックは通常のブロックの遅延版です。async ブロックはコードと名前を一緒にスコープ化しますが、実行時にはすぐには実行されず、future に評価されます。ブロックを実行して結果を得るには、await されなければなりません。例:
#![allow(unused)]
fn main() {
let s1 = {
let a = 42;
format!("The answer is {a}")
};
let s2 = async {
let q = question().await;
format!("The question is {q}")
};
}
continue; を呼び出していたなら、s1 は出力可能な文字列になりますが、s2 は Future になります。question() は呼び出されていません。s2 を出力するには、まず s2.await する必要があります。
async ブロックは、async コンテキストを開始して Future を作成する最も単純な方法です。これは一般に、1 か所でしか使われない小さな Future を作成するために使われます。
残念ながら、async ブロックでの制御フローには少し癖があります。async ブロックは素直に実行されるのではなく Future を作成するため、制御フローに関しては通常のブロックよりも関数に近い振る舞いをします。break と continue は、通常のブロックの場合のように async ブロックを「突き抜ける」ことはできません。代わりに return を使う必要があります。
#![allow(unused)]
fn main() {
loop {
{
if ... {
// OK
continue;
}
}
async {
if ... {
// OK ではない
// continue;
// OK - `loop` の次回の実行へ進む。ただし、async ブロックの後に
// ループ内のコードがある場合、そのコードは実行されることに注意。
return;
}
}.await
}
}
break を実装するには、ブロックの値をテストする必要があります(一般的なイディオムは、ブロックの値として ControlFlow を使うことで、これにより ? も使用できます)。
同様に、async ブロック内の ? は、エラーが存在する場合に Future の実行を終了させ、await されたブロックがそのエラーの値を取るようにしますが、周囲の関数からは抜けません(通常のブロック内の ? ならそうなります)。そのためには、await の後にもう 1 つ ? が必要です。
#![allow(unused)]
fn main() {
async {
let x = foo()?; // この `?` は async ブロックだけを抜け、周囲の関数は抜けません。
consume(x);
Ok(())
}.await?
}
厄介なことに、これによってコンパイラーが混乱することがよくあります。なぜなら、(関数とは異なり)async ブロックの「戻り値」の型は明示されていないからです。これを動作させるには、おそらく変数に型アノテーションを追加するか、turbofish による型指定を使う必要があります。たとえば、上の例では Ok(()) の代わりに Ok::<_, MyError>(()) を使います。
async ブロックを返す関数は、async 関数とかなり似ています。async fn foo() -> ... { ... } と書くことは、おおよそ fn foo() -> ... { async { ... } } と等価です。実際、呼び出し元の視点ではこれらは等価であり、一方の形式からもう一方の形式へ変更しても破壊的変更にはなりません。さらに、async トレイトを実装するときに、一方でもう一方をオーバーライドできます(下記参照)。ただし、型は調整する必要があり、async ブロック版では Future を明示します。つまり、async fn foo() -> Foo は fn foo() -> impl Future<Output = Foo> になります(たとえば Send や 'static など、他の境界も明示する必要があるかもしれません)。
通常は、より単純で明確な async 関数版を選ぶでしょう。しかし、async ブロック版のほうが柔軟です。関数が呼び出されたときに実行するコード(async ブロックの外側に書く)と、結果が await されたときに実行するコード(async ブロックの内側のコード)を分けられるためです。
Async クロージャ
- クロージャ
- 近日公開予定(https://github.com/rust-lang/rust/pull/132706, https://blog.rust-lang.org/inside-rust/2024/08/09/async-closures-call-for-testing.html)
- クロージャ内の async ブロックと async クロージャの比較
ライフタイムと借用
- 上で static ライフタイムについて言及しました
- Future 上のライフタイム境界(
Future + '_など) - await ポイントをまたぐ借用
- わかりません。async 関数にはきっともっと多くのライフタイムの問題があるはずです……
Future 上の Send + 'static 境界
- なぜそれらがあるのか、マルチスレッドランタイム
- それらを避けるために spawn local する
- async fn を
Send + 'staticにするものと、それに関するバグの修正方法
Async トレイト
- 構文
Send + 'staticの問題とその回避策- trait_variant
- 明示的な Future
- 戻り値型表記(https://blog.rust-lang.org/inside-rust/2024/09/26/rtn-call-for-testing.html)
- オーバーライド
- メソッドにおける Future 表記と async 表記
- オブジェクト安全性
- キャプチャ規則(https://blog.rust-lang.org/2024/09/05/impl-trait-capture-rules.html)
- 歴史と async-trait クレート
再帰
- 許可されています(比較的新しい)が、明示的なボックス化がいくらか必要です
- Future への前方参照、pinning
- https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
- https://blog.rust-lang.org/2024/03/21/Rust-1.77.0.html#support-for-recursion-in-async-fn
- async-recursion マクロ(https://docs.rs/async-recursion/latest/async_recursion/)
-
async プログラミングにおけるキャンセルを、スレッドのキャンセルと比較するのは興味深いことです。スレッドをキャンセルすることは可能です(たとえば C で
pthread_cancelを使用する場合。Rust にはこれを直接行う方法はありません)が、キャンセルされるスレッドはどこでも終了し得るため、ほとんどの場合、それは非常に、非常に悪い考えです。対照的に、async タスクのキャンセルは await ポイントでのみ発生します。その結果、プロセス全体を終了せずに OS スレッドをキャンセルすることは非常にまれであり、そのためプログラマとしては通常、それが発生することを心配しません。しかし async Rust では、キャンセルは間違いなく発生し得るものです。これにどう対処するかについては、進めながら説明していきます。 ↩
IO とブロッキングの問題
IO(input/output、入出力)を効率的に処理することは、async プログラミングの主な動機の 1 つであり、ほとんどの async プログラムは大量の IO を行います。根本的に、IO の問題は、計算よりも桁違いに時間がかかることです。そのため、他の作業を進める代わりに IO の完了をただ待つのは非常に非効率です。理想的には、async プログラミングによって、プログラムは IO を待っている間に他の作業を進められます。
この章は、async コンテキストにおける IO の入門です。ブロッキング IO とノンブロッキング IO の重要な違い、およびブロッキング IO と async プログラミングが(少なくとも少し考慮と工夫をしない限り)相性が悪い理由を扱います。ノンブロッキング IO の使い方を扱い、その後、IO と async プログラミングで発生し得るいくつかの問題を見ていきます。また、OS が IO をどのように扱うかを見て、io_uring のようないくつかの代替 IO 手法も少し覗いてみます。
最後に、async タスクをブロックする(これは悪いことです)その他の方法と、async プログラミングをブロッキング IO や長時間実行される CPU 集約的なコードと適切に組み合わせる方法を扱います。
ブロッキング IO とノンブロッキング IO
IO はオペレーティングシステムによって実装されます。IO の作業は、別のプロセスや専用ハードウェア、またはいずれの場合もプログラムのプロセス外で行われます。IO は同期または非同期(それぞれブロッキングおよびノンブロッキングとも呼ばれます)のいずれかです。同期 IO とは、IO が行われている間、プログラム(または少なくともスレッド)が待機(別名ブロック)し、IO が完了して結果が OS から受け取られるまで処理を開始しないことを意味します。非同期 IO とは、IO が行われている間もプログラムが処理を進められ、後で結果を取得できることを意味します。どちらの種類の IO にも多くの異なる OS API がありますが、非同期の領域の方が種類は豊富です。
非同期 IO と非同期プログラミングは本質的に結び付いているわけではありません。しかし、async プログラミングは使いやすく高性能な async IO を可能にし、それが async プログラミングの大きな動機になっています。同期 IO によるブロッキングは、async プログラミングにおけるパフォーマンス問題の主要な原因であり、これを避けるよう注意しなければなりません(これについては後述します)。
Rust の標準ライブラリには、ブロッキング IO のための関数とトレイトが含まれています。ノンブロッキング IO には、専用のライブラリを使用する必要があります。これらは多くの場合、async ランタイムの一部です。たとえば Tokio の io モジュールがあります。
例を簡単に見てみましょう(Tokio のドキュメントから改変):
#![allow(unused)]
fn main() {
use tokio::{io::AsyncWriteExt, net::TcpStream};
async fn write_hello() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.write_all(b"hello world!").await?;
Ok(())
}
}
write_all は、stream にデータを書き込む async IO メソッドです。これは即座に完了することもありますが、より可能性が高いのは完了までに多少時間がかかる場合です。そのため、stream.write_all(...).await によって、OS が書き込みを処理するのを待つ間、現在のタスクは一時停止されます。スケジューラは他のタスクを実行し、書き込みが完了すると、そのタスクを起こして処理を続行するようスケジュールします。
しかし、標準ライブラリの書き込み関数を使った場合、async スケジューラは関与せず、OS は IO が完了するまでスレッド全体を一時停止します。つまり、現在のタスクが一時停止されるだけでなく、そのスレッドを使って他のタスクを実行することもできません。これがランタイムのスレッドプール内のすべてのスレッドで発生した場合(状況によってはスレッドが 1 つだけの場合もあります)、プログラム全体が停止し、処理を進められなくなります。これはスレッド(またはプログラム)をブロックすると呼ばれ、パフォーマンスにとって非常に悪いものです。async プログラムではスレッドを決してブロックしないことが重要であり、したがって async タスク内でブロッキング IO を使うことは避けるべきです。
スレッドのブロックは、ブロッキング IO だけでなく、長時間実行されるタスクやロックを待つタスクによっても引き起こされることがあります。これについては、この章の最後でさらに説明します。
繰り返し読み取りや書き込みを行うのは一般的なパターンであり、ストリームとシンク(別名 async イテレータ)はそのための便利な仕組みです。これらについては専用の章で扱います。
読み取りと書き込み
TODO
- async の Read と Write トレイト
- ランタイムの一部
- 使い方
- 具体的な実装
- ネットワークとディスク
- tcp、udp
- ファイルシステムは実際には async ではないが、io_uring(その章への参照)
- 実践的な例
- stdout など
- pipe、fd など
- ネットワークとディスク
メモリ管理
データを読み取るときは、それをどこかに置く必要があり、データを書き込むときは、書き込みが完了するまでどこかに保持しておく必要があります。どちらの場合でも、そのメモリがどのように管理されるかが重要です。
TODO
- バッファ管理と async IO に関する問題
- さまざまな解決策と長所・短所
- ゼロコピーアプローチ
- 共有バッファアプローチ
- これを支援するユーティリティクレート、Bytes など
IO に関する高度なトピック
TODO
- buf read/write
- Read + Write、split、join
- copy
- simplex と duplex
- キャンセル
- 同期 IO を行わなければならない場合はどうするか?スレッドを spawn するか、spawn_blocking を使う(下記参照)
OS から見た IO
TODO
- さまざまな種類の IO と仕組み、completion IO、高度なセクションの completion IO の章への参照
- 異なるランタイムがこれを容易にできる
- 低レベルインターフェースとしての mio
その他のブロッキング操作
章の冒頭で述べたように、スレッドをブロックしないことは async プログラムのパフォーマンスにとって極めて重要です。さまざまな種類のブロッキング IO はブロックを引き起こす一般的な方法ですが、大量の計算を行ったり、async スケジューラが調整していない方法で待機したりすることによってもブロックする可能性があります。
待機は、多くの場合、async を意識していない同期機構を使うことで発生します。たとえば、async mutex ではなく std::sync::Mutex を使う場合や、非 async チャネルを待つ場合です。この問題については、チャネル、ロック、同期の章で説明します。ブロッキング的な方法で待機してしまう他の方法もあり、一般にはノンブロッキングまたは何らかの形で async に適した仕組みを見つける必要があります。たとえば、標準ライブラリのものではなく async の sleep 関数を使うことです。待機はビジーウェイト(実質的には何の作業もせずにただループすること、別名スピンロック)である場合もありますが、おそらくこれは避けるべきです。
CPU 集約的な処理
長時間実行される(つまり CPU 集約的または CPU バウンドな)処理を行うと、スケジューラが他のタスクを実行できなくなります。これは一種のブロッキングです。ただし、少なくともプログラムが何らかの進捗をしているため、IO でブロックしたり待機したりするほど悪いものではありません。しかし(注意と配慮なしには)、何らかの尺度(たとえばテールレイテンシ)でパフォーマンスにとって最適ではない可能性が高く、実行できないタスクが特定の時刻に実行される必要があった場合には、正しさの問題にもなり得ます。CPU 集約的な処理には async Rust(または Tokio のような汎用 async ランタイム)を単純に使うべきではない、というミームがありますが、それは過度の単純化です。正しいのは、特別な扱いなしに IO バウンドなタスクと CPU バウンドなタスク(より正確には、長時間実行されるタスクとレイテンシに敏感なタスク)を混在させて、うまくいくと期待することはできない、ということです。 このセクションの残りでは、レイテンシに敏感なタスクと、長時間実行される CPU 負荷の高いタスクが混在しているものとします。レイテンシに敏感なものが何もない場合は、状況は少し異なります(たいていはより簡単です)。
長時間実行されるタスクやブロッキングするタスクを実行するための解決策は、基本的に 3 つあります。ランタイムの組み込み機能を使う、別スレッドを使う、または別のランタイムを使う、です。
Tokio では、ブロックする可能性のあるタスクを生成するために spawn_blocking を使えます。これはタスクを生成するための spawn のように機能しますが、ブロックする可能性のあるタスク向けに最適化された別のスレッドプールでタスクを実行します(そのタスクはおそらく専用のスレッドで実行されます)。これは通常の同期コードを実行するものであり、非同期タスクではない点に注意してください。つまり、そのタスクはキャンセルできません(その JoinHandle に abort メソッドがあってもです)。他のランタイムも同様の機能を提供しています。
この例では、標準ライブラリの同期ファイルシステム関数を呼び出すことで、spawn_blocking を使ってブロッキング I/O を実行しています。なお、tokio::fs も存在し、非同期ファイルシステム API を提供しています。ただし、内部的にはこれも spawn_blocking でラップされたブロッキング操作を使っています。
use tokio;
#[tokio::main]
async fn main() {
let contents = tokio::task::spawn_blocking(|| {
std::fs::read_to_string("file.txt").unwrap()
})
.await
.unwrap();
// contents を使って何かを行う
}
spawn_blocking で生成されたタスクは中止できないため、最終的に完了する作業を対象としています。受信リクエストを待ち受けるサーバーのように無期限にブロックする可能性のあるタスクは、Tokio のブロッキング用スレッドプールのスレッドを長時間占有しないよう、専用スレッドで実行するほうが適しています。これは std::thread::spawn または同様の API で作成できます。
大量のタスクを実行する必要がある場合は、おそらく何らかのスレッドプールやワークスケジューラーが必要になります。スレッドを生成し続け、利用可能なコア数を大きく上回る数になると、スループットを犠牲にすることになります。Rayon は、並列タスクの実行と管理を容易にする人気のある選択肢です。ワークロードにより特化していたり、実行されるタスクに関する知識を持っていたりするものを使うと、より良いパフォーマンスが得られるかもしれません。
以下は、Rayon を Tokio と併用する例です。Rayon によって生成されたタスクと Tokio 内の現在のタスクとの間で結果をやり取りするために、tokio::oneshot::channel を利用しています。
use rayon::prelude::*;
#[tokio::main]
async fn main() {
let data = 1..=10;
let (send, recv) = tokio::sync::oneshot::channel();
// 現在のタスクをブロックしないように rayon 上でタスクを生成する
std::thread::spawn(move || {
// rayon の並列イテレーターを使って結果を並列に計算する
let results = data.into_par_iter().map(compute).collect::<Vec<_>>();
// 結果を Tokio に送り返す。
send.send(results).unwrap();
});
// rayon タスクを待機して結果を取得する
let results = recv.await.unwrap();
println!("Results: {:?}", results);
}
fn compute(input: u64) -> u64 {
// 大量の整数を合計することで、
// CPU 負荷の高い計算をシミュレートする。
let mut sum = 0u64;
for i in 0..100_000_000 {
sum = sum.wrapping_add(i * i);
}
sum % input
}
レイテンシに敏感なタスクと、長時間実行されるタスクとで、非同期ランタイムの別々のインスタンスを使うことができます。これは CPU バウンドなタスクに適していますが、長時間実行されるタスク用のランタイム上であっても、ブロッキング IO は使うべきではありません。CPU バウンドなタスクに対しては、長時間実行されるタスクを非同期タスクにできる唯一の解決策であるという点で、これは優れた解決策です。また柔軟でもあります(ランタイムは実行するタスクの種類に最適化されるよう設定できるためです。実際、最適なパフォーマンスを得るにはランタイム設定にある程度の労力をかける必要があります)。さらに、Tokio のような成熟した、よく設計されたサブシステムを使う恩恵も受けられます。2 つの異なる非同期ランタイムを使うことさえできます。いずれの場合でも、ランタイムは異なるスレッド上で実行されなければなりません。
一方で、もう少し考える必要があります。正しいランタイム上でタスクを実行していることを確実にしなければならず(これは見た目より難しい場合があります)、タスク間の通信は複雑になることがあります。同期コンテキストと非同期コンテキストの間の同期については次に説明しますが、複数の非同期ランタイム間ではさらに厄介になることがあります。各ランタイムはそれぞれ独自の小さなタスクの宇宙であり、スケジューラーは完全に独立しています。Tokio のチャネルやロックは、異なるランタイム(Tokio 以外のランタイムであっても)から使うことができますが、他のランタイムのプリミティブはこのようには動作しない場合があります。
各ランタイム内のスケジューラーは他のランタイムを認識しておらず(また OS はどの非同期スケジューラーも認識していないため)、スケジューリングの調整や共有された優先順位付けはなく、ランタイム間でワークを盗むこともできません。そのため、タスクのスケジューリングは最適でない場合があります(特にランタイムがワークロードに合わせて十分にチューニングされていない場合)。さらに、すべてのスケジューリングは協調的であるため、長時間実行されるタスクが依然としてリソース不足に陥ることがあり、レイテンシが悪化する可能性があります。長時間実行されるタスクをより協調的にする方法については、次のセクションを参照してください。
純粋なスケジューラーとして見ると、CPU 作業に Tokio を使うことは、専用の同期ワーカープールよりもわずかに高いオーバーヘッドを持つ可能性があります。非同期プログラミングをサポートするために必要な追加の作業を考えれば、これは驚くことではありません。ほとんどのユーザーにとって実際には問題になりにくいですが、コードが極めてパフォーマンスに敏感な場合は考慮する価値があるかもしれません。
上記のいずれの解決策でも、異なるコンテキスト(同期と非同期、または異なる非同期ランタイム)でタスクが実行されることになります。タスク間で通信する必要がある場合は、同期プリミティブと非同期プリミティブ(チャネル、ミューテックスなど)の正しい組み合わせと、それらのプリミティブ上の正しい(ブロッキングまたはノンブロッキングの)メソッドを使っていることに注意する必要があります。ミューテックスや同様のロックについては、await ポイントをまたいでロックを保持する必要がある場合や、IO リソースを保護する必要がある場合には、おそらく非同期版を使うべきです(ブロッキングロックメソッドを使うことで、同期コンテキストからも利用できるはずです)。一方、データを保護する場合や、await ポイントをまたいでロックを保持する必要がない場合には、同期版を使うべきです。Tokio の非同期チャネルは、ブロッキングメソッドを使うことで同期コンテキストから利用できますが、同期チャネルと非同期チャネルをいつ使うべきかについての詳細は、これらのドキュメントを参照してください。
では、上記の解決策のうちどれを使うべきでしょうか?
- ブロッキングIOを行っている場合は、おそらく
spawn_blockingを使うべきです。2つ目のランタイムや他のスレッドプールは使用できません(少なくとも最適なパフォーマンスが必要な場合)。 - 永久に実行されるスレッドがある場合は、どのようなスレッドプールも使うのではなく、
std::thread::spawnを使うべきです(プールのスレッドの1つを使い切ってしまうため)。 - 大量の CPU処理を行っている場合は、専用のものか2つ目の非同期ランタイムのいずれかのスレッドプールを使うべきです。
- 長時間実行される非同期コードを実行する必要がある場合は、2つ目のランタイムを使うべきです。
- より複雑な解決策のほうが最適であっても、簡単で十分なパフォーマンスが得られるという理由で、専用スレッドや
spawn_blockingを使うことを選ぶかもしれません。
Yieldすること
長時間実行されるコードが問題になるのは、スケジューラに他のタスクをスケジュールする機会を与えないためです。非同期並行処理は協調的です。スケジューラは、別のタスクを実行するためにタスクをプリエンプトできません。長時間実行されるタスクがスケジューラにyieldしない場合、スケジューラはそれを停止できません。しかし、長時間実行されるコードがスケジューラにyieldする場合は、他のタスクをスケジュールできるため、タスクが長時間実行されることは問題になりません。これは、CPU集約的な処理に別のスレッドを使う代わりとして、またはCPU集約的な処理をそれ専用のランタイムで行って(場合によっては)パフォーマンスを向上させるために使用できます。
yieldするのは簡単で、ランタイムのyield関数を呼び出すだけです。Tokioではそれが yield_now です。これは、標準ライブラリの yield_now とも、コルーチンからyieldするための yield キーワードとも異なることに注意してください。現在のfutureが select または join の中で実行されている場合、yield_now を呼び出してもスケジューラにはyieldしません(futureを並行に合成する章を参照)。それが望ましい動作かどうかは状況によります。
いつyieldする必要があるかを判断するのは少し難しくなります。まず、自分のプログラムが暗黙的にyieldしているかどうかを知る必要があります。これは .await でのみ発生し得るため、await していないならyieldしていません。しかし、awaitは自動的にスケジューラへyieldするわけではありません。それが起こるのは、await されている末端のfutureがpending(readyではない)である場合、または呼び出しスタックのどこかに明示的な yield がある場合だけです。Tokioやほとんどの非同期ランタイムは、IO関数や同期関数でこれを行いますが、一般的には、デバッグするかソースコードを調べない限り、ある await がyieldするかどうかは分かりません。
経験則として、コードは潜在的なyieldポイントに到達せずに10〜100マイクロ秒を超えて実行されるべきではありません。
参考資料
- CPUバウンドなタスクとブロッキングコードに関するTokioドキュメント
- ブログ記事: What is Blocking?
- ブログ記事: Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks
futureを並行に合成する
この章では、futureを合成するさらに多くの方法を扱います。特に、futureを並行に(ただし並列ではなく)実行できる新しい方法をいくつか取り上げます。表面的には、この章で導入する新しい関数/マクロはかなり単純です。しかし、その背後にある概念は非常に微妙な場合があります。まずfuture、並行性、並列性について振り返りますが、以前のセクションである並行性と並列性の比較も再確認するとよいでしょう。
futureは遅延された計算です。futureはawaitを使用して進めることができます。これにより制御がランタイムに渡され、現在のタスクはその計算結果を待機します。aとbがfutureである場合、それらは片方をawaitしてからもう片方をawaitすることで、逐次的に合成できます(つまり、aを完了まで実行し、その後bを完了まで実行するfutureを作るように組み合わせられます): async { a.await; b.await}。
また、spawnを使用したfutureの並列合成も見てきました: async { let a = spawn(a); let b = spawn(b); (a.await, b.await)}は2つのfutureを並列に実行します。タプル内のawaitはfutureそのものを待機しているのではなく、futureが完了したときにその結果を取得するためのJoinHandleを待機していることに注意してください。
この章では、並列性なしでfutureを並行に合成する2つの方法、joinとselect/raceを紹介します。どちらの場合も、futureはタイムスライシングによって並行に実行されます。合成された各futureが順番に実行され、次のfutureに順番が回ります。これはasyncランタイムを介さずに行われます(したがって、複数のOSスレッドも使わず、並列性が生じる可能性もありません)。合成を行う構成要素は、futureをローカルでインターリーブします。これらの構成要素は、単一のasyncタスク内で構成要素となるfutureを実行するミニexecutorのようなものだと考えることができます。
joinとselect/raceの根本的な違いは、futureが作業を完了したときの扱い方です。joinはすべてのfutureが完了したときに終了し、select/raceは1つのfutureが完了したときに終了します(他のすべてはキャンセルされます)。どちらにも、エラー処理のためのバリエーションがあります。
これらの構成要素(または類似の概念)はstreamとともに使われることがよくあります。これについては以下で少し触れますが、詳しくはstreamの章で説明します。
並列性が必要な場合(または並列性を明示的に不要としていない場合)、タスクをspawnすることは、これらの合成構成要素よりも単純な代替手段であることが多いです。タスクをspawnする方が、通常はエラーが起こりにくく、より汎用的で、パフォーマンスも予測しやすくなります。一方で、spawnは本質的に構造化の度合いが低いため、ライフサイクルやリソース管理について推論しにくくなる場合があります。
パフォーマンスの問題について、もう少し深く考える価値があります。並行合成で潜在的なパフォーマンス問題になるのは、時間共有の公平性です。プログラム内に100個のタスクがある場合、通常、リソースを共有する最適な方法は、各タスクがプロセッサ時間の1%を得ることです(または、タスクがすべて待機している場合は、それぞれが同じ確率で起床されることです)。100個のタスクをspawnした場合、通常は(おおよそ)そのようになります。しかし、2つのタスクをspawnし、そのうちの1つのタスクで99個のfutureをjoinした場合、スケジューラは2つのタスクしか認識せず、1つのタスクが時間の50%を得て、99個のfutureはそれぞれ0.5%を得ることになります。
通常、タスクの分布はそこまで偏っていませんし、join/selectなどはタイムアウトのように、この振る舞いが実際に望ましい場面で使うことが非常に多いです。しかし、プログラムが意図したパフォーマンス特性を持つようにするためには、考慮する価値があります。
Join
Tokioのjoinマクロは、futureのリストを受け取り、それらすべてを並行に完了まで実行します(すべての結果をタプルとして返します)。すべてのfutureが完了したときに返ります。futureは常に同じスレッド上で実行されます(並行であり、並列ではありません)。
簡単な例を示します。
async fn main() {
let (result_1, result_2) = join!(do_a_thing(), do_a_thing());
// `result_1` と `result_2` を使用する。
}
ここでは、do_a_thingの2つの実行が並行に発生し、両方が完了したときに結果が準備できています。結果を取得するためにawaitしていないことに注意してください。join!は暗黙的にfutureをawaitし、値を生成します。futureを作成するわけではありません。それでも、asyncコンテキスト内(たとえばasync関数内)で使用する必要があります。
上の例では見えませんが、join!はfutureに評価される式を受け取ります1。joinはその本体内にasyncコンテキストを作成しないため、joinに渡すfutureをawaitすべきではありません(そうしないと、joinされたfutureより前に評価されてしまいます)。
すべてのfutureは同じスレッド上で実行されるため、いずれかのfutureがそのスレッドをブロックすると、どのfutureも進行できません。mutexやその他のロックを使用している場合、あるfutureが別のfutureによって保持されているロックを待機していると、簡単にデッドロックにつながる可能性があります。
joinはfutureの結果を気にしません。特に、futureがキャンセルされたりエラーを返したりしても、他のfutureには影響しません。他のfutureは実行を継続します。「fail fast」の振る舞いが必要な場合は、try_joinを使用してください。try_joinはjoinと同様に動作しますが、いずれかのfutureがErrを返した場合、他のすべてのfutureはキャンセルされ、try_joinはただちにそのエラーを返します。
以前のasync/awaitの章では、spawnされたタスクをjoinすることについて話すために「join」という言葉を使いました。その名前が示すように、futureのjoinとタスクのjoinは関連しています。joinとは、複数のfutureを並行に実行し、継続する前にその結果を待つことを意味します。構文は異なります。JoinHandleを使用する場合とjoinマクロを使用する場合がありますが、考え方は似ています。重要な違いは、タスクをjoinする場合、タスクは並行かつ並列に実行される一方、join!を使用する場合、futureは並行に実行されますが並列には実行されないことです。さらに、spawnされたタスクはランタイムのスケジューラ上でスケジューリングされますが、join!ではfutureはローカルに「スケジューリング」されます(同じタスク上で、マクロの実行の時間的スコープ内で)。もう1つの違いは、spawnされたタスクがpanicした場合、そのpanicはランタイムによって捕捉されますが、join内のfutureがpanicした場合、タスク全体がpanicすることです。
代替手段
futureを並行に実行してその結果を収集することは、よくある要件です。そうしない十分な理由がない限り(つまり、並列性を明示的に望んでいない場合であり、その場合でもspawn_localを使う方を好むかもしれません)、おそらくspawnとJoinHandleを使用すべきです。JoinSet抽象化は、そのようなspawnされたタスクをjoin!に似た方法で管理します。
ほとんどのランタイム(および futures.rs)には、Tokio の join マクロに相当するものがあり、それらはほぼ同じように振る舞います。join 関数もあり、これはマクロに似ていますが、柔軟性は少し劣ります。たとえば、futures.rs には 2つの Future を join するための join、名前から分かる数の Future を join するための join3、join4、join5、そして Future のコレクションを join するための join_all があります(これらそれぞれの try_ バリエーションもあります)。
Futures-concurrency も join(および try_join)の機能を提供しています。futures-concurrency のスタイルでは、これらの操作はタプル、Vec、配列などの Future のグループに対するトレイトメソッドです。たとえば、2つの Future を join するには、(fut1, fut2).join().await と書きます(ここでは await が明示的であることに注意してください)。
一緒に join したい Future の集合が動的に変化する場合(たとえば、ネットワーク経由で入力が届くにつれて新しい Future が作成される場合)、またはすべての Future が完了したときではなく、完了した順に結果を取得したい場合は、ストリームと FuturesUnordered または FuturesOrdered の機能を使う必要があります。これらについては ストリーム の章で扱います。
Race/select
Future を join することの対応物は、それらを race させること(別名、それらに対して select すること)です。race/select では Future は並行に実行されますが、すべての Future の完了を待つのではなく、最初の1つが完了するのを待ってから他をキャンセルします。これは join と似ているように聞こえますが、今度はキャンセルについて考える必要があるため、かなり興味深く(そして時にエラーを起こしやすく)なります。
Tokio の select マクロを使った例を示します。
async fn main() {
select! {
result = do_a_thing() => {
println!("計算が完了し、{result} を返しました");
}
_ = timeout() => {
println!("計算がタイムアウトしました");
}
}
}
select マクロの中で Future の結果を処理するため、join マクロよりもすでに物事が興味深くなっていることに気付くでしょう。これは match 式に少し似ていますが、select ではすべての分岐が並行に実行され、最初に完了した分岐の本体がその結果とともに実行されます(他の分岐は実行されず、Future は drop されることによってキャンセルされます)。この例では、do_a_thing と timeout が並行に実行され、先に完了した方のブロックが実行されます(つまり、実行される println は1つだけです)。もう一方の Future はキャンセルされます。join マクロと同様に、Future の await は暗黙的です。
Tokio の select マクロは多くの機能をサポートしています。
- パターンマッチング: 各分岐の
=の左側の構文はパターンにすることができ、Future の結果がそのパターンに一致する場合にのみブロックが実行されます。パターンが一致しない場合、その Future はそれ以降 poll されません(ただし、他の Future は poll されます)。これは、任意で値を返す Future に役立ちます。たとえば、Some(x) = do_a_thing() => { ... }です。 ifガード: 各分岐はifガードを持つことができます。selectマクロが実行されると、各式を評価して Future を生成した後にifガードが評価され、そのガードが true の場合にのみ Future が poll されます。たとえば、x = = do_a_thing() if false => { ... }は決して poll されません。ifガードはポーリング中に再評価されるのではなく、マクロが初期化されたときにのみ評価されることに注意してください。else分岐:selectはelse => { ... }というelse分岐を持つことができます。これは、すべての Future が停止し、どのブロックも実行されていない場合に実行されます。これがelse分岐なしで発生した場合、selectは panic します。
select! マクロの値は(match と同じように)実行された分岐の値であるため、すべての分岐は同じ型でなければなりません。たとえば、上の例の結果を select の外で使いたい場合は、次のように書きます。
async fn main() {
let result = select! {
result = do_a_thing() => {
Some(result)
}
_ = timeout() => {
None
}
};
// `result` を使用
}
join! と同様に、select! は Result を特別扱いしません(前述のパターンマッチングを除きます)。ある分岐がエラーで完了した場合、他のすべての分岐はキャンセルされ、そのエラーが select の結果として使われます(その分岐が正常に完了した場合と同じように)。
select マクロは本質的にキャンセルを使用するため、プログラムでキャンセルを避けようとしているなら、select! を避けなければなりません。実際、非同期プログラムにおいて select はしばしばキャンセルの主な発生源です。別の場所 で説明したように、キャンセルにはバグにつながり得る微妙な問題が数多くあります。特に、select は単に Future を drop することでキャンセルすることに注意してください。これは drop される Future に通知したり、キャンセルトークンなどをトリガーしたりしません。
select! は、ストリームやその他の Future のシーケンスを処理するために、ループの中でよく使われます。これにより、複雑さとバグの機会がさらに一段増えます。ループの各イテレーションで新しい独立した Future を作成する単純な場合には、物事はそれほど複雑にはなりません。しかし、これが必要とされることはまれです。一般には、イテレーション間で何らかの状態を保持したいものです。select をストリームとともにループ内で使い、ループの各イテレーションでストリームからの1つの結果を処理することは一般的です。例:
async fn main() {
let mut stream = ...;
loop {
select! {
result = stream.next() => {
match result {
Some(x) => println!("受信しました: {x}"),
None => break,
}
}
_ = timeout() => {
println!("タイムアウトしました!");
break;
}
}
}
}
この例では、stream から値を読み取り、値がなくなるか、結果待ちがタイムアウトするまでそれらを出力します。タイムアウトの場合にストリーム内に残っているデータがどうなるかは、ストリームの実装によって異なります(失われるかもしれません! あるいは重複するかもしれません!)。これは、キャンセルに直面したときの振る舞いが重要(かつ厄介)になり得る理由の一例です。
イテレーションをまたいで、ストリームだけでなく future も再利用したい場合があります。たとえば、各イテレーションに新しいタイムアウトを適用するのではなく、すべてのイテレーションに適用される timeout future と競合させたい場合があります。これは、future をループの外側で作成し、それを参照することで可能です。
async fn main() {
let mut stream = ...;
let mut timeout = timeout();
loop {
select! {
result = stream.next() => {
match result {
Some(x) => println!("received: {x}"),
None => break,
}
}
// `timeout` をムーブするのではなく、参照を作成する。
_ = &mut timeout => {
println!("time out!");
break;
}
}
}
}
select! の外側で作成された future や stream を使って、ループ内で select! を使用する場合には、いくつか重要な詳細があります。これらは select の動作方法から生じる根本的な帰結なので、最後の例の timeout を例として使い、select の詳細を順に追いながら紹介します。
timeoutはループの外側で作成され、カウントダウンする時間で初期化されます。- ループの各イテレーションで、
selectはtimeoutへの参照を作成しますが、その状態は変更しません。 selectが実行されると、timeoutをポーリングします。残り時間がある間はPendingを返し、時間が経過するとReadyを返します。その時点でそのブロックが実行されます。
上の例では、timeout が ready になったときに、ループから break しています。しかし、そうしなかったらどうなるでしょうか?その場合、select は単に再び timeout をポーリングしますが、Future のドキュメントでは、これは起きるべきではないとされています!select にはこれを防ぐ手段がありません。timeout をポーリングすべきかどうかを判断するための状態を(イテレーション間で)持たないからです。timeout がどのように書かれているかによって、これにより panic、ロジックエラー、ある種のクラッシュが発生する可能性があります。
この種のバグは、いくつかの方法で防ぐことができます。
- 再ポーリングが安全になるように、fused な future または stream を使用する。
- たとえばループを抜ける(前の例のように)、または
ifガードを使用するなどして、future が決して再ポーリングされないようにコードを構造化する。
次に、&mut timeout の型について考えてみましょう。timeout() は Future を実装する型を返すものと仮定します。それは async 関数からの匿名型かもしれませんし、Timeout のような名前付き型かもしれません。例を簡単にするため後者だと仮定します(ただし、どちらの場合でもロジックは適用されます)。Timeout が Future を実装しているとして、&mut Timeout は Future を実装するでしょうか?必ずしもそうではありません!これを真にする包括的な impl がありますが、それは Timeout が Unpin を実装している場合に限られます。これはすべての future に当てはまるわけではないため、最後の例のようなコードを書くと、しばしば型エラーが発生します。ただし、このようなエラーは、たとえば let mut timeout = pin!(timeout()); のように pin マクロを使うことで簡単に修正できます。
ループ内の select によるキャンセルは、微妙なバグの豊富な発生源です。これらは通常、future が何らかのデータに関する状態を含んでいるものの、データ自体は含んでいない場合に発生します。キャンセルによって future がドロップされると、その状態は失われますが、基になるデータは更新されません。これにより、データが失われたり、複数回処理されたりする可能性があります。
代替手段
Futures.rs には独自の select マクロ があり、futures-concurrency には Tokio の select マクロの代替となる Race trait があります。これらはいずれも、複数の future を並行に競合させ、最初の結果を処理して他をキャンセルするという同じ中核的なセマンティクスを持ちますが、構文が異なり、細部にも違いがあります。
Futures.rs の select は表面的には Tokio のものと似ています。違いを要約すると、futures.rs 版では次のようになります。
- Future は常に fused でなければならない(型チェックによって強制されます)。
selectにはelse分岐ではなく、default分岐とcomplete分岐があります。selectはifガードをサポートしません。
Futures-concurrency の Race は、そのバージョンの join に似た非常に異なる構文を持ちます。たとえば、(future_a, future_b).race().await です(タプルだけでなく Vec や配列でも動作します)。この構文はマクロほど柔軟ではありませんが、ほとんどの async コードにうまく馴染みます。ループ内で race を使用する場合でも、select と同じ問題が発生し得ることに注意してください。
join と同様に、タスクを spawn して並列に実行させることは、select を使用する代替手段として適していることがよくあります。ただし、最初のタスクが完了した後に残りのタスクをキャンセルするには、追加の作業が必要です。これはチャネルやキャンセルトークンを使って実現できます。どちらの場合でも、キャンセルされるタスク側で何らかのアクションが必要になります。つまり、そのタスクは後片付けやその他のグレースフルシャットダウンを行うことができます。
select の一般的な用途(特にループ内)は、stream を扱うことです。select のいくつかの用途を置き換えられる stream コンビネータメソッドがあります。たとえば、futures-concurrency の merge は、複数の stream を結合するための優れた代替手段です。
最後に
このセクションでは、future のグループを並行に実行する 2 つの方法について説明しました。future を join するとは、それらすべての完了を待つことを意味します。future を select する(別名 race する)とは、最初のものが完了するのを待つことを意味します。タスクを spawn する場合とは対照的に、これらの合成は並列性を利用しません。
join と select はどちらも、あらかじめわかっている future の集合に対して動作します(多くの場合、実行時ではなくプログラムを書く時点でわかっています)。場合によっては、合成する future があらかじめわかっていないことがあります。実行中に、合成された future の集合へ future を追加しなければなりません。このためには、独自の合成操作を持つ stream が必要です。
これらの合成演算子は強力で表現力豊かですが、多くの場合、タスクと spawn を使用するほうが簡単で適切であることを改めて強調しておく価値があります。並列性が望ましいことは多く、キャンセルやブロッキングに関するバグが発生しにくく、リソース割り当ても通常はより公平(または少なくともより単純)で予測しやすくなります。
-
式は
IntoFutureを実装する型でなければなりません。式は評価され、マクロによって Future に変換されます。つまり、実際に Future に評価される必要はなく、Future に変換できる何かであればよいのですが、これはかなり小さな違いです。式自体は、結果として得られる Future が実行される前に順番に評価されます。 ↩
チャネル、ロック、同期
同期プリミティブのランタイム固有性に関する注記
同期のものを使うのではなく、非同期プリミティブが必要な理由
チャネル
- 基本的に std のものと同じだが、await する
- タスク間で通信する(同じスレッドまたは別のスレッド)
- ワンショット
- mpsc
- その他のチャネル
- 有界チャネルと無界チャネル
ロック
- 非同期 Mutex
- 比較: std::Mutex - await ポイントをまたいで保持できる(ガード内で mutex を借用する、ガードは Send、スケジューラ対応? それとも単に lock が async だから?)、lock は async(ロックが利用可能になるのを待つ間スレッドをブロックしない)
- await をまたいでガードを保持することに対する Clippy lint さえある(https://rust-lang.github.io/rust-clippy/master/index.html#await_holding_lock)
- await をまたいで保持できるため、より高コスト
- 可能なら std::Mutex を使う
- try_lock を使える、または mutex が競合状態にならないと想定される
- 可能なら std::Mutex を使う
- yield しても lock が魔法のように drop されるわけではない(それこそが lock の要点のようなもの!)
- await をまたいで mutex を保持することによるデッドロック
- タスクはデッドロックするが、他のタスクは進行できるため、プロセス統計/ツール/OS ではデッドロックのように見えない可能性がある
- 通常の助言 - スコープを限定する、ロックを最小化する、ロックの順序を決める、代替手段を優先する
- mutex poisoning はない
- lock_owned
- blocking_lock
- async では使用できない
- 他のロックにも当てはまる(上記は mutex を具体的に論じる前に移動すべきか? おそらくそう)
- 比較: std::Mutex - await ポイントをまたいで保持できる(ガード内で mutex を借用する、ガードは Send、スケジューラ対応? それとも単に lock が async だから?)、lock は async(ロックが利用可能になるのを待つ間スレッドをブロックしない)
- RWLock
- Semaphore
- yield
その他の同期プリミティブ
- notify、barrier
- OnceCell
- atomics
asyncプログラミングのためのツール
- asyncに特化したツールが必要な理由
- 対象にすべき他のツールはあるか
- loom
監視
トレーシングとロギング
- asyncトレーシングに関する課題
- tracingクレート (https://github.com/tokio-rs/tracing)
デバッグ
- asyncバックトレースを理解する(RUST_BACKTRACEとデバッガー内)
- asyncコードをデバッグするための手法
- デバッグにTokio consoleを使用する
- デバッガーサポート(WinDbg?)
プロファイリング
- asyncがフレームグラフをどのように乱すか
- async IOをプロファイリングする方法
- ランタイムに関する洞察を得る
- Tokio metrics
破棄とクリーンアップ
- オブジェクトの破棄と Drop の振り返り
- ソフトウェアにおける一般的なクリーンアップ要件
- async に関する問題
- クリーンアップ中に非同期で何かを行いたい場合がある。例: 最後のメッセージを送信する
- まだ非同期的に使用されているものをクリーンアップする必要がある場合がある
- async タスクが完了またはキャンセルされ、それを捕捉する方法がない場合にクリーンアップしたい場合がある
- クリーンアップフェーズ中のランタイムの状態(特にパニック中などの場合)
- async Drop はない
- 作業中
- completion io トピックへの前方参照
キャンセル
- どのように発生するか(more-async-await.md の振り返り)
- future を drop する
- キャンセルトークン
- abort 関数
- キャンセルを「捕捉」するために何ができるか
- キャンセルのログ記録または監視
- キャンセルが他の future タスクにどのように影響するか(キャンセル安全性の章への前方参照。ここでは注意喚起に留める)
パニックと async
- タスク間でのパニックの伝播(spawn の結果)
- パニックによりデータが不整合な状態で残ること(tokio のミューテックス)
- パニック中に async コードを呼び出すこと(絶対に行わないようにする)
クリーンアップのパターン
- クリーンアップが必要にならないようにする(abort/restart)
- クリーンアップに async を使用せず、あまり心配しすぎない
- async クリーンアップメソッド + dtor bomb(つまり、クリーンアップを破棄から分離する)
- クリーンアップを別のタスク、スレッド、または supervisor オブジェクト/プロセスに集約/外部委託する
- https://tokio.rs/tokio/topics/shutdown
なぜ async Drop がないのか(まだ)
- これは高度なセクションであり、読む必要はないことに注意
- async Drop が難しい理由
- 考えられる解決策とその問題点
- 現在の状況
Future
前の章では Future について多く話してきました。Future は Rust の async プログラミングにおける重要な要素です!この章では、Future とは何か、どのように動作するのか、そして Future を直接扱うためのライブラリについて、いくつかの詳細に踏み込んでいきます。
Future トレイトと IntoFuture トレイト
- Future
- Output 関連型
- ここでは実質的な詳細は扱わない。ポーリングは次のセクションで扱い、Pin、executor/waker に関する高度なセクションを参照する
- IntoFuture
- 使用方法 - 一般的な使い方、await 内での使い方、async ビルダーパターン(使用する際の長所と短所)
- Future のボックス化、
Box<dyn Future>、以前は一般的で必要だったが、再帰などを除いて現在ではほとんど必要ないこと
ポーリング
- それが何であり、誰が行うのか、Poll 型
- ready は最終状態である
- await とどのように関係するか
- drop = キャンセル
- Future、したがってタスクについて
- async プログラミング全般への影響
- キャンセル安全性に関する章への参照
Fuse 化
futures-rs クレート
- 歴史と目的
- stream の章を参照
- executor やその他の低レベルな Future 関連のものを書くためのヘルパー
- pinning とボックス化
- 部分的なランタイムとしての executor(リファレンスの代替ランタイムを参照)
- TryFuture
- 便利な Future: pending、ready、ok/err など
- FutureExt 上のコンビネーター関数
- Tokio 関連のものに対する代替
- 関数
- IO トレイト
futures-concurrency クレート
https://docs.rs/futures-concurrency/latest/futures_concurrency/
ランタイムとランタイムの問題
非同期コードの実行
- 明示的な起動 vs async main
- tokio コンテキストの概念
- block_on
- コードに反映されるランタイム (Runtime, Handle)
- ランタイムのシャットダウン
スレッドとタスク
- デフォルトのワークスティーリング、マルチスレッド
- Send + ’static 境界の再確認
- yield
- spawn-local
- spawn-blocking (おさらい)、block-in-place
- 他のスレッドへの yield、ローカルキューとグローバルキューの違いなど、tokio 固有の事項
設定オプション
- スレッドプールサイズ
- シングルスレッド、コアごとに 1 スレッドなど
代替ランタイム
- 別のランタイムを使う、または独自に実装したい理由
- 高レベル設計にはどのようなバリエーションが存在するか
- 発展的な章への前方参照
タイマーとシグナル処理
時間とタイマー
- ランタイムとの統合、thread::sleep などを使用しない
- std の Instant と Duration
- sleep
- interval
- timeout
- 特別な future と select/race の比較
シグナル処理
- シグナル処理とは何か、なぜ async の問題なのか?
- OS に大きく依存する
- Tokio のドキュメントを参照
非同期イテレーター(旧称 streams)
- 非同期イテレーターとしての Stream、または多数の Future としての Stream
- 作業中
- 現在の状態
- futures と Tokio の Stream トレイト
- nightly のトレイト
- 同期イテレーターのように遅延的
- ピン留めと Stream(ピン留めの章への前方参照)
- fused Stream
非同期イテレーターの消費
- async next を使った while let
- for_each、for_each_concurrent
- collect
- into_future、buffered
Stream コンビネーター
- クロージャーの代わりに Future を受け取る
- いくつかのコンビネーター例
- unordered のバリエーション
- StreamGroup
Stream における join/select/race
- ループ内で select を使う際の危険性
- fusing
- 単なる Future との違い
- これらの代替
- Stream::merge など
非同期イテレーターの実装
- トレイトの実装
- 実用上の事項とユーティリティ関数
- async_iter stream マクロ
Sink
- https://docs.rs/futures/latest/futures/sink/index.html
今後の作業
- 現在の状態
- https://rust-lang.github.io/rfcs/2996-async-iterator.html
- async next と poll
- 非同期イテレーション構文
- (async)ジェネレーター
- lending iterator
キャンセルとキャンセル安全性
内部キャンセルと外部キャンセル スレッドと Future drop = キャンセル await ポイントでのみ 有用な機能 それでもやや唐突で驚きがある その他のキャンセル機構 abort キャンセルトークン
キャンセル安全性
メモリ安全性の問題や競合状態ではない データ損失またはその他のロジックエラー 異なる定義/名称 tokio の定義 一般的な定義/halt safety 複製された Future の考え方を適用する 単純なデータ損失 再開 ループ内の select または類似のものに関する問題 根本原因として、Future とコンテキストの間で状態を分割していること
ピン留め
ピン留めは、悪名高いほど難しい概念であり、いくつかの微妙で紛らわしい性質があります。このセクションでは、このトピックを詳細に(議論の余地はありますが、詳細すぎるほど)扱います。ピン留めは Rust における async プログラミングの実装にとって重要です1が、ピン留めに一度も遭遇せずに、ましてや深く理解する必要もなく、かなり先まで進むことは可能です。
最初のセクションでは、ピン留めの概要を説明します。ほとんどの async プログラマーにとっては、これを知っていれば十分であることを期待しています。この章の残りは、実装者、高度または低レベルの async プログラミングを行う人、そして好奇心のある人向けです。
概要の後、この章ではピン留めに入る前に、ムーブセマンティクスに関する背景を説明します。一般的な考え方を扱い、その後 Pin 型と Unpin 型、ピン留めがその目的をどのように達成するか、そして実際にピン留めを扱う際のいくつかのトピックを取り上げます。その後、ピン留めと async プログラミングに関するセクション、およびピン留めの代替案や拡張(本当に好奇心のある人向け)に関するセクションがあります。章の最後には、別の説明や参考資料へのリンクがあります。
TL;DR
Pin は、ポインターが、ドロップされるまで移動しないオブジェクトを指していることを示します。ピン留めは言語やコンパイラーに組み込まれているものではありません。参照先への可変参照へのアクセスを単に制限することで機能します。unsafe コードではピン留めを破るのは十分に簡単ですが、unsafe コードにおけるすべての安全性保証と同様に、そうしないことはプログラマーの責任です。
オブジェクトが移動しないことを保証することで、ピン留めは構造体のあるフィールドから別のフィールドへの参照(自己参照と呼ばれることもあります)を持つことを安全にします。これは async 関数の実装に必要です(async 関数は、変数がフィールドとして保存されるデータ構造として実装されます。変数同士が参照し合う可能性があるため、async 関数を実装する Future のフィールド同士も参照し合える必要があります)。ほとんどの場合、プログラマーはこの詳細を意識する必要はありませんが、Future を直接扱う場合には、Future::poll のシグネチャーが self にピン留めされていることを要求するため、意識する必要があるかもしれません。
Future を参照で使用している場合、その参照が引き続き Future トレイトを実装することを保証するために、pin!(...) を使って参照をピン留めする必要があるかもしれません(これは select マクロでよく発生します)。同様に、Future に対して手動で poll を呼び出したい場合(通常は別の Future を実装しているため)、その Future へのピン留めされた参照が必要になります(pin! を使用するか、引数がピン留めされた型を持つことを保証してください)。Future を実装している場合、または何らかの別の理由でピン留めされた参照を持っており、オブジェクトの内部への可変アクセスが必要な場合は、ピン留めされたフィールドに関する以下のセクションを理解し、どのように行うか、いつ安全なのかを知る必要があります。
ムーブセマンティクス
ピン留めや関連するトピックを議論するうえで有用な概念が、場所 という考え方です。場所とは、値が存在できるメモリの塊(アドレスを持つもの)です。参照は実際には値を指しているのではなく、場所を指しています。だからこそ *ref = ... は意味を持ちます。デリファレンスによって得られるのは値のコピーではなく、場所です。場所は言語実装者にはよく知られていますが、通常、プログラミング言語では暗黙的です(Rust でも暗黙的です)。プログラマーは通常、場所についてのよい直感を持っていますが、それを明示的に考えてはいないかもしれません。
参照だけでなく、変数やフィールドアクセスも場所として評価されます。実際、代入の左辺に現れることができるものは何であれ、実行時には場所でなければなりません(そのため、コンパイラー用語では場所は「lvalue」と呼ばれます)。
Rust では、可変性は場所の性質であり、借用の結果として「凍結」されることも同様です(その場所は借用されている、と言うこともできます)。
Rust における代入はデータを 移動 します(ほとんどの場合です。一部の単純なデータはコピーセマンティクスを持ちますが、ここではあまり重要ではありません)。let b = a; と書くと、a によって識別される場所のメモリにあったデータが、b によって識別される場所へ移動されます。つまり、代入後にはデータは b に存在しますが、a にはもはや存在しません。別の言い方をすれば、そのオブジェクトのアドレスは代入によって変更されます2。
移動元の場所を指すポインターが存在していた場合、それらのポインターはもはやそのオブジェクトを指していないため無効になります。これが、借用参照がムーブを防ぐ理由です。let r = &a; let b = a; は不正です。r の存在によって a の移動が防がれます。
コンパイラーが認識しているのは、オブジェクトの外部からそのオブジェクトへの参照だけです(上の例や、オブジェクトのフィールドへの参照など)。オブジェクトの完全に内部にある参照は、コンパイラーからは見えません。次のようなものを書くことが許されていたと想像してみてください。
#![allow(unused)]
fn main() {
struct Bad {
field: u64,
r: &'self u64,
}
}
Bad のインスタンス b があり、b.r が b.field を指しているとします。let a = b; では、b.field への内部参照 b.r はコンパイラーから見えないため、b への参照は存在しないように見え、したがって a へのムーブは問題ないように見えます。しかし、もしそれが起きた場合、ムーブ後には a.r は期待どおり a.field を指すのではなく、b.field の古い位置にある無効なメモリを指すことになり、Rust の安全性保証に違反します。
データの移動は値に限定されません。データは一意参照から取り出して移動することもできます。Box をデリファレンスすると、データはヒープからスタックへ移動します。take、replace、swap(いずれも std::mem にあります)は、可変参照(&mut T)からデータを移動します。Box から移動すると、指されていた場所は無効になります。可変参照から移動すると、その場所は有効なままですが、別のデータを含むようになります。
抽象的には、ムーブは元の位置から宛先へビットをコピーし、その後元のビットを消去することで実装されます。しかし、コンパイラーはこれを多くの方法で最適化できます。
ピン留め
重要な注意: まず、ピン留めという抽象的な概念について説明します。これは、特定の型によって表現されるものと正確に同じではありません。進むにつれてこの概念をより具体化し、最終的には異なる型が何を意味するのかについて正確な定義に到達しますが、これらの型のどれも、最初に扱うピン留めの概念と完全に同じ意味を持つわけではありません。 オブジェクトは、移動されたりその他の形で無効化されたりしない場合、ピン留めされています。上で説明したように、これは新しい概念ではありません。オブジェクトを借用すると、その借用の期間中はオブジェクトの移動が防止されます。オブジェクトを移動できるかどうかは Rust の型では明示されませんが、コンパイラには把握されています(そのため “cannot move out of” エラーメッセージが出ることがあります)。借用(および借用によって生じる一時的な移動制限)とは異なり、ピン留めされていることは永続的です。オブジェクトはピン留めされていない状態からピン留めされた状態に変化できますが、いったんピン留めされると、drop されるまでピン留めされたままでなければなりません3。
ポインター型が指し先の所有権や可変性を反映する(たとえば Box と &、&mut と &)のと同じように、ピン留めされているかどうかもポインター型に反映したいと考えます。これはポインターの性質ではありません。ポインター自体がピン留めされている、あるいは移動可能であるわけではありません。これは指されている場所の性質、つまり指し先をその場所から移動できるかどうかです。
大まかに言えば、Pin<Box<T>> は所有されたピン留め済みオブジェクトへのポインターであり、Pin<&mut T> は一意に借用された可変なピン留め済みオブジェクトへのポインターです(これに対して &mut T は、一意に借用された可変なオブジェクトへのポインターであり、そのオブジェクトはピン留めされている場合もされていない場合もあります)。
ピン留めの概念は Rust 1.0 より後に追加されたものであり、後方互換性の理由から、オブジェクトがピン留めされているかどうかを明示的に表現する方法はありません。表現できるのは、参照がピン留め済みオブジェクトを指しているか、ピン留めされていないオブジェクトを指しているかだけです。
ピン留めは可変性と直交します。オブジェクトは可変で、かつピン留めされている(Pin<&mut T>)場合も、されていない(&mut T)場合もあります(つまり、オブジェクトは変更可能であり、さらにその場にピン留めされているか、移動可能であるかのどちらかです)。また、不変で、かつピン留めされている(Pin<&T>)場合も、されていない(T)場合もあります(つまり、オブジェクトは変更できず、さらに移動もできないか、変更はできないが移動はできるかのどちらかです)。&T は変更も移動もできませんが、ピン留めされているわけではありません。なぜなら、その移動不能性は一時的なものにすぎないからです。
Unpin
移動するかしないかという観点からピン留めを導入しましたし、名前からもある程度それが示唆されますが、Pin は実際には、指し先が実際に移動するかどうかについて多くを教えてくれるものではありません。
何だって? はぁ。
ピン留めは実際には、移動に関するものではなく、有効性に関する契約です。これは、オブジェクトがアドレスに依存する場合、そのアドレスは変化しない(したがって、そのフィールドのアドレスなど、そこから派生したアドレスも変化しない)ことを保証します。Rust のほとんどのデータはアドレスに依存しません。移動されてもすべて問題ありません。Pin は、指し先がそのアドレスに関して有効であり続けることを保証します。指し先がアドレスに依存する場合、それは移動できません。アドレスに依存しない場合、移動されるかどうかは問題ではありません。
Unpin は、オブジェクトがアドレスに依存するかどうかを表すトレイトです。オブジェクトが Unpin を実装している場合、それはアドレスに依存しません。オブジェクトが !Unpin である場合、それはアドレスに依存します。別の見方をすれば、ピン留めをオブジェクトをその場所に保持する行為と考えるなら、Unpin はその行為を解除してオブジェクトの移動を許可しても安全であることを意味します。
Unpin は auto-trait であり、ほとんどの型は Unpin です。!Unpin なフィールドを持つ型、または明示的にオプトアウトした型だけが Unpin ではありません。オプトアウトするには、PhantomPinned フィールドを持たせるか、(nightly を使用している場合は)impl !Unpin for ... {} を使います。
Unpin を実装する型に対しては、Pin は本質的に何もしません。Pin<Box<T>> と Pin<&mut T> は、Box<T> や &mut T> とまったく同じように使用できます。実際、Unpin 型については、Pin::new と Pin::into_inner を使って、Pin されたポインターと通常のポインターを自由に相互変換できます。改めて述べる価値があります。Pin<...> は指し先が移動しないことを保証するものではなく、指し先が !Unpin である場合にのみ移動しないことを保証します。
上記の実用上の含意は、Unpin 型とピン留めを扱うことは、Unpin ではない型を扱う場合よりもはるかに簡単だということです。実際、Pin マーカーは Unpin 型および Unpin 型へのポインターに対しては基本的に効果を持たず、ピン留めに関する保証や要件は基本的に無視できます。
Unpin はオブジェクト単体の性質として理解すべきではありません。Unpin が変える唯一のことは、オブジェクトが Pin とどのように相互作用するかです。ピン留めの文脈以外で Unpin 境界を使っても、コンパイラの振る舞いや、そのオブジェクトに対して何ができるかには影響しません。Unpin を使う唯一の理由は、ピン留めと組み合わせるため、またはその境界をピン留めとともに使われる場所まで伝播させるためです。
Pin
Pin はマーカー型です。型検査において重要ですが、コンパイル時に消去され、実行時には存在しません(Pin<Ptr> は Ptr と同じメモリレイアウトおよび ABI を持つことが保証されています)。これはポインター(Box など)のラッパーであるため、ポインター型のように振る舞いますが、間接参照を追加するわけではありません。プログラムの実行時には、Box<Foo> と Pin<Box<Foo>> は同じです。Pin はポインターそのものというより、ポインターに対する修飾子と考える方が適切です。
Pin<Ptr> は、Ptr 自体ではなく、Ptr の指し先がピン留めされていることを意味します。つまり、Pin は、指し先が drop されるまで、そのアドレスに関して有効であり続けることを保証します。指し先がアドレスに依存する(つまり !Unpin である)場合、その指し先は移動されません。
値のピン留め
オブジェクトはピン留めされた状態で作成されるわけではありません。オブジェクトはピン留めされていない状態から始まり(自由に移動される可能性があり)、そのオブジェクトを指すピン留めポインターが作成されたときにピン留めされた状態になります。オブジェクトが Unpin であれば、これは Pin::new を使えば些細なことです。しかし、オブジェクトが Unpin でない場合、それをピン留めするには、エイリアスを通じて移動または無効化されないことを保証しなければなりません。
ヒープ上のオブジェクトをピン留めするには、Box::pin を使用して新しいピン留め用の Box を作成するか、Box::into_pin を使用して既存の Box をピン留め用の Box に変換できます。どちらの場合でも、最終的には Pin<Box<T>> が得られます。他のいくつかのポインター(Arc や Rc など)にも同様の仕組みがあります。そうでないポインター、または独自のポインター型については、Pin::new_unchecked を使用してピン留めされたポインターを作成する必要があります4。これは unsafe 関数であるため、プログラマーは Pin の不変条件が維持されることを保証しなければなりません。つまり、指し先はどのような状況でも、そのデストラクタが呼び出されるまで有効なままでなければなりません。これを保証するにはいくつか微妙な詳細があります。詳細については、その関数のドキュメント または以下のセクション pinning の仕組み を参照してください。
Box::pin はオブジェクトをヒープ内の場所にピン留めします。スタック上のオブジェクトをピン留めするには、pin マクロを使用して、可変参照(Pin<&mut T>)を作成し、ピン留めできます5。
Tokio にも pin マクロがあり、std のマクロと同じことを行い、さらにマクロ内で変数への代入もサポートしています。futures-rs クレートと pin-utils クレートには pin_mut マクロがあり、以前は一般的に使われていましたが、現在は std のマクロを優先するため非推奨になっています。
Pin::static_ref と Pin::static_mut を使用して static 参照をピン留めすることもできます。
ピン留めされた型の使用
理論上、ピン留めされたポインターの使用は他のポインター型の使用と同じです。しかし、これは最も直感的な抽象化ではなく、言語サポートもないため、ピン留めされたポインターの使用はかなり扱いにくくなりがちです。pinning を使用する最も一般的なケースは Future と Stream を扱う場合であり、それらの詳細については以下でより詳しく説明します。
ピン留めされたポインターを不変借用参照として使用するのは、Pin が Deref を実装しているため簡単です。必要であれば明示的な deref() を使用しつつ、ほとんどの場合 Poll<Ptr<T>> を &T として扱えます。同様に、Pin<&T> を取得するのも as_ref() を使用すれば非常に簡単です。
ピン留めされた型を扱う最も一般的な方法は Pin<&mut T> を使用することです(たとえば Future::poll)。しかし、ピン留めされたオブジェクトを生成する最も簡単な方法は Box::pin であり、これは Pin<Box<T>> を返します。後者は Pin::as_mut を使用して前者に変換できます。しかし、参照を再利用するための言語サポート(暗黙の再借用)がないため、その結果を再利用するのではなく、as_mut を呼び続ける必要があります。例(as_mut のドキュメントより):
#![allow(unused)]
fn main() {
impl Type {
fn method(self: Pin<&mut Self>) {
// 何かを行う
}
fn call_method_twice(mut self: Pin<&mut Self>) {
// `method` は `self` を消費するため、`as_mut` を介して `Pin<&mut Self>` を再借用する。
self.as_mut().method();
self.as_mut().method();
}
}
}
ピン留めされた指し先に他の方法でアクセスする必要がある場合は、Pin::into_inner_unchecked を介して行えます。ただし、これは unsafe であり、Pin の安全性要件が尊重されることを保証するために、非常に 注意しなければなりません。
pinning の仕組み
Pin は、ポインターのための単純なラッパー構造体(別名 newtype)です。ジェネリックパラメータに Deref 境界を要求することで、有用なことを行うにはポインターに対してのみ機能するよう強制されていますが、これは安全性を保つためというより、意図を表現するためのものにすぎません。ほとんどの newtype ラッパーと同様に、Pin は実行時の効果のためではなく、コンパイル時に不変条件を表現するために存在します。実際、ほとんどの状況では、Pin と pinning の仕組みはコンパイル中に完全に消えます。
正確には、Pin が表現する不変条件は、単なる移動可能性ではなく有効性に関するものです。また、これはポインターがピン留めされた後にのみ適用される有効性の不変条件でもあります。それ以前には Pin は効果を持たず、何かがピン留めされる前に何が起こるかについて何の要件も課しません。ポインターがいったんピン留めされると、Pin は、その指し先のオブジェクトが、オブジェクトのデストラクタが呼び出されるまで、メモリ内の同じアドレスで有効なままであることを要求します(そして安全なコードでは保証します)。
不変ポインター(たとえば借用参照)の場合、Pin は効果を持ちません。指し先は変更も置換もできないため、無効化される危険がないからです。
変更を許すポインター(たとえば Box や &mut)の場合、そのポインターへ直接アクセスできること、または指し先への可変参照(&mut)にアクセスできることにより、指し先の変更や移動が可能になる場合があります。Pin は、ポインターまたは可変参照への直接アクセスを得るための(unsafe でない)方法を一切提供しません。ポインターがその指し先への可変参照を提供する通常の方法は DerefMut を実装することですが、Pin は指し先が Unpin の場合にのみ DerefMut を実装します。
この実装は非常に単純です!まとめると、Pin はポインターを包むラッパー構造体であり、指し先への不変アクセスのみを提供します(指し先が Unpin の場合は可変アクセスも提供します)。それ以外は詳細です(そして unsafe コードのための微妙な不変条件です)。利便性のため、Pin は Pin 型間で変換するための機能などを提供します(ポインターが Pin から抜け出せないため常に安全です)。
Pin は、ピン留めされたポインターを作成したり、基盤となるデータへアクセスしたりするための unsafe 関数も提供します。すべての unsafe 関数と同様に、安全性の不変条件を維持する責任はコンパイラではなくプログラマーにあります。残念ながら、pinning の安全性の不変条件はいくぶん散在しています。つまり、それらは異なる場所で強制されており、グローバルで統一された形で説明するのが困難です。ここでは詳細には説明せずドキュメントを参照してもらいますが、要約を試みます(詳細な概要についてはモジュールドキュメントを参照してください):
- 新しいピン留めされたポインター
new_uncheckedを作成すること。プログラマーは、参照先がピン留めされていること(つまり、ピン留め不変条件に従っていること)を保証しなければなりません。この要件は、ポインター型だけで満たされる場合もあれば(例:Boxの場合)、参照先の型の関与が必要な場合もあります(例:&mutの場合)。これには次が含まれます(ただし、これらに限定されません)。DerefおよびDerefMutでselfからムーブしないこと。Dropを適切に実装すること。drop 保証を参照してください。- ピン留め保証が必要な場合は、
PhantomPinnedを使用してUnpinからオプトアウトすること。 - 参照先は
#[repr(packed)]であってはなりません。
- ピン留めされた値
into_inner_unchecked、get_unchecked_mut、map_unchecked、およびmap_unchecked_mutにアクセスすること。データがアクセスされた瞬間から、そのデストラクタが実行されるまで、ピン留め保証(データをムーブしないことを含む)を強制する責任はプログラマーにあります(この責任範囲は unsafe 呼び出しの範囲を超えて広がり、基になるデータに何が起きても適用されることに注意してください)。 - ピン留めされた型からデータをムーブする他の手段を一切提供しないこと(そのような手段には unsafe な実装が必要になります)。
ピン留めポインター型
前述のとおり、Pin はポインター型をラップします。Pin<Box<T>>、Pin<&T>、Pin<&mut T> はよく見かけます。技術的には、ピン留めポインター型に対する唯一の要件は Deref を実装していることです。しかし、他の任意のポインター型について Pin<Ptr> を作成する方法は、unsafe コードを使用する場合(new_unchecked 経由)を除いてありません。そうする場合、ピン留め契約を保証するためにポインター型に対する要件があります。
- ポインターの
DerefおよびDerefMutの実装は、その参照先からムーブしてはなりません。 Pinが作成された後は、Pinがドロップされた後であっても、いかなる時点でも参照先への&mut参照を取得できてはなりません(これが、&mut TからPin<&mut T>を安全に構築できない理由です)。これは複数の手順を経由しても、参照を経由しても真であり続けなければなりません(これによりRcやArcの使用が妨げられます)。- ポインターの
Dropの実装は、その参照先をムーブしてはならず、また他の方法で無効化してもなりません。
詳細については、new_unchecked のドキュメントを参照してください。
ピン留めと Drop
ピン留め契約は、ピン留めされたオブジェクトがドロップされるまで適用されます(技術的には、これは drop メソッドが呼び出された時点ではなく、戻った時点を意味します)。通常、オブジェクトが破棄されると drop は自動的に呼び出されるため、これはかなり単純です。オブジェクトのライフサイクルに関して手動で何かを行っている場合は、もう少し考慮が必要になるかもしれません。ピン留めされている(またはピン留めされている可能性がある)オブジェクトがあり、そのオブジェクトが Unpin ではない場合、そのオブジェクトのメモリまたはアドレスを解放または再利用する前に、その drop メソッドを(drop_in_place を使用して)呼び出さなければなりません。詳細については、std ドキュメントを参照してください。
アドレスに敏感な型(つまり !Unpin である型)を実装している場合は、Drop の実装に特別な注意を払わなければなりません。drop における self 型は &mut Self ですが、その self 型を Pin<&mut Self> として扱わなければなりません。言い換えると、drop 関数が戻るまでオブジェクトが有効なままであることを保証しなければなりません。これをソースコードで明示する方法の 1 つは、次のイディオムに従うことです。
#![allow(unused)]
fn main() {
impl Drop for Type {
fn drop(&mut self) {
// この値はドロップ後に二度と使われないことが分かっているため、
// `new_unchecked` は問題ない。
inner_drop(unsafe { Pin::new_unchecked(self)});
fn inner_drop(this: Pin<&mut Self>) {
// 実際の drop コードはここに記述する。
}
}
}
}
有効性要件は、実装される型に依存することに注意してください。特にオブジェクトの破棄に関して、これらの要件を正確に定義することが推奨されます。複数のオブジェクトが関与する可能性がある場合(例: 侵入型リンクリスト)は特にそうです。ここで正しさを保証することは、興味深いものになるでしょう!
メソッドにおけるピン留めされた self
ピン留めされた型でメソッドを呼び出すと、それらのメソッドにおける self 型について考えることになります。メソッドが self を変更する必要がない場合、Pin<...> は借用参照へ逆参照できるため、引き続き &self を使用できます。しかし、self を変更する必要があり(かつ型が Unpin ではない)場合は、&mut self と self: Pin<&mut Self> のどちらかを選択する必要があります(ピン留めポインターは後者の型へ暗黙的に型強制できませんが、Pin::as_mut を使用して簡単に変換できます)。
&mut self を使用すると実装は容易になりますが、そのメソッドはピン留めされたオブジェクトでは呼び出せないことになります。self: Pin<&mut Self> を使用すると、ピン射影(次のセクションを参照)を考慮する必要があり、ピン留めされたオブジェクトでしか呼び出せません。これは少し分かりにくいものの、ピン留めが段階的な概念であることを思い出すと直感的に理解できます。オブジェクトはピン留めされていない状態で始まり、ある時点で段階の変化を経てピン留めされた状態になります。&mut self メソッドは最初の(ピン留めされていない)段階で呼び出せるものであり、self: Pin<&mut Self> メソッドは 2 番目の(ピン留めされた)段階で呼び出せるものです。
drop は(どちらの段階で呼び出される可能性があるにもかかわらず)&mut self を受け取ることに注意してください。これは言語の制限と後方互換性への要望によるものです。これにはコンパイラでの特別な扱いが必要であり、安全性要件を伴います。
ピン留めされたフィールド、構造的ピン留め、ピン射影
オブジェクトがピン留めされている場合、そのフィールドの「ピン留めされている」性質について何が分かるでしょうか? 答えはデータ型の実装者が行った選択に依存し、普遍的な答えはありません(実際、同じオブジェクトの異なるフィールドで異なる場合があります)。
オブジェクトのピン留めされている性質がフィールドへ伝播する場合、そのフィールドは「構造的ピン留め」を示す、またはピン留めがそのフィールドに射影される、と言います。この場合、射影メソッド fn get_field(self: Pin<&mut Self>) -> Pin<&mut Field> があるべきです。フィールドが構造的にピン留めされていない場合、射影メソッドはシグネチャ fn get_field(self: Pin<&mut Self>) -> &mut Field を持つべきです。どちらのメソッドを実装する場合(または類似のコードを実装する場合)も unsafe コードが必要であり、どちらの選択にも安全性への影響があります。ピンの伝播は一貫していなければならず、フィールドは常に構造的にピン留めされているか、そうでないかのいずれかでなければなりません。フィールドがある時点では構造的にピン留めされ、別の時点ではそうでないというのは、ほとんど常に非健全です。
ピン留めは、そのフィールドが集約データ型のアドレスに敏感な部分である場合、そのフィールドへ射影されるべきです。つまり、ピン留めされている集約が、ピン留めされているフィールドに依存している場合、ピン留めはそのフィールドへ射影されなければなりません。たとえば、集約の別の部分からそのフィールドへの参照がある場合や、フィールド内に自己参照がある場合、ピン留めはそのフィールドへ射影されなければなりません。一方、ジェネリックなコレクションでは、コレクションはその内容の振る舞いに依存しないため、ピン留めをその内容へ射影する必要はありません(これは、コレクションが含んでいるジェネリックな要素の実装に依存できないため、コレクション自体もその要素のアドレスに依存できないからです)。
unsafe コードを書く場合、ピン留めの保証が適用されると仮定できるのは、構造的にピン留めされたオブジェクトのフィールドに対してのみです。一方で、構造的にピン留めされていないフィールドは安全に移動可能として扱うことができ、それらに対するピン留め要件を気にする必要はありません。特に、あるフィールドが Unpin でなくても、そのフィールドが常に構造的にピン留めされていないものとして扱われる限り、構造体は Unpin になれます。
フィールドが構造的にピン留めされている場合、集約構造体に対するピン留め要件はそのフィールドにも及びます。集約がピン留めされている間は、いかなる状況でもコードがそのフィールドの内容を移動することはできません(これには常に unsafe コードが必要になります)。構造的にピン留めされたフィールドは、移動される前(割り当て解除を含む)に、パニックの場合であっても drop されなければなりません。つまり、集約の Drop 実装内では注意が必要です。さらに、集約構造体は、その構造的にピン留めされたフィールドがすべて Unpin でない限り、Unpin になることはできません。
ピン射影のためのマクロ
ピン射影を助けるために利用できるマクロがあります。
pin-project クレートは、#[pin_project] 属性マクロ(および #[pin] ヘルパー属性)を提供します。これは、アノテーションされた型のピン留め版を作成し、その型上の project メソッドを使ってアクセスできるようにすることで、安全なピン射影を実装してくれます。
Pin-project-lite は、宣言的マクロ(pin_project!)を使う代替手段で、pin-project と非常によく似た方法で動作します。Pin-project-lite は、手続き型マクロではないため、手続き型マクロを実装するための依存関係をプロジェクトに追加しないという意味で軽量です。ただし、pin-project より表現力が低く、カスタムエラーメッセージも提供しません。手続き型マクロの依存関係を追加したくない場合は Pin-project-lite が推奨され、それ以外の場合は pin-project が推奨されます。
Pin-utils は、ピン射影の実装を助ける unsafe_pinned マクロを提供しますが、クレート全体が上記のクレートおよび現在 std にある機能を優先して非推奨になっています。
ピン留めされたポインターへの代入
一般に、ピン留めされたポインターへ代入することは安全です。これは通常の方法(*p = ...)ではできませんが、Pin::set を使えば可能です。より一般には、unsafe コードを使ってポインティーのフィールドへ代入できます。
Pin::set を使うことは常に安全です。これは、以前にピン留めされていたポインティーが drop されてピン留め要件を満たし、新しいポインティーはピン留めされた場所への移動が完了するまでピン留めされないためです。個々のフィールドへ代入することは、自動的にピン留め要件に違反するわけではありませんが、オブジェクト全体として有効であり続けることを保証するよう注意が必要です。たとえば、あるフィールドへ代入された場合、そのフィールドを参照している他のフィールドは、新しいオブジェクトでも引き続き有効でなければなりません(これはピン留め要件の一部ではありませんが、そのオブジェクトの他の不変条件の一部である可能性があります)。
あるピン留めされたオブジェクトを別のピン留めされた場所へコピーすることは、unsafe コードでのみ可能です。安全性がどのように維持されるかは、個々のオブジェクトに依存します。ピン留め要件に対する一般的な違反はありません。置き換えられるオブジェクトは移動しておらず、コピーされるオブジェクトも移動していないためです。ただし、置き換えられるオブジェクトの有効性には、通常はピン留めによって保護される安全性要件があるかもしれませんが、この場合はプログラマーが確立しなければなりません。たとえば、a と b という 2 つのフィールドを持つ構造体があり、b が a を参照している場合、その参照が有効であり続けるにはピン留めが必要です。そのような構造体が別の場所へコピーされる場合、b の値は古い a ではなく新しい a を指すように更新されなければなりません。
ピン留めと非同期プログラミング
うまくいけば、async Rust でやりたいことはすべて実現でき、ピン留めを気にする必要はまったくないでしょう。ときには、ピン留めを使う必要がある特殊なケースに遭遇することもあります。また、future、ランタイム、または同様のものを実装したい場合は、ピン留めについて知る必要があります。このセクションでは、その理由を説明します。
async 関数は future として実装されます(セクション TODO を参照してください。これは要約的な概要であり、他の場所でより深く、例を交えて説明するようにしてください)。各 await ポイントで関数の実行が一時停止される可能性があり、その間、生存している変数の値を保存しておく必要があります。それらは本質的に構造体(enum の一部)のフィールドになります。そのような変数は、future 内に保存されている他の変数を参照することがあります。たとえば、次を考えてみてください。
#![allow(unused)]
fn main() {
async fn foo() {
let a = ...;
let b = &a;
bar().await;
// b を使う
}
}
ここで生成される future オブジェクトは、おおよそ次のようになります。
#![allow(unused)]
fn main() {
struct Foo {
a: A,
b: &'self A, // 不変条件 `self.b == &self.a`
}
}
(少し単純化しており、実行状態などは無視していますが、重要なのは変数/フィールドです)。
これは直感的には理解できますが、残念ながら Rust には 'self は存在しません。そして、それには十分な理由があります。Rust のオブジェクトは移動できることを思い出してください。そのため、次のようなコードは非健全になります。
#![allow(unused)]
fn main() {
let f1 = Foo { ... }; // f1.b == &f1.a
let f2 = f1; // f2.b == &f1.a だが、f1 は f2 へ移動したためもはや存在しない
}
これは単にライフタイムに名前を付けられないという問題ではないことに注意してください。たとえ raw ポインターを使ったとしても、そのようなコードは依然として不正です。
しかし、一度作成されたら Foo のインスタンスは決して移動しないことが分かっているなら、すべてはそのまま機能します。(コンパイラーはそのような場合のために内部的に 'self に似た概念を持っていますが、プログラマーとしては raw ポインターと unsafe コードを使う必要があります)。この移動しないという概念こそが、ピン留めが表すものです。
この要件は Future::poll のシグネチャに現れており、self(future)の型は Pin<&mut Self> です。たいていの場合、async/await を使っているときは、コンパイラーがピン留めとピン留め解除を処理してくれるため、プログラマーがそれを気にする必要はありません。
手動のピン留め
ピン留めが async/await の抽象化を通して漏れ出してくる箇所がいくつかあります。その根本には、Future::poll と Stream::poll_next のシグネチャにある Pin があります。Future や Stream を直接使う場合(async/await を通してではなく)、うまく動作させるためにピン留めについて考慮する必要があるかもしれません。ピン留めされた型が必要になる一般的な理由には、次のようなものがあります。
- Future や Stream をポーリングする場合 - アプリケーションコード内で行う場合でも、自分自身の Future を実装する場合でも。
- ボックス化された Future を使う場合。ボックス化された Future(または Stream)を使っていて、そのため async 関数を使うのではなく Future 型を書き出している場合、それらの型の中で多くの
Pin<...>を目にする可能性が高く、Future を作成するためにBox::pinを使う必要があります。 - Future を実装する場合 -
pollの中ではselfがピン留めされているため、selfのフィールドへの可変アクセスを得るには、ピン射影や unsafe コードを扱う必要があります。 - Future や Stream を組み合わせる場合。これはほとんどの場合そのまま動作しますが、Future への参照を取得してからそれをポーリングする必要がある場合(たとえば、ループの外で Future を定義し、ループの中の
select!でそれを使う場合)、その参照を Future のように使うために、Future への参照をピン留めする必要があります。 - Stream を扱う場合 - 現在 Rust では、Stream まわりの抽象化は Future ほど多くないため、Future を扱う場合よりも、コンビネータメソッド(技術的にはピン留めを必要としませんが、参照や Future/Stream の作成に関する問題がより表面化しやすくなるようです)を使ったり、場合によっては手動で
pollしたりする可能性が高くなります。
代替案と拡張
このセクションは、ピン留めに関する言語設計に興味がある人向けです。async プログラムを読み、理解し、書きたいだけであれば、このセクションを読む必要はまったくありません。
ピン留めは理解が難しく、少しぎこちなく感じられることがあるため、より良い代替案やバリエーションがあるのではないかと考える人はよくいます。ここではいくつかの代替案を取り上げ、それらがなぜ機能しないのか、あるいは予想以上に複雑なのかを示します。
ただしその前に、ピン留めの歴史的背景を理解することが重要です。まったく新しい言語を設計していて、async/await、自己参照、不動型をサポートしたいのであれば、Rust のピン留めよりも確実に良い方法があります。しかし、async/await、Future、ピン留めは Rust の 1.0 リリース後に追加され、強力な後方互換性保証の文脈で設計されました。その厳しい要件に加えて、この機能を合理的な期間で設計・実装したいという要件もありました。いくつかの解決策(たとえば線形型を含むもの)は、Rust プロジェクトのリソースと制約を考慮すると、現実的には数十年単位で測られるような基礎研究、設計、実装を必要としたでしょう。
代替案
まず、Rust の型をデフォルトで移動不能にする解決策の分類について考えてみましょう。これは Rust の基本的なセマンティクスに対する重大な変更であることに注意してください。この分類に属するどの解決策も、後方互換性を実現するには相当な労力が必要になる可能性が高いでしょう(特定の解決策についてそれが可能かどうかを推測するつもりはありませんが、auto trait、derive 属性、エディション、移行ツールなどの技法を使えば、もしかすると可能かもしれません)。
1 つの提案(実際には、セマンティクスを定義する方法がいろいろあるため提案群)は、Move マーカートレイト(Copy に似たもの)を用意して、オブジェクトを移動可能としてマークし、それ以外のすべての型を移動不能にするというものです。Pin とは対照的に、これはポインタではなく値の性質であるため、その影響ははるかに広範囲に及びます。たとえば、b が Move を実装していなければ let a = b; はエラーになります。
このアプローチの根本的な問題は、現在のピン留めが段階的な概念(ある場所は最初はピン留めされておらず、その後ピン留めされる)であるのに対し、型は値のライフタイム全体に適用されることです。(ピン留めは値ではなく場所の性質として理解するのが最も適切でもありますが、型は値に適用されます。これがトレイトベースのアプローチにとって根本的な問題なのかどうかは、私にはわかりません)。これは次の 2 つのブログ記事で検討されています: Two Ways Not to Move と Ergonomic Self-Referential Types for Rust。
さらに、どのような Move トレイトも後方互換性の問題を抱える可能性が高く、「感染性のある境界」(つまり、非常に多くの場所で Move または !Move が必要になること)につながります。
別の提案は、C++ に似たムーブコンストラクタをサポートするというものです。しかし、これはオブジェクトは常にビット単位で移動できるという Rust の基本的な不変条件を破ります。その結果、Rust ははるかに予測しにくくなり、Rust プログラムの理解とデバッグがより困難になります。これは最悪の種類の後方非互換な変更です。なぜなら、コードの作者が前提としていた可能性のある基本的な仮定を変更するため、unsafe コードを静かに壊してしまうからです。さらに、そのような根本的な変更に必要な設計と実装の労力は莫大なものになるでしょう。こうした実務上の問題に加えて、それが実際に機能するかどうかも不明です。ムーブコンストラクタは、移動されるオブジェクト内の参照を修正するために使えるかもしれませんが、オブジェクトの外部から、移動されるオブジェクトへの参照が存在する場合、それらは修正できない可能性があります。
別の種類の潜在的な解決策として、オフセット参照という考え方があります。これは絶対的な参照ではなく相対的な参照です。つまり、別のフィールドへのオフセット参照であるフィールドは、オブジェクトがメモリ内で移動されたとしても、常に同じオブジェクト内を指すことになります。オフセットポインタの問題は、フィールドはオフセットポインタか絶対ポインタのどちらかでなければならないことです。しかし、async 関数内の参照はフィールドになり、そのフィールドはときには Future オブジェクト内部のメモリを参照し、ときにはその外部のメモリを参照します。
拡張
ピン留めをより強力にしたり、扱いやすくしたりするための提案は複数あります。これらの多くは、ピン留めを純粋なライブラリ上の概念ではなく、さまざまな形で言語のより第一級の一部にするための提案です(言語だけでなく std への拡張を含むこともよくあります)。ここでは、より発展したアイデアをいくつか取り上げます。それらは互いに関連しており、いずれも、特に構造的ピン留めと drop の周辺で、ピン留めされた場所の作成と使用を容易にすることで、ピン留めのエルゴノミクスを改善するという一般的な目標を持っています。
Pinned places は、ピン留めは値や型ではなく場所の性質であるという考え方を推し進め、mut に似た pin/pinned 修飾子を参照に追加します。これは再借用やメソッド解決と統合され、ピン留めされた self を持つメソッド呼び出しのエルゴノミクスを改善します。
UnpinCell は、フィールドのネイティブなピン射影をサポートするために、pinned places のアイデアを拡張します。MinPin は、ネイティブなピン射影とより良い drop サポートのための、より最小限の(かつ後方互換性のある)提案です。
Overwrite トレイトは、オブジェクトの一部を変更する権限(foo.f = ...)と、オブジェクト全体を上書きする権限(*foo = ...)の区別を明示する提案中のトレイトです。現在は、どちらもすべての可変参照で許可されています。この提案には、イミュータブルなフィールドも含まれます。Overwrite は Unpin のある種の置き換えであり、(pinned places のアイデアの一部と組み合わせることで)pinning を扱う作業を改善できる可能性があります。残念ながら、後方互換性を保ったまま採用することは可能ですが、その移行は他の拡張よりもはるかに多くの作業を伴います。
参考資料
- std ドキュメント
Pinなどの振る舞いと保証に関する信頼できる情報源です。優れたドキュメントです。 - RFC 2349 pinning を提案した RFC です。安定化された API はここで提案されたものとは少し異なりますが、RFC には中核となる概念とその根拠についての優れた説明があります。
- pinning を説明しているブログ記事やその他のリソース:
- WithoutBoats(pinning の主要な設計者)による Pin。pinning の歴史、背景、根拠、そしてなぜそれが難しい概念なのかについて説明しています。
- std::pin::Pin はなぜこんなに奇妙なのか? pinning 設計の根拠と、実践で pinning を使うことについての詳細な解説です。
- Pin、Unpin、そして Rust にそれらが必要な理由
- async/await の pinning セクション
- Pin と苦しみ async コードと pinning を理解するための、多数の例を含む非常に会話的なスタイルの詳細なブログ記事です。
- Jon Gjengset による書籍 Rust for Rustaceans には、async/await の実装に pinning が必要な理由と、pinning がどのように機能するかについての優れた説明があります。
-
ピン留めは、async Rust の実装のために特別に設計された低レベルの構成要素であることに注目する価値があります。async Rust に直接結び付いているわけではなく、他の目的にも使用できますが、汎用的な仕組みとして設計されたものではなく、特に自己参照フィールドに対するそのまま使える解決策ではありません。async コード以外の用途でピン留めを使う場合は、一般的に、厚い抽象化の層で包まれている場合にのみうまく機能します。というのも、多くの細かく扱いにくく、推論が難しい unsafe コードが必要になるからです。 ↩
-
ここではソースコードと実行時を少し混同しています。完全に明確にすると、変数は実行時には存在しません。(コンパイルされた)スニペットは複数回実行されるかもしれません(たとえば、ループ内にある場合や、複数回呼び出される関数内にある場合)。各実行ごとに、ソースコード内の変数は実行時には異なるアドレスで表されます。 ↩
-
永続性はピン留めの根本的な側面ではなく、Rust におけるピン留めの位置づけと、それを取り巻く安全性保証の一部です。ピン留めを一時的にしても安全に表現でき、ピン留め保証の利用者がその時間的スコープを信頼できるのであれば、一時的なピン留めでも問題ありません。しかし、それは現在の Rust でも、現実的な拡張でも不可能です。 ↩
-
Box(または他の std ポインター)は、pinning の実装でもコンパイラでも特別扱いされていません。BoxはPinの API に含まれる unsafe 関数を使用してBox::pinを実装しています。Pinの安全性要件は、Boxの安全性保証によって満たされます。 ↩ -
これは厳密には、非 async 関数においてのみスタックへのピン留めです。async 関数では、すべてのローカル変数が async 疑似スタックに割り当てられるため、ピン留めされる場所は、その async 関数の基盤となる Future の一部としてヒープ上に格納される可能性が高いです。 ↩
構造化並行性
著者メモ (TODO): この章の一部については、もっと早い段階、特に設計原則として本の中で議論したほうがよいかもしれません(最初の導入は guide/intro にあります)。ただし、このトピックをよりよく理解し、何かを書き留めておくために、まずは独立した章として始めています。また、まだ少し粗い状態です。
(注: 最初のいくつかのセクションでは、構造化並行性という抽象的な概念について述べており、Rust や async プログラミングに固有のものではありません(参照: スレッドを使った同期的な並行プログラミング)。ここでは「タスク」を、任意のスレッド、async タスク、またはその他の類似する並行性プリミティブを意味するものとして使います)。
構造化並行性は、並行プログラムを設計するための考え方です。プログラムが構造化並行性の原則に完全に従うには、特定の言語機能やライブラリが必要ですが、そのような機能がなくても、この考え方に従うことで多くの利点を得られます。構造化並行性は、言語や並行性プリミティブ(スレッドか async か、など)から独立しています。多くの人が、async Rust でプログラミングする際に構造化並行性の考え方が有用であると感じています。
構造化並行性の本質的な考え方は、タスクがツリーとして構成されるということです。子タスクは親タスクの後に開始され、常に親タスクより前に終了します。これにより、結果とエラーを常に親タスクへ返せるようになり、親タスクのキャンセルが常に子タスクへ伝播されることが求められます。主に、時間的スコープはレキシカルスコープに従います。つまり、タスクは、それが作成された関数やブロックより長く生存すべきではありません。ただし、より長く生存するタスクがプログラム内で何らかの形で具象化されている限り、これは構造化並行性の要件ではありません(典型的には、親タスク内における子タスクの時間的スコープを表すオブジェクトを使います)。
TODO 図
構造化並行性という名前は、任意のジャンプ(goto)ではなく、関数やループなどを使って制御フローを構造化すべきだという考え方である構造化プログラミングとの類比によるものです。
構造化並行性について考える前に、一般的な並行設計がどのような意味で非構造化されているのかを振り返ると役に立ちます。典型的なパターンは、何らかの spawn 文を使ってタスクを開始するというものです。そのタスクはその後、システム内の他のタスク(それを spawn したタスクを含む)と並行して完了まで実行されます。どのタスクが先に終了するかについて制約はありません。プログラムは本質的に、独立して生存し、いつでも終了し得るタスクの単なる集まりです。タスク間の通信や同期はすべて場当たり的であり、プログラマーは他のどのタスクがまだ実行中であるとも仮定できません。
非構造化並行性の実用上の欠点は、タスクから結果を返すことが、その発生時期や方法に関する言語レベルの保証なしに、言語外的な方法で行われなければならない点です。言語のエラー処理機構は、非構造化並行性の制約のない制御フローには適用できないため、エラーが捕捉されない可能性があります。また、タスクの相対的な状態についても保証がありません。どのタスクも、他のタスクの状態とは無関係に、実行中であったり、正常終了していたり、エラーで終了していたり、外部からキャンセルされていたりする可能性があります1。これらすべてが、並行プログラムを理解し保守することを難しくしています。この構造の欠如は、並行プログラミングが逐次プログラミングより本質的に難しいと考えられる理由の 1 つです。
構造化並行性は、プログラムに制約を課すプログラミング規律であることに注意する価値があります。関数やループが goto より柔軟性に欠けるのと同じように、構造化並行性は単にタスクを spawn するより柔軟性に欠けます。しかし、構造化プログラミングと同様に、構造化並行性が柔軟性の面でもたらすコストは、予測可能性の向上によって補って余りあります。
構造化並行性の原則
構造化並行性の重要な考え方は、すべてのタスク(またはスレッドなど)がツリーとして組織化されるということです。すなわち、各タスク(ルートである main タスクを除く)は単一の親を持ち、親関係に循環はありません。子タスクはその親によって開始され2、親より前に必ず実行を終えなければなりません。兄弟間には制約はありません。タスクの親は変更できません。
構造化並行性を実装するプログラムについて推論する際の重要な新しい事実は、あるタスクが生存しているなら、そのすべての祖先タスクも生存していなければならないということです。これは、それらが良好な状態にあることを保証するものではありません。シャットダウン中であったり、エラー処理中であったりする可能性がありますが、何らかの形で実行中でなければなりません。これは、どのタスク(ルートタスクを除く)についても、結果やエラーを送る先となる生存中のタスクが常に存在することを意味します。実際、理想的なアプローチは、言語のエラー処理を拡張し、エラーが常に親タスクへ伝播されるようにすることです。Rust では、これは Result::Err を返すことと panic の両方に適用されるべきです。
さらに、子タスクのライフタイムは親タスク内で表現できます。一般的なケースでは、タスクのライフタイム(その時間的スコープ)は、それが開始されたレキシカルスコープに結び付けられます。たとえば、関数内で開始されたすべてのタスクは、その関数が return する前に完了すべきです。これは非常に強力な推論の道具です。もちろん、これはすべての場合には制約が強すぎるため、プログラム内のオブジェクト(しばしば「スコープ」または「nursery」と呼ばれます)を使うことで、タスクの時間的スコープをレキシカルスコープの外へ延ばせます。そのようなオブジェクトは渡したり格納したりできるため、任意のライフタイムを持つことができます。それでもなお、重要な推論の道具があります。そのオブジェクトに結び付けられたタスクは、それより長く生存できません(Rust では、この性質によってタスクをライフタイムシステムと統合できます)。
上記は、構造化並行性の別の利点につながります。複数のタスクにまたがるリソース管理について推論できるようになることです。クリーンアップコードは、リソースがもはや使われなくなるときに呼び出されます(たとえば、ファイルハンドルを閉じる場合)。逐次コードでは、いつクリーンアップコードを呼び出すかという問題は、オブジェクトがスコープを抜けるときにデストラクタが呼び出されることを保証することで解決されます。しかし、並行コードでは、オブジェクトが別のタスクによってまだ使われている可能性があるため、いつクリーンアップすべきかは明確ではありません(参照カウントやガベージコレクションは多くの場合の解決策ですが、オブジェクトのライフタイムについての推論を難しくし、エラーにつながる可能性があり、さらにランタイムオーバーヘッドも伴います)。
親タスクがその子より長く生存するという原則は、キャンセルに対して重要な含意を持ちます。あるタスクがキャンセルされた場合、そのすべての子タスクもキャンセルされなければならず、それらのキャンセルは親のキャンセルが完了する前に完了しなければなりません。これはさらに、構造化並行システムにおいてキャンセルをどのように実装できるかに影響します。 タスクがエラーによって早期に完了する場合(Rust では、これは早期 return だけでなく panic を意味することもあります)、そのタスクは return する前に、すべての子タスクが完了するのを待たなければなりません。実際には、早期 return は子タスクのキャンセルをトリガーしなければなりません。これは Rust における panic と類似しています。panic は、スタックを巻き戻す前に現在のスコープ内のデストラクタをトリガーし、プログラムが終了するか panic が捕捉されるまで、各スコープでデストラクタを呼び出します。構造化並行性の下では、早期 return は子タスクのキャンセル(したがって、それらのタスク内のオブジェクトのクリーンアップ)をトリガーし、タスクのツリーを下ってすべての(推移的な)子をキャンセルしなければなりません。
一部の設計は構造化並行性の下で非常に自然に機能します(たとえば、完了すべき単一のジョブを持つワーカータスク)。一方で、あまりうまく適合しない設計もあります。一般に、これらのパターンは、特定のタスクに結び付けられていないこと自体が機能であるものです。たとえば、ワーカープールやバックグラウンドスレッドです。これらのパターンを使用する場合でも、通常、タスクはプログラム全体より長く存続すべきではないため、常に親になれるタスクが 1 つ存在します。
構造化並行性の実装
構造化並行性の代表的な実装は、Python の Trio ライブラリです。Trio は、構造化並行性の概念を中心に設計された、async プログラミングと IO のための汎用ライブラリです。Trio プログラムは async with 構文を使用して、タスクを生成するための字句スコープを定義します。生成されたタスクは nursery オブジェクト(Rust の Scope にいくらか似ています)に関連付けられます。タスクのライフタイムは、その nursery の動的な時間的スコープに結び付けられ、一般的なケースでは async with ブロックの字句スコープに結び付けられます。これにより、タスク間の親子関係、したがって構造化並行性のツリー不変条件が強制されます。
エラーハンドリングには Python の例外が使用され、それらは自動的に親タスクへ伝播されます。
部分的な構造化並行性
多くのプログラミング技法と同様に、構造化並行性の完全な利点は、それ「のみ」を使用することで得られます。すべての並行性が構造化されていれば、プログラム全体の振る舞いについて推論することがはるかに容易になります。しかし、それには言語に対する要件があり、それらを満たすのは容易ではありません。たとえば Rust では、非構造化並行性を行うことは十分に簡単です。しかし、構造化並行性の原則を選択的に適用するだけでも、あるいは構造化並行性の観点で考えるだけでも有用です。
構造化並行性を設計規律として使用できます。プログラムを設計するときは、常にタスク間の親子関係を考慮し、文書化し、子タスクがその親より前に終了することを保証します。これは通常の実行ではたいていかなり容易ですが、キャンセルや panic に直面すると困難になることがあります。
構造化並行性のもう 1 つの要素で、かなり容易に取り入れられるものは、常にエラーを親タスクへ伝播することです。通常のエラーハンドリングと同様に、最善策はエラーを無視することかもしれませんが、その場合でも親タスクのコード内で明示的であるべきです。
構造化並行性から学べるもう 1 つのプログラミング規律は、親タスクをキャンセルする場合に、すべての子タスクをキャンセルすることです。これにより、構造化並行性の保証がはるかに信頼できるものになり、キャンセル全般についての推論が容易になります。
async Rust による実践的な構造化並行性
Rust における並行性(async であれスレッドを使用するものであれ)は、本質的に非構造化です。タスクは任意に生成でき、他のタスクで発生したエラーや panic は無視でき、キャンセルは通常即時であり、他のタスクへ伝播しません(これらの問題を容易に解決できない理由については後述します)。しかし、プログラム内で構造化並行性の利点の一部を得る方法はいくつかあります。
- 構造化並行性に従って、高いレベルでプログラムを設計する。
- 可能な場合は構造化並行性のイディオムに従う(そして非構造化のイディオムを避ける)。
- 構造化並行性をより使いやすく、信頼できるものにするために crate を使用する。
Rust で構造化並行性を使用する際の最も厄介な問題の 1 つは、キャンセルを子 Future/タスクへ伝播することです。Future を使用し、それらを並行に合成している場合、これは唐突ではあるものの自然に発生します(Future を drop すると、それが所有するすべての Future が drop され、それらがキャンセルされます)。しかし、タスクが drop されたとき、そのタスクが生成したタスクへシグナルを送る機会はありません(少なくとも Tokio ではありません3)。
これが意味するのは、「本物の」構造化並行性よりも弱い不変条件しか仮定できないということです。つまり、親タスクが常に生存していると仮定できるのではなく、その親がキャンセルされたか panic した場合を除き、常に生存しているとしか仮定できません。これは最適ではありませんが、それでもプログラミングを単純化できます。なぜなら、通常の実行では、何らかの結果を処理する親が存在しないケースを扱う必要が決してないからです。
TODO
- 所有権/ライフタイムは自然に sc へつながる
- リソースについての推論
async プログラムの設計に構造化並行性を適用する
プログラムの設計という観点では、構造化並行性を適用することにはいくつかの含意があります。
- プログラムの並行性をツリー構造で組織化する。つまり、親タスクと子タスクの観点で考える。
- 時間的スコープは、可能な場合は字句スコープに従うべきです。より具体的には、関数内で開始されたタスクが完了するまで、関数は return すべきではありません(早期 return や panic を含みます)。
- データは一般に子タスクから親タスクへ流れます。もちろん、一部のデータは親から子へ、あるいは他の方法で流れますが、主として、タスクは自分の作業結果を親タスクに渡し、さらに処理してもらいます。これにはエラーも含まれるため、親タスクはその子のエラーを処理すべきです。
ライブラリを書いていて構造化並行性を使用したい場合(または、そのライブラリを並行構造化されたプログラムで使用できるようにしたい場合)、ライブラリコンポーネントのカプセル化に時間的カプセル化が含まれていることが重要です。つまり、API 関数が return した後も実行され続けるタスクを開始しないということです。
Rust は構造化並行性の規則を強制できないため、プログラム(またはコンポーネント)がどのように構造化されているか、そしてどこで構造化並行性の規律に違反しているかを認識し、文書化することが重要です。
有用な妥協パターンの 1 つは、抽象化の最上位レベルでのみ、かつメインタスクの最も外側の関数から生成されるタスクに対してのみ、非構造化並行性を許可することです(理想的には main 関数からのみですが、プログラムにはセットアップや設定コードが含まれることが多いため、プログラムの論理的な「トップレベル」は実際には数関数分深い位置にあることがあります)。このようなパターンでは、通常 main から多数のタスクが生成され、それぞれが異なる責務を持ち、互いのやり取りは限定的です。これらのタスクは再起動されたり、任意の他のタスクによって新しいタスクが開始されたり、クライアントなどに紐づいた限定的なライフタイムを持ったりすることがあります。つまり、それらは並行・非構造化です。これら各タスクの内部では、構造化並行性が厳密に適用されます。
TODO なぜこれが有用なのか?
TODO ここにケーススタディがあると非常によい。
構造化されたイディオムと非構造化のイディオム
このサブセクションでは、並行性に対する構造化されたアプローチとうまく機能する雑多なイディオムと、並行性の構造化をより難しくするいくつかのイディオムを扱います。
構造化並行性に従う最も簡単な方法は、タスクと生成を使うのではなく、future と 並行合成を使うことです。並列性のためにタスクが必要な場合は、JoinHandle または JoinSet を使う必要があります。親タスクが panic したりキャンセルされたりした場合に、子タスクが適切にクリーンアップできるよう注意しなければなりません。子タスク内のエラーが適切に処理されるように、ハンドルのエラーを確認しなければなりません。
キャンセルの伝播がないことを回避する 1 つの方法は、子を持つ可能性のあるタスクを突然キャンセル(drop)しないようにすることです。代わりに、シグナル(例: キャンセルトークン)を使い、そのタスクが終了前に子をキャンセルできるようにします。残念ながら、これは select と互換性がありません。
プログラム(またはコンポーネント)をシャットダウンするには、コンポーネントを drop するのではなく、明示的なシャットダウンメソッドを使います。これにより、シャットダウン関数は子タスクの終了を待機したり、それらをキャンセルしたりできます(drop は async にできないためです)。
いくつかのイディオムは、構造化並行性とうまく噛み合いません。
- join handle 経由で完了を await せずにタスクを生成すること、またはそれらの join handle を drop すること。
- Select または race のマクロ/関数。これらは本質的に構造化されているわけではありませんが、future を突然キャンセルするため、非構造化なキャンセルのよくある原因になります。
- ワーカータスクまたはプール。async タスクでは、タスクの開始/シャットダウンのオーバーヘッドが非常に低いため、タスクのプールを使っても、「データ」のプール、たとえばコネクションプールを使う場合と比べて、得られる利点はほとんどない可能性があります。
- 明確な所有権構造を持たないデータ - これは必ずしも構造化並行性と矛盾するわけではありませんが、多くの場合、設計上の問題につながります。
構造化並行性のためのクレート
TODO
- クレート: moro, async-nursery
- futures-concurrency
関連トピック
このセクションは、async Rust で構造化並行性を使うために知っておく必要はありませんが、興味のある読者のために有用な背景情報として含めています。
スコープ付きスレッド
Rust のスレッドにおける構造化並行性はかなりうまく機能します。スコープなしのライフタイムでスレッドを生成することを防ぐことはできませんが、これは簡単に避けられます。代わりに、スコープ付きスレッドの使用に限定してください。その方法については scope 関数のドキュメントを参照してください。スコープ付きスレッドを使うと、子のライフタイムが制限され、panic が自動的に親スレッドへ伝播されます。ただし、親スレッドはエラーを処理するために子スレッドの結果を確認しなければなりません。Trio の nursery のように Scope オブジェクトを受け渡すことさえできます。通常、Rust のスレッドではキャンセルは問題になりませんが、スレッドキャンセルを利用する場合は、それをスコープ付きスレッドと手動で統合する必要があります。
Rust に固有の点として、スコープ付きスレッドでは子スレッドが親スレッドからデータを借用できます。これは並行・非構造化スレッドでは不可能なことです。これは非常に有用であり、構造化並行性と Rust の所有権スタイルのリソース管理がどれほどうまく連携できるかを示しています。
Async drop とスコープ付きタスク
Rust では、オブジェクトのライフタイムが終了したときにリソースがクリーンアップされることを保証するために、デストラクタ(drop)が使われます。future は単なるオブジェクトなので、そのデストラクタは子 future のキャンセルを保証するための自然な場所になります。しかし、async プログラムでは、クリーンアップ処理を非同期にしたいことが非常によくあります(そうしないと他のタスクをブロックする可能性があります)。残念ながら、Rust は現在、非同期デストラクタ(async drop)をサポートしていません。それをサポートするための作業は進行中ですが、いくつかの理由により困難です。その理由には、async デストラクタを持つオブジェクトが非 async コンテキストから drop される可能性があることや、drop の呼び出しは暗黙的であるため、明示的な await を書く場所がないことが含まれます。
スコープ付きスレッドが(一般的にも、構造化並行性にとっても)有用であることを踏まえると、もう 1 つのよい疑問は、async プログラミングに同様の構成要素(「スコープ付きタスク」)が存在しないのはなぜか、ということです。TODO これに答える
参考文献
興味があれば、さらに読むためのよいブログ記事をいくつか挙げます。
-
join ハンドルを使うことで、これらの欠点はいくらか軽減されますが、それは信頼できる保証のない場当たり的な仕組みです。構造化並行性の利点を完全に得るには、それらを常に使うことに加えて、キャンセルとエラーを適切に処理することについて細心の注意を払わなければなりません。これは、言語やライブラリのサポートなしには困難です。この点については、後でもう少し議論します。 ↩
-
これは実際には、構造化並行性における厳密な要件ではありません。タスクの時間的スコープをプログラム内で表現し、タスク間で渡せるなら、子タスクをあるタスクが開始し、別のタスクをその親にすることができます。 ↩
-
Tokio の
JoinHandleのセマンティクスは、ハンドルが drop された場合、その基盤となるタスクが「解放」される(比較: drop される)というものです。つまり、子タスクの結果は他のどのタスクによっても処理されません。 ↩
はじめに
Rust における非同期プログラミングへようこそ!非同期 Rust コードを書き始めたいと 考えているなら、ここはまさにうってつけの場所です。Web サーバー、データベース、 あるいはオペレーティングシステムを構築する場合でも、本書では Rust の非同期 プログラミングツールを使って、ハードウェアの性能を最大限に引き出す方法を 紹介します。
本書で扱う内容
本書は、Rust の async 言語機能とライブラリの使い方について、初心者にも 経験者にも適した、包括的で最新のガイドとなることを目指しています。
-
前半の章では、非同期プログラミング全般についての導入と、 Rust におけるその独自の考え方について説明します。
-
中盤の章では、非同期コードを書く際に使用できる主要なユーティリティと 制御フローツールについて説明し、パフォーマンスと再利用性を最大化するために ライブラリやアプリケーションを構成するベストプラクティスを紹介します。
-
本書の最後のセクションでは、より広範な async エコシステムを扱い、 一般的なタスクを達成する方法について、いくつかの例を示します。
前置きはこのくらいにして、Rust における非同期プログラミングの 魅力的な世界を探っていきましょう!
なぜ Async なのか?
私たちは皆、Rust によって高速で安全なソフトウェアを書けることを気に入っています。 しかし、非同期プログラミングはこのビジョンにどのように適合するのでしょうか?
非同期プログラミング、略して async は、ますます多くのプログラミング言語でサポートされている
_並行プログラミングモデル_です。
async/await 構文を通じて、通常の同期プログラミングの見た目や感覚の多くを保ちながら、
少数の OS スレッド上で多数の並行タスクを実行できます。
Async と他の並行性モデルの比較
並行プログラミングは、通常の逐次プログラミングほど成熟しておらず、「標準化」も進んでいません。その結果、言語がサポートしている並行プログラミングモデルによって、並行性の表現方法は異なります。 最も一般的な並行性モデルを簡単に概観すると、非同期プログラミングが並行プログラミングというより広い分野の中でどのように位置づけられるかを理解する助けになります。
- OS スレッドはプログラミングモデルに変更を必要としないため、 並行性を非常に簡単に表現できます。しかし、スレッド間の同期は難しい場合があり、 パフォーマンス上のオーバーヘッドも大きくなります。 スレッドプールはこれらのコストの一部を緩和できますが、大規模な IO バウンドのワークロードをサポートするには不十分です。
- イベント駆動プログラミングは、_コールバック_と組み合わせることで非常に高性能にできますが、 冗長で「非線形」な制御フローになりがちです。 データフローやエラー伝播を追いかけるのが難しいことがよくあります。
- コルーチンは、スレッドと同様にプログラミングモデルの変更を必要としないため、 使いやすいものです。async と同じく、多数のタスクもサポートできます。 しかし、システムプログラミングやカスタムランタイムの実装者にとって重要な低レベルの詳細を抽象化してしまいます。
- アクターモデルは、すべての並行計算をアクターと呼ばれる単位に分割します。 アクターは、分散システムの場合とよく似た、失敗する可能性のあるメッセージパッシングを通じて通信します。アクターモデルは効率的に実装できますが、 フロー制御やリトライロジックなど、多くの実践的な問題が未解決のまま残ります。
要約すると、非同期プログラミングは、スレッドやコルーチンの使いやすさの利点の大部分を提供しながら、 Rust のような低レベル言語に適した高性能な実装を可能にします。
Rust の Async と他の言語の比較
非同期プログラミングは多くの言語でサポートされていますが、実装によって詳細はいくつか異なります。Rust の async の実装は、いくつかの点でほとんどの言語とは異なります。
- Future は Rust では不活性であり、poll されたときにのみ進行します。future を drop すると、 それ以上進行しなくなります。
- Rust の async はゼロコストです。つまり、使ったものに対してだけコストを支払います。 具体的には、ヒープ割り当てや動的ディスパッチなしで async を使用でき、 これはパフォーマンスにとって大きな利点です! これにより、組み込みシステムなどの制約のある環境でも async を使用できます。
- Rust には組み込みランタイムがありません。代わりに、ランタイムはコミュニティが保守するクレートによって提供されます。
- Rust ではシングルスレッドおよびマルチスレッドの両方のランタイムが利用可能で、 それぞれ異なる長所と短所があります。
Rust における Async とスレッドの比較
Rust における async の主な代替手段は、OS スレッドを使うことです。これは
std::thread
を通じて直接使うことも、スレッドプールを通じて間接的に使うこともできます。
スレッドから async へ、またはその逆へ移行するには、
通常、実装の面でも、(ライブラリを構築している場合は)公開されるパブリックインターフェイスの面でも、大規模なリファクタリング作業が必要です。そのため、
早い段階でニーズに合ったモデルを選択すれば、多くの開発時間を節約できます。
OS スレッドは、スレッドには CPU とメモリのオーバーヘッドが伴うため、少数のタスクに適しています。 スレッドの生成やスレッド間の切り替えは非常に高コストであり、アイドル状態のスレッドでさえシステムリソースを消費します。 スレッドプールライブラリはこれらのコストの一部を緩和する助けになりますが、すべてではありません。 しかし、スレッドを使うと、大きなコード変更なしに既存の同期コードを再利用できます。特定のプログラミングモデルは必要ありません。 一部のオペレーティングシステムでは、スレッドの優先度を変更することもできます。 これは、ドライバーやその他のレイテンシに敏感なアプリケーションに役立ちます。
Async は、特にサーバーやデータベースのような大量の IO バウンドタスクを伴うワークロードにおいて、 CPU とメモリのオーバーヘッドを大幅に削減します。 他の条件が同じであれば、OS スレッドよりも桁違いに多くのタスクを扱えます。 これは、async ランタイムが少数の(高コストな)スレッドを使って多数の(低コストな)タスクを処理するためです。 しかし、async Rust では、async 関数から生成されるステートマシンと、各実行可能ファイルが async ランタイムを同梱することにより、 バイナリ blob が大きくなります。
最後に付け加えると、非同期プログラミングはスレッドよりも_優れている_わけではなく、 異なるものです。 パフォーマンス上の理由で async が必要ないのであれば、多くの場合、スレッドの方がより単純な代替手段になります。
例: 並行ダウンロード
この例では、2 つの Web ページを並行にダウンロードすることが目標です。 典型的なスレッドベースのアプリケーションでは、並行性を実現するためにスレッドを生成する必要があります。
fn get_two_sites() {
// Spawn two threads to do work.
let thread_one = thread::spawn(|| download("https://www.foo.com"));
let thread_two = thread::spawn(|| download("https://www.bar.com"));
// Wait for both threads to complete.
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
しかし、Web ページのダウンロードは小さなタスクです。このような少量の作業のためにスレッドを作成するのは、かなり無駄です。より大きなアプリケーションでは、 簡単にボトルネックになり得ます。async Rust では、追加のスレッドなしにこれらのタスクを並行に実行できます。
async fn get_two_sites_async() {
// Create two different "futures" which, when run to completion,
// will asynchronously download the webpages.
let future_one = download_async("https://www.foo.com");
let future_two = download_async("https://www.bar.com");
// Run both futures to completion at the same time.
join!(future_one, future_two);
}
ここでは、追加のスレッドは作成されません。さらに、すべての関数呼び出しは静的にディスパッチされ、 ヒープ割り当てもありません! しかし、そもそもコードを非同期に書く必要があります。 この本はそれを実現する手助けをします。
Rust におけるカスタム並行性モデル
最後に付け加えると、Rust はスレッドと async のどちらかを選ぶことを強制しません。 同じアプリケーション内で両方のモデルを使用できます。これは、 スレッドベースの依存関係と async の依存関係が混在している場合に役立つことがあります。 実際、イベント駆動プログラミングなど、まったく別の並行性モデルを使うことさえできます。 それを実装しているライブラリを見つけられる限り可能です。
非同期Rustの現状
async Rustの一部は、同期Rustと同じ安定性保証でサポートされています。 その他の部分はまだ成熟途上であり、時間とともに変化していきます。 async Rustでは、次のことを期待できます。
- 典型的な並行ワークロードに対する卓越したランタイム性能。
- ライフタイムやピン留めなど、高度な言語機能とのより頻繁な関わり。
- 同期コードと非同期コードの間、および異なる非同期ランタイム間の いくつかの互換性上の制約。
- 非同期ランタイムと言語サポートが継続的に進化していることによる、 より高いメンテナンス負担。
要するに、async Rustは同期Rustよりも使うのが難しく、 メンテナンス負担が高くなる可能性がありますが、 その見返りとしてクラス最高の性能を得られます。 async Rustのあらゆる領域は絶えず改善されているため、 これらの問題の影響は時間とともに薄れていくでしょう。
言語とライブラリのサポート
非同期プログラミングはRust自体によってサポートされていますが、 ほとんどの非同期アプリケーションは、 コミュニティクレートが提供する機能に依存しています。 そのため、言語機能とライブラリサポートの組み合わせに 頼る必要があります。
Futureトレイトのような、 最も基本的なトレイト、型、関数は標準ライブラリによって提供されています。async/await構文はRustコンパイラによって直接サポートされています。- 多くのユーティリティ型、マクロ、関数は
futuresクレートによって提供されています。これらは任意のasync Rustアプリケーションで使用できます。 - 非同期コードの実行、IO、タスクの生成は、Tokioやasync-stdなどの 「非同期ランタイム」によって提供されています。ほとんどの非同期アプリケーションと一部の 非同期クレートは、特定のランタイムに依存しています。詳細については “非同期エコシステム”セクションを参照してください。
同期Rustで慣れているかもしれない一部の言語機能は、まだ async Rustでは利用できません。特に、Rustでは1.75.0 stableまでトレイト内でasync 関数を宣言できませんでした(そして、それらのトレイトに対する動的ディスパッチには今も制限があります)。代わりに、同じ 結果を得るには回避策を使う必要があり、それにより記述が冗長になる場合があります。
コンパイルとデバッグ
ほとんどの場合、async Rustにおけるコンパイラエラーとランタイムエラーは、 Rustでこれまで常にそうであったのと同じように機能します。いくつかの 注目すべき違いがあります。
コンパイルエラー
async Rustのコンパイルエラーは、同期Rustと同じ高い標準に準拠していますが、 async Rustはライフタイムやピン留めなど、より複雑な言語機能に依存することが多いため、 この種のエラーにより頻繁に遭遇する可能性があります。
ランタイムエラー
コンパイラがasync関数に遭遇するたびに、内部でステートマシンを生成します。 async Rustのスタックトレースには通常、これらのステートマシンからの詳細と、 ランタイムからの関数呼び出しが含まれます。そのため、スタックトレースの解釈は、 同期Rustの場合よりも少し複雑になることがあります。
新しい失敗モード
async Rustでは、いくつかの新しい失敗モードが起こり得ます。たとえば、
非同期コンテキストからブロッキング関数を呼び出した場合や、
Futureトレイトを誤って実装した場合です。このようなエラーは、コンパイラと、
場合によってはユニットテストさえも静かに通過してしまうことがあります。
この本が提供することを目指している基礎概念をしっかり理解しておくことで、
これらの落とし穴を避けるのに役立ちます。
互換性に関する考慮事項
非同期コードと同期コードは、常に自由に組み合わせられるわけではありません。 たとえば、同期関数からasync関数を直接呼び出すことはできません。 同期コードと非同期コードはまた、異なる設計パターンを促進する傾向があり、 異なる環境向けのコードを組み合わせるのが難しくなることがあります。
非同期コード同士でさえ、常に自由に組み合わせられるわけではありません。一部のクレートは、 機能するために特定の非同期ランタイムに依存しています。その場合、通常は クレートの依存関係リストに指定されています。
これらの互換性の問題は選択肢を制限する可能性があるため、 必要になる可能性のある非同期ランタイムとクレートを早い段階で調査するようにしてください。 いったんランタイムを決めて落ち着けば、互換性について あまり心配する必要はなくなります。
性能特性
async Rustの性能は、使用している非同期ランタイムの 実装に依存します。 async Rustアプリケーションを支えるランタイムは比較的新しいものの、 ほとんどの実用的なワークロードで非常に優れた性能を発揮します。
とはいえ、非同期エコシステムの大部分は_マルチスレッド_ランタイムを前提としています。 そのため、単一スレッドの非同期アプリケーションが持つ理論上の性能上の利点、 つまりより低コストな同期処理を享受することが難しくなっています。 もう1つ見過ごされがちなユースケースは、ドライバーやGUIアプリケーションなどにとって 重要な_レイテンシに敏感なタスク_です。このようなタスクは、適切にスケジュールされるために ランタイムやOSのサポートに依存します。 将来的には、これらのユースケースに対するライブラリサポートの向上が期待できます。
async/.await入門
async/.awaitは、同期コードのように見える非同期関数を書くための Rust 組み込みのツールです。asyncはコードブロックを、Futureと呼ばれるトレイトを実装するステートマシンに変換します。同期メソッド内でブロッキング関数を呼び出すとスレッド全体がブロックされるのに対し、ブロックされたFutureはスレッドの制御を譲り、他のFutureが実行できるようにします。
Cargo.tomlファイルにいくつかの依存関係を追加しましょう。
[dependencies]
futures = "0.3"
非同期関数を作成するには、async fn構文を使用できます。
#![allow(unused)]
fn main() {
async fn do_something() { /* ... */ }
}
async fnによって返される値はFutureです。何かを実行するには、そのFutureをエグゼキューター上で実行する必要があります。
// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world(); // Nothing is printed
block_on(future); // `future` is run and "hello, world!" is printed
}
async fnの内部では、別のasync fnの出力など、Futureトレイトを実装する別の型の完了を待つために.awaitを使用できます。block_onとは異なり、.awaitは現在のスレッドをブロックせず、代わりに future が完了するのを非同期に待ちます。これにより、future が現在進行できない場合に他のタスクを実行できるようになります。
たとえば、learn_song、sing_song、danceという 3 つのasync fnがあるとします。
async fn learn_song() -> Song { /* ... */ }
async fn sing_song(song: Song) { /* ... */ }
async fn dance() { /* ... */ }
学び、歌い、踊るための 1 つの方法は、これらを個別にブロックすることです。
fn main() {
let song = block_on(learn_song());
block_on(sing_song(song));
block_on(dance());
}
しかし、この方法では可能な限り最高のパフォーマンスを得られません。一度に 1 つのことしかしていないからです!歌を歌う前にその歌を学ばなければならないのは明らかですが、歌を学んだり歌ったりするのと同時に踊ることは可能です。これを行うために、並行して実行できる 2 つの別々のasync fnを作成できます。
async fn learn_and_sing() {
// Wait until the song has been learned before singing it.
// We use `.await` here rather than `block_on` to prevent blocking the
// thread, which makes it possible to `dance` at the same time.
let song = learn_song().await;
sing_song(song).await;
}
async fn async_main() {
let f1 = learn_and_sing();
let f2 = dance();
// `join!` is like `.await` but can wait for multiple futures concurrently.
// If we're temporarily blocked in the `learn_and_sing` future, the `dance`
// future will take over the current thread. If `dance` becomes blocked,
// `learn_and_sing` can take back over. If both futures are blocked, then
// `async_main` is blocked and will yield to the executor.
futures::join!(f1, f2);
}
fn main() {
block_on(async_main());
}
この例では、歌を歌う前にその歌を学ぶ必要がありますが、学ぶことと歌うことの両方は、踊ることと同時に行えます。learn_and_singでlearn_song().awaitではなくblock_on(learn_song())を使用した場合、learn_songの実行中はそのスレッドで他に何もできません。これにより、同時に踊ることは不可能になります。learn_song future に対して.awaitすることで、learn_songがブロックされている場合に他のタスクが現在のスレッドを引き継げるようになります。これにより、同じスレッド上で複数の future を並行して完了まで実行できるようになります。
内部の仕組み: Future とタスクの実行
このセクションでは、Future と非同期タスクがどのようにスケジュールされるかについて、その基礎となる構造を扱います。既存の Future 型を使用する高レベルなコードの書き方を学ぶことだけに関心があり、Future 型がどのように動作するかの詳細に関心がない場合は、async/await の章まで読み飛ばしてかまいません。ただし、この章で取り上げるいくつかのトピックは、async/await コードがどのように動作するかを理解するうえで、また async/await コードのランタイムおよびパフォーマンス特性を理解するうえで、さらに新しい非同期プリミティブを構築するうえで役立ちます。今このセクションを読み飛ばすことにした場合でも、将来再訪できるようにブックマークしておくとよいでしょう。
では、前置きはこのくらいにして、Future トレイトについて話しましょう。
Future トレイト
Future トレイトは、Rust における非同期プログラミングの中心にあります。
Future は、値を生成できる非同期計算です
(ただし、その値は空である場合もあります。例: ())。future トレイトの
簡略化した バージョンは、次のようになるかもしれません。
#![allow(unused)]
fn main() {
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
}
Future は poll 関数を呼び出すことで進めることができ、この関数は future を
可能な限り完了に近づけるよう駆動します。future が完了した場合は、
Poll::Ready(result) を返します。future がまだ完了できない場合は、
Poll::Pending を返し、Future がさらに進行できる状態になったときに
wake() 関数が呼び出されるようにします。wake() が呼び出されると、
その Future を駆動しているエグゼキューターは再び poll を呼び出し、
Future がさらに進行できるようにします。
wake() がなければ、エグゼキューターは特定の future がいつ進行できるのかを
知る手段がなく、すべての future を常にポーリングし続ける必要があります。
wake() があれば、エグゼキューターはどの future が poll される準備が
できているのかを正確に把握できます。
たとえば、すでにデータが利用可能かもしれないし、そうでないかもしれない
ソケットから読み取りたい場合を考えてみましょう。データがあれば、それを
読み込んで Poll::Ready(data) を返せますが、データがまだ準備できていなければ、
私たちの future はブロックされ、それ以上進行できなくなります。データが
利用できない場合は、ソケット上でデータが準備できたときに wake が
呼び出されるよう登録しなければなりません。これにより、私たちの future が
進行できる状態になったことをエグゼキューターに伝えます。単純な SocketRead
future は、次のようになるかもしれません。
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
この Future のモデルにより、中間のアロケーションを必要とせずに複数の
非同期操作を合成できます。複数の future を同時に実行したり、future を
連鎖させたりすることは、次のようなアロケーション不要のステートマシンとして
実装できます。
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
// Each field may contain a future that should be run to completion.
// If the future has already completed, the field is set to `None`.
// This prevents us from polling a future after it has completed, which
// would violate the contract of the `Future` trait.
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// Attempt to complete future `a`.
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// Attempt to complete future `b`.
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// Both futures have completed -- we can return successfully
Poll::Ready(())
} else {
// One or both futures returned `Poll::Pending` and still have
// work to do. They will call `wake()` when progress can be made.
Poll::Pending
}
}
}
これは、個別のアロケーションを必要とせずに複数の future を同時に実行でき、 より効率的な非同期プログラムを可能にすることを示しています。同様に、複数の 逐次的な future を次々に実行することもできます。次のようになります。
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// We've completed the first future -- remove it and start on
// the second!
Poll::Ready(()) => self.first.take(),
// We couldn't yet complete the first future.
// Notice that we disrupt the flow of the `poll` function with the `return` statement.
Poll::Pending => return Poll::Pending,
};
}
// Now that the first future is done, attempt to complete the second.
self.second.poll(wake)
}
}
これらの例は、複数のアロケートされたオブジェクトや深くネストしたコールバックを
必要とせずに、Future トレイトを使って非同期の制御フローを表現できることを
示しています。基本的な制御フローについてはこれで済んだので、実際の Future
トレイトと、それがどのように異なるのかについて話しましょう。
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
最初に気づく変更点は、self の型がもはや &mut Self ではなく、
Pin<&mut Self> に変わっていることです。ピン留めについては後のセクションで
さらに詳しく説明しますが、今は、これによって移動不能な future を作成できるように
なると理解しておいてください。移動不能なオブジェクトは、自身のフィールド間に
ポインターを格納できます。例: struct MyFut { a: i32, ptr_to_a: *const i32 }。
ピン留めは async/await を可能にするために必要です。
次に、wake: fn() は &mut Context<'_> に変わりました。SimpleFuture では、
関数ポインター(fn())の呼び出しを使って、対象の future をポーリングすべきである
ことを future エグゼキューターに伝えていました。しかし、fn() は単なる関数
ポインターであるため、どの Future が wake を呼び出したのかに関する
データを格納できません。
現実世界のシナリオでは、Web サーバーのような複雑なアプリケーションは、
wake up をそれぞれ個別に管理すべき数千もの異なる接続を持つ場合があります。
Context 型は、特定のタスクを wake up するために使用できる Waker 型の値への
アクセスを提供することで、これを解決します。
Waker によるタスクのウェイクアップ
Future が最初に poll された時点では完了できないことはよくあります。これが起きた場合、その Future は、さらに処理を進められる状態になった時点で再度 poll されるようにする必要があります。これは Waker 型によって行われます。
Future が poll されるたびに、それは「タスク」の一部として poll されます。タスクとは、Executor に送信されたトップレベルの Future です。
Waker は、関連付けられたタスクを起床させるべきであることを Executor に伝えるために使用できる wake() メソッドを提供します。wake() が呼び出されると、Executor は Waker に関連付けられたタスクが処理を進められる状態であり、その Future を再度 poll すべきであることを認識します。
Waker は clone() も実装しているため、コピーして持ち回ったり保存したりできます。
Waker を使って、簡単なタイマー Future を実装してみましょう。
実践: タイマーを構築する
この例では、タイマーが作成されたときに新しいスレッドを起動し、必要な時間だけスリープしてから、時間枠が経過したときにタイマー Future に通知するだけにします。
まず、cargo new --lib timer_future で新しいプロジェクトを開始し、作業を始めるために必要な import を src/lib.rs に追加します。
#![allow(unused)]
fn main() {
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
}
まず、Future 型そのものを定義するところから始めましょう。この Future には、タイマーが経過し、Future が完了すべきであることをスレッドが伝えるための方法が必要です。スレッドと Future の間で通信するために、共有された Arc<Mutex<..>> 値を使用します。
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
struct SharedState {
/// Whether or not the sleep time has elapsed
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
waker: Option<Waker>,
}
それでは、実際に Future 実装を書いてみましょう!
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
かなりシンプルですよね? スレッドが shared_state.completed = true を設定していれば、完了です! そうでなければ、現在のタスクの Waker を clone して shared_state.waker に渡し、スレッドがそのタスクを再び起床できるようにします。
重要なのは、Future が poll されるたびに Waker を更新しなければならないことです。なぜなら、その Future が異なる Waker を持つ別のタスクへ移動している可能性があるためです。これは、Future が poll された後にタスク間で受け渡される場合に起こります。
最後に、実際にタイマーを構築し、スレッドを開始するための API が必要です。
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
やりました! これで簡単なタイマー Future を構築するために必要なものはすべて揃いました。あとは、この Future を実行するための Executor さえあればいいのですが…
応用: エグゼキューターを構築する
Rust の Future は遅延評価されます。完了まで能動的に駆動されない限り、何もしません。future を完了まで駆動する方法の 1 つは、async 関数の中でそれを .await することですが、それは問題を 1 段階上に押し上げるだけです。トップレベルの async 関数から返される future は誰が実行するのでしょうか?答えは、Future エグゼキューターが必要だということです。
Future エグゼキューターは、トップレベルの Future の集合を受け取り、その Future が進行できるようになるたびに poll を呼び出すことで、それらを完了まで実行します。通常、エグゼキューターは最初に future を 1 回 poll して開始します。Future が wake() を呼び出して進行する準備ができたことを示すと、それらはキューに戻され、再び poll が呼び出されます。これを Future が完了するまで繰り返します。
このセクションでは、多数のトップレベル future を並行して完了まで実行できる、独自のシンプルなエグゼキューターを書きます。
この例では、Waker を簡単に構築する方法を提供する ArcWake トレイトのために、futures クレートに依存します。Cargo.toml を編集して、新しい依存関係を追加します。
[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"
[dependencies]
futures = "0.3"
次に、src/main.rs の先頭に以下のインポートが必要です。
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;
私たちのエグゼキューターは、実行するタスクをチャネル経由で送信することで動作します。エグゼキューターはチャネルからイベントを取り出して実行します。タスクがさらに処理を行う準備ができた(起床された)とき、そのタスクは自分自身をチャネルに戻すことで、再び poll されるようにスケジュールできます。
この設計では、エグゼキューター自体に必要なのはタスクチャネルの受信側だけです。ユーザーは新しい future を spawn できるように、送信側を取得します。タスク自体は、自分自身を再スケジュールできる future にすぎないため、future と、そのタスクが自分自身をキューに戻すために使える sender のペアとして格納します。
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need to use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
新しい future を簡単に spawn できるように、spawner にメソッドも追加しましょう。このメソッドは future 型を受け取り、それを box 化し、その中に future を持つ新しい Arc<Task> を作成します。これはエグゼキューター上のキューに入れることができます。
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.try_send(task).expect("too many tasks queued");
}
}
future を poll するには、Waker を作成する必要があります。
タスクのウェイクアップのセクションで説明したように、Waker は wake が呼び出された後にタスクが再び poll されるようスケジュールする責任を持ちます。Waker は、どのタスクが準備完了になったかをエグゼキューターに正確に伝えるため、エグゼキューターは進行する準備ができた future だけを poll できます。新しい Waker を作成する最も簡単な方法は、ArcWake トレイトを実装し、その後 waker_ref または .into_waker() 関数を使って Arc<impl ArcWake> を Waker に変換することです。タスクを Waker に変換して起床できるようにするため、タスクに対して ArcWake を実装しましょう。
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self
.task_sender
.try_send(cloned)
.expect("too many tasks queued");
}
}
Arc<Task> から Waker が作成されると、その上で wake() を呼び出すことで、Arc のコピーがタスクチャネルへ送信されます。その後、エグゼキューターはそのタスクを受け取り、poll する必要があります。これを実装しましょう。
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if future.as_mut().poll(context).is_pending() {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
おめでとうございます!これで動作する futures エグゼキューターができました。これを使って、async/.await コードや、以前に書いた TimerFuture のようなカスタム future を実行することもできます。
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}
エグゼキューターとシステム IO
The Future Trait の前のセクションでは、ソケットに対して非同期読み取りを実行する
future の次の例について説明しました。
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
この future はソケットで利用可能なデータを読み取り、利用可能なデータがない場合は
エグゼキューターに制御を譲り、ソケットが再び読み取り可能になったときにそのタスクを
起こすよう要求します。しかし、この例からは Socket 型がどのように実装されているのか、
特に set_readable_callback 関数がどのように動作するのかは明らかではありません。
ソケットが読み取り可能になった時点で wake() が呼び出されるようにするには、
どうすればよいでしょうか。1 つの選択肢として、socket が読み取り可能かどうかを
継続的に確認し、適切なタイミングで wake() を呼び出すスレッドを用意する方法があります。
しかし、これは非常に非効率で、ブロックされた IO future ごとに個別のスレッドが必要になります。
これにより、非同期コードの効率は大きく低下します。
実際には、この問題は IO を認識するシステムのブロッキングプリミティブとの統合によって
解決されます。たとえば、Linux の epoll、FreeBSD と Mac OS の kqueue、
Windows の IOCP、Fuchsia の port などです(これらはすべて、クロスプラットフォームの
Rust クレート mio を通じて公開されています)。これらのプリミティブはいずれも、
スレッドが複数の非同期 IO イベントでブロックし、そのうちの 1 つが完了した時点で
戻ることを可能にします。実際には、これらの API は通常、次のような形になります。
struct IoBlocker {
/* ... */
}
struct Event {
// 発生し、監視対象だったイベントを一意に識別する ID。
id: usize,
// 待機するシグナル、または発生したシグナルの集合。
signals: Signals,
}
impl IoBlocker {
/// ブロック対象となる非同期 IO イベントの新しいコレクションを作成する。
fn new() -> Self { /* ... */ }
/// 特定の IO イベントへの関心を表明する。
fn add_io_event_interest(
&self,
/// イベントが発生する対象のオブジェクト
io_object: &IoObject,
/// `io_object` 上に現れる可能性があり、
/// イベントをトリガーすべきシグナルの集合。
/// その関心から生じるイベントに与える ID と対にする。
event: Event,
) { /* ... */ }
/// いずれかのイベントが発生するまでブロックする。
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// たとえばソケット 1 が読み取り可能になった場合、"Socket 1 is now READABLE" と表示する。
println!("Socket {:?} is now {:?}", event.id, event.signals);
Future エグゼキューターは、これらのプリミティブを使用して、特定の IO イベントが
発生したときに実行されるコールバックを設定できるソケットのような非同期 IO オブジェクトを
提供できます。上記の SocketRead の例の場合、Socket::set_readable_callback 関数は
次の擬似コードのようになるかもしれません。
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
// `local_executor` はローカルエグゼキューターへの参照です。
// これはソケットの作成時に提供することもできますが、実際には
// 多くのエグゼキューター実装が利便性のためにスレッドローカル
// ストレージを通じて渡します。
let local_executor = self.local_executor;
// この IO オブジェクトの一意な ID。
let id = self.id;
// IO イベントが到着した時点で呼び出せるように、
// ローカルの waker をエグゼキューターのマップに保存します。
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
これで、任意の IO イベントを受信し、適切な Waker にディスパッチできる
エグゼキュータースレッドを 1 つだけ持てるようになります。その Waker は対応するタスクを
起こし、エグゼキューターはさらに多くのタスクを完了まで進めてから、再びより多くの
IO イベントを確認するために戻ることができます(そしてこのサイクルが続きます…)。
async/.await
第1章では、async/.await について簡単に見ました。
この章では、async/.await についてより詳しく説明し、
それがどのように動作するか、そして async コードが従来の Rust プログラムと
どのように異なるかを解説します。
async/.await は Rust の特別な構文要素であり、ブロックする代わりに
現在のスレッドの制御を譲ることを可能にします。これにより、ある操作の完了を
待っている間に、他のコードを進めることができます。
async を使用する主な方法は 2 つあります。async fn と async ブロックです。
どちらも Future トレイトを実装する値を返します。
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// This `async` block results in a type that implements
// `Future<Output = u8>`.
async {
let x: u8 = foo().await;
x + 5
}
}
第1章で見たように、async 本体やその他の future は遅延評価されます。
つまり、実行されるまで何もしません。Future を実行する最も一般的な方法は、
それに対して .await することです。Future に対して .await が呼び出されると、
それを完了まで実行しようとします。Future がブロックされている場合、
現在のスレッドの制御を譲ります。さらに進められるようになると、Future は
executor によって取り上げられて実行を再開し、.await が解決できるようになります。
async のライフタイム
従来の関数とは異なり、参照やその他の非 static 引数を取る async fn は、
引数のライフタイムによって制限される Future を返します。
// This function:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
これは、async fn から返される future は、その非 static 引数がまだ有効な間に
.await されなければならないことを意味します。関数を呼び出した直後に
future を .await する一般的なケース(foo(&x).await のような場合)では、
これは問題になりません。しかし、future を保存したり、別のタスクやスレッドに
送ったりする場合には、問題になる可能性があります。
参照を引数として持つ async fn を static future に変換するための一般的な
回避策の 1 つは、async fn の呼び出しと引数を async ブロック内にまとめることです。
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
引数を async ブロックに移動することで、そのライフタイムを good の呼び出しから
返される Future のライフタイムと一致するように延長します。
async move
async ブロックとクロージャでは、通常のクロージャと同様に move キーワードを
使用できます。async move ブロックは、参照する変数の所有権を取得します。
これにより、現在のスコープより長く生存できるようになりますが、それらの変数を
他のコードと共有する能力は失われます。
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
let my_string = "foo".to_string();
let future_one = async {
// ...
println!("{my_string}");
};
let future_two = async {
// ...
println!("{my_string}");
};
// Run both futures to completion, printing "foo" twice:
let ((), ()) = futures::join!(future_one, future_two);
}
/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
// ...
println!("{my_string}");
}
}
マルチスレッド Executor における .await
マルチスレッドの Future executor を使用する場合、Future はスレッド間を
移動する可能性があることに注意してください。そのため、async 本体で使用される
すべての変数は、スレッド間を移動できなければなりません。どの .await も、
新しいスレッドへの切り替えを引き起こす可能性があるためです。
これは、Send トレイトを実装していない Rc、&RefCell、その他の型を使用することは
安全ではないことを意味します。これには、Sync トレイトを実装していない型への参照も
含まれます。
(注意: .await の呼び出し中にこれらの型がスコープ内にない限り、これらの型を
使用することは可能です。)
同様に、従来の future を意識しないロックを .await をまたいで保持することは
よい考えではありません。これはスレッドプールをロックアップさせる可能性があります。
あるタスクがロックを取得し、.await して executor に制御を譲ると、別のタスクが
そのロックを取得しようとしてデッドロックを引き起こす可能性があります。これを避けるには、
std::sync のものではなく、futures::lock の Mutex を使用してください。
Stream トレイト
Stream トレイトは Future に似ていますが、標準ライブラリの Iterator トレイトと同様に、完了する前に複数の値を生成できます。
trait Stream {
/// The type of the value yielded by the stream.
type Item;
/// Attempt to resolve the next item in the stream.
/// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
/// is ready, and `Poll::Ready(None)` if the stream has completed.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
Stream の一般的な例の 1 つは、futures クレートのチャネル型に対する Receiver です。これは、Sender 側から値が送信されるたびに Some(val) を生成し、Sender がドロップされ、保留中のメッセージがすべて受信されると None を生成します。
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` is similar to `Iterator::next`, but returns a
// type that implements `Future<Output = Option<T>>`.
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
イテレーションと並行性
同期的な Iterator と同様に、Stream 内の値をイテレートして処理する方法は数多くあります。map、filter、fold のようなコンビネータースタイルのメソッドや、それらのエラー時に早期終了する仲間である try_map、try_filter、try_fold があります。
残念ながら、for ループは Stream では使用できませんが、命令型スタイルのコードでは、while let と next/try_next 関数を使用できます。
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
しかし、単に一度に 1 つの要素を処理しているだけでは、並行性の機会を逃している可能性があります。そもそも、それこそが非同期コードを書いている理由です。ストリームから複数の項目を並行して処理するには、for_each_concurrent メソッドと try_for_each_concurrent メソッドを使用します。
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
複数の Future を一度に実行する
ここまで、主に .await を使って Future を実行してきました。これは、特定の Future が完了するまで現在のタスクをブロックします。しかし、実際の非同期アプリケーションでは、複数の異なる操作を並行して実行する必要があることがよくあります。
この章では、複数の非同期操作を同時に実行する方法をいくつか取り上げます。
join!: すべての Future が完了するまで待機しますselect!: 複数の Future のうち 1 つが完了するまで待機します- スポーン: Future をバックグラウンドで完了まで実行するトップレベルのタスクを作成します
FuturesUnordered: 各サブ Future の結果を生成する Future のグループ
join!
futures::join マクロを使うと、複数の異なる Future をすべて並行に実行しながら、それらが完了するのを待てます。
join!
複数の非同期操作を実行するとき、単純にそれらを順番に .await したくなるかもしれません。
async fn get_book_and_music() -> (Book, Music) {
let book = get_book().await;
let music = get_music().await;
(book, music)
}
しかし、これは必要以上に遅くなります。なぜなら、get_book が完了するまで get_music の実行を試み始めないためです。他の言語では、Future が暗黙的に完了まで実行されることがあるため、まず各 async fn を呼び出して Future を開始し、その後で両方を await することで、2 つの操作を並行に実行できます。
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
let book_future = get_book();
let music_future = get_music();
(book_future.await, music_future.await)
}
しかし、Rust の Future は能動的に .await されるまで何も処理しません。つまり、上記の 2 つのコードスニペットはどちらも、book_future と music_future を並行に実行するのではなく、順番に実行します。2 つの Future を正しく並行に実行するには、futures::join! を使用します。
use futures::join;
async fn get_book_and_music() -> (Book, Music) {
let book_fut = get_book();
let music_fut = get_music();
join!(book_fut, music_fut)
}
join! が返す値は、渡された各 Future の出力を含むタプルです。
try_join!
Result を返す Future では、join! ではなく try_join! の使用を検討してください。join! はすべてのサブ Future が完了して初めて完了するため、サブ Future の 1 つが Err を返した後でも、他の Future の処理を継続します。
join! とは異なり、try_join! はサブ Future の 1 つがエラーを返すと即座に完了します。
use futures::try_join;
async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
try_join! に渡される Future は、すべて同じエラー型でなければならないことに注意してください。エラー型を統合するには、futures::future::TryFutureExt の .map_err(|e| ...) 関数と .err_into() 関数の使用を検討してください。
use futures::{
future::TryFutureExt,
try_join,
};
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
select!
futures::select マクロは複数の Future を同時に実行し、
いずれかの Future が完了した時点ですぐに応答できるようにします。
#![allow(unused)]
fn main() {
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1, t2);
select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
}
上の関数は t1 と t2 の両方を並行して実行します。
t1 または t2 のいずれかが終了すると、対応するハンドラーが println! を呼び出し、
残りのタスクを完了させることなく関数は終了します。
select の基本構文は <pattern> = <expression> => <code>, であり、
select の対象にしたい Future の数だけ繰り返します。
default => ... と complete => ...
select は default ブランチと complete ブランチもサポートしています。
default ブランチは、select の対象になっている Future のどれもまだ完了していない場合に実行されます。
したがって、default ブランチを持つ select は常に即座に返ります。
他の Future のどれも準備できていなければ、default が実行されるためです。
complete ブランチは、select の対象になっているすべての Future が完了し、
それ以上進捗しなくなった場合を処理するために使用できます。
これは、select! をループで使うときに便利なことがよくあります。
#![allow(unused)]
fn main() {
use futures::{future, select};
async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // never runs (futures are ready, then complete)
};
}
assert_eq!(total, 10);
}
}
Unpin および FusedFuture との相互作用
上の最初の例で気づいたかもしれませんが、
2 つの async fn が返す Future に対して .fuse() を呼び出し、
さらに pin_mut でピン留めする必要がありました。
これらの呼び出しはいずれも必要です。
なぜなら、select で使用する Future は Unpin トレイトと
FusedFuture トレイトの両方を実装していなければならないためです。
select で使用される Future は値として受け取られるのではなく、
可変参照として受け取られるため、Unpin が必要です。
Future の所有権を取得しないことで、未完了の Future は
select の呼び出し後に再び使用できます。
同様に、FusedFuture トレイトが必要なのは、
select が完了後の Future を poll してはならないためです。
FusedFuture は、自身が完了したかどうかを追跡する Future によって実装されます。
これにより、ループ内で select を使用し、
まだ完了していない Future だけを poll することが可能になります。
これは上の例で確認できます。ループの 2 回目では、
a_fut または b_fut が完了しています。
future::ready が返す Future は FusedFuture を実装しているため、
それを再び poll しないよう select に伝えることができます。
Stream には、対応する FusedStream トレイトがあることに注意してください。
このトレイトを実装している Stream、または .fuse() を使ってラップされた Stream は、
.next() / .try_next() コンビネータから FusedFuture な Future を yield します。
#![allow(unused)]
fn main() {
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
}
Fuse と FuturesUnordered を使った select ループ内の並行タスク
少し見つけにくいものの便利な関数の 1 つに Fuse::terminated() があります。
これは、すでに終了している空の Future を構築し、
後から実行する必要のある Future で埋めることを可能にします。
これは、select ループ中に実行する必要があるものの、
その select ループ自体の内部で作成されるタスクがある場合に便利です。
.select_next_some() 関数を使っている点に注意してください。
これは select と組み合わせて使用でき、
Stream から返される Some(_) 値に対してのみブランチを実行し、
None を無視できます。
#![allow(unused)]
fn main() {
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) { /* ... */ }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`,
// dropping the old one.
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// Run the `run_on_new_num_fut`
() = run_on_new_num_fut => {},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
}
同じ Future の多数のコピーを同時に実行する必要がある場合は、
FuturesUnordered 型を使用します。
次の例は上の例に似ていますが、新しい Future が作成されたときにそれらを中止するのではなく、
run_on_new_num_fut の各コピーを最後まで実行します。
また、run_on_new_num_fut が返す値も出力します。
#![allow(unused)]
fn main() {
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`.
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// Run the `run_on_new_num_futs` and check if any have completed
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
}
Spawning
スポーンを使用すると、新しい非同期タスクをバックグラウンドで実行できます。これにより、そのタスクの実行中も他のコードの実行を継続できます。
メインスレッドをブロックせずに接続を受け付けたい Web サーバーがあるとします。
これを実現するには、async_std::task::spawn 関数を使用して、接続を処理する新しいタスクを作成して実行できます。この関数は future を受け取り、JoinHandle を返します。これは、タスクが完了した後でその結果を待機するために使用できます。
use async_std::{task, net::TcpListener, net::TcpStream};
use futures::AsyncWriteExt;
async fn process_request(stream: &mut TcpStream) -> Result<(), std::io::Error>{
stream.write_all(b"HTTP/1.1 200 OK\r\n\r\n").await?;
stream.write_all(b"Hello World").await?;
Ok(())
}
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
loop {
// Accept a new connection
let (mut stream, _) = listener.accept().await.unwrap();
// Now process this request without blocking the main loop
task::spawn(async move {process_request(&mut stream).await});
}
}
spawn によって返される JoinHandle は Future トレイトを実装しているため、.await してタスクの結果を取得できます。
これにより、スポーンされたタスクが完了するまで現在のタスクはブロックされます。タスクを await しない場合、プログラムはそのタスクを待機せずに実行を継続し、タスクが完了する前に関数が完了した場合はそのタスクをキャンセルします。
#![allow(unused)]
fn main() {
use futures::future::join_all;
async fn task_spawner(){
let tasks = vec![
task::spawn(my_task(Duration::from_secs(1))),
task::spawn(my_task(Duration::from_secs(2))),
task::spawn(my_task(Duration::from_secs(3))),
];
// If we do not await these tasks and the function finishes, they will be dropped
join_all(tasks).await;
}
}
メインタスクとスポーンされたタスクの間で通信するには、使用している非同期ランタイムが提供するチャネルを使用できます。
知って愛用したい回避策
Rust の async サポートはまだ比較的新しく、現在も活発に開発が進められている
要望の多い機能がいくつかあるほか、診断メッセージにも不十分な点がいくつかあります。
この章では、よくある悩ましい点について説明し、それらを回避する方法を解説します。
Send の近似
一部の async fn ステートマシンはスレッド間で送っても安全ですが、
そうでないものもあります。async fn の Future が Send であるかどうかは、
非 Send 型が .await ポイントをまたいで保持されるかどうかによって決まります。
コンパイラは、値が .await ポイントをまたいで保持される可能性があるタイミングを
できる限り近似しようとしますが、現在、この解析はいくつかの箇所で保守的すぎます。
たとえば、単純な非 Send 型、おそらく Rc を含む型を考えてみましょう。
#![allow(unused)]
fn main() {
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
}
NotSend 型の変数は、async fn が返す結果の Future 型が Send でなければならない場合でも、
async fn 内の一時値として短時間現れることができます。
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
NotSend::default();
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
しかし、foo を変更して NotSend を変数に格納すると、この例はもはやコンパイルされません。
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
let x = NotSend::default();
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
--> src/main.rs:15:5
|
15 | require_send(foo());
| ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
= note: required because it appears within the type `NotSend`
= note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
= note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
= note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
= note: required because it appears within the type `impl std::future::Future`
= note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
--> src/main.rs:12:1
|
12 | fn require_send(_: impl Send) {}
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error: aborting due to previous error
For more information about this error, try `rustc --explain E0277`.
このエラーは正しいものです。x を変数に格納すると、それは .await の後までドロップされず、
その時点で async fn は別のスレッド上で実行されている可能性があります。
Rc は Send ではないため、スレッドをまたいで移動することを許可すると健全ではありません。
これに対する単純な解決策の 1 つは、.await の前に Rc を drop することですが、
残念ながら現在はそれでは機能しません。
この問題をうまく回避するには、非 Send 変数をカプセル化するブロックスコープを導入する必要があるかもしれません。
これにより、これらの変数が .await ポイントをまたいで生存しないことをコンパイラが判断しやすくなります。
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
{
let x = NotSend::default();
}
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
再帰
内部的には、async fn は .await される各サブFutureを含むステートマシン型を作成します。これにより、再帰的な async fn は少し厄介になります。結果として得られるステートマシン型が自分自身を含まなければならないからです。
#![allow(unused)]
fn main() {
async fn step_one() { /* ... */ }
async fn step_two() { /* ... */ }
struct StepOne;
struct StepTwo;
// この関数は:
async fn foo() {
step_one().await;
step_two().await;
}
// 次のような型を生成します:
enum Foo {
First(StepOne),
Second(StepTwo),
}
// したがって、この関数は:
async fn recursive() {
recursive().await;
recursive().await;
}
// 次のような型を生成します:
enum Recursive {
First(Recursive),
Second(Recursive),
}
}
これは動作しません。無限サイズの型を作成してしまったからです! コンパイラは次のようにエラーを出します。
error[E0733]: recursion in an async fn requires boxing
--> src/lib.rs:1:1
|
1 | async fn recursive() {
| ^^^^^^^^^^^^^^^^^^^^
|
= note: a recursive `async fn` call must introduce indirection such as `Box::pin` to avoid an infinitely sized future
これを可能にするには、Box を使って間接参照を導入する必要があります。
Rust 1.77 より前では、コンパイラの制限により、recursive() への呼び出しを Box::pin でラップするだけでは十分ではありません。これを動作させるには、recursive を .boxed() された async ブロックを返す非 async 関数にする必要があります。
#![allow(unused)]
fn main() {
use futures::future::{BoxFuture, FutureExt};
fn recursive() -> BoxFuture<'static, ()> {
async move {
recursive().await;
recursive().await;
}.boxed()
}
}
新しいバージョンの Rust では、そのコンパイラの制限は解除されました。
Rust 1.77 以降、割り当てによる間接参照を伴う async fn における再帰のサポートが安定化されたため、関数の状態が無限サイズになるのを避けるために何らかの形式の間接参照を使っている限り、再帰呼び出しが許可されます。
これは、次のようなコードが動作するようになったことを意味します。
#![allow(unused)]
fn main() {
async fn recursive_pinned() {
Box::pin(recursive_pinned()).await;
Box::pin(recursive_pinned()).await;
}
}
トレイトにおける async
現在、Rust の stable リリースでは、トレイト内で async fn を使用できません。
2022年11月17日以降、async-fn-in-trait の MVP がコンパイラツールチェーンの nightly
版で利用可能になっています。詳細はこちらを参照してください。
それまでの間、stable ツールチェーン向けの回避策として、 crates.io の async-trait クレートを使用できます。
これらのトレイトメソッドを使用すると、関数呼び出しごとにヒープ割り当てが 発生することに注意してください。これは大多数のアプリケーションにとって大きな コストではありませんが、1秒間に何百万回も呼び出されることが想定される 低レベル関数の公開 API でこの機能を使用するかどうかを決定する際には、 考慮する必要があります。
最新の更新: https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html
非同期エコシステム
Rust は現在、async コードを書くための最低限の要素のみを提供しています。 重要なこととして、エグゼキューター、タスク、リアクター、コンビネーター、低レベル I/O の Future とトレイトは まだ標準ライブラリでは提供されていません。当面は、 コミュニティが提供する非同期エコシステムがこれらの不足を補います。
Async Foundations Team は、Async Book の例を拡張して複数のランタイムを扱えるようにすることに関心があります。 このプロジェクトへの貢献に関心がある場合は、 Zulip でご連絡ください。
非同期ランタイム
非同期ランタイムは、非同期アプリケーションを実行するために使用されるライブラリです。 ランタイムは通常、リアクター と 1 つ以上の エグゼキューター をまとめて提供します。 リアクターは、非同期 I/O、プロセス間通信、タイマーなどの外部イベントに対する購読メカニズムを提供します。 非同期ランタイムでは、購読者は通常、低レベル I/O 操作を表す Future です。 エグゼキューターは、タスクのスケジューリングと実行を処理します。 エグゼキューターは、実行中および一時停止中のタスクを追跡し、Future を完了するまでポーリングし、進行可能になったタスクを起床します。 「executor」という語は、「runtime」と同じ意味で使われることがよくあります。 ここでは、互換性のあるトレイトや機能とともにランタイムをまとめたものを表すために、「エコシステム」という語を使用します。
コミュニティ提供の非同期クレート
Futures クレート
futures クレート には、async コードを書くために有用なトレイトと関数が含まれています。
これには、Stream、Sink、AsyncRead、AsyncWrite トレイト、およびコンビネーターなどのユーティリティが含まれます。
これらのユーティリティやトレイトは、最終的に標準ライブラリの一部になる可能性があります。
futures には独自のエグゼキューターがありますが、独自のリアクターはないため、非同期 I/O やタイマー Future の実行はサポートしていません。
このため、完全なランタイムとは見なされません。
一般的な選択肢は、futures のユーティリティを別のクレートのエグゼキューターと併用することです。
広く使われている非同期ランタイム
標準ライブラリには非同期ランタイムはなく、公式に推奨されているものもありません。 次のクレートは、広く使われているランタイムを提供します。
- Tokio: HTTP、gRPC、tracing フレームワークを備えた、広く使われている非同期エコシステムです。
- async-std: 標準ライブラリのコンポーネントに対応する非同期版を提供するクレートです。
- smol: 小さく簡素化された非同期ランタイムです。
UnixStreamやTcpListenerのような構造体をラップするために使用できるAsyncトレイトを提供します。 - fuchsia-async: Fuchsia OS で使用するためのエグゼキューターです。
エコシステムの互換性を判断する
すべての非同期アプリケーション、フレームワーク、ライブラリが互いに、またはすべての OS やプラットフォームと互換性があるわけではありません。 ほとんどの非同期コードは任意のエコシステムで使用できますが、一部のフレームワークやライブラリでは特定のエコシステムを使用する必要があります。 エコシステム上の制約は必ずしも文書化されているわけではありませんが、 ライブラリ、トレイト、関数が特定のエコシステムに依存しているかどうかを判断するための経験則はいくつかあります。
非同期 I/O、タイマー、プロセス間通信、またはタスクとやり取りする async コードは、 一般的に特定の非同期エグゼキューターまたはリアクターに依存します。 async 式、コンビネーター、同期型、ストリームなど、その他すべての async コードは、 入れ子になった Future もエコシステムに依存していない限り、通常はエコシステムに依存しません。 プロジェクトを始める前に、関連する非同期フレームワークやライブラリについて調査し、 選択したランタイムおよび相互の互換性を確認することをお勧めします。
特に、Tokio は mio リアクターを使用し、
AsyncRead と AsyncWrite を含む独自バージョンの非同期 I/O トレイトを定義しています。
それ自体では、async-std や smol とは互換性がありません。
これらは async-executor クレート と、futures で定義されている AsyncRead および AsyncWrite
トレイトに依存しているためです。
競合するランタイム要件は、
あるランタイム向けに書かれたコードを別のランタイム内で呼び出せるようにする互換性レイヤーによって解決できる場合があります。
たとえば、async_compat クレート は、
Tokio とその他のランタイムの間に互換性レイヤーを提供します。
async API を公開するライブラリは、特定のエグゼキューターやリアクターに依存すべきではありません。 ただし、タスクを生成する必要がある場合や、独自の非同期 I/O またはタイマー Future を定義する必要がある場合は除きます。 理想的には、タスクのスケジューリングと実行の責任はバイナリのみが担うべきです。
シングルスレッドエグゼキューターとマルチスレッドエグゼキューター
非同期エグゼキューターは、シングルスレッドまたはマルチスレッドにできます。
たとえば、async-executor クレートには、シングルスレッドの LocalExecutor とマルチスレッドの Executor の両方があります。
マルチスレッドエグゼキューターは、複数のタスクを同時に進行させます。 多数のタスクを含むワークロードでは実行を大幅に高速化できますが、 タスク間でデータを同期するコストは通常高くなります。 シングルスレッドランタイムとマルチスレッドランタイムのどちらを選ぶかを決める際には、 アプリケーションのパフォーマンスを測定することをお勧めします。
タスクは、それを作成したスレッド上で実行することも、別のスレッド上で実行することもできます。
非同期ランタイムは、多くの場合、タスクを別のスレッドに生成する機能を提供します。
タスクが別のスレッドで実行される場合でも、非ブロッキングであるべきです。
マルチスレッドエグゼキューター上でタスクをスケジュールするには、それらのタスクは Send でもなければなりません。
一部のランタイムは、非 Send タスクを生成する関数を提供しており、
これにより、すべてのタスクがそれを生成したスレッド上で実行されることが保証されます。
また、専用スレッド上にブロッキングタスクを生成する関数を提供することもあります。
これは、他のライブラリのブロッキングな同期コードを実行する場合に便利です。
最終プロジェクト: Async Rustで並行Webサーバーを構築する
この章では、非同期Rustを使ってRust bookの シングルスレッドWebサーバー を変更し、リクエストを並行して処理できるようにします。
振り返り
レッスンの最後のコードは次のようになっていました。
src/main.rs:
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main() {
// Listen for incoming TCP connections on localhost port 7878
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// Block forever, handling each request that arrives at this IP address
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// Read the first 1024 bytes of data from the stream
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
// Respond with greetings or a 404,
// depending on the data in the request
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
// Write response back to the stream,
// and flush the stream to ensure the response is sent back to the client
let response = format!("{status_line}{contents}");
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
hello.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
404.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
cargo runでサーバーを実行し、ブラウザで127.0.0.1:7878にアクセスすると、
Ferrisからの親しみやすいメッセージが表示されます!
非同期コードの実行
HTTPサーバーは複数のクライアントを並行して処理できるべきです。 つまり、現在のリクエストを処理する前に、以前のリクエストが完了するのを待つべきではありません。 本書では、 この問題を解決しています 各接続をそれぞれ専用のスレッドで処理するスレッドプールを作成することによってです。 ここでは、スレッドを追加してスループットを向上させる代わりに、非同期コードを使って同じ効果を実現します。
handle_connection を async fn として宣言し、future を返すように変更しましょう。
async fn handle_connection(mut stream: TcpStream) {
//<-- snip -->
}
関数宣言に async を追加すると、その戻り値の型は
ユニット型 () から Future<Output=()> を実装する型に変わります。
これをコンパイルしようとすると、コンパイラはこれが動作しないことを警告します。
$ cargo check
Checking async-rust v0.1.0 (file:///projects/async-rust)
warning: unused implementer of `std::future::Future` that must be used
--> src/main.rs:12:9
|
12 | handle_connection(stream);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
handle_connection の結果を await も poll もしていないため、
それは決して実行されません。サーバーを実行してブラウザで 127.0.0.1:7878 にアクセスすると、
接続が拒否されることがわかります。私たちのサーバーはリクエストを処理していません。
同期コードの中だけでは、future を await したり poll したりすることはできません。
future のスケジューリングと完了までの実行を処理するために、非同期ランタイムが必要になります。
非同期ランタイム、エグゼキューター、リアクターの詳細については、
ランタイムの選択に関するセクション
を参照してください。
一覧にあるどのランタイムでもこのプロジェクトでは動作しますが、これらの例では
async-std クレートを使用することにしました。
非同期ランタイムの追加
次の例では、同期コードを非同期ランタイムを使用するようにリファクタリングする方法を示します。ここでは async-std を使います。
async-std の #[async_std::main] 属性を使うと、非同期の main 関数を書けます。
これを使用するには、Cargo.toml で async-std の attributes 機能を有効にします。
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
最初のステップとして、非同期の main 関数に切り替え、
非同期版の handle_connection が返す future を await します。
次に、サーバーがどのように応答するかをテストします。
これは次のようになります。
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
// Warning: This is not concurrent!
handle_connection(stream).await;
}
}
では、私たちのサーバーが接続を並行して処理できるかをテストしてみましょう。
handle_connection を非同期にしただけでは、サーバーが
複数の接続を同時に処理できることを意味しません。その理由はすぐにわかります。
これを説明するために、遅いリクエストをシミュレートしてみましょう。
クライアントが 127.0.0.1:7878/sleep にリクエストを行うと、
サーバーは5秒間スリープします。
use std::time::Duration;
use async_std::task;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
task::sleep(Duration::from_secs(5)).await;
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{status_line}{contents}");
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
これは、The Book の
遅いリクエストのシミュレーション
と非常によく似ていますが、重要な違いが1つあります。
ブロッキング関数 std::thread::sleep の代わりに、ノンブロッキング関数 async_std::task::sleep を使用しています。
コードの一部が async fn 内で実行され、await されているとしても、それが依然としてブロックする可能性があることを覚えておくことが重要です。
私たちのサーバーが接続を並行して処理するかをテストするには、handle_connection がノンブロッキングであることを確認する必要があります。
サーバーを実行すると、127.0.0.1:7878/sleep へのリクエストが
他のすべての受信リクエストを5秒間ブロックすることがわかります!
これは、handle_connection の結果を await している間に
進行できる他の並行タスクが存在しないためです。
次のセクションでは、非同期コードを使って接続を並行して処理する方法を見ていきます。
接続を並行して処理する
これまでのコードの問題は、listener.incoming() がブロッキングなイテレーターであることです。
listener が着信接続を待っている間、エグゼキューターは他の Future を実行できず、
前の接続の処理が完了するまで、新しい接続を処理できません。
これを修正するために、listener.incoming() をブロッキングな Iterator から
ノンブロッキングな Stream に変換します。Stream は Iterator と似ていますが、非同期に消費できます。
詳細については、Stream に関する章を参照してください。
ブロッキングな std::net::TcpListener をノンブロッキングな async_std::net::TcpListener に置き換え、
接続ハンドラーを更新して async_std::net::TcpStream を受け取るようにしましょう。
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
TcpListener の非同期版は、listener.incoming() に対して Stream トレイトを実装しており、
この変更には 2 つの利点があります。
1 つ目は、listener.incoming() がエグゼキューターをブロックしなくなることです。
処理すべき着信 TCP 接続がない間、
エグゼキューターは他の保留中の Future に譲ることができるようになります。
2 つ目の利点は、Stream の要素を、Stream の for_each_concurrent メソッドを使って
任意で並行に処理できることです。
ここでは、このメソッドを活用して各着信リクエストを並行して処理します。
futures クレートから Stream トレイトをインポートする必要があるため、Cargo.toml は次のようになります。
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
これで、クロージャ関数を通じて handle_connection を渡すことで、各接続を並行して処理できます。
クロージャ関数は各 TcpStream の所有権を取得し、新しい TcpStream が利用可能になり次第実行されます。
handle_connection がブロックしない限り、遅いリクエストが他のリクエストの完了を妨げることはなくなります。
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
リクエストを並列に提供する
これまでの例では、協調的マルチタスクの並行性(async コードを使用)を、
プリエンプティブなマルチタスク(スレッドを使用)の代替として主に示してきました。
しかし、async コードとスレッドは相互に排他的ではありません。
この例では、for_each_concurrent は各接続を並行して処理しますが、同じスレッド上で処理します。
async-std クレートを使うと、タスクを別々のスレッド上に spawn することもできます。
handle_connection は Send であり、かつノンブロッキングでもあるため、async_std::task::spawn と一緒に使っても安全です。
これは次のようになります。
use async_std::task::spawn;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |stream| async move {
let stream = stream.unwrap();
spawn(handle_connection(stream));
})
.await;
}
これで、協調的マルチタスクの並行性とプリエンプティブなマルチタスクの両方を使って、複数のリクエストを同時に処理しています! 詳細については、マルチスレッドエグゼキューターに関するセクションを参照してください。
TCPサーバーのテスト
handle_connection 関数のテストに進みましょう。
まず、作業対象となる TcpStream が必要です。
エンドツーエンドテストや統合テストでは、コードをテストするために実際の TCP 接続を確立したい場合があります。
これを行う戦略の 1 つは、localhost のポート 0 でリスナーを開始することです。
ポート 0 は有効な UNIX ポートではありませんが、テストには利用できます。
オペレーティングシステムが空いている TCP ポートを選択してくれます。
代わりに、この例では接続ハンドラーの単体テストを書き、
それぞれの入力に対して正しいレスポンスが返されることを確認します。
単体テストを分離され、決定的なものに保つために、TcpStream をモックに置き換えます。
まず、テストしやすくするために handle_connection のシグネチャを変更します。
handle_connection は実際には async_std::net::TcpStream を必要としません。
必要なのは、async_std::io::Read、async_std::io::Write、および marker::Unpin を実装する任意の構造体です。
これを反映するように型シグネチャを変更すると、テスト用にモックを渡せるようになります。
use async_std::io::{Read, Write};
async fn handle_connection(mut stream: impl Read + Write + Unpin) {
次に、これらのトレイトを実装するモックの TcpStream を構築しましょう。
まず、1 つのメソッド poll_read を持つ Read トレイトを実装します。
モックの TcpStream には、読み取りバッファーにコピーされるデータを含め、
読み取りが完了したことを示すために Poll::Ready を返します。
use super::*;
use futures::io::Error;
use futures::task::{Context, Poll};
use std::cmp::min;
use std::pin::Pin;
struct MockTcpStream {
read_data: Vec<u8>,
write_data: Vec<u8>,
}
impl Read for MockTcpStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
let size: usize = min(self.read_data.len(), buf.len());
buf[..size].copy_from_slice(&self.read_data[..size]);
Poll::Ready(Ok(size))
}
}
Write の実装も非常によく似ていますが、
poll_write、poll_flush、poll_close の 3 つのメソッドを書く必要があります。
poll_write は入力データをモックの TcpStream にコピーし、完了したら Poll::Ready を返します。
モックの TcpStream をフラッシュまたはクローズするために行うべき処理はないため、poll_flush と poll_close は
単に Poll::Ready を返せば構いません。
impl Write for MockTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
self.write_data = Vec::from(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
最後に、モックは Unpin を実装する必要があります。これは、メモリ内の位置を安全に移動できることを示します。
ピン留めと Unpin トレイトの詳細については、ピン留めに関するセクションを参照してください。
impl Unpin for MockTcpStream {}
これで handle_connection 関数をテストする準備ができました。
初期データを含む MockTcpStream をセットアップした後、
#[async_std::main] を使用した方法と同様に、属性 #[async_std::test] を使用して handle_connection を実行できます。
handle_connection が意図したとおりに動作することを確認するために、初期内容に基づいて正しいデータが
MockTcpStream に書き込まれたことを確認します。
use std::fs;
#[async_std::test]
async fn test_handle_connection() {
let input_bytes = b"GET / HTTP/1.1\r\n";
let mut contents = vec![0u8; 1024];
contents[..input_bytes.len()].clone_from_slice(input_bytes);
let mut stream = MockTcpStream {
read_data: contents,
write_data: Vec::new(),
};
handle_connection(&mut stream).await;
let expected_contents = fs::read_to_string("hello.html").unwrap();
let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
assert!(stream.write_data.starts_with(expected_response.as_bytes()));
}
付録:本書の翻訳
英語以外の言語によるリソース。