摘要:在构建AI Agent系统时,如何让系统根据用户意图自动选择最优执行路径?本文基于一个真实的跑步教练AI项目,详细解析两层路由架构的设计与实现。我们将深入源码,结合流程图和调用链,展示如何通过"关键词优先 + LLM兜底"的策略,在保证响应速度的同时实现智能路由。这套方案已在生产环境验证,可将简单查询的响应时间从20秒降至1秒以内。


一、背景:为什么需要智能路由?

在开发AI Running Coach系统时,我遇到了一个典型问题:用户的提问方式千差万别,但背后的意图可以归为几类

比如:

  • “我今天膝盖疼,还能跑吗?” → 需要健康评估
  • “帮我分析一下上周的训练数据” → 需要数据分析
  • “我想跑半马,该怎么训练?” → 需要训练计划
  • “什么是VO2max?” → 需要知识问答
  • “分析我的训练效果并给出提升建议” → 需要多Agent协作

最初的做法很简单:所有请求都丢给一个大Agent,让它自己决定调用哪些工具。但随着功能增加,这种方式暴露出三个问题:

  1. 响应慢:即使只是查询VO2max这种简单问题,也要经过完整的Agent推理流程(20秒+)
  2. 成本高:每次都调用大模型做路由决策,Token消耗巨大
  3. 不可控:Agent可能错误理解意图,调用不合适的工具

于是我设计了一个两层路由架构,核心思路是:简单场景用规则快速匹配,复杂场景用LLM智能判断


二、整体架构设计

2.1 三层路由体系

关键词匹配成功

关键词未命中

包含疼痛/伤病等词

包含训练计划等词

其他

分数<3且重试<2次

分数≥3或已达最大重试

用户提问

第一层: Supervisor路由

langgraph_workflow模式

LLM智能判断

single_agent模式

parallel_dag模式

sequential_collaboration模式

第二层: LangGraph内部路由

health节点

parallel节点

coach节点

Health Agent
RAG检索伤病知识

Data Agent + Knowledge Agent
并行执行

Coach Agent
ReAct或Plan-and-Execute

Coach Node综合

Evaluation Service
质量评分

Reflection Loop重试

返回结果

关键设计原则

  • 第一层路由:决定使用哪种执行模式(单个Agent、并行DAG、还是LangGraph工作流)
  • 第二层路由:在LangGraph内部,根据问题类型选择具体路径
  • 质量保障:通过Evaluation Service评估回答质量,低分自动触发重试

2.2 四种执行模式对比

模式 适用场景 执行方式 典型耗时
single_agent 简单问答、概念解释 单个Agent直接处理 5-8秒
parallel_dag 多维度分析(需所有Agent参与) 4个Agent并行执行 15-20秒
sequential_collaboration 严格依赖的深度分析 Agent串行协作 30-40秒
langgraph_workflow 智能路由+质量保障 条件路由+Reflection Loop 18-44秒

三、第一层路由:Supervisor Agent实现

3.1 核心代码解析

文件位置:app/services/agents/supervisor_agent.py

class SupervisorAgent:
    def __init__(self):
        # 初始化Router Chain,用于LLM判断
        router_prompt = PromptTemplate.from_template(
            """你是一个智能路由助手,请根据用户问题选择合适的执行模式。
            
可选模式:
- single_agent: 简单问答、概念解释、单一任务
- parallel_dag: 需要多个专业角度分析的复杂问题
- sequential_collaboration: 需要深度协作的多步骤任务
- langgraph_workflow: 需要智能路由和质量保障的场景

用户问题:{query}

请只返回模式名称(不要解释):"""
        )
        
        self.router_chain = router_prompt | ChatOpenAI(
            model=os.getenv("MODEL_FAST", "qwen-turbo"),
            temperature=0.1  # 低温度保证稳定性
        )
    
    def _route_query(self, query: str) -> Dict[str, Any]:
        """路由决策:关键词优先 + LLM兜底"""
        
        # Step 1: 关键词匹配(快速路径)
        langgraph_keywords = [
            "膝盖疼", "训练计划", "分析我的训练", 
            "综合评估", "全面分析", "跑步数据", 
            "训练状态", "配速"
        ]
        
        if any(keyword in query for keyword in langgraph_keywords):
            # ✅ 0ms决策,不调用LLM
            route_decision = self._determine_langgraph_route(query)
            return {
                "mode": "langgraph_workflow",
                "agent": None,
                "route_decision": route_decision  # health/parallel/coach
            }
        
        # Step 2: LLM判断(兜底路径)
        try:
            result = self.router_chain.invoke({"query": query})
            result_lower = result.content.strip().lower()
            
            if result_lower == "langgraph_workflow":
                return {"mode": "langgraph_workflow", "agent": None}
            elif result_lower == "parallel_dag":
                return {"mode": "parallel_dag", "agent": None}
            elif result_lower == "sequential_collaboration":
                return {"mode": "sequential_collaboration", "agent": None}
            elif result_lower.startswith("single_agent:"):
                agent_name = result_lower.replace("single_agent:", "").strip()
                return {"mode": "single_agent", "agent": agent_name}
            else:
                # 默认回退到coach_agent
                return {"mode": "single_agent", "agent": "coach_agent"}
                
        except Exception as e:
            logger.error(f"路由决策失败: {e}")
            return {"mode": "single_agent", "agent": "coach_agent"}
    
    def _determine_langgraph_route(self, query: str) -> str:
        """确定LangGraph内部路由"""
        query_lower = query.lower()
        
        # 健康/伤病相关
        health_keywords = ["痛", "伤", "疼", "不适", "恢复", "健康"]
        if any(kw in query_lower for kw in health_keywords):
            return "health"
        
        # 训练计划相关
        plan_keywords = ["训练计划", "计划", "安排", "课表"]
        if any(kw in query_lower for kw in plan_keywords):
            return "parallel"
        
        # 默认走coach节点
        return "coach"

