アクターパターン
Tokio を用いたアクターパターン(ハンドル/アクター/メッセージ)
Arc<Mutex<T>> を使わずに共有可変状態を管理する一般的なパターンの 1 つが、Alice Ryhl によって広く知られるようになったアクターパターンです。アクターは自身のデータを所有する struct であり、単一の spawn されたタスク内で実行されます。クローン可能な ハンドル は mpsc::Sender を保持し、公開 API となります。メッセージ は、アクターが処理できるすべてのコマンドを記述する enum です。
データにアクセスするのは常に 1 つのタスクだけであるため、ロックは不要です。リクエスト–レスポンスの組は、メッセージバリアントに埋め込まれた oneshot チャネルを使用します。
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
#[derive(Debug, Error)]
enum ActorError {
#[error("failed to send message to actor")]
Send(#[from] mpsc::error::SendError<Message>),
#[error("actor dropped response channel")]
Recv(#[from] oneshot::error::RecvError),
#[error("task failed")]
Join(#[from] tokio::task::JoinError),
}
// The Message enum represents commands sent to the actor.
enum Message {
UpdateLocation {
driver_id: u32,
lat: f64,
lng: f64,
},
GetDriverStatus {
driver_id: u32,
respond_to: oneshot::Sender<Option<DriverStatus>>,
},
}
#[derive(Debug, Clone)]
struct DriverStatus {
driver_id: u32,
lat: f64,
lng: f64,
update_count: u64,
}
// The Actor: a struct that owns data and lives inside a spawned task.
// No Arc<Mutex> needed—only one task accesses the data.
struct DriverActor {
receiver: mpsc::Receiver<Message>,
drivers: std::collections::HashMap<u32, DriverStatus>,
}
impl DriverActor {
fn new(receiver: mpsc::Receiver<Message>) -> Self {
Self {
receiver,
drivers: std::collections::HashMap::new(),
}
}
async fn run(&mut self) {
while let Some(msg) = self.receiver.recv().await {
self.handle_message(msg);
}
}
fn handle_message(&mut self, msg: Message) {
match msg {
Message::UpdateLocation {
driver_id,
lat,
lng,
} => {
let status = self
.drivers
.entry(driver_id)
.or_insert(DriverStatus {
driver_id,
lat: 0.0,
lng: 0.0,
update_count: 0,
});
status.lat = lat;
status.lng = lng;
status.update_count += 1;
}
Message::GetDriverStatus {
driver_id,
respond_to,
} => {
let status = self.drivers.get(&driver_id).cloned();
// Ignore send errors if the caller dropped the receiver.
let _ = respond_to.send(status);
}
}
}
}
// The Handle: a clonable struct that holds an mpsc::Sender.
// This is what you pass around the application.
#[derive(Clone)]
struct DriverHandle {
sender: mpsc::Sender<Message>,
}
impl DriverHandle {
fn new() -> Self {
let (sender, receiver) = mpsc::channel(32);
let mut actor = DriverActor::new(receiver);
tokio::spawn(async move { actor.run().await });
Self { sender }
}
async fn update_location(
&self,
driver_id: u32,
lat: f64,
lng: f64,
) -> Result<(), ActorError> {
self.sender
.send(Message::UpdateLocation {
driver_id,
lat,
lng,
})
.await?;
Ok(())
}
async fn get_driver_status(
&self,
driver_id: u32,
) -> Result<Option<DriverStatus>, ActorError> {
let (tx, rx) = oneshot::channel();
self.sender
.send(Message::GetDriverStatus {
driver_id,
respond_to: tx,
})
.await?;
Ok(rx.await?)
}
}
#[tokio::main]
async fn main() -> Result<(), ActorError> {
let handle = DriverHandle::new();
// Multiple clones can be sent to different tasks.
let h1 = handle.clone();
let h2 = handle.clone();
let task1 = tokio::spawn(async move {
h1.update_location(1, 40.7128, -74.0060).await?;
h1.update_location(1, 40.7130, -74.0062).await?;
Ok::<(), ActorError>(())
});
let task2 = tokio::spawn(async move {
h2.update_location(2, 34.0522, -118.2437).await?;
Ok::<(), ActorError>(())
});
task1.await??;
task2.await??;
if let Some(s) = handle.get_driver_status(1).await? {
println!("Driver {}: ({}, {}), updates: {}", s.driver_id, s.lat, s.lng, s.update_count);
}
if let Some(s) = handle.get_driver_status(2).await? {
println!("Driver {}: ({}, {}), updates: {}", s.driver_id, s.lat, s.lng, s.update_count);
}
let missing = handle.get_driver_status(99).await?;
println!("Driver 99: {:?}", missing);
Ok(())
}