** Langchain介绍**

Langchain是一个开源框架,用于开发由大语言模型(LLMs)驱动的应用程序。

LangChain 是一个帮助你构建 LLM 应用的全套工具集。这里涉及到prompt 构建、LLM 接入、记忆管理、工具调用、RAG、Agent开发等模块

使用场景

**Model I/O **

Model I/O 部分是与语言模型进行交互的核心组件,包括输入提示(Prompt Template)、调用模型(Model)、输出解析(Output Parser)。简单来说,就是输入、处理、输出这三个步骤。

1.1 OpenAI SDK 调用模型

OpenAI 的GPT 系列模型影响了大模型技术发展的开发范式和标准。所以无论是Qwen、ChatGLM 等模型,它们的使用方法和函数调用逻辑基本遵循OpenAI 定义的规范,没有太大差异。这就使得能够通过一个较为通用的接口来接入和使用不同的模型。

# pip install langchain langchain-openai
from openai import OpenAI

client = OpenAI(
    base_url="https://openrouter.ai/api/v1",  # 平台提供的 URL
    api_key="sk-...",  # 平台提供的 API-Key
)

completion = client.chat.completions.create(
    model="openai/gpt-oss-20b:free",  # 模型名称
    messages=[{"role": "user", "content": "将'你好'翻译成意大利语"}],  # 用户输入
)
print(completion.choices[0].message.content)

1.2 LangChain API 调用模型

通常通过聊天模型接口访问LLM,该接口通常以消息列表作为输入并返回一条消息作为输出。

输入:接受文本PromptValue 或消息列表List[BaseMessage],每条消息需指定角色(如System Message、HumanMessage、AIMessage)

输出:返回带角色的消息对象(BaseMessage 子类),通常是AIMessage

import os
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage, AIMessage

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messages = [
    SystemMessage(content="你是一个诗人"),
    HumanMessage(content="写一首关于春天的诗"),
]
resp = llm.invoke(messages)
print(type(resp))  # <class 'langchain_core.messages.ai.AIMessage'>
print(resp.content)

初始化参数

参数 说明
model 模型名称或标识符
base_url 发送请求的 API 端点的 URL。常由模型的提供商提供
api_key 与模型提供商进行身份验证所需的 API 密钥
temperature 控制模型输出的随机性。数字越高,回答越有创意;数字越低,回答越确定
timeout 在取消请求之前,等待模型响应的最大时间(以秒为单位)
max_tokens 限制响应中的总tokens 数量,控制输出长度
max_retries 请求失败时系统尝试重新发送请求的最大次数

消息类型参数

消息类型 描述
SystemMessage 代表一组初始指令,用于引导模型的行为。可以使用系统消息来设定语气、定义模型的角色,并建立响应的指导方针
HumanMessage 表示用户输入
AIMessage 模型生成的响应,包括文本内容、工具调用和元数据
ToolMessage 表示工具调用的输出

调用方法

invoke / ainvoke 将单个输入转换为输出
batch / abatch 批量将多个输入转换为输出
stream / astream 从单个输入生成流式输出
1.2.1 非流式/流式输出

在Langchain中,语言模型的输出分为了两种主要的模式:流式输出非流式输出

  • **非流式输出:**用户提出需求请编写一首诗,系统在静默数秒后突然弹出了完整的诗歌。如同一种“提交请求,等待结果”的流程,实现简单,但体验单调。

这是LangChain 与LLM 交互时的默认行为,是最简单、最稳定的语言模型调用方式。当用户发出请求后,系统在后台等待模型生成完整响应,然后一次性将全部结果返回。

import os
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messages = [
    {"role": "system", "content": "你是一名数学家"},
    {"role": "user", "content": "请证明以下黎曼猜想"},
]
resp = llm.invoke(messages)
print(resp.content)

  • 流式输出:用户提问,请编写一首诗,当问题刚刚发送,系统就开始一字一句(逐个token)进行回复,更像是“实时对话”,贴近人类交互的习惯。

流式输出是一种更具交互感的模型输出方式,用户不再需要等待完整答案,而是能看到模型逐个 token地实时返回内容。适合构建强调“实时反馈”的应用。

import os
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messages = [
    {"role": "system", "content": "你是一名数学家"},
    {"role": "user", "content": "请证明以下黎曼猜想"},
]
# 使用 stream() 方法流式输出
for chunk in llm.stream(messages):
    # 逐个打印内容块,并刷新缓冲区以即时显示内容
    print(chunk.content, end="", flush=True)
** 1.2.2 批量调用**

将一组独立的请求批量发送给模型并行处理。

batch 默认没有依赖底层 API 的原生批量接口,而是使用线程池并行执行多个 invoke()。所以它对 IO 密集型任务(如调用远程 LLM API)很有效。

import os
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messages = [
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于春天的诗"},
    ],
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于夏天的诗"},
    ],
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于秋天的诗"},
    ],
]
resp = llm.batch(messages)  # 批量调用,返回一个消息列表
print(resp)
1.2.3 同步/异步调用
  • **同步调用 : **每个操作依次执行,直到当前操作完成后才开始下一个操作,总的执行时间是各个操作时间的总和。
  • **异步调用 : **允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞等待。这在处理I/O 操作(如网络请求、文件读写等)时特别有用,可以显著提高程序的效率和响应性。
import os
import time
import asyncio
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messagess = [
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于春天的诗"},
    ],
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于夏天的诗"},
    ],
    [
        {"role": "system", "content": "你是一位诗人"},
        {"role": "user", "content": "写一首关于秋天的诗"},
    ],
]

async def async_invoke():
    tasks = [llm.ainvoke(messages) for messages in messagess]
    return await asyncio.gather(*tasks)

start_time = time.time()

resps = asyncio.run(async_invoke())
print(resps)

end_time = time.time()
print(f"Total time: {end_time - start_time}")

1.3 调用本地模型

1.3.1 Ollama

Ollama是一个开源项目,其项目定位是:一个本地运行大模型的集成框架。目前主要针对主流的LlaMA架构的开源大模型设计,可以实现如 Qwen、Deepseek 等主流大模型的下载、启动和本地运行的自动化部署及推理流程。

# pip install langchain-ollama
from langchain_ollama import ChatOllama

ollama_llm = ChatOllama(
    model="deepseek-r1:7b"
    # base_url="http://your-ip:port",  #  不在本地端口运行,自定义地址
)
messages = {"role": "user", "content": "你好,请介绍一下你自己"}
resp = ollama_llm.invoke(messages)
print(resp.content)

1.4 Prompt Template

1.4.1 Prompt Template

在应用开发中,固定的提示词限制了模型的灵活性和适用范围。所以,Prompt Template 是一个模板化的字符串,我们可以将变量插入到模板中,从而创建出不同的提示。Prompt Template 接收用户输入,返回一个传递给LLM的信息(即提示词prompt)。

提示模板以字典作为输入,其中每个键代表要填充的提示模板中的变量。并输出一个 PromptValue。这个 PromptValue 可以传递给聊天模型,也可以转换为字符串或消息列表。PromptValue 存在的目的是为了方便在字符串和消息之间切换。

from langchain_core.prompts import PromptTemplate

# 使用构造方法实例化提示词模板
template = PromptTemplate(
    template="请评价{product}的优缺点,包括{aspect1}和{aspect2}。",
    input_variables=["product", "aspect1", "aspect2"],
)

# 使用模板生成提示词
prompt_1 = template.format(product="智能手机", aspect1="电池续航", aspect2="拍照质量")
prompt_2 = template.format(product="笔记本电脑", aspect1="处理速度", aspect2="便携性")

