前言
在 Python 中,多进程是一种并发编程的方式,它允许程序同时执行多个任务,提高程序的性能和效率。与单线程或异步编程相比,多进程能够更好地利用多核处理器、提高计算密集型任务的执行速度,并且能够更好地处理IO密集型任务。通过多进程,可以将任务分配给不同的进程来并行执行,从而加快整体的处理速度。
Python 提供了 multiprocessing
模块来实现多进程编程,这个模块可以让我们轻松地创建、启动和管理多个进程。使用多进程编程时需要注意合理利用进程间通信(IPC)机制,避免竞争条件和数据共享问题。
目录
多进程运行图
任务尽可能地划分为多个独立的子任务,这样每个进程可以独立地执行自己的子任务。
多个子任务在队列中等候,一旦某个子任务完成,就马上分配下一个子任务.
应用场景
- CPU 密集型任务:当任务需要大量的 CPU 计算而不涉及太多的 I/O 操作时,使用多进程可以充分利用多核 CPU 的优势,加速任务的执行。例如,数值计算、图像处理和模型训练等。
- 并行处理:如果有多个相互独立的任务需要并行执行,而这些任务之间不需要共享数据,可以通过多进程实现并行处理,提高整体处理能力。例如,同时处理多个文件的数据分析任务。
- 任务分发:将一个大任务分解成多个小任务,每个小任务由一个单独的进程处理,最后合并结果。这种方式可以提高任务的整体处理速度和效率。例如,分布式计算系统中的任务分发和结果汇总。
- 程序健壮性:通过多进程可以将不同模块或功能独立运行在各自的进程中,即使其中一个进程出现问题崩溃,其他进程仍可以继续运行,提高程序的健壮性和稳定性。
- 利用多核处理器:在拥有多核处理器的计算机上,使用多进程可以更好地利用硬件资源,实现并行计算,提高程序的运行效率。
相关第三方库 - multiprocessing
Python 中用于编写多进程程序的内置模块。它提供了创建和管理进程、进程间通信、进程池等功能,使得在 Python 中实现多进程编程变得更加简单和方便。
- 进程创建和管理
提供了
Process
类,可以通过实例化该类来创建新的进程,并通过调用start()
方法启动进程的执行。另外,还可以使用current_process()
函数获取当前进程的信息。- 进程间通信:
multiprocessing
提供了多种进程间通信的工具,如队列(Queue
)、管道(Pipe
)和共享内存(Value
、Array
)等,用于在多个进程之间进行数据交换和共享。这些工具能够保证数据在不同进程之间的安全传输和共享。- 同步原语:
multiprocessing
提供了锁(Lock
)、事件(Event
)、条件变量(Condition
)等同步原语,用于控制多个进程之间的并发访问和协作,避免数据竞争和死锁等问题。- 进程池:
multiprocessing
提供了Pool
类,可以创建一个进程池,并通过调用其方法来提交任务,进程池会自动分配任务给空闲的进程执行。进程池可以充分利用多核 CPU,提高并行任务的执行效率。- 子进程管理:
multiprocessing
提供了Manager
类,可以创建一个子进程管理器,用于管理在主进程和子进程之间共享的数据,如字典、列表、命名空间等。通过Manager
类,可以在不同进程之间安全地操作和共享数据。- 异常处理:
multiprocessing
能够捕获并处理子进程中抛出的异常,防止异常在主进程中导致程序崩溃或无法继续执行。
multiprocessing.Pool
Pool()
: 创建一个进程池对象,可以通过传递参数processes
指定进程池中的进程数量,默认使用 CPU 的核心数作为进程数量。apply(func, args)
: 在进程池中同步执行函数func
,并将参数args
传递给该函数。返回函数执行的结果。map(func, iterable)
: 在进程池中同步地按顺序执行函数func
,并将可迭代对象iterable
中的素作为参数传递给该函数。返回一个结果列表,其中每个素是func
函数在对应输入上的返回值。imap(func, iterable)
: 在进程池中异步地按顺序执行函数func
,并将可迭代对象iterable
中的素作为参数传递给该函数。返回一个迭代器对象,通过遍历该迭代器可以获取函数执行的结果。apply_async(func, args, callback=None)
: 在进程池中异步执行函数func
,并将参数args
传递给该函数。返回一个AsyncResult
对象,通过该对象可以获取函数执行的结果。map_async(func, iterable, callback=None)
: 在进程池中异步按顺序执行函数func
,并将可迭代对象iterable
中的素作为参数传递给该函数。返回一个AsyncResult
对象,通过该对象可以获取函数执行的结果。close()
: 关闭进程池,不再接受新的任务提交。join()
: 主程序等待所有进程池中的任务执行完成后再继续执行。
流程图
开始
|
创建进程池
|
提交任务给进程池
|
|--- 执行任务1
|--- 执行任务2
|--- ...
|--- 执行任务N
|
收集任务结果
|
结束
流程图说明
- 开始:开始执行任务的流程。
- 创建进程池:使用
multiprocessing.Pool
创建一个进程池对象,并指定进程数。 - 提交任务给进程池:将需要执行的任务提交给进程池,可以使用
apply()
、map()
或imap()
等方法提交任务。这些方法可以同步或异步地提交任务,具体取决于你的需求。 - 执行任务:进程池会自动分配任务给空闲的进程执行。每个进程将独立执行一个任务,直到所有任务都执行完成。
- 收集任务结果:可以使用
get()
方法获取任务的执行结果,或者等待所有任务执行完毕后再一次性获取所有结果。 - 结束:任务执行完毕后,关闭进程池,释放资源。
示例1
def func(): ... def func_callback(msg): # func执行完后,返回的信息 ... def func_error(): # 错误处理 ... # 查询电脑的CPU数量 use_cpu_num = multiprocessing.cpu_count() # 调用线程池 process_obj = multiprocessing.Pool(use_cpu_num - 1) # 采用异步非阻塞 apply_async 参数只有一个时,后面要加个 , # func 要执行的函数 # callback为回调函数,执行完后要 执行的下一步动作. 由于多进程之间无法共享,要好好利用回调函数 # error_callback为错误函数 process_obj.apply_async(func, args=(n,), callback=func_callback, error_callback=func_error) # 关闭进程池,不再接受新的进程 process_obj.close() # 主进程阻塞等待子进程的退出 process_obj.join()
示例2
#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName : random_plot.py # @Time : 2022/8/23 14:30 # @Author : Marst.Zhang """ task: output: 应用场景: """ from datetime import datetime import multiprocessing import matplotlib.pyplot as plt from random import choice import matplotlib import os desk = os.path.join(os.path.expanduser("~"), 'Desktop') report_folder = os.path.join(desk, "plot") if not os.path.exists(report_folder): os.mkdir(report_folder) class RandomWalkPicture: """一个生成随机漫步的属性""" def __init__(self, num_points=5000): """初始化随机漫步的属性"。""" self.num_points = num_points # 所有随机漫步都始于(0,0) self.x_values = [0] self.y_values = [0] def walk(self): """计算随机漫步包含的所有点""" # 不断漫步, 直到列表达到指定的长度 while len(self.x_values) < self.num_points: x_direction = choice([1, -1]) x_distance = choice(range(1, 9, 1)) x_step = x_direction * x_distance y_direction = choice([1, -1]) y_distance = choice(range(1, 9, 1)) y_step = y_direction * y_distance # 拒绝原地踏步 if x_step == 0 and y_step == 0: continue # 计算下一个点的x值和y值。 x = self.x_values[-1] + x_step y = self.y_values[-1] + y_step self.x_values.append(x) self.y_values.append(y) def start_draw(random_val): rw = RandomWalkPicture(random_val) rw.walk() _style_name = matplotlib.style.available[0] plt.style.use('dark_background') fig, ax = plt.subplots() # 绘得图案以渐变的颜色来显示 ax.scatter(rw.x_values, rw.y_values, c=rw.y_values, cmap=plt.cm.Reds, s=2) """突出起点和终点""" ax.scatter(0, 0, c='g', edgecolors='none', s=100) ax.scatter(rw.x_values[-1], rw.y_values[-1], c='b', edgecolors='none', s=100) # 隐藏坐标 设置坐标轴的可见和不可见 ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt_path = os.path.join(report_folder, f"{random_val}_{_style_name}.png") plt.savefig(plt_path) plt.close() def time_format(sec): m, s = divmod(sec, 60) h, m = divmod(m, 60) if h: return "%02d:%02d:%02d" % (h, m, s) else: return "%02d:%02d" % (m, s) def cost_time(start_time): now = datetime.now() cost_time = (now - start_time).seconds print(f"cost_time={time_format(cost_time)}s") def func_call_back(msg): print(msg) def normal_run(): # 创建一个RandomWalk实例。 _start_time = datetime.now() for n in range(100, 1000): start_draw(random_val=n) cost_time(_start_time) def func_process(): use_cpu_num = multiprocessing.cpu_count() - 1 process_plot = multiprocessing.Pool(use_cpu_num) print(f"use_cpu_num= {use_cpu_num}") _start_time = datetime.now() for n in range(100, 1000): # 输入参数只要一个时,后面要加 逗号 , process_plot.apply_async(start_draw, args=(n,)) # 关闭进程池,不再接受新的进程 process_plot.close() # 主进程阻塞等待子进程的退出 process_plot.join() cost_time(_start_time) def run(): if 1: # 线性执行 900个图 cost_time=01:51s normal_run() if 0: # 多进程 900个图 7核 cost_time=00:57s func_process() if __name__ == '__main__': run()
实验-效率对比(一般&多进程)
电脑配置是8核,测试时用了7个核,绘制900个图,比较一下耗时.
结论:绘图效率提升,节省了差不多一半的时间.
总结
优缺点
优点:
- 充分利用多核处理器 支持多进程并行处理,可以利用多核 CPU 提高任务的执行效率和速度。
- 独立的内存空间 每个进程都有独立的内存空间,相互之间不会干扰,这降低了数据共享和同步的复杂性,提供了更高的安全性。
- 解决全局解释器锁(GIL)问题 由于每个进程都有自己的Python解释器,进程池可以避开全局解释器锁 (GIL),允许多个进程并行执行Python代码。
- 提供了简单易用的 API,方便进行任务的分发和结果的收集,代码编写相对简单。
- 可以在一定程度上降低代码复杂度,例如避免手动创建和管理多个进程等。
- 可以处理阻塞操作 与线程池不同,进程池中的进程可以通过操作系统级别的并发来处理阻塞式 I/O 操作,从而提高程序的响应能力。
缺点:
- 进程切换开销较大,特别是当进程池中进程数量过多时,容易产生性能瓶颈。因此在使用进程池时需要适当调整进程数,以充分利用系统资源。
- 进程之间的通信需要通过 IPC(Inter-Process Communication)机制进行,可能会涉及到数据序列化和反序列化,对于大量或大型数据的处理,可能会影响处理效率和性能。
- 如果任务本身不是密集型或者IO密集型任务,进程池可能并不能带来明显的性能提升,甚至有可能因为额外的进程切换开销导致性能下降。
踩坑与心得
执行的函数在设计之时,本身应该是可以独立执行,互不影响的.
①回调函数只能返回一个参数
②apply_async属于异步,执行先后顺序还是和添加的任务顺序多少有点关系.
③添加任务时,传入的参数不能是对象
④添加任务时,传入的参数只有一个时,要加个,
⑤多进程之间无法共享变量,要实现这一点比较麻烦,建议利用好回调函数的使用.
绘图方法参考了以下文档
https://blog.csdn.net/kuwola/article/details/
今天的文章 (python)多进程分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/101499.html