Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Работа с любым количеством фьючерсов

Когда мы перешли от использования двух фьючерсов к трём в предыдущем разделе, нам также пришлось перейти с join на join3. Было бы неудобно каждый раз при изменении количества фьючерсов, которые мы хотим объединить, вызывать другую функцию. К счастью, у нас есть макроформа join!, в которую можно передать произвольное количество аргументов. Она также самостоятельно обрабатывает ожидание фьючерсов. Таким образом, мы можем переписать код из листинга 17-13, используя join! вместо join3, как в листинге 17-14.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-14: Использование join! для ожидания нескольких фьючерсов

Это определённо улучшение по сравнению с переключением между join, join3, join4 и так далее! Однако даже эта макроформа работает только когда мы заранее знаем количество фьючерсов. В реальном Rust, однако, помещение фьючерсов в коллекцию и последующее ожидание завершения некоторых или всех из них — распространённый шаблон.

Чтобы проверить все фьючерсы в некоторой коллекции, нам нужно перебрать и объединить все из них. Функция trpl::join_all принимает любой тип, реализующий типаж Iterator, о котором вы узнали в главе 13 Типаж Iterator и метод next, поэтому она кажется идеальным решением. Давайте попробуем поместить наши фьючерсы в вектор и заменить join! на join_all, как показано в листинге 17-15.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-15: Хранение анонимных фьючерсов в векторе и вызов join_all

К сожалению, этот код не компилируется. Вместо этого мы получаем эту ошибку:

error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

Это может удивить. В конце концов, ни один из асинхронных блоков ничего не возвращает, поэтому каждый из них produces Future<Output = ()>. Помните, что Future — это типаж, и компилятор создаёт уникальный перечисление для каждого асинхронного блока. Вы не можете поместить два разных написанных вручную структуры в Vec, и то же правило применяется к разным перечислениям, сгенерированным компилятором.

Чтобы это заработало, нам нужно использовать объекты типажей, как мы это делали в разделе «Возврат ошибок из функции run» в главе 12. (Мы подробно рассмотрим объекты типажей в главе 18.) Использование объектов типажей позволяет нам рассматривать каждый из анонимных фьючерсов, производимых этими типами, как один и тот же тип, потому что все они реализуют типаж Future.

Примечание: В разделе Использование перечисления для хранения нескольких значений в главе 8 мы обсудили другой способ включить несколько типов в Vec: использование перечисления для представления каждого типа, который может появиться в векторе. Мы не можем сделать это здесь, однако. Во-первых, у нас нет способа назвать разные типы, потому что они анонимны. Во-вторых, причина, по которой мы вообще полезли за вектором и join_all, — это возможность работать с динамической коллекцией фьючерсов, где нас интересует только то, что у них одинаковый тип вывода.

Мы начинаем с обёртывания каждого фьючерса в vec! в Box::new, как показано в листинге 17-16.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-16: Использование Box::new для выравнивания типов фьючерсов в Vec

К сожалению, этот код всё ещё не компилируется. Фактически, мы получаем ту же базовую ошибку, что и раньше, как для второго, так и для третьего вызовов Box::new, а также новые ошибки, касающиеся типажа Unpin. Мы вернёмся к ошибкам Unpin через момент. Сначала давайте исправим ошибки типов в вызовах Box::new, явно аннотируя тип переменной futures (см. листинг 17-17).

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-17: Исправление остальных ошибок несоответствия типов с помощью явного объявления типа

Это объявление типа немного сложное, поэтому давайте разберём его:

  1. Самый внутренний тип — это сам фьючерс. Мы явно отмечаем, что вывод фьючерса — это единичный тип (), записывая Future<Output = ()>.
  2. Затем мы аннотируем типаж с помощью dyn, чтобы пометить его как динамический.
  3. Вся ссылка на типаж обёрнута в Box.
  4. Наконец, мы явно указываем, что futures — это Vec, содержащий эти элементы.

Это уже сильно помогло. Теперь при запуске компилятора мы получаем только ошибки, упоминающие Unpin. Хотя их три, их содержание очень похоже.

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
   --> src/main.rs:49:24
    |
49  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
   --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:9
   |
49 |         trpl::join_all(futures).await;
    |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:33
   |
49 |         trpl::join_all(futures).await;
    |                                 ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors

Это много информации, поэтому давайте разберём её. Первая часть сообщения говорит нам, что первый асинхронный блок (src/main.rs:8:23: 20:10) не реализует типаж Unpin и предлагает использовать pin! или Box::pin для его решения. Позже в этой главе мы углубимся в несколько деталей о Pin и Unpin. На данный момент, однако, мы можем просто следовать совету компилятора, чтобы выбраться из тупика. В листинге 17-18 мы начинаем с импорта Pin из std::pin. Затем мы обновляем аннотацию типа для futures, оборачивая каждый Box в Pin. Наконец, мы используем Box::pin для закрепления самих фьючерсов.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::pin::Pin;

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-18: Использование Pin и Box::pin для проверки типа Vec

