Грациозное завершение и очистка
Код в Листинге 21-20 асинхронно обрабатывает запросы с помощью пула потоков, как и задумывалось. Мы получаем предупреждения о том, что поля workers, id и thread не используются напрямую, что напоминает нам об отсутствии очистки. Когда мы используем менее элегантный метод ctrl-c для остановки основного потока, все остальные потоки также немедленно останавливаются, даже если они находятся в процессе обработки запроса.
Далее мы реализуем типаж Drop, чтобы вызывать join для каждого потока в пуле, позволяя им завершить текущие задачи перед закрытием. Затем мы реализуем способ сообщить потокам, что они должны прекратить принимать новые задачи и завершить работу. Чтобы увидеть этот код в действии, мы модифицируем наш сервер так, чтобы он принимал только два запроса перед грациозным завершением пула потоков.
Одна деталь, на которую стоит обратить внимание: ничто из этого не затрагивает части кода, отвечающие за выполнение замыканий, поэтому всё здесь останется таким же, если бы мы использовали пул потоков для асинхронного рантайма.
Реализация типажа Drop для ThreadPool
Начнём с реализации Drop для нашего пула потоков. Когда пул удаляется, все наши потоки должны объединиться (join), чтобы убедиться, что они завершат свою работу. Листинг 21-22 показывает первую попытку реализации Drop; этот код пока не будет компилироваться.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Сначала мы перебираем всех workers пула потоков. Мы используем &mut для этого, потому что self — изменяемая ссылка, и нам также нужно иметь возможность изменять worker. Для каждого рабочего мы выводим сообщение о том, что этот конкретный экземпляр Worker завершает работу, а затем вызываем join у потока этого экземпляра Worker. Если вызов join завершится неудачей, мы используем unwrap, чтобы вызвать панику Rust и перейти к неграциозному завершению.
Вот ошибка, которую мы получаем при компиляции этого кода:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
Ошибка сообщает, что мы не можем вызвать join, потому что у нас только изменяемая заимствованная ссылка на каждого worker, а join принимает владение своим аргументом. Чтобы решить эту проблему, нам нужно переместить поток из экземпляра Worker, который владеет thread, чтобы join мог потреблять поток. Один из способов сделать это — использовать тот же подход, что и в Листинге 18-15. Если бы Worker содержал Option<thread::JoinHandle<()>>, мы могли бы вызвать метод take у Option, чтобы переместить значение из варианта Some и оставить вариант None на его месте. Другими словами, Worker, который выполняется, имел бы вариант Some в thread, а когда мы захотели бы очистить Worker, мы заменили бы Some на None, чтобы у Worker не было потока для выполнения.
Однако, единственный раз, когда это потребуется, — это при удалении Worker. Взамен нам пришлось бы иметь дело с Option<thread::JoinHandle<()>> везде, где мы обращаемся к worker.thread. Идиоматический Rust часто использует Option, но когда вы обнаруживаете, что оборачиваете что-то, что, как вы знаете, всегда будет присутствовать, в Option как обходной путь, это хороший признак поиска альтернативных подходов. Они могут сделать ваш код чище и менее подверженным ошибкам.
В этом случае существует лучшая альтернатива: метод Vec::drain. Он принимает параметр диапазона для указания, какие элементы удалить из Vec, и возвращает итератор этих элементов. Передача синтаксиса диапазона .. удалит все значения из Vec.
Таким образом, нам нужно обновить реализацию drop для ThreadPool следующим образом:
#![allow(unused)] fn main() { use std::{ sync::{Arc, Mutex, mpsc}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } } }
Это решает ошибку компилятора и не требует никаких других изменений в нашем коде.
Сигнализация потокам о прекращении прослушивания задач
Со всеми внесёнными изменениями наш код компилируется без предупреждений. Однако плохая новость в том, что этот код пока не работает так, как мы хотим. Ключевой момент — логика в замыканиях, выполняемых потоками экземпляров Worker: в данный момент мы вызываем join, но это не остановит потоки, потому что они loop вечно в поисках задач. Если мы попытаемся удалить наш ThreadPool с текущей реализацией drop, основной поток будет блокироваться вечно, ожидая завершения первого потока.
Чтобы исправить эту проблему, нам потребуется изменение в реализации drop для ThreadPool и затем изменение в цикле Worker.
Сначала мы изменим реализацию drop для ThreadPool, чтобы явно удалить sender перед ожиданием завершения потоков. Листинг 21-23 показывает изменения в ThreadPool для явного удаления sender. В отличие от потока, здесь нам действительно нужно использовать Option, чтобы иметь возможность переместить sender из ThreadPool с помощью Option::take.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
// --snip--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
sender перед объединением потоков WorkerУдаление sender закрывает канал, что указывает, что больше сообщений не будет отправлено. Когда это происходит, все вызовы recv, которые экземпляры Worker выполняют в бесконечном цикле, вернут ошибку. В Листинге 21-24 мы изменяем цикл Worker, чтобы грациозно выйти из цикла в этом случае, что означает, что потоки завершатся, когда реализация drop для ThreadPool вызовет join для них.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
recv ошибкиЧтобы увидеть этот код в действии, давайте изменим main так, чтобы он принимал только два запроса перед грациозным завершением сервера, как показано в Листинге 21-25.
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Вам не хотелось бы, чтобы реальный веб-сервер завершал работу после обработки только двух запросов. Этот код просто демонстрирует, что грациозное завершение и очистка работают.
Метод take определён в типаже Iterator и ограничивает итерацию первыми двумя элементами максимум. ThreadPool выйдет из области видимости в конце main, и реализация drop будет выполнена.
Запустите сервер с помощью cargo run и сделайте три запроса. Третий запрос должен завершиться ошибкой, и в вашем терминале вы должны увидеть вывод, похожий на этот:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Вы можете увидеть другой порядок идентификаторов Worker и выводимых сообщений. Мы можем понять, как работает этот код, по сообщениям: экземпляры Worker 0 и 3 получили первые два запроса. Сервер перестал принимать соединения после второго соединения, и реализация Drop на ThreadPool начала выполняться, прежде чем Worker 3 даже начал свою задачу. Удаление sender отключает все экземпляры Worker и сообщает им о завершении. Каждый экземпляр Worker выводит сообщение при отключении, а затем пул потоков вызывает join, чтобы дождаться завершения каждого потока Worker.
Обратите внимание на один интересный аспект этого конкретного выполнения: ThreadPool удалил sender, и прежде чем какой-либо Worker получил ошибку, мы попытались объединить Worker 0. Worker 0 ещё не получил ошибку от recv, поэтому основной поток блокировался, ожидая завершения Worker 0. Между тем Worker 3 получил задачу, а затем все потоки получили ошибку. Когда Worker 0 завершился, основной поток ждал завершения остальных экземпляров Worker. На тот момент они все вышли из своих циклов и остановились.
Поздравляем! Мы завершили наш проект; у нас есть базовый веб-сервер, который использует пул потоков для асинхронного ответа. Мы способны выполнить грациозное завершение сервера, которое очищает все потоки в пуле.
Вот полный код для справки:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Мы могли бы сделать больше! Если вы хотите продолжить улучшать этот проект, вот несколько идей:
- Добавить больше документации к
ThreadPoolи его публичным методам. - Добавить тесты функциональности библиотеки.
- Заменить вызовы
unwrapна более надёжную обработку ошибок. - Использовать
ThreadPoolдля выполнения задач, отличных от обслуживания веб-запросов. - Найти пул потоков на crates.io и реализовать аналогичный веб-сервер, используя крейт вместо нашего. Затем сравнить его API и надёжность с пулом потоков, который мы реализовали.
Заключение
Молодец! Вы дошли до конца книги! Мы хотим поблагодарить вас за это путешествие по Rust. Теперь вы готовы реализовывать свои собственные проекты на Rust и помогать в проектах других людей. Помните, что существует гостеприимное сообщество других растаянцев, которые с радостью помогут вам с любыми трудностями, которые вы встретите на своём пути в Rust.