グレースフルシャットダウンとクリーンアップ
リスト 21-20 のコードは、意図したとおり、スレッドプールを使ってリクエストに非同期に応答しています。直接は使っていない workers、id、thread フィールドについていくつか警告が出ますが、これは何もクリーンアップしていないことを思い出させてくれます。あまり洗練されていない ctrl-C の方法でメインスレッドを停止すると、たとえリクエストの処理中であっても、ほかのすべてのスレッドも即座に停止します。
次に、Drop トレイトを実装して、プール内の各スレッドに対して join を呼び出し、クローズする前に処理中のリクエストを完了できるようにします。続いて、スレッドに新しいリクエストの受け付けを停止してシャットダウンすべきことを伝える方法を実装します。このコードが実際に動作するところを見るために、サーバーが 2 つのリクエストだけを受け付けたあとでスレッドプールをグレースフルにシャットダウンするように変更します。
途中で 1 つ注意しておくべきことがあります。これらはいずれもクロージャの実行を扱うコード部分には影響しないので、非同期ランタイムのためにスレッドプールを使っていたとしても、ここでの内容はすべて同じです。
ThreadPool に Drop トレイトを実装する
まずはスレッドプールに Drop を実装するところから始めましょう。プールがドロップされるとき、スレッドはすべて join して、作業を確実に完了させる必要があります。リスト 21-22 は Drop 実装の最初の試みを示しています。このコードは、まだ完全には動きません。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
まず、スレッドプールの各 workers を順にループします。ここで &mut を使うのは、self が可変参照であり、さらに worker も変更できる必要があるからです。各 worker について、この特定の Worker インスタンスがシャットダウン中であることを示すメッセージを表示し、その後でその Worker インスタンスのスレッドに対して join を呼び出します。join の呼び出しに失敗した場合は、unwrap を使って Rust にパニックを起こさせ、グレースフルでないシャットダウンに入ります。
このコードをコンパイルすると、次のエラーが表示されます。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
このエラーは、各 worker に対して可変借用しか持っておらず、join は引数の所有権を取るため、join を呼び出せないことを示しています。この問題を解決するには、thread を所有している Worker インスタンスからそのスレッドを取り出して、join がそのスレッドを消費できるようにする必要があります。これを行う方法の 1 つは、リスト 18-15 で取ったのと同じアプローチを使うことです。Worker が Option<thread::JoinHandle<()>> を保持していれば、Option に対して take メソッドを呼び出して、Some バリアントから値を取り出し、その場所に None バリアントを残せます。言い換えると、実行中の Worker では thread に Some バリアントが入り、Worker をクリーンアップしたいときには Some を None に置き換えることで、その Worker が実行すべきスレッドを持たないようにできるわけです。
しかし、これが問題になるのは Worker をドロップするときの 唯一の 場面です。その代わりに、worker.thread にアクセスするあらゆる場所で Option<thread::JoinHandle<()>> を扱わなければならなくなります。慣用的な Rust では Option はかなり多く使われますが、このような回避策として、常に存在すると分かっているものを Option で包んでいることに気づいたら、コードをよりクリーンでエラーが起こりにくくする別のアプローチを探すのがよい考えです。
この場合には、よりよい代替手段があります。それが Vec::drain メソッドです。これは、ベクタからどの要素を削除するかを指定する範囲パラメータを受け取り、それらの要素のイテレータを返します。.. という範囲構文を渡すと、ベクタから すべての 値が削除されます。
したがって、ThreadPool の drop 実装を次のように更新する必要があります。
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
}
これでコンパイラエラーは解消され、コードにほかの変更は必要ありません。なお、パニック中に drop が呼び出されることもあるため、unwrap もさらにパニックし、二重パニックを引き起こす可能性があります。そうなるとプログラムは即座にクラッシュし、進行中のクリーンアップも終了します。これはサンプルプログラムとしては問題ありませんが、本番コードでは推奨されません。
ジョブの受信を停止するようスレッドに通知する
ここまでの変更によって、コードは警告なしでコンパイルされるようになりました。しかし、悪い知らせとして、このコードはまだ私たちの望むようには動作しません。鍵になるのは、Worker インスタンスのスレッドによって実行されるクロージャ内のロジックです。現時点では join を呼び出していますが、それではスレッドはシャットダウンしません。なぜなら、それらはジョブを探して永遠に loop し続けるからです。現在の drop 実装のままで ThreadPool をドロップしようとすると、メインスレッドは最初のスレッドが終了するのを待って永遠にブロックされます。
この問題を修正するには、まず ThreadPool の drop 実装を変更し、そのあとで Worker ループも変更する必要があります。
まず、スレッドの終了を待つ前に sender を明示的にドロップするように、ThreadPool の drop 実装を変更します。リスト 21-23 は、sender を明示的にドロップするための ThreadPool への変更を示しています。スレッドの場合とは異なり、ここでは Option::take を使って sender を ThreadPool からムーブできるようにするために、実際に Option を使う必要があります。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender をドロップするとチャネルが閉じられ、それ以上メッセージが送られないことが示されます。そうなると、Worker インスタンスが無限ループ内で行っている recv の呼び出しはすべてエラーを返すようになります。リスト 21-24 では、その場合にループをグレースフルに抜けるよう Worker ループを変更しています。これにより、ThreadPool の drop 実装がそれらに対して join を呼び出したときに、スレッドは終了します。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
このコードが実際に動作するところを見るために、リスト 21-25 に示すように、サーバーをグレースフルにシャットダウンする前に main が 2 つのリクエストだけを受け付けるよう変更しましょう。
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
現実の Web サーバーを、わずか 2 つのリクエストを処理しただけでシャットダウンさせたいとは思わないでしょう。このコードは、グレースフルシャットダウンとクリーンアップが正しく機能していることを示すためのものです。
take メソッドは Iterator トレイトで定義されており、イテレーションを最大で最初の 2 項目に制限します。ThreadPool は main の終わりでスコープを抜け、drop 実装が実行されます。
cargo run でサーバーを起動し、3 回リクエストを送ってください。3 回目のリクエスト
はエラーになるはずで、ターミナルには次のような出力が表示されるはずです:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Worker の ID や表示されるメッセージの順序は、異なる場合があります。この
メッセージから、このコードがどのように動作しているかがわかります。Worker
インスタンス 0 と 3 が最初の 2 つのリクエストを受け取りました。サーバーは 2 つ目
の接続のあとで接続の受け付けを停止し、ThreadPool の Drop 実装は、Worker 3
が仕事を開始する前から実行を始めます。sender をドロップすると、すべての
Worker インスタンスが切断され、シャットダウンするよう通知されます。各
Worker インスタンスは切断されたときにそれぞれメッセージを表示し、そのあと
スレッドプールは join を呼び出して各 Worker スレッドの終了を待ちます。
この特定の実行には、1 つ興味深い点があります。ThreadPool が sender を
ドロップし、どの Worker もまだエラーを受け取っていない段階で、Worker 0 に
対して join を試みました。Worker 0 はまだ recv からエラーを受け取って
いなかったため、メインスレッドはブロックされ、Worker 0 が完了するのを待ち
ました。その間に、Worker 3 は仕事を受け取り、そのあとですべてのスレッドが
エラーを受け取りました。Worker 0 が完了すると、メインスレッドは残りの
Worker インスタンスが完了するのを待ちました。その時点で、それらはすべて
ループを抜けて停止していました。
おめでとうございます! これでプロジェクトは完了です。スレッドプールを使って 非同期に応答する基本的な Web サーバーができました。サーバーをグレースフルに シャットダウンできるため、プール内のすべてのスレッドをクリーンアップできます。
参考までに、完全なコードを示します:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
ここからさらに発展させることもできます! このプロジェクトを引き続き改善したい なら、次のようなアイデアがあります:
ThreadPoolとその公開メソッドに、さらにドキュメントを追加する。- ライブラリの機能に対するテストを追加する。
unwrapの呼び出しを、より堅牢なエラーハンドリングに変更する。- Web リクエストの処理以外の何らかのタスクを実行するために
ThreadPoolを使う。 - crates.io でスレッドプールクレートを探し、そのクレートを使って同様の Web サーバーを実装する。次に、その API と 堅牢性を、私たちが実装したスレッドプールと比較する。
まとめ
お疲れさまでした! ついに本書を最後まで読み終えました! Rust の旅に参加して くれて、ありがとうございます。これで、自分自身の Rust プロジェクトを実装したり、 ほかの人のプロジェクトを手伝ったりする準備が整いました。Rust の学習の道のりで どんな課題に出会っても、喜んで助けてくれる温かい Rustacean のコミュニティが あることを忘れないでください。