Многозадачность

Многозадачность и параллелизм являются невероятно важными проблемами в информатике. Это актуальная тема для современной индустрии. У компьютеров все больше и больше ядер, но многие программисты не готовы в полной мере использовать их.

Средства Rust для безопасной работы с памятью в полной мере применимы и при работе в многозадачной среде. Даже многозадачные программы на Rust должны безопасно работать с памятью, и не создавать состояний гонок по данным. Система типов Rust достаточно мощна, чтобы справиться с этими задачами на этапе компиляции.

Прежде чем мы поговорим об особенностях многозадачности в Rust, важно понять вот что: Rust — достаточно низкоуровневый язык, поэтому вся поддержка многозадачности реализована в стандартной библиотеке, а не в самом языке. Это означает, что если вам не нравится какой-то аспект реализации многозадачности в Rust, вы всегда можете создать альтернативную библиотеку. mio — реально существующий пример такого подхода.

Справочная информация: Send и Sync

Рассуждать о многозадачности довольно трудно. Rust строго статически типизирован, и это помогает нам делать выводы о коде. В связи с этим Rust предоставляет два типажа, помогающих нам разбираться в любом коде, который вообще может быть многозадачным.

Send

Первый типаж, о котором мы будем говорить, называется Send. Когда тип T реализует Send, это указывает компилятору, что владение переменными этого типа можно безопасно перемещать между потоками.

Это важно для соблюдения некоторых ограничений. Например, это имеет значение, когда у нас есть канал, соединяющий два потока, и мы хотим отправлять некоторые данные по каналу из одного потока в другой. Следовательно, мы должны гарантировать, что для отправляемого типа данных реализован типаж Send.

И наоборот, если мы оборачиваем библиотеку чужого кода (FFI), и она не является потокобезопасной, то нам не следует реализовывать типаж Send, и компилятор поможет нам убедиться в невозможности покинуть текущий поток.

Sync

Второй из этих типажей называется Sync. Когда тип T реализует Sync, это указывает компилятору, что использование переменных этого типа не приводит к небезопасной работе с памятью в многопоточной среде.

Например, совместное использование неизменяемых данных с помощью атомарного счетчика ссылок является потокобезопасным. Rust обеспечивает такой тип, Arc<T>, и он реализует Sync, так что при помощи этого типа можно безопасно обмениваться данными между потоками.

Эти два типажа позволяют использовать систему типов, чтобы получить надежные гарантии о свойствах вашего кода в условиях многозадачности. Прежде чем мы покажем, как этого достигнуть, сначала мы должны узнать, как вообще написать многозадачную программу в Rust!

Потоки

Стандартная библиотека Rust предоставляет библиотеку многопоточности, которая позволяет запускать код на Rust параллельно. Вот простой пример использования std::thread:

use std::thread; fn main() { thread::spawn(|| { println!("Hello from a thread!"); }); }
use std::thread;

fn main() {
    thread::spawn(|| {
        println!("Hello from a thread!");
    });
}

Метод thread::spawn() в качестве единственного аргумента принимает замыкание, которое выполняется в новом потоке. Он возвращает дескриптор потока, который используется для ожидания завершения этого потока и извлечения его результата:

use std::thread; fn main() { let handle = thread::spawn(|| { "Hello from a thread!" }); println!("{}", handle.join().unwrap()); }
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        "Hello from a thread!"
    });

    println!("{}", handle.join().unwrap());
}

Многие языки имеют возможность выполнять потоки, но это дико опасно. Есть целые книги о том, как избежать ошибок, которые происходят от совместного использования изменяемого состояния. В Rust снова помогает система типов, которая предотвращает гонки данных на этапе компиляции. Давайте поговорим о том, как же на самом деле обеспечивается совместное использование чего-либо в условиях нескольких потоков.

Безопасное совместное использование изменяемого состояния

Вчитайтесь: «безопасное совместное использование изменяемого состояния». Похоже на ложь, не так ли? Многие программисты считают, что организовать многопоточную работу с изменяемым состоянием очень сложно и почти невозможно. Но благодаря системе типов Rust, это всё же правда — безопасно работать с изменяемыми данными можно.

Кто-то однажды сказал это:

Совместно используемое изменяемое состояние является корнем всех зол. Большинство языков пытаются решить эту проблему через часть, отвечающую за «изменяемое», но Rust решает ее через часть, отвечающую за «совместно используемое».

Та же самая система владения, которая помогает предотвратить неправильное использование указателей, также помогает исключить гонки по данным, один из худших видов ошибок многозадачности.

В качестве примера приведем программу на Rust, которая входила бы в состояние гонки по данным на многих языках. На Rust она не скомпилируется:

use std::thread; fn main() { let mut data = vec![1u32, 2, 3]; for i in 0..3 { thread::spawn(move || { data[i] += 1; }); } thread::sleep_ms(50); }
use std::thread;

