【Python × 深度学习 × Agent 系列·第六篇·收官】性能与部署:cProfile 性能分析、推理优化、并发模型服务,让 Agent 又快又稳

作者:技术博主 | 更新时间:2026-05-14 | 阅读时长:约 25 分钟
系列:Python × 深度学习 × Agent 系列(共 6 篇)· 收官篇
标签cProfile 性能分析 推理优化 量化 TorchScript 并发服务 动态批处理 内存泄漏 生产监控


🔥 本篇目标:代码能跑和代码能扛住生产流量之间,差着"性能工程"这整个领域。本篇从找瓶颈(cProfile + line_profiler + tracemalloc)开始,讲模型推理的完整优化链路(量化 → 编译 → 批处理 → 异步),然后给出一个能处理真实并发的模型服务架构——请求队列、动态批处理、多进程、监控告警。每一段代码都来自生产实践,不是玩具。


在这里插入图片描述

系列完整进度

篇次 主题 状态
第一篇 Python 基础精要:对象模型、生成器、装饰器 ✅ 已发布
第二篇 NumPy:广播、einsum,手推 Self-Attention ✅ 已发布
第三篇 PyTorch 核心:Tensor、autograd、训练循环 ✅ 已发布
第四篇 Agent 异步编程:async/await、并发 LLM 调用 ✅ 已发布
第五篇 类型系统:Pydantic V2、LLM 结构化输出 ✅ 已发布
第六篇(本篇·收官) 性能与部署:分析、优化、并发服务、监控

目录


一、性能分析:找到真正的瓶颈,不靠感觉

1.1 优化的第一原则:先测量,再优化

# 反例:凭感觉优化(浪费时间)
# "感觉 tokenization 很慢,先优化它"
# 结果:tokenization 只占 3%,真正的瓶颈是数据加载(占 70%)

# 正确做法:
# 1. 用 profiler 找出热点函数(占总时间 > 5%)
# 2. 只优化热点,忽略其他
# 3. 优化后重新测量,确认提升效果
# 4. 重复直到满足性能目标

# 经验数据:
# 通常 80% 的时间消耗在 20% 的代码上(帕累托原则)
# 优化非热点代码:即使提速 10 倍,对总体几乎没影响

1.2 快速性能估算:time + timeit

import time
import timeit

# 粗粒度:整体计时
t0 = time.perf_counter()
result = my_slow_function()
print(f"耗时:{time.perf_counter() - t0:.4f}s")

# 精细计时:多次运行取平均(消除偶发抖动)
elapsed = timeit.timeit(
    stmt="tokenize(sample_text)",
    setup="from __main__ import tokenize, sample_text",
    number=1000,
)
print(f"平均耗时:{elapsed/1000*1000:.3f}ms/次")

# 作为装饰器使用(调试时临时加)
import functools

def timed(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"[{func.__name__}] {(time.perf_counter()-t0)*1000:.2f}ms")
        return result
    return wrapper

@timed
def preprocess_batch(texts: list[str]) -> list[dict]:
    ...

二、cProfile 全链路分析

2.1 基本用法

import cProfile
import pstats
import io
from pstats import SortKey

def profile_agent_run(user_input: str):
    """对整个 Agent 运行做性能分析"""
    profiler = cProfile.Profile()
    profiler.enable()

    # 被分析的代码
    result = run_agent(user_input)

    profiler.disable()

    # 分析结果
    stream = io.StringIO()
    stats  = pstats.Stats(profiler, stream=stream)
    stats.strip_dirs()                    # 去掉文件路径前缀,更易读
    stats.sort_stats(SortKey.CUMULATIVE)  # 按累计时间排序(找慢函数用这个)
    stats.print_stats(20)                 # 只打印前 20 条

    print(stream.getvalue())
    return result


# 命令行使用(不需要改代码)
# python -m cProfile -s cumulative my_script.py
# python -m cProfile -o output.prof my_script.py
# python -m pstats output.prof   # 交互式查看

2.2 结果解读

输出格式:
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
─────────────────────────────────────────────────────────────────────
    1000   0.523   0.0005   8.234   0.0082 tokenizer.py:45(encode)
     500   0.012   0.0000   6.891   0.0138 model.py:120(forward)
    1000   5.234   0.0052   5.234   0.0052 {built-in method select}
      50   0.001   0.0000   1.234   0.0247 data_loader.py:33(__iter__)

关键列说明:
  ncalls:调用次数
  tottime:函数自身耗时(不含调用的子函数)← 找 CPU 热点
  cumtime:累计耗时(含子函数)← 找慢的调用链

