Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

futureを並行に合成する

この章では、futureを合成するさらに多くの方法を扱います。特に、futureを並行に(ただし並列ではなく)実行できる新しい方法をいくつか取り上げます。表面的には、この章で導入する新しい関数/マクロはかなり単純です。しかし、その背後にある概念は非常に微妙な場合があります。まずfuture、並行性、並列性について振り返りますが、以前のセクションである並行性と並列性の比較も再確認するとよいでしょう。

futureは遅延された計算です。futureはawaitを使用して進めることができます。これにより制御がランタイムに渡され、現在のタスクはその計算結果を待機します。abがfutureである場合、それらは片方をawaitしてからもう片方をawaitすることで、逐次的に合成できます(つまり、aを完了まで実行し、その後bを完了まで実行するfutureを作るように組み合わせられます): async { a.await; b.await}

また、spawnを使用したfutureの並列合成も見てきました: async { let a = spawn(a); let b = spawn(b); (a.await, b.await)}は2つのfutureを並列に実行します。タプル内のawaitはfutureそのものを待機しているのではなく、futureが完了したときにその結果を取得するためのJoinHandleを待機していることに注意してください。

この章では、並列性なしでfutureを並行に合成する2つの方法、joinselect/raceを紹介します。どちらの場合も、futureはタイムスライシングによって並行に実行されます。合成された各futureが順番に実行され、次のfutureに順番が回ります。これはasyncランタイムを介さずに行われます(したがって、複数のOSスレッドも使わず、並列性が生じる可能性もありません)。合成を行う構成要素は、futureをローカルでインターリーブします。これらの構成要素は、単一のasyncタスク内で構成要素となるfutureを実行するミニexecutorのようなものだと考えることができます。

joinとselect/raceの根本的な違いは、futureが作業を完了したときの扱い方です。joinはすべてのfutureが完了したときに終了し、select/raceは1つのfutureが完了したときに終了します(他のすべてはキャンセルされます)。どちらにも、エラー処理のためのバリエーションがあります。

これらの構成要素(または類似の概念)はstreamとともに使われることがよくあります。これについては以下で少し触れますが、詳しくはstreamの章で説明します。

並列性が必要な場合(または並列性を明示的に不要としていない場合)、タスクをspawnすることは、これらの合成構成要素よりも単純な代替手段であることが多いです。タスクをspawnする方が、通常はエラーが起こりにくく、より汎用的で、パフォーマンスも予測しやすくなります。一方で、spawnは本質的に構造化の度合いが低いため、ライフサイクルやリソース管理について推論しにくくなる場合があります。

パフォーマンスの問題について、もう少し深く考える価値があります。並行合成で潜在的なパフォーマンス問題になるのは、時間共有の公平性です。プログラム内に100個のタスクがある場合、通常、リソースを共有する最適な方法は、各タスクがプロセッサ時間の1%を得ることです(または、タスクがすべて待機している場合は、それぞれが同じ確率で起床されることです)。100個のタスクをspawnした場合、通常は(おおよそ)そのようになります。しかし、2つのタスクをspawnし、そのうちの1つのタスクで99個のfutureをjoinした場合、スケジューラは2つのタスクしか認識せず、1つのタスクが時間の50%を得て、99個のfutureはそれぞれ0.5%を得ることになります。

通常、タスクの分布はそこまで偏っていませんし、join/selectなどはタイムアウトのように、この振る舞いが実際に望ましい場面で使うことが非常に多いです。しかし、プログラムが意図したパフォーマンス特性を持つようにするためには、考慮する価値があります。

Join

Tokioのjoinマクロは、futureのリストを受け取り、それらすべてを並行に完了まで実行します(すべての結果をタプルとして返します)。すべてのfutureが完了したときに返ります。futureは常に同じスレッド上で実行されます(並行であり、並列ではありません)。

簡単な例を示します。

async fn main() {
  let (result_1, result_2) = join!(do_a_thing(), do_a_thing());
  // `result_1` と `result_2` を使用する。
}

ここでは、do_a_thingの2つの実行が並行に発生し、両方が完了したときに結果が準備できています。結果を取得するためにawaitしていないことに注意してください。join!は暗黙的にfutureをawaitし、値を生成します。futureを作成するわけではありません。それでも、asyncコンテキスト内(たとえばasync関数内)で使用する必要があります。

上の例では見えませんが、join!はfutureに評価される式を受け取ります1joinはその本体内にasyncコンテキストを作成しないため、joinに渡すfutureをawaitすべきではありません(そうしないと、joinされたfutureより前に評価されてしまいます)。

すべてのfutureは同じスレッド上で実行されるため、いずれかのfutureがそのスレッドをブロックすると、どのfutureも進行できません。mutexやその他のロックを使用している場合、あるfutureが別のfutureによって保持されているロックを待機していると、簡単にデッドロックにつながる可能性があります。

joinはfutureの結果を気にしません。特に、futureがキャンセルされたりエラーを返したりしても、他のfutureには影響しません。他のfutureは実行を継続します。「fail fast」の振る舞いが必要な場合は、try_joinを使用してください。try_joinjoinと同様に動作しますが、いずれかのfutureがErrを返した場合、他のすべてのfutureはキャンセルされ、try_joinはただちにそのエラーを返します。

以前のasync/awaitの章では、spawnされたタスクをjoinすることについて話すために「join」という言葉を使いました。その名前が示すように、futureのjoinとタスクのjoinは関連しています。joinとは、複数のfutureを並行に実行し、継続する前にその結果を待つことを意味します。構文は異なります。JoinHandleを使用する場合とjoinマクロを使用する場合がありますが、考え方は似ています。重要な違いは、タスクをjoinする場合、タスクは並行かつ並列に実行される一方、join!を使用する場合、futureは並行に実行されますが並列には実行されないことです。さらに、spawnされたタスクはランタイムのスケジューラ上でスケジューリングされますが、join!ではfutureはローカルに「スケジューリング」されます(同じタスク上で、マクロの実行の時間的スコープ内で)。もう1つの違いは、spawnされたタスクがpanicした場合、そのpanicはランタイムによって捕捉されますが、join内のfutureがpanicした場合、タスク全体がpanicすることです。

代替手段

futureを並行に実行してその結果を収集することは、よくある要件です。そうしない十分な理由がない限り(つまり、並列性を明示的に望んでいない場合であり、その場合でもspawn_localを使う方を好むかもしれません)、おそらくspawnJoinHandleを使用すべきです。JoinSet抽象化は、そのようなspawnされたタスクをjoin!に似た方法で管理します。 ほとんどのランタイム(および futures.rs)には、Tokio の join マクロに相当するものがあり、それらはほぼ同じように振る舞います。join 関数もあり、これはマクロに似ていますが、柔軟性は少し劣ります。たとえば、futures.rs には 2つの Future を join するための join、名前から分かる数の Future を join するための join3join4join5、そして Future のコレクションを join するための join_all があります(これらそれぞれの try_ バリエーションもあります)。

Futures-concurrency も join(および try_join)の機能を提供しています。futures-concurrency のスタイルでは、これらの操作はタプル、Vec、配列などの Future のグループに対するトレイトメソッドです。たとえば、2つの Future を join するには、(fut1, fut2).join().await と書きます(ここでは await が明示的であることに注意してください)。

一緒に join したい Future の集合が動的に変化する場合(たとえば、ネットワーク経由で入力が届くにつれて新しい Future が作成される場合)、またはすべての Future が完了したときではなく、完了した順に結果を取得したい場合は、ストリームと FuturesUnordered または FuturesOrdered の機能を使う必要があります。これらについては ストリーム の章で扱います。

Race/select

Future を join することの対応物は、それらを race させること(別名、それらに対して select すること)です。race/select では Future は並行に実行されますが、すべての Future の完了を待つのではなく、最初の1つが完了するのを待ってから他をキャンセルします。これは join と似ているように聞こえますが、今度はキャンセルについて考える必要があるため、かなり興味深く(そして時にエラーを起こしやすく)なります。

Tokio の select マクロを使った例を示します。

async fn main() {
  select! {
    result = do_a_thing() => {
      println!("計算が完了し、{result} を返しました");
    }
    _ = timeout() => {
      println!("計算がタイムアウトしました");
    }
  }
}

select マクロの中で Future の結果を処理するため、join マクロよりもすでに物事が興味深くなっていることに気付くでしょう。これは match 式に少し似ていますが、select ではすべての分岐が並行に実行され、最初に完了した分岐の本体がその結果とともに実行されます(他の分岐は実行されず、Future は drop されることによってキャンセルされます)。この例では、do_a_thingtimeout が並行に実行され、先に完了した方のブロックが実行されます(つまり、実行される println は1つだけです)。もう一方の Future はキャンセルされます。join マクロと同様に、Future の await は暗黙的です。