fn main() {
    let mut data = vec![1u32, 2, 3];

    for i in 0..3 {
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

Она выдает ошибку:

8:17 error: capture of moved value: `data`
        data[i] += 1;
        ^~~~

В данном случае мы знаем, что наш код должен быть безопасным, но Rust в этом не уверен. И, на самом деле, он не является безопасным: мы работаем с data в каждом потоке. При этом, поток становится владельцем того, что он получает как часть окружения замыкания. А это значит, что у нас есть три владельца! Это плохо. Мы можем исправить это с помощью типа Arc<T>, который является атомарным указателем со счетчиком ссылок. «Атомарный» означает, что им безопасно обмениваться между потоками.

Чтобы гарантировать, что его можно безопасно использовать из нескольких потоков, Arc<T> предполагает наличие еще одного свойства у вложенного типа. Он предполагает, что T реализует типаж Sync. В нашем случае мы также хотим, чтобы была возможность изменять вложенное значение. Нам нужен тип, который может обеспечить изменение своего содержимого лишь одним пользователем одновременно. Для этого мы можем использовать тип Mutex<T>. Вот вторая версия нашего кода. Она по-прежнему не работает, но по другой причине:

use std::thread; use std::sync::Mutex; fn main() { let mut data = Mutex::new(vec![1u32, 2, 3]); for i in 0..3 { let data = data.lock().unwrap(); thread::spawn(move || { data[i] += 1; }); } thread::sleep_ms(50); }
use std::thread;
use std::sync::Mutex;

fn main() {
    let mut data = Mutex::new(vec![1u32, 2, 3]);

    for i in 0..3 {
        let data = data.lock().unwrap();
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

Вот ошибка:

<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~

Вы можете видеть, что Mutex содержит метод lock, который имеет следующую сигнатуру:

fn main() { fn lock(&self) -> LockResult<MutexGuard<T>> }
fn lock(&self) -> LockResult<MutexGuard<T>>

Так как типаж Send не был реализован для MutexGuard<T>, мы не можем перемещать охранное значение мьютекса через границы потоков, что и сказано в сообщении об ошибке.

Мы можем использовать Arc<T>, чтобы исправить это. Вот рабочая версия:

use std::sync::{Arc, Mutex}; use std::thread; fn main() { let data = Arc::new(Mutex::new(vec![1u32, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep_ms(50); }
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

Теперь мы вызываем clone() для нашего Arc, что увеличивает внутренний счетчик. Затем полученная ссылка перемещается в новый поток. Давайте более подробно рассмотрим тело потока:

use std::sync::{Arc, Mutex}; use std::thread; fn main() { let data = Arc::new(Mutex::new(vec![1u32, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep_ms(50); }
thread::spawn(move || {
    let mut data = data.lock().unwrap();
    data[i] += 1;
});

Во-первых, мы вызываем метод lock(), который захватывает блокировку мьютекса. Так как вызов данного метода может потерпеть неудачу, он возвращает Result<T, E>, но, поскольку это просто пример, мы используем unwrap(), чтобы получить ссылку на данные. Реальный код должен иметь более надежную обработку ошибок в такой ситуации. После этого мы свободно изменяем данные, так как у нас есть блокировка.

Под конец мы ждём какое-то время, пока потоки отработают. Это не идеальный способ дождаться окончания их работы: возможно, мы выбрали разумное время ожидания но, скорее всего, мы будем ждать либо больше чем нужно, либо меньше чем нужно, в зависимости от того, сколько на самом деле времени потребуется потокам, чтобы закончить вычисления.

Есть более точные способы синхронизации потоков, и несколько из них реализовано в стандартной библиотеке Rust. Давайте поговорим об одном из них: каналах.

Каналы

Вот версия нашего кода, которая использует для синхронизации каналы, вместо того, чтобы ждать в течение определенного времени:

use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc; fn main() { let data = Arc::new(Mutex::new(0u32)); let (tx, rx) = mpsc::channel(); for _ in 0..10 { let (data, tx) = (data.clone(), tx.clone()); thread::spawn(move || { let mut data = data.lock().unwrap(); *data += 1; tx.send(()); }); } for _ in 0..10 { rx.recv(); } }
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0u32));

    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            tx.send(());
        });
    }

    for _ in 0..10 {
        rx.recv();
    }
}

Мы используем метод mpsc::channel(), чтобы создать новый канал. В этом примере мы в каждом из десяти потоков вызываем метод send, который передает по каналу пустой кортеж (), а затем в главном потоке ждем, пока не будут приняты все десять значений.

Хотя по этому каналу посылается просто сигнал (пустой кортеж () не несёт никаких данных), в общем случае мы можем отправить по каналу любое значение, которое реализует типаж Send!

use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); for _ in 0..10 { let tx = tx.clone(); thread::spawn(move || { let answer = 42u32; tx.send(answer); }); } rx.recv().ok().expect("Could not receive answer"); }
use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = 42u32;

            tx.send(answer);
        });
    }

   rx.recv().ok().expect("Could not receive answer");
}

u32 реализует Send, потому что мы можем сделать копию. Итак, создаётся поток, в котором вычисляется ответ, а затем этот ответ с помощью метода send() передаётся обратно по каналу.

Паника

panic! аварийно завершает выполняемый в данный момент поток. Вы можете использовать потоки Rust как простой механизм изоляции:

fn main() { use std::thread; let result = thread::spawn(move || { panic!("oops!"); }).join(); assert!(result.is_err()); }
use std::thread;

let result = thread::spawn(move || {
    panic!("oops!");
}).join();

assert!(result.is_err());

Используемый в коде выше метод join() структуры Thread возвращает Result, что позволяет нам проверить, паниковал ли поток, или он завершился нормально.