接続を並行して処理する
これまでのコードの問題は、listener.incoming() がブロッキングなイテレーターであることです。
listener が着信接続を待っている間、エグゼキューターは他の Future を実行できず、
前の接続の処理が完了するまで、新しい接続を処理できません。
これを修正するために、listener.incoming() をブロッキングな Iterator から
ノンブロッキングな Stream に変換します。Stream は Iterator と似ていますが、非同期に消費できます。
詳細については、Stream に関する章を参照してください。
ブロッキングな std::net::TcpListener をノンブロッキングな async_std::net::TcpListener に置き換え、
接続ハンドラーを更新して async_std::net::TcpStream を受け取るようにしましょう。
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
TcpListener の非同期版は、listener.incoming() に対して Stream トレイトを実装しており、
この変更には 2 つの利点があります。
1 つ目は、listener.incoming() がエグゼキューターをブロックしなくなることです。
処理すべき着信 TCP 接続がない間、
エグゼキューターは他の保留中の Future に譲ることができるようになります。
2 つ目の利点は、Stream の要素を、Stream の for_each_concurrent メソッドを使って
任意で並行に処理できることです。
ここでは、このメソッドを活用して各着信リクエストを並行して処理します。
futures クレートから Stream トレイトをインポートする必要があるため、Cargo.toml は次のようになります。
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
これで、クロージャ関数を通じて handle_connection を渡すことで、各接続を並行して処理できます。
クロージャ関数は各 TcpStream の所有権を取得し、新しい TcpStream が利用可能になり次第実行されます。
handle_connection がブロックしない限り、遅いリクエストが他のリクエストの完了を妨げることはなくなります。
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
リクエストを並列に提供する
これまでの例では、協調的マルチタスクの並行性(async コードを使用)を、
プリエンプティブなマルチタスク(スレッドを使用)の代替として主に示してきました。
しかし、async コードとスレッドは相互に排他的ではありません。
この例では、for_each_concurrent は各接続を並行して処理しますが、同じスレッド上で処理します。
async-std クレートを使うと、タスクを別々のスレッド上に spawn することもできます。
handle_connection は Send であり、かつノンブロッキングでもあるため、async_std::task::spawn と一緒に使っても安全です。
これは次のようになります。
use async_std::task::spawn;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |stream| async move {
let stream = stream.unwrap();
spawn(handle_connection(stream));
})
.await;
}
これで、協調的マルチタスクの並行性とプリエンプティブなマルチタスクの両方を使って、複数のリクエストを同時に処理しています! 詳細については、マルチスレッドエグゼキューターに関するセクションを参照してください。