大多数人用 asyncio 停留在 async def + await——能用,但出了问题不知道怎么排查。这篇文章从事件循环的底层机制出发,拆解 Task、Future 和协程调度的完整内幕。


问题:await 到底在等什么

先看一段代码:

import asyncio

async def fetch_data(url: str) -> str:
    print(f"Start fetching {url}")
    await asyncio.sleep(1)        # ① await 在这里挂起
    print(f"Done fetching {url}")
    return f"data from {url}"

async def main():
    task1 = asyncio.create_task(fetch_data("url1"))  # ② Task 被创建
    task2 = asyncio.create_task(fetch_data("url2"))
    
    result1 = await task1  # ③ await Task
    result2 = await task2
    print(result1, result2)

asyncio.run(main())

三个关键问题:

  • await asyncio.sleep(1) 时,CPU 在做什么? —— 不是空转,是把控制权还给事件循环
  • create_task 做了什么? —— 创建了一个 Task 对象并注册到事件循环
  • await task1 时,为什么可以同时等 task2? —— Task 内部用 Future 管理状态

这三个问题的答案构成了 asyncio 的内部机制。


协程的本质:可以暂停和恢复的函数

普通函数 vs 协程函数

# 普通函数:调用即执行,执行完返回
def normal():
    return 42
result = normal()  # 立即得到 42

# 协程函数:调用返回一个协程对象,不执行
async def coro():
    return 42
obj = coro()       # 得到一个 coroutine 对象,NOT 42
# result = await obj  # 用 await 驱动执行

调用 async def 函数只是创建了一个协程对象,不执行任何代码。协程对象需要一个"驱动器"来推进——这个驱动器就是事件循环。

import asyncio

async def greet():
    print("Hello")
    await asyncio.sleep(0)
    print("World")

# 方式一:asyncio.run() —— 隐式创建事件循环并运行
asyncio.run(greet())

# 方式二:手动操作事件循环 —— 展示底层机制
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(greet())  # 驱动协程直到完成
finally:
    loop.close()

协程的两种状态

coro = greet()
print(coro)  # <coroutine object greet at 0x...>

# 用 send(None) 手动驱动协程——底层原理
try:
    coro.send(None)  # 执行到第一个 await,打印 "Hello"
except StopIteration:
    pass  # 协程执行完毕

协程对象的核心状态:

  • CREATED:协程被创建,未开始执行
  • RUNNING:正在执行
  • SUSPENDED:在 await 处挂起,等待被恢复
  • CLOSED:执行完毕或抛出异常

事件循环:asyncio 的心脏

事件循环在做什么

事件循环的本质是一个永不停止的 while 循环,不断检查"有没有可以执行的任务":

完成

await 挂起

事件循环启动

有待执行的
回调/任务?

取出最早的就绪任务

执行该任务

任务执行到
await / 完成?

将任务标记为等待中
注册 I/O 或定时器

等待 I/O 事件
(select/epoll)

事件循环的核心数据结构:

# 简化版事件循环的内部结构(不是真实 CPython 代码)
class SimplifiedEventLoop:
    def __init__(self):
        self._ready: list[asyncio.Handle] = []       # 就绪回调队列
        self._scheduled: list[asyncio.TimerHandle] = []  # 定时器队列
        self._selector = selectors.DefaultSelector()   # I/O 多路复用器
    
    def call_soon(self, callback, *args):
        """立即调度回调——加入就绪队列"""
        handle = asyncio.Handle(callback, args, self)
        self._ready.append(handle)
    
    def call_later(self, delay, callback, *args):
        """延迟调度回调——加入定时器队列"""
        timer = asyncio.TimerHandle(delay, callback, args, self)
        self._scheduled.append(timer)
    
    def run_forever(self):
        """主循环"""
        while True:
            # 1. 检查定时器,把到期的移入就绪队列
            now = time.monotonic()
            for timer in self._scheduled:
                if timer.when() <= now:
                    self._ready.append(timer)
                    self._scheduled.remove(timer)
            
            # 2. 执行所有就绪的回调
            while self._ready:
                handle = self._ready.pop(0)
                handle._run()
            
            # 3. 如果无就绪任务,阻塞等待 I/O 或最近的定时器
            if not self._ready and not self._scheduled:
                break
            
            timeout = self._get_next_timeout()
            events = self._selector.select(timeout)
            for key, mask in events:
                callback = key.data
                self._ready.append(callback)

