大模型面试

1.车险

1.1 工作内容

该项目面向车险欺诈识别场景。传统机器学习模型虽然能够输出欺诈风险评分,但存在召回率和准确率难以兼顾的问题:如果阈值设得较高,容易漏掉潜在欺诈案件;如果阈值降低,又会引入较多误报,增加人工复核压力。

因此,我在原有机器学习模型基础上引入大模型能力,主要做了两方面工作。

第一,构建“机器学习初筛 + 大模型二次复核”的流程。通过降低机器学习模型的判定阈值,优先提高可疑案件召回率,再将召回出的疑似欺诈案件交给本地部署的大模型进行二次分析,从案件描述、异常特征和历史相似案例等角度判断风险,从而在提高召回的同时尽量控制准确率损失。

第二,构建面向业务人员的 RAG 辅助复查系统。将历史欺诈案例、调查结论和风控经验沉淀为私有知识库,业务人员在人工复查案件时,可以通过可视化界面检索相似案例、查看风险依据,并获得大模型生成的核查建议,从而降低非专业人员理解欺诈案件的门槛。

1.2 关键点

具体关键点为RAG可以整理成四个:

第一,结构化切块。
不是直接把完整案件全文入库,而是按照业务语义拆成事故描述块、异常特征块和调查结论块。这样可以避免长文本噪声干扰,提高检索粒度。

第二,向量化存储。
每个 chunk 单独生成 embedding,向量存入 FAISS,chunk_id、case_id、chunk_type 和原文存入 MySQL。FAISS 负责相似度召回,MySQL 负责元数据管理和案件级信息补全。

第三,案件级有限聚合。
检索时先召回最相关的 chunk,再根据 case_id 找到同一案件下的相关信息。例如命中异常特征块时,会补充调查结论块;命中调查结论块时,会补充异常特征块。这样既避免只看单个片段信息不足,也避免整案全文输入导致 token 浪费。

第四,大模型辅助生成。
将当前案件信息和召回的历史相似案例一起放入 prompt,让大模型输出风险模式、相似案例依据和进一步核查建议,为人工复查提供解释性支持。

在这个项目里,我不是单纯用大模型替代机器学习模型,而是把它作为欺诈识别后的复核和解释模块。传统模型先通过降低阈值提高召回率,把更多潜在欺诈案件筛出来;然后我通过 Ollama 调用本地大模型,对这些疑似案件进行二次分析。RAG 部分主要用于提供历史案例支撑。我把历史车险欺诈案件按照业务语义拆成事故描述、异常特征和调查结论三个 chunk,每个 chunk 单独做 embedding,向量存到 FAISS,元数据存到 MySQL。在线分析新案件时,先检索相似历史 chunk,再按 case_id 做有限补全和聚合,最后让大模型基于当前案件和相似案例生成风险模式、相似点和核查建议。这样既提高了召回后的复核质量,也让业务人员能看到模型判断背后的依据。

2.OmniOps AI 多 Agent 运维系统

2.1 工作内容

该项目面向企业智能运维场景,目标是解决传统运维中故障响应慢、排查依赖人工经验、知识分散和处理流程不标准的问题。

我基于 LangChain + LangGraph 构建了一个多 Agent 智能运维系统,将运维知识库、告警分析、工具调用和任务规划能力整合到统一流程中,实现从“人工查询文档、人工分析日志”到“Agent 自动检索、推理、调用工具并生成处置建议”的转变。

系统主要包含三类能力:

第一,构建运维知识库 RAG,用于检索历史故障案例、SOP 文档、告警处理手册和系统说明文档,为故障分析提供知识支撑。

第二,构建 ReAct Agent,使大模型能够根据当前告警信息进行推理,并调用日志查询、监控查询、工单查询等外部工具。

第三,构建 Plan-Execute-Replan 工作流,将复杂运维任务拆解为多个步骤,逐步执行,并根据执行结果动态调整计划。


