Python 异步编程完全指南:从入门到精通

阅读提示:本文约 8000 字,预计阅读时间 20 分钟。建议收藏后反复阅读,配合代码实践效果更佳。


在这里插入图片描述

前言:为什么你必须掌握异步编程?

想象一下这个场景:你需要从 100 个网站抓取数据。

  • 传统同步方式:一个接一个地请求,每个请求等待 1 秒,总共需要 100 秒
  • 异步方式:同时发起 100 个请求,总共只需要 约 1 秒

效率提升 100 倍! 这就是异步编程的魔力。

在当今互联网时代,高并发、高性能已经成为应用的标配需求。无论你是:

  • 🕷️ 开发爬虫系统
  • 🌐 构建 Web 服务
  • 📊 处理实时数据流
  • 🤖 开发聊天机器人

异步编程都是你绕不开的必修课。

本文将带你从零开始,彻底掌握 Python 异步编程的精髓。


目录


一、理解异步:从生活场景说起

1.1 餐厅点餐的启示

假设你去餐厅吃饭:

同步模式(糟糕的餐厅)

服务员 → 接待客人A → 等待A点完餐 → 送餐给A → 等A吃完 → 接待客人B → ...

一个服务员一次只能服务一位客人,效率极低。

异步模式(高效的餐厅)

服务员 → 接待客人A → A看菜单时去接待B → B看菜单时去送餐给C → ...

服务员在等待时去做其他事情,效率大幅提升。

1.2 程序世界中的同步与异步

┌─────────────────────────────────────────────────────────────┐
│                      同步执行模型                            │
├─────────────────────────────────────────────────────────────┤
│  任务A ████████████                                         │
│  任务B             ████████████                              │
│  任务C                         ████████████                  │
│  ─────────────────────────────────────────► 时间            │
│  总耗时: 任务A + 任务B + 任务C = 30秒                        │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                      异步执行模型                            │
├─────────────────────────────────────────────────────────────┤
│  任务A ████████████                                         │
│  任务B ████████████                                         │
│  任务C ████████████                                         │
│  ─────────────────────────────────────────► 时间            │
│  总耗时: max(任务A, 任务B, 任务C) = 10秒                     │
└─────────────────────────────────────────────────────────────┘

1.3 关键概念辨析

概念 含义 类比
同步 (Sync) 按顺序执行,等待完成 排队买票,必须等前面的人买完
异步 (Async) 不等待,继续做其他事 取号等候,可以先去逛街
并发 (Concurrent) 同一时间段处理多个任务 一个厨师同时炒几个菜
并行 (Parallel) 同一时刻执行多个任务 多个厨师同时炒菜
阻塞 (Blocking) 等待操作完成才能继续 打电话时必须等对方接听
非阻塞 (Non-blocking) 不等待,立即返回 发微信后不用等回复

💡 重要理解:Python 的 asyncio 实现的是单线程并发,通过事件循环在多个任务间切换,而不是真正的并行执行。


二、Python 异步编程基础

2.1 第一个异步程序

让我们从最简单的例子开始:

import asyncio

# 定义一个协程函数(使用 async 关键字)
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 异步等待 1 秒
    print("World")

# 运行协程
asyncio.run(say_hello())

输出

Hello
(等待 1 秒)
World

2.2 async/await 语法详解

async def - 定义协程函数
# 普通函数
def regular_function():
    return "I'm regular"

# 协程函数(加上 async 关键字)
async def coroutine_function():
    return "I'm a coroutine"

# 调用对比
print(regular_function())        # 输出: I'm regular
print(coroutine_function())      # 输出: <coroutine object ...>  ← 返回协程对象!

⚠️ 注意:调用协程函数不会立即执行,而是返回一个协程对象,需要用 awaitasyncio.run() 来执行。

await - 等待协程完成
async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(2)  # 模拟网络请求
    print("数据获取完成!")
    return {"status": "success"}

async def main():
    # await 会暂停当前协程,等待 fetch_data() 完成
    result = await fetch_data()
    print(f"结果: {result}")

asyncio.run(main())

2.3 同步 vs 异步:代码对比

场景:模拟 3 个耗时 1 秒的网络请求

