Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

ランタイムに制御を譲る

「最初の async プログラム」 節で見たように、各 await ポイントで、await されている future の準備ができて いない場合、Rust はランタイムにそのタスクを一時停止して別のタスクへ切り替える 機会を与えます。逆もまた真です。Rust が async ブロックを一時停止し、 ランタイムに制御を返すのは await ポイントでのみ です。await ポイントの 間にあるものはすべて同期的です。

つまり、await ポイントなしに async ブロックの中で大量の作業をすると、 その future はほかの future が進行するのを妨げます。これを、ある future が ほかの future を 飢餓状態にする と表現するのを耳にすることもあるでしょう。 場合によっては、それは大した問題ではないかもしれません。しかし、何らかの コストの高い初期化や長時間にわたる作業をしている場合、あるいは特定のタスクを 際限なく実行し続ける future がある場合には、いつどこでランタイムに 制御を返すかを考える必要があります。

この飢餓の問題を示すために、長時間実行される操作をシミュレートしてから、 それをどう解決するかを見ていきましょう。リスト 17-14 では slow 関数を 導入します。

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

このコードでは trpl::sleep ではなく std::thread::sleep を使っているので、 slow を呼び出すと、現在のスレッドが一定のミリ秒数だけブロックされます。 slow は、長時間実行され、しかもブロッキングでもある現実世界の操作の 代わりとして使えます。

リスト 17-15 では、2 つの future でこの種の CPU バウンドな作業を行う様子を slow を使って再現します。

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

各 future は、時間のかかる操作をいくつも実行した 後で初めて ランタイムに制御を返します。このコードを実行すると、次のような出力になります。

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

2 つの URL を取得する future を競わせるために trpl::select を使った リスト 17-5 と同様に、selecta が終わるとすぐに完了します。ただし、 2 つの future での slow の呼び出しはインターリーブされません。a future は trpl::sleep の呼び出しが await されるまでの作業をすべて行い、その後で b future が自身の trpl::sleep の呼び出しが await されるまでの作業をすべて行い、 最後に a future が完了します。両方の future が時間のかかるタスクの合間にも 進行できるようにするには、ランタイムに制御を返せる await ポイントが必要です。 つまり、await できる何かが必要なのです!

リスト 17-15 でも、この種の制御の受け渡しが起きていることはすでにわかります。 a future の末尾にある trpl::sleep を取り除くと、b future が まったく 実行されないまま完了してしまうからです。操作どうしが進行を交代できるように するための出発点として、リスト 17-16 に示すように trpl::sleep 関数を使ってみましょう。

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

slow の各呼び出しの間に、await ポイントを伴う trpl::sleep の呼び出しを 追加しました。これで 2 つの future の作業はインターリーブされます。

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

a future は trpl::sleep を呼び出す前に slow を呼び出しているため、 制御を b に渡す前にまだ少しの間は動き続けます。しかしその後は、どちらかが await ポイントに達するたびに、future は前後に切り替わります。この場合は、 slow の呼び出しごとにそうしていますが、作業は自分たちにとって最も理にかなう やり方で分割できます。

