llm面试整理
大模型面试
1.车险
1.1 工作内容
该项目面向车险欺诈识别场景。传统机器学习模型虽然能够输出欺诈风险评分,但存在召回率和准确率难以兼顾的问题:如果阈值设得较高,容易漏掉潜在欺诈案件;如果阈值降低,又会引入较多误报,增加人工复核压力。
因此,我在原有机器学习模型基础上引入大模型能力,主要做了两方面工作。
第一,构建“机器学习初筛 + 大模型二次复核”的流程。通过降低机器学习模型的判定阈值,优先提高可疑案件召回率,再将召回出的疑似欺诈案件交给本地部署的大模型进行二次分析,从案件描述、异常特征和历史相似案例等角度判断风险,从而在提高召回的同时尽量控制准确率损失。
第二,构建面向业务人员的 RAG 辅助复查系统。将历史欺诈案例、调查结论和风控经验沉淀为私有知识库,业务人员在人工复查案件时,可以通过可视化界面检索相似案例、查看风险依据,并获得大模型生成的核查建议,从而降低非专业人员理解欺诈案件的门槛。
1.2 关键点
具体关键点为RAG可以整理成四个:
第一,结构化切块。
不是直接把完整案件全文入库,而是按照业务语义拆成事故描述块、异常特征块和调查结论块。这样可以避免长文本噪声干扰,提高检索粒度。
第二,向量化存储。
每个 chunk 单独生成 embedding,向量存入 FAISS,chunk_id、case_id、chunk_type 和原文存入 MySQL。FAISS 负责相似度召回,MySQL 负责元数据管理和案件级信息补全。
第三,案件级有限聚合。
检索时先召回最相关的 chunk,再根据 case_id 找到同一案件下的相关信息。例如命中异常特征块时,会补充调查结论块;命中调查结论块时,会补充异常特征块。这样既避免只看单个片段信息不足,也避免整案全文输入导致 token 浪费。
第四,大模型辅助生成。
将当前案件信息和召回的历史相似案例一起放入 prompt,让大模型输出风险模式、相似案例依据和进一步核查建议,为人工复查提供解释性支持。
在这个项目里,我不是单纯用大模型替代机器学习模型,而是把它作为欺诈识别后的复核和解释模块。传统模型先通过降低阈值提高召回率,把更多潜在欺诈案件筛出来;然后我通过 Ollama 调用本地大模型,对这些疑似案件进行二次分析。RAG 部分主要用于提供历史案例支撑。我把历史车险欺诈案件按照业务语义拆成事故描述、异常特征和调查结论三个 chunk,每个 chunk 单独做 embedding,向量存到 FAISS,元数据存到 MySQL。在线分析新案件时,先检索相似历史 chunk,再按 case_id 做有限补全和聚合,最后让大模型基于当前案件和相似案例生成风险模式、相似点和核查建议。这样既提高了召回后的复核质量,也让业务人员能看到模型判断背后的依据。
2.OmniOps AI 多 Agent 运维系统
2.1 工作内容
该项目面向企业智能运维场景,目标是解决传统运维中故障响应慢、排查依赖人工经验、知识分散和处理流程不标准的问题。
我基于 LangChain + LangGraph 构建了一个多 Agent 智能运维系统,将运维知识库、告警分析、工具调用和任务规划能力整合到统一流程中,实现从“人工查询文档、人工分析日志”到“Agent 自动检索、推理、调用工具并生成处置建议”的转变。
系统主要包含三类能力:
第一,构建运维知识库 RAG,用于检索历史故障案例、SOP 文档、告警处理手册和系统说明文档,为故障分析提供知识支撑。
第二,构建 ReAct Agent,使大模型能够根据当前告警信息进行推理,并调用日志查询、监控查询、工单查询等外部工具。
第三,构建 Plan-Execute-Replan 工作流,将复杂运维任务拆解为多个步骤,逐步执行,并根据执行结果动态调整计划。
2.2 基础流程
1)RAG 知识库建设
我将历史故障案例、运维 SOP、告警处理手册、系统架构说明等文档整理成知识库。
处理流程是:
文档清洗
→ 按业务语义切块
→ embedding 向量化
→ 写入向量数据库
→ 在线检索相关知识
RAG 的作用是让 Agent 在分析问题时,不只依赖大模型自身知识,而是能够结合企业内部真实运维经验。
例如用户输入:
订单服务接口 5xx 增多,同时 CPU 使用率持续升高,怎么排查?
系统会先检索相关的:
CPU 飙高排查手册
接口 5xx 处理 SOP
历史相似故障案例
K8s Pod 资源限制说明
然后再交给大模型综合分析。
2)向量召回 + 重排序
这个项目里的 RAG 不是简单地向量 TopK 检索后直接返回,而是采用 粗召回 + 精排 的两阶段检索策略。
第一阶段通过向量检索召回较多候选 chunk,例如 Top20,保证召回率。
第二阶段使用 reranker 对 query 和候选 chunk 重新计算相关性分数,筛选出最相关的 Top5 作为最终上下文。
流程可以总结为:
用户问题 / 告警信息
→ 向量检索召回 TopK
→ reranker 重排序
→ 保留 TopN 高相关片段
→ 构造 Prompt
→ 交给 Agent 分析
这样做的好处是:
- 减少无关文档进入 prompt
- 提高故障知识匹配准确率
- 降低 token 消耗
- 提升大模型回答稳定性
面试时可以重点说:
向量检索负责召回,reranker 负责精排,两者结合可以兼顾召回率和相关性。
3)ReAct Agent 工具调用
在故障分析过程中,仅靠知识库是不够的,因为很多信息是实时变化的,比如日志、监控指标、服务状态和工单状态。
所以我实现了 ReAct Agent,让大模型可以在推理过程中调用外部工具。
整体流程是:
Thought:分析当前问题需要什么信息
Action:选择并调用工具
Observation:读取工具返回结果
Thought:结合新信息继续推理
Final Answer:输出结论和建议
例如一次排障流程中,Agent 可能会依次调用:
查询服务 CPU / 内存指标
查询近 30 分钟错误日志
查询接口 5xx 分布
查询最近发布记录
查询历史相似工单
这样系统就不是简单聊天机器人,而是能够结合实时系统状态进行运维分析的工具型 Agent。
4)Plan-Execute-Replan 工作流
对于复杂任务,比如“排查某服务不可用原因”,单轮 ReAct 可能不够稳定,因此我引入了 Plan-Execute-Replan 工作流。
它的核心流程是:
Planner:先生成排查计划
Executor:按步骤执行计划
Replanner:根据执行结果判断是否继续、修改计划或输出最终结论
例如:
1. 检查服务实例是否存活
2. 查看最近 30 分钟告警
3. 查询错误日志
4. 检查数据库连接状态
5. 检查最近发布记录
6. 汇总结论并给出处置建议
如果执行到某一步发现数据库连接异常,Replanner 会调整后续计划,把排查重点转向数据库或网络链路,而不是机械地继续原计划。
这个模块的价值是:
让复杂运维任务从一次性回答变成可动态调整的多步骤执行流程。
5)工程化封装
工程实现上,我将 Agent 能力封装成后端服务,对外提供统一接口。
大致流程是:
前端 / 运维平台输入告警
→ 后端接收请求
→ RAG 检索知识
→ Agent 判断是否调用工具
→ 工具返回实时数据
→ Agent 生成排查结论
→ 返回前端展示
系统可以输出:
风险等级
故障原因分析
相关历史案例
已调用工具结果
下一步排查建议
处置方案
面试 1 分钟版本
这个项目是一个面向企业运维场景的多 Agent 智能运维系统,主要解决传统运维中故障响应慢、依赖人工经验和知识分散的问题。我基于 LangChain 和 LangGraph 实现了 RAG、ReAct 和 Plan-Execute-Replan 三类核心能力。RAG 部分用于检索历史故障案例、SOP 文档和告警处理手册,并采用向量召回加 reranker 重排序的两阶段检索方式,提高上下文相关性。ReAct Agent 用于在推理过程中调用日志查询、监控查询、工单查询等工具,获取实时系统状态。对于复杂故障,我使用 Plan-Execute-Replan 工作流,先生成排查计划,再逐步执行,并根据工具返回结果动态调整计划。最终系统可以自动生成故障原因分析、相似案例和处置建议,把原本依赖人工经验的排障流程标准化、自动化。
2.3 React和Plan-Execute-RePlan
ReAct Agent 的实现方式
ReAct Agent 主要用于处理需要边分析边调用工具的运维问题,比如查询日志、查询监控、查询工单、检索知识库等。
它的核心逻辑是:
用户问题 / 告警信息
→ LLM 推理当前需要什么信息
→ 选择工具并生成 tool call
→ 执行工具
→ 将工具结果作为 Observation 写回上下文
→ LLM 继续推理
→ 直到不再调用工具,输出最终答案
在代码实现上,它本质上是一个循环:
for iteration in range(max_iterations):
response = llm.invoke(messages, tools=tools)
if not response.tool_calls:
final_answer = response.content
break
for tool_call in response.tool_calls:
tool_result = execute_tool(tool_call)
messages.append({
"role": "tool",
"content": tool_result,
"tool_call_id": tool_call.id
})
也就是说,ReAct 并不是一次性生成答案,而是通过:
Reasoning → Acting → Observation → Reasoning
不断迭代完成任务。
项目中实现React的关键代码:
self.agent = create_agent(
self.model,
tools=all_tools,
checkpointer=self.checkpointer,
)
其中:
self.model = ChatQwen(...)
表示使用千问模型;
all_tools = self.tools + self.mcp_tools
表示把本地工具和 MCP 工具一起绑定给 Agent。
本地工具包括:
retrieve_knowledge
get_current_time
MCP 工具主要用于接入外部运维能力,比如日志查询、监控告警等。
执行流程
用户输入问题后,代码先构造消息:
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=question)
]
然后通过:
result = await self.agent.ainvoke(
input={"messages": messages},
config={"configurable": {"thread_id": session_id}},
)
触发 Agent 执行。
执行过程中,LangGraph 会自动完成 ReAct 风格的循环:
用户问题
→ 大模型判断是否需要调用工具
→ 如果需要,生成工具调用
→ 系统执行工具
→ 工具结果写回 messages
→ 大模型继续推理
→ 直到不再调用工具,输出最终回答
所以项目中不需要你手动维护:
for iteration in range(max_iterations):
这部分循环已经由 LangChain / LangGraph 封装。
状态怎么传递和保持
项目中的状态本质上是:
messages
也就是用户消息、AI 消息、工具调用消息和工具返回结果。
你代码里定义了:
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], add_messages]
它表示 Agent 状态结构中有一个 messages 字段,并且新消息会通过 add_messages 追加进去。
同时你使用:
self.checkpointer = MemorySaver()
它不是状态本身,而是状态存储器。作用是把某个会话的 messages 状态按 thread_id 保存下来。
每次调用时传入:
config={"configurable": {"thread_id": session_id}}
这样 LangGraph 就能根据 session_id 区分不同会话,并保存 / 恢复对应的消息状态。
messages 是 Agent 的状态内容,checkpointer 负责按 session_id/thread_id 保存和恢复这份状态。
当前实现使用的是 MemorySaver,所以状态保存在内存中;如果要做生产级持久化,可以替换成数据库型 checkpointer,比如 SQLite、Postgres 或 Redis。
所以可以总结为:
messages 是状态本体
AgentState 描述状态结构
MemorySaver 负责保存状态
thread_id/session_id 负责区分不同会话
面试回答版
在我的 OmniOps 项目中,ReAct Agent 是基于 LangChain 新版
create_agent实现的,底层由 LangGraph 管理执行流程。我把 ChatQwen 模型、本地 RAG 工具和 MCP 运维工具一起绑定到 Agent 上。用户输入问题后,系统构造 system message 和 human message,然后通过agent.ainvoke或agent.astream触发执行。执行过程中,模型会根据当前 messages 判断是否需要调用工具,比如知识库检索、日志查询或监控查询;工具返回结果会作为消息继续写回 messages,模型再基于新的 observation 继续推理,直到输出最终答案。状态方面,Agent 的核心状态是 messages,AgentState定义了 messages 的状态结构,MemorySaver作为 checkpointer 按thread_id=session_id保存不同会话的状态。
Plan-Execute-Replan 的实现方式
Plan-Execute-Replan 主要用于处理复杂、多步骤、需要动态调整计划的运维任务。
它比 ReAct 更适合复杂排障,例如:
排查订单服务不可用的根因
分析一次线上故障的完整影响链路
生成并执行故障处置方案
它的核心流程是:
Planner:生成初始计划
Executor:执行当前步骤
Replanner:根据执行结果判断下一步
Planner:生成计划
Planner 负责根据用户问题生成结构化任务列表。
例如:
用户问题:排查订单服务 5xx 激增原因
Planner 输出:
1. 查询订单服务近 30 分钟错误率和 QPS
2. 查询订单服务错误日志
3. 检查最近是否有发布变更
4. 查询数据库连接池状态
5. 检索历史相似故障案例
6. 汇总原因并给出处置建议
代码层面通常用 Pydantic 约束输出结构:
class Plan(BaseModel):
steps: list[str]
这样可以避免大模型输出随意文本,便于后续程序自动执行。
Executor:执行步骤
Executor 每次只执行计划中的一个步骤。
例如当前步骤是:
查询订单服务近 30 分钟错误日志
Executor 会选择合适工具,比如日志查询工具:
result = log_query_tool.run(
service="order-service",
time_range="30m",
keyword="ERROR"
)
然后把执行结果写入状态中:
state["past_steps"].append((current_step, result))
这里的 past_steps 很重要,它记录了已经执行过的步骤和观察结果,供 Replanner 判断后续计划是否需要调整。
Replanner:重新规划或结束
Replanner 根据当前状态做判断:
原始问题
当前剩余计划
已经执行的步骤
工具返回结果
然后有两种输出:
第一种:继续执行
如果信息还不够,Replanner 会更新计划:
继续查询数据库慢查询日志
检查 Redis 连接状态
第二种:结束任务
如果已经能回答问题,Replanner 会生成最终响应:
初步判断订单服务 5xx 激增与最近一次发布有关。
主要依据是……
建议先回滚版本,并继续观察错误率。
代码中通常会通过状态字段判断是否结束:
def should_continue(state):
if state.get("response"):
return "end"
return "execute"
也就是说:
response有值 → 任务完成,结束response为空 → 继续执行下一步
3. LangGraph 中的整体实现
在 LangGraph 中,可以把三个模块建成图节点:
planner → executor → replanner
↑ ↓
└──────────┘
伪代码如下:
workflow.add_node("planner", plan_node)
workflow.add_node("executor", execute_node)
workflow.add_node("replanner", replan_node)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")
workflow.add_conditional_edges(
"replanner",
should_continue,
{
"execute": "executor",
"end": END,
}
)
这里的关键是:
Replanner 不只是判断是否结束,还会根据执行结果动态修改剩余计划。
所以它不是简单的固定流程,而是一个可以根据工具反馈调整方向的执行图。
面试 1 分钟版本
在 OmniOps 项目中,我实现了两类 Agent 工作流。第一类是 ReAct Agent,主要用于需要实时查询工具的运维问答。它本质上是一个 LLM 和工具调用的循环:模型先根据当前问题推理是否需要查询日志、监控或知识库,如果需要就生成 tool call,工具执行后把结果作为 observation 写回上下文,模型再继续分析,直到不再调用工具后输出最终结论。第二类是 Plan-Execute-Replan,主要用于复杂排障任务。它先由 Planner 生成结构化排查计划,然后 Executor 每次执行一个步骤,并把执行结果写入 past_steps,最后 Replanner 根据已有结果判断是继续执行、调整计划,还是生成最终 response。这个流程在 LangGraph 中通过 planner、executor、replanner 三个节点和条件边实现,能够让复杂运维任务从一次性回答变成可追踪、可调整的多步骤执行流程。
2.4 状态传递与保持
1. ReAct Agent 的状态传递与保持
ReAct 的状态主要保存在 messages 对话列表 里。
它不是额外维护复杂状态对象,而是把每轮的用户输入、模型回复、工具调用结果都追加到 messages 中。
流程是:
messages 初始包含:
system prompt
用户问题
第一轮 LLM 判断需要调用工具后,会生成 tool_call:
assistant: 调用日志查询工具
系统执行工具后,把工具结果追加回 messages:
tool: 查询到最近 30 分钟 ERROR 日志数量激增,主要异常为数据库连接超时
下一轮再把完整 messages 传给 LLM:
system prompt
用户问题
assistant tool_call
tool observation
所以模型能“记住”前面查过什么,不是因为程序内部隐藏记忆,而是因为工具结果被写回上下文。
代码层面可以这样理解:
messages = build_initial_messages(user_query)
for i in range(max_iterations):
response = llm.invoke(messages, tools=tools)
messages.append(response)
if not response.tool_calls:
return response.content
for tool_call in response.tool_calls:
tool_result = run_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_result
})
面试可以这样说:
ReAct Agent 的状态主要通过 messages 传递。每一轮 LLM 的响应、工具调用和工具返回结果都会追加到 messages 中,下一轮调用模型时把完整 messages 再传入,因此模型可以基于之前的 observation 继续推理。也就是说,ReAct 的状态保持本质上依赖上下文消息历史。
2. Plan-Execute-Replan 的状态传递与保持
Plan-Execute-Replan 的状态更结构化,一般不是只靠 messages,而是用一个 State 对象 保存。
常见字段包括:
class PlanExecuteState(TypedDict):
input: str
plan: list[str]
past_steps: list[tuple[str, str]]
response: str
每个字段作用是:
input:用户原始问题
plan:当前剩余计划
past_steps:已经执行过的步骤和结果
response:最终回答,如果有值说明任务完成
2.1 Planner 怎么更新状态
Planner 读取:
input
生成:
plan
例如:
state["plan"] = [
"查询订单服务近30分钟错误率",
"查询错误日志",
"检查最近发布记录",
"检索历史相似故障案例"
]
此时状态变成:
{
"input": "排查订单服务5xx激增原因",
"plan": [...],
"past_steps": [],
"response": ""
}
2.2 Executor 怎么更新状态
Executor 每次读取当前计划中的第一步:
current_step = state["plan"][0]
然后调用工具执行,得到结果:
result = log_tool.run(current_step)
执行完成后,把结果写入:
state["past_steps"].append((current_step, result))
同时可以从 plan 中移除已经执行的步骤:
state["plan"] = state["plan"][1:]
所以状态会从:
"past_steps": []
变成:
"past_steps": [
("查询错误日志", "发现大量数据库连接超时")
]
这就是它保持执行记忆的方式。
2.3 Replanner 怎么更新状态
Replanner 会读取:
input
plan
past_steps
判断下一步:
如果还需要继续:
它会更新 plan:
state["plan"] = [
"查询数据库连接池状态",
"检查数据库慢查询"
]
如果已经可以回答:
它会写入 response:
state["response"] = "初步判断故障由数据库连接池耗尽导致,建议先扩容连接池并排查慢查询。"
然后条件边判断:
def should_continue(state):
if state.get("response"):
return "end"
return "execute"
也就是说:
response 有值 → 结束
response 为空 → 回到 executor 继续执行
3.状态保持关键代码
React
为Agent 创建了带自动消息追加功能的记忆区
class AgentState(TypedDict):
"""Agent 状态"""
messages: Annotated[Sequence[BaseMessage], add_messages]
AgentState 是一个 类型提示类(TypedDict),用来告诉 LangGraph:
“这个 Agent 在运行过程中,它的内部状态(state)长什么样?”
你可以把它想象成 Agent 的 “大脑记忆区”,每次思考或回复后,都会更新这块内存。
add_messages 是 LangGraph 内置的一个消息合并函数
当你在节点函数中返回:
return {"messages": [AIMessage("你好")]}
LangGraph 会:
查看 AgentState 中 messages 字段的类型注解
发现它是 Annotated[…, add_messages]
于是不直接覆盖,而是调用:
new_state["messages"] = add_messages(current_state["messages"], [AIMessage("你好")])
结果就是追加,而不是替换!
使用的方法:
TypedDict + Annotated + add_messages 组合定义 LangGraph 状态结构
实现的效果:
给 Agent 创建了带自动消息追加功能的记忆区,让智能体可以记住历史对话。
Plan-Execute-Replan
PlanExecuteState
状态 Schema + LangGraph 自动传递 + Reducer 合并机制
这段共享状态主要用了 Python 标准库里的类型工具:
from typing import List, TypedDict, Annotated
import operator
TypedDict :定义 LangGraph 状态的字典结构
List :声明字段是列表类型
Annotated :给字段附加额外的合并规则
operator.add:作为 reducer,用于列表追加合并
LangGraph 自动传递:每个节点都能读到当前 state
在 LangGraph 里,每个节点函数都会接收当前状态:
async def planner(state: PlanExecuteState) -> Dict[str, Any]:
…
所以节点可以从 state 里读数据。
关键代码:定义共享状态结构
from typing import List, TypedDict, Annotated
import operator
class PlanExecuteState(TypedDict):
"""Plan-Execute-Replan 状态"""
# 用户输入
input: str
# 当前剩余计划
plan: List[str]
# 已执行步骤历史,使用 operator.add 追加
past_steps: Annotated[List[tuple], operator.add]
# 最终响应
response: str
其中最关键的是:
past_steps: Annotated[List[tuple], operator.add]
它表示:
past_steps 这个字段更新时,不是覆盖,而是用 operator.add 做列表拼接。
operator.add 是 Python 的列表加法函数,在 LangGraph 中通过 Annotated 注册为 past_steps 字段的 reducer。这样每次节点返回新的 past_steps 时,LangGraph 不会覆盖旧值,而是调用 operator.add(old, new) 把执行历史追加起来。它本身不是 LangGraph 专有的机制,但在 LangGraph 状态定义中充当 reducer 合并函数。
面试回答版
ReAct 和 Plan-Execute-Replan 的状态传递方式不一样。ReAct 的状态主要保存在 messages 里,每一轮 LLM 的回复、tool call 和工具返回的 observation 都会追加到 messages 中,下一轮调用模型时把完整 messages 再传入,所以模型可以基于之前的工具结果继续推理。它的状态保持更像是对话上下文历史。
Plan-Execute-Replan 的状态更结构化,一般用一个 State 对象维护,比如 input、plan、past_steps 和 response。Planner 根据 input 生成 plan;Executor 每次读取 plan 中的当前步骤并执行,把结果写入 past_steps,同时更新剩余 plan;Replanner 根据 input、剩余 plan 和 past_steps 判断是否需要修改计划或生成最终 response。如果 response 有值,条件边就结束;如果没有 response,就回到 Executor 继续执行。所以它的状态保持是显式字段驱动的,更适合复杂多步骤任务。
4.状态传递规范
定义Langgraph图:
from langgraph.graph import StateGraph, START, END
workflow = StateGraph(PlanExecuteState)
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("replanner", replanner)
workflow.add_edge(START, "planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "replanner")
def should_continue(state: PlanExecuteState) -> str:
if state.get("response"):
return "end"
return "execute"
workflow.add_conditional_edges(
"replanner",
should_continue,
{
"execute": "executor",
"end": END,
}
)
app = workflow.compile()
app = workflow.compile() 是 LangGraph 里把工作流图编译成可执行应用对象 的代码。
之后,LangGraph 会把这个图编译成一个可运行对象 app。
之后你才能调用:
result = app.invoke({
"input": "排查订单服务 5xx 激增原因",
"plan": [],
"past_steps": [],
"response": ""
})
或者流式执行:
for event in app.stream(inputs):
print(event)
流程图:
START
↓
Planner
↓
Executor
↓
Replanner
├── 如果 response 为空 → 回到 Executor
└── 如果 response 有值 → END
| Replanner 决策 | Replanner 返回值 | state 变化 | 条件边下一跳 |
|---|---|---|---|
| continue | {} |
plan 不变,response 仍为空 | executor |
| replan | {"plan": new_steps} |
plan 被替换,response 仍为空 | executor |
| respond | {"response": final_response} |
response 有值 | END |
Planner 节点内部使用 Pydantic 的 Plan(BaseModel) 配合 llm.with_structured_output(Plan),约束大模型输出为包含 steps: List[str] 的结构化结果。
随后 Planner 会提取 plan_result.steps,并以 {“plan”: plan_steps} 的形式返回给 LangGraph,由 LangGraph 将其合并到共享状态 PlanExecuteState 的 plan 字段中。
Plan 类是:
class Plan(BaseModel):
steps: List[str] = Field(
description="完成任务所需的不同步骤..."
)
在 LangChain 里,Pydantic 被用来约束 LLM 输出
在非 LangChain 场景中,只要一个类继承了 Pydantic 的 BaseModel,创建对象时字段不满足类型或校验要求,就会触发校验错误。
继承 BaseModel 的类属于 Pydantic 数据模型类
主要有四个作用:
1.定义数据结构.它可以明确规定一个对象应该有哪些字段。
2.约束字段类型
3.可以添加字段说明 在 LangChain 的结构化输出里,它还会变成 schema 描述,帮助大模型理解这个字段应该输出什么。
4.可以和 with_structured_output() 配合,约束 LLM 输出
5.MCP
1. 本地项目里已经有本地工具
get_current_time
retrieve_knowledge
2. 创建 / 获取 MCP Client
mcp_client = await get_mcp_client_with_retry()
3. 通过 MCP Client 从 MCP Server 拉取外部工具
mcp_tools = await mcp_client.get_tools()
4. 在当前代码里合并工具
all_tools = local_tools + mcp_tools
5. 使用大模型时绑定所有工具
llm_with_tools = llm.bind_tools(all_tools)
6. 如果 LLM 返回 tool_calls
ToolNode(all_tools) 负责执行对应工具
项目中先获取一个 MCP Client,通过它从 MCP Server 动态获取外部工具;本地工具不需要注册到 MCP Client 中,而是在应用侧和 MCP 工具合并成 all_tools。之后通过 llm.bind_tools(all_tools) 把所有工具绑定给大模型,让模型可以生成 tool_calls;再通过 ToolNode(all_tools) 根据模型返回的工具调用真正执行对应工具。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)