Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

IO とブロッキングの問題

IO(input/output、入出力)を効率的に処理することは、async プログラミングの主な動機の 1 つであり、ほとんどの async プログラムは大量の IO を行います。根本的に、IO の問題は、計算よりも桁違いに時間がかかることです。そのため、他の作業を進める代わりに IO の完了をただ待つのは非常に非効率です。理想的には、async プログラミングによって、プログラムは IO を待っている間に他の作業を進められます。

この章は、async コンテキストにおける IO の入門です。ブロッキング IO とノンブロッキング IO の重要な違い、およびブロッキング IO と async プログラミングが(少なくとも少し考慮と工夫をしない限り)相性が悪い理由を扱います。ノンブロッキング IO の使い方を扱い、その後、IO と async プログラミングで発生し得るいくつかの問題を見ていきます。また、OS が IO をどのように扱うかを見て、io_uring のようないくつかの代替 IO 手法も少し覗いてみます。

最後に、async タスクをブロックする(これは悪いことです)その他の方法と、async プログラミングをブロッキング IO や長時間実行される CPU 集約的なコードと適切に組み合わせる方法を扱います。

ブロッキング IO とノンブロッキング IO

IO はオペレーティングシステムによって実装されます。IO の作業は、別のプロセスや専用ハードウェア、またはいずれの場合もプログラムのプロセス外で行われます。IO は同期または非同期(それぞれブロッキングおよびノンブロッキングとも呼ばれます)のいずれかです。同期 IO とは、IO が行われている間、プログラム(または少なくともスレッド)が待機(別名ブロック)し、IO が完了して結果が OS から受け取られるまで処理を開始しないことを意味します。非同期 IO とは、IO が行われている間もプログラムが処理を進められ、後で結果を取得できることを意味します。どちらの種類の IO にも多くの異なる OS API がありますが、非同期の領域の方が種類は豊富です。

非同期 IO と非同期プログラミングは本質的に結び付いているわけではありません。しかし、async プログラミングは使いやすく高性能な async IO を可能にし、それが async プログラミングの大きな動機になっています。同期 IO によるブロッキングは、async プログラミングにおけるパフォーマンス問題の主要な原因であり、これを避けるよう注意しなければなりません(これについては後述します)。

Rust の標準ライブラリには、ブロッキング IO のための関数とトレイトが含まれています。ノンブロッキング IO には、専用のライブラリを使用する必要があります。これらは多くの場合、async ランタイムの一部です。たとえば Tokio の io モジュールがあります。

例を簡単に見てみましょう(Tokio のドキュメントから改変):

#![allow(unused)]
fn main() {
use tokio::{io::AsyncWriteExt, net::TcpStream};

async fn write_hello() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    stream.write_all(b"hello world!").await?;

    Ok(())
}
}

write_all は、stream にデータを書き込む async IO メソッドです。これは即座に完了することもありますが、より可能性が高いのは完了までに多少時間がかかる場合です。そのため、stream.write_all(...).await によって、OS が書き込みを処理するのを待つ間、現在のタスクは一時停止されます。スケジューラは他のタスクを実行し、書き込みが完了すると、そのタスクを起こして処理を続行するようスケジュールします。

しかし、標準ライブラリの書き込み関数を使った場合、async スケジューラは関与せず、OS は IO が完了するまでスレッド全体を一時停止します。つまり、現在のタスクが一時停止されるだけでなく、そのスレッドを使って他のタスクを実行することもできません。これがランタイムのスレッドプール内のすべてのスレッドで発生した場合(状況によってはスレッドが 1 つだけの場合もあります)、プログラム全体が停止し、処理を進められなくなります。これはスレッド(またはプログラム)をブロックすると呼ばれ、パフォーマンスにとって非常に悪いものです。async プログラムではスレッドを決してブロックしないことが重要であり、したがって async タスク内でブロッキング IO を使うことは避けるべきです。

スレッドのブロックは、ブロッキング IO だけでなく、長時間実行されるタスクやロックを待つタスクによっても引き起こされることがあります。これについては、この章の最後でさらに説明します。

繰り返し読み取りや書き込みを行うのは一般的なパターンであり、ストリームとシンク(別名 async イテレータ)はそのための便利な仕組みです。これらについては専用の章で扱います。

読み取りと書き込み