2.2 基础流程

1)RAG 知识库建设

我将历史故障案例、运维 SOP、告警处理手册、系统架构说明等文档整理成知识库。

处理流程是:

文档清洗
→ 按业务语义切块
→ embedding 向量化
→ 写入向量数据库
→ 在线检索相关知识

RAG 的作用是让 Agent 在分析问题时,不只依赖大模型自身知识,而是能够结合企业内部真实运维经验。

例如用户输入:

订单服务接口 5xx 增多,同时 CPU 使用率持续升高,怎么排查?

系统会先检索相关的:

CPU 飙高排查手册
接口 5xx 处理 SOP
历史相似故障案例
K8s Pod 资源限制说明

然后再交给大模型综合分析。


2)向量召回 + 重排序

这个项目里的 RAG 不是简单地向量 TopK 检索后直接返回,而是采用 粗召回 + 精排 的两阶段检索策略。

第一阶段通过向量检索召回较多候选 chunk,例如 Top20,保证召回率。

第二阶段使用 reranker 对 query 和候选 chunk 重新计算相关性分数,筛选出最相关的 Top5 作为最终上下文。

流程可以总结为:

用户问题 / 告警信息
→ 向量检索召回 TopK
→ reranker 重排序
→ 保留 TopN 高相关片段
→ 构造 Prompt
→ 交给 Agent 分析

这样做的好处是:

  • 减少无关文档进入 prompt
  • 提高故障知识匹配准确率
  • 降低 token 消耗
  • 提升大模型回答稳定性

面试时可以重点说:

向量检索负责召回,reranker 负责精排,两者结合可以兼顾召回率和相关性。


3)ReAct Agent 工具调用

在故障分析过程中,仅靠知识库是不够的,因为很多信息是实时变化的,比如日志、监控指标、服务状态和工单状态。

所以我实现了 ReAct Agent,让大模型可以在推理过程中调用外部工具。

整体流程是:

Thought:分析当前问题需要什么信息
Action:选择并调用工具
Observation:读取工具返回结果
Thought:结合新信息继续推理
Final Answer:输出结论和建议

例如一次排障流程中,Agent 可能会依次调用:

查询服务 CPU / 内存指标
查询近 30 分钟错误日志
查询接口 5xx 分布
查询最近发布记录
查询历史相似工单

这样系统就不是简单聊天机器人,而是能够结合实时系统状态进行运维分析的工具型 Agent。


4)Plan-Execute-Replan 工作流

对于复杂任务,比如“排查某服务不可用原因”,单轮 ReAct 可能不够稳定,因此我引入了 Plan-Execute-Replan 工作流。

它的核心流程是:

Planner:先生成排查计划
Executor:按步骤执行计划
Replanner:根据执行结果判断是否继续、修改计划或输出最终结论

例如:

1. 检查服务实例是否存活
2. 查看最近 30 分钟告警
3. 查询错误日志
4. 检查数据库连接状态
5. 检查最近发布记录
6. 汇总结论并给出处置建议

如果执行到某一步发现数据库连接异常,Replanner 会调整后续计划,把排查重点转向数据库或网络链路,而不是机械地继续原计划。

这个模块的价值是:

让复杂运维任务从一次性回答变成可动态调整的多步骤执行流程。


5)工程化封装

工程实现上,我将 Agent 能力封装成后端服务,对外提供统一接口。

大致流程是:

前端 / 运维平台输入告警
→ 后端接收请求
→ RAG 检索知识
→ Agent 判断是否调用工具
→ 工具返回实时数据
→ Agent 生成排查结论
→ 返回前端展示

系统可以输出:

风险等级
故障原因分析
相关历史案例
已调用工具结果
下一步排查建议
处置方案

面试 1 分钟版本

