一、库的简介:异步编程的利器

在现实生活中,很多程序需要等待外部资源响应——比如一个网络爬虫向服务器请求网页,发送请求后要等待数据返回来才能继续;一个聊天程序可能需要同时监听用户输入和网络消息。传统的同步代码在等待 I/O 时会阻塞整个线程,导致 CPU 闲置,资源利用率低下。而引入多线程或多进程又会带来线程安全、内存开销等问题。Python 的 asyncio 库正是为了解决这类并发 I/O 问题而诞生的。它提供了一套基于事件循环的异步编程框架,让你可以使用 async/await 语法编写高并发的代码。在实际生活中,asyncio 可以驱动一个 Web 爬虫同时抓取成百上千个网页,可以在一个 Web 服务器中轻松处理上万条 WebSocket 连接,也可以用于实时监控多个温度传感器的数据。自从 Python 3.4 引入 asyncio 以来,大量网络框架(如 FastAPI、aiohttp、Sanic)都基于它构建,它彻底改变了 Python 在高并发领域的地位。

二、安装 asyncio

asyncio 是 Python 3.4+ 的标准库模块,无需任何 pip install 命令。需要注意的是,在 Windows 平台上,默认的事件循环策略可能与某些功能不兼容(例如子进程),但基础使用完全正常。你可以通过以下代码检查版本:

python

import asyncio
print(asyncio.__version__)  # 输出 Python 版本相关信息

三、基本用法

以下代码需要 Python 3.7+(推荐 3.10+)。

第一步:定义异步函数

使用 async def 定义一个协程(coroutine),内部可以使用 await 等待另一个协程。

python

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)   # 模拟 I/O 等待
    print("World")

第二步:运行协程

Python 3.7+ 推荐使用 asyncio.run() 作为入口点,它会创建事件循环并运行协程直到完成。

python

asyncio.run(say_hello())

第三步:同时运行多个协程 —— asyncio.gather

若要并发执行多个协程(如同时请求多个网页),使用 gather

python

async def task(name, delay):
    print(f"Task {name} 开始")
    await asyncio.sleep(delay)
    print(f"Task {name} 结束")
    return f"结果{name}"

async def main():
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(results)

asyncio.run(main())
# 输出顺序:B 先结束,A 其次,C 最后,总耗时约 3 秒而非 2+1+3=6 秒

第四步:创建任务(create_task

create_task 将协程包装成 Task 对象,使其能够在后台并发执行,无需立即 await

python

async def main():
    task1 = asyncio.create_task(task("A", 2))
    task2 = asyncio.create_task(task("B", 1))
    await task1
    await task2

四、高级用法

1. 超时控制 —— asyncio.wait_for

为协程设置超时时间,超时后抛出 TimeoutError

python

async def slow_operation():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时了")

2. 异步队列 —— asyncio.Queue

实现生产者-消费者模式的高效通道,常用于爬虫任务分发。

python

async def producer(queue):
    for i in range(5):
        await queue.put(f"数据{i}")
        await asyncio.sleep(0.5)
    await queue.put(None)  # 结束信号

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"消费: {item}")
        queue.task_done()

async def main():
    q = asyncio.Queue()
    prod = asyncio.create_task(producer(q))
    cons = asyncio.create_task(consumer(q))
    await prod
    await cons

asyncio.run(main())

3. 异步上下文管理器

使用 async with 管理异步资源,例如异步文件操作或数据库连接。

python

class AsyncResource:
    async def __aenter__(self):
        print("获取资源")
        await asyncio.sleep(0.1)
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(0.1)

async def main():
    async with AsyncResource() as res:
        print("使用资源")

五、实际应用场景案例

场景一:异步并发请求多个 API(天气查询 + 汇率转换)

假设你正在编写一个旅行助手 App,需要同时调用天气 API 和汇率 API,并将两个结果组合展示。同步版本需要串行等待,总耗时是两个接口耗时之和;而异步版本几乎只需要最慢接口的耗时。

python

import asyncio
import aiohttp   # 第三方库,需要 pip install aiohttp

async def fetch_weather(city):
    async with aiohttp.ClientSession() as session:
        # 这里使用真实 API 地址(示例用模拟延迟)
        await asyncio.sleep(1)   # 模拟网络请求
        return f"{city} 天气: 晴天 25°C"

async def fetch_exchange_rate():
    await asyncio.sleep(0.8)   # 模拟汇率 API
    return "USD/CNY = 7.25"

async def main():
    weather_task = asyncio.create_task(fetch_weather("北京"))
    rate_task = asyncio.create_task(fetch_exchange_rate())
    weather, rate = await asyncio.gather(weather_task, rate_task)
    print(f"组合信息: {weather}, {rate}")

asyncio.run(main())   # 总耗时约 1.0 秒(max(1,0.8))

场景二:实时聊天服务器(WebSocket 广播)

使用 FastAPI + WebSocket 实现一个简易聊天室,每个用户连接对应一个异步任务,asyncio 可以轻松管理上百个连接。

python

from fastapi import FastAPI, WebSocket
import asyncio

app = FastAPI()
connections = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 广播给所有连接
            for conn in connections:
                if conn != websocket:
                    await conn.send_text(f"用户说: {data}")
    except:
        connections.remove(websocket)

# 启动命令:uvicorn main:app --host 0.0.0.0 --port 8000

场景三:异步下载并发控制(信号量限流)

当你需要同时下载大量文件,但又不希望一次性发出太多请求导致对方服务器过载,可以使用 asyncio.Semaphore 控制并发数。

python

import asyncio
import aiohttp

sem = asyncio.Semaphore(3)   # 最多同时 3 个下载任务

async def download(url):
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                content = await resp.read()
                print(f"下载完成: {url} 大小 {len(content)}")
                return content

async def main():
    urls = [f"https://example.com/file{i}.jpg" for i in range(20)]
    tasks = [asyncio.create_task(download(url)) for url in urls]
    await asyncio.gather(*tasks)

asyncio.run(main())

六、结尾互动

asyncio 是 Python 迈向高并发、高性能 I/O 编程的关键一步。它让开发者能够以同步思维编写异步代码,避免了回调地狱,同时又能充分利用系统资源。从爬虫、实时通信到微服务网关,asyncio 都是不可或缺的基础设施。掌握 asyncio 不仅能提升你程序的并发能力,还能让你更深刻地理解事件驱动编程的本质。尽管其学习曲线略陡(需要理解事件循环、协程、任务等概念),但一旦上手,你会发现它打开了 Python 性能优化的一扇崭新的大门。

你是否曾经在项目中因为同步阻塞而苦恼?或者已经尝试过用 asyncio 替换多线程并获得了数倍的性能提升?欢迎在评论区分享你的踩坑经历或成功实践。如果你刚接触异步,不妨动手写一个异步的“定时提醒”小工具——相信我,第一次看到多个任务“同时”运行的感觉会非常奇妙!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