目录


前言

我开始独立开发一个水质三维智能监测与分析系统(项目代号"巡深",GitHub 仓库)。这个项目的核心逻辑并不复杂——用户上传 CSV 水质采样数据后,系统进行插值、可视化、异常检测、趋势预测、污染溯源,并提供 AI 驱动的对话分析和知识库问答。

但从 AI 工程化的角度看,它恰好覆盖了大模型应用开发的三个核心挑战:

  1. Agent 工具调用:如何让 LLM 自主决定调用哪些工具、按什么顺序调用?

  2. RAG 检索优化:如何在保证召回率的前提下,控制检索延迟和 token 成本?

  3. 基础设施防御:当依赖的向量数据库不可靠时,如何保证核心链路不被中断?

这三个问题,每一个单独拿出来都是 AI 工程化领域的热点话题。而把它们放在一个真实的项目中一并解决,收获的不仅是代码,更是一套可以迁移到其他项目的工程方法论。

项目的 AI 功能并非一蹴而就,而是经历了四个阶段的系统性演进——从第一阶段的对话助手与邮件发送,到第二阶段引入 AI 图表解读和双任务对比分析,再到第三阶段的趋势预测、分级预警和污染溯源,最后在第四阶段落地 RAG 知识库问答系统。四个阶段共 60 项扩展任务全部完成,每一阶段都建立在前一阶段的基础设施之上。这种渐进式的架构演进,比一次性堆砌功能更能体现工程化的思考——如何在保持系统稳定的前提下持续交付 AI 能力。

整个系统的规模也决定了技术选型的边界:12 张 MySQL 业务表、3 个 Milvus 向量集合、60+ 个 API 端点、18 个前端页面视图、80 项 pytest 自动化测试。这个体量决定了——引入一个 20+ 依赖的 Agent 框架不是"拥抱生态",而是给自己找麻烦。

LangChain 的 AgentExecutor 对流式 tool_call delta 支持不完整、回调钩子过于复杂、黑盒的执行过程也难以做精细的超时控制。对于一个只有 9 个工具的系统来说,自研约 300 行代码的投入,换来的是对每个环节的完全掌控。

这篇文章就是这些实践的系统复盘——不是为了证明"自研比开源好",而是希望分享在真实项目中做技术决策时的权衡过程和踩坑经验。


一、自研 Agent 框架:架构设计、流式难题与安全护栏

1.1 选型决策:三方案量化对比

Agent 框架选型是一个典型的"简单方案 vs 正确方案"问题。以下是我当时做的评估:

评估维度权重:可控性(30%) > 与现有架构的兼容性(25%) > 开发成本(20%) > 生态完善度(15%) > 社区支持(10%)
评估维度 LangChain AgentExecutor OpenAI Assistants API 自研(最终选择)
可控性 ★★☆☆☆ 工具执行过程是黑盒,回调钩子复杂 ★★☆☆☆ 服务端托管,超时/重试不可定 ★★★★★ 每环节完全可控
FastAPI 兼容性 ★★☆☆☆ async 兼容性需额外处理 ★★★☆☆ API 调用天然兼容 ★★★★★ 原生 async/await
流式 tool_call 支持 ★☆☆☆☆ AgentExecutor 流式有已知 bug ★★★☆☆ 支持但粒度粗 ★★★★★ 精确控制 delta 累积
开发成本 ★★★★☆ 上手快,改造成本高 ★★★★★ 零自建成本 ★★★☆☆ ~300 行,一次性投入
依赖体积 ★☆☆☆☆ 20+ 依赖 ★★★★★ 零额外依赖 ★★★★★ 零额外依赖
综合评分 2.1/5 3.0/5 4.3/5

结论:对于一个只有 9 个工具的系统,自研是最优解。但如果工具增长到 50+ 或需要复杂的多 Agent 协调(如 CrewAI),LangChain/LlamaIndex 的生态优势就会显现。

1.2 架构设计:三层分离 + 装饰器注册

我把 Agent 框架拆为清晰的三个层次,每层只关心一件事:

┌──────────────────────────────────────────────┐
│  规划层 (Planning Layer)                     │
│  PLANNING_PROMPT — 4步 CoT 引导 LLM 制定计划  │
│  职责:意图分解、工具选择决策                  │
├──────────────────────────────────────────────┤
│  执行层 (Execution Layer)                    │
│  _run_tool_loop() — 循环调度 + 超时 + 重试    │
│  职责:工具调用编排、消息管理、结果聚合        │
├──────────────────────────────────────────────┤
│  工具层 (Tool Layer)                         │
│  @register 装饰器 + TOOL_MAP 注册表           │
│  职责:工具的具体实现(独立于 Agent 框架)      │
└──────────────────────────────────────────────┘

核心:@register 装饰器模式

整个框架的心脏是一个只有 7 行的注册装饰器:

# registry.py — 核心 47 行
​
TOOL_MAP = {}
MAX_RETRIES = 1  # 工具执行失败时的重试次数
​
def register(name: str):
    """装饰器:将工具函数注册到全局 TOOL_MAP"""
    def deco(fn):
        TOOL_MAP[name] = fn
        return fn
    return deco
​
async def execute_tool(name: str, ctx: dict) -> str:
    """统一执行入口,带重试和错误兜底"""
    fn = TOOL_MAP.get(name)
    if not fn:
        return json.dumps({"error": f"Unknown tool: {name}"})
    return await _safe_execute(fn, ctx, name)
​
def is_write_tool(name: str) -> bool:
    """写操作判断 — Agent 安全护栏的关键拦截点"""
    return name in ("generate_report", "export_anomalies_csv", "send_report_email")

工具实现者只需要添加一个装饰器,框架自动完成注册和生命周期管理:

# read_tools.py — 读工具实现
​
@register("get_anomalies")
async def _get_anomalies(ctx: dict) -> str:
    """查询当前任务的异常点列表,返回最多 30 条"""
    db = ctx["db"]
    task_id = ctx["task_id"]
    indicator = ctx.get("arguments", {}).get("indicator")