这个项目是一个面向企业运维场景的多 Agent 智能运维系统,主要解决传统运维中故障响应慢、依赖人工经验和知识分散的问题。我基于 LangChain 和 LangGraph 实现了 RAG、ReAct 和 Plan-Execute-Replan 三类核心能力。RAG 部分用于检索历史故障案例、SOP 文档和告警处理手册,并采用向量召回加 reranker 重排序的两阶段检索方式,提高上下文相关性。ReAct Agent 用于在推理过程中调用日志查询、监控查询、工单查询等工具,获取实时系统状态。对于复杂故障,我使用 Plan-Execute-Replan 工作流,先生成排查计划,再逐步执行,并根据工具返回结果动态调整计划。最终系统可以自动生成故障原因分析、相似案例和处置建议,把原本依赖人工经验的排障流程标准化、自动化。

2.3 React和Plan-Execute-RePlan

ReAct Agent 的实现方式

ReAct Agent 主要用于处理需要边分析边调用工具的运维问题,比如查询日志、查询监控、查询工单、检索知识库等。

它的核心逻辑是:

用户问题 / 告警信息
→ LLM 推理当前需要什么信息
→ 选择工具并生成 tool call
→ 执行工具
→ 将工具结果作为 Observation 写回上下文
→ LLM 继续推理
→ 直到不再调用工具,输出最终答案

在代码实现上,它本质上是一个循环:

for iteration in range(max_iterations):
    response = llm.invoke(messages, tools=tools)

    if not response.tool_calls:
        final_answer = response.content
        break

    for tool_call in response.tool_calls:
        tool_result = execute_tool(tool_call)
        messages.append({
            "role": "tool",
            "content": tool_result,
            "tool_call_id": tool_call.id
        })

也就是说,ReAct 并不是一次性生成答案,而是通过:

Reasoning → Acting → Observation → Reasoning

不断迭代完成任务。

项目中实现React的关键代码:

self.agent = create_agent(
    self.model,
    tools=all_tools,
    checkpointer=self.checkpointer,
)

其中:

self.model = ChatQwen(...)

表示使用千问模型;

all_tools = self.tools + self.mcp_tools

表示把本地工具和 MCP 工具一起绑定给 Agent。

本地工具包括:

retrieve_knowledge
get_current_time

MCP 工具主要用于接入外部运维能力,比如日志查询、监控告警等。

执行流程

用户输入问题后,代码先构造消息:

messages = [
    SystemMessage(content=self.system_prompt),
    HumanMessage(content=question)
]

然后通过:

result = await self.agent.ainvoke(
    input={"messages": messages},
    config={"configurable": {"thread_id": session_id}},
)

触发 Agent 执行。

执行过程中,LangGraph 会自动完成 ReAct 风格的循环:

用户问题
→ 大模型判断是否需要调用工具
→ 如果需要,生成工具调用
→ 系统执行工具
→ 工具结果写回 messages
→ 大模型继续推理
→ 直到不再调用工具,输出最终回答

所以项目中不需要你手动维护:

for iteration in range(max_iterations):

这部分循环已经由 LangChain / LangGraph 封装。


状态怎么传递和保持

项目中的状态本质上是:

messages

也就是用户消息、AI 消息、工具调用消息和工具返回结果。

你代码里定义了:

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]

它表示 Agent 状态结构中有一个 messages 字段,并且新消息会通过 add_messages 追加进去。

同时你使用:

self.checkpointer = MemorySaver()

它不是状态本身,而是状态存储器。作用是把某个会话的 messages 状态按 thread_id 保存下来。

每次调用时传入:

config={"configurable": {"thread_id": session_id}}

这样 LangGraph 就能根据 session_id 区分不同会话,并保存 / 恢复对应的消息状态。

messages 是 Agent 的状态内容,checkpointer 负责按 session_id/thread_id 保存和恢复这份状态。

当前实现使用的是 MemorySaver,所以状态保存在内存中;如果要做生产级持久化,可以替换成数据库型 checkpointer,比如 SQLite、Postgres 或 Redis。

