Rust多線程:Worker 結構體與線程池中任務的傳遞機制
本文分享自天翼云開發(fā)者社區(qū)《Rust多線程:Worker 結構體與線程池中任務的傳遞機制》,作者:l****n
Rust多線程:Worker 結構體與線程池中任務的傳遞機制**在實現(xiàn)一個多線程的 Web 服務器時,我們會遇到一個問題:如何在創(chuàng)建線程之后讓它們在沒有任務時保持等待狀態(tài),并且在任務到來時可以立即執(zhí)行。這是一個典型的線程池設計問題。在 Rust 中,我們需要通過自定義設計來實現(xiàn)這個功能,因為標準庫中的 **thread::spawn 并不直接支持這種用法。
問題描述**Rust 的 **thread::spawn 方法會立即執(zhí)行傳入的閉包。如果我們想要在線程池中創(chuàng)建線程并讓它們等待任務(即在創(chuàng)建時不執(zhí)行任何任務),我們就需要自己設計一種機制,能夠在稍后將任務傳遞給這些已經(jīng)創(chuàng)建好的線程。
解決方案:引入 Worker 結構體**為了解決這個問題,我們引入了一個 **Worker 結構體來管理線程池中的每個線程。Worker 的作用類似于一個工人,它等待任務的到來并在接收到任務時執(zhí)行。
1. Worker 結構體的定義Worker 結構體包含兩個字段:
id:用于標識每個 Worker。
thread:存放線程的 JoinHandle<()>,它是由 thread::spawn 返回的。
代碼如下:
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}2. 創(chuàng)建 Worker 實例**為了讓 **Worker 在沒有任務時處于等待狀態(tài),我們可以在 Worker::new 函數(shù)中使用 thread::spawn 創(chuàng)建線程,并傳入一個空的閉包:
impl Worker {
fn new(id: usize) -> Worker { let thread = thread::spawn(|| {});
Worker { id, thread }
}
}**在這里,我們創(chuàng)建了一個 **Worker 實例,每個 Worker 都會啟動一個線程。但這個線程目前還什么都不做,因為我們傳遞給 spawn 的閉包是空的。
3. 將 Worker 集成到線程池中**接下來,我們修改 **ThreadPool 的實現(xiàn),使其存儲 Worker 的實例而不是直接存儲線程的 JoinHandle<()>。在 ThreadPool::new 中,我們使用一個 for 循環(huán)創(chuàng)建多個 Worker 實例,并將它們存儲在一個 Vec<Worker> 中:
pub struct ThreadPool {
workers: Vec<Worker>,
}impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
}**這樣,我們就為線程池創(chuàng)建了一個由多個 **Worker 組成的集合。每個 Worker 都有一個唯一的 ID,并且都啟動了一個線程,雖然這些線程目前還沒有執(zhí)行任何有用的任務。
向 Worker 發(fā)送任務現(xiàn)在,我們解決了創(chuàng)建線程并讓它們等待任務的問題。接下來,我們需要設計一個機制,使得線程池能夠在任務到來時將任務發(fā)送給等待中的線程。
1. 使用信道傳遞任務**在 Rust 中,信道(channel)是一種非常適合在線程之間傳遞數(shù)據(jù)的工具。我們可以使用一個信道來傳遞任務。線程池會創(chuàng)建一個信道的發(fā)送端,每個 **Worker 會擁有信道的接收端。任務通過信道從線程池傳遞到 Worker,再由 Worker 中的線程執(zhí)行。
use std::{sync::mpsc, thread};pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}struct Job;impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
}2. Worker 處理任務**為了讓 **Worker 能夠處理任務,我們將信道的接收端傳遞給每個 Worker 的線程。線程會不斷地從信道中接收任務,并執(zhí)行這些任務。
impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(move || {
receiver;
});
Worker { id, thread }
}
}**不過在這段代碼中,存在一個問題:信道的接收端 **receiver 被移交給了第一個 Worker,導致無法將其傳遞給其他 Worker。
3. 使用 Arc 和 Mutex 共享接收端**為了解決這個問題,我們需要使用 **Arc<Mutex<T>> 來共享信道的接收端,這樣所有的 Worker 都可以安全地從同一個信道接收任務:
use std::{sync::{mpsc, Arc, Mutex}, thread};type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{ let job = Box::new(f); self.sender.send(job).unwrap();
}
}impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job();
});
Worker { id, thread }
}
}**在 **Worker::new 中,線程會不斷地嘗試獲取鎖來接收任務,并在收到任務后執(zhí)行。這里我們使用了 Arc 來共享接收端,使用 Mutex 來確保一次只有一個 Worker 能夠接收任務。
type Job = Box<dyn FnOnce() + Send + 'static>;**這行代碼定義了一個類型別名 **Job。它代表了一個特定的任務類型:
Box<dyn FnOnce() + Send + 'static> 是一個動態(tài)分發(fā)的閉包(或函數(shù)),其具體實現(xiàn)類型在編譯時不確定。Box 是一個堆分配的智能指針,用于將閉包存儲在堆上。
dyn FnOnce() 表示這個閉包實現(xiàn)了 FnOnce trait,可以被調用一次。
Send 表示這個閉包可以在線程之間安全地傳遞。
'static 表示閉包的生命周期是整個程序的生命周期,確保閉包在多個線程中可以安全使用。
**這個方法的功能是將一個新的任務(閉包)添加到線程池的任務隊列中,以供線程池中的工作線程執(zhí)行。下面是對 **F: FnOnce() + Send + 'static 的解釋:
F: FnOnce() + Send + 'static
** 是一個泛型約束,表示必須是一個實現(xiàn)了 FnOnce、Send和 'static的閉包類型。**
FnOnce() 確保閉包可以被調用一次。
Send 確保閉包可以安全地在線程之間傳遞。
'static 確保閉包的生命周期足夠長,可以在整個程序運行期間有效。
**在 **execute 方法中,你將傳入的閉包 f 轉換成 Job 類型(即 Box<dyn FnOnce() + Send + 'static>),然后通過 self.sender 將其發(fā)送到任務隊列中。這使得線程池的工作線程可以從隊列中接收并執(zhí)行這些任務。
總結**通過引入 **Worker 結構體并使用信道進行任務傳遞,我們成功地實現(xiàn)了一個可以延遲分配任務的線程池。每個 Worker 都是在創(chuàng)建時啟動的,但它們會等待任務的到來,只有在接收到任務后才會開始執(zhí)行。這種設計不僅提高了服務器的吞吐量,還確保了線程資源的高效利用。
*博客內容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點,如有侵權請聯(lián)系工作人員刪除。