ただし、ここで本当にしたいのは スリープ することではありません。できるだけ 速く処理を進めたいのです。必要なのは、ランタイムに制御を返すことだけです。 それは trpl::yield_now 関数を使って直接行えます。リスト 17-17 では、 それらの trpl::sleep 呼び出しをすべて trpl::yield_now に置き換えます。

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::block_on(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::select(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

このコードは、実際の意図をより明確に表しているうえ、sleep を使うよりも かなり速くなる可能性があります。というのも、sleep が使うようなタイマーには どこまで細かい粒度で動けるかの制限がしばしばあるからです。たとえば、ここで 使っている sleep は、1 ナノ秒の Duration を渡したとしても、常に少なくとも 1 ミリ秒はスリープします。繰り返しますが、現代のコンピューターは 高速 です。 1 ミリ秒の間にもたくさんのことができます!

つまり、プログラムがほかに何をしているかによっては、計算集約的なタスクに 対しても async は有用になりえます。なぜなら、それはプログラムの異なる部分 どうしの関係を構造化するための便利な道具を提供するからです(ただし、 async 状態機械のオーバーヘッドというコストはあります)。これは 協調的マルチタスク の一種で、各 future は、await ポイントを通じていつ 制御を渡すかを自分で決める力を持っています。したがって各 future には、 長くブロックしすぎない責任もあります。Rust ベースの組み込みオペレーティング システムの中には、これが 唯一の マルチタスク方式であるものもあります!

もちろん、実際のコードでは、1 行ごとに関数呼び出しと await ポイントを 交互に並べるようなことは普通しません。このように制御を譲ることは比較的 低コストですが、ただではありません。多くの場合、計算集約的なタスクを分割しようと すると、かえってかなり遅くなることがあります。そのため、全体的な パフォーマンスのためには、処理を短時間ブロックさせたほうがよいこともあります。 コードの実際の性能ボトルネックが何なのかは、必ず測定して確認してください。 ただし、同時に起こるはずだと思っていた大量の処理が実際には直列に進んでいるのを 実際に 目にしているのであれば、この背後にある力学を念頭に置いておくことは 重要です!

独自の async 抽象化を構築する

future を組み合わせて、新しいパターンを作ることもできます。たとえば、 すでに持っている async の構成要素を使って timeout 関数を構築できます。 そうしてできあがるものは、さらに別の async 抽象化を作るために使える、 また別の構成要素になります。

リスト 17-18 は、この timeout が時間のかかる future に対してどのように 動作することを期待するかを示しています。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

これを実装してみましょう! まずは、timeout の API について考えてみましょう。

  • これを await できるようにするため、timeout 自体も async 関数である必要があります。
  • 1 つ目のパラメータは実行する future であるべきです。任意の future で動作できるように、これをジェネリックにできます。
  • 2 つ目のパラメータは待機する最大時間です。Duration を使えば、それを trpl::sleep に簡単に渡せます。
  • 戻り値は Result であるべきです。future が正常に完了した場合、Result は future が生成した値を持つ Ok になります。先にタイムアウトが経過した場合、Result はタイムアウトが待機した duration を持つ Err になります。

リスト 17-19 にこの宣言を示します。

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

これで型に関する目標は満たせました。では次に、必要な 振る舞い について考えてみましょう。渡された future と duration を競争させたいのです。trpl::sleep を使って duration からタイマー future を作り、trpl::select を使ってそのタイマーと、呼び出し元が渡した future を一緒に実行できます。

リスト 17-20 では、trpl::selectawait した結果に対してマッチすることで timeout を実装します。

extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::block_on(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::select(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

trpl::select の実装は公平ではありません。常に、渡された順序で引数を poll します(他の select 実装では、どの引数を最初に poll するかをランダムに選ぶものもあります)。そのため、max_time が非常に短い duration であっても future_to_try に完了の機会を与えられるように、future_to_try を先に select に渡します。future_to_try が先に完了した場合、selectfuture_to_try からの出力を持つ Left を返します。timer が先に完了した場合、select はタイマーの出力である () を持つ Right を返します。

future_to_try が成功して Left(output) を受け取った場合は、Ok(output) を返します。代わりに sleep タイマーが経過して Right(()) を受け取った場合は、_() を無視し、代わりに Err(max_time) を返します。

これで、他の 2 つの async ヘルパーを組み合わせて動作する timeout ができました。コードを実行すると、タイムアウト後の失敗モードが出力されます。

2 秒後に失敗しました

future は他の future と合成できるため、より小さな async の構成要素を使って非常に強力なツールを構築できます。たとえば、同じアプローチを使ってタイムアウトとリトライを組み合わせ、さらにそれらをネットワーク呼び出しのような操作(リスト 17-5 にあるものなど)で使うことができます。

実際には、通常は asyncawait を直接使い、次に select のような関数や join! マクロのようなマクロを使って、最も外側の future がどのように実行されるかを制御します。

ここまでで、複数の future を同時に扱うさまざまな方法を見てきました。次は、stream を使って、時間の経過に沿った順序の中で複数の future を扱う方法を見ていきます。