asyncio 协程模型深度解析

写给 Java 开发者的认知迁移指南

一、核心认知:asyncio 是什么?

asyncio 是 Python 的标准异步 I/O 框架,其核心是在单线程内通过事件循环(Event Loop) 调度多个协程(Coroutine),实现高效的并发。

可以这样理解:事件循环就像一个“大脑”,管理着成百上千个轻量级任务(协程)。当一个任务遇到 I/O 等待时,它会主动把控制权交还给事件循环,事件循环立刻切换去执行其他就绪的任务。整个过程没有操作系统线程切换的开销,也没有锁的困扰。

与 Java 并发的本质区别

维度

Java 线程模型

Python asyncio

并发单元

线程(重量级,MB 级栈)

协程(轻量级,KB 级栈)

调度方式

操作系统抢占式调度

事件循环协作式调度

切换成本

高(内核态,保存寄存器)

极低(用户态,保存 Python 栈帧)

并发数量

数百线程已达极限

轻松管理数万协程

适合场景

CPU 密集型 + I/O 密集型

主要是 I/O 密集型

同步原语

synchronized

, ReentrantLock

asyncio.Lock

, Semaphore

一句话类比:asyncio 协程之于 Python,类似 Java 的虚拟线程(Project Loom)——都是用用户态轻量任务替代操作系统线程。

为什么 Python 需要 asyncio?(GIL 的必然选择)

Python 的 GIL(全局解释器锁)使得多线程无法真正并行执行 CPU 密集型任务。对于 I/O 密集型任务,多线程虽能工作,但线程切换开销大、内存占用高。asyncio 通过“单线程 + 非阻塞 I/O + 事件循环”,绕过了 GIL 的限制,同时实现了极高的 I/O 并发效率。


二、核心组件全景图

2.1 事件循环(Event Loop)—— 大脑

事件循环是一个无限循环,负责:

  • 检查哪些协程可以继续执行
  • 执行就绪的协程,直到遇到 await
  • await 后面的 I/O 操作注册到系统选择器(如 epoll)
  • 当 I/O 就绪时,唤醒对应协程继续执行

Java 类比:类似 Netty 的 EventLoopGroup 或 Reactor 的 Scheduler,但 asyncio 是语言内置的。

2.2 协程(Coroutine)—— 轻量任务

async def 定义的函数是协程函数,调用它返回一个协程对象,并不会立即执行任何代码。

async def fetch_data():
    print("开始请求")
    await asyncio.sleep(1)
    print("请求完成")
    return "data"

# 只是创建了一个协程对象,代码还没运行
coro = fetch_data()  

# 必须通过事件循环来执行
result = asyncio.run(coro)
2.3 任务(Task)—— 被调度的协程

协程对象本身不能直接被事件循环调度。必须将它包装成 Task(通过 asyncio.create_task() 或某些隐式转换),任务才会被注册到事件循环中并得到执行。TaskFuture 的子类,代表一个“正在运行”的协程。

async def main():
    # 显式创建 Task,立即加入事件循环
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())
    # 等待两个任务完成
    results = await asyncio.gather(task1, task2)

Java 类比CompletableFuture.supplyAsync(),但 asyncio Task 更轻量,且默认在同一个事件循环中执行。

2.4 Future —— 结果的占位符

Future 是一个尚未完成的结果的占位符。协程可以 await 一个 Future,当结果就绪时事件循环会唤醒该协程。Task 就是 Future 的子类。


三、执行模型:协作式多任务

3.1 await 的魔法:主动让出控制权

当协程执行到 await 时,它会主动告诉事件循环:“我正在等待某个操作完成,请先执行其他协程,等操作就绪再叫醒我。”

import asyncio
import time

async def task(name, delay):
    print(f"[{time.strftime('%X')}] {name} 开始")
    await asyncio.sleep(delay)   # 在这里主动让出控制权
    print(f"[{time.strftime('%X')}] {name} 完成")

