Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

共有状態並行性

メッセージ受け渡しは並行性を扱うための優れた方法ですが、それだけが唯一の方法ではありません。 別の方法として、複数のスレッドが同じ共有データにアクセスするやり方があります。 ここで、Go言語のドキュメントにあるスローガンのこの部分をもう一度考えてみましょう: 「メモリを共有することで通信してはならない」。

メモリ共有による通信とは、どのようなものなのでしょうか。さらに、なぜ メッセージ受け渡しの支持者たちはメモリ共有を使わないよう注意を促すのでしょうか。

ある意味では、どのプログラミング言語においてもチャネルは単一所有権に似ています。 なぜなら、ある値をチャネルに送ったなら、その値はもはや使うべきではないからです。 共有メモリ並行性は、複数所有権のようなものです: 複数のスレッドが同時に同じメモリ位置にアクセスできます。第15章で、スマートポインタが複数所有権を可能にしたときに見たように、複数所有権は 管理すべき異なる所有者が存在するため、複雑さを増します。Rustの型システム と所有権ルールは、この管理を正しく行ううえで大いに役立ちます。例として、共有メモリにおけるより一般的な並行性プリミティブの1つであるミューテックスを見ていきましょう。

Mutexによるアクセス制御

Mutexmutual exclusion の略で、ミューテックスはある時点で1つの スレッドだけがあるデータにアクセスできるようにします。ミューテックス内の データにアクセスするには、スレッドはまず、ミューテックスのロックを取得したい と要求することで、アクセスしたいことを知らせなければなりません。lock は ミューテックスの一部であるデータ構造で、現在誰がそのデータへの排他的アクセス権を持っているかを追跡します。したがって、ミューテックスはロック機構によって保持しているデータを guarding していると説明されます。

ミューテックスは使うのが難しいという評判があります。なぜなら、次の2つのルールを 覚えておかなければならないからです:

  1. データを使う前に、必ずロックの取得を試みなければならない。
  2. ミューテックスが保護しているデータの使用が終わったら、他のスレッドがロックを取得できるように、そのデータのロックを解除しなければならない。

ミューテックスの現実世界での比喩として、1本のマイクしかない会議での パネルディスカッションを想像してください。パネリストが話す前には、まず マイクを使いたいと求めるか、その意思を示さなければなりません。マイクを 受け取ったら、好きなだけ話し、その後で次に発言を求めているパネリストに マイクを渡します。もしパネリストが使い終わった後にマイクを渡し忘れると、 他の誰も話せなくなります。共有マイクの管理がうまくいかなければ、その パネルは計画どおりには進みません!

ミューテックスの管理を正しく行うのは非常に難しいことがあります。そのため、 多くの人がチャネルに熱心なのです。しかし、Rustの型システムと所有権ルールの おかげで、ロックとアンロックを誤ることはありません。

Mutex<T> のAPI

ミューテックスの使い方の例として、まずはリスト16-12に示すように、単一 スレッドの文脈でミューテックスを使うところから始めましょう。

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}

多くの型と同様に、Mutex<T> は関連関数 new を使って作成します。 ミューテックスの内部のデータにアクセスするには、lock メソッドを使って ロックを取得します。この呼び出しは、ロックを取得する順番が回ってくるまで、 現在のスレッドをブロックして何の作業もできないようにします。

ロックを保持していた別のスレッドがパニックした場合、lock の呼び出しは 失敗します。その場合、誰も二度とロックを取得できなくなるため、ここでは その状況にあるならこのスレッドもパニックするように、unwrap を使うことにしました。

ロックを取得した後は、この場合 num という名前の戻り値を、内部データへの 可変参照として扱えます。型システムにより、m 内の値を使う前にロックを 取得することが保証されます。m の型は i32 ではなく Mutex<i32> なので、 i32 の値を使うには 必ず lock を呼び出さなければなりません。 忘れることはできません。そうしないと、型システムが内部の i32 への アクセスを許可しないからです。