同步版本
import time

def fetch_data(id):
    print(f"开始请求 {id}")
    time.sleep(1)  # 模拟耗时操作
    print(f"完成请求 {id}")
    return f"data_{id}"

def main():
    start = time.time()
    
    result1 = fetch_data(1)  # 等待 1 秒
    result2 = fetch_data(2)  # 再等待 1 秒
    result3 = fetch_data(3)  # 再等待 1 秒
    
    print(f"结果: {result1}, {result2}, {result3}")
    print(f"总耗时: {time.time() - start:.2f} 秒")

main()

输出

开始请求 1
完成请求 1
开始请求 2
完成请求 2
开始请求 3
完成请求 3
结果: data_1, data_2, data_3
总耗时: 3.01 秒
异步版本
import asyncio
import time

async def fetch_data(id):
    print(f"开始请求 {id}")
    await asyncio.sleep(1)  # 异步等待
    print(f"完成请求 {id}")
    return f"data_{id}"

async def main():
    start = time.time()
    
    # 使用 gather 并发执行多个协程
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    
    print(f"结果: {results}")
    print(f"总耗时: {time.time() - start:.2f} 秒")

asyncio.run(main())

输出

开始请求 1
开始请求 2
开始请求 3
完成请求 1
完成请求 2
完成请求 3
结果: ['data_1', 'data_2', 'data_3']
总耗时: 1.00 秒

对比总结

指标 同步版本 异步版本 提升
总耗时 3.01 秒 1.00 秒 3 倍
代码复杂度 简单 稍复杂 -
资源利用率 -

三、asyncio 核心概念详解

3.1 事件循环 (Event Loop)

事件循环是异步编程的心脏,负责调度和执行协程。

┌────────────────────────────────────────────────────────────┐
│                       事件循环工作原理                       │
├────────────────────────────────────────────────────────────┤
│                                                            │
│    ┌─────────┐    ┌─────────┐    ┌─────────┐              │
│    │ 协程 A  │    │ 协程 B  │    │ 协程 C  │              │
│    └────┬────┘    └────┬────┘    └────┬────┘              │
│         │              │              │                    │
│         ▼              ▼              ▼                    │
│    ┌───────────────────────────────────────────┐          │
│    │              任务队列 (Task Queue)         │          │
│    └───────────────────┬───────────────────────┘          │
│                        │                                   │
│                        ▼                                   │
│    ┌───────────────────────────────────────────┐          │
│    │        事件循环 (Event Loop)               │          │
│    │   • 检查就绪的任务                          │          │
│    │   • 执行任务直到遇到 await                  │          │
│    │   • 切换到下一个就绪任务                    │          │
│    │   • 处理 I/O 事件                          │          │
│    └───────────────────────────────────────────┘          │
│                                                            │
└────────────────────────────────────────────────────────────┘
事件循环的基本用法
import asyncio

async def main():
    print("Hello, asyncio!")

# 方式 1:推荐(Python 3.7+)
asyncio.run(main())

# 方式 2:手动管理(更多控制)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# 方式 3:获取当前运行的循环(在协程内部使用)
async def inside_coroutine():
    loop = asyncio.get_running_loop()
    print(f"当前循环: {loop}")

3.2 协程 (Coroutine)

协程是可以暂停和恢复的函数,是异步编程的基本单元。

import asyncio

async def my_coroutine(name, delay):
    """一个简单的协程示例"""
    print(f"[{name}] 开始执行")
    await asyncio.sleep(delay)  # 暂停点
    print(f"[{name}] 执行完成")
    return f"{name}_result"

async def main():
    # 协程必须被 await 或作为 Task 运行
    result = await my_coroutine("测试", 1)
    print(f"返回值: {result}")

asyncio.run(main())

3.3 Task(任务)

Task 是对协程的包装,用于并发执行多个协程。

import asyncio

async def worker(name, delay):
    print(f"Worker {name} 开始")
    await asyncio.sleep(delay)
    print(f"Worker {name} 完成")
    return f"result_{name}"

async def main():
    # 创建 Task(会立即开始调度)
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    task3 = asyncio.create_task(worker("C", 3))
    
    print("所有任务已创建")
    
    # 等待所有任务完成
    result1 = await task1
    result2 = await task2
    result3 = await task3
    
    print(f"结果: {result1}, {result2}, {result3}")

