Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

スレッド

短命なスレッドを生成する

std-badge cat-concurrency-badge

std::thread::scope は新しいスコープ付きスレッドを生成し、そのスレッドはクロージャから戻る前に必ず終了することが保証されています。つまり、Arc やその他の所有権に関するテクニックを使わなくても、呼び出し元の関数のデータを参照できます。

この例では、配列を半分に分割し、それぞれ別のスレッドで処理を実行します。

fn main() {
    let arr = &[1, 25, -4, 10];
    let max = find_max(arr);
    assert_eq!(max, Some(25));
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    let mid = arr.len() / 2;
    let (left, right) = arr.split_at(mid);

    std::thread::scope(|s| {
        let thread_l = s.spawn(|| find_max(left));
        let thread_r = s.spawn(|| find_max(right));

        let max_l = thread_l.join().unwrap()?;
        let max_r = thread_r.join().unwrap()?;

        Some(max_l.max(max_r))
    })
}

並列パイプラインを作成する

crossbeam-badge cat-concurrency-badge

この例では、crossbeam および crossbeam-channel クレートを使って、 ZeroMQ の guide で説明されているものに似た並列パイプラインを作成します。 データソースとデータシンクがあり、データはソースからシンクへ向かう途中で、 2 つのワーカースレッドによって並列に処理されます。

ここでは、crossbeam_channel::bounded を使って、容量 1 の bounded チャネルを 使用します。プロデューサーは独自のスレッド上になければなりません。これは、 プロデューサーがワーカーよりも速くメッセージを生成するためです (ワーカーは 0.5 秒スリープするため)。つまり、ワーカーのどちらか一方が チャネル内のデータを処理するまで、プロデューサーは crossbeam_channel::Sender::send の呼び出しで 0.5 秒間ブロックされます。 また、チャネル内のデータは先に receive を呼び出したワーカーによって消費されるため、 各メッセージは両方のワーカーではなく、1 つのワーカーにだけ配送されることにも注意してください。

イテレーターメソッド crossbeam_channel::Receiver::iter を介して チャネルから読み取ると、新しいメッセージを待つ間、またはチャネルが閉じられるまで、 処理はブロックされます。チャネルは crossbeam::scope 内で作成されているため、 ワーカーの for ループでプログラム全体がブロックされるのを防ぐには、 drop を使って手動で閉じる必要があります。drop の呼び出しは、 これ以上メッセージが送信されないことを知らせるシグナルと考えることができます。

use std::thread;
use std::time::Duration;
use crossbeam::channel::bounded;

fn main() {
    let (snd1, rcv1) = bounded(1);
    let (snd2, rcv2) = bounded(1);
    let n_msgs = 4;
    let n_workers = 2;

    crossbeam::scope(|s| {
        // プロデューサースレッド
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd1.send(i).unwrap();
                println!("ソースが {} を送信", i);
            }
            // チャネルを閉じる - これはワーカー内の
            // for ループを終了するために必要
            drop(snd1);
        });

        // 2 つのスレッドによる並列処理
        for _ in 0..n_workers {
            // シンクに送信し、ソースから受信
            let (sendr, recvr) = (snd2.clone(), rcv1.clone());
            // 別々のスレッドでワーカーを生成
            s.spawn(move |_| {
                thread::sleep(Duration::from_millis(500));
                // チャネルが閉じられるまで受信
                for msg in recvr.iter() {
                    println!("ワーカー {:?} が {} を受信しました。",
                             thread::current().id(), msg);
                    sendr.send(msg * 2).unwrap();
                }
            });
        }
        // チャネルを閉じる。そうしないとシンクが
        // for ループを終了できない
        drop(snd2);

        // シンク
        for msg in rcv2.iter() {
            println!("シンクが {} を受信", msg);
        }
    }).unwrap();
}

2 つのスレッド間でデータを受け渡す

crossbeam-badge cat-concurrency-badge

この例では、単一プロデューサー・単一コンシューマー(SPSC)の設定で crossbeam-channel を使用する方法を示します。ex-crossbeam-spawn の例をベースに、crossbeam::scopeScope::spawn を使ってプロデューサースレッドを管理します。データは crossbeam_channel::unbounded チャネルを使って 2 つのスレッド間でやり取りされます。これは、格納可能なメッセージ数に制限がないことを意味します。プロデューサースレッドは、各メッセージの間で 0.5 秒スリープします。

use std::{thread, time};
use crossbeam::channel::unbounded;

fn main() {
    let (snd, rcv) = unbounded();
    let n_msgs = 5;
    crossbeam::scope(|s| {
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd.send(i).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
        });
    }).unwrap();
    for _ in 0..n_msgs {
        let msg = rcv.recv().unwrap();
        println!("Received {}", msg);
    }
}

グローバルな可変状態を維持する

lazy_static-badge cat-rust-patterns-badge

lazy_static を使ってグローバルな状態を宣言します。lazy_static は、グローバルに利用可能な static ref を作成します。これを変更可能にするには Mutex が必要です(RwLock も参照してください)。Mutex によるラップにより、複数のスレッドから同時に状態へアクセスできなくなり、競合状態を防ぎます。Mutex に格納された値を読み取ったり変更したりするには、MutexGuard を取得する必要があります。

use anyhow::{Result, anyhow};
use lazy_static::lazy_static;
use std::sync::Mutex;

lazy_static! {
    static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
    let mut db = FRUIT.lock().map_err(|_| anyhow!("Failed to acquire MutexGuard"))?;
    db.push(fruit.to_string());
    Ok(())
}

fn main() -> Result<()> {
    insert("apple")?;
    insert("orange")?;
    insert("peach")?;
    {
        let db = FRUIT.lock().map_err(|_| anyhow!("Failed to acquire MutexGuard"))?;

        db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
    }
    insert("grape")?;
    Ok(())
}