lock の呼び出しは MutexGuard という型を返します。これは LockResult に包まれており、unwrap の呼び出しで処理しました。 MutexGuard 型は、内部データを指すために Deref を実装しています。 この型はさらに Drop も実装しており、MutexGuard がスコープを抜けたとき、 つまり内部スコープの終わりで、自動的にロックを解放します。その結果、 ロックの解放は自動的に行われるため、ロックを解放し忘れてミューテックスが 他のスレッドから使えなくなる危険はありません。

ロックがドロップされた後、ミューテックスの値を出力すると、内部の i326 に変更できたことがわかります。

Mutex<T> への共有アクセス

次に、Mutex<T> を使って複数のスレッド間で値を共有してみましょう。10個の スレッドを起動し、それぞれがカウンタの値を1ずつ増やすようにするので、 カウンタは0から10になります。リスト16-13の例ではコンパイラエラーが発生し、 そのエラーを手がかりに Mutex<T> の使い方と、それをRustがどのように正しく 使えるよう助けてくれるかを詳しく見ていきます。

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

counter 変数を作成し、リスト16-12で行ったように Mutex<T> の内部に i32 を保持します。次に、数値の範囲を反復処理して10個のスレッドを作成します。 thread::spawn を使い、すべてのスレッドに同じクロージャを渡します。これは カウンタをスレッドにムーブし、lock メソッドを呼び出して Mutex<T> の ロックを取得し、その後ミューテックス内の値に1を加えるクロージャです。 スレッドがそのクロージャの実行を終えると、num はスコープを抜け、ロックを 解放するので、別のスレッドがそれを取得できるようになります。

メインスレッドでは、すべてのjoinハンドルを収集します。それから、リスト16-2で 行ったように、すべてのスレッドが終了することを確認するために各ハンドルに 対して join を呼び出します。その時点で、メインスレッドがロックを取得し、 このプログラムの結果を出力します。