TODO

  • async の Read と Write トレイト
    • ランタイムの一部
  • 使い方
  • 具体的な実装
    • ネットワークとディスク
      • tcp、udp
      • ファイルシステムは実際には async ではないが、io_uring(その章への参照)
    • 実践的な例
    • stdout など
    • pipe、fd など

メモリ管理

データを読み取るときは、それをどこかに置く必要があり、データを書き込むときは、書き込みが完了するまでどこかに保持しておく必要があります。どちらの場合でも、そのメモリがどのように管理されるかが重要です。

TODO

  • バッファ管理と async IO に関する問題
  • さまざまな解決策と長所・短所
    • ゼロコピーアプローチ
    • 共有バッファアプローチ
  • これを支援するユーティリティクレート、Bytes など

IO に関する高度なトピック

TODO

  • buf read/write
  • Read + Write、split、join
  • copy
  • simplex と duplex
  • キャンセル
  • 同期 IO を行わなければならない場合はどうするか?スレッドを spawn するか、spawn_blocking を使う(下記参照)

OS から見た IO

TODO

  • さまざまな種類の IO と仕組み、completion IO、高度なセクションの completion IO の章への参照
    • 異なるランタイムがこれを容易にできる
    • 低レベルインターフェースとしての mio

その他のブロッキング操作

章の冒頭で述べたように、スレッドをブロックしないことは async プログラムのパフォーマンスにとって極めて重要です。さまざまな種類のブロッキング IO はブロックを引き起こす一般的な方法ですが、大量の計算を行ったり、async スケジューラが調整していない方法で待機したりすることによってもブロックする可能性があります。

待機は、多くの場合、async を意識していない同期機構を使うことで発生します。たとえば、async mutex ではなく std::sync::Mutex を使う場合や、非 async チャネルを待つ場合です。この問題については、チャネル、ロック、同期の章で説明します。ブロッキング的な方法で待機してしまう他の方法もあり、一般にはノンブロッキングまたは何らかの形で async に適した仕組みを見つける必要があります。たとえば、標準ライブラリのものではなく async の sleep 関数を使うことです。待機はビジーウェイト(実質的には何の作業もせずにただループすること、別名スピンロック)である場合もありますが、おそらくこれは避けるべきです。

CPU 集約的な処理

長時間実行される(つまり CPU 集約的または CPU バウンドな)処理を行うと、スケジューラが他のタスクを実行できなくなります。これは一種のブロッキングです。ただし、少なくともプログラムが何らかの進捗をしているため、IO でブロックしたり待機したりするほど悪いものではありません。しかし(注意と配慮なしには)、何らかの尺度(たとえばテールレイテンシ)でパフォーマンスにとって最適ではない可能性が高く、実行できないタスクが特定の時刻に実行される必要があった場合には、正しさの問題にもなり得ます。CPU 集約的な処理には async Rust(または Tokio のような汎用 async ランタイム)を単純に使うべきではない、というミームがありますが、それは過度の単純化です。正しいのは、特別な扱いなしに IO バウンドなタスクと CPU バウンドなタスク(より正確には、長時間実行されるタスクとレイテンシに敏感なタスク)を混在させて、うまくいくと期待することはできない、ということです。 このセクションの残りでは、レイテンシに敏感なタスクと、長時間実行される CPU 負荷の高いタスクが混在しているものとします。レイテンシに敏感なものが何もない場合は、状況は少し異なります(たいていはより簡単です)。

長時間実行されるタスクやブロッキングするタスクを実行するための解決策は、基本的に 3 つあります。ランタイムの組み込み機能を使う、別スレッドを使う、または別のランタイムを使う、です。

Tokio では、ブロックする可能性のあるタスクを生成するために spawn_blocking を使えます。これはタスクを生成するための spawn のように機能しますが、ブロックする可能性のあるタスク向けに最適化された別のスレッドプールでタスクを実行します(そのタスクはおそらく専用のスレッドで実行されます)。これは通常の同期コードを実行するものであり、非同期タスクではない点に注意してください。つまり、そのタスクはキャンセルできません(その JoinHandleabort メソッドがあってもです)。他のランタイムも同様の機能を提供しています。

この例では、標準ライブラリの同期ファイルシステム関数を呼び出すことで、spawn_blocking を使ってブロッキング I/O を実行しています。なお、tokio::fs も存在し、非同期ファイルシステム API を提供しています。ただし、内部的にはこれも spawn_blocking でラップされたブロッキング操作を使っています。

