index

Rust 多執行緒共享計數器

· 6min

Conclusion

// 260531 多執行緒共享計數器
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let counter_clone = Arc::clone(&counter);

    let handle = thread::spawn(move || {
        for _ in 0..5 {
            thread::sleep(Duration::from_millis(500));
            println!("counter_clone: {}", counter_clone.lock().unwrap());
        }
    });

    {
        thread::sleep(Duration::from_millis(1000));

        let mut data = counter.lock().unwrap();
        *data = 42;
    }

    handle.join().unwrap();
}

Practice

Mutex / Arc

quote


Now imagine installing a special bathroom lock with these rules:

  1. Anyone wanting to use the bathroom must get the key first
  2. Only one person can have the key at any time
  3. Others must wait until the key is returned
  4. The person with the key has complete, exclusive access

一群室友爭著同一間浴室,但為了避免混亂必需設立一些規則,像是

  1. 必需擁有鑰匙才能開門
  2. 一次只有一個人能拿到鑰匙
  3. 其他人必需等到上一個人歸還鑰匙才能使用浴室
  4. 擁有鑰匙的人就擁有浴室的使用權

Mutex

這就是 Mutex(mutual exclusion,互斥鎖),對應到資料上,表示永遠只有一個人能碰那個資料,在所有權的限制下,提供了安全且共享可變狀態 (兩個人同時爭奪並修改同一個記憶體)的解決方案。

Arc

Arc( Atomic Reference Counted,原子性引用計數),先看例子:

use std::sync::Arc;
use std::thread;

fn main() {
    let apple = Arc::new("the same apple");

    for _ in 0..10 {
        let apple = Arc::clone(&apple);

        thread::spawn(move || {
            // 由于使用了Arc,线程可以使用分配在 `Arc` 变量指针位置的值来生成。
            println!("{:?}", apple);
        });
    }
}

Arc 的本質就是一個智慧指標(Smart Pointer),也就是除了記憶位置之外還附帶了額外功能,其中一個是 counter,記錄目前有多少個 Arc 正指著這塊記憶體。

一但其中一個 Arc 被 drop out,其計數會減 1,直到歸 0 就會自動呼叫記憶體釋放。

相關的另有 Rc<T>(Reference Counted)主要使用在 single thread 上,而 Arc 主要使用在 multi thread。

補充
  • 在上述這個範例,因為只有讀取 apple 而不修改,所以沒有使用 Mutex,而且 Arc 預設是 immutable。
  • 理論上可以用 reference 改寫,但實際應用上因為無法確定 main 可以活多久(準確來說是 Arc 儘可能延長資料的生命週期,因為 main 結束時,整個進程都會被強制關閉),有可能外部的 apple 會先消失,容易有活得不夠長的風險。

Process / Thread

在作業系統中,當你啟動一個程式,系統會分給這個程式一整塊獨立的記憶體空間,這叫做進程(Process)。一個進程裡面可以包含好幾個執行緒(Thread)

這塊進行中的記憶體,主要分為兩個大區域:

棧(Stack)

每個 thread 在被建立(spawn)時,作業系統會分給它一塊專屬於它自己的記憶體空間,採取先進後出(First in-Last out,FILO) 依序堆疊放置在棧內

每個程式所分配到的 Stack 大小在編譯完成就固定了,如果執行時塞入 stack 的資料大於它所擁有的空間,就會爆出記憶體空間不足,即大名鼎鼎的 stack overflow

堆(Heap)

Process 裡所有的 thread(主執行緒、音訊執行緒等)共同擁有、共同存取這塊區域。通常儲存會動態改變的資料,分配的空間也較大,像是音訊樣本陣列 Vec<f32>、大型物件等,由程式設計師分配與釋放。

就是因為大家都能伸手進來,所以才會發生前面提到的 data race 問題,也因為 heap 是手動管理的,必須確保每個分配的內存塊都被正確地釋放,否則就會產生記憶體泄漏(memory leak) 造成效能上的浪費。

請求 heap 記憶體區塊時時需要與作業系統溝通,因此速度會比 stack 來的較慢

再談 Rust 的 Ownership

Rust 裡面的所有權機制,本質上是指資料在 thread 之間的轉移

let my_data = vec![0.1, 0.2, 0.3]; // 原本在主執行緒的堆疊上持有這個 Vec 的所有權

std::thread::spawn(move || {
    // 因為加了 `move` keyword
    // my_data 的「所有權(控制權)」被強行轉移進了這個新執行緒
    println!("{:?}", my_data); 
});

// ❌ 這裡如果你試圖再使用 my_data,編譯器會直接報錯
// 主執行緒已經失去了它的所有權

JoinHandle

透過儲存 thread::spawn 回傳的數值為變數,我們可以修正產生的執行緒完全沒有執行或沒有執行完成的問題。thread::spawn 的回傳型別為 JoinHandle

JoinHandle 是個有所有權的數值,當我們對它呼叫 join 方法時,它就會等待它的執行緒完成,即「主執行緒停在這裡,卡住不准動,直到背景執行緒(handle)執行完畢並關閉為止」。


References