Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

async を使った並行性の適用

この節では、第16章でスレッドを使って取り組んだものと同じ並行性の課題のいくつかに async を適用します。そこで多くの重要な考え方についてはすでに説明したので、 この節ではスレッドと future の違いに焦点を当てます。

多くの場合、async を使って並行性を扱うための API は、スレッドを使うための API と 非常によく似ています。別の場合には、かなり異なるものになります。スレッドと async の間で API が似て 見える 場合であっても、振る舞いが異なることが多く、 そしてほとんどの場合、パフォーマンス特性は異なります。

spawn_task で新しいタスクを作成する

第16章の spawn で新しいスレッドを作る」節で 最初に扱った操作は、2 つの別々のスレッドで数を数え上げることでした。同じことを async を使ってやってみましょう。trpl クレートは、thread::spawn API と 非常によく似た spawn_task 関数と、thread::sleep API の async 版である sleep 関数を提供しています。これらを組み合わせて、リスト 17-6 に示すように カウントの例を実装できます。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

出発点として、トップレベルの関数を async にできるように、main 関数を trpl::block_on で設定します。

注: この章ではここから先、どの例でも main にまったく同じ trpl::block_on のラップ用コードを含めます。そのため、main の場合と同じように、 しばしばそれを省略します。自分のコードには必ず含めるようにしてください!

次に、そのブロック内に 2 つのループを書きます。どちらにも trpl::sleep の 呼び出しが含まれており、次のメッセージを送る前に 0.5 秒(500 ミリ秒)待機します。 一方のループを trpl::spawn_task の本体に入れ、もう一方をトップレベルの for ループに入れます。また、sleep の呼び出しの後に await も追加します。

このコードの振る舞いはスレッドベースの実装と似ています。これを実行したときに、 自分のターミナルではメッセージの表示順が異なる可能性がある点も同じです。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

この版は、メインの async ブロック本体にある for ループが終了するとすぐに止まります。 これは、spawn_task で生成したタスクが、main 関数の終了時に停止されるからです。 タスクが最後まで完了するまで実行させたい場合は、join ハンドルを使って最初のタスクの 完了を待つ必要があります。スレッドでは、スレッドの実行が終わるまで「ブロック」するために join メソッドを使いました。リスト 17-7 では、タスクハンドル自体が future なので、 同じことを await で行えます。その Output 型は Result なので、await した後で それに対して unwrap も行います。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

この更新版では、両方の ループが終了するまで実行されます。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

ここまでを見ると、async とスレッドは、構文が違うだけで似た結果をもたらすように見えます。 つまり、join ハンドルに対して join を呼び出す代わりに await を使い、 sleep の呼び出しも await します。

より大きな違いは、これを行うために別のオペレーティングシステムのスレッドを 生成する必要がなかったことです。実際、ここではタスクを spawn する必要すらありません。 async ブロックは無名の future にコンパイルされるため、それぞれのループを async ブロックに入れ、ランタイムに trpl::join 関数を使って両方を完了まで 実行させることができます。

第16章の 「すべてのスレッドの終了を待つ」節では、 std::thread::spawn を呼び出したときに返される JoinHandle 型に対して join メソッドを使う方法を示しました。trpl::join 関数はこれに似ていますが、 future 用です。これに 2 つの future を渡すと、両方が完了した時点で、渡した各 future の 出力を含むタプルを出力とする 1 つの新しい future を生成します。したがって、 リスト 17-8 では trpl::join を使って fut1fut2 の両方が終わるのを待ちます。 fut1fut2 を await するのではなく、trpl::join が生成する新しい future を await します。出力は 2 つの unit 値を含むタプルにすぎないので、これを無視します。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

これを実行すると、両方の future が完了まで実行されることがわかります。

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

これで、毎回まったく同じ順序になることがわかるでしょう。これは、 スレッドやリスト17-7の trpl::spawn_task で見たものとは大きく異なり ます。これは、trpl::join 関数が_公平_だからです。つまり、各フュー チャーを同じ頻度で確認し、それらを交互に切り替え、一方の準備ができて いるなら他方だけが先に進みすぎることはありません。スレッドでは、どの スレッドを確認し、どれだけ長く実行させるかをオペレーティングシステム が決定します。async Rust では、どのタスクを確認するかをランタイムが決 定します。(実際には、asyncランタイムは並行性を管理する方法の一部とし て内部でオペレーティングシステムのスレッドを使うことがあるため、詳細 は複雑になります。そのため、公平性を保証するのはランタイムにとってよ り手間のかかる作業になる場合があります――それでも可能です!)ランタ イムは、任意の操作について公平性を保証しなければならないわけではなく、 公平性が必要かどうかを選べるように、異なるAPIを提供していることもよ くあります。