Tokio の select マクロは多くの機能をサポートしています。

  • パターンマッチング: 各分岐の = の左側の構文はパターンにすることができ、Future の結果がそのパターンに一致する場合にのみブロックが実行されます。パターンが一致しない場合、その Future はそれ以降 poll されません(ただし、他の Future は poll されます)。これは、任意で値を返す Future に役立ちます。たとえば、Some(x) = do_a_thing() => { ... } です。
  • if ガード: 各分岐は if ガードを持つことができます。select マクロが実行されると、各式を評価して Future を生成した後に if ガードが評価され、そのガードが true の場合にのみ Future が poll されます。たとえば、x = = do_a_thing() if false => { ... } は決して poll されません。if ガードはポーリング中に再評価されるのではなく、マクロが初期化されたときにのみ評価されることに注意してください。
  • else 分岐: selectelse => { ... } という else 分岐を持つことができます。これは、すべての Future が停止し、どのブロックも実行されていない場合に実行されます。これが else 分岐なしで発生した場合、select は panic します。

select! マクロの値は(match と同じように)実行された分岐の値であるため、すべての分岐は同じ型でなければなりません。たとえば、上の例の結果を select の外で使いたい場合は、次のように書きます。

async fn main() {
  let result = select! {
    result = do_a_thing() => {
      Some(result)
    }
    _ = timeout() => {
      None
    }
  };

  // `result` を使用
}

join! と同様に、select!Result を特別扱いしません(前述のパターンマッチングを除きます)。ある分岐がエラーで完了した場合、他のすべての分岐はキャンセルされ、そのエラーが select の結果として使われます(その分岐が正常に完了した場合と同じように)。

select マクロは本質的にキャンセルを使用するため、プログラムでキャンセルを避けようとしているなら、select! を避けなければなりません。実際、非同期プログラムにおいて select はしばしばキャンセルの主な発生源です。別の場所 で説明したように、キャンセルにはバグにつながり得る微妙な問題が数多くあります。特に、select は単に Future を drop することでキャンセルすることに注意してください。これは drop される Future に通知したり、キャンセルトークンなどをトリガーしたりしません。

select! は、ストリームやその他の Future のシーケンスを処理するために、ループの中でよく使われます。これにより、複雑さとバグの機会がさらに一段増えます。ループの各イテレーションで新しい独立した Future を作成する単純な場合には、物事はそれほど複雑にはなりません。しかし、これが必要とされることはまれです。一般には、イテレーション間で何らかの状態を保持したいものです。select をストリームとともにループ内で使い、ループの各イテレーションでストリームからの1つの結果を処理することは一般的です。例:

async fn main() {
  let mut stream = ...;

  loop {
    select! {
      result = stream.next() => {
        match result {
          Some(x) => println!("受信しました: {x}"),
          None => break,
        }
      }
      _ = timeout() => {
        println!("タイムアウトしました!");
        break;
      }
    }
  }
}

この例では、stream から値を読み取り、値がなくなるか、結果待ちがタイムアウトするまでそれらを出力します。タイムアウトの場合にストリーム内に残っているデータがどうなるかは、ストリームの実装によって異なります(失われるかもしれません! あるいは重複するかもしれません!)。これは、キャンセルに直面したときの振る舞いが重要(かつ厄介)になり得る理由の一例です。 イテレーションをまたいで、ストリームだけでなく future も再利用したい場合があります。たとえば、各イテレーションに新しいタイムアウトを適用するのではなく、すべてのイテレーションに適用される timeout future と競合させたい場合があります。これは、future をループの外側で作成し、それを参照することで可能です。

async fn main() {
  let mut stream = ...;
  let mut timeout = timeout();

  loop {
    select! {
      result = stream.next() => {
        match result {
          Some(x) => println!("received: {x}"),
          None => break,
        }
      }
      // `timeout` をムーブするのではなく、参照を作成する。
      _ = &mut timeout => {
        println!("time out!");
        break;
      }
    }
  }
}

select! の外側で作成された future や stream を使って、ループ内で select! を使用する場合には、いくつか重要な詳細があります。これらは select の動作方法から生じる根本的な帰結なので、最後の例の timeout を例として使い、select の詳細を順に追いながら紹介します。

  • timeout はループの外側で作成され、カウントダウンする時間で初期化されます。
  • ループの各イテレーションで、selecttimeout への参照を作成しますが、その状態は変更しません。
  • select が実行されると、timeout をポーリングします。残り時間がある間は Pending を返し、時間が経過すると Ready を返します。その時点でそのブロックが実行されます。

上の例では、timeout が ready になったときに、ループから break しています。しかし、そうしなかったらどうなるでしょうか?その場合、select は単に再び timeout をポーリングしますが、Futureドキュメントでは、これは起きるべきではないとされています!select にはこれを防ぐ手段がありません。timeout をポーリングすべきかどうかを判断するための状態を(イテレーション間で)持たないからです。timeout がどのように書かれているかによって、これにより panic、ロジックエラー、ある種のクラッシュが発生する可能性があります。

