Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Потоки: Фьючерсы в последовательности

До сих пор в этой главе мы в основном рассматривали отдельные фьючерсы. Единственным большим исключением был асинхронный канал, который мы использовали. Вспомните, как мы использовали приёмник для нашего асинхронного канала ранее в этой главе в разделе «Передача сообщений». Асинхронный метод recv производит последовательность элементов с течением времени. Это экземпляр гораздо более общего паттерна, известного как поток (stream).

Мы видели последовательность элементов в Главе 13, когда рассматривали типаж Iterator в разделе Типаж Iterator и метод next, но между итераторами и асинхронным приёмником канала есть два различия. Первое различие — время: итераторы синхронны, а приёмник канала — асинхронен. Второе — API. При прямой работе с Iterator мы вызываем его синхронный метод next. С потоком trpl::Receiver в частности, мы вызывали асинхронный метод recv. В остальном эти API очень похожи, и это сходство — не совпадение. Поток — это асинхронная форма итерации. В то время как trpl::Receiver конкретно ожидает получения сообщений, универсальный API потока гораздо шире: он предоставляет следующий элемент так же, как Iterator, но асинхронно.

Сходство между итераторами и потоками в Rust означает, что мы можем фактически создать поток из любого итератора. Как и с итератором, мы можем работать с потоком, вызывая его метод next, а затем ожидая результат, как в Листинге 17-30.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-30: Создание потока из итератора и вывод его значений

Мы начинаем с массива чисел, который преобразуем в итератор, а затем вызываем map, чтобы удвоить все значения. Затем мы преобразуем итератор в поток с помощью функции trpl::stream_from_iter. Далее мы перебираем элементы в потоке по мере их поступления в цикле while let.

К сожалению, при попытке запустить код, он не компилируется, а вместо этого сообщает, что метод next недоступен:

error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Как объясняет этот вывод, причина ошибки компилятора в том, что нам нужен правильный типаж в области видимости, чтобы использовать метод next. Учитывая наше обсуждение до сих пор, вы могли бы разумно ожидать, что это будет типаж Stream, но на самом деле это StreamExt. Сокращение от extension (расширение), Ext — общий паттерн в сообществе Rust для расширения одного типажа другим.

Мы объясним типажи Stream и StreamExt немного подробнее в конце главы, но пока всё, что вам нужно знать, — это то, что типаж Stream определяет низкоуровневый интерфейс, который фактически объединяет типажи Iterator и Future. StreamExt предоставляет набор API более высокого уровня поверх Stream, включая метод next, а также другие служебные методы, подобные тем, которые предоставляет типаж Iterator. Stream и StreamExt ещё не являются частью стандартной библиотеки Rust, но большинство крейтов экосистемы используют одно и то же определение.

Исправление ошибки компилятора — добавить инструкцию use для trpl::StreamExt, как в Листинге 17-31.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-31: Успешное использование итератора в качестве основы для потока

С всеми этими частями, собранными вместе, этот код работает так, как мы хотим! Более того, теперь, когда у нас есть StreamExt в области видимости, мы можем использовать все его служебные методы, как с итераторами. Например, в Листинге 17-32 мы используем метод filter, чтобы отфильтровать всё, кроме кратных трём и пяти.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-32: Фильтрация потока с помощью метода StreamExt::filter

Конечно, это не очень интересно, так как мы могли бы сделать то же самое с обычными итераторами и без всякого async. Давайте посмотрим, что мы можем сделать, что действительно уникально для потоков.

Комбинирование потоков

Многие концепции естественным образом представляются как потоки: элементы, становящиеся доступными в очереди, фрагменты данных, извлекаемые постепенно из файловой системы, когда полный набор данных слишком велик для памяти компьютера, или данные, поступающие по сети с течением времени. Поскольку потоки являются фьючерсами, мы можем использовать их с любым другим видом фьючерса и комбинировать интересными способами. Например, мы можем группировать события, чтобы избежать слишком большого количества сетевых вызовов, устанавливать таймауты для последовательностей длительных операций или ограничивать события пользовательского интерфейса, чтобы избежать ненужной работы.

Давайте начнём с создания небольшого потока сообщений в качестве замены потока данных, который мы могли бы видеть из WebSocket или другого протокола реального времени, как показано в Листинге 17-33.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
Listing 17-33: Использование приёмника rx как ReceiverStream

Сначала мы создаём функцию под названием get_messages, которая возвращает impl Stream<Item = String>. Для её реализации мы создаём асинхронный канал, перебираем первые 10 букв английского алфавита и отправляем их через канал.

Мы также используем новый тип: ReceiverStream, который преобразует приёмник rx из trpl::channel в Stream с методом next. Назад в main мы используем цикл while let, чтобы вывести все сообщения из потока.

Когда мы запускаем этот код, мы получаем именно те результаты, которые ожидаем:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

