欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Python之路34:万字总结:并发与并行、锁(GIL、同步锁、死锁与递归锁)、信号量、线程队列、生消模型、进程(基础使用、进程通信、进程池、回调函数)、协程

时间:2023-05-31

内容:

同步锁

死锁、递归锁

信号量和同步对象(暂时了解即可)

队列------生产者和消费者模型

进程(基础使用、进程通信、进程池、回调函数)

协程

一、并发并行与同步异步的概念 1.1、并发和并行 概念

并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。

并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行

简单来说:

并发:指系统具有处理多个任务的能力并行:指系统具有 同时 处理多个任务的能力

并发的关键是你有处理多个任务的能力,不一定要同时。

并行的关键是你有同时处理多个任务的能力。

所以说,并行是并发的子集

​ 无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)

并行: 并行:同时运行,只有具备多个cpu才能实现并行

​ 单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)

​ 有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,

​ 一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术

​ 而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

1.2、同步与异步 概念

当进程执行到一个IO(等待外部数据)的时候

同步:等异步:不等,一直到数据接受成功过,再回来处理 二、GIL(Global Interpreter Lock) 前言

'''定义:In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once、This lock is necessary mainly because CPython’s memory management is not thread-safe、(However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)'''结论:在Cpython解释器中,同一个进程下开启的多线程,无论你开启多少个线程、你有多少个CPU,Python在执行的时候会淡定的在同一时刻只允许一个线程运行,无法利用多核优势

PS:

​ 首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

所以,你想用Python在一个进程下跑多个线程,不可能

可能大家会想:我靠,学毛的Python!!

且看下文

2.1、GIL的概念

​ GIL:Global Interpreter Lock,又叫全局解释器锁,每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程在运行,目的是解决多线程同时竞争程序中的全局变量而出现的线程安全问题。

​ GIL其本质类似操作系统的Mutex。

​ GIL的功能:在CPython解释器中执行的每一个Python线程,都会先锁住自己,以阻止别的线程执行

​ CPython引进GIL,可以最大程度上规避内存管理这样复杂的竞争风险问题

​ 当然,CPython不可能容忍一个线程一直独占解释器,它会轮流执行Python线程。这样一来,用户看到的就是“伪”并行,即Python线程在交替执行,来模拟真正并行的线程

2.2、关于 IO密集型任务 与 计算密集型任务 1、概念

计算密集型:

​ 要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。

IO密集型:

​ 涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

​ 有了GIL的存在,同一时刻同一进程中只有一个线程被执行

​ 听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?

​ No,No,No,要解决这个问题,我们需要在几个点上达成一致:

1、cpu到底是用来做计算的,还是用来做I/O的?2、多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能3、每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处

引例:

​ 一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。

如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活,

反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高

结论:

对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用

当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地

2、任务分析

#(1)分析:我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:方案一:开启四个进程方案二:一个进程下,开启四个线程#(2)单核情况下,分析结果:   如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜  如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜#(3)多核情况下,分析结果:  如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜  如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜 #(4)结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

3、总结

对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用

对于IO密集型任务:

​ Python的多线程是有意义的,可以采用 多进程 + 协程

对于计算密集型任务:

​ Python的多线程就不推荐,多进程效率相对较高,但本质上讲:Python就不适用了

应用:

多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析

三、同步锁 3.1、引例

下面我们通过几个小例子来了解同步锁

示例代码一:

我们设置一个全局变量num = 100,执行100次函数subtractNum,每次执行 num 都减去 1

import timeimport threadingnum = 100 # 设定一个共享变量thread_list = []def subtractNum(): global num # 在每个线程中都获取这个全局变量 num -= 1 # temp = num # # print('--get num:',num ) # time.sleep(0.1) # num = temp - 1 # 对此公共变量进行-1操作for i in range(100): t = threading.Thread(target=subtractNum) t.start() thread_list.append(t)for t in thread_list: # 等待所有线程执行完毕 t.join()print('final num:', num)

运行结果:

final num: 0进程已结束,退出代码为 0

示例代码二:

对函数 subtractNum 稍作修改

我们添加temp = num 、 num = temp 、time.sleep

都是为了使得subtractNum()的执行时间变长

import timeimport threadingnum = 100 # 设定一个共享变量thread_list = []def subtractNum(): global num # 在每个线程中都获取这个全局变量 # num -= 1 temp = num print('--get num:', num) time.sleep(0.1) num = temp - 1 # 对此公共变量进行-1操作for i in range(100): t = threading.Thread(target=subtractNum) t.start() thread_list.append(t)for t in thread_list: # 等待所有线程执行完毕 t.join()print('final num:', num)