print(prompt_1)  # 请评价智能手机的优缺点,包括电池续航和拍照质量。
print(prompt_2)  # 请评价笔记本电脑的优缺点,包括处理速度和便携性。
参数
template 提示模板,包括变量占位符
input_variables 需要将其值作为提示输入的变量名称列表
partial_variables 提示模板携带的部分变量的字典。使用部分变量预先填充模板,无需后续在每次调用时再传递这些变量
方法
format() 使用输入格式化提示
from langchain_core.prompts import PromptTemplate

# 使用 from_template 方法实例化提示词模板
template = PromptTemplate.from_template("请给我一个关于{topic}的{type}解释。")

# 使用模板生成提示
prompt = template.format(type="详细", topic="量子力学")

print(prompt)  # 请给我一个关于量子力学的详细解释。
from langchain_core.prompts import PromptTemplate

template = PromptTemplate.from_template("{foo} {bar}")

prompt = template.invoke({"foo": "hello", "bar": "world"})

print(prompt, type(prompt))
# text='hello world' <class 'langchain_core.prompt_values.StringPromptValue'>

prompt_str = prompt.to_string()
print(prompt_str, type(prompt_str))
# hello world <class 'str'>
1.4.2 ChatPromptTemplate

ChatPromptTemplate是创建_聊天消息列表_的提示模板。相较于普通 PromptTemplate更适合处理多角色、多轮次的对话场景。支持_System_/Human/_AI_等不同角色的消息模板。

实例化时需要传入 messages 参数,messages 参数支持如下格式:

  • tuple 构成的列表,格式为[(role, content)]
  • dict 构成的列表,格式为[{“role”:… , “content”:…}]
  • Message 类构成的列表
from langchain_core.prompts import ChatPromptTemplate

template = ChatPromptTemplate(
    [
        ("system", "你是一个AI开发工程师,你的名字是{name}。"),
        ("human", "你能帮我做什么?"),
        ("ai", "我能开发很多{thing}。"),
        ("human", "{user_input}"),
    ]
)

prompt = template.format_messages(name="小谷AI", thing="AI", user_input="行")
print(prompt)
# [
# SystemMessage(content="你是一个AI开发工程师,你的名字是小谷AI。",...),
# HumanMessage(content="你能帮我做什么?", ...),
# AIMessage(content="我能开发很多AI。", ...),
# HumanMessage(content="行", ...),
# ]

多模态提示词

可以使用提示模板来格式化多模态输入,比如将图片链接作为输入。

import os
from langchain.chat_models import init_chat_model
from langchain_core.prompts import ChatPromptTemplate

llm = init_chat_model(
    model="google/gemini-2.0-flash-exp:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

template = ChatPromptTemplate(
    [
        {"role": "system", "content": "用中文简短描述图片内容"},
        {"role": "user", "content": [{"image_url": "{image_url}"}]},
    ]
)

prompt = template.format_messages(
    image_url="https://img2.baidu.com/it/u=2976763563,2523722948&fm=253&app=138&f=JPEG?w=800&h=1200"
)

resp = llm.invoke(prompt)
print(resp.content)  # 图片中是...
1.4.3 外部加载Prompt

可以将prompt 保存为JSON 或者YAML 等格式的文件,通过读取指定路径的格式化文件,获取相应的prompt。这样方便对prompt 进行管理和维护。

prompts目录下创建json文件:prompt.json

{
    "_type": "prompt",
    "input_variables": ["name", "what"],
    "template": "请{name}讲一个{what}的故事"
}
from langchain_core.prompts import load_prompt

template = load_prompt("prompts/prompt.json", encoding="utf-8")
print(template.format(name="张三", what="搞笑的"))
# 请张三讲一个搞笑的的故事

prompts目录下创建yaml文件:prompt.yaml

_type: "prompt"
input_variables: ["name", "what"]
template: "请{name}讲一个{what}的故事"
from langchain_core.prompts import load_prompt

template = load_prompt("prompts/prompt.yaml", encoding="utf-8")
print(template.format(name="年轻人", what="滑稽"))
# 请年轻人讲一个滑稽的故事

1.5 Output Parsers

在应用开发中,大模型的输出可能是下一步逻辑处理的关键输入。因此,在这种情况下,_规范化输出_是必须要做的任务,以确保应用能够顺利进行后续的逻辑处理。

语言模型返回的内容通常都是文本字符串,而实际AI 应用开发过程中有时希望模型可以返回更直观、更格式化的内容,LangChain 提供了输出解析器(Output Parser)将模型输出解析为结构化数据。

有多种类型的输出解析器,常用的有_StrOutputParser_(字符串解析器)与_JsonOutputParser_(JSON解析器)。

1.5.1 StrOutputParser

StrOutputParser 是一个简单的解析器,从结果中提取content 字段。

import os
from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

messages = [
    {"role": "system", "content": "你是一个机器人"},
    {"role": "user", "content": "你好"},
]

resp = llm.invoke(messages)
print(resp)  # content='你好!有什么我可以帮忙的吗?' additional_kwargs=......

str_resp = StrOutputParser().invoke(resp)
print(str_resp)  # 你好!有什么我可以帮忙的吗?
1.5.2 JsonOutputParser

JSON 解析器用于将大模型的_自由文本输出_转换为_结构化__JSON__数据_的工具。特别适用于需要严格结构化输出的场景,比如API 调用、数据存储或下游任务处理。

JsonOutputParser 能够结合Pydantic 模型进行数据验证,自动验证字段类型和内容(如字符串、数字、嵌套对象等)

使用get_format_instructions() 获取JSON解析的格式化指令:

import os
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import JsonOutputParser

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

class Prime(BaseModel):
    prime: list[int] = Field(description="素数")
    count: list[int] = Field(description="小于该素数的素数个数")

json_parser = JsonOutputParser(pydantic_object=Prime)
messages = [
    {"role": "system", "content": json_parser.get_format_instructions()},
    {
        "role": "user",
        "content": "任意生成5个1000-100000之间素数,并标出小于该素数的素数个数",
    },
]

resp = llm.invoke(messages)
json_resp = json_parser.invoke(resp)
print(json_resp)
# {'prime': [1009, 2003, 3001, 4001, 5003], 'count': [168, 303, 430, 584, 669]}

1.6 Structured Outputs

可以要求模型按照给定的模式格式提供其响应,这有助于确保输出可以被轻松解析并在后续处理中使用。LangChain 支持多种模式类型和强制结构化输出的方法。

1.6.1 TypedDict

TypedDict 提供了一个使用 Python 内置类型的简单方案,但是没有验证功能。

import os
from typing import TypedDict, Annotated
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

class Animal(TypedDict):
    animal: Annotated[str, "动物"]
    emoji: Annotated[str, "表情"]

class AnimalList(TypedDict):
    animals: Annotated[list[Animal], "动物与表情列表"]

messages = [{"role": "user", "content": "任意生成三种动物,以及他们的 emoji 表情"}]

llm_with_structured_output = llm.with_structured_output(AnimalList)
resp = llm_with_structured_output.invoke(messages)
print(resp)
# {'animals': [{'animal': '猫', 'emoji': '🐱'}, {'animal': '老虎', 'emoji': '🐯'}, {'animal': '企鹅', 'emoji': '🐧'}]}

Annotated是typing模块提供的增强型类型注解工具,python原生语法,核心公式:

Annotated[类型,元数据1,元数据2...]
# Annotated[int,"年龄,范围0-150"]

核心作用:

  • 给变量的基础类型附加任意的元数据(额外信息)
  • 基础类型是必须遵守的约束,元数据是对这个字段的补充说明
  • 元数据可以是任意内容:字符串、数字、函数…
1.6.2 Pydantic

Pydantic 模型提供了丰富的功能集,包括字段验证、描述和嵌套结构。

from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

class Animal(BaseModel):
    animal: str = Field(description="动物")
    emoji: str = Field(description="表情")

class AnimalList(BaseModel):
    animals: list[Animal] = Field(description="动物与表情列表")

messages = [{"role": "user", "content": "任意生成三种动物,以及他们的 emoji 表情"}]

llm_with_structured_output = llm.with_structured_output(AnimalList)
resp = llm_with_structured_output.invoke(messages)
print(resp)
# animals=[Animal(animal='猫', emoji='🐱'), Animal(animal='乌龟', emoji='🐢'), Animal(animal='企鹅', emoji='🐧')]
1.6.3 JSON Schema

若需最大程度的控制或互操作性,可以提供一个原始的 JSON Schema。可以将原始响应与解析后的表示一起返回,可在调用 with_structured_output 时设置 include_raw=True 来实现。

from langchain.chat_models import init_chat_model

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

schema = {
    "name": "animal_list",
    "schema": {
        "type": "array",
        "items": {
            "type": "object",
            "properties": {
                "animal": {"type": "string", "description": "动物名称"},
                "emoji": {"type": "string", "description": "动物的emoji表情"},
            },
            "required": ["animal", "emoji"],
        },
    },
}

messages = [{"role": "user", "content": "任意生成三种动物,以及他们的 emoji 表情"}]

llm_with_structured_output = llm.with_structured_output(
    schema, method="json_schema", include_raw=True
)
resp = llm_with_structured_output.invoke(messages)
print(resp)
print(resp["raw"])
print(resp["parsed"])
# [{'animal': '企鹅', 'emoji': '🐧'}, {'animal': '大象', 'emoji': '🐘'}, {'animal': '袋鼠', 'emoji': '🦘'}]

Chains

2.1 Runnable与LCEL

1)Runnable