asyncio.run(main())

输出

所有任务已创建
Worker A 开始
Worker B 开始
Worker C 开始
Worker B 完成
Worker A 完成
Worker C 完成
结果: result_A, result_B, result_C

3.4 gather vs wait vs TaskGroup

asyncio.gather() - 最常用
async def main():
    # 并发执行,按顺序返回结果
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3),
        return_exceptions=True  # 异常不会中断其他任务
    )
    print(results)  # ['data_1', 'data_2', 'data_3']
asyncio.wait() - 更多控制
async def main():
    tasks = [
        asyncio.create_task(fetch_data(1)),
        asyncio.create_task(fetch_data(2)),
        asyncio.create_task(fetch_data(3)),
    ]
    
    # 等待第一个完成
    done, pending = await asyncio.wait(
        tasks, 
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"完成: {len(done)}, 待处理: {len(pending)}")
    
    # 取消剩余任务
    for task in pending:
        task.cancel()
asyncio.TaskGroup - Python 3.11+ 推荐
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1))
        task2 = tg.create_task(fetch_data(2))
        task3 = tg.create_task(fetch_data(3))
    
    # TaskGroup 自动等待所有任务完成
    # 如果任何任务失败,会取消其他任务并抛出 ExceptionGroup
    print(task1.result(), task2.result(), task3.result())

3.5 协程执行流程图

┌─────────────────────────────────────────────────────────────────┐
│                     协程执行流程                                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   async def fetch():              事件循环调度器                 │
│       print("start")             ┌─────────────┐                │
│       await sleep(1)  ──────────►│ 暂停 fetch  │                │
│       print("end")               │ 执行其他任务 │                │
│       return data                │ sleep完成后 │                │
│                       ◄──────────│ 恢复 fetch  │                │
│                                  └─────────────┘                │
│                                                                 │
│   时间线:                                                        │
│   ─────────────────────────────────────────────────────────►    │
│   │    print     │ 其他任务执行 │   print    │                   │
│   │   "start"    │  (1秒等待)   │   "end"    │                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

四、实战案例:从简单到复杂

4.1 案例一:批量下载图片

一个完整的异步图片下载器:

import asyncio
import aiohttp
import aiofiles
from pathlib import Path
import time


async def download_image(session: aiohttp.ClientSession, url: str, save_path: Path) -> dict:
    """下载单张图片"""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                content = await response.read()
                async with aiofiles.open(save_path, 'wb') as f:
                    await f.write(content)
                return {"url": url, "status": "success", "size": len(content)}
            else:
                return {"url": url, "status": "failed", "error": f"HTTP {response.status}"}
    except Exception as e:
        return {"url": url, "status": "error", "error": str(e)}


async def batch_download(urls: list, save_dir: str, max_concurrent: int = 10):
    """批量下载图片,限制并发数"""
    save_path = Path(save_dir)
    save_path.mkdir(parents=True, exist_ok=True)
    
    # 使用信号量控制并发
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def limited_download(session, url, path):
        async with semaphore:
            return await download_image(session, url, path)
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            filename = f"image_{i:04d}.jpg"
            file_path = save_path / filename
            task = asyncio.create_task(
                limited_download(session, url, file_path)
            )
            tasks.append(task)
        
        # 使用 tqdm 显示进度(如果可用)
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 统计结果
    success = sum(1 for r in results if isinstance(r, dict) and r.get("status") == "success")
    failed = len(results) - success
    
    return {"total": len(urls), "success": success, "failed": failed}


async def main():
    # 示例 URL 列表
    urls = [
        f"https://picsum.photos/800/600?random={i}" 
        for i in range(20)
    ]
    
    print(f"开始下载 {len(urls)} 张图片...")
    start = time.time()
    
    result = await batch_download(urls, "./downloads", max_concurrent=5)
    
    elapsed = time.time() - start
    print(f"下载完成!成功: {result['success']}, 失败: {result['failed']}")
    print(f"总耗时: {elapsed:.2f} 秒")


if __name__ == "__main__":
    asyncio.run(main())

