本文适合谁:有 Java 多线程或 CompletableFuture 经验、想理解 Python asyncio 的工程师。读完本篇,你能用 async/await 并发调用 LLM API,理解为什么不能在 async 函数里用同步库。

asyncio 是 Python 的异步编程框架,用于处理 IO 密集型(以等待网络、文件读写为主,而非大量计算的)并发任务。async def 定义协程函数(协程:一种可以在执行到一半时主动暂停、让出控制权的函数),await 等待协程执行完成,缺少 await 时函数调用只会返回协程对象,不会执行任何代码。这是 asyncio 给初学者最常见的陷阱。

asyncio 的执行模型和 Java 的多线程完全不同,理解这个差异是掌握 asyncio 的关键。

1.1 为什么 AI 开发必须懂 asyncio

在这里插入图片描述

同步阻塞与异步并发的执行时序对比

调用 GPT-4 的 API,一次请求平均需要 2-10 秒。如果要同时处理 100 个用户的请求,用同步的方式,这 100 个请求得排队,第 100 个用户等待时间接近 1000 秒。

Java 的传统解法是多线程:开一个线程池,每个请求一个线程,并发处理。这个方案可行,但有代价:每个线程占用约 1MB 内存,1000 个并发请求就是 1GB 内存,还不算线程切换的开销。

asyncio 的解法不同。它用一个线程跑一个事件循环(event loop,一个不断轮询"哪个任务准备好了"的调度器),当某个任务在等待 IO(比如等 LLM API 返回)的时候,事件循环切换去执行别的任务,不阻塞不浪费。等 IO 完成了,事件循环再回来继续处理。

1000 个并发 LLM 请求,asyncio 用一个线程就能搞定,内存占用极小。

这就是为什么 FastAPI、LangChain、大多数 LLM SDK 都默认支持或推荐 async。

1.2 event loop:和线程池的本质区别

Java 开发者理解异步通常是这个心智模型:把任务扔进线程池,线程池里的线程并行执行,完成后回调。

asyncio 的心智模型不同:

  • 只有一个线程在跑
  • 这个线程里有一个 event loop(事件循环)
  • event loop 维护一个任务队列
  • 当某个任务执行到 await 关键字,它主动"让出"控制权
  • event loop 切换到下一个任务
  • 当等待的 IO 完成,event loop 把原任务重新加入队列

关键点是主动让出。asyncio 是协作式多任务,不是抢占式。任务不说"我等一下",event loop 就不会切换。这也是为什么 asyncio 里如果调用了阻塞的同步操作(比如 time.sleep()),整个程序都会卡住——那个任务没有主动让出控制权。

import asyncio
import time

async def bad_task():
    time.sleep(2)  # ❌ 同步阻塞!会冻结整个event loop,其他协程无法运行
    return "done"

async def good_task():
    await asyncio.sleep(2)  # 对,这会让出控制权,其他任务可以运行
    return "done"

Java工程师必读:asyncio 不等于线程池

这是Java工程师最容易踩的坑。

Java的ThreadPoolExecutor:多个线程真正并行执行,可以同时利用多个CPU核心。

Python的asyncio单线程,event loop在协程之间切换,同一时刻只有一个协程在运行。

那asyncio的价值在哪里?IO密集型任务

当一个协程在等待LLM API响应(网络IO)时,event loop切换到其他协程继续工作。等待期间CPU不浪费。这就是为什么asyncio能让一个线程"同时"处理1000个LLM请求——本质上是IO等待时间的复用,不是并行计算。

结论

  • LLM API调用(IO密集)→ asyncio 完全够用,效率极高
  • 矩阵计算、图像处理(CPU密集)→ asyncio 无效,需要 multiprocessing

常见错误:

# 错误:忘记 await,返回的是协程对象,不会执行
result = client.chat.completions.create(...)  # 如果这是async函数,这行什么都不做

# 正确:加上 await
result = await client.chat.completions.create(...)

1.3 async/await 基本写法

定义一个异步函数,加 async def

async def fetch_data(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return response.json()

调用异步函数,必须 await,而且调用方也必须是 async def

async def main():
    data = await fetch_data("https://api.example.com/data")
    print(data)

在最顶层启动异步程序:

asyncio.run(main())

asyncio.run() 是同步代码进入异步世界的入口,它创建一个新的 event loop,运行传入的协程,完成后关闭 loop。

1.4 并发调用多个 LLM API:asyncio.gather()

AI 开发里常见的场景:同时用 GPT-4 做三件事——摘要、分类、情感分析,三个独立的 API 调用,没有依赖关系,没必要等第一个完成再发第二个请求。

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def summarize(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是一个摘要助手,用两句话概括输入文本。"},
            {"role": "user", "content": text}
        ]
    )
    return response.choices[0].message.content

async def classify(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "对文本分类,只返回:正面/负面/中性"},
            {"role": "user", "content": text}
        ]
    )
    return response.choices[0].message.content

async def analyze_sentiment(text: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "分析情感强度,返回0-10的数字。"},
            {"role": "user", "content": text}
        ]
    )
    return response.choices[0].message.content

async def analyze_text(text: str) -> dict:
    # 三个任务并发执行,总时间 ≈ 最慢的那个
    summary, category, sentiment = await asyncio.gather(
        summarize(text),
        classify(text),
        analyze_sentiment(text)
    )
    return {
        "summary": summary,
        "category": category,
        "sentiment": sentiment
    }

# 运行
result = asyncio.run(analyze_text("今天天气真好,适合出去走走"))
print(result)

如果串行调用三个 API,每个 3 秒,总共 9 秒。用 asyncio.gather() 并发,总时间约等于最慢的那个,大概 3 秒。

这就是 asyncio 在 AI 开发里的核心价值:IO 密集型任务的并发,无需多线程,无需进程池。

1.5 批量处理:控制并发数量

asyncio.gather() 是全量并发,100 个任务会同时发出 100 个请求,这会触发 API 的 rate limit(速率限制,API 服务商为防止滥用而设置的每分钟最大请求次数限制)。需要控制并发数量时,用 asyncio.Semaphore(信号量,一种限制同时运行任务数量的工具):

async def process_batch(texts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(text: str) -> dict:
        async with semaphore:  # 同时最多10个任务持有这个信号量
            return await analyze_text(text)

    tasks = [process_one(text) for text in texts]
    return await asyncio.gather(*tasks)

Semaphore 就是一个计数器,同时最多 max_concurrent 个任务可以进入临界区。第 11 个任务会等待,直到有人释放信号量。

这个模式在 AI 批量处理任务里极其常用,比 Java 里配置线程池的方式简洁多了。

1.6 常见坑一:在同步函数里调用异步函数

这是最高频的错误。异步函数在普通同步函数里被调用,只会创建协程对象,不会执行:

async def get_answer(question: str) -> str:
    # 调用LLM...
    return "answer"

def handle_question(question: str):
    # 错误做法!这只是创建了协程对象,没有执行
    result = get_answer(question)
    print(result)  # 打印的是 <coroutine object get_answer at 0x...>

    # 正确做法
    result = asyncio.run(get_answer(question))
    print(result)

但如果调用方本身已经在 event loop 里(比如在另一个 async 函数里),不能用 asyncio.run(),必须用 await

async def main():
    # 在async函数里,用await
    result = await get_answer("什么是asyncio?")
    print(result)

判断依据很简单:当前函数是 async def 吗?是就用 await,否就用 asyncio.run()

1.7 常见坑二:混用同步库和异步代码

requests 库是同步的,在 asyncio 里用它会阻塞 event loop。正确做法是用 httpxaiohttp(另一个支持 async/await 的异步 HTTP 客户端库),它们提供了异步接口:

# 错误:在async函数里用同步requests
async def bad_fetch(url: str):
    import requests
    return requests.get(url).json()  # 阻塞!

# 正确:用httpx的异步接口
async def good_fetch(url: str):
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
    return response.json()

数据库也一样,同步的 SQLAlchemy 在 asyncio 里要换成 asyncpg 或者 SQLAlchemy 的 async 模式。这是从同步迁移到异步最麻烦的地方,好在 LLM 开发通常不涉及复杂的数据库操作。

1.8 FastAPI 原生支持 async,这很重要

FastAPI 是基于 Starlette(一个轻量级的 Python Web 框架)构建的,底层是 ASGI(异步服务器网关接口,支持异步处理 HTTP 请求的 Python Web 服务器标准),天然支持 async。路由处理函数可以直接写成 async:

from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(message: str) -> dict:
    # 这里await不会阻塞其他请求
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}]
    )
    return {"reply": response.choices[0].message.content}

