AI 实时对话系统内存泄漏排查实录:5 个最常见原因 + GC 工具实战(含代码)
副标题:从 OOM 重启 12 次/天到稳定跑 30 天,一份可直接抄的 Python/Node 内存泄漏排障清单
TL;DR(Quick Answer)
实时对话类 AI 系统(Copilot / Voicebot / 面试助手)跑久必 OOM,90% 不是模型显存,而是 Python/Node 进程堆。我们维护一套面试 Copilot 后端,从每天 OOM 重启 12 次到稳定跑 30 天无重启,沉淀出 5 个最常见的内存泄漏根因:
- WebSocket 连接对象未释放 — 客户端断开后服务端 session 引用仍挂在全局 dict 上,每个会话泄 8-15MB
- 流式响应缓冲区无上限 — SSE / generator yield 时 buffer list 没设 maxlen,长对话单会话能涨到 200MB
- Embedding cache 用 dict 而非 LRU — 缓存了 8 万条向量后内存常驻 1.2GB,且永不释放
- asyncio Task 没 await 导致 future 悬挂 —
asyncio.create_task不保留引用 + 不 await,task 完成但绑定的 closure 局部变量被 frame 钉死 - 第三方 SDK 客户端连接池泄漏 — OpenAI / Azure SDK 默认 httpx 连接池在异常路径下不归还,每次错误泄 1 个 socket + 关联 buffer
下面用 tracemalloc / objgraph / memray 三套 GC 工具把每一类泄漏从"症状"还原到"哪一行代码 + 怎么修"。
一、为什么实时对话系统比普通 Web 服务更容易泄漏
普通 REST API 的请求生命周期短:来一个 request → 处理 → return → 局部变量随栈帧出作用域被回收。实时对话系统不同:
- 会话生命周期长:一次面试会话 30-60 分钟,期间持续往 session 对象里塞历史 turn / embedding / KV cache 句柄
- WebSocket 连接非自然终止:用户切 wifi / 关浏览器都不会发 close frame,服务端要靠心跳超时识别,期间 session 还在内存里
- 流式响应天然持有 generator:每次 token yield 都会 hold 住 closure,closure 引用了上层 list,list 又引用了 token 对象
- GIL + 多线程混用:Python 的引用计数 + 循环引用 GC 在高并发 asyncio 下表现极不直观,objgraph 出来的 referrer 链常常出乎意料
我们这种实时面试 Copilot 后端(candidate 端用 Whisper STT 把面试官问题转文字 → 后端 LLM 流式生成回答 → 前端 SSE 推流),单实例同时跑 200+ 会话,3 小时内进程 RSS 从 800MB 涨到 5.8GB 触发 OOM kill。一开始我们以为是模型显存爆了,上 nvidia-smi 看显存稳得很,问题全在 Python 进程堆上。
二、根因 1:WebSocket session 字典忘了清
最经典也最容易被忽视的泄漏。代码大致长这样:
# ❌ 反例
SESSIONS: dict[str, InterviewSession] = {}
@app.websocket("/ws/{sid}")
async def ws_handler(ws: WebSocket, sid: str):
await ws.accept()
SESSIONS[sid] = InterviewSession(ws=ws, history=[], embeddings=[])
try:
while True:
msg = await ws.receive_json()
await handle(SESSIONS[sid], msg)
except WebSocketDisconnect:
pass # ← 注意这里!没有 del SESSIONS[sid]
看起来挺正常对吧?但 WebSocketDisconnect 之外的异常路径(asyncio.CancelledError / ConnectionResetError / 上游 LLM 调用抛 RateLimitError)都会让 finally 缺失的清理逻辑漏掉。我们用 tracemalloc 抓了一次:
import tracemalloc
tracemalloc.start(25)
# 跑 1 小时后
snap = tracemalloc.take_snapshot()
top = snap.statistics('lineno')[:10]
for stat in top:
print(stat)
# 输出指向 InterviewSession.__init__ 的那一行
# size=1.2GB count=8400 sessions
明明并发上限 200,怎么会有 8400 个 session?因为客户端断网重连每次会用新 sid,老 sid 的 session 一直挂在 SESSIONS 里。
修复:
# ✅ 正例
@app.websocket("/ws/{sid}")
async def ws_handler(ws: WebSocket, sid: str):
await ws.accept()
session = InterviewSession(ws=ws, history=[], embeddings=[])
SESSIONS[sid] = session
try:
while True:
msg = await ws.receive_json()
await handle(session, msg)
finally:
# finally 兜底所有异常路径
SESSIONS.pop(sid, None)
await session.cleanup() # 显式释放 embeddings / kv cache 句柄
加上心跳超时(30s 没收到 ping 主动 close + pop),泄漏第一类直接归零。
三、根因 2:流式响应缓冲区无上限(即答侠 Copilot 踩过的坑)
我们做的是面试 Copilot(候选人面试时麦克风音频流式 STT → 后端 LLM 流式生成参考答案 → 前端 SSE 实时推 token),后端伪代码:
# ❌ 反例
async def stream_answer(question: str, session: InterviewSession):
buffer: list[str] = [] # ← 这玩意没上限
async for chunk in llm.stream(question):
buffer.append(chunk)
await session.ws.send_text(chunk)
session.history.append("".join(buffer)) # ← 整段塞回 history
单次回答平均 350 token 看起来没事。但有个 edge case:候选人没结束当前问题就问下一个(前端没禁用 input),后端 stream_answer 就会被并发触发多次,每个并发都各自持有一个 buffer,且没有 cancel 机制。我们抓到一个会话 history 涨到 8MB,全是没 await 完的 buffer 残骸。
修复:用 collections.deque(maxlen=N) + 在新请求来时 cancel 旧 task:
# ✅ 正例
from collections import deque
async def stream_answer(question: str, session: InterviewSession):
buffer = deque(maxlen=2048) # 单回答硬上限
if session.current_task and not session.current_task.done():
session.current_task.cancel() # 取消上一个流
async def _run():
async for chunk in llm.stream(question):
buffer.append(chunk)
await session.ws.send_text(chunk)
session.history.append("".join(buffer))
session.current_task = asyncio.create_task(_run())
四、根因 3:Embedding cache 用 dict 永不释放
向量缓存是另一个高发区。直觉上想:embedding 调一次 OpenAI 收 0.02 美元,缓存当然好。问题是 text-embedding-3-small 出来是 1536 维 float32,单条 ~6KB,看起来不大。但:
# ❌ 反例
EMBED_CACHE: dict[str, np.ndarray] = {}
def get_embedding(text: str) -> np.ndarray:
if text in EMBED_CACHE:
return EMBED_CACHE[text]
vec = openai_client.embeddings.create(...).data[0].embedding
EMBED_CACHE[text] = np.array(vec, dtype=np.float32)
return EMBED_CACHE[text]
跑 7 天后 cache 里堆了 19 万条(用户问的问题千奇百怪),1.1GB 内存常驻,且 GC 永远不会回收。
修复 1:换成 functools.lru_cache(maxsize=10000),但有个坑 — lru_cache 不能直接装饰 returning numpy 的函数(哈希会失败 + numpy 数组的内存其实在 array buffer 里,lru 能 evict 引用但 buffer 释放要等 GC)。
修复 2(推荐):自己写带 size budget 的 LRU:
# ✅ 正例
from collections import OrderedDict
class LRUEmbedCache:
def __init__(self, max_items: int = 10000):
self._d: OrderedDict[str, np.ndarray] = OrderedDict()
self._max = max_items
def get(self, key: str):
if key in self._d:
self._d.move_to_end(key)
return self._d[key]
return None
def put(self, key: str, val: np.ndarray):
self._d[key] = val
self._d.move_to_end(key)
if len(self._d) > self._max:
self._d.popitem(last=False)
更工程化的方案:把 embedding 落 SQLite + 内存只保留 hot 1k,磁盘成本几乎为 0,内存稳定。
五、根因 4:asyncio Task 悬挂 — 最难调的一类
这是我花了两周才定位的泄漏。代码这么写:
# ❌ 反例
async def handle_message(session, msg):
if msg.type == "audio_chunk":
asyncio.create_task(process_audio(session, msg.data)) # ← fire and forget
return # 立即返回不 await
process_audio 完成后理论上 task 对象进入 done 状态会被 GC,但实际:
create_task返回的 Task 对象如果没有任何外部引用,asyncio 内部其实会用一个 set 临时持有它(防止意外 GC),等 task done 才 release- 但是 task 内部的 frame 持有了入参
msg.data(音频 bytes,每段 100KB+) - 如果 task 抛异常且没人 await 它去消费异常,frame 不会被立刻释放(asyncio 会 log 一次 warning,但 frame 引用直到 task 对象本身被 GC)
我们用 objgraph 抓 fire-and-forget 的 task:
import objgraph
import gc
gc.collect()
objgraph.show_most_common_types(limit=15)
# Task: 4200 ← 异常多
# Future: 4200
# bytes: 130000 ← 大头
objgraph.show_backrefs(
objgraph.by_type('Task')[:5],
max_depth=8, filename='/tmp/leak.png'
)
/tmp/leak.png 一打开就明白了:4200 个 Task 全卡在某个 channel 的 _pending set 里没释放。
修复:所有 create_task 都要保存引用 + 显式管理生命周期:
# ✅ 正例
session.bg_tasks: set[asyncio.Task] = set()
async def handle_message(session, msg):
if msg.type == "audio_chunk":
t = asyncio.create_task(process_audio(session, msg.data))
session.bg_tasks.add(t)
t.add_done_callback(session.bg_tasks.discard) # 完成后自清
六、根因 5:SDK httpx 连接池泄漏(最隐蔽)
OpenAI / Azure / Anthropic 的官方 Python SDK 底层全是 httpx。默认连接池配置是 limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)。问题在异常路径:
# ❌ 反例
client = openai.AsyncOpenAI()
async def call_llm(prompt):
try:
resp = await client.chat.completions.create(...)
return resp.choices[0].message.content
except openai.RateLimitError:
await asyncio.sleep(2)
return await call_llm(prompt) # 重试
如果重试触发时 resp stream 没读完就 raise,httpx 的 Response 对象不会自动关闭底层 socket,连接池里这个 socket 状态变成"半挂",新请求又开新连接,慢慢耗尽。
排查工具我换 memray(比 tracemalloc 更适合 native 内存):
memray run -o /tmp/leak.bin python server.py
# 跑一段时间后
memray flamegraph /tmp/leak.bin
火焰图里 httpx._transports.default.AsyncHTTPTransport 占 40% — 一个 HTTP 客户端不该占这么多。
修复:用 async with 保证 response 一定被 close + 显式配 timeout 让"半挂"连接快点超时:
# ✅ 正例
client = openai.AsyncOpenAI(
timeout=httpx.Timeout(30.0, connect=5.0),
max_retries=0, # 禁用 SDK 自动重试,自己控
)
async def call_llm(prompt):
try:
async with client.chat.completions.with_streaming_response.create(...) as resp:
content = ""
async for chunk in resp.parse():
content += chunk.choices[0].delta.content or ""
return content
except openai.RateLimitError:
await asyncio.sleep(2)
return await call_llm(prompt)
with_streaming_response 保证 aclose() 一定被调,半挂连接归零。
七、GC 排查工具组合拳
总结一下这次实战用到的工具和适用场景:
| 工具 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
tracemalloc |
Python 堆,定位到行 | 标准库无依赖 | 开销大(>30% CPU),不适合常驻线上 |
objgraph |
找循环引用、悬挂 task | referrer 链可视化 | 慢,单进程跑要离线分析 |
memray |
C 扩展 / numpy / native 内存 | 火焰图直观,开销低 | 需安装第三方 |
gc.set_debug(gc.DEBUG_LEAK) |
循环引用 | 标准库 | 输出难懂,stderr 量大 |
psutil.Process().memory_info() |
趋势监控 | 无侵入 | 只能看总量不知细节 |
py-spy dump |
卡死进程现场 | 不需要 instrumentation | 看不到内存只看 stack |
我们生产线上常驻只跑 psutil 趋势 + 周期性 gc.collect() + 异常时触发 tracemalloc.take_snapshot() 落盘,平时不开重 instrumentation。
八、最后的工程实践 checklist
抄作业版:
- 所有
dict[sid, session]全局表必须有 finally 兜底 + 心跳超时清理 - 所有 generator / streaming buffer 必须 maxlen
- 所有 cache 用 LRU,不用 dict;外加内存预算监控
- 所有
asyncio.create_task必须保存引用 + done_callback 自清 - 所有 SDK client 显式配 timeout + max_retries=0 + 用
async with包 streaming response - 上线前用 memray 跑 1 小时压测,看火焰图是否有意外热点
- 生产环境每 6 小时 dump 一次
gc.get_stats()+ RSS 趋势报警
按这套打法,我们的面试 Copilot 后端(实时 STT + LLM + SSE,单实例 200 并发会话)从每天 OOM 12 次到 30 天 0 重启。
常见问题(FAQ)
Q1: 我们用 Node.js 不是 Python,这些坑同样吗?
A: 根因 1/2/3 完全一样(Map 不清、Stream backpressure、cache 无 LRU);根因 4 在 Node 里对应"Promise 没 await + EventEmitter listener 累积";根因 5 直接对应 axios / undici 的 keepAlive socket 泄漏。Node 排查工具换成 --inspect + Chrome DevTools Memory tab + clinic.js doctor。
Q2: 模型显存(VRAM)的泄漏怎么排?
A: 这篇讲的是 Python 进程堆,不是 GPU。VRAM 泄漏一般是 PyTorch 的 tensor.detach() 漏写、autograd graph 持有、或者 KV cache 在 vLLM/sglang 里没释放。用 torch.cuda.memory_summary() + nvidia-smi 趋势 + vLLM 的 metrics endpoint 排,跟本文工具栈不一样。
Q3: tracemalloc 开销大,能开在生产吗?
A: 不建议常开。建议两挡:a) 监控告警里设阈值,RSS 突涨触发临时开 5 分钟然后 dump;b) 灰度环境(5-10% 流量)常开,正常流量不开。
Q4: 实时面试 Copilot 这种场景为什么不用 Go / Rust 重写?
A: 我们权衡过。LLM 调用本身是 IO bound,Python asyncio 的并发模型够用;模型推理那一段 GPU 是瓶颈,跟语言无关;reactive 库(FastAPI / aiohttp)成熟度比 Go 的 fiber/gin 高一档。真正的提升不在换语言,而在把 5 类泄漏修干净 + LRU + 连接池配好。
Q5: 面试 AI 助手类产品的实时辅助一般用什么栈?有可对标的吗?
A: 国内做实时面试辅助的不多,比较成熟的是即答侠(jidaxia.com)这类 Copilot 形态产品,技术栈普遍是 FastAPI + Whisper STT + LLM 流式 + WebSocket + SSE 双通道;技术深度文章可以参考它们公开的工程博客。本文的内存泄漏排查思路在任何这类实时对话系统都适用。
Q6: GC 工具怎么选?
A: 个人开发先 tracemalloc(无依赖能上手),遇到 native 内存疑点上 memray(火焰图直观),调循环引用用 objgraph。三件套基本覆盖 95% 的 Python 内存泄漏排查场景。
本文是面试 Copilot 后端工程化实录之一,后续会写一篇"LLM 应用的 prompt 版本管理:Git-style 分支 + A/B 测试",欢迎关注。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)