​
    query = select(AnomalyRecord).where(AnomalyRecord.task_id == task_id)
    if indicator:
        query = query.where(AnomalyRecord.indicator == indicator)
​
    result = await db.execute(query.limit(30))
    rows = result.scalars().all()
    return json.dumps([{
        "lon": r.lon, "lat": r.lat, "depth": r.depth,
        "indicator": r.indicator, "value": r.value,
        "method": r.detection_method,
    } for r in rows], ensure_ascii=False)
​
​
@register("predict_trend")
async def _predict_trend(ctx: dict) -> str:
    """预测水质指标未来趋势,含 95% 置信区间"""
    args = ctx["arguments"]
    ps = PredictionService()
    result = ps.predict(
        reservoir=ctx["reservoir_name"],
        indicator=args["indicator"],
        horizon=args.get("horizon", 3)
    )
    return json.dumps(result, ensure_ascii=False)

9 个工具按类型分布在 3 个文件中——read_tools.py(5 个读工具:异常查询、统计分析、趋势预测、污染溯源、相似案例检索)、write_tools.py(3 个写工具:报告生成、异常导出、邮件发送)、knowledge_tool.py(1 个知识库检索工具)。__init__.py 按严格的三阶段导入顺序组织——先加载 registry 基础设施,再触发工具文件的 @register 装饰器,最后加载 definitions。这确保了 TOOL_MAP 在任何工具调用前已经被完全填充。

一套框架,两类对话

这套 Agent 框架同时驱动了项目中的两种 AI 对话模式,它们共享同一套 SSE 流式基础设施,但数据来源和工具权限完全不同:

维度 任务 Agent 对话 知识库 RAG 对话
数据来源 MySQL 任务监测数据(通过工具查询) Milvus 向量检索 + MySQL 文档元数据
工具权限 全部 9 个工具(含写操作) 仅知识库检索工具(只读)
上下文构建 任务统计 + 异常分布 + 历史案例 检索文档 + QA 对 + 持久记忆
安全策略 四层护栏(含写操作二次确认) 纯只读,写操作不暴露
会话存储 chat_messages 表(按 task_id + session_id 分区) knowledge_chat_messages 表(按 session_id 分区)

这种"一套框架、两类对话"的设计避免了为每种对话模式单独维护一套工具调用逻辑。工具注册表是共享的,但通过 is_write_tool() 门控,知识库对话永远看不到写操作工具,从架构层面杜绝了误操作的可能。

1.3 流式难题:tool_call delta 增量累积

这是自研框架真正比 LangChain 做得更好的地方,也是开发中遇到的最大技术挑战。

问题:非流式模式下,一次 LLM 请求返回完整的 tool_calls 数组——每个调用的 idnamearguments 都在一个 JSON 对象中。但在流式模式(stream=True)下,LLM 逐 token 发送 delta,且是多路并行发送的——可能有 2 个 tool_calls 在同一批 chunk 中交错出现。每个 chunk 只包含当前 tool_call 的增量片段

Chunk 1:  delta.tool_calls[0] → {index: 0, id: "call_abc", function: {name: "get_anomalies"}}
Chunk 2:  delta.tool_calls[0] → {index: 0, function: {arguments: '{"indicator"'}}
Chunk 3:  delta.tool_calls[0] → {index: 0, function: {arguments: ': "chlorophyll"}'}}
Chunk 4:  delta.tool_calls[1] → {index: 1, id: "call_def", function: {name: "predict_trend"}}
Chunk 5:  delta.tool_calls[1] → {index: 1, function: {arguments: '{"indicator"'}}
...

注意:idname 只在首次 delta 中出现,后续 delta 仅有 arguments 片段。而且多个 tool_calls 通过 index 字段区分,可能在不同 chunk 中交错。

解决方案:按 index 分组累积,id/name 只在首次出现时设置,arguments 持续拼接:

# chat_service.py: _run_tool_loop_stream 中的关键逻辑
​
tool_call_acc: dict[int, dict] = {}  # index → {id, name, arguments}
​
async for chunk in stream_resp:
    delta = chunk.choices[0].delta
​
    # 内容 token 直接流式输出(提供即时反馈)
    if delta.content:
        content_parts.append(delta.content)
        yield {"phase": "streaming", "delta": delta.content}
​
    # tool_call delta 按 index 分组累积
    if delta.tool_calls:
        for tc in delta.tool_calls:
            idx = tc.index
            if idx not in tool_call_acc:
                tool_call_acc[idx] = {"id": "", "name": "", "arguments": ""}
​
            # id 和 name 只在首次 delta 中出现
            if tc.id:
                tool_call_acc[idx]["id"] = tc.id
            if tc.function and tc.function.name:
                tool_call_acc[idx]["name"] = tc.function.name
​
            # arguments 逐步拼接
            if tc.function and tc.function.arguments:
                tool_call_acc[idx]["arguments"] += tc.function.arguments

为什么 LangChain 在这件事上做得不好:LangChain 的 AgentExecutor 最初是为非流式模式设计的,流式支持是后来通过 astream_events() 追加的功能。它在处理 tool_call delta 时会把完整 arguments 一次性返回而不是增量累积,这导致用户看到的是"卡一下然后突然冒出所有工具调用",而非实时渐进式的体验。

多阶段可见性的价值:用户在 Agent 执行过程中不应该盯着空白页面等待。我设计了一套多阶段 SSE 事件协议:

// 阶段 1:开始规划
{"phase": "thinking", "message": "正在分析 (第1轮)..."}
​
// 阶段 2:内容流式输出(如果有非工具调用的思考)
{"phase": "streaming", "delta": "让我先查看一下当前的异常点分布..."}
​
// 阶段 3:工具开始执行
{"phase": "tool_start", "tool": "get_anomalies", "args": "{\"indicator\": \"chlorophyll\"}"}
​
// 阶段 4:工具执行完成
{"phase": "tool_result", "tool": "get_anomalies", "summary": "发现 12 个异常点,分布在 3 个深度层"}
​
// 阶段 5:综合分析
{"phase": "thinking", "message": "正在汇总分析结果..."}
​
// 阶段 6:最终答案流式输出
{"phase": "streaming", "delta": "根据分析,当前水库叶绿素异常主要集中在..."}

这套协议让前端可以针对每个阶段做不同的 UI 渲染——loading skeleton → 工具执行卡片 → 最终答案的渐进展示。从用户体验角度,这比静默等待数十秒后弹出完整答案要好得多。

1.4 安全护栏:四层纵深防御

Agent 应用的安全不是一个点,而是一条链。我设计了四层递进的安全护栏:

第一层:Prompt 约束(预防层)
  ├─ PLANNING_PROMPT 的行为规则:"写操作需用户明确请求"
  ├─ 角色设定中的事实约束:"不编造数据"、"不知道就说不知道"
  └─ 工具 description 的场景边界:引导 LLM 在正确场景使用正确工具
​
第二层:工具级权限控制(拦截层)
  ├─ is_write_tool(name) 判断是否有副作用
  ├─ 调用方可基于此做前置拦截(如弹出二次确认对话框)
  └─ 三个写操作被显式标记:generate_report, export_anomalies_csv, send_report_email
​
第三层:执行级限制(控制层)
  ├─ MAX_TOOL_ROUNDS = 3:防止无限工具调用循环
  ├─ MAX_RETRIES = 1:限制工具重试,避免雪崩
  └─ MILVUS_TIMEOUT = 10s:每个工具调用有明确的时间上限
​
第四层:审计层(可追溯层)
  ├─ llm_call_logs 表:每次 LLM 调用全量记录(model, tokens, tools_called, latency)
  └─ 工具执行日志:每个工具的调用参数、返回结果、是否命中重试

这四层不是"都实现就安全了",而是每一层都在上一层的盲区提供额外防护。Prompt 约束可能被绕过(prompt injection),但工具级权限是不可绕过的代码判断。工具执行失败可能被重试掩盖,但执行级限制确保不会无限重试。最后,审计层让所有操作可追溯——即使前面三层都出问题了,你至少知道发生了什么。

1.5 遇到的问题与解决方案

问题 1:PLANNING_PROMPT 的注入位置

最初方案是把 PLANNING_PROMPT 放在 messages[-1](用户消息之前),结果 LLM 倾向于"忘记"系统提示。排查后发现,OpenAI API 的 system role 有特殊权重——模型训练时被强化为最高优先级指令。因此改为直接追加到 system message 的末尾:

# 错误做法:在 user message 前面插入
current_messages.insert(-1, {"role": "system", "content": PLANNING_PROMPT})
​
# 正确做法:追加到已有的 system message
if current_messages and current_messages[0]["role"] == "system":
    current_messages[0]["content"] = (
        current_messages[0]["content"] + "\n\n" + PLANNING_PROMPT
    )

问题 2:JSON 参数解析的容错

LLM 生成的 function.arguments 有时不是合法的 JSON——多了一个逗号、少了一个引号。最开始直接 json.loads() 会报错中断整个对话。修复方式是加入容错处理:

try:
    args = json.loads(tc.function.arguments)
except (json.JSONDecodeError, TypeError):
    args = {}  # 解析失败时传空参,工具自行处理默认值

问题 3:DeepSeek 的 reasoning_content 冲突

DeepSeek V4 默认开启 thinking mode,会在消息中插入 reasoning_content 字段。当这些消息被重新放入多轮对话的 messages 列表时,API 会报 400 错误(reasoning_content 不能和 thinking mode disabled 同时出现)。解决方案有两个层面:

# 层面 1:对于不需要深度推理的场景,禁用 thinking mode
extra_body = {"thinking": {"type": "disabled"}}
​
# 层面 2:在携带 reasoning_content 的消息回传时,跳过 extra_body
# llm_client.py 中的说明:
# "若消息列表中已包含 reasoning_content(如多轮工具调用回传),
#  调用方应跳过 extra_body,避免与 reasoning_content 冲突导致 400 错误。"

二、RAG 四层检索:每一层的实现、调参和效果

2.1 整体架构与设计原则

项目中的知识库包含水质标准文档、指标说明、检测方法和常见问题诊断,支持 TXT、PDF、Word、Excel 四种格式的文档上传。用户通过自然语言查询这些知识,系统返回最相关的内容并交给 LLM 生成回答。

双 Collection 的架构决策

设计之初就面临一个选择:所有知识放在一个 Collection 还是拆分为两个?最终我选择了后者——将原文切片(knowledge_chunks)和 LLM 自动生成的问答对(knowledge_qa)分别存入不同的 Milvus Collection:

维度 knowledge_chunks knowledge_qa
索引类型 IVF_FLAT, IP (内积) IVF_FLAT, IP (内积)
向量维度 1024 (text-embedding-v3) 1024 (text-embedding-v3)
匹配优势 语义匹配 — "水库水质变差"能匹配到"水体富营养化" 直接问答匹配 — "叶绿素a超标怎么办"精准命中相似问题
元数据 文档来源、分类、序号 关联的 chunk_id、生成时间
混合检索 Dense(COSINE) + Sparse(BM25) Dense(COSINE) + Sparse(BM25)

为什么要分两个 Collection? 因为搜索目标和匹配方式不同。原文切片更适合"我需要了解某方面的知识"这类开放式查询,而 QA 对更适合"我有个具体问题"这类直接问答。两者并行检索后通过语义去重合并,既避免了"只有原文回答太生硬"的问题,也避免了"只有 QA 对覆盖不全"的问题。

我设计了四层检索流水线,每一层解决一个特定问题:

输入 Query
  │
  ├─ L0 缓存层 ────── Redis 检索缓存(5min TTL,版本化失效)
  │   命中 → 直接返回,跳过后续全部流程
  │
  ├─ L0 查询改写层 ── 条件触发 LLM 改写(三段式:跳过/评估/必改写)
  │   将自然语言 query 扩展为多个检索角度
  │
  ├─ L1 双路混合检索 ─ Chunks ∥ QA Pairs,asyncio.gather 并行
  │   Dense(Cosine) + Sparse(Milvus 原生 BM25) → RRF(k=60) 融合
  │
  ├─ L1.5 语义去重 ─── 向量余弦相似度去重(QA ↔ Chunk)
  │   阈值 0.6:高于此值视为语义重复
  │
  └─ L2 余弦精排 ──── Query 向量 vs 候选文档向量,Top-5 阈值 0.65
      低于阈值的全部过滤,如果过滤后为空则返回最高分 Top-K 作为 fallback

