Închiderea ordonată și curățarea

Codul din Listarea 20-20 gestionează cererile asincron folosind un pool de fire, conform intenției noastre. Apar câteva atenționări referitoare la câmpurile workers, id și thread pe care nu le utilizăm în mod direct, ceea ce ne aduce aminte că nu facem nicio curățare. Utilizând metoda mai puțin rafinată ctrl-c pentru a opri firul principal, toate celelalte fire sunt întrerupte brusc de asemenea, chiar și atunci când sunt pe punctul de a servi o cerere.

În pasul următor, vom implementa trăsătura Drop pentru a invoca join pe fiecare din fire din pool, astfel încât să își finalizeze cererile în curs de procesare înainte de a se închide. După aceea, vom crea un mecanism prin care firele de execuție să fie informate că trebuie să se oprească din a accepta cereri noi și să se închidă. Pentru a vedea acest cod în practică, vom ajusta serverul nostru să accepte doar două cereri înainte de a-și închide în mod ordonat pool-ul de fire.

Implementarea trăsăturii Drop pe ThreadPool

Să începem prin implementarea trăsăturii Drop pentru pool-ul nostru de fire de execuție. Când acesta este eliminat, este esențial ca toate firele să fie reunite pentru a asigura finalizarea muncii lor. Listarea 20-22 ilustrează o primă tentativă de implementare pentru Drop; Codul prezentat încă nu va funcționa cum trebuie.

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Listarea 20-22: Procedura de unire a fiecărui fir odată cu ieșirea pool-ului de fire din domeniul de vizibilitate

În primul rând, iterăm prin fiecare worker din pool-ul de thread-uri. Utilizăm &mut deoarece self este o referință mutabilă și avem nevoie să modificăm worker. Pentru fiecare worker, vom afișa un mesaj care anunță că acel worker specific este pe cale de a se opri, după care apelăm metoda join pe firul respectivului worker. Dacă apelul la join eșuează, utilizăm unwrap pentru a forța Rust să genereze panică și să realizeze o închidere forțată.

Acesta este mesajul de eroare pe care îl obținem la compilarea codului:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: this function takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

Eroarea indică faptul că nu putem apela join deoarece avem doar un împrumut mutabil pentru fiecare worker și join își asumă posesiunea argumentului pe care îl primește. Pentru a depăși această problemă, este necesar să mutăm firul din interiorul instanței Worker ce deține thread, permițând astfel join să consume firul. Acest lucru a fost implementat în Listarea 17-15: dacă Worker are un Option<thread::JoinHandle<()>>, putem folosi metoda take pe Option pentru a extrage valoarea din varianta Some și a lăsa locul ocupat de None. Pe scurt, un Worker activ va avea o variantă Some în thread, iar când dorim să curățăm un Worker, schimbăm Some cu None, făcând astfel ca Worker să nu mai aibă un fir activ.

Prin urmare, știm că dorim să actualizăm definiția lui Worker în felul următor:

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