所以可以总结为:

messages 是状态本体
AgentState 描述状态结构
MemorySaver 负责保存状态
thread_id/session_id 负责区分不同会话

面试回答版

在我的 OmniOps 项目中,ReAct Agent 是基于 LangChain 新版 create_agent 实现的,底层由 LangGraph 管理执行流程。我把 ChatQwen 模型、本地 RAG 工具和 MCP 运维工具一起绑定到 Agent 上。用户输入问题后,系统构造 system message 和 human message,然后通过 agent.ainvokeagent.astream 触发执行。执行过程中,模型会根据当前 messages 判断是否需要调用工具,比如知识库检索、日志查询或监控查询;工具返回结果会作为消息继续写回 messages,模型再基于新的 observation 继续推理,直到输出最终答案。状态方面,Agent 的核心状态是 messages,AgentState 定义了 messages 的状态结构,MemorySaver 作为 checkpointer 按 thread_id=session_id 保存不同会话的状态。


Plan-Execute-Replan 的实现方式

Plan-Execute-Replan 主要用于处理复杂、多步骤、需要动态调整计划的运维任务。

它比 ReAct 更适合复杂排障,例如:

排查订单服务不可用的根因
分析一次线上故障的完整影响链路
生成并执行故障处置方案

它的核心流程是:

Planner:生成初始计划
Executor:执行当前步骤
Replanner:根据执行结果判断下一步

Planner:生成计划

Planner 负责根据用户问题生成结构化任务列表。

例如:

用户问题:排查订单服务 5xx 激增原因

Planner 输出:

1. 查询订单服务近 30 分钟错误率和 QPS
2. 查询订单服务错误日志
3. 检查最近是否有发布变更
4. 查询数据库连接池状态
5. 检索历史相似故障案例
6. 汇总原因并给出处置建议

代码层面通常用 Pydantic 约束输出结构:

class Plan(BaseModel):
    steps: list[str]

这样可以避免大模型输出随意文本,便于后续程序自动执行。


Executor:执行步骤

Executor 每次只执行计划中的一个步骤。

例如当前步骤是:

查询订单服务近 30 分钟错误日志

Executor 会选择合适工具,比如日志查询工具:

result = log_query_tool.run(
    service="order-service",
    time_range="30m",
    keyword="ERROR"
)

然后把执行结果写入状态中:

state["past_steps"].append((current_step, result))

这里的 past_steps 很重要,它记录了已经执行过的步骤和观察结果,供 Replanner 判断后续计划是否需要调整。


Replanner:重新规划或结束

Replanner 根据当前状态做判断:

原始问题
当前剩余计划
已经执行的步骤
工具返回结果

然后有两种输出:

第一种:继续执行

如果信息还不够,Replanner 会更新计划:

继续查询数据库慢查询日志
检查 Redis 连接状态

第二种:结束任务

如果已经能回答问题,Replanner 会生成最终响应:

初步判断订单服务 5xx 激增与最近一次发布有关。
主要依据是……
建议先回滚版本,并继续观察错误率。

代码中通常会通过状态字段判断是否结束:

def should_continue(state):
    if state.get("response"):
        return "end"
    return "execute"

也就是说:

  • response 有值 → 任务完成,结束
  • response 为空 → 继续执行下一步

3. LangGraph 中的整体实现

在 LangGraph 中,可以把三个模块建成图节点:

planner → executor → replanner
              ↑          ↓
              └──────────┘

伪代码如下:

workflow.add_node("planner", plan_node)
workflow.add_node("executor", execute_node)
workflow.add_node("replanner", replan_node)

workflow.set_entry_point("planner")

workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")

workflow.add_conditional_edges(
    "replanner",
    should_continue,
    {
        "execute": "executor",
        "end": END,
    }
)

这里的关键是:

Replanner 不只是判断是否结束,还会根据执行结果动态修改剩余计划。