核心设计原则

  1. 每一步可独立开关和调参:全流程 timing 字典记录每层的毫秒级耗时,任何一层的参数调整不会影响其他层

  2. 向量复用_should_rewrite() 方法返回的 query 向量可以直接传给 search(),避免同一 query 被两次 embed

  3. 失败不传播:任何一层失败都有降级路径,不会因为一层挂了导致整体失败

  4. 完整的知识生命周期管理:检索只是知识库的一部分。系统还提供了 20+ 个管理端点——文档上传与解析、知识块 CRUD、批量删除、QA 对重新生成、向量索引重建、配置热更新、知识库导出(PDF/DOCX)——覆盖了知识从入库到检索到维护的完整生命周期

2.2 L0 缓存层:版本化失效策略

缓存看似简单,实则有几个细节决定了效果:

为什么是 5 分钟? 不是为了"缓存 5 分钟",而是根据知识库的变更频率来定。水质标准文档不是实时更新的——通常几天甚至几周才更新一次。5 分钟 TTL 足够覆盖同一用户在同一会话中的重复查询(比如用户追问细节时再次触发检索),同时又不会在文档更新后长期返回过期结果。

版本化失效:缓存 key 的设计包含了知识库内容的版本 hash。当文档被新增或删除时,_invalidate_result_cache() 更新全局版本号,所有旧缓存自动失效——不需要等到 5 分钟 TTL 自然过期。

# cache.py 的版本化设计
CACHE_PREFIX = "kb:search"
_version_key = "kb:version"
​
async def _get_cached_result(self, query, category):
    version = await redis.get(_version_key) or "0"
    key = f"{CACHE_PREFIX}:{version}:{self._query_hash(query, category)}"
    cached = await redis.get(key)
    return json.loads(cached) if cached else None

实际效果:在开发测试中,重复查询的缓存命中率约 30%(同一会话内的追问和细化查询)。5min TTL + 版本化失效的组合,既保证了缓存的有效性,又避免了过期数据问题。

2.3 L0 查询改写层:三段式条件触发

查询改写是 RAG 系统中一个经典的"效果 vs 成本"权衡点——每次改写都是一次 LLM 调用,有 token 成本(约 200-500 tokens)和延迟开销(约 0.5-1.5s)。

我的做法是不盲目改写,而是按查询特征决定

async def _should_rewrite(self, query: str, category: str | None):
    """
    三段式条件触发:
    1. 短查询(≤10字):跳过改写 — 关键词已足够精准
    2. 长查询(≥30字):必然改写 — 自然语言查询改写能显著提升召回
    3. 中等长度(11-29字):检索质量评估判断 — 只有检索结果不够好才改写
    """
    qlen = len(query)
​
    # 段 1:短查询 — "叶绿素超标"、"pH标准限值"
    if qlen <= 10:
        return False, None  # 跳过改写,节省 100% LLM 调用
​
    # 段 2:长查询 — "我想了解一下如果水库中叶绿素a的含量持续偏高..."
    if qlen >= 30:
        return True, None   # 必改写,投入产出比高
​
    # 段 3:中等查询 — 跑一次快速向量检索来判断
    vec = await self.embedder.embed_single(query)
    results = await self._dense_only_search(vec, top_k=3, category=category)
    if not results:
        return True, vec  # 完全没结果 → 必须改写
    scores = [r.get("score", 0) for r in results if r.get("score")]
    if not scores:
        return True, vec
    # top-3 平均分 < 0.55 说明检索质量不够,需要改写
    return sum(scores) / len(scores) < 0.55, vec

关键细节:向量复用

注意返回值中包含 vec——如果中等查询触发了快速检索,返回的向量可以被 search() 方法直接复用,避免同一 query 被 embed 两次。这是一个微小的优化,但在高并发场景下每节省一次 embedding API 调用就是一次成本降低。

改写 Prompt 的设计:改写不是为了生成"更长的 query",而是生成"更适合知识库检索的 query":

REWRITE_PROMPT = """你是一个水质专业领域的查询改写助手。
将用户原始问题改写为 {count} 条更适合知识库检索的查询语句。
​
改写规则:
1. 保留原始问题的核心意图,不要偏离
2. 从不同角度改写:标准限值角度、成因分析角度、治理建议角度
3. 每条改写后的查询应简洁(20字以内),使用专业术语
4. 生成 {count} 条改写结果,用 JSON 返回
​
原始问题:{question}
​
返回 JSON 格式:{"queries": ["改写1", "改写2", ...]}
只返回 JSON,不要其他内容。"""

改写结果会与原始 query 合并去重(通过长度 ≤100 字符过滤掉异常输出),取前 KB_QUERY_REWRITE_COUNT(3) 条。每条改写后的 query 都会独立走后续的 L1 双路检索。

实际效果

  • 短查询(约 40% 的查询):跳过改写,零额外成本

  • 中等查询(约 35%):通过检索质量评估,约 60% 触发了改写(这些正是"检索不够好"的场景)

  • 长查询(约 25%):全部改写,显著提升召回率

  • 改写总触发率约 46%——这意味着约 54% 的查询不需要额外的 LLM 调用

2.4 L1 双路混合检索:为什么选择这个组合

混合检索的必要性:纯 Dense 向量检索在语义匹配上很强——"水库水质变差"能匹配到"水体富营养化"相关的文档。但在精确关键词匹配上表现不佳——用户搜索"GB 3838-2002",Dense 可能返回所有关于"水质标准"的文档,无法精准定位到这个具体标准号。

Sparse 检索(BM25)正好弥补这个盲区——基于词频的倒排索引可以精确命中"GB 3838-2002"这种专有名词。