この例はコンパイルできないと示唆していました。では、その理由を見てみましょう!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
 5 |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
 8 |     for _ in 0..10 {
   |     -------------- inside of this loop
 9 |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
 8 ~     let mut value = counter.lock();
 9 ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

エラーメッセージは、counter の値がループの前回の反復ですでにムーブされたと 述べています。Rustは、ロック counter の所有権を複数のスレッドにムーブすることはできないと教えてくれています。第15章で説明した複数所有権の方法で、このコンパイラエラーを修正しましょう。

複数スレッドにおける複数所有権

第15章では、スマートポインタ Rc<T> を使って参照カウントされる値を作成する ことで、ある値を複数の所有者に渡しました。ここでも同じことをして、何が起こるかを見てみましょう。リスト16-14では Mutex<T>Rc<T> でラップし、スレッドに所有権をムーブする前に Rc<T> をクローンします。

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

もう一度コンパイルすると……今度は別のエラーが出ます! コンパイラは 多くのことを教えてくれています:

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<std::sync::Mutex<i32>>`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:723:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

これはずいぶん長いエラーメッセージです! ここで注目すべき重要な部分は 次のとおりです: `Rc<Mutex<i32>>` cannot be sent between threads safely。コンパイラは、 その理由も教えてくれています: the trait `Send` is not implemented for `Rc<Mutex<i32>>`Send については次の節で説明します。これは、 スレッドとともに使う型が並行な状況での使用に適していることを保証する トレイトの 1 つです。

残念ながら、Rc<T> はスレッド間で共有しても安全ではありません。 Rc<T> は参照カウントを管理する際、clone が呼ばれるたびにカウントを 増やし、各クローンがドロップされるとカウントを減らします。しかし、 カウントの変更が別のスレッドによって中断されないことを保証するための 並行性プリミティブは一切使っていません。その結果、カウントが誤る 可能性があります。そうした微妙なバグは、メモリリークや、まだ使い終えて いない値が早くドロップされてしまうことにつながりかねません。私たちに 必要なのは、Rc<T> とまったく同じようでありながら、参照カウントの 変更をスレッドセーフな方法で行う型です。

Arc<T> によるアトミックな参照カウント

幸い、Arc<T> 、並行な状況で安全に使える Rc<T> のような型です。 aatomic を表し、つまり アトミックに参照カウントされる 型という 意味です。アトミックは、ここでは詳しく扱わない別種の並行性プリミティブ です。詳しくは、標準ライブラリの std::sync::atomic のドキュメントを参照してください。現時点では、アトミックはプリミティブ型 のように動作しつつ、スレッド間で安全に共有できることだけ知っていれば 十分です。

すると、なぜすべてのプリミティブ型がアトミックではなく、なぜ標準 ライブラリの型がデフォルトで Arc<T> を使うよう実装されていないのか、 と不思議に思うかもしれません。その理由は、スレッド安全性には性能上の コストが伴い、そのコストは本当に必要なときにだけ払いたいからです。 単一スレッド内の値に対して操作しているだけなら、アトミックが提供する 保証を強制しなくてよいぶん、コードはより高速に動作できます。

例に戻りましょう。Arc<T>Rc<T> は同じ API を持っているので、 use 行、new の呼び出し、clone の呼び出しを変更すれば、私たちの プログラムを修正できます。リスト 16-15 のコードは、ついにコンパイルして 実行できるようになります。

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

このコードは次のように出力します:

Result: 10

やりました! 0 から 10 まで数えられました。あまりすごいことには思えない かもしれませんが、Mutex<T> とスレッド安全性について多くのことを学べ ました。このプログラムの構造は、単にカウンタを増やすだけでなく、もっと 複雑な操作にも使えます。この戦略を使えば、計算を独立した部分に分割し、 その各部分をスレッドに振り分け、Mutex<T> を使って各スレッドが自分の 担当分で最終結果を更新するようにできます。

単純な数値演算をしているのであれば、標準ライブラリの std::sync::atomic モジュール には Mutex<T> より 単純な型が用意されていることにも注意してください。これらの型は、 プリミティブ型に対する安全で並行なアトミックアクセスを提供します。 この例でプリミティブ型に対して Mutex<T> を使うことにしたのは、 Mutex<T> がどのように動くかに集中できるようにするためです。

RefCell<T>/Rc<T>Mutex<T>/Arc<T> の比較

counter は不変なのに、その中の値への可変参照を取得できたことに気づいた かもしれません。これは、Cell ファミリーがそうであるように、 Mutex<T> が内部可変性を提供していることを意味します。第 15 章で Rc<T> の中身を変更できるように RefCell<T> を使ったのと同じように、 Arc<T> の中身を変更するために Mutex<T> を使います。

もう 1 つ注意すべき点は、Mutex<T> を使うとき、Rust はあらゆる種類の ロジックエラーからあなたを守ってくれるわけではないということです。 第 15 章で、Rc<T> を使うと参照サイクルを作ってしまう危険があり、2 つの Rc<T> の値が互いを参照し合うことでメモリリークが起きる可能性があると 学びました。同様に、Mutex<T> には デッドロック を引き起こす危険が あります。これは、ある操作が 2 つのリソースをロックする必要があり、 2 つのスレッドがそれぞれ片方のロックを獲得した結果、互いを永遠に待ち 続けるようになったときに起こります。デッドロックに興味があるなら、 デッドロックを起こす Rust プログラムを作ってみてください。その後、 どんな言語でもよいのでミューテックスのデッドロック軽減戦略を調べ、 Rust で実装してみましょう。標準ライブラリの Mutex<T>MutexGuard の API ドキュメントにも、有用な情報があります。

この章の締めくくりとして、Send トレイトと Sync トレイト、そして それらをカスタム型とともにどう使えるかについて説明します。