Runnable 是LangChain 中可以调用、批处理、流式传输、转换和组合的工作单元。

Runnable 接口是使用 LangChain 组件的基础,它在许多组件中实现,例如语言模型、输出解析器、检索器、编译的 LangGraph 图等。

Runnable 接口_强制要求_所有LCEL 组件实现一组标准方法:

invoke / ainvoke 将单个输入转换为输出
batch / abatch 批量将多个输入转换为输出
stream / astream 从单个输入生成流式输出

_Runnable_统一调用方式

# 分步调用
prompt_text = prompt.invoke({"topic": "猫"})  # 方法1
model_out = model.invoke(prompt_text)  # 方法2
result = parser.invoke(model_out)  # 方法3

# LCEL管道式
chain = prompt | model | parser  # 用管道符组合
result = chain.invoke({"topic": "猫"})  # 所有组件统一用invoke

2)LCEL(语法糖)

LangChain 表达式语言(LCEL,LangChain Expression Language)是一种从现有的Runnable 构建新的Runnable 的声明式方法,用于声明、组合和执行各种组件(模型、提示、工具、函数等)。

我们称使用LCEL 创建的Runnable 为“链”,“链”本身就是Runnable。

LCEL 两个主要的组合原语是RunnableSequence 和 RunnableParallel。许多其他组合原语可以被认为是这两个原语的变体。

2.2 RunnableSequence 可运行序列

RunnableSequence 按顺序“链接”多个可运行对象,其中一个对象的输出作为下一个对象的输入。

LCEL重载了 | 运算符,以便从两个 Runnables 创建 RunnableSequence。

chain = runnable1 | runnable2
# 等同于
chain = RunnableSequence(runnable1, runnable2)
2.3 RunnableParallel 可运行并行

RunnableParallel 同时运行多个可运行对象,并为每个对象提供相同的输入。

对于同步执行,RunnableParallel 使用 ThreadPoolExecutor 来同时运行可运行对象。

对于异步执行,RunnableParallel 使用 asyncio.gather 来同时运行可运行对象。

在 LCEL 表达式中,字典会自动转换为 RunnableParallel。

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableParallel
import dotenv
dotenv.load_dotenv()

llm = ChatOpenAI(
    model='qwen-vl-plus'
)

joke_chain=(
    PromptTemplate.from_template('讲一个关于{topic}的笑话') | llm | StrOutputParser()
)
poem_chain=(
    PromptTemplate.from_template('写一首关于{topic}的情诗') | llm | StrOutputParser()
)

chain=RunnableParallel(joke=joke_chain,poem=poem_chain)

res=chain.invoke({"topic":"菊花"})
print(res)
2.4 RunnableLambda 可运行λ

RunnableLambda 将 Python 可调用函数转换为 Runnable,使得函数可以在同步或异步上下文中使用。

from langchain_core.runnables import RunnableLambda

chain = {
    "text1": lambda x: x + " world",
    "text2": lambda x: x + ", how are you",
} | RunnableLambda(lambda x: len(x["text1"]) + len(x["text2"]))

result = chain.invoke("hello")
print(result)  # 29
2.5 RunnablePassthrough 可运行透传

RunnablePassthrough 接收输入并将其原样输出。RunnablePassthrough 是 LangChain LCEL 体系中的“无操作节点”,用于在流水线中透传输入或保留上下文,也可以用于向输出中添加键。

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

chain = RunnableParallel(
    original=RunnablePassthrough(),  # 保留中间结果
    word_count=lambda x: len(x),
)

result = chain.invoke("hello world")
print(result)  # {'original': 'hello world', 'word_count': 11}
2.6 RunnableBranch 可运行分支

RunnableBranch 使用 (条件,Runnable) 对列表和默认分支进行初始化。对输入进行操作时,选择第一个计算结果为 True 的条件,并在输入上运行相应的 Runnable。如果没有条件为 True,则在输入上运行默认分支。

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: isinstance(x, str), lambda x: x.upper()),
    (lambda x: isinstance(x, int), lambda x: x + 1),
    (lambda x: isinstance(x, float), lambda x: x * 2),
    lambda x: "goodbye",
)

result = branch.invoke("hello")
print(result)  # HELLO

result = branch.invoke(None)
print(result)  # goodbye
2.7 RunnableWithFallbacks 可运行带回退

RunnableWithFallbacks 使得 Runnable 失败后可以回退到其他 Runnable。可以直接在Runnable 上使用with_fallbacks 方法。

from langchain.chat_models import init_chat_model
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda

llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

chain = PromptTemplate.from_template("hello") | llm
chain_with_fallback = chain.with_fallbacks([RunnableLambda(lambda x: "sorry")])

result = chain_with_fallback.invoke("1")  # 提示词模板中没有需要填充的变量,会报错
print(result)  # sorry

Retrieval

3.1RAG
3.1.1 RAG介绍

为了改善大模型在时效性、可靠性与准确性方面的不足,各种针对 LLM 优化的方法应运而生。RAG(Retrieval-Augmented Generation,检索增强生成)就是其中一种被广泛研究和应用的优化架构。

RAG 的基本思想为:将传统的生成式大模型和实时信息检索技术相结合,为大模型补充来自外部的相关数据和上下文,来帮助大模型生成更加准确可靠的内容。这使得大模型在生成内容时可以依赖实时与个性化的数据和知识,而非仅仅依赖训练知识。就相当于在大模型回答时给它一本参考书。

