事件循环

事件循环#

Python 的 asyncio 事件循环是异步编程的核心调度器,其本质是单线程内的高效任务协调机制。以下从核心概念、工作原理、系统差异、使用方法四个角度进行说明:

核心概念与作用

  1. 任务调度中枢
    事件循环(Event Loop)负责管理和调度所有异步任务(如协程、回调函数、I/O 操作等),通过轮询机制决定何时执行哪个任务。其核心作用包括:

    • 执行协程:按优先级推进协程的执行。

    • 处理 I/O 操作:监控网络、文件等非阻塞 I/O 事件。

    • 运行子进程:管理异步子进程的启动和结果获取。

  2. 单线程并发基础
    通过“协作式多任务”而非抢占式调度,事件循环在一个线程内实现高并发。协程主动让出控制权时,事件循环才会切换到其他任务,减少线程切换的开销。

工作原理

  1. 任务队列与调度

    • 所有待执行任务(如协程、回调)被加入队列。

    • 事件循环按优先级轮询队列,执行就绪任务。例如,当一个 I/O 操作完成时,关联的回调会被触发。

  2. I/O 多路复用

    • 使用 selectors 模块选择最优系统接口(如 Linux 的 epoll、BSD 的 kqueue)监控 I/O 事件,实现非阻塞读写。

    • Windows 系统默认采用 ProactorEventLoop(基于 IOCP),而 Unix 类系统使用 SelectorEventLoop

  3. 协程与 Future 管理

    • 协程被封装为 Task 对象,事件循环跟踪其状态,通过 await 挂起或恢复执行。

    • Future 对象表示异步操作结果,事件循环监控其完成状态并触发回调。

系统差异与优化

  1. 不同系统的事件循环类型

    • Unix 系统:默认使用 SelectorEventLoop,基于 epoll/kqueue 高效处理大量并发连接。

    • Windows 系统:默认使用 ProactorEventLoop,依赖 IOCP 实现真正的异步 I/O,性能优于 select

  2. 性能优化策略

    • 替换事件循环策略,例如在 Unix 系统使用 uvloop(基于 libuv)可大幅提升性能。

    • 避免在非主线程直接操作事件循环,需通过 asyncio.run() 或协程内调用 get_running_loop()

使用方法与最佳实践

  1. 创建与获取事件循环

    • 高层 API:优先使用 asyncio.run(main()) 自动创建和管理事件循环。

    • 底层 API:通过 loop = asyncio.new_event_loop() 手动创建,或 get_event_loop() 获取当前循环(需注意线程安全)。

  2. 运行与管理任务

    # 示例:启动事件循环并执行任务
    async def main():
        await asyncio.sleep(1)
        print("Done")
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    
  • 使用 run_until_complete() 执行单个任务,或 run_forever() 持续运行。

  1. 注意事项

    • 避免混用不同线程的事件循环,可能导致未定义行为。

    • 及时关闭不再使用的循环(loop.close()),防止资源泄漏。


import asyncio
import selectors

class MyPolicy(asyncio.DefaultEventLoopPolicy):
   def new_event_loop(self):
      selector = selectors.SelectSelector()
      return asyncio.SelectorEventLoop(selector)

asyncio.set_event_loop_policy(MyPolicy())

事件循环策略在整个进程内是单例的,所有的线程共享一个策略;事件循环在所在的线程内是单例的,一个线程内部只会有一个事件循环。所有线程对应的循环均位于策略的 _local 属性中,获取的时候根据线程 ID 区分。

  • 策略的 new_event_loop 方法:创建事件循环;

  • 策略的 set_event_loop 方法:设置事件循环;

  • 策略的 get_event_loop 方法:获取事件循环,会先检测策略的 _local 中是否有当前线程对应的事件循环,有则获取,没有则通过 new_event_loop 创建、set_event_loop 设置,然后返回;

但是 get_event_loopset_event_loopnew_event_loop 我们一般不会手动通过策略去调用,而是会通过 asyncio 去调用,比如 asyncio.get_event_loop。当然在 asyncio.get_event_loop 内部,也是先通过 get_event_loop_policy() 获取策略,然后调用策略的 get_event_loop 方法,获取线程对应的循环,两者本质是一样的,因为策略是单例的。

所以无论主线程还是子线程,毫无疑问都是可以创建事件循环的。只不过主线程既可以手动调用 new_event_loopset_event_loop 来创建,也可以调用 get_event_loop (当循环不存在时内部自动创建)。但对于子线程而言,只能采用第一种方式,也就是手动创建,如果直接调用 get_event_loop 是会报错的。 当循环不存在时,必须是主线程才会自动创建,而子线程不会。所以结果就是因为循环为空,导致程序报错。

小技巧

最佳实践 对于主线程,在外部我们会调用 get_event_loop,在协程内部我们会调用 get_running_loop;如果是子线程,那么在外部则需要 new_event_loop + set_event_loop 来实现。

import asyncio

# 创建事件循环
loop = asyncio.new_event_loop()
# 设置在策略的 _local 属性中
# 调用 asyncio.get_event_loop 时,会直接返回
# 因为循环存在,就不会再创建了
asyncio.set_event_loop(loop)

print(
    asyncio.get_event_loop() is loop is asyncio.get_event_loop_policy()._local._loop
)  # True

对于新创建的事件循环,还要附加到事件循环策略监视器中,以确保我们的事件循环可以监视在 UNIX 系统上新生成的子进程的终止状态。

import asyncio
from asyncio import get_event_loop_policy
import platform

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

if platform.system() != "Windows":
    watcher = asyncio.get_child_watcher()
    watcher.attach_loop(loop)