4.2 案例二:异步 API 调用器

构建一个健壮的异步 API 客户端:

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional, Any
import json


@dataclass
class APIResponse:
    """API 响应数据类"""
    status_code: int
    data: Any
    error: Optional[str] = None
    elapsed: float = 0.0


class AsyncAPIClient:
    """异步 API 客户端"""
    
    def __init__(self, base_url: str, timeout: int = 30, max_retries: int = 3):
        self.base_url = base_url.rstrip('/')
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
    
    async def _request(
        self, 
        method: str, 
        endpoint: str, 
        **kwargs
    ) -> APIResponse:
        """发送请求(带重试机制)"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        for attempt in range(self.max_retries):
            try:
                start_time = asyncio.get_event_loop().time()
                
                async with self._session.request(method, url, **kwargs) as response:
                    elapsed = asyncio.get_event_loop().time() - start_time
                    
                    try:
                        data = await response.json()
                    except:
                        data = await response.text()
                    
                    return APIResponse(
                        status_code=response.status,
                        data=data,
                        elapsed=elapsed
                    )
                    
            except asyncio.TimeoutError:
                if attempt == self.max_retries - 1:
                    return APIResponse(status_code=0, data=None, error="Timeout")
                await asyncio.sleep(2 ** attempt)  # 指数退避
                
            except aiohttp.ClientError as e:
                if attempt == self.max_retries - 1:
                    return APIResponse(status_code=0, data=None, error=str(e))
                await asyncio.sleep(2 ** attempt)
    
    async def get(self, endpoint: str, params: dict = None) -> APIResponse:
        return await self._request("GET", endpoint, params=params)
    
    async def post(self, endpoint: str, data: dict = None) -> APIResponse:
        return await self._request("POST", endpoint, json=data)
    
    async def batch_get(self, endpoints: list, max_concurrent: int = 10) -> list:
        """批量 GET 请求"""
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def limited_get(endpoint):
            async with semaphore:
                return await self.get(endpoint)
        
        tasks = [limited_get(ep) for ep in endpoints]
        return await asyncio.gather(*tasks)


# 使用示例
async def main():
    async with AsyncAPIClient("https://jsonplaceholder.typicode.com") as client:
        # 单个请求
        response = await client.get("/posts/1")
        print(f"状态码: {response.status_code}")
        print(f"耗时: {response.elapsed:.3f}s")
        
        # 批量请求
        endpoints = [f"/posts/{i}" for i in range(1, 11)]
        results = await client.batch_get(endpoints)
        
        for i, r in enumerate(results, 1):
            print(f"Post {i}: {r.data.get('title', 'N/A')[:30]}...")


if __name__ == "__main__":
    asyncio.run(main())

4.3 案例三:实时数据处理管道

import asyncio
from asyncio import Queue
import random


class AsyncDataPipeline:
    """异步数据处理管道"""
    
    def __init__(self, num_workers: int = 3):
        self.input_queue: Queue = Queue()
        self.output_queue: Queue = Queue()
        self.num_workers = num_workers
        self._running = False
    
    async def producer(self, data_source: list):
        """生产者:将数据放入队列"""
        for item in data_source:
            await self.input_queue.put(item)
            await asyncio.sleep(0.1)  # 模拟数据流入
        
        # 发送结束信号
        for _ in range(self.num_workers):
            await self.input_queue.put(None)
    
    async def worker(self, worker_id: int):
        """消费者:处理数据"""
        processed = 0
        while True:
            item = await self.input_queue.get()
            
            if item is None:  # 结束信号
                break
            
            # 模拟数据处理
            await asyncio.sleep(random.uniform(0.1, 0.3))
            result = f"Worker-{worker_id} processed: {item}"
            
            await self.output_queue.put(result)
            processed += 1
        
        print(f"Worker-{worker_id} 完成,处理了 {processed} 个任务")
    
    async def collector(self, expected_count: int):
        """收集处理结果"""
        results = []
        while len(results) < expected_count:
            result = await self.output_queue.get()
            results.append(result)
            print(f"✓ {result}")
        return results
    
    async def run(self, data_source: list):
        """运行管道"""
        # 创建所有任务
        producer_task = asyncio.create_task(self.producer(data_source))
        worker_tasks = [
            asyncio.create_task(self.worker(i)) 
            for i in range(self.num_workers)
        ]
        collector_task = asyncio.create_task(
            self.collector(len(data_source))
        )
        
        # 等待所有任务完成
        await producer_task
        await asyncio.gather(*worker_tasks)
        results = await collector_task
        
        return results


async def main():
    pipeline = AsyncDataPipeline(num_workers=3)
    
    # 模拟数据源
    data = [f"data_{i}" for i in range(10)]
    
    print("开始处理数据...")
    print("-" * 50)
    
    results = await pipeline.run(data)
    
    print("-" * 50)
    print(f"处理完成,共 {len(results)} 条结果")


if __name__ == "__main__":
    asyncio.run(main())

4.4 案例四:WebSocket 实时通信

import asyncio
import websockets
import json
from datetime import datetime


class WebSocketClient:
    """WebSocket 客户端"""
    
    def __init__(self, uri: str):
        self.uri = uri
        self.websocket = None
        self._running = False
    
    async def connect(self):
        """建立连接"""
        self.websocket = await websockets.connect(self.uri)
        self._running = True
        print(f"已连接到 {self.uri}")
    
    async def disconnect(self):
        """断开连接"""
        self._running = False
        if self.websocket:
            await self.websocket.close()
            print("连接已关闭")
    
    async def send(self, message: dict):
        """发送消息"""
        if self.websocket:
            await self.websocket.send(json.dumps(message))
    
    async def receive(self) -> dict:
        """接收消息"""
        if self.websocket:
            data = await self.websocket.recv()
            return json.loads(data)
    
    async def listen(self, callback):
        """持续监听消息"""
        try:
            while self._running:
                message = await self.receive()
                await callback(message)
        except websockets.ConnectionClosed:
            print("连接已断开")


async def message_handler(message: dict):
    """消息处理回调"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] 收到消息: {message}")


