将我们的单线程服务器转变为多线程服务器
现在,服务器将依次处理每个请求,这意味着它不会 处理第二个连接,直到第一个连接完成处理。如果 server 收到越来越多的请求,则此串行执行会更少,并且 不太理想。如果服务器收到请求,需要很长时间才能 进程,后续请求将不得不等待,直到长请求 已完成,即使新请求可以快速处理。我们需要修复 但首先,我们要看看实际问题。
在当前 Server 实现中模拟慢速请求
我们将了解处理缓慢的请求如何影响向 我们当前的 Server 实现。示例 20-10 实现了处理请求 更改为 /sleep,并模拟缓慢响应,这将导致服务器进入睡眠状态 5 秒后才做出回应。
文件名: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
示例 20-10:通过休眠 来模拟慢速请求 5 秒
我们从if
自match
现在我们有三个案例。我们需要
显式匹配request_line
以模式匹配
字符串文本值;match
不执行自动引用,并且
dereferencing 就像 Equality 方法一样。
第一个臂与if
块。第二臂
匹配对 /sleep 的请求。收到该请求后,服务器将
休眠 5 秒,然后渲染成功的 HTML 页面。第三个分支是
与else
块。
你可以看到我们的服务器是多么原始:真正的库会处理 以不那么冗长的方式识别多个请求!
使用 启动 服务器cargo run
.然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果
像以前一样,您输入 / URI 几次,您将看到它快速响应。
但是,如果你输入 /sleep,然后加载 /,你会看到 / 会一直等到sleep
在加载前睡了整整 5 秒。
我们可以使用多种技术来避免请求备份 一个缓慢的请求;我们将实现的是线程池。
使用线程池提高吞吐量
线程池是一组正在等待并准备好的衍生线程 处理任务。当程序收到新任务时,它会分配一个 线程添加到任务池中,该线程将处理该任务。这 池中的剩余线程可用于处理出现的任何其他任务 in 中。当第一个线程完成时 处理其任务时,它被返回到空闲线程池,准备处理 新任务。线程池允许您并发处理连接, 增加服务器的吞吐量。
我们将池中的线程数限制为较小的数字以保护我们 拒绝服务 (DoS) 攻击;如果我们让我们的程序创建一个新线程 对于每个收到的请求,有人向我们的 服务器可能会耗尽我们服务器的所有资源并磨练 停止处理请求。
因此,我们将拥有固定数量的
在池中等待的线程。传入的请求将发送到池
加工。池将维护传入请求的队列。每个
池中的线程将从这个队列中弹出一个请求,处理该请求,
,然后向队列请求另一个请求。有了这个设计,我们可以处理
自N
requests 并发,其中N
是线程数。如果每个
线程正在响应长时间运行的请求,后续请求仍可以
备份到队列中,但我们增加了长时间运行的请求的数量
我们可以在达到那个点之前处理。
这种技术只是提高 Web 吞吐量的众多方法之一 服务器。您可以探索的其他选项包括 fork/join 模型、单线程异步 I/O 模型或多线程异步 I/O 模型。如果 您对此主题感兴趣,您可以阅读有关其他解决方案和 尝试实施它们;对于像 Rust 这样的低级语言,所有这些 选项是可能的。
在我们开始实现线程池之前,我们先谈谈如何使用 pool 应该看起来像 .当您尝试设计代码时,编写客户端 Interface First 可以帮助指导您的设计。编写代码的 API,使其 以您想要的方式构建;然后实现该功能 而不是实现功能,然后 设计公共 API。
类似于我们在第 12 章项目中使用测试驱动开发的方式, 我们将在此处使用编译器驱动的开发。我们将编写调用 函数,然后我们将查看编译器中的错误以确定 我们接下来应该更改什么才能让代码工作。但是,在我们这样做之前, 我们将探索不打算用作起点的技术。
为每个请求生成一个线程
首先,让我们来探讨一下,如果代码确实为
每一个连接。如前所述,这不是我们的最终计划,因为
问题,但这是一个
首先获取正常工作的多线程服务器的起点。然后,我们将添加
thread pool 作为一种改进,对比这两种解决方案将是
容易。示例 20-11 显示了要对main
生成新线程
要处理for
圈。
文件名: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
示例 20-11:为每个 流
正如您在第 16 章中学到的,thread::spawn
将创建一个新线程,然后
在新线程中运行 Closure 中的代码。如果你运行这段代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /sleep,你确实会看到
对 / 的请求不必等待 /sleep 完成。但是,由于
我们提到,这最终会压垮系统,因为你会让
没有任何限制的新线程。
创建有限数量的线程
我们希望我们的线程池以类似、熟悉的方式工作,因此从
线程池不需要对使用
我们的 API。示例 20-12 显示了ThreadPool
struct 而不是thread::spawn
.
文件名: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
示例 20-12:我们的理想ThreadPool
接口
我们使用ThreadPool::new
创建具有可配置编号的新线程池
线程,在本例中为 4 个。然后,在for
圈pool.execute
具有
与thread::spawn
因为它需要一个关闭,池应该
run 的我们需要实施pool.execute
所以它需要
closure 并将其提供给池中的线程运行。此代码尚未
compile,但我们会尝试让编译器指导我们如何修复它。
建筑ThreadPool
使用 Compiler Driven Development
将示例 20-12 中的 src/main.rs 修改,然后让我们使用
编译器错误cargo check
以推动我们的发展。这是第一个
我们得到的错误:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
伟大!这个错误告诉我们需要一个ThreadPool
type 或 module,因此我们将
立即构建一个。我们ThreadPool
implementation 将独立于 kind
我们的 Web 服务器正在做的工作。那么,让我们切换一下hello
crate 来自
binary crate 复制到库 crate 中,以保存我们的ThreadPool
实现。后
我们改成一个库 crate,我们也可以使用单独的线程池
库来执行我们想要使用线程池做的任何工作,而不仅仅是用于服务
Web 请求。
创建一个包含以下内容的 src/lib.rs,这是最简单的
a 的定义ThreadPool
struct 的 Structure:
文件名: src/lib.rs
pub struct ThreadPool;
然后编辑 main.rs 文件以引入ThreadPool
into 范围
crate,将以下代码添加到 src/main.rs 的顶部:
文件名: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
这段代码仍然不起作用,但让我们再次检查它以获取下一个错误 我们需要解决:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
此错误表示接下来我们需要创建一个名为new
为ThreadPool
.我们也知道new
需要有一个参数
可以接受4
作为参数,并且应该返回一个ThreadPool
实例。
让我们实现最简单的new
函数将具有这些
特性:
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我们选择了usize
作为size
参数,因为我们知道
负数线程没有任何意义。我们也知道我们将使用它
4 作为线程集合中的元素数,即usize
type 用于,如第 3 章的 “整数类型” 部分所述。
让我们再次检查代码:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
现在出现错误是因为我们没有execute
method 开启ThreadPool
.
回想一下“创建有限数量的
Threads“部分,我们
决定我们的线程池应该有一个类似于thread::spawn
.在
此外,我们将实现execute
函数,因此它接受它的
given,并将其提供给池中的空闲线程运行。
我们将定义execute
method 开启ThreadPool
将闭包作为
参数。从“将捕获的值移出 Closure 和Fn
性状”第 13 章中我们可以采取的
closure 作为具有三个不同特征的参数:Fn
,FnMut
和FnOnce
.我们需要决定在这里使用哪种 closure 类型。我们知道我们会
最终会做一些类似于 Standard Library 的作thread::spawn
implementation 的 Signature 的 Signature 的边界,这样我们就可以看看thread::spawn
has 的参数。该文档向我们展示了以下内容:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
这F
type 参数是我们在这里关注的那个;这T
类型
parameter 与返回值相关,我们不关心这一点。我们
可以看到spawn
使用FnOnce
作为 trait bound onF
.这可能是
我们想要的,因为我们最终会传递我们得到的参数execute
自spawn
.我们可以进一步确信FnOnce
是我们的特点
want to use,因为用于运行请求的线程只会执行该
request 的闭包,它与Once
在FnOnce
.
这F
type 参数也具有 trait 绑定Send
和生命周期绑定'static
,这在我们的情况中很有用:我们需要Send
要传输
从一个线程到另一个线程的 closure 和'static
因为我们不知道多久
线程将执行。让我们创建一个execute
method 开启ThreadPool
,它将采用F
具有以下边界:
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们仍然使用 after()
FnOnce
因为这个FnOnce
表示一个闭包
,它不带任何参数并返回 Unit 类型 。就像功能一样
定义,则可以从签名中省略返回类型,但即使我们
没有参数,我们仍然需要括号。()
同样,这是execute
方法:确实
什么都没有,但我们只是尝试让我们的代码编译。让我们再检查一次:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
它编译!但请注意,如果您尝试cargo run
并在
browser,您将在浏览器中看到我们在
章节。我们的库实际上并没有调用传递给execute
还!
注意:您可能听说过使用严格编译器的语言,例如 Haskell 和 Rust 的意思是“如果代码编译,它就可以工作”。但这种说法并非如此 普遍正确。我们的项目可以编译,但它绝对不做任何事情!如果我们 正在构建一个真实、完整的项目,这将是一个开始的好时机 编写单元测试来检查代码是否可编译并具有 要。
验证 中的 线程数new
我们没有对参数执行任何作new
和execute
.让我们
使用我们想要的行为实现这些函数的主体。首先,
让我们考虑一下new
.之前,我们为size
参数,因为线程数为负数的池没有意义。
但是,线程为零的池也没有意义,但 Zero 是完美的
有效usize
.我们将添加代码来检查size
之前大于零
我们返回一个ThreadPool
实例,如果程序收到
zero 使用assert!
宏,如示例 20-13 所示。
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
示例 20-13:实现ThreadPool::new
如果出现 panicsize
为零
我们还为ThreadPool
带有 doc 注释。
请注意,我们遵循了良好的文档实践,添加了一个部分
调用函数可能 panic 的情况,如
第 14 章.尝试运行cargo doc --open
并单击ThreadPool
结构
查看生成的 docs 的用途new
肖!
而不是添加assert!
宏,我们可以更改new
到build
并返回一个Result
就像我们对Config::build
在 I/O 中
project 中的实例。但是我们决定,在本例中,尝试创建一个
没有任何线程的线程池应该是不可恢复的错误。如果你是
雄心勃勃,尝试编写一个名为build
替换为
签名与new
功能:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
创建空间以存储线程
现在我们有了一种方法来知道我们有有效数量的线程要存储
池中,我们可以创建这些线程并将它们存储在ThreadPool
结构
在返回结构体之前。但是我们如何 “存储” 线程呢?让我们再来看一个
看看thread::spawn
签名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
这spawn
函数返回一个JoinHandle<T>
哪里T
是
closure 返回。让我们尝试使用JoinHandle
也,看看会发生什么。在我们的
case 中,我们传递给线程池的闭包将处理 Connection
并且不返回任何内容,因此T
将为 单位类型 。()
示例 20-14 中的代码可以编译,但还没有创建任何线程。
我们更改了ThreadPool
来保存thread::JoinHandle<()>
实例,将 vector 初始化为size
,设置for
循环,它将运行一些代码来创建线程,以及
返回了ThreadPool
实例。
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
示例 20-14:为 创建 vectorThreadPool
按住
线程
我们带来了std::thread
到 library crate 中,因为我们是
用thread::JoinHandle
作为向量中项目的类型ThreadPool
.
收到有效大小后,我们的ThreadPool
创建一个新的 vector
拿size
项目。这with_capacity
函数执行与Vec::new
但有一个重要的区别:它在
向量。因为我们知道我们需要存储size
元素中,执行
这种预先分配比使用Vec::new
,
它会在插入元素时调整自身大小。
当您运行cargo check
同样,它应该会成功。
一个Worker
负责从ThreadPool
到线程
我们在for
循环中关于
线程。在这里,我们将看看我们实际上是如何创建线程的。标准
library 提供thread::spawn
作为创建线程的一种方式,以及thread::spawn
期望获取一些代码,线程应该在
线程。但是,在我们的例子中,我们想要创建线程并拥有
它们等待我们稍后发送的代码。标准库的
threads 的实现不包括任何执行此作的方法;我们必须
手动实现它。
我们将通过在ThreadPool
以及将管理此新行为的线程。我们将调用
this data structure Worker,这是池化中的常用术语
实现。Worker 拾取需要运行的代码,并运行
code 在 Worker 的线程中。想想在
restaurant:工作人员等待客户收到订单,然后
他们负责接受这些订单并完成它们。
而不是存储JoinHandle<()>
线程池中的实例,
我们将存储Worker
结构。每Worker
将存储单个JoinHandle<()>
实例。然后,我们将在Worker
那会
获取要运行的代码的闭包,并将其发送到已在运行的线程
执行。我们还将为每个 worker 提供一个id
因此我们可以区分
日志记录或调试时池中的不同工作程序。
以下是我们创建ThreadPool
.我们将
实现将闭包发送到线程的代码Worker
按以下方式设置:
- 定义一个
Worker
结构体,该结构包含一个id
以及JoinHandle<()>
. - 改变
ThreadPool
来保存Worker
实例。 - 定义一个
Worker::new
函数,该函数采用id
number 并返回一个Worker
实例,该实例保存id
以及一个由空的 关闭。 - 在
ThreadPool::new
,请使用for
循环计数器生成id
创造 一个新的Worker
有了那个id
,并将 worker 存储在 vector 中。
如果您准备好迎接挑战,请尝试在此之前自行实施这些更改 看看示例 20-15 中的代码。
准备?这是示例 20-15,其中一种方法可以进行上述修改。
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
示例 20-15:修改ThreadPool
按住Worker
实例,而不是直接持有线程
我们更改了 上的字段名称ThreadPool
从threads
自workers
因为它现在持有Worker
实例而不是JoinHandle<()>
实例。我们在for
loop 作为参数传递给Worker::new
,并将每个新的Worker
在名为workers
.
外部代码(如我们在 src/main.rs 中的服务器)不需要知道
有关使用Worker
struct 中的ThreadPool
,
因此,我们制作了Worker
struct 及其new
function private 函数。这Worker::new
函数使用id
我们给它并存储一个JoinHandle<()>
实例,该实例是通过使用空闭包生成新线程创建的。
注意:如果作系统无法创建线程,因为没有
足够的系统资源,thread::spawn
会恐慌。这将导致我们的
整个服务器都会导致 panic,即使创建某些线程可能会
成功。为了简单起见,这种行为很好,但在 Production 中
thread pool 实现,您可能希望使用std::thread::Builder
及其spawn
方法,该方法返回Result
相反。
此代码将编译并存储Worker
实例
指定为ThreadPool::new
.但我们仍然没有处理
我们得到的闭包execute
.让我们看看接下来如何做到这一点。
通过通道向线程发送请求
我们要解决的下一个问题是,给thread::spawn
做
绝对没有。目前,我们得到了我们想要在execute
方法。但我们需要付出thread::spawn
一个 Closure 来运行。
创建每个Worker
在创建ThreadPool
.
我们希望Worker
我们刚刚创建的结构体,用于获取要从中运行的代码
在ThreadPool
并将该代码发送到其线程以运行。
我们在第 16 章中学到的通道 — 一种简单的通信方式
两个线程 - 将非常适合此使用案例。我们将使用一个 channel 来运作
作为作业队列,将execute
将从ThreadPool
自
这Worker
实例,这会将作业发送到其线程。这是计划:
- 这
ThreadPool
将创建一个频道并保留 Sender。 - 每
Worker
将紧紧抓住接收器。 - 我们将创建一个新的
Job
struct 来保存我们想要发送的闭包 沿着海峡而下。 - 这
execute
方法将发送它想要执行的作业,通过 寄件人。 - 在其线程中,
Worker
将遍历其接收器并执行 关闭它收到的任何 job。
让我们从 在ThreadPool::new
并按住发件人
在ThreadPool
实例,如示例 20-16 所示。这Job
结构
目前不包含任何内容,但将是我们将发送的物品类型
频道。
文件名: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
示例 20-16:修改ThreadPool
要存储
发送Job
实例
在ThreadPool::new
,我们创建新通道并让池保存
寄件人。这将成功编译。
让我们尝试将通道的接收器作为线程池传递给每个 worker
创建频道。我们知道我们想在线程中使用接收器,
worker 生成,因此我们将引用receiver
参数。这
示例 20-17 中的代码还不能完全编译。
文件名: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
示例 20-17:将接收器传递给 worker
我们做了一些小而直接的更改:我们将接收器传递给Worker::new
,然后在闭包中使用它。
当我们尝试检查此代码时,我们收到以下错误:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
代码正在尝试传递receiver
到多个Worker
实例。这
不起作用,因为您会记得第 16 章:通道实现
Rust 提供多个生产者、单个消费者。这意味着我们不能
只需克隆通道的消费端即可修复此代码。我们也不
希望向多个使用者多次发送消息;我们想要一个列表
的消息,以便每条消息都得到处理一次。
此外,从通道队列中移除作业涉及更改receiver
,因此线程需要一种安全的方式来共享和修改receiver
;
否则,我们可能会得到 race conditions (如 Chapter 16 所述)。
回想一下第 16 章:共享中讨论的线程安全智能指针
所有权,并允许线程更改值,则
需要使用Arc<Mutex<T>>
.这Arc
类型将允许多个 worker 拥有
receiver 和Mutex
将确保只有一个 worker 从
接收器。示例 20-18 显示了我们需要做的更改。
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
示例 20-18:在 worker 之间共享接收器
用Arc
和Mutex
在ThreadPool::new
,我们将接收器放在Arc
以及Mutex
.对于每个
new worker 中,我们克隆Arc
以增加引用计数,以便 worker 可以
接管人的股份所有权。
通过这些更改,代码将编译!我们快到了!
实施execute
方法
最后,我们来实现execute
method 开启ThreadPool
.我们还将更改Job
从结构体更改为 trait 对象的类型别名,该对象持有
closure 的execute
接收。如“创建类型同义词
with Type Aliases“部分,类型别名允许我们缩短
易于使用。请看示例 20-19。
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
示例 20-19:创建一个Job
type 别名Box
保存每个闭包,然后将作业发送到通道
创建新的Job
实例使用我们得到的execute
我们
将该作业发送到通道的发送端。我们正在调用unwrap
上send
对于发送失败的情况。例如,如果我们
停止执行所有线程,这意味着接收端已停止
接收新消息。目前,我们无法阻止我们的线程
执行:只要池存在,我们的线程就会继续执行。这
我们使用的原因unwrap
是我们知道失败的情况不会发生,但是
编译器不知道这一点。
但我们还没有完全完成!在 worker 中,我们的 closure 被传递给thread::spawn
still 仅引用 channel 的接收端。
相反,我们需要 Closure 永远循环,要求
channel 的 channel,并在获取 job 时运行 Job。让我们做出改变
如示例 20-20 所示到Worker::new
.
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
示例 20-20:接收和执行 worker 的线程
在这里,我们首先调用lock
在receiver
获取互斥锁,然后我们
叫unwrap
对任何错误产生 panic。如果互斥锁
处于中毒状态,如果其他线程在
持有锁而不是释放锁。在这种情况下,调用unwrap
让这个线程 panic 是正确的作。随意
更改此项unwrap
更改为expect
带有一条对
你。
如果我们获得互斥锁,则调用recv
以接收Job
从
渠道。决赛unwrap
也移过此处可能发生的任何错误
如果保存发送方的线程已关闭,类似于send
method 返回Err
如果接收器关闭。
对recv
块,因此如果还没有 job,则当前线程将
等待作业可用。这Mutex<T>
确保只有一个Worker
thread 正在尝试请求作业。
我们的线程池现在处于工作状态!给它一个cargo run
并制作一些
请求:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!现在,我们有一个异步执行连接的线程池。 创建的线程永远不会超过四个,因此我们的系统不会获得 如果服务器收到大量请求,则为 overloaded。如果我们向 /sleep 发出请求,服务器将能够通过让另一个 thread 运行它们。
注意:如果您同时在多个浏览器窗口中打开 /sleep,则它们 可能会以 5 秒的间隔一次加载一个。某些 Web 浏览器执行 出于缓存原因,按顺序执行同一请求的多个实例。这 限制不是由我们的 Web 服务器引起的。
了解了while let
循环中,你可能想知道
为什么我们没有编写示例 20-21 所示的 worker 线程代码。
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
示例 20-21:的另一种实现Worker::new
用while let
此代码可编译并运行,但不会产生所需的线程
行为:慢速请求仍会导致其他请求等待
处理。原因有点微妙:Mutex
struct 没有 publicunlock
方法,因为锁的所有权基于
这MutexGuard<T>
在LockResult<MutexGuard<T>>
该lock
method 返回。在编译时,借用检查器可以强制执行该规则
由Mutex
无法访问,除非我们持有
锁。但是,此实现也可能导致锁被持有
如果我们不注意MutexGuard<T>
.
示例 20-20 中使用let job = receiver.lock().unwrap().recv().unwrap();
有效,因为使用let
任何
表达式中使用的 Temporary 值位于 equals 的右侧
符号会立即丢弃let
语句结束。然而while let
(以及if let
和match
) 在
关联的块。在示例 20-21 中,锁在 continue continue 中保持
对job()
,这意味着其他工作人员无法接收作业。
本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准