async def main():
    await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 0.5)
    )

asyncio.run(main())

输出示例:

[14:30:01] A 开始
[14:30:01] B 开始
[14:30:01] C 开始
[14:30:01] C 完成
[14:30:02] B 完成
[14:30:03] A 完成

关键洞察:总耗时 ≈ 2 秒(最长任务的时间),而不是 2+1+0.5=3.5 秒。三个任务并发执行,因为它们在 await asyncio.sleep() 时都让出了控制权。

3.2 与 Java 线程模型的对比

对比点

Java 线程

asyncio 协程

让出时机

时间片耗尽(抢占式)

显式 await

(协作式)

是否需要锁

必须,因为线程随时可能被挂起

仅在 await

处可能切换,共享数据相对安全

阻塞操作

阻塞整个线程

只阻塞当前协程,事件循环继续运行

致命陷阱:在协程中调用同步阻塞函数(如 time.sleep(5)requests.get())会阻塞整个事件循环,导致所有协程卡死。必须使用异步版本:await asyncio.sleep(5)await httpx.AsyncClient().get()


四、从协程到任务:隐式转换的规则

很多初学者困惑:协程对象到底什么时候开始执行?关键是要区分 协程对象任务

  • 协程对象:调用 async def 函数得到,只是一个可等待对象,不会自己运行
  • 任务:协程对象被包装成 Task 并注册到事件循环后,才会被调度执行。
隐式转换的三种情况
  1. 直接 await 一个协程对象
    事件循环会将这个协程隐式包装成 Task,然后立即交出控制权去运行它(或者等待它)。这意味着 await coro() 会阻塞当前协程,直到 coro 完成。
  2. 把协程对象放进 asyncio.gather()
    gather 会将传入的每个协程隐式包装成 Task,并注册到事件循环。然后返回一个 Future,当你 await 这个 Future 时,事件循环会并发执行所有这些任务。
  3. 显式调用 asyncio.create_task(coro())
    只负责把协程包装成 Task 并注册,不交出控制权。当前协程继续执行后面的代码,直到遇到下一个 await 才可能让事件循环去调度新创建的任务。
对比示例
async def demo():
    # 方式1:直接 await —— 顺序执行
    await asyncio.sleep(1)   # 隐式转 Task,交出控制权,等待完成
    await asyncio.sleep(1)   # 上一个完成后才执行这个 → 总耗时 2 秒

    # 方式2:create_task 后分别 await —— 并发执行
    t1 = asyncio.create_task(asyncio.sleep(1))  # 只注册,不交出控制权
    t2 = asyncio.create_task(asyncio.sleep(1))  # 连续注册两个任务
    await t1   # 现在交出控制权,事件循环会并发执行 t1 和 t2
    await t2   # 但这里 t2 很可能已经完成了 → 总耗时约 1 秒

    # 方式3:gather —— 最简洁的并发
    await asyncio.gather(
        asyncio.sleep(1),
        asyncio.sleep(1)
    )   # 总耗时约 1 秒

总结

  • await coro() → 顺序执行,阻塞等待。
  • create_task() + 后续 await → 可以启动多个任务并发。
  • gather() → 批量并发执行,自动收集返回值。

五、在 FastAPI 中使用 asyncio

FastAPI 充分利用了 asyncio,同时为同步代码提供了线程池回退。

5.1 路由函数:async def vs def

函数类型

执行线程

适用场景

async def

主事件循环线程

I/O 密集型:数据库查询、外部 API 调用

def

外部线程池

CPU 密集型或同步阻塞代码

from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

# 异步 I/O
@app.get("/async-data")
async def get_async_data():
    await asyncio.sleep(1)
    return {"message": "async"}

# CPU 密集型,用 def(自动放到线程池)
@app.get("/compute")
def compute():
    time.sleep(1)   # 不会阻塞事件循环
    return {"result": 42}