诊断规律:
  tottime 很高 → 这个函数本身是 CPU 瓶颈,优化算法
  cumtime 很高 但 tottime 很低 → 时间在子函数里,继续往下看
  {built-in method select} 高 → I/O 等待(网络/磁盘),考虑异步
  tottime/ncalls(percall)高 → 单次调用慢,但调用次数少
  ncalls 极高 → 调用频率问题,考虑缓存

2.3 可视化:SnakeViz

pip install snakeviz
python -m cProfile -o profile.prof my_agent.py
snakeviz profile.prof   # 在浏览器打开交互式火焰图

2.4 针对 Agent 的分析实践

import cProfile
import pstats
from contextlib import contextmanager

@contextmanager
def profiling(output_file: str = None, top_n: int = 20, sort_by: str = "cumulative"):
    """
    方便的 profiling 上下文管理器
    用法:with profiling("agent.prof"):
              run_agent(...)
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        yield profiler
    finally:
        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.strip_dirs()
        stats.sort_stats(sort_by)

        if output_file:
            stats.dump_stats(output_file)
            print(f"Profile 已保存到:{output_file}(用 snakeviz 查看)")

        stats.print_stats(top_n)


# 使用
async def analyze_performance():
    with profiling("agent_run.prof", top_n=15):
        await agent.run("分析最新的 AI 市场趋势")

三、line_profiler:逐行找热点

cProfile 找到慢函数后,line_profiler 精确到行级别。

pip install line_profiler
# 方法1:装饰器(需要用 kernprof 运行)
from line_profiler import profile

@profile   # 标记要分析的函数
def tokenize_batch(texts: list[str]) -> list[list[int]]:
    results = []
    for text in texts:
        # 哪一行最慢?
        cleaned   = text.strip().lower()          # 行1
        tokens    = cleaned.split()               # 行2
        token_ids = [vocab.get(t, 0) for t in tokens]  # 行3
        padded    = token_ids[:max_len] + [0] * max(0, max_len - len(token_ids))  # 行4
        results.append(padded)
    return results

# 运行命令:
# kernprof -l -v my_script.py
# 输出示例:
# Line #   Hits       Time  Per Hit   % Time  Line Contents
# ─────────────────────────────────────────────────────────
#     10   1000     512.3      0.5      1.2   cleaned = text.strip().lower()
#     11   1000     623.4      0.6      1.5   tokens = cleaned.split()
#     12   1000  38291.2     38.3     90.8   token_ids = [vocab.get(t, 0) ...]  ← 热点!
#     13   1000    2134.1      2.1      5.1   padded = token_ids[:max_len] + ...
# → 行12 占 90%,vocab.get 在循环里太慢 → 改用批量 numpy 查找


# 方法2:不改代码,直接程序内使用
from line_profiler import LineProfiler

def measure_line_by_line(func, *args, **kwargs):
    lp = LineProfiler()
    lp.add_function(func)
    lp_wrapper = lp(func)
    result = lp_wrapper(*args, **kwargs)
    lp.print_stats()
    return result

result = measure_line_by_line(tokenize_batch, sample_texts)

四、内存分析:tracemalloc 与泄漏排查

4.1 使用 tracemalloc 追踪内存分配

import tracemalloc
import gc

def find_memory_leak():
    """
    找内存泄漏的标准流程
    """
    # 第一次快照(baseline)
    tracemalloc.start()
    gc.collect()
    snapshot1 = tracemalloc.take_snapshot()

    # 运行可能泄漏的代码(多次运行以放大差异)
    for _ in range(100):
        result = run_agent("测试查询")
        # 正常情况:每次循环后内存应该回到基线
        # 泄漏情况:内存持续增长

    gc.collect()
    snapshot2 = tracemalloc.take_snapshot()

    # 对比两次快照
    stats = snapshot2.compare_to(snapshot1, "lineno")
    print("内存增量 Top 10:")
    for stat in stats[:10]:
        print(stat)
    # 输出示例:
    # my_module.py:45: size=12.5 MiB (+12.5 MiB), count=1024 (+1024)
    # ↑ 这一行每次都在分配新内存却没有释放 → 泄漏!

    tracemalloc.stop()


# 快速检查:运行前后内存变化
import psutil, os

def memory_usage_mb() -> float:
    return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

before = memory_usage_mb()
for _ in range(1000):
    run_agent("测试")
after = memory_usage_mb()
print(f"内存变化:{after - before:.1f} MB({'泄漏' if after - before > 50 else '正常'})")

4.2 DL 场景的常见内存问题

import torch

# ── 问题1:训练循环中积累计算图 ─────────────────────────────
# ❌ 错误:把 loss 对象本身加到 list(保留计算图!)
losses = []
for batch in dataloader:
    loss = compute_loss(batch)
    losses.append(loss)   # loss 包含整个计算图!内存暴增

# ✅ 正确:只保留标量值
losses = []
for batch in dataloader:
    loss = compute_loss(batch)
    losses.append(loss.item())   # .item() 断开计算图,只取数值

# ── 问题2:验证阶段忘记 no_grad ──────────────────────────────
# ❌ 推理时不用 no_grad,中间激活值全都被保留用于梯度计算
def validate_wrong(model, loader):
    for batch in loader:
        output = model(batch)     # 保存了整个前向计算图!显存暴涨

# ✅ 正确
def validate_correct(model, loader):
    with torch.no_grad():
        for batch in loader:
            output = model(batch)

# ── 问题3:GPU 显存碎片化 ───────────────────────────────────
# 大 batch 推理后,PyTorch 缓存了大块显存,后续小 batch 反而 OOM
torch.cuda.empty_cache()    # 清空缓存(谨慎使用,会影响性能)

# ── 问题4:循环引用导致对象无法被 GC ───────────────────────
import gc
gc.collect()   # 强制 GC 收集循环引用(通常不需要,但特殊情况有用)

五、模型推理优化:从量化到编译

5.1 完整优化路线图

原始 FP32 模型(基线)
    ↓ torch.compile(10-50% 提升,零精度损失)
编译优化模型
    ↓ FP16/BF16 推理(2-3x 提升,精度几乎不变)
半精度模型
    ↓ INT8 量化(3-4x 提升,精度轻微下降)
量化模型
    ↓ TensorRT 导出(5-10x 提升,需要 NVIDIA GPU)
TensorRT 引擎
    ↓ 动态批处理(GPU 利用率从 30% → 80%+)
高吞吐服务

每一步都需要在精度和速度之间权衡,选择适合业务的停留点

5.2 torch.compile:一行优化

import torch
import time

model = load_pretrained_model()
model.eval()
model = model.to("cuda")

# 未编译:baseline
x = torch.randn(1, 512, device="cuda")
with torch.no_grad():
    for _ in range(10):   # 预热
        model(x)
t0 = time.perf_counter()
with torch.no_grad():
    for _ in range(100):
        model(x)
baseline = (time.perf_counter() - t0) / 100 * 1000
print(f"未编译:{baseline:.2f}ms")

# 编译(第一次调用会触发编译,后续快)
compiled_model = torch.compile(
    model,
    mode="reduce-overhead",    # 小 batch 推理首选
    # "max-autotune":更深优化,适合固定形状大 batch
    fullgraph=False,           # True:整图编译(更快,但不支持 Python 控制流)
)

# 预热(触发编译)
print("编译中(约 30-120s)...")
with torch.no_grad():
    for _ in range(5):
        compiled_model(x)

# 测速
t0 = time.perf_counter()
with torch.no_grad():
    for _ in range(100):
        compiled_model(x)
compiled = (time.perf_counter() - t0) / 100 * 1000
print(f"编译后:{compiled:.2f}ms(提升 {baseline/compiled:.1f}x)")

5.3 INT8 量化:PyTorch 原生方案

import torch
from torch.quantization import (
    quantize_dynamic,
    prepare,
    convert,
    get_default_qconfig,
)

# ── 方案1:动态量化(最简单,适合 Linear 层)────────────────
model_dynamic_q = quantize_dynamic(
    model,
    qconfig_spec={torch.nn.Linear},   # 只量化 Linear 层
    dtype=torch.qint8,
)

# 对比模型大小
import os
torch.save(model.state_dict(), "model_fp32.pt")
torch.save(model_dynamic_q.state_dict(), "model_int8.pt")
fp32_size = os.path.getsize("model_fp32.pt") / 1024 / 1024
int8_size  = os.path.getsize("model_int8.pt") / 1024 / 1024
print(f"FP32: {fp32_size:.1f} MB → INT8: {int8_size:.1f} MB(压缩 {fp32_size/int8_size:.1f}x)")


# ── 方案2:静态量化(更精确,需要 calibration)──────────────
def static_quantize(model, calibration_loader):
    """
    静态量化:用 calibration 数据确定量化参数(scale/zero_point)
    精度优于动态量化,但需要代表性数据
    """
    model.eval()
    model.qconfig = get_default_qconfig("fbgemm")  # CPU 用 fbgemm,GPU 用 qnnpack

    # 插入 Observer(观察激活值分布)
    model_prepared = prepare(model)

    # Calibration(用小批量真实数据运行前向,Observer 记录统计信息)
    with torch.no_grad():
        for batch in calibration_loader:
            model_prepared(batch)

    # 将 Observer 替换为量化算子
    model_quantized = convert(model_prepared)
    return model_quantized


# ── 方案3:BitsAndBytes 4bit 量化(大模型首选)─────────────
# pip install bitsandbytes
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,      # 嵌套量化,进一步压缩
    bnb_4bit_quant_type="nf4",           # NF4(Normalized Float 4),精度更好
)

# 70B 模型从 140GB FP16 → 约 40GB 4bit,单卡可跑
model_4bit = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3-70B",
    quantization_config=quantization_config,
    device_map="auto",    # 自动分配到多卡
)

5.4 KV Cache 优化(LLM 推理专属)

# KV Cache:避免重复计算历史 token 的 K/V
# 生成 1000 token 时:
# 无 KV Cache:每步计算所有历史 token 的 attention → O(n²) 复杂度
# 有 KV Cache:只计算新 token,缓存历史 K/V → O(n) 复杂度

# HuggingFace 默认开启,但要注意 cache 的内存开销
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

model     = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

input_ids = tokenizer("Hello, I am", return_tensors="pt").input_ids

# 生成(自动使用 KV Cache)
with torch.no_grad():
    output = model.generate(
        input_ids,
        max_new_tokens=100,
        use_cache=True,          # 默认 True,关闭会慢很多
        do_sample=False,
    )

# Flash Attention 2:更高效的注意力计算(需要 GPU)
# pip install flash-attn
model_fa2 = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    attn_implementation="flash_attention_2",   # 替换注意力实现
    torch_dtype=torch.float16,
)
# Flash Attention 2:内存 O(n) vs 标准注意力 O(n²),速度快 2-4x

六、动态批处理:GPU 利用率从 20% 到 80%

6.1 为什么 GPU 利用率低

单请求模式(常见错误):
  请求1: 发送 → [等GPU] → 收结果(GPU利用率5%)
  请求2:           发送 → [等GPU] → 收结果
  请求3:                    发送 → [等GPU] → 收结果
  GPU 大部分时间在等请求,实际利用率 10-30%

批处理模式(正确做法):
  积累多个请求 → 合并成大 batch → 一次 GPU 运算
  请求1,2,3,4,5 → [batch size=5, GPU利用率80%+] → 返回结果
  GPU 吞吐量提升 5-10x,延迟略有增加(等待凑批的时间)

6.2 异步动态批处理器

import asyncio
import time
import torch
from dataclasses import dataclass, field
from typing import Any
import uuid

@dataclass
class PendingRequest:
    """等待处理的单个请求"""
    request_id: str
    inputs:     dict[str, Any]
    future:     asyncio.Future     # 结果会被设置到这里
    arrived_at: float = field(default_factory=time.perf_counter)


class DynamicBatcher:
    """
    动态批处理器:积累请求直到 max_batch_size 或 max_wait_ms 超时
    原理:
      - 请求到来时放入队列,返回 Future
      - 后台 worker 每隔 max_wait_ms 或积满 max_batch_size 就处理一批
      - 处理完成后设置每个请求的 Future
    """

    def __init__(
        self,
        model,
        max_batch_size: int   = 32,
        max_wait_ms:    float = 20.0,   # 最多等 20ms 凑批
        device:         str   = "cuda",
    ):
        self.model          = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms    = max_wait_ms / 1000   # 转秒
        self.device         = device

        self._queue:  list[PendingRequest] = []
        self._lock    = asyncio.Lock()
        self._event   = asyncio.Event()   # 有新请求时通知 worker
        self._running = False

    async def start(self):
        """启动后台 batch worker"""
        self._running = True
        asyncio.create_task(self._batch_worker())
        print("DynamicBatcher 已启动")

    async def stop(self):
        self._running = False

    async def infer(self, inputs: dict[str, Any]) -> Any:
        """
        提交单个推理请求,返回结果(await 直到处理完成)
        """
        loop    = asyncio.get_event_loop()
        future  = loop.create_future()
        request = PendingRequest(
            request_id=str(uuid.uuid4()),
            inputs=inputs,
            future=future,
        )

        async with self._lock:
            self._queue.append(request)
            self._event.set()   # 通知 worker 有新请求

        return await future   # 等待 worker 设置结果

    async def _batch_worker(self):
        """
        后台 worker:持续监听队列,凑批处理
        """
        while self._running:
            # 等待第一个请求
            await self._event.wait()
            self._event.clear()

            # 等待最多 max_wait_ms,或者 batch 满了立即处理
            deadline = time.perf_counter() + self.max_wait_ms
            while time.perf_counter() < deadline:
                async with self._lock:
                    if len(self._queue) >= self.max_batch_size:
                        break
                await asyncio.sleep(0.001)   # 每 1ms 检查一次

            # 取出当前队列的所有请求(不超过 max_batch_size)
            async with self._lock:
                batch = self._queue[:self.max_batch_size]
                self._queue = self._queue[self.max_batch_size:]

            if not batch:
                continue

            # 合并请求,组成大 batch
            try:
                results = await self._process_batch(batch)
                # 把结果分发给每个请求的 Future
                for req, res in zip(batch, results):
                    if not req.future.done():
                        req.future.set_result(res)
            except Exception as e:
                # 整批失败:通知所有请求
                for req in batch:
                    if not req.future.done():
                        req.future.set_exception(e)

    async def _process_batch(self, batch: list[PendingRequest]) -> list[Any]:
        """
        实际的批处理推理(在线程池中运行,避免阻塞事件循环)
        """
        loop = asyncio.get_event_loop()

        def _run_inference():
            # 合并输入为 batch tensor
            input_ids = torch.stack([
                torch.tensor(req.inputs["input_ids"], device=self.device)
                for req in batch
            ])
            attn_mask = torch.stack([
                torch.tensor(req.inputs["attention_mask"], device=self.device)
                for req in batch
            ])

            # GPU 推理
            with torch.inference_mode():
                logits = self.model(input_ids=input_ids, attention_mask=attn_mask)

            # 拆分结果
            return [logits[i].cpu().tolist() for i in range(len(batch))]

        # 在线程池中运行(不阻塞事件循环)
        return await loop.run_in_executor(None, _run_inference)


# ── 在 FastAPI 中使用 ──────────────────────────────────────────
from fastapi import FastAPI

app     = FastAPI()
batcher = DynamicBatcher(model=my_model, max_batch_size=32, max_wait_ms=20)


@app.on_event("startup")
async def startup():
    await batcher.start()


@app.post("/predict")
async def predict(request: PredictRequest):
    t0 = time.perf_counter()

    result = await batcher.infer({
        "input_ids":      request.token_ids,
        "attention_mask": request.attention_mask,
    })

    return {
        "prediction": result,
        "latency_ms": (time.perf_counter() - t0) * 1000,
    }

七、并发模型服务:请求队列 + 多进程

7.1 为什么需要多进程

单进程 + 单 GPU 的问题:
  ① Python GIL:即使多线程,CPU 预处理是串行的
  ② 单 GPU 未充分利用:显存还有 50% 空余,算力用了 30%
  ③ 单进程挂了 → 服务整体挂掉(无冗余)

多进程 + 多 GPU 的解法:
  多个 Worker 进程,每个绑定一个 GPU
  通过 Nginx 做请求负载均衡
  每个 Worker 独立运行,挂一个不影响其他

gunicorn + uvicorn + FastAPI 的标准部署方案:
  gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
  # 4 个 worker 进程,每个运行 uvicorn 事件循环

7.2 多 GPU Worker 的实现

import torch
import torch.multiprocessing as mp
from multiprocessing import Queue
import os

def worker_process(
    rank:         int,
    num_gpus:     int,
    request_queue:  Queue,
    result_queue:   Queue,
    model_path:   str,
):
    """
    每个 GPU 对应一个 Worker 进程
    从 request_queue 取请求,处理后放入 result_queue
    """
    # 绑定 GPU
    device = torch.device(f"cuda:{rank % num_gpus}")
    torch.cuda.set_device(device)

    # 加载模型(每个进程独立加载,内存不共享)
    model = load_model(model_path).to(device)
    model.eval()

    print(f"Worker {rank} 在 GPU {device} 上就绪")

    while True:
        try:
            # 从队列取请求(阻塞等待)
            request_id, inputs = request_queue.get(timeout=1.0)
        except Exception:
            continue   # 超时,继续等待

        if request_id == "STOP":
            break      # 收到停止信号

        # 推理
        try:
            with torch.inference_mode():
                input_tensor = {
                    k: torch.tensor(v, device=device)
                    for k, v in inputs.items()
                }
                output = model(**input_tensor)
                result = output.logits.cpu().tolist()

            result_queue.put((request_id, result, None))   # (id, result, error)

        except Exception as e:
            result_queue.put((request_id, None, str(e)))


class MultiGPUServer:
    """多 GPU 推理服务器"""

    def __init__(self, model_path: str, num_workers: int = 4):
        self.num_workers   = num_workers
        self.request_queue = mp.Queue(maxsize=1000)   # 最大积压 1000 个请求
        self.result_queue  = mp.Queue()
        self._pending      = {}   # request_id → asyncio.Future

        # 启动 Worker 进程
        self.workers = []
        num_gpus = torch.cuda.device_count()
        for rank in range(num_workers):
            p = mp.Process(
                target=worker_process,
                args=(rank, num_gpus, self.request_queue,
                      self.result_queue, model_path),
                daemon=True,
            )
            p.start()
            self.workers.append(p)

        # 启动结果收集协程
        self._result_collector_task = None

    async def start_result_collector(self):
        """异步收集 Worker 的结果,设置对应的 Future"""
        loop = asyncio.get_event_loop()
        while True:
            # 非阻塞地检查结果队列
            try:
                request_id, result, error = await loop.run_in_executor(
                    None, lambda: self.result_queue.get(timeout=0.01)
                )
                future = self._pending.pop(request_id, None)
                if future and not future.done():
                    if error:
                        future.set_exception(RuntimeError(error))
                    else:
                        future.set_result(result)
            except Exception:
                await asyncio.sleep(0.001)

    async def infer(self, request_id: str, inputs: dict) -> list:
        """提交请求,等待结果"""
        loop   = asyncio.get_event_loop()
        future = loop.create_future()
        self._pending[request_id] = future

        # 放入请求队列
        await loop.run_in_executor(
            None,
            lambda: self.request_queue.put((request_id, inputs))
        )

        return await asyncio.wait_for(future, timeout=60.0)

    def shutdown(self):
        for _ in self.workers:
            self.request_queue.put(("STOP", None))
        for p in self.workers:
            p.join(timeout=5.0)

八、Agent 服务的生产部署架构

8.1 完整架构图

                          用户
                           │ HTTP/WebSocket
                       ┌───▼────┐
                       │ Nginx  │ 负载均衡 + SSL 终止 + 限流
                       └───┬────┘
              ┌────────────┼────────────┐
          ┌───▼───┐    ┌───▼───┐    ┌───▼───┐
          │Worker1│    │Worker2│    │Worker3│  FastAPI + uvicorn
          └───┬───┘    └───┬───┘    └───┬───┘
              │            │            │
          ┌───▼────────────▼────────────▼───┐
          │         Redis(消息队列)          │  长时任务的异步处理
          └─────────────────────────────────┘
              │
          ┌───▼───┐
          │Worker │  后台 Agent 任务执行器
          │Process│  (长时运行,不阻塞 HTTP)
          └───┬───┘
              │
    ┌─────────┼──────────┐
    │         │          │
 ┌──▼──┐  ┌──▼──┐   ┌───▼──┐
 │LLM  │  │Tools│   │向量DB │  外部服务
 │API  │  │API  │   │      │
 └─────┘  └─────┘   └──────┘

8.2 长时 Agent 任务的异步处理

# 对于耗时 > 10s 的 Agent 任务,不能让 HTTP 连接等着
# 标准做法:立即返回 task_id,客户端轮询或用 WebSocket 推送

import asyncio
import uuid
from enum import Enum
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

class TaskStatus(str, Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    COMPLETED = "completed"
    FAILED    = "failed"

class AgentTask(BaseModel):
    task_id:    str
    status:     TaskStatus
    result:     str | None = None
    error:      str | None = None
    created_at: float
    updated_at: float

# 任务存储(生产中用 Redis)
task_store: dict[str, AgentTask] = {}

app = FastAPI()


@app.post("/agent/submit")
async def submit_task(
    request: AgentRequest,
    background_tasks: BackgroundTasks,
):
    """
    提交 Agent 任务,立即返回 task_id
    """
    task_id = str(uuid.uuid4())
    now     = time.time()

    task = AgentTask(
        task_id=task_id,
        status=TaskStatus.PENDING,
        created_at=now,
        updated_at=now,
    )
    task_store[task_id] = task

    # 在后台执行(不阻塞当前请求)
    background_tasks.add_task(
        execute_agent_task,
        task_id=task_id,
        user_input=request.message,
    )

    return {"task_id": task_id, "status": "pending"}


async def execute_agent_task(task_id: str, user_input: str):
    """在后台执行 Agent 任务,更新 task_store"""
    task = task_store[task_id]
    task.status     = TaskStatus.RUNNING
    task.updated_at = time.time()

    try:
        result = await agent_executor.run(user_input)
        task.status     = TaskStatus.COMPLETED
        task.result     = result
        task.updated_at = time.time()
    except Exception as e:
        task.status     = TaskStatus.FAILED
        task.error      = str(e)
        task.updated_at = time.time()


@app.get("/agent/status/{task_id}")
async def get_task_status(task_id: str):
    """轮询任务状态"""
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(404, f"任务 {task_id} 不存在")
    return task


@app.websocket("/agent/ws/{task_id}")
async def task_websocket(websocket, task_id: str):
    """WebSocket:实时推送任务进度(流式 Agent 输出)"""
    await websocket.accept()
    try:
        while True:
            task = task_store.get(task_id)
            if not task:
                await websocket.send_json({"error": "任务不存在"})
                break

            await websocket.send_json(task.model_dump())

            if task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED):
                break

            await asyncio.sleep(0.5)   # 每 500ms 推送一次状态
    finally:
        await websocket.close()

九、监控与告警:Prometheus + 关键指标

9.1 关键指标定义

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# ── 计数器(只增不减)───────────────────────────────────────
REQUESTS_TOTAL = Counter(
    "agent_requests_total",
    "Agent 请求总数",
    labelnames=["status", "model"],   # 按状态和模型分类
)

LLM_CALLS_TOTAL = Counter(
    "llm_api_calls_total",
    "LLM API 调用总数",
    labelnames=["provider", "model", "status"],
)

TOOL_CALLS_TOTAL = Counter(
    "tool_calls_total",
    "工具调用总数",
    labelnames=["tool_name", "status"],
)

# ── 直方图(延迟分布)────────────────────────────────────────
REQUEST_LATENCY = Histogram(
    "agent_request_duration_seconds",
    "Agent 请求端到端延迟",
    labelnames=["model"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, float("inf")],
)

LLM_LATENCY = Histogram(
    "llm_api_duration_seconds",
    "LLM API 单次调用延迟",
    labelnames=["model"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, float("inf")],
)

TOKENS_PER_REQUEST = Histogram(
    "agent_tokens_per_request",
    "每次 Agent 请求消耗的 Token 数",
    labelnames=["model"],
    buckets=[100, 500, 1000, 2000, 5000, 10000, 50000],
)

# ── 仪表盘(当前状态)────────────────────────────────────────
ACTIVE_REQUESTS = Gauge(
    "agent_active_requests",
    "当前正在处理的 Agent 请求数",
)

GPU_MEMORY_USED = Gauge(
    "gpu_memory_used_bytes",
    "GPU 显存占用",
    labelnames=["device"],
)

QUEUE_SIZE = Gauge(
    "request_queue_size",
    "等待处理的请求队列长度",
)

9.2 装饰器自动采集指标

import functools
import time
from contextlib import contextmanager
import torch

@contextmanager
def track_request(model: str = "claude-sonnet"):
    """追踪单次 Agent 请求的完整指标"""
    ACTIVE_REQUESTS.inc()
    start = time.perf_counter()

    try:
        yield
        REQUESTS_TOTAL.labels(status="success", model=model).inc()
    except Exception:
        REQUESTS_TOTAL.labels(status="error", model=model).inc()
        raise
    finally:
        elapsed = time.perf_counter() - start
        REQUEST_LATENCY.labels(model=model).observe(elapsed)
        ACTIVE_REQUESTS.dec()


def track_llm_call(model: str):
    """追踪 LLM API 调用的装饰器"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                LLM_CALLS_TOTAL.labels(
                    provider="anthropic", model=model, status="success"
                ).inc()
                return result
            except Exception as e:
                LLM_CALLS_TOTAL.labels(
                    provider="anthropic", model=model, status="error"
                ).inc()
                raise
            finally:
                LLM_LATENCY.labels(model=model).observe(
                    time.perf_counter() - start
                )
        return wrapper
    return decorator