async def main():
    client = WebSocketClient("wss://echo.websocket.org")
    
    await client.connect()
    
    # 并发执行:监听 + 定时发送
    async def send_periodic():
        for i in range(5):
            await client.send({"type": "ping", "count": i})
            await asyncio.sleep(2)
        await client.disconnect()
    
    await asyncio.gather(
        client.listen(message_handler),
        send_periodic()
    )


if __name__ == "__main__":
    asyncio.run(main())

五、高级技巧与性能优化

5.1 信号量控制并发

当需要限制同时执行的任务数量时,使用 Semaphore

import asyncio

async def limited_task(sem: asyncio.Semaphore, task_id: int):
    async with sem:  # 自动获取和释放信号量
        print(f"任务 {task_id} 开始执行")
        await asyncio.sleep(1)
        print(f"任务 {task_id} 执行完成")
        return task_id

async def main():
    # 限制最多 3 个任务同时执行
    semaphore = asyncio.Semaphore(3)
    
    tasks = [
        limited_task(semaphore, i) 
        for i in range(10)
    ]
    
    results = await asyncio.gather(*tasks)
    print(f"所有任务完成: {results}")

asyncio.run(main())

输出观察:你会发现任务是 3 个一组执行的,而不是 10 个同时开始。

5.2 超时控制

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "完成"

async def main():
    # 方式 1:wait_for(超时抛出异常)
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时!")
    
    # 方式 2:timeout(Python 3.11+,更优雅)
    async with asyncio.timeout(2.0):
        result = await slow_operation()
    
    # 方式 3:手动控制
    task = asyncio.create_task(slow_operation())
    try:
        result = await asyncio.wait_for(asyncio.shield(task), timeout=2.0)
    except asyncio.TimeoutError:
        print("超时,但任务继续在后台执行")
        # task 仍在运行,可以稍后获取结果

asyncio.run(main())

5.3 优雅取消任务

import asyncio