如果用 Flask 或 Django(同步框架),每个请求会占用一个线程,1000 并发就需要 1000 个线程。用 FastAPI 的 async,一个进程可以处理数千并发,内存消耗小得多。

这是 AI 应用后端普遍选择 FastAPI 的核心原因:async 支持让它在 IO 密集场景下性能远胜同步框架。

1.9 什么时候用 async,什么时候不用

不是所有代码都需要 async。判断原则如下:

适合用 async 的场景:

  • 调用 LLM API(必须,IO 等待时间长)
  • 调用外部 HTTP 接口
  • 读写文件(大文件)
  • 数据库查询(高并发场景)
  • 需要同时发多个请求

不适合用 async 的场景:

  • CPU 密集型计算(asyncio 解决不了 CPU 瓶颈,要用多进程)
  • 简单的脚本,一次性跑完就行
  • 逻辑很简单,并发不是瓶颈

还有一个原则:async 有传染性。一个函数用了 await,它自己必须是 async def,调用它的函数如果也要 await,也必须是 async def。这条链一直传递到顶层,顶层用 asyncio.run() 收尾。

在一个项目里,要么整体拥抱 async(FastAPI 这种路子),要么老老实实写同步代码。中间状态最难维护——一半 async 一半 sync,到处是 asyncio.run() 补丁。

对于 AI 应用开发,推荐从一开始就以 FastAPI 为框架,把 LLM 调用全部写成 async,整个项目的异步贯通,远比后期迁移省事。

1.10 与 Java CompletableFuture 的全面对比

Java 开发者常用 CompletableFuture 处理异步操作,Python asyncio 是等价概念,但实现方式不同:

特性 Java CompletableFuture Python asyncio
并发模型 线程池(真并行,多核利用) 事件循环(单线程,IO 复用)
并发启动 CompletableFuture.allOf(f1, f2, f3) asyncio.gather(coro1, coro2, coro3)
顺序执行 .thenApply(fn) result = await coroutine
错误处理 .exceptionally(fn) try/except
超时 .orTimeout(5, TimeUnit.SECONDS) asyncio.wait_for(coro, timeout=5.0)
并发限制 配置线程池大小 asyncio.Semaphore(n)
适合场景 IO + CPU 密集都行 IO 密集(LLM API)

代码对比——并发发三个 LLM 请求:

// Java CompletableFuture
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> callLLM("问题1"));
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> callLLM("问题2"));
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> callLLM("问题3"));

CompletableFuture.allOf(f1, f2, f3).join();
String r1 = f1.get(), r2 = f2.get(), r3 = f3.get();
# Python asyncio
r1, r2, r3 = await asyncio.gather(
    call_llm_async("问题1"),
    call_llm_async("问题2"),
    call_llm_async("问题3"),
)
# 代码更简洁,逻辑更清晰

关键区别:Java CompletableFuture 在多个 CPU 核心上真正并行运行;Python asyncio 是单线程,在 IO 等待期间切换协程。对于 LLM API 调用(大部分时间在等网络),两种方式效果相当,但 asyncio 消耗内存更少。

1.11 小结

场景 推荐方案 原因
单次 LLM 调用 await client.chat.completions.create(...) 不阻塞
多个独立 LLM 调用 asyncio.gather(coro1, coro2, ...) 并发执行
批量处理(防限流) asyncio.Semaphore(n) + asyncio.gather 控制并发数
同步代码调用 async asyncio.run(main()) 同步入口
async 函数调用 async await coro 必须用 await
需要超时控制 asyncio.wait_for(coro, timeout=30.0) 防止长时间挂起
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