キャンセル
future をドロップすると、それ以降その future が再び poll されることは ありません。これを キャンセル と呼び、これは任意の await ポイントで 発生し得ます。future がキャンセルされた場合でもシステムが正しく動作する ように、注意が必要です。たとえば、デッドロックしたりデータを失ったりしては いけません。
// Copyright 2024 Google LLC // SPDX-License-Identifier: Apache-2.0 use std::io; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream}; struct LinesReader { stream: DuplexStream, } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream } } async fn next(&mut self) -> io::Result<Option<String>> { let mut bytes = Vec::new(); let mut buf = [0]; while self.stream.read(&mut buf[..]).await? != 0 { bytes.push(buf[0]); if buf[0] == b'\n' { break; } } if bytes.is_empty() { return Ok(None); } let s = String::from_utf8(bytes) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?; Ok(Some(s)) } } async fn slow_copy(source: String, mut dest: DuplexStream) -> io::Result<()> { for b in source.bytes() { dest.write_u8(b).await?; tokio::time::sleep(Duration::from_millis(10)).await } Ok(()) } #[tokio::main] async fn main() -> io::Result<()> { let (client, server) = tokio::io::duplex(5); let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client)); let mut lines = LinesReader::new(server); let mut interval = tokio::time::interval(Duration::from_millis(60)); loop { tokio::select! { _ = interval.tick() => println!("tick!"), line = lines.next() => if let Some(l) = line? { print!("{}", l) } else { break }, } } handle.await.unwrap()?; Ok(()) }
-
コンパイラはキャンセル安全性については助けてくれません。API ドキュメントを読み、
async fnがどのような状態を保持しているかを 考える必要があります。 -
panicや?と異なり、キャンセルは(エラーハンドリングでは なく)通常の制御フローの一部です。 -
この例では、文字列の一部が失われます。
-
tick()分岐が先に完了するたびに、next()とそのbufが ドロップされます。 -
bufを構造体の一部にすれば、LinesReaderをキャンセル安全に できます:#![allow(unused)] fn main() { // Copyright 2024 Google LLC // SPDX-License-Identifier: Apache-2.0 struct LinesReader { stream: DuplexStream, bytes: Vec<u8>, buf: [u8; 1], } impl LinesReader { fn new(stream: DuplexStream) -> Self { Self { stream, bytes: Vec::new(), buf: [0] } } async fn next(&mut self) -> io::Result<Option<String>> { // buf と bytes に self. を付ける。 // ... let raw = std::mem::take(&mut self.bytes); let s = String::from_utf8(raw) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "not UTF-8"))?; // ... } } }
-
-
Interval::tickは、tick がすでに「通知済み」かどうかを追跡しているため、 キャンセル安全です。 -
AsyncReadExt::readは、復帰するか、まったくデータを読み取らないかのどちらかであるため、 キャンセル安全です。 -
AsyncBufReadExt::read_lineはこの例と似ており、キャンセル安全ではありません。詳細と代替手段に ついては、そのドキュメントを参照してください。