3.1.2 RAG流程

典型的RAG有两个主要流程:

  • 索引:从数据源提取数据,构建索引。
  • 检索生成:接受用户查询并从索引中检索相关数据,然后将其传递给模型。。
  1. 索引阶段:
  • 从各种数据源加载数据️
  • 将文档切分为小块️
  • 对文本块进行嵌入️
  • 存储嵌入向量。

  1. 检索生成阶段:
  • 根据用户输入,使用检索器从存储中检索相关文本块➡️
  • 大模型使用包含问题和检索结果的提示生成回答。

3.2 文档加载

数据源可能包含多种格式的文件,如文本文档、Markdown,PDF 等。因此我们首先需要对各种格式的文件进行处理。LangChain 实现和集成了众多文档加载器,方便从不同格式的文件中加载数据。

LangChain 所有文档加载器都实现了 BaseLoader 接口,接口提供了通用的 load(一次加载所有文档)与lazy_load(以延迟方式加载文档)方法,用于从数据源加载数据并处理为 Document 对象。

LangChain 实现了 Document 抽象,用于表示文本单元及其元数据,它包含三个属性:

- page_content:文本内容字符串。
- metadata:包含元数据的字典,如文档的来源等。
- id:可选,文档标识符。
3.2.1 加载TXT
# pip install langchain_community
from langchain_community.document_loaders import TextLoader

docs = TextLoader(
    file_path="assets/sample.txt",  # 文件路径
    encoding="utf-8",  # 文件编码方式
).load()  # 返回List[Document]

print(docs)
# [Document(metadata={'source': 'asset/sample.txt'}, page_content='...')]
3.2.2 加载CSV
from langchain_community.document_loaders.csv_loader import CSVLoader

# 加载所有列
docs = CSVLoader(
    file_path="assets/sample.csv",  # 文件路径
).load()  # 返回List[Document]

print(docs)

# 加载部分列
docs = CSVLoader(
    file_path="assets/sample.csv",  # 文件路径
    metadata_columns=["title", "author"],  # 将指定列作为元数据
    content_columns=["content"],  # 将指定列作为内容
).load()  # 返回List[Document]

print(docs)
3.2.3 加载JSON

LangChain 实现了 JSONLoader,用来将 JSON 和 JSONL 数据转换为 LangChain 文档对象。它使用指定的 jq 模式来解析 JSON 文件,从而将特定字段提取到 LangChain 文档的内容和元数据中。

如果要从 JSON Lines 文件加载文档,需传递 json_lines=True。

from langchain_community.document_loaders import JSONLoader

# 提取所有字段
docs = JSONLoader(
    file_path="assets/sample.json",  # 文件路径
    jq_schema=".",  # 提取所有字段
    # jq_schema=".data.items[]",  # 提取data.items中的数据
    text_content=False,  # 提取内容是否为字符串格式
).load()

print(docs)
3.2.4 加载HTML 网页
import bs4
from langchain_community.document_loaders import WebBaseLoader

docs = WebBaseLoader(
    # 网址序列
    web_paths=("https://baike.baidu.com/item/%E5%BE%AE%E6%B3%A2%E7%82%89/84186",),
    # 传给 BeautifulSoup 的解析参数,parse_only 表示只提取指定标签的元素
    bs_kwargs={"parse_only": bs4.SoupStrainer(class_="J-lemma-content")},
).load()

print(docs)
3.2.5 加载Markdown

可以使用Unstructured 文档加载器来加载多种类型的文件。可使用 UnstructuredMarkdownLoader 加载 Markdown 文件,需要unstructured 包。

# pip install langchain_community unstructured[md]
from langchain_community.document_loaders import UnstructuredMarkdownLoader

docs = UnstructuredMarkdownLoader(
    # 文件路径
    file_path="assets/sample.md",
    # 加载模式:
    #   single 返回单个Document对象
    #   elements 按标题等元素切分文档
    mode="elements",
).load()

print(docs)
3.2.6 加载Doc/Docx
# pip install langchain_community unstructured[docx]
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

docs = UnstructuredWordDocumentLoader(
    # 文件路径
    file_path="assets/sample.docx",
    # 加载模式:
    #   single 返回单个Document对象
    #   elements 按标题等元素切分文档
    mode="single",
).load()

print(docs)
3.2.7 加载PDF

1)PyPDFLoader

# pip install langchain_community
from langchain_community.document_loaders import PyPDFLoader

docs = PyPDFLoader(
    # 文件路径,支持本地文件和在线文件链接,如"https://arxiv.org/pdf/alg-geom/9202012"
    file_path="assets/sample.pdf",
    # 提取模式:
    #   plain 提取文本
    #   layout 按布局提取
    extraction_mode="plain",
).load()
print(docs)

2)UnstructuredPDFLoader

UnstructuredPDFLoader是对 unstructured 库的封装。支持布局识别与 OCR 提取文字。使用UnstructuredPDFLoader,需要先下载PopplerTesseract OCR

# pip install unstructured[local-inference]
from langchain_community.document_loaders import UnstructuredPDFLoader

docs = UnstructuredPDFLoader(
    file_path="assets/sample.pdf",  # 文件路径
    # 加载模式:
    #   single: 返回单个Document对象
    #   elements: 按标题等元素切分文档
    mode="elements",
    # 加载策略:
    #   fast: pdfminer 提取并处理文本
    #   ocr_only: 转换为图片并进行 OCR
    #   hi_res: 识别文档布局,将OCR 输出与 pdfminer 输出融合
    strategy="hi_res",
    # 推断表格结构:仅 hi_res 下起效,如果为 True 则会在表格元素的元数据中添加 text_as_html
    infer_table_structure=True,
    # OCR 使用的语言: eng 英文,chi_sim 中文简体。语言列表参考 https://github.com/tesseract-ocr/langdata
    languages=["eng", "chi_sim"],
    # 更多参数详见 https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/partition/pdf.py
).load()

print(docs)
3.3 文档切分
3.3.1 切分策略
  1. 按照固定字符数或Token 数来切分,但可能会在不适当的位置切断句子。
  2. 递归使用多个分隔符**切分,**同时尽量保证字符数或 Token 数不超出限制。能保证不切断完整的句子。
  3. **语义切分:**根据文本的语义内容切分,旨在保持相关信息的集中和完整,适用于需要高度语义保持的场景。但处理速度较慢,且可能出现不同块之间长度极不均衡的情况。具体切分过程为:将相邻的几个句子拼成一个句组。对所有句组进行嵌入,并比较嵌入向量的距离,找到语义变化大的位置,根据阈值确定切分点(比如计算相邻句子嵌入向量的余弦距离,取距离分布的第N 百分位值作为阈值,高于此值则切分)。按照切分点切分出若干个语义段,并合并某些长度很短的语义段。
3.3.2 RecursiveCharacterTextSplitter

RecursiveCharacterTextSplitte_(递归字符文本切分器)是最常用的切分器_,它由一个字符列表作为参数,默认列表为 [“\n\n”, “\n”, " ", “”],并且会尝试按顺序使用这些字符进行切分,直到块足够小。由此尽可能地将所有段落(然后是句子,最后是词)保持在一起,因为这些段落通常看起来是语义上最相关的文本片段。

同时为了保证段之间语义完整,可以设置每个块之间有一部分重叠。

# pip install langchain-text-splitters
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 加载文档
docs = UnstructuredWordDocumentLoader(
    file_path="assets/sample.docx", mode="single"
).load()

