Python asyncio 异步编程完全指南:从协程原理到高并发实战
一、为什么需要异步编程?
在日常开发中,大多数程序的性能瓶颈不是 CPU 计算,而是 I/O 等待—— 等网络响应、等数据库查询、等文件读写。传统的同步代码在等待期间会一直阻塞, CPU 只能干坐着什么都不做。
用一个生活中的类比来理解:
同步模式:你去咖啡馆点了一杯拿铁,然后站在柜台前一动不动地等5分钟,直到咖啡做好才去找座位。
异步模式:你点好单拿到号码,然后先去找座位、刷手机,服务员做好后叫你去取。
异步编程就是让程序在等待 I/O 的时候去处理别的任务,充分利用等待时间, 显著提升 I/O 密集型程序的吞吐量。
同步 vs 异步:性能差异有多大?
| 场景 | 同步方式 | asyncio 方式 | 提升倍数 |
|---|---|---|---|
| 并发请求 100 个 URL | ~50 秒 | ~2 秒 | 约 25x |
| 批量数据库查询 1000 次 | ~30 秒 | ~3 秒 | 约 10x |
| 读取 500 个小文件 | ~8 秒 | ~1 秒 | 约 8x |
ℹ️ 适用场景提示:asyncio 适合 I/O 密集型任务(网络请求、数据库、文件操作)。 对于 CPU 密集型任务(大规模计算、图像处理),应使用 multiprocessing 多进程方案。
二、协程基础:async 与 await
2.1 什么是协程?
协程(Coroutine)是一种可以在执行过程中 暂停并让出控制权 的函数。 与普通函数不同,调用协程函数不会立即执行,而是返回一个"协程对象", 需要由事件循环(Event Loop)来驱动执行。
定义一个协程函数只需在 def 前加 async:
import asyncio
async def say_hello(name: str) -> None:
"""一个简单的协程函数"""
# await 关键字:暂停当前协程,让出事件循环控制权
await asyncio.sleep(1) # 模拟 1 秒的 I/O 等待
print(f"Hello, {name}!")
# 运行协程
asyncio.run(say_hello("World"))
预期输出:
Hello, World!
⚠️ 常见误区:直接调用 say_hello("World") 不会执行任何代码, 只会返回一个协程对象 <coroutine object say_hello>。 必须通过 asyncio.run() 或 await 来驱动执行。
2.2 await 关键字的作用
await 只能用在 async 函数内部。它的作用是: 暂停当前协程的执行,等待右侧的可等待对象(Awaitable)完成,期间将控制权交还给事件循环。
可以被 await 的对象有三类:
- 协程对象(
coroutine) - Task 对象(
asyncio.Task) - Future 对象(
asyncio.Future)
import asyncio
async def fetch_data(url: str) -> str:
"""模拟一次网络请求"""
print(f"开始请求: {url}")
await asyncio.sleep(2) # ← 在这里暂停,让出控制权
print(f"请求完成: {url}")
return f"来自 {url} 的数据"
async def main() -> None:
result = await fetch_data("https://api.example.com/data")
print(result)
asyncio.run(main())
三、事件循环:asyncio 的心脏
事件循环(Event Loop)是 asyncio 的核心调度器。 它持续监听所有注册的协程和 I/O 事件,当某个协程在 await 处暂停时, 立即切换去执行另一个就绪的协程——这就是"并发"的来源。
┌─────────────────────────────────────────┐
│ Event Loop │
│ │
│ ┌──────────┐ await ┌──────────┐ │
│ │ 协程 A │ ──暂停──▶ │ I/O等待 │ │
│ └──────────┘ └──────────┘ │
│ │ 切换 │完成 │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ 协程 B │ ──运行──▶ │ 协程 A │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────────┘
3.1 事件循环的正确打开方式
Python 3.7+ 推荐使用 asyncio.run(),它会自动创建、运行并关闭事件循环:
import asyncio
async def main() -> None:
"""所有异步代码的入口点"""
await asyncio.sleep(1)
print("事件循环运行完毕")
# ✅ 推荐写法(Python 3.7+)
asyncio.run(main())
# ❌ 旧写法(不推荐,Python 3.10 已废弃)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
四、并发执行:asyncio.gather() 与 asyncio.create_task()
这是 asyncio 最关键的使用场景——同时发起多个异步任务,不互相等待。 理解了这一点,才算真正掌握了 asyncio。
4.1 顺序执行 vs 并发执行(性能对比)
先看问题:下面的代码是真正的并发吗?
import asyncio
import time
async def task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} 完成"
async def wrong_way() -> None:
"""❌ 错误写法:逐个 await,实际上是顺序执行!"""
start = time.time()
r1 = await task("任务A", 2)
r2 = await task("任务B", 2)
r3 = await task("任务C", 2)
print(f"耗时: {time.time() - start:.1f}s") # 输出:耗时: 6.0s
asyncio.run(wrong_way())
❌ 顺序 await(错误)
每个任务完成后才开始下一个。3个各等2秒的任务 = 总共等6秒。
✅ gather 并发(正确)
3个任务同时开始,一起等待。3个各等2秒的任务 = 总共只等2秒。
4.2 asyncio.gather():最常用的并发方式
import asyncio
import time
async def task(name: str, delay: float) -> str:
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return f"{name} 的结果"
async def right_way() -> None:
"""✅ 正确写法:gather 并发执行所有任务"""
start = time.time()
# gather 同时启动所有协程,等待全部完成
results = await asyncio.gather(
task("任务A", 2),
task("任务B", 2),
task("任务C", 2),
)
print(f"耗时: {time.time() - start:.1f}s") # 输出:耗时: 2.0s
print(f"所有结果: {results}")
asyncio.run(right_way())
预期输出:
任务A 开始
任务B 开始
任务C 开始
任务A 完成
任务B 完成
任务C 完成
耗时: 2.0s
所有结果: ['任务A 的结果', '任务B 的结果', '任务C 的结果']
4.3 asyncio.create_task():更灵活的任务控制
如果你需要在创建任务后继续执行其他代码,而不是立即等待,使用 create_task():
import asyncio
async def background_job(name: str) -> str:
await asyncio.sleep(3)
return f"{name} 后台任务完成"
async def main() -> None:
# 立即将协程封装为 Task 并调度执行(不阻塞当前代码)
task1 = asyncio.create_task(background_job("爬虫"))
task2 = asyncio.create_task(background_job("数据库同步"))
print("任务已提交,主程序继续运行...")
await asyncio.sleep(1)
print("主程序做了其他事情")
# 最后再等待任务完成、获取结果
result1 = await task1
result2 = await task2
print(result1)
print(result2)
asyncio.run(main())
| 方法 | 特点 | 适用场景 |
|---|---|---|
asyncio.gather() |
传入多个协程,一次性并发,返回结果列表 | 已知所有任务,一起等待结果 |
asyncio.create_task() |
立即调度单个协程为 Task,可独立控制 | 动态添加任务,需要精细控制执行时机 |
asyncio.wait() |
等待一组 Task,支持超时和条件等待 | 需要处理超时或只等部分任务完成的场景 |
五、异常处理与超时控制
5.1 在协程中处理异常
import asyncio
async def risky_request(url: str) -> str:
await asyncio.sleep(1)
if "error" in url:
raise ValueError(f"请求失败: {url}")
return f"成功: {url}"
async def main() -> None:
# 方式1:gather 默认一个任务失败就抛出异常
try:
results = await asyncio.gather(
risky_request("https://ok.example.com"),
risky_request("https://error.example.com"),
)
except ValueError as e:
print(f"捕获到异常: {e}")
# 方式2:return_exceptions=True,异常作为结果返回,不中断其他任务
results = await asyncio.gather(
risky_request("https://ok.example.com"),
risky_request("https://error.example.com"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"任务失败: {r}")
else:
print(f"任务成功: {r}")
asyncio.run(main())
5.2 超时控制:asyncio.wait_for()
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(10) # 模拟慢速操作
return "操作完成"
async def main() -> None:
try:
# 最多等待 3 秒,超时则抛出 asyncio.TimeoutError
result = await asyncio.wait_for(
slow_operation(),
timeout=3.0
)
print(result)
except asyncio.TimeoutError:
print("操作超时!已自动取消")
asyncio.run(main())
六、实战案例:高并发异步爬虫
理论讲完,来看一个真实的应用场景——用 asyncio + aiohttp 构建一个能并发抓取数百个网页的异步爬虫。
💡 前置安装:pip install aiohttp
aiohttp 是专为 asyncio 设计的异步 HTTP 客户端,不能在异步程序中使用 requests(同步库)。
import asyncio
import time
from typing import Optional
import aiohttp
# ── 配置 ──────────────────────────────────────────
MAX_CONCURRENT = 10 # 最大并发数(防止被封 IP 或压垮服务器)
URLS: list[str] = [
f"https://httpbin.org/delay/1?id={i}"
for i in range(20)
]
# ── 单个 URL 抓取协程 ──────────────────────────────
async def fetch_url(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore,
) -> Optional[str]:
"""
带并发限制的 URL 抓取。
semaphore 确保同时最多 MAX_CONCURRENT 个请求。
"""
async with semaphore: # 申请信号量(超出上限则等待)
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
content = await resp.text()
print(f"✅ 成功 [{resp.status}] {url[:50]}")
return content[:100] # 只取前100字符示意
else:
print(f"⚠️ 异常状态 [{resp.status}] {url}")
return None
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
print(f"❌ 请求失败 {url}: {e}")
return None
# ── 主入口 ─────────────────────────────────────────
async def main() -> None:
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
start = time.time()
# aiohttp 推荐使用 Session 复用 TCP 连接,比每次新建连接快很多
async with aiohttp.ClientSession() as session:
tasks = [
fetch_url(session, url, semaphore)
for url in URLS
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 统计结果
success = sum(1 for r in results if r is not None and not isinstance(r, Exception))
elapsed = time.time() - start
print(f"\n📊 完成: {success}/{len(URLS)} 成功 | 耗时: {elapsed:.2f}s")
print(f"🚀 同步方式预计耗时: ~{len(URLS)}s | 提速: {len(URLS)/elapsed:.1f}x")
asyncio.run(main())
这个爬虫的关键设计点:
asyncio.Semaphore:控制并发上限,防止同时发起过多请求被封禁aiohttp.ClientSession:复用 TCP 连接,减少握手开销async with:上下文管理器确保 Session 和连接被正确关闭return_exceptions=True:单个请求失败不影响其他请求
七、技术雷区汇总
| 错误做法 | 正确做法 | 原因 |
|---|---|---|
在 async 函数里用 time.sleep() |
用 await asyncio.sleep() |
time.sleep 会阻塞整个事件循环 |
在 async 程序里用 requests |
用 aiohttp 或 httpx |
同步库会阻塞事件循环,失去异步优势 |
逐个 await 协程 |
用 asyncio.gather() |
逐个 await 是顺序执行,不是并发 |
用 asyncio.get_event_loop() |
用 asyncio.run() |
旧 API 在 Python 3.10+ 已废弃 |
| 在 asyncio 中做 CPU 密集运算 | 用 loop.run_in_executor() 或 multiprocessing |
CPU 密集型会阻塞事件循环 |
总结
回顾本文的核心知识点:
- asyncio 适合 I/O 密集型场景,通过协程在等待期间切换执行其他任务
async def定义协程,await暂停并让出控制权,asyncio.run()启动事件循环- 并发的关键:用
asyncio.gather()或create_task(),而不是逐个await - 生产环境中记得用
Semaphore控制并发上限、用wait_for设置超时 - 配合
aiohttp可以构建高性能异步爬虫,轻松实现 10x 以上的速度提升
| 你的需求 | 推荐方案 |
|---|---|
| 并发请求多个 URL | asyncio.gather() + aiohttp |
| 动态添加后台任务 | asyncio.create_task() |
| 防止并发数过高 | asyncio.Semaphore |
| I/O 操作设置超时 | asyncio.wait_for() |
| CPU 密集型 + 异步混用 | loop.run_in_executor() |
如果你有其他关于 asyncio 的使用心得,或者在实际项目中踩过什么坑, 欢迎在评论区留言交流!觉得本文有帮助的话,点个赞和收藏支持一下 🙌
参考资料
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)