iso ファイルの SHA256 を並行して計算する

threadpool-badge num_cpus-badge walkdir-badge ring-badge cat-concurrency-badgecat-filesystem-badge

この例では、現在のディレクトリにある iso 拡張子を持つすべてのファイルについて SHA256 を計算します。スレッドプールは、num_cpus::get で取得したシステム内のコア数と同じ数のスレッドを生成します。Walkdir::new は現在のディレクトリを走査し、execute を呼び出して、読み込みと SHA256 ハッシュの計算を実行します。

use walkdir::WalkDir;
use std::fs::File;
use std::io::{BufReader, Read, Error};
use std::path::Path;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
use ring::digest::{Context, Digest, SHA256};

// iso 拡張子を検証する
fn is_iso(entry: &Path) -> bool {
    match entry.extension() {
        Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true,
        _ => false,
    }
}

fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
    let mut buf_reader = BufReader::new(File::open(&filepath)?);
    let mut context = Context::new(&SHA256);
    let mut buffer = [0; 1024];

    loop {
        let count = buf_reader.read(&mut buffer)?;
        if count == 0 {
            break;
        }
        context.update(&buffer[..count]);
    }

    Ok((context.finish(), filepath))
}

fn main() -> Result<(), Error> {
    let pool = ThreadPool::new(num_cpus::get());

    let (tx, rx) = channel();

    for entry in WalkDir::new("/home/user/Downloads")
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| !e.path().is_dir() && is_iso(e.path())) {
            let path = entry.path().to_owned();
            let tx = tx.clone();
            pool.execute(move || {
                let digest = compute_digest(path);
                tx.send(digest).expect("Could not send data!");
            });
        }

    drop(tx);
    for t in rx.iter() {
        let (sha, path) = t?;
        println!("{:?} {:?}", sha, path);
    }
    Ok(())
}

作業をスレッドプールにディスパッチしてフラクタルを描画する

threadpool-badge num-badge num_cpus-badge image-badge cat-concurrency-badgecat-science-badgecat-rendering-badge

この例では、分散計算のためにスレッドプールを使用して、Julia set からフラクタルを描画し、画像を生成します。

指定した幅と高さの出力画像用のメモリを ImageBuffer::new で確保します。 Rgb::from_channels は RGB ピクセル値を計算します。 num_cpus::get で取得したコア数と同じスレッド数で ThreadPool を作成します。 ThreadPool::execute は各ピクセルを個別のジョブとして受け取ります。

mpsc::channel はジョブを受信し、Receiver::recv はそれらを取り出します。 ImageBuffer::put_pixel はそのデータを使ってピクセルの色を設定します。 ImageBuffer::save は画像を output.png に書き込みます。

use anyhow::Result;
use std::sync::mpsc::channel;
use threadpool::ThreadPool;
use num::complex::Complex;
use image::{ImageBuffer, Pixel, Rgb};

// 輝度値を RGB に変換する関数
// http://www.efg2.com/Lab/ScienceAndEngineering/Spectra.htm に基づく
fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
    let wave = wavelength as f32;

    let (r, g, b) = match wavelength {
        380..=439 => ((440. - wave) / (440. - 380.), 0.0, 1.0),
        440..=489 => (0.0, (wave - 440.) / (490. - 440.), 1.0),
        490..=509 => (0.0, 1.0, (510. - wave) / (510. - 490.)),
        510..=579 => ((wave - 510.) / (580. - 510.), 1.0, 0.0),
        580..=644 => (1.0, (645. - wave) / (645. - 580.), 0.0),
        645..=780 => (1.0, 0.0, 0.0),
        _ => (0.0, 0.0, 0.0),
    };

    let factor = match wavelength {
        380..=419 => 0.3 + 0.7 * (wave - 380.) / (420. - 380.),
        701..=780 => 0.3 + 0.7 * (780. - wave) / (780. - 700.),
        _ => 1.0,
    };

    let (r, g, b) = (normalize(r, factor), normalize(g, factor), normalize(b, factor));
    Rgb([r, g, b])
}

// Julia set の距離推定を輝度値にマッピングする
fn julia(c: Complex<f32>, x: u32, y: u32, width: u32, height: u32, max_iter: u32) -> u32 {
    let width = width as f32;
    let height = height as f32;

    let mut z = Complex {
        // 点を画像座標にスケーリングして平行移動する
        re: 3.0 * (x as f32 - 0.5 * width) / width,
        im: 2.0 * (y as f32 - 0.5 * height) / height,
    };

    let mut i = 0;
    for t in 0..max_iter {
        if z.norm() >= 2.0 {
            break;
        }
        z = z * z + c;
        i = t;
    }
    i
}

// 色の輝度値を RGB の範囲内に正規化する
fn normalize(color: f32, factor: f32) -> u8 {
    ((color * factor).powf(0.8) * 255.) as u8
}

fn main() -> Result<()> {
    let (width, height) = (1920, 1080);
    let mut img = ImageBuffer::new(width, height);
    let iterations = 300;

    let c = Complex::new(-0.8, 0.156);

    let pool = ThreadPool::new(num_cpus::get());
    let (tx, rx) = channel();

    for y in 0..height {
        let tx = tx.clone();
        pool.execute(move || for x in 0..width {
                         let i = julia(c, x, y, width, height, iterations);
                         let pixel = wavelength_to_rgb(380 + i * 400 / iterations);
                         tx.send((x, y, pixel)).expect("データを送信できませんでした!");
                     });
    }

    for _ in 0..(width * height) {
        let (x, y, pixel) = rx.recv()?;
        img.put_pixel(x, y, pixel);
    }
    let _ = img.save("output.png")?;
    Ok(())
}