Previous: [Part 016] Concurrent Programming (1): Multithreading and Multiprocessing
Next: [Part 018] Files and I/O: Reading and Writing CSV, JSON, and Excel


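Series note: This series has 30 parts and covers the full path in Python from complete beginner to software engineer. This is Part 017, an in-depth look at asynchronous programming with asyncio. asyncio is a standard-library module introduced in Python 3.4 for writing highly concurrent I/O code, and it is well suited to handling large numbers of network connections and other I/O-bound work.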


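Abstract

asyncio is the core module for asynchronous programming in Python. Built on an event loop and coroutines, it provides concurrency within a single thread. Compared with multithreading, it avoids most lock contention and the cost of OS-level thread context switches, which makes it a good fit for I/O-bound work such as network requests and database access. This article covers the async/await syntax, the event loop, coroutines, task management, synchronization primitives such as locks and semaphores, asynchronous context managers and iterators, exception handling, and integrating asyncio with synchronous code.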


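1. Why asyncio is needed

1.1 The nature of I/O-bound problems

# Synchronous version: requests run one after another, so total time = sum of the individual request times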
import requests
import time

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/1",
]

start = time.time()
for url in urls:
    response = requests.get(url)
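    print(f"Received response: {response.status_code}")

print(f"Synchronous total: {time.time() - start:.2f}s")
# About 4 seconds (1 + 2 + 1)

1.2 How asyncio addresses this

asyncio uses coroutines: while one task is waiting on I/O, the event loop switches to another task instead of blocking the thread: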

import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        print(f"Received {len(responses)} responses")

start = time.time()
asyncio.run(main())
print(f"Asynchronous total: {time.time() - start:.2f}s")
# About 2 seconds (the duration of the slowest request)
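
The same effect can be reproduced without network access. The sketch below is an illustration rather than a benchmark: `fake_request` is a hypothetical stand-in that uses `asyncio.sleep` in place of a real HTTP call, and the delays are scaled down so the script finishes quickly.

```python
import asyncio
import time

async def fake_request(delay: float) -> float:
    """Hypothetical stand-in for an HTTP call: just waits `delay` seconds."""
    await asyncio.sleep(delay)
    return delay

async def run_sequentially(delays):
    # Each await finishes before the next request starts.
    return [await fake_request(d) for d in delays]

async def run_concurrently(delays):
    # gather schedules all requests at once and waits for all of them.
    return await asyncio.gather(*(fake_request(d) for d in delays))

def timed(coro):
    """Run a coroutine to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    delays = [0.1, 0.2, 0.1]
    _, sequential = timed(run_sequentially(delays))
    _, concurrent = timed(run_concurrently(delays))
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run should take roughly the sum of the delays, while the concurrent run should take roughly the longest single delay.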

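2. The async/await syntax

2.1 Defining a coroutine function

import asyncio

# Define a coroutine function with async def
async def hello():
    return "Hello, async!"

# Calling a coroutine function returns a coroutine object; the body has not run yet
coro = hello()
print(type(coro))  # <class 'coroutine'>

# Run the coroutine on an event loop
result = asyncio.run(coro)
print(result)  # Hello, async!

# A plain call does not execute the body:
# result = hello()  # result is a coroutine object, not "Hello, async!"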

2.2 The await keyword

import asyncio

async def get_data():
    """Simulate an asynchronous I/O operation."""
    await asyncio.sleep(1)  # simulated I/O wait
    return "data"

async def main():
    # await suspends main() until the coroutine finishes, then produces its result
    result = await get_data()
    print(f"Got: {result}")

asyncio.run(main())

2.3 Awaitable objects

await can be applied to three kinds of objects: coroutines, Tasks, and Futures.

import asyncio

async def coro():
    return "result"

async def main():
    # Await a coroutine
    result1 = await coro()

    # Await a Task
    task = asyncio.create_task(coro())
    result2 = await task

    # Await a Future (a low-level object; application code rarely creates one directly)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    future.set_result("done")
    result3 = await future

    print(result1, result2, result3)  # result result done

asyncio.run(main())

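3. The event loop

3.1 Running the event loop

import asyncio

# Option 1: asyncio.run() (recommended as the main entry point)
async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(main())

# Option 2: manage the loop by hand with run_until_complete().
# Calling asyncio.get_event_loop() with no running loop is deprecated,
# so create the loop explicitly and close it afterwards.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# Option 3: inside a running loop, await coroutines directly
async def nested():
    return 42

async def outer():
    result = await nested()
    print(f"Result: {result}")

asyncio.run(outer())

# Inside a running loop, hand blocking synchronous functions to an executor
def blocking_call():
    return 42

async def main_with_existing_loop():
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool
    result = await loop.run_in_executor(None, blocking_call)
    print(f"Result: {result}")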

3.2 Event loop lifecycle

import asyncio
import time

async def slow_task(name, duration):
    print(f"[{name}] started")
    await asyncio.sleep(duration)
    print(f"[{name}] finished")
    return name

async def main():
    start = time.time()
    
    # gather: run several coroutines concurrently
    results = await asyncio.gather(
        slow_task("A", 1),
        slow_task("B", 2),
        slow_task("C", 1),
    )
    # gather returns results in argument order, not completion order
    print(f"Results: {results}")  # ['A', 'B', 'C']
    
    print(f"Total: {time.time() - start:.2f}s")  # ~2s (the duration of the slowest task)

asyncio.run(main())
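
To make the ordering rule concrete, the following sketch records the actual completion order in a separate list and compares it with what `gather` returns. The `finished` list and the shortened delays are additions for illustration only.

```python
import asyncio

async def slow_task(name, duration, finished):
    await asyncio.sleep(duration)
    finished.append(name)  # record the actual completion order
    return name

async def main():
    finished = []
    results = await asyncio.gather(
        slow_task("A", 0.10, finished),
        slow_task("B", 0.20, finished),
        slow_task("C", 0.05, finished),
    )
    return results, finished

if __name__ == "__main__":
    results, finished = asyncio.run(main())
    print(results)   # ['A', 'B', 'C']  (argument order)
    print(finished)  # ['C', 'A', 'B']  (completion order)
```

When results are needed as soon as each task finishes, `asyncio.as_completed` is the better fit.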

3.3 asyncio.gather vs asyncio.create_task

import asyncio
import time

async def task(name, duration):
    print(f"[{name}] started")
    await asyncio.sleep(duration)
    print(f"[{name}] finished")
    return name

# gather: wraps each coroutine in a Task and waits for all of them to finish
async def using_gather():
    results = await asyncio.gather(
        task("A", 1),
        task("B", 2),
    )
    return results

# create_task + gather: tasks are scheduled as soon as they are created
async def using_create_task():
    # Create the tasks (scheduled immediately)
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 2))
    
    # Both tasks run concurrently
    results = await asyncio.gather(t1, t2)
    return results

# create_task + awaiting each task separately
async def using_task_independently():
    t1 = asyncio.create_task(task("A", 1))
    t2 = asyncio.create_task(task("B", 2))
    
    # Wait for one specific task; t2 keeps running in the meantime
    result1 = await t1
    print(f"Task A finished: {result1}")
    
    # Wait for the other task
    result2 = await t2
    return result2

4. Task management

4.1 asyncio.Task

Task is a subclass of Future that schedules a coroutine to run on the event loop:

import asyncio

async def fetch(url):
    await asyncio.sleep(1)
    return f"data from {url}"

async def main():
    # create_task: wrap the coroutine in a Task and schedule it right away
    task1 = asyncio.create_task(fetch("https://example.com"))
    task2 = asyncio.create_task(fetch("https://python.org"))
    
    # Wait for the results
    result1 = await task1
    result2 = await task2
    print(result1, result2)

asyncio.run(main())
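
Because a Task tracks the state of its coroutine, it can also be inspected and cancelled. The sketch below is a minimal illustration; `long_running` and the task name `demo-task` are made up for the example, and the `name=` argument assumes Python 3.8 or later.

```python
import asyncio

async def long_running():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # Cleanup would go here; re-raise so the task is marked as cancelled.
        raise

async def main():
    task = asyncio.create_task(long_running(), name="demo-task")
    await asyncio.sleep(0.05)   # give the task a chance to start
    states = [task.done()]      # False: the task is still sleeping
    task.cancel()               # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    states += [task.done(), task.cancelled()]
    return task.get_name(), states

if __name__ == "__main__":
    print(asyncio.run(main()))  # ('demo-task', [False, True, True])
```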

4.2 asyncio.wait

import asyncio

async def task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} finished"

async def main():
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3)),
    ]
    
    # wait: wait on a collection of tasks
    # return_when accepts FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"Done: {[t.result() for t in done]}")
    print(f"Pending: {len(pending)}")
    
    # Cancel the tasks that are still pending
    for p in pending:
        p.cancel()
    
    # Wait for the cancellations to take effect
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
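
`asyncio.wait` also accepts a `timeout`. Unlike `asyncio.wait_for`, it neither raises on timeout nor cancels anything; it simply returns whatever is done and whatever is still pending. A minimal sketch, where `job` and the durations are illustrative:

```python
import asyncio

async def job(name, duration):
    await asyncio.sleep(duration)
    return name

async def main():
    tasks = [
        asyncio.create_task(job("fast", 0.05)),
        asyncio.create_task(job("slow", 5)),
    ]
    # Returns after 0.2 s even though "slow" has not finished.
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    finished = sorted(t.result() for t in done)

    # Pending tasks keep running unless cancelled explicitly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, len(pending)

if __name__ == "__main__":
    print(asyncio.run(main()))  # (['fast'], 1)
```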

4.3 asyncio.as_completed

import asyncio

async def task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} finished"

async def main():
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3)),
    ]
    
    # as_completed: yields results in the order the tasks finish
    for completed in asyncio.as_completed(tasks):
        result = await completed
        print(f"Got result: {result}")

asyncio.run(main())
# Output order: B finished -> A finished -> C finished (actual completion order)

5. Synchronization primitives

5.1 asyncio.Lock

import asyncio

class AsyncCounter:
    def __init__(self):
        self.count = 0
        self.lock = asyncio.Lock()
    
    async def increment(self):
        async with self.lock:
            self.count += 1
            return self.count

async def worker(counter, worker_id):
    for _ in range(100):
        new_count = await counter.increment()
    print(f"Worker {worker_id} finished")

async def main():
    counter = AsyncCounter()
    tasks = [worker(counter, i) for i in range(10)]
    await asyncio.gather(*tasks)
    print(f"Final count: {counter.count}")  # expected: 1000

asyncio.run(main())
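
A lock only matters when a coroutine can be suspended in the middle of an update, that is, when there is an `await` between reading and writing shared state. The sketch below makes that visible; the `Counter` class and the `asyncio.sleep(0)` yield point are assumptions added for the demonstration.

```python
import asyncio

class Counter:
    def __init__(self):
        self.count = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self):
        current = self.count
        await asyncio.sleep(0)      # yields to the event loop mid-update
        self.count = current + 1    # may overwrite another coroutine's update

    async def safe_increment(self):
        async with self.lock:       # only one coroutine at a time in this block
            current = self.count
            await asyncio.sleep(0)
            self.count = current + 1

async def run(method_name, workers=10):
    counter = Counter()
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(workers)))
    return counter.count

if __name__ == "__main__":
    print("without lock:", asyncio.run(run("unsafe_increment")))  # fewer than 10
    print("with lock:   ", asyncio.run(run("safe_increment")))    # 10
```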

5.2 asyncio.Semaphore

import asyncio

async def limited_task(semaphore, task_id):
    async with semaphore:
        print(f"Task {task_id} started")
        await asyncio.sleep(1)
        print(f"Task {task_id} finished")
        return task_id

async def main():
    # Limit how many tasks run at the same time
    semaphore = asyncio.Semaphore(3)
    
    tasks = [limited_task(semaphore, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    # gather preserves argument order, so this is 0..9 regardless of completion order
    print(f"Results: {results}")

asyncio.run(main())
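
One way to check that the semaphore really caps concurrency is to track how many coroutines are inside the guarded block at once. This is a small sketch; the `stats` dictionary is a hypothetical helper, not part of asyncio.

```python
import asyncio

async def limited(semaphore, stats):
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)   # simulated work
        stats["active"] -= 1

async def main(limit=3, jobs=10):
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited(semaphore, stats) for _ in range(jobs)))
    return stats["peak"]

if __name__ == "__main__":
    print(f"peak concurrency: {asyncio.run(main())}")  # 3
```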

5.3 asyncio.Queue

import asyncio

async def producer(queue, count):
    for i in range(count):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue, name):
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2)
            print(f"Consumer {name} handling: {item}")
            await asyncio.sleep(0.3)
            queue.task_done()
        except asyncio.TimeoutError:
            print(f"Consumer {name} timed out, exiting")
            break

async def main():
    queue = asyncio.Queue()
    
    # Start the producer and the consumers
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue, "C1"),
        consumer(queue, "C2"),
    )

asyncio.run(main())
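
The example above stops its consumers with a timeout. A common alternative, sketched here under the assumption that all items are enqueued up front, is to wait on `queue.join()` and then cancel the workers. The `worker` function and the doubling step are placeholders for real processing.

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)   # simulated processing
            results.append(item * 2)
        finally:
            queue.task_done()           # always balance get() with task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    for i in range(5):
        queue.put_nowait(i)

    await queue.join()                  # returns once every item is marked done

    for w in workers:                   # workers are idle in queue.get(); stop them
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 2, 4, 6, 8]
```

This avoids the two-second idle wait at the end, at the cost of having to cancel the workers explicitly.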

5.4 asyncio.Event

import asyncio

async def waiter(event, name):
    print(f"[{name}] waiting for the event...")
    await event.wait()
    print(f"[{name}] event triggered!")

async def setter(event):
    await asyncio.sleep(2)
    print("[Setter] setting the event")
    event.set()

async def main():
    event = asyncio.Event()
    
    tasks = [
        asyncio.create_task(waiter(event, "W1")),
        asyncio.create_task(waiter(event, "W2")),
        asyncio.create_task(setter(event)),
    ]
    
    await asyncio.gather(*tasks)

asyncio.run(main())
# W1 and W2 both wait until setter sets the event

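6. Asynchronous context managers

6.1 The async with syntax

import asyncio

class AsyncResource:
    """Simulates an asynchronous resource such as a database connection."""
    
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(0.5)  # simulate opening the connection
        print("Resource ready")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(0.5)  # simulate closing the connection
        print("Resource released")
        return False  # do not suppress exceptions

async def use_resource():
    async with AsyncResource() as resource:
        print("Using resource...")
        await asyncio.sleep(1)
    # Cleanup runs automatically when the with block exits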

asyncio.run(use_resource())

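6.2 Return values of __aenter__ and __aexit__

import asyncio

class ManagedResource:
    async def __aenter__(self):
        return "this value is bound to the name after 'as'"
    
    async def __aexit__(self, *args):
        # Returning None (falsy) lets exceptions from the block propagate
        return None

async def main():
    async with ManagedResource() as resource:
        print(resource)  # this value is bound to the name after 'as'

asyncio.run(main())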
7. Asynchronous iterators and generators

7.1 async for

import asyncio

class AsyncRange:
    """Asynchronous iterator."""
    
    def __init__(self, max_num):
        self.current = 0
        self.max_num = max_num
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        await asyncio.sleep(0.1)  # simulate an asynchronous operation
        if self.current >= self.max_num:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        return value

async def main():
    async for i in AsyncRange(5):
        print(f"Value: {i}")

asyncio.run(main())

7.2 Asynchronous generators

import asyncio

async def async_range(max_num):
    """Asynchronous generator."""
    for i in range(max_num):
        await asyncio.sleep(0.1)  # simulate an asynchronous operation
        yield i

async def main():
    async for i in async_range(5):
        print(f"Value: {i}")

asyncio.run(main())

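7.3 Asynchronous comprehensions

The standard library has no aitertools module, in Python 3.12 or any other release. Asynchronous mapping and filtering can be written with asynchronous comprehensions, which have been available since Python 3.6:

import asyncio

async def process_item(item):
    await asyncio.sleep(0.1)
    return item * 2

async def async_iter(items):
    """Wrap a regular iterable in an asynchronous generator."""
    for item in items:
        yield item

async def main():
    items = [1, 2, 3, 4, 5]
    
    # Asynchronous map: await inside a comprehension
    doubled = [await process_item(x) async for x in async_iter(items)]
    print(f"Doubled: {doubled}")  # [2, 4, 6, 8, 10]
    
    # Asynchronous filter: an if clause on an async for comprehension
    filtered = [x async for x in async_iter(items) if x > 2]
    print(f"Filtered: {filtered}")  # [3, 4, 5]

asyncio.run(main())

# For itertools-style helpers over async iterators, use a third-party library such as aioitertools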

8. Hands-on examples

8.1 Asynchronous HTTP client

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class FetchResult:
    url: str
    status: int
    content: str
    duration: float
    error: Optional[str] = None

async def fetch(session, url) -> FetchResult:
    loop = asyncio.get_running_loop()
    start = loop.time()  # monotonic clock of the running loop
    try:
        async with session.get(url) as response:
            content = await response.text()
            duration = loop.time() - start
            return FetchResult(
                url=url,
                status=response.status,
                content=content[:100],  # keep only the first 100 characters
                duration=duration
            )
    except Exception as e:
        duration = loop.time() - start
        return FetchResult(
            url=url,
            status=0,
            content="",
            duration=duration,
            error=str(e)
        )

async def fetch_all(urls: List[str]) -> List[FetchResult]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/get",
        "https://httpbin.org/uuid",
    ]
    
    print("Starting concurrent requests...")
    results = await fetch_all(urls)
    
    print(f"\nDone, {len(results)} responses:")
    for r in results:
        status = f"HTTP {r.status}" if r.status else f"error: {r.error}"
        print(f"  {r.url}: {status} ({r.duration:.2f}s)")

if __name__ == "__main__":
    asyncio.run(main())

8.2 Asynchronous WebSocket client

import asyncio
import json
import websockets

class AsyncWebSocketClient:
    """Asynchronous WebSocket client."""
    
    def __init__(self, uri: str):
        self.uri = uri
        self.websocket = None  # set by connect()
    
    async def connect(self):
        self.websocket = await websockets.connect(self.uri)
        print(f"Connected to {self.uri}")
    
    async def send(self, message: dict):
        if self.websocket:
            await self.websocket.send(json.dumps(message))
            print(f"Sent: {message}")
    
    async def receive(self) -> dict:
        if self.websocket is None:
            # Fail loudly instead of silently returning None
            raise ConnectionError("not connected")
        message = await self.websocket.recv()
        data = json.loads(message)
        print(f"Received: {data}")
        return data
    
    async def close(self):
        if self.websocket:
            await self.websocket.close()
            print("Connection closed")

async def echo_handler(uri: str):
    """Test driver for an echo server."""
    client = AsyncWebSocketClient(uri)
    
    try:
        await client.connect()
        
        # Send messages
        for i in range(5):
            await client.send({"message": f"Hello {i}", "id": i})
            await asyncio.sleep(0.5)
        
        # Receive the responses
        for _ in range(5):
            response = await asyncio.wait_for(client.receive(), timeout=5)
            print(f"Response: {response}")
    
    except asyncio.TimeoutError:
        print("Receive timed out")
    except websockets.exceptions.ConnectionClosed:
        print("Connection closed")
    finally:
        await client.close()

# Usage (requires a running echo server)
# asyncio.run(echo_handler("ws://localhost:8080"))

8.3 Asynchronous rate limiter

import asyncio
import time

class AsyncRateLimiter:
    """Token-bucket rate limiter for coroutines."""
    
    def __init__(self, rate: float, per: float = 1.0):
        """
        Args:
            rate: number of calls allowed per time window
            per: length of the time window in seconds
        """
        self.rate = rate
        self.per = per
        self.allowance = rate  # the bucket starts full, so an initial burst is allowed
        self.last_check = time.monotonic()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        """Acquire a permit, waiting if necessary."""
        async with self.lock:
            refill_rate = self.rate / self.per  # tokens per second
            now = time.monotonic()
            time_passed = now - self.last_check
            self.last_check = now
            
            # Refill tokens, capped at the bucket size
            self.allowance = min(self.rate, self.allowance + time_passed * refill_rate)
            
            if self.allowance < 1.0:
                # Not enough tokens: wait until one has accumulated
                wait_time = (1.0 - self.allowance) / refill_rate
                await asyncio.sleep(wait_time)
                # Restart the clock after sleeping; otherwise the next caller
                # would count the sleep as idle time and receive a free token
                self.last_check = time.monotonic()
                self.allowance = 0.0
            else:
                self.allowance -= 1.0

async def rate_limited_task(limiter, task_id):
    await limiter.acquire()
    print(f"Task {task_id} running")
    await asyncio.sleep(0.1)

async def main():
    # At most 5 tasks per second
    limiter = AsyncRateLimiter(rate=5, per=1.0)
    
    tasks = [rate_limited_task(limiter, i) for i in range(15)]
    start = time.time()
    
    await asyncio.gather(*tasks)
    
    duration = time.time() - start
    # 5 tasks start at once from the full bucket; the other 10 follow at 0.2 s intervals
    print(f"Total: {duration:.2f}s (expected: about 2 seconds)")

asyncio.run(main())

9. Exception handling

9.1 try/except in async code

import asyncio

async def risky_task():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")

async def main():
    try:
        result = await risky_task()
    except ValueError as e:
        print(f"Caught exception: {e}")
    finally:
        print("Cleanup code")

asyncio.run(main())

9.2 Exceptions in gather

import asyncio

async def task(id, should_fail=False):
    await asyncio.sleep(0.5)
    if should_fail:
        raise ValueError(f"Task {id} failed")
    return f"Task {id} finished"

async def main():
    # return_exceptions=True: exceptions are returned as result objects instead of being raised
    results = await asyncio.gather(
        task(1),
        task(2, should_fail=True),
        task(3),
        return_exceptions=True
    )
    
    for i, result in enumerate(results, 1):
        if isinstance(result, Exception):
            print(f"Task {i} raised: {result}")
        else:
            print(f"Task {i} result: {result}")

asyncio.run(main())
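
For contrast, with the default `return_exceptions=False` the first exception propagates out of `gather`, but the other awaitables are not cancelled and keep running. The sketch below illustrates this; `job` and the delays are made up for the example.

```python
import asyncio

async def job(job_id, fail=False):
    await asyncio.sleep(0.05 if fail else 0.1)
    if fail:
        raise ValueError(f"job {job_id} failed")
    return job_id

async def main():
    tasks = [
        asyncio.create_task(job(1)),
        asyncio.create_task(job(2, fail=True)),
    ]
    caught = None
    try:
        await asyncio.gather(*tasks)    # raises as soon as job 2 fails
    except ValueError as exc:
        caught = str(exc)

    # The surviving task was not cancelled and can still be awaited.
    survivor = await tasks[0]
    return caught, survivor

if __name__ == "__main__":
    print(asyncio.run(main()))  # ('job 2 failed', 1)
```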

9.3 Handling timeouts

import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "finished"

async def with_timeout():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=3.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Task timed out!")

async def shield_from_cancellation():
    task = asyncio.create_task(slow_task())
    
    try:
        # shield keeps the inner task running if this coroutine is cancelled
        result = await asyncio.shield(task)
    except asyncio.CancelledError:
        print("Outer coroutine cancelled, but the shielded task keeps running...")
        result = await task
        print(f"Task finished: {result}")

asyncio.run(with_timeout())
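
It is worth knowing that `asyncio.wait_for` does more than raise on timeout: it also cancels the awaited coroutine, which sees a `CancelledError` at its current `await`. The sketch below records that in a `state` dictionary, which is a hypothetical helper added for the demonstration.

```python
import asyncio

async def slow(state):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        state["cancelled"] = True   # cleanup hook: runs when wait_for gives up
        raise

async def main():
    state = {"cancelled": False}
    try:
        await asyncio.wait_for(slow(state), timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return timed_out, state["cancelled"]

if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, True)
```

When the inner work must survive the timeout, wrapping it in `asyncio.shield` before passing it to `wait_for` is the usual approach.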

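10. Integrating asyncio with synchronous code

10.1 Running synchronous code with run_in_executor

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_bound_sync():
    """Synchronous CPU-bound task."""
    result = 0
    for i in range(10000000):
        result += i * i
    return result

async def main():
    # Run the synchronous function in a thread pool, off the event loop thread.
    # Because of the GIL, threads do not speed up CPU-bound work;
    # a ProcessPoolExecutor is the usual choice when that matters.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_sync)
        print(f"Result: {result}")

asyncio.run(main())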

10.2 to_thread (Python 3.9+)

import asyncio
import time

def blocking_io():
    time.sleep(1)
    return "Synchronous I/O finished"

async def main():
    # Run the blocking synchronous function in a worker thread
    result = await asyncio.to_thread(blocking_io)
    print(result)

asyncio.run(main())
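
The point of `to_thread` is that the event loop stays responsive while the blocking call runs. One way to see this, sketched below for Python 3.9 or later, is to run a `heartbeat` coroutine alongside the blocking function and count how often it ticks. Both `heartbeat` and the timings are illustrative.

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.2)     # blocks its own thread, not the event loop
    return "done"

async def heartbeat(ticks):
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)

async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await asyncio.to_thread(blocking_io)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    return result, len(ticks)

if __name__ == "__main__":
    result, tick_count = asyncio.run(main())
    print(f"{result}; heartbeat ticked {tick_count} times while the thread was busy")
```

Calling `blocking_io()` directly inside `main()` instead would freeze the loop for the whole 0.2 seconds and the heartbeat would not tick during that time.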

11. Common pitfalls and caveats

11.1 Forgetting await

async def wrong():
    coro = some_async_function()  # no await
    return coro  # returns a coroutine object, not the result

async def correct():
    result = await some_async_function()  # correct
    return result

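11.2 Calling asynchronous code from synchronous code

import asyncio
import concurrent.futures

async def async_function():
    await asyncio.sleep(0.1)
    return "done"

# Fine when no event loop is running in this thread: asyncio.run() creates a new loop.
# It raises RuntimeError if this thread already has a running loop.
def simple_sync_wrapper():
    return asyncio.run(async_function())

# More defensive: check for a running loop first
def sync_wrapper():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop in this thread
        return asyncio.run(async_function())
    else:
        # A loop is already running here: run the coroutine on a new loop in a
        # worker thread. Note that .result() blocks the calling loop until it finishes.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, async_function()).result()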

11.3 The event loop and nesting

import asyncio

# asyncio.run() cannot be nested
async def inner():
    return await asyncio.sleep(1, result="inner")

async def outer():
    # asyncio.run(inner())  # wrong: raises RuntimeError inside a running loop
    result = await inner()  # correct
    return result

asyncio.run(outer())

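12. Summary

asyncio is a go-to choice in Python for highly concurrent, I/O-bound work. Through the event loop and coroutines, it achieves efficient concurrency within a single thread.

Key points from this article: async/await is the core syntax of asyncio and defines coroutines and how they are awaited. The event loop is the scheduler that drives every coroutine. Task is a subclass of Future used to schedule a coroutine and track its state. asyncio provides Lock, Semaphore, Queue, and Event for coordination between coroutines. gather, wait, and as_completed offer different ways to combine tasks. Synchronous code can be integrated through run_in_executor or to_thread.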



