摘要:import asyncioasync def main:print(await func)async def func:# Do time intensive stuff...return "Hello, world!"if __name__ == "__m
在 Python 3.5+ 发布之前,asyncio 模块使用生成器来模仿异步调用,因此其语法与当前的 Python 3.5 版本不同。
Python 3.x Version ≥ 3.5
Python 3.5 引入了 async 和 await 关键字。请注意,await func 调用周围没有括号。
import asyncioasync def main:print(await func)async def func:# Do time intensive stuff...return "Hello, world!"if __name__ == "__main__":loop = asyncio.get_event_looploop.run_until_complete(main)Python 3.x Version ≥ 3.3 Version
在 Python 3.5 之前,@asyncio.coroutine装饰器用于定义一个 coroutine。yield from 表达式用于生成器委托。注意 yield from func 周围的括号。
import asyncio@asyncio.coroutinedef main:print((yield from func))@asyncio.coroutinedef func:# Do time intensive stuff..return "Hello, world!"if __name__ == "__main__":loop = asyncio.get_event_looploop.run_until_complete(main)下面的示例展示了如何异步运行两个函数:
import asyncioasync def cor1:print("cor1 start")for i in range(10):await asyncio.sleep(1.5)print("cor1", i)async def cor2:print("cor2 start")for i in range(15):await asyncio.sleep(1)print("cor2", i)loop = asyncio.get_event_loopcors = asyncio.wait([cor1, cor2])loop.run_until_complete(cors)Note: Uses the Python 3.5+ async/await syntax
asyncio 支持使用 concurrent.futures 中的 Executor 对象来异步调度任务。事件循环有一个函数 run_in_executor,它接收一个 Executor 对象、一个可调用程序和可调用程序的参数。
为执行器调度任务
import asynciofrom concurrent.futures import ThreadPoolExecutordef func(a, b):# Do time intensive stuff...return a + basync def main(loop):executor = ThreadPoolExecutorresult = await loop.run_in_executor(executor, func, "Hello,", " world!")print(result)if __name__ == "__main__":loop = asyncio.get_event_looploop.run_until_complete(main(loop))每个事件循环也有一个 "default" 执行器槽,可以分配给一个 "执行器"。要分配 "执行器" 并从循环中调度任务,需要使用 set_default_executor 方法。
import asynciofrom concurrent.futures import ThreadPoolExecutordef func(a, b):# Do time intensive stuff...return a + basync def main(loop):# NOTE: Using `None` as the first parameter designates the `default` Executor.result = await loop.run_in_executor(None, func, "Hello,", " world!")print(result)if __name__ == "__main__":loop = asyncio.get_event_looploop.set_default_executor(ThreadPoolExecutor)loop.run_until_complete(main(loop))在concurrent.futures中,有两种主要的 Executor 类型:ThreadPoolExecutor 和 ProcessPoolExecutor。ThreadPoolExecutor 包含一个线程池,可以通过构造函数手动设置为特定数量的线程,也可以默认为机器内核数乘以5。ThreadPoolExecutor 使用线程池执行分配给它的任务,通常更擅长 CPU 绑定操作,而不是 I/O 绑定操作。相比之下,ProcessPoolExecutor 会为分配给它的每个任务生成一个新进程。ProcessPoolExecutor 只能接受可剔除的任务和参数。最常见的不可剔除的任务是对象的方法。如果必须在 Executor中将对象的方法安排为任务,则必须使用 ThreadPoolExecutor。
uvloop 是基于 libuv(由 nodejs 使用)的 asyncio.AbstractEventLoop 的实现。它兼容 99% 的 asyncio 功能,速度比传统的 asyncio.EventLoop 快得多。uvloop 目前还未在 Windows 上提供,请使用 pip install uvloop 安装。
import asyncioimport uvloopif __name__ == "__main__":asyncio.set_event_loop(uvloop.new_event_loop)# Do your stuff here ...还可以通过将EventLoopPolicy设置为uvloop中的事件循环工厂来更改事件循环工厂。
import asyncioimport uvloopif __name__ == "__main__":asyncio.set_event_loop_policy(uvloop.EventLoopPolicy)loop = asyncio.new_event_loop概念
使用事件来同步调度多个例行程序。
简单地说,事件就像跑步比赛中的发令枪:它让选手们冲出起跑线。
示例
import asyncioimport functools# 事件触发功能def trigger(event):print('EVENT SET')event.set # 唤醒等待的程序# 事件消费async def consumer_a(event):consumer_name = 'Consumer A'print('{} waiting'.format(consumer_name))await event.waitprint('{} triggered'.format(consumer_name))async def consumer_b(event):consumer_name = 'Consumer B'print('{} waiting'.format(consumer_name))await event.waitprint('{} triggered'.format(consumer_name))# eventevent = asyncio.Event# wrap coroutines in one futuremain_future = asyncio.wait([consumer_a(event), consumer_b(event)])# event loopevent_loop = asyncio.get_event_loopevent_loop.call_later(0.1, functools.partial(trigger, event)) # trigger event in 0.1 sec# complete main_futuredone, pending = event_loop.run_until_complete(main_future)Output:
Consumer B waitingConsumer A waitingEVENT SETConsumer B triggeredConsumer A triggered在这里,我们使用 asyncio 制作一个简单的 echo websocket。我们定义了用于连接服务器和发送/接收消息的例行程序。websocket 的通信在一个 main 例程中运行,该例程由一个事件循环运行。本示例修改自 stackoverflow上的一篇文章。
import asyncioimport aiohttpsession = aiohttp.ClientSession # handles the context managerclass EchoWebsocket:async def connect(self):self.websocket = await session.ws_connect("wss://echo.websocket.org")async def send(self, message):self.websocket.send_str(message)async def receive(self):result = (await self.websocket.receive)return result.dataasync def main:echo = EchoWebsocketawait echo.connectawait echo.send("Hello World!")print(await echo.receive) # "Hello World!"if __name__ == '__main__':# The main looploop = asyncio.get_event_looploop.run_until_complete(main)关于 asnycio,最常见的误解可能是它可以让你并行运行任何任务--避开 GIL(全局解释器锁),从而并行(在独立线程上)执行阻塞作业! asyncio(以及为与 asyncio 协作而构建的库)建立在 ”例程"(coroutines)的基础上:这些函数(以协作方式)将控制流交还给调用函数。time.sleep 是一个阻塞函数的例子。程序的执行流程将在此停止,只有在 time.sleep 结束后才会返回。
另一方面,asyncio.aiohttp 在构建时就考虑到了 asyncio。它的例行程序将并发运行。
如果你有长期运行的、与 CPU 相关的任务,你希望并行运行 asyncio,那么 asyncio 并不适合你。为此,你需要 threads或 multiprocessing。如果运行的是 IO 绑定任务,则可以使用 asyncio 并发运行它们。来源:山岚