asyncio.sleep(1) 的内部实现

# asyncio.sleep 的简化实现
import asyncio

async def sleep(delay: float, result=None):
    """协程版 sleep——不是 time.sleep,而是向事件循环注册定时器"""
    future = asyncio.get_event_loop().create_future()
    
    # 注册一个定时器:delay 秒后将 future 标记为完成
    loop = asyncio.get_event_loop()
    loop.call_later(delay, future.set_result, result)
    
    # await 挂起当前协程,控制权还给事件循环
    return await future

关键区别:asyncio.sleep(1) 不是阻塞当前线程 1 秒,而是向事件循环注册一个 1 秒后的回调,然后立即把控制权交还给事件循环。事件循环在 1 秒内可以执行其他任务。

对比:

import time
import asyncio

# time.sleep:阻塞整个线程
async def blocking():
    time.sleep(1)  # ❌ 阻塞事件循环,其他任务无法执行

# asyncio.sleep:挂起当前协程,释放事件循环
async def non_blocking():
    await asyncio.sleep(1)  # ✅ 事件循环可以执行其他任务

Task:被事件循环管理的协程

Task = Coroutine + Future

Task 是 asyncio 的核心抽象——它同时是协程和 Future:

asyncio.create_task()

await task

内部持有

继承自 Future

被加入

协程对象
async def 创建的代码块

Task 对象
被事件循环调度的执行单元

Future 对象
存根值占位符

获取结果
或传播异常

协程引用
.send() 驱动执行

状态机
PENDING → CANCELLED → FINISHED

事件循环
在就绪时调度执行

Task 的生命周期

import asyncio

async def main():
    # 创建 Task——协程开始执行(在事件循环的下一个 tick)
    task = asyncio.create_task(asyncio.sleep(2), name="my-sleeper")
    
    print(task.get_name())      # my-sleeper
    print(task.done())          # False——还在运行
    print(task.cancelled())     # False
    
    await asyncio.sleep(1)
    print(task.done())          # False——2 秒还没到
    
    await task                  # 等待 Task 完成
    print(task.done())          # True
    print(task.result())        # None(asyncio.sleep 返回 None)
    print(task.exception())     # None(没有异常)

asyncio.run(main())

create_task vs 直接 await

async def fetch(id: int) -> str:
    await asyncio.sleep(1)
    return f"result-{id}"

async def sequential():
    """顺序执行:每个 fetch 完成后才开始下一个——总耗时 3 秒"""
    r1 = await fetch(1)  # 等 1 秒
    r2 = await fetch(2)  # 再等 1 秒
    r3 = await fetch(3)  # 再等 1 秒
    return r1, r2, r3

async def concurrent():
    """并发执行:三个 fetch 同时开始——总耗时约 1 秒"""
    t1 = asyncio.create_task(fetch(1))  # 创建 Task,立即开始
    t2 = asyncio.create_task(fetch(2))
    t3 = asyncio.create_task(fetch(3))
    
    # 三个 sleep 同时在进行
    r1 = await t1
    r2 = await t2
    r3 = await t3
    return r1, r2, r3

create_task 的核心作用:告诉事件循环"这个协程可以与其他任务交替执行"。没有 create_task,协程只能顺序执行。

asyncio.gather:并发等待的语法糖

async def concurrent_gather():
    """gather 等价于 create_task + 逐个 await"""
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )
    return results

gather 内部为每个协程创建 Task,然后等待所有 Task 完成。如果任意 Task 抛出异常,gather 默认会传播该异常。

async def gather_with_exceptions():
    """gather 的异常处理"""
    results = await asyncio.gather(
        fetch(1),
        failing_task(),        # 这个会抛异常
        fetch(3),
        return_exceptions=True  # 捕获异常而不传播
    )
    # results = ["result-1", ValueError("boom"), "result-3"]
    for r in results:
        if isinstance(r, Exception):
            print(f"Task failed: {r}")
        else:
            print(f"Task result: {r}")

Future:异步世界的占位符

Future 是什么

Future 是一个存根值占位符——它代表"一个尚未完成的操作的结果"。它的核心是一个简单的状态机:

# Future 状态机(简化版)
class FutureState:
    PENDING = "PENDING"       # 初始状态,等待结果
    CANCELLED = "CANCELLED"   # 被取消
    FINISHED = "FINISHED"     # 结果就绪或异常