# 切分为文本块
chunks = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],  # 分隔符列表
    chunk_size=400,  # 每个块的最大长度
    chunk_overlap=50,  # 每个块重叠的长度
    length_function=len,  # 可选:计算文本长度的函数,默认为字符串长度,可自定义函数来实现按 token 数切分
    add_start_index=True,  # 可选:块的元数据中添加此块起始索引
).split_documents(docs)

print(chunks)
3.4 文档嵌入

使用嵌入模型生成文档的嵌入向量,后续检索时用于与查询的嵌入向量进行相似度计算。

常用嵌入模型:

模型 机构 描述
bge-large-zh 北京智源研究院(BAAI) 开源,向量维度1024,序列长度512
bge-base-zh BAAI 开源,向量维度768,序列长度512
bge-small-zh BAAI 开源,向量维度512,序列长度512
bge-m3 BAAI 开源,多语言,向量维度1024,序列长度8192
text-embedding-3-small OpenAI 多语言,向量维度1536,序列长度8192
text-embedding-3-large OpenAI 多语言,向量维度3072,序列长度8192
# pip install sentence-transformers langchain_huggingface
import os
from langchain_huggingface import HuggingFaceEmbeddings

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# 单文本嵌入
query = "你好,世界"
print(embed_model.embed_query(query))

# 多文本嵌入
docs = ["你好,世界", "你好,世界"]
print(embed_model.embed_documents(docs))
3.5 向量存储
3.5.1常用的向量数据库

LangChain提供了众多向量存储的集成,包括开源的_本地向量存储_与_云托管_的私有向量存储。并公开了一个标准接口,可以轻松地在向量存储之间进行交换。

向量数据库 描述
FAISS 一个用于高效相似性搜索和密集向量聚类的库
Chroma 开源的轻量级向量数据库,有极简的 API
Milvus 开源的专为向量搜索设计的云原生数据库。性能强悍,功能丰富。覆盖轻量级的原型开发到十亿级向量的大规模生产系统
Pgvector 开源关系型数据库PostgreSQL 的扩展,为PostgreSQL增加了向量数据类型和相似性搜索功能
Redis 开源内存数据结构存储,现已原生支持向量相似性搜索功能
Elasticsearch 开源分布式搜索和分析引擎,提供了一个基于文档的数据库,结构化、非结构化和向量数据通过高效的列式存储统一管理
3.5.2 Milvus

Milvus 通过数据库—Collections—实体的结构管理数据。Collections 和实体就类似关系型数据库中的表和记录。具体来说,Collection 是一个二维表,具有固定的列和变化的行。每列代表一个字段,每行代表一个实体。

Collection 通过Collection Schema 来定义有哪些字段以及字段的类型、索引等。一个 Collection Schema 有一个主键、最多四个向量字段和若干标量字段。

主键用于唯一标识一个实体,只接受Int64 或VarChar 值。插入实体时,默认情况下应包含主键值。但是,如果在创建 Collections 时启用了AutoId,Milvus 将在插入数据时生成主键值,此时插入的实体中不应包含主键值。

向量字段用于存储文本、图像和音频等非结构化数据类型的嵌入,可以是密集向量、稀疏向量或二进制向量。通常,密集向量用于语义搜索,而稀疏向量则更适合全文或词性匹配。

标量字段通常用来存储一些元数据,并可以在搜索时通过元数据进行过滤,以提高搜索结果的正确性。

字段类型 字段 描述
向量字段 密集向量 FLOAT_VECTOR 32位浮点数列表
FLOAT16_VECTOR 16位半精度浮点数列表
BFLOAT16_VECTOR 16位浮点数列表,精度稍低,但指数范围与 Float32 相同
INT8_VECTOR 8位有符号整数向量
稀疏向量 SPARSE_FLOAT_VECTOR 非零数字及其序列号列表
二进制向量 BINARY_VECTOR 一个0和1的列表
标量字段 VARCHAR 字符串
BOOL 存储true或false
INT INT8、INT16、INT32、INT64
FLOAT 32位浮点数
DOUBLE 64位双精度浮点数
ARRAY 相同数据类型元素的有序集合
JSON 结构化的键值数据

索引是建立在数据之上的附加结构,可以加快搜索速度。不同字段数据类型适用不同的索引类型。比如FLOAT_VECTOR 可使用 HNSW(分层导航小世界)索引,VARCHAR 可使用INVERTED(反转)索引。

3.5.3 创建Collection
# pip install pymilvus[milvus_lite]
from pprint import pprint
from pymilvus import MilvusClient, DataType

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 创建 schema
def build_schema():
    return (
        MilvusClient.create_schema(
            # 自动分配主键
            auto_id=True,
            # 启用动态字段,未在 Schema 中声明的字段会以键值对的形式存储在这个动态字段
            enable_dynamic_field=True,
        )
        # 添加 id 字段,类型为整数,设置为主键
        .add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        # 添加 vector 字段,类型为浮点数向量,维度为 768
        .add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
        # 添加 text 字段,类型为字符串,最大长度为 1024
        .add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
        # 添加 metadata 字段,类型为 JSON
        .add_field(field_name="metadata", datatype=DataType.JSON)
    )

# 创建 index
def build_index():
    index_params = MilvusClient.prepare_index_params()
    index_params.add_index(
        field_name="vector",  # 建立索引的字段
        index_type="AUTOINDEX",  # 索引类型
        metric_type="L2",  # 向量相似度度量方式
    )
    return index_params

# 创建 collection
if client.has_collection(collection_name="demo_collection"):
    # 删除 collection
    # 在 Milvus 中删除数据后,存储空间不会立即释放。虽然删除数据会将实体标记为 "逻辑删除",但实际空间可能不会立即释放。
    # Milvus 会在后台自动压缩数据。这个过程会将较小的数据段合并为较大的数据段,并删除"逻辑删除"的数据或已超过有效时间的数据。
    # 一个名为 Garbage Collection (GC) 的独立进程会定期删除这些 "已删除 "的数据段,从而释放它们占用的存储空间。
    client.drop_collection(collection_name="demo_collection")
client.create_collection(
    collection_name="demo_collection",  # collection 名称
    schema=build_schema(),  # collection 的 schema
    index_params=build_index(),  # collection 的 index
)

# 查看 collection
print(client.list_collections())

# 查看 collection 描述
pprint(client.describe_collection(collection_name="demo_collection"))
3.5.4 操作实体

1)插入实体

from pymilvus import MilvusClient
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 加载文档
docs = UnstructuredWordDocumentLoader(
    file_path="assets/sample.docx",
    mode="single",
).load()

# 文档切分
chunks = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
    chunk_size=400,
    chunk_overlap=50,
).split_documents(docs)

# 加载嵌入模型
embed_model = HuggingFaceEmbeddings(
    model_name=os.path.expanduser("~/models/bge-base-zh-v1.5")
)

# 计算嵌入向量
embeddings = embed_model.embed_documents([chunk.page_content for chunk in chunks])

# 转换数据格式
data = [
    {
        "vector": embedding,
        "text": chunk.page_content,
        "metadata": chunk.metadata,
    }
    for chunk, embedding in zip(chunks, embeddings)
]

# 插入实体
res = client.insert(collection_name="demo_collection", data=data)
print(res)

2)查询实体

from pymilvus import MilvusClient

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 通过主键查询实体
res = client.get(
    collection_name="demo_collection",
    ids=[461484610130804912, 461484610130804913],
    output_fields=["text", "metadata"],
)
print(res)

# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)查询实体
res = client.query(
    collection_name="demo_collection",
    filter='metadata["source"] == "assets/sample.docx"',  # 使用 metadata["source"] 进行过滤
    output_fields=["text", "metadata"],
    limit=1,
)
print(res)

3)删除实体

from pymilvus import MilvusClient

# 实例化向量数据库客户端
client = MilvusClient(
    uri="./milvus_demo.db",  # 数据存储在本地当前目录下
)

# 通过主键删除实体
res = client.delete(
    collection_name="demo_collection",
    ids=[461484610130804912, 461484610130804913],
)
print(res)

# 通过过滤条件(https://milvus.io/docs/zh/boolean.md)删除实体
res = client.delete(
    collection_name="demo_collection",
    filter='text LIKE "第%"',  # 使用 text 前缀过滤
)
print(res)
3.6 检索与生成
3.6.1 检索

检索阶段:用户输入查询➡️计算嵌入向量➡️在向量存储中检索相似向量➡️返回相似向量对应的内容。

def retrieval(query, embed_model, client):
    """检索并返回上下文"""
    query_embedding = embed_model.embed_query(query)  # 查询嵌入
    context = client.search(
        collection_name="demo_collection",  # collection 名称
        data=[query_embedding],  # 搜索的向量
        anns_field="vector",  # 进行向量搜索的字段
        # 度量方式:L2 欧氏距离/IP 内积/COSINE 余弦相似度
        search_params={"metric_type": "L2"},
        output_fields=["text", "metadata"],  # 输出字段
        limit=3,  # 搜索结果数量
    )
    return context

context = retrieval("不动产被占有了怎么办?", embed_model, client)
print(context)
3.6.2 生成
# ========== 生成 ==========
llm = init_chat_model(
    model="openai/gpt-oss-20b:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "# 任务\n\n根据上下文参考,回答用户的问题。\n\n# 上下文参考\n\n{context}",
        ),
        ("human", "{query}"),
    ]
)

rag_chain = (
    {
        "query": RunnablePassthrough(),
        "context": lambda x: retrieval(query=x, embed_model=embed_model, client=client),
    }
    | RunnableLambda(lambda x: print(x) or x)  # 打印中间结果
    | template
    | llm
    | StrOutputParser()
)
res_chunks = rag_chain.stream(input="不动产被占有了怎么办?")
for chunk in res_chunks:
    print(chunk, end="", flush=True)

Agent

Agent介绍

通用人工智能(AGI)将是AI 的终极形态,几乎已成为业界共识。同样,构建Agent则是AI 工程应用当下的“终极形态”。

语言模型本身无法采取行动——它们只是输出文本。LangChain 的一个重要功能是创建Agent。Agent 是一种使用 LLM 作为推理引擎的系统,它决定要采取哪些行动以及这些行动的输入应该是什么。这些行动的结果可以反馈给 Agent,由 Agent 决定是否需要采取更多行动,或者是否可以完成。

与传统的固定流程链不同,Agent 具备一定的自主决策能力,更适合处理开放式、多步骤的问题。它可以拆解任务,根据任务动态决定调用哪些工具,并利用中间结果推进任务。

Agent 的核心能力/组件:

(1)大模型(LLM):作为大脑,提供推理、规划和知识理解能力。

(2)记忆(Memory):具备短期记忆和长期记忆,支持快速知识检索。

(3)工具(Tools):调用外部工具(如API、数据库)的执行单元。

(4)规划(Planning):任务分解、反思与自省框架实现复杂任务处理。

(5)行动(Action):实际执行决策的能力。

(6)协作:通过与其他Agent 交互合作,完成更复杂的任务目标。

Tools

工具封装了一个可调用函数及其输入模式。这些参数可以传递给兼容的聊天模型,从而允许模型决定是否调用工具以及调用哪些参数。在这种情况下,工具调用使模型能够生成符合指定输入模式的请求。

4.2.1.创建工具

一个Tool 通常包括工具名称,工具描述,以及工具参数的类型注解。

可以通过 @tool 装饰器来创建工具。

from langchain.tools import tool

@tool
def add_number(a: int, b: int) -> int:
    """两个整数相加"""
    return a + b

print(f"{add_number.name=}\n{add_number.description=}\n{add_number.args=}")

# add_number.name='add_number'
# add_number.description='两个整数相加'
# add_number.args={'a': {'title': 'A', 'type': 'integer'}, 'b': {'title': 'B', 'type': 'integer'}}
from langchain.tools import tool
from pydantic import BaseModel, Field

class FieldInfo(BaseModel):
    a: int = Field(description="第1个参数")
    b: int = Field(description="第2个参数")

@tool(
    name_or_callable="add_2_number",
    description="计算两整数之和",
    args_schema=FieldInfo,  # 定义参数模式
)
def add_number(a: int, b: int) -> int:
    """两个整数相加"""
    return a + b

print(f"{add_number.name=}\n{add_number.description=}\n{add_number.args=}")

# add_number.name='add_2_number'
# add_number.description='计算两整数之和'
# add_number.args={'a': {'description': '第1个参数', 'title': 'A', 'type': 'integer'}, 'b': {'description': '第2个参数', 'title': 'B', 'type': 'integer'}}

4.2.2 绑定工具

要想让大模型能够使用工具,首先需要将工具给到大模型。

创建模型实例,并通过bind_tools 方法将工具绑定到大模型。

(1)大模型通过分析用户需求,判断是否需要调用工具。

(2)如果需要则在响应的_additional_kwargs_参数中包含工具调用的详细信息。

(3)使用模型提供的参数执行工具。

import os
from langchain.tools import tool
from langchain.chat_models import init_chat_model

@tool
def query_user_info(user_id: int) -> str:
    """查询用户信息"""
    return {1001: "Jack", 1002: "Tom", 1003: "Alice"}[user_id]

llm = init_chat_model(
    model="z-ai/glm-4.5-air:free",
    model_provider="openai",
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# 为模型提供工具
tools = [query_user_info]
llm_with_tools = llm.bind_tools(tools)
resp = llm_with_tools.invoke("帮我查下1001用户的信息")
print(resp)
# content='\n\n我来帮您查询1001用户的信息。\n'
# additional_kwargs={'tool_calls': [{'id': '...', 'function': {'arguments': '{"user_id": 1001}', 'name': 'query_user_info'}

# 返回的响应中 additional_kwargs 参数中包括了工具调用的信息,此时还没有调用工具,只是返回了要调用的工具及参数

# 手动执行工具
for tool_call in resp.tool_calls:
    tool_name = tool_call["name"]  # 获取工具名称
    tool_args = tool_call["args"]  # 获取工具参数
    tool_result = globals()[tool_name].invoke(tool_args)  # 执行工具
    print(tool_name, tool_args, tool_result)
构建Agent

使用 create_agent 来创建 Agent,create_agent 使用 LangGraph 构建基于图的 Agent运行时。此 Agent 会在一个循环中反复调用模型和工具,直到某次模型输出中不再包含工具调用则结束。

使用 create_agent 创建 Agent 时,需传入模型和工具、可选地也可以传入系统提示词。

这里使用Tavily(搜索引擎)作为工具,需要先获取它的API-Key 并添加到环境变量。

from langchain_tavily import TavilySearch
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
import dotenv
dotenv.load_dotenv()
llm = ChatOpenAI(
    model='qwen-vl-plus'
)

search = TavilySearch(max_results=5)
tools=[search]

agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt="你是一名助手,需要调用工具来帮助用户解决问题。"
)


res=agent.invoke({
    "messages": [
        {"role": "user", "content": "重庆天气怎么样"}
    ]
})
print(res)
LangSmith

使用 LangChain 构建的许多应用程序都包含多个步骤,需要多次调用LLM。随着这些应用程序变得越来越复杂,能够检查链或 Agent 内部的具体情况变得至关重要。最好的方法是使用LangSmith

注册LangSmith,在 Settings ➡️ API Keys 下创建 API-Key 并复制。之后在环境变量中添加以开始记录跟踪:

LANGSMITH_TRACING="true"
LANGSMITH_API_KEY="..."

配置好环境变量之后,可在 LangSmith 的Tracing Projects 中查看跟踪记录。

LangSmith默认将跟踪记录到default 项目,可通过LANGSMITH_PROJECT环境变量设置LangSmith跟踪记录保存到哪个项目,如果该项目不存在则会创建。

记忆

为了给代理添加短期记忆(线程级持久化),在创建代理时需要指定一个 checkpointer,并在调用代理时指定线程 ID。这个短期记忆的能力是借助 LangGraph 的状态和检查点实现的。

from langchain_tavily import TavilySearch
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
import dotenv
import datetime

dotenv.load_dotenv()
llm = ChatOpenAI(
    model='qwen-vl-plus'
)

search = TavilySearch(max_results=5)
tools=[search]

agent = create_agent(
    model=llm,
    tools=tools,
    checkpointer=InMemorySaver(),
    # 需要给大模型提示使用工具
    system_prompt="你是一名助手,需要调用工具来帮助用户解决问题。"
)

for chunk in agent.stream(
    input={
        'messages':[
            {'role':'system','content': f"当前时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",},
            {'role':'user','content':'今天晚上重庆天气怎么样?'}
        ]
    },
    config={'configurable':{'thread_id':'1'}},
):
    print(chunk,end='\n\n')