async def cancellable_task():
    try:
        while True:
            print("工作中...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("收到取消信号,执行清理...")
        # 执行清理工作
        await asyncio.sleep(0.5)
        print("清理完成")
        raise  # 重新抛出,让调用者知道任务已取消

async def main():
    task = asyncio.create_task(cancellable_task())
    
    # 运行 3 秒后取消
    await asyncio.sleep(3)
    print("准备取消任务...")
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("任务已成功取消")

asyncio.run(main())

5.4 异步上下文管理器

import asyncio
import aiofiles

class AsyncResourceManager:
    """异步资源管理器示例"""
    
    def __init__(self, name: str):
        self.name = name
        self.resource = None
    
    async def __aenter__(self):
        print(f"正在获取资源 {self.name}...")
        await asyncio.sleep(0.5)  # 模拟异步初始化
        self.resource = f"Resource({self.name})"
        print(f"资源 {self.name} 已就绪")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"正在释放资源 {self.name}...")
        await asyncio.sleep(0.3)  # 模拟异步清理
        self.resource = None
        print(f"资源 {self.name} 已释放")
        return False  # 不抑制异常
    
    async def do_work(self):
        print(f"使用 {self.resource} 工作")
        await asyncio.sleep(1)


async def main():
    async with AsyncResourceManager("DB连接") as manager:
        await manager.do_work()
    
    print("所有资源已清理")

asyncio.run(main())

5.5 异步迭代器与生成器

import asyncio

class AsyncDataStream:
    """异步数据流迭代器"""
    
    def __init__(self, data: list):
        self.data = data
        self.index = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # 模拟异步获取
        value = self.data[self.index]
        self.index += 1
        return value


async def async_generator(n: int):
    """异步生成器"""
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i * 2


async def main():
    # 使用异步迭代器
    print("异步迭代器:")
    async for item in AsyncDataStream([1, 2, 3, 4, 5]):
        print(f"  收到: {item}")
    
    # 使用异步生成器
    print("\n异步生成器:")
    async for value in async_generator(5):
        print(f"  生成: {value}")

asyncio.run(main())

5.6 性能对比表

场景 同步方案 异步方案 多线程方案 推荐
100 个 HTTP 请求 ~100秒 ~1秒 ~10秒 异步
文件批量读写 ~10秒 ~2秒 ~3秒 异步
CPU 密集计算 ~10秒 ~10秒 ~3秒 多进程
数据库批量查询 ~50秒 ~5秒 ~10秒 异步
实时消息处理 阻塞 流畅 复杂 异步

六、常见陷阱与避坑指南

6.1 陷阱一:阻塞事件循环

这是最常见的错误!

import asyncio
import time

# ❌ 错误:使用同步的 time.sleep
async def bad_example():
    print("开始")
    time.sleep(1)  # 阻塞整个事件循环!
    print("结束")

# ✅ 正确:使用异步的 asyncio.sleep
async def good_example():
    print("开始")
    await asyncio.sleep(1)  # 不阻塞,允许其他任务执行
    print("结束")

常见阻塞操作及替代方案

阻塞操作 异步替代
time.sleep() await asyncio.sleep()
requests.get() await aiohttp.get()
open().read() await aiofiles.open()
socket.recv() await reader.read()
CPU 密集计算 await loop.run_in_executor()

6.2 陷阱二:忘记 await

import asyncio

async def fetch_data():
    await asyncio.sleep(1)
    return "data"

async def main():
    # ❌ 错误:忘记 await,result 是协程对象而非结果
    result = fetch_data()
    print(result)  # <coroutine object fetch_data at 0x...>
    
    # ✅ 正确:使用 await
    result = await fetch_data()
    print(result)  # "data"

asyncio.run(main())

💡 提示:Python 会给出警告 RuntimeWarning: coroutine 'fetch_data' was never awaited,要注意查看!

6.3 陷阱三:在同步函数中调用协程

import asyncio

async def async_function():
    return "async result"

# ❌ 错误:直接在同步函数中 await
def sync_function():
    result = await async_function()  # SyntaxError!
    return result

# ✅ 正确:使用 asyncio.run() 桥接
def sync_function():
    result = asyncio.run(async_function())
    return result

# ✅ 或者:在已有事件循环中使用
def sync_function_in_loop():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(async_function())
    return result

6.4 陷阱四:异常处理不当

import asyncio

async def risky_task(n):
    if n == 2:
        raise ValueError(f"任务 {n} 出错!")
    return f"任务 {n} 成功"