class Future:
    def __init__(self, loop):
        self._state = FutureState.PENDING
        self._result = None
        self._exception = None
        self._callbacks = []   # 完成时的回调列表
        self._loop = loop
    
    def set_result(self, result):
        """标记为完成并携带结果"""
        if self._state != FutureState.PENDING:
            raise asyncio.InvalidStateError("Future already resolved")
        self._result = result
        self._state = FutureState.FINISHED
        self._schedule_callbacks()
    
    def set_exception(self, exception):
        """标记为完成并携带异常"""
        if self._state != FutureState.PENDING:
            raise asyncio.InvalidStateError("Future already resolved")
        self._exception = exception
        self._state = FutureState.FINISHED
        self._schedule_callbacks()
    
    def add_done_callback(self, callback):
        """注册完成回调"""
        if self._state == FutureState.FINISHED:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)
    
    def _schedule_callbacks(self):
        """完成后调度所有回调"""
        for cb in self._callbacks:
            self._loop.call_soon(cb, self)
        self._callbacks.clear()
    
    def __await__(self):
        """支持 await 语法——如果未完成,挂起协程"""
        if self._state == FutureState.FINISHED:
            if self._exception:
                raise self._exception
            return self._result
        yield self  # 告诉事件循环"我还没好"

Future 的三种使用方式

import asyncio

async def producer(future: asyncio.Future):
    """生产者:设置 Future 的结果"""
    await asyncio.sleep(1)
    future.set_result("data is ready!")

async def consumer(future: asyncio.Future):
    """消费者:等待 Future 的结果"""
    print("Waiting for data...")
    result = await future
    print(f"Got: {result}")

async def main():
    # 创建 Future——此时没有结果
    future = asyncio.get_event_loop().create_future()
    
    # 生产者和消费者并发运行
    await asyncio.gather(
        producer(future),
        consumer(future),
    )

asyncio.run(main())
# 输出:
# Waiting for data...
# (1 秒后)
# Got: data is ready!

Task vs Future

维度 Future Task
本质 存根值占位符 被事件循环调度的协程
谁驱动 手动 set_result() 事件循环自动 send()
继承关系 基类 继承自 Future
创建方式 loop.create_future() asyncio.create_task(coro)
await 行为 等待结果被设置 等待协程执行完毕
可取消 不适用(无协程) task.cancel()

Task 实际上是一个被事件循环自动驱动的 Future——它把协程的执行过程映射到 Future 的状态机:

# Task 简化实现
class Task(Future):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        self._loop = loop
        # 立即调度执行——这就是 create_task 能让协程开始的原因
        loop.call_soon(self.__step)
    
    def __step(self):
        """每步推进协程——事件循环调用的核心"""
        try:
            # 驱动协程到下一个 await
            result = self._coro.send(None)
        except StopIteration as e:
            # 协程正常完成
            self.set_result(e.value)
            return
        except Exception as e:
            # 协程抛异常
            self.set_exception(e)
            return
        
        # 协程在 await 处挂起——检查它在等什么
        if isinstance(result, Future):
            # 在等另一个 Future——注册回调:那个 Future 完成时继续
            result.add_done_callback(self.__wakeup)
        else:
            # 不应该到这里——await 后面必须是 Future/可等待对象
            self._loop.call_soon(self.__step)
    
    def __wakeup(self, future):
        """被等待的 Future 完成后——恢复执行"""
        try:
            future.result()  # 如果 future 是异常完成的,这里会抛出
        except Exception:
            # 异常会通过 __step 中的 send 传播回协程
            pass
        self.__step()

事件循环的调度策略

回调优先级与饥饿问题

import asyncio

async def demonstrate_scheduling():
    """演示 asyncio 的调度顺序"""
    
    async def worker(name: str, count: int):
        for i in range(count):
            print(f"  {name}-{i}")
            await asyncio.sleep(0)  # 让出控制权
    
    # 创建三个高频率让出的 worker
    t1 = asyncio.create_task(worker("A", 3))
    t2 = asyncio.create_task(worker("B", 3))
    
    # call_soon 注册的回调优先于 Task
    loop = asyncio.get_event_loop()
    loop.call_soon(lambda: print("  [call_soon] high priority!"))
    
    await asyncio.gather(t1, t2)

asyncio.run(demonstrate_scheduling())

