キャンセル

future をドロップすると、それ以降その future が再び poll されることは ありません。これを キャンセル と呼び、これは任意の await ポイントで 発生し得ます。future がキャンセルされた場合でもシステムが正しく動作する ように、注意が必要です。たとえば、デッドロックしたりデータを失ったりしては いけません。

// Copyright 2024 Google LLC
// SPDX-License-Identifier: Apache-2.0

use std::io;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

struct LinesReader {
    stream: DuplexStream,
}

impl LinesReader {
    fn new(stream: DuplexStream) -> Self {
        Self { stream }
    }

    async fn next(&mut self) -> io::Result<Option<String>> {
        let mut bytes = Vec::new();
        let mut buf = [0];
        while self.stream.read(&mut buf[..]).await? != 0 {
            bytes.push(buf[0]);
            if buf[0] == b'\n' {
                break;
            }
        }
        if bytes.is_empty() {
            return Ok(None);
        }
        let s = String::from_utf8(bytes)
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
        Ok(Some(s))
    }
}

async fn slow_copy(source: String, mut dest: DuplexStream) -> io::Result<()> {
    for b in source.bytes() {
        dest.write_u8(b).await?;
        tokio::time::sleep(Duration::from_millis(10)).await
    }
    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let (client, server) = tokio::io::duplex(5);
    let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));

    let mut lines = LinesReader::new(server);
    let mut interval = tokio::time::interval(Duration::from_millis(60));
    loop {
        tokio::select! {
            _ = interval.tick() => println!("tick!"),
            line = lines.next() => if let Some(l) = line? {
                print!("{}", l)
            } else {
                break
            },
        }
    }
    handle.await.unwrap()?;
    Ok(())
}
  • コンパイラはキャンセル安全性については助けてくれません。API ドキュメントを読み、async fn がどのような状態を保持しているかを 考える必要があります。

  • panic? と異なり、キャンセルは(エラーハンドリングでは なく)通常の制御フローの一部です。

  • この例では、文字列の一部が失われます。

    • tick() 分岐が先に完了するたびに、next() とその buf が ドロップされます。

    • buf を構造体の一部にすれば、LinesReader をキャンセル安全に できます:

      #![allow(unused)]
      fn main() {
      // Copyright 2024 Google LLC
      // SPDX-License-Identifier: Apache-2.0
      
      struct LinesReader {
          stream: DuplexStream,
          bytes: Vec<u8>,
          buf: [u8; 1],
      }
      
      impl LinesReader {
          fn new(stream: DuplexStream) -> Self {
              Self { stream, bytes: Vec::new(), buf: [0] }
          }
          async fn next(&mut self) -> io::Result<Option<String>> {
              // buf と bytes に self. を付ける。
              // ...
              let raw = std::mem::take(&mut self.bytes);
              let s = String::from_utf8(raw)
                  .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?;
              // ...
          }
      }
      }
  • Interval::tick は、tick がすでに「通知済み」かどうかを追跡しているため、 キャンセル安全です。

  • AsyncReadExt::read は、復帰するか、まったくデータを読み取らないかのどちらかであるため、 キャンセル安全です。

  • AsyncBufReadExt::read_line はこの例と似ており、キャンセル安全ではありません。詳細と代替手段に ついては、そのドキュメントを参照してください。