async def main():
    # ❌ 问题:一个任务失败,gather 会抛出异常,但其他任务可能还在运行
    try:
        results = await asyncio.gather(
            risky_task(1),
            risky_task(2),  # 这个会失败
            risky_task(3),
        )
    except ValueError as e:
        print(f"出错: {e}")
        # 其他任务的状态不确定!
    
    # ✅ 推荐:使用 return_exceptions=True
    results = await asyncio.gather(
        risky_task(1),
        risky_task(2),
        risky_task(3),
        return_exceptions=True  # 异常作为结果返回,不会中断
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i+1} 失败: {result}")
        else:
            print(f"任务 {i+1} 成功: {result}")

asyncio.run(main())

6.5 陷阱五:资源泄漏

import asyncio
import aiohttp

# ❌ 错误:没有正确关闭 session
async def bad_fetch():
    session = aiohttp.ClientSession()
    response = await session.get("https://example.com")
    return await response.text()
    # session 永远不会关闭!

# ✅ 正确:使用 async with 自动管理
async def good_fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()
    # 退出 with 块时自动关闭

6.6 陷阱六:并发修改共享状态

import asyncio

counter = 0

# ❌ 危险:多个协程并发修改共享变量
async def bad_increment():
    global counter
    for _ in range(1000):
        temp = counter
        await asyncio.sleep(0)  # 让出控制权
        counter = temp + 1

# ✅ 安全:使用锁保护
async def good_increment(lock: asyncio.Lock):
    global counter
    for _ in range(1000):
        async with lock:
            counter += 1

async def main():
    global counter
    
    # 不安全的方式
    counter = 0
    await asyncio.gather(bad_increment(), bad_increment())
    print(f"不安全结果: {counter}")  # 可能小于 2000!
    
    # 安全的方式
    counter = 0
    lock = asyncio.Lock()
    await asyncio.gather(
        good_increment(lock), 
        good_increment(lock)
    )
    print(f"安全结果: {counter}")  # 正好 2000

asyncio.run(main())

七、异步生态:常用库推荐

7.1 HTTP 客户端

库名 特点 适用场景
aiohttp 功能全面,社区活跃 Web 爬虫、API 调用
httpx 同时支持同步/异步 需要灵活切换的场景
aiofiles 异步文件操作 配合 HTTP 下载
# aiohttp 示例
import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.example.com/data') as resp:
        data = await resp.json()

# httpx 示例
import httpx

async with httpx.AsyncClient() as client:
    response = await client.get('https://api.example.com/data')
    data = response.json()

7.2 数据库驱动

数据库 推荐库 安装命令
PostgreSQL asyncpg pip install asyncpg
MySQL aiomysql pip install aiomysql
Redis aioredis pip install aioredis
MongoDB motor pip install motor
SQLite aiosqlite pip install aiosqlite
# asyncpg 示例
import asyncpg

async def query_db():
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
    await conn.close()
    return rows

7.3 Web 框架

框架 特点 GitHub Stars
FastAPI 现代、高性能、自动文档 60k+
Starlette 轻量级、FastAPI 的基础 8k+
Sanic 类 Flask 语法 17k+
aiohttp.web aiohttp 自带 -
# FastAPI 示例
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

7.4 任务队列

库名 特点
Celery 成熟稳定,但主要是同步
arq 纯异步,基于 Redis
dramatiq 简单可靠
# arq 示例
from arq import create_pool
from arq.connections import RedisSettings

async def download_content(ctx, url: str):
    # 异步任务逻辑
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

class WorkerSettings:
    functions = [download_content]
    redis_settings = RedisSettings()

7.5 其他实用工具

# aiocache - 异步缓存
from aiocache import cached

@cached(ttl=60)
async def get_expensive_data():
    await asyncio.sleep(2)
    return "data"

# tenacity - 重试机制
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
async def unreliable_api():
    # 自动重试 3 次
    pass

# tqdm - 异步进度条
from tqdm.asyncio import tqdm

async for item in tqdm(async_iterator):
    await process(item)

八、总结与学习路线图

8.1 核心要点回顾