输出示例:

  A-0
  [call_soon] high priority!
  B-0
  A-1
  B-1
  A-2
  B-2

call_soon 的回调在当前迭代的 Task 切换点立即执行,优先级高于下一次协程恢复。

await asyncio.sleep(0) 的调度效应

async def sleep_zero_demo():
    """sleep(0) 将当前协程移到就绪队列末尾——让其他任务先执行"""
    
    async def tight_loop():
        """没有 sleep 的紧循环——会饿死其他任务"""
        for _ in range(1000):
            pass  # 连续执行,不交出控制权
    
    async def polite_loop():
        """有 sleep(0) 的循环——礼貌让出"""
        for _ in range(1000):
            await asyncio.sleep(0)  # 每轮让出控制权

sleep(0) 的行为:将当前 Task 标记为就绪(而非挂起),然后事件循环在下一轮调度它——给它后面的 Task 一个执行机会。


工程实战:异步 HTTP 获取器

"""异步 HTTP 获取器——展示 asyncio 核心机制的实际应用"""
import asyncio
import time
from typing import Any

import httpx


class AsyncFetcher:
    """异步 HTTP 获取器——支持并发请求、超时、重试、限流"""
    
    def __init__(
        self,
        max_concurrent: int = 10,
        timeout: float = 30.0,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.max_retries = max_retries
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._client: httpx.AsyncClient | None = None
    
    async def __aenter__(self):
        self._client = httpx.AsyncClient(timeout=self.timeout)
        return self
    
    async def __aexit__(self, *args):
        if self._client:
            await self._client.aclose()
    
    async def fetch_one(self, url: str) -> dict[str, Any]:
        """获取单个 URL,支持重试"""
        if not self._client:
            raise RuntimeError("Use as context manager: async with AsyncFetcher() as f:")
        
        for attempt in range(self.max_retries):
            try:
                async with self._semaphore:  # 限流
                    response = await self._client.get(url)
                    response.raise_for_status()
                    return {
                        "url": url,
                        "status": response.status_code,
                        "body": response.text[:200],
                        "attempt": attempt + 1,
                    }
            except httpx.HTTPStatusError as e:
                if attempt == self.max_retries - 1:
                    return {"url": url, "error": str(e), "attempt": attempt + 1}
                await asyncio.sleep(2 ** attempt)  # 指数退避
            except httpx.TimeoutException:
                if attempt == self.max_retries - 1:
                    return {"url": url, "error": "timeout", "attempt": attempt + 1}
                await asyncio.sleep(2 ** attempt)
    
    async def fetch_many(
        self, urls: list[str]
    ) -> list[dict[str, Any]]:
        """并发获取多个 URL"""
        tasks = [self.fetch_one(url) for url in urls]
        
        # 监控:报告进度
        completed = 0
        total = len(tasks)
        pending = set(tasks)
        
        results: list[dict[str, Any]] = []
        while pending:
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                results.append(task.result())
                completed += 1
            
            print(f"Progress: {completed}/{total} URLs fetched")
        
        return results


# ===== 使用示例 =====
async def main():
    urls = [
        f"https://httpbin.org/delay/{i % 3}" for i in range(20)
    ]
    
    async with AsyncFetcher(max_concurrent=5, timeout=10.0) as fetcher:
        start = time.perf_counter()
        results = await fetcher.fetch_many(urls)
        elapsed = time.perf_counter() - start
    
    print(f"\nFetched {len(results)} URLs in {elapsed:.2f}s")
    
    success = [r for r in results if "error" not in r]
    failed = [r for r in results if "error" in r]
    print(f"  Success: {len(success)}")
    print(f"  Failed:  {len(failed)}")

asyncio.run(main())

调试 asyncio 的五个武器

武器一:开启 debug 模式

# 方式一:环境变量
PYTHONASYNCIODEBUG=1 python app.py

# 方式二:代码中开启
asyncio.run(main(), debug=True)

debug 模式会检测:

  • 协程未被 awaitRuntimeWarning: coroutine 'xxx' was never awaited
  • 执行时间过长的回调Executing <Handle ...> took 1.234 seconds
  • 在错误的线程中调用:检测跨线程使用事件循环

武器二:协程未被 await 的检测

import asyncio

async def forgotten():
    return 42

async def main():
    forgotten()  # ❌ 协程创建了但没有 await
    await asyncio.sleep(0.1)

asyncio.run(main(), debug=True)
# RuntimeWarning: coroutine 'forgotten' was never awaited

武器三:慢回调检测

import asyncio
import time

async def main():
    async def slow_sync_code():
        """同步阻塞代码——事件循环被卡住"""
        time.sleep(2)  # ❌ 阻塞事件循环
    
    task = asyncio.create_task(slow_sync_code())
    await task

asyncio.run(main(), debug=True)
# Executing <Task ...> took 2.001 seconds

武器四:asyncio.all_tasks() 查看所有活跃 Task

import asyncio

async def hanging_task():
    """永远不完成的任务——排查死锁"""
    try:
        await asyncio.Event().wait()  # 无限等待
    except asyncio.CancelledError:
        print("Task was cancelled")

async def main():
    task = asyncio.create_task(hanging_task(), name="hanging")
    
    # 检查当前所有活跃任务
    all_tasks = asyncio.all_tasks()
    for t in all_tasks:
        print(f"  Task: {t.get_name()}, done={t.done()}, cancelled={t.cancelled()}")
    
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

武器五:TaskGroup 的结构化并发

Python 3.11+ 的 TaskGroup 替代了裸 create_task——它保证所有子任务在退出上下文时已完成或取消:

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch_data("url1"))
        t2 = tg.create_task(fetch_data("url2"))
        t3 = tg.create_task(fetch_data("url3"))
    # 退出上下文时,自动等待所有子任务完成
    # 如果任一子任务抛异常,其余会被取消

