摘要:在 Python 中实现并行计算可以通过多种方式,具体取决于任务类型(CPU 密集型或 I/O 密集型)和需求(多线程、多进程或分布式计算)。以下是常见的几种方法及其示例:
在 Python 中实现并行计算可以通过多种方式,具体取决于任务类型(CPU 密集型或 I/O 密集型)和需求(多线程、多进程或分布式计算)。以下是常见的几种方法及其示例:
1. 多线程 (threading 模块)
适用场景:I/O 密集型任务(如网络请求、文件读写)。注意:由于 Python 的全局解释器锁(GIL),多线程无法充分利用多核 CPU。示例:python
import threading
def task(n):
print(f"Processing {n}")
threads =
for i in range(5):
t = threading.Thread(target=task, args=(i,))
threads.append(t)
t.start
for t in threads:
t.join
2. 多进程 (multiprocessing 模块)
适用场景:CPU 密集型任务(如数值计算)。优势:绕过 GIL,充分利用多核 CPU。示例:python
import multiprocessing
def square(n):
return n * n
if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, ..., 81]
3. concurrent.futures 模块
高级接口:提供 ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)。示例(进程池):python
from concurrent.futures import ProcessPoolExecutor
def task(n):
return n ** 2
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(10)]
results = [f.result for f in futures]
print(results)
4. 第三方库:joblib
适用场景:简化并行循环任务(常用于机器学习)。示例:python
from joblib import Parallel, delayed
def process_item(item):
return item * 2
results = Parallel(n_jobs=4)(delayed(process_item)(i) for i in range(10))
print(results) # [0, 2, 4, ..., 18]
5. 异步编程 (asyncio)
适用场景:高并发 I/O 操作(如 HTTP 请求、数据库查询)。示例:python
import asyncio
async def fetch_data(id):
await asyncio.sleep(1) # 模拟 I/O 延迟
return f"Data {id}"
async def main:
tasks = [fetch_data(i) for i in range(3)]
results = await asyncio.gather(*tasks)
print(results) # ['Data 0', 'Data 1', 'Data 2']
asyncio.run(main)
6. 分布式并行:Dask 或 Ray
适用场景:大规模数据或分布式计算。Dask 示例:python
import dask
from dask import compute, delayed
@delayed
def add(x, y):
return x + y
tasks = [add(i, i) for i in range(10)]
results = compute(*tasks) # 并行执行
print(results) # (0, 2, 4, ..., 18)
进程间通信:多进程需要显式处理数据传递(如 Queue、Pipe 或共享内存)。调试复杂性:并行代码更难调试,建议使用日志跟踪。资源管理:避免创建过多线程/进程导致资源耗尽。来源:老客数据一点号