将我们的单线程服务器转变为多线程服务器

现在,服务器将依次处理每个请求,这意味着它不会 处理第二个连接,直到第一个连接完成处理。如果 server 收到越来越多的请求,则此串行执行会更少,并且 不太理想。如果服务器收到请求,需要很长时间才能 进程,后续请求将不得不等待,直到长请求 已完成,即使新请求可以快速处理。我们需要修复 但首先,我们要看看实际问题。

在当前 Server 实现中模拟慢速请求

我们将了解处理缓慢的请求如何影响向 我们当前的 Server 实现。示例 20-10 实现了处理请求 更改为 /sleep,并模拟缓慢响应,这将导致服务器进入睡眠状态 5 秒后才做出回应。

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-10:通过休眠 来模拟慢速请求 5 秒

我们从ifmatch现在我们有三个案例。我们需要 显式匹配request_line以模式匹配 字符串文本值;match不执行自动引用,并且 dereferencing 就像 Equality 方法一样。

第一个臂与if块。第二臂 匹配对 /sleep 的请求。收到该请求后,服务器将 休眠 5 秒,然后渲染成功的 HTML 页面。第三个分支是 与else块。

你可以看到我们的服务器是多么原始:真正的库会处理 以不那么冗长的方式识别多个请求!

使用 启动 服务器cargo run.然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果 像以前一样,您输入 / URI 几次,您将看到它快速响应。 但是,如果你输入 /sleep,然后加载 /,你会看到 / 会一直等到sleep在加载前睡了整整 5 秒。

我们可以使用多种技术来避免请求备份 一个缓慢的请求;我们将实现的是线程池。

使用线程池提高吞吐量

线程池是一组正在等待并准备好的衍生线程 处理任务。当程序收到新任务时,它会分配一个 线程添加到任务池中,该线程将处理该任务。这 池中的剩余线程可用于处理出现的任何其他任务 in 中。当第一个线程完成时 处理其任务时,它被返回到空闲线程池,准备处理 新任务。线程池允许您并发处理连接, 增加服务器的吞吐量。

我们将池中的线程数限制为较小的数字以保护我们 拒绝服务 (DoS) 攻击;如果我们让我们的程序创建一个新线程 对于每个收到的请求,有人向我们的 服务器可能会耗尽我们服务器的所有资源并磨练 停止处理请求。

因此,我们将拥有固定数量的 在池中等待的线程。传入的请求将发送到池 加工。池将维护传入请求的队列。每个 池中的线程将从这个队列中弹出一个请求,处理该请求, ,然后向队列请求另一个请求。有了这个设计,我们可以处理 自Nrequests 并发,其中N是线程数。如果每个 线程正在响应长时间运行的请求,后续请求仍可以 备份到队列中,但我们增加了长时间运行的请求的数量 我们可以在达到那个点之前处理。

这种技术只是提高 Web 吞吐量的众多方法之一 服务器。您可以探索的其他选项包括 fork/join 模型单线程异步 I/O 模型多线程异步 I/O 模型。如果 您对此主题感兴趣,您可以阅读有关其他解决方案和 尝试实施它们;对于像 Rust 这样的低级语言,所有这些 选项是可能的。

在我们开始实现线程池之前,我们先谈谈如何使用 pool 应该看起来像 .当您尝试设计代码时,编写客户端 Interface First 可以帮助指导您的设计。编写代码的 API,使其 以您想要的方式构建;然后实现该功能 而不是实现功能,然后 设计公共 API。

类似于我们在第 12 章项目中使用测试驱动开发的方式, 我们将在此处使用编译器驱动的开发。我们将编写调用 函数,然后我们将查看编译器中的错误以确定 我们接下来应该更改什么才能让代码工作。但是,在我们这样做之前, 我们将探索不打算用作起点的技术。

为每个请求生成一个线程

首先,让我们来探讨一下,如果代码确实为 每一个连接。如前所述,这不是我们的最终计划,因为 问题,但这是一个 首先获取正常工作的多线程服务器的起点。然后,我们将添加 thread pool 作为一种改进,对比这两种解决方案将是 容易。示例 20-11 显示了要对main生成新线程 要处理for圈。

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-11:为每个 流

