Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Конкурентность с общим состоянием

Передача сообщений — хороший способ работы с конкурентностью, но это не единственный вариант. Другой метод — это доступ нескольких потоков к одним и тем же общим данным. Ещё раз вспомните лозунг из документации языка Go: «Не общайтесь, разделяя память».

Как выглядит общение через разделение памяти? И почему сторонники передачи сообщений предостерегают от использования разделения памяти?

В каком-то смысле каналы в любом языке программирования похожи на единоличное владение, потому что после передачи значения по каналу вы больше не должны использовать это значение. Конкурентность с общей памятью похожа на множественное владение: несколько потоков могут одновременно обращаться к одному и тому же участку памяти. Как вы видели в главе 15, где умные указатели сделали множественное владение возможным, множественное владение может усложнить ситуацию, потому что эти разные владельцы требуют управления. Система типов и правила владения Rust значительно помогают сделать это управление правильным. Например, рассмотрим мьютексы, один из наиболее распространённых примитивов конкурентности для общей памяти.

Использование мьютексов для предоставления доступа к данным из одного потока в каждый момент времени

Мьютекс — это сокращение от взаимного исключения, поскольку мьютекс позволяет только одному потоку обращаться к некоторым данным в любой данный момент времени. Чтобы получить доступ к данным в мьютексе, поток должен сначала сигнализировать, что он хочет получить доступ, запрашивая получение блокировки мьютекса. Блокировка — это структура данных, которая является частью мьютекса и отслеживает, кто в данный момент имеет исключительный доступ к данным. Поэтому мьютекс описывается как защищающий хранимые им данные через систему блокировок.

Мьютексы имеют репутацию сложных в использовании, потому что нужно помнить два правила:

  1. Вы должны попытаться получить блокировку перед использованием данных.
  2. Когда вы закончите работу с данными, защищёнными мьютексом, вы должны разблокировать данные, чтобы другие потоки могли получить блокировку.

Для реальной метафоры мьютекса представьте панельную дискуссию на конференции с только одним микрофоном. Прежде чем участник панели сможет говорить, он должен попросить или сигнализировать, что хочет использовать микрофон. Когда он получает микрофон, он может говорить сколько угодно долго, а затем передать микрофон следующему участнику, который запросил слово. Если участник забывает передать микрофон после завершения, никто больше не может говорить. Если управление общим микрофоном идёт не так, панель не будет работать по плану!

Управление мьютексами может быть невероятно сложным, поэтому многие энтузиасты предпочитают каналы. Однако благодаря системе типов и правилам владения Rust вы не можете ошибиться с получением и освобождением блокировки.

API Mutex<T>

В качестве примера использования мьютекса начнём с использования мьютекса в однопоточном контексте, как показано в листинге 16-12.

Filename: src/main.rs
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}
Listing 16-12: Изучение API Mutex<T> в однопоточном контексте для простоты

Как и со многими типами, мы создаём Mutex<T> с использованием ассоциированной функции new. Для доступа к данным внутри мьютекса мы используем метод lock для получения блокировки. Этот вызов блокирует текущий поток, чтобы он не мог выполнять никакую работу, пока не наступит наша очередь иметь блокировку.

Вызов lock завершится ошибкой, если другой поток, владеющий блокировкой, перейдёт в состояние паники. В этом случае никто никогда не сможет получить блокировку, поэтому мы решили вызвать unwrap и перевести этот поток в состояние паники, если окажемся в такой ситуации.

После того как мы получили блокировку, мы можем рассматривать возвращаемое значение, названное num в этом случае, как изменяемую ссылку на данные внутри. Система типов гарантирует, что мы получаем блокировку перед использованием значения в m. Тип m — это Mutex<i32>, а не i32, поэтому мы должны вызвать lock, чтобы использовать значение i32. Мы не можем забыть об этом; система типов не позволит нам получить доступ к внутреннему i32 иначе.

