如何在Pool中使用Queue,Stack Overflow的回答,戳
其实吧看一遍应该就大部分都懂了。
需要注意的是:在使用多进程的时候,我们的进程函数的传入参数必须是pickle-able的,也就是参数必须可以被pickle保存下来,multiprocessing.Queue对象不能传递给pool.apply_*()等函数,需要使用multiprocessing.Manager().Queue()产生的对象
贴一下代码
# -*- coding: UTF-8 -*-from multiprocessing import Process, Pool, Queue, Manager, JoinableQueueimport timeimport osres = []def put_task(): msg = [] for i in xrange(50): time.sleep(0.1) msg.append(str(os.getpid())) return ','.join(msg)def collect_results(result): res.append(result)def take_task(queue): while 1: print(queue.get(True))def task_put(name, que): for i in range(10): time.sleep(1) que.put("%d is done" % name)def task_take_queue(que, n): i = 0 while i < n: print(que.get(True)) i += 1def consumer(input_q): while True: item = input_q.get(True) # 处理项目 print item # 此处替换为有用的工作 # 发出信号通知任务完成 input_q.task_done()def producer(output_q): sequence = [1, 2, 3, 4] # range(5)[1:5] for item in sequence: # 将项目放入队列 time.sleep(1) output_q.put(item) # 建立进程def method_1(): pool = Pool() res = pool.map_async(put_task, range(5)) pool.close() pool.join() from pprint import pprint pprint(res.get())def method_2(): pool = Pool() pool.apply_async(put_task, callback=collect_results) pool.apply_async(put_task, callback=collect_results) pool.apply_async(put_task, callback=collect_results) pool.close() pool.join() from pprint import pprint pprint(res)def method_3(): pool = Pool(processes=10) m = Manager() q = m.Queue() for i in range(5): pool.apply_async(task_put, (i, q)) pool.apply_async(task_take_queue, (q, 50)) pool.close() pool.join()def method_4(): q = JoinableQueue() # 运行使用者进程 cons_p = Process(target=consumer, args=(q,)) cons_p.daemon = True # 定义该进程为后台运行 True - When a process exits, it attempts to terminate all of its daemonic child processes. cons_p.start() # 生产项目,sequence代表要发送给使用者的项目序列 # 在时间中,这可能是生成器的输出或通过一些其他方式生产出来 producer(q) # 等待所有项目被处理 q.join()if __name__ == '__main__': method_4()
1 import multiprocessing 2 import os 3 import time 4 5 6 def pool_init(q): 7 global queue # make queue global in workers 8 queue = q 9 10 11 def task():12 # can use `queue` here if you like13 for i in range(5):14 time.sleep(1)15 queue.put(os.getpid())16 17 18 def take_task():19 while 1:20 print(queue.get(True))21 22 23 def run(pool):24 tasks = []25 tasks.append(pool.apply_async(take_task))26 for i in range(os.cpu_count()):27 tasks.append(pool.apply_async(task))28 for t in tasks:29 print(t.get(), )30 31 32 if __name__ == '__main__':33 queue = multiprocessing.Queue()34 pool = multiprocessing.Pool(initializer=pool_init, initargs=(queue,))35 run(pool)36 pool.close()37 pool.join()