Использование передачи сообщений для обмена данными между потоками
Одним из всё более популярных подходов к обеспечению безопасной конкурентности является передача сообщений, при которой потоки или акторы общаются, отправляя друг другу сообщения, содержащие данные. Вот эта идея, выраженная в лозунге из документации языка Go: «Не общайтесь, разделяя память; вместо этого разделяйте память, общаясь».
Для реализации конкурентности на основе отправки сообщений стандартная библиотека Rust предоставляет реализацию каналов. Канал — это общая концепция программирования, с помощью которой данные отправляются из одного потока в другой.
Вы можете представить канал в программировании как направленный водный канал, такой как ручей или река. Если вы положите что-то вроде резиновой уточки в реку, она поплывёт вниз по течению до конца водного пути.
Канал имеет две половины: передатчик и приёмник. Передатчик — это место вверх по течению, где вы кладёте резиновую уточку в реку, а приёмник — то место вниз по течению, где уточка в итоге окажется. Одна часть вашего кода вызывает методы передатчика с данными, которые вы хотите отправить, а другая часть проверяет приёмник на наличие прибывших сообщений. Канал считается закрытым, если либо половина-передатчик, либо половина-приёмник удалены (drop).
Здесь мы постепенно создадим программу, в которой один поток будет генерировать значения и отправлять их по каналу, а другой поток будет получать значения и выводить их. Мы будем отправлять простые значения между потоками с помощью канала, чтобы проиллюстрировать эту возможность. Как только вы освоите эту технику, вы сможете использовать каналы для любых потоков, которым нужно общаться друг с другом, например, для системы чата или системы, где множество потоков выполняют части вычислений и отправляют их одному потоку, который агрегирует результаты.
Сначала, в Листинге 16-6, мы создадим канал, но ничего с ним не будем делать. Обратите внимание, что это пока не скомпилируется, потому что Rust не может определить, какой тип значений мы хотим отправлять по каналу.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx и rxМы создаём новый канал с помощью функции mpsc::channel; mpsc означает multiple producer, single consumer (множество производителей, один потребитель). Если коротко, способ реализации каналов в стандартной библиотеке Rust означает, что канал может иметь несколько отправляющих концов, которые производят значения, но только один принимающий конец, который потребляет эти значения. Представьте несколько потоков, сливающихся в одну большую реку: всё, что отправлено по любому из потоков, в итоге окажется в одной реке. Мы начнём с одного производителя, но добавим нескольких, когда этот пример заработает.
Функция mpsc::channel возвращает кортеж, первым элементом которого является отправляющий конец — передатчик, а вторым — принимающий конец — приёмник. Сокращения tx и rx традиционно используются во многих областях для transmitter (передатчик) и receiver (приёмник) соответственно, поэтому мы называем наши переменные так, чтобы обозначить каждый конец. Мы используем оператор let с паттерном, который деструктурирует кортеж; об использовании паттернов в операторах let и деструктурировании мы поговорим в Главе 19. Пока просто знайте, что использование оператора let таким образом — это удобный способ извлечь части кортежа, возвращаемого mpsc::channel.
Перенесём передающий конец в порождённый поток и заставим его отправить одну строку, чтобы порождённый поток общался с основным потоком, как показано в Листинге 16-7. Это похоже на то, как если бы вы положили резиновую уточку в реку вверх по течению или отправили сообщение в чате из одного потока в другой.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
tx в порождённый поток и отправка "hi"Снова мы используем thread::spawn для создания нового потока, а затем используем move, чтобы переместить tx в замыкание, поэтому порождённый поток владеет tx. Порождённому потоку нужно владеть передатчиком, чтобы иметь возможность отправлять сообщения по каналу.
У передатчика есть метод send, который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result<T, E>, поэтому если приёмник уже был удалён и nowhere нет, куда отправлять значение, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap, чтобы вызвать панику в случае ошибки. Но в реальном приложении мы бы обработали это правильно: вернитесь к Главе 9, чтобы повторить стратегии правильной обработки ошибок.
В Листинге 16-8 мы получим значение из приёмника в основном потоке. Это похоже на извлечение резиновой уточки из воды в конце реки или получение сообщения в чате.
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
"hi" в основном потоке и вывод егоУ приёмника есть два полезных метода: recv и try_recv. Мы используем recv, сокращение от receive (получение), который заблокирует выполнение основного потока и будет ждать, пока значение не будет отправлено по каналу. Как только значение отправлено, recv вернёт его в Result<T, E>. Когда передатчик закрывается, recv вернёт ошибку, чтобы сигнализировать, что больше значения не придут.
Метод try_recv не блокируется, а вместо этого сразу возвращает Result<T, E>: значение Ok, содержащее сообщение, если оно доступно, и значение Err, если сообщений сейчас нет. Использование try_recv полезно, если этому потоку есть другая работа, пока он ждёт сообщений: мы могли бы написать цикл, который периодически вызывает try_recv, обрабатывает сообщение, если оно доступно, а иначе делает другую работу немного, а затем проверяет снова.
Мы использовали recv в этом примере для простоты; у нас нет другой работы для основного потока, кроме как ждать сообщений, поэтому блокировка основного потока уместна.
Когда мы запускаем код в Листинге 16-8, мы увидим значение, выведенное из основного потока:
Got: hi
Отлично!
Каналы и передача владения
Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный конкурентный код. Предотвращение ошибок в конкурентном программировании — это преимущество осмысления владения во всех ваших программах на Rust. Давайте проведём эксперимент, чтобы показать, как каналы и владение работают вместе, чтобы предотвратить проблемы: мы попытаемся использовать значение val в порождённом потоке после того, как отправили его по каналу. Попробуйте скомпилировать код в Листинге 16-9, чтобы увидеть, почему этот код не разрешён.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val после отправки по каналуЗдесь мы пытаемся вывести val после того, как отправили его по каналу через tx.send. Разрешить это было бы плохой идеей: как только значение отправлено в другой поток, этот поток мог бы изменить или удалить его до того, как мы попытаемся использовать значение снова. Потенциально изменения другого потока могли бы вызвать ошибки или неожиданные результаты из-за несогласованных или несуществующих данных. Однако Rust выдаёт ошибку, если мы пытаемся скомпилировать код в Листинге 16-9:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Наша ошибка конкурентности вызвала ошибку компиляции. Функция send принимает владение своим параметром, и когда значение перемещается, приёмник принимает владение им. Это не позволяет нам случайно использовать значение снова после отправки; система владения проверяет, что всё в порядке.
Отправка нескольких значений и наблюдение за ожиданием приёмника
Код в Листинге 16-8 скомпилировался и запустился, но он не показал нам явно, что два отдельных потока общаются друг с другом по каналу. В Листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в Листинге 16-8 работает конкурентно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу в одну секунду между каждым сообщением.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить в основной поток. Мы перебираем их, отправляя каждое по отдельности, и делаем паузу между каждым, вызывая функцию thread::sleep со значением Duration в одну секунду.
В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы обращаемся с rx как с итератором. Для каждого полученного значения мы выводим его. Когда канал закрывается, итерация завершится.
При запуске кода в Листинге 16-10 вы должны увидеть следующий вывод с паузой в одну секунду между каждой строкой:
Got: hi
Got: from
Got: the
Got: thread
Поскольку у нас нет кода, который делает паузу или задержку в цикле for в основном потоке, мы можем сказать, что основной поток ждёт получения значений от порождённого потока.
Создание нескольких производителей путём клонирования передатчика
Ранее мы упомянули, что mpsc — это акроним для multiple producer, single consumer (множество производителей, один потребитель). Давайте используем mpsc и расширим код в Листинге 16-10, чтобы создать несколько потоков, которые все отправляют значения одному приёмнику. Мы можем сделать это, склонировав передатчик, как показано в Листинге 16-11.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
На этот раз, прежде чем создать первый порождённый поток, мы вызываем clone у передатчика. Это даст нам новый передатчик, который мы можем передать первому порождённому потоку. Мы передаём исходный передатчик второму порождённому потоку. Это даёт нам два потока, каждый из которых отправляет разные сообщения одному приёмнику.
Когда вы запускаете код, ваш вывод должен выглядеть примерно так:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
Вы можете увидеть значения в другом порядке, в зависимости от вашей системы. Это то, что делает конкурентность интересной, а также сложной. Если вы поэкспериментируете с thread::sleep, задавая различные значения в разных потоках, каждый запуск будет более недетерминированным и создаст разный вывод каждый раз.
Теперь, когда мы рассмотрели, как работают каналы, давайте посмотрим на другой метод конкурентности.