asyncio 进阶:事件循环、Task/Future 与协程调度内幕
文章目录
大多数人用 asyncio 停留在
async def+await——能用,但出了问题不知道怎么排查。这篇文章从事件循环的底层机制出发,拆解 Task、Future 和协程调度的完整内幕。
问题:await 到底在等什么
先看一段代码:
import asyncio
async def fetch_data(url: str) -> str:
print(f"Start fetching {url}")
await asyncio.sleep(1) # ① await 在这里挂起
print(f"Done fetching {url}")
return f"data from {url}"
async def main():
task1 = asyncio.create_task(fetch_data("url1")) # ② Task 被创建
task2 = asyncio.create_task(fetch_data("url2"))
result1 = await task1 # ③ await Task
result2 = await task2
print(result1, result2)
asyncio.run(main())
三个关键问题:
- ①
await asyncio.sleep(1)时,CPU 在做什么? —— 不是空转,是把控制权还给事件循环 - ②
create_task做了什么? —— 创建了一个 Task 对象并注册到事件循环 - ③
await task1时,为什么可以同时等 task2? —— Task 内部用 Future 管理状态
这三个问题的答案构成了 asyncio 的内部机制。
协程的本质:可以暂停和恢复的函数
普通函数 vs 协程函数
# 普通函数:调用即执行,执行完返回
def normal():
return 42
result = normal() # 立即得到 42
# 协程函数:调用返回一个协程对象,不执行
async def coro():
return 42
obj = coro() # 得到一个 coroutine 对象,NOT 42
# result = await obj # 用 await 驱动执行
调用 async def 函数只是创建了一个协程对象,不执行任何代码。协程对象需要一个"驱动器"来推进——这个驱动器就是事件循环。
import asyncio
async def greet():
print("Hello")
await asyncio.sleep(0)
print("World")
# 方式一:asyncio.run() —— 隐式创建事件循环并运行
asyncio.run(greet())
# 方式二:手动操作事件循环 —— 展示底层机制
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(greet()) # 驱动协程直到完成
finally:
loop.close()
协程的两种状态
coro = greet()
print(coro) # <coroutine object greet at 0x...>
# 用 send(None) 手动驱动协程——底层原理
try:
coro.send(None) # 执行到第一个 await,打印 "Hello"
except StopIteration:
pass # 协程执行完毕
协程对象的核心状态:
- CREATED:协程被创建,未开始执行
- RUNNING:正在执行
- SUSPENDED:在
await处挂起,等待被恢复 - CLOSED:执行完毕或抛出异常
事件循环:asyncio 的心脏
事件循环在做什么
事件循环的本质是一个永不停止的 while 循环,不断检查"有没有可以执行的任务":
事件循环的核心数据结构:
# 简化版事件循环的内部结构(不是真实 CPython 代码)
class SimplifiedEventLoop:
def __init__(self):
self._ready: list[asyncio.Handle] = [] # 就绪回调队列
self._scheduled: list[asyncio.TimerHandle] = [] # 定时器队列
self._selector = selectors.DefaultSelector() # I/O 多路复用器
def call_soon(self, callback, *args):
"""立即调度回调——加入就绪队列"""
handle = asyncio.Handle(callback, args, self)
self._ready.append(handle)
def call_later(self, delay, callback, *args):
"""延迟调度回调——加入定时器队列"""
timer = asyncio.TimerHandle(delay, callback, args, self)
self._scheduled.append(timer)
def run_forever(self):
"""主循环"""
while True:
# 1. 检查定时器,把到期的移入就绪队列
now = time.monotonic()
for timer in self._scheduled:
if timer.when() <= now:
self._ready.append(timer)
self._scheduled.remove(timer)
# 2. 执行所有就绪的回调
while self._ready:
handle = self._ready.pop(0)
handle._run()
# 3. 如果无就绪任务,阻塞等待 I/O 或最近的定时器
if not self._ready and not self._scheduled:
break
timeout = self._get_next_timeout()
events = self._selector.select(timeout)
for key, mask in events:
callback = key.data
self._ready.append(callback)
asyncio.sleep(1) 的内部实现
# asyncio.sleep 的简化实现
import asyncio
async def sleep(delay: float, result=None):
"""协程版 sleep——不是 time.sleep,而是向事件循环注册定时器"""
future = asyncio.get_event_loop().create_future()
# 注册一个定时器:delay 秒后将 future 标记为完成
loop = asyncio.get_event_loop()
loop.call_later(delay, future.set_result, result)
# await 挂起当前协程,控制权还给事件循环
return await future
关键区别:asyncio.sleep(1) 不是阻塞当前线程 1 秒,而是向事件循环注册一个 1 秒后的回调,然后立即把控制权交还给事件循环。事件循环在 1 秒内可以执行其他任务。
对比:
import time
import asyncio
# time.sleep:阻塞整个线程
async def blocking():
time.sleep(1) # ❌ 阻塞事件循环,其他任务无法执行
# asyncio.sleep:挂起当前协程,释放事件循环
async def non_blocking():
await asyncio.sleep(1) # ✅ 事件循环可以执行其他任务
Task:被事件循环管理的协程
Task = Coroutine + Future
Task 是 asyncio 的核心抽象——它同时是协程和 Future:
Task 的生命周期
import asyncio
async def main():
# 创建 Task——协程开始执行(在事件循环的下一个 tick)
task = asyncio.create_task(asyncio.sleep(2), name="my-sleeper")
print(task.get_name()) # my-sleeper
print(task.done()) # False——还在运行
print(task.cancelled()) # False
await asyncio.sleep(1)
print(task.done()) # False——2 秒还没到
await task # 等待 Task 完成
print(task.done()) # True
print(task.result()) # None(asyncio.sleep 返回 None)
print(task.exception()) # None(没有异常)
asyncio.run(main())
create_task vs 直接 await
async def fetch(id: int) -> str:
await asyncio.sleep(1)
return f"result-{id}"
async def sequential():
"""顺序执行:每个 fetch 完成后才开始下一个——总耗时 3 秒"""
r1 = await fetch(1) # 等 1 秒
r2 = await fetch(2) # 再等 1 秒
r3 = await fetch(3) # 再等 1 秒
return r1, r2, r3
async def concurrent():
"""并发执行:三个 fetch 同时开始——总耗时约 1 秒"""
t1 = asyncio.create_task(fetch(1)) # 创建 Task,立即开始
t2 = asyncio.create_task(fetch(2))
t3 = asyncio.create_task(fetch(3))
# 三个 sleep 同时在进行
r1 = await t1
r2 = await t2
r3 = await t3
return r1, r2, r3
create_task 的核心作用:告诉事件循环"这个协程可以与其他任务交替执行"。没有 create_task,协程只能顺序执行。
asyncio.gather:并发等待的语法糖
async def concurrent_gather():
"""gather 等价于 create_task + 逐个 await"""
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3),
)
return results
gather 内部为每个协程创建 Task,然后等待所有 Task 完成。如果任意 Task 抛出异常,gather 默认会传播该异常。
async def gather_with_exceptions():
"""gather 的异常处理"""
results = await asyncio.gather(
fetch(1),
failing_task(), # 这个会抛异常
fetch(3),
return_exceptions=True # 捕获异常而不传播
)
# results = ["result-1", ValueError("boom"), "result-3"]
for r in results:
if isinstance(r, Exception):
print(f"Task failed: {r}")
else:
print(f"Task result: {r}")
Future:异步世界的占位符
Future 是什么
Future 是一个存根值占位符——它代表"一个尚未完成的操作的结果"。它的核心是一个简单的状态机:
# Future 状态机(简化版)
class FutureState:
PENDING = "PENDING" # 初始状态,等待结果
CANCELLED = "CANCELLED" # 被取消
FINISHED = "FINISHED" # 结果就绪或异常
class Future:
def __init__(self, loop):
self._state = FutureState.PENDING
self._result = None
self._exception = None
self._callbacks = [] # 完成时的回调列表
self._loop = loop
def set_result(self, result):
"""标记为完成并携带结果"""
if self._state != FutureState.PENDING:
raise asyncio.InvalidStateError("Future already resolved")
self._result = result
self._state = FutureState.FINISHED
self._schedule_callbacks()
def set_exception(self, exception):
"""标记为完成并携带异常"""
if self._state != FutureState.PENDING:
raise asyncio.InvalidStateError("Future already resolved")
self._exception = exception
self._state = FutureState.FINISHED
self._schedule_callbacks()
def add_done_callback(self, callback):
"""注册完成回调"""
if self._state == FutureState.FINISHED:
self._loop.call_soon(callback, self)
else:
self._callbacks.append(callback)
def _schedule_callbacks(self):
"""完成后调度所有回调"""
for cb in self._callbacks:
self._loop.call_soon(cb, self)
self._callbacks.clear()
def __await__(self):
"""支持 await 语法——如果未完成,挂起协程"""
if self._state == FutureState.FINISHED:
if self._exception:
raise self._exception
return self._result
yield self # 告诉事件循环"我还没好"
Future 的三种使用方式
import asyncio
async def producer(future: asyncio.Future):
"""生产者:设置 Future 的结果"""
await asyncio.sleep(1)
future.set_result("data is ready!")
async def consumer(future: asyncio.Future):
"""消费者:等待 Future 的结果"""
print("Waiting for data...")
result = await future
print(f"Got: {result}")
async def main():
# 创建 Future——此时没有结果
future = asyncio.get_event_loop().create_future()
# 生产者和消费者并发运行
await asyncio.gather(
producer(future),
consumer(future),
)
asyncio.run(main())
# 输出:
# Waiting for data...
# (1 秒后)
# Got: data is ready!
Task vs Future
| 维度 | Future | Task |
|---|---|---|
| 本质 | 存根值占位符 | 被事件循环调度的协程 |
| 谁驱动 | 手动 set_result() |
事件循环自动 send() |
| 继承关系 | 基类 | 继承自 Future |
| 创建方式 | loop.create_future() |
asyncio.create_task(coro) |
await 行为 |
等待结果被设置 | 等待协程执行完毕 |
| 可取消 | 不适用(无协程) | task.cancel() |
Task 实际上是一个被事件循环自动驱动的 Future——它把协程的执行过程映射到 Future 的状态机:
# Task 简化实现
class Task(Future):
def __init__(self, coro, loop):
super().__init__(loop)
self._coro = coro
self._loop = loop
# 立即调度执行——这就是 create_task 能让协程开始的原因
loop.call_soon(self.__step)
def __step(self):
"""每步推进协程——事件循环调用的核心"""
try:
# 驱动协程到下一个 await
result = self._coro.send(None)
except StopIteration as e:
# 协程正常完成
self.set_result(e.value)
return
except Exception as e:
# 协程抛异常
self.set_exception(e)
return
# 协程在 await 处挂起——检查它在等什么
if isinstance(result, Future):
# 在等另一个 Future——注册回调:那个 Future 完成时继续
result.add_done_callback(self.__wakeup)
else:
# 不应该到这里——await 后面必须是 Future/可等待对象
self._loop.call_soon(self.__step)
def __wakeup(self, future):
"""被等待的 Future 完成后——恢复执行"""
try:
future.result() # 如果 future 是异常完成的,这里会抛出
except Exception:
# 异常会通过 __step 中的 send 传播回协程
pass
self.__step()
事件循环的调度策略
回调优先级与饥饿问题
import asyncio
async def demonstrate_scheduling():
"""演示 asyncio 的调度顺序"""
async def worker(name: str, count: int):
for i in range(count):
print(f" {name}-{i}")
await asyncio.sleep(0) # 让出控制权
# 创建三个高频率让出的 worker
t1 = asyncio.create_task(worker("A", 3))
t2 = asyncio.create_task(worker("B", 3))
# call_soon 注册的回调优先于 Task
loop = asyncio.get_event_loop()
loop.call_soon(lambda: print(" [call_soon] high priority!"))
await asyncio.gather(t1, t2)
asyncio.run(demonstrate_scheduling())
输出示例:
A-0
[call_soon] high priority!
B-0
A-1
B-1
A-2
B-2
call_soon 的回调在当前迭代的 Task 切换点立即执行,优先级高于下一次协程恢复。
await asyncio.sleep(0) 的调度效应
async def sleep_zero_demo():
"""sleep(0) 将当前协程移到就绪队列末尾——让其他任务先执行"""
async def tight_loop():
"""没有 sleep 的紧循环——会饿死其他任务"""
for _ in range(1000):
pass # 连续执行,不交出控制权
async def polite_loop():
"""有 sleep(0) 的循环——礼貌让出"""
for _ in range(1000):
await asyncio.sleep(0) # 每轮让出控制权
sleep(0) 的行为:将当前 Task 标记为就绪(而非挂起),然后事件循环在下一轮调度它——给它后面的 Task 一个执行机会。
工程实战:异步 HTTP 获取器
"""异步 HTTP 获取器——展示 asyncio 核心机制的实际应用"""
import asyncio
import time
from typing import Any
import httpx
class AsyncFetcher:
"""异步 HTTP 获取器——支持并发请求、超时、重试、限流"""
def __init__(
self,
max_concurrent: int = 10,
timeout: float = 30.0,
max_retries: int = 3,
):
self.max_concurrent = max_concurrent
self.timeout = timeout
self.max_retries = max_retries
self._semaphore = asyncio.Semaphore(max_concurrent)
self._client: httpx.AsyncClient | None = None
async def __aenter__(self):
self._client = httpx.AsyncClient(timeout=self.timeout)
return self
async def __aexit__(self, *args):
if self._client:
await self._client.aclose()
async def fetch_one(self, url: str) -> dict[str, Any]:
"""获取单个 URL,支持重试"""
if not self._client:
raise RuntimeError("Use as context manager: async with AsyncFetcher() as f:")
for attempt in range(self.max_retries):
try:
async with self._semaphore: # 限流
response = await self._client.get(url)
response.raise_for_status()
return {
"url": url,
"status": response.status_code,
"body": response.text[:200],
"attempt": attempt + 1,
}
except httpx.HTTPStatusError as e:
if attempt == self.max_retries - 1:
return {"url": url, "error": str(e), "attempt": attempt + 1}
await asyncio.sleep(2 ** attempt) # 指数退避
except httpx.TimeoutException:
if attempt == self.max_retries - 1:
return {"url": url, "error": "timeout", "attempt": attempt + 1}
await asyncio.sleep(2 ** attempt)
async def fetch_many(
self, urls: list[str]
) -> list[dict[str, Any]]:
"""并发获取多个 URL"""
tasks = [self.fetch_one(url) for url in urls]
# 监控:报告进度
completed = 0
total = len(tasks)
pending = set(tasks)
results: list[dict[str, Any]] = []
while pending:
done, pending = await asyncio.wait(
pending,
return_when=asyncio.FIRST_COMPLETED,
)
for task in done:
results.append(task.result())
completed += 1
print(f"Progress: {completed}/{total} URLs fetched")
return results
# ===== 使用示例 =====
async def main():
urls = [
f"https://httpbin.org/delay/{i % 3}" for i in range(20)
]
async with AsyncFetcher(max_concurrent=5, timeout=10.0) as fetcher:
start = time.perf_counter()
results = await fetcher.fetch_many(urls)
elapsed = time.perf_counter() - start
print(f"\nFetched {len(results)} URLs in {elapsed:.2f}s")
success = [r for r in results if "error" not in r]
failed = [r for r in results if "error" in r]
print(f" Success: {len(success)}")
print(f" Failed: {len(failed)}")
asyncio.run(main())
调试 asyncio 的五个武器
武器一:开启 debug 模式
# 方式一:环境变量
PYTHONASYNCIODEBUG=1 python app.py
# 方式二:代码中开启
asyncio.run(main(), debug=True)
debug 模式会检测:
- 协程未被 await:
RuntimeWarning: coroutine 'xxx' was never awaited - 执行时间过长的回调:
Executing <Handle ...> took 1.234 seconds - 在错误的线程中调用:检测跨线程使用事件循环
武器二:协程未被 await 的检测
import asyncio
async def forgotten():
return 42
async def main():
forgotten() # ❌ 协程创建了但没有 await
await asyncio.sleep(0.1)
asyncio.run(main(), debug=True)
# RuntimeWarning: coroutine 'forgotten' was never awaited
武器三:慢回调检测
import asyncio
import time
async def main():
async def slow_sync_code():
"""同步阻塞代码——事件循环被卡住"""
time.sleep(2) # ❌ 阻塞事件循环
task = asyncio.create_task(slow_sync_code())
await task
asyncio.run(main(), debug=True)
# Executing <Task ...> took 2.001 seconds
武器四:asyncio.all_tasks() 查看所有活跃 Task
import asyncio
async def hanging_task():
"""永远不完成的任务——排查死锁"""
try:
await asyncio.Event().wait() # 无限等待
except asyncio.CancelledError:
print("Task was cancelled")
async def main():
task = asyncio.create_task(hanging_task(), name="hanging")
# 检查当前所有活跃任务
all_tasks = asyncio.all_tasks()
for t in all_tasks:
print(f" Task: {t.get_name()}, done={t.done()}, cancelled={t.cancelled()}")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(main())
武器五:TaskGroup 的结构化并发
Python 3.11+ 的 TaskGroup 替代了裸 create_task——它保证所有子任务在退出上下文时已完成或取消:
async def main():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_data("url1"))
t2 = tg.create_task(fetch_data("url2"))
t3 = tg.create_task(fetch_data("url3"))
# 退出上下文时,自动等待所有子任务完成
# 如果任一子任务抛异常,其余会被取消
避坑指南
坑一:在协程中调用同步阻塞函数
# ❌ requests.get 是同步的,会阻塞事件循环
async def fetch():
import requests
return requests.get("https://httpbin.org/get")
# ✅ 使用异步 HTTP 库
async def fetch():
import httpx
async with httpx.AsyncClient() as client:
return await client.get("https://httpbin.org/get")
# ✅ 或将同步调用放到线程池
async def fetch():
return await asyncio.to_thread(
requests.get, "https://httpbin.org/get"
)
坑二:创建了 Task 但忘记了异常
async def main():
task = asyncio.create_task(failing_task())
await asyncio.sleep(1)
# 如果 failing_task 在这里抛异常,异常被吞没
# ✅ 确保获取 Task 的结果(包括异常)
try:
await task
except Exception as e:
print(f"Task failed: {e}")
坑三:在多线程中使用同一个事件循环
# ❌ asyncio 事件循环不是线程安全的
loop = asyncio.new_event_loop()
def run_in_thread():
# ❌ 不能在不同线程中使用同一个事件循环
loop.run_until_complete(some_coro())
import threading
threading.Thread(target=run_in_thread).start()
# ✅ 使用 run_coroutine_threadsafe
future = asyncio.run_coroutine_threadsafe(some_coro(), loop)
result = future.result()
事件循环的内部结构一览
这张图是 asyncio 运行时的完整快照:三个队列(就绪、定时器、I/O)统一在事件循环的主循环中轮转。每个协程的每一步执行(Task.__step())都在就绪队列中排队,await 只是把"等待条件满足时重新入队"的回调注册到对应的机制中。
小结
| 概念 | 本质 | 一句话定位 |
|---|---|---|
| 协程对象 | async def 创建的代码块 |
“可以被暂停和恢复的函数” |
| 事件循环 | 永不停止的 while 循环 | “协程的调度器——决定谁执行、谁等待” |
| Future | 存根值占位符 | “一个尚未完成的操作的结果容器” |
| Task | 被事件循环驱动的协程 + Future | “Future 的自动执行版本” |
await |
暂停当前协程,等待 Future 完成 | “把控制权还给事件循环的指令” |
create_task |
将协程注册到事件循环 | “告诉事件循环:这个可以和其他任务并发” |
理解这些概念后,async/await 不再是一个黑盒——它是一套精心设计的协作式多任务调度系统,所有魔法全部建立在"注册回调 + 轮转队列"的基础之上。
如果这篇文章对深入理解 asyncio 有帮助,点赞收藏让更多人看到!关注专栏,持续获取 Python 进阶干货。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)