シングルスレッドサーバーからマルチスレッドサーバーへ
現時点では、このサーバーは各リクエストを順番に処理するため、最初の接続の処理が完了するまで2つ目の接続は処理されません。サーバーが受け取るリクエストが増えれば増えるほど、この逐次実行はますます最適ではなくなります。サーバーが処理に長い時間のかかるリクエストを受け取ると、たとえ新しいリクエストをすばやく処理できるとしても、その長いリクエストの処理が終わるまで後続のリクエストは待たなければなりません。これを修正する必要がありますが、その前にまず実際にその問題が起こる様子を見てみましょう。
遅いリクエストをシミュレートする
処理に時間のかかるリクエストが、現在のサーバー実装に対するほかのリクエストにどのような影響を与えるかを見てみましょう。リスト21-10では、/sleep へのリクエスト処理を、応答前にサーバーが5秒間スリープするようにして、遅いレスポンスをシミュレートしています。
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip--
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// --snip--
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ケースが3つになったので、if から match に切り替えました。文字列リテラル値に対してパターンマッチするには、request_line のスライスを明示的にマッチさせる必要があります。match は、等価比較メソッドが行うような自動の参照および参照外しを行わないためです。
最初のアームは、リスト21-9の if ブロックと同じです。2番目のアームは、/sleep へのリクエストにマッチします。そのリクエストを受け取ると、サーバーは正常なHTMLページを返す前に5秒間スリープします。3番目のアームは、リスト21-9の else ブロックと同じです。
このサーバーがどれほど原始的かがわかるでしょう。実際のライブラリであれば、複数のリクエストの認識をこれほど冗長でない方法で処理するはずです!
cargo run を使ってサーバーを起動してください。次に、2つのブラウザウィンドウを開きます。1つは http://127.0.0.1:7878、もう1つは http://127.0.0.1:7878/sleep です。前と同じように / のURIを数回入力すると、すばやく応答することがわかります。しかし、/sleep を入力してから / を読み込むと、/ は sleep が5秒間すべてスリープし終えるまで待ってから読み込まれることがわかります。
遅いリクエストの後ろにリクエストが滞留しないようにするために使える手法は複数あります。第17章で行ったように async を使う方法もその1つです。ここで実装するのは、スレッドプールです。
スレッドプールでスループットを向上させる
スレッドプール とは、タスクを処理する準備ができて待機している、生成済みスレッドの集まりです。プログラムが新しいタスクを受け取ると、そのタスクをプール内のいずれかのスレッドに割り当て、そのスレッドがタスクを処理します。プール内の残りのスレッドは、最初のスレッドが処理している間に到着するほかのタスクを処理できる状態のままです。最初のスレッドがタスクの処理を終えると、そのスレッドは待機中のスレッドのプールに戻され、新しいタスクを処理する準備が整います。スレッドプールを使うと、接続を並行して処理できるため、サーバーのスループットが向上します。
DoS攻撃から身を守るために、プール内のスレッド数は少ない数に制限します。もし受け取った各リクエストごとにプログラムが新しいスレッドを作成するようにしていたら、誰かがサーバーに1000万件のリクエストを送ることで、サーバーのリソースをすべて使い果たし、リクエスト処理を停止状態に追い込むという大混乱を引き起こせてしまうからです。
そこで、無制限にスレッドを生成するのではなく、固定数のスレッドをプール内で待機させます。到着したリクエストは、処理のためにプールへ送られます。プールは到着したリクエストのキューを維持します。プール内の各スレッドは、このキューから1件のリクエストを取り出して処理し、その後キューに別のリクエストを要求します。この設計により、最大で N 件のリクエストを同時に処理できます。ここで N はスレッド数です。各スレッドが長時間実行されるリクエストに応答している場合、後続のリクエストは依然としてキューに滞留する可能性がありますが、その状態に達するまでに処理できる長時間実行リクエストの数は増えています。
この手法は、Webサーバーのスループットを向上させる多くの方法のうちの1つにすぎません。ほかに検討できる選択肢としては、fork/joinモデル、シングルスレッドの async I/O モデル、マルチスレッドの async I/O モデルなどがあります。このトピックに興味があれば、ほかの解決策についてさらに調べて、実装してみることもできます。Rustのような低レベル言語であれば、これらの選択肢はすべて実現可能です。
スレッドプールの実装を始める前に、プールの使い方がどのように見えるべきかを考えてみましょう。コードを設計しようとしているときは、最初にクライアントインターフェースを書くことが、設計の指針になることがあります。コードのAPIを、自分が呼び出したい形になるように書き、その構造の中に機能を実装していくのです。まず機能を実装してから公開APIを設計するのではありません。
第12章のプロジェクトでテスト駆動開発を使ったのと同じように、ここではコンパイラ駆動開発を使います。まず使いたい関数を呼び出すコードを書き、その後コンパイラのエラーを見て、コードを動作させるために次に何を変更すべきかを判断します。ただしその前に、出発点として使わないことにした手法を見ておきましょう。
リクエストごとにスレッドを生成する
まず、接続ごとに新しいスレッドを作成するとしたら、コードがどのようになるかを見てみましょう。前に述べたように、スレッド数が無制限に増える可能性があるという問題があるため、これは最終的な計画ではありません。しかし、まず動作するマルチスレッドサーバーを作るための出発点にはなります。その後で改善としてスレッドプールを追加すれば、2つの解決策を対比するのも簡単になります。
リスト21-11は、for ループ内で各ストリームを処理するために新しいスレッドを生成するよう、main に加える変更を示しています。
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
第16章で学んだように、thread::spawn は新しいスレッドを作成し、その後その新しいスレッド内でクロージャ内のコードを実行します。このコードを実行して、ブラウザで /sleep を読み込み、その後さらに2つのブラウザタブで / を読み込むと、確かに / へのリクエストは /sleep の完了を待たなくてよいことがわかります。しかし、前に述べたように、制限なしに新しいスレッドを作り続けることになるため、これはいずれシステムを圧迫します。
また、第17章で、まさにこの種の状況こそ async と await が真価を発揮する場面だと学んだことも思い出してください。スレッドプールを構築しながら、async では何が異なり、何が同じように見えるかを考えてみてください。
有限個のスレッドを作成する
スレッドプールが、同様で親しみやすい方法で動作するようにしたいと考えています。そうすれば、スレッドからスレッドプールに切り替える際に、私たちの API を使うコードへ大きな変更を加える必要がありません。リスト 21-12 は、thread::spawn の代わりに使いたい ThreadPool 構造体の仮想的なインターフェイスを示しています。
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPool::new を使って、設定可能な数のスレッドを持つ新しいスレッドプールを作成します。この場合は 4 つです。続いて for ループの中で、pool.execute は thread::spawn と同様のインターフェイスを持ち、プールが各ストリームに対して実行すべきクロージャを受け取ります。pool.execute を実装して、そのクロージャを受け取り、実行のためにプール内のスレッドへ渡す必要があります。このコードはまだコンパイルできませんが、どう修正すべきかをコンパイラに導いてもらうために、まず試してみます。
コンパイラ駆動開発で ThreadPool を構築する
リスト 21-12 の変更を src/main.rs に加え、それから cargo check のコンパイラエラーを手がかりに開発を進めていきましょう。最初に得られるエラーは次のとおりです。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
すばらしいです!このエラーは、ThreadPool 型またはモジュールが必要だと教えてくれているので、ここでそれを作成しましょう。私たちの ThreadPool 実装は、Web サーバーがどのような仕事をしているかには依存しません。そこで、hello クレートをバイナリクレートからライブラリクレートへ切り替え、ThreadPool 実装をそこに置くことにしましょう。ライブラリクレートに変更すれば、Web リクエストを処理するためだけでなく、スレッドプールを使って行いたいあらゆる作業に、その独立したスレッドプールライブラリを利用できるようになります。
以下を含む src/lib.rs ファイルを作成してください。これは、現時点で用意できる最も単純な ThreadPool 構造体の定義です。
pub struct ThreadPool;
次に、main.rs ファイルを編集して、以下のコードを src/main.rs の先頭に追加し、ライブラリクレートから ThreadPool をスコープに持ち込みます。
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
このコードもまだ動きませんが、対処すべき次のエラーを得るために、もう一度確認してみましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
このエラーは、次に ThreadPool のために new という関連関数を作成する必要があることを示しています。また、new は引数として 4 を受け取れる 1 つのパラメータを持ち、ThreadPool インスタンスを返す必要があることもわかっています。それらの性質を持つ最も単純な new 関数を実装してみましょう。
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
size パラメータの型として usize を選んだのは、スレッド数が負の値であることには意味がないとわかっているからです。また、この 4 はスレッドのコレクション内の要素数として使うこともわかっています。これは usize 型の用途であり、第 3 章の 「整数型」 の節で説明したとおりです。
もう一度コードを確認してみましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
今度のエラーは、ThreadPool に execute メソッドがないために発生しています。「有限個のスレッドを作成する」 の節で、スレッドプールは thread::spawn に似たインターフェイスを持つべきだと決めたことを思い出してください。さらに、execute 関数は、与えられたクロージャを受け取り、それをプール内のアイドル状態のスレッドに渡して実行させるように実装します。
ThreadPool の execute メソッドは、パラメータとしてクロージャを受け取るように定義します。第 13 章の 「キャプチャした値をクロージャの外へムーブする」 で見たように、クロージャは 3 種類の異なるトレイト Fn、FnMut、FnOnce のいずれかとしてパラメータに取ることができます。ここでは、どの種類のクロージャを使うべきかを決める必要があります。最終的には標準ライブラリの thread::spawn 実装に似たことをすることになるので、thread::spawn のシグネチャがそのパラメータにどのような境界を課しているかを見てみましょう。ドキュメントには次のようにあります。
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
ここで関心があるのは型パラメータ F です。型パラメータ T は戻り値に関係しており、今回は関係ありません。spawn が F に対するトレイト境界として FnOnce を使っていることがわかります。これはおそらく私たちにも望ましいものです。というのも、最終的には execute で受け取った引数を spawn に渡すことになるからです。さらに、FnOnce が使いたいトレイトだとより確信できます。なぜなら、リクエストを処理するスレッドはそのリクエストのクロージャを 1 回だけ実行するので、これは FnOnce の Once と一致するからです。
型パラメータ F には、トレイト境界 Send とライフタイム境界 'static もあります。これらは今回の状況で有用です。クロージャをあるスレッドから別のスレッドへ移動するには Send が必要であり、またスレッドの実行にどれくらい時間がかかるかわからないため 'static が必要です。これらの境界を持つ型 F のジェネリックパラメータを受け取る execute メソッドを ThreadPool に作成しましょう。
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
FnOnce の後ろに引き続き () を使っているのは、この FnOnce が、パラメータを取らず、ユニット型 () を返すクロージャを表しているからです。関数定義と同様に、戻り値の型はシグネチャから省略できますが、パラメータがない場合でも括弧は必要です。
繰り返しますが、これは execute メソッドの最も単純な実装です。何もしませんが、私たちはただコードをコンパイル可能にしようとしているだけです。もう一度確認してみましょう。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
コンパイルできました!ただし、cargo run を試してブラウザでリクエストを行うと、この章の冒頭で見たものと同じエラーがブラウザに表示されることに注意してください。私たちのライブラリは、execute に渡されたクロージャをまだ実際には呼び出していません。
注: Haskell や Rust のような厳格なコンパイラを持つ言語について、「コードがコンパイルできるなら、動く」という言い回しを耳にすることがあるかもしれません。しかし、この言い回しは普遍的に正しいわけではありません。私たちのプロジェクトはコンパイルできますが、まったく何もしません! 実際の完全なプロジェクトを構築しているのであれば、この時点でユニットテストを書き始め、コードがコンパイルできることと、望んでいる振る舞いをすることの両方を確認するのがよいでしょう。
考えてみてください: ここで、クロージャの代わりに future を実行しようとしていたら、何が異なっていたでしょうか?
new でスレッド数を検証する
new と execute のパラメータについては、まだ何もしていません。これらの関数本体を、望んでいる振る舞いになるように実装していきましょう。まずは new について考えます。先ほど、size パラメータには符号なし型を選びました。負の数のスレッドを持つプールには意味がないからです。しかし、スレッド数が 0 のプールにも意味はありません。それでも 0 は完全に有効な usize です。ThreadPool インスタンスを返す前に、size が 0 より大きいことを確認するコードを追加し、assert! マクロを使って 0 を受け取った場合にはプログラムを panic させます。これはリスト 21-13 に示されています。
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
また、doc コメントを使って ThreadPool のドキュメントもいくつか追加しました。第 14 章で説明したように、関数が panic しうる状況を明示するセクションを追加するという、よいドキュメント作成の実践に従っている点に注目してください。cargo doc --open を実行し、ThreadPool 構造体をクリックして、new 用に生成されたドキュメントがどのように見えるか確認してみてください!
ここで行ったように assert! マクロを追加する代わりに、new を build に変更し、リスト 12-9 の I/O プロジェクトで Config::build に対して行ったのと同じように Result を返すこともできます。しかしこの場合、スレッドを 1 本も持たないスレッドプールを作ろうとすることは回復不能なエラーであると判断しました。意欲があれば、次のシグネチャを持つ build という名前の関数を書いて、new 関数と比較してみてください。
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
スレッドを格納するための領域を作成する
これで、プールに格納するスレッド数が有効であることを確認する方法ができたので、そのスレッドを作成し、構造体を返す前に ThreadPool 構造体へ格納できます。しかし、スレッドを「格納する」とはどういうことでしょうか? thread::spawn のシグネチャをもう一度見てみましょう。
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn 関数は JoinHandle<T> を返します。ここで T はクロージャが返す型です。JoinHandle も使ってみて、何が起こるか見てみましょう。今回の場合、スレッドプールに渡すクロージャは接続を処理し、何も返しません。そのため T はユニット型 () になります。
リスト 21-14 のコードはコンパイルできますが、まだスレッドは 1 本も作成しません。ThreadPool の定義を変更して thread::JoinHandle<()> インスタンスのベクタを保持するようにし、そのベクタを size の容量で初期化し、スレッドを作成するコードを実行する for ループを用意し、それらを含む ThreadPool インスタンスを返すようにしました。
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ライブラリクレート内で std::thread をスコープに持ち込みました。これは、ThreadPool 内のベクタの要素型として thread::JoinHandle を使っているためです。
有効な size を受け取ると、ThreadPool は size 個の要素を保持できる新しいベクタを作成します。with_capacity 関数は Vec::new と同じ役割を果たしますが、重要な違いがあります。それは、ベクタ内に領域を事前に確保することです。ベクタに size 個の要素を格納する必要があるとわかっているため、最初にこの確保を行うのは、要素が挿入されるたびに自分自身をリサイズする Vec::new を使うより、わずかに効率的です。
再び cargo check を実行すると、成功するはずです。
ThreadPool からスレッドへコードを送る
リスト 21-14 の for ループには、スレッドの作成に関するコメントを残していました。ここでは、実際にどのようにスレッドを作成するのかを見ていきます。標準ライブラリはスレッドを作成する方法として thread::spawn を提供しており、thread::spawn はスレッド作成直後にそのスレッドが実行すべきコードを受け取ることを想定しています。しかし今回の場合、私たちはスレッドを作成したうえで、あとから送るコードを 待機 させたいのです。標準ライブラリのスレッド実装にはそれを行う方法は含まれていないため、自分たちで実装する必要があります。
この振る舞いを実装するために、ThreadPool とスレッドの間に新しいデータ構造を導入し、この新しい振る舞いを管理させます。このデータ構造を Worker と呼ぶことにします。これはプーリング実装で一般的な用語です。Worker は実行すべきコードを受け取り、そのコードを自分のスレッド上で実行します。
レストランの厨房で働く人たちを考えてみてください。ワーカーたちは顧客から注文が入るまで待ち、注文が入ったらそれを受け取り、内容をこなす責任を負います。
スレッドプール内に JoinHandle<()> インスタンスのベクタを格納する代わりに、Worker 構造体のインスタンスを格納します。各 Worker は 1 つの JoinHandle<()> インスタンスを保持します。そして Worker にメソッドを実装し、実行すべきコードのクロージャを受け取って、それをすでに動作中のスレッドへ送って実行させるようにします。また、ログ出力やデバッグの際にプール内の異なる Worker インスタンスを区別できるよう、各 Worker に id も持たせます。
ThreadPool を作成するときに起こる新しい処理は次のとおりです。このように Worker を準備したあとで、クロージャをスレッドへ送るコードを実装します。
idとJoinHandle<()>を保持するWorker構造体を定義する。ThreadPoolがWorkerインスタンスのベクタを保持するように変更する。id番号を受け取り、そのidと空のクロージャで生成されたスレッドを保持するWorkerインスタンスを返すWorker::new関数を定義する。ThreadPool::newでforループのカウンタを使ってidを生成し、そのidを持つ新しいWorkerを作成して、ベクタに格納する。
挑戦してみたいなら、リスト 21-15 のコードを見る前に、これらの変更を自分で実装してみてください。
準備はできましたか? それでは、前述の変更を行う 1 つの方法を示したリスト 21-15 を見ていきましょう。
```rust,noplayground
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
# /// Create a new ThreadPool.
# ///
# /// The size is the number of threads in the pool.
# ///
# /// # Panics
# ///
# /// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
#
# pub fn execute<F>(&self, f: F)
# where
# F: FnOnce() + Send + 'static,
# {
# }
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool のフィールド名を threads から workers に変更しました。
これは、保持しているのが JoinHandle<()> のインスタンスではなく、
Worker のインスタンスになったためです。for ループ内のカウンタを
Worker::new の引数として使い、新しい Worker をそれぞれ workers という名前のベクタに格納します。
外部のコード(src/main.rs にある私たちのサーバーなど)は、
ThreadPool 内で Worker 構造体を使っている実装の詳細を知る必要が
ないため、Worker 構造体とその new 関数は非公開にします。Worker::new
関数は、与えられた id を使い、空のクロージャで新しいスレッドを生成して作られた
JoinHandle<()> インスタンスを保持します。
注: システムリソースが不足していてオペレーティングシステムがスレッドを 作成できない場合、
thread::spawnはパニックします。すると、一部の スレッドの作成には成功していても、サーバー全体がパニックすることに なります。単純化のため、この振る舞いでも問題ありませんが、本番用の スレッドプール実装では、代わりにstd::thread::Builderと そのResultを返すspawnメソッドを使いたく なるでしょう。
このコードはコンパイルでき、ThreadPool::new の引数として指定した数の
Worker インスタンスを格納します。しかし、私たちは まだ execute で受け取る
クロージャを処理していません。次に、それをどう行うかを見ていきましょう。
チャネルを介してスレッドにリクエストを送る
次に取り組む問題は、thread::spawn に渡しているクロージャが
まったく何もしないことです。現在、実行したいクロージャは execute
メソッドで受け取っています。しかし、ThreadPool の作成中に各 Worker
を生成するとき、thread::spawn に実行させるクロージャを渡す必要があります。
今作成した Worker 構造体には、ThreadPool が保持しているキューから
実行するコードを取り出し、そのコードをスレッドに送って実行させたいと
考えています。
第 16 章で学んだチャネルは、2 つのスレッド間で通信するための単純な方法であり、
このユースケースにぴったりです。ジョブのキューとして機能するようにチャネルを使い、
execute は ThreadPool から Worker インスタンスへジョブを送り、
Worker インスタンスはそのジョブを自分のスレッドに送ります。計画は次のとおりです。
ThreadPoolはチャネルを作成し、送信側を保持します。- 各
Workerは受信側を保持します。 - チャネルに送りたいクロージャを保持する新しい
Job構造体を作成します。 executeメソッドは、実行したいジョブを送信側経由で送ります。Workerは自分のスレッド内で受信側に対してループし、受け取った ジョブのクロージャを実行します。
まず、リスト 21-16 に示すように、ThreadPool::new でチャネルを作成し、
送信側を ThreadPool インスタンスに保持させましょう。Job 構造体は
今のところ何も保持しませんが、チャネルを通して送るアイテムの型になります。
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool::new では、新しいチャネルを作成し、プールに送信側を保持させます。
これは問題なくコンパイルできます。
次に、スレッドプールがチャネルを作成するときに、チャネルの受信側を
各 Worker に渡してみましょう。受信側は Worker インスタンスが
生成するスレッドの中で使いたいので、クロージャ内で receiver
引数を参照することになります。リスト 21-17 のコードは、まだ完全にはコンパイルしません。
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
小さくてわかりやすい変更をいくつか行いました。受信側を Worker::new に
渡し、その後クロージャの中でそれを使っています。
このコードをチェックしようとすると、次のエラーが出ます。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
コードは receiver を複数の Worker インスタンスに渡そうとしています。これは、
第 16 章で思い出せるように、うまくいきません。Rust が提供するチャネル実装は、
複数の producer、単一の consumer だからです。つまり、このコードを修正するために
チャネルの受信側を単純にクローンすることはできません。また、複数の consumer に
メッセージを複数回送りたいわけでもありません。必要なのは、複数の Worker
インスタンスがある 1 つのメッセージのリストであり、各メッセージが 1 回だけ
処理されるようにしたいのです。
さらに、チャネルキューからジョブを取り出すには receiver を変更する必要が
あるため、スレッドには receiver を安全に共有して変更する方法が必要です。
そうしないと、競合状態が発生する可能性があります(第 16 章で扱いました)。
第 16 章で説明したスレッドセーフなスマートポインタを思い出してください。
複数のスレッドで所有権を共有し、かつスレッドが値を変更できるようにするには、
Arc<Mutex<T>> を使う必要があります。Arc 型によって複数の Worker
インスタンスが受信側を所有できるようになり、Mutex は一度に 1 つの Worker
だけが受信側からジョブを取得できるようにします。リスト 21-18 に必要な変更を示します。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
ThreadPool::new では、受信側を Arc と Mutex に入れます。新しい
各 Worker について、参照カウントを増やすために Arc をクローンし、
Worker インスタンスが受信側の所有権を共有できるようにします。
これらの変更により、コードはコンパイルできます! だいぶ近づいてきました!
execute メソッドを実装する
最後に、ThreadPool の execute メソッドを実装しましょう。また、Job
を構造体から、execute が受け取るクロージャの型を保持するトレイトオブジェクトの
型エイリアスに変更します。第 20 章の 「型シノニムと型エイリアス」 節で
説明したように、型エイリアスを使うと、長い型を使いやすいように短くできます。
リスト 21-19 を見てください。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
execute で受け取ったクロージャを使って新しい Job インスタンスを作成した後、
そのジョブをチャネルの送信側から送ります。送信に失敗する場合に備えて、send に対して
unwrap を呼び出しています。たとえば、すべてのスレッドの実行を停止させて、
受信側が新しいメッセージを受け取らなくなった場合に、これが起こりえます。現時点では、
スレッドの実行を止めることはできません。プールが存在する限り、スレッドは実行を
続けます。unwrap を使っている理由は、この失敗ケースは起こらないとわかっている
からですが、コンパイラはそれを知らないからです。
ですが、まだ完全には終わっていません! Worker では、thread::spawn に渡して
いるクロージャが、依然としてチャネルの受信側を 参照しているだけ です。
代わりに、このクロージャが永遠にループし、チャネルの受信側にジョブを求め、
ジョブを受け取ったらそのジョブを実行する必要があります。Worker::new に、
リスト21-20に示す変更を加えましょう。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
ここでは、まず receiver に対して lock を呼び出して mutex を取得し、それから
エラーがあれば panic するために unwrap を呼び出します。mutex が ポイズン
状態にあると、ロックの取得は失敗する可能性があります。これは、ほかのスレッドが
ロックを解放せずに保持したまま panic した場合に起こり得ます。この状況では、
このスレッドも panic するように unwrap を呼び出すのが正しい対処です。必要で
あれば、この unwrap を、自分にとって意味のあるエラーメッセージを持つ
expect に変更して構いません。
mutex のロックを取得できたら、recv を呼び出してチャネルから Job を受け取り
ます。最後の unwrap でも、ここで起こり得るエラーがあれば panic します。これ
は、受信側がシャットダウンしたときに send メソッドが Err を返すのと同様に、
送信側を保持しているスレッドが停止している場合に発生し得ます。
recv の呼び出しはブロックするので、まだジョブがなければ、現在のスレッドは
ジョブが利用可能になるまで待機します。Mutex<T> により、ある時点でジョブを
要求しようとする Worker スレッドは1つだけであることが保証されます。
これでスレッドプールは動作する状態になりました! cargo run で実行して、
いくつかリクエストを送ってみましょう:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功です!これで、接続を非同期に処理するスレッドプールができました。 作成されるスレッドは4本を超えないので、サーバーが大量のリクエストを受け取っても、 システムに過負荷はかかりません。/sleep にリクエストしても、別の スレッドがそれらを実行できるため、サーバーはほかのリクエストを処理できます。
注: /sleep を複数のブラウザウィンドウで同時に開くと、5秒間隔で1つずつ 読み込まれることがあります。一部のWebブラウザは、キャッシュ上の理由から、 同じリクエストを複数回行う場合に順番に実行します。この制限は、私たちの Webサーバーが原因ではありません。
ここでいったん立ち止まって、リスト21-18、21-19、21-20のコードが、処理すべき 作業にクロージャではなく future を使っていたらどう違っていたかを考えるのは 良いタイミングです。どの型が変わるでしょうか。メソッドシグネチャは、変わると すればどのように違っていたでしょうか。コードのどの部分は同じままでしょうか。
第17章と第19章で while let ループについて学んだあとでは、Worker の
スレッドコードをなぜリスト21-21のように書かなかったのか、不思議に思うかも
しれません。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
このコードはコンパイルも実行もできますが、望んだスレッディングの振る舞いには
なりません。つまり、遅いリクエストがほかのリクエストの処理待ちを引き起こした
ままになります。その理由はやや微妙です。Mutex 構造体には公開された unlock
メソッドがありません。というのも、ロックの所有権は、lock メソッドが返す
LockResult<MutexGuard<T>> 内の MutexGuard<T> のライフタイムに基づいて
いるからです。そうすることで、コンパイル時に借用チェッカが、Mutex によって
保護されたリソースにはロックを保持しているときにしかアクセスできない、という
規則を強制できます。しかし、この実装では、MutexGuard<T> のライフタイムに
注意していないと、意図したよりも長くロックを保持してしまうこともあります。
let job = receiver.lock().unwrap().recv().unwrap(); を使っているリスト21-20のコードが
動くのは、let では、等号の右辺の式で使われた一時値が let 文の終わりで
直ちにドロップされるからです。しかし、while let(および if let と match)
は、対応するブロックの終わりまで一時値をドロップしません。リスト21-21では、
ロックは job() の呼び出し中ずっと保持されたままになるため、ほかの Worker
インスタンスはジョブを受け取れません。