API接口

功能 方法 路径 说明
普通对话 POST /api/chat 一次性返回
流式对话 POST /api/chat_stream SSE 流式输出
AIOps 诊断 POST /api/aiops 自动故障诊断(流式)
文件上传 POST /api/upload 上传并索引文档
健康检查 GET /api/health 服务状态检查

Chat Agent

普通对话

请求入口:

@router.post("/chat")
async def chat(request: ChatRequest):
    """快速对话接口
    {
        "code": 200,
        "message": "success",
        "data": {
            "success": true,
            "answer": "回答内容",
            "errorMessage": null
        }
    }

    Args:
        request: 对话请求

    Returns:
        统一格式的对话响应
    """
    try:
        logger.info(f"[会话 {request.id}] 收到快速对话请求: {request.question}")
        answer = await rag_agent_service.query(
            request.question,
            session_id=request.id
        )

        logger.info(f"[会话 {request.id}] 快速对话完成")

        return {
            "code": 200,
            "message": "success",
            "data": {
                "success": True,
                "answer": answer,
                "errorMessage": None
            }
        }

    except Exception as e:
        logger.error(f"对话接口错误: {e}")
        return {
            "code": 500,
            "message": "error",
            "data": {
                "success": False,
                "answer": None,
                "errorMessage": str(e)
            }
        }

核心代码:

answer = await rag_agent_service.query(
            request.question,
            session_id=request.id
        )

其中rag_agent_service.query是Rag Agent中的异步方法,使用await等待其运行结果保存在answer。
在return中会将answer封装在data中作为HTTP 响应进行返回。,会通过 FastAPI 框架自动发送回 发起 /chat 请求的客户端


Rag Agent服务

为Agent 创建了带自动消息追加功能的记忆区

class AgentState(TypedDict):
    """Agent 状态"""
    messages: Annotated[Sequence[BaseMessage], add_messages]

AgentState 是一个 类型提示类(TypedDict),用来告诉 LangGraph:
“这个 Agent 在运行过程中,它的内部状态(state)长什么样?”

你可以把它想象成 Agent 的 “大脑记忆区”,每次思考或回复后,都会更新这块内存。

其中Annotated 是 Python 标准库(从 Python 3.9+ 引入,旧版本可通过 typing_extensions 使用)中的一个类型注解工具。
Annotated[Type, metadata1, metadata2, ...]

  • Type:真实的类型(如 str, int, List[str] 等)
  • metadata:任意附加信息(可以是函数、字符串、类、注解等),不影响运行时行为,但框架或工具可以读取它
    核心思想:在不改变类型本身的前提下,给类型“贴标签”或“加说明”,供外部系统(如 LangGraph、FastAPI、Pydantic)使用。

可以把 Annotated[Type, behavior] 想象成:
“这个变量是 Type 类型的,而且当你要更新它时,请用 behavior 的方式来更新。”

add_messages 是 LangGraph 内置的一个消息合并函数

当你在节点函数中返回:

return {"messages": [AIMessage("你好")]}

LangGraph 会:
查看 AgentState 中 messages 字段的类型注解
发现它是 Annotated[…, add_messages]
于是不直接覆盖,而是调用:

new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")])

结果就是追加,而不是替换!

使用的方法:
TypedDict + Annotated + add_messages 组合定义 LangGraph 状态结构
实现的效果:
给 Agent 创建了带自动消息追加功能的记忆区,让智能体可以记住历史对话。


修剪 Agent 状态中的消息历史

def trim_messages_middleware(state: AgentState) -> dict[str, Any] | None:

这是一个 LangGraph 中间件函数,核心作用是:修剪 Agent 状态中的消息历史,只保留关键消息(第一条系统消息 + 最近 3 轮对话),避免消息过多超出上下文窗口,同时兼容 LangGraph 状态更新机制。
核心代码:

    # 提取第一条系统消息
    first_msg = messages[0]

    # 保留最近的 6 条消息(确保包含完整的对话轮次)
    recent_messages = messages[-6:] if len(messages) % 2 == 0 else messages[-7:]

    # 构建新的消息列表
    new_messages = [first_msg] + list(recent_messages)

RagAgentService

RagAgentService是一个基于 LangGraph + LangChain + Qwen(通义千问) 的 RAG(检索增强生成)智能代理服务(RagAgentService),支持 非流式 和 流式 两种对话模式,并具备 会话历史管理、工具调用(包括 MCP 工具)、消息修剪、错误处理 等能力。
调用和执行流程:

客户端 (POST /chat)

FastAPI 路由 → 调用 rag_agent_service.query()

RagAgentService
├── 初始化:ChatQwen 模型 + 工具(本地 + MCP)
├── 非流式查询:query() → 一次性返回完整答案
├── 流式查询:query_stream() → 异步生成器逐块返回
├── 会话管理:MemorySaver(基于 session_id = thread_id)
├── 上下文修剪:trim_messages_middleware(防止超长上下文)
└── 工具集成:retrieve_knowledge, get_current_time, MCP 工具(如日志、告警等)

初始化

初始化函数:

def __init__(self, streaming: bool = True):

可以选择是否流式输出,默认为流式输出
在初始化方法中先配置模型名称、是否流式输出、系统提示词,然后创建可直接调用的 ChatQwen 模型对象。接着定义工具、MCP、内存检查点self.checkpointer = MemorySaver(),其中获取MCP是异步的方式。
MemorySaver是 LangGraph 提供的一种 状态持久化后端(checkpoint saver):所有状态都保存在 Python 进程的内存中(即一个字典 dict),不会持久化到磁盘或数据库。
checkpointer 的生命周期 = RagAgentService 实例的生命周期。
核心代码:

self.model = ChatQwen(
            model=self.model_name,  # 用哪个模型
            api_key=config.dashscope_api_key,  # 调用阿里DashScope的密钥(必须有,不然用不了模型)
            temperature=0.7,  # 创造性(0-1,越小越严谨,越大越有想象力,0.7刚好平衡)
            streaming=streaming,  # 跟上面的streaming一致,启用/关闭流式
        )

接着异步初始化Agent

async def _initialize_agent(self):

加载MCP工具

 # 1. 加载MCP工具(从腾讯云MCP服务获取,比如日志查询、监控告警工具)
        mcp_client = await get_mcp_client_with_retry()  # 连接MCP服务(带重试,防止网络差连不上)
        mcp_tools = await mcp_client.get_tools()  # 从MCP服务拿到工具列表

接着合并工具:本地工具(retrieve_knowledge等) + MCP工具(云工具)

再创建 ReAct Agent

 self.agent = create_agent(
            self.model,  # 用我们前面初始化的ChatQwen模型
            tools=all_tools,  # 合并后的所有工具
            checkpointer=self.checkpointer,  # 会话记忆(记住聊天历史)
        )

构建系统提示词

def _build_system_prompt(self) -> str:
    """
    构建系统提示词

    注意:LangChain 框架会自动将工具信息传递给 LLM,
    因此系统提示词中无需列举具体的工具列表。

    Returns:
        str: 系统提示词
    """
    from textwrap import dedent
    # dedent:自动删除代码前面的缩进空格,让多行字符串保持顶格,避免格式混乱
    # .strip():去掉字符串最前面、最后面的空行和空格,让提示词更干净
    return dedent("""
        你是一个专业的AI助手,能够使用多种工具来帮助用户解决问题。

        工作原则:
        1. 理解用户需求,选择合适的工具来完成任务
        2. 当需要获取实时信息或专业知识时,主动使用相关工具
        3. 基于工具返回的结果提供准确、专业的回答
        4. 如果工具无法提供足够信息,请诚实地告知用户

        回答要求:
        - 保持友好、专业的语气
        - 回答简洁明了,重点突出
        - 基于事实,不编造信息
        - 如有不确定的地方,明确说明

        请根据用户的问题,灵活使用可用工具,提供高质量的帮助。
    """).strip()
  • dedent + strip():只是格式化提示词,让它没有多余的空格和空行,AI读起来更清晰,不影响功能。
  • 不用写工具列表:LangChain会自动把所有工具(本地+MCP)的信息传给AI,我们不用手动在提示词里列工具,省事儿~