┌──────────────────────────────────────────────────────────────┐
│                  Python 异步编程核心要点                      │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ① 基础语法                                                   │
│     • async def 定义协程                                      │
│     • await 等待协程完成                                       │
│     • asyncio.run() 启动事件循环                              │
│                                                              │
│  ② 并发执行                                                   │
│     • asyncio.gather() 并发等待多个任务                        │
│     • asyncio.create_task() 创建任务                          │
│     • asyncio.wait() 更灵活的等待                              │
│                                                              │
│  ③ 流程控制                                                   │
│     • Semaphore 限制并发数                                    │
│     • Lock 保护共享资源                                       │
│     • Queue 任务队列                                          │
│     • Event 事件通知                                          │
│                                                              │
│  ④ 最佳实践                                                   │
│     • 使用 async with 管理资源                                │
│     • 合理处理异常                                            │
│     • 避免阻塞操作                                            │
│     • 控制并发数量                                            │
│                                                              │
└──────────────────────────────────────────────────────────────┘

8.2 适用场景决策树

你的任务是什么类型?
      │
      ├─── I/O 密集型(网络请求、文件读写)
      │         │
      │         └─── ✅ 使用异步编程 (asyncio)
      │
      ├─── CPU 密集型(计算、数据处理)
      │         │
      │         └─── ✅ 使用多进程 (multiprocessing)
      │
      └─── 混合型
                │
                └─── ✅ 异步 + run_in_executor

8.3 学习路线图

入门阶段 (1-2周)
    │
    ├── 理解同步与异步的区别
    ├── 掌握 async/await 基本语法  
    ├── 学会使用 asyncio.run() 和 gather()
    └── 完成简单的并发 HTTP 请求练习
    │
    ▼
进阶阶段 (2-4周)
    │
    ├── 深入理解事件循环机制
    ├── 掌握 Task、Future 的使用
    ├── 学习异步上下文管理器和迭代器
    └── 实践:构建异步爬虫或 API 客户端
    │
    ▼
高级阶段 (4-8周)
    │
    ├── 掌握异步框架 (FastAPI/aiohttp)
    ├── 异步数据库操作 (asyncpg/aiomysql)
    ├── 性能调优和监控
    └── 实践:构建完整的异步 Web 服务
    │
    ▼
精通阶段 (持续)
    │
    ├── 深入源码理解底层实现
    ├── 自定义事件循环和协议
    ├── 复杂系统架构设计
    └── 贡献开源项目

8.4 常用代码片段速查

# 1. 基本并发模板
async def main():
    results = await asyncio.gather(
        task1(),
        task2(),
        task3(),
        return_exceptions=True
    )

# 2. 限制并发数模板
sem = asyncio.Semaphore(10)
async def limited_task():
    async with sem:
        await actual_work()

# 3. 超时控制模板
try:
    result = await asyncio.wait_for(task(), timeout=5.0)
except asyncio.TimeoutError:
    print("超时")

# 4. 异步 HTTP 请求模板
async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
        data = await resp.json()

# 5. 后台任务模板
task = asyncio.create_task(background_work())
# ... 做其他事情 ...
await task  # 需要时等待结果

附录:学习资源

官方文档

推荐教程

开源项目参考

  • aiohttp - 学习异步 HTTP 实现
  • httpx - 现代 HTTP 客户端设计
  • FastAPI - 高性能 Web 框架

书籍推荐

  • 《Python 并发编程》- 系统学习并发模型
  • 《流畅的 Python》第二版 - 异步编程章节

结语

异步编程是现代 Python 开发的必备技能。通过本文的学习,你应该已经:

  • ✅ 理解了异步编程的核心概念
  • ✅ 掌握了 asyncio 的基本用法
  • ✅ 学会了多种实战技巧
  • ✅ 了解了常见陷阱和解决方案

记住:理论学习只是第一步,真正的掌握来自于实践。建议从简单的项目开始,逐步积累经验。

如果这篇文章对你有帮助,欢迎 点赞、收藏、转发!有任何问题欢迎在评论区讨论。


作者提示:异步编程有一定的学习曲线,遇到困难是正常的。保持耐心,多写代码,你一定能掌握它!

Happy Coding! 🚀

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