use tokio;

#[tokio::main]
async fn main() {
    let contents = tokio::task::spawn_blocking(|| {
		std::fs::read_to_string("file.txt").unwrap()
    })
	.await
	.unwrap();

	// contents を使って何かを行う
}

spawn_blocking で生成されたタスクは中止できないため、最終的に完了する作業を対象としています。受信リクエストを待ち受けるサーバーのように無期限にブロックする可能性のあるタスクは、Tokio のブロッキング用スレッドプールのスレッドを長時間占有しないよう、専用スレッドで実行するほうが適しています。これは std::thread::spawn または同様の API で作成できます。

大量のタスクを実行する必要がある場合は、おそらく何らかのスレッドプールやワークスケジューラーが必要になります。スレッドを生成し続け、利用可能なコア数を大きく上回る数になると、スループットを犠牲にすることになります。Rayon は、並列タスクの実行と管理を容易にする人気のある選択肢です。ワークロードにより特化していたり、実行されるタスクに関する知識を持っていたりするものを使うと、より良いパフォーマンスが得られるかもしれません。

以下は、Rayon を Tokio と併用する例です。Rayon によって生成されたタスクと Tokio 内の現在のタスクとの間で結果をやり取りするために、tokio::oneshot::channel を利用しています。

use rayon::prelude::*;

#[tokio::main]
async fn main() {
    let data = 1..=10;

    let (send, recv) = tokio::sync::oneshot::channel();
    // 現在のタスクをブロックしないように rayon 上でタスクを生成する
    std::thread::spawn(move || {
        // rayon の並列イテレーターを使って結果を並列に計算する
        let results = data.into_par_iter().map(compute).collect::<Vec<_>>();
        // 結果を Tokio に送り返す。
        send.send(results).unwrap();
    });

    // rayon タスクを待機して結果を取得する
    let results = recv.await.unwrap();
    println!("Results: {:?}", results);
}

fn compute(input: u64) -> u64 {
    // 大量の整数を合計することで、
    // CPU 負荷の高い計算をシミュレートする。
    let mut sum = 0u64;
    for i in 0..100_000_000 {
        sum = sum.wrapping_add(i * i);
    }
    sum % input
}

レイテンシに敏感なタスクと、長時間実行されるタスクとで、非同期ランタイムの別々のインスタンスを使うことができます。これは CPU バウンドなタスクに適していますが、長時間実行されるタスク用のランタイム上であっても、ブロッキング IO は使うべきではありません。CPU バウンドなタスクに対しては、長時間実行されるタスクを非同期タスクにできる唯一の解決策であるという点で、これは優れた解決策です。また柔軟でもあります(ランタイムは実行するタスクの種類に最適化されるよう設定できるためです。実際、最適なパフォーマンスを得るにはランタイム設定にある程度の労力をかける必要があります)。さらに、Tokio のような成熟した、よく設計されたサブシステムを使う恩恵も受けられます。2 つの異なる非同期ランタイムを使うことさえできます。いずれの場合でも、ランタイムは異なるスレッド上で実行されなければなりません。

一方で、もう少し考える必要があります。正しいランタイム上でタスクを実行していることを確実にしなければならず(これは見た目より難しい場合があります)、タスク間の通信は複雑になることがあります。同期コンテキストと非同期コンテキストの間の同期については次に説明しますが、複数の非同期ランタイム間ではさらに厄介になることがあります。各ランタイムはそれぞれ独自の小さなタスクの宇宙であり、スケジューラーは完全に独立しています。Tokio のチャネルやロックは、異なるランタイム(Tokio 以外のランタイムであっても)から使うことができますが、他のランタイムのプリミティブはこのようには動作しない場合があります。

各ランタイム内のスケジューラーは他のランタイムを認識しておらず(また OS はどの非同期スケジューラーも認識していないため)、スケジューリングの調整や共有された優先順位付けはなく、ランタイム間でワークを盗むこともできません。そのため、タスクのスケジューリングは最適でない場合があります(特にランタイムがワークロードに合わせて十分にチューニングされていない場合)。さらに、すべてのスケジューリングは協調的であるため、長時間実行されるタスクが依然としてリソース不足に陥ることがあり、レイテンシが悪化する可能性があります。長時間実行されるタスクをより協調的にする方法については、次のセクションを参照してください。