为什么是 Milvus 原生 BM25 而不是 Elasticsearch

这是一个架构决策。如果用 ES + Milvus 两套数据库,架构变成:

Query → Embedding → Milvus(Dense) + ES(BM25) → 应用层 RRF 融合
                              ↑
                          需要维护两套数据的同步

而 Milvus 2.4+ 内置了 BM25 Function,入库时自动生成 sparse_vector,架构简化为:

Query → Embedding → Milvus(Dense + BM25) → Milvus 内置 RRF 融合

少了一个 ES 集群的部署、维护和数据同步逻辑。对于万级文档量的场景,Milvus 的 BM25 实现性能足够。

RRF 参数 k=60 的选择:RRF 的 k 值控制融合时对排名差异的敏感度。k 越小,排名靠前的文档在融合结果中越占优势;k 越大,两个检索源的排名权重越均衡。k=60 是一个经验值——在语义匹配(Dense 擅长的)和关键词匹配(Sparse 擅长的)之间取得平衡。我没有做系统的 k 值网格搜索(受限于缺少标注数据),但在这个项目中 k=60 的效果在人工抽检中表现合理。

Milvus 索引配置:两个知识库 Collection 均使用 IVF_FLAT 索引 + nlist=128 + 内积(IP)度量。选择 IVF_FLAT 而非 HNSW 的原因是——万级文档量下,IVF_FLAT 的检索精度和 HNSW 差距不大,但内存占用更低,且构建速度更快(这在重建索引时特别有意义)。nlist=128 对万级数据量是一个合适的聚类中心数——太少则召回不足,太多则搜索时需要扫描更多聚类单元反而变慢。1024 维向量来自 DashScope text-embedding-v3 模型(也支持 OpenAI text-embedding-3-small 的 1536 维,通过配置切换)。

实现代码

# milvus_service.py: hybrid_search_knowledge 的关键逻辑
​
def hybrid_search_knowledge(collection_name, query_text, dense_vec, top_k, category):
    client = _get_milvus_client()
​
    # Dense 检索请求 — 语义匹配
    dense_req = AnnSearchRequest(
        data=[dense_vec],
        anns_field="vector",           # 1024 维 embedding
        param={"metric_type": "COSINE"},
        limit=top_k * 2,               # 取 2x 候选供 RRF 融合
    )
​
    # Sparse 检索请求 — 关键词匹配(Milvus 原生 BM25 Function)
    sparse_req = AnnSearchRequest(
        data=[{sparse_vector_field: query_text}],  # Milvus 自动分词 + BM25 编码
        anns_field="sparse_vector",
        param={"metric_type": "BM25"},
        limit=top_k * 2,
    )
​
    ranker = RRFRanker(k=60)
    results = client.hybrid_search(
        collection_name=collection_name,
        reqs=[dense_req, sparse_req],
        ranker=ranker,
        limit=top_k,
    )
    return results

降级机制:如果混合检索因为 sparse_vector 字段缺失而失败(这通常发生在从旧版本 Collection 迁移时,旧 Collection 没有 sparse_vector 字段),系统自动降级为纯 Dense 向量检索,并记录 warning 日志提示管理员运行 init_knowledge_collections_v2 迁移。

async def _hybrid_search(self, query, top_k, collection, category):
    vec = await self.embedder.embed_single(query)
    try:
        return await call_milvus(hybrid_search_knowledge, ...)
    except Exception as e:
        if "sparse_vector" in str(e).lower():
            logger.warning("混合检索失败,降级纯向量检索(缺少 sparse_vector 字段)")
        # Fallback: 纯 Dense 向量检索
        return await call_milvus(search_knowledge, collection, vec, top_k, category)

2.5 L1.5 语义去重:从 SequenceMatcher 到向量余弦

演进过程:第一版去重用 difflib.SequenceMatcher 做文本相似度比较——这个方案有两个问题:

  1. O(len(text)) 的字符串比较开销:每个候选项需要和 QA 候选逐一比较,时间复杂度高

  2. 文本相似 ≠ 语义相似:两个用词不同但意思相同的段落——"叶绿素a超标通常由藻类爆发导致"和"高浓度叶绿素a往往指示着浮游植物的过度生长"——SequenceMatcher 判断为不相似,但语义上是重复的

改进方案:利用 Milvus 检索结果中已有的向量(_vector 字段),直接做余弦相似度计算——O(dim) 的浮点运算替代 O(len(text)) 的字符串比较:

@staticmethod
def _semantic_dedup(candidates: list[dict]) -> list[dict]:
    """向量余弦相似度去重:QA vs Chunk,阈值 0.6"""
    qa_candidates = [c for c in candidates if c.get("path") == "qa"]
    chunk_candidates = [c for c in candidates if c.get("path") != "qa"]
​
    if not qa_candidates or not chunk_candidates:
        return candidates
​
    threshold = 0.6  # KB_DEDUP_THRESHOLD
    kept = list(qa_candidates)  # QA 结果有更高优先级
​
    for chunk in chunk_candidates:
        chunk_vec = chunk.get("_vector")
        if not chunk_vec:
            kept.append(chunk)   # 无向量 → 保守保留
            continue
​
        is_dup = False
        for qa in qa_candidates:
            qa_vec = qa.get("_vector")
            if qa_vec and cosine_sim(chunk_vec, qa_vec) > threshold:
                is_dup = True
                break
​
        if not is_dup:
            kept.append(chunk)
​
    return kept

阈值 0.6 的选择:这个值是通过人工抽检 20 组 QA-Chunk 对来确定的。0.5 去重过于保守(导致很多语义不重复的内容被误删),0.7 去重力度不够(保留了明显重复的内容),0.6 是折中后的结果。

2.6 L2 余弦精排:阈值选择与 fallback 策略

async def _cosine_rerank(self, query_vec, candidates):
    # 第一步:确保所有候选都有向量(缺少的补 embedding)
    missing = [c for c in candidates if not c.get("_vector")]
    if missing:
        texts = [c.get("content", "")[:800] for c in missing]
        vecs = await self.embedder.embed(texts)
        for c, v in zip(missing, vecs):
            c["_vector"] = v
