日本a√视频在线,久久青青亚洲国产,亚洲一区欧美二区,免费g片在线观看网站

        <style id="k3y6c"><u id="k3y6c"></u></style>
        <s id="k3y6c"></s>
        <mark id="k3y6c"></mark>
          
          

          <mark id="k3y6c"></mark>

          "); //-->

          博客專欄

          EEPW首頁 > 博客 > Rust多線程:Worker 結構體與線程池中任務的傳遞機制

          Rust多線程:Worker 結構體與線程池中任務的傳遞機制

          發(fā)布人:天翼云開發(fā)者 時間:2025-09-18 來源:工程師 發(fā)布文章

          本文分享自天翼云開發(fā)者社區(qū)《Rust多線程:Worker 結構體與線程池中任務的傳遞機制》,作者:l****n

          Rust多線程:Worker 結構體與線程池中任務的傳遞機制

          **在實現(xiàn)一個多線程的 Web 服務器時,我們會遇到一個問題:如何在創(chuàng)建線程之后讓它們在沒有任務時保持等待狀態(tài),并且在任務到來時可以立即執(zhí)行。這是一個典型的線程池設計問題。在 Rust 中,我們需要通過自定義設計來實現(xiàn)這個功能,因為標準庫中的 **thread::spawn 并不直接支持這種用法。

          問題描述

          **Rust 的 **thread::spawn 方法會立即執(zhí)行傳入的閉包。如果我們想要在線程池中創(chuàng)建線程并讓它們等待任務(即在創(chuàng)建時不執(zhí)行任何任務),我們就需要自己設計一種機制,能夠在稍后將任務傳遞給這些已經(jīng)創(chuàng)建好的線程。

          解決方案:引入 Worker 結構體

          **為了解決這個問題,我們引入了一個 **Worker 結構體來管理線程池中的每個線程。Worker 的作用類似于一個工人,它等待任務的到來并在接收到任務時執(zhí)行。

          1. Worker 結構體的定義

          Worker 結構體包含兩個字段:

          • id:用于標識每個 Worker。

          • thread:存放線程的 JoinHandle<()>,它是由 thread::spawn 返回的。

          代碼如下:

          struct Worker {
              id: usize,
              thread: thread::JoinHandle<()>,
          }
          2. 創(chuàng)建 Worker 實例

          **為了讓 **Worker 在沒有任務時處于等待狀態(tài),我們可以在 Worker::new 函數(shù)中使用 thread::spawn 創(chuàng)建線程,并傳入一個空的閉包:

          impl Worker {
              fn new(id: usize) -> Worker {        let thread = thread::spawn(|| {});
          
                  Worker { id, thread }
              }
          }

          **在這里,我們創(chuàng)建了一個 **Worker 實例,每個 Worker 都會啟動一個線程。但這個線程目前還什么都不做,因為我們傳遞給 spawn 的閉包是空的。

          3. 將 Worker 集成到線程池中

          **接下來,我們修改 **ThreadPool 的實現(xiàn),使其存儲 Worker 的實例而不是直接存儲線程的 JoinHandle<()>。在 ThreadPool::new 中,我們使用一個 for 循環(huán)創(chuàng)建多個 Worker 實例,并將它們存儲在一個 Vec<Worker> 中:

          pub struct ThreadPool {
              workers: Vec<Worker>,
          }impl ThreadPool {    pub fn new(size: usize) -> ThreadPool {        assert!(size > 0);        let mut workers = Vec::with_capacity(size);        for id in 0..size {
                      workers.push(Worker::new(id));
                  }
          
                  ThreadPool { workers }
              }
          }

          **這樣,我們就為線程池創(chuàng)建了一個由多個 **Worker 組成的集合。每個 Worker 都有一個唯一的 ID,并且都啟動了一個線程,雖然這些線程目前還沒有執(zhí)行任何有用的任務。

          向 Worker 發(fā)送任務

          現(xiàn)在,我們解決了創(chuàng)建線程并讓它們等待任務的問題。接下來,我們需要設計一個機制,使得線程池能夠在任務到來時將任務發(fā)送給等待中的線程。

          1. 使用信道傳遞任務

          **在 Rust 中,信道(channel)是一種非常適合在線程之間傳遞數(shù)據(jù)的工具。我們可以使用一個信道來傳遞任務。線程池會創(chuàng)建一個信道的發(fā)送端,每個 **Worker 會擁有信道的接收端。任務通過信道從線程池傳遞到 Worker,再由 Worker 中的線程執(zhí)行。

          use std::{sync::mpsc, thread};pub struct ThreadPool {
              workers: Vec<Worker>,
              sender: mpsc::Sender<Job>,
          }struct Job;impl ThreadPool {    pub fn new(size: usize) -> ThreadPool {        assert!(size > 0);        let (sender, receiver) = mpsc::channel();        let mut workers = Vec::with_capacity(size);        for id in 0..size {
                      workers.push(Worker::new(id));
                  }
          
                  ThreadPool { workers, sender }
              }
          }
          2. Worker 處理任務

          **為了讓 **Worker 能夠處理任務,我們將信道的接收端傳遞給每個 Worker 的線程。線程會不斷地從信道中接收任務,并執(zhí)行這些任務。

          impl Worker {    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {        let thread = thread::spawn(move || {
                      receiver;
                  });
          
                  Worker { id, thread }
              }
          }

          **不過在這段代碼中,存在一個問題:信道的接收端 **receiver 被移交給了第一個 Worker,導致無法將其傳遞給其他 Worker。

          3. 使用 Arc 和 Mutex 共享接收端

          **為了解決這個問題,我們需要使用 **Arc<Mutex<T>> 來共享信道的接收端,這樣所有的 Worker 都可以安全地從同一個信道接收任務:

          use std::{sync::{mpsc, Arc, Mutex}, thread};type Job = Box<dyn FnOnce() + Send + 'static>;
          
          impl ThreadPool {
              pub fn new(size: usize) -> ThreadPool {
                  assert!(size > 0);
          
                  let (sender, receiver) = mpsc::channel();
                  let receiver = Arc::new(Mutex::new(receiver));
          
                  let mut workers = Vec::with_capacity(size);
          
                  for id in 0..size {
                      workers.push(Worker::new(id, Arc::clone(&receiver)));
                  }
          
                  ThreadPool { workers, sender }
              }
              
               pub fn execute<F>(&self, f: F)
              where
                  F: FnOnce() + Send + 'static,
              {        let job = Box::new(f);        self.sender.send(job).unwrap();
              }
          }impl Worker {    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {        let thread = thread::spawn(move || loop {            let job = receiver.lock().unwrap().recv().unwrap();            println!("Worker {id} got a job; executing.");            job();
                  });
          
                  Worker { id, thread }
              }
          }

          **在 **Worker::new 中,線程會不斷地嘗試獲取鎖來接收任務,并在收到任務后執(zhí)行。這里我們使用了 Arc 來共享接收端,使用 Mutex 來確保一次只有一個 Worker 能夠接收任務。

          type Job = Box<dyn FnOnce() + Send + 'static>;

          **這行代碼定義了一個類型別名 **Job。它代表了一個特定的任務類型:

          • Box<dyn FnOnce() + Send + 'static> 是一個動態(tài)分發(fā)的閉包(或函數(shù)),其具體實現(xiàn)類型在編譯時不確定。Box 是一個堆分配的智能指針,用于將閉包存儲在堆上。

          • dyn FnOnce() 表示這個閉包實現(xiàn)了 FnOnce trait,可以被調用一次。

          • Send 表示這個閉包可以在線程之間安全地傳遞。

          • 'static 表示閉包的生命周期是整個程序的生命周期,確保閉包在多個線程中可以安全使用。

          execute 方法

          **這個方法的功能是將一個新的任務(閉包)添加到線程池的任務隊列中,以供線程池中的工作線程執(zhí)行。下面是對 **F: FnOnce() + Send + 'static 的解釋:

          • F: FnOnce() + Send + 'static

            ** 是一個泛型約束,表示必須是一個實現(xiàn)了 FnOnce、Send和 'static的閉包類型。**

            • FnOnce() 確保閉包可以被調用一次。

            • Send 確保閉包可以安全地在線程之間傳遞。

            • 'static 確保閉包的生命周期足夠長,可以在整個程序運行期間有效。

          **在 **execute 方法中,你將傳入的閉包 f 轉換成 Job 類型(即 Box<dyn FnOnce() + Send + 'static>),然后通過 self.sender 將其發(fā)送到任務隊列中。這使得線程池的工作線程可以從隊列中接收并執(zhí)行這些任務。

          總結

          **通過引入 **Worker 結構體并使用信道進行任務傳遞,我們成功地實現(xiàn)了一個可以延遲分配任務的線程池。每個 Worker 都是在創(chuàng)建時啟動的,但它們會等待任務的到來,只有在接收到任務后才會開始執(zhí)行。這種設計不僅提高了服務器的吞吐量,還確保了線程資源的高效利用。


          *博客內容為網(wǎng)友個人發(fā)布,僅代表博主個人觀點,如有侵權請聯(lián)系工作人員刪除。


          關鍵詞: 云計算

          相關推薦

          技術專區(qū)

          關閉