所以它不是简单的固定流程,而是一个可以根据工具反馈调整方向的执行图。

面试 1 分钟版本

在 OmniOps 项目中,我实现了两类 Agent 工作流。第一类是 ReAct Agent,主要用于需要实时查询工具的运维问答。它本质上是一个 LLM 和工具调用的循环:模型先根据当前问题推理是否需要查询日志、监控或知识库,如果需要就生成 tool call,工具执行后把结果作为 observation 写回上下文,模型再继续分析,直到不再调用工具后输出最终结论。第二类是 Plan-Execute-Replan,主要用于复杂排障任务。它先由 Planner 生成结构化排查计划,然后 Executor 每次执行一个步骤,并把执行结果写入 past_steps,最后 Replanner 根据已有结果判断是继续执行、调整计划,还是生成最终 response。这个流程在 LangGraph 中通过 planner、executor、replanner 三个节点和条件边实现,能够让复杂运维任务从一次性回答变成可追踪、可调整的多步骤执行流程。

2.4 状态传递与保持

1. ReAct Agent 的状态传递与保持

ReAct 的状态主要保存在 messages 对话列表 里。

它不是额外维护复杂状态对象,而是把每轮的用户输入、模型回复、工具调用结果都追加到 messages 中。

流程是:

messages 初始包含:
system prompt
用户问题

第一轮 LLM 判断需要调用工具后,会生成 tool_call:

assistant: 调用日志查询工具

系统执行工具后,把工具结果追加回 messages:

tool: 查询到最近 30 分钟 ERROR 日志数量激增,主要异常为数据库连接超时

下一轮再把完整 messages 传给 LLM:

system prompt
用户问题
assistant tool_call
tool observation

所以模型能“记住”前面查过什么,不是因为程序内部隐藏记忆,而是因为工具结果被写回上下文

代码层面可以这样理解:

messages = build_initial_messages(user_query)

for i in range(max_iterations):
    response = llm.invoke(messages, tools=tools)

    messages.append(response)

    if not response.tool_calls:
        return response.content

    for tool_call in response.tool_calls:
        tool_result = run_tool(tool_call)
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": tool_result
        })

面试可以这样说:

ReAct Agent 的状态主要通过 messages 传递。每一轮 LLM 的响应、工具调用和工具返回结果都会追加到 messages 中,下一轮调用模型时把完整 messages 再传入,因此模型可以基于之前的 observation 继续推理。也就是说,ReAct 的状态保持本质上依赖上下文消息历史。


2. Plan-Execute-Replan 的状态传递与保持

Plan-Execute-Replan 的状态更结构化,一般不是只靠 messages,而是用一个 State 对象 保存。

常见字段包括:

class PlanExecuteState(TypedDict):
    input: str
    plan: list[str]
    past_steps: list[tuple[str, str]]
    response: str

每个字段作用是:

input:用户原始问题
plan:当前剩余计划
past_steps:已经执行过的步骤和结果
response:最终回答,如果有值说明任务完成

2.1 Planner 怎么更新状态

Planner 读取:

input

生成:

plan

例如:

state["plan"] = [
    "查询订单服务近30分钟错误率",
    "查询错误日志",
    "检查最近发布记录",
    "检索历史相似故障案例"
]

此时状态变成:

{
    "input": "排查订单服务5xx激增原因",
    "plan": [...],
    "past_steps": [],
    "response": ""
}

2.2 Executor 怎么更新状态

Executor 每次读取当前计划中的第一步:

current_step = state["plan"][0]

然后调用工具执行,得到结果:

result = log_tool.run(current_step)

执行完成后,把结果写入:

state["past_steps"].append((current_step, result))

同时可以从 plan 中移除已经执行的步骤:

state["plan"] = state["plan"][1:]

所以状态会从:

"past_steps": []

变成:

"past_steps": [
    ("查询错误日志", "发现大量数据库连接超时")
]

