Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

アクターパターン

Tokio を用いたアクターパターン(ハンドル/アクター/メッセージ)

tokio-badge cat-concurrency-badge cat-rust-patterns-badge

Arc<Mutex<T>> を使わずに共有可変状態を管理する一般的なパターンの 1 つが、Alice Ryhl によって広く知られるようになったアクターパターンです。アクターは自身のデータを所有する struct であり、単一の spawn されたタスク内で実行されます。クローン可能な ハンドルmpsc::Sender を保持し、公開 API となります。メッセージ は、アクターが処理できるすべてのコマンドを記述する enum です。

データにアクセスするのは常に 1 つのタスクだけであるため、ロックは不要です。リクエスト–レスポンスの組は、メッセージバリアントに埋め込まれた oneshot チャネルを使用します。

use thiserror::Error;
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Error)]
enum ActorError {
    #[error("failed to send message to actor")]
    Send(#[from] mpsc::error::SendError<Message>),
    #[error("actor dropped response channel")]
    Recv(#[from] oneshot::error::RecvError),
    #[error("task failed")]
    Join(#[from] tokio::task::JoinError),
}

// The Message enum represents commands sent to the actor.
enum Message {
    UpdateLocation {
        driver_id: u32,
        lat: f64,
        lng: f64,
    },
    GetDriverStatus {
        driver_id: u32,
        respond_to: oneshot::Sender<Option<DriverStatus>>,
    },
}

#[derive(Debug, Clone)]
struct DriverStatus {
    driver_id: u32,
    lat: f64,
    lng: f64,
    update_count: u64,
}

// The Actor: a struct that owns data and lives inside a spawned task.
// No Arc<Mutex> needed—only one task accesses the data.
struct DriverActor {
    receiver: mpsc::Receiver<Message>,
    drivers: std::collections::HashMap<u32, DriverStatus>,
}

impl DriverActor {
    fn new(receiver: mpsc::Receiver<Message>) -> Self {
        Self {
            receiver,
            drivers: std::collections::HashMap::new(),
        }
    }

    async fn run(&mut self) {
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg);
        }
    }

    fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::UpdateLocation {
                driver_id,
                lat,
                lng,
            } => {
                let status = self
                    .drivers
                    .entry(driver_id)
                    .or_insert(DriverStatus {
                        driver_id,
                        lat: 0.0,
                        lng: 0.0,
                        update_count: 0,
                    });
                status.lat = lat;
                status.lng = lng;
                status.update_count += 1;
            }
            Message::GetDriverStatus {
                driver_id,
                respond_to,
            } => {
                let status = self.drivers.get(&driver_id).cloned();
                // Ignore send errors if the caller dropped the receiver.
                let _ = respond_to.send(status);
            }
        }
    }
}

// The Handle: a clonable struct that holds an mpsc::Sender.
// This is what you pass around the application.
#[derive(Clone)]
struct DriverHandle {
    sender: mpsc::Sender<Message>,
}

impl DriverHandle {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel(32);
        let mut actor = DriverActor::new(receiver);
        tokio::spawn(async move { actor.run().await });
        Self { sender }
    }

    async fn update_location(
        &self,
        driver_id: u32,
        lat: f64,
        lng: f64,
    ) -> Result<(), ActorError> {
        self.sender
            .send(Message::UpdateLocation {
                driver_id,
                lat,
                lng,
            })
            .await?;
        Ok(())
    }

    async fn get_driver_status(
        &self,
        driver_id: u32,
    ) -> Result<Option<DriverStatus>, ActorError> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(Message::GetDriverStatus {
                driver_id,
                respond_to: tx,
            })
            .await?;
        Ok(rx.await?)
    }
}

#[tokio::main]
async fn main() -> Result<(), ActorError> {
    let handle = DriverHandle::new();

    // Multiple clones can be sent to different tasks.
    let h1 = handle.clone();
    let h2 = handle.clone();

    let task1 = tokio::spawn(async move {
        h1.update_location(1, 40.7128, -74.0060).await?;
        h1.update_location(1, 40.7130, -74.0062).await?;
        Ok::<(), ActorError>(())
    });

    let task2 = tokio::spawn(async move {
        h2.update_location(2, 34.0522, -118.2437).await?;
        Ok::<(), ActorError>(())
    });

    task1.await??;
    task2.await??;

    if let Some(s) = handle.get_driver_status(1).await? {
        println!("Driver {}: ({}, {}), updates: {}", s.driver_id, s.lat, s.lng, s.update_count);
    }

    if let Some(s) = handle.get_driver_status(2).await? {
        println!("Driver {}: ({}, {}), updates: {}", s.driver_id, s.lat, s.lng, s.update_count);
    }

    let missing = handle.get_driver_status(99).await?;
    println!("Driver 99: {:?}", missing);

    Ok(())
}