multiprocessing #
multiprocessing是一个支持使用与 threading 模块类似的 API 来生成进程的包。multiprocessing包提供了本地和远程的并发性,通过使用子进程而非线程,绕过了全局解释器锁(GIL)的限制。因此,multiprocessing 模块允许程序员充分利用机器上的多个处理器。
当需要并行执行CPU密集型任务,并且计算机有多核CPU时,更适合使用multiprocessing库。如果进程将大部分时间花在等待I/O(例如网络、磁盘、数据库或键盘)上,多进程就不那么有用了,这时候更适合使用多线程或者协程。
Pool #
multiprocessing 模块还引入了一些在 threading 模块中没有对应的 API。其中一个重要的例子是 Pool 对象,它提供了一种简便的方法,可以在多个输入值上并行执行函数,将输入数据分布到不同的进程中(数据并行)。下面是一个使用 Pool 进行数据并行的基本示例。
1from multiprocessing import Pool
2
3
4def f(x):
5 return x * x
6
7
8if __name__ == "__main__":
9 with Pool(5) as p:
10 print(p.map(f, [1, 2, 3]))
11# 输出 [1, 4, 9]
进程池multiprocessing.pool.Pool
隐藏了进程间通信的细节,使用Pool看起来很像函数调用:将数据传递给一个函数,它在另一个或多个进程中执行,当工作完成时,返回一个值。这依赖于背后的很多工作:一个进程中的对象使用pickle进行序列化并被传递到操作系统进程管道中。然后,另一个进程从管道中获取数据并反序列化。所需工作在子进程中完成,并产生一个结果。结果被序列化,并通过管道传递回来。最终,主进程会反序列化并返回它。总
相关内容:
concurrent.futures.ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor
提供了一个更高级的接口,可将任务推送到后台进程,而不会阻塞调用进程的执行。与直接使用Pool
接口相比,concurrent.futures
API 更便于将任务提交到底层进程池,并可以与等待结果的过程分开。
Process #
可以通过创建一个Process
对象并调用其start()
方法来生成新进程。
1from multiprocessing import Process
2import os
3
4
5def info(title):
6 print(title)
7 print("module name:", __name__)
8 print("parent process:", os.getppid())
9 print("process id:", os.getpid())
10
11
12def f(name):
13 info("function f")
14 print("hello", name)
15
16
17if __name__ == "__main__":
18 info("main line")
19 p = Process(target=f, args=("bob",))
20 p.start()
21 p.join()
进程间通信和multiprocessing.Queue
#
如果需要对进程间的通信进行更多的控制,可以使用数据结构multiprocessing.Queue
。它提供了几种从一个进程向另一个或多个其他进程发送消息的方法。任何可以被pickle
序列化的对象都可以被发送到一个Queue中,但是pickle
可能是一个开销很大的操作,所以应尽量让这种对象很小。
1from multiprocessing import Process, Queue
2import time
3
4
5def producer(queue):
6 """生产者进程,将数据放入队列"""
7 for i in range(5):
8 item = f"数据 {i}"
9 print(f"生产者:将 {item} 放入队列")
10 queue.put(item)
11 time.sleep(1) # 模拟生产延迟
12 queue.put(None) # 表示结束信号
13
14
15def consumer(queue):
16 """消费者进程,从队列中获取数据"""
17 while True:
18 item = queue.get()
19 if item is None: # 检查是否为结束信号
20 break
21 print(f"消费者:从队列中取出 {item}")
22 time.sleep(2) # 模拟处理延迟
23
24
25if __name__ == "__main__":
26 # 创建一个队列用于进程间通信
27 queue = Queue()
28
29 # 创建生产者和消费者进程
30 producer_process = Process(target=producer, args=(queue,))
31 consumer_process = Process(target=consumer, args=(queue,))
32
33 # 启动进程
34 producer_process.start()
35 consumer_process.start()
36
37 # 等待进程完成
38 producer_process.join()
39 consumer_process.join()
40
41 print("所有进程已完成")
multiprocessing的问题 #
- 进程间共享数据的成本很高,需要进行序列化和反序列化,占用大量处理时间。
- 可被共享的 Python 对象需要满足许多限制,使得共享内存的使用相对复杂。
- 很难判断变量或方法是在哪个进程中被访问的,需要额外的管理。
- 父进程不会看到子进程对继承的可变对象(如列表、字典等)的修改,需要额外处理。
- 进程间通信的方式(如队列、管道、共享内存等)需要额外的开销和复杂性。