Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

エグゼキューターとシステム IO

The Future Trait の前のセクションでは、ソケットに対して非同期読み取りを実行する future の次の例について説明しました。

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

この future はソケットで利用可能なデータを読み取り、利用可能なデータがない場合は エグゼキューターに制御を譲り、ソケットが再び読み取り可能になったときにそのタスクを 起こすよう要求します。しかし、この例からは Socket 型がどのように実装されているのか、 特に set_readable_callback 関数がどのように動作するのかは明らかではありません。 ソケットが読み取り可能になった時点で wake() が呼び出されるようにするには、 どうすればよいでしょうか。1 つの選択肢として、socket が読み取り可能かどうかを 継続的に確認し、適切なタイミングで wake() を呼び出すスレッドを用意する方法があります。 しかし、これは非常に非効率で、ブロックされた IO future ごとに個別のスレッドが必要になります。 これにより、非同期コードの効率は大きく低下します。

実際には、この問題は IO を認識するシステムのブロッキングプリミティブとの統合によって 解決されます。たとえば、Linux の epoll、FreeBSD と Mac OS の kqueue、 Windows の IOCP、Fuchsia の port などです(これらはすべて、クロスプラットフォームの Rust クレート mio を通じて公開されています)。これらのプリミティブはいずれも、 スレッドが複数の非同期 IO イベントでブロックし、そのうちの 1 つが完了した時点で 戻ることを可能にします。実際には、これらの API は通常、次のような形になります。

struct IoBlocker {
    /* ... */
}

struct Event {
    // 発生し、監視対象だったイベントを一意に識別する ID。
    id: usize,

    // 待機するシグナル、または発生したシグナルの集合。
    signals: Signals,
}

impl IoBlocker {
    /// ブロック対象となる非同期 IO イベントの新しいコレクションを作成する。
    fn new() -> Self { /* ... */ }

    /// 特定の IO イベントへの関心を表明する。
    fn add_io_event_interest(
        &self,

        /// イベントが発生する対象のオブジェクト
        io_object: &IoObject,

        /// `io_object` 上に現れる可能性があり、
        /// イベントをトリガーすべきシグナルの集合。
        /// その関心から生じるイベントに与える ID と対にする。
        event: Event,
    ) { /* ... */ }

    /// いずれかのイベントが発生するまでブロックする。
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// たとえばソケット 1 が読み取り可能になった場合、"Socket 1 is now READABLE" と表示する。
println!("Socket {:?} is now {:?}", event.id, event.signals);

Future エグゼキューターは、これらのプリミティブを使用して、特定の IO イベントが 発生したときに実行されるコールバックを設定できるソケットのような非同期 IO オブジェクトを 提供できます。上記の SocketRead の例の場合、Socket::set_readable_callback 関数は 次の擬似コードのようになるかもしれません。

impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` はローカルエグゼキューターへの参照です。
        // これはソケットの作成時に提供することもできますが、実際には
        // 多くのエグゼキューター実装が利便性のためにスレッドローカル
        // ストレージを通じて渡します。
        let local_executor = self.local_executor;

        // この IO オブジェクトの一意な ID。
        let id = self.id;

        // IO イベントが到着した時点で呼び出せるように、
        // ローカルの waker をエグゼキューターのマップに保存します。
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

これで、任意の IO イベントを受信し、適切な Waker にディスパッチできる エグゼキュータースレッドを 1 つだけ持てるようになります。その Waker は対応するタスクを 起こし、エグゼキューターはさらに多くのタスクを完了まで進めてから、再びより多くの IO イベントを確認するために戻ることができます(そしてこのサイクルが続きます…)。