正如您在第 16 章中学到的,thread::spawn将创建一个新线程,然后 在新线程中运行 Closure 中的代码。如果你运行这段代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /sleep,你确实会看到 对 / 的请求不必等待 /sleep 完成。但是,由于 我们提到,这最终会压垮系统,因为你会让 没有任何限制的新线程。

创建有限数量的线程

我们希望我们的线程池以类似、熟悉的方式工作,因此从 线程池不需要对使用 我们的 API。示例 20-12 显示了ThreadPoolstruct 而不是thread::spawn.

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-12:我们的理想ThreadPool接口

我们使用ThreadPool::new创建具有可配置编号的新线程池 线程,在本例中为 4 个。然后,在forpool.execute具有 与thread::spawn因为它需要一个关闭,池应该 run 的我们需要实施pool.execute所以它需要 closure 并将其提供给池中的线程运行。此代码尚未 compile,但我们会尝试让编译器指导我们如何修复它。

建筑ThreadPool使用 Compiler Driven Development

将示例 20-12 中的 src/main.rs 修改,然后让我们使用 编译器错误cargo check以推动我们的发展。这是第一个 我们得到的错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

伟大!这个错误告诉我们需要一个ThreadPooltype 或 module,因此我们将 立即构建一个。我们ThreadPoolimplementation 将独立于 kind 我们的 Web 服务器正在做的工作。那么,让我们切换一下hellocrate 来自 binary crate 复制到库 crate 中,以保存我们的ThreadPool实现。后 我们改成一个库 crate,我们也可以使用单独的线程池 库来执行我们想要使用线程池做的任何工作,而不仅仅是用于服务 Web 请求。

创建一个包含以下内容的 src/lib.rs,这是最简单的 a 的定义ThreadPoolstruct 的 Structure:

文件名: src/lib.rs

pub struct ThreadPool;

然后编辑 main.rs 文件以引入ThreadPoolinto 范围 crate,将以下代码添加到 src/main.rs 的顶部:

文件名: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

这段代码仍然不起作用,但让我们再次检查它以获取下一个错误 我们需要解决:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

此错误表示接下来我们需要创建一个名为newThreadPool.我们也知道new需要有一个参数 可以接受4作为参数,并且应该返回一个ThreadPool实例。 让我们实现最简单的new函数将具有这些 特性:

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

我们选择了usize作为size参数,因为我们知道 负数线程没有任何意义。我们也知道我们将使用它 4 作为线程集合中的元素数,即usizetype 用于,如第 3 章的 “整数类型” 部分所述。

让我们再次检查代码:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

现在出现错误是因为我们没有executemethod 开启ThreadPool. 回想一下“创建有限数量的 Threads“部分,我们 决定我们的线程池应该有一个类似于thread::spawn.在 此外,我们将实现execute函数,因此它接受它的 given,并将其提供给池中的空闲线程运行。

我们将定义executemethod 开启ThreadPool将闭包作为 参数。从“将捕获的值移出 Closure 和Fn性状”第 13 章中我们可以采取的 closure 作为具有三个不同特征的参数:Fn,FnMutFnOnce.我们需要决定在这里使用哪种 closure 类型。我们知道我们会 最终会做一些类似于 Standard Library 的作thread::spawnimplementation 的 Signature 的 Signature 的边界,这样我们就可以看看thread::spawnhas 的参数。该文档向我们展示了以下内容:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Ftype 参数是我们在这里关注的那个;这T类型 parameter 与返回值相关,我们不关心这一点。我们 可以看到spawn使用FnOnce作为 trait bound onF.这可能是 我们想要的,因为我们最终会传递我们得到的参数executespawn.我们可以进一步确信FnOnce是我们的特点 want to use,因为用于运行请求的线程只会执行该 request 的闭包,它与OnceFnOnce.

Ftype 参数也具有 trait 绑定Send和生命周期绑定'static,这在我们的情况中很有用:我们需要Send要传输 从一个线程到另一个线程的 closure 和'static因为我们不知道多久 线程将执行。让我们创建一个executemethod 开启ThreadPool,它将采用F具有以下边界:

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

我们仍然使用 after()FnOnce因为这个FnOnce表示一个闭包 ,它不带任何参数并返回 Unit 类型 。就像功能一样 定义,则可以从签名中省略返回类型,但即使我们 没有参数,我们仍然需要括号。()

