asyncio,一个异步的 Python 库!
一、库的简介:异步编程的利器
在现实生活中,很多程序需要等待外部资源响应——比如一个网络爬虫向服务器请求网页,发送请求后要等待数据返回来才能继续;一个聊天程序可能需要同时监听用户输入和网络消息。传统的同步代码在等待 I/O 时会阻塞整个线程,导致 CPU 闲置,资源利用率低下。而引入多线程或多进程又会带来线程安全、内存开销等问题。Python 的 asyncio 库正是为了解决这类并发 I/O 问题而诞生的。它提供了一套基于事件循环的异步编程框架,让你可以使用 async/await 语法编写高并发的代码。在实际生活中,asyncio 可以驱动一个 Web 爬虫同时抓取成百上千个网页,可以在一个 Web 服务器中轻松处理上万条 WebSocket 连接,也可以用于实时监控多个温度传感器的数据。自从 Python 3.4 引入 asyncio 以来,大量网络框架(如 FastAPI、aiohttp、Sanic)都基于它构建,它彻底改变了 Python 在高并发领域的地位。
二、安装 asyncio
asyncio 是 Python 3.4+ 的标准库模块,无需任何 pip install 命令。需要注意的是,在 Windows 平台上,默认的事件循环策略可能与某些功能不兼容(例如子进程),但基础使用完全正常。你可以通过以下代码检查版本:
python
import asyncio print(asyncio.__version__) # 输出 Python 版本相关信息
三、基本用法
以下代码需要 Python 3.7+(推荐 3.10+)。
第一步:定义异步函数
使用 async def 定义一个协程(coroutine),内部可以使用 await 等待另一个协程。
python
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟 I/O 等待
print("World")
第二步:运行协程
Python 3.7+ 推荐使用 asyncio.run() 作为入口点,它会创建事件循环并运行协程直到完成。
python
asyncio.run(say_hello())
第三步:同时运行多个协程 —— asyncio.gather
若要并发执行多个协程(如同时请求多个网页),使用 gather。
python
async def task(name, delay):
print(f"Task {name} 开始")
await asyncio.sleep(delay)
print(f"Task {name} 结束")
return f"结果{name}"
async def main():
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
print(results)
asyncio.run(main())
# 输出顺序:B 先结束,A 其次,C 最后,总耗时约 3 秒而非 2+1+3=6 秒
第四步:创建任务(create_task)
create_task 将协程包装成 Task 对象,使其能够在后台并发执行,无需立即 await。
python
async def main():
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
await task1
await task2
四、高级用法
1. 超时控制 —— asyncio.wait_for
为协程设置超时时间,超时后抛出 TimeoutError。
python
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("操作超时了")
2. 异步队列 —— asyncio.Queue
实现生产者-消费者模式的高效通道,常用于爬虫任务分发。
python
async def producer(queue):
for i in range(5):
await queue.put(f"数据{i}")
await asyncio.sleep(0.5)
await queue.put(None) # 结束信号
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"消费: {item}")
queue.task_done()
async def main():
q = asyncio.Queue()
prod = asyncio.create_task(producer(q))
cons = asyncio.create_task(consumer(q))
await prod
await cons
asyncio.run(main())
3. 异步上下文管理器
使用 async with 管理异步资源,例如异步文件操作或数据库连接。
python
class AsyncResource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(0.1)
async def main():
async with AsyncResource() as res:
print("使用资源")
五、实际应用场景案例
场景一:异步并发请求多个 API(天气查询 + 汇率转换)
假设你正在编写一个旅行助手 App,需要同时调用天气 API 和汇率 API,并将两个结果组合展示。同步版本需要串行等待,总耗时是两个接口耗时之和;而异步版本几乎只需要最慢接口的耗时。
python
import asyncio
import aiohttp # 第三方库,需要 pip install aiohttp
async def fetch_weather(city):
async with aiohttp.ClientSession() as session:
# 这里使用真实 API 地址(示例用模拟延迟)
await asyncio.sleep(1) # 模拟网络请求
return f"{city} 天气: 晴天 25°C"
async def fetch_exchange_rate():
await asyncio.sleep(0.8) # 模拟汇率 API
return "USD/CNY = 7.25"
async def main():
weather_task = asyncio.create_task(fetch_weather("北京"))
rate_task = asyncio.create_task(fetch_exchange_rate())
weather, rate = await asyncio.gather(weather_task, rate_task)
print(f"组合信息: {weather}, {rate}")
asyncio.run(main()) # 总耗时约 1.0 秒(max(1,0.8))
场景二:实时聊天服务器(WebSocket 广播)
使用 FastAPI + WebSocket 实现一个简易聊天室,每个用户连接对应一个异步任务,asyncio 可以轻松管理上百个连接。
python
from fastapi import FastAPI, WebSocket
import asyncio
app = FastAPI()
connections = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connections.append(websocket)
try:
while True:
data = await websocket.receive_text()
# 广播给所有连接
for conn in connections:
if conn != websocket:
await conn.send_text(f"用户说: {data}")
except:
connections.remove(websocket)
# 启动命令:uvicorn main:app --host 0.0.0.0 --port 8000
场景三:异步下载并发控制(信号量限流)
当你需要同时下载大量文件,但又不希望一次性发出太多请求导致对方服务器过载,可以使用 asyncio.Semaphore 控制并发数。
python
import asyncio
import aiohttp
sem = asyncio.Semaphore(3) # 最多同时 3 个下载任务
async def download(url):
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
print(f"下载完成: {url} 大小 {len(content)}")
return content
async def main():
urls = [f"https://example.com/file{i}.jpg" for i in range(20)]
tasks = [asyncio.create_task(download(url)) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
六、结尾互动
asyncio 是 Python 迈向高并发、高性能 I/O 编程的关键一步。它让开发者能够以同步思维编写异步代码,避免了回调地狱,同时又能充分利用系统资源。从爬虫、实时通信到微服务网关,asyncio 都是不可或缺的基础设施。掌握 asyncio 不仅能提升你程序的并发能力,还能让你更深刻地理解事件驱动编程的本质。尽管其学习曲线略陡(需要理解事件循环、协程、任务等概念),但一旦上手,你会发现它打开了 Python 性能优化的一扇崭新的大门。
你是否曾经在项目中因为同步阻塞而苦恼?或者已经尝试过用 asyncio 替换多线程并获得了数倍的性能提升?欢迎在评论区分享你的踩坑经历或成功实践。如果你刚接触异步,不妨动手写一个异步的“定时提醒”小工具——相信我,第一次看到多个任务“同时”运行的感觉会非常奇妙!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)