(python)多进程

(python)多进程工作中遇到任务较重且耗时相对较长的情况 这时候想起来用多进程处理这类 IO 密集型任务 最重要的是 multiprocess Pool 实在是简单又高效 充分利用了多核 CPU 同时并行处理多个任务 支持多进程并行处理 可以利用多核 CPU 提高任务的执行效率和速度

前言

        在 Python 中,多进程是一种并发编程的方式,它允许程序同时执行多个任务,提高程序的性能和效率。与单线程或异步编程相比,多进程能够更好地利用多核处理器、提高计算密集型任务的执行速度,并且能够更好地处理IO密集型任务。通过多进程,可以将任务分配给不同的进程来并行执行,从而加快整体的处理速度。

Python 提供了 multiprocessing 模块来实现多进程编程,这个模块可以让我们轻松地创建、启动和管理多个进程。使用多进程编程时需要注意合理利用进程间通信(IPC)机制,避免竞争条件和数据共享问题。

目录

多进程运行图

应用场景

相关第三方库 - multiprocessing 

multiprocessing.Pool 

流程图

流程图说明 

示例1

示例2

实验-效率对比(一般&多进程)

总结

优缺点

踩坑与心得


多进程运行图

        任务尽可能地划分为多个独立的子任务,这样每个进程可以独立地执行自己的子任务。

        多个子任务在队列中等候,一旦某个子任务完成,就马上分配下一个子任务.

应用场景

  1. CPU 密集型任务:当任务需要大量的 CPU 计算而不涉及太多的 I/O 操作时,使用多进程可以充分利用多核 CPU 的优势,加速任务的执行。例如,数值计算、图像处理和模型训练等。
  2. 并行处理:如果有多个相互独立的任务需要并行执行,而这些任务之间不需要共享数据,可以通过多进程实现并行处理,提高整体处理能力。例如,同时处理多个文件的数据分析任务。
  3. 任务分发:将一个大任务分解成多个小任务,每个小任务由一个单独的进程处理,最后合并结果。这种方式可以提高任务的整体处理速度和效率。例如,分布式计算系统中的任务分发和结果汇总。
  4. 程序健壮性:通过多进程可以将不同模块或功能独立运行在各自的进程中,即使其中一个进程出现问题崩溃,其他进程仍可以继续运行,提高程序的健壮性和稳定性。
  5. 利用多核处理器:在拥有多核处理器的计算机上,使用多进程可以更好地利用硬件资源,实现并行计算,提高程序的运行效率。

相关第三方库 - multiprocessing 

Python 中用于编写多进程程序的内置模块。它提供了创建和管理进程、进程间通信、进程池等功能,使得在 Python 中实现多进程编程变得更加简单和方便。

        

  1. 进程创建和管理          提供了 Process 类,可以通过实例化该类来创建新的进程,并通过调用 start() 方法启动进程的执行。另外,还可以使用 current_process() 函数获取当前进程的信息。
  2. 进程间通信:multiprocessing 提供了多种进程间通信的工具,如队列(Queue)、管道(Pipe)和共享内存(ValueArray)等,用于在多个进程之间进行数据交换和共享。这些工具能够保证数据在不同进程之间的安全传输和共享。
  3. 同步原语:multiprocessing 提供了锁(Lock)、事件(Event)、条件变量(Condition)等同步原语,用于控制多个进程之间的并发访问和协作,避免数据竞争和死锁等问题。
  4. 进程池multiprocessing 提供了 Pool 类,可以创建一个进程池,并通过调用其方法来提交任务,进程池会自动分配任务给空闲的进程执行。进程池可以充分利用多核 CPU,提高并行任务的执行效率。
  5. 子进程管理:multiprocessing 提供了 Manager 类,可以创建一个子进程管理器,用于管理在主进程和子进程之间共享的数据,如字典、列表、命名空间等。通过 Manager 类,可以在不同进程之间安全地操作和共享数据。
  6. 异常处理:multiprocessing 能够捕获并处理子进程中抛出的异常,防止异常在主进程中导致程序崩溃或无法继续执行。