同样,这是execute方法:确实 什么都没有,但我们只是尝试让我们的代码编译。让我们再检查一次:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

它编译!但请注意,如果您尝试cargo run并在 browser,您将在浏览器中看到我们在 章节。我们的库实际上并没有调用传递给execute还!

注意:您可能听说过使用严格编译器的语言,例如 Haskell 和 Rust 的意思是“如果代码编译,它就可以工作”。但这种说法并非如此 普遍正确。我们的项目可以编译,但它绝对不做任何事情!如果我们 正在构建一个真实、完整的项目,这将是一个开始的好时机 编写单元测试来检查代码是否可编译具有 要。

验证 中的 线程数new

我们没有对参数执行任何作newexecute.让我们 使用我们想要的行为实现这些函数的主体。首先, 让我们考虑一下new.之前,我们为size参数,因为线程数为负数的池没有意义。 但是,线程为零的池也没有意义,但 Zero 是完美的 有效usize.我们将添加代码来检查size之前大于零 我们返回一个ThreadPool实例,如果程序收到 zero 使用assert!宏,如示例 20-13 所示。

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

示例 20-13:实现ThreadPool::new如果出现 panicsize为零

我们还为ThreadPool带有 doc 注释。 请注意,我们遵循了良好的文档实践,添加了一个部分 调用函数可能 panic 的情况,如 第 14 章.尝试运行cargo doc --open并单击ThreadPool结构 查看生成的 docs 的用途new肖!

而不是添加assert!宏,我们可以更改newbuild并返回一个Result就像我们对Config::build在 I/O 中 project 中的实例。但是我们决定,在本例中,尝试创建一个 没有任何线程的线程池应该是不可恢复的错误。如果你是 雄心勃勃,尝试编写一个名为build替换为 签名与new功能:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

创建空间以存储线程

现在我们有了一种方法来知道我们有有效数量的线程要存储 池中,我们可以创建这些线程并将它们存储在ThreadPool结构 在返回结构体之前。但是我们如何 “存储” 线程呢?让我们再来看一个 看看thread::spawn签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

spawn函数返回一个JoinHandle<T>哪里T是 closure 返回。让我们尝试使用JoinHandle也,看看会发生什么。在我们的 case 中,我们传递给线程池的闭包将处理 Connection 并且不返回任何内容,因此T将为 单位类型 。()

示例 20-14 中的代码可以编译,但还没有创建任何线程。 我们更改了ThreadPool来保存thread::JoinHandle<()>实例,将 vector 初始化为size,设置for循环,它将运行一些代码来创建线程,以及 返回了ThreadPool实例。

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

示例 20-14:为 创建 vectorThreadPool按住 线程

我们带来了std::thread到 library crate 中,因为我们是 用thread::JoinHandle作为向量中项目的类型ThreadPool.

收到有效大小后,我们的ThreadPool创建一个新的 vector 拿size项目。这with_capacity函数执行与Vec::new但有一个重要的区别:它在 向量。因为我们知道我们需要存储size元素中,执行 这种预先分配比使用Vec::new, 它会在插入元素时调整自身大小。

当您运行cargo check同样,它应该会成功。

一个Worker负责从ThreadPool到线程

我们在for循环中关于 线程。在这里,我们将看看我们实际上是如何创建线程的。标准 library 提供thread::spawn作为创建线程的一种方式,以及thread::spawn期望获取一些代码,线程应该在 线程。但是,在我们的例子中,我们想要创建线程并拥有 它们等待我们稍后发送的代码。标准库的 threads 的实现不包括任何执行此作的方法;我们必须 手动实现它。

我们将通过在ThreadPool以及将管理此新行为的线程。我们将调用 this data structure Worker,这是池化中的常用术语 实现。Worker 拾取需要运行的代码,并运行 code 在 Worker 的线程中。想想在 restaurant:工作人员等待客户收到订单,然后 他们负责接受这些订单并完成它们。

而不是存储JoinHandle<()>线程池中的实例, 我们将存储Worker结构。每Worker将存储单个JoinHandle<()>实例。然后,我们将在Worker那会 获取要运行的代码的闭包,并将其发送到已在运行的线程 执行。我们还将为每个 worker 提供一个id因此我们可以区分 日志记录或调试时池中的不同工作程序。