Как вы могли предполагать, Mutex<T> — это умный указатель. Точнее, вызов lock возвращает умный указатель под названием MutexGuard, обёрнутый в LockResult, который мы обработали вызовом unwrap. Умный указатель MutexGuard реализует Deref, чтобы указывать на наши внутренние данные; умный указатель также имеет реализацию Drop, которая автоматически освобождает блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости. В результате мы не рискуем забыть освободить блокировку и заблокировать мьютекс от использования другими потоками, потому что освобождение блокировки происходит автоматически.

После освобождения блокировки мы можем вывести значение мьютекса и увидеть, что смогли изменить внутренний i32 на 6.

Общий Mutex<T> между несколькими потоками

Теперь попробуем разделить значение между несколькими потоками с использованием Mutex<T>. Мы запустим 10 потоков и заставим каждый из них увеличить значение счётчика на 1, так что счётчик перейдёт от 0 до 10. Пример в листинге 16-13 не скомпилируется, и мы используем эту ошибку, чтобы узнать больше об использовании Mutex<T> и о том, как Rust помогает нам использовать его правильно.

Filename: src/main.rs
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-13: Десять потоков, каждый из которых увеличивает счётчик, защищённый Mutex<T>

Мы создаём переменную counter для хранения i32 внутри Mutex<T>, как в листинге 16-12. Далее мы создаём 10 потоков, перебирая диапазон чисел. Мы используем thread::spawn и даём всем потокам одно и то же замыкание: то, которое перемещает счётчик в поток, получает блокировку на Mutex<T>, вызвав метод lock, а затем добавляет 1 к значению в мьютексе. Когда поток завершает выполнение своего замыкания, num выйдет из области видимости и освободит блокировку, чтобы другой поток мог её получить.

В основном потоке мы собираем все дескрипторы присоединения. Затем, как в листинге 16-2, мы вызываем join для каждого дескриптора, чтобы убедиться, что все потоки завершились. В этот момент основной поток получит блокировку и выведет результат этой программы.