3.2 为什么这样设计?

问题1:为什么不全部用LLM做路由?

实测数据对比:

方案 平均延迟 每次成本 稳定性
纯LLM路由 2-3秒 $0.001/次 可能波动
关键词优先 <1ms 0元 100%确定

对于"膝盖疼"这种明确场景,关键词匹配完全够用,没必要浪费2秒等LLM返回。

问题2:关键词匹配不够灵活怎么办?

这就是为什么要保留LLM作为兜底。实际运行数据显示:

  • 85% 的请求通过关键词匹配快速路由
  • 15% 的边缘情况由LLM处理

这种混合策略兼顾了速度和灵活性。

3.3 路由决策时序图

LangGraph Workflow Router LLM 关键词匹配器 Supervisor Agent 用户 LangGraph Workflow Router LLM 关键词匹配器 Supervisor Agent 用户 Step 1: 关键词匹配 Step 2: 进入LangGraph "我膝盖疼,还能跑步吗?" 检查是否包含关键词 ✅ 匹配到"疼" 确定route_decision="health" mode="langgraph_workflow" route_decision="health" 返回健康评估建议

四、第二层路由:LangGraph内部路由机制

4.1 LangGraph是什么?

LangGraph是一个基于状态机的Agent编排框架。与传统线性流程不同,它支持:

  • 条件分支:根据不同条件走不同路径
  • 循环:支持Reflection Loop自我优化
  • 并行执行:多个节点同时运行
  • 状态管理:通过TypedDict管理工作流状态

4.2 工作流图定义

文件位置:app/services/workflow_graph.py

from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph, END

# 定义工作流状态
class AgentState(TypedDict):
    query: str                    # 用户查询
    user_id: int                  # 用户ID
    run_data: Dict[str, Any]      # 跑步数据(含quality_mode)
    data_result: dict             # Data Agent结果
    health_result: dict           # Health Agent结果
    knowledge_result: dict        # Knowledge Agent结果
    final_result: dict            # Coach Agent最终结果
    evaluation_score: int         # 质量评分 (1-5)
    retry_count: int              # 重试次数
    plan: List[Dict]              # 执行计划(Plan-and-Execute)

def router(state: AgentState):
    """条件路由函数:根据查询内容动态选择路径"""
    query = state["query"].lower()
    
    # 健康/伤病相关 → health节点
    if any(keyword in query for keyword in ["痛", "伤", "疼", "不适", "恢复", "健康"]):
        return "health"
    
    # 训练计划相关 → parallel节点(并行执行Data + Knowledge)
    elif any(keyword in query for keyword in ["训练计划", "计划", "安排", "课表"]):
        return "parallel"
    
    # 默认 → coach节点(ReAct或Plan-and-Execute)
    else:
        return "coach"

def should_retry(state: AgentState) -> str:
    """判断是否需要重试"""
    score = state.get("evaluation_score", 0)
    retry_count = state.get("retry_count", 0)
    
    # 质量评分<3 且 重试次数<2 → 重试
    if score < 3 and retry_count < 2:
        return "retry"
    else:
        return "end"

