Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

チャネルを介した通信。

チャネルは、実行中のタスク間でデータをやり取りするために使用できます。チャネルは本質的には待機キューであり、複数のプロデューサーと単一のレシーバーを持つタスク間の通信を可能にします。チャネルは init タスクで構築され、静的に割り当てられたメモリを基盤としています。送信エンドポイントと受信エンドポイントは ソフトウェア タスクに配布されます。

...
const CAPACITY: usize = 5;
#[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (s, r) = make_channel!(u32, CAPACITY);
        receiver::spawn(r).unwrap();
        sender1::spawn(s.clone()).unwrap();
        sender2::spawn(s.clone()).unwrap();
        ...

この場合、チャネルは u32 型のデータを保持し、容量は 5 要素です。

チャネルは ハードウェア タスクからも使用できますが、Try API を使用した非 async の方法でのみ可能です。

データの送信

send メソッドは、以下のようにチャネルにメッセージを送信します。

#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
    hprintln!("Sender 1 sending: 1");
    sender.send(1).await.unwrap();
}

データの受信

受信側は到着するメッセージを await できます。

#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
    while let Ok(val) = receiver.recv().await {
        hprintln!("Receiver got: {}", val);
        ...
    }
}

チャネルは、競合状態を防ぐために、小さな(グローバルな)クリティカルセクション(CS)を使って実装されています。ユーザーは CS 実装を提供しなければなりません。例を --features test-critical-section 付きでコンパイルすると、可能な実装の 1 つが得られます。

完全な例:

//! examples/async-channel.rs

#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]

use panic_semihosting as _;

#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
    use cortex_m_semihosting::{debug, hprintln};
    use rtic_sync::{channel::*, make_channel};

    #[shared]
    struct Shared {}

    #[local]
    struct Local {}

    const CAPACITY: usize = 5;
    #[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (s, r) = make_channel!(u32, CAPACITY);

        receiver::spawn(r).unwrap();
        sender1::spawn(s.clone()).unwrap();
        sender2::spawn(s.clone()).unwrap();
        sender3::spawn(s).unwrap();

        (Shared {}, Local {})
    }

    #[task]
    async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
        while let Ok(val) = receiver.recv().await {
            hprintln!("Receiver got: {}", val);
            if val == 3 {
                debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
            }
        }
    }

    #[task]
    async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 1 sending: 1");
        sender.send(1).await.unwrap();
    }

    #[task]
    async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 2 sending: 2");
        sender.send(2).await.unwrap();
    }

    #[task]
    async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 3 sending: 3");
        sender.send(3).await.unwrap();
    }
}
$ cargo xtask qemu --verbose --example async-channel --features test-critical-section
Sender 1 sending: 1
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Receiver got: 2
Receiver got: 3

また、送信エンドポイントも await できます。チャネル容量がまだ上限に達していない場合、送信側を await した処理は直ちに進行できます。一方、容量に達している場合、キューに空きができるまで送信側はブロックされます。このようにしてデータが失われることはありません。

以下の例では、CAPACITY が 1 に減らされており、送信タスクはチャネル内のデータが受信されるまで待機することになります。

//! examples/async-channel-done.rs

#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]

use panic_semihosting as _;

#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
    use cortex_m_semihosting::{debug, hprintln};
    use rtic_sync::{channel::*, make_channel};

    #[shared]
    struct Shared {}

    #[local]
    struct Local {}

    const CAPACITY: usize = 1;
    #[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (s, r) = make_channel!(u32, CAPACITY);

        receiver::spawn(r).unwrap();
        sender1::spawn(s.clone()).unwrap();
        sender2::spawn(s.clone()).unwrap();
        sender3::spawn(s).unwrap();

        (Shared {}, Local {})
    }

    #[task]
    async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
        while let Ok(val) = receiver.recv().await {
            hprintln!("Receiver got: {}", val);
            if val == 3 {
                debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
            }
        }
    }

    #[task]
    async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 1 sending: 1");
        sender.send(1).await.unwrap();
        hprintln!("Sender 1 done");
    }

    #[task]
    async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 2 sending: 2");
        sender.send(2).await.unwrap();
        hprintln!("Sender 2 done");
    }

    #[task]
    async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 3 sending: 3");
        sender.send(3).await.unwrap();
        hprintln!("Sender 3 done");
    }
}