multiprocessing.Pool 
  1. Pool(): 创建一个进程池对象,可以通过传递参数 processes 指定进程池中的进程数量,默认使用 CPU 的核心数作为进程数量。
  2. apply(func, args): 在进程池中同步执行函数 func,并将参数 args 传递给该函数。返回函数执行的结果。
  3. map(func, iterable): 在进程池中同步地按顺序执行函数 func,并将可迭代对象 iterable 中的素作为参数传递给该函数。返回一个结果列表,其中每个素是 func 函数在对应输入上的返回值。
  4. imap(func, iterable): 在进程池中异步地按顺序执行函数 func,并将可迭代对象 iterable 中的素作为参数传递给该函数。返回一个迭代器对象,通过遍历该迭代器可以获取函数执行的结果。
  5. apply_async(func, args, callback=None): 在进程池中异步执行函数 func,并将参数 args 传递给该函数。返回一个 AsyncResult 对象,通过该对象可以获取函数执行的结果。
  6. map_async(func, iterable, callback=None): 在进程池中异步按顺序执行函数 func,并将可迭代对象 iterable 中的素作为参数传递给该函数。返回一个 AsyncResult 对象,通过该对象可以获取函数执行的结果。
  7. close(): 关闭进程池,不再接受新的任务提交。
  8. join(): 主程序等待所有进程池中的任务执行完成后再继续执行。

流程图

开始
|
创建进程池
|
提交任务给进程池
|
|--- 执行任务1
|--- 执行任务2
|--- ...
|--- 执行任务N
|
收集任务结果
|
结束

流程图说明 

  1. 开始:开始执行任务的流程。
  2. 创建进程池:使用 multiprocessing.Pool 创建一个进程池对象,并指定进程数。
  3. 提交任务给进程池:将需要执行的任务提交给进程池,可以使用 apply()map() 或 imap() 等方法提交任务。这些方法可以同步或异步地提交任务,具体取决于你的需求。
  4. 执行任务:进程池会自动分配任务给空闲的进程执行。每个进程将独立执行一个任务,直到所有任务都执行完成。
  5. 收集任务结果:可以使用 get() 方法获取任务的执行结果,或者等待所有任务执行完毕后再一次性获取所有结果。
  6. 结束:任务执行完毕后,关闭进程池,释放资源。

示例1

def func(): ... def func_callback(msg): # func执行完后,返回的信息 ... def func_error(): # 错误处理 ... # 查询电脑的CPU数量 use_cpu_num = multiprocessing.cpu_count() # 调用线程池 process_obj = multiprocessing.Pool(use_cpu_num - 1) # 采用异步非阻塞 apply_async 参数只有一个时,后面要加个 , # func 要执行的函数 # callback为回调函数,执行完后要 执行的下一步动作. 由于多进程之间无法共享,要好好利用回调函数 # error_callback为错误函数 process_obj.apply_async(func, args=(n,), callback=func_callback, error_callback=func_error) # 关闭进程池,不再接受新的进程 process_obj.close() # 主进程阻塞等待子进程的退出 process_obj.join()

示例2