Если мы скомпилируем и запустим это, мы наконец получим вывод, на который надеялись:

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

Фух!

Здесь есть ещё кое-что, что стоит изучить. Во-первых, использование Pin<Box<T>> добавляет небольшие накладные расходы от размещения этих фьючерсов в куче с помощью Box — и мы делаем это только для того, чтобы типы совпали. Нам на самом деле не нужна аллокация в куче, в конце концов: эти фьючерсы локальны для этой конкретной функции. Как отмечалось ранее, Pin сам по себе является обёртывающим типом, поэтому мы можем получить преимущество наличия одного типа в Vec — исходная причина, по которой мы полезли за Box — без выполнения аллокации в куче. Мы можем использовать Pin напрямую с каждым фьючерсом, используя макрос std::pin::pin.

Однако мы всё ещё должны быть явны о типе закреплённой ссылки; иначе Rust по-прежнему не будет знать, как интерпретировать их как динамические объекты типажей, какими нам нужно, чтобы они были в Vec. Поэтому мы добавляем pin в наш список импортов из std::pin. Затем мы можем pin! каждый фьючерс при его определении и определить futures как Vec, содержащий закреплённые изменяемые ссылки на динамический тип фьючерса, как в листинге 17-19.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::pin::{Pin, pin};

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-19: Использование Pin напрямую с макросом pin! для избежания ненужных аллокаций в куче

Мы дошли так далеко, игнорируя тот факт, что у нас могут быть разные типы Output. Например, в листинге 17-20 анонимный фьючерс для a реализует Future<Output = u32>, анонимный фьючерс для b реализует Future<Output = &str>, и анонимный фьючерс для c реализует Future<Output = bool>.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}
Listing 17-20: Три фьючерса с различными типами

Мы можем использовать trpl::join! для их ожидания, потому что он позволяет передавать в него несколько типов фьючерсов и производит кортеж этих типов. Мы не можем использовать trpl::join_all, потому что она требует, чтобы все переданные фьючерсы имели одинаковый тип. Помните, что эта ошибка и стала началом нашего приключения с Pin!

Это фундаментальный компромисс: мы можем либо работать с динамическим количеством фьючерсов с помощью join_all, при условии, что у них все одинаковый тип, либо работать с фиксированным количеством фьючерсов с помощью функций join или макроса join!, даже если у них разные типы. Это тот же сценарий, с которым мы столкнулись бы при работе с любыми другими типами в Rust. Фьючерсы не особенные, даже хотя у нас есть некоторый приятный синтаксис для работы с ними, и это хорошо.

Гонка фьючерсов

Когда мы «объединяем» фьючерсы с помощью семейства функций и макросов join, мы требуем, чтобы все они завершились, прежде чем мы двинемся дальше. Иногда, однако, нам нужно, чтобы какой-нибудь фьючерс из набора завершился, прежде чем мы двинемся дальше — что-то вроде гонки одного фьючерса против другого.

В листинге 17-21 мы снова используем trpl::race для запуска двух фьючерсов, slow и fast, друг против друга.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}
Listing 17-21: Использование race для получения результата того фьючерса, который завершится первым

Каждый фьючерс печатает сообщение, когда начинает работать, делает паузу на некоторое количество времени, вызывая и ожидая sleep, а затем печатает другое сообщение, когда завершается. Затем мы передаём оба, slow и fast, в trpl::race и ждём, пока один из них не завершится. (Результат здесь не слишком удивителен: побеждает fast.) В отличие от того, когда мы использовали race в разделе «Наша первая асинхронная программа», мы просто игнорируем возвращаемый экземпляр Either, потому что всё интересное поведение происходит в теле асинхронных блоков.

Обратите внимание, что если поменять порядок аргументов у race, порядок сообщений «started» изменится, даже если фьючерс fast всегда завершается первым. Это потому, что реализация этой конкретной функции race нечестна. Она всегда запускает фьючерсы, переданные в качестве аргументов, в том порядке, в котором они переданы. Другие реализации честны и будут случайно выбирать, какой фьючерс опрашивать первым. Независимо от того, честна ли реализация race, которую мы используем, однако, один из фьючерсов будет работать до первого await в своём теле, прежде чем другая задача сможет начаться.

Вспомните из раздела Наша первая асинхронная программа, что на каждой точке ожидания Rust даёт рантайму возможность приостановить задачу и переключиться на другую, если ожидаемый фьючерс не готов. Обратное также верно: Rust только приостанавливает асинхронные блоки и возвращает управление рантайму в точке ожидания. Всё между точками ожидания синхронно.