运行结果:

--get num: 100--get num: 100--get num: 100--get num: 100--get num: 100--get num: 100--get num: 100...........final num: 99进程已结束,退出代码为 0

为啥示例代码二 num 最终不是 0 了呢??

因为我们的减法是在一个子线程结束才能实现的,而在示例代码二中,(受time.sleep等的影响)第一个开启的线程还没结束,100个线程已经全部开启,每个线程拿到的num值都是100,而不是递减得到的结果

我们可以修改time.sleep的时间,num的最终值可以发生改变

如:我的电脑

time.sleep(0.0000000001)

运行结果:

--get num: 100--get num: 100.........--get num: 100--get num: 100--get num: 100--get num: 99--get num: 99final num: 98

示例代码一中实现了最终num = 0 是因为计算实在太快了,下一个线程还没有开始,上一个线程已经计算结束了

总结:

出现线程安全问题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gKx8GPUO-1643895313065)(C:UserspcAppDataRoamingTyporatypora-user-imagesimage-20220202154623848.png)]

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义)

==> 同步锁

3.2、同步锁

对上面的示例代码稍做修改,添加三行代码即可

l = threading.Lock()l.acquire()l.release()

import timeimport threadingnum = 100 # 设定一个共享变量thread_list = []l = threading.Lock()def subtractNum(): global num # 在每个线程中都获取这个全局变量 l.acquire() # num -= 1 temp = num print('--get num:', num) time.sleep(0.01) num = temp - 1 # 对此公共变量进行-1操作 l.release()for i in range(100): t = threading.Thread(target=subtractNum) t.start() thread_list.append(t)for t in thread_list: # 等待所有线程执行完毕 t.join()print('final num:', num)

运行结果:

--get num: 100--get num: 99......--get num: 1final num: 0进程已结束,退出代码为 0

四、死锁与递归锁 4.1、死锁

死锁:

是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态

下面是一个死锁的例子:

from threading import Thread, Lockimport time# 创建A、B两把锁mutexA = Lock()mutexB = Lock()class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 拿到A锁' % self.name) # self.name 对应线程的名字,系统会自动给线程赋名 mutexB.acquire() print('%s 拿到B锁' % self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 拿到B锁' % self.name) time.sleep(2) mutexA.acquire() print('%s 拿到A锁' % self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(5): t = MyThread() t.start()

运行结果:

Thread-1 拿到A锁Thread-1 拿到B锁Thread-1 拿到B锁Thread-2 拿到A锁

然后程序就卡在这了

解决办法:使用递归锁

4.2、递归锁

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

示例代码:

from threading import Thread, Lock, RLockimport timemutexA = mutexB = RLock()'''一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待等待该线程释放所有锁,即counter递减到0为止'''class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() # count = 1 print('%s 拿到A锁' % self.name) # self.name 对应线程的名字,系统会自动给线程赋名 mutexB.acquire() # count = 2 print('%s 拿到B锁' % self.name) mutexB.release() # count = 1 mutexA.release() # count = 0 def func2(self): mutexB.acquire() print('%s 拿到B锁' % self.name) time.sleep(2) mutexA.acquire() print('%s 拿到A锁' % self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(5): t = MyThread() t.start()

运行结果:

Thread-1 拿到A锁Thread-1 拿到B锁Thread-1 拿到B锁Thread-1 拿到A锁Thread-2 拿到A锁Thread-2 拿到B锁Thread-2 拿到B锁Thread-2 拿到A锁Thread-4 拿到A锁Thread-4 拿到B锁Thread-4 拿到B锁Thread-4 拿到A锁Thread-3 拿到A锁Thread-3 拿到B锁Thread-3 拿到B锁Thread-3 拿到A锁Thread-5 拿到A锁Thread-5 拿到B锁Thread-5 拿到B锁Thread-5 拿到A锁进程已结束,退出代码为 0

五、同步对象(Event)

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

event.isSet() # 返回event的状态值;event.wait() # 如果 event.isSet()==False将阻塞线程;event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;event.clear() # 恢复event的状态值为False。

示例代码:

我们创建一个老板与员工下班的故事(一下是废话,直接看代码就好):

老板说:*点才能下班 (此时event为Flase)

然后设置event为True

