并发编程--潘登同学的python并发编程笔记
串行、并行与并发进程、线程和协程同步与异步 线程的使用
线程的创建方式join守护线程 GIL全局锁
未使用线程同步和互斥锁的情况使用线程同步死锁问题线程信号量
应用场景 Event事件对象生产者/消费者模式
缓冲区和queue对象 进程
进程的创建方式(方法模式)进程间通信
Pipe(管道)实现进程间通信Manage实现进程间通信 进程池POOL协程(重点)
协程的核心(控制流的让出和恢复)协程的优点asyncio实现协程 串行、并行与并发
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vcijuIAg-1644766841076)(./img/串行并行和并发.png)]
串行(serial):一个CPU上,按顺序完成多个任务并行(parallelism):指的是任务数小于等于cpu核数,即任务真的是一起执行的并发(concurrency):一个CPU采用时间片管理方式,交替的处理多个任务。一般是是任务数多余cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已) 进程、线程和协程
注意:协程只是一种做事的方式
进程、线程和协程的关系
线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
调度和切换:线程上下文切换比进程上下文切换要快得多。
进程(Process):拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;进程切换需要的资源最大,效率低线程(Thread):拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)协程(coroutine):拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度;协程切换任务资源很小,效率高 同步与异步
同步和异步强调的是消息通信机制,所以异步编程异步只出现在网络通信时
同步: A调用B,等待B返回结果后,A继续执行异步: A调用B,A继续执行,不等待B返回结果;B有结果了,通知A,A再做处理。 线程的使用 线程的创建方式
Python的标准库提供了两个模块: _thread 和 threading , _thread 是低级模块, threading 是高级模块,对 _thread 进行了封装。绝大多数情况下,我们只需要使用 threading 这个高级模块。
线程的创建可以通过两种方式:
1.方法包装
from threading import Threadfrom time import sleepdef func1(name): print(f'线程{name} start') for i in range(3): print(f'线程: {name}、{i}') sleep(1) print(f'线程{name} end')if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = Thread(target=func1,args=("1",)) t2 = Thread(target=func1,args=("2",)) # 启动线程 t1.start() t2.start() print('主线程: end')
2.类包装
from threading import Threadfrom time import sleepclass MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}、{i}') sleep(1) print(f'线程{self.name} end')if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 启动线程 t1.start() t2.start() print('主线程: end')
线程的执行统一通过 start() 方法
join之前的代码中,我们会发现: 主线程不会等待子线程的结束;我们可以通过join方法,让主线程等待子线程的结束;
from threading import Threadfrom time import sleepdef func1(name): print(f'线程{name} start') for i in range(3): print(f'线程: {name}、{i}') sleep(1) print(f'线程{name} end')if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = Thread(target=func1,args=("1",)) t2 = Thread(target=func1,args=("2",)) # 启动线程 t1.start() t2.start() # 主线程等待子线程的结束 t1.join() t2.join() print('主线程: end')
守护线程守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在python中,线程通过 setDaemon(True|False) 来设置是否为守护线程。
守护线程的作用: 守护线程作用是为其他线程提供便利服务,守护线程最典型的应用就是 GC (垃圾收集器)。
观察如下代码:
from threading import Threadfrom time import sleepclass MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}、{i}') sleep(1) print(f'线程{self.name} end')if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 设置守护线程 t1.daemon = True # 主线程消亡,t1线程也会消亡 # 启动线程 t1.start() t2.start() print('主线程: end')
结果:
这里将t1线程设置为守护线程,按道理,应该是主线程end之后,t1线程应该不再执行了,但实际上: 由于主线程下有两个线程,主线程虽然执行完毕但是没有真正消亡,主线程在等待t2线程执行完毕并且终止t1(守护线程)的运行后才终止
那如果我们将两个线程都设置为守护线程,结果就能如我们所愿了呢?
from threading import Threadfrom time import sleepclass MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}、{i}') sleep(1) print(f'线程{self.name} end')if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 设置守护线程 t1.daemon = True # 主线程消亡,t1线程也会消亡 t2.daemon = True # 主线程消亡,t2线程也会消亡 # 启动线程 t1.start() t2.start() print('主线程: end')
结果如我们所愿
GIL全局锁Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象。 这时候,我们就需要用到“线程同步”。 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用。
未使用线程同步和互斥锁的情况模拟场景: 老王和他老婆同时到了ATM机前面取钱(不同地点),账户中只有100元,而两人都想取80元,如果线程不同步,那会出现什么情况? 写个模拟程序看看
from threading import Threadfrom time import sleepclass Account: def __init__(self, money, name): self.money = money self.name = name# 模拟提款动作class Drawing(Thread): def __init__(self,drawingNum,account): Thread.__init__(self) self.drawingNum = drawingNum self.account = account self.expenseTotal = 0 def run(self): # 如果想屁吃 if self.account.money < self.drawingNum: return sleep(1) # 判断万可以取钱,则阻塞,就是为了测试发生冲突问题 self.account.money -= self.drawingNum self.expenseTotal += self.drawingNum print(f'账户:{self.account.name}, 余额是:{self.account.money}') print(f'账户:{self.account.name}, 总共取了:{self.expenseTotal}')if __name__ == '__main__': a1 = Account(100,'老王') draw1 = Drawing(80,a1) # 定义一个取钱的线程 draw2 = Drawing(80,a1) # 再定义一个取钱的线程 draw1.start() draw2.start()
可以发现账户余额成为了负数,这就是在未使用线程同步时操作的结果…
使用线程同步我们可以通过“锁机制”来实现线程同步问题,锁机制有如下几个要点:
必须使用同一个锁对象互斥锁的作用就是保证同一时刻只能有一个线程去操作共享数据,保证共享数据不会出现错误问题使用互斥锁的好处确保某段关键代码只能由一个线程从头到尾完整地去执行使用互斥锁会影响代码的执行效率同时持有多把锁,容易出现死锁的情况
互斥锁
对共享数据进行锁定,保证同一时刻只能有一个线程去操作。注意: 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。
threading 模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。
from threading import Thread, Lockfrom time import sleepclass Account: def __init__(self, money, name): self.money = money self.name = name# 模拟提款动作class Drawing(Thread): def __init__(self,drawingNum,account): Thread.__init__(self) self.drawingNum = drawingNum self.account = account self.expenseTotal = 0 def run(self): lock1.acquire() # 拿锁 # 如果想屁吃 if self.account.money < self.drawingNum: print('账户余额不足') return sleep(1) # 判断万可以取钱,则阻塞,就是为了测试发生冲突问题 self.account.money -= self.drawingNum self.expenseTotal += self.drawingNum lock1.release() print(f'账户:{self.account.name}, 余额是:{self.account.money}') print(f'账户:{self.account.name}, 总共取了:{self.expenseTotal}')if __name__ == '__main__': a1 = Account(100,'老王') lock1 = Lock() draw1 = Drawing(80,a1) # 定义一个取钱的线程 draw2 = Drawing(80,a1) # 再定义一个取钱的线程 draw1.start() draw2.start()
死锁问题在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的。
举例:有两个人都要做饭,都需要“锅”和“菜刀”才能炒菜。
from threading import Thread, Lockfrom time import sleepdef fun1(): lock1.acquire() print('fun1拿到菜刀') sleep(2) lock2.acquire() print('fun1拿到锅') lock2.release() print('fun1释放锅') lock1.release() print('fun1释放菜刀')def fun2(): lock2.acquire() print('fun2拿到锅') lock1.acquire() print('fun2拿到菜刀') lock1.release() print('fun2释放菜刀') lock2.release() print('fun2释放锅')if __name__ == '__main__': lock1 = Lock() lock2 = Lock() t1 = Thread(target=fun1) t2 = Thread(target=fun2) t1.start() t2.start()
死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题,思路很简单,就是:同一个代码块,不要同时持有两个对象锁。
线程信号量互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问?这时候,可以使用信号量。信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过。
应用场景在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。在做爬虫抓取数据时。
# 一个房间,只允许两个人进入from threading import Semaphore,Threadfrom time import sleepdef home(name,se): se.acquire() print(f'{name}进入房间') sleep(2) print(f'***{name}走出房间') se.release()if __name__ == '__main__': se = Semaphore(2) # 信号量对象 for i in range(7): t = Thread(target = home,args=(f'tom {i}', se)) t.start()
Event事件对象原理: Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行
Event() 可以创建一个事件管理标志,该标志(event)默认为False, event对象主要有四种方法可以调用:
下面我们用程序模拟下面这幅图:
import threadingimport timedef chihuoguo(name): print(f'{name}已经启动') print(f'小伙伴{name}已经进入就餐状态') time.sleep(1) event.wait() print(f'{name}收到通知了') print(f'小伙伴{name}开始吃咯!')if __name__ == '__main__': event = threading.Event() thread1 = threading.Thread(target=chihuoguo,agrs=('tom',)) thread2 = threading.Thread(target=chihuoguo,agrs=('cherry',)) # 开启线程 thread1.start() thread2.start() # 等待event对象解开 for i in range(10): time.sleep(1) print(">"*(i+1) + '-' * (9-i)) print('--->>> 主线程通知小伙伴开吃咯') event.set()
生产者/消费者模式多线程环境下,我们经常需要多个线程的并发和协作。这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式”。
生产者: 生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)消费者: 消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)缓冲区: 消费者不能直接使用生产者的数据,它们之间有个“缓冲区”。生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据。
缓冲区是实现并发的核心,缓冲区的设置有3个好处
实现线程的并发协作解耦了生产者和消费者(而是通过中间件…)解决忙闲不均,提高效率 缓冲区和queue对象
从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
from queue import Queuefrom time import sleepimport randomfrom threading import Threaddef producer(): num = 1 while True: if queue.qsize() < 5: print(f'生产{num}号,大馒头') queue.put(f"大馒头:{num}号") num += 1 sleep(random.randint(1,4)) else: print('馒头筐慢了,等待人来取') sleep(1)def consumer(): while True: if queue.qsize() > 0: print(f'获取馒头:{queue.get()}') sleep(random.randint(1,5)) else: print('快点生产我要饿死啦...') sleep(1)if __name__ == '__main__': queue = Queue() t1 = Thread(target=producer) t2 = Thread(target=consumer) t1.start() t2.start()
进程进程的优点:
可以使用计算机多核,进行任务的并行执行,提高执行效率运行不受其他进程影响,创建方便空间独立,数据安全 进程的创建方式(方法模式)
1.方法包装2.类包装
创建进程后,使用start()启动进程
from multiprocessing import Processimport osfrom time import sleepdef fun(name): print(f'当前进程ID:{os.getpid()}') print(f'父进程ID:{os.getppid()}') print(f'Process: {name}, start') sleep(3) print(f'Process:{name} end')# 类方法创建class MyProcess(Process): def __init__(self,name): Process.__init__(self) self.name = name def run(self): print(f'当前进程ID:{os.getpid()}') print(f'父进程ID:{os.getppid()}') print(f'Process: {self.name}, start') sleep(3) print(f'Process:{self.name} end')# windows上多进程实现的bug,如果不加main的限制,就会无限递归创建进程,if __name__ == '__main__': print("当前进程ID:",os.getpid()) p1 = Process(target=fun,args=('p1',)) p2 = Process(target=fun,args=('p2',)) p1.start() p2.start() # p1 = MyProcess('p1') # p2 = MyProcess('p2') # p1.start() # p2.start()
进程间通信值得注意的是: 进程间通信要给每个进程传入数据,虽然明面上看来这个数据是一个全局变量,但是每个进程运行起来的时候是相互独立,不共享数据的…
from multiprocessing import Process,Queuefrom time import sleepclass MyProcess(Process): def __init__(self,name,mq): Process.__init__(self) self.name = name self.mq = mq def run(self): print(f'Process: {self.name}, start') temp = self.mq.get() print(f'get Date:{temp}') sleep(2) print(f'put data:{temp}' + '1') self.mq.put(temp+'1') print(f'Process:{self.name} end')if __name__ == '__main__': mq = Queue() mq.put('1') mq.put('2') mq.put('3') # 进程列表 p_list = [] for i in range(3): p1 = MyProcess(f'p{i}',mq) p_list.append(p1) p1.start() p1.join() # 让主进程等着 for i in range(3): print(mq.get())
Pipe(管道)实现进程间通信Pipe方法返回(conn1, conn2)代表一个管道的两个端。
Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个参数是全双工模式,也就是说conn1和conn2均可收发。若duplex为False,conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
import multiprocessingfrom time import sleepdef func1(conn1): sub_info = "Hello!" print(f'进程1--{multiprocessing.current_process().pid}发送数据: {sub_info}') sleep(1) conn1.send(sub_info) print(f'来自进程2:{conn1.recv()}') sleep(1)def func2(conn2): sub_info = "你好!" print(f'进程2--{multiprocessing.current_process().pid}发送数据: {sub_info}') sleep(1) conn2.send(sub_info) print(f'来自进程1:{conn2.recv()}') sleep(1)if __name__ == '__main__': conn1,conn2 = multiprocessing.Pipe() process1 = multiprocessing.Process(target=func1, args=(conn1,)) process2 = multiprocessing.Process(target=func2, args=(conn2,)) # 启动子进程 process1.start() process2.start()
Manage实现进程间通信from multiprocessing import Process,Managerdef func(name,m_list,m_dict): m_dict['age'] = 19 m_list.append('我是大帅哥!!')if __name__ == '__main__': # Manager与multiprocessing.Queue类似,也类似全局变量一样的通信方式 # 这里虽然只写了一个进程,写两个也是一样的,能通信即可... with Manager() as mgr: m_list = mgr.list() m_dict = mgr.dict() m_list.append('我是PD!!!') p1 = Process(target=func, args=('p1',m_list,m_dict)) p1.start() p1.join() print(m_dict) print(m_dict)
进程池POOL进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
使用进程池的优点
提高效率,节省开辟进程和开辟内存空间的时间及销毁进程的时间节省内存空间 类/方法 功能 参数 Pool(processes)创建进程池对象processes表示进程池中有多少进程pool.apply_async(func,args,kwds)异步执行;将事件放入到进程池队列func 事件函数 args 以元组形式给func传参 kwds 以字典形式给func传参 返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值pool.apply(func,args,kwds)同步执行;将事件放入到进程池队列func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参pool.close()关闭进程池pool.join()回收进程池pool.map(func,iter)类似于python的map函数,将要做的事件放入进程池func 要执行的函数 iter 迭代对象
from multiprocessing import Poolimport osfrom time import sleepdef func(name): print(f'当前进程ID:{os.getpid()},{name}') sleep(2) return namedef func2(args): print(f'callback:{args}')if __name__ == '__main__': pool = Pool(5) pool.apply_async(func=func,args=('pd',),callback=func2) pool.apply_async(func=func,args=('pdd',),callback=func2) pool.apply_async(func=func,args=('cpdd',),callback=func2) pool.apply_async(func=func,args=('你好pd',)) pool.apply_async(func=func,args=('你好pdd',)) pool.apply_async(func=func,args=('你好cpdd',)) pool.apply_async(func=func,args=('再见pd',)) pool.apply_async(func=func,args=('再见pdd',)) pool.apply_async(func=func,args=('再见cpdd',)) pool.close() # 如果用with就不需要关闭了 pool.join()
函数式编程写法:
from multiprocessing import Poolimport osfrom time import sleepdef func1(name): print(f'当前进程的ID: {os.getpid()},{name}') sleep(2) return nameif __name__ == '__main__': with Pool(5) as pool: args = pool.map(func1,('pd','pdd','cpdd','你好pd','你好pdd', '你好cpdd','再见pd','再见pdd','再见cpdd')) for a in args: print(a)
协程(重点)协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。
当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。
协程的核心(控制流的让出和恢复)每个协程有自己的执行栈,可以保存自己的执行现场可以由用户程序按需创建协程(比如:遇到io操作)协程“主动让出(yield)”执行权时候,会保存执行现场(保存中断时的寄存器上下文和栈),然后切换到其他协程协程恢复执行(resume)时,根据之前保存的执行现场恢复到中断前的状态,继续执行,这样就通过协程实现了轻量的由用户态调度的多任务模型 协程的优点
由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;无需原子操作的锁定及同步的开销;方便切换控制流,简化编程模型单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)
注: asyncio协程是写爬虫比较好的方式。比多线程和多进程都好.开辟新的线程和进程是非常耗时的。
协程的缺点
无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。 asyncio实现协程
正常的函数执行时是不会中断的,所以你要写一个能够中断的函数,就需要加 asyncasync 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。asyncio 是python3.5之后的协程模块,是python实现并发重要的包,这个包使用事件循环驱动实现并发。
import timeimport asyncioasync def func1(): for i in range(1,4): print(f'pd:第{i}次呼叫指挥部!!') await asyncio.sleep(1) return 'pd呼叫完毕...请回答'async def func2(): for k in range(1,4): print(f'指挥部:第{k}次呼叫PD!') await asyncio.sleep(1) return '指挥部呼叫完毕...请回答'async def main(): res = await asyncio.gather(func1(), func2()) # await异步执行func1方法 # gather会交替执行func1() 和 func2() # 返回值为函数的返回值列表 print(res)if __name__ == '__main__': start = time.time() asyncio.run(main()) end = time.time() print(f'运行时间:{end-start}')
回来执行
await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。asyncio 是python3.5之后的协程模块,是python实现并发重要的包,这个包使用事件循环驱动实现并发。
import timeimport asyncioasync def func1(): for i in range(1,4): print(f'pd:第{i}次呼叫指挥部!!') await asyncio.sleep(1) return 'pd呼叫完毕...请回答'async def func2(): for k in range(1,4): print(f'指挥部:第{k}次呼叫PD!') await asyncio.sleep(1) return '指挥部呼叫完毕...请回答'async def main(): res = await asyncio.gather(func1(), func2()) # await异步执行func1方法 # gather会交替执行func1() 和 func2() # 返回值为函数的返回值列表 print(res)if __name__ == '__main__': start = time.time() asyncio.run(main()) end = time.time() print(f'运行时间:{end-start}')