多进程
同一时刻并行的处理多个任务,即为多进程。比如,你一边喝茶、看书还听着音乐。真正的并行多任务只能在多核的CPU上实现,由于任务数量是远远多于CPU的核数,所以操作系统会自动将多任务短时间轮流切换执行,给我们的感觉就像同时在执行一样。
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。编写的代码没有运行叫程序,正在运行的代码就是进程。
fork
Python中可以使用os模块fork()函数来创建子进程。程序执行os.fork()时,操作系统会创建一个子进程,然后复制父进程的所有信息到子进程中;调用一次os.fork()时,会返回两次值。返回给子进程的值一定是0,返回给父进程的是子进程中的pid号。返回给父进程pid(子进程的)号,是因为父进程可以fork出多个子进程,所以有必要记住子进程的pid号。
import os print("当前进程pid= %d"% os.getpid()) num = 0 pid = os.fork() if pid == 0: print("我是子进程: %s,父进程是:%s"%(os.getpid(),os.getppid())) num += 1 print("num = %d"%num) else: print("我是父进程:%s,我的子进程是:%s"%(os.getpid(),pid)) num += 1 print("num = %d"%num) print("父、子进程都可以执行") 输出: 当前进程pid= 4262 我是父进程:4262,我的子进程是:4263 num = 1 父、子进程都可以执行 我是子进程: 4263,父进程是:4262 num = 1 父、子进程都可以执行
多进程中,每个进程都各自拥有一份,互不影响,如上num = 1。
multiprocessing
由于fork函数存在于Linux、Unix、Mac操作系统中,Windows操作系统无fork函数调用,Python作为一个跨平台的语言,使用multiprocessing模块封装fork函数来创建多进程。multiprocessing提供一个Process类来代表一个进程对象。
Process使用:
Process([group [, target [, name [, args [, kwargs]]]]])
target:表示这个进程实例所调用对象;
args:表示调用对象的位置参数元组;
kwargs:表示调用对象的关键字参数字典;
name:为当前进程实例的别名;
group:大多数情况下用不到;
Process类常用方法:
is_alive():判断进程实例是否还在执行;
join([timeout]):是否等待进程实例执行结束,或等待多少秒;
start():启动进程实例(创建子进程);
run():如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法;
terminate():不管任务是否完成,立即终止;
Process类常用属性:
name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数;
pid:当前进程实例的PID值;
from multiprocessing import Process import os import time def run_proc(name,age,**kwargs): for i in range(10): print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid())) print(kwargs) time.sleep(1) if __name__ == '__main__': print('父进程 %d'%os.getpid()) p = Process(target=run_proc,args=('MI',18),kwargs={'S':99}) print('将要执行子进程') p.start() time.sleep(2) p.terminate() p.join() print('子进程结束了') 输出: 父进程 4460 将要执行子进程 子进程运行中,name= MI,age=18,pid=4461 {'S': 99} 子进程运行中,name= MI,age=18,pid=4461 {'S': 99} 子进程结束了
Process创建子进程,只需要传入一个函数和函数参数,创建一个Process实例对象,然后用start方法启动,这样创建比fork()更简单。join()方法可以等待子进程结束后主进程继续往下执行,通常用于进程同步。
from multiprocessing import Process import os import time import signal def run_proc(name,age,**kwargs): for i in range(10): print('子进程运行中,name= %s,age=%d,pid=%d'%(name,age,os.getpid())) print(kwargs) time.sleep(0.1) if __name__ == '__main__': print('父进程 %d'%os.getpid()) p = Process(target=run_proc,args=('MI',18),kwargs={'S':99}) print('将要执行子进程') p.start() time.sleep(0.2) os.killpg(os.getpid(),signal.SIGKILL) #杀死父进程 print('子进程结束了') 输出: 父进程 3791 将要执行子进程 子进程运行中,name= MI,age=18,pid=3792 {'S': 99} 子进程运行中,name= MI,age=18,pid=3792 {'S': 99} 已杀死
Process中主进程会默认等待子进程执行完后继续执行父进程,如果父进程退出或者异常被终止了,子进程也会退出运行。当进程中处理的事物比较复杂,一个函数不能完成时,我们可以让一个类去继承Process实现要处理的任务。
进程池Pool
当创建的子进程不多的时候,可以使用Process动态成生多个进程,如果需要创建成百上千时,手动创建工作量大,最主要的是不断创建和删除子进程调度系统分配资源很耗时间。所以我们可以创建一个进程池,预先放一些进程进去,要用的时候就直接调用,用完之后再把进程归还给进程池,省下创建删除进程的时间,提高了效率。
multiprocessing.Pool常用函数:
- apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执 行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
- apply(func[, args[, kwds]]):使用阻塞方式调用func
- close():关闭Pool,使其不再接受新的任务;
- terminate():不管任务是否完成,立即终止;
- join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;
from multiprocessing import Pool import os,time,random def worker(msg): tStart = time.time() print("%s开始执行,进程号为%d"%(msg,os.getpid())) time.sleep(random.random()*2) tStop = time.time() print(msg,"执行完毕,耗时%0.2f"%(tStop-tStart)) if __name__ == '__main__': #创建一个进程池,最大进程数3 po = Pool(3) for i in range(0,5): po.apply_async(worker,(i,)) print('----start----') #关闭进程池,不再添加新的请求 po.close() #必须放在close()之后,等待po中所有子进程执行完毕 po.join() print('----end----') 输出: ----start---- 0开始执行,进程号为4049 1开始执行,进程号为4050 2开始执行,进程号为4051 1 执行完毕,耗时0.96 3开始执行,进程号为4050 0 执行完毕,耗时0.98 4开始执行,进程号为4049 2 执行完毕,耗时1.30 4 执行完毕,耗时0.77 3 执行完毕,耗时1.11 ----end----
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。
if __name__ == '__main__': #创建一个进程池,最大进程数3 po = Pool(3) for i in range(0,5): po.apply_async(worker,(i,)) print('----start----') #关闭进程池,不再添加新的请求 #po.close() #必须放在close()之后,等待po中所有子进程执行完毕 #po.join() print('----end----') 输出: ----start---- ----end----
如果我们在进程池之后没有添加join()会导致进程池中的任务不被执行。主进程创建/添加任务后,主进程默认不会等待进程池中的任务执行完后才结束,而是当主进程的任务做完之后立马结束。
进程通信Queue
Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。
Queue使用:
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
Queue.qsize():返回当前队列包含的消息数量;
Queue.empty():如果队列为空,返回True,反之False ;
Queue.full():如果队列满了,返回True,反之False;
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如 果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出”Queue.Empty”异常;
2)如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;
Queue.get_nowait():相当Queue.get(False);
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾 出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常;
2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常;
Queue.put_nowait(item):相当Queue.put(item, False);
from multiprocessing import Process,Queue import os,time,random def write(q): print('Process to write: %s '%os.getpid()) for value in ['A','B','C']: print('put %s to queue--'%value) q.put(value) time.sleep(random.random()) def read(q): print('Process to read: %s '%os.getpid()) while True: value = q.get(True) print('get %s from queue '%value) if __name__ == '__main__': q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate() print('Complete reading and writing') 输出: Process to write: 4707 put A to queue-- Process to read: 4708 get A from queue put B to queue-- get B from queue put C to queue-- get C from queue Complete reading and writing
参考:
https://blog.csdn.net/sayhello_world/article/details/72829329
https://blog.csdn.net/hello_bravo_/article/details/52528283
今天的文章多进程分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/56270.html