Это означает, что если вы выполняете кучу работы в асинхронном блоке без точки ожидания, этот фьючерс заблокирует прогресс любых других фьючерсов. Иногда это называют тем, что один фьючерс голодает другие фьючерсы. В некоторых случаях это может быть не большой проблемой. Однако, если вы выполняете какой-то дорогой подготовительный или долгий рабочий процесс, или если у вас есть фьючерс, который будет выполнять какую-то particular задачу бесконечно, вам нужно подумать о том, когда и где вернуть управление рантайму.

С другой стороны, если у вас есть долгие блокирующие операции, асинхронность может быть полезным инструментом для предоставления способов, которыми разные части программы могут относиться друг к другу.

Но как вы вернули бы управление рантайму в этих случаях?

Передача управления рантайму

Давайте смоделируем долгую операцию. Листинг 17-22 вводит функцию slow.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-22: Использование std::thread::sleep для имитации медленных операций

Этот код использует std::thread::sleep вместо trpl::sleep, так что вызов slow заблокирует текущий поток на некоторое количество миллисекунд. Мы можем использовать slow в качестве замены реальным операциям в реальном мире, которые и долгие, и блокирующие.

В листинге 17-23 мы используем slow для имитации выполнения такого рода CPU-ограниченной работы в паре фьючерсов.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-23: Использование thread::sleep для имитации медленных операций

Для начала каждый фьючерс возвращает управление рантайму только после выполнения кучи медленных операций. Если вы запустите этот код, вы увидите этот вывод:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

Как и в нашем предыдущем примере, race всё ещё завершается, как только a готов. Однако нет переплетения между двумя фьючерсами. Фьючерс a выполняет всю свою работу до того, как будет ожидаться вызов trpl::sleep, затем фьючерс b выполняет всю свою работу до своего собственного вызова trpl::sleep, и наконец фьючерс a завершается. Чтобы позволить обоим фьючерсам делать прогресс между своими медленными задачами, нам нужны точки ожидания, чтобы мы могли вернуть управление рантайму. Это значит, что нам нужно что-то, что мы можем ожидать!

Мы уже видим, как такого рода передача управления происходит в листинге 17-23: если бы мы удалили trpl::sleep в конце фьючерса a, он бы завершился без того, чтобы фьючерс b запускался вообще. Давайте попробуем использовать функцию sleep в качестве отправной точки для того, чтобы операции могли поочерёдно делать прогресс, как показано в листинге 17-24.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-24: Использование sleep для того, чтобы операции могли поочерёдно делать прогресс

В листинге 17-24 мы добавляем вызовы trpl::sleep с точками ожидания между каждым вызовом slow. Теперь работа двух фьючерсов переплетается:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

Фьючерс a всё ещё работает немного, прежде чем передать управление b, потому что он вызывает slow до того, как когда-либо вызывает trpl::sleep, но после этого фьючерсы меняются каждый раз, когда один из них достигает точки ожидания. В этом случае мы сделали это после каждого вызова slow, но мы могли бы разбить работу каким-либо способом, который для нас наиболее логичен.

На самом деле мы не хотим здесь спать, однако: мы хотим делать прогресс так быстро, как можем. Нам просто нужно вернуть управление рантайму. Мы можем сделать это напрямую, используя функцию yield_now. В листинге 17-25 мы заменяем все эти вызовы sleep на yield_now.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-25: Использование yield_now для того, чтобы операции могли поочерёдно делать прогресс

Этот код как яснее отражает фактическое намерение, так и может быть значительно быстрее, чем использование sleep, потому что таймеры, такие как тот, который использует sleep, часто имеют ограничения на то, насколько они могут быть гранулярными. Версия sleep, которую мы используем, например, всегда будет спать как минимум миллисекунду, даже если мы передаём ей Duration в одну наносекунду. Опять же, современные компьютеры быстры: они могут сделать много за одну миллисекунду!

Вы можете увидеть это сами, настроив небольшой бенчмарк, такой как в листинге 17-26. (Это не особенно строгий способ тестирования производительности, но его достаточно, чтобы показать разницу здесь.)

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}
Listing 17-26: Сравнение производительности sleep и yield_now

Здесь мы пропускаем весь вывод статуса, передаём Duration в одну наносекунду в trpl::sleep и позволяем каждому фьючерсу работать самому, без переключения между фьючерсами. Затем мы запускаем 1000 итераций и видим, сколько времени занимает фьючерс, использующий trpl::sleep, по сравнению с фьючерсом, использующим trpl::yield_now.

Версия с yield_now намного быстрее!