Опять же, мы могли бы сделать это с обычным API Receiver или даже с обычным API Iterator, так что давайте добавим функцию, требующую потоков: добавление таймаута, который применяется ко всем элементам потока, и задержки к отправляемым элементам, как показано в Листинге 17-34.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
Listing 17-34: Использование метода StreamExt::timeout для установки ограничения по времени на элементы в потоке

Мы начинаем с добавления таймаута к потоку с помощью метода timeout, который поступает из типажа StreamExt. Затем мы обновляем тело цикла while let, потому что поток теперь возвращает Result. Вариант Ok указывает, что сообщение прибыло вовремя; вариант Err указывает, что таймаут истёк до того, как какое-либо сообщение прибыло. Мы используем match для этого результата и либо выводим сообщение при успешном получении, либо выводим уведомление о таймауте. Наконец, обратите внимание, что мы фиксируем сообщения после применения к ним таймаута, потому что вспомогательная функция таймаута производит поток, который нужно зафиксировать, чтобы его можно было опросить.

Однако, поскольку между сообщениями нет задержек, этот таймаут не изменяет поведение программы. Давайте добавим переменную задержку к отправляемым сообщениям, как показано в Листинге 17-35.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-35: Отправка сообщений через tx с асинхронной задержкой, не делая get_messages асинхронной функцией

В get_messages мы используем метод итератора enumerate с массивом messages, чтобы получить индекс каждого отправляемого элемента вместе с самим элементом. Затем мы применяем 100-миллисекундную задержку к элементам с чётными индексами и 300-миллисекундную задержку к элементам с нечётными индексами, чтобы имитировать различные задержки, которые мы могли бы видеть из потока сообщений в реальном мире. Поскольку наш таймаут составляет 200 миллисекунд, это должно повлиять на половину сообщений.

Чтобы спать между сообщениями в функции get_messages без блокировки, нам нужно использовать async. Однако мы не можем сделать get_messages самой асинхронной функцией, потому что тогда мы вернём Future<Output = Stream<Item = String>> вместо Stream<Item = String>>. Вызывающий должен будет дождаться get_messages сам, чтобы получить доступ к потоку. Но помните: всё в данном фьючерсе происходит линейно; конкурентность происходит между фьючерсами. Ожидание get_messages потребовало бы, чтобы оно отправило все сообщения, включая задержку сна между каждым сообщением, прежде чем вернуть поток приёмника. В результате таймаут был бы бесполезен. Все задержки в потоке произошли бы до того, как поток стал бы вообще доступен.

Вместо этого мы оставляем get_messages обычной функцией, возвращающей поток, и мы порождаем задачу для обработки асинхронных вызовов sleep.

Примечание: Вызов spawn_task таким образом работает, потому что мы уже настроили нашу среду выполнения; если бы мы этого не сделали, это вызвало бы панику. Другие реализации выбирают разные компромиссы: они могут порождать новую среду выполнения и избегать паники, но получат немного дополнительных накладных расходов, или они могут просто не предоставлять автономный способ порождать задачи без ссылки на среду выполнения. Убедитесь, что вы знаете, какой компромисс выбрала ваша среда выполнения, и пишите код соответственно!

Теперь наш код имеет гораздо более интересный результат. Между каждой другой парой сообщений возникает ошибка Problem: Elapsed(()).

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

Таймаут не мешает сообщениям в конечном итоге прибыть. Мы всё ещё получаем все исходные сообщения, потому что наш канал неограниченный: он может содержать столько сообщений, сколько мы можем вместить в память. Если сообщение не прибывает до таймаута, наш обработчик потока учтёт это, но когда он снова опрашивает поток, сообщение, возможно, уже прибыло.

Вы можете получить другое поведение, если нужно, используя другие виды каналов или, в общем случае, другие виды потоков. Давайте посмотрим на один из них на практике, объединив поток интервалов времени с этим потоком сообщений.

Объединение потоков

Сначала давайте создадим другой поток, который будет излучать элемент каждую миллисекунду, если мы позволим ему работать напрямую. Для простоты мы можем использовать функцию sleep для отправки сообщения с задержкой и объединить её с тем же подходом, который мы использовали в get_messages, создавая поток из канала. Разница в том, что на этот раз мы будем отправлять обратно счётчик прошедших интервалов, поэтому тип возвращаемого значения будет impl Stream<Item = u32>, и мы можем назвать функцию get_intervals (см. Листинг 17-36).

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-36: Создание потока со счётчиком, который будет излучаться каждую миллисекунду

Мы начинаем с определения count в задаче. (Мы могли бы определить его и вне задачи, но ограничить область видимости любой данной переменной яснее.) Затем мы создаём бесконечный цикл. Каждая итерация цикла асинхронно спит одну миллисекунду, увеличивает счётчик, а затем отправляет его через канал. Поскольку всё это обёрнуто в задачу, созданную spawn_task, всё это — включая бесконечный цикл — будет очищено вместе со средой выполнения.