#  错误:async def 内调用同步阻塞函数
@app.get("/bad")
async def bad_example():
    time.sleep(1)   # 阻塞整个事件循环!
    return {"error": "这会卡死服务"}
5.2 异步数据库操作(SQLAlchemy 2.0 + asyncpg)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        user = result.scalar_one_or_none()
    return user
5.3 并发外部 API 调用(asyncio.gather
import httpx

@app.get("/aggregate")
async def aggregate_data():
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            client.get("https://api1.example.com"),
            client.get("https://api2.example.com"),
            client.get("https://api3.example.com"),
        )
    return {f"api{i+1}": r.json() for i, r in enumerate(results)}

六、常见陷阱与最佳实践

6.1 陷阱:在协程中调用同步阻塞函数
#  错误
async def bad_fetch():
    import requests
    resp = requests.get("https://httpbin.org/delay/5")  # 阻塞事件循环!
    return resp.json()

#  正确1:使用异步库
import httpx
async def good_fetch():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://httpbin.org/delay/5")
        return resp.json()

#  正确2:委托给线程池
async def good_fetch_thread():
    loop = asyncio.get_running_loop()
    # run_in_executor 会把同步函数放到线程池执行
    return await loop.run_in_executor(None, sync_fetch)
6.2 陷阱:忘记 await 导致任务“消失”
async def main():
    #  错误:task 被创建但从未被 await,可能未完成程序就退出
    asyncio.create_task(fetch_data())

    #  正确:保存引用并等待
    task = asyncio.create_task(fetch_data())
    await task
6.3 陷阱:在已有事件循环中调用 asyncio.run()
#  错误:FastAPI、Jupyter 已经运行了事件循环
@app.get("/")
def endpoint():
    asyncio.run(some_coro())  # RuntimeError!

#  正确:直接 await
@app.get("/")
async def endpoint():
    return await some_coro()
6.4 最佳实践清单

实践

说明

优先使用 async/await

生态库

httpx

, asyncpg

, aioredis

, aiofiles

CPU 密集型任务用 def

run_in_executor

避免阻塞事件循环

合理设置并发限制

使用 asyncio.Semaphore

防止资源耗尽

使用 asyncio.gather(*tasks, return_exceptions=True)

避免一个任务异常导致其他任务被取消

在 FastAPI 中使用 lifespan

管理资源

替代已弃用的 @app.on_event("startup")


七、性能对比:同步 vs 多线程 vs 异步

基准测试:100 个并发请求,每个请求模拟 0.1 秒 I/O

模式

实现方式

总耗时

内存占用

同步

requests

+ for 循环

~10 秒

多线程

ThreadPoolExecutor

(100 线程)

~0.5 秒

高 (~200MB)

异步

asyncio

+ httpx

~0.15 秒

极低 (~30MB)

结论:对于高并发 I/O 场景,asyncio 在性能和资源效率上远超传统多线程。


八、Java 开发者的思维转变要点

  1. 放弃“多线程思维”
    不要用 Thread 类比协程。拥抱“单线程事件循环 + 协作式调度”。
  2. await 是让出点,不是阻塞点
    遇到 await 意味着当前协程暂停,事件循环去执行其他任务。
  3. 同步库是毒药
    在异步代码中,必须使用异步版本的库(httpx, asyncpg, aiofiles)。
  4. 区分协程对象与任务
    协程对象只是“蓝图”,只有被包装成 Task 并注册到事件循环才会执行。awaitgather 会隐式创建任务,create_task 显式创建。
  5. FastAPI 自动处理细节
    定义 async def 路由,框架会帮你管理事件循环。
  6. 先理解再编码
    花 30 分钟运行几个 asyncio.sleep 示例,观察执行顺序,比看十篇文章都管用。

掌握了 asyncio,你就掌握了 FastAPI 高性能的钥匙,也为后续深入 AI Agent 的异步工具调用、流式响应打下坚实基础。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