这时收到event为True

员工开始抱怨“哎……命苦啊!”,之后恢复event的状态值为False。

老板说:“下班了”

然后设置event为True

这时收到event为True

员工开始说“OhYeah!”

总之,老板通过event这个信号量控制员工是否可以说话(线程是否被阻塞)

import threadingimport timeclass Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) # False event.set() # 开始打印“Worker:哎……命苦啊!” time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() # 开始打印“Worker:OhYeah!”class Worker(threading.Thread): def run(self): event.wait() # event = False, 将阻塞线程, 一旦event被设定,它就是pass print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!")if __name__ == "__main__": # 创建五个Worker和一个Boss event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()

运行结果:

BOSS:今晚大家都要加班到22:00。FalseWorker:哎……命苦啊!Worker:哎……命苦啊!Worker:哎……命苦啊!Worker:哎……命苦啊!Worker:哎……命苦啊!BOSS:<22:00>可以下班了。FalseWorker:OhYeah!Worker:OhYeah!Worker:OhYeah!Worker:OhYeah!Worker:OhYeah!进程已结束,退出代码为 0

六、信号量(Semaphore)

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

BoundedSemaphore与Semaphore的唯一区别:

前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

示例代码:

import threading,timeclass myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release()if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()

运行结果:

Thread-1Thread-2Thread-3Thread-4Thread-5Thread-6Thread-7Thread-8.....Thread-95Thread-94Thread-99Thread-98Thread-100Thread-96Thread-97进程已结束,退出代码为 0

七、线程队列

首先明白“队列”是多线程的利器,抛开多线程,列表就行了

7.1、对多线程,列表是不安全的数据结构

示例代码:

我们想用多线程不断remove一个列表的元素

我们每开一个线程,都让他去remove列表的最后一个元素

import threadingimport timel = [1, 2, 3, 4, 5]def pri(): while l: a = l[-1] print(a) time.sleep(1) try: l.remove(a) except Exception as e: print('----', a, e)t1 = threading.Thread(target=pri, args=())t1.start()t2 = threading.Thread(target=pri, args=())t2.start()

运行结果:

554---- 5 list.remove(x): x not in list43---- 4 list.remove(x): x not in list32---- 3 list.remove(x): x not in list21---- 2 list.remove(x): x not in list1---- 1 list.remove(x): x not in list进程已结束,退出代码为 0

很乱,按之前学的内容,我们肯定要加各种锁

但其实,完成上述功能,可以使用队列queue

7.2、队列 (1)方法介绍

import Queue

q = Queue.Queue(maxsize = 10)

创建一个“队列对象“
Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

q.put(10)

将一个值放入队列中

调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

q.get()

将一个值从队列中取出
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数:

1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):

q.qsize() 返回队列的大小q.empty() 如果队列为空,返回True,反之Falseq.full() 如果队列满了,返回True,反之Falseq.full 与 maxsize 大小对应q.get([block[, timeout]]) 获取队列,timeout等待时间q.get_nowait() 相当q.get(False)非阻塞 q.put(item) 写入队列,timeout等待时间q.put_nowait(item) 相当q.put(item, False)q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号q.join() 实际上意味着等到队列为空,再执行别的操作

(2)示例

示例代码一:

先进先出

import queue# 先进后出q = queue.LifoQueue()q.put(34)q.put(56)q.put(12)while True: data = q.get() print(data)

运行结果:

125634

示例代码二:

优先级

import queue# 优先级q = queue.PriorityQueue()q.put([5, 100]) # 第五级,内容为"100"q.put([7, 200]) # 第七级,内容为"200"q.put([3, "hello"]) # 第三级,内容为"hello"q.put([4, {"name": "alex"}])while True: data = q.get() print(data)

运行结果:

[3, 'hello'][4, {'name': 'alex'}][5, 100][7, 200]

八、生产者消费者模型 8.1、为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

8.2、什么是生产者消费者模式 (1)概念

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

(2)示例

创建一个做包子的老板(Producer)和消费者(Consumer)的故事