避坑指南

坑一:在协程中调用同步阻塞函数

# ❌ requests.get 是同步的,会阻塞事件循环
async def fetch():
    import requests
    return requests.get("https://httpbin.org/get")

# ✅ 使用异步 HTTP 库
async def fetch():
    import httpx
    async with httpx.AsyncClient() as client:
        return await client.get("https://httpbin.org/get")

# ✅ 或将同步调用放到线程池
async def fetch():
    return await asyncio.to_thread(
        requests.get, "https://httpbin.org/get"
    )

坑二:创建了 Task 但忘记了异常

async def main():
    task = asyncio.create_task(failing_task())
    await asyncio.sleep(1)
    # 如果 failing_task 在这里抛异常,异常被吞没
    
    # ✅ 确保获取 Task 的结果(包括异常)
    try:
        await task
    except Exception as e:
        print(f"Task failed: {e}")

坑三:在多线程中使用同一个事件循环

# ❌ asyncio 事件循环不是线程安全的
loop = asyncio.new_event_loop()

def run_in_thread():
    # ❌ 不能在不同线程中使用同一个事件循环
    loop.run_until_complete(some_coro())

import threading
threading.Thread(target=run_in_thread).start()

# ✅ 使用 run_coroutine_threadsafe
future = asyncio.run_coroutine_threadsafe(some_coro(), loop)
result = future.result()

事件循环的内部结构一览

事件循环 (Event Loop)

到期

取出执行

await new_future

await sleep

await socket

I/O 就绪

_ready 队列
就绪回调

_scheduled 队列
定时器堆

I/O Selector
select/epoll/kqueue

Task A
协程对象

Task B
协程对象

sleep(1) 回调

执行回调
Task.__step()

注册 Future
add_done_callback

这张图是 asyncio 运行时的完整快照:三个队列(就绪、定时器、I/O)统一在事件循环的主循环中轮转。每个协程的每一步执行(Task.__step())都在就绪队列中排队,await 只是把"等待条件满足时重新入队"的回调注册到对应的机制中。


小结

概念 本质 一句话定位
协程对象 async def 创建的代码块 “可以被暂停和恢复的函数”
事件循环 永不停止的 while 循环 “协程的调度器——决定谁执行、谁等待”
Future 存根值占位符 “一个尚未完成的操作的结果容器”
Task 被事件循环驱动的协程 + Future “Future 的自动执行版本”
await 暂停当前协程,等待 Future 完成 “把控制权还给事件循环的指令”
create_task 将协程注册到事件循环 “告诉事件循环:这个可以和其他任务并发”

理解这些概念后,async/await 不再是一个黑盒——它是一套精心设计的协作式多任务调度系统,所有魔法全部建立在"注册回调 + 轮转队列"的基础之上。


如果这篇文章对深入理解 asyncio 有帮助,点赞收藏让更多人看到!关注专栏,持续获取 Python 进阶干货。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