LangGraph详解
LangGraph 是 LangChain 生态中专为构建复杂、有状态、支持循环迭代的 AI 工作流而设计的图编排框架,其核心突破在于通过有向循环图(DCG)结构替代传统线性链式流程,解决了多步骤推理、多智能体协作等场景中的动态流程控制问题。与 LangChain 的线性链式调用不同,LangGraph 原生支持条件分支、循环迭代和状态持久化,尤其适用于需要动态调整执行路径的复杂 AI 系统(如多智能体协作、需人工干预的高风险任务)

一 框架概述与设计哲学
1.1 核心定位
LangGraph 不是 RAG 框架,也不是 LLM 封装层,而是专注于Agent 编排与生命周期管理的底层框架。它不隐藏执行细节,让开发者完全掌控节点、边和状态的每一个环节。
1.2 五大设计原则
| 设计原则 | 说明 |
|---|---|
| 低级可控 | 不做黑盒抽象,开发者完全掌控执行流程 |
| 图即状态机 | 每个 super-step 都是一次状态快照,支持时间旅行 |
| 持久性优先 | Checkpointing 是一等公民,而非事后添加的功能 |
| 框架无关 | 不强依赖 LangChain,可独立使用任何 LLM SDK |
| 可组合性 | 图可以嵌套为子图,Graph API 与 Functional API 可混用 |
1.3 适用场景
- 多步骤 Agent 工作流(如 ReAct、Plan-and-Execute)
- 多智能体系统(Multi-Agent)
- 需要人工审核 / 干预的工作流
- 长运行、可恢复的任务
- 复杂的对话系统
二、核心概念与组件
2.1 State(状态)
State 是 LangGraph 的核心,它是一个共享数据结构,代表应用的当前快照。所有节点都从 State 读取输入,并将更新写入 State。
2.1.1 状态定义方式
LangGraph 支持三种状态定义方式:
# 方式1:TypedDict(推荐,性能最好)
from typing_extensions import TypedDict
from typing import Annotated, List
from operator import add
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[List, add_messages] # 使用内置消息reducer
count: int
results: Annotated[List[str], add] # 使用加法reducer
# 方式2:Pydantic BaseModel(支持运行时验证)
from pydantic import BaseModel
class State(BaseModel):
user_input: str
temperature: float = 0.7 # 支持默认值
# 方式3:dataclass(支持默认值)
from dataclasses import dataclass
@dataclass
class State:
user_input: str
temperature: float = 0.7
2.1.2 Reducers(归约器)
Reducers 控制如何将节点返回的更新应用到 State。每个状态键可以有独立的 reducer:
- 默认 reducer:覆盖原有值
- 加法 reducer:
operator.add,适用于列表追加 - 消息 reducer:
add_messages,专门处理消息列表,支持消息更新和序列化 - Overwrite:强制覆盖,绕过 reducer
from langgraph.types import Overwrite
def reset_node(state: State):
# 强制覆盖messages列表,绕过add_messages reducer
return {"messages": Overwrite([])}
2.1.3 MessagesState(预构建状态)
由于消息列表在 LLM 应用中非常常见,LangGraph 提供了预构建的 MessagesState:
from langgraph.graph import MessagesState
class State(MessagesState):
# 继承了messages字段和add_messages reducer
extra_field: str
2.2 Nodes(节点)
节点是执行实际工作的Python 函数(同步或异步)。它们接收当前 State 作为输入,返回 State 的更新。
2.2.1 节点函数签名
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
# 基础节点:只接收state
def basic_node(state: State) -> dict:
return {"count": state["count"] + 1}
# 带config的节点:访问thread_id等配置
def node_with_config(state: State, config: RunnableConfig) -> dict:
thread_id = config["configurable"]["thread_id"]
return {"results": [f"Processed in thread {thread_id}"]}
# 带runtime的节点:访问运行时上下文
def node_with_runtime(state: State, runtime: Runtime) -> dict:
# 可以访问store、stream_writer等
return {"results": ["Done"]}
2.2.2 特殊节点
- START:图的入口点
- END:图的终止点
2.3 Edges(边)
边定义了节点之间的执行顺序。LangGraph 支持三种类型的边:
2.3.1 普通边(Normal Edges)
固定的执行顺序:
from langgraph.graph import START, END
graph.add_edge(START, "node1")
graph.add_edge("node1", "node2")
graph.add_edge("node2", END)
2.3.2 条件边(Conditional Edges)
根据当前 State 动态决定下一个节点:
def should_continue(state: State) -> str:
if state["count"] < 5:
return "continue"
else:
return "end"
graph.add_conditional_edges(
"agent",
should_continue,
{
"continue": "tools",
"end": END
}
)
2.3.3 Send API(动态并行边)
用于实现 Map-Reduce 模式,动态生成多个并行任务:
from langgraph.types import Send
def map_node(state: State) -> list[Send]:
# 动态生成3个并行任务
return [
Send("worker", {"task": task})
for task in state["tasks"]
]
graph.add_conditional_edges("splitter", map_node)
2.4 StateGraph(状态图)
StateGraph 是 LangGraph 的主类,用于构建和编译图:
from langgraph.graph import StateGraph
# 初始化图
builder = StateGraph(State)
# 添加节点
builder.add_node("node1", node1)
builder.add_node("node2", node2)
# 添加边
builder.add_edge(START, "node1")
builder.add_edge("node1", "node2")
builder.add_edge("node2", END)
# 编译图
graph = builder.compile()
三、基础使用流程
3.1 安装
pip install -U langgraph
# 可选:安装检查点后端
pip install langgraph-checkpoint-sqlite # SQLite
pip install langgraph-checkpoint-redis # Redis
pip install langgraph-checkpoint-postgres # PostgreSQL
3.2 完整示例:ReAct Agent
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
# 1. 定义状态
class State(TypedDict):
messages: Annotated[list, add_messages]
# 2. 初始化工具和模型
tools = [TavilySearchResults(max_results=2)]
tool_node = ToolNode(tools)
model = ChatOpenAI(model="gpt-4o").bind_tools(tools)
# 3. 定义节点
def agent(state: State):
return {"messages": [model.invoke(state["messages"])]}
def should_continue(state: State):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# 4. 构建图
builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
# 5. 编译并运行
graph = builder.compile()
# 运行图
result = graph.invoke({
"messages": [{"role": "user", "content": "今天杭州的天气怎么样?"}]
})
# 打印结果
for message in result["messages"]:
message.pretty_print()
3.3 图可视化
LangGraph 支持生成 Mermaid 图和 PNG 图:
from IPython.display import Image, display
# 生成Mermaid图
mermaid_graph = graph.get_graph().draw_mermaid()
# 生成并显示PNG图
display(Image(graph.get_graph().draw_mermaid_png()))
四、高级功能详解
4.1 Send API 与 Map-Reduce 模式
Send API 允许在运行时动态生成多个并行任务,非常适合处理批量数据:
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
import operator
# 状态定义
class State(TypedDict):
documents: List[str]
summaries: Annotated[List[str], operator.add]
final_summary: str
# Map阶段:为每个文档生成摘要
def map_summarize(state: State) -> List[Send]:
return [
Send("summarize_doc", {"document": doc})
for doc in state["documents"]
]
# Worker节点:处理单个文档
def summarize_doc(state: dict) -> dict:
# 这里调用LLM生成摘要
summary = f"Summary of: {state['document'][:20]}..."
return {"summaries": [summary]}
# Reduce阶段:合并所有摘要
def reduce_summarize(state: State) -> dict:
final_summary = "\n\n".join(state["summaries"])
return {"final_summary": final_summary}
# 构建图
builder = StateGraph(State)
builder.add_node("map_summarize", map_summarize)
builder.add_node("summarize_doc", summarize_doc)
builder.add_node("reduce_summarize", reduce_summarize)
builder.add_edge(START, "map_summarize")
builder.add_conditional_edges("map_summarize", lambda _: ["summarize_doc"])
builder.add_edge("summarize_doc", "reduce_summarize")
builder.add_edge("reduce_summarize", END)
graph = builder.compile()
# 运行
result = graph.invoke({
"documents": [
"Document 1 content...",
"Document 2 content...",
"Document 3 content..."
]
})
print(result["final_summary"])
4.2 Command API
Command API 允许节点同时返回状态更新和控制流指令,比条件边更灵活:
from langgraph.types import Command
from typing import Literal
def human_approval(state: State) -> Command[Literal["approved", "rejected"]]:
# 中断执行,等待人类输入
decision = interrupt({
"question": "Do you approve this action?",
"action": state["proposed_action"]
})
if decision == "approve":
return Command(
goto="approved",
update={"status": "approved"}
)
else:
return Command(
goto="rejected",
update={"status": "rejected"}
)
4.3 Human-in-the-loop(人机协同)
LangGraph 提供了原生的人机协同支持,通过 interrupt() 函数暂停执行,等待人类输入后恢复:
4.3.1 基本使用
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
# 定义状态
class State(TypedDict):
input: str
output: str
approved: bool
# 定义节点
def generate_output(state: State) -> dict:
return {"output": f"Generated output for: {state['input']}"}
def human_review(state: State) -> Command:
# 中断执行,返回需要人类审核的内容
user_input = interrupt({
"output_to_review": state["output"],
"message": "Please review and approve or edit the output"
})
# 根据人类输入更新状态并决定下一步
if user_input.get("approved", False):
return Command(
goto="finalize",
update={"output": user_input.get("edited_output", state["output"])}
)
else:
return Command(goto="generate_output")
def finalize(state: State) -> dict:
return {"approved": True}
# 构建图
builder = StateGraph(State)
builder.add_node("generate_output", generate_output)
builder.add_node("human_review", human_review)
builder.add_node("finalize", finalize)
builder.add_edge(START, "generate_output")
builder.add_edge("generate_output", "human_review")
builder.add_edge("finalize", END)
# 编译时必须提供checkpointer
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 第一次运行:会在human_review节点中断
config = {"configurable": {"thread_id": "1"}}
result = graph.invoke({"input": "Hello World"}, config=config)
# 此时result会包含interrupts信息
print("Interrupts:", result.get("interrupts"))
# 恢复执行:提供人类输入
resume_result = graph.invoke(
Command(resume={"approved": True, "edited_output": "Edited output"}),
config=config
)
print("Final result:", resume_result)
4.3.2 强制中断点
除了在节点内部调用 interrupt(),还可以在编译时指定强制中断点:
# 在执行"tools"节点前中断
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["tools"]
)
# 在执行"agent"节点后中断
graph = builder.compile(
checkpointer=checkpointer,
interrupt_after=["agent"]
)
4.4 Checkpointing 与持久化
Checkpointing 是 LangGraph 的核心功能之一,它允许:
- 保存和恢复图的状态
- 实现多轮对话记忆
- 支持错误恢复
- 实现时间旅行(Time Travel)
4.4.1 检查点后端
LangGraph 支持多种检查点后端:
# 内存后端(开发测试用)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# SQLite后端(本地部署用)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
# Redis后端(生产部署用)
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379/0")
# PostgreSQL后端(生产部署用)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://user:pass@localhost/db")
4.4.2 状态管理
# 获取当前状态
snapshot = graph.get_state(config)
print("Current state:", snapshot.values)
print("Next nodes:", snapshot.next)
# 获取历史检查点
history = list(graph.get_state_history(config))
for checkpoint in history:
print(f"Step {checkpoint.metadata['step']}: {checkpoint.values}")
# 回滚到之前的检查点
graph.update_state(
config,
values={}, # 可以更新状态
as_of=history[1].config # 指定要回滚到的检查点
)
4.5 流式输出
LangGraph 支持多种流式输出模式,提供实时反馈:
| 流模式 | 描述 |
|---|---|
values |
每个步骤后流式传输完整状态 |
updates |
每个步骤后只流式传输状态更新 |
messages |
流式传输 LLM 令牌和工具调用 |
custom |
从节点内部流式传输自定义数据 |
debug |
流式传输所有调试信息 |
# 流式输出完整状态
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "Hello"}]},
config=config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
# 流式输出状态更新
for node_name, update in graph.stream(
{"messages": [{"role": "user", "content": "Hello"}]},
config=config,
stream_mode="updates"
):
print(f"Node {node_name} updated: {update}")
# 流式输出LLM令牌
for token, metadata in graph.stream(
{"messages": [{"role": "user", "content": "Hello"}]},
config=config,
stream_mode="messages"
):
print(token.content, end="", flush=True)
4.6 子图与可组合性
LangGraph 支持将图嵌套为子图,实现模块化设计:
# 定义子图
sub_builder = StateGraph(SubState)
sub_builder.add_node("sub_node1", sub_node1)
sub_builder.add_node("sub_node2", sub_node2)
sub_builder.add_edge(START, "sub_node1")
sub_builder.add_edge("sub_node1", "sub_node2")
sub_builder.add_edge("sub_node2", END)
subgraph = sub_builder.compile()
# 在主图中使用子图
main_builder = StateGraph(MainState)
main_builder.add_node("subgraph", subgraph)
main_builder.add_node("main_node", main_node)
main_builder.add_edge(START, "subgraph")
main_builder.add_edge("subgraph", "main_node")
main_builder.add_edge("main_node", END)
main_graph = main_builder.compile()
五、常见 Agent 模式
5.1 ReAct 模式
最基础的 Agent 模式,"思考 - 行动 - 观察" 循环:
START → Agent → [工具调用? → Tools → Agent] → END
5.2 Plan-and-Execute 模式
先制定计划,再逐步执行:
START → Planner → Executor → [还有步骤? → Executor] → Finalizer → END
5.3 Supervisor 模式
一个 Supervisor Agent 管理多个 Worker Agent:
START → Supervisor → [分配任务给Worker] → Worker1/Worker2/Worker3 → Supervisor → [完成? → END]
5.4 多轮对话模式
结合 Checkpointing 实现多轮对话:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o")
def chatbot(state: MessagesState):
return {"messages": [model.invoke(state["messages"])]}
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
# 第一轮对话
graph.invoke({"messages": [{"role": "user", "content": "我叫张三"}]}, config=config)
# 第二轮对话(自动记住上下文)
result = graph.invoke({"messages": [{"role": "user", "content": "我叫什么名字?"}]}, config=config)
result["messages"][-1].pretty_print() # 会回答"你叫张三"
六、生产部署与最佳实践
6.1 性能优化
- 使用
TypedDict而非 Pydantic 作为状态(性能更好) - 合理设置递归限制(默认 25):
graph.compile(recursion_limit=100) - 使用 Redis 作为检查点后端(高并发场景)
- 启用节点缓存:
graph.compile(cache=True)
6.2 错误处理
- 在节点内部添加 try-except 块
- 使用重试策略:
from langgraph.retry import RetryPolicy
retry_policy = RetryPolicy(max_attempts=3)
builder.add_node("flaky_node", flaky_node, retry=retry_policy)
6.3 可观测性
- 集成 LangSmith 进行追踪和调试
- 使用
stream_mode="debug"获取详细执行信息 - 记录检查点历史用于事后分析
6.4 部署选项
- LangGraph Platform:官方托管平台,提供自动扩展和监控
- FastAPI + Uvicorn:自定义 API 服务
- Docker + Kubernetes:容器化部署
七、v1.1.10 最新特性
- 改进的 Redis 检查点:v0.1.0 版本重构,性能提升 10 倍以上
- 增强的 Command API:支持更复杂的控制流指令
- 更好的类型安全:改进了类型提示和运行时验证
- 优化的流式输出:支持同时使用多种流模式
- 改进的子图支持:更好的状态传递和错误处理
八、总结
LangGraph 是目前构建生产级 AI Agent 的最佳框架之一。它的核心优势在于:
- 完全可控:开发者可以精确控制 Agent 的每一步执行
- 生产就绪:提供持久化、容错、人工干预等企业级功能
- 灵活可扩展:支持多种 Agent 模式和复杂工作流
- 活跃社区:持续更新,生态系统丰富
对于需要构建复杂、可靠、可扩展的 AI Agent 系统的开发者来说,LangGraph 是一个不可或缺的工具
九 生产级多 Agent 系统
这是一个基于Supervisor-Worker 架构的生产级多 Agent 系统,完全符合 LangGraph 1.1.10 最新规范。包含工具调用、人工审核、Redis 持久化、错误重试、日志记录、任务跟踪等企业级功能
用户输入 → 主管Agent → 任务分解 → [研究Agent/写作Agent/代码Agent] → 人工审核 → 结果汇总 → 输出
9.1 代码
import os
import json
import uuid
from typing import TypedDict, Annotated, List, Literal, Optional
from enum import Enum
from datetime import datetime
from dotenv import load_dotenv
from operator import add
# LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode
# LangChain
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from langchain_experimental.tools import PythonAstREPLTool
# FastAPI
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import sys
sys.path.append(os.path.basename(os.path.basename(os.path.abspath(__file__))))
from ai_study.agent_study.deepseek_test import deep_llm, return_llm
from ai_study.agent_study.tavily_test import search
# ======================
# 配置
# ======================
load_dotenv()
# ======================
# 状态定义
# ======================
class TaskStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
APPROVED = "approved"
class Task(TypedDict):
task_id: str
task_type: Literal["research", "writing", "coding"]
description: str
status: TaskStatus
result: str
created_at: str
class State(TypedDict):
user_query: str
tasks: Annotated[List[Task], add]
current_task_index: int
messages: Annotated[List[BaseMessage], add]
final_output: str
progress: float
created_at: str
updated_at: str
# ======================
# 工具
# ======================
search_tool = search
python_repl = PythonAstREPLTool()
tools = [search_tool, python_repl]
tool_node = ToolNode(tools)
# LLM
model = return_llm()
# ======================
# 节点
# ======================
def supervisor_node(state: State, config: RunnableConfig) -> Command:
if not state.get("tasks"):
prompt = f"""
请将用户请求拆分为最多3个任务,仅返回JSON。
任务类型仅限:research, writing, coding。
格式:{{"tasks": [{{"task_id":"task_1","task_type":"research","description":"..."}}]}}
用户请求:{state['user_query']}
"""
print(state['user_query'])
response = model.invoke(prompt)
try:
tasks = json.loads(response.content)["tasks"]
now = datetime.now().isoformat()
for t in tasks:
t.update(status="pending", result="", created_at=now)
except Exception as e:
return Command(goto=END, update={"final_output": f"解析失败: {str(e)}"})
return Command(
update={"tasks": tasks, "current_task_index": 0, "created_at": now, "updated_at": now},
goto="assign_task"
)
if all(t["status"] == "approved" for t in state["tasks"]):
return Command(goto="finalize")
return Command(goto="assign_task")
def assign_task_node(state: State) -> Command:
idx = state["current_task_index"]
task = state["tasks"][idx]
task["status"] = "in_progress"
goto_agent = {
"research": "research_agent",
"writing": "writing_agent",
"coding": "coding_agent"
}[task["task_type"]]
return Command(update={"tasks": state["tasks"]}, goto=goto_agent)
def research_agent(state: State) -> Command:
idx = state["current_task_index"]
task = state["tasks"][idx]
llm = model.bind_tools([search_tool])
res = llm.invoke([HumanMessage(content=f"研究任务:{task['description']}")])
return Command(update={"messages": [res]}, goto="tools" if res.tool_calls else "review")
def writing_agent(state: State) -> Command:
idx = state["current_task_index"]
task = state["tasks"][idx]
res = model.invoke([HumanMessage(content=f"写作任务:{task['description']}")])
task["result"] = res.content
task["status"] = "completed"
return Command(update={"tasks": state["tasks"]}, goto="review")
def coding_agent(state: State) -> Command:
idx = state["current_task_index"]
task = state["tasks"][idx]
llm = model.bind_tools([python_repl])
res = llm.invoke([HumanMessage(content=f"编码任务:{task['description']}")])
return Command(update={"messages": [res]}, goto="tools" if res.tool_calls else "review")
def review_node(state: State) -> Command:
idx = state["current_task_index"]
task = state["tasks"][idx]
if not task.get("result"):
last_msg = state["messages"][-1]
if isinstance(last_msg, AIMessage) and not last_msg.tool_calls:
task["result"] = last_msg.content
task["status"] = "completed"
decision = interrupt({
"task_id": task["task_id"],
"result": task["result"],
"msg": "请输入 approve / reject"
})
if decision.get("decision") == "approve":
task["status"] = "approved"
return Command(update={"tasks": state["tasks"], "current_task_index": idx + 1}, goto="supervisor")
task["status"] = "pending"
return Command(goto="assign_task")
def finalize_node(state: State) -> Command:
final_content = "\n\n".join(t["result"] for t in state["tasks"] if t["status"] == "approved")
return Command(update={"final_output": final_content}, goto=END)
# ======================
# 构建图(无错误版)
# ======================
def build_graph():
builder = StateGraph(State)
builder.add_node("supervisor", supervisor_node)
builder.add_node("assign_task", assign_task_node)
builder.add_node("research_agent", research_agent)
builder.add_node("writing_agent", writing_agent)
builder.add_node("coding_agent", coding_agent)
builder.add_node("tools", tool_node)
builder.add_node("review", review_node)
builder.add_node("finalize", finalize_node)
builder.add_edge(START, "supervisor")
builder.add_edge("assign_task", "supervisor")
builder.add_edge("research_agent", "tools")
builder.add_edge("writing_agent", "review")
builder.add_edge("coding_agent", "tools")
builder.add_edge("tools", "review")
builder.add_edge("review", "supervisor")
builder.add_edge("finalize", END)
return builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["review"]
)
graph = build_graph()
# ======================
# FastAPI
# ======================
app = FastAPI(title="多Agent系统")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class QueryRequest(BaseModel):
user_query: str
class ResumeRequest(BaseModel):
thread_id: str
decision: Literal["approve", "reject"]
@app.post("/run")
async def run(req: QueryRequest):
tid = str(uuid.uuid4())
cfg = {"configurable": {"thread_id": tid}}
graph.invoke({"user_query": req.user_query}, cfg)
return {"thread_id": tid, "status": "waiting_for_approval"}
@app.post("/resume")
async def resume(req: ResumeRequest):
cfg = {"configurable": {"thread_id": req.thread_id}}
result = graph.invoke(Command(resume={"decision": req.decision}), cfg)
return {"final_output": result.get("final_output")}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
9.2 特性详解
9.2.1 Redis 持久化
- 使用
RedisSaver作为检查点后端,支持高并发和分布式部署 - 每个会话通过唯一的
thread_id隔离 - 自动保存所有状态快照,支持随时恢复和回滚
- 支持查看完整的会话历史和执行轨迹
9.2.2 人工审核机制
- 在
human_review节点自动中断执行 - 向人类展示任务详情和结果
- 支持批准、拒绝并提供修改意见
- 支持编辑结果后再批准
- 审核不通过时自动重新执行任务
9.2.3 工具调用
- 集成 Tavily 网络搜索工具,获取最新信息
- 集成 PythonREPL 工具,安全执行代码
- 支持多轮工具调用
- 自动处理工具结果并生成最终回答
9.2.4 错误处理与重试
- 为所有节点配置了默认重试策略
- 最多重试 3 次,指数退避
- 自动记录错误信息
- 任务失败时优雅终止并提示用户
9.2.5 任务管理
- 自动将复杂查询分解为多个子任务
- 跟踪每个任务的状态和进度
- 支持任务依赖和上下文传递
- 动态分配任务给最合适的 Agent
9.3 部署与扩展
9.3.1 创建.env文件:
OPENAI_API_KEY=your_openai_api_key
TAVILY_API_KEY=your_tavily_api_key
REDIS_URL=redis://localhost:6379/0
9.3.2扩展新的 Agent 类型
- 定义新的任务类型
- 创建对应的 Agent 节点函数
- 在
assign_task_node中添加任务分配逻辑 - 如有需要,添加新的工具
9.3.3 部署为 API 服务
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
graph = build_multi_agent_graph()
class QueryRequest(BaseModel):
user_query: str
thread_id: str = None
class ResumeRequest(BaseModel):
thread_id: str
decision: str
feedback: str = None
edited_result: str = None
@app.post("/query")
async def create_query(request: QueryRequest):
thread_id = request.thread_id or str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
try:
result = graph.invoke(
{"user_query": request.user_query},
config=config
)
snapshot = graph.get_state(config)
if "interrupts" in snapshot.metadata:
return {
"thread_id": thread_id,
"status": "interrupted",
"interrupt_data": snapshot.metadata["interrupts"][0]["value"]
}
else:
return {
"thread_id": thread_id,
"status": "completed",
"final_output": result["final_output"]
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/resume")
async def resume_query(request: ResumeRequest):
config = {"configurable": {"thread_id": request.thread_id}}
try:
resume_data = {
"decision": request.decision,
"feedback": request.feedback,
"edited_result": request.edited_result
}
result = graph.invoke(
Command(resume=resume_data),
config=config
)
snapshot = graph.get_state(config)
if "interrupts" in snapshot.metadata:
return {
"status": "interrupted",
"interrupt_data": snapshot.metadata["interrupts"][0]["value"]
}
else:
return {
"status": "completed",
"final_output": result["final_output"]
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)