#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName : random_plot.py # @Time : 2022/8/23 14:30 # @Author : Marst.Zhang """ task: output: 应用场景: """ from datetime import datetime import multiprocessing import matplotlib.pyplot as plt from random import choice import matplotlib import os desk = os.path.join(os.path.expanduser("~"), 'Desktop') report_folder = os.path.join(desk, "plot") if not os.path.exists(report_folder): os.mkdir(report_folder) class RandomWalkPicture: """一个生成随机漫步的属性""" def __init__(self, num_points=5000): """初始化随机漫步的属性"。""" self.num_points = num_points # 所有随机漫步都始于(0,0) self.x_values = [0] self.y_values = [0] def walk(self): """计算随机漫步包含的所有点""" # 不断漫步, 直到列表达到指定的长度 while len(self.x_values) < self.num_points: x_direction = choice([1, -1]) x_distance = choice(range(1, 9, 1)) x_step = x_direction * x_distance y_direction = choice([1, -1]) y_distance = choice(range(1, 9, 1)) y_step = y_direction * y_distance # 拒绝原地踏步 if x_step == 0 and y_step == 0: continue # 计算下一个点的x值和y值。 x = self.x_values[-1] + x_step y = self.y_values[-1] + y_step self.x_values.append(x) self.y_values.append(y) def start_draw(random_val): rw = RandomWalkPicture(random_val) rw.walk() _style_name = matplotlib.style.available[0] plt.style.use('dark_background') fig, ax = plt.subplots() # 绘得图案以渐变的颜色来显示 ax.scatter(rw.x_values, rw.y_values, c=rw.y_values, cmap=plt.cm.Reds, s=2) """突出起点和终点""" ax.scatter(0, 0, c='g', edgecolors='none', s=100) ax.scatter(rw.x_values[-1], rw.y_values[-1], c='b', edgecolors='none', s=100) # 隐藏坐标 设置坐标轴的可见和不可见 ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt_path = os.path.join(report_folder, f"{random_val}_{_style_name}.png") plt.savefig(plt_path) plt.close() def time_format(sec): m, s = divmod(sec, 60) h, m = divmod(m, 60) if h: return "%02d:%02d:%02d" % (h, m, s) else: return "%02d:%02d" % (m, s) def cost_time(start_time): now = datetime.now() cost_time = (now - start_time).seconds print(f"cost_time={time_format(cost_time)}s") def func_call_back(msg): print(msg) def normal_run(): # 创建一个RandomWalk实例。 _start_time = datetime.now() for n in range(100, 1000): start_draw(random_val=n) cost_time(_start_time) def func_process(): use_cpu_num = multiprocessing.cpu_count() - 1 process_plot = multiprocessing.Pool(use_cpu_num) print(f"use_cpu_num= {use_cpu_num}") _start_time = datetime.now() for n in range(100, 1000): # 输入参数只要一个时,后面要加 逗号 , process_plot.apply_async(start_draw, args=(n,)) # 关闭进程池,不再接受新的进程 process_plot.close() # 主进程阻塞等待子进程的退出 process_plot.join() cost_time(_start_time) def run(): if 1: # 线性执行 900个图 cost_time=01:51s normal_run() if 0: # 多进程 900个图 7核 cost_time=00:57s func_process() if __name__ == '__main__': run() 
实验-效率对比(一般&多进程)

电脑配置是8核,测试时用了7个核,绘制900个图,比较一下耗时.

结论:绘图效率提升,节省了差不多一半的时间.

总结

优缺点

优点:

  1. 充分利用多核处理器 支持多进程并行处理,可以利用多核 CPU 提高任务的执行效率和速度
  2. 独立的内存空间 每个进程都有独立的内存空间,相互之间不会干扰,这降低了数据共享和同步的复杂性,提供了更高的安全性。
  3. 解决全局解释器锁(GIL)问题 由于每个进程都有自己的Python解释器,进程池可以避开全局解释器锁 (GIL),允许多个进程并行执行Python代码。
  4. 提供了简单易用的 API,方便进行任务的分发和结果的收集,代码编写相对简单
  5. 可以在一定程度上降低代码复杂度,例如避免手动创建和管理多个进程等。
  6. 可以处理阻塞操作 与线程池不同,进程池中的进程可以通过操作系统级别的并发来处理阻塞式 I/O 操作,从而提高程序的响应能力。

缺点:

  1. 进程切换开销较大,特别是当进程池中进程数量过多时,容易产生性能瓶颈。因此在使用进程池时需要适当调整进程数,以充分利用系统资源。
  2. 进程之间的通信需要通过 IPC(Inter-Process Communication)机制进行,可能会涉及到数据序列化和反序列化,对于大量或大型数据的处理,可能会影响处理效率和性能。
  3. 如果任务本身不是密集型或者IO密集型任务,进程池可能并不能带来明显的性能提升,甚至有可能因为额外的进程切换开销导致性能下降。

踩坑与心得

执行的函数在设计之时,本身应该是可以独立执行,互不影响的.

①回调函数只能返回一个参数

apply_async属于异步,执行先后顺序还是和添加的任务顺序多少有点关系.

③添加任务时,传入的参数不能是对象

④添加任务时,传入的参数只有一个时,要加个,

⑤多进程之间无法共享变量,要实现这一点比较麻烦,建议利用好回调函数的使用.

绘图方法参考了以下文档

https://blog.csdn.net/kuwola/article/details/

今天的文章 (python)多进程分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2025-01-04 17:30
下一篇 2025-01-04 17:27

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/101499.html