​
    # 第二步:计算余弦相似度
    for c in candidates:
        v = c.get("_vector")
        c["score"] = round(cosine_sim(query_vec, v), 4) if v else 0
​
    candidates.sort(key=lambda x: x.get("score", 0), reverse=True)
    return candidates[:KB_RERANK_TOP_K]  # Top 5

L2 之后的结果过滤

# search() 方法中
results = [r for r in candidates if r.get("score", 0) >= KB_RERANK_THRESHOLD]  # 0.65
if not results:
    results = candidates[:KB_RERANK_TOP_K]  # fallback:返回最高分的 Top-K

这是 RAG 系统中一个重要的设计细节——永远不要返回空结果。即使所有候选的相似度都低于 0.65,也要返回最高分的 5 条,让 LLM 自己判断是否可用。LLM 在 prompt 中有"知识库内容不足以回答时诚实说明"的约束,比在检索层直接返回空结果然后告诉用户"未找到相关信息"的用户体验要好得多。

阈值 0.65 的权衡:这个值与去重阈值 0.6 之间有 0.05 的间隔,避免去重和精排阈值打架。0.65 的选择思路:

  • 余弦相似度 ≥ 0.65:内容足够相关,可以信赖

  • 0.45-0.65:可能相关但不够精准,作为 fallback 传递给 LLM 自行判断

  • < 0.45:基本不相关(但在当前阈值设置下不会单独出现,因为 L1 已经做了一轮过滤)

2.7 文档入库的质量控制链

除了检索流程,入库环节的质量控制同样决定了最终效果。我的实现中包含了三级质量控制:

第一级:三级分层切片

def _split_text(self, text: str) -> list[str]:
    """
    三级切分策略:
    1. 按 ## 标题切 → 保留文档结构边界
    2. 按句子边界切(。!?\n)→ 保证语义完整性
    3. 超长句子硬切分 → 兜底处理
    + 滑动窗口 overlap(保持最后 64 字符作为下一 chunk 开头)
    """
    chunk_size = 512
    overlap = 64
​
    # 一级:Markdown 标题切分
    sections = re.split(r"\n(?=## )", text)
​
    for section in sections:
        # 二级:句子边界切分
        sentences = re.split(r"(?<=[。!?\n])", section)
​
        for sent in sentences:
            # 三级:超长句子定长硬切
            if len(sent) > chunk_size:
                for j in range(0, len(sent), chunk_size - overlap):
                    chunks.append(sent[j:j + chunk_size].strip())
            else:
                # 正常累积,达到 chunk_size 时 flush
                buffer += sent
                if len(buffer) >= chunk_size - overlap:
                    chunks.append(buffer.strip())
                    # 滑动窗口:保留末尾 overlap 字符
                    buffer = buffer[-overlap:]

512+64 的参数选择

  • chunk_size=512:text-embedding-v3 模型在 512 token 以内的编码效果最佳,更长的文本会导致语义信息被"稀释"

  • overlap=64:保证句子不会恰好被切在中间导致语义断裂。64 字符 ≈ 20-30 中文词,足够覆盖句子边界的过渡内容

  • 按标题切分优先于定长切分:一个完整的 section 不会因为定长切割被拆分到两个 chunk 中丢失上下文

第二级:SHA256 去重

# _insert_chunks_batch 中的去重逻辑
all_hashes = [_sha256(text) for text in chunks]
​
# 先查 MySQL 已有的 hash 值
existing_rows = await self.db.execute(
    select(KnowledgeChunk.chunk_id, KnowledgeChunk.content_hash)
    .where(KnowledgeChunk.content_hash.in_(all_hashes))
)
existing_map = {r.content_hash: r.chunk_id for r in existing_rows}
​
for text, vec in zip(chunks, embeddings):
    h = _sha256(text)
    if h in existing_map:
        ids.append(existing_map[h])  # 复用已有 ID
        continue
    # 新 chunk 正常写入

为什么不在 Milvus 层做去重?因为 MySQL 是 source of truth,而 Milvus 是"可以被重建的"——基于 MySQL 的 SHA256 去重保证了即使 Milvus 数据丢失重建,也不会产生重复数据。

第三级:Milvus 写入失败的补偿队列

if new_entries:
    try:
        call_milvus(insert_knowledge_chunks_batch, new_entries)
    except Exception as e:
        logger.error(f"Milvus batch chunk 写入失败,逐条入队: {e}")
        for entry in new_entries:
            kb_compensate_insert_chunk.delay(entry["chunk_id"])

MySQL 写入成功后,如果 Milvus 写入失败(比如 Milvus 服务挂了),MySQL 数据和 Milvus 数据就出现了不一致。解决方案是 Celery 补偿队列——失败时逐条入队,worker 在 Milvus 恢复后自动重试。delay() 是异步的,不会阻塞主流程。

2.8 各层效果消融分析

虽然项目缺少系统的 RAGAS 评估数据(这是后续计划),但我通过 Timing 日志和人工抽检做了一些定性分析。以下是我对每一层贡献度的判断:

层次 增加延迟 召回提升 精准度提升 整体贡献
L0 缓存 -50ms(命中时节省全部延迟) 约 30% 查询被拦截
L0 查询改写 +500-1500ms ★★★★☆ 显著(长查询尤其) ★★★☆☆ 46% 查询触发改写
L1 双路混合检索 +100-300ms ★★★★★ 从单路到双路提升最大 ★★★★☆ 检索覆盖面翻倍
L1.5 语义去重 +2-10ms ★★★☆☆ 减少冗余 约 15-20% chunks 被去重
L2 余弦精排 +50-200ms ★★★★☆ 精排提升 Top-5 精准度 Top-5 结果明显更相关

注:以上数据是基于开发环境测试的近似值,非生产环境的严格评测。


三、Milvus 踩坑全记录:从连接超时到数据一致性

