摘要:前面的例子看起来像是为了在事件循环上调度两个协程并获取结果而写的很多样板代码。asyncio 库提供了一个包装方法 asyncio.run,它正是用来做这件事的。我们可以将程序改写如下:
本文通过简单的示例探索 Python 的 asyncio API,帮助开发者快速上手。
协程(co-routine)是一种可以在执行过程中暂停并稍后恢复的特殊函数,主要用于异步编程。以下是协程的工作原理:
使用 async def 定义协程函数。调用协程函数不会立即执行,而是返回一个协程对象,表示该协程的执行计划。协程的执行由事件循环管理。事件循环负责调度协程并在需要时切换任务。在 Python 中,asyncio 提供了事件循环的实现。
协程可以使用 await 暂停自身的执行,等待某个异步操作完成。暂停时,事件循环会切换到其他任务,避免阻塞。
协程的核心优势是高效地处理 I/O 密集型任务(如网络请求),因为它们不会阻塞线程。
前面的例子看起来像是为了在事件循环上调度两个协程并获取结果而写的很多样板代码。asyncio 库提供了一个包装方法 asyncio.run,它正是用来做这件事的。我们可以将程序改写如下:
import asyncio # 导入 asyncio 模块,提供异步编程支持# 定义一个协程函数# 协程是可以在执行过程中暂停并稍后恢复的特殊函数async def add(x: int, y: int): return x + y # 简单的加法运算,返回两个数的和# 创建一个调度协程的函数# 这个函数本身也是一个协程,可以使用 await 等待其他协程完成async def get_results: # 使用 await 关键字等待 add 协程完成并获取结果 # await 会暂停当前协程的执行,直到被等待的协程返回结果 result1 = await add(3, 4) result2 = await add(5, 5) print(result1, result2) # 打印结果:7 10# 使用 asyncio.run 运行协程# 这是 Python 3.7+ 推荐的运行协程的方式,它会自动创建事件循环# 运行协程,然后关闭事件循环,比手动管理事件循环更简洁安全asyncio.run(get_results)在这个简短的版本中,asyncio 在底层创建了一个新的事件循环( asyncio.run),并用它来运行协程 get_results。在这种情况下,我们甚至不需要显式调用事件循环的 stop 方法,asyncio.run 会自动处理。
Python 的 asyncio 提供了两种基本构造,用于在事件循环上运行特殊函数:
协程(co-routine)asyncio 任务协程是使用 async def 语法创建的,如我们之前的代码示例所示。创建 asyncio 任务有两种方法:
# 1loop = asyncio.get_event_looploop.create_task(cor) # cor = co-routine# 2import asyncioasyncio.create_task(cor)如果我们需要在程序中控制事件循环,那么选项 #1 更合适。相比之下,高级的 asyncio.run 方法更适合选项 #2。
很容易混淆任务对象和协程。协程类似于生成器对象,你不能直接使用生成器,而只能通过支持的关键字来使用。类似地,协程必须与 await 关键字一起使用。协程也可以被包装在一个任务中,以获得更细粒度的控制属性,例如取消任务、检查就绪状态等。我们将在接下来的示例中看到 asyncio 任务的使用。
事件循环会同步地执行已调度的协程,但它通过跳过协程的阻塞期来为下一个协程工作,从而实现并发,这一切都只用了一个线程。
让我们来看一个例子,假设我们有两个加法函数,一个慢,一个快。我们可以创建另一个协程,让这两个函数并发运行,输入不同。我们可以将这些函数作为基于协程的任务调度到事件循环上。为了模拟阻塞现象,我们可以在这些函数中使用 asyncio.sleep 函数。因此,修改之前的程序以添加这两个函数如下所示:
import asyncio # 导入 asyncio 模块,提供异步编程支持# 一个快速的协程函数# 模拟需要 3 秒完成的操作async def add_fast(x, y): print("starting fast add") # 打印开始执行的信息 await asyncio.sleep(3) # 模拟网络延迟,暂停协程 3 秒但不阻塞事件循环 print("result ready for fast add") # 打印操作完成的信息 return x + y # 返回计算结果# 一个慢速的协程函数# 模拟需要 5 秒完成的操作async def add_slow(x, y): print("starting slow add") # 打印开始执行的信息 await asyncio.sleep(5) # 模拟网络延迟,暂停协程 5 秒但不阻塞事件循环 print("result ready for slow add") # 打印操作完成的信息 return x + y # 返回计算结果# 创建一个调度协程的函数# 这个函数负责创建任务并等待它们的结果async def get_results: # 创建任务并立即开始执行 # 任务是协程的高级封装,允许并发执行 task1 = asyncio.create_task(add_slow(3, 4)) # 创建慢速加法任务 task2 = asyncio.create_task(add_fast(5, 5)) # 创建快速加法任务 # 等待两个任务完成并打印结果 # 注意:尽管 add_slow 需要 5 秒而 add_fast 只需要 3 秒, # 但由于它们是并发执行的,总共只需要约 5 秒(取最慢的任务时间) print(await task1, await task2) # 打印 7 10# 使用 asyncio.run 运行主协程# 这会自动创建事件循环、运行协程,然后关闭事件循环asyncio.run(get_results)我们在函数中添加了一些打印语句以便调试生命周期。与直接在协程上使用 await 不同,我们创建了任务。如果运行这个程序,我们会看到如下输出,恰好在 5 秒后出现:
正如我们之前提到的,事件循环会按照我们在 create_task 创建任务的方式同步地执行协程。但是,一旦循环看到一个阻塞等待,它就会执行下一个协程。下面的时间线图表试图捕捉这一现象。
对于 I/O 密集型操作,这就是异步协程如何节省操作时间(改进延迟)与顺序程序相比的方式。与顺序程序需要的 8 秒相比,这个并发程序只需要 5 秒就可以得到两个结果。
到目前为止,我们都是手动在代码中调度协程的。如果需要在运行时动态创建协程,最重要的构造是 asyncio.gather 方法。
假设我们有若干个整数元组,需要分别对它们进行加法运算,并在操作完成后返回结果。asyncio.gather 是实现这一目标的默认方法。
问题陈述:我们有一些整数元组,需要分别对它们进行加法运算,并在操作完成后返回结果。
import asyncio # 导入 asyncio 模块,提供异步编程支持# 定义一个协程函数# 协程是可以在执行过程中暂停并稍后恢复的特殊函数async def add(x: int, y: int): return x + y # 简单的加法运算,返回两个数的和# 创建一个调度协程的函数# 这个函数负责创建多个协程并等待它们的结果async def get_results: inputs = [(2,3), (4,5), (5,5), (7,2)] # 定义一组输入数据 # 创建一个协程列表 # 使用列表推导式为每对输入创建一个 add 协程 cors = [add(x,y) for x,y in inputs] # 使用 asyncio.gather 收集所有协程 # gather 会并发运行所有传入的协程,并返回它们的结果列表 # *cors 是解包语法,将列表中的所有协程作为单独的参数传递给 gather results = asyncio.gather(*cors) print(await results) # 等待所有协程完成并打印结果列表:[5, 9, 10, 9]# 使用 asyncio.run 运行主协程# 这会自动创建事件循环、运行协程,然后关闭事件循环asyncio.run(get_results)这个例子展示了 asyncio.gather 方法的用法,用于动态调度多个协程的执行。一旦所有协程的结果都成功收集,await 调用就会返回值并打印它们。
注意:*cors(星号语法)会解包一个列表,并将其作为关键字参数传递给 asyncio.gather。你不能直接将协程列表传递给该方法。
类比:想象一下,你走进一家餐厅,点了四样东西。服务员会准备好所有四样东西,然后把它们送到订单桌上。这就是 asyncio.gather 方法的工作方式。
你也可以修改上述程序,创建任务而不是协程,其余部分保持不变。
import asyncio # 导入 asyncio 模块,提供异步编程支持# 定义一个协程函数# 协程是可以在执行过程中暂停并稍后恢复的特殊函数async def add(x: int, y: int): return x + y # 简单的加法运算,返回两个数的和# 创建一个调度协程的函数# 这个函数负责在事件循环上调度任务并打印结果async def get_results: inputs = [(2,3), (4,5), (5,5), (7,2)] # 定义一组输入数据 # 创建任务列表 # 使用列表推导式为每对输入创建一个任务 # asyncio.create_task 会立即将协程封装为任务并开始执行 tasks = [asyncio.create_task(add(x,y)) for x,y in inputs] # 使用 asyncio.gather 等待所有任务完成 # gather 会并发运行所有传入的任务,并返回它们的结果列表 # *tasks 是解包语法,将列表中的所有任务作为单独的参数传递给 gather results = asyncio.gather(*tasks) print(await results) # 等待所有任务完成并打印结果列表:[5, 9, 10, 9]# 使用 asyncio.run 运行主协程# 这会自动创建事件循环、运行协程,然后关闭事件循环asyncio.run(get_results)有时候,你可能不想让服务员一次性把所有东西都端上来,而是希望在它们准备好时,一个接一个地享用。这可以通过 asyncio.as_completed 方法实现。与一次性收集所有并发结果不同,我们现在可以立即消费最早完成的协程结果。
import asyncio # 导入 asyncio 模块,提供异步编程支持# 定义一个协程函数# 协程是可以在执行过程中暂停并稍后恢复的特殊函数async def add(x: int, y: int): return x + y # 简单的加法运算,返回两个数的和# 创建一个调度协程的函数# 这个函数负责调度协程并按完成顺序打印结果async def get_results: inputs = [(2,3), (4,5), (5,5), (7,2)] # 定义一组输入数据 # 创建一个协程列表 # 使用列表推导式为每对输入创建一个 add 协程 cors = [add(x,y) for x,y in inputs] # 使用 asyncio.as_completed 按照协程完成的顺序处理结果 # 与 gather 不同,as_completed 返回一个迭代器,按照协程完成的顺序产生结果 # 这样可以在任何协程完成时立即处理其结果,而不必等待所有协程完成 # 注意:输出顺序是不确定的,取决于哪个协程先完成 for cor in asyncio.as_completed(cors): print(await cor) # 等待每个协程完成并立即打印其结果# 使用 asyncio.run 运行主协程# 这会自动创建事件循环、运行协程,然后关闭事件循环asyncio.run(get_results)asyncio.as_completed 方法接受一个协程列表,与 asyncio.gather 方法的关键字参数不同。asyncio.as_completed返回一个可迭代的协程,可以与await关键字一起使用。如果你愿意,可以在for 循环中立即消费结果。
注意:上述程序也可以使用任务来运行。可以尝试将其作为练习。
有时候,我们需要调度一个协程,但只等待一定的时间,然后停止执行。这时,asyncio.wait_for 方法非常有用。
asyncio.wait_for 方法接受一个协程或任务,并设置超时时间。如果在超时时间内结果尚未准备好,它会抛出一个异常(这个异常来自 asyncio 模块的 TimeoutError)。假设我们有一个协程,用于加法计算,它最多需要五秒来执行并返回结果。如果我们无法忍受这种延迟,可以指定自己的超时阈值。我们可以通过 asyncio.sleep 方法添加模拟阻塞,并使用 Python 的 random 模块引入随机延迟。
import asyncio # 导入 asyncio 模块,提供异步编程支持import random # 导入 random 模块,用于生成随机数# 定义一个协程函数# 协程是可以在执行过程中暂停并稍后恢复的特殊函数async def add(x: int, y: int): # 函数工作时间在 1 秒到 5 秒之间随机 # 使用 asyncio.sleep 模拟耗时操作,但不会阻塞事件循环 await asyncio.sleep(random.randrange(1, 5)) return x + y # 返回两个数的和# 创建一个调度协程的函数# 这个函数负责调度协程并处理超时情况async def get_results: result = None try: # 等待协程执行,但最多等待 3 秒 # asyncio.wait_for 会在超时时抛出 TimeoutError 异常 # 这是一种强制超时机制,如果操作超时会取消该协程 result = await asyncio.wait_for(add(3, 4), timeout=3) except asyncio.exceptions.TimeoutError: # 捕获超时异常并提供后备值 # 当 add 协程超过 3 秒还未完成时,会进入这个分支 result = "fallback payload" # 设置后备结果 print(result) # 打印结果(成功执行的和值或后备值)# 使用 asyncio.run 运行主协程# 这会自动创建事件循环、运行协程,然后关闭事件循环asyncio.run(get_results)从上面的代码可以看出,我们为协程设置了一个 3 秒的截止时间。如果协程在执行过程中超过了这个截止时间,就会抛出一个 TimeoutError 异常。通过这种方式,我们可以对结果做出决策。
得益于这些高级构造,Python 中的异步编程比你想象的要简单得多。只要掌握几个关键的 asyncio 模块方法,就可以编写并发程序了。
来源:架构笔记一点号