Pin

async 関数またはブロックは、Future を実装し、すべてのローカル変数を含む型を作成することを思い出してください。これらの変数の一部は、他のローカル変数への参照(ポインタ)を保持できます。それらが常に有効であることを保証するために、その future は別のメモリ位置へ移動されてはなりません。

future 型がメモリ内で移動されるのを防ぐために、それは pin されたポインタを通してのみ poll できます。Pin は参照を包むラッパーであり、それが指しているインスタンスを別のメモリ位置へ移動させるあらゆる操作を禁止します。

// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0

use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{Duration, sleep};

// 作業項目。この場合は、指定された時間だけ sleep し、
// `respond_on` チャネルでメッセージを返します。
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender<u32>,
}

// キュー上の作業を待ち受けて実行するワーカー。
async fn worker(mut work_queue: mpsc::Receiver<Work>) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // 作業しているふりをします。
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: 100ms ごとに反復回数を報告する
        }
    }
}

// 作業を要求し、その完了を待つリクエスター。
async fn do_work(work_queue: &mpsc::Sender<Work>, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}
  • これは actor パターンの例だと気づいたかもしれません。actor は通常、ループ内で select! を呼び出します。

  • これは前のいくつかのレッスンのまとめにもなっているので、時間をかけて取り組んでください。

    • 素朴に _ = sleep(Duration::from_millis(100)) => { println!(..) }select! に追加してみてください。これは決して実行されません。なぜでしょうか?

    • 代わりに、その future を含む timeout_futloop の外側に追加します。

      #![allow(unused)]
      fn main() {
      // Copyright 2024 Google LLC
      // SPDX-License-Identifier: Apache-2.0
      
      let timeout_fut = sleep(Duration::from_millis(100));
      loop {
          select! {
              ..,
              _ = timeout_fut => { println!(..); },
          }
      }
      }
    • これでもまだ動作しません。コンパイラエラーに従い、move を回避するために select! 内の timeout_fut&mut を追加し、その後 Box::pin を使ってください。

      #![allow(unused)]
      fn main() {
      // Copyright 2024 Google LLC
      // SPDX-License-Identifier: Apache-2.0
      
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              ..,
              _ = &mut timeout_fut => { println!(..); },
          }
      }
      }
    • これはコンパイルできますが、いったんタイムアウトが期限切れになると、毎回の反復で Poll::Ready になります(この場合、fused future が役立ちます)。期限切れになるたびに timeout_fut をリセットするように更新してください。

      #![allow(unused)]
      fn main() {
      // Copyright 2024 Google LLC
      // SPDX-License-Identifier: Apache-2.0
      
      let mut timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
      loop {
          select! {
              _ = &mut timeout_fut => {
                  println!(..);
                  timeout_fut = Box::pin(sleep(Duration::from_millis(100)));
              },
          }
      }
      }
  • Box はヒープに割り当てます。場合によっては、std::pin::pin!(比較的最近安定化され、古いコードでは tokio::pin! がよく使われていました)も選択肢ですが、再代入される future に対して使うのは難しいです。

  • 別の選択肢として、pin をまったく使わず、100ms ごとに oneshot チャネルへ送信する別のタスクを spawn する方法もあります。

  • 自分自身へのポインタを含むデータは self-referential と呼ばれます。通常、Rust の borrow checker は self-referential なデータが移動されることを防ぎます。というのも、参照はその参照先のデータより長く生存できないからです。しかし、async ブロックや関数に対するコード変換は borrow checker によって検証されません。

  • Pin は参照を包むラッパーです。pin されたポインタを使うと、オブジェクトをその場所から移動できません。ただし、pin されていないポインタを通してであれば、依然として移動できます。

  • Future トレイトの poll メソッドは、インスタンスを参照するために &mut Self ではなく Pin<&mut Self> を使います。これが、pin されたポインタでしか呼び出せない理由です。