フューチャーを await する方法を次のように変えて、どうなるか試してみて ください。

  • どちらか一方、または両方のループの周囲からasyncブロックを取り除く。
  • 各asyncブロックを定義した直後に await する。
  • 最初のループだけをasyncブロックで囲み、2番目のループ本体の後で、そ の結果のフューチャーを await する。

追加の挑戦として、コードを実行する_前に_、それぞれの場合の出力がどう なるか考えてみてください!

メッセージパッシングを使って2つのタスク間でデータを送る

フューチャー間でデータを共有する方法も、見覚えがあるでしょう。再び メッセージパッシングを使いますが、今回は型と関数のasync版を使います。 スレッドベースの並行性とフューチャーベースの並行性の重要な違いをいく つか示すため、16章の「メッセージパッシングでスレッド間でデータを転送 する」節でたどったのとは少し 異なる道筋をたどります。リスト17-9では、単一のasyncブロックだけから始 めます。別のスレッドを生成したときのように、別のタスクを_生成しませ ん_。

extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

ここでは、16章でスレッドとともに使ったマルチプロデューサー・シングル コンシューマーのチャネルAPIのasync版である trpl::channel を使いま す。APIのasync版はスレッドベース版と少しだけ異なります。つまり、不変 ではなく可変のレシーバー rx を使い、recv メソッドは値を直接生成す る代わりに、await が必要なフューチャーを生成します。これで、送信側か ら受信側へメッセージを送れます。別のスレッドはもちろん、タスクさえ生 成する必要がないことに注意してください。必要なのは rx.recv の呼び出 しを await することだけです。

std::mpsc::channel の同期版 Receiver::recv メソッドは、メッセージ を受信するまでブロックします。trpl::Receiver::recv メソッドはasync なのでそうではありません。ブロックする代わりに、メッセージを受信する か、チャネルの送信側が閉じられるまで、制御をランタイムに返します。対 照的に、send の呼び出しはブロックしないので await しません。送信先 のチャネルは非有界なので、ブロックする必要がないのです。

注: このasyncコード全体は trpl::block_on 呼び出し内のasyncブロッ クで実行されるため、その内部のすべてはブロックを避けられます。しか し、その_外側_のコードは、block_on 関数が戻るまでブロックされま す。これこそが trpl::block_on 関数の要点です。つまり、ある一連の asyncコードのどこでブロックするか、ひいては同期コードとasyncコード のどこで切り替えるかを_選べる_ようにしてくれるのです。

この例について2つ注目してください。第一に、メッセージはすぐに到着しま す。第二に、ここではフューチャーを使っているにもかかわらず、まだ並行 性はありません。リスト内のすべては、フューチャーがまったく関係ない場 合と同じように、順番に起こります。

このうち最初の点に対処するため、リスト17-10に示すように、一連のメッ セージを送信し、その合間にスリープを入れてみましょう。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

メッセージを送るだけでなく、受け取る必要もあります。この場合は、何件 のメッセージが来るか分かっているので、rx.recv().await を4回呼び出し て手作業で受信することもできます。しかし現実には、通常は_未知の_数の メッセージを待つことになるため、もうメッセージがないと判断できるまで 待ち続ける必要があります。

リスト16-10では、同期チャネルから受信したすべての要素を処理するため に for ループを使いました。しかし、Rustにはまだ、_非同期に生成され る_一連の要素に対して for ループを使う方法がありません。そこで、ま だ見ていないループ、while let 条件付きループを使う必要があります。 これは、6章の[「if letlet...else を使った簡潔な制御フロー」] if-let節で見た if let 構文のループ版です。このループ は、指定したパターンが値にマッチし続ける限り実行を続けます。

rx.recv の呼び出しはフューチャーを生成し、それを await します。ラン タイムは、それが準備できるまでそのフューチャーを一時停止します。メッ セージが到着すると、そのたびにフューチャーは Some(message) に解決さ れます。チャネルが閉じられると、_一度でも_メッセージが到着したかどう かにかかわらず、そのフューチャーは代わりに None に解決されます。こ れは、もう値がなく、したがってポーリング、つまり await をやめるべきで あることを示します。

この while let ループが、これらすべてをまとめます。rx.recv().await を呼び出した結果が Some(message) であれば、メッセージにアクセスで き、if let と同じようにループ本体でそれを使えます。結果が None な ら、ループは終了します。ループが1回完了するたびに、再び await ポイン トに到達するので、別のメッセージが来るまでランタイムは再びそれを一時 停止します。

これでコードは、すべてのメッセージを正常に送受信できるようになりまし た。残念ながら、まだいくつか問題があります。ひとつには、メッセージは 0.5秒間隔では到着しません。プログラム開始から2秒(2,000ミリ秒)後に、 まとめて一度に到着します。もうひとつには、このプログラムはいつまでも 終了しません! その代わり、新しいメッセージを永久に待ち続けます。 ctrl-C を使って停止する必要があります。

1つのasyncブロック内のコードは直列に実行される

