【Python】process使用

【Python】process使用文章介绍了 Python 的 multiprocess 模块在多进程编程中的应用 包括 apply async 实现异步进程池 Manager 创建安全的数据共享 Pool 进程池的使用 以及 Queue Pipe 和 joinableQueu 等数据结构在进程间通信和资源共享的角色

1、apply_async

# encoding:utf-8 # -*- coding:utf-8 -*- # 异步进程池(非阻塞) from multiprocessing import Pool def test(i): print(i) if __name__ == "__main__": pool = Pool(8) for i in range(100): pool.apply_async(test, args=(i,)) # 结果是乱序的 #子进程和父进程是异步的 print("test") pool.close() pool.join() 

2、Manager

# encoding:utf-8 import multiprocessing # Manager()返回的manager对象控制了一个server进程, # 此进程包含的python对象可以被其他的进程通过proxies来访问。 # 从而达到多进程间数据通信且安全。Manager模块常与Pool模块一起使用。 # 以下类型均不是进程安全的,需要加锁. # Array(self,*args,kwds) # BoundedSemaphore(self,*args,kwds) # Condition(self,*args,kwds) # Event(self,*args,kwds) # JoinableQueue(self,*args,kwds) # Lock(self,*args,kwds) # Namespace(self,*args,kwds) # Pool(self,*args,kwds) # Queue(self,*args,kwds) # RLock(self,*args,kwds) # Semaphore(self,*args,kwds) # Value(self,*args,kwds) # dict(self,*args,kwds) # list(self,*args,kwds) def f(x, arr, l, d, n): x.value = 3.14 arr[0] = 5 l.append('xx') d[1] = 2 n.a = 10 if __name__ == '__main__': server = multiprocessing.Manager() x = server.Value('d', 0.0) arr = server.Array('i', range(10)) l = server.list() d =server.dict() n = server.Namespace() proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n)) proc.start() proc.join() print(x.value) print(arr) print(l) print(d) print(n) 

3、Pool进程池

# encoding:utf-8 # -*- coding:utf-8 -*- # Pool+map import datetime import os from multiprocessing import Pool from time import sleep def test(i): print(i) sleep(5) print('Run child process (%s)...' % ( os.getpid())) if __name__ == "__main__": lists = range(30) pool = Pool(8) # 控制每时只有8个进程 t1=datetime.datetime.now() pool.map(test, lists) pool.close() # close() 关闭pool,使其不在接受新的任务 pool.join() # 主进程阻塞,等待子进程的退出 t2 = datetime.datetime.now() print(f'cost==={ 
     t2-t1}') 

4、queue资源共享

 # 普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。 # Queue是用来创建进程间资源共享的队列的类,使用Queue可以达到多进程间数据传递的功能(缺点:只适用Process类,不能在Pool进程池中使用)。 from multiprocessing import Process, Queue import os import time import random def write(q): print("process to write:{}" .format(os.getpid())) for value in [1, 2, 3]: print("put {} to queue..." .format(value)) q.put(value) time.sleep(random.random()) def read(q): print("process to read:{}" .format(os.getpid())) while True: value = q.get(True) print("get {} from queue..." .format(value)) if __name__ == '__main__': q = Queue() p_w = Process(target=write, args=(q,)) p_r = Process(target=read, args=(q,)) p_w.start() p_r.start() p_w.join() # 等待pw结束 p_r.terminate() # pr进程里是死循环,无法等待其结束,只能强行终止 

5、Pipe

# encoding:utf-8 from multiprocessing import Process, Pipe import time # Pipe可以在进程之间创建一条管道,并返回组(conn1,conn2), # 其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。 # 子进程执行方法 def f(Subconn): time.sleep(1) Subconn.send("吃了吗") print("来自父亲的问候:{}" .format(Subconn.recv())) Subconn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() # 创建管道两端 p = Process(target=f, args=(child_conn,)) # 创建子进程 p.start() print("来自儿子的问候:{}" .format(parent_conn.recv())) parent_conn.send("嗯") 

6、joinableQueue

# encoding:utf-8 from multiprocessing import Process, JoinableQueue # JoinableQueue就像是一个Queue对象, # 但队列允许项目的使用者通知生成者项目已经被成功处理。 # 通知进程是使用共享的信号和条件变量来实现的。 import time, random # 消费者 接收到生产者的join请求后执行task_done def consumer(q): while True: res = q.get() print('消费者拿到了 %s' % res) q.task_done() # 生产者 生产完了负责发送join  def producer(seq, q): for item in seq: time.sleep(random.randrange(1,2)) q.put(item) print('生产者做好了 %s' % item) q.join() if __name__ == "__main__": q = JoinableQueue() seq = ('产品%s' % i for i in range(5)) p = Process(target=consumer, args=(q,)) p.daemon = True # 设置为守护进程,在主线程停止时p也停止,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有素 p.start() producer(seq, q) print('主线程') 

7、Value_Array进程资源共享

# encoding:utf-8 # 用于进程通信,资源共享,注意:Value和Array只适用于Process类。 # multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。 import multiprocessing def f(n, a): n.value = 3.14 a[0] = 5 if __name__ == '__main__': num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 
今天的文章 【Python】process使用分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2025-01-06 21:57
下一篇 2025-01-06 21:51

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/103142.html