for chunk in agent.stream(
    input={
        "messages": [
            {
                "role": "system",
                "content": f"当前时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            },
            {"role": "user", "content": "上海呢?"},
        ]
    },
    config={"configurable": {"thread_id": "1"}},
):
    print(chunk, end="\n\n")
MCP

Model Context Protocol(MCP,模型上下文协议)是一个开源协议,它标准化了大语言模型与外部工具和数据源通信的方式,允许开发者和工具提供商只需集成一次,就能与任何兼容 MCP 的系统交互。MCP 就像 USB-C 标准:不需要为每个设备使用不同的连接器,而是使用一个端口来处理多种类型的连接。

MCP 架构、
MCP 主机 协调和管理一个或多个 MCP 客户端的 AI 应用
MCP 客户端 一个保持与 MCP 服务器连接的组件,通过 MCP 定义的消息处理通信,从服务器查找并请求资源和工具,并管理与服务器的连接生命周期
MCP 服务器 一个向 MCP 客户端提供服务的程序,通过协议暴露工具、资源和提示模板功能

MCP 层级

MCP 分为两个层级:

(1)数据层

数据层实现了一个基于 JSON-RPC 2.0 的交换协议,该协议定义了消息结构和语义。

数据层包括生命周期管理(连接初始化、能力协商、连接终止)、服务器功能(提供工具、资源和提示模板)、客户端功能(调用LLM、获取输入、记录消息)、其他功能(实时更新通知、长时运行操作跟踪)。

(2)传输层

传输层定义了客户端与服务器之间数据交换的通信机制和通道,包括特定传输方式的连接建立、消息帧定界和授权。

MCP 支持多种传输机制,包括Stdio、Streamable HTTP、SSE。

Stdio 使用标准输入和输出流,与在终端输入命令并看到响应时使用的机制相同。适用于本地开发
Streamable HTTP 该传输使用 HTTP POST 和 GET 请求,服务器可以选择使用SSE来流式传输多个服务器消息。支持流式传输和服务器到客户端通知,并支持标准 HTTP 身份验证方法,包括授权令牌、API 密钥和自定义头信息
SSE 带有 SSE(Server-Sent Events 服务器发送事件)的 HTTP,MCP早期传输机制,现逐渐被 Streamable HTTP 取代
MCP 工作流程

(1)初始化

在初始化过程中,AI 应用程序的 MCP 客户端管理器连接到配置的服务器,并将它们的能力存储起来以供后续使用。应用程序使用这些信息来确定哪些服务器可以提供特定类型的功能(工具、资源、提示),以及它们是否支持实时更新。

初始化有几个重要的作用:

协议版本协商 确保客户端和服务器使用兼容的协议版本,避免因版本不一致导致的通信问题
能力发现 声明各自支持的功能,包括他们能够处理的基元类型(工具、资源、提示)以及是否支持通知等特性
身份交换 交换客户端与服务器的身份及版本信息,便于后续的调试与兼容性管理

(2)工具发现

AI 应用程序从所有连接的 MCP 服务器中获取可用工具,并将它们组合成一个语言模型可以访问的统一工具注册表。这使得 LLM 能够理解它可以执行哪些操作,并在对话期间自动生成相应的工具调用。

连接建立之后,客户端可以通过发送 tools/list 请求来发现可用的工具。这个请求是 MCP 工具发现机制的基础—它允许客户端在尝试使用工具之前了解服务器上有哪些可用的工具。响应包含一个 tools 数组,该数组提供了关于每个可用工具的全面元数据。这种基于数组的结构允许服务器同时展示多个工具,同时保持不同功能之间的清晰界限。响应中的每个工具包括几个关键字段:

name 工具标识符
title 工具的易读显示名称
description 工具描述
inputSchema 一个定义预期输入参数的 JSON Schema,支持类型验证并提供关于必需和可选参数的清晰文档

(3)工具执行

当语言模型在对话中决定使用工具时,AI 应用程序会拦截工具调用,将其路由到合适的 MCP 服务器,执行该工具,并将结果作为对话流程的一部分返回给 LLM。这使 LLM 能够访问实时数据并在外部世界中执行操作。

客户端使用 tools/call 方法执行一个工具。tools/call 请求遵循结构化格式,确保客户端和服务器之间的类型安全和清晰通信。请求结构包括几个重要组件:

name 工具标识符
arguments 包含工具的 inputSchema 定义的输入参数

响应返回一个内容对象数组,允许进行丰富、多格式的响应(文本、图片、资源等)。每个内容对象都有一个 type 字段。

(4)实时更新

MCP 支持实时通知,使服务器能够在未经明确请求的情况下通知客户端有关变更。当 AI 应用程序收到关于工具变更的通知时,它会立即刷新其工具注册表并更新 LLM 的可用功能。这确保了正在进行的对话始终能够访问最新的一组工具,并且 LLM 可以随着新功能的可用而动态适应。

MCP SDK

Stdio 服务端与客户端

# pip add mcp
from mcp.server.fastmcp import FastMCP

# 创建 MCP 实例
mcp = FastMCP("Demo")

# 为 MCP 实例添加工具
@mcp.tool()
def add(a: int, b: int) -> int:
    return a + b

# 为 MCP 实例添加资源
@mcp.resource("greeting://default")
def get_greeting() -> str:
    return "Hello from static resource!"

# 为 MCP 实例添加提示词
@mcp.prompt()
def greet_user(name: str, style: str = "friendly") -> str:
    styles = {
        "friendly": "写一句友善的问候",
        "formal": "写一句正式的问候",
        "casual": "写一句轻松的问候",
    }
    return f"为{name}{styles.get(style, styles['friendly'])}"

if __name__ == "__main__":
    mcp.run(transport="stdio")
# pip install mcp
import asyncio
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters

async def stdio_run():
    server_params = StdioServerParameters(
        command="python",
        args=["mcp_server_stdio.py"],
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 初始化连接
            await session.initialize()

            # 获取可用工具
            tools = await session.list_tools()
            print(tools)
            print()

            # 调用工具
            call_res = await session.call_tool("add", {"a": 1, "b": 2})
            print(call_res)
            print()

            # 获取可用资源
            resources = await session.list_resources()
            print(resources)
            print()

            # 调用资源
            read_res = await session.read_resource("greeting://default")
            print(read_res)
            print()

            # 获取可用提示
            prompts = await session.list_prompts()
            print(prompts)
            print()

            # 调用提示
            get_res = await session.get_prompt("greet_user", {"name": "Jack"})
            print(get_res)
            print()

asyncio.run(stdio_run())

Streamable HTTP 服务端与客户端

# pip add mcp
from mcp.server.fastmcp import FastMCP

# 创建 MCP 实例
mcp = FastMCP("Demo")

# 为 MCP 实例添加工具
@mcp.tool()
def add(a: int, b: int) -> int:
    return a + b

# 为 MCP 实例添加资源
@mcp.resource("greeting://default")
def get_greeting() -> str:
    return "Hello from static resource!"

# 为 MCP 实例添加提示词
@mcp.prompt()
def greet_user(name: str, style: str = "friendly") -> str:
    styles = {
        "friendly": "写一句友善的问候",
        "formal": "写一句正式的问候",
        "casual": "写一句轻松的问候",
    }
    return f"为{name}{styles.get(style, styles['friendly'])}"

if __name__ == "__main__":
    # mcp.settings.host = "0.0.0.0"
    # mcp.settings.port = 8888
    mcp.run(transport="streamable-http")  # 默认启动在 127.0.0.1:8000
# pip install mcp
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def streamablehttp_run():
    url = "http://127.0.0.1:8000/mcp"
    headers = {"Authorization": "Bearer sk-atguigu"}

    async with streamablehttp_client(url, headers) as (read, write, _):
        async with ClientSession(read, write) as session:
            # 初始化连接
            await session.initialize()

            # 获取可用工具
            tools = await session.list_tools()
            print(tools)
            print()

            # 调用工具
            call_res = await session.call_tool("add", {"a": 1, "b": 2})
            print(call_res)
            print()

            # 获取可用资源
            resources = await session.list_resources()
            print(resources)
            print()

            # 调用资源
            read_res = await session.read_resource("greeting://default")
            print(read_res)
            print()

            # 获取可用提示
            prompts = await session.list_prompts()
            print(prompts)
            print()

            # 调用提示
            get_res = await session.get_prompt("greet_user", {"name": "Jack"})
            print(get_res)
            print()

asyncio.run(streamablehttp_run())

将多个 Streamable HTTP 服务器挂载到 ASGI 服务器

ASGI(Asynchronous Server Gateway Interface)是 Python 的异步 Web 服务器接口标准,定义了服务器与应用之间的通信协议,支持异步调用,能够处理高并发和长连接。

可以使用 streamable_http_app 方法将 StreamableHTTP 服务器挂载到现有的 ASGI 服务器。这允许将 StreamableHTTP 服务器与其他 ASGI 应用程序集成。

# pip add mcp fastapi
import uvicorn
import contextlib
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP

# 创建 MCP 实例
tool_mcp = FastMCP("tool server")
resource_mcp = FastMCP("resource server")
prompt_mcp = FastMCP("prompt server")

# 为 tool_mcp 实例添加工具
@tool_mcp.tool()
def add(a: int, b: int) -> int:
    return a + b

# 为 resource_mcp 实例添加资源
@resource_mcp.resource("greeting://default")
def get_greeting() -> str:
    return "Hello from static resource!"

# 为 prompt_mcp 实例添加提示词
@prompt_mcp.prompt()
def greet_user(name: str, style: str = "friendly") -> str:
    styles = {
        "friendly": "写一句友善的问候",
        "formal": "写一句正式的问候",
        "casual": "写一句轻松的问候",
    }
    return f"为{name}{styles.get(style, styles['friendly'])}"

# 设置 MCP 的 HTTP 根路径
tool_mcp.settings.streamable_http_path = "/"
resource_mcp.settings.streamable_http_path = "/"
prompt_mcp.settings.streamable_http_path = "/"

# 创建一个组合生命周期来管理会话管理器
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI): 
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(tool_mcp.session_manager.run())
        await stack.enter_async_context(resource_mcp.session_manager.run())
        await stack.enter_async_context(prompt_mcp.session_manager.run())
        yield

app = FastAPI(lifespan=lifespan)

# 挂载 MCP 服务器
app.mount("/tool", tool_mcp.streamable_http_app())
app.mount("/resource", resource_mcp.streamable_http_app())
app.mount("/prompt", prompt_mcp.streamable_http_app())

if __name__ == "__main__":
    uvicorn.run(app)

客户端代码和之前一致,注意修改 URL 路径。

LangChain 使用 MCP

LangChain Agent 可以通过 langchain-mcp-adapters 包使用 MCP 服务器上定义的工具。

这里使用了如下工具,需要先在相关平台创建 API-Key 并添加到环境变量:

阿里云百炼:https://bailian.console.aliyun.com/?tab=mcp#/mcp-market/detail/WebSearch

Smithery:https://smithery.ai/server/@DeniseLewis200081/rail

import os
import asyncio
from urllib.parse import urlencode
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
import dotenv
dotenv.load_dotenv()
# 配置 MCP 客户端
mcp_client = MultiServerMCPClient(
    {
        "WebSearch": {
            "transport": "streamable_http",
            "url": "https://dashscope.aliyuncs.com/api/v1/mcps/WebSearch/mcp",
            "headers": {"Authorization": f"Bearer {os.getenv('DASHSCOPE_API_KEY')}"},
        },  # https://bailian.console.aliyun.com/?tab=mcp#/mcp-market/detail/WebSearch
        "RailService": {
            "transport": "streamable_http",
            "url": f"{'https://server.smithery.ai/@DeniseLewis200081/rail/mcp'}?{urlencode({'api_key': os.getenv('SMITHERY_API_KEY')})}",
        },  # https://smithery.ai/server/@DeniseLewis200081/rail
    }
)
# 获取工具
tools = asyncio.run(mcp_client.get_tools())

llm=ChatOpenAI(
    model='qwen3-vl-plus'
)

agent = create_agent(model=llm,tools=tools)

async def main():
    async for chunk in agent.astream(
        {
            "messages": [
                {"role": "system", "content": "你是位助手,需要调用工具来帮助用户。"},
                {
                    "role": "user",
                    "content": "北京今天天气怎么样,要是还不错的话,帮我看看今天上海到北京的车票",
                },
            ]
        }
    ):
        print(chunk, end="\n\n")

asyncio.run(main())
监督者模式多 Agnet 架构

监督者(主管)模式是一种多 Agnet 架构,其中中央主管 Agnet 负责协调各专业工作 Agnet 。当任务需要不同类型的专业知识时,这种方法非常有效。与其构建一个管理跨领域工具选择的 Agnet ,不如创建由了解整体工作流程的主管协调的、专注的专家。

在 LangChain 中可以将 Agent 封装为工具,将工具绑定到主管 Agent 来实现主管多代理模式。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