使用消息传递在线程之间传输数据

确保安全并发的一种越来越流行的方法是消息 passing,其中线程或 Actor 通过相互发送消息进行通信 包含数据。这是 Go 语言中的口号中的想法 文档: “不要通过共享内存来交流;相反,通过通信来共享内存。

为了实现消息发送并发,Rust 的标准库提供了一个 通道的实现。通道是一个通用的编程概念,由 哪些数据从一个线程发送到另一个线程。

你可以把编程中的通道想象成一个定向通道 水,例如溪流或河流。如果你放一个类似橡皮鸭的东西 进入河流,它将顺流而下到达水道的尽头。

通道有两半:发射器和接收器。发射器的一半是 将橡皮鸭放入河中的上游位置,以及 接收器的一半是橡皮鸭子最终到达下游的地方。您的一部分 code 使用您要发送的数据调用 transmitter 上的方法,以及 另一部分检查接收端是否有到达的消息。表示一个频道 如果发射器或接收器的一半被丢弃,则关闭

在这里,我们将创建一个程序,该程序具有一个线程来生成值和 将它们发送到一个通道,另一个线程将接收值和 打印出来。我们将使用通道在线程之间发送简单值 来说明该功能。熟悉该技术后,您可以 将 Channel 用于需要相互通信的任何线程,例如 作为聊天系统或许多线程执行部分计算的系统 并将各部分发送到一个聚合结果的线程。

首先,在示例 16-6 中,我们将创建一个 channel,但不对它做任何事情。 请注意,这还不会编译,因为 Rust 无法判断我们 想要通过通道发送。

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6:创建一个 channel 并分配两个 halfves 设置为txrx

我们使用mpsc::channel功能;mpsc代表多个生产者、单个消费者。简而言之,Rust 的标准库 implements channels 意味着一个 channel 可以有多个发送端,这些 生成值,但只有一个接收端使用这些值。想象 多条溪流汇成一条大河:一切顺流而下 的溪流最终会汇入一条河流。我们将从一个 producer 的 Producer 创建,但当我们获得此示例时,我们将添加多个 producer 加工。

mpsc::channelfunction 返回一个元组,其第一个元素是 发送端 — 发送端 — 第二个元素是接收端 — 该 接收器。缩写txrx传统上应用于许多领域 对于发射器接收器,因此我们这样命名我们的变量 以指示每个端点。我们使用的是let语句,其模式为 解构 Tuples;我们将在let语句 和解构。现在,要知道使用let陈述 这种方式是提取返回的 Tuples 片段的便捷方法 由mpsc::channel.

让我们将传输端移动到一个生成的线程中,并让它发送一个 string,以便生成的线程与主线程通信,如 示例 16-7.这就像在上游的河里放一只橡皮鸭子或 将聊天消息从一个线程发送到另一个线程。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7: 移动tx发送到生成的线程,并发送 “嗨”

同样,我们正在使用thread::spawn创建新线程,然后使用move移动tx放入闭包中,因此生成的线程拥有tx.生成的 thread 需要拥有发送器才能通过 渠道。发射器有一个send方法,该方法获取我们想要的值 发送。这sendmethod 返回一个Result<T, E>type,因此如果接收器具有 已经被丢弃,并且没有地方可以发送值,则 send作 将返回错误。在此示例中,我们将调用unwrap以防万一 的 error。但在实际应用程序中,我们会正确处理它:return 到 第 9 章回顾正确处理错误的策略。

在示例 16-8 中,我们将从主线程中的接收器获取值。这 就像从河尽头的水中捞出橡皮鸭,或者 接收聊天消息。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-8:在主线程中接收值 “hi” 并打印它

接收器有两种有用的方法:recvtry_recv.我们正在使用recv, receive 的缩写,它将阻塞主线程的执行并等待 直到沿通道发送值。发送值后,recv将 将其返回为Result<T, E>.当发射器关闭时,recv将返回 一个 error 来表示不会有更多值。