純粋なスケジューラーとして見ると、CPU 作業に Tokio を使うことは、専用の同期ワーカープールよりもわずかに高いオーバーヘッドを持つ可能性があります。非同期プログラミングをサポートするために必要な追加の作業を考えれば、これは驚くことではありません。ほとんどのユーザーにとって実際には問題になりにくいですが、コードが極めてパフォーマンスに敏感な場合は考慮する価値があるかもしれません。

上記のいずれの解決策でも、異なるコンテキスト(同期と非同期、または異なる非同期ランタイム)でタスクが実行されることになります。タスク間で通信する必要がある場合は、同期プリミティブと非同期プリミティブ(チャネル、ミューテックスなど)の正しい組み合わせと、それらのプリミティブ上の正しい(ブロッキングまたはノンブロッキングの)メソッドを使っていることに注意する必要があります。ミューテックスや同様のロックについては、await ポイントをまたいでロックを保持する必要がある場合や、IO リソースを保護する必要がある場合には、おそらく非同期版を使うべきです(ブロッキングロックメソッドを使うことで、同期コンテキストからも利用できるはずです)。一方、データを保護する場合や、await ポイントをまたいでロックを保持する必要がない場合には、同期版を使うべきです。Tokio の非同期チャネルは、ブロッキングメソッドを使うことで同期コンテキストから利用できますが、同期チャネルと非同期チャネルをいつ使うべきかについての詳細は、これらのドキュメントを参照してください。

では、上記の解決策のうちどれを使うべきでしょうか?

  • ブロッキングIOを行っている場合は、おそらく spawn_blocking を使うべきです。2つ目のランタイムや他のスレッドプールは使用できません(少なくとも最適なパフォーマンスが必要な場合)。
  • 永久に実行されるスレッドがある場合は、どのようなスレッドプールも使うのではなく、std::thread::spawn を使うべきです(プールのスレッドの1つを使い切ってしまうため)。
  • 大量の CPU処理を行っている場合は、専用のものか2つ目の非同期ランタイムのいずれかのスレッドプールを使うべきです。
  • 長時間実行される非同期コードを実行する必要がある場合は、2つ目のランタイムを使うべきです。
  • より複雑な解決策のほうが最適であっても、簡単で十分なパフォーマンスが得られるという理由で、専用スレッドや spawn_blocking を使うことを選ぶかもしれません。

Yieldすること

長時間実行されるコードが問題になるのは、スケジューラに他のタスクをスケジュールする機会を与えないためです。非同期並行処理は協調的です。スケジューラは、別のタスクを実行するためにタスクをプリエンプトできません。長時間実行されるタスクがスケジューラにyieldしない場合、スケジューラはそれを停止できません。しかし、長時間実行されるコードがスケジューラにyieldする場合は、他のタスクをスケジュールできるため、タスクが長時間実行されることは問題になりません。これは、CPU集約的な処理に別のスレッドを使う代わりとして、またはCPU集約的な処理をそれ専用のランタイムで行って(場合によっては)パフォーマンスを向上させるために使用できます。

yieldするのは簡単で、ランタイムのyield関数を呼び出すだけです。Tokioではそれが yield_now です。これは、標準ライブラリの yield_now とも、コルーチンからyieldするための yield キーワードとも異なることに注意してください。現在のfutureが select または join の中で実行されている場合、yield_now を呼び出してもスケジューラにはyieldしません(futureを並行に合成する章を参照)。それが望ましい動作かどうかは状況によります。

いつyieldする必要があるかを判断するのは少し難しくなります。まず、自分のプログラムが暗黙的にyieldしているかどうかを知る必要があります。これは .await でのみ発生し得るため、await していないならyieldしていません。しかし、awaitは自動的にスケジューラへyieldするわけではありません。それが起こるのは、await されている末端のfutureがpending(readyではない)である場合、または呼び出しスタックのどこかに明示的な yield がある場合だけです。Tokioやほとんどの非同期ランタイムは、IO関数や同期関数でこれを行いますが、一般的には、デバッグするかソースコードを調べない限り、ある await がyieldするかどうかは分かりません。

経験則として、コードは潜在的なyieldポイントに到達せずに10〜100マイクロ秒を超えて実行されるべきではありません。

参考資料