【Python × 深度学习 × Agent 系列·第六篇·收官】性能与部署:cProfile 性能分析、推理优化、并发模型服务,让 Agent 又快又稳
·
【Python × 深度学习 × Agent 系列·第六篇·收官】性能与部署:cProfile 性能分析、推理优化、并发模型服务,让 Agent 又快又稳
作者:技术博主 | 更新时间:2026-05-14 | 阅读时长:约 25 分钟
系列:Python × 深度学习 × Agent 系列(共 6 篇)· 收官篇
标签:cProfile性能分析推理优化量化TorchScript并发服务动态批处理内存泄漏生产监控
🔥 本篇目标:代码能跑和代码能扛住生产流量之间,差着"性能工程"这整个领域。本篇从找瓶颈(cProfile + line_profiler + tracemalloc)开始,讲模型推理的完整优化链路(量化 → 编译 → 批处理 → 异步),然后给出一个能处理真实并发的模型服务架构——请求队列、动态批处理、多进程、监控告警。每一段代码都来自生产实践,不是玩具。

系列完整进度
| 篇次 | 主题 | 状态 |
|---|---|---|
| 第一篇 | Python 基础精要:对象模型、生成器、装饰器 | ✅ 已发布 |
| 第二篇 | NumPy:广播、einsum,手推 Self-Attention | ✅ 已发布 |
| 第三篇 | PyTorch 核心:Tensor、autograd、训练循环 | ✅ 已发布 |
| 第四篇 | Agent 异步编程:async/await、并发 LLM 调用 | ✅ 已发布 |
| 第五篇 | 类型系统:Pydantic V2、LLM 结构化输出 | ✅ 已发布 |
| 第六篇(本篇·收官) | 性能与部署:分析、优化、并发服务、监控 | — |
目录
- 一、性能分析:找到真正的瓶颈,不靠感觉
- 二、
cProfile全链路分析 - 三、
line_profiler:逐行找热点 - 四、内存分析:
tracemalloc与泄漏排查 - 五、模型推理优化:从量化到编译
- 六、动态批处理:GPU 利用率从 20% 到 80%
- 七、并发模型服务:请求队列 + 多进程
- 八、Agent 服务的生产部署架构
- 九、监控与告警:Prometheus + 关键指标
- 十、六篇系列收官:完整技术地图
一、性能分析:找到真正的瓶颈,不靠感觉
1.1 优化的第一原则:先测量,再优化
# 反例:凭感觉优化(浪费时间)
# "感觉 tokenization 很慢,先优化它"
# 结果:tokenization 只占 3%,真正的瓶颈是数据加载(占 70%)
# 正确做法:
# 1. 用 profiler 找出热点函数(占总时间 > 5%)
# 2. 只优化热点,忽略其他
# 3. 优化后重新测量,确认提升效果
# 4. 重复直到满足性能目标
# 经验数据:
# 通常 80% 的时间消耗在 20% 的代码上(帕累托原则)
# 优化非热点代码:即使提速 10 倍,对总体几乎没影响
1.2 快速性能估算:time + timeit
import time
import timeit
# 粗粒度:整体计时
t0 = time.perf_counter()
result = my_slow_function()
print(f"耗时:{time.perf_counter() - t0:.4f}s")
# 精细计时:多次运行取平均(消除偶发抖动)
elapsed = timeit.timeit(
stmt="tokenize(sample_text)",
setup="from __main__ import tokenize, sample_text",
number=1000,
)
print(f"平均耗时:{elapsed/1000*1000:.3f}ms/次")
# 作为装饰器使用(调试时临时加)
import functools
def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.perf_counter()
result = func(*args, **kwargs)
print(f"[{func.__name__}] {(time.perf_counter()-t0)*1000:.2f}ms")
return result
return wrapper
@timed
def preprocess_batch(texts: list[str]) -> list[dict]:
...
二、cProfile 全链路分析
2.1 基本用法
import cProfile
import pstats
import io
from pstats import SortKey
def profile_agent_run(user_input: str):
"""对整个 Agent 运行做性能分析"""
profiler = cProfile.Profile()
profiler.enable()
# 被分析的代码
result = run_agent(user_input)
profiler.disable()
# 分析结果
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs() # 去掉文件路径前缀,更易读
stats.sort_stats(SortKey.CUMULATIVE) # 按累计时间排序(找慢函数用这个)
stats.print_stats(20) # 只打印前 20 条
print(stream.getvalue())
return result
# 命令行使用(不需要改代码)
# python -m cProfile -s cumulative my_script.py
# python -m cProfile -o output.prof my_script.py
# python -m pstats output.prof # 交互式查看
2.2 结果解读
输出格式:
ncalls tottime percall cumtime percall filename:lineno(function)
─────────────────────────────────────────────────────────────────────
1000 0.523 0.0005 8.234 0.0082 tokenizer.py:45(encode)
500 0.012 0.0000 6.891 0.0138 model.py:120(forward)
1000 5.234 0.0052 5.234 0.0052 {built-in method select}
50 0.001 0.0000 1.234 0.0247 data_loader.py:33(__iter__)
关键列说明:
ncalls:调用次数
tottime:函数自身耗时(不含调用的子函数)← 找 CPU 热点
cumtime:累计耗时(含子函数)← 找慢的调用链
诊断规律:
tottime 很高 → 这个函数本身是 CPU 瓶颈,优化算法
cumtime 很高 但 tottime 很低 → 时间在子函数里,继续往下看
{built-in method select} 高 → I/O 等待(网络/磁盘),考虑异步
tottime/ncalls(percall)高 → 单次调用慢,但调用次数少
ncalls 极高 → 调用频率问题,考虑缓存
2.3 可视化:SnakeViz
pip install snakeviz
python -m cProfile -o profile.prof my_agent.py
snakeviz profile.prof # 在浏览器打开交互式火焰图
2.4 针对 Agent 的分析实践
import cProfile
import pstats
from contextlib import contextmanager
@contextmanager
def profiling(output_file: str = None, top_n: int = 20, sort_by: str = "cumulative"):
"""
方便的 profiling 上下文管理器
用法:with profiling("agent.prof"):
run_agent(...)
"""
profiler = cProfile.Profile()
profiler.enable()
try:
yield profiler
finally:
profiler.disable()
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats(sort_by)
if output_file:
stats.dump_stats(output_file)
print(f"Profile 已保存到:{output_file}(用 snakeviz 查看)")
stats.print_stats(top_n)
# 使用
async def analyze_performance():
with profiling("agent_run.prof", top_n=15):
await agent.run("分析最新的 AI 市场趋势")
三、line_profiler:逐行找热点
cProfile 找到慢函数后,line_profiler 精确到行级别。
pip install line_profiler
# 方法1:装饰器(需要用 kernprof 运行)
from line_profiler import profile
@profile # 标记要分析的函数
def tokenize_batch(texts: list[str]) -> list[list[int]]:
results = []
for text in texts:
# 哪一行最慢?
cleaned = text.strip().lower() # 行1
tokens = cleaned.split() # 行2
token_ids = [vocab.get(t, 0) for t in tokens] # 行3
padded = token_ids[:max_len] + [0] * max(0, max_len - len(token_ids)) # 行4
results.append(padded)
return results
# 运行命令:
# kernprof -l -v my_script.py
# 输出示例:
# Line # Hits Time Per Hit % Time Line Contents
# ─────────────────────────────────────────────────────────
# 10 1000 512.3 0.5 1.2 cleaned = text.strip().lower()
# 11 1000 623.4 0.6 1.5 tokens = cleaned.split()
# 12 1000 38291.2 38.3 90.8 token_ids = [vocab.get(t, 0) ...] ← 热点!
# 13 1000 2134.1 2.1 5.1 padded = token_ids[:max_len] + ...
# → 行12 占 90%,vocab.get 在循环里太慢 → 改用批量 numpy 查找
# 方法2:不改代码,直接程序内使用
from line_profiler import LineProfiler
def measure_line_by_line(func, *args, **kwargs):
lp = LineProfiler()
lp.add_function(func)
lp_wrapper = lp(func)
result = lp_wrapper(*args, **kwargs)
lp.print_stats()
return result
result = measure_line_by_line(tokenize_batch, sample_texts)
四、内存分析:tracemalloc 与泄漏排查
4.1 使用 tracemalloc 追踪内存分配
import tracemalloc
import gc
def find_memory_leak():
"""
找内存泄漏的标准流程
"""
# 第一次快照(baseline)
tracemalloc.start()
gc.collect()
snapshot1 = tracemalloc.take_snapshot()
# 运行可能泄漏的代码(多次运行以放大差异)
for _ in range(100):
result = run_agent("测试查询")
# 正常情况:每次循环后内存应该回到基线
# 泄漏情况:内存持续增长
gc.collect()
snapshot2 = tracemalloc.take_snapshot()
# 对比两次快照
stats = snapshot2.compare_to(snapshot1, "lineno")
print("内存增量 Top 10:")
for stat in stats[:10]:
print(stat)
# 输出示例:
# my_module.py:45: size=12.5 MiB (+12.5 MiB), count=1024 (+1024)
# ↑ 这一行每次都在分配新内存却没有释放 → 泄漏!
tracemalloc.stop()
# 快速检查:运行前后内存变化
import psutil, os
def memory_usage_mb() -> float:
return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
before = memory_usage_mb()
for _ in range(1000):
run_agent("测试")
after = memory_usage_mb()
print(f"内存变化:{after - before:.1f} MB({'泄漏' if after - before > 50 else '正常'})")
4.2 DL 场景的常见内存问题
import torch
# ── 问题1:训练循环中积累计算图 ─────────────────────────────
# ❌ 错误:把 loss 对象本身加到 list(保留计算图!)
losses = []
for batch in dataloader:
loss = compute_loss(batch)
losses.append(loss) # loss 包含整个计算图!内存暴增
# ✅ 正确:只保留标量值
losses = []
for batch in dataloader:
loss = compute_loss(batch)
losses.append(loss.item()) # .item() 断开计算图,只取数值
# ── 问题2:验证阶段忘记 no_grad ──────────────────────────────
# ❌ 推理时不用 no_grad,中间激活值全都被保留用于梯度计算
def validate_wrong(model, loader):
for batch in loader:
output = model(batch) # 保存了整个前向计算图!显存暴涨
# ✅ 正确
def validate_correct(model, loader):
with torch.no_grad():
for batch in loader:
output = model(batch)
# ── 问题3:GPU 显存碎片化 ───────────────────────────────────
# 大 batch 推理后,PyTorch 缓存了大块显存,后续小 batch 反而 OOM
torch.cuda.empty_cache() # 清空缓存(谨慎使用,会影响性能)
# ── 问题4:循环引用导致对象无法被 GC ───────────────────────
import gc
gc.collect() # 强制 GC 收集循环引用(通常不需要,但特殊情况有用)
五、模型推理优化:从量化到编译
5.1 完整优化路线图
原始 FP32 模型(基线)
↓ torch.compile(10-50% 提升,零精度损失)
编译优化模型
↓ FP16/BF16 推理(2-3x 提升,精度几乎不变)
半精度模型
↓ INT8 量化(3-4x 提升,精度轻微下降)
量化模型
↓ TensorRT 导出(5-10x 提升,需要 NVIDIA GPU)
TensorRT 引擎
↓ 动态批处理(GPU 利用率从 30% → 80%+)
高吞吐服务
每一步都需要在精度和速度之间权衡,选择适合业务的停留点
5.2 torch.compile:一行优化
import torch
import time
model = load_pretrained_model()
model.eval()
model = model.to("cuda")
# 未编译:baseline
x = torch.randn(1, 512, device="cuda")
with torch.no_grad():
for _ in range(10): # 预热
model(x)
t0 = time.perf_counter()
with torch.no_grad():
for _ in range(100):
model(x)
baseline = (time.perf_counter() - t0) / 100 * 1000
print(f"未编译:{baseline:.2f}ms")
# 编译(第一次调用会触发编译,后续快)
compiled_model = torch.compile(
model,
mode="reduce-overhead", # 小 batch 推理首选
# "max-autotune":更深优化,适合固定形状大 batch
fullgraph=False, # True:整图编译(更快,但不支持 Python 控制流)
)
# 预热(触发编译)
print("编译中(约 30-120s)...")
with torch.no_grad():
for _ in range(5):
compiled_model(x)
# 测速
t0 = time.perf_counter()
with torch.no_grad():
for _ in range(100):
compiled_model(x)
compiled = (time.perf_counter() - t0) / 100 * 1000
print(f"编译后:{compiled:.2f}ms(提升 {baseline/compiled:.1f}x)")
5.3 INT8 量化:PyTorch 原生方案
import torch
from torch.quantization import (
quantize_dynamic,
prepare,
convert,
get_default_qconfig,
)
# ── 方案1:动态量化(最简单,适合 Linear 层)────────────────
model_dynamic_q = quantize_dynamic(
model,
qconfig_spec={torch.nn.Linear}, # 只量化 Linear 层
dtype=torch.qint8,
)
# 对比模型大小
import os
torch.save(model.state_dict(), "model_fp32.pt")
torch.save(model_dynamic_q.state_dict(), "model_int8.pt")
fp32_size = os.path.getsize("model_fp32.pt") / 1024 / 1024
int8_size = os.path.getsize("model_int8.pt") / 1024 / 1024
print(f"FP32: {fp32_size:.1f} MB → INT8: {int8_size:.1f} MB(压缩 {fp32_size/int8_size:.1f}x)")
# ── 方案2:静态量化(更精确,需要 calibration)──────────────
def static_quantize(model, calibration_loader):
"""
静态量化:用 calibration 数据确定量化参数(scale/zero_point)
精度优于动态量化,但需要代表性数据
"""
model.eval()
model.qconfig = get_default_qconfig("fbgemm") # CPU 用 fbgemm,GPU 用 qnnpack
# 插入 Observer(观察激活值分布)
model_prepared = prepare(model)
# Calibration(用小批量真实数据运行前向,Observer 记录统计信息)
with torch.no_grad():
for batch in calibration_loader:
model_prepared(batch)
# 将 Observer 替换为量化算子
model_quantized = convert(model_prepared)
return model_quantized
# ── 方案3:BitsAndBytes 4bit 量化(大模型首选)─────────────
# pip install bitsandbytes
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_use_double_quant=True, # 嵌套量化,进一步压缩
bnb_4bit_quant_type="nf4", # NF4(Normalized Float 4),精度更好
)
# 70B 模型从 140GB FP16 → 约 40GB 4bit,单卡可跑
model_4bit = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-3-70B",
quantization_config=quantization_config,
device_map="auto", # 自动分配到多卡
)
5.4 KV Cache 优化(LLM 推理专属)
# KV Cache:避免重复计算历史 token 的 K/V
# 生成 1000 token 时:
# 无 KV Cache:每步计算所有历史 token 的 attention → O(n²) 复杂度
# 有 KV Cache:只计算新 token,缓存历史 K/V → O(n) 复杂度
# HuggingFace 默认开启,但要注意 cache 的内存开销
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
input_ids = tokenizer("Hello, I am", return_tensors="pt").input_ids
# 生成(自动使用 KV Cache)
with torch.no_grad():
output = model.generate(
input_ids,
max_new_tokens=100,
use_cache=True, # 默认 True,关闭会慢很多
do_sample=False,
)
# Flash Attention 2:更高效的注意力计算(需要 GPU)
# pip install flash-attn
model_fa2 = AutoModelForCausalLM.from_pretrained(
"gpt2",
attn_implementation="flash_attention_2", # 替换注意力实现
torch_dtype=torch.float16,
)
# Flash Attention 2:内存 O(n) vs 标准注意力 O(n²),速度快 2-4x
六、动态批处理:GPU 利用率从 20% 到 80%
6.1 为什么 GPU 利用率低
单请求模式(常见错误):
请求1: 发送 → [等GPU] → 收结果(GPU利用率5%)
请求2: 发送 → [等GPU] → 收结果
请求3: 发送 → [等GPU] → 收结果
GPU 大部分时间在等请求,实际利用率 10-30%
批处理模式(正确做法):
积累多个请求 → 合并成大 batch → 一次 GPU 运算
请求1,2,3,4,5 → [batch size=5, GPU利用率80%+] → 返回结果
GPU 吞吐量提升 5-10x,延迟略有增加(等待凑批的时间)
6.2 异步动态批处理器
import asyncio
import time
import torch
from dataclasses import dataclass, field
from typing import Any
import uuid
@dataclass
class PendingRequest:
"""等待处理的单个请求"""
request_id: str
inputs: dict[str, Any]
future: asyncio.Future # 结果会被设置到这里
arrived_at: float = field(default_factory=time.perf_counter)
class DynamicBatcher:
"""
动态批处理器:积累请求直到 max_batch_size 或 max_wait_ms 超时
原理:
- 请求到来时放入队列,返回 Future
- 后台 worker 每隔 max_wait_ms 或积满 max_batch_size 就处理一批
- 处理完成后设置每个请求的 Future
"""
def __init__(
self,
model,
max_batch_size: int = 32,
max_wait_ms: float = 20.0, # 最多等 20ms 凑批
device: str = "cuda",
):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms / 1000 # 转秒
self.device = device
self._queue: list[PendingRequest] = []
self._lock = asyncio.Lock()
self._event = asyncio.Event() # 有新请求时通知 worker
self._running = False
async def start(self):
"""启动后台 batch worker"""
self._running = True
asyncio.create_task(self._batch_worker())
print("DynamicBatcher 已启动")
async def stop(self):
self._running = False
async def infer(self, inputs: dict[str, Any]) -> Any:
"""
提交单个推理请求,返回结果(await 直到处理完成)
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
request = PendingRequest(
request_id=str(uuid.uuid4()),
inputs=inputs,
future=future,
)
async with self._lock:
self._queue.append(request)
self._event.set() # 通知 worker 有新请求
return await future # 等待 worker 设置结果
async def _batch_worker(self):
"""
后台 worker:持续监听队列,凑批处理
"""
while self._running:
# 等待第一个请求
await self._event.wait()
self._event.clear()
# 等待最多 max_wait_ms,或者 batch 满了立即处理
deadline = time.perf_counter() + self.max_wait_ms
while time.perf_counter() < deadline:
async with self._lock:
if len(self._queue) >= self.max_batch_size:
break
await asyncio.sleep(0.001) # 每 1ms 检查一次
# 取出当前队列的所有请求(不超过 max_batch_size)
async with self._lock:
batch = self._queue[:self.max_batch_size]
self._queue = self._queue[self.max_batch_size:]
if not batch:
continue
# 合并请求,组成大 batch
try:
results = await self._process_batch(batch)
# 把结果分发给每个请求的 Future
for req, res in zip(batch, results):
if not req.future.done():
req.future.set_result(res)
except Exception as e:
# 整批失败:通知所有请求
for req in batch:
if not req.future.done():
req.future.set_exception(e)
async def _process_batch(self, batch: list[PendingRequest]) -> list[Any]:
"""
实际的批处理推理(在线程池中运行,避免阻塞事件循环)
"""
loop = asyncio.get_event_loop()
def _run_inference():
# 合并输入为 batch tensor
input_ids = torch.stack([
torch.tensor(req.inputs["input_ids"], device=self.device)
for req in batch
])
attn_mask = torch.stack([
torch.tensor(req.inputs["attention_mask"], device=self.device)
for req in batch
])
# GPU 推理
with torch.inference_mode():
logits = self.model(input_ids=input_ids, attention_mask=attn_mask)
# 拆分结果
return [logits[i].cpu().tolist() for i in range(len(batch))]
# 在线程池中运行(不阻塞事件循环)
return await loop.run_in_executor(None, _run_inference)
# ── 在 FastAPI 中使用 ──────────────────────────────────────────
from fastapi import FastAPI
app = FastAPI()
batcher = DynamicBatcher(model=my_model, max_batch_size=32, max_wait_ms=20)
@app.on_event("startup")
async def startup():
await batcher.start()
@app.post("/predict")
async def predict(request: PredictRequest):
t0 = time.perf_counter()
result = await batcher.infer({
"input_ids": request.token_ids,
"attention_mask": request.attention_mask,
})
return {
"prediction": result,
"latency_ms": (time.perf_counter() - t0) * 1000,
}
七、并发模型服务:请求队列 + 多进程
7.1 为什么需要多进程
单进程 + 单 GPU 的问题:
① Python GIL:即使多线程,CPU 预处理是串行的
② 单 GPU 未充分利用:显存还有 50% 空余,算力用了 30%
③ 单进程挂了 → 服务整体挂掉(无冗余)
多进程 + 多 GPU 的解法:
多个 Worker 进程,每个绑定一个 GPU
通过 Nginx 做请求负载均衡
每个 Worker 独立运行,挂一个不影响其他
gunicorn + uvicorn + FastAPI 的标准部署方案:
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
# 4 个 worker 进程,每个运行 uvicorn 事件循环
7.2 多 GPU Worker 的实现
import torch
import torch.multiprocessing as mp
from multiprocessing import Queue
import os
def worker_process(
rank: int,
num_gpus: int,
request_queue: Queue,
result_queue: Queue,
model_path: str,
):
"""
每个 GPU 对应一个 Worker 进程
从 request_queue 取请求,处理后放入 result_queue
"""
# 绑定 GPU
device = torch.device(f"cuda:{rank % num_gpus}")
torch.cuda.set_device(device)
# 加载模型(每个进程独立加载,内存不共享)
model = load_model(model_path).to(device)
model.eval()
print(f"Worker {rank} 在 GPU {device} 上就绪")
while True:
try:
# 从队列取请求(阻塞等待)
request_id, inputs = request_queue.get(timeout=1.0)
except Exception:
continue # 超时,继续等待
if request_id == "STOP":
break # 收到停止信号
# 推理
try:
with torch.inference_mode():
input_tensor = {
k: torch.tensor(v, device=device)
for k, v in inputs.items()
}
output = model(**input_tensor)
result = output.logits.cpu().tolist()
result_queue.put((request_id, result, None)) # (id, result, error)
except Exception as e:
result_queue.put((request_id, None, str(e)))
class MultiGPUServer:
"""多 GPU 推理服务器"""
def __init__(self, model_path: str, num_workers: int = 4):
self.num_workers = num_workers
self.request_queue = mp.Queue(maxsize=1000) # 最大积压 1000 个请求
self.result_queue = mp.Queue()
self._pending = {} # request_id → asyncio.Future
# 启动 Worker 进程
self.workers = []
num_gpus = torch.cuda.device_count()
for rank in range(num_workers):
p = mp.Process(
target=worker_process,
args=(rank, num_gpus, self.request_queue,
self.result_queue, model_path),
daemon=True,
)
p.start()
self.workers.append(p)
# 启动结果收集协程
self._result_collector_task = None
async def start_result_collector(self):
"""异步收集 Worker 的结果,设置对应的 Future"""
loop = asyncio.get_event_loop()
while True:
# 非阻塞地检查结果队列
try:
request_id, result, error = await loop.run_in_executor(
None, lambda: self.result_queue.get(timeout=0.01)
)
future = self._pending.pop(request_id, None)
if future and not future.done():
if error:
future.set_exception(RuntimeError(error))
else:
future.set_result(result)
except Exception:
await asyncio.sleep(0.001)
async def infer(self, request_id: str, inputs: dict) -> list:
"""提交请求,等待结果"""
loop = asyncio.get_event_loop()
future = loop.create_future()
self._pending[request_id] = future
# 放入请求队列
await loop.run_in_executor(
None,
lambda: self.request_queue.put((request_id, inputs))
)
return await asyncio.wait_for(future, timeout=60.0)
def shutdown(self):
for _ in self.workers:
self.request_queue.put(("STOP", None))
for p in self.workers:
p.join(timeout=5.0)
八、Agent 服务的生产部署架构
8.1 完整架构图
用户
│ HTTP/WebSocket
┌───▼────┐
│ Nginx │ 负载均衡 + SSL 终止 + 限流
└───┬────┘
┌────────────┼────────────┐
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│Worker1│ │Worker2│ │Worker3│ FastAPI + uvicorn
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
┌───▼────────────▼────────────▼───┐
│ Redis(消息队列) │ 长时任务的异步处理
└─────────────────────────────────┘
│
┌───▼───┐
│Worker │ 后台 Agent 任务执行器
│Process│ (长时运行,不阻塞 HTTP)
└───┬───┘
│
┌─────────┼──────────┐
│ │ │
┌──▼──┐ ┌──▼──┐ ┌───▼──┐
│LLM │ │Tools│ │向量DB │ 外部服务
│API │ │API │ │ │
└─────┘ └─────┘ └──────┘
8.2 长时 Agent 任务的异步处理
# 对于耗时 > 10s 的 Agent 任务,不能让 HTTP 连接等着
# 标准做法:立即返回 task_id,客户端轮询或用 WebSocket 推送
import asyncio
import uuid
from enum import Enum
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class AgentTask(BaseModel):
task_id: str
status: TaskStatus
result: str | None = None
error: str | None = None
created_at: float
updated_at: float
# 任务存储(生产中用 Redis)
task_store: dict[str, AgentTask] = {}
app = FastAPI()
@app.post("/agent/submit")
async def submit_task(
request: AgentRequest,
background_tasks: BackgroundTasks,
):
"""
提交 Agent 任务,立即返回 task_id
"""
task_id = str(uuid.uuid4())
now = time.time()
task = AgentTask(
task_id=task_id,
status=TaskStatus.PENDING,
created_at=now,
updated_at=now,
)
task_store[task_id] = task
# 在后台执行(不阻塞当前请求)
background_tasks.add_task(
execute_agent_task,
task_id=task_id,
user_input=request.message,
)
return {"task_id": task_id, "status": "pending"}
async def execute_agent_task(task_id: str, user_input: str):
"""在后台执行 Agent 任务,更新 task_store"""
task = task_store[task_id]
task.status = TaskStatus.RUNNING
task.updated_at = time.time()
try:
result = await agent_executor.run(user_input)
task.status = TaskStatus.COMPLETED
task.result = result
task.updated_at = time.time()
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
task.updated_at = time.time()
@app.get("/agent/status/{task_id}")
async def get_task_status(task_id: str):
"""轮询任务状态"""
task = task_store.get(task_id)
if not task:
raise HTTPException(404, f"任务 {task_id} 不存在")
return task
@app.websocket("/agent/ws/{task_id}")
async def task_websocket(websocket, task_id: str):
"""WebSocket:实时推送任务进度(流式 Agent 输出)"""
await websocket.accept()
try:
while True:
task = task_store.get(task_id)
if not task:
await websocket.send_json({"error": "任务不存在"})
break
await websocket.send_json(task.model_dump())
if task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED):
break
await asyncio.sleep(0.5) # 每 500ms 推送一次状态
finally:
await websocket.close()
九、监控与告警:Prometheus + 关键指标
9.1 关键指标定义
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# ── 计数器(只增不减)───────────────────────────────────────
REQUESTS_TOTAL = Counter(
"agent_requests_total",
"Agent 请求总数",
labelnames=["status", "model"], # 按状态和模型分类
)
LLM_CALLS_TOTAL = Counter(
"llm_api_calls_total",
"LLM API 调用总数",
labelnames=["provider", "model", "status"],
)
TOOL_CALLS_TOTAL = Counter(
"tool_calls_total",
"工具调用总数",
labelnames=["tool_name", "status"],
)
# ── 直方图(延迟分布)────────────────────────────────────────
REQUEST_LATENCY = Histogram(
"agent_request_duration_seconds",
"Agent 请求端到端延迟",
labelnames=["model"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, float("inf")],
)
LLM_LATENCY = Histogram(
"llm_api_duration_seconds",
"LLM API 单次调用延迟",
labelnames=["model"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float("inf")],
)
TOKENS_PER_REQUEST = Histogram(
"agent_tokens_per_request",
"每次 Agent 请求消耗的 Token 数",
labelnames=["model"],
buckets=[100, 500, 1000, 2000, 5000, 10000, 50000],
)
# ── 仪表盘(当前状态)────────────────────────────────────────
ACTIVE_REQUESTS = Gauge(
"agent_active_requests",
"当前正在处理的 Agent 请求数",
)
GPU_MEMORY_USED = Gauge(
"gpu_memory_used_bytes",
"GPU 显存占用",
labelnames=["device"],
)
QUEUE_SIZE = Gauge(
"request_queue_size",
"等待处理的请求队列长度",
)
9.2 装饰器自动采集指标
import functools
import time
from contextlib import contextmanager
import torch
@contextmanager
def track_request(model: str = "claude-sonnet"):
"""追踪单次 Agent 请求的完整指标"""
ACTIVE_REQUESTS.inc()
start = time.perf_counter()
try:
yield
REQUESTS_TOTAL.labels(status="success", model=model).inc()
except Exception:
REQUESTS_TOTAL.labels(status="error", model=model).inc()
raise
finally:
elapsed = time.perf_counter() - start
REQUEST_LATENCY.labels(model=model).observe(elapsed)
ACTIVE_REQUESTS.dec()
def track_llm_call(model: str):
"""追踪 LLM API 调用的装饰器"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = await func(*args, **kwargs)
LLM_CALLS_TOTAL.labels(
provider="anthropic", model=model, status="success"
).inc()
return result
except Exception as e:
LLM_CALLS_TOTAL.labels(
provider="anthropic", model=model, status="error"
).inc()
raise
finally:
LLM_LATENCY.labels(model=model).observe(
time.perf_counter() - start
)
return wrapper
return decorator
def update_gpu_metrics():
"""定期更新 GPU 显存指标"""
if not torch.cuda.is_available():
return
for i in range(torch.cuda.device_count()):
used = torch.cuda.memory_allocated(i)
GPU_MEMORY_USED.labels(device=f"cuda:{i}").set(used)
# ── FastAPI 集成 ──────────────────────────────────────────────
from fastapi import FastAPI, Request
from prometheus_client import make_asgi_app
app = FastAPI()
# 挂载 Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
"""对所有 HTTP 请求自动采集延迟指标"""
start = time.perf_counter()
response = await call_next(request)
elapsed = time.perf_counter() - start
# 只追踪 /agent/ 路径
if request.url.path.startswith("/agent/"):
REQUEST_LATENCY.labels(model="all").observe(elapsed)
return response
@app.post("/agent/run")
async def run_agent_endpoint(request: AgentRequest):
async with track_request(model=request.model):
result = await agent.run(request.message)
return {"result": result}
# 启动时开启 Prometheus 服务(可选,或用 FastAPI 挂载)
# start_http_server(8001) # 在 8001 端口暴露 /metrics
9.3 告警规则(Prometheus AlertManager)
# alerts.yml(Prometheus 告警规则)
groups:
- name: agent_service_alerts
rules:
# 错误率过高
- alert: HighErrorRate
expr: |
rate(agent_requests_total{status="error"}[5m])
/
rate(agent_requests_total[5m]) > 0.05
for: 2m
labels:
severity: critical
annotations:
summary: "Agent 错误率超过 5%(当前:{{ $value | humanizePercentage }})"
# P99 延迟过高
- alert: HighP99Latency
expr: |
histogram_quantile(0.99, rate(agent_request_duration_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "Agent P99 延迟超过 30s(当前:{{ $value | humanizeDuration }})"
# 请求队列积压
- alert: QueueBacklog
expr: request_queue_size > 100
for: 1m
labels:
severity: warning
annotations:
summary: "请求队列积压 {{ $value }} 个,可能需要扩容"
# GPU 显存告警
- alert: GPUMemoryHigh
expr: |
gpu_memory_used_bytes / (16 * 1024 * 1024 * 1024) > 0.9
for: 5m
labels:
severity: critical
annotations:
summary: "GPU 显存使用率超过 90%,有 OOM 风险"
# LLM API 调用失败率
- alert: LLMAPIFailureRate
expr: |
rate(llm_api_calls_total{status="error"}[5m])
/
rate(llm_api_calls_total[5m]) > 0.1
for: 3m
labels:
severity: warning
annotations:
summary: "LLM API 失败率 {{ $value | humanizePercentage }},检查 API key 和网络"
十、六篇系列收官:完整技术地图
历经六篇,从 Python 基础到生产部署,完整覆盖了 AI 工程师的技术栈:
第一篇:Python 基础精要
对象引用模型 → 可变性陷阱 → 生成器(DataLoader 底层)
→ 装饰器(Agent 工具注册)→ 上下文管理器(no_grad 原理)
第二篇:NumPy 矩阵运算
内存布局 → 广播规则 → 向量化(ReLU/Softmax)
→ einsum → 手推 Self-Attention 完整前向传播
第三篇:PyTorch 核心
Tensor Storage → autograd 计算图 → 自定义 Function
→ Dataset/DataLoader → 生产训练循环(AMP+梯度裁剪+checkpoint)
第四篇:Agent 异步编程
事件循环原理 → 协程挂起恢复 → gather/as_completed
→ 并发 LLM 调用 → 工具并行执行 → SSE 流式输出 → FastAPI
第五篇:类型系统与工程化
Pydantic V2 → field/model_validator → JSON Schema 自动生成
→ LLM 结构化输出 → 自动重试 → Protocol/TypedDict
第六篇:性能与部署(本篇)
cProfile/line_profiler → tracemalloc 内存泄漏
→ 量化/compile/Flash Attention → 动态批处理
→ 多进程服务 → Prometheus 监控告警
六篇串起来的核心认知:
# 从第一篇的"变量是引用"到第六篇的"GPU利用率",
# 贯穿始终的是同一个工程原则:
# 理解你的工具在内存里、在计算图里、在网络里到底在做什么。
# 不理解就无法调优,不调优就无法生产,不生产就是玩具。
# 每篇的"深度学习视角"不是装饰——
# 它是这些基础知识在你每天的工作里真正出现的地方。
💬 六篇看完了,哪篇对你帮助最大?有什么你觉得应该补充的主题? 欢迎评论区聊!
🙏 六篇系列完结撒花!如果整个系列帮到你,最后一次三连(点赞👍 + 收藏⭐ + 关注)!感谢一路相伴!
本文为原创技术分享。转载请注明出处。最后更新:2026-05-14
Python × 深度学习 × Agent 系列(六篇)完结 🎉
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)