在Python中进程间共享数据比较常用的方法就是多进程队列multiprocessesing.queues.Queue
,官方文档在这里是这么说的:
The Queue class is a near clone of Queue.Queue; Queues are thread and process safe.
然而现实永远不可能这么简单地美好起来,没有坑的代码那就不能叫代码了,这里面有两个地方需要注意一下。
奇怪的Exception
首先多进程队列和普通队列有一点不同,即其最大的大小是32767, 所以get_nowait(or get(False))
和 put_nowait(or put(False))
,分别有可能因为队列为空和队列为满而抛出 Queue.Empty
和 Queue.Full
,注意这里的Queue是普通的队列而不是多进程队列,非常不明白为什么在多进程队列里面为什么要用普通队列的异常,也就是说如果要在多进程队列里面捕捉异常的话就得这么写:
1 2 3 4 5 6 7 8 9 10 |
import multiprocessing as mp import Queue task_queue = mp.Queue def worker(task_queue): try: task_queue.get_nowait() except Queue.Empty: pass |
实在是觉得这么设计很奇怪。
坑爹的Queue
上面的那个设计只是看起来很奇怪,接下来这个就可以说是真坑了,在有很多数据被push到多进程队列的时候,队列内部会启用一个buffer,这样做肯定是没有什么问题,合情合理天经地义,然而接下来的设计就很DT了,一旦启用了这个buffer过后,除非buffer被清空,否则多进程队列所在的子进程就无法join,下面这段代码看起来没有什么问题的代码就会死锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
import multiprocessing as mp import Queue import sys def worker(in_q, out_q, ): while True: try: num = in_q.get_nowait() except Queue.Empty: break num = num + 1 out_q.put_nowait(num) print "DONE" if __name__ == '__main__': in_q = mp.Queue() out_q = mp.Queue() for i in range(10000): in_q.put(i) jobs = [] for i in range(4): p = mp.Process(target=worker, args=(in_q, out_q, )) jobs.append(p) for each in jobs: each.start() for each in jobs: each.join() print "JOIN" |
输出结果为:
1 2 3 4 5 6 |
DONE JOIN DONE DONE DONE ...... |
如果把range(10000)改成range(1000)就会正常输出4个DONE和4个JOIN,因为这里1000个item并没有启用buffer,我觉得要么就干脆设置Queue
不为空就不让join,实在是看不懂为什么得启用了buffer才设置join锁。
References
http://stackoverflow.com/questions/26025486/python-processes-not-joining
https://docs.python.org/2/library/multiprocessing.html#pipes-and-queues