以下是我们创建ThreadPool.我们将 实现将闭包发送到线程的代码Worker按以下方式设置:

  1. 定义一个Worker结构体,该结构包含一个id以及JoinHandle<()>.
  2. 改变ThreadPool来保存Worker实例。
  3. 定义一个Worker::new函数,该函数采用idnumber 并返回一个Worker实例,该实例保存id以及一个由空的 关闭。
  4. ThreadPool::new,请使用for循环计数器生成id创造 一个新的Worker有了那个id,并将 worker 存储在 vector 中。

如果您准备好迎接挑战,请尝试在此之前自行实施这些更改 看看示例 20-15 中的代码。

准备?这是示例 20-15,其中一种方法可以进行上述修改。

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-15:修改ThreadPool按住Worker实例,而不是直接持有线程

我们更改了 上的字段名称ThreadPoolthreadsworkers因为它现在持有Worker实例而不是JoinHandle<()>实例。我们在forloop 作为参数传递给Worker::new,并将每个新的Worker在名为workers.

外部代码(如我们在 src/main.rs 中的服务器)不需要知道 有关使用Workerstruct 中的ThreadPool, 因此,我们制作了Workerstruct 及其newfunction private 函数。这Worker::new函数使用id我们给它并存储一个JoinHandle<()>实例,该实例是通过使用空闭包生成新线程创建的。

注意:如果作系统无法创建线程,因为没有 足够的系统资源,thread::spawn会恐慌。这将导致我们的 整个服务器都会导致 panic,即使创建某些线程可能会 成功。为了简单起见,这种行为很好,但在 Production 中 thread pool 实现,您可能希望使用std::thread::Builder及其spawn方法,该方法返回Result相反。

此代码将编译并存储Worker实例 指定为ThreadPool::new.但我们仍然没有处理 我们得到的闭包execute.让我们看看接下来如何做到这一点。

通过通道向线程发送请求

我们要解决的下一个问题是,给thread::spawn做 绝对没有。目前,我们得到了我们想要在execute方法。但我们需要付出thread::spawn一个 Closure 来运行。 创建每个Worker在创建ThreadPool.

我们希望Worker我们刚刚创建的结构体,用于获取要从中运行的代码 在ThreadPool并将该代码发送到其线程以运行。

我们在第 16 章中学到的通道 — 一种简单的通信方式 两个线程 - 将非常适合此使用案例。我们将使用一个 channel 来运作 作为作业队列,将execute将从ThreadPool自 这Worker实例,这会将作业发送到其线程。这是计划:

  1. ThreadPool将创建一个频道并保留 Sender。
  2. Worker将紧紧抓住接收器。
  3. 我们将创建一个新的Jobstruct 来保存我们想要发送的闭包 沿着海峡而下。
  4. execute方法将发送它想要执行的作业,通过 寄件人。
  5. 在其线程中,Worker将遍历其接收器并执行 关闭它收到的任何 job。

让我们从 在ThreadPool::new并按住发件人 在ThreadPool实例,如示例 20-16 所示。这Job结构 目前不包含任何内容,但将是我们将发送的物品类型 频道。

文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-16:修改ThreadPool要存储 发送Job实例

ThreadPool::new,我们创建新通道并让池保存 寄件人。这将成功编译。

让我们尝试将通道的接收器作为线程池传递给每个 worker 创建频道。我们知道我们想在线程中使用接收器, worker 生成,因此我们将引用receiver参数。这 示例 20-17 中的代码还不能完全编译。

文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-17:将接收器传递给 worker

我们做了一些小而直接的更改:我们将接收器传递给Worker::new,然后在闭包中使用它。

当我们尝试检查此代码时,我们收到以下错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

代码正在尝试传递receiver到多个Worker实例。这 不起作用,因为您会记得第 16 章:通道实现 Rust 提供多个生产者、单个消费者。这意味着我们不能 只需克隆通道的消费端即可修复此代码。我们也不 希望向多个使用者多次发送消息;我们想要一个列表 的消息,以便每条消息都得到处理一次。

此外,从通道队列中移除作业涉及更改receiver,因此线程需要一种安全的方式来共享和修改receiver; 否则,我们可能会得到 race conditions (如 Chapter 16 所述)。

