线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。
线程池Pool
pub struct Pool { max_workers: usize, // 定义最大线程数}impl Pool { fn new(max_workers: usize) -> Pool {} fn execute
用execute来执行任务,F: Fnonce() + 'static + Send 是使用thread::spawn线程执行需要满足的trait, 代表F是一个能在线程里执行的闭包函数。
另一点自然而然会想到在Pool添加一个线程数组, 这个线程数组就是用来执行任务的。比如Vec
可以看作在一个线程里不断执行获取任务并执行的Worker。
struct Worker where{ _id: usize, // worker 编号}
要怎么把任务发送给Worker执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,let (tx, rx) = mpsc::channel() 可以获取到一对发送端和接收端。
把发送端添加到Pool里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。
这里有一点需要特别注意,channel的接收端receiver需要安全的在多个线程间共享,因此需要用Arc
Pool的完整定义
pub struct Pool { workers: Vec
该是时候定义我们要发给Worker的消息Message了
定义如下的枚举值
type Job = Box
Job是一个要发送给Worker执行的闭包函数,这里ByeBye用来通知Worker可以终止当前的执行,退出线程。
只剩下实现Worker和Pool的具体逻辑了。
Worker的实现
impl Worker{ fn new(id: usize, receiver: Arc::
let message = receiver.lock().unwrap().recv().unwrap(); 这里获取锁后从receiver获取到消息体,然后let message结束后rust的生命周期会自动释放掉锁。
但如果写成
while let message = receiver.lock().unwrap().recv().unwrap() {};
while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面let message要锁定久时间。
rust的mutex锁没有对应的unlock方法,由mutex的生命周期管理。
我们给Pool实现Drop trait, 让Pool被销毁时,自动暂停掉worker线程的执行。
impl Drop for Pool { fn drop(&mut self) { for _ in 0..self.max_workers { self.sender.send(Message::ByeBye).unwrap(); } for w in self.workers.iter_mut() { if let Some(t) = w.t.take() { t.join().unwrap(); } } }}
drop方法里面用了两个循环,而不是在一个循环里做完两件事?
for w in self.workers.iter_mut() { if let Some(t) = w.t.take() { self.sender.send(Message::ByeBye).unwrap(); t.join().unwrap(); }}
这里面隐藏了一个会造成死锁的陷阱,比如两个Worker, 在单个循环里面迭代所有Worker,再将终止信息发送给通道后,直接调用join,
我们预期是第一个worker要收到消息,并且等他执行完。当情况可能是第二个worker获取到了消息,第一个worker没有获取到,那接下来的join就会阻塞造成死锁。
注意到没有,Worker是被包装在Option内的,这里有两个点需要注意
t.join 需要持有t的所有权在我们这种情况下,self.workers只能作为引用被for循环迭代。 这里考虑让Worker持有Option
换而言之,让运行中的worker持有Some的变体,清理worker时,可以使用None替换掉Some,从而让Worker失去可以运行的线程
struct Worker where{ _id: usize, t: Option
Mutex依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁Vec