多智能体工作流:串行,并行,一级一个模型负责生成,一个模型负责评估,评估不过继续反馈生成,直到评估通过
在这里插入图片描述
多Agent的特点:
在这里插入图片描述

多智能体架构:

在这里插入图片描述
在新版本的langchain,把langgrach单独拿出了,并赋予了更多的agent功能,langGraph相对于langChain增强了三个部分,其中包括云平台/监看。

在这里插入图片描述
节点 边 图 是langGrach中的重要概念,其中边是连接节点的条件:

在这里插入图片描述
要点:
1.每个节点的返回return必须是状态。
2.在一开始就规划好全部的节点图,除了conditional,其他的都是确定的,只能执行节点里有的流程。
3.跟langchain不同,大模型不能直接调用工具,只能在AIMessage中包含tools_call,在里面放入想要调用的工具,需要自行用conditional的边来执行。
4.工具只有加入成为一个节点才能最终被执行,多个工具一般为为一个节点。由大语言模型中的tools_call决定调用哪一个。执行到工具的节点,如action(ToolNode),它是根据 last_message.tool_calls 里的 name 来决定执行哪个工具。
注意:
model.bind_tools(tools)工具如果绑定模型,模型就有可能在tools_call中表明调他,但如果它不在节点中,无法被执行。

from langchain_core.messages import AnyMessage
from typing_extensions import TypedDict

# 自定义节点间通讯的消息类型
class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int


#定义节点
from langchain_core.messages import AIMessage
def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("你好!")
    return {"messages": messages + [new_message], "extra_field": 10}




from langgraph.graph import StateGraph
graph_builder = StateGraph(State) #graph_builder就行一张空白的纸,后面的节点都要画在这个纸上 State定义了机器能传递什么数据
graph_builder.add_node(node) #向图添加节点
graph_builder.set_entry_point("node") #设置这个图运行的起点
graph = graph_builder.compile() #编译这个图,让他成为真正可以运行的机器

#使用:
# 运行图(最常见方式)
result = graph.invoke({
    "messages": [],           # 向这个图中添加State类型的数据
    "extra_field": 0
}) #它就会把这个函数传递给node函数了

print(result["messages"])

串行控制

from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import START, StateGraph

class State(TypedDict):
    value_1: str
    value_2: int

def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}

//一直有一个公共的State值,每个节点都在修改
//这里的核心在于,每个节点的返回值都是在修改这个公共的值,下一个节点的输入值,就是这个公共的值
graph_builder = StateGraph(State)

# Add nodes
graph_builder.add_node(step_1)
graph_builder.add_node(step_2)
graph_builder.add_node(step_3)

# Add edges
graph_builder.add_edge(START, "step_1")
graph_builder.add_edge("step_1", "step_2")
graph_builder.add_edge("step_2", "step_3")

graph = graph_builder.compile()


display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}

分支结构

import operator
from typing import Annotated, Any

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

# Annotated允许为类型提示添加额外的元数据,而不影响类型检查器对类型本身的理解
class State(TypedDict):
    aggregate: Annotated[list, operator.add] #每次都是添加进list


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()


display(Image(graph.get_graph().draw_mermaid_png()))
//运行时配置,{"configurable": {"thread_id": "foo"}} 这个是标记这个graph的,用于Memory(记忆)Checkpointer(断点恢复)。在这里没真正使用它的功能
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
{'aggregate': ['A', 'B', 'C', 'D']}

在这里插入图片描述

条件分支

import operator
from typing import Annotated, Literal

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}


# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

#这个地方的核心在于return "b"就是直接转到执行b节点
# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

//打印

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

graph.invoke({"aggregate": []})

Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
graph.invoke({"aggregate": []})

//为了防止异常情况况下无限循环,设置能在这个结构体上操作的最大次数
from langgraph.errors import GraphRecursionError
try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")

循环

//循环就是添加边的时候,指向值前

import operator
from typing import Annotated, Literal

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display

class State(TypedDict):
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}


# 节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)


# 边
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()


display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']

图的运行时配置

import operator
from typing import Annotated, Sequence
from typing_extensions import TypedDict

from langchain_deepseek import ChatDeepSeek
from langchain_openai import ChatOpenAI
import os
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.graph import END, StateGraph, START

model = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)

model1 = ChatOpenAI(
    model="gpt-3.5-turbo",
    temperature=0,
    api_key=os.environ.get("OPENAI_API_KEY"),
    base_url=os.environ.get("OPENAI_API_BASE"),
)

# 定义要切换的模型
models = {
    "deepseek": model,
    "openai": model1,
}


class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]


def _call_model(state: AgentState, config: RunnableConfig):
    # 使用LCEL的配置
    model_name = config["configurable"].get("model", "deepseek")
    model = models[model_name]
    response = model.invoke(state["messages"])
    return {"messages": [response]}


# Define a new graph
builder = StateGraph(AgentState)
builder.add_node("model", _call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

//没有增加运行时配置的情况下,它会默认调用deepseek
graph.invoke({"messages": [HumanMessage(content="hi 你是谁?")]})
//图的运行时配置,就是多个行参而已
config = {"configurable": {"model": "openai"}}
graph.invoke({"messages": [HumanMessage(content="hi 你是谁?")]}, config=config)

激活持久层:

from langchain_deepseek import ChatDeepSeek
import os
from langgraph.graph import StateGraph, MessagesState, START

model = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)


def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": response}


builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")


from langgraph.checkpoint.memory import MemorySaver
# 使用 MemorySaver 保存中间状态 这个是内存,且聊天记录是没有向量化,直接存入
memory = MemorySaver()
#加上memory就能保持记忆了,激活持久化层
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! 我是tomie"}
#stream_mode="values"方式就是每次经过一个节点就返回一次MessagesState数据
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

input_message = {"role": "user", "content": "我叫什么名字?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

我叫什么名字?
================================== Ai Message ==================================

哈哈,你刚刚说过啦!你叫 **Tomie**~ 😄  
(如果记错了,随时提醒我哦!需要我记住这个名字吗?)

用线程分割对话

input_message = {"role": "user", "content": "我叫什么名字?"}
for chunk in graph.stream(
    {"messages": [input_message]},
    {"configurable": {"thread_id": "2"}}, # different thread_id
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()
  
================================ Human Message =================================
我叫什么名字?
================================== Ai Message ==================================
目前我无法直接知道你的名字,但如果你愿意告诉我,我会记住并在接下来的对话中使用哦!😊 或者,你可以叫我“小助手”或任何你喜欢的名字~

跨线程共享持久化数据:使用userid实现
相同的user_id,即使不同的线程也不会隔离数据
设置内存记忆

from langgraph.store.memory import InMemoryStore
from langchain_openai import OpenAIEmbeddings
import os
# 使用OpenAI的封装,但是运行国产嵌入模型
# 使用内存存储来保存向量化后记忆数据
# 也是在内存中,但是向量数据库
in_memory_store = InMemoryStore(
    index={
        "embed": OpenAIEmbeddings(
            model="Pro/BAAI/bge-m3",
            api_key=os.environ.get("DEEPSEEK_API_KEY"),
            base_url=os.environ.get("DEEPSEEK_API_BASE")+ "/v1",
            ),
        "dims": 1024,
    }
)
import uuid
from typing import Annotated
from typing_extensions import TypedDict

from langchain_deepseek import ChatDeepSeek
import os
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.base import BaseStore


model = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)


# 注意:我们将 Store 参数传递给节点 --
# 这是我们编译图时使用的 Store
def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # 从存储中检索用户信息
    user_id = config["configurable"]["user_id"]
    # 从存储中检索用户信息
    namespace = ("memories", user_id)
    memories = store.search(namespace, query=str(state["messages"][-1].content))
    info = "\n".join([d.value["data"] for d in memories])
    system_msg = f"你是一个正在与用户交谈的小助手。用户信息:{info}"

    # 如果用户要求模型记住信息,则存储新的记忆
    last_message = state["messages"][-1]
    if "记住" in last_message.content.lower() or "remember" in last_message.content.lower():
        # 硬编码一个记忆
        memory = "用户名字是tomiezhang"
        store.put(namespace, str(uuid.uuid4()), {"data": memory})

    response = model.invoke(
        [{"role": "system", "content": system_msg}] + state["messages"]
    )
    return {"messages": response}


builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_edge(START, "call_model")

# 注意:我们在编译图时传递了 store 对象 
# 也传递了MemorySaver对象,跟之前相比就多个in_memory_store
graph = builder.compile(checkpointer=MemorySaver(), store=in_memory_store)
config = {"configurable": {"thread_id": "1", "user_id": "1"}}
input_message = {"role": "user", "content": "请记住我的名字叫tomiezhang!"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()

# 注意线程ID和用户ID 相同的user_id,即使不同的线程也不会隔离数据
config = {"configurable": {"thread_id": "2", "user_id": "1"}}
input_message = {"role": "user", "content": "我叫什么名字?"}
for chunk in graph.stream({"messages": [input_message]}, config, stream_mode="values"):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================
我叫什么名字?
================================== Ai Message ==================================
你的名字是Tomie Zhang。需要我帮你记住其他信息吗? 😊

长期记忆
使用MongDB

# 拉取MongoDB镜像
docker pull mongo

# 运行MongoDB容器
docker run -d -p 27017:27017 --name mongodb mongo

# 验证容器是否运行
docker ps
! pip install -U pymongo langgraph langgraph-checkpoint-mongodb

测试MongoDB连接

import pymongo

# 创建MongoDB客户端连接
client = pymongo.MongoClient("mongodb://localhost:27017/")

# 测试连接
try:
    client.admin.command('ping')
    print("MongoDB连接成功!")
except Exception as e:
    print(f"MongoDB连接失败: {e}")

创建一个最简单的智能体

from typing import Literal

from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek
from langgraph.prebuilt import create_react_agent


@tool
def get_weather(city: Literal["北京", "深圳"]):
    """用来返回天气信息的工具函数。"""
    if city == "北京":
        return "北京天气晴朗 大约22度 湿度30%"
    elif city == "深圳":
        return "深圳天气多云 大约28度 湿度80%"
    else:
        raise AssertionError("Unknown city")


tools = [get_weather]
model = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)

连接mongodb进行查询

from langgraph.checkpoint.mongodb import MongoDBSaver

MONGODB_URI = "localhost:27017"  # replace this with your connection string

with MongoDBSaver.from_conn_string(MONGODB_URI) as checkpointer:
    graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "1"}}
    response = graph.invoke(
        {"messages": [("human", "北京今天的天气如何?")]}, config
    )

补充,更graph
优化记忆

  • 消息过滤:对旧消息进行类似删除或编辑的操作,目的是为了防止撑爆上下文
  • 消息总结:对旧消息进行总结,目的一样是为了防止记忆内容过长
  • 注意对记忆的管理是一项关于召回率和精度的平衡艺术

这是对话过滤,一般使用对话总结方法。

def filter_messages(messages: list):
    # 这是一个非常简单的辅助函数,它只使用最后一条消息
    return messages[-1:]

在节点与节点间增加人类反馈节点

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str
    user_feedback: str


def step_1(state):
    print("---Step 1---")
    pass


def human_feedback(state):
    print("---human_feedback---")
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


def step_3(state):
    print("---Step 3---")
    pass


builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Add
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))


# Input
initial_input = {"input": "你好"}

# Thread
thread = {"configurable": {"thread_id": "1"}}

# Run the graph until the first interruption
# stream相对于invate是流式的。updates只返回State更新的部分
for event in graph.stream(initial_input, thread, stream_mode="updates"):
    print(event)
    print("\n")
   
---Step 1---
{'step_1': None}

---human_feedback---
{'__interrupt__': (Interrupt(value='Please provide feedback:', resumable=True, ns=['human_feedback:90f82932-04be-971f-eea2-d1366a5cfdd1']),)}


添加人类反馈
#这个意思就是当中断后,再向stream()发送一个Command()类型的命令,就能让程序继续执行。

# 继续执行
for event in graph.stream(
    Command(resume="go to step 3!"), thread, stream_mode="updates"
):
    print(event)
    print("\n")
---human_feedback---
{'human_feedback': {'user_feedback': 'go to step 3!'}}


---Step 3---
{'step_3': None}

时光旅行


  • 重放
  • 分叉
  • 注意经过实际测试deepseek的tool calling能力还是不如openai
    重放:
# 设置工具
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.graph import MessagesState, START
from langgraph.prebuilt import ToolNode
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver


@tool
def play_song_on_qq(song: str):
    """在qq音乐上播放歌曲"""
    # 调用QQ音乐 API...
    return f"成功在QQ音乐上播放了{song}!"


@tool
def play_song_on_163(song: str):
    """在网易云上播放歌曲"""
    # 调用网易云 API...
    return f"成功在网易云上播放了{song}!"


tools = [play_song_on_qq, play_song_on_163]
tool_node = ToolNode(tools)

# 设置模型
from langchain_openai import ChatOpenAI
import os

deepseek = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    api_key=os.environ.get("OPENAI_API_KEY"),
    base_url=os.environ.get("OPENAI_BASE_URL"),
)

model = deepseek.bind_tools(tools, parallel_tool_calls=False)


# 定义节点和条件边


# 定义确定是否继续的函数
def should_continue(state):
    messages = state["messages"]
    last_message = messages[-1]
    # 如果没有函数调用,则结束
    if not last_message.tool_calls:
        return "end"
    # 否则如果有,我们继续
    else:
        return "continue"


# 定义调用模型的函数
def call_model(state):
    messages = state["messages"]
    response = model.invoke(messages)
    # 我们返回一个列表,因为这将被添加到现有列表中
    return {"messages": [response]}


# 定义一个新图
workflow = StateGraph(MessagesState)

# 定义我们将循环的两个节点
workflow.add_node("agent", call_model)
workflow.add_node("action", tool_node)

# 将入口点设置为`agent`
# 这意味着这个节点是第一个被调用的
workflow.add_edge(START, "agent")

# 现在添加一个条件边
workflow.add_conditional_edgesgraph(
    # 首先,我们定义起始节点。我们使用`agent`。
    # 这意味着这些是在调用`agent`节点后采取的边。
    "agent",
    # 接下来,我们传入将确定下一个调用哪个节点的函数。
    should_continue,
    # 最后我们传入一个映射。
    # 键是字符串,值是其他节点。
    # END是一个特殊节点,标记图应该结束。
    # 将会发生的是我们调用`should_continue`,然后该函数的输出
    # 将与此映射中的键匹配。
    # 根据匹配的键,然后调用相应的节点。
    {
        # 如果是`tools`,则调用工具节点。
        "continue": "action",
        # 否则我们结束。
        "end": END,
    },
)

# 现在我们从`tools`到`agent`添加一个普通边。
# 这意味着在调用`tools`之后,下一步调用`agent`节点。
workflow.add_edge("action", "agent")

# 设置内存
memory = MemorySaver()

# 最后,我们编译它!
# 这将它编译成一个LangChain Runnable,
# 意味着你可以像使用任何其他runnable一样使用它

# 我们添加`interrupt_before=["action"]`
# 这将在调用`action`节点之前添加一个断点
app = workflow.compile(checkpointer=memory)

然后:

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "1"}}
input_message = HumanMessage(content="你能播放一首周杰伦播放量最高的歌曲吗?")
for event in app.stream({"messages": [input_message]}, config, stream_mode="values"):
    event["messages"][-1].pretty_print()

结果:

================================ Human Message =================================

你能播放一首周杰伦播放量最高的歌曲吗?
================================== Ai Message ==================================
Tool Calls:
  play_song_on_qq (call_jXjBMwXhQdWGVwmzCG607Dg3)
 Call ID: call_jXjBMwXhQdWGVwmzCG607Dg3
  Args:
    song: 周杰伦
================================= Tool Message =================================
Name: play_song_on_qq

成功在QQ音乐上播放了周杰伦!
================================== Ai Message ==================================

我已经在QQ音乐上播放了周杰伦的歌曲!你可以去QQ音乐欣赏他的音乐。

查看记录并重放

app.get_state(config).values["messages"]

[HumanMessage(content='你能播放一首周杰伦播放量最高的歌曲吗?', additional_kwargs={}, response_metadata={}, id='e2a5acbb-4b30-44d9-9253-04e2248dd617'),
 AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_jXjBMwXhQdWGVwmzCG607Dg3', 'function': {'arguments': '{"song":"周杰伦"}', 'name': 'play_song_on_qq'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 86, 'total_tokens': 106, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-11-20', 'system_fingerprint': 'fp_ded0d14823', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-aa8fe78a-f8a8-4edd-9f17-729d5dc6fb92-0', tool_calls=[{'name': 'play_song_on_qq', 'args': {'song': '周杰伦'}, 'id': 'call_jXjBMwXhQdWGVwmzCG607Dg3', 'type': 'tool_call'}], usage_metadata={'input_tokens': 86, 'output_tokens': 20, 'total_tokens': 106, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}),
 ToolMessage(content='成功在QQ音乐上播放了周杰伦!', name='play_song_on_qq', id='2216abd2-e0d4-48d4-ad6c-d7418ce15a4c', tool_call_id='call_jXjBMwXhQdWGVwmzCG607Dg3'),
 AIMessage(content='我已经在QQ音乐上播放了周杰伦的歌曲!你可以去QQ音乐欣赏他的音乐。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 127, 'total_tokens': 153, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-11-20', 'system_fingerprint': 'fp_ded0d14823', 'finish_reason': 'stop', 'logprobs': None}, id='run-460a30ad-44b4-4eb5-aefb-0887034245a1-0', usage_metadata={'input_tokens': 127, 'output_tokens': 26, 'total_tokens': 153, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]

我们可以返回任何一个状态节点,并从那个时候重新开始操作

all_states = []
for state in app.get_state_history(config):
    print(state)
    all_states.append(state)
    print("--")
to_replay = all_states[2]
to_replay.values
to_replay.next
#如果想从这个状态节点重播,只需这样
for event in app.stream(None, to_replay.config):
    for v in event.values():
        print(v)

for event in app.stream(None, to_replay.config):
    for v in event.values():
        print(v)
{'messages': [ToolMessage(content='成功在QQ音乐上播放了周杰伦!', name='play_song_on_qq', id='fa758237-ccab-470c-a26a-c342116c990e', tool_call_id='call_jXjBMwXhQdWGVwmzCG607Dg3')]}
{'messages': [AIMessage(content='我已经在QQ音乐上播放了周杰伦的歌曲!你可以去QQ音乐欣赏他的音乐。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 26, 'prompt_tokens': 127, 'total_tokens': 153, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-11-20', 'system_fingerprint': 'fp_ded0d14823', 'finish_reason': 'stop', 'logprobs': None}, id='run-7ddaa00e-a24f-4b2d-ac3e-f33db67a9618-0', usage_metadata={'input_tokens': 127, 'output_tokens': 26, 'total_tokens': 153, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}插入代码片

分叉操作
从某个节点开始操作,对执行数据进行分叉

# 修改最后一个消息的工具调用
# 我们将其从`play_song_on_qq`更改为`play_song_on_163`
last_message = to_replay.values["messages"][-1]
last_message.tool_calls[0]["name"] = "play_song_on_163"

branch_config = app.update_state(
    to_replay.config,
    {"messages": [last_message]},
)

此时整个图的流就进行了分叉处理

for event in app.stream(None, branch_config):
    for v in event.values():
        print(v)
{'messages': [ToolMessage(content='成功在网易云上播放了周杰伦!', name='play_song_on_163', id='cbbd78da-810b-473d-975e-e6a13e361074', tool_call_id='call_jXjBMwXhQdWGVwmzCG607Dg3')]}
{'messages': [AIMessage(content='我已经在网易云音乐上播放了周杰伦的歌曲!如果需要其他平台播放,请告诉我哦。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 127, 'total_tokens': 154, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-11-20', 'system_fingerprint': 'fp_ded0d14823', 'finish_reason': 'stop', 'logprobs': None}, id='run-1e7b9931-6d84-4940-a065-66d4402f0241-0', usage_metadata={'input_tokens': 127, 'output_tokens': 27, 'total_tokens': 154, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}





#workflow = StateGraph(MessagesState)
state中只有一个key就是messages,在以上的流程流转中,他的信息如下,每经过一个节点,他都会被追加一个Message。

messages = [
# 人类输入的消息类型
    HumanMessage(content="播放周杰伦的晴天"),
    # AI调用工具是,会在它返回的AIMessage中标明要调用的工具
    AIMessage(                                 # ← 这里有 tool_calls
        content="",
        tool_calls=[{
            "name": "play_song_on_qq",
            "args": {"song": "晴天"},
            "id": "call_abc123"
        }]
    ),
    # 工具执行后,工具也会返回一个Message类型,表明工具调用的结果
    ToolMessage(                               # ← 这里没有 tool_calls,只有 tool_call_id
        content="成功在QQ音乐上播放了晴天!",
        tool_call_id="call_abc123"
    ),
    
    AIMessage(content="已经帮你播放了周杰伦的《晴天》~")   # 最终回答
]

有以下几个消息类型
在这里插入图片描述

流式输出

values / updates / debug / messages

from typing import TypedDict
from langgraph.graph import StateGraph, START


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + "和小狗"}


def generate_joke(state: State):
    return {"joke": f"这是一个关于{state['topic']}的笑话"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)

// stream_mode=“values”

for chunk in graph.stream(
    {"topic": "冰激凌"},
    stream_mode="values",
):
    print(chunk)
{'topic': '冰激凌'}
{'topic': '冰激凌和小狗'}
{'topic': '冰激凌和小狗', 'joke': '这是一个关于冰激凌和小狗的笑话'}

// stream_mode=“updates”

for chunk in graph.stream(
    {"topic": "冰激凌"},
    stream_mode="updates",
):
    print(chunk)
{'refine_topic': {'topic': '冰激凌和小狗'}}
{'generate_joke': {'joke': '这是一个关于冰激凌和小狗的笑话'}}

// stream_mode=“debug”

for chunk in graph.stream(
    {"topic": "冰激凌"},
    stream_mode="debug",
):
    print(chunk)
{'type': 'task', 'timestamp': '2025-03-31T13:51:26.362608+00:00', 'step': 1, 'payload': {'id': '80703f54-1e33-beab-33a5-e919cdd2d07b', 'name': 'refine_topic', 'input': {'topic': '冰激凌'}, 'triggers': ('branch:to:refine_topic', 'start:refine_topic')}}
{'type': 'task_result', 'timestamp': '2025-03-31T13:51:26.362941+00:00', 'step': 1, 'payload': {'id': '80703f54-1e33-beab-33a5-e919cdd2d07b', 'name': 'refine_topic', 'error': None, 'result': [('topic', '冰激凌和小狗')], 'interrupts': []}}
{'type': 'task', 'timestamp': '2025-03-31T13:51:26.363213+00:00', 'step': 2, 'payload': {'id': '13543ef5-5006-ac7f-ebe2-21588197d3c0', 'name': 'generate_joke', 'input': {'topic': '冰激凌和小狗'}, 'triggers': ('branch:to:generate_joke', 'refine_topic')}}
{'type': 'task_result', 'timestamp': '2025-03-31T13:51:26.363800+00:00', 'step': 2, 'payload': {'id': '13543ef5-5006-ac7f-ebe2-21588197d3c0', 'name': 'generate_joke', 'error': None, 'result': [('joke', '这是一个关于冰激凌和小狗的笑话')], 'interrupts': []}}

// stream_mode=“messages”

from langchain_deepseek import ChatDeepSeek
import os

llm =  ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
)


def generate_joke(state: State):
    llm_response = llm.invoke(
        [
            {"role": "user", "content": f"生成一个关于 {state['topic']}的笑话"}
        ]
    )
    return {"joke": llm_response.content}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
)
for message_chunk, metadata in graph.stream(
    {"topic": "冰激凌"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
好的||这是一个关于|冰淇淋|和小狗的|搞笑||笑话|---

|**笑话||||的小狗|**

|夏天|,小明|买了一|支超|大的巧克力|冰淇淋,|正美|滋滋地|舔着||他家|的小狗||豆豆|”眼|巴巴地|坐在旁边|,尾巴||得像螺旋|桨。|  

小明|:“不行||,狗狗|不能|吃巧克力|!”  
|豆豆|一听|,突然|转身冲|进屋里|,叼||一张|**|狗狗||**|Dog|ecoin|)放在|小明脚|边。|  

小明|:“……|你这是|||||’买|冰淇淋?”||豆疯狂|点头,|还伸出|爪子指了指|冰淇淋上的|**|坚果碎|**||了一声||仿佛在|说:“||||加钱|!”  

|---

||笑点|:小狗|用虚拟|货币“|付款|”,|还挑剔|冰淇淋配料|😂||希望你喜欢||

工具调用

langgraph - 工具调用

  • 工具定义
  • 工具绑定
  • 工具调用
  • 工具执行

工具定义:

from langchain_core.messages import AIMessage
from langchain_core.tools import tool

from langgraph.prebuilt import ToolNode
@tool
def get_weather(location: str):
    """调用此函数获取当前天气。"""
    if location.lower() in ["北京", "深圳"]:
        return "现在是20度,有雾。"
    else:
        return "现在是10度,晴朗。"


@tool
def get_coolest_cities():
    """获取最冷城市列表"""
    return "哈尔滨,北京"
# ToolNode 是执行函数返回ToolMessage,其中包含执行的结果,给到大模型
tools = [get_weather, get_coolest_cities]
tool_node = ToolNode(tools)


langgraph提供了低层面的封装,可以直接手动执行工具
这个就是模仿模型返回的AIMessage调用

message_with_single_tool_call = AIMessage(
    content="",
    tool_calls=[
        {
            "name": "get_weather",
            "args": {"location": "北京"},
            "id": "tool_call_id",
            "type": "tool_call",
        }
    ],
)

tool_node.invoke({"messages": [message_with_single_tool_call]})
{'messages': [ToolMessage(content='现在是20度,有雾。', name='get_weather', tool_call_id='tool_call_id')]}

绑定工具

from typing import Literal

from langchain_deepseek import ChatDeepSeek
import os
from langgraph.graph import StateGraph, MessagesState
from langgraph.prebuilt import ToolNode


model_with_tools = ChatDeepSeek(
    model="Pro/deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url=os.environ.get("DEEPSEEK_API_BASE"),
).bind_tools(tools)
#它只会返回AIMessage
model_with_tools.invoke("深圳的天气如何?")
#返回如下:
AIMessage(
    content="", 
    tool_calls=[{"name": "get_weather", "args": {"location": "深圳"}, "id": "..."}]
)

执行工具,返回ToolMessage

#它会根据AIMessage去执行工具,然后返回ToolMessage,提交给大模型,由大模型来整合信息
tool_node.invoke({"messages": [model_with_tools.invoke("深圳的天气如何?")]})
{'messages': [ToolMessage(content='现在是20度,有雾。', name='get_weather', tool_call_id='0195ec88977d5c496aec00cee5374e89')]}

在ReAct智能体中执行

from typing import Literal

from langgraph.graph import StateGraph, MessagesState, START, END


def should_continue(state: MessagesState):
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return END


def call_model(state: MessagesState):
    messages = state["messages"]
    response = model_with_tools.invoke(messages)
    return {"messages": [response]}


workflow = StateGraph(MessagesState)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node) # Add the tool node to the graph

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, ["tools", END])
workflow.add_edge("tools", "agent")

app = workflow.compile()
from IPython.display import Image, display

display(Image(app.get_graph().draw_mermaid_png()))

tool_node中执行函数,返回ToolMessage,其中包含执行函数的结果,给大模型。
在这里插入图片描述

执行智能体

for chunk in app.stream(
    {"messages": [("human", "深圳的天气如何?")]}, stream_mode="values"
):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

深圳的天气如何?
================================== Ai Message ==================================
Tool Calls:
  get_weather (0195ec88b01c87d55e3710b56e96db2f)
 Call ID: 0195ec88b01c87d55e3710b56e96db2f
  Args:
    location: 深圳
================================= Tool Message =================================
Name: get_weather

现在是20度,有雾。
================================== Ai Message ==================================

深圳现在的天气是20度,有雾。
for chunk in app.stream(
    {"messages": [("human", "最冷的城市天气如何?")]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()
================================ Human Message =================================

最冷的城市天气如何?
================================== Ai Message ==================================
Tool Calls:
  get_coolest_cities (0195ec88f0aff88ebff93ab71ea1aedb)
 Call ID: 0195ec88f0aff88ebff93ab71ea1aedb
  Args:
================================= Tool Message =================================
Name: get_coolest_cities

哈尔滨,北京
================================== Ai Message ==================================
Tool Calls:
  get_weather (0195ec890c413b1ddce24b57afda5992)
 Call ID: 0195ec890c413b1ddce24b57afda5992
  Args:
    location: 哈尔滨
  get_weather (0195ec890c410fb406bd2f0686535373)
 Call ID: 0195ec890c410fb406bd2f0686535373
  Args:
    location: 北京
================================= Tool Message =================================
Name: get_weather
...
最冷的城市天气如下:

- **哈尔滨**10度,晴朗。
- **北京**20度,有雾。

人机互动中的Graph
人机互动非常重要,比如在AI处理后需要向对方转一笔前,此时必须由任红审核一下,审核后再用Command(resume=“go to step 3!”)命令执行后面的操作。

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display


class State(TypedDict):
    input: str
    user_feedback: str


def step_1(state):
    print("---Step 1---")
    pass


def human_feedback(state):
    print("---human_feedback---")
    feedback = interrupt("Please provide feedback:")
    return {"user_feedback": feedback}


def step_3(state):
    print("---Step 3---")
    pass

'''
state = {
    "messages": [...]
}
其中messages是append类型的,每执行完一个节点,放进去一个索引-1就是最后一个
'''
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("human_feedback", human_feedback)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "human_feedback")
builder.add_edge("human_feedback", "step_3")
builder.add_edge("step_3", END)

# Set up memory
memory = MemorySaver()

# Add
graph = builder.compile(checkpointer=memory)

# View
display(Image(graph.get_graph().draw_mermaid_png()))

添加人类反馈

# 继续执行
for event in graph.stream(
    Command(resume="go to step 3!"), thread, stream_mode="updates"
):
    print(event)
    print("\n")
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