Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

テストケース: map-reduce

Rust を使うと、従来このような試みに伴いがちな多くの悩みなしに、データ処理を非常に簡単に並列化できます。

標準ライブラリは、優れたスレッドプリミティブを標準で提供しています。 これらは、Rust の所有権という概念やエイリアシング規則と組み合わさることで、自動的に データ競合を防ぎます。

エイリアシング規則(1つの書き込み可能な参照 XOR 多数の読み取り可能な参照)は、他のスレッドから見える状態を 操作してしまうことを自動的に防ぎます。(同期が必要な場合には、 MutexChannel のような 同期プリミティブがあります。)

この例では、数値のブロック内にあるすべての桁の合計を計算します。 これを、ブロックのチャンクを別々のスレッドに分配することで実現します。各スレッドは 自分の小さな桁ブロックの合計を求め、その後で各 スレッドが生成した中間合計を合計します。

スレッド境界を越えて参照を渡しているにもかかわらず、Rust は、私たちが 読み取り専用の参照だけを渡していること、したがって unsafe な状況やデータ競合が発生し得ないことを理解します。また、 渡している参照には 'static ライフタイムがあるため、Rust は、これらのスレッドがまだ実行中の間に データが破棄されることはないと理解します。(スレッド間で非 static データを共有する必要がある場合は、 Arc のようなスマートポインタを使ってデータを生存させ、非 static ライフタイムを避けることができます。)

use std::thread;

// これは `main` スレッドです
fn main() {

    // これは処理対象のデータです。
    // スレッド化された map-reduce アルゴリズムによって、すべての桁の合計を計算します。
    // 空白で区切られた各チャンクは、それぞれ別のスレッドで処理されます。
    //
    // TODO: 空白を挿入すると出力がどうなるか確認してみましょう!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // 生成する子スレッドを保持するためのベクタを作成します。
    let mut children = vec![];

    /*************************************************************************
     * "Map" フェーズ
     *
     * データをセグメントに分割し、初期処理を適用する
     ************************************************************************/

    // データを個別に計算するためのセグメントに分割します
    // 各チャンクは実際のデータへの参照 (&str) になります
    let chunked_data = data.split_whitespace();

    // データセグメントを反復処理します。
    // .enumerate() は、反復される各要素に現在のループインデックスを追加します
    // 結果として得られるタプル "(index, element)" は、その直後に
    // "分解代入" によって、"i" と "data_segment" という
    // 2つの変数へ "分解" されます
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // 各データセグメントを別々のスレッドで処理します
        //
        // spawn() は新しいスレッドへのハンドルを返します。
        // 返り値にアクセスするために、これは必ず保持しなければなりません
        //
        // 'move || -> u32' は、次のようなクロージャの構文です:
        // * 引数を取りません ('||')
        // * キャプチャした変数の所有権を取得し ('move')、
        // * 符号なし32ビット整数を返します ('-> u32')
        //
        // Rust は十分に賢いため、クロージャ自体から '-> u32' を
        // 推論できるので、省くこともできました。
        //
        // TODO: 'move' を削除して何が起こるか試してみましょう
        children.push(thread::spawn(move || -> u32 {
            // このセグメントの中間合計を計算します:
            let result = data_segment
                        // セグメントの文字を反復処理します..
                        .chars()
                        // .. テキスト文字を数値に変換します..
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        // .. そして結果として得られた数値のイテレータを合計します
                        .sum();

            // println! は stdout をロックするため、テキストの混在は発生しません
            println!("processed segment {}, result={}", i, result);

            // Rust は「式言語」なので、"return" は不要です。
            // 各ブロック内で最後に評価された式が、自動的にその値になります。
            result

        }));
    }


    /*************************************************************************
     * "Reduce" フェーズ
     *
     * 中間結果を収集し、それらを最終結果へ結合する
     ************************************************************************/

    // 各スレッドの中間結果を、単一の最終的な合計へ結合します。
    //
    // sum() に型ヒントを与えるために "turbofish" ::<> を使用します。
    //
    // TODO: turbofish を使わず、代わりに final_result の型を
    // 明示的に指定して試してみましょう
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}

課題

スレッド数を、ユーザーが入力したデータに依存させるのは賢明ではありません。 ユーザーがたくさんの空白を挿入することにしたらどうなるでしょうか?本当に 2,000個のスレッドを生成したいでしょうか? プログラムの先頭で定義される静的定数によって、データが常に限られた数のチャンクへ分割されるように、 プログラムを変更してください。

関連項目: