async と await
この章では、Rust で非同期プログラミングを始め、async キーワードと await キーワードを紹介します。
async は関数(および後で扱う trait など、その他の item)に付けるアノテーションです。await は式で使われる演算子です。しかし、これらのキーワードに入る前に、Rust における非同期プログラミングの中核となる概念をいくつか押さえる必要があります。これは前章での議論を受けたもので、ここでは物事を Rust プログラミングに直接結び付けて説明します。
Rust の非同期概念
ランタイム
非同期タスクは管理され、スケジュールされる必要があります。通常、利用可能なコア数よりもタスク数の方が多いため、すべてを同時に実行することはできません。あるタスクの実行が止まったら、別のタスクを選んで実行しなければなりません。タスクが IO や何らかのイベントを待っている場合、そのタスクはスケジュールされるべきではありませんが、それが完了したらスケジュールされるべきです。そのためには OS とやり取りし、IO 作業を管理する必要があります。
多くのプログラミング言語はランタイムを提供しています。一般に、このランタイムは非同期タスクの管理よりもはるかに多くのことを行います。メモリを管理する(ガベージコレクションを含む)こともあれば、例外処理で役割を持つこと、OS 上の抽象化レイヤーを提供すること、あるいは完全な仮想マシンであることさえあります。Rust は低レベル言語であり、ランタイムのオーバーヘッドを最小限にすることを目指しています。そのため、非同期ランタイムのスコープは、多くの他言語のランタイムよりもはるかに限定されています。また、非同期ランタイムを設計・実装する方法は多数あるため、Rust は 1 つを提供するのではなく、要件に応じて選択できるようにしています。これは、非同期プログラミングを始めるには追加の手順が必要になることを意味します。
タスクを実行してスケジュールするだけでなく、ランタイムは非同期 IO を管理するために OS とやり取りしなければなりません。また、タスクにタイマー機能を提供する必要もあります(これは IO 管理と交差します)。ランタイムがどのように構成されるべきかについて厳密な規則はありませんが、いくつかの用語と責任分担は一般的です。
- reactor、event loop、または driver(同等の用語): IO とタイマーイベントをディスパッチし、OS とやり取りし、実行を前進させる最も低レベルの駆動を行います。
- scheduler: タスクがいつ、どの OS スレッド上で実行できるかを決定します。
- executor または runtime: reactor と scheduler を組み合わせたもので、非同期タスクを実行するためのユーザー向け API です。runtime は機能全体のライブラリを意味する場合にも使われます(たとえば、
Runtime型で表される Tokio executor だけでなく、Tokio crate に含まれるすべてのもの)。
上で説明した executor に加えて、ランタイム crate には通常、多くのユーティリティ trait や関数が含まれています。これらには、trait(たとえば AsyncRead)や IO の実装、ネットワーキングやファイルシステムへのアクセスなどの一般的な IO タスクのための機能、ロック、チャネル、その他の同期プリミティブ、タイミング用ユーティリティ、OS と連携するためのユーティリティ(たとえばシグナル処理)、future や stream(非同期イテレータ)を扱うためのユーティリティ関数、監視や観測のためのツールなどが含まれる場合があります。このガイドでは、それらの多くを扱います。
選択できる非同期ランタイムは多数あります。スケジューリングポリシーが大きく異なるものや、特定のタスクやドメイン向けに最適化されているものもあります。このガイドの大部分では、Tokio ランタイムを使用します。これは汎用ランタイムであり、エコシステムで最も人気のあるランタイムです。入門にも本番環境での作業にも優れた選択肢です。状況によっては、別のランタイムを使うことで、より高いパフォーマンスを得られたり、より単純なコードを書けたりする場合があります。このガイドの後半では、利用可能な他のランタイムのいくつかと、なぜそれを選ぶのか、あるいは独自に書くのかについて説明します。
できるだけ早く動かし始めるには、少しのボイラープレートだけが必要です。Tokio crate を Cargo.toml の依存関係として含める必要があります(他の crate と同じです)。
[dependencies]
tokio = { version = "1", features = ["full"] }
そして、main 関数に tokio::main アノテーションを使い、非同期関数にできるようにします(Rust では通常これは許可されていません)。
#[tokio::main]
async fn main() { ... }
これで完了です!非同期コードを書く準備ができました!
#[tokio::main] アノテーションは Tokio ランタイムを初期化し、main 内のコードを実行するための非同期タスクを開始します。このガイドの後半では、このアノテーションが何をしているのか、そしてこれを使わずに非同期コードを使う方法(より柔軟性が得られます)について、より詳しく説明します。
Futures-rs とエコシステム
TODO コンテキストと歴史、futures-rs が何のためのものか - よく使われていたが、おそらく今は必要ない、Tokio や他のランタイムとの重複(時には微妙な意味論上の違いを伴う)、なぜ必要になるか(future を直接扱う、特に独自に書く場合、stream、いくつかのユーティリティ)
その他のエコシステム関連 - Yosh の crate、代替ランタイム、実験的なもの、その他?
Future とタスク
Rust における非同期並行処理の基本単位は future です。future は、‘Future’ trait を実装する、ごく普通の Rust オブジェクト(通常は struct または enum)です。future は遅延された計算を表します。つまり、将来のある時点で準備が整う計算です。
このガイドでは future について多く説明しますが、最初はあまり気にしすぎずに始めるのが最も簡単です。次のいくつかのセクションでは future にかなり触れますが、実際に定義したり直接使ったりするのは後になります。future の重要な側面の 1 つは、それらを組み合わせて新しい、より「大きな」future を作れることです(それらをどのように組み合わせられるかについては、後でさらに詳しく説明します)。
前章とこの章では、「非同期タスク」という用語を非形式的な形でかなり使ってきました。この用語は、実行の論理的な並び、つまりスレッドに似ているが OS によって外部から管理されるのではなく、プログラム内で管理されるもの、という意味で使ってきました。タスクという観点で考えることはしばしば有用ですが、Rust 自体にはタスクという概念はなく、この用語は異なる意味で使われます!これは混乱を招きます!さらに悪いことに、ランタイムにはタスクの概念があり、ランタイムごとにタスクの概念が少しずつ異なります。
ここから先では、タスクに関する用語を正確に使うようにします。単に「task」と言う場合、他のタスクと並行して発生し得る計算の並びという抽象概念を意味します。「async task」はまったく同じ意味で使いますが、OS スレッドとして実装されたタスクとの対比として使います。「runtime’s task」は、ランタイムが想定する何らかの種類のタスクを意味し、「tokio task」(またはその他の特定のランタイム)は Tokio におけるタスクの考え方を意味します。
Rust における非同期タスクは単なる future です(通常は、多くの他の future を組み合わせて作られた「大きな」future です)。言い換えると、タスクとは実行される future です。ただし、future がランタイムのタスクではなく「実行」される場合があります。この種の future は直感的には task ですが、runtime’s task ではありません。その例に到達したときに、これをさらに詳しく説明します。
非同期関数
async キーワードは関数宣言に付ける修飾子です。たとえば、pub async fn send_to_server(...) のように書けます。async 関数とは、単に async キーワードを使って宣言された関数のことです。これは、非同期に実行できる関数であること、言い換えれば、呼び出し元が何か他のことを行う前にその関数の完了を待たないことを選択できるという意味です。
より機械的に言うと、async 関数が呼び出されたとき、その本体は通常の関数の場合のようには実行されません。代わりに、関数本体とその引数が future にパッケージ化され、実際の結果の代わりに返されます。呼び出し元はその future をどう扱うかを決めることができます(呼び出し元が結果を「すぐに」欲しい場合は、その future を await します。次のセクションを参照してください)。
async 関数の内部では、コードは通常どおり逐次的に実行されます1。async であることによる違いはありません。async 関数から同期関数を呼び出すことができ、実行は通常どおり進みます。async 関数の内部で追加でできることの 1 つは、await を使って他の async 関数(または future)を待機することです。これにより、別のタスクが実行できるように制御が明け渡される場合があります。
await
future は、将来のある時点で準備が整う計算であると前述しました。その計算の結果を得るには、await キーワードを使います。結果がすぐに準備できている場合、または待たずに計算できる場合、await は単にその計算を行って結果を生成します。しかし、結果がまだ準備できていない場合、await はスケジューラーに制御を渡し、別のタスクが進行できるようにします(これは前の章で触れた協調的マルチタスクです)。
Rust では、await を使う構文は some_future.await です。つまり、. 演算子とともに使われる後置キーワードです。そのため、メソッド呼び出しやフィールドアクセスのチェーンの中で使いやすくなっています。これは、Python や JavaScript のように、await some_function() のような式の前に置く前置演算子として await を使う言語とは対照的です。
後置 await がしばしばより使いやすい理由を見るために、ネットワークリクエストを行う async 関数を呼び出し、レスポンスのステータスコードにアクセスしたいとします。前置 await 構文では、fetch() の前に await を付け、? でエラーを伝播するために式を括弧で囲み、その後でステータスコードにアクセスする必要があります。たとえば (await fetch())?.status_code のようになります。後置構文では、fetch().await?.status_code と書けます。これは、より長いチェーンで特に役立ちます。たとえば、2 つの前置 await を含む式は (await (await fetch())?.json())?.data のようになりますが、後置の同等の式は fetch().await?.json().await?.data であり、より自然に読めます。
それでは、実際に async と await がどのように使われるかを見てみましょう。次の関数を考えてください。
#![allow(unused)]
fn main() {
// async 関数ですが、何かを待つ必要はありません。
async fn add(a: u32, b: u32) -> u32 {
a + b
}
async fn wait_to_add(a: u32, b: u32) -> u32 {
sleep(1000).await;
a + b
}
}
add(15, 3).await を呼び出すと、結果 18 がすぐに返されます。wait_to_add(15, 3).await を呼び出すと、最終的には同じ答えが得られますが、待っている間に別のタスクが実行される機会を得ます。
この単純な例では、sleep の呼び出しは、結果を待たなければならない長時間実行される何らかのタスクの代役です。これは通常、結果が外部ソースから読み取られたデータであるか、外部の宛先への書き込みが成功したことの確認であるような IO 操作です。読み取りは let data = read(...).await? のようになります。この場合、await によって現在のタスクは読み取りが行われている間待機します。読み取りが完了すると、そのタスクは再開されます(読み取りタスクが待機している間、他のタスクが作業を進めることができます)。読み取りの結果は、正常に読み取られたデータか、エラー(? によって処理されます)です。
.await を使わずに add、wait_to_add、または read を呼び出しても、答えは何も得られないことに注意してください!
どういうことでしょうか?
async 関数を呼び出すと future が返されます。関数内のコードがすぐに実行されるわけではありません。さらに、future は await されるまで何の作業も行いません2。これは、async 関数が future を返し、その future がすぐに実行を開始する一部の他の言語とは対照的です。
これは Rust における async プログラミングの重要な点です。しばらくすると自然に身につきますが、初心者、特に他の言語で async プログラミングの経験がある人は、ここでつまずくことがよくあります。
Rust の future に関する重要な直感は、それらが不活性なオブジェクトであるということです。何らかの作業を進めるには、外部の力(通常は async ランタイム)によって駆動されなければなりません。
ここまで await をかなり操作的に説明してきました(future を実行し、結果を生成するものとして)。しかし前の章では async タスクと並行性について話しました。await はそのメンタルモデルにどのように当てはまるのでしょうか? まず、純粋な逐次コードを考えてみましょう。論理的には、関数を呼び出すことは、単にその関数内のコードを実行することです(変数への何らかの代入を伴います)。言い換えれば、現在のタスクは、その関数によって定義される次の「かたまり」のコードを実行し続けます。同様に、async コンテキストでは、非 async 関数を呼び出すと、単にその関数で実行が継続されます。async 関数を呼び出すと、実行すべきコードは見つかりますが、それは実行されません。await は、現在のタスクの実行を継続する演算子であり、現在のタスクが今は継続できない場合には、別のタスクに継続する機会を与える演算子です。
await は async コンテキストの内部でのみ使用できます。現時点では、これは async 関数の内部を意味します(後で、より多くの種類の async コンテキストを見ていきます)。その理由を理解するには、await がランタイムに制御を渡し、別のタスクが実行できるようにする可能性があることを思い出してください。制御を渡す先となるランタイムが存在するのは async コンテキストの中だけです。今のところ、ランタイムは async 関数の中でだけアクセス可能なグローバル変数のようなものだと想像してかまいません。実際にどのように動作するかは後で説明します。
最後に、await についてもう 1 つ別の見方を示します。先ほど、future は組み合わせて「より大きな」future を作ることができると述べました。async 関数は future を定義する方法の 1 つであり、await は future を組み合わせる方法の 1 つです。future に対して await を使うと、その future は、それが使われている async 関数によって生成される future に組み込まれます。この見方や、future を組み合わせる他の方法については、後でより詳しく説明します。
async/await の例
まずは、私たちの「hello, world!」の例を再訪するところから始めましょう。
// Define an async function.
async fn say_hello() {
println!("hello, world!");
}
#[tokio::main] // Boilerplate which lets us write `async fn main`, we'll explain it later.
async fn main() {
// Call an async function and await its result.
say_hello().await;
}
これで、main の周囲にあるボイラープレートを認識できるはずです。これは Tokio ランタイムを初期化し、async な main 関数を実行するための最初のタスクを作成するためのものです。
say_hello は async 関数です。これを呼び出すときは、現在のタスクの一部として実行するために、その呼び出しに続けて .await を付ける必要があります。.await を取り除くと、プログラムを実行しても何も起こらないことに注意してください!say_hello を呼び出すと future が返されますが、それは実行されないため、println は呼び出されません(少なくともコンパイラは警告してくれます)。
これは、Tokio tutorial から取った、もう少し現実的な例です。
#[tokio::main]
async fn main() -> Result<()> {
// mini-redis アドレスへの接続を開きます。
let mut client = client::connect("127.0.0.1:6379").await?;
// キー "hello" に値 "world" を設定します
client.set("hello", "world".into()).await?;
// キー "hello" を取得します
let result = client.get("hello").await?;
println!("got value from the server; result={:?}", result);
Ok(())
}
このコードは少し面白くなっていますが、本質的には同じことをしています。async 関数を呼び出し、その結果を実行するために await しています。今回はエラーハンドリングに ? を使用しています。これは同期 Rust の場合とまったく同じように機能します。
ここまで並行性、並列性、非同期性についていろいろ話してきましたが、これら 2 つの例はいずれも 100% 逐次的です。async 関数を呼び出して await するだけでは、await しているタスクが待機している間にスケジュールできる他のタスクがない限り、並行性は一切導入されません。これを自分たちで確認するために、もう 1 つ単純な(ただし作為的な)例を見てみましょう。
use std::io::{stdout, Write};
use tokio::time::{sleep, Duration};
async fn say_hello() {
print!("hello, ");
// Flush stdout so we see the effect of the above `print` immediately.
stdout().flush().unwrap();
}
async fn say_world() {
println!("world!");
}
#[tokio::main]
async fn main() {
say_hello().await;
// An async sleep function, puts the current task to sleep for 1s.
sleep(Duration::from_millis(1000)).await;
say_world().await;
}
“hello” と “world” を表示する間に、現在のタスクを 1 秒間スリープさせます3。プログラムを実行すると何が起こるかを観察してください。“hello” が表示され、1 秒間何も起こらず、その後 “world” が表示されます。これは、単一のタスクの実行が純粋に逐次的だからです。もし並行性があれば、その 1 秒の仮眠は、“world” を表示するような他の作業を行う絶好の機会になるでしょう。次のセクションで、その方法を見ていきます。
タスクを生成する
ここまで、async と await を async タスクでコードを実行する方法として説明してきました。また、await は IO やその他のイベントを待っている間、現在のタスクをスリープさせることができる、と述べました。そのとき、別のタスクが実行できますが、それらの他のタスクはどのように生まれるのでしょうか?新しいタスクを生成するために std::thread::spawn を使うのと同じように、新しい async タスクを生成するために tokio::spawn を使うことができます。spawn は Rust の標準ライブラリではなく、ランタイムである Tokio の関数であることに注意してください。なぜなら、タスクは純粋にランタイム上の概念だからです。
spawn を使って async 関数を別のタスク上で実行する小さな例を示します。
use tokio::{spawn, time::{sleep, Duration}};
async fn say_hello() {
// Wait for a while before printing to make it a more interesting race.
sleep(Duration::from_millis(100)).await;
println!("hello");
}
async fn say_world() {
sleep(Duration::from_millis(100)).await;
println!("world!");
}
#[tokio::main]
async fn main() {
spawn(say_hello());
spawn(say_world());
// Wait for a while to give the tasks time to run.
sleep(Duration::from_millis(1000)).await;
}
前の例と同様に、“hello” と “world!” を表示する 2 つの関数があります。しかし今回は、逐次的ではなく並行に(そして並列に)実行します。プログラムを何度か実行すると、文字列がどちらの順序でも表示されるはずです。あるときは “hello” が先、あるときは “world!” が先です。古典的な並行レースです!
ここで何が起こっているのかを詳しく見ていきましょう。関係している概念は 3 つあります。future、タスク、スレッドです。spawn 関数は future(思い出してください、これは多数のより小さな future から構成されている場合があります)を受け取り、それを新しい Tokio タスクとして実行します。タスクは、Tokio ランタイムがスケジュールし管理する概念です(個々の future ではありません)。Tokio(デフォルト設定の場合)はマルチスレッドランタイムです。つまり、新しいタスクを生成すると、そのタスクは生成元のタスクとは異なる OS スレッドで実行される可能性があります(同じスレッドで実行される場合もありますし、あるスレッドで開始されてから後で別のスレッドに移動される場合もあります)。
したがって、future がタスクとして生成されると、それは生成元のタスクや他のタスクと並行に実行されます。また、異なるスレッドにスケジュールされれば、それらのタスクと並列に実行されることもあります。
まとめると、Rust で 2 つの文を続けて書いた場合、それらは(async コードであるかどうかにかかわらず)逐次的に実行されます。await を書いても、逐次的な文の並行性は変わりません。たとえば、foo(); bar(); は厳密に逐次的です。foo が呼び出され、その後 bar が呼び出されます。これは foo と bar が async 関数であるかどうかに関係なく成り立ちます。foo().await; bar().await; も厳密に逐次的です。foo が完全に評価され、その後 bar が完全に評価されます。どちらの場合も、別のスレッドがその逐次実行に割り込むように実行される可能性があります。また 2 つ目の場合は、await ポイントで別の async タスクが割り込むように実行される可能性があります。しかし、どちらの場合も、2 つの文は互いの関係において逐次的に実行されます。
thread::spawn または tokio::spawn のいずれかを使用すると、並行性と、場合によっては並列性が導入されます。前者ではスレッド間で、後者ではタスク間で導入されます。
このガイドの後半では、future を並行に実行するものの、決して並列には実行しないケースを見ていきます。
タスクを join する
生成したタスクの実行結果を取得したい場合、生成元のタスクはその終了を待って結果を使用できます。これをタスクを join すると呼びます(スレッドの join と類似しており、join のための API も似ています)。
タスクが生成されると、spawn 関数は JoinHandle を返します。タスクに独自に実行させたいだけであれば、JoinHandle は破棄できます(JoinHandle を drop しても、生成されたタスクには影響しません)。しかし、生成元のタスクが、生成されたタスクの完了を待ってから結果を使用したい場合は、JoinHandle を await することでそれを行えます。
例として、‘Hello, world!’ の例をもう一度見直してみましょう。
use tokio::{spawn, time::{sleep, Duration}};
async fn say_hello() {
// Wait for a while before printing to make it a more interesting race.
sleep(Duration::from_millis(100)).await;
println!("hello");
}
async fn say_world() {
sleep(Duration::from_millis(100)).await;
println!("world");
}
#[tokio::main]
async fn main() {
let handle1 = spawn(say_hello());
let handle2 = spawn(say_world());
let _ = handle1.await;
let _ = handle2.await;
println!("!");
}
コードは前回と似ていますが、単に spawn を呼び出す代わりに、返された JoinHandle を保存し、後でそれらを await しています。main 関数を終了する前にそれらのタスクの完了を待っているため、main 内の sleep はもう必要ありません。
spawn された2つのタスクは依然として並行に実行されています。プログラムを何度か実行すれば、両方の順序を確認できるはずです。ただし、await された join ハンドルは並行性に対する制限になります。最後の感嘆符(‘!’)は必ず最後に出力されます(await に対する println!("!"); の位置を動かして試してみることができます。観察可能な効果を得るには、おそらく sleep 時間も変更する必要があるでしょう)。
最初の spawn の JoinHandle を保存して後で await するのではなく、すぐに await した場合(つまり、spawn(say_hello()).await; と書いた場合)、‘hello’ future を実行するための別のタスクは spawn されますが、spawn した側のタスクは他のことを行う前に、それが完了するまで待機することになります。言い換えると、並行性が生じる可能性はありません!このようなことはほとんど望ましくありません(spawn する意味があるでしょうか?単に逐次コードを書けばよいのです)。
JoinHandle
JoinHandle について、もう少し詳しく簡単に見ていきます。JoinHandle を await できるという事実は、JoinHandle 自体が future であることを示す手がかりです。spawn は async 関数ではなく、future(JoinHandle)を返す通常の関数です。future を返す前に(タスクをスケジュールするための)処理をいくらか行います(async future とは異なります)。そのため、spawn を await する必要はありません。JoinHandle を await すると、spawn されたタスクが完了するまで待機し、その後で結果を返します。上の例では結果はなく、単にタスクが完了するのを待っていただけです。JoinHandle はジェネリック型であり、その型パラメーターは spawn されたタスクが返す型です。上の例では型は JoinHandle<()> になります。String を結果として返す future であれば、型が JoinHandle<String> の JoinHandle が生成されます。
JoinHandle を await すると Result が返されます(上の例で let _ = ... を使ったのはそのためで、未使用の Result に関する警告を避けています)。spawn されたタスクが正常に完了した場合、タスクの結果は Ok バリアントに入ります。タスクが panic した、または abort された場合(キャンセルの一種)、結果は JoinError のドキュメント を含む Err になります。プロジェクトで abort によるキャンセルを使用していない場合、JoinHandle.await の結果を unwrap するのは妥当なアプローチです。これは、spawn されたタスクから spawn したタスクへ panic を実質的に伝播させることになるためです。
-
他のスレッドと同様に、async 関数が実行されているスレッドはオペレーティングシステムによってプリエンプトされ、一時停止されることがあります。その間に別のスレッドが作業を進められるようになります。ただし、関数の観点からは、他のスレッドによって変更された可能性のあるデータ(現在のスレッドが一時停止されていなくても、並行して実行されている別のスレッドによって変更されていた可能性のあるデータ)を調べない限り、これは観測できません。 ↩
-
または poll されることです。これは
awaitよりも低レベルの操作であり、awaitを使うときに舞台裏で行われます。future について詳しく話すときに、poll についても後で説明します。 ↩ -
ここでは async sleep 関数を使用していることに注意してください。もし std の
sleepを使用すると、スレッド全体をスリープさせることになります。このおもちゃの例では違いはありませんが、実際のプログラムでは、その時間中にそのスレッド上で他のタスクをスケジュールできないことを意味します。これは非常に悪いことです。 ↩