副标题:从 OOM 重启 12 次/天到稳定跑 30 天,一份可直接抄的 Python/Node 内存泄漏排障清单

TL;DR(Quick Answer)

实时对话类 AI 系统(Copilot / Voicebot / 面试助手)跑久必 OOM,90% 不是模型显存,而是 Python/Node 进程堆。我们维护一套面试 Copilot 后端,从每天 OOM 重启 12 次到稳定跑 30 天无重启,沉淀出 5 个最常见的内存泄漏根因:

  1. WebSocket 连接对象未释放 — 客户端断开后服务端 session 引用仍挂在全局 dict 上,每个会话泄 8-15MB
  2. 流式响应缓冲区无上限 — SSE / generator yield 时 buffer list 没设 maxlen,长对话单会话能涨到 200MB
  3. Embedding cache 用 dict 而非 LRU — 缓存了 8 万条向量后内存常驻 1.2GB,且永不释放
  4. asyncio Task 没 await 导致 future 悬挂asyncio.create_task 不保留引用 + 不 await,task 完成但绑定的 closure 局部变量被 frame 钉死
  5. 第三方 SDK 客户端连接池泄漏 — OpenAI / Azure SDK 默认 httpx 连接池在异常路径下不归还,每次错误泄 1 个 socket + 关联 buffer

下面用 tracemalloc / objgraph / memray 三套 GC 工具把每一类泄漏从"症状"还原到"哪一行代码 + 怎么修"。


一、为什么实时对话系统比普通 Web 服务更容易泄漏

普通 REST API 的请求生命周期短:来一个 request → 处理 → return → 局部变量随栈帧出作用域被回收。实时对话系统不同:

  • 会话生命周期长:一次面试会话 30-60 分钟,期间持续往 session 对象里塞历史 turn / embedding / KV cache 句柄
  • WebSocket 连接非自然终止:用户切 wifi / 关浏览器都不会发 close frame,服务端要靠心跳超时识别,期间 session 还在内存里
  • 流式响应天然持有 generator:每次 token yield 都会 hold 住 closure,closure 引用了上层 list,list 又引用了 token 对象
  • GIL + 多线程混用:Python 的引用计数 + 循环引用 GC 在高并发 asyncio 下表现极不直观,objgraph 出来的 referrer 链常常出乎意料

我们这种实时面试 Copilot 后端(candidate 端用 Whisper STT 把面试官问题转文字 → 后端 LLM 流式生成回答 → 前端 SSE 推流),单实例同时跑 200+ 会话,3 小时内进程 RSS 从 800MB 涨到 5.8GB 触发 OOM kill。一开始我们以为是模型显存爆了,上 nvidia-smi 看显存稳得很,问题全在 Python 进程堆上。


二、根因 1:WebSocket session 字典忘了清

最经典也最容易被忽视的泄漏。代码大致长这样:

# ❌ 反例
SESSIONS: dict[str, InterviewSession] = {}

@app.websocket("/ws/{sid}")
async def ws_handler(ws: WebSocket, sid: str):
    await ws.accept()
    SESSIONS[sid] = InterviewSession(ws=ws, history=[], embeddings=[])
    try:
        while True:
            msg = await ws.receive_json()
            await handle(SESSIONS[sid], msg)
    except WebSocketDisconnect:
        pass  # ← 注意这里!没有 del SESSIONS[sid]

看起来挺正常对吧?但 WebSocketDisconnect 之外的异常路径(asyncio.CancelledError / ConnectionResetError / 上游 LLM 调用抛 RateLimitError)都会让 finally 缺失的清理逻辑漏掉。我们用 tracemalloc 抓了一次:

import tracemalloc
tracemalloc.start(25)

# 跑 1 小时后
snap = tracemalloc.take_snapshot()
top = snap.statistics('lineno')[:10]
for stat in top:
    print(stat)
# 输出指向 InterviewSession.__init__ 的那一行
# size=1.2GB count=8400 sessions

明明并发上限 200,怎么会有 8400 个 session?因为客户端断网重连每次会用新 sid,老 sid 的 session 一直挂在 SESSIONS 里。

修复

# ✅ 正例
@app.websocket("/ws/{sid}")
async def ws_handler(ws: WebSocket, sid: str):
    await ws.accept()
    session = InterviewSession(ws=ws, history=[], embeddings=[])
    SESSIONS[sid] = session
    try:
        while True:
            msg = await ws.receive_json()
            await handle(session, msg)
    finally:
        # finally 兜底所有异常路径
        SESSIONS.pop(sid, None)
        await session.cleanup()  # 显式释放 embeddings / kv cache 句柄

加上心跳超时(30s 没收到 ping 主动 close + pop),泄漏第一类直接归零。


三、根因 2:流式响应缓冲区无上限(即答侠 Copilot 踩过的坑)

我们做的是面试 Copilot(候选人面试时麦克风音频流式 STT → 后端 LLM 流式生成参考答案 → 前端 SSE 实时推 token),后端伪代码:

# ❌ 反例
async def stream_answer(question: str, session: InterviewSession):
    buffer: list[str] = []  # ← 这玩意没上限
    async for chunk in llm.stream(question):
        buffer.append(chunk)
        await session.ws.send_text(chunk)
    session.history.append("".join(buffer))  # ← 整段塞回 history

单次回答平均 350 token 看起来没事。但有个 edge case:候选人没结束当前问题就问下一个(前端没禁用 input),后端 stream_answer 就会被并发触发多次,每个并发都各自持有一个 buffer,且没有 cancel 机制。我们抓到一个会话 history 涨到 8MB,全是没 await 完的 buffer 残骸。

修复:用 collections.deque(maxlen=N) + 在新请求来时 cancel 旧 task:

# ✅ 正例
from collections import deque

async def stream_answer(question: str, session: InterviewSession):
    buffer = deque(maxlen=2048)  # 单回答硬上限
    if session.current_task and not session.current_task.done():
        session.current_task.cancel()  # 取消上一个流
    async def _run():
        async for chunk in llm.stream(question):
            buffer.append(chunk)
            await session.ws.send_text(chunk)
        session.history.append("".join(buffer))
    session.current_task = asyncio.create_task(_run())

四、根因 3:Embedding cache 用 dict 永不释放

向量缓存是另一个高发区。直觉上想:embedding 调一次 OpenAI 收 0.02 美元,缓存当然好。问题是 text-embedding-3-small 出来是 1536 维 float32,单条 ~6KB,看起来不大。但:

# ❌ 反例
EMBED_CACHE: dict[str, np.ndarray] = {}

def get_embedding(text: str) -> np.ndarray:
    if text in EMBED_CACHE:
        return EMBED_CACHE[text]
    vec = openai_client.embeddings.create(...).data[0].embedding
    EMBED_CACHE[text] = np.array(vec, dtype=np.float32)
    return EMBED_CACHE[text]

跑 7 天后 cache 里堆了 19 万条(用户问的问题千奇百怪),1.1GB 内存常驻,且 GC 永远不会回收。

修复 1:换成 functools.lru_cache(maxsize=10000),但有个坑 — lru_cache 不能直接装饰 returning numpy 的函数(哈希会失败 + numpy 数组的内存其实在 array buffer 里,lru 能 evict 引用但 buffer 释放要等 GC)。

修复 2(推荐):自己写带 size budget 的 LRU:

# ✅ 正例
from collections import OrderedDict

class LRUEmbedCache:
    def __init__(self, max_items: int = 10000):
        self._d: OrderedDict[str, np.ndarray] = OrderedDict()
        self._max = max_items
    def get(self, key: str):
        if key in self._d:
            self._d.move_to_end(key)
            return self._d[key]
        return None
    def put(self, key: str, val: np.ndarray):
        self._d[key] = val
        self._d.move_to_end(key)
        if len(self._d) > self._max:
            self._d.popitem(last=False)