回想一下第 16 章:共享中讨论的线程安全智能指针 所有权,并允许线程更改值,则 需要使用Arc<Mutex<T>>.这Arc类型将允许多个 worker 拥有 receiver 和Mutex将确保只有一个 worker 从 接收器。示例 20-18 显示了我们需要做的更改。

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-18:在 worker 之间共享接收器 用ArcMutex

ThreadPool::new,我们将接收器放在Arc以及Mutex.对于每个 new worker 中,我们克隆Arc以增加引用计数,以便 worker 可以 接管人的股份所有权。

通过这些更改,代码将编译!我们快到了!

实施execute方法

最后,我们来实现executemethod 开启ThreadPool.我们还将更改Job从结构体更改为 trait 对象的类型别名,该对象持有 closure 的execute接收。如“创建类型同义词 with Type Aliases“部分,类型别名允许我们缩短 易于使用。请看示例 20-19。

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-19:创建一个Jobtype 别名Box保存每个闭包,然后将作业发送到通道

创建新的Job实例使用我们得到的execute我们 将该作业发送到通道的发送端。我们正在调用unwrapsend对于发送失败的情况。例如,如果我们 停止执行所有线程,这意味着接收端已停止 接收新消息。目前,我们无法阻止我们的线程 执行:只要池存在,我们的线程就会继续执行。这 我们使用的原因unwrap是我们知道失败的情况不会发生,但是 编译器不知道这一点。

但我们还没有完全完成!在 worker 中,我们的 closure 被传递给thread::spawnstill 仅引用 channel 的接收端。 相反,我们需要 Closure 永远循环,要求 channel 的 channel,并在获取 job 时运行 Job。让我们做出改变 如示例 20-20 所示到Worker::new.

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

示例 20-20:接收和执行 worker 的线程

在这里,我们首先调用lockreceiver获取互斥锁,然后我们 叫unwrap对任何错误产生 panic。如果互斥锁 处于中毒状态,如果其他线程在 持有锁而不是释放锁。在这种情况下,调用unwrap让这个线程 panic 是正确的作。随意 更改此项unwrap更改为expect带有一条对 你。

如果我们获得互斥锁,则调用recv以接收Job从 渠道。决赛unwrap也移过此处可能发生的任何错误 如果保存发送方的线程已关闭,类似于sendmethod 返回Err如果接收器关闭。

recv块,因此如果还没有 job,则当前线程将 等待作业可用。这Mutex<T>确保只有一个Workerthread 正在尝试请求作业。

我们的线程池现在处于工作状态!给它一个cargo run并制作一些 请求:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: `hello` (lib) generated 3 warnings
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功!现在,我们有一个异步执行连接的线程池。 创建的线程永远不会超过四个,因此我们的系统不会获得 如果服务器收到大量请求,则为 overloaded。如果我们向 /sleep 发出请求,服务器将能够通过让另一个 thread 运行它们。

注意:如果您同时在多个浏览器窗口中打开 /sleep,则它们 可能会以 5 秒的间隔一次加载一个。某些 Web 浏览器执行 出于缓存原因,按顺序执行同一请求的多个实例。这 限制不是由我们的 Web 服务器引起的。

了解了while let循环中,你可能想知道 为什么我们没有编写示例 20-21 所示的 worker 线程代码。

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

示例 20-21:的另一种实现Worker::newwhile let

此代码可编译并运行,但不会产生所需的线程 行为:慢速请求仍会导致其他请求等待 处理。原因有点微妙:Mutexstruct 没有 publicunlock方法,因为锁的所有权基于 这MutexGuard<T>LockResult<MutexGuard<T>>lockmethod 返回。在编译时,借用检查器可以强制执行该规则 由Mutex无法访问,除非我们持有 锁。但是,此实现也可能导致锁被持有 如果我们不注意MutexGuard<T>.

示例 20-20 中使用let job = receiver.lock().unwrap().recv().unwrap();有效,因为使用let任何 表达式中使用的 Temporary 值位于 equals 的右侧 符号会立即丢弃let语句结束。然而while let(以及if letmatch) 在 关联的块。在示例 20-21 中,锁在 continue continue 中保持 对job(),这意味着其他工作人员无法接收作业。

本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准