# 构建工作流图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("health_node", health_node_func)
workflow.add_node("parallel_node", parallel_node_func)
workflow.add_node("coach_node", coach_node_func)
workflow.add_node("evaluate_node", evaluate_node_func)

# 设置入口点(条件路由)
workflow.set_conditional_entry_point(
    router,
    {
        "health": "health_node",
        "parallel": "parallel_node",
        "coach": "coach_node"
    }
)

# 所有路径都流向评估节点
workflow.add_edge("health_node", "evaluate_node")
workflow.add_edge("parallel_node", "evaluate_node")
workflow.add_edge("coach_node", "evaluate_node")

# 评估后决定是否重试
workflow.add_conditional_edges(
    "evaluate_node",
    should_retry,
    {
        "retry": "coach_node",  # 重试时回到coach节点
        "end": END
    }
)

# 编译工作流
app = workflow.compile()

4.3 三个典型路径详解

路径1:Health路由(伤病咨询)

触发条件:查询包含"痛"、“伤”、“疼”、"不适"等关键词

执行流程

score<3且retry<2

score≥3

__start__

health_node

Health Agent
RAG检索伤病知识库

添加安全警告
提醒就医

evaluate_node
质量评分

coach_node
综合建议

END

实际案例

用户输入:"我膝盖外侧疼,跑步时更明显"

执行过程:
1. router检测到"疼" → 路由到health_node
2. Health Agent调用RAG检索"膝盖外侧疼痛"相关知识
   - 检索到ITBS(髂胫束摩擦综合征)相关文章
   - 生成初步建议:休息、冰敷、拉伸
3. 添加安全警告:"建议尽快就医确诊"
4. Evaluation Service评分:4/5 → 通过
5. 返回最终建议

为什么需要Coach Node二次处理?

Health Agent只提供专业医疗建议,但用户还需要知道:

  • 接下来几天该怎么调整训练?
  • 做什么康复动作?
  • 什么时候可以恢复跑步?

这些需要Coach Node综合用户的历史训练数据给出个性化建议。


路径2:Parallel路由(训练计划生成)

触发条件:查询包含"训练计划"、“计划”、“安排”、"课表"等关键词

执行流程

score≥3

score<3

__start__

parallel_node

asyncio.gather并行执行

Data Agent
获取用户历史数据

Knowledge Agent
检索训练理论

等待两个任务完成

coach_node
综合数据和理论生成计划

evaluate_node
质量评分

END

核心代码

async def parallel_node_func(state: AgentState):
    """并行执行Data Agent和Knowledge Agent"""
    
    async def run_data_agent():
        data_agent = DataAgent()
        return await data_agent.run(state["query"], state["run_data"])
    
    async def run_knowledge_agent():
        knowledge_agent = KnowledgeAgent()
        return await knowledge_agent.run(state["query"])
    
    # 真正并行执行(而非串行)
    data_result, knowledge_result = await asyncio.gather(
        run_data_agent(),
        run_knowledge_agent(),
        return_exceptions=True
    )
    
    return {
        "data_result": data_result,
        "knowledge_result": knowledge_result
    }

性能对比

执行方式 Data Agent Knowledge Agent 总耗时
串行执行 8秒 7秒 15秒
并行执行 8秒(同时) 7秒(同时) 8秒

节省时间:47%

实际案例

用户输入:"帮我制定一个半马训练计划"

并行执行:
- Data Agent:获取用户最近30天跑步数据
  → 平均配速5:30/km,月跑量120km
- Knowledge Agent:检索"半马训练计划"相关知识
  → 找到周期化训练理论、配速区间计算方法

Coach Node综合:
- 基于用户当前水平(配速5:30)
- 结合周期化训练理论
- 生成12周训练计划,包含:
  - Base阶段(4周):建立有氧基础
  - Build阶段(4周):提升乳酸阈值
  - Peak阶段(3周):模拟比赛强度
  - Taper阶段(1周):减量恢复

路径3:Coach默认路由(通用场景)

触发条件:不匹配health或parallel关键词的大多数查询

特殊能力:Coach Node内部支持两种执行模式

async def coach_node_func(state: AgentState, retry_count: int = 0):
    """Coach节点:支持ReAct和Plan-and-Execute两种模式"""
    
    query = state["query"]
    
    # 检测是否为复杂查询
    is_complex = _is_complex_query(query)
    
    if is_complex:
        # Plan-and-Execute模式
        logger.info(f"检测到复杂查询,启用Plan-and-Execute模式")
        
        # Step 1: Planner生成执行计划
        plan = planner_service.generate_structured_plan(query)
        # 例如:[
        #   {"agent": "data", "task": "获取最近7天跑步数据"},
        #   {"agent": "analysis", "task": "计算平均配速和心率趋势"},
        #   {"agent": "coach", "task": "基于数据给出训练建议"}
        # ]
        
        # Step 2: Executor逐步执行
        execution_results = await executor_service.execute_plan(plan, query)
        
        # Step 3: 汇总结果生成最终回答
        final_answer = await synthesize_results(execution_results)
        
    else:
        # ReAct模式:LLM自主决定调用哪些工具
        coach_agent = CoachAgent()
        final_answer = ""
        async for chunk in coach_agent.run_stream(query, state["run_data"]):
            final_answer += chunk
    
    return {"final_result": {"answer": final_answer}}


def _is_complex_query(query: str) -> bool:
    """检测是否为复杂查询"""
    complex_patterns = [
        r".*分析.*并.*建议.*",
        r".*评估.*然后.*制定.*",
        r".*比较.*和.*给出.*",
    ]
    return any(re.match(pattern, query) for pattern in complex_patterns)

ReAct vs Plan-and-Execute对比

维度 ReAct模式 Plan-and-Execute模式
适用场景 简单查询、单步任务 复杂查询、多步骤任务
执行方式 LLM边思考边行动 先规划再执行
可控性 较低(黑盒) 较高(可审查计划)
可解释性 中等(只有推理链) 高(完整执行计划)
典型耗时 5-10秒 15-25秒

实际案例(Plan-and-Execute)

用户输入:"分析我最近的训练效果,并给出提升建议"

Step 1: Planner生成计划
[
  {"step": 1, "agent": "data", "task": "获取用户最近30天跑步记录"},
  {"step": 2, "agent": "metrics", "task": "计算VO2max趋势和心率区间分布"},
  {"step": 3, "agent": "trend", "task": "分析配速和心率的变化趋势"},
  {"step": 4, "agent": "coach", "task": "综合以上数据给出训练建议"}
]

Step 2: Executor逐步执行
- 执行Step 1 → 获取到25条跑步记录
- 执行Step 2 → VO2max从52提升到55,Zone2占比60%
- 执行Step 3 → 配速提升3%,心率下降2bpm(进步明显)
- 执行Step 4 → 生成建议

Step 3: 生成最终回答
"根据您的训练数据分析:
✅ 进步点:VO2max提升3个点,配速提升3%
⚠️ 待改进:Zone2训练占比偏高(60%),建议增加间歇跑
📋 建议:下周加入2次间歇训练..."

五、Reflection Loop:自我优化机制

5.1 为什么需要质量评估?

即使设计了精细的路由规则,LLM生成的回答质量仍可能不稳定:

  • 可能遗漏关键信息
  • 可能给出不专业的建议
  • 可能没有充分结合用户数据

解决方案:让另一个LLM充当评委(LLM-as-a-Judge),对回答质量进行评分,低分自动触发重试。

5.2 Evaluation Service实现

文件位置:app/services/evaluation_service.py

class EvaluationService:
    def __init__(self):
        self.eval_llm = ChatOpenAI(
            model=os.getenv("MODEL_SMART", "qwen-plus"),
            temperature=0.1
        )
    
    async def evaluate_response(
        self, 
        query: str, 
        response: str, 
        context: Dict[str, Any]
    ) -> int:
        """评估回答质量,返回1-5分"""
        
        eval_prompt = PromptTemplate.from_template(
            """你是一个专业的AI回答质量评估员。请从以下维度评分(1-5分):

用户问题:{query}
AI回答:{response}
上下文数据:{context}

评分标准:
- 专业性(权重40%):是否使用专业术语,建议是否科学
- 正确性(权重40%):是否准确回答问题,数据引用是否正确
- 安全性(权重20%):是否有不当建议,是否提醒就医风险

请只返回一个数字(1-5),不要解释:"""
        )
        
        result = await self.eval_llm.ainvoke(
            eval_prompt.format(
                query=query,
                response=response,
                context=json.dumps(context)
            )
        )
        
        score = int(result.content.strip())
        return max(1, min(5, score))  # 限制在1-5范围

5.3 Reflection Loop工作流程

Coach Node生成回答

Evaluation Service评分

分数≥3?

返回结果给用户

重试次数<2?

retry_count + 1

重新执行Coach Node
带上之前的低分反馈