Мы намекнули, что этот пример не скомпилируется. Теперь узнаем почему!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8  |     for _ in 0..10 {
   |     -------------- inside of this loop
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
8  ~     let mut value = counter.lock();
9  ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Сообщение об ошибке гласит, что значение counter было перемещено в предыдущей итерации цикла. Rust говорит нам, что мы не можем переместить владение блокировкой counter в несколько потоков. Давайте исправим ошибку компиляции с помощью метода множественного владения, который мы обсуждали в главе 15.

Множественное владение с несколькими потоками

В главе 15 мы передавали значение нескольким владельцам, используя умный указатель Rc<T> для создания значения с подсчётом ссылок. Давайте сделаем то же самое здесь и посмотрим, что произойдёт. Мы обернём Mutex<T> в Rc<T> в листинге 16-14 и клонируем Rc<T> перед перемещением владения в поток.

Filename: src/main.rs
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-14: Попытка использовать Rc<T> для того, чтобы позволить нескольким потокам владеть Mutex<T>

Снова компилируем и получаем… другие ошибки! Компилятор учит нас многому.

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:728:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Ух вы, это сообщение об ошибке очень многословное! Вот важная часть, на которую стоит обратить внимание: `Rc<Mutex<i32>>` не может быть безопасно отправлен между потоками. Компилятор также сообщает причину: типаж `Send` не реализован для `Rc<Mutex<i32>>. Мы поговорим о Send в следующем разделе: это один из типажей, который гарантирует, что типы, которые мы используем с потоками, предназначены для использования в конкурентных ситуациях.

К сожалению, Rc<T> небезопасен для разделения между потоками. Когда Rc<T> управляет подсчётом ссылок, он увеличивает счёт для каждого вызова clone и уменьшает счёт, когда каждый клон уничтожается. Но он не использует примитивы конкурентности, чтобы убедиться, что изменения счёта не могут быть прерваны другим потоком. Это может привести к неверным подсчётам — тонким ошибкам, которые, в свою очередь, могут привести к утечкам памяти или уничтожению значения до того, как мы с ним закончим. Нам нужен тип, который точно такой же, как Rc<T>, но который изменяет подсчёт ссылок потокобезопасным способом.

Атомарный подсчёт ссылок с Arc<T>

К счастью, Arc<T> — это тип, подобный Rc<T>, который безопасно использовать в конкурентных ситуациях. Буква a означает атомарный, то есть это атомарно подсчитываемый по ссылкам тип. Атомарные операции — это дополнительный вид примитива конкурентности, который мы здесь подробно не рассматриваем: см. документацию стандартной библиотеки по std::sync::atomic для более подробной информации. На данный момент вам просто нужно знать, что атомарные операции работают как примитивные типы, но безопасны для разделения между потоками.

Вы можете затем спросить, почему все примитивные типы не атомарны и почему типы стандартной библиотеки не реализованы для использования Arc<T> по умолчанию. Причина в том, что безопасность потоков сопряжена с штрафом производительности, который вы хотите платить только тогда, когда это действительно необходимо. Если вы просто выполняете операции над значениями в одном потоке, ваш код может работать быстрее, если ему не нужно обеспечивать гарантии, которые предоставляют атомарные операции.

Вернёмся к нашему примеру: Arc<T> и Rc<T> имеют одинаковый API, поэтому мы исправляем нашу программу, изменив строку use, вызов new и вызов clone. Код в листинге 16-15 наконец скомпилируется и запустится.

Filename: src/main.rs
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Listing 16-15: Использование Arc<T> для обёртки Mutex<T> с целью возможности разделения владения между несколькими потоками

Этот код выведет следующее:

Result: 10

У нас получилось! Мы посчитали от 0 до 10, что может показаться не очень впечатляющим, но это многое нам рассказало о Mutex<T> и безопасности потоков. Вы также могли бы использовать структуру этой программы для более сложных операций, чем просто увеличение счётчика. Используя эту стратегию, вы можете разделить вычисление на независимые части, распределить эти части по потокам, а затем использовать Mutex<T>, чтобы каждый поток обновлял окончательный результат своей частью.

Обратите внимание, что если вы выполняете простые числовые операции, существуют типы проще, чем Mutex<T>, предоставляемые модулем std::sync::atomic стандартной библиотеки. Эти типы обеспечивают безопасный, конкурентный, атомарный доступ к примитивным типам. Мы выбрали использование Mutex<T> с примитивным типом для этого примера, чтобы сосредоточиться на том, как работает Mutex<T>.

Сходство между RefCell<T>/Rc<T> и Mutex<T>/Arc<T>

Вы могли заметить, что counter является неизменяемым, но мы могли получить изменяемую ссылку на значение внутри него; это означает, что Mutex<T> обеспечивает внутреннюю изменяемость, как и семейство Cell. Так же, как мы использовали RefCell<T> в главе 15, чтобы позволить ourselves изменять содержимое внутри Rc<T>, мы используем Mutex<T> для изменения содержимого внутри Arc<T>.

Ещё один момент, на который стоит обратить внимание: Rust не может защитить вас от всех видов логических ошибок при использовании Mutex<T>. Вспомните из главы 15, что использование Rc<T> несло риск создания циклических ссылок, где два значения Rc<T> ссылаются друг на друга, вызывая утечки памяти. Аналогично, Mutex<T> несёт риск создания взаимной блокировки (deadlock). Они возникают, когда операция должна заблокировать два ресурса, и два потока уже получили по одной из блокировок, заставляя их ждать друг друга вечно. Если вас интересуют взаимные блокировки, попробуйте создать программу на Rust, которая имеет взаимную блокировку; затем изучите стратегии смягчения взаимных блокировок для мьютексов в любом языке и попробуйте реализовать их на Rust. Документация API стандартной библиотеки для Mutex<T> и MutexGuard предлагает полезную информацию.

Мы завершим эту главу, поговорив о типажах Send и Sync и о том, как мы можем использовать их с пользовательскими типами.