import queueimport randomimport threadingimport timeq = queue.Queue()def Producer(name): count = 0 while count < 10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' % (name, count)) count += 1 # q.task_done() # q.join() print("ok......")def Consumer(name): count = 0 while count < 10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() # q.task_done() # q.join() print(data) print('33[32;1mConsumer %s has eat %s baozi...33[0m' % (name, data)) else: print("-----no baozi anymore----") count += 1p1 = threading.Thread(target=Producer, args=('A',))c1 = threading.Thread(target=Consumer, args=('B',))# c2 = threading.Thread(target=Consumer, args=('C',))# c3 = threading.Thread(target=Consumer, args=('D',))p1.start()c1.start()# c2.start()# c3.start()

运行结果:

making........Producer A has produced 0 baozi..0ok......making........Consumer B has eat 0 baozi...Producer A has produced 1 baozi..ok......making........1Consumer B has eat 1 baozi...-----no baozi anymore----Producer A has produced 2 baozi..2ok......making........Producer A has produced 3 baozi..Consumer B has eat 2 baozi...ok......making........Producer A has produced 4 baozi..ok......making........3Consumer B has eat 3 baozi...Producer A has produced 5 baozi..ok......making........4Consumer B has eat 4 baozi...Producer A has produced 6 baozi..ok......making........Producer A has produced 7 baozi..ok......making........5Consumer B has eat 5 baozi...Producer A has produced 8 baozi..ok......making........Producer A has produced 9 baozi..ok......6Consumer B has eat 6 baozi...7Consumer B has eat 7 baozi...8Consumer B has eat 8 baozi...进程已结束,退出代码为 0

拓展:

添加 消费者C 与 消费者D

使用 q.task_done() 与 q.join() 完善代码

九、进程

仅看使用方法,跟线程几乎一样,还是那几个功能

5.1、进程的调用 (1)方法一:

from multiprocessing import Processimport timedef func(name): time.sleep(1) print('hello', name, time.ctime())if __name__ == '__main__': p_list = [] for i in range(3): p = Process(target=func, args=('coder',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')

运行结果:

hello coder Thu Feb 3 14:45:55 2022hello coder Thu Feb 3 14:45:55 2022hello coder Thu Feb 3 14:45:55 2022end进程已结束,退出代码为 0

(2)方法二:

from multiprocessing import Processimport timeclass MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): time.sleep(1) print('hello', self.name, time.ctime())if __name__ == '__main__': p_list = [] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')

运行结果:

hello MyProcess-2 Thu Feb 3 14:48:28 2022hello MyProcess-1 Thu Feb 3 14:48:28 2022hello MyProcess-3 Thu Feb 3 14:48:28 2022end进程已结束,退出代码为 0

(3)用PID了解父进程子进程

from multiprocessing import Processimport osimport timedef info(title): print("title:", title) print('parent process:', os.getppid()) # os.getppid() ==》 父进程的PID print('process id:', os.getpid()) # os.getpid() ==》 本身的PIDdef f(name): info('function f') print('hello', name)if __name__ == '__main__': info('main process line') time.sleep(1) print("------------------") p = Process(target=info, args=('coder',)) p.start() p.join()

运行结果:

title: main process lineparent process: 21960process id: 7088------------------title: coderparent process: 7088process id: 14012进程已结束,退出代码为 0

5.2、进程的相关方法

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])  group: 线程组,目前还没有实现,库引用中提示必须是None;   target: 要执行的方法;   name: 进程名;   args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。  start():进程准备就绪,等待CPU调度  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样  name:进程名字。  pid:进程号。

5.3、进程间通信

目的:

进程间互传消息共享数据

内容:

通信:

队列 Queue管道 Pipe

数据共享:

manager

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

(1)进程队列Queue

进程间通信(IPC)方式二:队列(推荐使用)

from multiprocessing import Process, Queueimport queuedef f(q, n): # q.put([123, 456, 'hello']) q.put(n) print("son process", id(q))if __name__ == '__main__': q = Queue() # try: q=queue.Queue() print("main process", id(q)) for i in range(3): p = Process(target=f, args=(q, i)) p.start() print(q.get()) print(q.get()) print(q.get())

运行结果:

main process 2126061918528son process 18518087507520son process 30302407958081son process 24155666587202进程已结束,退出代码为 0

(2)管道

进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

1、介绍

创建管道的类:

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

主要方法:

conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

其他方法:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法conn1.fileno():返回连接使用的整数文件描述符conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

2、案例

示例代码:

我们通过创建一个双向管道,通过

conn.send() # 发送对象conn.recv() # 接收对象

来实现简单的通信

from multiprocessing import Process, Pipedef f(conn): conn.send([12, {"name": "coder"}, 'hello']) # 通过连接发送对象 response = conn.recv() # 接收conn2.send(obj)发送的对象。 print("response", response) conn.close() # 关闭连接 print("q_ID2:", id(conn))if __name__ == '__main__': parent_conn, child_conn = Pipe() # 创建一个双向管道 print("q_ID1:", id(child_conn)) p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # # 接收发送的对象。 parent_conn.send("儿子你好!") p.join()

运行结果:

q_ID1: 1478665507024[12, {'name': 'coder'}, 'hello']response 儿子你好!q_ID2: 2056262961424进程已结束,退出代码为 0

(3)manage 1、介绍

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据(事实上Manager的功能远不止于此)。

他支持的类型非常多,包括:Value、Araay、list、dict、Queue、Lock等。

2、案例

示例一:

示例代码:

我们在主进程创建俩共享数据:d 和 l ,然后在子进程中修改他们

import multiprocessingdef worker(d, l): l += range(11, 16) for i in range(1, 6): key = "key{0}".format(i) val = "val{0}".format(i) d[key] = valif __name__ == "__main__": # 创建Manager对象 manager = multiprocessing.Manager() # 创建共享字典 d = manager.dict() # 创建共享列表 l = manager.list() p = multiprocessing.Process(target=worker, args=(d, l)) p.start() p.join() print(d) print(l)

运行结果:

{'key1': 'val1', 'key2': 'val2', 'key3': 'val3', 'key4': 'val4', 'key5': 'val5'}[11, 12, 13, 14, 15]进程已结束,退出代码为 0

示例二:

示例代码:

from multiprocessing import Process, Managerdef f(d, l, n): d[n] = n d["name"] = "alvin" l.append(n) # print("l",l)if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l, i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)

运行结果:

{0: 0, 'name': 'alvin', 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]进程已结束,退出代码为 0

5.4、进程池 1、概念

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

2、用法 (1)主要的方法: apply:从进程池里取一个进程并执行apply_async:apply的异步版本terminate:立刻关闭线程池join:主进程等待所有子进程执行完毕,必须在close或terminate之后close:等待所有进程结束后,才关闭线程池 (2)具体用法:

创建进程池的类:

​ 如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

Pool([numprocess [,initializer [, initargs]]]):创建进程池

参数介绍:

(1)numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值(2)initializer:是每个工作进程启动时要执行的可调用对象,默认为None(3)initargs:是要传给initializer的参数组

方法介绍:

主要方法:

(1)p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()(2)p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。(3)p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成(4)P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

其他方法(了解部分)

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法(1)obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。(2)obj.ready():如果调用完成,返回True(3)obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常(4)obj.wait([timeout]):等待结果变为可用。(5)obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

3、案例

示例代码:

我们在进程池中创建五个进程

现在需要执行10次函数foo()

我们利用 apply_async()不断的从进程池中取进程去执行foo()

from multiprocessing import Poolimport timedef foo(args): time.sleep(1) print(args)if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(func=foo, args=(i,)) # 注意join与close调运顺序是固定的,先close后join p.close() # 等子进程执行完毕后关闭线程池 # time.sleep(2) # p.terminate() # 立刻关闭线程池 p.join()

运行结果:

0123465978进程已结束,退出代码为 0

4、回调函数 (1)概念

百度百科说:回调函数就是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数。回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。

通俗理解就是:把一个函数作为参数传给另一个函数,第一个函数称为回调函数。这个被传入的参数其实是函数指针,即指向一个函数的指针(地址)。

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

(2)案例

示例一:

示例代码:

from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url): print('<进程%s> get %s' % (os.getpid(), url)) respone = requests.get(url) if respone.status_code == 200: return {'url': url, 'text': respone.text}def pasrse_page(res): print('<进程%s> parse %s' % (os.getpid(), res['url'])) parse_res = 'url:<%s> size:[%s]n' % (res['url'], len(res['text'])) with open('db.txt', 'a') as f: f.write(parse_res)if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p = Pool(3) res_l = [] for url in urls: res = p.apply_async(get_page, args=(url,), callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

运行结果:

<进程12944> get https://www.baidu.com<进程4036> get https://www.python.org<进程16732> get https://www.openstack.org<进程12944> get https://help.github.com/<进程19608> parse https://www.baidu.com<进程4036> get http://www.sina.com.cn/<进程19608> parse https://www.python.org<进程19608> parse https://help.github.com/<进程19608> parse http://www.sina.com.cn/<进程19608> parse https://www.openstack.org[{'url': 'https://www.baidu.com', 'text': 'rn