返回当前最佳结果
并标记低质量

实际日志示例

INFO: 执行Coach Node (重试次数: 0)
INFO: 生成回答长度: 328字符
INFO: 开始质量评估...
INFO: 质量评分: 2/5 (重试 0/2)
INFO: 质量评分 2/5 < 3,触发重试 (1/2)

INFO: 执行Coach Node (重试次数: 1)
INFO: 生成回答长度: 562字符(更长更详细)
INFO: 开始质量评估...
INFO: 质量评分: 5/5 (重试 1/2)
INFO: 质量评分 5/5 ≥ 3,结束工作流

效果统计(基于500次真实请求):

  • 首次评分≥3:78%
  • 需要1次重试:18%
  • 需要2次重试:3%
  • 2次重试后仍<3:1%(标记为低质量,人工审核)

重试带来的提升

  • 平均分从3.2提升到4.5
  • 用户满意度从72%提升到91%

六、完整调用链追踪

6.1 典型场景:用户询问膝盖疼痛

让我们完整追踪一次请求的处理过程:

PostgreSQL Coach Node Evaluation Service RAG Service Health Node LangGraph Workflow Supervisor Agent API Gateway 用户 PostgreSQL Coach Node Evaluation Service RAG Service Health Node LangGraph Workflow Supervisor Agent API Gateway 用户 第一层路由 检测到"疼" → mode=langgraph_workflow route_decision=health 第二层路由 检测到"疼" → 路由到health_node Advanced RAG流程 结合RAG知识: - ITBS症状说明 - 急性期处理方法 - 康复训练动作 专业性: 5/5 正确性: 4/5 安全性: 5/5 综合: 5/5 POST /api/v1/agent "我膝盖疼,还能跑步吗?" supervisor.route(query) 关键词匹配检测 返回路由决策 app.invoke({ query: "...", route_decision: "health" }) router(state) 检测关键词 执行health_node rag_pipeline("膝盖疼痛") Query Expansion 生成3个子查询 Hybrid Search × 3 Rerank排序 返回Top 5文档 Health Agent生成建议 添加安全警告 health_result 执行coach_node(综合建议) 查询用户最近训练数据 返回最近7天跑步记录 结合健康建议和训练数据 生成个性化调整方案 final_result evaluate_response(query, answer, context) LLM-as-a-Judge评分 evaluation_score = 5 should_retry检查 score=5 ≥ 3 → END 返回最终结果 显示回答 + 质量评分标签

响应时间分解

阶段 耗时 占比
Supervisor路由 <1ms 0%
LangGraph路由 <1ms 0%
Health Node(含RAG) 8.5秒 45%
Coach Node 6.2秒 33%
Evaluation Service 3.8秒 20%
其他开销 0.5秒 2%
总计 19秒 100%

6.2 带重试的调用链

如果首次评分低于3分:

第1轮:
  Coach Node → 生成长度328字符的回答
  Evaluation → 评分2/5(专业性不足)
  should_retry → 2<3 且 retry_count=0<2 → 触发重试

第2轮:
  Coach Node → 接收到低分反馈,生成更详细的回答(562字符)
  Evaluation → 评分5/5(专业性、正确性、安全性均优秀)
  should_retry → 5≥3 → 结束

总耗时:19秒(第1轮)+ 16秒(第2轮)= 35秒

虽然耗时增加,但用户获得的是高质量回答,这个权衡是值得的。


七、性能优化实践

7.1 缓存策略

问题:相同或相似的查询重复调用LLM,浪费Token和时间。

解决方案:双层缓存(Redis + Memory)

class CacheService:
    def __init__(self):
        self.redis_client = RedisClient()  # 分布式缓存
        self.memory_cache = {}              # 本地缓存(fallback)
    
    async def get(self, key: str) -> Any:
        # 1. 尝试Redis缓存
        try:
            value = await self.redis_client.get(key)
            if value:
                logger.info(f"Redis缓存命中: {key}")
                return value
        except Exception as e:
            logger.warning(f"Redis访问失败,降级到内存缓存: {e}")
        
        # 2. 尝试内存缓存
        if key in self.memory_cache:
            logger.info(f"内存缓存命中: {key}")
            return self.memory_cache[key]
        
        return None
    
    async def set(self, key: str, value: Any, ttl: int = 300):
        # 同时写入Redis和内存
        try:
            await self.redis_client.set(key, value, ttl)
        except:
            pass
        
        self.memory_cache[key] = value
        
        # 清理过期内存缓存(简化版)
        if len(self.memory_cache) > 1000:
            keys_to_delete = list(self.memory_cache.keys())[:500]
            for k in keys_to_delete:
                del self.memory_cache[k]