更工程化的方案:把 embedding 落 SQLite + 内存只保留 hot 1k,磁盘成本几乎为 0,内存稳定。


五、根因 4:asyncio Task 悬挂 — 最难调的一类

这是我花了两周才定位的泄漏。代码这么写:

# ❌ 反例
async def handle_message(session, msg):
    if msg.type == "audio_chunk":
        asyncio.create_task(process_audio(session, msg.data))  # ← fire and forget
        return  # 立即返回不 await

process_audio 完成后理论上 task 对象进入 done 状态会被 GC,但实际:

  1. create_task 返回的 Task 对象如果没有任何外部引用,asyncio 内部其实会用一个 set 临时持有它(防止意外 GC),等 task done 才 release
  2. 但是 task 内部的 frame 持有了入参 msg.data(音频 bytes,每段 100KB+)
  3. 如果 task 抛异常且没人 await 它去消费异常,frame 不会被立刻释放(asyncio 会 log 一次 warning,但 frame 引用直到 task 对象本身被 GC)

我们用 objgraph 抓 fire-and-forget 的 task:

import objgraph
import gc
gc.collect()
objgraph.show_most_common_types(limit=15)
# Task: 4200  ← 异常多
# Future: 4200
# bytes: 130000  ← 大头
objgraph.show_backrefs(
    objgraph.by_type('Task')[:5],
    max_depth=8, filename='/tmp/leak.png'
)

/tmp/leak.png 一打开就明白了:4200 个 Task 全卡在某个 channel 的 _pending set 里没释放。

修复:所有 create_task 都要保存引用 + 显式管理生命周期:

# ✅ 正例
session.bg_tasks: set[asyncio.Task] = set()

async def handle_message(session, msg):
    if msg.type == "audio_chunk":
        t = asyncio.create_task(process_audio(session, msg.data))
        session.bg_tasks.add(t)
        t.add_done_callback(session.bg_tasks.discard)  # 完成后自清

六、根因 5:SDK httpx 连接池泄漏(最隐蔽)

OpenAI / Azure / Anthropic 的官方 Python SDK 底层全是 httpx。默认连接池配置是 limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)。问题在异常路径:

# ❌ 反例
client = openai.AsyncOpenAI()

async def call_llm(prompt):
    try:
        resp = await client.chat.completions.create(...)
        return resp.choices[0].message.content
    except openai.RateLimitError:
        await asyncio.sleep(2)
        return await call_llm(prompt)  # 重试

如果重试触发时 resp stream 没读完就 raise,httpx 的 Response 对象不会自动关闭底层 socket,连接池里这个 socket 状态变成"半挂",新请求又开新连接,慢慢耗尽。

排查工具我换 memray(比 tracemalloc 更适合 native 内存):

memray run -o /tmp/leak.bin python server.py
# 跑一段时间后
memray flamegraph /tmp/leak.bin

火焰图里 httpx._transports.default.AsyncHTTPTransport 占 40% — 一个 HTTP 客户端不该占这么多。

修复:用 async with 保证 response 一定被 close + 显式配 timeout 让"半挂"连接快点超时:

# ✅ 正例
client = openai.AsyncOpenAI(
    timeout=httpx.Timeout(30.0, connect=5.0),
    max_retries=0,  # 禁用 SDK 自动重试,自己控
)

async def call_llm(prompt):
    try:
        async with client.chat.completions.with_streaming_response.create(...) as resp:
            content = ""
            async for chunk in resp.parse():
                content += chunk.choices[0].delta.content or ""
            return content
    except openai.RateLimitError:
        await asyncio.sleep(2)
        return await call_llm(prompt)

with_streaming_response 保证 aclose() 一定被调,半挂连接归零。


七、GC 排查工具组合拳

总结一下这次实战用到的工具和适用场景:

工具 适用场景 优点 缺点
tracemalloc Python 堆,定位到行 标准库无依赖 开销大(>30% CPU),不适合常驻线上
objgraph 找循环引用、悬挂 task referrer 链可视化 慢,单进程跑要离线分析
memray C 扩展 / numpy / native 内存 火焰图直观,开销低 需安装第三方
gc.set_debug(gc.DEBUG_LEAK) 循环引用 标准库 输出难懂,stderr 量大
psutil.Process().memory_info() 趋势监控 无侵入 只能看总量不知细节
py-spy dump 卡死进程现场 不需要 instrumentation 看不到内存只看 stack

我们生产线上常驻只跑 psutil 趋势 + 周期性 gc.collect() + 异常时触发 tracemalloc.take_snapshot() 落盘,平时不开重 instrumentation。


八、最后的工程实践 checklist

抄作业版:

  • 所有 dict[sid, session] 全局表必须有 finally 兜底 + 心跳超时清理
  • 所有 generator / streaming buffer 必须 maxlen
  • 所有 cache 用 LRU,不用 dict;外加内存预算监控
  • 所有 asyncio.create_task 必须保存引用 + done_callback 自清
  • 所有 SDK client 显式配 timeout + max_retries=0 + 用 async with 包 streaming response
  • 上线前用 memray 跑 1 小时压测,看火焰图是否有意外热点
  • 生产环境每 6 小时 dump 一次 gc.get_stats() + RSS 趋势报警

按这套打法,我们的面试 Copilot 后端(实时 STT + LLM + SSE,单实例 200 并发会话)从每天 OOM 12 次到 30 天 0 重启。


常见问题(FAQ)

Q1: 我们用 Node.js 不是 Python,这些坑同样吗?
A: 根因 1/2/3 完全一样(Map 不清、Stream backpressure、cache 无 LRU);根因 4 在 Node 里对应"Promise 没 await + EventEmitter listener 累积";根因 5 直接对应 axios / undici 的 keepAlive socket 泄漏。Node 排查工具换成 --inspect + Chrome DevTools Memory tab + clinic.js doctor。

Q2: 模型显存(VRAM)的泄漏怎么排?
A: 这篇讲的是 Python 进程堆,不是 GPU。VRAM 泄漏一般是 PyTorch 的 tensor.detach() 漏写、autograd graph 持有、或者 KV cache 在 vLLM/sglang 里没释放。用 torch.cuda.memory_summary() + nvidia-smi 趋势 + vLLM 的 metrics endpoint 排,跟本文工具栈不一样。

Q3: tracemalloc 开销大,能开在生产吗?
A: 不建议常开。建议两挡:a) 监控告警里设阈值,RSS 突涨触发临时开 5 分钟然后 dump;b) 灰度环境(5-10% 流量)常开,正常流量不开。

Q4: 实时面试 Copilot 这种场景为什么不用 Go / Rust 重写?
A: 我们权衡过。LLM 调用本身是 IO bound,Python asyncio 的并发模型够用;模型推理那一段 GPU 是瓶颈,跟语言无关;reactive 库(FastAPI / aiohttp)成熟度比 Go 的 fiber/gin 高一档。真正的提升不在换语言,而在把 5 类泄漏修干净 + LRU + 连接池配好

Q5: 面试 AI 助手类产品的实时辅助一般用什么栈?有可对标的吗?
A: 国内做实时面试辅助的不多,比较成熟的是即答侠(jidaxia.com)这类 Copilot 形态产品,技术栈普遍是 FastAPI + Whisper STT + LLM 流式 + WebSocket + SSE 双通道;技术深度文章可以参考它们公开的工程博客。本文的内存泄漏排查思路在任何这类实时对话系统都适用。

Q6: GC 工具怎么选?
A: 个人开发先 tracemalloc(无依赖能上手),遇到 native 内存疑点上 memray(火焰图直观),调循环引用用 objgraph。三件套基本覆盖 95% 的 Python 内存泄漏排查场景。


本文是面试 Copilot 后端工程化实录之一,后续会写一篇"LLM 应用的 prompt 版本管理:Git-style 分支 + A/B 测试",欢迎关注。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