Это означает, что асинхронность может быть полезна даже для вычислительно-ограниченных задач, в зависимости от того, что ещё делает ваша программа, потому что она предоставляет полезный инструмент для структурирования отношений между разными частями программы. Это форма кооперативной многозадачности, где каждый фьючерс имеет возможность определять, когда он передаёт управление через точки ожидания. Каждый фьючерс, следовательно, также имеет ответственность избегать блокировки слишком долго. В некоторых встроенных операционных системах на Rust это единственный вид многозадачности!

В реальном коде вы обычно не будете чередовать вызовы функций с точками ожидания на каждой строке, конечно. Хотя передача управления таким образом относительно недорога, она не бесплатна. Во многих случаях попытка разбить вычислительно-ограниченную задачу может сделать её значительно медленнее, поэтому иногда для общей производительности лучше позволить операции кратко блокироваться. Всегда измеряйте, чтобы увидеть, каковы фактические узкие места производительности вашего кода. Однако базовый динамик важно иметь в виду, если вы видите много работы, происходящей последовательно, которую вы ожидали видеть параллельно!

Построение наших собственных асинхронных абстракций

Мы также можем компоновать фьючерсы вместе для создания новых паттернов. Например, мы можем построить функцию timeout с асинхронными строительными блоками, которые у нас уже есть. Когда мы закончим, результат будет ещё одним строительным блоком, который мы могли бы использовать для создания ещё более асинхронных абстракций.

Листинг 17-27 показывает, как мы ожидаем, что этот timeout будет работать с медленным фьючерсом.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
Listing 17-27: Использование нашего предполагаемого timeout для запуска медленной операции с ограничением по времени

Давайте реализуем это! Для начала давайте подумаем об API для timeout:

  • Он должен быть асинхронной функцией сам, чтобы мы могли его ожидать.
  • Его первый параметр должен быть фьючерсом для запуска. Мы можем сделать его обобщённым, чтобы он работал с любым фьючерсом.
  • Его второй параметр будет максимальным временем ожидания. Если мы используем Duration, это облегчит передачу в trpl::sleep.
  • Он должен возвращать Result. Если фьючерс завершается успешно, Result будет Ok со значением, произведённым фьючерсом. Если истекает таймаут, Result будет Err с продолжительностью, которую ждал таймаут.

Листинг 17-28 показывает это объявление.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
Listing 17-28: Определение сигнатуры timeout

Это удовлетворяет нашим целям для типов. Теперь давайте подумаем о поведении, которое нам нужно: мы хотим гонять фьючерс, переданный нам, против продолжительности. Мы можем использовать trpl::sleep, чтобы сделать фьючерс-таймер из продолжительности, и использовать trpl::race для запуска этого таймера с фьючерсом, который передаёт вызывающий.

Мы также знаем, что race нечестна, опрашивая аргументы в том порядке, в котором они переданы. Таким образом, мы передаём future_to_try в race первым, чтобы он получил шанс завершиться, даже если max_time очень короткая продолжительность. Если future_to_try завершается первым, race вернёт Left с выводом из future_to_try. Если таймер завершается первым, race вернёт Right с выводом таймера ().

В листинге 17-29 мы сопоставляем результат ожидания trpl::race.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Listing 17-29: Определение timeout с помощью race и sleep

Если future_to_try успешен и мы получаем Left(output), мы возвращаем Ok(output). Если таймер сна истекает вместо этого и мы получаем Right(()), мы игнорируем () с помощью _ и возвращаем Err(max_time) вместо этого.

С этим у нас есть работающий timeout, построенный из двух других асинхронных помощников. Если мы запустим наш код, он напечатает режим неудачи после таймаута:

Failed after 2 seconds

Поскольку фьючерсы компонуются с другими фьючерсами, вы можете строить действительно мощные инструменты, используя меньшие асинхронные строительные блоки. Например, вы можете использовать тот же подход, чтобы комбинировать таймауты с повторными попытками, и, в свою очередь, использовать их с операциями, такими как сетевые вызовы (один из примеров из начала главы).

На практике вы обычно будете работать непосредственно с async и await, и вторично с функциями и макросами, такими как join, join_all, race и так далее. Вам нужно будет доставать pin время от времени, чтобы использовать фьючерсы с этими API.

Мы теперь видели несколько способов работы с несколькими фьючерсами одновременно. Далее мы посмотрим, как мы можем работать с несколькими фьючерсами в последовательности с течением времени с помощью потоков. Вот ещё несколько вещей, которые вы, возможно, захотите рассмотреть сначала:

  • Мы использовали Vec с join_all для ожидания завершения всех фьючерсов в некоторой группе. Как бы вы использовали Vec для обработки группы фьючерсов в последовательности вместо этого? Каковы компромиссы при этом?
  • Посмотрите на тип futures::stream::FuturesUnordered из крейта futures. Чем использование его будет отличаться от использования Vec? (Не беспокойтесь о том, что он из части stream крейта; он отлично работает с любой коллекцией фьючерсов.)