缓存Key设计

# 包含所有影响结果的变量
cache_key = f"agent_cache:{user_id}:{query_hash}:{run_data_hash}:{profile_hash}:{quality_mode}"

效果

  • 缓存命中率:从60%提升到85%
  • 平均响应时间:从18秒降到6秒(缓存命中时<1秒)
  • Token成本:降低40%

7.2 简单查询快速路径

发现:很多用户只是查询简单事实,如"我的VO2max是多少",不需要复杂的Agent推理。

优化:在Agent执行前增加意图检测

def _is_simple_query(query: str) -> bool:
    """检测是否为简单查询"""
    simple_patterns = [
        r".*vo2max.*是.*多少.*",
        r".*最大摄氧量.*是.*多少.*",
        r".*我的心率区间.*",
        r".*乳酸阈值.*",
    ]
    return any(re.match(pattern, query) for pattern in simple_patterns)

async def handle_agent_request(query: str, user_id: str):
    # 快速路径:简单查询直接返回数据
    if _is_simple_query(query):
        metrics = await get_user_metrics(user_id)
        answer = _format_simple_answer(query, metrics)
        logger.info(f"⚡ 快速响应: {answer}")
        return {"answer": answer, "source": "direct_data"}
    
    # 慢速路径:完整Agent流程
    return await full_agent_pipeline(query, user_id)

效果对比

查询类型 优化前 优化后 提升
“我的VO2max是多少” 20秒(完整Agent流程) 0.8秒(直接查数据库) 96%
“心率区间怎么划分” 18秒 1.2秒(缓存命中) 93%
“分析我的训练并给出建议” 22秒 22秒(无优化,复杂查询) 0%

总体影响

  • 30%的查询属于简单查询
  • 整体平均响应时间从18秒降到12秒

7.3 模型路由优化

问题:所有请求都用同一个大模型,成本高且速度慢。

解决方案:根据quality_mode动态选择模型

class ModelRouter:
    def __init__(self):
        self.models = {
            "fast": ChatOpenAI(model="qwen-turbo"),      # 快速、便宜
            "balanced": ChatOpenAI(model="qwen-turbo"),  # 平衡
            "smart": ChatOpenAI(model="qwen-plus")       # 智能、贵
        }
    
    def get_llm(self, quality_mode: str) -> ChatOpenAI:
        quality_to_task = {
            "economy": "fast",      # 经济模式
            "balanced": "balanced", # 平衡模式
            "premium": "smart"      # 精准模式
        }
        task_type = quality_to_task.get(quality_mode, "balanced")
        return self.models[task_type]

前端用户可选择

  • ⚡ 快速模式(economy):适合日常查询
  • ⚖️ 平衡模式(balanced):默认选项
  • 🎯 精准模式(premium):重要决策时使用

成本对比(每1000次请求):

模式 模型 成本 平均延迟
economy qwen-turbo ¥5 3秒
balanced qwen-turbo ¥5 5秒
premium qwen-plus ¥20 8秒

八、踩坑记录与解决方案

坑1:FastAPI路由顺序导致404

现象:定义了/api/v1/workflows/{id},但访问时返回404。

原因:通配符路由必须在具体路由之后注册。

# ❌ 错误顺序
app.include_router(workflow_api)  # 包含 /workflows/{id}
app.include_router(specific_api)  # 包含 /workflows/list(永远不会匹配)

# ✅ 正确顺序
app.include_router(specific_api)  # 先注册具体路由
app.include_router(workflow_api)  # 再注册通配符路由

教训:FastAPI按注册顺序匹配路由,通配符会"吃掉"后续的具体路由。

坑2:异步函数未await导致协程从未执行

现象:日志显示"评估任务已提交",但数据库中没有任何评估记录。

原因:使用了asyncio.ensure_future()但没有保持事件循环运行。

# ❌ 错误写法
async def handle_request():
    asyncio.ensure_future(evaluate_and_save(query, response))
    return {"status": "success"}  # 请求结束,协程可能被取消

# ✅ 正确写法
async def handle_request():
    task = asyncio.create_task(evaluate_and_save(query, response))
    task.add_done_callback(
        lambda t: logger.error(f"评估失败: {t.exception()}") 
        if t.exception() else None
    )
    return {"status": "success"}

教训:FastAPI请求结束后,未await的后台任务可能被取消。需要使用add_done_callback捕获异常。