try_recv方法不会阻塞,而是返回一个Result<T, E>立即:一个Ok值(如果有)来保存消息,并且Err值(如果这次没有任何消息)。用try_recv在以下情况下很有用 这个线程在等待消息时还有其他工作要做:我们可以编写一个 循环,该循环调用try_recv每隔一段时间,如果 available,否则会执行其他工作一段时间,直到选中 再。

我们使用了recv在此示例中,为简单起见;我们没有任何其他工作 对于主线程来说,除了等待消息之外,所以阻塞主线程 thread 是合适的。

当我们运行示例 16-8 中的代码时,我们会看到从 main 线:

Got: hi

完善!

渠道和所有权转移

所有权规则在消息发送中起着至关重要的作用,因为它们可以帮助您 编写安全的并发代码。防止并发编程中的错误是 在整个 Rust 程序中考虑所有权的优势。让我们开始吧 一个实验,用于展示频道和所有权如何协同工作来防止 问题:我们将尝试使用valvalue 把它送到了 Channel 上。尝试编译示例 16-9 中的代码,看看为什么 不允许使用此代码:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-9:尝试使用val在我们发送之后 沿着英吉利海峡

在这里,我们尝试打印val在我们通过tx.send. 允许这将是一个坏主意:一旦该值已发送到另一个值 thread 的 Thread 中,该线程可以在我们尝试使用值 再。其他线程的修改可能会导致错误或 由于数据不一致或不存在而导致的意外结果。但是,Rust 提供 如果我们尝试编译示例 16-9 中的代码,则会出现错误:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

我们的并发错误导致了编译时错误。这send功能 获取其参数的所有权,当值被移动时,接收器 拥有它的所有权。这可以防止我们再次意外地使用 value 发送后;所有权系统检查一切正常。

发送多个值并看到接收方正在等待

示例 16-8 中的代码编译并运行,但它没有清楚地告诉我们 两个独立的线程通过频道相互通信。在列表中 16-10 我们做了一些修改,以证明示例 16-8 中的代码是 同时运行:生成的线程现在将发送多条消息,并且 在每条消息之间暂停一秒钟。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

示例 16-10:发送多条消息并暂停 在每个

这一次,生成的线程有一个我们想要发送到的字符串向量 主线程。我们迭代它们,单独发送每个,然后暂停 通过调用thread::sleep函数替换为Duration的值 1 秒。

在主线程中,我们不会调用recv函数: 相反,我们正在处理rx作为迭代器。对于收到的每个值,我们是 打印它。当通道关闭时,迭代将结束。

运行示例 16-10 中的代码时,您应该会看到以下输出 每行之间有 1 秒的停顿:

Got: hi
Got: from
Got: the
Got: thread

因为我们没有任何代码会在for循环中 main thread 中,我们可以看出主线程正在等待接收来自 生成的线程。

通过克隆发射机创建多个生产者

前面我们提到过mpsc是 Multiple Producer 的首字母缩写词, 单一消费者。让我们把mpsc使用和扩展示例 16-10 中的代码 创建多个线程,这些线程都向同一个接收器发送值。我们能做到 所以通过克隆发射器,如示例 16-11 所示:

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

示例 16-11:从多个发送多条消息 生产者

这一次,在我们创建第一个生成的线程之前,我们调用clone在 发射机。这将为我们提供一个新的发射器,我们可以传递给第一个 spawned thread 的 Thread 中。我们将原始 transmitter 传递给第二个生成的线程。 这为我们提供了两个线程,每个线程向一个接收器发送不同的消息。

当您运行代码时,您的输出应如下所示:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

您可能会按其他顺序看到值,具体取决于您的系统。这是 是什么让并发既有趣又困难。如果你试验了thread::sleep,在不同的线程中为其提供不同的值,每次运行 将更加不确定,并且每次都会创建不同的输出。

现在我们已经了解了渠道的工作原理,让我们看看 并发。

本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准