まず、各メッセージの間に遅延を挟んで届くのではなく、なぜ完全な遅延の 後にまとめて届くのかを見ていきましょう。あるasyncブロックの中では、 コード内で await キーワードが現れる順序が、そのままプログラム実行時 にそれらが実行される順序でもあります。 リスト17-10には async ブロックが1つしかないので、その中のすべては順番に実行されます。まだ並行性はありません。すべての tx.send 呼び出しが、すべての trpl::sleep 呼び出しとそれに対応する await ポイントに挟まれながら実行されます。その後になって初めて、while let ループが recv 呼び出し上の await ポイントを通過できるようになります。

私たちが欲しい振る舞い、つまり各メッセージの間に sleep の遅延が発生するようにするには、リスト17-11に示すように、txrx の操作をそれぞれ独自の async ブロックに入れる必要があります。そうすれば、リスト17-8と同様に、ランタイムは trpl::join を使ってそれぞれを別個に実行できます。ここでも、個々の Future ではなく、trpl::join の呼び出し結果を await します。個々の Future を順番に await してしまうと、単に逐次的なフローに戻ってしまいます。これはまさに、私たちが したくない ことです。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

リスト17-11の更新後のコードでは、メッセージは2秒後にまとめて一気に表示されるのではなく、500ミリ秒間隔で表示されます。

所有権を async ブロックに移動する

ただし、このプログラムは依然として終了しません。これは、while let ループと trpl::join の相互作用によるものです。

  • trpl::join から返される Future は、それに渡された 両方の Future が完了したときにのみ完了します。
  • tx_fut Future は、vals 内の最後のメッセージを送信した後の sleep を終えると完了します。
  • rx_fut Future は、while let ループが終了するまで完了しません。
  • while let ループは、rx.recvawait した結果が None になるまで終了しません。
  • rx.recvawait すると、チャネルの反対側が閉じられたときにのみ None が返ります。
  • チャネルが閉じるのは、rx.close を呼び出すか、送信側である tx がドロップされたときだけです。
  • rx.close はどこでも呼び出しておらず、txtrpl::block_on に渡した最も外側の async ブロックが終わるまでドロップされません。
  • そのブロックは trpl::join の完了待ちでブロックされているため終われず、その結果、このリストの先頭に戻ることになります。

現時点では、メッセージを送信する async ブロックは tx借用 しているだけです。というのも、メッセージの送信に所有権は必要ないからです。しかし、もし tx をその async ブロックへ move できれば、そのブロックが終了した時点で tx はドロップされます。第13章の 「参照をキャプチャするか、所有権を移動する」 節では、クロージャで move キーワードを使う方法を学びました。また、第16章の 「スレッドで move クロージャを使う」 節で説明したように、スレッドを扱うときには、データをクロージャへ移動する必要がしばしばあります。同じ基本的な力学が async ブロックにも当てはまるため、move キーワードはクロージャと同じように async ブロックでも機能します。

リスト17-12では、メッセージ送信に使っているブロックを async から async move に変更します。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

この バージョンのコードを実行すると、最後のメッセージが送信され受信された後に正常に終了します。次に、複数の Future からデータを送信するには何を変更する必要があるかを見てみましょう。

join! マクロで複数の Future を結合する

この非同期チャネルはマルチプロデューサチャネルでもあるので、リスト17-13に示すように、複数の Future からメッセージを送信したい場合は tx に対して clone を呼び出せます。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}

まず、tx をクローンして、最初の async ブロックの外側で tx1 を作成します。そして、先ほど tx で行ったのと同じように、tx1 をそのブロックに move します。その後、元の tx新しい async ブロックに move し、そこで少し長めの遅延でさらにメッセージを送信します。この新しい async ブロックはたまたまメッセージ受信用の async ブロックの後に置いていますが、同じように前に置くこともできます。重要なのは、Future が作成される順序ではなく、それらが await される順序です。

メッセージ送信用の2つの async ブロックは、どちらも async move ブロックである必要があります。そうすることで、それらのブロックが終了したときに txtx1 の両方がドロップされます。そうしないと、結局また最初と同じ無限ループに戻ってしまいます。

最後に、追加の Future を扱うために trpl::join から trpl::join! に切り替えます。join! マクロは、コンパイル時に Future の数が分かっている場合に、任意個の Future を await できます。数が不明な Future のコレクションを await する方法については、この章の後半で説明します。

これで、2つの送信 Future からのすべてのメッセージが見えるようになります。また、送信 Future は送信後に少しずつ異なる遅延を使っているため、メッセージもその異なる間隔で受信されます。

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Future 間でデータを送るためにメッセージパッシングを使う方法、async ブロック内のコードがどのように逐次実行されるか、所有権を async ブロックへ移動する方法、そして複数の Future を結合する方法を見てきました。次は、ランタイムに別のタスクへ切り替えてよいことをどのように、そしてなぜ伝えるのかを説明しましょう。