坑3:PostgreSQL严格类型检查

现象:SQLite运行正常,切换到PostgreSQL后报错operator does not exist: text = integer

原因:PostgreSQL不允许隐式类型转换。

# ❌ SQLite允许(但PostgreSQL不允许)
stmt = select(User).where(User.user_id == 123)  # user_id是TEXT类型

# ✅ 显式类型转换
stmt = select(User).where(User.user_id == str(123))

教训:从SQLite迁移到PostgreSQL时,必须严格检查所有字段类型。

坑4:LangChain early_stopping_method兼容性

现象:升级LangChain后报错Got unsupported early_stopping_method 'generate'

原因:新版LangChain移除了"generate"选项。

# ❌ 旧版API
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    early_stopping_method="generate"  # 已废弃
)

# ✅ 新版API
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    early_stopping_method="force"  # 强制停止
)

教训:第三方库升级时要仔细阅读Changelog,提前测试兼容性。


九、监控与可观测性

9.1 全链路日志记录

每个关键环节都记录详细日志:

logger.info(f"🔀 Supervisor路由决策: mode={mode}, route_decision={route_decision}")
logger.info(f"🕸️  LangGraph路由: path={route}")
logger.info(f"🤖 执行Coach Node (重试次数: {retry_count})")
logger.info(f"📊 质量评分: {score}/5 (重试 {retry_count}/2)")
logger.info(f"✅ 工作流完成: 总耗时={total_time:.2f}s")

日志示例

2026-05-13 10:30:15 INFO  🔀 Supervisor路由决策: mode=langgraph_workflow, route_decision=health
2026-05-13 10:30:15 INFO  🕸️  LangGraph路由: path=health
2026-05-13 10:30:16 INFO  📚 RAG检索: 查询="膝盖疼痛", 扩展查询数=3
2026-05-13 10:30:18 INFO  📚 RAG完成: 检索文档数=9, 重排序后=5
2026-05-13 10:30:23 INFO  🏥 Health Node完成: 建议长度=456字符
2026-05-13 10:30:29 INFO  🏃 Coach Node完成: 最终回答长度=623字符
2026-05-13 10:30:32 INFO  📊 质量评分: 5/5 (重试 0/2)
2026-05-13 10:30:32 INFO  ✅ 工作流完成: 总耗时=17.23s

9.2 性能指标统计

记录每个节点的执行时间:

class WorkflowMetrics:
    def __init__(self):
        self.node_durations = defaultdict(list)
    
    def record_node_duration(self, node_name: str, duration_ms: float):
        self.node_durations[node_name].append(duration_ms)
    
    def get_percentile(self, node_name: str, percentile: int) -> float:
        durations = sorted(self.node_durations[node_name])
        index = int(len(durations) * percentile / 100)
        return durations[index]

# 使用示例
metrics.record_node_duration("health_node", 8500)
metrics.record_node_duration("coach_node", 6200)
metrics.record_node_duration("evaluation", 3800)

p95_health = metrics.get_percentile("health_node", 95)  # P95延迟

监控面板展示

  • 总请求数:1,234
  • 平均延迟:18.5秒
  • P95延迟:32.1秒
  • P99延迟:45.6秒
  • 错误率:2.3%
  • 缓存命中率:85%

十、面试常见问题

Q1: 为什么不用LLM做所有路由决策?

A: 主要考虑三点:

  1. 性能:关键词匹配<1ms,LLM需要2-3秒。对于85%的常见场景,关键词完全够用。
  2. 成本:每次LLM调用约$0.001,高频场景下成本可观。
  3. 稳定性:LLM输出可能有波动,关键词匹配100%确定。

我们的混合策略兼顾了速度和灵活性:高频明确场景用规则,边缘情况用LLM。

Q2: Reflection Loop会不会导致无限重试?

A: 不会,有两个保护机制:

  1. 最大重试次数限制:最多重试2次(共3次尝试)
  2. 超时控制:整个工作流有总超时时间(默认60秒)

实际数据显示,99%的请求在2次尝试内就能达到合格分数。

Q3: 如何保证Evaluation Service的评分公正性?

A: 这是一个好问题。我们采取了以下措施:

  1. 使用更强的模型:Evaluation使用qwen-plus,而被评估的回答可能来自qwen-turbo
  2. 明确的评分标准:专业性40% + 正确性40% + 安全性20%
  3. 低温度设置:temperature=0.1,保证评分稳定
  4. 人工抽检:定期人工审核低分和高分样本,校准评分标准