这就是它保持执行记忆的方式。


2.3 Replanner 怎么更新状态

Replanner 会读取:

input
plan
past_steps

判断下一步:

如果还需要继续:

它会更新 plan

state["plan"] = [
    "查询数据库连接池状态",
    "检查数据库慢查询"
]

如果已经可以回答:

它会写入 response

state["response"] = "初步判断故障由数据库连接池耗尽导致,建议先扩容连接池并排查慢查询。"

然后条件边判断:

def should_continue(state):
    if state.get("response"):
        return "end"
    return "execute"

也就是说:

response 有值 → 结束
response 为空 → 回到 executor 继续执行
3.状态保持关键代码
React

为Agent 创建了带自动消息追加功能的记忆区

class AgentState(TypedDict):
    """Agent 状态"""
    messages: Annotated[Sequence[BaseMessage], add_messages]

AgentState 是一个 类型提示类(TypedDict),用来告诉 LangGraph:
“这个 Agent 在运行过程中,它的内部状态(state)长什么样?”

你可以把它想象成 Agent 的 “大脑记忆区”,每次思考或回复后,都会更新这块内存。

add_messages 是 LangGraph 内置的一个消息合并函数

当你在节点函数中返回:

return {"messages": [AIMessage("你好")]}

LangGraph 会:
查看 AgentState 中 messages 字段的类型注解
发现它是 Annotated[…, add_messages]
于是不直接覆盖,而是调用:

new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")])

结果就是追加,而不是替换!

使用的方法:
TypedDict + Annotated + add_messages 组合定义 LangGraph 状态结构
实现的效果:
给 Agent 创建了带自动消息追加功能的记忆区,让智能体可以记住历史对话。

Plan-Execute-Replan

PlanExecuteState

状态 Schema + LangGraph 自动传递 + Reducer 合并机制
这段共享状态主要用了 Python 标准库里的类型工具:

from typing import List, TypedDict, Annotated
import operator

TypedDict :定义 LangGraph 状态的字典结构
List :声明字段是列表类型
Annotated :给字段附加额外的合并规则
operator.add:作为 reducer,用于列表追加合并

LangGraph 自动传递:每个节点都能读到当前 state

在 LangGraph 里,每个节点函数都会接收当前状态:

async def planner(state: PlanExecuteState) -> Dict[str, Any]:

所以节点可以从 state 里读数据。

关键代码:定义共享状态结构

from typing import List, TypedDict, Annotated
import operator

class PlanExecuteState(TypedDict):
    """Plan-Execute-Replan 状态"""
    # 用户输入
    input: str
    # 当前剩余计划
    plan: List[str]
    # 已执行步骤历史,使用 operator.add 追加
    past_steps: Annotated[List[tuple], operator.add]
    # 最终响应
    response: str

其中最关键的是:

past_steps: Annotated[List[tuple], operator.add]

它表示:
past_steps 这个字段更新时,不是覆盖,而是用 operator.add 做列表拼接。

operator.add 是 Python 的列表加法函数,在 LangGraph 中通过 Annotated 注册为 past_steps 字段的 reducer。这样每次节点返回新的 past_steps 时,LangGraph 不会覆盖旧值,而是调用 operator.add(old, new) 把执行历史追加起来。它本身不是 LangGraph 专有的机制,但在 LangGraph 状态定义中充当 reducer 合并函数。

面试回答版

ReAct 和 Plan-Execute-Replan 的状态传递方式不一样。ReAct 的状态主要保存在 messages 里,每一轮 LLM 的回复、tool call 和工具返回的 observation 都会追加到 messages 中,下一轮调用模型时把完整 messages 再传入,所以模型可以基于之前的工具结果继续推理。它的状态保持更像是对话上下文历史。