非流式查询方法(query)—— 一次性返回完整答案

async def query( self, question: str, session_id: str, ) -> str:

核心代码:

            # 2. 构建消息列表:系统提示词 + 用户问题(告诉AI上下文)
            messages = [
                SystemMessage(content=self.system_prompt),  # 给AI立规矩的提示词
                HumanMessage(content=question)  # 用户的问题
            ]

            # 3. 构建Agent输入:格式必须是{"messages": 消息列表},LangGraph才能识别
            agent_input = {"messages": messages}

            # 4. 配置会话ID:用session_id作为thread_id,让checkpointer记住这个用户的聊天历史
            config_dict = {
                "configurable": {
                    "thread_id": session_id
                }
            }

            # 5. 异步执行Agent(ainvoke:异步调用,不卡主线程)
            result = await self.agent.ainvoke(
                input=agent_input,
                config=config_dict,
            )

            # 6. 提取最终答案(重点!)
            messages_result = result.get("messages", [])  # 从Agent返回结果里拿消息列表
            if messages_result:
                # 消息列表最后一条,就是AI的最终回答(前面是思考、工具调用记录)
                last_message = messages_result[-1]
                # 取回答内容,防止没有content属性报错
                answer = last_message.content if hasattr(last_message, 'content') else str(last_message)
  • 非流式:就是AI把所有思考、工具调用都做完,一次性把完整答案返回,适合不需要实时显示的场景(比如后台查询)。
  • session_id的作用:区分不同用户,比如用户A和用户B的聊天历史,靠session_id分开,不会串线。
  • 提取答案:Agent返回的消息列表里,最后一条就是最终回答(前面的是AI的思考过程、工具调用记录,不用管)。

流式查询方法(query_stream)—— 逐步返回答案(AI边想边说)

async def query_stream( self, question: str, session_id: str, ) -> AsyncGenerator[Dict[str, Any], None]:

异步生成器方法(async def + AsyncGenerator 返回类型):用于实现 流式问答(Streaming Q&A)功能。
AsyncGenerator[Dict[str, Any], None]:

  • 这是一个异步生成器(asynchronous generator)。
  • 每次 yield 会产出一个 Dict[str, Any] 类型的数据块(例如:{“content”: “你好”, “type”: “text”})。
  • None 表示这个生成器不接受外部 .send() 的值(标准用法)。
    核心代码:

            # 4. 异步流式调用Agent(astream:流式返回,每次返回一小段答案)
            async for token, metadata in self.agent.astream(
                input=agent_input,
                config=config_dict,
                stream_mode="messages",  # 流式模式:按消息片段返回
            ):
                # 从metadata里拿当前Agent运行的步骤(比如在思考、在调用工具)
                node_name = metadata.get('langgraph_node', 'unknown') if isinstance(metadata, dict) else 'unknown'
                # 拿到当前返回的消息类型(是AI的回答片段,还是工具调用)
                message_type = type(token).__name__

                # 只处理AI的消息(跳过工具返回、系统消息等无关内容)
                if message_type in ("AIMessage", "AIMessageChunk"):
                    # 从消息里提取文本内容块(AI的回答片段)
                    content_blocks = getattr(token, 'content_blocks', None)

                    if content_blocks and isinstance(content_blocks, list):
                        for block in content_blocks:
                            # 只提取文本类型的内容(跳过工具调用等其他类型)
                            if isinstance(block, dict) and block.get('type') == 'text':
                                text_content = block.get('text', '')
                                # 把片段返回给前端(前端拿到后,就能实时显示,实现“边想边说”)
                                if text_content:
                                    yield {
                                        "type": "content",
                                        "data": text_content,
                                        "node": node_name
                                    }
  • yield的作用:每次返回一小段文本,前端拿到后立刻显示,比如AI打“你”,前端就显示“你”,再打“好”,前端就显示“你好”,体验更流畅。
  • 四种返回类型:content(答案片段)、tool_call(工具调用记录)、complete(结束信号)、error(错误信息),前端能根据类型做不同显示。

if content_blocks and isinstance(content_blocks, list):
得到的block:文本类型:{ “type”: “text”, “text”: “你好” }
工具调用:{
“type”: “tool_use”,
“id”: “toolu_xxx”,
“name”: “search”, # 工具名:搜索/查询/计算器等
“input”: {
“query”: “FastAPI 是什么”
}
}
if text_content: yield { "type": "content", "data": text_content, "node": node_name }
重新打包后推给前端的 chunk:
chunk = {
“type”: “content”,
“data”: “AI回答的一小段文字”, # 👈 这里没有 text!是 data!
“node”: “…”
}

获取会话历史方法(get_session_history)—— 让AI记住之前的聊天

def get_session_history(self, session_id: str) -> list:
核心代码:

config = {"configurable": {"thread_id": session_id}}    #用 thread_id = session_id 来区分用户
            
            # 获取该 thread 的最新检查点
            checkpoint_tuple = self.checkpointer.get(config)    #checkpointer = LangGraph 的会话存储器
            
            if not checkpoint_tuple:
                logger.info(f"获取会话历史: {session_id}, 消息数量: 0")
                return []
            
            # checkpoint_tuple 可能是命名元组(新版本)或普通元组(旧版本),安全地提取 checkpoint(兼容处理)
            # 若checkpoint_tuple为命名元组
            if hasattr(checkpoint_tuple, 'checkpoint'):
                checkpoint_data = checkpoint_tuple.checkpoint  # type: ignore
            else:
                # 如果是普通元组,第一个元素是 checkpoint
                checkpoint_data = checkpoint_tuple[0] if checkpoint_tuple else {}
            
            # 从检查点中提取消息
            # LangGraph 将对话消息存储在 channel_values["messages"] 中。
            messages = checkpoint_data.get("channel_values", {}).get("messages", [])
            
            # 转换为前端需要的格式
            history = []
            for msg in messages:
                # 跳过系统消息
                if isinstance(msg, SystemMessage):
                    continue
                    
                role = "user" if isinstance(msg, HumanMessage) else "assistant" #如果这条消息是用户发的 → 标记为 user; 否则(是 AI 发的)→ 标记为 assistant。
                """
                {
                把 LangChain 消息 → 前端通用格式:
                  "role": "user/assistant(二选一)",
                  "content": "消息内容",
                  "timestamp": "时间"
                }
                """
                content = msg.content if hasattr(msg, 'content') else str(msg)
                
                # 提取时间戳(如果有的话)
                timestamp = getattr(msg, 'timestamp', None)
                if timestamp:
                    history.append({
                        "role": role,
                        "content": content,
                        "timestamp": timestamp
                    })
                else:
                    from datetime import datetime
                    history.append({
                        "role": role,
                        "content": content,
                        "timestamp": datetime.now().isoformat()
                    })
            
            logger.info(f"获取会话历史: {session_id}, 消息数量: {len(history)}")
            return history