出力を見ると、Sender 2Sender 1 によって送信されたデータが受信されるまで待機することがわかります。

注意 同じ優先度の ソフトウェア タスクは互いに非同期に実行されるため、厳密な順序を 一切 仮定できません。(ここで示している順序は現在の実装にのみ当てはまるものであり、RTIC フレームワークのリリース間で変わる可能性があります。)

$ cargo xtask qemu --verbose --example async-channel-done --features test-critical-section
Sender 1 sending: 1
Sender 1 done
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Sender 2 done
Receiver got: 2
Sender 3 done
Receiver got: 3

エラーハンドリング

すべての送信側がドロップされている場合、空の受信チャネルを await するとエラーになります。これにより、さまざまな種類のシャットダウン処理を適切に実装できます。

//! examples/async-channel-no-sender.rs

#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]

use panic_semihosting as _;

#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
    use cortex_m_semihosting::{debug, hprintln};
    use rtic_sync::{channel::*, make_channel};

    #[shared]
    struct Shared {}

    #[local]
    struct Local {}

    const CAPACITY: usize = 1;
    #[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (_s, r) = make_channel!(u32, CAPACITY);

        receiver::spawn(r).unwrap();

        (Shared {}, Local {})
    }

    #[task]
    async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
        hprintln!("Receiver got: {:?}", receiver.recv().await);

        debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
    }
}
$ cargo xtask qemu --verbose --example async-channel-no-sender --features test-critical-section
Receiver got: Err(NoSender)

同様に、レシーバーがドロップされている場合、送信チャネルを await するとエラーになります。これにより、アプリケーションレベルのエラーハンドリングを適切に実装できます。

その際のエラーはデータを送信側に返すため、送信側は適切な対応(たとえば、後で再送するためにデータを保存するなど)を取ることができます。

//! examples/async-channel-no-receiver.rs

#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]

use panic_semihosting as _;

#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
    use cortex_m_semihosting::{debug, hprintln};
    use rtic_sync::{channel::*, make_channel};

    #[shared]
    struct Shared {}

    #[local]
    struct Local {}

    const CAPACITY: usize = 1;
    #[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (s, _r) = make_channel!(u32, CAPACITY);

        sender1::spawn(s.clone()).unwrap();

        (Shared {}, Local {})
    }

    #[task]
    async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 1 sending: 1 {:?}", sender.send(1).await);
        debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
    }
}
$ cargo xtask qemu --verbose --example async-channel-no-receiver --features test-critical-section
Sender 1 sending: 1 Err(NoReceiver(1))

Try API

Try API を使うと、操作の成功を前提とせずに、また非 async コンテキストでも、チャネルへの送信およびチャネルからの受信を行えます。

この API は Receiver::try_recvSender::try_send を通じて提供されています。

//! examples/async-channel-try.rs

#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]

use panic_semihosting as _;

#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
    use cortex_m_semihosting::{debug, hprintln};
    use rtic_sync::{channel::*, make_channel};

    #[shared]
    struct Shared {}

    #[local]
    struct Local {
        sender: Sender<'static, u32, CAPACITY>,
    }

    const CAPACITY: usize = 1;
    #[init]
    fn init(_: init::Context) -> (Shared, Local) {
        let (s, r) = make_channel!(u32, CAPACITY);

        receiver::spawn(r).unwrap();
        sender1::spawn(s.clone()).unwrap();

        (Shared {}, Local { sender: s.clone() })
    }

    #[task]
    async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
        while let Ok(val) = receiver.recv().await {
            hprintln!("Receiver got: {}", val);
        }
    }

    #[task]
    async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
        hprintln!("Sender 1 sending: 1");
        sender.send(1).await.unwrap();
        hprintln!("Sender 1 try sending: 2 {:?}", sender.try_send(2));
        debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
    }

    // This interrupt is never triggered, but is used to demonstrate that
    // one can (try to) send data into a channel from a hardware task.
    #[task(binds = GPIOA, local = [sender])]
    fn hw_task(cx: hw_task::Context) {
        cx.local.sender.try_send(3).ok();
    }
}
$ cargo xtask qemu --verbose --example async-channel-try --features test-critical-section
Sender 1 sending: 1
Sender 1 try sending: 2 Err(Full(2))