未来可以考虑引入多个评委模型投票,进一步提高公正性。

Q4: LangGraph相比传统工作流引擎有什么优势?

A: 核心优势是动态性

  1. 条件路由:根据运行时状态动态选择路径,而不是硬编码的流程
  2. 循环支持:Reflection Loop本质是一个循环,传统DAG很难优雅实现
  3. 状态管理:通过TypedDict管理工作流状态,可追溯、可调试
  4. 可视化:LangGraph提供内置的可视化工具,方便调试

但对于固定业务流程(如:分析→评估→计划),我们仍然使用自定义的DAG调度器,因为它更轻量、性能更好。

Q5: 如果Health Agent和Coach Agent的建议冲突怎么办?

A: 这是Multi-Agent系统的经典问题。我们的处理策略:

  1. 优先级机制:Health Agent的安全警告具有最高优先级,Coach Node必须尊重
  2. 综合决策:Coach Node作为最终输出节点,负责整合各方建议,发现冲突时以安全性为先
  3. 明确标注:如果存在不确定性,会在回答中明确说明,并建议用户咨询专业人士

例如:

Health Agent建议:"膝盖疼痛期间应完全停止跑步"
Coach Node综合后输出:
"根据健康评估,建议您暂停跑步3-5天。在此期间可以进行游泳或骑行等低冲击运动保持体能。疼痛缓解后,从快走开始逐步恢复跑步。"

十一、总结与展望

核心价值总结

这套自适应工作流路由机制的核心价值在于:

  1. 速度:通过关键词匹配和缓存,将简单查询响应时间从20秒降到1秒以内
  2. 质量:通过Reflection Loop和Evaluation Service,确保回答质量(平均分从3.2提升到4.5)
  3. 成本:通过模型路由和缓存,Token成本降低40%
  4. 可维护性:清晰的分层架构,便于扩展新的Agent和路由规则

技术亮点回顾

  • 两层路由架构:关键词优先 + LLM兜底
  • 三种执行路径:health/parallel/coach,各司其职
  • Reflection Loop:自我优化,低分自动重试
  • Plan-and-Execute:复杂任务拆解,可解释性强
  • Advanced RAG:Query Expansion + Hybrid Search + Rerank
  • 双层缓存:Redis主缓存 + Memory fallback

后续优化方向

  1. 学习用户偏好:记录用户对不同类型回答的反馈,个性化路由策略
  2. A/B测试框架:对比不同路由策略的效果
  3. 动态关键词更新:根据用户实际提问自动扩充关键词库
  4. 多模态支持:支持用户上传跑步截图、心率图表等多模态数据
  5. 实时流式输出:将LangGraph的执行过程实时推送给前端,提升用户体验

十二、完整源码

本项目已开源,欢迎Star和贡献:

GitHub仓库AiRunCoachAgent

快速演示AiRunCoachAgent

核心文件清单

app/
├── services/
│   ├── agents/
│   │   ├── supervisor_agent.py      # Supervisor路由Agent
│   │   ├── router_agent.py          # Router Agent(备用)
│   │   ├── data_agent.py            # 数据分析Agent
│   │   ├── health_agent.py          # 健康评估Agent
│   │   ├── knowledge_agent.py       # 知识问答Agent
│   │   └── coach_agent.py           # 教练Agent(ReAct + Plan-and-Execute)
│   ├── workflow_graph.py            # LangGraph工作流定义(520行核心逻辑)
│   ├── workflow_state.py            # AgentState状态定义
│   ├── evaluation_service.py        # 质量评估服务
│   ├── planner_service.py           # 规划服务
│   ├── executor_service.py          # 执行服务
│   ├── rag_service.py               # Advanced RAG服务
│   ├── query_expansion.py           # 查询扩展服务
│   ├── rerank_service.py            # 重排序服务
│   ├── cache_service.py             # 缓存服务
│   └── model_router.py              # 模型路由器
├── api/
│   └── agent_api.py                 # Agent API端点
├── middleware/
│   ├── auth.py                      # 认证中间件
│   └── monitoring_middleware.py     # 监控中间件
└── main.py                          # FastAPI应用入口

frontend/
└── src/
    ├── pages/
    │   └── Coach.tsx                # AI教练对话页面
    └── components/
        └── ChatBox.tsx              # 聊天框组件(支持流式输出)

参考资料


如果你觉得这篇文章对你有帮助,欢迎点赞、收藏、转发!有任何问题或建议,请在评论区留言讨论。 🏃‍♂️💨

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