multiprocessing

multiprocessing #

https://docs.python.org/3/library/multiprocessing.html

multiprocessing是一个支持使用与 threading 模块类似的 API 来生成进程的包。multiprocessing包提供了本地和远程的并发性,通过使用子进程而非线程,绕过了全局解释器锁(GIL)的限制。因此,multiprocessing 模块允许程序员充分利用机器上的多个处理器。

当需要并行执行CPU密集型任务,并且计算机有多核CPU时,更适合使用multiprocessing库。如果进程将大部分时间花在等待I/O(例如网络、磁盘、数据库或键盘)上,多进程就不那么有用了,这时候更适合使用多线程或者协程。

Pool #

multiprocessing 模块还引入了一些在 threading 模块中没有对应的 API。其中一个重要的例子是 Pool 对象,它提供了一种简便的方法,可以在多个输入值上并行执行函数,将输入数据分布到不同的进程中(数据并行)。下面是一个使用 Pool 进行数据并行的基本示例。

 1from multiprocessing import Pool
 2
 3
 4def f(x):
 5    return x * x
 6
 7
 8if __name__ == "__main__":
 9    with Pool(5) as p:
10        print(p.map(f, [1, 2, 3]))
11# 输出 [1, 4, 9]

进程池multiprocessing.pool.Pool隐藏了进程间通信的细节,使用Pool看起来很像函数调用:将数据传递给一个函数,它在另一个或多个进程中执行,当工作完成时,返回一个值。这依赖于背后的很多工作:一个进程中的对象使用pickle进行序列化并被传递到操作系统进程管道中。然后,另一个进程从管道中获取数据并反序列化。所需工作在子进程中完成,并产生一个结果。结果被序列化,并通过管道传递回来。最终,主进程会反序列化并返回它。总

相关内容:concurrent.futures.ProcessPoolExecutor

concurrent.futures.ProcessPoolExecutor 提供了一个更高级的接口,可将任务推送到后台进程,而不会阻塞调用进程的执行。与直接使用 Pool 接口相比,concurrent.futures API 更便于将任务提交到底层进程池,并可以与等待结果的过程分开。

Process #

可以通过创建一个Process对象并调用其start()方法来生成新进程。

 1from multiprocessing import Process
 2import os
 3
 4
 5def info(title):
 6    print(title)
 7    print("module name:", __name__)
 8    print("parent process:", os.getppid())
 9    print("process id:", os.getpid())
10
11
12def f(name):
13    info("function f")
14    print("hello", name)
15
16
17if __name__ == "__main__":
18    info("main line")
19    p = Process(target=f, args=("bob",))
20    p.start()
21    p.join()

进程间通信和multiprocessing.Queue #

如果需要对进程间的通信进行更多的控制,可以使用数据结构multiprocessing.Queue。它提供了几种从一个进程向另一个或多个其他进程发送消息的方法。任何可以被pickle序列化的对象都可以被发送到一个Queue中,但是pickle可能是一个开销很大的操作,所以应尽量让这种对象很小。

 1from multiprocessing import Process, Queue
 2import time
 3
 4
 5def producer(queue):
 6    """生产者进程,将数据放入队列"""
 7    for i in range(5):
 8        item = f"数据 {i}"
 9        print(f"生产者:将 {item} 放入队列")
10        queue.put(item)
11        time.sleep(1)  # 模拟生产延迟
12    queue.put(None)  # 表示结束信号
13
14
15def consumer(queue):
16    """消费者进程,从队列中获取数据"""
17    while True:
18        item = queue.get()
19        if item is None:  # 检查是否为结束信号
20            break
21        print(f"消费者:从队列中取出 {item}")
22        time.sleep(2)  # 模拟处理延迟
23
24
25if __name__ == "__main__":
26    # 创建一个队列用于进程间通信
27    queue = Queue()
28
29    # 创建生产者和消费者进程
30    producer_process = Process(target=producer, args=(queue,))
31    consumer_process = Process(target=consumer, args=(queue,))
32
33    # 启动进程
34    producer_process.start()
35    consumer_process.start()
36
37    # 等待进程完成
38    producer_process.join()
39    consumer_process.join()
40
41    print("所有进程已完成")

multiprocessing的问题 #

  • 进程间共享数据的成本很高,需要进行序列化和反序列化,占用大量处理时间。
  • 可被共享的 Python 对象需要满足许多限制,使得共享内存的使用相对复杂。
  • 很难判断变量或方法是在哪个进程中被访问的,需要额外的管理。
  • 父进程不会看到子进程对继承的可变对象(如列表、字典等)的修改,需要额外处理。
  • 进程间通信的方式(如队列、管道、共享内存等)需要额外的开销和复杂性。
© 2025 青蛙小白 | 总访问量 | 总访客数