Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Собираем всё вместе: Futures, задачи и потоки

Как мы видели в Главе 16, потоки предоставляют один из подходов к конкурентности. В этой главе мы рассмотрели другой подход: использование async с futures и потоками (streams). Если вы задаётесь вопросом, когда выбирать один метод над другим, ответ: это зависит! И во многих случаях выбор — не потоки или async, а потоки и async.

Многие операционные системы десятилетиями предоставляют модели конкурентности на основе потоков, и многие языки программирования поддерживают их как следствие. Однако эти модели не лишены компромиссов. На многих ОС они используют значительный объём памяти для каждого потока и сопряжены с некоторыми накладными расходами на запуск и завершение. Потоки также являются вариантом только тогда, когда ваша операционная система и аппаратное обеспечение их поддерживают. В отличие от мейнстримных настольных и мобильных компьютеров, некоторые встроенные системы вообще не имеют ОС, поэтому у них также нет потоков.

Модель async предоставляет другой — и в конечном счёте дополняющий — набор компромиссов. В модели async конкурентные операции не требуют своих собственных потоков. Вместо этого они могут выполняться в задачах (tasks), как когда мы использовали trpl::spawn_task для запуска работы из синхронной функции в разделе о потоках (streams). Задача похожа на поток, но вместо управления операционной системой она управляется кодом на уровне библиотеки: рантаймом (runtime).

В предыдущем разделе мы видели, что можем построить поток (stream), используя асинхронный канал и порождая асинхронную задачу, которую можно вызвать из синхронного кода. Мы можем сделать то же самое с потоком (thread). В Листинге 17-40 мы использовали trpl::spawn_task и trpl::sleep. В Листинге 17-41 мы заменяем их на API thread::spawn и thread::sleep из стандартной библиотеки в функции get_intervals.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // This is *not* `trpl::spawn` but `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
Listing 17-41: Использование API std::thread вместо асинхронных API trpl для функции get_intervals

Если вы запустите этот код, вывод будет идентичен выводу Листинга 17-40. И обратите внимание, как мало изменяется здесь с точки зрения вызывающего кода. Более того, даже если одна из наших функций породила асинхронную задачу в рантайме, а другая — поток ОС, результирующие потоки (streams) не были затронуты этими различиями.

Несмотря на их сходство, эти два подхода ведут себя очень по-разному, хотя в этом очень простом примере нам может быть трудно это измерить. Мы могли бы породить миллионы асинхронных задач на любом современном персональном компьютере. Если бы мы попытались сделать это с потоками, мы буквально исчерпали бы память!

Однако есть причина, по которой эти API так похожи. Потоки действуют как граница для наборов синхронных операций; конкурентность возможна между потоками. Задачи действуют как граница для наборов асинхронных операций; конкурентность возможна как между, так и внутри задач, потому что задача может переключаться между futures в своём теле. Наконец, futures — это наиболее гранулярная единица конкурентности в Rust, и каждый future может представлять дерево других futures. Рантайм — а именно его исполнитель (executor) — управляет задачами, а задачи управляют futures. В этом отношении задачи похожи на лёгкие, управляемые рантаймом потоки с дополнительными возможностями, которые возникают из-за управления рантаймом, а не операционной системой.

Это не означает, что асинхронные задачи всегда лучше потоков (или наоборот). Конкурентность с потоками в некоторых отношениях является более простой программистской моделью, чем конкурентность с async. Это может быть силой или слабостью. Потоки несколько “fire and forget” (запустил и забыл); у них нет родного эквивалента future, поэтому они просто выполняются до завершения без прерываний, кроме как самой операционной системой. То есть у них нет встроенной поддержки внутризадачной (intratask) конкурентности, как у futures. Потоки в Rust также не имеют механизмов для отмены (cancellation) — темы, которую мы явно не охватывали в этой главе, но которая подразумевалась тем фактом, что всякий раз, когда мы завершали future, его состояние корректно очищалось.

Эти ограничения также делают потоки более трудными для композиции, чем futures. Например, гораздо сложнее использовать потоки для построения вспомогательных функций, таких как методы timeout и throttle, которые мы построили ранее в этой главе. Тот факт, что futures являются более богатыми структурами данных, означает, что их можно компоновать более естественно, как мы видели.

Таким образом, задачи дают нам дополнительный контроль над futures, позволяя выбирать, где и как их группировать. И оказывается, что потоки и задачи часто очень хорошо работают вместе, потому что задачи могут (по крайней мере, в некоторых рантаймах) перемещаться между потоками. Фактически, под капотом рантайм, которым мы пользовались — включая функции spawn_blocking и spawn_task — по умолчанию многопоточный! Многие рантаймы используют подход под названием work stealing (кража работы), чтобы прозрачно перемещать задачи между потоками на основе текущей загрузки потоков, улучшая общую производительность системы. Этот подход фактически требует и потоков, и задач, а следовательно, и futures.

Думая о том, какой метод использовать, учитывайте эти эмпирические правила:

  • Если работа очень параллелизуема, например, обработка набора данных, где каждая часть может обрабатываться отдельно, потоки — лучший выбор.
  • Если работа очень конкурентна, например, обработка сообщений из множества разных источников, которые могут поступать с разными интервалами или разными скоростями, async — лучший выбор.

И если вам нужны и параллелизм, и конкурентность, вам не нужно выбирать между потоками и async. Вы можете использовать их вместе свободно, позволяя каждому играть ту роль, для которой он лучше всего подходит. Например, Листинг 17-42 показывает довольно распространённый пример такой смеси в реальном коде на Rust.

Filename: src/main.rs
extern crate trpl; // for mdbook test

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}
Listing 17-42: Отправка сообщений с блокирующим кодом в потоке и ожидание сообщений в асинхронном блоке

Мы начинаем с создания асинхронного канала, затем порождаем поток, который принимает владение стороной отправителя канала. Внутри потока мы отправляем числа от 1 до 10, засыпая на секунду между каждым. Наконец, мы запускаем future, созданный с помощью асинхронного блока, переданного в trpl::run, как мы делали на протяжении всей главы. В этом future мы ожидаем эти сообщения, как в других примерах передачи сообщений, которые мы видели.

Возвращаясь к сценарию, с которого мы начали главу, представьте, что вы запускаете набор задач кодирования видео, используя выделенный поток (потому что кодирование видео — вычислительно ограниченная задача), но уведомляете UI об завершении этих операций с помощью асинхронного канала. Существуют бесчисленные примеры таких комбинаций в реальных сценариях использования.

Краткое содержание

Это не последний раз, когда вы увидите конкурентность в этой книге. Проект в Главе 21 применит эти концепции в более реалистичной ситуации, чем более простые примеры, рассмотренные здесь, и более прямо сравнит решение проблем с использованием потоков и задач.

Независимо от того, какой из этих подходов вы выберете, Rust даёт вам инструменты, необходимые для написания безопасного, быстрого, конкурентного кода — будь то высокопроизводительный веб-сервер или встроенная операционная система.

Далее мы поговорим об идиоматичных способах моделирования проблем и структурирования решений по мере роста ваших программ на Rust. Кроме того, мы обсудим, как идиомы Rust соотносятся с теми, с которыми вы могли быть знакомы из объектно-ориентированного программирования.