Применение конкурентности с async
В этом разделе мы применим async к некоторым тем же задачам конкурентности, которые решали с помощью потоков в главе 16. Поскольку мы уже обсудили там многие ключевые идеи, в этом разделе мы сосредоточимся на различиях между потоками и futures.
Во многих случаях API для работы с конкурентностью через async очень похожи на те, что используются с потоками. В других случаях они оказываются довольно разными. Даже когда API выглядят похожими между потоками и async, они часто имеют разное поведение — и почти всегда имеют разные характеристики производительности.
Создание новой задачи с spawn_task
Первая операция, которую мы рассмотрели в Создание нового потока с
Spawn, — это подсчёт в двух отдельных потоках.
Давайте сделаем то же самое, используя async. Крейт trpl предоставляет
функцию spawn_task, которая очень похожа на API thread::spawn, и функцию
sleep, которая является асинхронной версией API thread::sleep. Мы можем
использовать их вместе для реализации примера с подсчётом, как показано в
Листинге 17-6.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
В качестве отправной точки мы настраиваем функцию main с помощью trpl::run,
чтобы наша верхнеуровневая функция могла быть async.
Примечание: Начиная с этого момента в главе каждый пример будет включать этот точно такой же обёрточный код с
trpl::runвmain, поэтому мы часто будем его пропускать, как и делаем сmain. Не забывайте включать его в свой код!
Затем мы пишем два цикла внутри этого блока, каждый из которых содержит вызов
trpl::sleep, который ждёт полсекунды (500 миллисекунд) перед отправкой
следующего сообщения. Мы помещаем один цикл в тело trpl::spawn_task, а другой
в верхнеуровневый цикл for. Мы также добавляем await после вызовов sleep.
Этот код ведёт себя аналогично реализации на основе потоков — включая тот факт, что вы можете видеть, как сообщения появляются в вашем терминале в другом порядке при запуске:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
Эта версия останавливается, как только цикл for в теле основного async-блока
завершается, потому что задача, созданная spawn_task, завершается, когда
функция main заканчивается. Если вы хотите, чтобы она работала до завершения
задачи, вам нужно использовать handle соединения (join handle), чтобы дождаться
завершения первой задачи. С потоками мы использовали метод join, чтобы
“заблокироваться” до завершения потока. В Листинге 17-7 мы можем использовать
await для того же самого, потому что handle задачи сам является future. Его
тип Output — это Result, поэтому мы также разворачиваем (unwrap) его после
ожидания (await).
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
await с join handle для выполнения задачи до завершенияЭта обновлённая версия работает до завершения обоих циклов.
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Пока что кажется, что async и потоки дают нам те же основные результаты, просто
с разным синтаксисом: использование await вместо вызова join на join handle
и ожидание (await) вызовов sleep.
Более существенное различие в том, что нам не нужно было создавать ещё один
операционный системный поток для этого. Фактически, нам даже не нужно создавать
задачу здесь. Поскольку async-блоки компилируются в анонимные futures, мы можем
поместить каждый цикл в async-блок и позволить рантайму выполнить оба до
завершения, используя функцию trpl::join.
В разделе Ожидание завершения всех потоков с помощью join
handles мы показали, как использовать метод
join на типе JoinHandle, возвращаемом при вызове std::thread::spawn.
Функция trpl::join похожа, но для futures. Когда вы передаёте ей два future,
она производит один новый future, вывод (output) которого — это кортеж,
содержащий вывод каждого переданного future после того, как они оба
завершатся. Таким образом, в Листинге 17-8 мы используем trpl::join, чтобы
дождаться завершения как fut1, так и fut2. Мы не ожидаем (await) fut1 и
fut2, а ожидаем новый future, произведённый trpl::join. Мы игнорируем вывод,
потому что это просто кортеж, содержащий два значения типа unit.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
trpl::join для ожидания двух анонимных futuresПри запуске мы видим, что оба future выполняются до завершения:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Теперь вы увидите точно такой же порядок каждый раз, что очень отличается от
того, что мы видели с потоками. Это потому, что функция trpl::join является
справедливой (fair), что означает, что она проверяет каждый future одинаково
часто, чередуя их, и никогда не позволяет одному обогнать другой, если другой
готов. С потоками операционная система решает, какой поток проверять и как долго
ему позволять работать. С async Rust рантайм решает, какую задачу проверять.
(На практике детали усложняются, потому что асинхронный рантайм может
использовать операционные системные потоки под капотом как часть управления
конкурентностью, поэтому гарантировать справедливость может быть более сложной
задачей для рантайма — но это всё же возможно!) Рантаймы не обязаны гарантировать
справедливость для любой данной операции, и они часто предлагают разные API,
позволяющие выбрать, хотите ли вы справедливость или нет.
Попробуйте некоторые из этих вариаций ожидания futures и посмотрите, что они делают:
- Удалите async-блок вокруг одного или обоих циклов.
- Ожидайте (await) каждый async-блок сразу после его определения.
- Оберните только первый цикл в async-блок и ожидайте (await) полученный future после тела второго цикла.
Для дополнительного вызова попробуйте понять, каким будет вывод в каждом случае до запуска кода!
Подсчёт в двух задачах с использованием передачи сообщений
Обмен данными между futures тоже будет знаком: мы снова будем использовать передачу сообщений, но на этот раз с асинхронными версиями типов и функций. Мы возьмём немного другой путь, чем в Использование передачи сообщений для передачи данных между потоками, чтобы проиллюстрировать некоторые ключевые различия между конкурентностью на основе потоков и на основе futures. В Листинге 17-9 мы начнём с одного async-блока — не создавая отдельную задачу, как мы создавали отдельный поток.
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
tx и rxЗдесь мы используем trpl::channel, асинхронную версию API канала
многопроизводитель-однопотребитель, который мы использовали с потоками в главе
16. Асинхронная версия API отличается от версии на основе потоков лишь немного:
она использует изменяемого (mutable), а не неизменяемого (immutable) приёмника
rx, и её метод recv производит future, который нам нужно ожидать (await),
вместо того чтобы производить значение напрямую. Теперь мы можем отправлять
сообщения от отправителя к получателю. Обратите внимание, что нам не нужно
создавать отдельный поток или даже задачу; нам merely нужно ожидать (await) вызов
rx.recv.
Синхронный метод Receiver::recv в std::mpsc::channel блокируется до получения
сообщения. Метод trpl::Receiver::recv не блокируется, потому что он async.
Вместо блокировки он возвращает управление рантайму до тех пор, либо пока не
будет получено сообщение, либо пока отправительская сторона канала не закроется.
В отличие от этого, мы не ожидаем (await) вызов send, потому что он не
блокируется. Ему не нужно блокироваться, потому что канал, в который мы его
отправляем, неограниченный (unbounded).
Примечание: Поскольку весь этот async-код выполняется в async-блоке в вызове
trpl::run, всё внутри него может избегать блокировок. Однако код вне его будет блокироваться на возврате функцииrun. В этом и есть вся суть функцииtrpl::run: она позволяет вам выбрать, где блокироваться на некотором наборе async-кода, и, следовательно, где выполнять переход между sync и async кодом. В большинстве async-рантаймовrunна самом деле называетсяblock_onименно по этой причине.
Обратите внимание на две вещи в этом примере. Во-первых, сообщение приходит сразу. Во-вторых, хотя мы используем future здесь, конкуренции (concurrency) ещё нет. Всё в листинге происходит последовательно, как если бы futures не было.
Давайте рассмотрим первую часть, отправив серию сообщений и поставив задержку (sleep) между ними, как показано в Листинге 17-10.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
await между каждым сообщениемПомимо отправки сообщений, нам нужно их получать. В этом случае, поскольку мы
знаем, сколько сообщений ожидается, мы могли бы сделать это вручную, вызвав
rx.recv().await четыре раза. Однако в реальном мире мы обычно будем ждать
неизвестное количество сообщений, поэтому нам нужно продолжать ждать, пока не
определим, что больше сообщений нет.
В Листинге 16-10 мы использовали цикл for для обработки всех элементов,
полученных из синхронного канала. Однако у Rust пока нет способа написать цикл
for по асинхронной серии элементов, поэтому нам нужно использовать цикл,
который мы раньше не видели: условный цикл while let. Это версия цикла для
конструкции if let, которую мы видели в разделе Лаконичный поток управления с
if let и let else. Цикл будет продолжаться выполнения,
пока шаблон (pattern), который он указывает, продолжает соответствовать значению.
Вызов rx.recv производит future, который мы ожидаем (await). Рантайм приостановит
future, пока он не будет готов. Как только сообщение arrives, future разрешится
(resolve) в Some(message) столько раз, сколько arrives сообщений. Когда канал
закрывается, независимо от того, прибыло ли хоть одно сообщение, future вместо
этого разрешится в None, чтобы указать, что больше нет значений и, следовательно,
нам следует прекратить опрос (polling) — то есть прекратить ожидание (await).
Цикл while let объединяет всё это. Если результат вызова rx.recv().await —
Some(message), мы получаем доступ к сообщению и можем использовать его в теле
цикла, как и с if let. Если результат — None, цикл завершается. Каждый раз,
когда цикл завершается, он снова достигает точки ожидания (await point), поэтому
рантайм снова приостанавливает его до прибытия следующего сообщения.
Теперь код успешно отправляет и получает все сообщения. К сожалению, есть ещё несколько проблем. Во-первых, сообщения не прибывают с интервалом в полсекунды. Они прибывают все сразу, через 2 секунды (2000 миллисекунд) после запуска программы. Во-вторых, эта программа никогда не завершается! Вместо этого она ждет вечно новых сообщений. Вам нужно будет завершить её, используя клавиши ctrl-c.
Давайте начнём с изучения того, почему сообщения приходят все сразу после полной
задержки, а не с задержкой между каждым. В пределах данного async-блока порядок,
в котором ключевые слова await появляются в коде, — это также порядок, в котором
они выполняются при запуске программы.
В Листинге 17-10 есть только один async-блок, поэтому всё в нём выполняется
линейно. Конкуренции (concurrency) всё ещё нет. Все вызовы tx.send происходят,
вперемешку со всеми вызовами trpl::sleep и связанными с ними точками ожидания
(await points). Только тогда цикл while let получает пройти через какие-либо из
точек ожидания (await points) на вызовах recv.
Чтобы получить желаемое поведение, когда задержка сна происходит между каждым
сообщением, нам нужно поместить операции tx и rx в их собственные async-блоки,
как показано в Листинге 17-11. Затем рантайм может выполнить каждый из них
отдельно, используя trpl::join, как в примере с подсчётом. Опять же, мы ожидаем
(await) результат вызова trpl::join, а не отдельные futures. Если бы мы
ожидали (await) отдельные futures последовательно, мы просто вернулись бы к
последовательному потоку — именно того, чего мы не хотим.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send и recv в их собственные блоки async и ожидание (await) futures для этих блоковС обновлённым кодом в Листинге 17-11 сообщения выводятся с интервалом в 500 миллисекунд, а не все сразу через 2 секунды.
Программа всё ещё никогда не завершается, однако, из-за того, как цикл while let
взаимодействует с trpl::join:
- Future, возвращаемый из
trpl::join, завершается только после того, как оба future, переданные ему, завершатся. - Future
txзавершается после того, как он заканчивает спать после отправки последнего сообщения вvals. - Future
rxне завершится, пока не завершится циклwhile let. - Цикл
while letне закончится, пока ожидание (await)rx.recvне вернётNone. - Ожидание (await)
rx.recvвернётNoneтолько после того, как другой конец канала будет закрыт. - Канал закроется только если мы вызовем
rx.closeили когда отправительская сторона,tx, будет удалена (dropped). - Мы нигде не вызываем
rx.close, иtxне будет удалён до конца внешнего async-блока, переданного вtrpl::run. - Блок не может закончиться, потому что он заблокирован на завершении
trpl::join, что возвращает нас в начало этого списка.
Мы могли бы вручную закрыть rx, вызвав rx.close где-то, но это не имеет
большого смысла. Остановка после обработки некоторого произвольного количества
сообщений заставила бы программу завершиться, но мы могли бы пропустить сообщения.
Нам нужен другой способ убедиться, что tx удаляется до конца функции.
Сейчас async-блок, где мы отправляем сообщения, только заимствует (borrows) tx,
потому что отправка сообщения не требует владения (ownership), но если бы мы
могли переместить (move) tx в этот async-блок, он был бы удалён после завершения
этого блока. В разделе главы 13 Захват ссылок или перемещение владения вы узнали, как использовать ключевое слово move с замыканиями, и, как обсуждалось в разделе главы 16 Использование замыканий move с потоками, нам часто нужно перемещать данные в замыкания при работе с потоками. Те же самые основные динамики применимы к async-блокам, поэтому ключевое слово move работает с async-блоками так же, как и с замыканиями.
В Листинге 17-12 мы меняем блок, используемый для отправки сообщений, с async на
async move. Когда мы запускаем эту версию кода, она завершается корректно
после отправки и получения последнего сообщения.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
Этот асинхронный канал также является каналом с несколькими производителями, так
что мы можем вызвать clone на tx, если хотим отправлять сообщения из нескольких
futures, как показано в Листинге 17-13.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
Сначала мы клонируем tx, создавая tx1 вне первого async-блока. Мы перемещаем
(move) tx1 в этот блок, как и делали раньше с tx. Затем, позже, мы перемещаем
оригинальный tx в новый async-блок, где отправляем больше сообщений с немного
более медленной задержкой. Мы помещаем этот новый async-блок после async-блока для
получения сообщений, но он мог бы быть и перед ним. Ключевым является порядок, в
котором futures ожидаются (awaited), а не порядок, в котором они создаются.
Оба async-блока для отправки сообщений должны быть блоками async move, чтобы и
tx, и tx1 удалялись при завершении этих блоков. В противном случае мы вернёмся
к тому же бесконечному циклу, с которого начинали. Наконец, мы переходим с
trpl::join на trpl::join3 для обработки дополнительного future.
Теперь мы видим все сообщения из обоих futures отправки, и поскольку futures отправки используют немного разные задержки после отправки, сообщения также получаются с этими разными интервалами.
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
Это хорошее начало, но оно ограничивает нас лишь небольшим количеством futures:
двумя с join или тремя с join3. Давайте посмотрим, как мы можем работать с
большим количеством futures.