config = {"configurable": {"thread_id": session_id}} #用 thread_id = session_id 来区分用户
LangGraph 使用 thread_id 来区分不同用户的会话。这里将传入的 session_id 作为 thread_id,用于后续查询。

除了 thread_id,还可以添加其他可配置字段,例如:
{
“configurable”: {
“thread_id”: “user_123”,
“model_name”: “gpt-4”, # 切换模型
“user_locale”: “zh-CN” # 本地化设置
}
}


checkpoint_tuple = self.checkpointer.get(config)self.checkpointer 是 LangGraph 提供的 MemorySaver 类型的检查点存储器(用于持久化或缓存对话状态)。调用 .get(config) 获取该 thread_id 对应的最新对话状态(即 checkpoint)。
checkpointer 不认识 “session_id”,它只认 “thread_id”。
代码中把 session_id 当作 thread_id 传给了 Agent!
从此它们就永久绑定了。

# 1. 你创建了内存存储器
self.checkpointer = MemorySaver()

# 2. 你把 session_id 塞进 config 里,命名为 thread_id
config_dict = {
    "configurable": {
        "thread_id": session_id   # <--- 核心绑定行
    }
}

LangGraph 的 checkpointer 存储规则是固定死的: 每一段对话 = 一个 thread
这是因为将config_dict 传给Agent时

await self.agent.ainvoke(
    input=...,
    config=config_dict  # 👈 带进去
)

Agent 会把 config 交给 checkpointer()
LangGraph 内部执行:

checkpointer.save(
    thread_id=config["configurable"]["thread_id"],
    checkpoint=当前对话状态
)

MemorySaver 里保存:
thread_id = user_001
messages = [系统消息, 用户消息, AI消息…]

下一次相同 session_id 进来 → 自动恢复历史

await agent.ainvoke(..., config={"thread_id": "user_001"})

LangGraph 内部自动做:

# 自动读取
历史消息 = checkpointer.get(thread_id="user_001")

# 自动拼接到当前对话
state["messages"] = 历史消息 + 新问题

清空会话历史方法(clear_session)—— 忘记之前的聊天

def clear_session(self, session_id: str) -> bool:
    """
    清空会话历史(从 MemorySaver checkpointer 中删除)

    Args:
        session_id: 会话ID(即 thread_id)

    Returns:
        bool: 是否成功
    """
    try:
        # 核心代码:删除这个session_id对应的所有聊天记录
        self.checkpointer.delete_thread(session_id)
        logger.info(f"已清除会话历史: {session_id}")
        return True  # 成功返回True
        
    except Exception as e:
        logger.error(f"清空会话历史失败: {session_id}, 错误: {e}")
        return False  # 失败返回False
  • 核心作用:比如用户想“重新开始聊天”,调用这个方法,就能删除之前的所有聊天记录,AI就忘了之前的对话。

资源清理方法

async def cleanup(self):
    """清理资源"""
    try:
        logger.info("清理 RAG Agent 服务资源...")
        # MCP客户端由全局管理器统一管理,不用我们手动清理,省事儿
        logger.info("RAG Agent 服务资源已清理")
    except Exception as e:
        logger.error(f"清理资源失败: {e}")

程序结束时调用这个方法,释放占用的资源(比如模型连接、网络连接),避免资源浪费,属于“规范操作”,不用我们手动管太多。

AIOps Agent

核心逻辑:event_generator() 异步生成器

async def event_generator():
    try:
        async for event in aiops_service.diagnose(session_id=session_id):
            yield {
                "event": "message",
                "data": json.dumps(event, ensure_ascii=False)
            }
            if event.get("type") in ["complete", "error"]:
                break
    except Exception as e:
        # 异常处理:发送 error 事件

AIOpsService

AIOpsService为基于 LangGraph 的通用 Plan-Execute-Replan(规划-执行-重规划)智能代理服务,专为 AIOps(智能运维)场景设计。它使用状态图(StateGraph)构建一个可循环、可中断、可恢复的推理工作流,并通过异步生成器(AsyncGenerator)以流式方式返回每一步的执行结果。

1.状态定义

class PlanExecuteState(TypedDict):
    input: str              # 用户原始输入
    plan: List[str]         # 待执行的步骤列表(会逐步减少)
    past_steps: List[Tuple[str, Any]]  # 已执行的步骤和结果
    response: str           # 最终生成的回答(为空表示未完成)

所有节点共享这个状态,通过修改它来传递信息。

2.工作流构建:_build_graph()

创建图并添加节点:

workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replanner", replanner)
  • planner, executor, replanner 是外部定义的函数(或 Runnable),接收 state 并返回更新后的 state。

设置执行顺序:
planner → executor → replanner → (条件判断) → executor 或 END
条件边逻辑(关键!):

def should_continue(state: PlanExecuteState) -> str:
    if state.get("response"):      # 已有最终答案 → 结束
        return END
    if state.get("plan"):          # 还有步骤 → 继续执行
        return NODE_EXECUTOR
    return END                     # 计划空但无答案 → 强制结束(应由 replanner 生成 response)

3.检查点(Checkpointing)

self.checkpointer = MemorySaver()
compiled_graph = workflow.compile(checkpointer=self.checkpointer)
  • 使用 MemorySaver 在内存中保存每个 thread_id(即 session_id)的状态。
  • 支持会话恢复:即使服务重启(若用持久化存储),也能从中断处继续。

流式执行:execute() 方法

execute 方法是整个 Plan-Execute-Replan 智能代理服务的核心执行入口。它的作用是:接收一个用户任务描述,启动一个可流式输出、支持多步推理与工具调用的智能工作流,并以异步生成器(AsyncGenerator)的形式逐步返回每一步的执行状态和结果。

执行流程详解

  1. 记录日志 & 初始化状态
initial_state: PlanExecuteState = {
    "input": user_input,      # 用户原始问题
    "plan": [],               # 初始计划为空(由 planner 填充)
    "past_steps": [],         # 已执行步骤(空)
    "response": ""            # 最终答案(空)
}
  • 这个 initial_state 是 LangGraph 状态机的起点。
  • 虽然 plan 为空,但第一个节点 planner 会根据 input 生成具体计划。
  1. 配置会话上下文(用于状态持久化)
config_dict = {
    "configurable": {
        "thread_id": session_id  # 关键!用于区分不同用户的会话
    }
}
  1. 启动流式执行(核心!)
