Python单线程并发:asyncio协程模型
asyncio 协程模型深度解析
写给 Java 开发者的认知迁移指南
一、核心认知:asyncio 是什么?
asyncio 是 Python 的标准异步 I/O 框架,其核心是在单线程内通过事件循环(Event Loop) 调度多个协程(Coroutine),实现高效的并发。
可以这样理解:事件循环就像一个“大脑”,管理着成百上千个轻量级任务(协程)。当一个任务遇到 I/O 等待时,它会主动把控制权交还给事件循环,事件循环立刻切换去执行其他就绪的任务。整个过程没有操作系统线程切换的开销,也没有锁的困扰。
与 Java 并发的本质区别
|
维度 |
Java 线程模型 |
Python asyncio |
|
并发单元 |
线程(重量级,MB 级栈) |
协程(轻量级,KB 级栈) |
|
调度方式 |
操作系统抢占式调度 |
事件循环协作式调度 |
|
切换成本 |
高(内核态,保存寄存器) |
极低(用户态,保存 Python 栈帧) |
|
并发数量 |
数百线程已达极限 |
轻松管理数万协程 |
|
适合场景 |
CPU 密集型 + I/O 密集型 |
主要是 I/O 密集型 |
|
同步原语 |
, |
, |
一句话类比:asyncio 协程之于 Python,类似 Java 的虚拟线程(Project Loom)——都是用用户态轻量任务替代操作系统线程。
为什么 Python 需要 asyncio?(GIL 的必然选择)
Python 的 GIL(全局解释器锁)使得多线程无法真正并行执行 CPU 密集型任务。对于 I/O 密集型任务,多线程虽能工作,但线程切换开销大、内存占用高。asyncio 通过“单线程 + 非阻塞 I/O + 事件循环”,绕过了 GIL 的限制,同时实现了极高的 I/O 并发效率。
二、核心组件全景图
2.1 事件循环(Event Loop)—— 大脑
事件循环是一个无限循环,负责:
- 检查哪些协程可以继续执行
- 执行就绪的协程,直到遇到
await - 将
await后面的 I/O 操作注册到系统选择器(如 epoll) - 当 I/O 就绪时,唤醒对应协程继续执行
Java 类比:类似 Netty 的 EventLoopGroup 或 Reactor 的 Scheduler,但 asyncio 是语言内置的。
2.2 协程(Coroutine)—— 轻量任务
用 async def 定义的函数是协程函数,调用它返回一个协程对象,并不会立即执行任何代码。
async def fetch_data():
print("开始请求")
await asyncio.sleep(1)
print("请求完成")
return "data"
# 只是创建了一个协程对象,代码还没运行
coro = fetch_data()
# 必须通过事件循环来执行
result = asyncio.run(coro)
2.3 任务(Task)—— 被调度的协程
协程对象本身不能直接被事件循环调度。必须将它包装成 Task(通过 asyncio.create_task() 或某些隐式转换),任务才会被注册到事件循环中并得到执行。Task 是 Future 的子类,代表一个“正在运行”的协程。
async def main():
# 显式创建 Task,立即加入事件循环
task1 = asyncio.create_task(fetch_data())
task2 = asyncio.create_task(fetch_data())
# 等待两个任务完成
results = await asyncio.gather(task1, task2)
Java 类比:CompletableFuture.supplyAsync(),但 asyncio Task 更轻量,且默认在同一个事件循环中执行。
2.4 Future —— 结果的占位符
Future 是一个尚未完成的结果的占位符。协程可以 await 一个 Future,当结果就绪时事件循环会唤醒该协程。Task 就是 Future 的子类。
三、执行模型:协作式多任务
3.1 await 的魔法:主动让出控制权
当协程执行到 await 时,它会主动告诉事件循环:“我正在等待某个操作完成,请先执行其他协程,等操作就绪再叫醒我。”
import asyncio
import time
async def task(name, delay):
print(f"[{time.strftime('%X')}] {name} 开始")
await asyncio.sleep(delay) # 在这里主动让出控制权
print(f"[{time.strftime('%X')}] {name} 完成")
async def main():
await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 0.5)
)
asyncio.run(main())
输出示例:
[14:30:01] A 开始
[14:30:01] B 开始
[14:30:01] C 开始
[14:30:01] C 完成
[14:30:02] B 完成
[14:30:03] A 完成
关键洞察:总耗时 ≈ 2 秒(最长任务的时间),而不是 2+1+0.5=3.5 秒。三个任务并发执行,因为它们在 await asyncio.sleep() 时都让出了控制权。
3.2 与 Java 线程模型的对比
|
对比点 |
Java 线程 |
asyncio 协程 |
|
让出时机 |
时间片耗尽(抢占式) |
显式 (协作式) |
|
是否需要锁 |
必须,因为线程随时可能被挂起 |
仅在 处可能切换,共享数据相对安全 |
|
阻塞操作 |
阻塞整个线程 |
只阻塞当前协程,事件循环继续运行 |
致命陷阱:在协程中调用同步阻塞函数(如 time.sleep(5)、requests.get())会阻塞整个事件循环,导致所有协程卡死。必须使用异步版本:await asyncio.sleep(5)、await httpx.AsyncClient().get()。
四、从协程到任务:隐式转换的规则
很多初学者困惑:协程对象到底什么时候开始执行?关键是要区分 协程对象 和 任务。
- 协程对象:调用
async def函数得到,只是一个可等待对象,不会自己运行。 - 任务:协程对象被包装成
Task并注册到事件循环后,才会被调度执行。
隐式转换的三种情况
- 直接
await一个协程对象
事件循环会将这个协程隐式包装成Task,然后立即交出控制权去运行它(或者等待它)。这意味着await coro()会阻塞当前协程,直到coro完成。 - 把协程对象放进
asyncio.gather()gather会将传入的每个协程隐式包装成Task,并注册到事件循环。然后返回一个Future,当你await这个Future时,事件循环会并发执行所有这些任务。 - 显式调用
asyncio.create_task(coro())
只负责把协程包装成Task并注册,不交出控制权。当前协程继续执行后面的代码,直到遇到下一个await才可能让事件循环去调度新创建的任务。
对比示例
async def demo():
# 方式1:直接 await —— 顺序执行
await asyncio.sleep(1) # 隐式转 Task,交出控制权,等待完成
await asyncio.sleep(1) # 上一个完成后才执行这个 → 总耗时 2 秒
# 方式2:create_task 后分别 await —— 并发执行
t1 = asyncio.create_task(asyncio.sleep(1)) # 只注册,不交出控制权
t2 = asyncio.create_task(asyncio.sleep(1)) # 连续注册两个任务
await t1 # 现在交出控制权,事件循环会并发执行 t1 和 t2
await t2 # 但这里 t2 很可能已经完成了 → 总耗时约 1 秒
# 方式3:gather —— 最简洁的并发
await asyncio.gather(
asyncio.sleep(1),
asyncio.sleep(1)
) # 总耗时约 1 秒
总结:
await coro()→ 顺序执行,阻塞等待。create_task()+ 后续await→ 可以启动多个任务并发。gather()→ 批量并发执行,自动收集返回值。
五、在 FastAPI 中使用 asyncio
FastAPI 充分利用了 asyncio,同时为同步代码提供了线程池回退。
5.1 路由函数:async def vs def
|
函数类型 |
执行线程 |
适用场景 |
|
|
主事件循环线程 |
I/O 密集型:数据库查询、外部 API 调用 |
|
|
外部线程池 |
CPU 密集型或同步阻塞代码 |
from fastapi import FastAPI
import asyncio
import time
app = FastAPI()
# 异步 I/O
@app.get("/async-data")
async def get_async_data():
await asyncio.sleep(1)
return {"message": "async"}
# CPU 密集型,用 def(自动放到线程池)
@app.get("/compute")
def compute():
time.sleep(1) # 不会阻塞事件循环
return {"result": 42}
# 错误:async def 内调用同步阻塞函数
@app.get("/bad")
async def bad_example():
time.sleep(1) # 阻塞整个事件循环!
return {"error": "这会卡死服务"}
5.2 异步数据库操作(SQLAlchemy 2.0 + asyncpg)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
return user
5.3 并发外部 API 调用(asyncio.gather)
import httpx
@app.get("/aggregate")
async def aggregate_data():
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
client.get("https://api1.example.com"),
client.get("https://api2.example.com"),
client.get("https://api3.example.com"),
)
return {f"api{i+1}": r.json() for i, r in enumerate(results)}
六、常见陷阱与最佳实践
6.1 陷阱:在协程中调用同步阻塞函数
# 错误
async def bad_fetch():
import requests
resp = requests.get("https://httpbin.org/delay/5") # 阻塞事件循环!
return resp.json()
# 正确1:使用异步库
import httpx
async def good_fetch():
async with httpx.AsyncClient() as client:
resp = await client.get("https://httpbin.org/delay/5")
return resp.json()
# 正确2:委托给线程池
async def good_fetch_thread():
loop = asyncio.get_running_loop()
# run_in_executor 会把同步函数放到线程池执行
return await loop.run_in_executor(None, sync_fetch)
6.2 陷阱:忘记 await 导致任务“消失”
async def main():
# 错误:task 被创建但从未被 await,可能未完成程序就退出
asyncio.create_task(fetch_data())
# 正确:保存引用并等待
task = asyncio.create_task(fetch_data())
await task
6.3 陷阱:在已有事件循环中调用 asyncio.run()
# 错误:FastAPI、Jupyter 已经运行了事件循环
@app.get("/")
def endpoint():
asyncio.run(some_coro()) # RuntimeError!
# 正确:直接 await
@app.get("/")
async def endpoint():
return await some_coro()
6.4 最佳实践清单
|
实践 |
说明 |
|
优先使用 生态库 |
, , , |
|
CPU 密集型任务用 或 |
避免阻塞事件循环 |
|
合理设置并发限制 |
使用 防止资源耗尽 |
|
使用 |
避免一个任务异常导致其他任务被取消 |
|
在 FastAPI 中使用 管理资源 |
替代已弃用的 |
七、性能对比:同步 vs 多线程 vs 异步
基准测试:100 个并发请求,每个请求模拟 0.1 秒 I/O
|
模式 |
实现方式 |
总耗时 |
内存占用 |
|
同步 |
+ for 循环 |
~10 秒 |
低 |
|
多线程 |
(100 线程) |
~0.5 秒 |
高 (~200MB) |
|
异步 |
+ |
~0.15 秒 |
极低 (~30MB) |
结论:对于高并发 I/O 场景,asyncio 在性能和资源效率上远超传统多线程。
八、Java 开发者的思维转变要点
- 放弃“多线程思维”
不要用Thread类比协程。拥抱“单线程事件循环 + 协作式调度”。 await是让出点,不是阻塞点
遇到await意味着当前协程暂停,事件循环去执行其他任务。- 同步库是毒药
在异步代码中,必须使用异步版本的库(httpx,asyncpg,aiofiles)。 - 区分协程对象与任务
协程对象只是“蓝图”,只有被包装成Task并注册到事件循环才会执行。await和gather会隐式创建任务,create_task显式创建。 - FastAPI 自动处理细节
定义async def路由,框架会帮你管理事件循环。 - 先理解再编码
花 30 分钟运行几个asyncio.sleep示例,观察执行顺序,比看十篇文章都管用。
掌握了 asyncio,你就掌握了 FastAPI 高性能的钥匙,也为后续深入 AI Agent 的异步工具调用、流式响应打下坚实基础。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)