Future トレイト
Future トレイトは、Rust における非同期プログラミングの中心にあります。
Future は、値を生成できる非同期計算です
(ただし、その値は空である場合もあります。例: ())。future トレイトの
簡略化した バージョンは、次のようになるかもしれません。
#![allow(unused)]
fn main() {
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
}
Future は poll 関数を呼び出すことで進めることができ、この関数は future を
可能な限り完了に近づけるよう駆動します。future が完了した場合は、
Poll::Ready(result) を返します。future がまだ完了できない場合は、
Poll::Pending を返し、Future がさらに進行できる状態になったときに
wake() 関数が呼び出されるようにします。wake() が呼び出されると、
その Future を駆動しているエグゼキューターは再び poll を呼び出し、
Future がさらに進行できるようにします。
wake() がなければ、エグゼキューターは特定の future がいつ進行できるのかを
知る手段がなく、すべての future を常にポーリングし続ける必要があります。
wake() があれば、エグゼキューターはどの future が poll される準備が
できているのかを正確に把握できます。
たとえば、すでにデータが利用可能かもしれないし、そうでないかもしれない
ソケットから読み取りたい場合を考えてみましょう。データがあれば、それを
読み込んで Poll::Ready(data) を返せますが、データがまだ準備できていなければ、
私たちの future はブロックされ、それ以上進行できなくなります。データが
利用できない場合は、ソケット上でデータが準備できたときに wake が
呼び出されるよう登録しなければなりません。これにより、私たちの future が
進行できる状態になったことをエグゼキューターに伝えます。単純な SocketRead
future は、次のようになるかもしれません。
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
この Future のモデルにより、中間のアロケーションを必要とせずに複数の
非同期操作を合成できます。複数の future を同時に実行したり、future を
連鎖させたりすることは、次のようなアロケーション不要のステートマシンとして
実装できます。
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
// Each field may contain a future that should be run to completion.
// If the future has already completed, the field is set to `None`.
// This prevents us from polling a future after it has completed, which
// would violate the contract of the `Future` trait.
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// Attempt to complete future `a`.
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// Attempt to complete future `b`.
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// Both futures have completed -- we can return successfully
Poll::Ready(())
} else {
// One or both futures returned `Poll::Pending` and still have
// work to do. They will call `wake()` when progress can be made.
Poll::Pending
}
}
}
これは、個別のアロケーションを必要とせずに複数の future を同時に実行でき、 より効率的な非同期プログラムを可能にすることを示しています。同様に、複数の 逐次的な future を次々に実行することもできます。次のようになります。
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// We've completed the first future -- remove it and start on
// the second!
Poll::Ready(()) => self.first.take(),
// We couldn't yet complete the first future.
// Notice that we disrupt the flow of the `poll` function with the `return` statement.
Poll::Pending => return Poll::Pending,
};
}
// Now that the first future is done, attempt to complete the second.
self.second.poll(wake)
}
}
これらの例は、複数のアロケートされたオブジェクトや深くネストしたコールバックを
必要とせずに、Future トレイトを使って非同期の制御フローを表現できることを
示しています。基本的な制御フローについてはこれで済んだので、実際の Future
トレイトと、それがどのように異なるのかについて話しましょう。
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
最初に気づく変更点は、self の型がもはや &mut Self ではなく、
Pin<&mut Self> に変わっていることです。ピン留めについては後のセクションで
さらに詳しく説明しますが、今は、これによって移動不能な future を作成できるように
なると理解しておいてください。移動不能なオブジェクトは、自身のフィールド間に
ポインターを格納できます。例: struct MyFut { a: i32, ptr_to_a: *const i32 }。
ピン留めは async/await を可能にするために必要です。
次に、wake: fn() は &mut Context<'_> に変わりました。SimpleFuture では、
関数ポインター(fn())の呼び出しを使って、対象の future をポーリングすべきである
ことを future エグゼキューターに伝えていました。しかし、fn() は単なる関数
ポインターであるため、どの Future が wake を呼び出したのかに関する
データを格納できません。
現実世界のシナリオでは、Web サーバーのような複雑なアプリケーションは、
wake up をそれぞれ個別に管理すべき数千もの異なる接続を持つ場合があります。
Context 型は、特定のタスクを wake up するために使用できる Waker 型の値への
アクセスを提供することで、これを解決します。