チャネルを介した通信。
チャネルは、実行中のタスク間でデータをやり取りするために使用できます。チャネルは本質的には待機キューであり、複数のプロデューサーと単一のレシーバーを持つタスク間の通信を可能にします。チャネルは init タスクで構築され、静的に割り当てられたメモリを基盤としています。送信エンドポイントと受信エンドポイントは ソフトウェア タスクに配布されます。
...
const CAPACITY: usize = 5;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
...
この場合、チャネルは u32 型のデータを保持し、容量は 5 要素です。
チャネルは ハードウェア タスクからも使用できますが、Try API を使用した非 async の方法でのみ可能です。
データの送信
send メソッドは、以下のようにチャネルにメッセージを送信します。
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
}
データの受信
受信側は到着するメッセージを await できます。
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
...
}
}
チャネルは、競合状態を防ぐために、小さな(グローバルな)クリティカルセクション(CS)を使って実装されています。ユーザーは CS 実装を提供しなければなりません。例を --features test-critical-section 付きでコンパイルすると、可能な実装の 1 つが得られます。
完全な例:
//! examples/async-channel.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 5;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
sender3::spawn(s).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
if val == 3 {
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
}
#[task]
async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 2 sending: 2");
sender.send(2).await.unwrap();
}
#[task]
async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 3 sending: 3");
sender.send(3).await.unwrap();
}
}
$ cargo xtask qemu --verbose --example async-channel --features test-critical-section
Sender 1 sending: 1
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Receiver got: 2
Receiver got: 3
また、送信エンドポイントも await できます。チャネル容量がまだ上限に達していない場合、送信側を await した処理は直ちに進行できます。一方、容量に達している場合、キューに空きができるまで送信側はブロックされます。このようにしてデータが失われることはありません。
以下の例では、CAPACITY が 1 に減らされており、送信タスクはチャネル内のデータが受信されるまで待機することになります。
//! examples/async-channel-done.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
sender2::spawn(s.clone()).unwrap();
sender3::spawn(s).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
if val == 3 {
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
hprintln!("Sender 1 done");
}
#[task]
async fn sender2(_c: sender2::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 2 sending: 2");
sender.send(2).await.unwrap();
hprintln!("Sender 2 done");
}
#[task]
async fn sender3(_c: sender3::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 3 sending: 3");
sender.send(3).await.unwrap();
hprintln!("Sender 3 done");
}
}
出力を見ると、Sender 2 は Sender 1 によって送信されたデータが受信されるまで待機することがわかります。
注意 同じ優先度の ソフトウェア タスクは互いに非同期に実行されるため、厳密な順序を 一切 仮定できません。(ここで示している順序は現在の実装にのみ当てはまるものであり、RTIC フレームワークのリリース間で変わる可能性があります。)
$ cargo xtask qemu --verbose --example async-channel-done --features test-critical-section
Sender 1 sending: 1
Sender 1 done
Sender 2 sending: 2
Sender 3 sending: 3
Receiver got: 1
Sender 2 done
Receiver got: 2
Sender 3 done
Receiver got: 3
エラーハンドリング
すべての送信側がドロップされている場合、空の受信チャネルを await するとエラーになります。これにより、さまざまな種類のシャットダウン処理を適切に実装できます。
//! examples/async-channel-no-sender.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (_s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
(Shared {}, Local {})
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
hprintln!("Receiver got: {:?}", receiver.recv().await);
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
$ cargo xtask qemu --verbose --example async-channel-no-sender --features test-critical-section
Receiver got: Err(NoSender)
同様に、レシーバーがドロップされている場合、送信チャネルを await するとエラーになります。これにより、アプリケーションレベルのエラーハンドリングを適切に実装できます。
その際のエラーはデータを送信側に返すため、送信側は適切な対応(たとえば、後で再送するためにデータを保存するなど)を取ることができます。
//! examples/async-channel-no-receiver.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, _r) = make_channel!(u32, CAPACITY);
sender1::spawn(s.clone()).unwrap();
(Shared {}, Local {})
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1 {:?}", sender.send(1).await);
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
}
$ cargo xtask qemu --verbose --example async-channel-no-receiver --features test-critical-section
Sender 1 sending: 1 Err(NoReceiver(1))
Try API
Try API を使うと、操作の成功を前提とせずに、また非 async コンテキストでも、チャネルへの送信およびチャネルからの受信を行えます。
この API は Receiver::try_recv と Sender::try_send を通じて提供されています。
//! examples/async-channel-try.rs
#![no_main]
#![no_std]
#![deny(warnings)]
#![deny(unsafe_code)]
#![deny(missing_docs)]
use panic_semihosting as _;
#[rtic::app(device = lm3s6965, dispatchers = [SSI0])]
mod app {
use cortex_m_semihosting::{debug, hprintln};
use rtic_sync::{channel::*, make_channel};
#[shared]
struct Shared {}
#[local]
struct Local {
sender: Sender<'static, u32, CAPACITY>,
}
const CAPACITY: usize = 1;
#[init]
fn init(_: init::Context) -> (Shared, Local) {
let (s, r) = make_channel!(u32, CAPACITY);
receiver::spawn(r).unwrap();
sender1::spawn(s.clone()).unwrap();
(Shared {}, Local { sender: s.clone() })
}
#[task]
async fn receiver(_c: receiver::Context, mut receiver: Receiver<'static, u32, CAPACITY>) {
while let Ok(val) = receiver.recv().await {
hprintln!("Receiver got: {}", val);
}
}
#[task]
async fn sender1(_c: sender1::Context, mut sender: Sender<'static, u32, CAPACITY>) {
hprintln!("Sender 1 sending: 1");
sender.send(1).await.unwrap();
hprintln!("Sender 1 try sending: 2 {:?}", sender.try_send(2));
debug::exit(debug::EXIT_SUCCESS); // Exit QEMU simulator
}
// This interrupt is never triggered, but is used to demonstrate that
// one can (try to) send data into a channel from a hardware task.
#[task(binds = GPIOA, local = [sender])]
fn hw_task(cx: hw_task::Context) {
cx.local.sender.try_send(3).ok();
}
}
$ cargo xtask qemu --verbose --example async-channel-try --features test-critical-section
Sender 1 sending: 1
Sender 1 try sending: 2 Err(Full(2))