def update_gpu_metrics():
    """定期更新 GPU 显存指标"""
    if not torch.cuda.is_available():
        return
    for i in range(torch.cuda.device_count()):
        used = torch.cuda.memory_allocated(i)
        GPU_MEMORY_USED.labels(device=f"cuda:{i}").set(used)


# ── FastAPI 集成 ──────────────────────────────────────────────
from fastapi import FastAPI, Request
from prometheus_client import make_asgi_app

app = FastAPI()

# 挂载 Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
    """对所有 HTTP 请求自动采集延迟指标"""
    start = time.perf_counter()
    response = await call_next(request)
    elapsed  = time.perf_counter() - start

    # 只追踪 /agent/ 路径
    if request.url.path.startswith("/agent/"):
        REQUEST_LATENCY.labels(model="all").observe(elapsed)

    return response


@app.post("/agent/run")
async def run_agent_endpoint(request: AgentRequest):
    async with track_request(model=request.model):
        result = await agent.run(request.message)
    return {"result": result}


# 启动时开启 Prometheus 服务(可选,或用 FastAPI 挂载)
# start_http_server(8001)   # 在 8001 端口暴露 /metrics

9.3 告警规则(Prometheus AlertManager)

# alerts.yml(Prometheus 告警规则)
groups:
  - name: agent_service_alerts
    rules:

      # 错误率过高
      - alert: HighErrorRate
        expr: |
          rate(agent_requests_total{status="error"}[5m])
          /
          rate(agent_requests_total[5m]) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Agent 错误率超过 5%(当前:{{ $value | humanizePercentage }})"

      # P99 延迟过高
      - alert: HighP99Latency
        expr: |
          histogram_quantile(0.99, rate(agent_request_duration_seconds_bucket[5m])) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent P99 延迟超过 30s(当前:{{ $value | humanizeDuration }})"

      # 请求队列积压
      - alert: QueueBacklog
        expr: request_queue_size > 100
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "请求队列积压 {{ $value }} 个,可能需要扩容"

      # GPU 显存告警
      - alert: GPUMemoryHigh
        expr: |
          gpu_memory_used_bytes / (16 * 1024 * 1024 * 1024) > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "GPU 显存使用率超过 90%,有 OOM 风险"

      # LLM API 调用失败率
      - alert: LLMAPIFailureRate
        expr: |
          rate(llm_api_calls_total{status="error"}[5m])
          /
          rate(llm_api_calls_total[5m]) > 0.1
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "LLM API 失败率 {{ $value | humanizePercentage }},检查 API key 和网络"

