Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

応用: エグゼキューターを構築する

Rust の Future は遅延評価されます。完了まで能動的に駆動されない限り、何もしません。future を完了まで駆動する方法の 1 つは、async 関数の中でそれを .await することですが、それは問題を 1 段階上に押し上げるだけです。トップレベルの async 関数から返される future は誰が実行するのでしょうか?答えは、Future エグゼキューターが必要だということです。

Future エグゼキューターは、トップレベルの Future の集合を受け取り、その Future が進行できるようになるたびに poll を呼び出すことで、それらを完了まで実行します。通常、エグゼキューターは最初に future を 1 回 poll して開始します。Futurewake() を呼び出して進行する準備ができたことを示すと、それらはキューに戻され、再び poll が呼び出されます。これを Future が完了するまで繰り返します。

このセクションでは、多数のトップレベル future を並行して完了まで実行できる、独自のシンプルなエグゼキューターを書きます。

この例では、Waker を簡単に構築する方法を提供する ArcWake トレイトのために、futures クレートに依存します。Cargo.toml を編集して、新しい依存関係を追加します。

[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"

[dependencies]
futures = "0.3"

次に、src/main.rs の先頭に以下のインポートが必要です。

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;

私たちのエグゼキューターは、実行するタスクをチャネル経由で送信することで動作します。エグゼキューターはチャネルからイベントを取り出して実行します。タスクがさらに処理を行う準備ができた(起床された)とき、そのタスクは自分自身をチャネルに戻すことで、再び poll されるようにスケジュールできます。

この設計では、エグゼキューター自体に必要なのはタスクチャネルの受信側だけです。ユーザーは新しい future を spawn できるように、送信側を取得します。タスク自体は、自分自身を再スケジュールできる future にすぎないため、future と、そのタスクが自分自身をキューに戻すために使える sender のペアとして格納します。

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

新しい future を簡単に spawn できるように、spawner にメソッドも追加しましょう。このメソッドは future 型を受け取り、それを box 化し、その中に future を持つ新しい Arc<Task> を作成します。これはエグゼキューター上のキューに入れることができます。

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.try_send(task).expect("too many tasks queued");
    }
}

future を poll するには、Waker を作成する必要があります。 タスクのウェイクアップのセクションで説明したように、Wakerwake が呼び出された後にタスクが再び poll されるようスケジュールする責任を持ちます。Waker は、どのタスクが準備完了になったかをエグゼキューターに正確に伝えるため、エグゼキューターは進行する準備ができた future だけを poll できます。新しい Waker を作成する最も簡単な方法は、ArcWake トレイトを実装し、その後 waker_ref または .into_waker() 関数を使って Arc<impl ArcWake>Waker に変換することです。タスクを Waker に変換して起床できるようにするため、タスクに対して ArcWake を実装しましょう。

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .try_send(cloned)
            .expect("too many tasks queued");
    }
}

Arc<Task> から Waker が作成されると、その上で wake() を呼び出すことで、Arc のコピーがタスクチャネルへ送信されます。その後、エグゼキューターはそのタスクを受け取り、poll する必要があります。これを実装しましょう。

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if future.as_mut().poll(context).is_pending() {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

おめでとうございます!これで動作する futures エグゼキューターができました。これを使って、async/.await コードや、以前に書いた TimerFuture のようなカスタム future を実行することもできます。

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}