async for event in self.graph.astream(
    input=initial_state,
    config=config_dict,
    stream_mode="updates"
):
  • astream(…, stream_mode=“updates”):
    表示每当图中某个节点完成并更新状态时,就立即 yield 一个事件。
  • 返回的 event 是一个字典,格式如:{“planner”: {“plan”: […], …}
  1. 解析节点输出并转换为前端友好事件
for node_name, node_output in event.items():
    if node_name == NODE_PLANNER:
        yield self._format_planner_event(node_output)
    elif node_name == NODE_EXECUTOR:
        yield self._format_executor_event(node_output)
    elif node_name == NODE_REPLANNER:
        yield self._format_replanner_event(node_output)

每个节点的原始输出(如 {“plan”: [“查日志”, “查指标”]})被转换成标准化 SSE 事件:
planner → {“type”: “plan”, “plan”: […]}
executor → {“type”: “step_complete”, “current_step”: “查日志”, …}
replanner → 可能是 {“type”: “report”, “report”: “…”} 或状态更新
💡 前端无需理解内部状态,只需监听 type 字段即可渲染对应 UI。

节点 输出状态 转换后的 SSE 事件
planner {"plan": ["查告警", "查日志"]} {"type": "plan", "plan": [...]}
executor {"past_steps": [("查告警", {...})], "plan": ["查日志"]} {"type": "step_complete", "current_step": "查告警", ...}
replanner {"response": "# 告警报告\n..."} {"type": "report", "report": "..."}
  1. 获取最终结果并发送完成事件
final_state = self.graph.get_state(config_dict)
final_response = final_state.values.get("response", "")
yield {
    "type": "complete",
    "stage": "complete",
    "message": "任务执行完成",
    "response": final_response
}
  • 即使工作流结束,也要显式发送 complete 事件,通知前端“可以关闭连接了”。
  • response 字段包含最终生成的完整答案(如 Markdown 报告)。

关键机制说明

机制 说明
stream_mode="updates" 只返回变化的部分(delta),而非完整状态,减少网络开销
thread_id 会话隔离 不同 session_id 的任务互不影响,状态独立存储
节点输出格式化 将内部状态转换为前端可消费的标准化事件,解耦前后端
最终状态兜底 即使流式过程中漏掉某些事件,也能在最后获取完整结果

AIOps 专用接口:diagnose()

这是一个兼容层,将通用 execute() 适配为 AIOps 诊断场景:
关键设计:

  1. 固定任务指令:
  • 使用 dedent(“”“…”“”) 定义详细的诊断任务要求
  • 强制输出 Markdown 格式报告
  • 强调“严禁编造”,必须基于真实工具数据
  1. 事件格式转换:
  • 将通用 execute() 返回的 {“type”: “complete”, “response”: “…”}
    转换为 AIOps API 要求的格式:
{
  "type": "complete",
  "stage": "diagnosis_complete",
  "diagnosis": { "report": "..." }
}

当调用 diagnose(session_id=“s123”) 时:

  1. 内部调用
    execute(“诊断当前系统是否存在告警…”, session_id=“s123”)
  2. 流式事件透传
    收到 {“type”: “plan”, “plan”: [“获取活跃告警”, “查询服务日志”]} → 直接 yield
    收到 {“type”: “step_complete”, “current_step”: “获取活跃告警”} → 直接 yield
    收到 {“type”: “report”, “report”: “# 告警分析报告\n…”} → 直接 yield
  3. 完成事件转换
    当 execute 返回 {“type”: “complete”, “response”: “# 告警分析报告…”} 时:
    → 转换为
{
  "type": "complete",
  "stage": "diagnosis_complete",
  "diagnosis": {
    "status": "completed",
    "report": "# 告警分析报告..."
  }
}
  1. 前端消费
    JavaScript 监听到 stage === “diagnosis_complete” 后:
displayReport(event.data.diagnosis.report); // 直接渲染 Markdown

补充:SSE事件

SSE = Server-Sent Events(服务器推送事件)
你可以把它理解成:
后端 → 前端 的 “实时消息通道”
它的特点:

  • 单向:只有后端主动发给前端
  • 流式:像水流一样,一段一段发
  • 格式固定:前端能直接识别
  • 比 WebSocket 轻量、简单
    SSE 长什么样?
    后端发给前端的真实数据格式是这样的:
data: {"type": "plan", "plan": ["查告警", "查日志"]}\n\n
data: {"type": "step_complete", "current_step": "查告警"}\n\n
data: {"type": "report", "report": "# 告警报告..."}\n\n
data: {"type": "complete"}\n\n

前端只要监听这个流,就能实时收到每一步更新。

为什么要把节点输出转成 SSE 事件?
原因:

  1. 前端只能识别 SSE 格式,不能直接识别 AI 节点输出
    AI 节点输出是这样的:
{"plan": ["查告警", "查日志"]}

但前端 SSE 必须是:

data: {"type":"plan", "plan":["查告警","查日志"]}\n\n

不转,前端收不到、解析不了。
plan:AI 生成了计划
step_complete:完成一步
report:生成报告
complete:全部结束
前端要根据不同 type 做不同 UI:
显示计划卡片
显示执行步骤动画
渲染最终报告
关闭加载动画

  1. 流式执行必须用 SSE 才能实时推送
    如果不用 SSE:
    前端要一直等,直到 AI 全部做完才返回
    用户体验极差:一直转圈,不知道 AI 在干嘛
    用 SSE:
    前端实时接收每一步
    可以做打字机效果、步骤动画、思考过程展示

  2. 前端需要知道这一步是什么类型的事件

LangGraph知识补充——状态自动更新机制

一、核心机制名称

LangGraph 中这一核心机制称为:状态自动更新机制(也可结合功能表述为「基于注解的状态合并机制」)。

核心作用:自动根据 AgentState 定义的规则,处理节点返回的状态数据,无需手动编写状态合并逻辑(如消息追加)。

二、机制依赖的核心组件

该机制能运行,依赖 3 个关键组件的配合(缺一不可):

  1. TypedDict(AgentState):定义 Agent 状态的「数据结构」,明确状态中包含哪些字段(如 messages)、每个字段的基础类型,相当于给状态“定规矩”。

  2. Annotated 类型注解:给状态字段“贴规则标签”,不改变字段本身类型,仅向 LangGraph 传递「如何处理该字段」的附加信息(metadata)。

  3. 状态处理函数(如 add_messages):LangGraph 内置(或自定义)的处理逻辑,是 Annotated 传递的“规则具体实现”(如 add_messages 实现消息追加)。

三、机制核心原理

核心逻辑:节点 return 触发状态更新 → LangGraph 读取 AgentState 注解 → 执行对应处理逻辑 → 更新状态

关键结论:

  • return 是「触发信号」:只要在节点函数中 return 一个字典(对应状态字段的更新内容),就会触发 LangGraph 的状态更新机制。

  • Annotated 是「规则指引」:告诉 LangGraph,某个字段需要用特定方式处理(而非直接覆盖)。

  • 处理函数(如 add_messages)是「执行逻辑」:LangGraph 按照注解指引,调用对应函数,完成状态的更新(如消息追加)。

四、具体运行流程(以 messages 字段为例)

  1. 定义状态结构(AgentState):通过 TypedDict 声明 messages 字段,并用 Annotated 绑定 add_messages 规则。
    class AgentState(TypedDict): """Agent 状态""" messages: Annotated[Sequence[BaseMessage], add_messages]

  2. 节点返回更新数据:在节点函数中 return 包含 messages 字段的字典(新消息)。
    return {"messages": [AIMessage("你好")]}

  3. LangGraph 自动触发机制:检测到 return 信号,读取 AgentState 中 messages 字段的 Annotated 注解。

  4. 执行追加逻辑:LangGraph 调用 add_messages 函数,将新消息追加到旧消息列表中,而非直接覆盖。
    new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")])

  5. 完成状态更新:将合并后的消息列表赋值给新状态,Agent 状态更新完成,保留历史对话记忆。

方法定义:def trim_messages_middleware(state: AgentState) -> dict[str, Any] | None:
中间件的核心作用是 修改 / 补充 Agent 状态,而 Agent 状态本质是「键值对字典」,中间件支持「可选修改」—— 如果不需要修改任何状态(比如消息数量没超阈值,无需修剪),直接返回 None 即可。
五、关键口诀(便于记忆)

  • return → 触发更新(告诉框架“要更状态”)

  • Annotated → 指引规则(告诉框架“怎么更”)

  • add_messages → 执行规则(实现“追加不覆盖”)

六、核心价值

无需手动编写状态合并代码(如手动拼接消息列表),简化 Agent 开发流程,让 Agent 自动拥有“记忆能力”(如多轮对话历史保留),提升开发效率。

RAG完整流程

整体架构

RagAgentService
   ├── 模型:ChatQwen(阿里千问)
   ├── 工具:retrieve_knowledge(RAG 核心)、get_current_time、MCP 工具
   ├── 记忆:MemorySaver(会话持久化)
   └── 执行器:LangGraph create_agent (ReAct 智能代理)

完整详细执行流程(从用户提问 → 回答返回)
【步骤 0:服务启动】
关键代码

rag_agent_service = RagAgentService(streaming=True)

做了什么

  • 加载配置
  • 初始化 ChatQwen 模型
  • 加载基础工具 retrieve_knowledgeget_current_time
  • 初始化内存存储器 MemorySaver

【步骤 1:用户发起请求】
用户调用(代码入口)

answer = await rag_agent_service.query(
    question="公司的请假政策是什么?",
    session_id="user_001"
)

【步骤 2:进入 query 方法】
关键代码

async def query(self, question: str, session_id: str) -> str:

【步骤 3:异步初始化 Agent】
关键代码

await self._initialize_agent()

内部执行 4 件事

  1. 加载 MCP 远程工具

    mcp_client = await get_mcp_client_with_retry()
    mcp_tools = await mcp_client.get_tools()
    
  2. 合并所有工具

    all_tools = self.tools + self.mcp_tools
    

    工具包含:

    • retrieve_knowledge(RAG 检索)
    • get_current_time
    • 各种 MCP 云工具
  3. 创建 LangGraph Agent

    self.agent = create_agent(
        self.model,
        tools=all_tools,
        checkpointer=self.checkpointer
    )
    
  4. 标记 Agent 初始化完成


【步骤 4:构建消息列表】
关键代码

messages = [
    SystemMessage(content=self.system_prompt),
    HumanMessage(content=question)
]

消息内容

[
    SystemMessage("你是专业AI助手..."),
    HumanMessage("公司的请假政策是什么?")
]

【步骤 5:包装成 Agent 要求的输入格式】
关键代码

agent_input = {"messages": messages}

为什么要这样?
因为你的 AgentState 是:

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

Agent 只接受 key = “messages” 的字典输入


【步骤 6:配置会话 ID】
关键代码

config_dict = {
    "configurable": {
        "thread_id": session_id  # user_001
    }
}

作用:让对话历史持久化,多轮对话保持上下文


【步骤 7:执行 Agent → 进入 LangGraph 自动运行】
关键代码

result = await self.agent.ainvoke(
    input=agent_input,
    config=config_dict
)

🔥【步骤 8:LangGraph 内部自动执行 RAG 核心流程】
8.1 Agent 读取初始状态

state = {
    "messages": [
        SystemMessage(...),
        HumanMessage("公司的请假政策是什么?")
    ]
}

8.2 LLM(千问)思考
LLM 看到问题 + 工具列表,判断:

必须调用 retrieve_knowledge 工具

8.3 LLM 输出工具调用指令

{
  "name": "retrieve_knowledge",
  "parameters": {
    "query": "公司的请假政策是什么?"
  }
}

8.4 LangGraph 自动执行 RAG 检索(真正的 RAG)
框架自动调用

knowledge = retrieve_knowledge("公司的请假政策是什么?")

retrieve_knowledge 内部标准 RAG 步骤

  1. 用户问题向量化
  2. 向量库相似度检索
  3. 返回最相关的文档片段
  4. 拼接成上下文

返回示例:

知识库内容:
1. 事假需提前1天申请
2. 病假需提供医院证明
3. 年假按工龄计算

8.5 工具结果自动追加到消息列表

state["messages"].append(
    ToolMessage(content=知识库内容)
)

8.6 LLM 基于检索结果生成回答
LLM 读取工具返回的知识 → 生成自然语言回答。

8.7 回答追加到消息列表

state["messages"].append(AIMessage("根据公司政策..."))

【步骤 9:Agent 执行完毕,返回结果】

返回结构

result = {
    "messages": [
        SystemMessage(...),
        HumanMessage(...),
        ToolMessage(...),
        AIMessage(...)  # 最终回答
    ]
}

【步骤 10:提取最终答案】

关键代码

messages_result = result.get("messages", [])
last_message = messages_result[-1]
answer = last_message.content

【步骤 11:返回给用户】

return answer

完整 RAG 流程图(文字版)

1. 用户提问
   ↓
2. 初始化 Agent → 加载所有工具(含 RAG 检索)
   ↓
3. 构建消息:[系统提示, 用户问题]
   ↓
4. 包装为:{"messages": 消息列表}
   ↓
5. 传入 LangGraph Agent
   ↓
=================================
【LangGraph 自动执行 RAG】
6. LLM 判断:需要查知识库
7. 自动调用 retrieve_knowledge
8. 执行:向量化 → 检索向量库 → 返回知识
9. 工具结果加入对话
10. LLM 生成最终回答
=================================
   ↓
11. 提取最后一条消息
12. 返回答案给用户

RAG检索阶段

RAG检索阶段通过自定义tool来实现

@tool(response_format="content_and_artifact")
def retrieve_knowledge(query: str) -> Tuple[str, List[Document]]:
    """从知识库中检索相关信息来回答问题
    
    当用户的问题涉及专业知识、文档内容或需要参考资料时,使用此工具。
    
    Args:
        query: 用户的问题或查询
        
    Returns:
        Tuple[str, List[Document]]: (格式化的上下文文本, 原始文档列表)
    """
    try:
        logger.info(f"知识检索工具被调用: query='{query}'")
        
        # 从向量存储中检索相关文档
        vector_store = vector_store_manager.get_vector_store()
        retriever = vector_store.as_retriever(
            search_kwargs={"k": config.rag_top_k}
        )
        
        docs = retriever.invoke(query)
        
        if not docs:
            logger.warning("未检索到相关文档")
            return "没有找到相关信息。", []
        
        # 格式化文档为上下文
        context = format_docs(docs)
        
        logger.info(f"检索到 {len(docs)} 个相关文档")
        return context, docs
        
    except Exception as e:
        logger.error(f"知识检索工具调用失败: {e}")
        return f"检索知识时发生错误: {str(e)}", []


def format_docs(docs: List[Document]) -> str:
    """
    格式化文档列表为上下文文本
    
    Args:
        docs: 文档列表
        
    Returns:
        str: 格式化的上下文文本
    """
    formatted_parts = []
    
    for i, doc in enumerate(docs, 1):
        # 提取元数据
        metadata = doc.metadata
        source = metadata.get("_file_name", "未知来源")
        
        # 提取标题信息 (如果有)
        headers = []
        for key in ["h1", "h2", "h3"]:
            if key in metadata and metadata[key]:
                headers.append(metadata[key])
        
        header_str = " > ".join(headers) if headers else ""
        
        # 构建格式化文本
        formatted = f"【参考资料 {i}】"
        if header_str:
            formatted += f"\n标题: {header_str}"
        formatted += f"\n来源: {source}"
        formatted += f"\n内容:\n{doc.page_content}\n"
        
        formatted_parts.append(formatted)
    
    return "\n".join(formatted_parts)

检索关键代码:

vector_store = vector_store_manager.get_vector_store()
retriever = vector_store.as_retriever(
    search_kwargs={"k": config.rag_top_k}
)
docs = retriever.invoke(query)
  • vector_store_manager.get_vector_store():获取已构建好的向量数据库(如 FAISS、Chroma 等),其中文档已被嵌入(embedding)。
  • as_retriever(…):将向量库转换为 LangChain 的 Retriever 对象。
  • search_kwargs={“k”: config.rag_top_k}:控制返回最相关的 k 个文档(例如 k=3 或 5),这是 RAG 效果的关键超参数。
  • retriever.invoke(query):用用户 query 去检索,返回 List[Document]。
def format_docs(docs: List[Document]) -> str:

作用是将从向量数据库(如 Milvus)检索到的多个文档(List[Document])格式化为一段结构清晰、便于大语言模型(LLM)理解的上下文文本。

在 LangChain 的 Document 对象中:
page_content:存的是真正的文本内容(比如一段文章、一个段落)。
metadata:存的是关于这段文本的附加信息,比如:
来自哪个文件?
在文档中的页码?
所属的章节标题(h1/h2/h3)?
创建时间、作者等?
✅ 简单说:metadata 就是“描述文档的文档”,用来提供上下文背景。
metadata = doc.metadata
source = metadata.get(“_file_name”, “未知来源”)

第 1 步:获取 metadata 字典

metadata = doc.metadata

假设 doc 是一个 Document 对象,它的 metadata 可能长这样:

{
    "_file_name": "技术白皮书.pdf",
    "h1": "第三章 系统架构",
    "h2": "3.2 向量数据库设计",
    "page": 15,
    "chunk_id": "abc123"
}

第 2 步:获取来源文件名

source = metadata.get("_file_name", "未知来源")

这里尝试从 metadata 中找 _file_name 字段。
如果有 → source = “技术白皮书.pdf”
如果没有 → source = “未知来源”(避免程序崩溃)
💡 为什么字段叫 _file_name?
这是你在文档预处理阶段自己定义的(比如用 PyPDFLoader 或 UnstructuredLoader 解析 PDF 时,把原始文件名存进去)。下划线前缀可能是为了区分系统字段。
第 3 步:提取标题层级(h1/h2/h3)

headers = []
for key in ["h1", "h2", "h3"]:
    if key in metadata and metadata[key]:
        headers.append(metadata[key])

举个例子:
假设 metadata 是:

{
    "h1": "人工智能概述",
    "h2": "2.1 大模型发展",
    "h3": "2.1.3 开源生态"
}

循环过程:

  • key = “h1” → 存在且非空 → headers = [“人工智能概述”]
  • key = “h2” → 存在 → headers = [“人工智能概述”, “2.1 大模型发展”]
  • key = “h3” → 存在 → headers = [“人工智能概述”, “2.1 大模型发展”, “2.1.3 开源生态”]
    最终:
header_str = "人工智能概述 > 2.1 大模型发展 > 2.1.3 开源生态"

✅ 这样 LLM 就知道这段内容属于哪个逻辑位置,而不是孤立的一段文字。


MCP

CLS server

实现了一个 腾讯云 CLS(Cloud Log Service)的本地模拟 MCP Server,用于为 AIOps 智能体提供日志查询能力。下面我将逐层解析其结构、功能和设计意图。


🧩 一、整体目标

目的:让 LLM 驱动的 AIOps Agent 能通过标准 MCP 协议,像调用函数一样查询日志,而无需关心底层日志系统细节。

  • MCP = Model Context Protocol,一种标准化的工具调用协议。
  • CLS = 腾讯云日志服务(Cloud Log Service),真实系统中用于收集/检索日志。
  • 本地模拟:当前返回的是 mock 数据,但接口完全兼容真实场景。

🏗️ 二、核心架构

mcp = FastMCP("CLS")  # 创建一个名为 "CLS" 的 MCP 服务
  • 使用 fastmcp 库快速构建 MCP 服务。
  • 服务名称为 "CLS",客户端通过此名称识别该服务。
  • 最终通过 HTTP 暴露在 http://127.0.0.1:8003/mcp

🔧 三、提供的工具(Tools)

共提供 5 个工具,全部用 @mcp.tool() 装饰器注册:

工具名 功能 典型用途
get_current_timestamp() 获取当前毫秒时间戳 构建日志查询的时间范围
get_region_code_by_name(region_name) 地区名 → 腾讯云 region code 定位日志所在区域(如“北京”→ap-beijing
get_topic_info_by_name(topic_name, region_code) 精确查找日志主题信息 获取 topic_id(已基本被下一条替代)
search_topic_by_service_name(service_name, ...) 核心工具! 根据服务名模糊搜索日志主题 快速定位目标服务的日志入口
search_log(topic_id, start_time, end_time, ...) 核心工具! 查询具体日志内容 获取错误日志、分析行为

✅ 这是一个完整的 “服务 → 主题 → 日志” 查询链路


🔍 四、关键工具详解

  1. search_topic_by_service_name(最常用)
search_topic_by_service_name(service_name="data-sync")
  • 支持模糊匹配:输入 "sync" 可匹配 "data-sync-service"
  • 返回多个主题:如应用日志 + 错误日志。
  • 输出示例
    {
      "total": 2,
      "topics": [
        { "topic_id": "topic-001", "service_name": "data-sync-service", ... },
        { "topic_id": "topic-002", "service_name": "data-sync-service", ... }
      ]
    }
    

💡 Agent 通常先调此工具,拿到 topic_id 后再查日志。


2. search_log(日志检索核心)

search_log(
    topic_id="topic-001",
    start_time=1712422800000,  # 15分钟前
    end_time=1712423700000     # 当前时间
)
  • 时间必须是毫秒时间戳(int),强调不能传字符串。
  • 动态生成日志:对 topic-001,每分钟生成一条 "正在同步元数据……"
  • 模拟真实场景:可用于检测“重复日志”或“死循环”。

⚠️ 其他 topic_id 返回错误,确保 Agent 必须先正确获取 topic_id


3. get_current_timestamp()

  • 返回 int(datetime.now().timestamp() * 1000)
  • 作用:避免 Agent 自己计算时间(LLM 不擅长时间运算)。
  • 典型用法
    now = get_current_timestamp()
    fifteen_min_ago = now - 15 * 60 * 1000
    search_log(start_time=fifteen_min_ago, end_time=now)
    

🛠️ 五、辅助功能

  1. @log_tool_call 装饰器
  • 自动记录每次工具调用的:
    • 方法名
    • 输入参数(格式化为 JSON)
    • 返回结果摘要(避免日志爆炸)
    • 成功/失败状态
  • 极大方便调试:开发时可清晰看到 Agent 如何使用工具。
  1. 时间处理工具
  • parse_time_or_default: 安全解析时间字符串。
  • generate_time_series: 生成偏移后的时间(未在工具中直接使用,但为扩展预留)。

🌐 六、服务启动

if __name__ == "__main__":
    mcp.run(
        transport="streamable-http",
        host="127.0.0.1",
        port=8003,
        path="/mcp"
    )
  • 启动一个 HTTP 服务器,监听 8003 端口
  • MCP 客户端通过 POST /mcp/call 调用工具。
  • 符合 MCP over HTTP 标准。

📦 七、Mock 数据设计

所有数据都是硬编码的模拟数据:

mock_topics = [
    {
        "topic_id": "topic-001",
        "service_name": "data-sync-service",
        "region_code": "ap-beijing",
        ...
    }
]
  • 优点:无需依赖真实腾讯云账号,本地即可运行完整 AIOps 流程。
  • 未来替换:只需将内部逻辑改为调用 腾讯云 CLS API,Agent 无需改动。

✅ 八、典型使用流程(Agent 视角)

  1. 用户问:“data-sync-service 为什么变慢?”
  2. Agent 调用 search_topic_by_service_name("data-sync") → 得到 topic-001
  3. Agent 调用 get_current_timestamp() → 得到当前时间戳
  4. Agent 计算 15 分钟前的时间戳
  5. Agent 调用 search_log(topic_id="topic-001", start_time=..., end_time=...):在指定时间范围内,从指定的日志主题(topic)中检索具体的日志内容。
  6. 返回日志:全是 "正在同步元数据……" → 判断可能陷入死循环

🎯 总结:这段代码的价值

方面 说明
标准化 通过 MCP 协议,将日志能力暴露为 LLM 友好的工具
解耦 Agent 不依赖腾讯云 SDK,只依赖 MCP 接口
可测试 本地 mock 数据,无需云环境即可验证 Agent 逻辑
可扩展 未来轻松替换为真实 CLS API
可观测 详细日志记录,便于调试 Agent 行为

💡 本质:这是一个 “适配器”(Adapter) —— 将腾讯云 CLS(或任何日志系统)的能力,包装成 LLM Agent 能理解的“工具”。

这段代码实现了一个 本地模拟的智能运维监控 MCP Server(Monitor MCP Server),用于支持 AIOps(智能运维)Agent 在故障排查时查询服务的运行状态,比如 CPU、内存使用情况等。


Monitor server

🧠 一、整体目标

为运维 Agent 提供一个“假但逼真”的监控数据接口,让 Agent 能在没有真实监控系统(如 Prometheus、Zabbix、腾讯云 Monitor)的情况下进行开发和测试。


🏗️ 二、架构概览

模块 功能
FastMCP("Monitor") 基于 fastmcp 框架构建的 MCP(Model Context Protocol)服务
@mcp.tool() 将函数注册为可被 Agent 调用的工具
@log_tool_call 装饰器,记录每次工具调用的参数、结果、错误(便于调试)
query_cpu_metrics 模拟查询 CPU 使用率
query_memory_metrics 模拟查询内存使用率
Mock 数据生成逻辑 动态生成随时间增长的 CPU/内存数据,模拟“服务逐渐异常”

🔍 三、核心功能详解

  1. 装饰器 @log_tool_call
  • 作用:自动记录每个工具调用的:
    • 方法名
    • 输入参数(格式化为 JSON)
    • 返回结果摘要(避免日志爆炸)
    • 异常信息
  • 价值:方便调试 Agent 的调用行为,知道“它问了什么、得到了什么”。

  1. 时间处理辅助函数

parse_time_or_default

  • 如果用户传了 start_time="2026-04-06 17:00:00",就解析它。
  • 如果没传,默认:
    • start_time = 当前时间 - 1 小时
    • end_time = 当前时间

✅ 符合运维场景:用户说“最近有问题”,Agent 自动查“最近1小时”。

generate_time_series

  • 用于生成带偏移的时间字符串(当前未直接使用,但为扩展预留)。

  1. 核心工具:query_cpu_metrics

📌 功能
模拟返回某个服务(如 "data-sync-service")的 CPU 使用率时间序列数据

📈 数据生成逻辑(关键!)

  • 初始阶段(前3个点):CPU ≈ 10%(正常)
  • 后续阶段:CPU 指数级上升,最终接近 95%
  • 加随机噪声:±2%,让数据更真实
  • 检测突增:如果 max > 80%,触发告警

📤 返回结构示例

{
  "service_name": "data-sync-service",
  "metric_name": "cpu_usage_percent",
  "interval": "1m",
  "data_points": [
    {"timestamp": "17:00", "value": 10.2},
    {"timestamp": "17:01", "value": 10.8},
    {"timestamp": "17:02", "value": 18.5},
    {"timestamp": "17:03", "value": 27.3},
    ...
    {"timestamp": "17:59", "value": 94.7}
  ],
  "statistics": {
    "avg": 45.6,
    "max": 94.7,
    "min": 10.2,
    "spike_detected": true
  },
  "alert_info": {
    "triggered": true,
    "threshold": 80.0,
    "message": "CPU 使用率持续超过 80% 阈值"
  }
}

💡 设计意图:模拟一个 “服务逐渐卡死” 的典型故障场景!


  1. 核心工具:query_memory_metrics

📌 功能
类似 CPU,但模拟 内存泄漏或内存压力 场景。

📈 数据生成逻辑

  • 初始内存 ≈ 30%
  • 逐步线性增长到 85%
  • 随机波动 ±1%
  • 总内存固定为 8GB
  • 告警阈值:70%

📤 返回包含:

  • used_gbtotal_gb(更贴近真实监控)

⚙️ 四、设计亮点

特性 说明
逼真的异常模式 不是随机数据,而是有趋势的故障模拟(CPU飙升、内存增长)
时间灵活 支持指定时间范围,也支持默认“最近1小时”
告警集成 直接在返回中包含 alert_info,Agent 可直接判断是否异常
统计指标丰富 包含 avg/max/min/p95/spike 等,满足深度分析
日志透明 所有调用都被详细记录,便于复现问题
接口标准化 参数、返回格式清晰,符合 MCP 规范

🧪 五、典型使用场景(Agent 如何用?)

# 场景:用户说“data-sync 服务变慢了”
# Agent 自动执行:

1. search_topic_by_service_name("data-sync") → 得到 topic_id
2. search_log(topic_id, 最近15分钟) → 发现日志重复
3. query_cpu_metrics("data-sync-service") → 发现 CPU 从 10% 升到 95%
4. query_memory_metrics("data-sync-service") → 发现内存也在缓慢增长

# 结论:服务可能陷入死循环 + 内存泄漏!

🚀 六、运行方式

python monitor_mcp_server.py
  • 启动一个 HTTP 服务:http://127.0.0.1:8004/mcp
  • Agent 通过 MCP 协议调用 query_cpu_metrics 等工具

✅ 总结

这段代码是一个 高度仿真的运维监控 Mock 服务,它的核心价值在于:

用可控、可预测、带故障模式的假数据,训练和测试 AIOps Agent 的故障诊断能力。

  • 它不是简单返回静态数据,而是模拟真实世界的性能恶化过程
  • 它为 Agent 提供了结构化、带告警、带统计的监控视图。
  • 它是构建 “可观测性驱动的智能运维” 的关键基础设施之一。

未来只需将内部逻辑从 “mock 生成” 替换为 “调用真实监控 API”,即可无缝上线生产环境。

以下是一个 CLS Server(日志服务)Monitor Server(监控服务) 协同工作的完整示例,模拟 AIOps Agent 如何通过两者联合分析,精准定位“数据同步服务死循环”故障。


两个MCP协同工作

🎯 场景设定

  • 用户反馈data-sync 服务最近响应变慢,数据未更新。
  • Agent 目标:自动诊断根因。

🔁 协同工作流程(含具体调用与返回)

以下是一个 完整的、端到端的 Plan-Execute-Replan (PER) 智能代理诊断流程示例,清晰地展示了从用户提问到最终生成报告的每一步,以及 Monitor MCPCLS MCP 是如何在其中被调用并发挥作用的。


🎯 场景
用户输入: “data-sync 服务最近响应很慢,帮我看看是什么原因。”


🔁 完整执行流程

第 0 步:启动 (execute() 方法被调用)

  • AIOpsService.execute(user_input="data-sync 服务最近响应很慢...", session_id="sess_123")
  • 初始化状态:
    initial_state = {
        "input": "data-sync 服务最近响应很慢...",
        "plan": [],
        "past_steps": [],
        "response": ""
    }
    

第 1 轮:Planner 节点 - 制定计划

  • 内部动作: LLM 根据系统提示词(包含可用工具列表)分析用户问题。
  • 思考过程: “要诊断服务慢,我需要先找到它的日志和监控指标。首先得知道它的技术名称。”
  • 输出状态:
    {
        "plan": [
            {"name": "search_topic_by_service_name", "args": {"service_name": "data-sync"}}
        ]
    }
    
  • 流式事件 (SSE):
    {
      "type": "plan",
      "stage": "plan_created",
      "message": "执行计划已制定,共 1 个步骤",
      "plan": ["根据业务名 'data-sync' 查询其对应的技术服务名和日志主题"]
    }
    

第 2 轮:Executor 节点 - 执行第一步 (调用 CLS MCP)

  • 内部动作:
    1. executorplan 中取出第一个动作: search_topic_by_service_name(service_name="data-sync")
    2. 调用 CLS MCP Server
  • CLS MCP Server 返回:
    {
      "topics": [{
        "topic_id": "topic-001",
        "service_name": "data-sync-service"
      }]
    }
    
  • 更新状态:
    {
        "past_steps": [
            (
                {"name": "search_topic_by_service_name", "args": {"service_name": "data-sync"}},
                {"topic_id": "topic-001", "service_name": "data-sync-service"}
            )
        ],
        "plan": [] # 第一步已完成
    }
    
  • 流式事件 (SSE):
    {
      "type": "step_complete",
      "stage": "step_executed",
      "message": "步骤执行完成 (1/1)",
      "current_step": "根据业务名 'data-sync' 查询其对应的技术服务名和日志主题",
      "result": {"topic_id": "topic-001", "service_name": "data-sync-service"}
    }
    

第 3 轮:Replanner 节点 - 评估并重新规划

  • 内部动作:
    1. replanner 检查 past_steps,发现已经获取了关键信息 service_name="data-sync-service"
    2. LLM 决定下一步:需要查询该服务的日志和 CPU 指标。
  • 输出新状态:
    {
        "plan": [
            {"name": "search_log", "args": {"topic_id": "topic-001", "start_time": "...", "end_time": "..."}},
            {"name": "query_cpu_metrics", "args": {"service_name": "data-sync-service", "start_time": "...", "end_time": "..."}}
        ]
    }
    
  • 流式事件 (SSE):
    {
      "type": "status",
      "stage": "replanner",
      "message": "评估完成,继续执行剩余步骤",
      "remaining_steps": 2
    }
    

第 4 轮:Executor 节点 - 执行第二步 (调用 CLS MCP)

  • 内部动作:
    1. executor 取出新 plan 的第一个动作: search_log(topic_id="topic-001", ...).
    2. 再次调用 CLS MCP Server 查询最近15分钟日志。
  • CLS MCP Server 返回:
    {
      "logs": [
          {"message": "正在同步元数据……"},
          {"message": "正在同步元数据……"},
          // ... 共15条重复日志
      ]
    }
    
  • 更新状态:
    {
        "past_steps": [ /* previous step */, (log_action, log_result) ],
        "plan": [{"name": "query_cpu_metrics", ...}] // 剩余1}
    
  • 流式事件 (SSE):
    {
      "type": "step_complete",
      "current_step": "查询 data-sync-service 服务的最近日志",
      "result": {"logs_count": 15, "sample": "正在同步元数据……"}
    }
    

第 5 轮:Executor 节点 - 执行第三步 (调用 Monitor MCP)

  • 内部动作:
    1. executor 取出最后一个动作: query_cpu_metrics(service_name="data-sync-service", ...).
    2. 调用 Monitor MCP Server
  • Monitor MCP Server 返回:
    {
      "data_points": [...],
      "statistics": {"max": 94.8},
      "alert_info": {"triggered": true, "message": "CPU 使用率持续超过 80% 阈值"}
    }
    
  • 更新状态:
    {
        "past_steps": [ /* all 3 steps */ ],
        "plan": [] // 所有计划步骤完成
    }
    
  • 流式事件 (SSE):
    {
      "type": "step_complete",
      "current_step": "查询 data-sync-service 服务的 CPU 使用率",
      "result": {"max_cpu": "94.8%", "alert_triggered": true}
    }
    

第 6 轮:Replanner 节点 - 生成最终报告

  • 内部动作:
    1. replanner 发现 plan 为空,且 past_steps 包含了充分的证据(重复日志 + CPU 飙升)。
    2. LLM 综合所有工具返回的真实数据,生成最终的 Markdown 诊断报告。
  • 输出最终状态:
    {
        "response": "# 告警分析报告\n\n---\n\n## 📋 活跃告警清单\n\n| 告警名称 | 级别 | 目标服务 | ... |\n|----------|------|----------|-----|\n| HighCpuUsage | Critical | data-sync-service | ... |\n\n---\n\n## 🔍 告警根因分析...\n\n### 根因结论\n服务因代码逻辑缺陷陷入死循环,导致 CPU 资源耗尽。\n\n..."
    }
    
  • 流式事件 (SSE):
    {
      "type": "report",
      "stage": "final_report",
      "message": "最终报告已生成",
      "report": "# 告警分析报告\n..."
    }
    

第 7 步:结束 (execute() 方法收尾)

  • execute() 方法调用 self.graph.get_state(...) 获取最终状态。
  • 发送完成事件:
    {
      "type": "complete",
      "stage": "complete",
      "message": "任务执行完成",
      "response": "# 告警分析报告\n..." // 完整的报告内容
    }
    
  • 流式生成器结束。

🌐 总结:MCP 工具在整个流程中的作用

步骤 调用的 MCP 工具 作用 返回的关键信息
第2轮 search_topic_by_service_name (CLS) 服务发现 将业务名 data-sync 映射为技术名 data-sync-service 和日志ID
第4轮 search_log (CLS) 日志取证 发现服务在无限循环打印“正在同步元数据”
第5轮 query_cpu_metrics (Monitor) 指标验证 确认 CPU 使用率飙升至 94.8%,触发告警

💡 核心价值:整个 PER 流程通过 多次、有序地调用不同的 MCP 工具,像一个经验丰富的 SRE 一样,先定位目标,再收集证据,最后交叉验证并得出结论。LLM 本身不产生任何运维数据,它只是协调者和分析师,而 MCP 工具才是提供真实世界“感知”能力的感官

🌐 真实云环境中的协同(腾讯云为例)
根据搜索结果,此流程完全符合腾讯云最佳实践:

“CLS 与 Cloud Monitor 深度集成,用户可将 Nginx 错误日志与服务器 CPU 负载波动关联,快速定位性能瓶颈。”
腾讯云官方文档

在真实场景中:

  • CLS 提供日志检索与关键词告警(如检测到 "ERROR" 自动触发)
  • Cloud Monitor 提供指标看板与阈值告警(如 CPU > 80% 持续 5 分钟)
  • 联动效果:当 Monitor 检测到 CPU 飙升时,自动关联 CLS 中对应时间段的日志,实现秒级根因定位。

✅ 总结:协同价值

单独使用 CLS 单独使用 Monitor CLS + Monitor 协同
知道“服务在重复打印日志” 知道“CPU 很高” 知道“重复日志导致 CPU 飙升”
无法区分是正常循环还是死循环 无法知道 CPU 高的具体原因 精准定位代码级根因
可能误判(如高频正常任务) 可能漏判(如低 CPU 的逻辑错误) 减少误报/漏报,提升诊断准确率

这正是现代 AIOps 的核心能力:通过多维度可观测数据融合,实现从“现象”到“根因”的自动推理

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