Să ne sprijinim pe compilator pentru a identifica și alte segmente care necesită schimbări. Examinând acest cod, ne confruntăm cu două erori:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`
   |
note: the method `join` exists on the type `JoinHandle<()>`
  --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:5
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
   |
52 |             worker.thread.expect("REASON").join().unwrap();
   |                          +++++++++++++++++

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

Să ne ocupăm de a doua eroare, care ne îndreaptă atenția către codul de la sfârșitul lui Worker::new; trebuie să plasăm valoarea thread în Some atunci când construim un nou Worker. Efectuați următoarele ajustări pentru a remedia această eroare:

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Prima eroare se regăsește în implementarea noastră Drop. Anterior, am indicat intenția de a folosi take pe valoarea Option pentru a muta thread din worker. Modificările următoare sunt necesare pentru a realiza aceasta:

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Așa cum am explicat în Capitolul 17, metoda take pe Option scoate varianta Some și lasă None în locul ei. Utilizăm if let pentru a destructura Some și a accesa firul; după care invocăm join pe acel thread. Dacă valoarea lui thread a unui worker este deja None, asta înseamnă că firul respectiv a fost deja curățat și atunci nu se întâmplă nimic în acea situație.

Semnalizarea firelor pentru a înceta acceptarea sarcinilor

După toate modificările efectuate, codul nostru se compilează fără avertismente. Totuși, partea negativă este că acest cod nu funcționează încă așa cum ne-am propus. Elementul cheie este logica din închiderea executată de firele de execuție ale instanțelor Worker: acum când apelăm join, acest lucru nu va opri firele deoarece acestea execută o buclă loop interminabilă în căutarea de sarcini. Dacă încercăm să distrugem ThreadPool cu implementarea curentă a metodei drop, firul principal va aștepta la nesfârșit ca primul fir să finalizeze procesarea.

Pentru a corecta această problemă, trebuie să modificăm implementarea metodei drop în ThreadPool și apoi să ajustăm bucla din Worker.

În primul rând, vom schimba implementarea drop pentru ThreadPool pentru a scăpa explicit de sender înainte de a aștepta finalizarea firelor. Listarea 20-23 ne arată schimbările aduse ThreadPool pentru a renunța explicit la sender. Utilizăm aceeași tehnică Option și take cum am făcut în cazul firului, pentru a putea muta sender în afara obiectului ThreadPool:

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Listarea 20-23: Ștergerea explicită a sender înainte de a face join la firele din instanțele worker

Renunțarea la sender conduce la închiderea canalului, ceea ce semnalează faptul că nu se vor mai trimite alte mesaje. Atunci când se întâmplă asta, toate apelurile la recv efectuate de instanțele worker în bucla infinită vor rezultat într-o eroare. În Listarea 20-24, modificăm bucla Worker astfel încât să iasă elegant din buclă în acest caz, ceea ce înseamnă că firele de execuție se vor încheia odată ce implementarea drop pentru ThreadPool inițiază join pe acestea.

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Listarea 20-24: Ieșirea explicită din buclă când recv returnează o eroare

Pentru a ilustra acest cod în practică, să modificăm funcția main pentru a accepta doar două cereri înainte de a încheia activitatea serverului într-un mod ordonat, așa cum arată Listarea 20-25.

Numele fișierului: src/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Listarea 20-25: Încheierea activității serverului după procesarea a două cereri, ieșind din buclă

În practică, nu ar fi ideal ca un server web să se oprească după procesarea a doar două cereri. Acest exemplu are scopul de a arăta că procedura de închidere și curățare funcționează corespunzător.

Metoda take este definită în trăsătura Iterator și limitează iterarea la primele două elemente, cel mult. ThreadPool va ieși din domeniul de aplicare la sfârșitul funcției main, ceea ce va declanșa executarea implementării drop.

Pornește serverul cu comanda cargo run și efectuează trei cereri. La cea de-a treia cerere, ar trebui să se întâmpine o eroare și în terminal ar trebui să ai o ieșire similară cu următoarea:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

S-ar putea să vezi o ordonare diferită a instanțelor de worker și a mesajelor afișate. Vedem cum funcționează acest cod prin intermediul mesajelor: instanțele 0 și 3 au procesat primele două cereri. Serverul a încetat de a mai accepta conexiuni după al doilea client, și implementarea Drop a ThreadPool a început să se activeze chiar înainte ca worker-ul 3 să își înceapă sarcina. Eliminând sender-ul, se deconectează toți worker-ii și li se transmite să se oprească. Fiecare worker afișează un mesaj în momentul deconectării, după care pool-ul de fire invocă join pentru a aștepta finalizarea fiecărui fir de execuție de worker.

Putem observa un detaliu interesant în această execuție: după ce ThreadPool a eliminat sender-ul, înainte ca vreun worker să întâmpine o eroare, am încercat să facem join pe worker-ul 0. Instanța 0 nu primise încă vreo eroare de la recv, deci firul principal a fost blocat așteptând ca worker-ul 0 să-și termine sarcina. Între timp, worker-ul 3 a primit o sarcină și apoi toate firele de execuție au întâmpinat o eroare. Odată ce worker-ul 0 și-a încheiat treaba, firul principal a așteptat finalizarea celorlalte fire de execuție ale worker-ilor. Toți ieșiră deja din buclele lor și s-au oprit.

Felicitări! Am completat proiectul nostru; acum avem la dispoziție un server web de bază care utilizează un pool de fire de execuție pentru a răspunde asincron. Suntem capabili să realizăm o închidere ordonată a serverului, care finalizează toate firele din pool.

Iată codul complet pentru referință:

Numele fișierului: src/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

Numele fișierului: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Putem adăuga și mai multe îmbunătățiri acestui proiect! Dacă dorești să continui dezvoltarea lui, iată câteva sugestii:

  • Completează documentația pentru ThreadPool și metodele sale publice.
  • Creează teste pentru a verifica funcționalitățile bibliotecii.
  • Înlocuiește apelurile la unwrap cu mecanisme de gestionare a eroarelor mai solide.
  • Folosește ThreadPool pentru a executa o sarcină diferită de procesarea cererilor web.
  • Explorează un crate pentru thread pool de pe crates.io și construiește un server web similar folosind acel crate. Compară apoi API-ul și nivelul de robustețe cu thread pool-ul pe care l-am creat noi.

Sumar

Bravo! Ai parcurs cartea până la capăt! Îți mulțumim că ai acceptat să ni te alături în acest periplu al limbajului Rust. Acum ești pregătit să demarezi propriile proiecte Rust și să contribui la proiectele altora. Reține că avem o comunitate disponibilă de Rustaceani care te așteaptă cu brațele deschise să te ajute în fața oricăror provocări pe care le poți întâlni în drumul tău cu Rust.