Превращение однопоточного сервера в многопоточный
В данный момент сервер обрабатывает каждый запрос последовательно, то есть не начнёт обрабатывать второе соединение, пока не завершит первое. Если сервер получает всё больше и больше запросов, такое последовательное выполнение становится всё менее оптимальным. Если сервер получает запрос, обработка которого занимает много времени, последующие запросы будут вынуждены ждать, пока долгий запрос не завершится, даже если новые запросы можно обработать быстро. Нам нужно это исправить, но сначала посмотрим на проблему в действии.
Имитация медленного запроса в текущей реализации сервера
Посмотрим, как медленная обработка запроса может повлиять на другие запросы к нашей текущей реализации сервера. Листинг 21-10 реализует обработку запроса к /sleep с имитацией медленного ответа, из-за которого сервер будет ждать пять секунд перед ответом.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Мы заменили if на match, так как теперь у нас три случая. Нам нужно явно сопоставлять со срезом request_line для сравнения со строковыми литералами; match не выполняет автоматическое обращение и разыменование, как это делает метод равенства.
Первая ветка такая же, как блок if из листинга 21-9. Вторая ветка соответствует запросу к /sleep. Когда такой запрос получен, сервер будет ждать пять секунд, прежде чем отобразить успешную HTML-страницу. Третья ветка такая же, как блок else из листинга 21-9.
Вы видите, насколько примитивен наш сервер: реальные библиотеки обрабатывают распознавание множества запросов гораздо менее многословно!
Запустите сервер, используя cargo run. Затем откройте два окна браузера: одно для http://127.0.0.1:7878/, а другое для http://127.0.0.1:7878/sleep. Если вы несколько раз откроете URI /, как и раньше, вы увидите, что он отвечает быстро. Но если вы откроете /sleep, а затем загрузите /, вы увидите, что / ждёт, пока sleep полностью отслушает свои пять секунд.
Существует несколько техник, которые мы могли бы использовать, чтобы избежать накопления запросов позади медленного, включая использование async, как мы делали в главе 17; та, которую мы реализуем, — это пул потоков.
Повышение пропускной способности с помощью пула потоков
Пул потоков — это группа порождённых потоков, которые ждут и готовы обработать задачу. Когда программа получает новую задачу, она назначает один из потоков в пуле для этой задачи, и этот поток обработает её. Остальные потоки в пуле доступны для обработки любых других задач, поступающих, пока первый поток занят. Когда первый поток завершает обработку своей задачи, он возвращается в пул простаивающих потоков, готовый обработать новую задачу. Пул потоков позволяет обрабатывать соединения параллельно, увеличивая пропускную способность сервера.
Мы ограничим количество потоков в пуле небольшим числом, чтобы защититься от DoS-атак; если бы наша программа создавала новый поток для каждого поступающего запроса, кто-то, делающий 10 миллионов запросов к нашему серверу, мог бы навредить, исчерпав все ресурсы сервера и остановив обработку запросов.
Вместо создания неограниченного количества потоков у нас будет фиксированное количество потоков, ожидающих в пуле. Поступающие запросы отправляются в пул для обработки. Пул будет поддерживать очередь входящих запросов. Каждый из потоков в пуле будет забирать запрос из этой очереди, обрабатывать его, а затем запрашивать у очереди следующий запрос. При таком дизайне мы можем обрабатывать до N запросов параллельно, где N — количество потоков. Если каждый поток отвечает на долгий запрос, последующие запросы всё ещё могут накапливаться в очереди, но мы увеличили количество долгих запросов, которые можем обработать, прежде чем достигнем этой точки.
Эта техника — лишь одна из многих способов повысить пропускную способность веб-сервера. Другие варианты, которые вы могли бы изучить, — это модель fork/join, модель однопоточного асинхронного ввода-вывода и модель многопоточного асинхронного ввода-вывода. Если вас интересует эта тема, вы можете подробнее узнать о других решениях и попробовать их реализовать; с низкоуровневым языком, таким как Rust, все эти варианты возможны.
Прежде чем мы начнём реализовывать пул потоков, давайте поговорим о том, как должно выглядеть его использование. Когда вы пытаетесь спроектировать код, написание клиентского интерфейса сначала может помочь направить ваш дизайн. Напишите API кода так, чтобы он был структурирован так, как вы хотите его вызывать; затем реализуйте функциональность внутри этой структуры, а не реализуйте функциональность, а потом проектируйте публичный API.
Подобно тому, как мы использовали разработку через тесты в проекте в главе 12, здесь мы будем использовать разработку, управляемую компилятором. Мы напишем код, который вызывает функции, которые мы хотим, а затем посмотрим на ошибки компилятора, чтобы определить, что нам нужно изменить дальше, чтобы заставить код работать. Однако перед этим мы изучим технику, которую мы не будем использовать в качестве отправной точки.
Создание потока для каждого запроса
Сначала давайте рассмотрим, как мог бы выглядеть наш код, если бы он действительно создавал новый поток для каждого соединения. Как упоминалось ранее, это не наш окончательный план из-за проблем с потенциальным созданием неограниченного количества потоков, но это отправная точка, чтобы сначала получить работающий многопоточный сервер. Затем мы добавим пул потоков как улучшение, и сравнение двух решений будет проще. Листинг 21-11 показывает изменения, которые нужно внести в main, чтобы создать новый поток для обработки каждого потока в цикле for.
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Как вы узнали в главе 16, thread::spawn создаст новый поток, а затем выполнит код в замыкании в новом потоке. Если вы запустите этот код и загрузите /sleep в браузере, а затем / в двух других вкладках браузера, вы действительно увидите, что запросы к / не должны ждать завершения /sleep. Однако, как мы упоминали, это в конечном итоге перегрузит систему, потому что вы бы создавали новые потоки без какого-либо ограничения.
Вы также могли вспомнить из главы 17, что это именно та ситуация, где async и await действительно сияют! Держите это в уме, когда мы строим пул потоков и думаем, как всё выглядело бы по-другому или так же с async.
Создание конечного количества потоков
Мы хотим, чтобы наш пул потоков работал похожим, знакомым способом, чтобы переход от потоков к пулу потоков не требовал больших изменений в коде, который использует наш API. Листинг 21-12 показывает гипотетический интерфейс для структуры ThreadPool, который мы хотим использовать вместо thread::spawn.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPoolМы используем ThreadPool::new для создания нового пула потоков с настраиваемым количеством потоков, в данном случае четырьмя. Затем, в цикле for, pool.execute имеет интерфейс, аналогичный thread::spawn, в том смысле, что он принимает замыкание, которое пул должен выполнить для каждого потока. Нам нужно реализовать pool.execute так, чтобы он принимал замыкание и передавал его потоку в пуле для выполнения. Этот код ещё не скомпилируется, но мы попробуем, чтобы компилятор мог направлять нас, что нужно изменить дальше, чтобы заставить код работать.
Построение ThreadPool с использованием разработки, управляемой компилятором
Внесите изменения из листинга 21-12 в src/main.rs, а затем давайте используем ошибки компилятора от cargo check, чтобы управлять нашей разработкой. Вот первая ошибка, которую мы получаем:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Отлично! Эта ошибка говорит нам, что нам нужен тип или модуль ThreadPool, так что мы сейчас построим один. Наша реализация ThreadPool будет независима от вида работы, которую делает наш веб-сервер. Поэтому давайте переключим крейт hello из бинарного крейта в библиотечный крейт, чтобы хранить нашу реализацию ThreadPool. После того как мы перейдём на библиотечный крейт, мы также сможем использовать отдельную библиотеку пула потоков для любой работы, которую мы хотим выполнять с помощью пула потоков, а не только для обслуживания веб-запросов.
Создайте файл src/lib.rs, содержащий следующее, что является самым простым определением структуры ThreadPool, которое мы можем иметь на данный момент:
pub struct ThreadPool;
Затем отредактируйте файл main.rs, чтобыBring ThreadPool в область видимости из библиотечного крейта, добавив следующий код в начало src/main.rs:
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Этот код всё ещё не будет работать, но давайте проверим его снова, чтобы получить следующую ошибку, которую нам нужно исправить:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Эта ошибка указывает, что следующее нам нужно создать связанную функцию с именем new для ThreadPool. Мы также знаем, что new должен иметь один параметр, который может принять 4 в качестве аргумента, и должен возвращать экземпляр ThreadPool. Давайте реализуем самую простую функцию new, которая будет иметь эти характеристики:
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Мы выбрали usize в качестве типа параметра size, потому что знаем, что отрицательное количество потоков не имеет смысла. Мы также знаем, что будем использовать это 4 как количество элементов в коллекции потоков, для чего и предназначен тип usize, как обсуждалось в “Integer Types” в главе 3.
Давайте проверим код снова:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Теперь ошибка возникает потому, что у нас нет метода execute на ThreadPool. Вспомните из “Creating a Finite Number of Threads”, что мы решили, что наш пул потоков должен иметь интерфейс, аналогичный thread::spawn. Кроме того, мы реализуем функцию execute так, чтобы она принимала замыкание, которое ей дано, и передавала его простаивающему потоку в пуле для выполнения.
Мы определим метод execute на ThreadPool, чтобы он принимал замыкание в качестве параметра. Вспомните из “Moving Captured Values Out of the Closure and the Fn Traits” в главе 13, что мы можем принимать замыкания в качестве параметров с тремя разными типажами: Fn, FnMut и FnOnce. Нам нужно решить, какой вид замыкания использовать здесь. Мы знаем, что в конечном итоге сделаем что-то похожее на реализацию thread::spawn из стандартной библиотеки, так что мы можем посмотреть, какие границы имеет сигнатура thread::spawn для своего параметра. Документация показывает нам следующее:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Параметр типа F — это тот, который нас интересует здесь; параметр типа T связан с возвращаемым значением, и нас он не интересует. Мы видим, что spawn использует FnOnce в качестве границы типажа на F. Это, вероятно, то, что мы хотим и здесь, потому что в конечном итоге мы передадим аргумент, который получаем в execute, в spawn. Мы можем быть ещё более уверены, что FnOnce — это типаж, который мы хотим использовать, потому что поток для выполнения запроса выполнит замыкание этого запроса только один раз, что соответствует Once в FnOnce.
Параметр типа F также имеет границу типажа Send и границу времени жизни 'static, которые полезны в нашей ситуации: нам нужно Send для передачи замыкания из одного потока в другой и 'static, потому что мы не знаем, сколько времени займёт выполнение потока. Давайте создадим метод execute на ThreadPool, который будет принимать обобщённый параметр типа F с этими границами:
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Мы всё ещё используем () после FnOnce, потому что этот FnOnce представляет замыкание, которое не принимает параметров и возвращает тип единицы (). Как и в определениях функций, возвращаемый тип может быть опущен из сигнатуры, но даже если у нас нет параметров, нам всё ещё нужны круглые скобки.
Опять же, это самая простая реализация метода execute: она ничего не делает, но мы только пытаемся заставить наш код скомпилироваться. Давайте проверим его снова:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Он компилируется! Но обратите внимание, что если вы попробуете cargo run и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute!
Примечание: Высказывание, которое вы можете услышать о языках со строгими компиляторами, таких как Haskell и Rust, — «если код компилируется, он работает». Но это высказывание не универсально верно. Наш проект компилируется, но он абсолютно ничего не делает! Если бы мы строили реальный, завершённый проект, это было бы хорошее время, чтобы начать писать модульные тесты для проверки того, что код компилируется и имеет желаемое поведение.
Подумайте: что было бы здесь другим, если бы мы собирались выполнять future вместо замыкания?
Проверка количества потоков в new
Мы ничего не делаем с параметрами new и execute. Давайте реализуем тела этих функций с желаемым поведением. Для начала давайте подумаем о new. Ранее мы выбрали беззнаковый тип для параметра size, потому что пул с отрицательным количеством потоков не имеет смысла. Однако пул с нулевым количеством потоков также не имеет смысла, хотя ноль — это совершенно действительный usize. Мы добавим код для проверки, что size больше нуля, прежде чем возвращать экземпляр ThreadPool, и заставим программу завершиться с паникой, если она получит ноль, используя макрос assert!, как показано в листинге 21-13.
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new для паники, если size равен нулюМы также добавили некоторую документацию для нашего ThreadPool с помощью комментариев документации. Обратите внимание, что мы последовали хорошей практике документации, добавив раздел, который указывает на ситуации, в которых наша функция может вызвать панику, как обсуждалось в главе 14. Попробуйте запустить cargo doc --open и нажать на структуру ThreadPool, чтобы увидеть, как выглядят сгенерированные документы для new!
Вместо добавления макроса assert!, как мы сделали здесь, мы могли бы изменить new на build и вернуть Result, как мы делали с Config::build в проекте ввода-вывода в листинге 12-9. Но мы решили в этом случае, что попытка создать пул потоков без каких-либо потоков должна быть невосстанавливаемой ошибкой. Если вы чувствуете амбиции, попробуйте написать функцию с именем build со следующей сигнатурой, чтобы сравнить с функцией new:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Создание пространства для хранения потоков
Теперь, когда у нас есть способ знать, что у нас есть допустимое количество потоков для хранения в пуле, мы можем создать эти потоки и сохранить их в структуре ThreadPool перед возвратом структуры. Но как нам «хранить» поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Функция spawn возвращает JoinHandle<T>, где T — это тип, который возвращает замыкание. Давайте попробуем использовать JoinHandle тоже и посмотрим, что произойдёт. В нашем случае замыкания, которые мы передаём в пул потоков, будут обрабатывать соединение и ничего не возвращать, так что T будет типом единицы ().
Код в листинге 21-14 скомпилируется, но пока не создаёт никаких потоков. Мы изменили определение ThreadPool на хранение вектора экземпляров thread::JoinHandle<()>, инициализировали вектор ёмкостью size, настроили цикл for, который будет выполнять код для создания потоков, и вернули экземпляр ThreadPool, содержащий их.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool для хранения потоковМы привели std::thread в область видимости в библиотечном крейте, потому что используем thread::JoinHandle в качестве типа элементов в векторе в ThreadPool.
Как только действительный размер получен, наш ThreadPool создаёт новый вектор, который может содержать size элементов. Функция with_capacity выполняет ту же задачу, что и Vec::new, но с важным отличием: она предварительно выделяет пространство в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, это выделение заранее немного более эффективно, чем использование Vec::new, который изменяет размер по мере вставки элементов.
Когда вы снова запустите cargo check, он должен завершиться успешно.
Структура Worker, ответственная за отправку кода из ThreadPool в поток
Мы оставили комментарий в цикле for в листинге 21-14 относительно создания потоков. Здесь мы посмотрим, как мы на самом деле создаём потоки. Стандартная библиотека предоставляет thread::spawn как способ создания потоков, и thread::spawn ожидает получить некоторый код, который поток должен выполнить сразу после создания. Однако в нашем случае мы хотим создать потоки и заставить их ждать код, который мы отправим позже. Реализация потоков в стандартной библиотеке не включает способа сделать это; мы должны реализовать это вручную.
Мы реализуем это поведение, введя новую структуру данных между ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру данных Worker, что является общим термином в реализациях пулов. Worker забирает код, который нужно запустить, и запускает код в потоке Worker.
Подумайте о людях, работающих на кухне в ресторане: работники ждут, пока поступят заказы от клиентов, а затем они ответственны за взятие этих заказов и их выполнение.
Вместо хранения вектора экземпляров JoinHandle<()> в пуле потоков, мы будем хранить экземпляры структуры Worker. Каждый Worker будет хранить один экземпляр JoinHandle<()>. Затем мы реализуем метод на Worker, который будет принимать замыкание кода для выполнения и отправлять его уже запущенному потоку для выполнения. Мы также дадим каждому Worker id, чтобы мы могли различать разные экземпляры Worker в пуле при ведении журнала или отладке.
Вот новый процесс, который будет происходить, когда мы создаём ThreadPool. Мы реализуем код, который отправляет замыкание в поток после того, как Worker настроен таким образом:
- Определите структуру
Worker, которая хранитidиJoinHandle<()>. - Измените
ThreadPoolна хранение вектора экземпляровWorker. - Определите функцию
Worker::new, которая принимает номерidи возвращает экземплярWorker, который хранитidи поток, порождённый с пустым замыканием. - В
ThreadPool::newиспользуйте счётчик циклаforдля генерацииid, создайте новыйWorkerс этимidи сохраните работника в векторе.
Если вы готовы к вызову, попробуйте реализовать эти изменения самостоятельно, прежде чем посмотреть код в листинге 21-15.
Готовы? Вот листинг 21-15 с одним из способов сделать предыдущие модификации.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool на хранение экземпляров Worker вместо прямого хранения потоковМы изменили имя поля на ThreadPool с threads на workers, потому что теперь он хранит экземпляры Worker вместо экземпляров JoinHandle<()>. Мы используем счётчик в цикле for в качестве аргумента для Worker::new и сохраняем каждого нового Worker в векторе с именем workers.
Внешний код (как наш сервер в src/main.rs) не должен знать детали реализации относительно использования структуры Worker внутри ThreadPool, поэтому мы делаем структуру Worker и её функцию new приватными. Функция Worker::new использует id, который мы ей даём, и хранит экземпляр JoinHandle<()>, который создаётся путём порождения нового потока с использованием пустого замыкания.
Примечание: Если операционная система не может создать поток из-за нехватки системных ресурсов,
thread::spawnвызовет панику. Это приведёт к панике всего нашего сервера, даже если создание некоторых потоков может завершиться успехом. Для простоты это поведение приемлемо, но в реальной реализации пула потоков вы, вероятно, захотели бы использоватьstd::thread::Builderи его методspawn, который возвращаетResultвместо этого.
Этот код скомпилируется и будет хранить количество экземпляров Worker, которое мы указали в качестве аргумента для ThreadPool::new. Но мы всё ещё не обрабатываем замыкание, которое получаем в execute. Давайте посмотрим, как это сделать дальше.
Отправка запросов потокам через каналы
Следующая проблема, которую мы решим, заключается в том, что замыкания, переданные thread::spawn, абсолютно ничего не делают. В данный момент мы получаем замыкание, которое хотим выполнить, в методе execute. Но нам нужно дать thread::spawn замыкание для выполнения, когда мы создаём каждого Worker во время создания ThreadPool.
Мы хотим, чтобы экземпляры Worker, которые мы только что создали, забирали код для выполнения из очереди, хранящейся в ThreadPool, и отправляли этот код в свой поток для выполнения.
Каналы, о которых мы узнали в главе 16 — простой способ общения между двумя потоками — идеально подойдут для этого случая. Мы будем использовать канал в качестве очереди задач, и execute отправит задачу из ThreadPool экземплярам Worker, которые отправят задачу в свой поток. Вот план:
ThreadPoolсоздаст канал и будет хранить отправителя.- Каждый
Workerбудет хранить получателя. - Мы создадим новую структуру
Job, которая будет хранить замыкания, которые мы хотим отправить по каналу. - Метод
executeотправит задачу, которую он хочет выполнить, через отправителя. - В своём потоке
Workerбудет бесконечно циклировать по своему получателю, запрашивая задачу и выполняя её, когда получит.
Давайте начнём с создания канала в ThreadPool::new и хранения отправителя в экземпляре ThreadPool, как показано в листинге 21-16. Структура Job пока ничего не хранит, но будет типом элемента, который мы отправляем по каналу.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool на хранение отправителя канала, передающего экземпляры JobВ ThreadPool::new мы создаём наш новый канал и заставляем пул хранить отправителя. Это успешно скомпилируется.
Давайте попробуем передать получатель канала каждому Worker при создании пула потоков канала. Мы знаем, что хотим использовать получатель в потоке, который экземпляры Worker порождают, так что мы сослаемся на параметр receiver в замыкании. Код в листинге 21-17 ещё не скомпилируется.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
WorkerМы внесли небольшие и простые изменения: передаём получатель в Worker::new, а затем используем его внутри замыкания.
Когда мы пытаемся проверить этот код, мы получаем эту ошибку:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
Код пытается передать receiver нескольким экземплярам Worker. Это не сработает, как вы вспомните из главы 16: реализация канала, которую предоставляет Rust, — это несколько производителей, один потребитель. Это означает, что мы не можем просто клонировать потребительский конец канала, чтобы исправить этот код. Мы также не хотим отправлять сообщение несколько раз нескольким потребителям; мы хотим один список сообщений с несколькими экземплярами Worker так, чтобы каждое сообщение обрабатывалось один раз.
Кроме того, взятие задачи из очереди канала включает изменение receiver, поэтому потокам нужен безопасный способ разделения и изменения receiver; иначе мы можем получить гонки данных (как рассматривалось в главе 16).
Вспомните умные указатели с потокобезопасностью, рассмотренные в главе 16: чтобы разделить владение несколькими потоками и позволить потокам изменять значение, нам нужно использовать Arc<Mutex<T>>. Тип Arc позволит нескольким экземплярам Worker владеть получателем, а Mutex обеспечит, что только один Worker за раз получает задачу из получателя. Листинг 21-18 показывает изменения, которые нам нужно сделать.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Worker с использованием Arc и MutexВ ThreadPool::new мы помещаем получатель в Arc и Mutex. Для каждого нового Worker мы клонируем Arc, чтобы увеличить количество ссылок, так что экземпляры Worker могут разделять владение получателем.
С этими изменениями код компилируется! Мы приближаемся!
Реализация метода execute
Давайте наконец реализуем метод execute на ThreadPool. Мы также изменим Job со структуры на псевдоним типа для объекта типажа, который хранит тип замыкания, которое получает execute. Как обсуждалось в “Creating Type Synonyms with Type Aliases” в главе 20, псевдонимы типов позволяют нам делать длинные типы короче для удобства использования. Посмотрите на листинг 21-19.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Job для Box, хранящего каждое замыкание, а затем отправка задачи по каналуПосле создания нового экземпляра Job с использованием замыкания, которое мы получаем в execute, мы отправляем эту задачу по отправляющему концу канала. Мы вызываем unwrap на send на случай, если отправка завершится неудачей. Это может произойти, например, если мы остановим все наши потоки от выполнения, что означает, что принимающий конец перестал получать новые сообщения. В данный момент мы не можем остановить наши потоки от выполнения: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap, заключается в том, что мы знаем, что случай неудачи не произойдёт, но компилятор не знает этого.
Но мы ещё не совсем закончили! В Worker наше замыкание, передаваемое в thread::spawn, всё ещё только ссылается на принимающий конец канала. Вместо этого нам нужно, чтобы замыкание бесконечно циклировало, запрашивая у принимающего конца канала задачу и выполняя задачу, когда она получена. Давайте внесём изменение, показанное в листинге 21-20, в Worker::new.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
WorkerЗдесь мы сначала вызываем lock на receiver, чтобы получить мьютекс, а затем вызываем unwrap, чтобы вызвать панику при любых ошибках. Получение блокировки может завершиться неудачей, если мьютекс находится в отравленном состоянии, что может произойти, если какой-то другой поток вызвал панику, удерживая блокировку, а не освобождая её. В этой ситуации вызов unwrap, чтобы заставить этот поток вызвать панику, — правильное действие. Не стесняйтесь изменить этот unwrap на expect с сообщением об ошибке, которое имеет смысл для вас.
Если мы получаем блокировку на мьютексе, мы вызываем recv для получения Job из канала. Последний unwrap преодолевает любые ошибки здесь также, что может произойти, если поток, удерживающий отправителя, завершился, аналогично тому, как метод send возвращает Err, если получатель завершается.
Вызов recv блокируется, так что если задачи ещё нет, текущий поток будет ждать, пока задача не станет доступной. Mutex<T> обеспечивает, что только один поток Worker за раз пытается запросить задачу.
Наш пул потоков теперь в рабочем состоянии! Дайте ему cargo run и сделайте несколько запросов:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Успех! Теперь у нас есть пул потоков, который выполняет соединения асинхронно. Никогда не создаётся более четырёх потоков, так что наша система не будет перегружена, если сервер получает много запросов. Если мы делаем запрос к /sleep, сервер сможет обслуживать другие запросы, заставив другой поток выполнять их.
Примечание: Если вы откроете
/sleepв нескольких окнах браузера одновременно, они могут загружаться по одному с интервалами в пять секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно по причинам кэширования. Это ограничение не вызвано нашим веб-сервером.
Это хорошее время, чтобы остановиться и подумать, как код в листингах 21-18, 21-19 и 21-20 отличался бы, если бы мы использовали futures вместо замыкания для работы, которую нужно выполнить. Какие типы изменились? Как бы изменились сигнатуры методов, если бы они изменились? Какие части кода остались бы такими же?
После изучения цикла while let в главах 17 и 18 вы можете задаться вопросом, почему мы не написали код рабочего потока, как показано в листинге 21-21.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker::new с использованием while letЭтот код компилируется и работает, но не приводит к желаемому поведению потоков: медленный запрос всё ещё заставит другие запросы ждать обработки. Причина несколько тонкая: у структуры Mutex нет публичного метода unlock, потому что владение блокировкой основано на времени жизни MutexGuard<T> внутри LockResult<MutexGuard<T>>, который возвращает метод lock. На этапе компиляции проверка заимствований может затем обеспечить правило, что ресурс, защищённый Mutex, не может быть доступен, если мы не удерживаем блокировку. Однако это также может привести к тому, что блокировка удерживается дольше, чем предполагалось, если мы не внимательны ко времени жизни MutexGuard<T>.
Код в листинге 21-20, который использует let job = receiver.lock().unwrap().recv().unwrap();, работает, потому что с let любые временные значения, используемые в выражении справа от знака равенства, немедленно удаляются, когда завершается оператор let. Однако while let (и if let, и match) не удаляет временные значения до конца связанного блока. В листинге 21-21 блокировка остаётся удерживаемой на время вызова job(), что означает, что другие экземпляры Worker не могут получать задачи.