十、六篇系列收官:完整技术地图

历经六篇,从 Python 基础到生产部署,完整覆盖了 AI 工程师的技术栈:

第一篇:Python 基础精要
  对象引用模型 → 可变性陷阱 → 生成器(DataLoader 底层)
  → 装饰器(Agent 工具注册)→ 上下文管理器(no_grad 原理)

第二篇:NumPy 矩阵运算
  内存布局 → 广播规则 → 向量化(ReLU/Softmax)
  → einsum → 手推 Self-Attention 完整前向传播

第三篇:PyTorch 核心
  Tensor Storage → autograd 计算图 → 自定义 Function
  → Dataset/DataLoader → 生产训练循环(AMP+梯度裁剪+checkpoint)

第四篇:Agent 异步编程
  事件循环原理 → 协程挂起恢复 → gather/as_completed
  → 并发 LLM 调用 → 工具并行执行 → SSE 流式输出 → FastAPI

第五篇:类型系统与工程化
  Pydantic V2 → field/model_validator → JSON Schema 自动生成
  → LLM 结构化输出 → 自动重试 → Protocol/TypedDict

第六篇:性能与部署(本篇)
  cProfile/line_profiler → tracemalloc 内存泄漏
  → 量化/compile/Flash Attention → 动态批处理
  → 多进程服务 → Prometheus 监控告警

六篇串起来的核心认知:

# 从第一篇的"变量是引用"到第六篇的"GPU利用率",
# 贯穿始终的是同一个工程原则:

# 理解你的工具在内存里、在计算图里、在网络里到底在做什么。
# 不理解就无法调优,不调优就无法生产,不生产就是玩具。

# 每篇的"深度学习视角"不是装饰——
# 它是这些基础知识在你每天的工作里真正出现的地方。

💬 六篇看完了,哪篇对你帮助最大?有什么你觉得应该补充的主题? 欢迎评论区聊!

🙏 六篇系列完结撒花!如果整个系列帮到你,最后一次三连(点赞👍 + 收藏⭐ + 关注)!感谢一路相伴!


本文为原创技术分享。转载请注明出处。最后更新:2026-05-14
Python × 深度学习 × Agent 系列(六篇)完结 🎉

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