Такой бесконечный цикл, который заканчивается только тогда, когда вся среда выполнения разбирается, довольно распространён в асинхронном Rust: многие программы должны работать бесконечно. С async это не блокирует ничего другого, при условии, что в каждой итерации цикла есть хотя бы одна точка await.

Теперь, обратно в асинхронном блоке нашей функции main, мы можем попытаться объединить потоки messages и intervals, как показано в Листинге 17-37.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals();
        let merged = messages.merge(intervals);

        while let Some(result) = merged.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-37: Попытка объединить потоки messages и intervals

Мы начинаем с вызова get_intervals. Затем мы объединяем потоки messages и intervals с помощью метода merge, который объединяет несколько потоков в один поток, производящий элементы из любого из исходных потоков, как только элементы становятся доступными, не накладывая никакого конкретного порядка. Наконец, мы перебираем этот объединённый поток вместо messages.

На данный момент ни messages, ни intervals не нужно фиксировать или делать изменяемыми, потому как оба будут объединены в один поток merged. Однако этот вызов merge не компилируется! (Также не компилируется вызов next в цикле while let, но мы к этому вернёмся.) Это потому, что два потока имеют разные типы. Поток messages имеет тип Timeout<impl Stream<Item = String>>, где Timeout — это тип, который реализует Stream для вызова timeout. Поток intervals имеет тип impl Stream<Item = u32>. Чтобы объединить эти два потока, нам нужно преобразовать один из них, чтобы он соответствовал другому. Мы переработаем поток интервалов, потому что сообщения уже в том базовом формате, который мы хотим, и должны обрабатывать ошибки таймаута (см. Листинг 17-38).

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-38: Выравнивание типа потока intervals с типом потока messages

Сначала мы можем использовать вспомогательный метод map, чтобы преобразовать intervals в строку. Во-вторых, нам нужно сопоставить Timeout из messages. Поскольку мы на самом деле не хотим таймаут для intervals, мы можем просто создать таймаут, который длиннее, чем другие используемые нами длительности. Здесь мы создаём 10-секундный таймаут с помощью Duration::from_secs(10). Наконец, нам нужно сделать stream изменяемым, чтобы вызовы next в цикле while let могли перебирать поток, и зафиксировать его, чтобы это было безопасно сделать. Это почти приводит нас туда, где нам нужно быть. Всё типизируется. Если вы запустите это, однако, будет две проблемы. Во-первых, он никогда не остановится! Вам нужно будет остановить его с помощью ctrl-c. Во-вторых, сообщения из английского алфавита будут затеряны среди всех сообщений счётчика интервалов:

--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--

Листинг 17-39 показывает один способ решения этих последних двух проблем.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .throttle(Duration::from_millis(100))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-39: Использование throttle и take для управления объединёнными потоками

Сначала мы используем метод throttle на потоке intervals, чтобы он не перегружал поток messages. Ограничение частоты (throttling) — это способ ограничения скорости, с которой функция будет вызываться — или, в этом случае, как часто поток будет опрашиваться. Раз в 100 миллисекунд должно подойти, потому что примерно так часто прибывают наши сообщения.

Чтобы ограничить количество элементов, которые мы примем из потока, мы применяем метод take к объединённому потоку merged, потому что хотим ограничить окончательный вывод, а не только один поток или другой.

Теперь, когда мы запускаем программу, она останавливается после извлечения 20 элементов из потока, и интервалы не перегружают сообщения. Мы также не получаем Interval: 100 или Interval: 200 и так далее, а вместо этого получаем Interval: 1, Interval: 2 и так далее — даже хотя у нас есть исходный поток, который может производить событие каждую миллисекунду. Это потому, что вызов throttle производит новый поток, который оборачивает исходный поток так, что исходный поток опрашивается только с частотой ограничения, а не на своей собственной «родной» частоте. У нас нет кучи необработанных сообщений интервалов, которые мы выбираем игнорировать. Вместо этого мы никогда не производим эти сообщения интервалов с самого начала! Это присущая Rust «ленивость» фьючерсов, позволяющая нам выбирать наши характеристики производительности.

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

Осталось последнее, что нам нужно обработать: ошибки! С обоими этими каналовыми потоками вызовы send могут завершиться ошибкой, когда другая сторона канала закрывается — и это просто вопрос того, как среда выполнения выполняет фьючерсы, составляющие поток. До сих пор мы игнорировали эту возможность, вызывая unwrap, но в хорошо написанном приложении мы должны явно обработать ошибку, как минимум завершив цикл, чтобы не пытаться отправлять больше сообщений. Листинг 17-40 показывает простую стратегию ошибок: вывести проблему, а затем break из циклов.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-40: Обработка ошибок и завершение циклов

Как обычно, правильный способ обработки ошибки отправки сообщения будет разным; просто убедитесь, что у вас есть стратегия.

Теперь, когда мы увидели много async на практике, давайте сделаем шаг назад и углубимся в некоторые детали того, как Future, Stream и другие ключевые типажи, которые Rust использует для работы async.