一、为什么需要异步编程?

在日常开发中,大多数程序的性能瓶颈不是 CPU 计算,而是 I/O 等待—— 等网络响应、等数据库查询、等文件读写。传统的同步代码在等待期间会一直阻塞, CPU 只能干坐着什么都不做。

用一个生活中的类比来理解:

同步模式:你去咖啡馆点了一杯拿铁,然后站在柜台前一动不动地等5分钟,直到咖啡做好才去找座位。
异步模式:你点好单拿到号码,然后先去找座位、刷手机,服务员做好后叫你去取。

异步编程就是让程序在等待 I/O 的时候去处理别的任务,充分利用等待时间, 显著提升 I/O 密集型程序的吞吐量。

同步 vs 异步:性能差异有多大?

场景 同步方式 asyncio 方式 提升倍数
并发请求 100 个 URL ~50 秒 ~2 秒 约 25x
批量数据库查询 1000 次 ~30 秒 ~3 秒 约 10x
读取 500 个小文件 ~8 秒 ~1 秒 约 8x

ℹ️ 适用场景提示:asyncio 适合 I/O 密集型任务(网络请求、数据库、文件操作)。 对于 CPU 密集型任务(大规模计算、图像处理),应使用 multiprocessing 多进程方案。


二、协程基础:async 与 await

2.1 什么是协程?

协程(Coroutine)是一种可以在执行过程中 暂停并让出控制权 的函数。 与普通函数不同,调用协程函数不会立即执行,而是返回一个"协程对象", 需要由事件循环(Event Loop)来驱动执行。

定义一个协程函数只需在 def 前加 async

import asyncio

async def say_hello(name: str) -> None:
    """一个简单的协程函数"""
    # await 关键字:暂停当前协程,让出事件循环控制权
    await asyncio.sleep(1)   # 模拟 1 秒的 I/O 等待
    print(f"Hello, {name}!")

# 运行协程
asyncio.run(say_hello("World"))

预期输出:

Hello, World!

⚠️ 常见误区:直接调用 say_hello("World") 不会执行任何代码, 只会返回一个协程对象 <coroutine object say_hello>。 必须通过 asyncio.run() 或 await 来驱动执行。

2.2 await 关键字的作用

await 只能用在 async 函数内部。它的作用是: 暂停当前协程的执行,等待右侧的可等待对象(Awaitable)完成,期间将控制权交还给事件循环

