进程间通信
IPC(Inter-Process Communication)
1、进程之间数据是隔离的,代码示例如下:
from multiprocessing import Process
def task():
global n
n = 100
print("子进程中:", n)
if __name__ == '__main__':
p = Process(target=task, )
p.start()
n = 10
print("主进程中:", n)
代码运行结果如下:
主进程中: 10
子进程中: 100
实现进程间的通信,用到了队列
2、什么是队列
队列就是先进去的先出来
我们用到的就是multiprocessing中的Queue
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
底层队列使用管道和锁定实现。
3、Queue方法介绍
Queue(maxsize) #maxsize是队列中允许的最大项数,如果不传参数maxsize则无大小限制
(类Queue中有maxsize=0,if maxsize<=0 则maxsize就没有限制)
Queue的实例q具有以下方法:
1.q.get( [ block [ ,timeout ] ] ):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,
默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。
如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
2.q.get_nowait() :同q.get(False)方法。
3.q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,
默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。
timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
4.q.qsize() :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,
队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
5.q.empty() :如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,
在返回和使用结果之间,队列中可能已经加入新的项目。
6.q.full() :如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)
4、解决进程间数据隔离的方法,代码示例如下:
from multiprocessing import Queue, Process
import os, time
def task(queue):
print("这个进程id:%s开始放数据了" % os.getpid())
time.sleep(2)
queue.put('ly is dsb')
print("这个进程id:%s数据放完了" % os.getpid())
if __name__ == '__main__':
q = Queue(3)
p = Process(target=task, args=(q,))
p.start()
print("主进程")
res = q.get()
print("主进程中取值:",res)
5、多进程间通信
from multiprocessing import Queue, Process
import os, time
def get_task(queue):
print("%s:%s" % (os.getpid(), queue.get()))
def put_task(queue):
queue.put("%s开始写数据了" % os.getpid())
if __name__ == '__main__':
q = Queue(3)
p = Process(target=put_task, args=(q,))
p.start()
p1 = Process(target=put_task, args=(q,))
p1.start()
p2 = Process(target=get_task, args=(q,))
p2.start()
p3 = Process(target=get_task, args=(q,))
p3.start()
6、生产者和消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
1.为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,
那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这个问题于是引入了生产者和消费者模式。
2.什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,
而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,
而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
3.基于队列实现生产者消费者模型
版本1:等生产者生产的包子被消费者吃完,就堵塞到那,而且生产和吃是无序的
def producer(queue):
# 把数据全部放在Queue
for i in range(10):
data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
print(data)
time.sleep(random.randint(0,1))
# 放入数据
queue.put("第%s个包子" % i)
def consumer(queue):
while True:
res = queue.get()
data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p = Process(target=producer, args=(q, ))
p.start()
p1 = Process(target=consumer, args=(q,))
p1.start()
版本2:生产者生产的包子被消费者吃完,等消费者拿到None时就结束,但生产和吃还是无序的
def producer(queue):
# 把数据全部放在Queue
for i in range(10):
data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
print(data)
time.sleep(random.randint(0,1))
# 放入数据
queue.put("第%s个包子" % i)
queue.put(None)
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p = Process(target=producer, args=(q,))
p.start()
p1 = Process(target=consumer, args=(q,))
p1.start()
版本3:生产者生产的包子被消费者吃完,等消费者拿到None时就结束,而且生产和吃是有序的
def producer(queue):
# 把数据全部放在Queue
for i in range(10):
data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i)
print(data)
time.sleep(random.randint(0,1))
# 放入数据
queue.put("第%s个包子" % i)
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p = Process(target=producer, args=(q,))
p.start()
p1 = Process(target=consumer, args=(q,))
p1.start()
# time.sleep(1000)
# none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
p.join()
q.put(None)
版本4:生产者大于消费者
def producer(queue, food):
# 把数据全部放在Queue
for i in range(10):
data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food)
print(data)
time.sleep(random.randint(1, 3))
# 放入数据
queue.put("第%s个%s" % (i, food))
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = "这个进程id:%s, 吃了%s" % (os.getpid(), res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer, args=(q, '面包'))
p2 = Process(target=producer, args=(q, '奶粉'))
p3 = Process(target=producer, args=(q, '冰淇淋'))
p1.start()
p2.start()
p3.start()
p4 = Process(target=consumer, args=(q,))
p5 = Process(target=consumer, args=(q,))
p4.start()
p5.start()
# time.sleep(1000)
# none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
# p.join()
# q.put(None)
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
q.put(None)
版本5:消费者大于生产者
def producer(queue, food):
# 把数据全部放在Queue
for i in range(10):
data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food)
print(data)
time.sleep(random.randint(1, 3))
# 放入数据
queue.put("第%s个%s" % (i, food))
def consumer(queue, name):
while True:
try:
res = queue.get(timeout=5)
if not res:break
data = "这个消费者:%s, 吃了%s" % (name, res)
print(data)
except Exception as e:
print(e)
break
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer, args=(q, '面包'))
p2 = Process(target=producer, args=(q, '冰淇淋'))
p1.start()
p2.start()
p3 = Process(target=consumer, args=(q, '许鹏'))
p4 = Process(target=consumer, args=(q, '勇哥'))
p5 = Process(target=consumer, args=(q, '勇哥2'))
p3.start()
p4.start()
p5.start()
# time.sleep(1000)
# none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了
# p.join()
# q.put(None)
p1.join()
p2.join()
q.put(None)
q.put(None)
q.put(None)