この種のバグは、いくつかの方法で防ぐことができます。

  • 再ポーリングが安全になるように、fusedfuture または stream を使用する。
  • たとえばループを抜ける(前の例のように)、または if ガードを使用するなどして、future が決して再ポーリングされないようにコードを構造化する。

次に、&mut timeout の型について考えてみましょう。timeout()Future を実装する型を返すものと仮定します。それは async 関数からの匿名型かもしれませんし、Timeout のような名前付き型かもしれません。例を簡単にするため後者だと仮定します(ただし、どちらの場合でもロジックは適用されます)。TimeoutFuture を実装しているとして、&mut TimeoutFuture を実装するでしょうか?必ずしもそうではありません!これを真にする包括的な impl がありますが、それは TimeoutUnpin を実装している場合に限られます。これはすべての future に当てはまるわけではないため、最後の例のようなコードを書くと、しばしば型エラーが発生します。ただし、このようなエラーは、たとえば let mut timeout = pin!(timeout()); のように pin マクロを使うことで簡単に修正できます。

ループ内の select によるキャンセルは、微妙なバグの豊富な発生源です。これらは通常、future が何らかのデータに関する状態を含んでいるものの、データ自体は含んでいない場合に発生します。キャンセルによって future がドロップされると、その状態は失われますが、基になるデータは更新されません。これにより、データが失われたり、複数回処理されたりする可能性があります。

代替手段

Futures.rs には独自の select マクロ があり、futures-concurrency には Tokio の select マクロの代替となる Race trait があります。これらはいずれも、複数の future を並行に競合させ、最初の結果を処理して他をキャンセルするという同じ中核的なセマンティクスを持ちますが、構文が異なり、細部にも違いがあります。

Futures.rs の select は表面的には Tokio のものと似ています。違いを要約すると、futures.rs 版では次のようになります。

  • Future は常に fused でなければならない(型チェックによって強制されます)。
  • select には else 分岐ではなく、default 分岐と complete 分岐があります。
  • selectif ガードをサポートしません。

Futures-concurrency の Race は、そのバージョンの join に似た非常に異なる構文を持ちます。たとえば、(future_a, future_b).race().await です(タプルだけでなく Vec や配列でも動作します)。この構文はマクロほど柔軟ではありませんが、ほとんどの async コードにうまく馴染みます。ループ内で race を使用する場合でも、select と同じ問題が発生し得ることに注意してください。

join と同様に、タスクを spawn して並列に実行させることは、select を使用する代替手段として適していることがよくあります。ただし、最初のタスクが完了した後に残りのタスクをキャンセルするには、追加の作業が必要です。これはチャネルやキャンセルトークンを使って実現できます。どちらの場合でも、キャンセルされるタスク側で何らかのアクションが必要になります。つまり、そのタスクは後片付けやその他のグレースフルシャットダウンを行うことができます。

select の一般的な用途(特にループ内)は、stream を扱うことです。select のいくつかの用途を置き換えられる stream コンビネータメソッドがあります。たとえば、futures-concurrency の merge は、複数の stream を結合するための優れた代替手段です。

最後に

このセクションでは、future のグループを並行に実行する 2 つの方法について説明しました。future を join するとは、それらすべての完了を待つことを意味します。future を select する(別名 race する)とは、最初のものが完了するのを待つことを意味します。タスクを spawn する場合とは対照的に、これらの合成は並列性を利用しません。

joinselect はどちらも、あらかじめわかっている future の集合に対して動作します(多くの場合、実行時ではなくプログラムを書く時点でわかっています)。場合によっては、合成する future があらかじめわかっていないことがあります。実行中に、合成された future の集合へ future を追加しなければなりません。このためには、独自の合成操作を持つ stream が必要です。

これらの合成演算子は強力で表現力豊かですが、多くの場合、タスクと spawn を使用するほうが簡単で適切であることを改めて強調しておく価値があります。並列性が望ましいことは多く、キャンセルやブロッキングに関するバグが発生しにくく、リソース割り当ても通常はより公平(または少なくともより単純)で予測しやすくなります。


  1. 式は IntoFuture を実装する型でなければなりません。式は評価され、マクロによって Future に変換されます。つまり、実際に Future に評価される必要はなく、Future に変換できる何かであればよいのですが、これはかなり小さな違いです。式自体は、結果として得られる Future が実行される前に順番に評価されます。