Python进程间通信multiProcessingQueue队列实现详解⼀、进程间通信
IPC(Inter-Process Communication)
IPC机制:实现进程之间通讯
管道:pipe 基于共享的内存空间
队列:pipe+锁的概念--->queue
⼆、队列(Queue)
2.1 概念-----multiProcess.Queue
创建共享的进程队列,Queue是多进程安全的队列,可以使⽤Queue实现多进程之间的数据传递。
Queue([maxsize])创建共享的进程队列。
参数:maxsize是队列中允许的最⼤项数。如果省略此参数,则⽆⼤⼩限制。
底层队列使⽤管道和锁定实现。
2.2 Queue⽅法使⽤
2.2.的使⽤:
是从队列⾥⾯取值并且把队列⾯的取出来的值删掉,没有参数的情况下就是是默认⼀直等着取值
就算是队列⾥⾯没有可取的值的时候,程序也不会结束,就会卡在哪⾥,⼀直等着
from multiprocessing import Queue
q = Queue() # ⽣成⼀个队列对象
# put⽅法是往队列⾥⾯放值
q.put('Cecilia陈')
q.put('xuchen')
q.put('喜陈')
# get⽅法是从队列⾥⾯取值
())
())
())
q.put(5)
q.put(6)
())
Cecilia陈
xuchen
喜陈
5
2.2.2 Queue(参数) +参数的使⽤:
Queue加参数以后,参数是数值
参数实⼏就表⽰实例化的这个Queue队列可以放⼏个值
当队列已经满的时候,再放值,程序会阻塞,但不会结束
from multiprocessing import Queue
q = Queue(3)
q.put('Cecilia陈')
q.put('xuchen')
q.put('喜陈')
print(q.full()) # 判断队列是否满了返回的是True/False
q.put(2) # 当队列已经满的时候,再放值,程序会阻塞,但不会结束
True 队列已经满了
2.2.3 q.put(参数1,参数2,参数3,参数4):
q.put(self, obj, block=True, timeout=None)
self :put就相当于是Queue⾥的⼀个⽅法,这个时候q.put就相当于是队列对象q来调⽤对象的绑定⽅法,这个参数可以省略即可
obj:是我们需要往队列⾥⾯放的值
block=True :队列如果满了的话,再往队列⾥放值的话会等待,程序不会结束
timeout=None:是再block这个参数的基础上的,当block的值为真的时候,timeout是⽤来等待多少秒,如果再这个时间⾥,队列⼀直是满的,那么程序就会报错并结束(Queue.Full异常)
from multiprocessing import Queue
q = Queue(3)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=2)
q.put('zhao',block=True,timeout=5) # 此时程序将对等待5秒以后报错了
2.2.(参数1,参数2,参数3,参数4):
<(self,block=True, timeout=None)
self :get就相当于是Queue⾥的⼀个⽅法,这个时候q.get就相当于是队列对象q来调⽤对象的绑定⽅法,这个参数可以省略即可
block=True :从队列q对象⾥⾯取值,如果娶不到值的话,程序不会结束进程间通信实验总结
timeout=None:是再block这个参数的基础上的,当block的值为真的时候,timeout是⽤来等待多少秒,如果再这个时间⾥,get 取不到队列⾥⾯的值的话,那么程序就会报错并结束(queue.Empty异常)
from multiprocessing import Queue
q = Queue()
q.put('Cecilia陈')
())
<(block=True,timeout=2) # 此时程序会等待2秒后,报错了,队列⾥⾯没有值了
2.2.5 block=False:
如果block的值是False的话,那么put⽅法再队列是满的情况下,不会等待阻塞,程序直接报错(Queue.Full异常)结束
如果block的值是False的话,那么get⽅法再队列⾥⾯没有值的情况下,再去取的时候,不会等待阻塞,程序直接报错(queue.Empty异常)结束
1.put()的block=False
from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陈')
q.put('喜陈')
print(q.full())
q.put('xichen',block=False) # 队列已经满了,我不等待了,直接报错
<()的block=Flase
from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陈')
q.put('喜陈')
())
())
(block=False)) # 队列已经没有值了,我不等待了,直接报错
2.2.6 put_nowait()/get_nowait()
1.put_nowait() 相当于bolok=False,队列满的时候,再放值的时候,程序不等待,不阻塞,直接报错
from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陈')
q.put('喜陈')
print(q.full())
q.put_nowait('xichen') # 程序不等待,不阻塞,直接报错
<_nowait() 相当于bolok=False,当队列⾥没有值的时候,再取值的时候,程序不等待,不阻塞,程序直接报错from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陈')
q.put('喜陈')
())
())
print(q.full())
<_nowait()# 再取值的时候,程序不等待,不阻塞,程序直接报错
三、代码实例
3.1 单看队列的存取数据⽤法
这个例⼦还没有加⼊进程通信,只是先来看看队列为我们提供的⽅法,以及这些⽅法的使⽤和现象。
'''
multiprocessing模块⽀持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接⼝
'''
from multiprocessing import Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果队列已经满了,程序就会停在这⾥,等待数据被别⼈取⾛,再将数据放⼊队列。
# 如果队列中的数据⼀直不被取⾛,程序就会永远停在这⾥。
try:
q.put_nowait(3) # 可以使⽤put_nowait,如果队列满了不会阻塞,但是会因为队列满了⽽报错。
except: # 因此我们可以⽤⼀个try语句来处理这个错误。这样程序不会⼀直阻塞下去,但是会丢掉这个消息。
print('队列已经满了')
# 因此,我们再放⼊数据之前,可以先看⼀下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了
())
())
())
# ()) # 同put⽅法⼀样,如果队列已经空了,那么继续取就会出现阻塞。
try:
<_nowait(3) # 可以使⽤get_nowait,如果队列满了不会阻塞,但是会因为没取到值⽽报错。
except: # 因此我们可以⽤⼀个try语句来处理这个错误。这样程序不会⼀直阻塞下去。
print('队列已经空了')
pty()) #空了
3.2 ⼦进程向⽗进程发送数据
这是⼀个queue的简单应⽤,使⽤队列q对象调⽤get函数来取得队列中最先进⼊的数据。
from multiprocessing import Process, Queue
def f(q,name,age):
q.put(name,age) #调⽤主函数中p进程传递过来的进程参数 put函数为向队列中添加⼀条数据。
if __name__ == '__main__':
q = Queue() #创建⼀个Queue对象
p = Process(target=f, args=(q,'Cecilia陈',18)) #创建⼀个进程
p.start()
())
p.join()
['Cecilia陈', 18]
四、⽣产者消费者模型
⽣产者: ⽣产数据的任务
消费者: 处理数据的任务
⽣产者--队列(盆)-->消费者
⽣产者可以不停的⽣产,达到了⾃⼰最⼤的⽣产效率,消费者可以不停的消费,也达到了⾃⼰最⼤的消费效率.
⽣产者消费者模型⼤⼤提⾼了⽣产者⽣产的效率和消费者消费的效率.
补充: queue不适合传⼤⽂件,通产传⼀些消息.
在并发编程中使⽤⽣产者和消费者模式能够解决绝⼤多数并发问题。该模式通过平衡⽣产线程和消费线程的⼯作能⼒来提⾼程序的整体处理数据的速度。
4.1 为什么要使⽤⽣产者和消费者模型
在线程世界⾥,⽣产者就是⽣产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果⽣产者处理速度很快,⽽消费者处理速度很慢,那么⽣产者就必须等待消费者处理完,才能继续⽣产数据。同样的道理,如果消费者的处理能⼒⼤于⽣产者,那么消费者就必须等待⽣产者。为了解决这个问题于是引⼊了⽣产者和消费者模式。
4.2 什么是⽣产者消费者模型
⽣产者消费者模式是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。
4.3 基于Queue队列实现的⽣产者消费者模型
from multiprocessing import Queue,Process
# ⽣产者
def producer(q,name,food):
for i in range(3):
print(f'{name}⽣产了{food}{i}')
res = f'{food}{i}'
q.put(res)
# 消费者
def consumer(q,name):
while True:
res = q.get(timeout=5)
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue() # 为的是让⽣产者和消费者使⽤同⼀个队列,使⽤同⼀个队列进⾏通讯
p1 = Process(target=producer,args=(q,'Cecilia陈','巧克⼒'))
c1 = Process(target=consumer,args=(q,'Tom'))
p1.start()
c1.start()
此时的问题是主进程永远不会结束,原因是:⽣产者p在⽣产完后就结束了,但是消费者c在取空了q之后,则⼀直处于死循环中且卡在q.get()这⼀步。
解决⽅式⽆⾮是让⽣产者在⽣产完毕后,往队列中再发⼀个结束信号,这样消费者在接收到结束信号后就可以break出死循环。
4.4 改良版----⽣产者消费者模型
注意:结束信号None,不⼀定要由⽣产者发,主进程⾥同样可以发,但主进程需要等⽣产者结束后才应该发送该信号
from multiprocessing import Queue,Process
def producer(q,name,food):
for i in range(3):
print(f'{name}⽣产了{food}{i}')
res = f'{food}{i}'
q.put(res)
q.put(None) # 当⽣产者结束⽣产的的时候,我们再队列的最后再做⼀个表⽰,告诉消费者,⽣产者已经不⽣产了,让消费者不要再去队列⾥拿东西了def consumer(q,name):
while True:
res = q.get(timeout=5)
if res == None:break # 判断队列拿出的是不是⽣产者放的结束⽣产的标识,如果是则不取,直接退出,结束程序
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue() # 为的是让⽣产者和消费者使⽤同⼀个队列,使⽤同⼀个队列进⾏通讯
p1 = Process(target=producer,args=(q,'Cecilia陈','巧克⼒'))
c1 = Process(target=consumer,args=(q,'Tom'))
p1.start()
c1.start()
4.5 主进程在⽣产者⽣产结束以后,发送结束信号
使⽤这个⽅法的话,是很low的,有⼏个消费者就要在主进程中向队列中put⼏个结束信号
from multiprocessing import Queue,Process
import time,random
def producer(q,name,food):
for i in range(3):
print(f'{name}⽣产了{food}{i}')
time.sleep((random.randint(1,3)))
res = f'{food}{i}'
q.put(res)
# q.put(None) # 当⽣产者结束⽣产的的时候,我们再队列的最后再做⼀个表⽰,告诉消费者,⽣产者已经不⽣产了,让消费者不要再去队列⾥拿东西了def consumer(q,name):
while True:
res = q.get(timeout=5)
if res == None:break # 判断队列拿出的是不是⽣产者放的结束⽣产的标识,如果是则不取,直接退出,结束程序
time.sleep((random.randint(1, 3)))
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue() # 为的是让⽣产者和消费者使⽤同⼀个队列,使⽤同⼀个队列进⾏通讯
# 多个⽣产者进程
p1 = Process(target=producer,args=(q,'Cecilia陈','巧克⼒'))
p2 = Process(target=producer,args=(q,'xichen','冰激凌'))
p3 = Process(target=producer,args=(q,'喜陈','可乐'))
# 多个消费者进程
c1 = Process(target=consumer,args=(q,'Tom'))
c2 = Process(target=consumer,args=(q,'jack'))
# 告诉操作系统启动⽣产者进程
p1.start()
p2.start()
p3.start()
# 告诉操作系统启动消费者进程
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None) # ⼏个消费者put⼏次
q.put(None)
五、JoinableQueue⽅法
创建可连接的共享进程队列。这就像是⼀个Queue对象,但队列允许项⽬的使⽤者通知⽣产者项⽬已经被成功处理。通知进程是使⽤共享的信号和条件变量来实现的。
5.1 ⽅法介绍
JoinableQueue的实例p除了与Queue对象相同的⽅法之外,还具有以下⽅法:
q.task_done():使⽤者使⽤此⽅法发出信号,表⽰q.get()返回的项⽬已经被处理。如果调⽤此⽅法的次数⼤于从队列中删除的项⽬数量,将引发ValueError异常。
q.join():⽣产者将使⽤此⽅法进⾏阻塞,直到队列中所有项⽬均被处理。阻塞将持续到为队列中的每个项⽬均调⽤
q.task_done()⽅法为⽌。
5.2 joinableQueue队列实现⽣产者消费者模型
from multiprocessing import Queue,Process,JoinableQueue
import time,random

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。