博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python multiprocessing 使用
阅读量:6588 次
发布时间:2019-06-24

本文共 3008 字,大约阅读时间需要 10 分钟。

如何在Pool中使用Queue,Stack Overflow的回答,戳

其实吧看一遍应该就大部分都懂了。

需要注意的是:在使用多进程的时候,我们的进程函数的传入参数必须是pickle-able的,也就是参数必须可以被pickle保存下来,multiprocessing.Queue对象不能传递给pool.apply_*()等函数,需要使用multiprocessing.Manager().Queue()产生的对象

贴一下代码

# -*- coding: UTF-8 -*-from multiprocessing import Process, Pool, Queue, Manager, JoinableQueueimport timeimport osres = []def put_task():    msg = []    for i in xrange(50):        time.sleep(0.1)        msg.append(str(os.getpid()))    return ','.join(msg)def collect_results(result):    res.append(result)def take_task(queue):    while 1:        print(queue.get(True))def task_put(name, que):    for i in range(10):        time.sleep(1)        que.put("%d is done" % name)def task_take_queue(que, n):    i = 0    while i < n:        print(que.get(True))        i += 1def consumer(input_q):    while True:        item = input_q.get(True)        # 处理项目        print item  # 此处替换为有用的工作        # 发出信号通知任务完成        input_q.task_done()def producer(output_q):    sequence = [1, 2, 3, 4]  # range(5)[1:5]    for item in sequence:        # 将项目放入队列        time.sleep(1)        output_q.put(item)        # 建立进程def method_1():    pool = Pool()    res = pool.map_async(put_task, range(5))    pool.close()    pool.join()    from pprint import pprint    pprint(res.get())def method_2():    pool = Pool()    pool.apply_async(put_task, callback=collect_results)    pool.apply_async(put_task, callback=collect_results)    pool.apply_async(put_task, callback=collect_results)    pool.close()    pool.join()    from pprint import pprint    pprint(res)def method_3():    pool = Pool(processes=10)    m = Manager()    q = m.Queue()    for i in range(5):        pool.apply_async(task_put, (i, q))    pool.apply_async(task_take_queue, (q, 50))    pool.close()    pool.join()def method_4():    q = JoinableQueue()    # 运行使用者进程    cons_p = Process(target=consumer, args=(q,))    cons_p.daemon = True  # 定义该进程为后台运行 True - When a process exits, it attempts to terminate all of its daemonic child processes.    cons_p.start()    # 生产项目,sequence代表要发送给使用者的项目序列    # 在时间中,这可能是生成器的输出或通过一些其他方式生产出来    producer(q)    # 等待所有项目被处理    q.join()if __name__ == '__main__':    method_4()
1 import multiprocessing 2 import os 3 import time 4  5  6 def pool_init(q): 7     global queue  # make queue global in workers 8     queue = q 9 10 11 def task():12     # can use `queue` here if you like13     for i in range(5):14         time.sleep(1)15         queue.put(os.getpid())16 17 18 def take_task():19     while 1:20         print(queue.get(True))21 22 23 def run(pool):24     tasks = []25     tasks.append(pool.apply_async(take_task))26     for i in range(os.cpu_count()):27         tasks.append(pool.apply_async(task))28     for t in tasks:29         print(t.get(), )30 31 32 if __name__ == '__main__':33     queue = multiprocessing.Queue()34     pool = multiprocessing.Pool(initializer=pool_init, initargs=(queue,))35     run(pool)36     pool.close()37     pool.join()
View Code
 

 

 

转载于:https://www.cnblogs.com/chen-kh/p/7436362.html

你可能感兴趣的文章
web.xml 相关 listener filter servlet
查看>>
MyEclipse注册机
查看>>
Arrays类的六个基本方法
查看>>
jeecg开源框架 我看行!!!
查看>>
ListCtrl排序扩展类--CSortListCtrl
查看>>
关于CentOS-6.6-x86_64-bin-DVD安装vsftp问题
查看>>
zabbix安装过程中遇到的问题
查看>>
postgresql 角色 用户区别
查看>>
ArrayList和LinkedList内部实现、区别、使用场景
查看>>
1)gitlab+jenkins自动化发布;gitlab搭建
查看>>
Git 常用命令详解(二)
查看>>
Spring数据源的配置:c3p0、dbcp、druid
查看>>
区块链100讲:从村里的账本来看什么是区块链
查看>>
第五次课
查看>>
跟我一起学docker(17)--多节点mesos集群
查看>>
Android 的生命周期深入剖析
查看>>
AI行业强者愈强?Tesra超算网络助力中小AI开发企业!
查看>>
Nginx 目录配置详解
查看>>
关于 PHP 5.4 你所需要知道的
查看>>
codeforces 810A
查看>>