如果在面试中被问到"你在项目中遇到的最大技术难点是什么",我的答案一定是 Milvus 的接入。不是因为它的 API 有多复杂——恰恰相反,API 本身很简单——而是因为底层库的不可靠性迫使你必须在应用层建立一整套防御体系

3.1 坑 1:pymilvus connect() 无限阻塞

问题发现:在开发环境测试时,有一次忘了启动 Milvus Docker 容器,然后发现整个 FastAPI 服务卡死了——任何请求都没有响应。排查后发现是 connections.connect() 在容器未启动时无限阻塞,阻塞了 FastAPI 的事件循环线程。

根因分析:pymilvus 的底层是 C 扩展的 gRPC 调用,connections.connect() 没有暴露 socket timeout 参数。当目标主机不可达时,TCP SYN 包没有回应,操作系统级别的 TCP 超时默认是 net.ipv4.tcp_syn_retries=6(Linux),总超时约 127 秒。在这期间,pymilvus 的 gRPC 调用持有 Python GIL,阻塞了同一进程中的所有线程。

解决方案 — TCP 预探测 + 熔断器

_PROBE_TIMEOUT = 3.0        # TCP 探活超时
_CHECK_TTL = 300            # 快速失败缓存时间
​
def _probe() -> bool:
    """在 connect() 之前用纯 socket 做 TCP 探活,3 秒超时"""
    try:
        sock = socket.create_connection(
            (settings.MILVUS_HOST, settings.MILVUS_PORT),
            timeout=_PROBE_TIMEOUT
        )
        sock.close()
        return True
    except (socket.timeout, ConnectionRefusedError, OSError):
        return False
​
def connect():
    global _available, _last_check
​
    # 熔断器:上次失败在 5 分钟内,直接快速失败
    if not _available and time.time() - _last_check < _CHECK_TTL:
        raise ConnectionError(
            f"Milvus 不可用(缓存至 {_last_check + _CHECK_TTL:.0f})"
        )
​
    # TCP 探活失败 → 更新熔断器状态
    if not _probe():
        _available = False
        _last_check = time.time()
        raise ConnectionError(
            f"Milvus TCP 探活失败 {settings.MILVUS_HOST}:{settings.MILVUS_PORT}"
        )
​
    # 探活通过 → 建立 gRPC 连接
    try:
        connections.connect(host=settings.MILVUS_HOST, port=settings.MILVUS_PORT)
        _available = True
    except Exception as e:
        _available = False
        _last_check = time.time()
        raise ConnectionError(f"Milvus 连接失败: {e}") from e

核心思想:不信任 pymilvus 的超时机制。用最基础的 socket.create_connection(timeout=3) 做前置探测——这 3 秒是你主动设置的上限,不是被动等待 127 秒。

3.2 坑 2:col.load() 启动时卡死服务

问题发现:在解决 connect() 阻塞后,服务启动时还是偶尔会卡住。排查后发现是 init_collection() 中调用了 col.load()——这个方法将 Milvus Collection 的全部向量数据加载到内存,是一个同步的、耗时的 gRPC 调用。

解决方案 — 懒加载

# init_collection() — 启动时只创建/获取 Collection 对象,不调用 load()
def init_collection():
    global _collection
    if utility.has_collection(COLLECTION_NAME):
        _collection = Collection(COLLECTION_NAME)  # 仅获取引用,毫秒级
        return  # 不调用 load()!
​
# search() — 首次使用时才加载
def search(vector, top_k=5):
    col = get_collection()
    try:
        col.load()           # 懒加载:首次搜索时执行
        results = col.search(...)
        return [...]
    except Exception as e:
        raise ConnectionError(f"Milvus 搜索失败: {e}") from e

设计原理col.load() 只在首次 search() 时执行一次(Milvus 内部有加载状态跟踪),后续搜索直接复用已加载的数据。这样启动路径是毫秒级的,加载开销延迟到了首次请求。

3.3 坑 3:混合检索降级与稀疏向量缺失

问题发现:在将知识库从 V1(纯 Dense)迁移到 V2(Dense + BM25 混合检索)后,部分 Collection 的 sparse_vector 字段缺失导致混合检索失败。

根因分析:V1 创建的 Collection Schema 中没有 sparse_vector 字段。V2 的混合检索需要这个字段。如果直接对 V1 Collection 调用 hybrid_search,Milvus 会返回 "field sparse_vector not found" 错误。

解决方案 — 自动降级 + 迁移提示

async def _hybrid_search(self, query, top_k, collection, category):
    vec = await self.embedder.embed_single(query)
    try:
        return await call_milvus(hybrid_search_knowledge, ...)
    except Exception as e:
        msg = str(e)
        is_schema_mismatch = "sparse_vector" in msg.lower()
        logger.warning(
            f"混合检索失败, 降级为纯向量检索"
            f"{'(集合缺少 sparse_vector,请运行 init_knowledge_collections_v2 迁移)' if is_schema_mismatch else ''}"
        )
​
    # Fallback: 纯 Dense 向量检索
    try:
        return await call_milvus(search_knowledge, collection, vec, top_k, category)
    except Exception as e2:
        logger.warning(f"降级检索也失败: {e2}")
        return []

降级链路:混合检索失败 → 纯 Dense 检索 → 空结果。每一步降级都有日志记录,方便排查。用户不会感知到区别——对他们来说,知识库检索要么返回结果,要么不返回。

3.4 坑 4:Milvus 写入失败的数据一致性

问题:MySQL 作为 source of truth,Milvus 作为向量检索引擎。当 Milvus 写入失败时,MySQL 已经有数据但 Milvus 没有——数据不一致。用户能看到文档入库成功(MySQL 有记录),但检索不到(Milvus 没有)。

解决方案 — 三层保障

第一层:同步批量写入(call_milvus + 独立线程池 + 10s 超时)
  ↓ 失败
第二层:Celery 补偿队列 — kb_compensate_insert_chunk.delay(chunk_id)
  ↓ worker 定时重试