Plan-Execute-Replan 的状态更结构化,一般用一个 State 对象维护,比如 input、plan、past_steps 和 response。Planner 根据 input 生成 plan;Executor 每次读取 plan 中的当前步骤并执行,把结果写入 past_steps,同时更新剩余 plan;Replanner 根据 input、剩余 plan 和 past_steps 判断是否需要修改计划或生成最终 response。如果 response 有值,条件边就结束;如果没有 response,就回到 Executor 继续执行。所以它的状态保持是显式字段驱动的,更适合复杂多步骤任务。

4.状态传递规范

定义Langgraph图:

from langgraph.graph import StateGraph, START, END

workflow = StateGraph(PlanExecuteState)

workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replanner", replanner)

workflow.add_edge(START, "planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")

def should_continue(state: PlanExecuteState) -> str:
    if state.get("response"):
        return "end"
    return "execute"

workflow.add_conditional_edges(
    "replanner",
    should_continue,
    {
        "execute": "executor",
        "end": END,
    }
)

app = workflow.compile()

app = workflow.compile()LangGraph 里把工作流图编译成可执行应用对象 的代码。

之后,LangGraph 会把这个图编译成一个可运行对象 app

之后你才能调用:

result = app.invoke({
    "input": "排查订单服务 5xx 激增原因",
    "plan": [],
    "past_steps": [],
    "response": ""
})

或者流式执行:

for event in app.stream(inputs):
    print(event)

流程图:

START
  ↓
Planner
  ↓
Executor
  ↓
Replanner
  ├── 如果 response 为空 → 回到 Executor
  └── 如果 response 有值 → END
Replanner 决策 Replanner 返回值 state 变化 条件边下一跳
continue {} plan 不变,response 仍为空 executor
replan {"plan": new_steps} plan 被替换,response 仍为空 executor
respond {"response": final_response} response 有值 END

Planner 节点内部使用 Pydantic 的 Plan(BaseModel) 配合 llm.with_structured_output(Plan),约束大模型输出为包含 steps: List[str] 的结构化结果。
随后 Planner 会提取 plan_result.steps,并以 {“plan”: plan_steps} 的形式返回给 LangGraph,由 LangGraph 将其合并到共享状态 PlanExecuteState 的 plan 字段中。

Plan 类是:

class Plan(BaseModel):
    steps: List[str] = Field(
        description="完成任务所需的不同步骤..."
    )

在 LangChain 里,Pydantic 被用来约束 LLM 输出
在非 LangChain 场景中,只要一个类继承了 Pydantic 的 BaseModel,创建对象时字段不满足类型或校验要求,就会触发校验错误。

继承 BaseModel 的类属于 Pydantic 数据模型类
主要有四个作用:
1.定义数据结构.它可以明确规定一个对象应该有哪些字段。
2.约束字段类型
3.可以添加字段说明 在 LangChain 的结构化输出里,它还会变成 schema 描述,帮助大模型理解这个字段应该输出什么。
4.可以和 with_structured_output() 配合,约束 LLM 输出

5.MCP
1. 本地项目里已经有本地工具
   get_current_time
   retrieve_knowledge

2. 创建 / 获取 MCP Client
   mcp_client = await get_mcp_client_with_retry()

3. 通过 MCP Client 从 MCP Server 拉取外部工具
   mcp_tools = await mcp_client.get_tools()

4. 在当前代码里合并工具
   all_tools = local_tools + mcp_tools

5. 使用大模型时绑定所有工具
   llm_with_tools = llm.bind_tools(all_tools)

6. 如果 LLM 返回 tool_calls
   ToolNode(all_tools) 负责执行对应工具

项目中先获取一个 MCP Client,通过它从 MCP Server 动态获取外部工具;本地工具不需要注册到 MCP Client 中,而是在应用侧和 MCP 工具合并成 all_tools。之后通过 llm.bind_tools(all_tools) 把所有工具绑定给大模型,让模型可以生成 tool_calls;再通过 ToolNode(all_tools) 根据模型返回的工具调用真正执行对应工具。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