Waker によるタスクのウェイクアップ
Future が最初に poll された時点では完了できないことはよくあります。これが起きた場合、その Future は、さらに処理を進められる状態になった時点で再度 poll されるようにする必要があります。これは Waker 型によって行われます。
Future が poll されるたびに、それは「タスク」の一部として poll されます。タスクとは、Executor に送信されたトップレベルの Future です。
Waker は、関連付けられたタスクを起床させるべきであることを Executor に伝えるために使用できる wake() メソッドを提供します。wake() が呼び出されると、Executor は Waker に関連付けられたタスクが処理を進められる状態であり、その Future を再度 poll すべきであることを認識します。
Waker は clone() も実装しているため、コピーして持ち回ったり保存したりできます。
Waker を使って、簡単なタイマー Future を実装してみましょう。
実践: タイマーを構築する
この例では、タイマーが作成されたときに新しいスレッドを起動し、必要な時間だけスリープしてから、時間枠が経過したときにタイマー Future に通知するだけにします。
まず、cargo new --lib timer_future で新しいプロジェクトを開始し、作業を始めるために必要な import を src/lib.rs に追加します。
#![allow(unused)]
fn main() {
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
}
まず、Future 型そのものを定義するところから始めましょう。この Future には、タイマーが経過し、Future が完了すべきであることをスレッドが伝えるための方法が必要です。スレッドと Future の間で通信するために、共有された Arc<Mutex<..>> 値を使用します。
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
struct SharedState {
/// Whether or not the sleep time has elapsed
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
waker: Option<Waker>,
}
それでは、実際に Future 実装を書いてみましょう!
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
かなりシンプルですよね? スレッドが shared_state.completed = true を設定していれば、完了です! そうでなければ、現在のタスクの Waker を clone して shared_state.waker に渡し、スレッドがそのタスクを再び起床できるようにします。
重要なのは、Future が poll されるたびに Waker を更新しなければならないことです。なぜなら、その Future が異なる Waker を持つ別のタスクへ移動している可能性があるためです。これは、Future が poll された後にタスク間で受け渡される場合に起こります。
最後に、実際にタイマーを構築し、スレッドを開始するための API が必要です。
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
やりました! これで簡単なタイマー Future を構築するために必要なものはすべて揃いました。あとは、この Future を実行するための Executor さえあればいいのですが…