第三层:日志告警 — logger.error 记录失败详情,便于人工介入
# 批量写入的核心逻辑
if new_entries:
    try:
        # 第一层:同步批量写入,有超时保护
        call_milvus(insert_knowledge_chunks_batch, new_entries)
    except Exception as e:
        # 第二层:批量失败 → 逐条入补偿队列,不阻塞主流程
        logger.error(f"Milvus batch chunk 写入失败,逐条入队: {e}")
        for entry in new_entries:
            kb_compensate_insert_chunk.delay(entry["chunk_id"])
        # 注意:MySQL 已经写入成功,数据不会丢失
        # 补偿队列会在 Milvus 恢复后自动完成同步

线程池隔离

# 独立线程池 — 与 FastAPI 默认线程池完全隔离
MILVUS_EXECUTOR = ThreadPoolExecutor(
    max_workers=2,           # 为什么是 2?见下文
    thread_name_prefix="milvus_"
)
MILVUS_TIMEOUT = 10         # 每个操作 10 秒超时
​
def call_milvus(func, *args, timeout=MILVUS_TIMEOUT):
    """在线程池中执行 Milvus 同步调用,带超时保护"""
    future = MILVUS_EXECUTOR.submit(func, *args)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        future.cancel()
        raise TimeoutError(f"Milvus 调用超时 ({timeout}s): {func.__name__}")
    except Exception:
        future.cancel()
        raise

max_workers=2 的选择不是随意的——collection.load() 是串行瓶颈操作(需要将全部向量加载到内存),更多 worker 只会增加锁竞争,不会提升吞吐。2 个 worker 确保一个用于当前的搜索/写入操作,另一个作为冗余缓冲。

3.5 防御性编程的设计原则总结

从上述四个坑中,我提炼出四条防御性编程原则,适用于任何依赖不稳定外部服务的场景:

原则 1:永远不信任底层库的超时机制
  → 在应用层建立 TCP 探活 + future.result(timeout) 双重保护

原则 2:启动路径不能有阻塞操作
  → 懒加载:初始化只建对象,首次使用才加载数据

原则 3:任何时候都有降级路径
  → 混合检索 → 纯 Dense → 空结果,每步降级都记录日志

原则 4:最终一致性 > 强一致性(对于向量存储)
  → MySQL 先写 + Milvus 补偿队列,允许短暂的不一致但绝不丢数据

这四条原则的泛化能力 — MinIO 双模存储

同一套防御性思维在项目的另一个基础设施——MinIO 对象存储——上也验证了其有效性。MinIO 同样被设计为可选依赖:系统同时维护本地文件路径(file_pathreport_path)和 MinIO 对象键(file_keyreport_key)两套字段。MinIO 可用时优先使用对象存储;MinIO 不可用时自动降级为本地文件系统,功能不受任何影响。TCP 探活和连接超时保护同样适用于 MinIO 客户端初始化。

这种"外部服务一律按不可靠假设来设计"的思维模式,比记住具体的 API 或参数重要得多。当你默认任何外部依赖都可能随时挂掉,你的系统设计会自然走向更健壮的方向。

常见问题补充

在实际使用中,一些运维层面的问题也值得记录:

  • Milvus 未部署能否运行? 可以。Milvus 初始化失败时仅输出 Warning,核心链路(上传→分析→可视化→报告)完全不受影响。相似案例检索和特征入库会跳过,知识库检索会返回空结果并由 LLM 诚实告知用户。

  • 报告生成状态卡在"生成中"? 系统使用数据库 report_phase 字段(而非内存状态)管控报告生成生命周期。服务器重启后,状态检测端点会自动发现 report_phase 非空但无有效报告文件的情况,将状态重置为可重新生成,不会永久卡死。这是一个比"用内存标记位 + 重启后丢失状态"更可靠的设计。

  • 知识库 Collection 迁移? 从纯 Dense 的 V1 Collection 迁移到 Dense+BM25 的 V2 Collection 时,V1 的 Schema 缺少 sparse_vector 字段。系统会自动检测并降级为纯向量检索,同时通过日志提示管理员运行 init_knowledge_collections_v2 完成迁移。


四、总结与展望

做对了什么

  1. 自研 Agent 框架(~300 行)而非引入 LangChain(20+ 依赖):对于 9 个工具的规模,这是投入产出比最高的选择。核心收益不是"省了依赖",而是对每个环节的完全掌控——超时可精确控制、流式可精确处理、安全护栏可精确实现。这套框架同时驱动了任务 Agent 对话和知识库 RAG 对话两种模式,避免了为每种对话模式单独维护一套工具调用逻辑。

  2. 四层 RAG 检索的模块化设计:每一层独立可开关,有独立的 timing 追踪,参数调优不影响其他层。双 Collection 的设计让原文语义匹配和问答精准匹配各司其职,再通过语义去重合并结果。这种模块化是后续做消融实验和参数优化的基础。

  3. Milvus 的防御性编程:在基础设施不可靠的假设下设计系统,这个理念比"用了哪个向量数据库的什么 API"重要得多。这套防御性思维被复用到 MinIO 双模存储、Celery 补偿队列、报告 DB 状态机等多个模块,形成了项目级的设计共识。

  4. 完整的工程配套:80 项 pytest 自动化测试覆盖 18 个测试类(从健康检查、认证流程到端到端报告生成),API 层面的实机测试和全维度集成测试分层互补。LLM 调用全量日志(llm_call_logs 表记录每次调用的模型、token 用量、延迟、成功/失败)使得成本归因和问题排查有数据可依。用户反馈系统(点赞/点踩)为检索质量评估提供了真实反馈信号。


本文是"AI 工程化系列"的第一篇,聚焦 Agent 框架、RAG 检索和基础设施防御三个核心方向。后续计划分享:Prompt 模板的版本管理与 A/B 测试、RAGAS 评估体系的搭建实操、LLM 应用的成本归因与性能监控。欢迎通过 GitHub 仓库关注项目进展和更多技术细节。

项目地址github.com/liu17v/water_analysis — 完整源码、架构文档及部署指南。


关键词:Agent 框架、Function Calling、RAG、混合检索、RRF、Milvus、AI 工程化、LLM 应用开发、防御性编程、向量数据库、SSE 流式响应

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