可以被 await 的对象有三类:

  • 协程对象(coroutine
  • Task 对象(asyncio.Task
  • Future 对象(asyncio.Future
import asyncio

async def fetch_data(url: str) -> str:
    """模拟一次网络请求"""
    print(f"开始请求: {url}")
    await asyncio.sleep(2)   # ← 在这里暂停,让出控制权
    print(f"请求完成: {url}")
    return f"来自 {url} 的数据"

async def main() -> None:
    result = await fetch_data("https://api.example.com/data")
    print(result)

asyncio.run(main())

三、事件循环:asyncio 的心脏

事件循环(Event Loop)是 asyncio 的核心调度器。 它持续监听所有注册的协程和 I/O 事件,当某个协程在 await 处暂停时, 立即切换去执行另一个就绪的协程——这就是"并发"的来源。

┌─────────────────────────────────────────┐
│               Event Loop                │
│                                         │
│  ┌──────────┐    await    ┌──────────┐  │
│  │ 协程 A   │ ──暂停──▶  │  I/O等待 │  │
│  └──────────┘             └──────────┘  │
│       │ 切换                   │完成     │
│       ▼                        ▼        │
│  ┌──────────┐             ┌──────────┐  │
│  │ 协程 B   │ ──运行──▶  │ 协程 A   │  │
│  └──────────┘             └──────────┘  │
└─────────────────────────────────────────┘

3.1 事件循环的正确打开方式

Python 3.7+ 推荐使用 asyncio.run(),它会自动创建、运行并关闭事件循环:

import asyncio

async def main() -> None:
    """所有异步代码的入口点"""
    await asyncio.sleep(1)
    print("事件循环运行完毕")

# ✅ 推荐写法(Python 3.7+)
asyncio.run(main())

# ❌ 旧写法(不推荐,Python 3.10 已废弃)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

四、并发执行:asyncio.gather() 与 asyncio.create_task()

这是 asyncio 最关键的使用场景——同时发起多个异步任务,不互相等待。 理解了这一点,才算真正掌握了 asyncio。

4.1 顺序执行 vs 并发执行(性能对比)

先看问题:下面的代码是真正的并发吗?

import asyncio
import time

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} 完成"

async def wrong_way() -> None:
    """❌ 错误写法:逐个 await,实际上是顺序执行!"""
    start = time.time()
    r1 = await task("任务A", 2)
    r2 = await task("任务B", 2)
    r3 = await task("任务C", 2)
    print(f"耗时: {time.time() - start:.1f}s")  # 输出:耗时: 6.0s

asyncio.run(wrong_way())
❌ 顺序 await(错误)

每个任务完成后才开始下一个。3个各等2秒的任务 = 总共等6秒。

✅ gather 并发(正确)

3个任务同时开始,一起等待。3个各等2秒的任务 = 总共只等2秒。

4.2 asyncio.gather():最常用的并发方式

import asyncio
import time

async def task(name: str, delay: float) -> str:
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return f"{name} 的结果"

async def right_way() -> None:
    """✅ 正确写法:gather 并发执行所有任务"""
    start = time.time()

    # gather 同时启动所有协程,等待全部完成
    results = await asyncio.gather(
        task("任务A", 2),
        task("任务B", 2),
        task("任务C", 2),
    )

    print(f"耗时: {time.time() - start:.1f}s")  # 输出:耗时: 2.0s
    print(f"所有结果: {results}")

asyncio.run(right_way())

预期输出:

任务A 开始
任务B 开始
任务C 开始
任务A 完成
任务B 完成
任务C 完成
耗时: 2.0s
所有结果: ['任务A 的结果', '任务B 的结果', '任务C 的结果']

4.3 asyncio.create_task():更灵活的任务控制

如果你需要在创建任务后继续执行其他代码,而不是立即等待,使用 create_task()

import asyncio

async def background_job(name: str) -> str:
    await asyncio.sleep(3)
    return f"{name} 后台任务完成"

async def main() -> None:
    # 立即将协程封装为 Task 并调度执行(不阻塞当前代码)
    task1 = asyncio.create_task(background_job("爬虫"))
    task2 = asyncio.create_task(background_job("数据库同步"))

    print("任务已提交,主程序继续运行...")
    await asyncio.sleep(1)
    print("主程序做了其他事情")

    # 最后再等待任务完成、获取结果
    result1 = await task1
    result2 = await task2
    print(result1)
    print(result2)

asyncio.run(main())
方法 特点 适用场景
asyncio.gather() 传入多个协程,一次性并发,返回结果列表 已知所有任务,一起等待结果
asyncio.create_task() 立即调度单个协程为 Task,可独立控制 动态添加任务,需要精细控制执行时机
asyncio.wait() 等待一组 Task,支持超时和条件等待 需要处理超时或只等部分任务完成的场景

五、异常处理与超时控制

5.1 在协程中处理异常

import asyncio

async def risky_request(url: str) -> str:
    await asyncio.sleep(1)
    if "error" in url:
        raise ValueError(f"请求失败: {url}")
    return f"成功: {url}"

async def main() -> None:
    # 方式1:gather 默认一个任务失败就抛出异常
    try:
        results = await asyncio.gather(
            risky_request("https://ok.example.com"),
            risky_request("https://error.example.com"),
        )
    except ValueError as e:
        print(f"捕获到异常: {e}")

    # 方式2:return_exceptions=True,异常作为结果返回,不中断其他任务
    results = await asyncio.gather(
        risky_request("https://ok.example.com"),
        risky_request("https://error.example.com"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"任务失败: {r}")
        else:
            print(f"任务成功: {r}")

asyncio.run(main())

5.2 超时控制:asyncio.wait_for()

import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(10)   # 模拟慢速操作
    return "操作完成"

async def main() -> None:
    try:
        # 最多等待 3 秒,超时则抛出 asyncio.TimeoutError
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=3.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!已自动取消")

asyncio.run(main())

六、实战案例:高并发异步爬虫

理论讲完,来看一个真实的应用场景——用 asyncio + aiohttp 构建一个能并发抓取数百个网页的异步爬虫。

💡 前置安装pip install aiohttp

aiohttp 是专为 asyncio 设计的异步 HTTP 客户端,不能在异步程序中使用 requests(同步库)。

import asyncio
import time
from typing import Optional
import aiohttp

# ── 配置 ──────────────────────────────────────────
MAX_CONCURRENT = 10   # 最大并发数(防止被封 IP 或压垮服务器)

URLS: list[str] = [
    f"https://httpbin.org/delay/1?id={i}"
    for i in range(20)
]

# ── 单个 URL 抓取协程 ──────────────────────────────
async def fetch_url(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> Optional[str]:
    """
    带并发限制的 URL 抓取。
    semaphore 确保同时最多 MAX_CONCURRENT 个请求。
    """
    async with semaphore:   # 申请信号量(超出上限则等待)
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status == 200:
                    content = await resp.text()
                    print(f"✅ 成功 [{resp.status}] {url[:50]}")
                    return content[:100]   # 只取前100字符示意
                else:
                    print(f"⚠️  异常状态 [{resp.status}] {url}")
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"❌ 请求失败 {url}: {e}")
            return None

# ── 主入口 ─────────────────────────────────────────
async def main() -> None:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    start = time.time()

    # aiohttp 推荐使用 Session 复用 TCP 连接,比每次新建连接快很多
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_url(session, url, semaphore)
            for url in URLS
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # 统计结果
    success = sum(1 for r in results if r is not None and not isinstance(r, Exception))
    elapsed = time.time() - start

    print(f"\n📊 完成: {success}/{len(URLS)} 成功 | 耗时: {elapsed:.2f}s")
    print(f"🚀 同步方式预计耗时: ~{len(URLS)}s | 提速: {len(URLS)/elapsed:.1f}x")

asyncio.run(main())

这个爬虫的关键设计点:

  • asyncio.Semaphore:控制并发上限,防止同时发起过多请求被封禁
  • aiohttp.ClientSession:复用 TCP 连接,减少握手开销
  • async with:上下文管理器确保 Session 和连接被正确关闭
  • return_exceptions=True:单个请求失败不影响其他请求

七、技术雷区汇总

错误做法 正确做法 原因
在 async 函数里用 time.sleep() 用 await asyncio.sleep() time.sleep 会阻塞整个事件循环
在 async 程序里用 requests 用 aiohttp 或 httpx 同步库会阻塞事件循环,失去异步优势
逐个 await 协程 用 asyncio.gather() 逐个 await 是顺序执行,不是并发
用 asyncio.get_event_loop() 用 asyncio.run() 旧 API 在 Python 3.10+ 已废弃
在 asyncio 中做 CPU 密集运算 用 loop.run_in_executor() 或 multiprocessing CPU 密集型会阻塞事件循环

总结

回顾本文的核心知识点:

  • asyncio 适合 I/O 密集型场景,通过协程在等待期间切换执行其他任务
  • async def 定义协程,await 暂停并让出控制权,asyncio.run() 启动事件循环
  • 并发的关键:用 asyncio.gather() 或 create_task(),而不是逐个 await
  • 生产环境中记得用 Semaphore 控制并发上限、用 wait_for 设置超时
  • 配合 aiohttp 可以构建高性能异步爬虫,轻松实现 10x 以上的速度提升
你的需求 推荐方案
并发请求多个 URL asyncio.gather() + aiohttp
动态添加后台任务 asyncio.create_task()
防止并发数过高 asyncio.Semaphore
I/O 操作设置超时 asyncio.wait_for()
CPU 密集型 + 异步混用 loop.run_in_executor()

如果你有其他关于 asyncio 的使用心得,或者在实际项目中踩过什么坑, 欢迎在评论区留言交流!觉得本文有帮助的话,点个赞和收藏支持一下 🙌

参考资料

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