多Agent开发
API接口
| 功能 | 方法 | 路径 | 说明 |
|---|---|---|---|
| 普通对话 | POST | /api/chat |
一次性返回 |
| 流式对话 | POST | /api/chat_stream |
SSE 流式输出 |
| AIOps 诊断 | POST | /api/aiops |
自动故障诊断(流式) |
| 文件上传 | POST | /api/upload |
上传并索引文档 |
| 健康检查 | GET | /api/health |
服务状态检查 |
Chat Agent
普通对话
请求入口:
@router.post("/chat")
async def chat(request: ChatRequest):
"""快速对话接口
{
"code": 200,
"message": "success",
"data": {
"success": true,
"answer": "回答内容",
"errorMessage": null
}
}
Args:
request: 对话请求
Returns:
统一格式的对话响应
"""
try:
logger.info(f"[会话 {request.id}] 收到快速对话请求: {request.question}")
answer = await rag_agent_service.query(
request.question,
session_id=request.id
)
logger.info(f"[会话 {request.id}] 快速对话完成")
return {
"code": 200,
"message": "success",
"data": {
"success": True,
"answer": answer,
"errorMessage": None
}
}
except Exception as e:
logger.error(f"对话接口错误: {e}")
return {
"code": 500,
"message": "error",
"data": {
"success": False,
"answer": None,
"errorMessage": str(e)
}
}
核心代码:
answer = await rag_agent_service.query(
request.question,
session_id=request.id
)
其中rag_agent_service.query是Rag Agent中的异步方法,使用await等待其运行结果保存在answer。
在return中会将answer封装在data中作为HTTP 响应进行返回。,会通过 FastAPI 框架自动发送回 发起 /chat 请求的客户端
Rag Agent服务
为Agent 创建了带自动消息追加功能的记忆区
class AgentState(TypedDict):
"""Agent 状态"""
messages: Annotated[Sequence[BaseMessage], add_messages]
AgentState 是一个 类型提示类(TypedDict),用来告诉 LangGraph:
“这个 Agent 在运行过程中,它的内部状态(state)长什么样?”
你可以把它想象成 Agent 的 “大脑记忆区”,每次思考或回复后,都会更新这块内存。
其中Annotated 是 Python 标准库(从 Python 3.9+ 引入,旧版本可通过 typing_extensions 使用)中的一个类型注解工具。Annotated[Type, metadata1, metadata2, ...]
- Type:真实的类型(如 str, int, List[str] 等)
- metadata:任意附加信息(可以是函数、字符串、类、注解等),不影响运行时行为,但框架或工具可以读取它
核心思想:在不改变类型本身的前提下,给类型“贴标签”或“加说明”,供外部系统(如 LangGraph、FastAPI、Pydantic)使用。
可以把 Annotated[Type, behavior] 想象成:
“这个变量是 Type 类型的,而且当你要更新它时,请用 behavior 的方式来更新。”
add_messages 是 LangGraph 内置的一个消息合并函数
当你在节点函数中返回:
return {"messages": [AIMessage("你好")]}
LangGraph 会:
查看 AgentState 中 messages 字段的类型注解
发现它是 Annotated[…, add_messages]
于是不直接覆盖,而是调用:
new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")])
结果就是追加,而不是替换!
使用的方法:
TypedDict + Annotated + add_messages 组合定义 LangGraph 状态结构
实现的效果:
给 Agent 创建了带自动消息追加功能的记忆区,让智能体可以记住历史对话。
修剪 Agent 状态中的消息历史
def trim_messages_middleware(state: AgentState) -> dict[str, Any] | None:
这是一个 LangGraph 中间件函数,核心作用是:修剪 Agent 状态中的消息历史,只保留关键消息(第一条系统消息 + 最近 3 轮对话),避免消息过多超出上下文窗口,同时兼容 LangGraph 状态更新机制。
核心代码:
# 提取第一条系统消息
first_msg = messages[0]
# 保留最近的 6 条消息(确保包含完整的对话轮次)
recent_messages = messages[-6:] if len(messages) % 2 == 0 else messages[-7:]
# 构建新的消息列表
new_messages = [first_msg] + list(recent_messages)
RagAgentService
RagAgentService是一个基于 LangGraph + LangChain + Qwen(通义千问) 的 RAG(检索增强生成)智能代理服务(RagAgentService),支持 非流式 和 流式 两种对话模式,并具备 会话历史管理、工具调用(包括 MCP 工具)、消息修剪、错误处理 等能力。
调用和执行流程:
客户端 (POST /chat)
↓
FastAPI 路由 → 调用 rag_agent_service.query()
↓
RagAgentService
├── 初始化:ChatQwen 模型 + 工具(本地 + MCP)
├── 非流式查询:query() → 一次性返回完整答案
├── 流式查询:query_stream() → 异步生成器逐块返回
├── 会话管理:MemorySaver(基于 session_id = thread_id)
├── 上下文修剪:trim_messages_middleware(防止超长上下文)
└── 工具集成:retrieve_knowledge, get_current_time, MCP 工具(如日志、告警等)
初始化
初始化函数:
def __init__(self, streaming: bool = True):
可以选择是否流式输出,默认为流式输出
在初始化方法中先配置模型名称、是否流式输出、系统提示词,然后创建可直接调用的 ChatQwen 模型对象。接着定义工具、MCP、内存检查点self.checkpointer = MemorySaver(),其中获取MCP是异步的方式。
MemorySaver是 LangGraph 提供的一种 状态持久化后端(checkpoint saver):所有状态都保存在 Python 进程的内存中(即一个字典 dict),不会持久化到磁盘或数据库。
checkpointer 的生命周期 = RagAgentService 实例的生命周期。
核心代码:
self.model = ChatQwen(
model=self.model_name, # 用哪个模型
api_key=config.dashscope_api_key, # 调用阿里DashScope的密钥(必须有,不然用不了模型)
temperature=0.7, # 创造性(0-1,越小越严谨,越大越有想象力,0.7刚好平衡)
streaming=streaming, # 跟上面的streaming一致,启用/关闭流式
)
接着异步初始化Agent
async def _initialize_agent(self):
加载MCP工具
# 1. 加载MCP工具(从腾讯云MCP服务获取,比如日志查询、监控告警工具)
mcp_client = await get_mcp_client_with_retry() # 连接MCP服务(带重试,防止网络差连不上)
mcp_tools = await mcp_client.get_tools() # 从MCP服务拿到工具列表
接着合并工具:本地工具(retrieve_knowledge等) + MCP工具(云工具)
再创建 ReAct Agent
self.agent = create_agent(
self.model, # 用我们前面初始化的ChatQwen模型
tools=all_tools, # 合并后的所有工具
checkpointer=self.checkpointer, # 会话记忆(记住聊天历史)
)
构建系统提示词
def _build_system_prompt(self) -> str:
"""
构建系统提示词
注意:LangChain 框架会自动将工具信息传递给 LLM,
因此系统提示词中无需列举具体的工具列表。
Returns:
str: 系统提示词
"""
from textwrap import dedent
# dedent:自动删除代码前面的缩进空格,让多行字符串保持顶格,避免格式混乱
# .strip():去掉字符串最前面、最后面的空行和空格,让提示词更干净
return dedent("""
你是一个专业的AI助手,能够使用多种工具来帮助用户解决问题。
工作原则:
1. 理解用户需求,选择合适的工具来完成任务
2. 当需要获取实时信息或专业知识时,主动使用相关工具
3. 基于工具返回的结果提供准确、专业的回答
4. 如果工具无法提供足够信息,请诚实地告知用户
回答要求:
- 保持友好、专业的语气
- 回答简洁明了,重点突出
- 基于事实,不编造信息
- 如有不确定的地方,明确说明
请根据用户的问题,灵活使用可用工具,提供高质量的帮助。
""").strip()
- dedent + strip():只是格式化提示词,让它没有多余的空格和空行,AI读起来更清晰,不影响功能。
- 不用写工具列表:LangChain会自动把所有工具(本地+MCP)的信息传给AI,我们不用手动在提示词里列工具,省事儿~
非流式查询方法(query)—— 一次性返回完整答案
async def query( self, question: str, session_id: str, ) -> str:
核心代码:
# 2. 构建消息列表:系统提示词 + 用户问题(告诉AI上下文)
messages = [
SystemMessage(content=self.system_prompt), # 给AI立规矩的提示词
HumanMessage(content=question) # 用户的问题
]
# 3. 构建Agent输入:格式必须是{"messages": 消息列表},LangGraph才能识别
agent_input = {"messages": messages}
# 4. 配置会话ID:用session_id作为thread_id,让checkpointer记住这个用户的聊天历史
config_dict = {
"configurable": {
"thread_id": session_id
}
}
# 5. 异步执行Agent(ainvoke:异步调用,不卡主线程)
result = await self.agent.ainvoke(
input=agent_input,
config=config_dict,
)
# 6. 提取最终答案(重点!)
messages_result = result.get("messages", []) # 从Agent返回结果里拿消息列表
if messages_result:
# 消息列表最后一条,就是AI的最终回答(前面是思考、工具调用记录)
last_message = messages_result[-1]
# 取回答内容,防止没有content属性报错
answer = last_message.content if hasattr(last_message, 'content') else str(last_message)
- 非流式:就是AI把所有思考、工具调用都做完,一次性把完整答案返回,适合不需要实时显示的场景(比如后台查询)。
- session_id的作用:区分不同用户,比如用户A和用户B的聊天历史,靠session_id分开,不会串线。
- 提取答案:Agent返回的消息列表里,最后一条就是最终回答(前面的是AI的思考过程、工具调用记录,不用管)。
流式查询方法(query_stream)—— 逐步返回答案(AI边想边说)
async def query_stream( self, question: str, session_id: str, ) -> AsyncGenerator[Dict[str, Any], None]:
异步生成器方法(async def + AsyncGenerator 返回类型):用于实现 流式问答(Streaming Q&A)功能。
AsyncGenerator[Dict[str, Any], None]:
- 这是一个异步生成器(asynchronous generator)。
- 每次 yield 会产出一个 Dict[str, Any] 类型的数据块(例如:{“content”: “你好”, “type”: “text”})。
- None 表示这个生成器不接受外部 .send() 的值(标准用法)。
核心代码:
# 4. 异步流式调用Agent(astream:流式返回,每次返回一小段答案)
async for token, metadata in self.agent.astream(
input=agent_input,
config=config_dict,
stream_mode="messages", # 流式模式:按消息片段返回
):
# 从metadata里拿当前Agent运行的步骤(比如在思考、在调用工具)
node_name = metadata.get('langgraph_node', 'unknown') if isinstance(metadata, dict) else 'unknown'
# 拿到当前返回的消息类型(是AI的回答片段,还是工具调用)
message_type = type(token).__name__
# 只处理AI的消息(跳过工具返回、系统消息等无关内容)
if message_type in ("AIMessage", "AIMessageChunk"):
# 从消息里提取文本内容块(AI的回答片段)
content_blocks = getattr(token, 'content_blocks', None)
if content_blocks and isinstance(content_blocks, list):
for block in content_blocks:
# 只提取文本类型的内容(跳过工具调用等其他类型)
if isinstance(block, dict) and block.get('type') == 'text':
text_content = block.get('text', '')
# 把片段返回给前端(前端拿到后,就能实时显示,实现“边想边说”)
if text_content:
yield {
"type": "content",
"data": text_content,
"node": node_name
}
- yield的作用:每次返回一小段文本,前端拿到后立刻显示,比如AI打“你”,前端就显示“你”,再打“好”,前端就显示“你好”,体验更流畅。
- 四种返回类型:content(答案片段)、tool_call(工具调用记录)、complete(结束信号)、error(错误信息),前端能根据类型做不同显示。
if content_blocks and isinstance(content_blocks, list):
得到的block:文本类型:{ “type”: “text”, “text”: “你好” }
工具调用:{
“type”: “tool_use”,
“id”: “toolu_xxx”,
“name”: “search”, # 工具名:搜索/查询/计算器等
“input”: {
“query”: “FastAPI 是什么”
}
} if text_content: yield { "type": "content", "data": text_content, "node": node_name }
重新打包后推给前端的 chunk:
chunk = {
“type”: “content”,
“data”: “AI回答的一小段文字”, # 👈 这里没有 text!是 data!
“node”: “…”
}
获取会话历史方法(get_session_history)—— 让AI记住之前的聊天
def get_session_history(self, session_id: str) -> list:
核心代码:
config = {"configurable": {"thread_id": session_id}} #用 thread_id = session_id 来区分用户
# 获取该 thread 的最新检查点
checkpoint_tuple = self.checkpointer.get(config) #checkpointer = LangGraph 的会话存储器
if not checkpoint_tuple:
logger.info(f"获取会话历史: {session_id}, 消息数量: 0")
return []
# checkpoint_tuple 可能是命名元组(新版本)或普通元组(旧版本),安全地提取 checkpoint(兼容处理)
# 若checkpoint_tuple为命名元组
if hasattr(checkpoint_tuple, 'checkpoint'):
checkpoint_data = checkpoint_tuple.checkpoint # type: ignore
else:
# 如果是普通元组,第一个元素是 checkpoint
checkpoint_data = checkpoint_tuple[0] if checkpoint_tuple else {}
# 从检查点中提取消息
# LangGraph 将对话消息存储在 channel_values["messages"] 中。
messages = checkpoint_data.get("channel_values", {}).get("messages", [])
# 转换为前端需要的格式
history = []
for msg in messages:
# 跳过系统消息
if isinstance(msg, SystemMessage):
continue
role = "user" if isinstance(msg, HumanMessage) else "assistant" #如果这条消息是用户发的 → 标记为 user; 否则(是 AI 发的)→ 标记为 assistant。
"""
{
把 LangChain 消息 → 前端通用格式:
"role": "user/assistant(二选一)",
"content": "消息内容",
"timestamp": "时间"
}
"""
content = msg.content if hasattr(msg, 'content') else str(msg)
# 提取时间戳(如果有的话)
timestamp = getattr(msg, 'timestamp', None)
if timestamp:
history.append({
"role": role,
"content": content,
"timestamp": timestamp
})
else:
from datetime import datetime
history.append({
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
})
logger.info(f"获取会话历史: {session_id}, 消息数量: {len(history)}")
return history
config = {"configurable": {"thread_id": session_id}} #用 thread_id = session_id 来区分用户
LangGraph 使用 thread_id 来区分不同用户的会话。这里将传入的 session_id 作为 thread_id,用于后续查询。
除了 thread_id,还可以添加其他可配置字段,例如:
{
“configurable”: {
“thread_id”: “user_123”,
“model_name”: “gpt-4”, # 切换模型
“user_locale”: “zh-CN” # 本地化设置
}
}
checkpoint_tuple = self.checkpointer.get(config)self.checkpointer 是 LangGraph 提供的 MemorySaver 类型的检查点存储器(用于持久化或缓存对话状态)。调用 .get(config) 获取该 thread_id 对应的最新对话状态(即 checkpoint)。
checkpointer 不认识 “session_id”,它只认 “thread_id”。
代码中把 session_id 当作 thread_id 传给了 Agent!
从此它们就永久绑定了。
# 1. 你创建了内存存储器
self.checkpointer = MemorySaver()
# 2. 你把 session_id 塞进 config 里,命名为 thread_id
config_dict = {
"configurable": {
"thread_id": session_id # <--- 核心绑定行
}
}
LangGraph 的 checkpointer 存储规则是固定死的: 每一段对话 = 一个 thread
这是因为将config_dict 传给Agent时
await self.agent.ainvoke(
input=...,
config=config_dict # 👈 带进去
)
Agent 会把 config 交给 checkpointer()
LangGraph 内部执行:
checkpointer.save(
thread_id=config["configurable"]["thread_id"],
checkpoint=当前对话状态
)
MemorySaver 里保存:
thread_id = user_001
messages = [系统消息, 用户消息, AI消息…]
下一次相同 session_id 进来 → 自动恢复历史
await agent.ainvoke(..., config={"thread_id": "user_001"})
LangGraph 内部自动做:
# 自动读取
历史消息 = checkpointer.get(thread_id="user_001")
# 自动拼接到当前对话
state["messages"] = 历史消息 + 新问题
清空会话历史方法(clear_session)—— 忘记之前的聊天
def clear_session(self, session_id: str) -> bool:
"""
清空会话历史(从 MemorySaver checkpointer 中删除)
Args:
session_id: 会话ID(即 thread_id)
Returns:
bool: 是否成功
"""
try:
# 核心代码:删除这个session_id对应的所有聊天记录
self.checkpointer.delete_thread(session_id)
logger.info(f"已清除会话历史: {session_id}")
return True # 成功返回True
except Exception as e:
logger.error(f"清空会话历史失败: {session_id}, 错误: {e}")
return False # 失败返回False
- 核心作用:比如用户想“重新开始聊天”,调用这个方法,就能删除之前的所有聊天记录,AI就忘了之前的对话。
资源清理方法
async def cleanup(self):
"""清理资源"""
try:
logger.info("清理 RAG Agent 服务资源...")
# MCP客户端由全局管理器统一管理,不用我们手动清理,省事儿
logger.info("RAG Agent 服务资源已清理")
except Exception as e:
logger.error(f"清理资源失败: {e}")
程序结束时调用这个方法,释放占用的资源(比如模型连接、网络连接),避免资源浪费,属于“规范操作”,不用我们手动管太多。
AIOps Agent
核心逻辑:event_generator() 异步生成器
async def event_generator():
try:
async for event in aiops_service.diagnose(session_id=session_id):
yield {
"event": "message",
"data": json.dumps(event, ensure_ascii=False)
}
if event.get("type") in ["complete", "error"]:
break
except Exception as e:
# 异常处理:发送 error 事件
AIOpsService
AIOpsService为基于 LangGraph 的通用 Plan-Execute-Replan(规划-执行-重规划)智能代理服务,专为 AIOps(智能运维)场景设计。它使用状态图(StateGraph)构建一个可循环、可中断、可恢复的推理工作流,并通过异步生成器(AsyncGenerator)以流式方式返回每一步的执行结果。
1.状态定义
class PlanExecuteState(TypedDict):
input: str # 用户原始输入
plan: List[str] # 待执行的步骤列表(会逐步减少)
past_steps: List[Tuple[str, Any]] # 已执行的步骤和结果
response: str # 最终生成的回答(为空表示未完成)
所有节点共享这个状态,通过修改它来传递信息。
2.工作流构建:_build_graph()
创建图并添加节点:
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replanner", replanner)
- planner, executor, replanner 是外部定义的函数(或 Runnable),接收 state 并返回更新后的 state。
设置执行顺序:planner → executor → replanner → (条件判断) → executor 或 END
条件边逻辑(关键!):
def should_continue(state: PlanExecuteState) -> str:
if state.get("response"): # 已有最终答案 → 结束
return END
if state.get("plan"): # 还有步骤 → 继续执行
return NODE_EXECUTOR
return END # 计划空但无答案 → 强制结束(应由 replanner 生成 response)
3.检查点(Checkpointing)
self.checkpointer = MemorySaver()
compiled_graph = workflow.compile(checkpointer=self.checkpointer)
- 使用 MemorySaver 在内存中保存每个 thread_id(即 session_id)的状态。
- 支持会话恢复:即使服务重启(若用持久化存储),也能从中断处继续。
流式执行:execute() 方法
execute 方法是整个 Plan-Execute-Replan 智能代理服务的核心执行入口。它的作用是:接收一个用户任务描述,启动一个可流式输出、支持多步推理与工具调用的智能工作流,并以异步生成器(AsyncGenerator)的形式逐步返回每一步的执行状态和结果。
执行流程详解
- 记录日志 & 初始化状态
initial_state: PlanExecuteState = {
"input": user_input, # 用户原始问题
"plan": [], # 初始计划为空(由 planner 填充)
"past_steps": [], # 已执行步骤(空)
"response": "" # 最终答案(空)
}
- 这个 initial_state 是 LangGraph 状态机的起点。
- 虽然 plan 为空,但第一个节点 planner 会根据 input 生成具体计划。
- 配置会话上下文(用于状态持久化)
config_dict = {
"configurable": {
"thread_id": session_id # 关键!用于区分不同用户的会话
}
}
- 启动流式执行(核心!)
async for event in self.graph.astream(
input=initial_state,
config=config_dict,
stream_mode="updates"
):
- astream(…, stream_mode=“updates”):
表示每当图中某个节点完成并更新状态时,就立即 yield 一个事件。 - 返回的 event 是一个字典,格式如:{“planner”: {“plan”: […], …}
- 解析节点输出并转换为前端友好事件
for node_name, node_output in event.items():
if node_name == NODE_PLANNER:
yield self._format_planner_event(node_output)
elif node_name == NODE_EXECUTOR:
yield self._format_executor_event(node_output)
elif node_name == NODE_REPLANNER:
yield self._format_replanner_event(node_output)
每个节点的原始输出(如 {“plan”: [“查日志”, “查指标”]})被转换成标准化 SSE 事件:
planner → {“type”: “plan”, “plan”: […]}
executor → {“type”: “step_complete”, “current_step”: “查日志”, …}
replanner → 可能是 {“type”: “report”, “report”: “…”} 或状态更新
💡 前端无需理解内部状态,只需监听 type 字段即可渲染对应 UI。
| 节点 | 输出状态 | 转换后的 SSE 事件 |
|---|---|---|
planner |
{"plan": ["查告警", "查日志"]} |
{"type": "plan", "plan": [...]} |
executor |
{"past_steps": [("查告警", {...})], "plan": ["查日志"]} |
{"type": "step_complete", "current_step": "查告警", ...} |
replanner |
{"response": "# 告警报告\n..."} |
{"type": "report", "report": "..."} |
- 获取最终结果并发送完成事件
final_state = self.graph.get_state(config_dict)
final_response = final_state.values.get("response", "")
yield {
"type": "complete",
"stage": "complete",
"message": "任务执行完成",
"response": final_response
}
- 即使工作流结束,也要显式发送 complete 事件,通知前端“可以关闭连接了”。
- response 字段包含最终生成的完整答案(如 Markdown 报告)。
关键机制说明
| 机制 | 说明 |
|---|---|
stream_mode="updates" |
只返回变化的部分(delta),而非完整状态,减少网络开销 |
thread_id 会话隔离 |
不同 session_id 的任务互不影响,状态独立存储 |
| 节点输出格式化 | 将内部状态转换为前端可消费的标准化事件,解耦前后端 |
| 最终状态兜底 | 即使流式过程中漏掉某些事件,也能在最后获取完整结果 |
AIOps 专用接口:diagnose()
这是一个兼容层,将通用 execute() 适配为 AIOps 诊断场景:
关键设计:
- 固定任务指令:
- 使用 dedent(“”“…”“”) 定义详细的诊断任务要求
- 强制输出 Markdown 格式报告
- 强调“严禁编造”,必须基于真实工具数据
- 事件格式转换:
- 将通用 execute() 返回的 {“type”: “complete”, “response”: “…”}
转换为 AIOps API 要求的格式:
{
"type": "complete",
"stage": "diagnosis_complete",
"diagnosis": { "report": "..." }
}
当调用 diagnose(session_id=“s123”) 时:
- 内部调用
execute(“诊断当前系统是否存在告警…”, session_id=“s123”) - 流式事件透传
收到 {“type”: “plan”, “plan”: [“获取活跃告警”, “查询服务日志”]} → 直接 yield
收到 {“type”: “step_complete”, “current_step”: “获取活跃告警”} → 直接 yield
收到 {“type”: “report”, “report”: “# 告警分析报告\n…”} → 直接 yield - 完成事件转换
当 execute 返回 {“type”: “complete”, “response”: “# 告警分析报告…”} 时:
→ 转换为
{
"type": "complete",
"stage": "diagnosis_complete",
"diagnosis": {
"status": "completed",
"report": "# 告警分析报告..."
}
}
- 前端消费
JavaScript 监听到 stage === “diagnosis_complete” 后:
displayReport(event.data.diagnosis.report); // 直接渲染 Markdown
补充:SSE事件
SSE = Server-Sent Events(服务器推送事件)
你可以把它理解成:
后端 → 前端 的 “实时消息通道”
它的特点:
- 单向:只有后端主动发给前端
- 流式:像水流一样,一段一段发
- 格式固定:前端能直接识别
- 比 WebSocket 轻量、简单
SSE 长什么样?
后端发给前端的真实数据格式是这样的:
data: {"type": "plan", "plan": ["查告警", "查日志"]}\n\n
data: {"type": "step_complete", "current_step": "查告警"}\n\n
data: {"type": "report", "report": "# 告警报告..."}\n\n
data: {"type": "complete"}\n\n
前端只要监听这个流,就能实时收到每一步更新。
为什么要把节点输出转成 SSE 事件?
原因:
- 前端只能识别 SSE 格式,不能直接识别 AI 节点输出
AI 节点输出是这样的:
{"plan": ["查告警", "查日志"]}
但前端 SSE 必须是:
data: {"type":"plan", "plan":["查告警","查日志"]}\n\n
不转,前端收不到、解析不了。
plan:AI 生成了计划
step_complete:完成一步
report:生成报告
complete:全部结束
前端要根据不同 type 做不同 UI:
显示计划卡片
显示执行步骤动画
渲染最终报告
关闭加载动画
-
流式执行必须用 SSE 才能实时推送
如果不用 SSE:
前端要一直等,直到 AI 全部做完才返回
用户体验极差:一直转圈,不知道 AI 在干嘛
用 SSE:
前端实时接收每一步
可以做打字机效果、步骤动画、思考过程展示 -
前端需要知道这一步是什么类型的事件
LangGraph知识补充——状态自动更新机制
一、核心机制名称
LangGraph 中这一核心机制称为:状态自动更新机制(也可结合功能表述为「基于注解的状态合并机制」)。
核心作用:自动根据 AgentState 定义的规则,处理节点返回的状态数据,无需手动编写状态合并逻辑(如消息追加)。
二、机制依赖的核心组件
该机制能运行,依赖 3 个关键组件的配合(缺一不可):
-
TypedDict(AgentState):定义 Agent 状态的「数据结构」,明确状态中包含哪些字段(如 messages)、每个字段的基础类型,相当于给状态“定规矩”。
-
Annotated 类型注解:给状态字段“贴规则标签”,不改变字段本身类型,仅向 LangGraph 传递「如何处理该字段」的附加信息(metadata)。
-
状态处理函数(如 add_messages):LangGraph 内置(或自定义)的处理逻辑,是 Annotated 传递的“规则具体实现”(如 add_messages 实现消息追加)。
三、机制核心原理
核心逻辑:节点 return 触发状态更新 → LangGraph 读取 AgentState 注解 → 执行对应处理逻辑 → 更新状态
关键结论:
-
return 是「触发信号」:只要在节点函数中 return 一个字典(对应状态字段的更新内容),就会触发 LangGraph 的状态更新机制。
-
Annotated 是「规则指引」:告诉 LangGraph,某个字段需要用特定方式处理(而非直接覆盖)。
-
处理函数(如 add_messages)是「执行逻辑」:LangGraph 按照注解指引,调用对应函数,完成状态的更新(如消息追加)。
四、具体运行流程(以 messages 字段为例)
-
定义状态结构(AgentState):通过 TypedDict 声明 messages 字段,并用 Annotated 绑定 add_messages 规则。
class AgentState(TypedDict): """Agent 状态""" messages: Annotated[Sequence[BaseMessage], add_messages] -
节点返回更新数据:在节点函数中 return 包含 messages 字段的字典(新消息)。
return {"messages": [AIMessage("你好")]} -
LangGraph 自动触发机制:检测到 return 信号,读取 AgentState 中 messages 字段的 Annotated 注解。
-
执行追加逻辑:LangGraph 调用 add_messages 函数,将新消息追加到旧消息列表中,而非直接覆盖。
new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")]) -
完成状态更新:将合并后的消息列表赋值给新状态,Agent 状态更新完成,保留历史对话记忆。
方法定义:def trim_messages_middleware(state: AgentState) -> dict[str, Any] | None:
中间件的核心作用是 修改 / 补充 Agent 状态,而 Agent 状态本质是「键值对字典」,中间件支持「可选修改」—— 如果不需要修改任何状态(比如消息数量没超阈值,无需修剪),直接返回 None 即可。
五、关键口诀(便于记忆)
-
return → 触发更新(告诉框架“要更状态”)
-
Annotated → 指引规则(告诉框架“怎么更”)
-
add_messages → 执行规则(实现“追加不覆盖”)
六、核心价值
无需手动编写状态合并代码(如手动拼接消息列表),简化 Agent 开发流程,让 Agent 自动拥有“记忆能力”(如多轮对话历史保留),提升开发效率。
RAG完整流程
整体架构
RagAgentService
├── 模型:ChatQwen(阿里千问)
├── 工具:retrieve_knowledge(RAG 核心)、get_current_time、MCP 工具
├── 记忆:MemorySaver(会话持久化)
└── 执行器:LangGraph create_agent (ReAct 智能代理)
完整详细执行流程(从用户提问 → 回答返回)
【步骤 0:服务启动】
关键代码
rag_agent_service = RagAgentService(streaming=True)
做了什么
- 加载配置
- 初始化 ChatQwen 模型
- 加载基础工具
retrieve_knowledge、get_current_time - 初始化内存存储器
MemorySaver
【步骤 1:用户发起请求】
用户调用(代码入口)
answer = await rag_agent_service.query(
question="公司的请假政策是什么?",
session_id="user_001"
)
【步骤 2:进入 query 方法】
关键代码
async def query(self, question: str, session_id: str) -> str:
【步骤 3:异步初始化 Agent】
关键代码
await self._initialize_agent()
内部执行 4 件事
-
加载 MCP 远程工具
mcp_client = await get_mcp_client_with_retry() mcp_tools = await mcp_client.get_tools() -
合并所有工具
all_tools = self.tools + self.mcp_tools工具包含:
retrieve_knowledge(RAG 检索)get_current_time- 各种 MCP 云工具
-
创建 LangGraph Agent
self.agent = create_agent( self.model, tools=all_tools, checkpointer=self.checkpointer ) -
标记 Agent 初始化完成
【步骤 4:构建消息列表】
关键代码
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=question)
]
消息内容
[
SystemMessage("你是专业AI助手..."),
HumanMessage("公司的请假政策是什么?")
]
【步骤 5:包装成 Agent 要求的输入格式】
关键代码
agent_input = {"messages": messages}
为什么要这样?
因为你的 AgentState 是:
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], add_messages]
Agent 只接受 key = “messages” 的字典输入
【步骤 6:配置会话 ID】
关键代码
config_dict = {
"configurable": {
"thread_id": session_id # user_001
}
}
作用:让对话历史持久化,多轮对话保持上下文
【步骤 7:执行 Agent → 进入 LangGraph 自动运行】
关键代码
result = await self.agent.ainvoke(
input=agent_input,
config=config_dict
)
🔥【步骤 8:LangGraph 内部自动执行 RAG 核心流程】
8.1 Agent 读取初始状态
state = {
"messages": [
SystemMessage(...),
HumanMessage("公司的请假政策是什么?")
]
}
8.2 LLM(千问)思考
LLM 看到问题 + 工具列表,判断:
必须调用
retrieve_knowledge工具
8.3 LLM 输出工具调用指令
{
"name": "retrieve_knowledge",
"parameters": {
"query": "公司的请假政策是什么?"
}
}
8.4 LangGraph 自动执行 RAG 检索(真正的 RAG)
框架自动调用:
knowledge = retrieve_knowledge("公司的请假政策是什么?")
retrieve_knowledge 内部标准 RAG 步骤
- 用户问题向量化
- 向量库相似度检索
- 返回最相关的文档片段
- 拼接成上下文
返回示例:
知识库内容:
1. 事假需提前1天申请
2. 病假需提供医院证明
3. 年假按工龄计算
8.5 工具结果自动追加到消息列表
state["messages"].append(
ToolMessage(content=知识库内容)
)
8.6 LLM 基于检索结果生成回答
LLM 读取工具返回的知识 → 生成自然语言回答。
8.7 回答追加到消息列表
state["messages"].append(AIMessage("根据公司政策..."))
【步骤 9:Agent 执行完毕,返回结果】
返回结构
result = {
"messages": [
SystemMessage(...),
HumanMessage(...),
ToolMessage(...),
AIMessage(...) # 最终回答
]
}
【步骤 10:提取最终答案】
关键代码
messages_result = result.get("messages", [])
last_message = messages_result[-1]
answer = last_message.content
【步骤 11:返回给用户】
return answer
完整 RAG 流程图(文字版)
1. 用户提问
↓
2. 初始化 Agent → 加载所有工具(含 RAG 检索)
↓
3. 构建消息:[系统提示, 用户问题]
↓
4. 包装为:{"messages": 消息列表}
↓
5. 传入 LangGraph Agent
↓
=================================
【LangGraph 自动执行 RAG】
6. LLM 判断:需要查知识库
7. 自动调用 retrieve_knowledge
8. 执行:向量化 → 检索向量库 → 返回知识
9. 工具结果加入对话
10. LLM 生成最终回答
=================================
↓
11. 提取最后一条消息
12. 返回答案给用户
RAG检索阶段
RAG检索阶段通过自定义tool来实现
@tool(response_format="content_and_artifact")
def retrieve_knowledge(query: str) -> Tuple[str, List[Document]]:
"""从知识库中检索相关信息来回答问题
当用户的问题涉及专业知识、文档内容或需要参考资料时,使用此工具。
Args:
query: 用户的问题或查询
Returns:
Tuple[str, List[Document]]: (格式化的上下文文本, 原始文档列表)
"""
try:
logger.info(f"知识检索工具被调用: query='{query}'")
# 从向量存储中检索相关文档
vector_store = vector_store_manager.get_vector_store()
retriever = vector_store.as_retriever(
search_kwargs={"k": config.rag_top_k}
)
docs = retriever.invoke(query)
if not docs:
logger.warning("未检索到相关文档")
return "没有找到相关信息。", []
# 格式化文档为上下文
context = format_docs(docs)
logger.info(f"检索到 {len(docs)} 个相关文档")
return context, docs
except Exception as e:
logger.error(f"知识检索工具调用失败: {e}")
return f"检索知识时发生错误: {str(e)}", []
def format_docs(docs: List[Document]) -> str:
"""
格式化文档列表为上下文文本
Args:
docs: 文档列表
Returns:
str: 格式化的上下文文本
"""
formatted_parts = []
for i, doc in enumerate(docs, 1):
# 提取元数据
metadata = doc.metadata
source = metadata.get("_file_name", "未知来源")
# 提取标题信息 (如果有)
headers = []
for key in ["h1", "h2", "h3"]:
if key in metadata and metadata[key]:
headers.append(metadata[key])
header_str = " > ".join(headers) if headers else ""
# 构建格式化文本
formatted = f"【参考资料 {i}】"
if header_str:
formatted += f"\n标题: {header_str}"
formatted += f"\n来源: {source}"
formatted += f"\n内容:\n{doc.page_content}\n"
formatted_parts.append(formatted)
return "\n".join(formatted_parts)
检索关键代码:
vector_store = vector_store_manager.get_vector_store()
retriever = vector_store.as_retriever(
search_kwargs={"k": config.rag_top_k}
)
docs = retriever.invoke(query)
- vector_store_manager.get_vector_store():获取已构建好的向量数据库(如 FAISS、Chroma 等),其中文档已被嵌入(embedding)。
- as_retriever(…):将向量库转换为 LangChain 的 Retriever 对象。
- search_kwargs={“k”: config.rag_top_k}:控制返回最相关的 k 个文档(例如 k=3 或 5),这是 RAG 效果的关键超参数。
- retriever.invoke(query):用用户 query 去检索,返回 List[Document]。
def format_docs(docs: List[Document]) -> str:
作用是将从向量数据库(如 Milvus)检索到的多个文档(List[Document])格式化为一段结构清晰、便于大语言模型(LLM)理解的上下文文本。
在 LangChain 的 Document 对象中:
page_content:存的是真正的文本内容(比如一段文章、一个段落)。
metadata:存的是关于这段文本的附加信息,比如:
来自哪个文件?
在文档中的页码?
所属的章节标题(h1/h2/h3)?
创建时间、作者等?
✅ 简单说:metadata 就是“描述文档的文档”,用来提供上下文背景。
metadata = doc.metadata
source = metadata.get(“_file_name”, “未知来源”)
第 1 步:获取 metadata 字典
metadata = doc.metadata
假设 doc 是一个 Document 对象,它的 metadata 可能长这样:
{
"_file_name": "技术白皮书.pdf",
"h1": "第三章 系统架构",
"h2": "3.2 向量数据库设计",
"page": 15,
"chunk_id": "abc123"
}
第 2 步:获取来源文件名
source = metadata.get("_file_name", "未知来源")
这里尝试从 metadata 中找 _file_name 字段。
如果有 → source = “技术白皮书.pdf”
如果没有 → source = “未知来源”(避免程序崩溃)
💡 为什么字段叫 _file_name?
这是你在文档预处理阶段自己定义的(比如用 PyPDFLoader 或 UnstructuredLoader 解析 PDF 时,把原始文件名存进去)。下划线前缀可能是为了区分系统字段。
第 3 步:提取标题层级(h1/h2/h3)
headers = []
for key in ["h1", "h2", "h3"]:
if key in metadata and metadata[key]:
headers.append(metadata[key])
举个例子:
假设 metadata 是:
{
"h1": "人工智能概述",
"h2": "2.1 大模型发展",
"h3": "2.1.3 开源生态"
}
循环过程:
- key = “h1” → 存在且非空 → headers = [“人工智能概述”]
- key = “h2” → 存在 → headers = [“人工智能概述”, “2.1 大模型发展”]
- key = “h3” → 存在 → headers = [“人工智能概述”, “2.1 大模型发展”, “2.1.3 开源生态”]
最终:
header_str = "人工智能概述 > 2.1 大模型发展 > 2.1.3 开源生态"
✅ 这样 LLM 就知道这段内容属于哪个逻辑位置,而不是孤立的一段文字。
MCP
CLS server
实现了一个 腾讯云 CLS(Cloud Log Service)的本地模拟 MCP Server,用于为 AIOps 智能体提供日志查询能力。下面我将逐层解析其结构、功能和设计意图。
🧩 一、整体目标
目的:让 LLM 驱动的 AIOps Agent 能通过标准 MCP 协议,像调用函数一样查询日志,而无需关心底层日志系统细节。
- MCP = Model Context Protocol,一种标准化的工具调用协议。
- CLS = 腾讯云日志服务(Cloud Log Service),真实系统中用于收集/检索日志。
- 本地模拟:当前返回的是 mock 数据,但接口完全兼容真实场景。
🏗️ 二、核心架构
mcp = FastMCP("CLS") # 创建一个名为 "CLS" 的 MCP 服务
- 使用
fastmcp库快速构建 MCP 服务。 - 服务名称为
"CLS",客户端通过此名称识别该服务。 - 最终通过 HTTP 暴露在
http://127.0.0.1:8003/mcp。
🔧 三、提供的工具(Tools)
共提供 5 个工具,全部用 @mcp.tool() 装饰器注册:
| 工具名 | 功能 | 典型用途 |
|---|---|---|
get_current_timestamp() |
获取当前毫秒时间戳 | 构建日志查询的时间范围 |
get_region_code_by_name(region_name) |
地区名 → 腾讯云 region code | 定位日志所在区域(如“北京”→ap-beijing) |
get_topic_info_by_name(topic_name, region_code) |
精确查找日志主题信息 | 获取 topic_id(已基本被下一条替代) |
search_topic_by_service_name(service_name, ...) |
核心工具! 根据服务名模糊搜索日志主题 | 快速定位目标服务的日志入口 |
search_log(topic_id, start_time, end_time, ...) |
核心工具! 查询具体日志内容 | 获取错误日志、分析行为 |
✅ 这是一个完整的 “服务 → 主题 → 日志” 查询链路。
🔍 四、关键工具详解
search_topic_by_service_name(最常用)
search_topic_by_service_name(service_name="data-sync")
- 支持模糊匹配:输入
"sync"可匹配"data-sync-service"。 - 返回多个主题:如应用日志 + 错误日志。
- 输出示例:
{ "total": 2, "topics": [ { "topic_id": "topic-001", "service_name": "data-sync-service", ... }, { "topic_id": "topic-002", "service_name": "data-sync-service", ... } ] }
💡 Agent 通常先调此工具,拿到
topic_id后再查日志。
2. search_log(日志检索核心)
search_log(
topic_id="topic-001",
start_time=1712422800000, # 15分钟前
end_time=1712423700000 # 当前时间
)
- 时间必须是毫秒时间戳(int),强调不能传字符串。
- 动态生成日志:对
topic-001,每分钟生成一条"正在同步元数据……"。 - 模拟真实场景:可用于检测“重复日志”或“死循环”。
⚠️ 其他
topic_id返回错误,确保 Agent 必须先正确获取topic_id。
3. get_current_timestamp()
- 返回
int(datetime.now().timestamp() * 1000) - 作用:避免 Agent 自己计算时间(LLM 不擅长时间运算)。
- 典型用法:
now = get_current_timestamp() fifteen_min_ago = now - 15 * 60 * 1000 search_log(start_time=fifteen_min_ago, end_time=now)
🛠️ 五、辅助功能
@log_tool_call装饰器
- 自动记录每次工具调用的:
- 方法名
- 输入参数(格式化为 JSON)
- 返回结果摘要(避免日志爆炸)
- 成功/失败状态
- 极大方便调试:开发时可清晰看到 Agent 如何使用工具。
- 时间处理工具
parse_time_or_default: 安全解析时间字符串。generate_time_series: 生成偏移后的时间(未在工具中直接使用,但为扩展预留)。
🌐 六、服务启动
if __name__ == "__main__":
mcp.run(
transport="streamable-http",
host="127.0.0.1",
port=8003,
path="/mcp"
)
- 启动一个 HTTP 服务器,监听 8003 端口。
- MCP 客户端通过
POST /mcp/call调用工具。 - 符合 MCP over HTTP 标准。
📦 七、Mock 数据设计
所有数据都是硬编码的模拟数据:
mock_topics = [
{
"topic_id": "topic-001",
"service_name": "data-sync-service",
"region_code": "ap-beijing",
...
}
]
- 优点:无需依赖真实腾讯云账号,本地即可运行完整 AIOps 流程。
- 未来替换:只需将内部逻辑改为调用 腾讯云 CLS API,Agent 无需改动。
✅ 八、典型使用流程(Agent 视角)
- 用户问:“data-sync-service 为什么变慢?”
- Agent 调用
search_topic_by_service_name("data-sync")→ 得到topic-001 - Agent 调用
get_current_timestamp()→ 得到当前时间戳 - Agent 计算 15 分钟前的时间戳
- Agent 调用
search_log(topic_id="topic-001", start_time=..., end_time=...):在指定时间范围内,从指定的日志主题(topic)中检索具体的日志内容。 - 返回日志:全是
"正在同步元数据……"→ 判断可能陷入死循环
🎯 总结:这段代码的价值
| 方面 | 说明 |
|---|---|
| 标准化 | 通过 MCP 协议,将日志能力暴露为 LLM 友好的工具 |
| 解耦 | Agent 不依赖腾讯云 SDK,只依赖 MCP 接口 |
| 可测试 | 本地 mock 数据,无需云环境即可验证 Agent 逻辑 |
| 可扩展 | 未来轻松替换为真实 CLS API |
| 可观测 | 详细日志记录,便于调试 Agent 行为 |
💡 本质:这是一个 “适配器”(Adapter) —— 将腾讯云 CLS(或任何日志系统)的能力,包装成 LLM Agent 能理解的“工具”。
这段代码实现了一个 本地模拟的智能运维监控 MCP Server(Monitor MCP Server),用于支持 AIOps(智能运维)Agent 在故障排查时查询服务的运行状态,比如 CPU、内存使用情况等。
Monitor server
🧠 一、整体目标
为运维 Agent 提供一个“假但逼真”的监控数据接口,让 Agent 能在没有真实监控系统(如 Prometheus、Zabbix、腾讯云 Monitor)的情况下进行开发和测试。
🏗️ 二、架构概览
| 模块 | 功能 |
|---|---|
FastMCP("Monitor") |
基于 fastmcp 框架构建的 MCP(Model Context Protocol)服务 |
@mcp.tool() |
将函数注册为可被 Agent 调用的工具 |
@log_tool_call |
装饰器,记录每次工具调用的参数、结果、错误(便于调试) |
query_cpu_metrics |
模拟查询 CPU 使用率 |
query_memory_metrics |
模拟查询内存使用率 |
| Mock 数据生成逻辑 | 动态生成随时间增长的 CPU/内存数据,模拟“服务逐渐异常” |
🔍 三、核心功能详解
- 装饰器
@log_tool_call
- 作用:自动记录每个工具调用的:
- 方法名
- 输入参数(格式化为 JSON)
- 返回结果摘要(避免日志爆炸)
- 异常信息
- 价值:方便调试 Agent 的调用行为,知道“它问了什么、得到了什么”。
- 时间处理辅助函数
parse_time_or_default
- 如果用户传了
start_time="2026-04-06 17:00:00",就解析它。 - 如果没传,默认:
start_time= 当前时间 - 1 小时end_time= 当前时间
✅ 符合运维场景:用户说“最近有问题”,Agent 自动查“最近1小时”。
generate_time_series
- 用于生成带偏移的时间字符串(当前未直接使用,但为扩展预留)。
- 核心工具:
query_cpu_metrics
📌 功能
模拟返回某个服务(如 "data-sync-service")的 CPU 使用率时间序列数据。
📈 数据生成逻辑(关键!)
- 初始阶段(前3个点):CPU ≈ 10%(正常)
- 后续阶段:CPU 指数级上升,最终接近 95%
- 加随机噪声:±2%,让数据更真实
- 检测突增:如果 max > 80%,触发告警
📤 返回结构示例
{
"service_name": "data-sync-service",
"metric_name": "cpu_usage_percent",
"interval": "1m",
"data_points": [
{"timestamp": "17:00", "value": 10.2},
{"timestamp": "17:01", "value": 10.8},
{"timestamp": "17:02", "value": 18.5},
{"timestamp": "17:03", "value": 27.3},
...
{"timestamp": "17:59", "value": 94.7}
],
"statistics": {
"avg": 45.6,
"max": 94.7,
"min": 10.2,
"spike_detected": true
},
"alert_info": {
"triggered": true,
"threshold": 80.0,
"message": "CPU 使用率持续超过 80% 阈值"
}
}
💡 设计意图:模拟一个 “服务逐渐卡死” 的典型故障场景!
- 核心工具:
query_memory_metrics
📌 功能
类似 CPU,但模拟 内存泄漏或内存压力 场景。
📈 数据生成逻辑
- 初始内存 ≈ 30%
- 逐步线性增长到 85%
- 随机波动 ±1%
- 总内存固定为 8GB
- 告警阈值:70%
📤 返回包含:
used_gb和total_gb(更贴近真实监控)
⚙️ 四、设计亮点
| 特性 | 说明 |
|---|---|
| 逼真的异常模式 | 不是随机数据,而是有趋势的故障模拟(CPU飙升、内存增长) |
| 时间灵活 | 支持指定时间范围,也支持默认“最近1小时” |
| 告警集成 | 直接在返回中包含 alert_info,Agent 可直接判断是否异常 |
| 统计指标丰富 | 包含 avg/max/min/p95/spike 等,满足深度分析 |
| 日志透明 | 所有调用都被详细记录,便于复现问题 |
| 接口标准化 | 参数、返回格式清晰,符合 MCP 规范 |
🧪 五、典型使用场景(Agent 如何用?)
# 场景:用户说“data-sync 服务变慢了”
# Agent 自动执行:
1. search_topic_by_service_name("data-sync") → 得到 topic_id
2. search_log(topic_id, 最近15分钟) → 发现日志重复
3. query_cpu_metrics("data-sync-service") → 发现 CPU 从 10% 升到 95%
4. query_memory_metrics("data-sync-service") → 发现内存也在缓慢增长
# 结论:服务可能陷入死循环 + 内存泄漏!
🚀 六、运行方式
python monitor_mcp_server.py
- 启动一个 HTTP 服务:
http://127.0.0.1:8004/mcp - Agent 通过 MCP 协议调用
query_cpu_metrics等工具
✅ 总结
这段代码是一个 高度仿真的运维监控 Mock 服务,它的核心价值在于:
用可控、可预测、带故障模式的假数据,训练和测试 AIOps Agent 的故障诊断能力。
- 它不是简单返回静态数据,而是模拟真实世界的性能恶化过程。
- 它为 Agent 提供了结构化、带告警、带统计的监控视图。
- 它是构建 “可观测性驱动的智能运维” 的关键基础设施之一。
未来只需将内部逻辑从 “mock 生成” 替换为 “调用真实监控 API”,即可无缝上线生产环境。
以下是一个 CLS Server(日志服务) 与 Monitor Server(监控服务) 协同工作的完整示例,模拟 AIOps Agent 如何通过两者联合分析,精准定位“数据同步服务死循环”故障。
两个MCP协同工作
🎯 场景设定
- 用户反馈:
data-sync服务最近响应变慢,数据未更新。 - Agent 目标:自动诊断根因。
🔁 协同工作流程(含具体调用与返回)
以下是一个 完整的、端到端的 Plan-Execute-Replan (PER) 智能代理诊断流程示例,清晰地展示了从用户提问到最终生成报告的每一步,以及 Monitor MCP 和 CLS MCP 是如何在其中被调用并发挥作用的。
🎯 场景
用户输入: “data-sync 服务最近响应很慢,帮我看看是什么原因。”
🔁 完整执行流程
第 0 步:启动 (execute() 方法被调用)
AIOpsService.execute(user_input="data-sync 服务最近响应很慢...", session_id="sess_123")- 初始化状态:
initial_state = { "input": "data-sync 服务最近响应很慢...", "plan": [], "past_steps": [], "response": "" }
第 1 轮:Planner 节点 - 制定计划
- 内部动作: LLM 根据系统提示词(包含可用工具列表)分析用户问题。
- 思考过程: “要诊断服务慢,我需要先找到它的日志和监控指标。首先得知道它的技术名称。”
- 输出状态:
{ "plan": [ {"name": "search_topic_by_service_name", "args": {"service_name": "data-sync"}} ] } - 流式事件 (SSE):
{ "type": "plan", "stage": "plan_created", "message": "执行计划已制定,共 1 个步骤", "plan": ["根据业务名 'data-sync' 查询其对应的技术服务名和日志主题"] }
第 2 轮:Executor 节点 - 执行第一步 (调用 CLS MCP)
- 内部动作:
executor从plan中取出第一个动作:search_topic_by_service_name(service_name="data-sync")。- 调用 CLS MCP Server。
- CLS MCP Server 返回:
{ "topics": [{ "topic_id": "topic-001", "service_name": "data-sync-service" }] } - 更新状态:
{ "past_steps": [ ( {"name": "search_topic_by_service_name", "args": {"service_name": "data-sync"}}, {"topic_id": "topic-001", "service_name": "data-sync-service"} ) ], "plan": [] # 第一步已完成 } - 流式事件 (SSE):
{ "type": "step_complete", "stage": "step_executed", "message": "步骤执行完成 (1/1)", "current_step": "根据业务名 'data-sync' 查询其对应的技术服务名和日志主题", "result": {"topic_id": "topic-001", "service_name": "data-sync-service"} }
第 3 轮:Replanner 节点 - 评估并重新规划
- 内部动作:
replanner检查past_steps,发现已经获取了关键信息service_name="data-sync-service"。- LLM 决定下一步:需要查询该服务的日志和 CPU 指标。
- 输出新状态:
{ "plan": [ {"name": "search_log", "args": {"topic_id": "topic-001", "start_time": "...", "end_time": "..."}}, {"name": "query_cpu_metrics", "args": {"service_name": "data-sync-service", "start_time": "...", "end_time": "..."}} ] } - 流式事件 (SSE):
{ "type": "status", "stage": "replanner", "message": "评估完成,继续执行剩余步骤", "remaining_steps": 2 }
第 4 轮:Executor 节点 - 执行第二步 (调用 CLS MCP)
- 内部动作:
executor取出新plan的第一个动作:search_log(topic_id="topic-001", ...).- 再次调用 CLS MCP Server 查询最近15分钟日志。
- CLS MCP Server 返回:
{ "logs": [ {"message": "正在同步元数据……"}, {"message": "正在同步元数据……"}, // ... 共15条重复日志 ] } - 更新状态:
{ "past_steps": [ /* previous step */, (log_action, log_result) ], "plan": [{"name": "query_cpu_metrics", ...}] // 剩余1步 } - 流式事件 (SSE):
{ "type": "step_complete", "current_step": "查询 data-sync-service 服务的最近日志", "result": {"logs_count": 15, "sample": "正在同步元数据……"} }
第 5 轮:Executor 节点 - 执行第三步 (调用 Monitor MCP)
- 内部动作:
executor取出最后一个动作:query_cpu_metrics(service_name="data-sync-service", ...).- 调用 Monitor MCP Server。
- Monitor MCP Server 返回:
{ "data_points": [...], "statistics": {"max": 94.8}, "alert_info": {"triggered": true, "message": "CPU 使用率持续超过 80% 阈值"} } - 更新状态:
{ "past_steps": [ /* all 3 steps */ ], "plan": [] // 所有计划步骤完成 } - 流式事件 (SSE):
{ "type": "step_complete", "current_step": "查询 data-sync-service 服务的 CPU 使用率", "result": {"max_cpu": "94.8%", "alert_triggered": true} }
第 6 轮:Replanner 节点 - 生成最终报告
- 内部动作:
replanner发现plan为空,且past_steps包含了充分的证据(重复日志 + CPU 飙升)。- LLM 综合所有工具返回的真实数据,生成最终的 Markdown 诊断报告。
- 输出最终状态:
{ "response": "# 告警分析报告\n\n---\n\n## 📋 活跃告警清单\n\n| 告警名称 | 级别 | 目标服务 | ... |\n|----------|------|----------|-----|\n| HighCpuUsage | Critical | data-sync-service | ... |\n\n---\n\n## 🔍 告警根因分析...\n\n### 根因结论\n服务因代码逻辑缺陷陷入死循环,导致 CPU 资源耗尽。\n\n..." } - 流式事件 (SSE):
{ "type": "report", "stage": "final_report", "message": "最终报告已生成", "report": "# 告警分析报告\n..." }
第 7 步:结束 (execute() 方法收尾)
execute()方法调用self.graph.get_state(...)获取最终状态。- 发送完成事件:
{ "type": "complete", "stage": "complete", "message": "任务执行完成", "response": "# 告警分析报告\n..." // 完整的报告内容 } - 流式生成器结束。
🌐 总结:MCP 工具在整个流程中的作用
| 步骤 | 调用的 MCP 工具 | 作用 | 返回的关键信息 |
|---|---|---|---|
| 第2轮 | search_topic_by_service_name (CLS) |
服务发现 | 将业务名 data-sync 映射为技术名 data-sync-service 和日志ID |
| 第4轮 | search_log (CLS) |
日志取证 | 发现服务在无限循环打印“正在同步元数据” |
| 第5轮 | query_cpu_metrics (Monitor) |
指标验证 | 确认 CPU 使用率飙升至 94.8%,触发告警 |
💡 核心价值:整个 PER 流程通过 多次、有序地调用不同的 MCP 工具,像一个经验丰富的 SRE 一样,先定位目标,再收集证据,最后交叉验证并得出结论。LLM 本身不产生任何运维数据,它只是协调者和分析师,而 MCP 工具才是提供真实世界“感知”能力的感官。
🌐 真实云环境中的协同(腾讯云为例)
根据搜索结果,此流程完全符合腾讯云最佳实践:
“CLS 与 Cloud Monitor 深度集成,用户可将 Nginx 错误日志与服务器 CPU 负载波动关联,快速定位性能瓶颈。”
— 腾讯云官方文档
在真实场景中:
- CLS 提供日志检索与关键词告警(如检测到
"ERROR"自动触发) - Cloud Monitor 提供指标看板与阈值告警(如 CPU > 80% 持续 5 分钟)
- 联动效果:当 Monitor 检测到 CPU 飙升时,自动关联 CLS 中对应时间段的日志,实现秒级根因定位。
✅ 总结:协同价值
| 单独使用 CLS | 单独使用 Monitor | CLS + Monitor 协同 |
|---|---|---|
| 知道“服务在重复打印日志” | 知道“CPU 很高” | 知道“重复日志导致 CPU 飙升” |
| 无法区分是正常循环还是死循环 | 无法知道 CPU 高的具体原因 | 精准定位代码级根因 |
| 可能误判(如高频正常任务) | 可能漏判(如低 CPU 的逻辑错误) | 减少误报/漏报,提升诊断准确率 |
这正是现代 AIOps 的核心能力:通过多维度可观测数据融合,实现从“现象”到“根因”的自动推理。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)