LangChain v1.0 Complete Tutorial
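Version: LangChain 1.0+ | Language: Python 3.10+ | Last updated: 2026-06-05
Contents
Part 1: Getting Started
Part 2: Core Framework
Part 3: Data and Retrieval
Part 4: Advanced Topics
- LangGraph state graph basics
- Multi-agent collaboration and the Supervisor pattern
- Memory and persistence
- Streaming output in detail
- Human-in-the-Loop
- Callbacks and monitoring
- Production best practices
- v0.x → v1.0 migration guide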
1. v1.0 Overview and Environment Setup
1.1 Core changes in LangChain v1.0
LangChain v1.0 was officially released in October 2025 and is a major architectural overhaul. The key changes:
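| Aspect | v0.x (legacy) | v1.0 (new) |
|---|---|---|
| Agent creation | Several entry points: AgentExecutor, initialize_agent, create_react_agent | create_agent() as the single entry point |
| Chaining | Hard-coded chains such as LLMChain and ConversationChain | LCEL pipe operator (`\|`) |
| Message content | Provider-specific formats that needed adapter code | content_blocks, a unified cross-provider view |
| Memory | RunnableWithMessageHistory | LangGraph Checkpointer |
| Tool definition | Subclassing BaseTool | Plain Python functions with type annotations |
| Model initialization | Separate imports per provider | init_chat_model() as the single entry point |
| Extension mechanism | Callbacks | Middleware system (onion model) |
| Runtime | Custom runtime | Built on LangGraph |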
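The "onion model" named in the table means that each middleware wraps the next layer: code before the inner call runs on the way in, code after it runs on the way out. The sketch below uses only the standard library to show that ordering. The names `compose`, `logging_mw`, `uppercase_mw`, and `fake_model` are hypothetical and are not part of LangChain's middleware API.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Middleware = Callable[[str, Handler], str]


def compose(middlewares: List[Middleware], core: Handler) -> Handler:
    """Wrap `core` so that the first middleware in the list is the outermost layer."""
    handler = core
    for mw in reversed(middlewares):
        # Default arguments freeze the current middleware and inner handler.
        handler = lambda req, mw=mw, nxt=handler: mw(req, nxt)
    return handler


trace: List[str] = []


def logging_mw(request: str, call_next: Handler) -> str:
    trace.append("log:before")
    response = call_next(request)
    trace.append("log:after")
    return response


def uppercase_mw(request: str, call_next: Handler) -> str:
    trace.append("upper:before")
    response = call_next(request).upper()
    trace.append("upper:after")
    return response


def fake_model(request: str) -> str:
    # Stand-in for the real model call at the center of the onion.
    trace.append("model")
    return f"echo: {request}"


pipeline = compose([logging_mw, uppercase_mw], fake_model)
result = pipeline("hi")
print(result)
print(trace)
```

The trace shows the layers entered from the outside in and exited from the inside out, which is the property that lets one middleware observe or modify everything an inner layer does.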
1.2 Package architecture: a modular split
v1.0 splits the former monolithic langchain package into several independent packages:
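| Package | Role | When to install |
|---|---|---|
| langchain-core | Core abstractions, LCEL, and the Runnable protocol | Required by every project |
| langchain | Main package; home of create_agent() | Default choice for day-to-day development |
| langchain-openai | Official OpenAI integration | Using OpenAI models |
| langchain-anthropic | Official Anthropic Claude integration | Using Claude |
| langchain-google-genai | Official Google Gemini integration | Using Gemini |
| langchain-deepseek | Official DeepSeek integration | Using DeepSeek |
| langchain-community | Community-maintained third-party integrations | Prototyping |
| langchain-classic | Legacy compatibility package (v0.x) | Incremental migration of v0.x projects |
| langchain-text-splitters | Text splitting | Document processing |
| langgraph | State-graph runtime | Complex workflows, multi-agent systems |
| langgraph-supervisor | Supervisor multi-agent pattern | Multi-agent orchestration |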
1.3 Installation
# === 新项目推荐安装(最小化) ===
pip install langchain langchain-openai
# 或使用 uv(更快)
uv add langchain langchain-openai
# === 完整安装(常用组合) ===
pip install langchain
pip install langchain-openai # OpenAI
pip install langchain-anthropic # Anthropic Claude
pip install langchain-google-genai # Google Gemini
pip install langchain-deepseek # DeepSeek
pip install langchain-community # 社区集成
pip install langchain-text-splitters # 文本分割
pip install langgraph # LangGraph 运行时
# === 向量数据库(按需选装) ===
pip install langchain-chroma chromadb # Chroma
pip install faiss-cpu # FAISS (CPU版)
# pip install faiss-gpu # FAISS (GPU版)
pip install langchain-pinecone # Pinecone
# === v0.x 项目迁移 ===
pip install langchain-classic # 旧版 Chains/Agents 兼容层
# === 开发工具 ===
pip install langsmith # 官方监控调试平台
pip install tiktoken # Token 计数
pip install python-dotenv # 环境变量管理
1.4 Environment variables
# .env 文件
"""
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=...
DEEPSEEK_API_KEY=sk-...
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=ls_...
LANGCHAIN_PROJECT=my-project
"""
import os
from dotenv import load_dotenv
load_dotenv() # 自动加载 .env 文件
# 方式一:环境变量
os.environ["OPENAI_API_KEY"] = "sk-..."
# 方式二:构造时传入
from langchain_openai import ChatOpenAI
model = ChatOpenAI(api_key="sk-...", model="gpt-4o")
1.5 A first v1.0 application
# === 方式一:LCEL 链式调用 ===
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("用{language}解释什么是{topic}")
parser = StrOutputParser()
chain = prompt | model | parser
result = chain.invoke({"language": "中文", "topic": "LangChain"})
print(result)
# === 方式二:create_agent (推荐) ===
from langchain.agents import create_agent
agent = create_agent(
model="openai:gpt-4o",
system_prompt="你是一个有帮助的AI助手。",
)
result = agent.invoke({
"messages": [{"role": "user", "content": "用中文解释什么是LangChain"}]
})
print(result["messages"][-1].content)
2. Model Initialization and the Unified Interface
2.1 init_chat_model: a single entry point for models
v1.0 recommends init_chat_model() for provider-agnostic model initialization:
from langchain.chat_models import init_chat_model
# Option 1: combined "provider:model" string (most concise)
model = init_chat_model("openai:gpt-4o")
model = init_chat_model("anthropic:claude-sonnet-4-6")
model = init_chat_model("google_genai:gemini-2.0-flash")
model = init_chat_model("deepseek:deepseek-chat")
# Option 2: declare the provider explicitly
model = init_chat_model("gpt-4o", model_provider="openai")
model = init_chat_model("claude-sonnet-4-6", model_provider="anthropic")
# Option 3: pass model parameters as keyword arguments
# (init_chat_model takes a model name string, not a ChatModel instance)
model = init_chat_model("openai:gpt-4o", temperature=0.7)
2.2 ChatOpenAI in detail
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
    model="gpt-4o",          # model name
    temperature=0.7,         # randomness (0-2)
    max_tokens=4096,         # maximum number of output tokens
    top_p=0.95,              # nucleus sampling
    frequency_penalty=0.0,   # frequency penalty (-2.0 to 2.0)
    presence_penalty=0.0,    # presence penalty (-2.0 to 2.0)
    seed=42,                 # random seed (best-effort reproducibility, not a guarantee)
    timeout=60,              # request timeout in seconds
    max_retries=3,           # maximum number of retries
    streaming=True,          # enable streaming output
)
# Basic call
response = model.invoke("Hello, please introduce yourself")
print(response.content)
# Calling with a list of messages
from langchain_core.messages import HumanMessage, SystemMessage
messages = [
    SystemMessage(content="You are a senior Python engineer."),
    HumanMessage(content="How can I improve the performance of a Django API?"),
]
response = model.invoke(messages)
print(response.content)
2.3 ChatAnthropic (Claude)
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(
    model="claude-sonnet-4-6",
    max_tokens=4096,  # must be larger than budget_tokens
    # Extended thinking; Anthropic does not accept a custom temperature while thinking is enabled
    thinking={"type": "enabled", "budget_tokens": 2000},
)
response = model.invoke("Outline the main ideas behind the proof of Fermat's Last Theorem.")
# Read the reasoning through content_blocks
for block in response.content_blocks:
    if block["type"] == "reasoning":
        print(f"[Reasoning]: {block.get('reasoning', '')[:200]}...")
    elif block["type"] == "text":
        print(f"[Answer]: {block['text'][:200]}...")
2.4 Binding parameters with bind()
# bind() attaches extra call-time arguments to a model
model = ChatOpenAI(model="gpt-4o")
# Bind tools (search_tool and calculator_tool are assumed to be defined elsewhere)
model_with_tools = model.bind_tools(
    tools=[search_tool, calculator_tool],
    tool_choice="auto",  # auto | any | none | required
)
# Bind stop sequences
model_with_stop = model.bind(stop=["\n\nHuman:", "\n\nSystem:"])
# Bind a response format
model_json = model.bind(response_format={"type": "json_object"})
# Chained binding
configured_model = (
    model
    .bind(temperature=0.3)
    .bind(max_tokens=2000)
    .bind(stop=["\n\n"])
)
2.5 Message types
from langchain_core.messages import (
HumanMessage, # 用户消息
AIMessage, # AI 回复
SystemMessage, # 系统提示
ToolMessage, # 工具调用结果
AIMessageChunk, # 流式输出的 AI 消息块
ToolCallChunk, # 流式输出的工具调用块
)
# HumanMessage 支持多模态内容
message = HumanMessage(content=[
{"type": "text", "text": "请描述这张图片"},
{"type": "image_url", "image_url": {"url": "https://example.com/photo.jpg"}},
])
# 构建对话
conversation = [
SystemMessage(content="你是一个有帮助的助手。"),
HumanMessage(content="我叫小明。"),
AIMessage(content="你好小明!有什么可以帮你的?"),
HumanMessage(content="我刚才说我的名字是什么?"),
]
response = model.invoke(conversation)
2.6 Switching between models and fallbacks
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
primary = ChatOpenAI(model="gpt-4o")
backup = ChatAnthropic(model="claude-sonnet-4-6")
# with_fallbacks: switch to the backup automatically when the primary model fails
robust_model = primary.with_fallbacks([backup])
# Agent-level fallback: the middleware arguments are the fallback models,
# tried in order after the agent's own model fails
agent = create_agent(
    model=primary,
    middleware=[
        ModelFallbackMiddleware(
            backup,                           # first fallback
            ChatOpenAI(model="gpt-4o-mini"),  # second fallback
        ),
    ],
)
3. Content Blocks: A Unified Cross-Provider View of Message Content
3.1 Why content blocks are needed
Response structures differ from one LLM provider to the next. v1.0 exposes content_blocks as a unified view of message content:
from langchain.chat_models import init_chat_model
# Models from different providers
openai_model = init_chat_model("openai:gpt-4o")
anthropic_model = init_chat_model("anthropic:claude-sonnet-4-6")
# Handle responses through content_blocks, with no provider-specific adapter code
def print_response(response):
    print(f"[model: {response.response_metadata.get('model_name', 'unknown')}]")
    for block in response.content_blocks:
        block_type = block["type"]
        if block_type == "text":
            print(f"  text: {block['text'][:100]}...")
        elif block_type == "reasoning":
            print(f"  reasoning: {block.get('reasoning', '')[:100]}...")
        elif block_type == "tool_call":
            print(f"  tool call: {block['name']}({block['args']})")
    print("  ---")
# The same handling logic works for every provider
print_response(openai_model.invoke("hello"))
print_response(anthropic_model.invoke("hello"))
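3.2 Content block types
# Standard content block types in v1.0 (the "type" field):
# - text: plain text output (citations are attached as annotations)
# - reasoning: model reasoning output (thinking / chain-of-thought)
# - tool_call: a tool/function call request (name, args, id)
# - tool_call_chunk / invalid_tool_call: streamed or unparsable tool calls
# - image / audio / video / file: multimodal data
# - text-plain: plain-text documents
# - server_tool_call / server_tool_result: tools executed on the provider side (for example web search)
# - non_standard: provider-specific content with no standard equivalent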
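Because content blocks are plain dictionaries with a "type" field, handling them comes down to dispatching on that field. The sketch below runs without any model or API key. The block shapes in `sample_blocks` (including the `tool_call` block with `name`, `args`, and `id`) are hand-written assumptions for illustration, and `render_blocks` is a hypothetical helper, not a LangChain function.

```python
from typing import Any, Dict, List


def render_blocks(blocks: List[Dict[str, Any]]) -> List[str]:
    """Turn a list of content-block dicts into printable lines."""
    lines = []
    for block in blocks:
        kind = block.get("type")
        if kind == "text":
            lines.append(f"text: {block['text']}")
        elif kind == "reasoning":
            lines.append(f"reasoning: {block.get('reasoning', '')}")
        elif kind == "tool_call":
            lines.append(f"tool_call: {block['name']}({block['args']})")
        else:
            # Unknown or unhandled types are reported instead of raising.
            lines.append(f"unhandled: {kind}")
    return lines


# Hand-written sample data; a real response would come from a model call.
sample_blocks = [
    {"type": "reasoning", "reasoning": "The user wants a product."},
    {"type": "tool_call", "name": "calculate", "args": {"expression": "15*23"}, "id": "call_1"},
    {"type": "text", "text": "Let me compute that."},
    {"type": "image", "url": "https://example.com/chart.png"},
]
rendered = render_blocks(sample_blocks)
for line in rendered:
    print(line)
```

Keeping an explicit fallback branch means new block types introduced by a provider degrade to a log line rather than an exception.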
3.3 Handling responses that include reasoning
# Enable thinking mode on Claude
model = init_chat_model(
    "anthropic:claude-sonnet-4-6",
    max_tokens=8000,  # must be larger than budget_tokens
    thinking={"type": "enabled", "budget_tokens": 5000},
)
response = model.invoke("解释量子纠缠的基本原理")
# content_blocks 将推理和回答分开
for block in response.content_blocks:
if block["type"] == "reasoning":
print("=" * 40)
print("【思考过程】")
print("=" * 40)
print(block["reasoning"])
print()
elif block["type"] == "text":
print("=" * 40)
print("【最终回答】")
print("=" * 40)
print(block["text"])
# .text concatenates the text blocks; with thinking enabled, .content holds the
# provider-native block list rather than a plain string
print(response.text)  # text only
3.4 Handling tool-call responses
from langchain_core.messages import HumanMessage
model = init_chat_model("openai:gpt-4o")
# search_tool and calculator_tool are assumed to be defined elsewhere
model_with_tools = model.bind_tools([search_tool, calculator_tool])
response = model_with_tools.invoke([
    HumanMessage(content="Search for the latest AI developments in 2026 and compute 15*23")
])
for block in response.content_blocks:
    if block["type"] == "text":
        print(f"[text] {block['text']}")
    elif block["type"] == "tool_call":
        print(f"[tool call] {block['name']}")
        print(f"  args: {block['args']}")
        print(f"  id: {block['id']}")
3.5 Building multimodal input messages
# HumanMessage accepts a list of content blocks
message = HumanMessage(content=[
    {"type": "text", "text": "Please analyze this data:"},
    {"type": "image_url", "image_url": {"url": "https://example.com/chart.png"}},
])
# Or send a local file
import base64
with open("document.pdf", "rb") as f:
    pdf_base64 = base64.b64encode(f.read()).decode()
message = HumanMessage(content=[
    {"type": "text", "text": "Please summarize this PDF document"},
    # Standard v1.0 file block: base64 payload plus MIME type
    {
        "type": "file",
        "base64": pdf_base64,
        "mime_type": "application/pdf",
    },
])
response = model.invoke([message])
4. Prompt Templates and the Message System
4.1 ChatPromptTemplate basics
from langchain_core.prompts import ChatPromptTemplate
# from_template — 最简方式
prompt = ChatPromptTemplate.from_template(
"你是一个{role}。请用{style}风格解释{topic}。"
)
messages = prompt.invoke({
"role": "Python专家", "style": "通俗易懂", "topic": "装饰器"
})
print(messages)
# from_messages — 构建多消息模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个{role}。使用{language}回答。"),
("human", "{question}"),
("ai", "好的,我会用{language}回答。"),
("human", "请继续展开说明。"),
])
messages = prompt.invoke({
"role": "数学老师", "language": "中文",
"question": "什么是微积分?",
})
4.2 MessagesPlaceholder: inserting messages dynamically
from langchain_core.prompts import (
ChatPromptTemplate,
MessagesPlaceholder,
)
# 动态插入对话历史(Agent 常用模式)
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的助手。"),
MessagesPlaceholder(variable_name="history"),
("human", "{question}"),
])
from langchain_core.messages import HumanMessage, AIMessage
result = prompt.invoke({
"history": [
HumanMessage(content="我喜欢Python"),
AIMessage(content="Python是一门很棒的语言!"),
HumanMessage(content="有哪些好用的库?"),
AIMessage(content="NumPy、Pandas、FastAPI等等。"),
],
"question": "那NumPy主要用来做什么?",
})
print(result)
4.3 Partial variables (partial)
prompt = ChatPromptTemplate.from_template(
"你是一个{role}。当前日期是{current_date}。请回答:{question}"
)
# 方式一:partial 预填充静态值
partial_prompt = prompt.partial(role="法律顾问")
result = partial_prompt.invoke({
"current_date": "2026-06-05",
"question": "什么是合同生效的要件?",
})
# 方式二:partial 使用函数动态生成值
from datetime import datetime
def get_today():
return datetime.now().strftime("%Y年%m月%d日")
prompt_with_date = prompt.partial(current_date=get_today)
result = prompt_with_date.invoke({
"role": "助手",
"question": "今天星期几?",
})
4.4 Few-shot prompting
from langchain_core.prompts import (
ChatPromptTemplate,
FewShotChatMessagePromptTemplate,
)
# 定义示例
examples = [
{"input": "这个产品质量太差了,用了一次就坏了", "output": "negative"},
{"input": "物流很快,包装也很用心,非常满意", "output": "positive"},
{"input": "一般般吧,没什么特别的", "output": "neutral"},
{"input": "贵是贵了点,但是效果确实好", "output": "positive"},
]
# 示例模板
example_prompt = ChatPromptTemplate.from_messages([
("human", "{input}"),
("ai", "{output}"),
])
# 创建 Few-Shot 提示
few_shot_prompt = FewShotChatMessagePromptTemplate(
example_prompt=example_prompt,
examples=examples,
)
# 组合到完整提示
final_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个情感分析专家。将用户评论分类为:positive、negative、neutral。"),
few_shot_prompt,
("human", "{input}"),
])
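# Assumes `model` and StrOutputParser are already in scope (see section 1.5)
chain = final_prompt | model | StrOutputParser()
result = chain.invoke({"input": "包装很精美但发货太慢了"})
print(result)  # likely "neutral" or "negative"; the exact label depends on the model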
4.5 Dynamic few-shot (selecting examples by similarity)
from langchain_core.prompts import (
ChatPromptTemplate,
FewShotChatMessagePromptTemplate,
)
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
# 基于语义相似度自动选择最相关的示例
example_selector = SemanticSimilarityExampleSelector.from_examples(
examples=examples,
embeddings=OpenAIEmbeddings(),
vectorstore_cls=Chroma,
k=2, # 每次选择最相关的2个示例
)
dynamic_few_shot = FewShotChatMessagePromptTemplate(
example_prompt=example_prompt,
example_selector=example_selector,
)
final_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个情感分析专家。"),
dynamic_few_shot,
("human", "{input}"),
])
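The idea behind similarity-based example selection is to score every stored example against the incoming query and keep the top k. The sketch below replaces real embeddings with a toy bag-of-words cosine similarity so that it runs with the standard library alone. `word_vector`, `cosine`, `select_examples`, and `toy_examples` are hypothetical names, and lexical overlap is only a crude stand-in for semantic similarity.

```python
import math
from collections import Counter
from typing import Dict, List


def word_vector(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[key] * b[key] for key in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def select_examples(query: str, examples: List[Dict[str, str]], k: int = 2) -> List[Dict[str, str]]:
    """Return the k examples whose input is most similar to the query."""
    query_vec = word_vector(query)
    ranked = sorted(
        examples,
        key=lambda ex: cosine(query_vec, word_vector(ex["input"])),
        reverse=True,
    )
    return ranked[:k]


toy_examples = [
    {"input": "shipping was fast", "output": "positive"},
    {"input": "broke after one use", "output": "negative"},
    {"input": "nothing special", "output": "neutral"},
]
chosen = select_examples("shipping was slow", toy_examples, k=1)
print(chosen)
```

With real embeddings the ranking reflects meaning rather than shared words, which is why a vector store and an embedding model appear in the selector configuration.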
4.6 Prompt template best practices
# 典型的 Agent 系统提示结构
SYSTEM_PROMPT = """你是一个{department}部门的{role}。
## 你的职责
{responsibilities}
## 工作规范
{rules}
## 当前时间
{current_date}
## 注意事项
- 只能基于提供的工具和数据回答
- 不确定时请明确说明
- 复杂问题请分步骤处理
"""
prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT),
MessagesPlaceholder("messages"),
])
# 使用时填充变量
formatted_prompt = prompt.invoke({
"department": "技术支持",
"role": "高级工程师",
"responsibilities": "- 解答用户技术问题\n- 提供代码示例\n- 排查系统故障",
"rules": "- 优先使用官方文档\n- 给出可运行的代码\n- 标注信息来源",
"current_date": "2026-06-05",
"messages": [HumanMessage(content="我的服务启动报错怎么办?")],
})
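5. Structured Output
5.1 with_structured_output (recommended)
This is the recommended way to get structured output from a model in v1.0. Internally it relies on a mechanism such as tool calling or JSON mode, chosen by the model integration: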
from pydantic import BaseModel, Field
from typing import List, Optional, Literal
# 定义输出结构
class ProductReview(BaseModel):
product_name: str = Field(description="产品名称")
rating: float = Field(ge=1, le=5, description="评分 1-5")
sentiment: Literal["positive", "neutral", "negative"]
summary: str = Field(description="评价总结,50字以内")
pros: List[str] = Field(description="优点列表")
cons: List[str] = Field(description="缺点列表")
recommend: bool = Field(description="是否推荐购买")
# 一行绑定
model = ChatOpenAI(model="gpt-4o")
structured_model = model.with_structured_output(ProductReview)
# 调用,直接返回 Pydantic 对象
result = structured_model.invoke(
"iPhone 15 Pro Max 拍照效果惊艳,电池续航也比上一代好一些,"
"但价格确实太贵了,而且充电速度一般。"
)
print(f"产品: {result.product_name}")
print(f"评分: {result.rating}/5")
print(f"情感: {result.sentiment}")
print(f"推荐: {'是' if result.recommend else '否'}")
print(f"优点: {', '.join(result.pros)}")
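Conceptually, structured output ends with a schema-conforming JSON payload being validated and converted into a typed object. The sketch below shows only that last step, using a standard-library dataclass as a stand-in for the Pydantic model. `ReviewSketch`, `parse_review`, and the `raw_reply` string are made up for illustration; no model is called.

```python
import json
from dataclasses import dataclass
from typing import List

ALLOWED_SENTIMENTS = {"positive", "neutral", "negative"}


@dataclass
class ReviewSketch:
    product_name: str
    rating: float
    sentiment: str
    pros: List[str]

    def __post_init__(self) -> None:
        # Mirror the range and enum constraints a schema would enforce.
        if not 1 <= self.rating <= 5:
            raise ValueError(f"rating out of range: {self.rating}")
        if self.sentiment not in ALLOWED_SENTIMENTS:
            raise ValueError(f"unknown sentiment: {self.sentiment}")


def parse_review(raw_json: str) -> ReviewSketch:
    """Parse a JSON reply and validate it against the dataclass."""
    return ReviewSketch(**json.loads(raw_json))


# Hypothetical model reply, written by hand for this example.
raw_reply = (
    '{"product_name": "iPhone 15 Pro Max", "rating": 4, '
    '"sentiment": "positive", "pros": ["camera", "battery life"]}'
)
review = parse_review(raw_reply)
print(review.product_name, review.rating, review.sentiment)
```

A payload that violates a constraint fails loudly at parse time, which is the main practical benefit over reading fields out of free-form text.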
5.2 Using TypedDict
from typing import TypedDict, List, Optional
class ContactInfo(TypedDict):
name: str
company: str
title: str
email: Optional[str]
phone: Optional[str]
skills: List[str]
structured_model = model.with_structured_output(ContactInfo)
result = structured_model.invoke(
"张三是阿里巴巴的高级Java工程师,擅长微服务和分布式系统,"
"邮箱是zhangsan@example.com"
)
print(result)
# {'name': '张三', 'company': '阿里巴巴', 'title': '高级Java工程师',
# 'email': 'zhangsan@example.com', 'phone': None,
# 'skills': ['微服务', '分布式系统', 'Java']}
5.3 Nested structures
from pydantic import BaseModel, Field
from typing import List, Literal
class Ingredient(BaseModel):
name: str = Field(description="食材名称")
amount: str = Field(description="用量")
unit: str = Field(description="单位(克/毫升/个等)")
class Step(BaseModel):
order: int = Field(description="步骤序号")
instruction: str = Field(description="操作说明")
duration_min: int = Field(description="预估时间(分钟)")
class NutritionalInfo(BaseModel):
calories: int = Field(description="卡路里")
protein_g: float = Field(description="蛋白质(克)")
carbs_g: float = Field(description="碳水化合物(克)")
fat_g: float = Field(description="脂肪(克)")
class Recipe(BaseModel):
"""完整的菜谱结构"""
dish_name: str
cuisine: str
difficulty: Literal["简单", "中等", "困难"]
prep_time_min: int
cook_time_min: int
servings: int
ingredients: List[Ingredient]
steps: List[Step]
nutrition: NutritionalInfo
tips: List[str] = Field(description="烹饪小贴士")
structured_model = model.with_structured_output(Recipe)
recipe = structured_model.invoke("给我一份正宗的红烧肉食谱,4人份")
print(f"菜名: {recipe.dish_name}")
print(f"难度: {recipe.difficulty}")
print(f"食材数: {len(recipe.ingredients)}")
print(f"步骤数: {len(recipe.steps)}")
print(f"热量: {recipe.nutrition.calories}千卡")
5.4 Structured output with create_agent
from langchain.agents import create_agent
from langchain.agents.structured_output import ToolStrategy
from pydantic import BaseModel, Field
from typing import List
class TravelPlan(BaseModel):
destination: str = Field(description="推荐目的地")
duration_days: int = Field(description="建议天数")
budget_estimate: float = Field(description="预估总花费(元)")
highlights: List[str] = Field(description="必去景点")
tips: List[str] = Field(description="旅行建议")
def search_travel_info(query: str) -> str:
"""搜索旅行信息"""
return f"关于'{query}'的旅行数据:..."
agent = create_agent(
model="openai:gpt-4o",
tools=[search_travel_info],
system_prompt="你是一个旅行规划师。根据用户需求推荐最佳旅行目的地。",
response_format=ToolStrategy(TravelPlan),
)
result = agent.invoke({
"messages": [{
"role": "user",
"content": "我想去一个适合带孩子的海滨城市度假,预算5000左右,5天时间。"
}]
})
plan = result["structured_response"]
print(f"推荐: {plan.destination}, {plan.duration_days}天, 约{plan.budget_estimate}元")
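5.5 Choosing between JSON mode and tool calling
# with_structured_output accepts a `method` argument; the available values
# and the default depend on the model integration
# Option 1: tool/function calling
structured_model = model.with_structured_output(
    ProductReview,
    method="function_calling",  # set explicitly
)
# Option 2: JSON mode (needs native model support; the prompt must also ask for JSON)
structured_model = model.with_structured_output(
    ProductReview,
    method="json_mode",
)
# Option 3: let the integration use its default (recommended)
structured_model = model.with_structured_output(ProductReview)
# Note: ProviderStrategy and ToolStrategy are response_format strategies of
# create_agent (section 5.4), not of with_structured_output:
# - ProviderStrategy uses the provider's native structured output (JSON Schema)
# - ToolStrategy obtains the structure through a tool call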
5.6 Complex structures with enums
from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import date
class BugSeverity(str, Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class BugReport(BaseModel):
title: str = Field(description="Bug 标题")
severity: BugSeverity = Field(description="严重程度")
description: str = Field(description="详细描述")
steps_to_reproduce: List[str] = Field(description="复现步骤")
expected_behavior: str = Field(description="预期行为")
actual_behavior: str = Field(description="实际行为")
affected_version: str = Field(description="影响版本")
suggested_fix: Optional[str] = Field(description="建议修复方案")
structured_model = model.with_structured_output(BugReport)
bug = structured_model.invoke(
"用户登录页面在输入超过100个字符的密码时会直接崩溃,"
"返回500错误。任何带特殊字符的长密码都能触发。v2.3.1 版本。"
"应该返回友好的错误提示,而不是崩溃。"
)
print(f"Bug: {bug.title}")
print(f"严重度: {bug.severity.value}")
print(f"复现步骤: {len(bug.steps_to_reproduce)}步")
6. LCEL: The LangChain Expression Language
6.1 The pipe operator |
LCEL uses the | (pipe) operator to chain components together, with data flowing from left to right:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("将以下文本翻译成{target_lang}:\n{text}")
parser = StrOutputParser()
# LCEL 链:使用 | 串联
chain = prompt | model | parser
# 等价于
# chain = RunnableSequence(prompt, model, parser)
result = chain.invoke({
"target_lang": "英文",
"text": "今天天气非常好,适合出去郊游。",
})
print(result)
# 数据流:
# {"target_lang": "英文", "text": "..."}
# → prompt → ChatPromptValue
# → model → AIMessage
# → parser → str
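In Python, a pipe syntax like this can be built on the `__or__` special method: `a | b` returns a new object that runs `a` and feeds its output to `b`. The sketch below is a minimal illustration of that mechanism under that assumption; it is not LangChain's implementation. `Step`, `fill_prompt`, `fake_model`, and `parse` are hypothetical names, and the "model" simply uppercases its input.

```python
from typing import Any, Callable


class Step:
    """A tiny composable unit with an invoke() method and | support."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new Step that runs a, then passes the result to b.
        return Step(lambda value: other.invoke(self.invoke(value)))


fill_prompt = Step(lambda d: f"Translate to {d['target_lang']}: {d['text']}")
fake_model = Step(lambda prompt: {"content": prompt.upper()})  # stand-in for an LLM
parse = Step(lambda message: message["content"])

toy_chain = fill_prompt | fake_model | parse
output = toy_chain.invoke({"target_lang": "English", "text": "hola"})
print(output)
```

The data flow matches the trace in the comments above: a dict goes into the prompt step, a message-like object comes out of the model step, and the parser reduces it to a string.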
6.2 RunnablePassthrough
from langchain_core.runnables import RunnablePassthrough
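# Basic usage: pass the input through unchanged
passthrough = RunnablePassthrough()
print(passthrough.invoke("hello"))  # "hello"
# assign(): add computed fields to a dict (the input must be a dict)
chain = RunnablePassthrough.assign(
    word_count=lambda x: len(x["text"].split()),
    is_long=lambda x: len(x["text"].split()) > 100,
)
result = chain.invoke({"text": "Hello world, this is a test sentence."})
print(result)
# {'text': 'Hello world, this is a test sentence.', 'word_count': 7, 'is_long': False}
# Usage in RAG (retriever and format_docs are assumed to be defined elsewhere)
chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | parser
)
# At call time, only the question string is passed in
answer = chain.invoke("What is machine learning?")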
6.3 Parallel execution with RunnableParallel
from langchain_core.runnables import RunnableParallel
# 同时执行多个任务
chain = RunnableParallel(
translation=translation_prompt | model | parser,
summary=summary_prompt | model | parser,
keywords=keywords_prompt | model | parser,
)
result = chain.invoke({"text": "这是一段需要处理的文本。"})
print(result["translation"])
print(result["summary"])
print(result["keywords"])
# 并行检索 + 并行分析(实际项目示例)
analysis_chain = RunnableParallel(
sentiment=sentiment_chain,
entities=ner_chain,
categories=classification_chain,
key_phrases=keyword_chain,
)
article_analysis = analysis_chain.invoke({
"text": "苹果公司今天发布了新款iPhone,股价上涨3%..."
})
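The pattern is: give every branch the same input, run the branches concurrently, and collect the results into a dict keyed by branch name. The sketch below reproduces that shape with a standard-library thread pool. `run_parallel` and the three lambda branches are hypothetical stand-ins for real chains, and the threading strategy is an assumption for illustration rather than a description of LangChain internals.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(branches: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every branch on the same input and gather results by name."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        # result() blocks until each branch finishes and re-raises its exception, if any.
        return {name: future.result() for name, future in futures.items()}


branches = {
    "length": lambda d: len(d["text"]),
    "upper": lambda d: d["text"].upper(),
    "words": lambda d: d["text"].split(),
}
parallel_result = run_parallel(branches, {"text": "hello world"})
print(parallel_result)
```

For I/O-bound branches such as model calls, the total latency approaches that of the slowest branch instead of the sum of all branches.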
6.4 RunnableLambda
from langchain_core.runnables import RunnableLambda
def count_chars(text: str) -> int:
return len(text)
def format_count(count: int) -> str:
return f"文本共 {count} 个字符"
# 用 RunnableLambda 包装
count_chain = RunnableLambda(count_chars) | RunnableLambda(format_count)
result = count_chain.invoke("Hello World!")
print(result) # "文本共 12 个字符"
# 直接作为管道的一环
chain = prompt | model | parser | RunnableLambda(count_chars)
# 使用装饰器
@RunnableLambda
def to_uppercase(text: str) -> str:
return text.upper()
chain = prompt | model | parser | to_uppercase
6.5 Conditional routing with RunnableBranch
from langchain_core.runnables import RunnableBranch
# 根据条件选择不同的处理分支
branch = RunnableBranch(
(lambda x: "数学" in x["subject"], math_chain),
(lambda x: "编程" in x["subject"], coding_chain),
(lambda x: "英语" in x["subject"], english_chain),
# 默认分支
general_chain,
)
result = branch.invoke({
"subject": "编程问题",
"question": "如何优化Python代码性能?",
})
# 在复杂工作流中使用
content_router = RunnableBranch(
(lambda x: x["type"] == "image", image_processing_chain),
(lambda x: x["type"] == "audio", audio_processing_chain),
(lambda x: x["type"] == "video", video_processing_chain),
text_processing_chain,
)
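The routing rule is "first matching condition wins, otherwise use the default". The sketch below shows that rule in plain Python. `make_branch` is a hypothetical helper, and the handlers just return a label so the chosen route is visible.

```python
from typing import Any, Callable


def make_branch(*conditions_then_default: Any) -> Callable[[Any], Any]:
    """Build a router from (predicate, handler) pairs followed by a default handler."""
    *pairs, default = conditions_then_default

    def route(value: Any) -> Any:
        for predicate, handler in pairs:
            if predicate(value):
                # Stop at the first predicate that matches.
                return handler(value)
        return default(value)

    return route


router = make_branch(
    (lambda x: "math" in x["subject"], lambda x: "math_chain"),
    (lambda x: "coding" in x["subject"], lambda x: "coding_chain"),
    lambda x: "general_chain",  # default branch
)
print(router({"subject": "coding question"}))
print(router({"subject": "history"}))
```

Because evaluation stops at the first match, more specific conditions should be listed before broader ones.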
6.6 Runtime configuration with RunnableConfig
from langchain_core.runnables import RunnableConfig
# 传递运行时配置
config = RunnableConfig(
configurable={
"model_name": "gpt-4o",
"temperature": 0.3,
"language": "zh-CN",
},
metadata={"user_id": "user-123", "session_id": "sess-abc"},
tags=["production", "v2.0"],
max_concurrency=5,
)
result = chain.invoke({"text": "hello"}, config=config)
# .with_config() 设置默认配置
configured_chain = chain.with_config(
run_name="TranslationPipeline",
tags=["translation", "v1.0.0"],
metadata={"team": "ai-platform"},
)
7. The Runnable Protocol in Detail
7.1 A unified calling interface
All components (Prompt, Model, Tool, Chain, Agent) implement the Runnable protocol:
# === 同步调用 ===
# invoke: 等待完整结果
result = chain.invoke({"text": "hello"})
# batch: 批量并发处理
results = chain.batch([
{"text": "first input"},
{"text": "second input"},
{"text": "third input"},
])
# stream: 流式输出
for chunk in chain.stream({"text": "tell me a story"}):
print(chunk, end="", flush=True)
# batch_as_completed: yields (index, output) pairs as each input finishes
for idx, result in chain.batch_as_completed([
    {"text": "input 1"},
    {"text": "input 2"},
]):
    print(f"Done [{idx}]: {result}")
# === Asynchronous calls ===
import asyncio
async def main():
    # ainvoke: asynchronous call
    result = await chain.ainvoke({"text": "hello"})
    # abatch: asynchronous batch
    results = await chain.abatch([
        {"text": "first"}, {"text": "second"}
    ])
    # astream: asynchronous streaming
    async for chunk in chain.astream({"text": "hello"}):
        print(chunk, end="", flush=True)
    # abatch_as_completed: asynchronous batch, yielding (index, output) pairs
    async for idx, result in chain.abatch_as_completed([
        {"text": "first"}, {"text": "second"}
    ]):
        print(f"Done [{idx}]: {result}")
# await is only valid inside a coroutine, so run everything through asyncio.run()
asyncio.run(main())
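7.2 Adding fields dynamically with .assign()
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
# translate() and summarize() below are placeholder helpers assumed to be defined elsewhere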
# 逐步构建丰富的响应结构
chain = (
RunnablePassthrough.assign(
# 第一步:添加基础分析
translated=RunnableLambda(lambda x: translate(x["text"])),
word_count=RunnableLambda(lambda x: len(x["text"].split())),
)
| RunnablePassthrough.assign(
# 第二步:基于已有结果继续分析
summary=RunnableLambda(
lambda x: summarize(x["translated"])
),
is_long_text=RunnableLambda(
lambda x: x["word_count"] > 200
),
)
)
result = chain.invoke({"text": "This is a sample text for demonstration."})
print(result.keys())
# dict_keys(['text', 'translated', 'word_count', 'summary', 'is_long_text'])
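The semantics of assign can be summarized as: compute new keys from the current dict, then return the original dict merged with those keys. Chaining two assign steps lets the second step read what the first one added. The sketch below shows that behavior in plain Python; `assign` here is a hypothetical stand-alone function, not the LangChain method.

```python
from typing import Any, Callable, Dict


def assign(**computed: Callable[[Dict[str, Any]], Any]) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Return a step that adds computed keys to its input dict."""

    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        # Every function sees the same input; none sees the others' new keys.
        extra = {key: fn(data) for key, fn in computed.items()}
        return {**data, **extra}

    return step


first = assign(word_count=lambda d: len(d["text"].split()))
second = assign(is_long_text=lambda d: d["word_count"] > 200)

enriched = second(first({"text": "This is a sample text for demonstration."}))
print(enriched)
```

Keys computed within a single assign call cannot depend on each other, which is why the dependent field lives in a second step.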
7.3 Mapping over lists with .map()
# .map() 对列表中的每个元素应用链
individual_chain = prompt | model | parser
# 批量处理列表
batch_chain = individual_chain.map()
results = batch_chain.invoke([
{"topic": "机器学习"},
{"topic": "深度学习"},
{"topic": "强化学习"},
])
# 等价于手动的 batch 调用
results = individual_chain.batch([
{"topic": "机器学习"},
{"topic": "深度学习"},
{"topic": "强化学习"},
])
7.4 Selecting fields with .pick()
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
# .pick() is an instance method that selects keys from the input dict;
# with a single string key it returns that key's value
chain = (
    RunnablePassthrough().pick("question")  # extract only the question field
    | retriever
)
# Equivalent to
chain = RunnableLambda(lambda x: x["question"]) | retriever
# Commonly used in RAG
chain = (
    {
        "context": RunnablePassthrough().pick("user_query") | retriever,
        "question": RunnablePassthrough().pick("user_query"),
        "language": RunnablePassthrough().pick("target_language"),
    }
    | prompt
    | model
    | parser
)
result = chain.invoke({
    "user_query": "What is ML?",
    "target_language": "Chinese",
    "user_id": "123",      # not passed downstream
    "session_id": "abc",   # not passed downstream
})
7.5 Custom Runnables
from langchain_core.runnables import Runnable
from typing import Any, Iterator
class TextAnalysisRunnable(Runnable):
"""自定义文本分析 Runnable"""
def __init__(self, min_word_count: int = 10):
self.min_word_count = min_word_count
def invoke(self, input: str, config=None) -> dict:
words = input.split()
return {
"original": input,
"word_count": len(words),
"char_count": len(input),
"is_too_short": len(words) < self.min_word_count,
}
async def ainvoke(self, input: str, config=None) -> dict:
return self.invoke(input, config)
def stream(self, input: str, config=None) -> Iterator[dict]:
yield self.invoke(input, config)
# 直接使用
analyzer = TextAnalysisRunnable(min_word_count=20)
result = analyzer.invoke("This is a short sentence.")
print(result)
# 集成到 LCEL 链
chain = prompt | model | parser | analyzer
7.6 Fault tolerance with .with_fallbacks()
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
primary = ChatOpenAI(model="gpt-4o")
backup1 = ChatAnthropic(model="claude-sonnet-4-6")
backup2 = ChatOpenAI(model="gpt-4o-mini")
# 自动降级
robust_model = primary.with_fallbacks([backup1, backup2])
chain = prompt | robust_model | parser
# 即使主模型不可用,也能自动切换到备用模型
result = chain.invoke({"input": "解释什么是AI"})
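The fallback behavior amounts to trying each model in order and returning the first successful result. The sketch below shows that control flow with plain functions standing in for models. `invoke_with_fallbacks`, `broken_primary`, and `working_backup` are hypothetical names, and the simulated failure is a `ConnectionError` raised on purpose.

```python
from typing import Callable, List, Optional, Sequence, Tuple, Type


def invoke_with_fallbacks(
    models: Sequence[Callable[[str], str]],
    prompt: str,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> str:
    """Call each model in order and return the first result that does not raise."""
    last_error: Optional[BaseException] = None
    for model in models:
        try:
            return model(prompt)
        except exceptions as exc:
            last_error = exc  # remember the failure and move on to the next model
    raise RuntimeError("all models failed") from last_error


calls: List[str] = []


def broken_primary(prompt: str) -> str:
    calls.append("primary")
    raise ConnectionError("primary unavailable")


def working_backup(prompt: str) -> str:
    calls.append("backup")
    return f"backup answer to: {prompt}"


answer = invoke_with_fallbacks([broken_primary, working_backup], "What is AI?")
print(answer)
print(calls)
```

Restricting `exceptions` to transient error types keeps genuine bugs, such as a malformed prompt, from being silently retried on every backup.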
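7.7 Automatic retries with .with_retry()
# Add retries with exponential backoff to a chain
robust_chain = chain.with_retry(
    retry_if_exception_type=(TimeoutError, ConnectionError),
    wait_exponential_jitter=True,  # exponential backoff plus random jitter
    stop_after_attempt=3,          # at most 3 attempts in total
    # Backoff tuning; available in recent langchain-core releases
    exponential_jitter_params={"initial": 1, "max": 30},  # start at 1 s, cap at 30 s
)
result = robust_chain.invoke({"input": "hello"})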
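Exponential backoff with jitter waits roughly 1 s, 2 s, 4 s, and so on between attempts, capped at a maximum, with a small random offset so that many clients do not retry in lockstep. The sketch below implements that schedule with the standard library. `call_with_retry` and `flaky` are hypothetical, the exact formula is an assumption chosen for clarity, and the sleep function is injectable so the example finishes instantly.

```python
import random
import time
from typing import Callable, List, Tuple, Type


def call_with_retry(
    fn: Callable[[], str],
    *,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    max_attempts: int = 3,
    initial: float = 1.0,
    max_wait: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fn, retrying on the given exceptions with capped exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # 1x, 2x, 4x ... the initial delay, capped, plus up to 1 s of jitter
            delay = min(max_wait, initial * 2 ** (attempt - 1)) + random.uniform(0, 1)
            sleep(delay)
    raise RuntimeError("max_attempts must be at least 1")


attempts = {"count": 0}


def flaky() -> str:
    # Fails twice, then succeeds.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


waits: List[float] = []
outcome = call_with_retry(flaky, sleep=waits.append)  # record delays instead of sleeping
print(outcome, attempts["count"], len(waits))
```

Only the exception types listed in `retry_on` trigger a retry; anything else propagates immediately, which matches the intent of restricting retries to timeouts and connection errors.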
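8. Defining and Using Tools
8.1 Plain functions (the simplest approach in v1.0)
Tool definition is much simpler in v1.0. The recommended form is a plain Python function with type annotations and a docstring: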
# 最简工具定义:纯函数 + 类型注解 + Docstring
def get_weather(city: str) -> str:
"""获取指定城市的当前天气状况。当用户询问天气时必须调用此工具。
Args:
city: 城市名称,如 '北京'、'上海'、'深圳'
"""
weather_db = {
"北京": "晴天,25°C,湿度40%,风力2级",
"上海": "多云,28°C,湿度65%,风力3级",
"深圳": "阵雨,30°C,湿度80%,风力1级",
}
return weather_db.get(city, f"未找到{city}的天气数据")
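def calculate(expression: str) -> float:
    """Evaluate a math expression. Supports basic arithmetic and common math functions.
    Args:
        expression: A valid math expression, such as '2+2', 'sin(pi/2)', or 'sqrt(16)*3'
    """
    import math
    # Caution: eval() on model-supplied input is risky even with builtins removed;
    # prefer a restricted expression parser in production
    allowed = {"__builtins__": {}, **math.__dict__}
    return eval(expression, allowed)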
def search_web(query: str) -> str:
"""在互联网上搜索最新信息。当需要实时数据、新闻或事实核查时使用。
Args:
query: 搜索关键词
"""
# 实际项目中对接搜索API
return f"关于'{query}'的搜索结果:..."
# 直接传入 create_agent
from langchain.agents import create_agent
agent = create_agent(
model="openai:gpt-4o",
tools=[get_weather, calculate, search_web],
system_prompt="你是一个全能的助手。",
)
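The calculate tool above relies on eval, which is hard to sandbox reliably. One alternative is to parse the expression with the standard-library `ast` module and evaluate only an explicit allowlist of node types. The sketch below takes that approach. `safe_calculate` and its allowlists are hypothetical, cover only a handful of operators and functions, and do not guard against very expensive inputs such as huge exponents.

```python
import ast
import math
import operator

_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_FUNCS = {"sqrt": math.sqrt, "sin": math.sin, "cos": math.cos}
_CONSTS = {"pi": math.pi, "e": math.e}


def safe_calculate(expression: str) -> float:
    """Evaluate arithmetic by walking the AST; reject anything not allowlisted."""

    def ev(node: ast.AST) -> float:
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](ev(node.left), ev(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](ev(node.operand))
        if isinstance(node, ast.Name) and node.id in _CONSTS:
            return _CONSTS[node.id]
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in _FUNCS
            and not node.keywords
        ):
            return _FUNCS[node.func.id](*[ev(arg) for arg in node.args])
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    tree = ast.parse(expression, mode="eval")
    return float(ev(tree.body))


print(safe_calculate("2+2"))
print(safe_calculate("sqrt(16)*3"))
```

Attribute access, subscripts, and calls to unknown names are rejected with a `ValueError` that a tool can return to the model as an error message.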
8.2 The @tool decorator
from langchain_core.tools import tool
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件。在确认用户意图后使用。
Args:
to: 收件人邮箱地址
subject: 邮件主题
body: 邮件正文
"""
# 实际发送逻辑
return f"邮件已发送至 {to},主题:{subject}"
@tool
def query_database(sql: str) -> str:
"""执行SQL查询。仅支持SELECT语句。
Args:
sql: SQL查询语句
"""
if not sql.strip().upper().startswith("SELECT"):
return "错误:仅支持SELECT查询"
# 实际查询逻辑
return f"查询结果:..."
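# Inspect tool metadata
print(send_email.name)         # "send_email"
print(send_email.description)  # the docstring text; use @tool(parse_docstring=True) to keep only the summary and move Args into the schema
print(send_email.args_schema)  # Pydantic model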
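The metadata printed above (name, description, argument schema) can be derived from a function's signature and docstring. The sketch below shows one way to do that with the standard-library `inspect` module. `describe_tool` and `get_weather_demo` are hypothetical, and the resulting dict is a simplified stand-in for a real tool schema, not the format LangChain produces.

```python
import inspect
from typing import Any, Callable, Dict


def describe_tool(func: Callable[..., Any]) -> Dict[str, Any]:
    """Build a simple tool description from a function's signature and docstring."""
    params: Dict[str, Dict[str, Any]] = {}
    for name, param in inspect.signature(func).parameters.items():
        annotation = param.annotation
        params[name] = {
            "type": getattr(annotation, "__name__", str(annotation)),
            # A parameter without a default value is required.
            "required": param.default is inspect.Parameter.empty,
        }
    doc = inspect.getdoc(func) or ""
    return {
        "name": func.__name__,
        "description": doc.split("\n\n")[0],  # summary paragraph only
        "parameters": params,
    }


def get_weather_demo(city: str, unit: str = "celsius") -> str:
    """Return the current weather for a city.

    Args:
        city: City name.
        unit: Temperature unit.
    """
    return f"{city}: sunny"


spec = describe_tool(get_weather_demo)
print(spec)
```

This is why accurate type annotations and a clear first docstring line matter: they are the information the model sees when deciding whether and how to call a tool.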
8.3 StructuredTool (Pydantic Schema)
from pydantic import BaseModel, Field
from langchain_core.tools import StructuredTool, tool
from typing import List
class FlightSearchInput(BaseModel):
"""航班搜索参数"""
origin: str = Field(description="出发城市,如 '北京'、'上海'")
destination: str = Field(description="到达城市")
date: str = Field(description="出发日期,格式 YYYY-MM-DD")
passengers: int = Field(default=1, description="乘客人数")
def search_flights(
origin: str, destination: str, date: str, passengers: int = 1
) -> str:
"""搜索航班信息"""
return (
f"从{origin}到{destination},{date}出发,{passengers}人\n"
f"找到5个航班,最低票价 ¥680"
)
flight_tool = StructuredTool.from_function(
func=search_flights,
name="search_flights",
description="搜索指定日期和航线的航班信息",
args_schema=FlightSearchInput,
)
# 或使用 @tool 装饰器 + args_schema
@tool(args_schema=FlightSearchInput)
def search_flights_v2(
origin: str, destination: str, date: str, passengers: int = 1
) -> str:
"""搜索指定日期和航线的航班信息"""
return f"从{origin}到{destination}的航班搜索结果:..."
8.4 Subclassing BaseTool
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
class CodeReviewInput(BaseModel):
code: str = Field(description="需要审查的代码")
language: str = Field(description="编程语言")
class CodeReviewTool(BaseTool):
name: str = "code_review"
description: str = "审查代码质量,检查潜在bug、性能问题和安全隐患。"
args_schema: Type[BaseModel] = CodeReviewInput
def _run(self, code: str, language: str, **kwargs) -> str:
# 同步审查逻辑
issues = []
if "eval(" in code or "exec(" in code:
issues.append("⚠️ 安全:使用了危险的 eval/exec")
if "TODO" in code or "FIXME" in code:
issues.append("⚠️ 质量:存在未完成的 TODO/FIXME")
if not issues:
return "✅ 代码审查通过,未发现问题"
return "\n".join(issues)
async def _arun(self, code: str, language: str, **kwargs) -> str:
return self._run(code, language)
8.5 Tool error handling
from langchain_core.tools import ToolException
@tool
def api_caller(endpoint: str) -> str:
"""调用外部API。"""
try:
if not endpoint:
raise ValueError("endpoint 不能为空")
# 模拟API调用
if "timeout" in endpoint:
raise TimeoutError("API请求超时")
return f"API响应: {{'status': 'ok'}}"
except Exception as e:
raise ToolException(f"工具执行失败 [{endpoint}]: {e}")
# 配置错误处理策略
api_caller.handle_tool_error = True # 返回异常消息给模型
# 或自定义错误消息
api_caller.handle_tool_error = (
"工具调用失败,请稍后重试或换一种方式查询。"
)
# 使用 handle_tool_error 回调函数
def error_handler(error: ToolException) -> str:
return f"[错误码: 500] {error}"
api_caller.handle_tool_error = error_handler
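The three settings shown above (True, a fixed string, or a callable) differ only in how a caught tool exception is turned into the text handed back to the model. The sketch below is a simplified, standard-library illustration of that dispatch. `ToolError`, `run_tool`, and `failing_tool` are hypothetical names and do not reproduce LangChain's internals.

```python
from typing import Callable, Union


class ToolError(Exception):
    """Stand-in for a tool-level exception."""


ErrorPolicy = Union[bool, str, Callable[[ToolError], str]]


def run_tool(func: Callable[[str], str], arg: str, handle_tool_error: ErrorPolicy = False) -> str:
    """Run a tool and convert ToolError into text according to the policy."""
    try:
        return func(arg)
    except ToolError as exc:
        if handle_tool_error is False:
            raise  # no handling configured: propagate the exception
        if handle_tool_error is True:
            return str(exc)  # return the exception message itself
        if isinstance(handle_tool_error, str):
            return handle_tool_error  # return the fixed replacement text
        return handle_tool_error(exc)  # delegate to the custom handler


def failing_tool(endpoint: str) -> str:
    raise ToolError(f"request to {endpoint} timed out")


as_message = run_tool(failing_tool, "/orders", handle_tool_error=True)
as_fixed_text = run_tool(failing_tool, "/orders", handle_tool_error="Tool call failed.")
as_custom = run_tool(failing_tool, "/orders", handle_tool_error=lambda e: f"[500] {e}")
print(as_message)
print(as_fixed_text)
print(as_custom)
```

Returning text instead of raising lets the model see what went wrong and choose a different tool or argument on its next step.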
8.6 Combining tools and built-in tools
# 常用内置工具
from langchain_community.tools import (
WikipediaQueryRun,
ArxivQueryRun,
)
from langchain_community.utilities import (
WikipediaAPIWrapper,
ArxivAPIWrapper,
)
wikipedia = WikipediaQueryRun(
api_wrapper=WikipediaAPIWrapper(lang="zh", top_k_results=3)
)
arxiv = ArxivQueryRun(
api_wrapper=ArxivAPIWrapper(top_k_results=5)
)
# Tavily 搜索工具
# pip install tavily-python
from langchain_community.tools.tavily_search import TavilySearchResults
tavily = TavilySearchResults(
max_results=5,
search_depth="advanced", # basic | advanced
)
# 所有工具组合
tools = [
get_weather,
calculate,
search_web,
wikipedia,
arxiv,
tavily,
]
8.7 Streaming output from inside tools
from langgraph.config import get_stream_writer
def long_running_tool(data_source: str) -> str:
"""处理大量数据,耗时较长,需要实时反馈进度。"""
writer = get_stream_writer()
writer(f"🔍 正在连接数据源: {data_source}...")
# 连接操作...
writer(f"📥 正在下载数据...")
# 下载操作...
writer(f"🔄 正在处理数据...")
# 处理操作...
writer(f"✅ 处理完成!")
return "数据处理结果: ..."
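# Pass stream_mode="custom" to receive the progress messages
from langchain.agents import create_agent
agent = create_agent(
    model="openai:gpt-4o",
    tools=[long_running_tool],
)
for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Process the sales_data source"}]},
    stream_mode=["messages", "custom"],
):
    if mode == "custom":
        print(f"[progress] {chunk}")
    elif mode == "messages":
        # In "messages" mode each chunk is a (message_chunk, metadata) pair
        token, metadata = chunk
        print(token.content, end="", flush=True)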
9. create_agent: The Unified Agent Entry Point
9.1 create_agent basics
create_agent() is the single recommended entry point for building agents in v1.0 and replaces all of the legacy agent constructors:
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
# 最简 Agent (~10行代码)
agent = create_agent(
model="openai:gpt-4o",
tools=[get_weather, calculate, search_web],
system_prompt="你是一个全能的AI助手。",
)
# 调用
result = agent.invoke({
"messages": [{"role": "user", "content": "北京今天天气怎么样?"}]
})
# 获取最终回答
final_message = result["messages"][-1]
print(final_message.content)
9.2 create_agent parameters
agent = create_agent(
# === 核心参数 ===
model="openai:gpt-4o", # 模型名称 或 ChatModel 实例
tools=[get_weather, calculate], # 工具列表
system_prompt="你是...", # Agent 人设/系统指令
# === 持久化 ===
checkpointer=InMemorySaver(), # LangGraph Checkpointer
# === 中间件 ===
middleware=[ # 中间件列表(洋葱模型)
SummarizationMiddleware(model="openai:gpt-4o-mini"),
ToolRetryMiddleware(max_retries=3),
ModelCallLimitMiddleware(run_limit=10),
],
# === 结构化输出 ===
response_format=ToolStrategy(OutputSchema), # 期望的输出结构
# === 自定义上下文 ===
context_schema=CustomContext, # 自定义运行时上下文
# === 中断控制 ===
interrupt_before=["tools"], # 在执行工具前暂停
interrupt_after=["tools"], # 在执行工具后暂停
)
9.3 Agent Execution Loop
create_agent is built on LangGraph. Its execution loop looks like this:
User Input
    ↓
[Middleware: before_agent]
    ↓
┌──────────────────────────────────────────────────┐
│ Agent loop                                       │
│ 1. [before_model]                                │
│ 2. LLM call, wrapped by [wrap_model_call]        │
│ 3. [after_model]                                 │
│ 4. Tool calls requested?                         │
│    yes → run each tool inside [wrap_tool_call],  │
│          append the results, go back to step 1   │
│    no  → exit the loop                           │
└──────────────────────────────────────────────────┘
    ↓
[Middleware: after_agent]
    ↓
Final Result
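The loop can be sketched in plain Python. This is a minimal illustration under stated assumptions, not LangChain's implementation: `fake_model`, `TOOLS`, and the message dictionaries are hypothetical stand-ins, and the middleware hooks are left out.

```python
from typing import Callable

# Hypothetical tool registry; a real agent receives its tools via create_agent(tools=[...]).
TOOLS: dict[str, Callable[..., str]] = {
    "multiply": lambda a, b: str(a * b),
}

def fake_model(messages: list[dict]) -> dict:
    """Stand-in for an LLM: request one tool call, then answer from its result."""
    last = messages[-1]
    if last["role"] == "tool":
        return {"role": "assistant", "content": f"The result is {last['content']}.", "tool_calls": []}
    return {
        "role": "assistant",
        "content": "",
        "tool_calls": [{"id": "call_1", "name": "multiply", "args": {"a": 158, "b": 23}}],
    }

def run_agent(user_input: str, max_steps: int = 5) -> list[dict]:
    messages = [{"role": "user", "content": user_input}]
    for _ in range(max_steps):
        reply = fake_model(messages)        # model call
        messages.append(reply)
        if not reply["tool_calls"]:         # no tool requested: leave the loop
            break
        for call in reply["tool_calls"]:    # run each requested tool and feed the result back
            output = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "content": output, "tool_call_id": call["id"]})
    return messages

history = run_agent("What is 158 * 23?")
print(history[-1]["content"])
```

The `max_steps` bound plays the role that a call-limit middleware plays in a real agent: it keeps a model that never stops requesting tools from looping forever.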
9.4 Agent Response Format
result = agent.invoke({
    "messages": [
        {"role": "user", "content": "Check the weather in Beijing and compute 158*23"}
    ]
})
# result is a dict; without response_format it contains only "messages"
print(result.keys())  # dict_keys(['messages'])
# Walk through every message produced during the run
for i, msg in enumerate(result["messages"]):
    print(f"\n[{i}] {type(msg).__name__}")
    if hasattr(msg, "content") and msg.content:
        print(f"  content: {str(msg.content)[:100]}...")
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"  tool: {tc['name']}({tc['args']})")
# Structured output is present only when response_format is set
if "structured_response" in result:
    print(f"structured output: {result['structured_response']}")
9.5 Multi-Turn Conversations
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="openai:gpt-4o",
tools=[get_weather, calculate],
checkpointer=InMemorySaver(),
)
# 使用 thread_id 区分不同会话
config = {"configurable": {"thread_id": "conversation-001"}}
# 第一轮
result = agent.invoke(
{"messages": [{"role": "user", "content": "我叫小明,我在北京。"}]},
config=config,
)
print(result["messages"][-1].content)
# 第二轮 — Agent 记得之前的上下文
result = agent.invoke(
{"messages": [{"role": "user", "content": "我叫什么名字?我在哪个城市?"}]},
config=config,
)
print(result["messages"][-1].content)
# 新会话 — 使用不同的 thread_id
new_config = {"configurable": {"thread_id": "conversation-002"}}
result3 = agent.invoke(
{"messages": [{"role": "user", "content": "我叫什么名字?"}]},
config=new_config,
)
print(result3["messages"][-1].content)
# Agent 会说自己不知道,因为这是一个全新的会话
9.6 Multiple Model Providers
# OpenAI
agent_openai = create_agent(
model="openai:gpt-4o",
tools=[...],
)
# Anthropic Claude
agent_claude = create_agent(
model="anthropic:claude-sonnet-4-6",
tools=[...],
)
# Google Gemini (the provider prefix for langchain-google-genai is "google_genai")
agent_gemini = create_agent(
    model="google_genai:gemini-2.0-flash",
    tools=[...],
)
# DeepSeek
agent_deepseek = create_agent(
model="deepseek:deepseek-chat",
tools=[...],
)
# 直接传入 ChatModel 实例
from langchain_openai import ChatOpenAI
custom_model = ChatOpenAI(model="gpt-4o", temperature=0.3)
agent = create_agent(model=custom_model, tools=[...])
9.7 Agents vs. LCEL
# LCEL fits fixed, linear pipelines
chain = prompt | model | parser
result = chain.invoke({"input": "hello"})
# create_agent fits dynamic decisions and tool calling
agent = create_agent(model="openai:gpt-4o", tools=[...])
result = agent.invoke({"messages": [{"role": "user", "content": "..."}]})
# How to choose:
# - No tools, fixed flow → LCEL
# - Tool calling and dynamic decisions → create_agent
# - Complex branching, loops, or multiple agents → LangGraph StateGraph
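10. Middleware in Depth
10.1 Middleware Concepts
Middleware is the main extension mechanism in v1.0. It follows an onion model: each middleware wraps the ones registered after it, so logic can be injected at every stage of the agent lifecycle: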
┌──────────────┐
│ Middleware1 │
│ ┌──────────┐ │
│ │ Mid2 │ │
│ │ ┌──────┐ │ │
│ │ │ Core │ │ │ ← Agent 核心循环
│ │ └──────┘ │ │
│ │ │ │
│ └──────────┘ │
└──────────────┘
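10.2 Middleware Hooks at a Glance
| Hook | Type | When it runs | Typical uses |
|---|---|---|---|
| before_agent | Node | Once, before the agent starts | Load context, validate input |
| before_model | Node | Before every LLM call | Trim messages, redact data, inject prompts |
| wrap_model_call | Wrap | Around every LLM call | Retries, fallback, dynamic tool selection |
| after_model | Node | After every LLM response | Output validation, safety guardrails |
| wrap_tool_call | Wrap | Around every tool execution | Retries, mocking, approval, redaction |
| after_agent | Node | Once, after the agent finishes | Save results, release resources |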
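10.3 SummarizationMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    checkpointer=InMemorySaver(),
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",      # summarizer model (a small model is usually enough)
            max_tokens_before_summary=2000,  # summarize once the history exceeds this many tokens
        ),
    ],
)
# In long conversations the history is compressed automatically, which keeps the context window bounded
for i in range(50):
    result = agent.invoke(
        {"messages": [{"role": "user", "content": f"Question for round {i}..."}]},
        config={"configurable": {"thread_id": "long-chat"}},
    )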
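10.4 ToolRetryMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import ToolRetryMiddleware
agent = create_agent(
    model="openai:gpt-4o",
    tools=[api_caller, query_database],
    middleware=[
        ToolRetryMiddleware(
            max_retries=3,       # maximum number of retries
            backoff_factor=2.0,  # exponential backoff multiplier
            initial_delay=1.0,   # delay before the first retry, in seconds
            max_delay=60.0,      # upper bound on any single delay, in seconds
            jitter=True,         # add random jitter to each delay
            retry_on=(TimeoutError, ConnectionError),  # exception types that trigger a retry
            # After the last retry: "continue" hands the error back to the model,
            # "error" re-raises it (option names differ between 1.x releases, so check your version)
            on_failure="continue",
        ),
    ],
)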
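To make the retry parameters concrete, the sketch below computes the delay schedule they imply. It is an illustration only: `backoff_delays` is a hypothetical helper, and the jitter range of plus or minus 25% is an assumption, not necessarily what the middleware uses internally.

```python
import random

def backoff_delays(max_retries=3, initial_delay=1.0, backoff_factor=2.0,
                   max_delay=60.0, jitter=False, rng=None):
    """Return the wait time (seconds) before each retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth: initial_delay * backoff_factor ** attempt
        delay = initial_delay * backoff_factor ** attempt
        if jitter:
            # Assumed jitter: scale by a random factor in [0.75, 1.25]
            delay *= rng.uniform(0.75, 1.25)
        # Never wait longer than max_delay
        delays.append(min(delay, max_delay))
    return delays

print(backoff_delays())                   # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=8)[-1])  # 60.0, capped by max_delay
```

Jitter spreads retries from many concurrent callers over time so that they do not all hit a recovering service at the same instant.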
10.5 ModelCallLimitMiddleware / ToolCallLimitMiddleware
from langchain.agents.middleware import (
ModelCallLimitMiddleware,
ToolCallLimitMiddleware,
)
agent = create_agent(
model="openai:gpt-4o",
tools=[get_weather, calculate, search_web],
middleware=[
# 限制每次运行的模型调用次数
ModelCallLimitMiddleware(
run_limit=10, # 单次运行最多10次LLM调用
thread_limit=100, # 同一会话最多100次
exit_behavior="end", # end=优雅结束 | error=抛出异常
),
# 限制特定工具的使用次数
ToolCallLimitMiddleware(
tool_name="search_web",
run_limit=5, # 单次运行最多5次搜索
exit_behavior="end",
),
],
)
10.6 PIIMiddleware (Redacting Sensitive Data)
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, search_web],
    middleware=[
        # Email addresses: redact
        PIIMiddleware(
            "email",
            strategy="redact",           # redact | mask | block | hash
            apply_to_input=True,         # scan user input
            apply_to_output=True,        # scan model output
            apply_to_tool_results=True,  # scan tool results
        ),
        # Phone numbers: mask. "phone" is not a built-in type here, so a detector
        # is supplied (this pattern for mainland China mobile numbers is an assumption)
        PIIMiddleware(
            "phone",
            strategy="mask",
            detector=r"1[3-9]\d{9}",
        ),
        # Credit card numbers: block the request outright
        PIIMiddleware(
            "credit_card",
            strategy="block",
        ),
        # Custom type with a regular-expression detector (Chinese ID card numbers)
        PIIMiddleware(
            "cn_id_number",
            strategy="redact",
            detector=r"身份证号:\s*\d{17}[\dXx]",
        ),
    ],
)
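The four strategies (redact, mask, hash, block) differ only in what happens to a detected value. The sketch below shows that difference with a plain regular expression. It is a hypothetical illustration: `apply_strategy`, `EMAIL_RE`, and the exact replacement formats are assumptions and do not reproduce PIIMiddleware's output.

```python
import hashlib
import re

# Simplified email pattern, good enough for the illustration
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

def apply_strategy(text: str, pattern: re.Pattern, strategy: str, label: str) -> str:
    """Apply one PII handling strategy to every match of pattern in text."""
    if strategy == "block":
        # Blocking rejects the whole request instead of rewriting it
        if pattern.search(text):
            raise ValueError(f"{label} detected; request blocked")
        return text

    def replace(match: re.Match) -> str:
        value = match.group(0)
        if strategy == "redact":
            return f"[REDACTED_{label}]"
        if strategy == "mask":
            # Keep the last 4 characters, hide the rest
            return "*" * (len(value) - 4) + value[-4:]
        if strategy == "hash":
            # Stable pseudonym: the same input always maps to the same token
            return hashlib.sha256(value.encode()).hexdigest()[:8]
        raise ValueError(f"unknown strategy: {strategy}")

    return pattern.sub(replace, text)

print(apply_strategy("Contact a@b.com now", EMAIL_RE, "redact", "EMAIL"))
print(apply_strategy("alice@example.com", EMAIL_RE, "mask", "EMAIL"))
```

Hashing is the option to reach for when downstream steps still need to tell two users apart without seeing the raw value.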
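10.7 HumanInTheLoopMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, transfer_money, delete_record],
    checkpointer=InMemorySaver(),  # interrupts need a checkpointer to pause and resume
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                # Sending email requires approval
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                },
                # Transfers require approval
                "transfer_money": {
                    "allowed_decisions": ["approve", "reject"],
                },
            },
        ),
    ],
)
config = {"configurable": {"thread_id": "approval-001"}}
# The agent pauses when it is about to call send_email and waits for a human decision.
# Resume on the same thread with exactly one of the following.
# Approve:
agent.invoke(
    Command(resume={"decisions": [{"type": "approve"}]}),
    config=config,
)
# Reject:
agent.invoke(
    Command(resume={"decisions": [{"type": "reject", "message": "Do not send this email."}]}),
    config=config,
)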
10.8 ModelFallbackMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
agent = create_agent(
    model="openai:gpt-4o",  # primary model
    tools=[...],
    middleware=[
        # The arguments are the fallback models, tried in order when the primary model call fails
        ModelFallbackMiddleware(
            ChatAnthropic(model="claude-sonnet-4-6"),  # first fallback
            ChatOpenAI(model="gpt-4o-mini"),           # second fallback
        ),
    ],
)
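10.9 ContextEditingMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import ClearToolUsesEdit, ContextEditingMiddleware
agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    middleware=[
        ContextEditingMiddleware(
            edits=[
                # Clear older tool results so they do not crowd the context.
                # trigger and keep are options of the edit, not of the middleware.
                ClearToolUsesEdit(
                    trigger=50_000,  # start clearing once the context exceeds about 50K tokens
                    keep=3,          # keep the 3 most recent tool results
                ),
            ],
        ),
    ],
)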
10.10 Custom Middleware
from langchain.agents import create_agent
from langchain.agents.middleware import (
    AgentMiddleware,
    before_model,
    wrap_tool_call,
)
# Option 1: subclass AgentMiddleware (full control)
class LoggingMiddleware(AgentMiddleware):
    """Custom middleware that logs every agent operation."""
    def before_model(self, state, runtime):
        # state behaves like a dict, so use key access
        print(f"[{self.name}] calling the LLM, message count: {len(state['messages'])}")
        return None  # no state update
    def after_model(self, state, runtime):
        last_msg = state["messages"][-1]
        print(f"[{self.name}] LLM responded: {str(last_msg.content)[:100]}...")
        if getattr(last_msg, "tool_calls", None):
            print(f"[{self.name}] {len(last_msg.tool_calls)} tool call(s) requested")
        return None
    def wrap_tool_call(self, request, handler):
        print(f"[{self.name}] calling tool: {request.tool_call['name']}")
        result = handler(request)  # run the original tool
        print(f"[{self.name}] tool returned: {str(result)[:100]}...")
        return result
# Option 2: decorators (simpler, one hook each)
@before_model
def inject_user_context(state, runtime):
    """Inject user context before each LLM call."""
    # Per-run data lives on runtime.context (declared via context_schema);
    # load_user_context is your own helper, not a LangChain function
    user_id = getattr(runtime.context, "user_id", None)
    user_context = load_user_context(user_id)
    if user_context:
        # Return a state update instead of mutating state in place
        return {"messages": [{"role": "system", "content": f"User info: {user_context}"}]}
    return None
@wrap_tool_call
def retry_on_failure(request, handler):
    """Custom tool retry logic: up to 3 attempts."""
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"retry {attempt + 1}/3: {e}")
agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    middleware=[
        LoggingMiddleware(),
        inject_user_context,
        retry_on_failure,
    ],
)
10.11 Middleware Execution Order
# before_* hooks: run in registration order (first registered runs first)
# after_* hooks: run in reverse registration order (last registered runs first)
# wrap_* hooks: nested; the first middleware wraps all later ones
agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    middleware=[
        M1,  # before: first  | wrap: outermost | after: last
        M2,  # before: second | wrap: middle    | after: second
        M3,  # before: last   | wrap: innermost | after: first
    ],
)
# Resulting order for one model call:
# M1.before_model → M2.before_model → M3.before_model
# → M1.wrap → M2.wrap → M3.wrap → actual LLM call
# → M3.wrap returns → M2.wrap returns → M1.wrap returns
# → M3.after_model → M2.after_model → M1.after_model
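The nesting of wrap-style hooks is ordinary function composition. The sketch below reproduces the enter/exit order in plain Python; `make_wrapper`, `compose`, and `core` are hypothetical names used only for this illustration and are not part of LangChain.

```python
from typing import Callable

calls: list[str] = []
Handler = Callable[[str], str]

def make_wrapper(name: str) -> Callable[[Handler], Handler]:
    """Build a wrap-style hook that records when it is entered and exited."""
    def wrap(handler: Handler) -> Handler:
        def wrapped(request: str) -> str:
            calls.append(f"{name}.wrap enter")
            result = handler(request)  # delegate to the next layer inward
            calls.append(f"{name}.wrap exit")
            return result
        return wrapped
    return wrap

def core(request: str) -> str:
    """Stand-in for the actual model call at the center of the onion."""
    calls.append("model call")
    return request.upper()

def compose(wrappers: list[Callable[[Handler], Handler]], handler: Handler) -> Handler:
    # Apply in reverse so that the first registered wrapper ends up outermost
    for wrap in reversed(wrappers):
        handler = wrap(handler)
    return handler

pipeline = compose([make_wrapper("M1"), make_wrapper("M2"), make_wrapper("M3")], core)
result = pipeline("hello")
print(calls)
```

Because M1 is outermost, it sees the request first and the response last, which is why a logging or retry wrapper is usually registered first.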
11. Document Loaders
11.1 TextLoader
from langchain_community.document_loaders import TextLoader
loader = TextLoader("document.txt", encoding="utf-8")
documents = loader.load()
for doc in documents:
    print(f"source: {doc.metadata['source']}")
    print(f"preview: {doc.page_content[:200]}...")
11.2 PyPDFLoader
# pip install pypdf
from langchain_community.document_loaders import PyPDFLoader
loader = PyPDFLoader("report.pdf")
documents = loader.load()
print(f"{len(documents)} pages in total")
for i, page in enumerate(documents):
    print(f"\npage {i+1}: {page.page_content[:200]}...")
    print(f"  metadata: {page.metadata}")
# Async loading (await only works inside an async function or an async REPL)
docs = await loader.aload()
11.3 WebBaseLoader
from langchain_community.document_loaders import WebBaseLoader
# Single page
loader = WebBaseLoader("https://docs.langchain.com/")
docs = loader.load()
# Multiple pages
urls = [
    "https://docs.langchain.com/oss/python/langchain/overview",
    "https://docs.langchain.com/oss/python/langgraph/overview",
]
loader = WebBaseLoader(
    urls,
    requests_per_second=2,     # rate limit applied to concurrent fetching
    continue_on_failure=True,  # one failed URL does not abort the rest
)
docs = loader.load()  # fetches one page at a time; use `await loader.aload()` for concurrent fetching
print(f"loaded {len(docs)} pages")
11.4 CSV/JSON 加载
# CSV
from langchain_community.document_loaders import CSVLoader
loader = CSVLoader(
file_path="data.csv",
encoding="utf-8",
source_column="title",
)
docs = loader.load()
# JSON
from langchain_community.document_loaders import JSONLoader
loader = JSONLoader(
file_path="data.json",
jq_schema=".messages[].content", # jq 语法提取字段
text_content=True,
)
docs = loader.load()
11.5 DirectoryLoader
from langchain_community.document_loaders import (
DirectoryLoader,
TextLoader,
PythonLoader,
)
# 加载目录中所有 .py 文件
loader = DirectoryLoader(
path="./src/",
glob="**/*.py",
loader_cls=TextLoader,
loader_kwargs={"encoding": "utf-8"},
show_progress=True,
use_multithreading=True,
)
docs = loader.load()
print(f"共加载 {len(docs)} 个文件")
11.6 Loading from a Database
from langchain_community.document_loaders.sql_database import SQLDatabaseLoader
from langchain_community.utilities import SQLDatabase
db = SQLDatabase.from_uri("postgresql://user:pass@localhost/db")
loader = SQLDatabaseLoader(
    query="""
    SELECT id, title, content, created_at
    FROM articles
    WHERE status = 'published'
    ORDER BY created_at DESC
    """,
    db=db,
)
docs = loader.load()
11.7 The Document Object
from langchain_core.documents import Document
# Create
doc = Document(
    page_content="The full text of the document...",
    metadata={
        "source": "handbook.pdf",
        "page": 42,
        "author": "Zhang San",
        "created_at": "2026-01-15",
        "tags": ["tutorial", "python"],
    },
)
# Common operations
print(doc.page_content[:200])
print(doc.metadata["source"])
# JSON round trip (Document is a Pydantic model)
json_str = doc.model_dump_json()
restored = Document.model_validate_json(json_str)
12. Text Splitters
12.1 RecursiveCharacterTextSplitter (Most Common)
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,       # maximum characters per chunk
    chunk_overlap=50,     # characters shared between neighboring chunks
    length_function=len,  # how length is measured
    separators=[          # tried in order, coarse to fine (full-width Chinese punctuation included)
        "\n\n", "\n", "。", ";", ",", " ", ""
    ],
    add_start_index=True,  # record each chunk's start offset in its metadata
)
# Split raw text
chunks = splitter.split_text(long_text)
# Split a list of Documents
split_docs = splitter.split_documents(documents)
for i, doc in enumerate(split_docs[:5]):
    print(f"\n--- Chunk {i+1} ({len(doc.page_content)} chars) ---")
    print(doc.page_content[:150])
    print(f"  start index: {doc.metadata.get('start_index')}")
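The "coarse to fine" separator fallback is easier to see in a stripped-down form. The sketch below is a simplified illustration and not the library's algorithm: `recursive_split` is a hypothetical function that ignores chunk overlap and does not keep separators at chunk boundaries.

```python
def recursive_split(text: str, chunk_size: int, separators: list[str]) -> list[str]:
    """Split text so that every chunk has at most chunk_size characters."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separators left: fall back to a hard character split
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, rest = separators[0], separators[1:]
    if sep == "" or sep not in text:
        # This separator cannot help; try the next, finer one
        return recursive_split(text, chunk_size, rest)
    chunks: list[str] = []
    current = ""
    for piece in text.split(sep):
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate  # keep merging pieces while they fit
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(piece) <= chunk_size:
            current = piece
        else:
            # The piece alone is too long: split it with the finer separators
            chunks.extend(recursive_split(piece, chunk_size, rest))
    if current:
        chunks.append(current)
    return chunks

sample = "aaaa bbbb cccc\n\ndddd eeee"
print(recursive_split(sample, chunk_size=10, separators=["\n\n", " ", ""]))
```

Paragraph breaks are tried first, so a chunk crosses a paragraph boundary only when whole paragraphs are small enough to be merged.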
12.2 TokenTextSplitter
from langchain_text_splitters import TokenTextSplitter
splitter = TokenTextSplitter(
chunk_size=200, # token 数
chunk_overlap=30, # 重叠 token 数
encoding_name="cl100k_base", # GPT-4 编码
# 或直接指定模型:
# model_name="gpt-4o",
)
chunks = splitter.split_text(long_text)
12.3 代码分割器
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
Language,
)
# Python 代码分割
python_splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.PYTHON,
chunk_size=500,
chunk_overlap=50,
)
python_code = """
class DatabaseService:
    def __init__(self, connection_string: str):
        self.conn = create_engine(connection_string)

    def get_users(self, limit: int = 100) -> List[User]:
        with self.conn.connect() as session:
            return session.query(User).limit(limit).all()

    def create_user(self, data: dict) -> User:
        user = User(**data)
        with self.conn.connect() as session:
            session.add(user)
            session.commit()
        return user
"""
chunks = python_splitter.split_text(python_code)
for i, chunk in enumerate(chunks):
    print(f"Chunk {i+1}: {len(chunk)} chars")
    print(chunk[:100])
    print("---")
# 支持的语言
# Language.PYTHON, Language.JS, Language.TS, Language.GO,
# Language.RUST, Language.JAVA, Language.CPP, Language.HTML, 等
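12.4 SemanticChunker
# pip install langchain-experimental
# SemanticChunker ships in langchain-experimental, not in langchain-text-splitters
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
# Splits where the embedding similarity between neighboring sentences drops
splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",  # percentile | standard_deviation | interquartile
    breakpoint_threshold_amount=90,
)
chunks = splitter.split_text(long_text)
# Semantically similar sentences end up in the same chunk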
12.5 HTML 分割器
from langchain_text_splitters import HTMLHeaderTextSplitter
html_splitter = HTMLHeaderTextSplitter(
headers_to_split_on=[
("h1", "一级标题"),
("h2", "二级标题"),
("h3", "三级标题"),
],
)
html_docs = html_splitter.split_text_from_file("page.html")
# 结合 RecursiveCharacterTextSplitter 进一步分割
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=30)
final_docs = text_splitter.split_documents(html_docs)
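12.6 Best Practices for Splitting Chinese Text
from langchain_text_splitters import RecursiveCharacterTextSplitter
def create_chinese_splitter(chunk_size=500, chunk_overlap=50):
    """Splitter tuned for Chinese text (uses full-width punctuation)."""
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=[
            "\n\n",      # paragraph
            "\n",        # line break
            "。",        # Chinese full stop
            ";",        # Chinese semicolon
            "!", "?",  # Chinese exclamation and question marks
            ",",        # Chinese comma
            "、",        # enumeration comma
            " ",         # space
            "",          # character level
        ],
    )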
13. Embeddings and Vector Stores
13.1 Text Embeddings
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",    # good price/performance
    # model="text-embedding-3-large",  # higher accuracy
    dimensions=1536,  # optional; 1536 is already the default for this model, pass a smaller value to shorten the vectors
)
# Embed a single text
text = "LangChain is a framework for building LLM applications."
vector = embeddings.embed_query(text)
print(f"vector dimensions: {len(vector)}")
# Embed several texts
texts = [
    "LangChain is used to build LLM applications.",
    "PyTorch is a deep learning framework.",
    "FastAPI is used to build web APIs.",
]
vectors = embeddings.embed_documents(texts)
print(f"{len(vectors)} vectors, {len(vectors[0])} dimensions each")
# Similarity
import numpy as np
def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
v1 = embeddings.embed_query("machine learning")
v2 = embeddings.embed_query("deep learning")
v3 = embeddings.embed_query("The weather is nice today")
print(f"ML vs DL: {cosine_similarity(v1, v2):.4f}")       # related topics: expect the higher score
print(f"ML vs weather: {cosine_similarity(v1, v3):.4f}")  # unrelated topics: expect a clearly lower score
13.2 Chroma 向量数据库
# pip install chromadb langchain-chroma
from langchain_chroma import Chroma
# 从文档创建
vectorstore = Chroma.from_documents(
documents=split_docs,
embedding=embeddings,
persist_directory="./chroma_db",
collection_name="knowledge_base",
)
# 从文本创建
vectorstore = Chroma.from_texts(
texts=["文本1", "文本2", "文本3"],
embedding=embeddings,
metadatas=[
{"source": "doc1", "page": 1},
{"source": "doc2", "page": 2},
{"source": "doc3", "page": 3},
],
)
# 持久化(自动)
# 下次使用直接加载
vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embeddings,
collection_name="knowledge_base",
)
13.3 向量检索方法
# 1. 基础相似性搜索
docs = vectorstore.similarity_search("什么是机器学习?", k=3)
# 2. Search with scores
results = vectorstore.similarity_search_with_score("query text", k=5)
for doc, score in results:
    print(f"score: {score:.4f} | content: {doc.page_content[:80]}")
# 3. 使用向量搜索
query_vector = embeddings.embed_query("查询文本")
docs = vectorstore.similarity_search_by_vector(query_vector, k=5)
# 4. MMR 多样性搜索(平衡相似性和多样性)
docs = vectorstore.max_marginal_relevance_search(
"查询文本",
k=5, # 最终返回数
fetch_k=20, # 先取20个候选
lambda_mult=0.5, # 0=最大多样性, 1=最大相似性
)
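# 5. Search with a metadata filter
# Chroma needs an explicit $and when combining several conditions
docs = vectorstore.similarity_search(
    "query text",
    k=5,
    filter={"$and": [{"source": "handbook.pdf"}, {"page": {"$gte": 10}}]},
)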
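MMR is the least obvious of these search methods, so here is a small self-contained sketch of the selection rule. It is an illustration under stated assumptions: `mmr` and `cosine` are hypothetical helpers working on toy 2-D vectors, not the vector store's internal code.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def mmr(query: list[float], candidates: list[list[float]], k: int, lambda_mult: float = 0.5) -> list[int]:
    """Return the indices of k candidates chosen by maximal marginal relevance."""
    selected: list[int] = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        def score(i: int) -> float:
            relevance = cosine(query, candidates[i])
            # Redundancy: similarity to the closest document already selected
            redundancy = max((cosine(candidates[i], candidates[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected

query = [1.0, 1.0]
candidates = [[1.0, 0.9], [1.0, 0.8], [0.0, 1.0]]  # the first two are near-duplicates
print(mmr(query, candidates, k=2, lambda_mult=1.0))  # pure relevance
print(mmr(query, candidates, k=2, lambda_mult=0.5))  # relevance balanced against diversity
```

With `lambda_mult=1.0` the two near-duplicates win; at `0.5` the second pick goes to the less similar but more novel third vector.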
13.4 FAISS
# pip install faiss-cpu (CPU版)
# pip install faiss-gpu (GPU版)
from langchain_community.vectorstores import FAISS
# 创建
vectorstore = FAISS.from_documents(split_docs, embeddings)
vectorstore.save_local("./faiss_index")
# 加载
vectorstore = FAISS.load_local(
"./faiss_index",
embeddings,
allow_dangerous_deserialization=True, # 仅加载可信来源
)
# 合并
other_store = FAISS.from_documents(other_docs, embeddings)
vectorstore.merge_from(other_store)
13.5 Retriever 接口
# as_retriever — 将 VectorStore 转换为标准 Retriever 接口
retriever = vectorstore.as_retriever(
search_type="similarity", # similarity | mmr | similarity_score_threshold
search_kwargs={
"k": 5,
# "score_threshold": 0.7, # 仅用于 similarity_score_threshold
# "fetch_k": 20, # 仅用于 mmr
# "lambda_mult": 0.5, # 仅用于 mmr
# "filter": {"source": "doc1"}, # 元数据过滤
},
)
# Retriever 实现了 Runnable 接口
docs = retriever.invoke("查询文本")
# 直接用于 LCEL
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| model
| parser
)
14. RAG 检索增强生成
14.1 完整 RAG 管道
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# 1. 加载文档
loader = TextLoader("knowledge.txt", encoding="utf-8")
docs = loader.load()
# 2. 分割
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(docs)
# 3. 创建向量存储
vectorstore = Chroma.from_documents(chunks, OpenAIEmbeddings())
# 4. 创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# 5. 构建提示
template = """根据以下上下文回答问题。如果上下文中没有答案,请说"无法从资料中找到答案"。
上下文:
{context}
问题: {question}
请提供详细且准确的回答:"""
prompt = ChatPromptTemplate.from_template(template)
# 6. 创建模型
model = ChatOpenAI(model="gpt-4o", temperature=0)
# 7. Build the RAG chain
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"[{i+1}] {d.page_content}"
        for i, d in enumerate(docs)
    )
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
# 8. 查询
answer = rag_chain.invoke("什么是机器学习?")
print(answer)
14.2 RAG with Source Citations
template = """Answer the question using the numbered context below. Cite sources in the answer as [Source X].
Context:
{context}
Question: {question}
Detailed answer (cite as [Source X]):"""
prompt = ChatPromptTemplate.from_template(template)
# Return the answer together with the retrieved sources
def rag_with_sources(question: str):
    docs = retriever.invoke(question)
    context = "\n\n---\n\n".join(
        f"[Source {i+1}] (from: {d.metadata.get('source', 'unknown')})\n{d.page_content}"
        for i, d in enumerate(docs)
    )
    answer = (prompt | model | StrOutputParser()).invoke({
        "context": context, "question": question
    })
    return {
        "question": question,
        "answer": answer,
        "sources": [
            {
                "index": i + 1,
                "content": d.page_content[:300],
                "metadata": d.metadata,
            }
            for i, d in enumerate(docs)
        ],
    }
result = rag_with_sources("Explain how deep learning works")
print(f"Answer: {result['answer']}\n")
print("Sources:")
for src in result["sources"]:
    print(f"  [{src['index']}] {src['metadata'].get('source')} - {src['content'][:60]}...")
14.3 Conversational RAG
from langchain_core.prompts import MessagesPlaceholder
# Step 1: rewrite the user question into a standalone question
contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Using the chat history, rewrite the user's question as a complete standalone question."),
    MessagesPlaceholder("chat_history"),
    ("human", "{question}"),
])
contextualize_chain = contextualize_prompt | model | StrOutputParser()
# Step 2: answer from the retrieved context
qa_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the question from the context.\n\nContext:\n{context}"),
    MessagesPlaceholder("chat_history"),
    ("human", "{question}"),
])
# Full chain
def build_conversational_rag():
    def _contextualize(inputs):
        if inputs.get("chat_history"):
            return contextualize_chain.invoke(inputs)
        return inputs["question"]
    def _retrieve_context(inputs):
        # retriever.invoke() returns a list of Documents, so it cannot be piped with "|";
        # format the list with a plain function call instead
        return format_docs(retriever.invoke(_contextualize(inputs)))
    return (
        RunnablePassthrough.assign(context=_retrieve_context)
        | qa_prompt
        | model
        | StrOutputParser()
    )
conv_rag = build_conversational_rag()
# Multi-turn conversation
from langchain_core.messages import HumanMessage, AIMessage
history = []
q1 = "What is Python?"
a1 = conv_rag.invoke({"question": q1, "chat_history": history})
history.extend([HumanMessage(content=q1), AIMessage(content=a1)])
q2 = "What are its pros and cons?"  # "its" refers to Python
a2 = conv_rag.invoke({"question": q2, "chat_history": history})
print(a2)
14.4 create_agent RAG
# 使用 create_agent 构建 RAG Agent
from langchain.agents import create_agent
from langchain.tools import tool
@tool
def search_knowledge_base(query: str) -> str:
    """Search the internal knowledge base. Use it to look up documents, handbooks, or internal material."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found"
    return "\n\n".join(
        f"[{d.metadata.get('source')}]\n{d.page_content}"
        for d in docs
    )
agent = create_agent(
model="openai:gpt-4o",
tools=[search_knowledge_base],
system_prompt="""你是一个基于知识库的问答助手。
- 始终先搜索知识库再回答
- 引用来源标注
- 如果知识库没有相关信息,请明确说明""",
)
result = agent.invoke({
"messages": [{"role": "user", "content": "公司的远程办公政策是什么?"}]
})
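15. Advanced RAG Strategies
15.1 Multi-Query Retrieval
# In v1.0 the legacy retrievers live in langchain-classic (pip install langchain-classic)
from langchain_classic.retrievers import MultiQueryRetriever
from langchain_openai import ChatOpenAI
# Generate several rephrasings of the query and merge the retrieved results
multi_retriever = MultiQueryRetriever.from_llm(
    llm=ChatOpenAI(model="gpt-4o", temperature=0.5),
    retriever=base_retriever,  # any existing retriever, e.g. vectorstore.as_retriever()
)
# The number of generated queries is set by the prompt, not by a from_llm argument;
# pass a custom prompt to change it
# Original query: "How can I improve code quality?"
# Generated queries might look like:
#   "What are the best practices for improving code quality?"
#   "How do code review and testing improve code quality?"
#   "How is code quality measured and improved?"
docs = multi_retriever.invoke("How can I improve code quality?")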
15.2 Self-Query Retrieval
from langchain_classic.retrievers import SelfQueryRetriever
from langchain_classic.chains.query_constructor.schema import AttributeInfo
# Describe the metadata fields of the documents
metadata_field_info = [
    AttributeInfo(name="source", description="Document source", type="string"),
    AttributeInfo(name="date", description="Publication date", type="string"),
    AttributeInfo(name="author", description="Author", type="string"),
    AttributeInfo(name="category", description="Category", type="string"),
]
self_query_retriever = SelfQueryRetriever.from_llm(
    llm=model,
    vectorstore=vectorstore,
    document_contents="A collection of technical documents and tutorials",
    metadata_field_info=metadata_field_info,
)
# The LLM parses the question into a metadata filter plus a semantic query
docs = self_query_retriever.invoke("Documents in the Python category written by Zhang San after 2025")
15.3 Parent Document Retriever
from langchain_classic.retrievers import ParentDocumentRetriever
from langchain_core.stores import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Search over small chunks, return the larger parent chunk as context
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
store = InMemoryStore()
retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
    search_kwargs={"k": 3},
)
retriever.add_documents(documents)
# Returns parent chunks (up to 2000 characters) rather than the 400-character children
docs = retriever.invoke("What is machine learning?")
15.4 Contextual Compression
from langchain_classic.retrievers import ContextualCompressionRetriever
from langchain_classic.retrievers.document_compressors import LLMChainExtractor
# Use an LLM to extract only the parts of each document that are relevant to the query
compressor = LLMChainExtractor.from_llm(model)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever,
)
# Retrieved documents are compressed automatically, dropping irrelevant content
compressed = compression_retriever.invoke("What is the GIL in Python?")
for doc in compressed:
    print(f"[compressed to {len(doc.page_content)} chars] {doc.page_content[:150]}...")
15.5 Ensemble Retrieval
# pip install rank_bm25
from langchain_classic.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# BM25 keyword retriever
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 5
# Semantic retriever
semantic_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# Combine both
ensemble = EnsembleRetriever(
    retrievers=[bm25_retriever, semantic_retriever],
    weights=[0.3, 0.7],  # BM25 30% + semantic 70%
)
docs = ensemble.invoke("Python programming basics tutorial")
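15.6 HyDE (Hypothetical Document Embeddings)
# HyDE: generate a hypothetical answer first, then retrieve with the embedding of that answer
hyde_prompt = ChatPromptTemplate.from_template(
    "Write a short passage that answers the question below "
    "(it does not need to be fully accurate; it is only used for vector retrieval):\n{question}"
)
hyde_chain = hyde_prompt | model | StrOutputParser()
def hyde_retrieve(question: str, k: int = 5):
    # 1. Generate the hypothetical answer
    hypothetical_doc = hyde_chain.invoke({"question": question})
    # 2. Retrieve with it; similarity_search embeds the text internally
    docs = vectorstore.similarity_search(hypothetical_doc, k=k)
    return docs
# Useful when questions and documents are phrased very differently
docs = hyde_retrieve("How do I make Python run faster?")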
15.7 RAG-Fusion
def rag_fusion(question: str, num_queries: int = 4, top_k: int = 5):
    """RAG-Fusion: multiple queries combined with reciprocal rank fusion."""
    # 1. Generate several related queries
    query_prompt = ChatPromptTemplate.from_template(
        "Generate {num} search queries that approach the following question "
        "from different angles, one per line:\n{question}"
    )
    queries_str = (query_prompt | model | StrOutputParser()).invoke({
        "question": question, "num": num_queries,
    })
    queries = [q.strip() for q in queries_str.split("\n") if q.strip()]
    # 2. Retrieve for each query
    all_results = []
    for q in queries:
        docs = retriever.invoke(q)
        all_results.append(docs)
    # 3. Reciprocal rank fusion (RRF): score += 1 / (k + rank), with k = 60 and ranks starting at 1
    fused = {}
    for doc_list in all_results:
        for rank, doc in enumerate(doc_list, start=1):
            key = doc.page_content[:200]  # simplified identity key
            if key not in fused:
                fused[key] = {"doc": doc, "score": 0}
            fused[key]["score"] += 1 / (60 + rank)
    # 4. Sort and return the top results
    ranked = sorted(fused.values(), key=lambda x: x["score"], reverse=True)
    return [item["doc"] for item in ranked[:top_k]]
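A worked example makes the fusion step concrete. In its conventional form, RRF scores a document d as the sum over all result lists of 1 / (k + rank(d)), where rank counts from 1 and k is a smoothing constant (60 is the customary choice). The sketch below applies that formula to three toy rankings; `reciprocal_rank_fusion` and the document ids are hypothetical.

```python
from collections import defaultdict

def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked lists of document ids into one list sorted by RRF score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

# Three queries returned these rankings (toy data)
rankings = [
    ["a", "b", "c"],
    ["b", "a", "d"],
    ["b", "c", "a"],
]
fused = reciprocal_rank_fusion(rankings)
for doc_id, score in fused:
    print(f"{doc_id}: {score:.6f}")
```

Document "b" wins because it ranks first twice and second once; "d" appears in only one list and lands last. Since only ranks matter, the method needs no score normalization across retrievers.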
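15.8 Step-Back Retrieval
# Generate a more general "step-back" question to retrieve background knowledge
step_back_prompt = ChatPromptTemplate.from_template(
    "For the specific question below, write a more general, higher-level 'step-back' question "
    "to retrieve background knowledge:\nSpecific question: {question}\nStep-back question:"
)
step_back_chain = step_back_prompt | model | StrOutputParser()
def step_back_retrieve(question: str, k: int = 3):
    back_question = step_back_chain.invoke({"question": question})
    # Retrieve for the original question and for the step-back question
    original_docs = retriever.invoke(question)
    back_docs = retriever.invoke(back_question)
    # Merge and deduplicate
    seen = set()
    merged = []
    for doc in original_docs + back_docs:
        key = doc.page_content[:200]
        if key not in seen:
            seen.add(key)
            merged.append(doc)
    return merged[:k]
# Example:
# Specific question: "How do I force restart an iPhone 15?"
# Step-back question: "What are the general troubleshooting methods for mobile devices?"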
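16. LangGraph State Graph Basics
16.1 Overview
LangGraph is the stateful graph orchestration framework in the LangChain ecosystem. In v1.0, create_agent runs on LangGraph under the hood.
- StateGraph: models a workflow as a state machine made of nodes and edges
- Checkpointer: saves a state snapshot automatically after each node
- Streaming: supports several streaming modes
- HITL: built-in support for human intervention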
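These ideas fit in a few lines of plain Python. The sketch below is a toy model, not LangGraph's API: `MiniGraph` is a hypothetical class that shows how nodes return partial updates, how a reducer merges them into the state, and how a snapshot is recorded after each node.

```python
import operator
from typing import Any, Callable

State = dict[str, Any]

class MiniGraph:
    """Toy state machine: nodes return partial updates, reducers merge them."""

    def __init__(self, reducers: dict[str, Callable[[Any, Any], Any]]):
        self.reducers = reducers
        self.nodes: dict[str, Callable[[State], State]] = {}
        self.edges: dict[str, str] = {}
        self.checkpoints: list[tuple[str, State]] = []

    def add_node(self, name: str, fn: Callable[[State], State]) -> None:
        self.nodes[name] = fn

    def add_edge(self, src: str, dst: str) -> None:
        self.edges[src] = dst

    def invoke(self, state: State, entry: str) -> State:
        state = dict(state)
        current = entry
        while current != "END":
            update = self.nodes[current](state)
            for key, value in update.items():
                reducer = self.reducers.get(key)
                # With a reducer the update is merged; without one it overwrites
                state[key] = reducer(state[key], value) if reducer else value
            self.checkpoints.append((current, dict(state)))  # snapshot after each node
            current = self.edges[current]
        return state

graph = MiniGraph(reducers={"messages": operator.add})
graph.add_node("draft", lambda s: {"messages": ["draft written"]})
graph.add_node("review", lambda s: {"messages": ["review passed"], "approved": True})
graph.add_edge("draft", "review")
graph.add_edge("review", "END")
final = graph.invoke({"messages": ["task received"], "approved": False}, entry="draft")
print(final)
```

The `operator.add` reducer is why a node can return only its new messages: the graph appends them to the existing list instead of replacing it.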
16.2 A First StateGraph
from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage
# 1. Define the state
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
# 2. Define a node function
def chatbot(state: AgentState):
    """LLM node: call the model."""
    response = model.invoke(state["messages"])
    return {"messages": [response]}
# 3. Build the graph
graph = StateGraph(AgentState)
graph.add_node("chatbot", chatbot)
graph.set_entry_point("chatbot")
graph.add_edge("chatbot", END)
# 4. Compile
app = graph.compile()
# 5. Run
result = app.invoke({
    "messages": [HumanMessage(content="Hello!")]
})
print(result["messages"][-1].content)
16.3 Agent and Tool Execution Graph
from langchain_core.messages import ToolMessage
from langgraph.graph import StateGraph, END
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
# Model with tools bound
model_with_tools = model.bind_tools([get_weather, calculate])
TOOLS_BY_NAME = {t.name: t for t in [get_weather, calculate]}
def agent_node(state: AgentState):
    """Agent decision node."""
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}
def tool_executor(state: AgentState):
    """Tool execution node."""
    last_msg = state["messages"][-1]
    results = []
    for tc in last_msg.tool_calls:
        selected = TOOLS_BY_NAME.get(tc["name"])
        if selected is None:
            # Report unknown tools back to the model instead of failing on an unbound variable
            output = f"Unknown tool: {tc['name']}"
        else:
            output = selected.invoke(tc["args"])
        results.append(ToolMessage(
            content=str(output),
            tool_call_id=tc["id"],
        ))
    return {"messages": results}
def should_continue(state: AgentState) -> str:
    """Routing decision."""
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END
# Build the graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_executor)
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    END: END,
})
workflow.add_edge("tools", "agent")
app = workflow.compile()
# Run
result = app.invoke({
    "messages": [HumanMessage(content="What is the weather in Beijing? Also compute 156*23")]
})
16.4 Checkpointer 持久化
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# 开发测试
checkpointer = InMemorySaver()
# 生产环境(SQLite)
import sqlite3
conn = sqlite3.connect("agent_state.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
# 编译时传入
app = workflow.compile(checkpointer=checkpointer)
# 使用 thread_id 隔离会话
config = {"configurable": {"thread_id": "user-001"}}
# 多轮对话 — 状态自动持久化
app.invoke({"messages": [HumanMessage(content="记住:我喜欢Python")]}, config)
app.invoke({"messages": [HumanMessage(content="我刚才说了我喜欢什么?")]}, config)
# Agent 能回答 "你喜欢Python"
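16.5 Time Travel
# List past checkpoints, newest first
history = list(app.get_state_history(config))
# Inspect the latest state
current_state = app.get_state(config)
print(f"current values: {current_state.values}")
# Pick an earlier checkpoint (index 3 is three checkpoints before the latest)
target_state = history[3]
# Resume from that checkpoint. Its config carries the checkpoint_id, so execution
# forks into a new branch. Calling update_state(config, target_state.values) would
# instead merge the old values into the latest state through the reducers.
result = app.invoke(None, target_state.config)
# Optional: edit the state at that checkpoint before resuming
# new_config = app.update_state(target_state.config, {"messages": [...]})
# result = app.invoke(None, new_config)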
16.6 Custom Streaming Modes
# LangGraph supports five stream modes (values, updates, messages, custom, debug);
# pass a list to receive several at once as (mode, chunk) pairs
for mode, chunk in app.stream(
    {"messages": [HumanMessage(content="Check the weather for me")]},
    stream_mode=["updates", "messages", "custom"],
    config=config,
):
    if mode == "updates":
        print(f"[state update] nodes: {list(chunk.keys())}")
    elif mode == "messages":
        msg, metadata = chunk
        print(f"[message] {metadata['langgraph_node']}: {msg.content}", end="")
    elif mode == "custom":
        print(f"[custom] {chunk}")
17. Multi-Agent Collaboration and the Supervisor Pattern
17.1 create_supervisor Basics
# pip install langgraph-supervisor
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
# Create specialist agents (create_agent supersedes langgraph.prebuilt.create_react_agent in v1.0)
math_agent = create_agent(
    model="openai:gpt-4o",
    tools=[add, multiply, divide],
    name="math_expert",
)
research_agent = create_agent(
    model="openai:gpt-4o",
    tools=[search_web, wikipedia],
    name="research_expert",
)
writer_agent = create_agent(
    model="openai:gpt-4o",
    tools=[],
    name="writer",
)
# Create the supervisor
supervisor = create_supervisor(
    agents=[research_agent, math_agent, writer_agent],
    model=ChatOpenAI(model="gpt-4o"),
    prompt=(
        "You are a team lead. Assign each task to the most suitable expert. "
        "When the work is done, combine the experts' results into a final answer."
    ),
    output_mode="last_message",
)
# Compile and run
app = supervisor.compile()
result = app.invoke({
    "messages": [{
        "role": "user",
        "content": "Research the latest AI trends, analyze the key figures, and write a 200-word brief."
    }]
})
17.2 Supervisor 高级配置
supervisor = create_supervisor(
agents=[research_agent, math_agent, writer_agent],
model=ChatOpenAI(model="gpt-4o"),
prompt="你是团队主管...",
# 并行调用:允许同时分配给多个Agent
parallel_tool_calls=True,
# 输出模式
output_mode="last_message", # last_message | full_history
# 自定义 handoff 工具前缀
handoff_tool_prefix="transfer_to_",
# 在 Supervisor 模型调用前后插入逻辑
pre_model_hook=lambda state, runtime: print("Supervisor 思考中..."),
post_model_hook=lambda state, runtime: print("Supervisor 决策完成"),
# Agent 名称展示方式
include_agent_name="inline", # 在消息内容中嵌入Agent名称
)
app = supervisor.compile(checkpointer=InMemorySaver())
17.3 Custom Multi-Agent Workflow
import operator
from typing import Annotated, List, TypedDict
from langchain.agents import create_agent
from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
class MultiAgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    next_agent: str
    task_result: dict
# Create the agents
researcher = create_agent(model, [search_web], name="researcher")
analyst = create_agent(model, [calculate], name="analyst")
def research_node(state):
    result = researcher.invoke({"messages": state["messages"]})
    # messages uses an additive reducer, so return only the newly produced messages
    new_messages = result["messages"][len(state["messages"]):]
    return {"messages": new_messages, "next_agent": "analyst"}
def analyst_node(state):
    result = analyst.invoke({"messages": state["messages"]})
    new_messages = result["messages"][len(state["messages"]):]
    return {"messages": new_messages, "next_agent": END}
def quality_check(state):
    """Quality gate: send the task back to the researcher if the analyst asks for more information."""
    last_msg = state["messages"][-1]
    if "需要更多信息" in last_msg.content:  # marker phrase: "more information needed"
        return "research"
    return END
# Build the workflow
workflow = StateGraph(MultiAgentState)
workflow.add_node("research", research_node)
workflow.add_node("analyst", analyst_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "analyst")
workflow.add_conditional_edges("analyst", quality_check, {
    "research": "research",
    END: END,
})
app = workflow.compile()
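17.4 Swarm pattern (peer-to-peer handoff)
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

# Swarm: agents hand tasks to each other directly, with no central supervisor
def create_swarm_agent(name, tools, handoff_targets):
    system_prompt = (
        f"You are {name}. You handle {name}-related tasks.\n"
        "If a request is outside your expertise, use a handoff tool "
        "to pass it to the right expert."
    )
    # Build one handoff tool per target
    handoff_tools = []
    for target in handoff_targets:
        def make_handoff(target_name):
            @tool
            def handoff(reason: str) -> str:
                """Hand the task over to another expert."""
                return f"Task handed to {target_name}. Reason: {reason}"
            handoff.name = f"transfer_to_{target_name}"
            return handoff
        handoff_tools.append(make_handoff(target))
    # Note: these tools only report the handoff as text. Routing control
    # between agents needs extra graph wiring (e.g. the langgraph-swarm package).
    return create_react_agent(
        model="openai:gpt-4o",
        tools=tools + handoff_tools,
        name=name,
        prompt=system_prompt,  # `state_modifier` is the older name of this argument
    )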
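The `make_handoff` factory above is there because Python closures capture variables rather than values. The following stdlib-only sketch shows the pitfall the factory avoids; `naive`, `fixed`, and `make_reporter` are illustrative names and not part of LangChain.

```python
# Closures created in a loop share the loop variable, so they all see its
# final value unless each one gets its own scope through a factory function.

targets = ["billing", "support", "sales"]

# Pitfall: every lambda looks up `target` when it is called, after the loop ended.
naive = [lambda: f"transfer_to_{target}" for target in targets]

def make_reporter(target_name):
    # target_name is a fresh local per call, so each closure keeps its own value.
    def report():
        return f"transfer_to_{target_name}"
    return report

fixed = [make_reporter(target) for target in targets]

naive_names = [fn() for fn in naive]
fixed_names = [fn() for fn in fixed]
print(naive_names)  # all three entries name the last target
print(fixed_names)  # one entry per target
```

Without the factory, every handoff tool would report the last target in `handoff_targets`.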
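17.5 Multi-agent pattern comparison
| Pattern | Coordination | Best suited for | Number of agents |
|---|---|---|---|
| Supervisor | Central LLM router | Clearly defined, dispatchable tasks | 3-7 |
| Swarm | Direct agent-to-agent handoff | Dynamic collaboration | Unlimited |
| Hierarchical | Nested supervisors | Complex organizational structures | 10+ |
| Sequential | Fixed pipeline | Linear processing flows | 2-5 |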
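18. Memory and persistence
18.1 Overview of memory strategies in v1.0
In v1.0 the agent itself is stateless. Memory is provided by a LangGraph checkpointer:
| Strategy | How it works | Trade-offs |
|---|---|---|
| Full history | Stores every message | No information loss, but token usage grows without bound |
| Summary memory | SummarizationMiddleware compresses older messages automatically | Stable token usage; details may be lost |
| Window memory | Keeps only the most recent N messages | Simple to implement; loses early context |
| Vector memory | Stores memories in a vector store and injects retrieved ones | Supports long-term memory across sessions |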
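To make the window-memory row concrete, here is a minimal stdlib-only sketch. It assumes messages are plain dicts with `role` and `content` keys; `trim_to_window` is a hypothetical helper, not a LangChain API.

```python
from typing import Dict, List

Message = Dict[str, str]  # e.g. {"role": "user", "content": "..."}

def trim_to_window(messages: List[Message], max_messages: int) -> List[Message]:
    """Keep system messages plus the most recent `max_messages` other messages."""
    if max_messages < 0:
        raise ValueError("max_messages must be >= 0")
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    # rest[-0:] would return the whole list, so handle zero explicitly
    recent = rest[-max_messages:] if max_messages else []
    return system + recent

history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "My name is Alice."},
    {"role": "assistant", "content": "Nice to meet you, Alice."},
    {"role": "user", "content": "I live in Shanghai."},
    {"role": "assistant", "content": "Noted."},
    {"role": "user", "content": "What is my name?"},
]

window = trim_to_window(history, max_messages=3)
print([m["content"] for m in window])
```

With a window of 3, the message that introduced the user's name is dropped. This is the "loses early context" trade-off from the table.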
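18.2 Checkpointer backends
# 1. In-memory (development and testing)
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()

# 2. SQLite (lightweight production); requires langgraph-checkpoint-sqlite
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
# Pass a plain file path, not a SQLAlchemy-style "sqlite:///" URL
conn = sqlite3.connect("agent.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# 3. Postgres (recommended for production); requires langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver
# from_conn_string is a context manager; call setup() once to create the tables
with PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/agentdb"
) as checkpointer:
    checkpointer.setup()
    ...  # build and run the agent inside this block

# 4. MongoDB; requires langgraph-checkpoint-mongodb
from langgraph.checkpoint.mongodb import MongoDBSaver
with MongoDBSaver.from_conn_string(
    "mongodb://localhost:27017/agentdb"
) as checkpointer:
    ...  # build and run the agent inside this block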
18.3 Complete example: agent + checkpointer
import sqlite3
from langchain.agents import create_agent
from langgraph.checkpoint.sqlite import SqliteSaver

# SqliteSaver wraps a sqlite3 connection; use a file path, not a "sqlite:///" URL
checkpointer = SqliteSaver(sqlite3.connect("agent_memory.db", check_same_thread=False))
agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, search_web],
    checkpointer=checkpointer,
    system_prompt="You are an AI assistant with memory. Remember what the user told you earlier.",
)

# First session for this user
config_user1 = {"configurable": {"thread_id": "user-alice"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "My name is Alice and I live in Shanghai."}]},
    config=config_user1,
)

# The same user comes back later: the agent still has the thread history
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What is my name? Which city do I live in?"}]},
    config=config_user1,
)
print(result["messages"][-1].content)
# e.g. "Your name is Alice and you live in Shanghai."

# A different user: a different thread_id means fully isolated history
config_user2 = {"configurable": {"thread_id": "user-bob"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What did I say earlier?"}]},
    config=config_user2,
)
# e.g. "This is our first conversation; you have not told me anything yet."
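18.4 Summary memory (SummarizationMiddleware)
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langgraph.checkpoint.memory import InMemorySaver

agent = create_agent(
    model="openai:gpt-4o",
    tools=[...],
    checkpointer=InMemorySaver(),
    middleware=[
        SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=2000,  # summarize once the history exceeds 2000 tokens
            messages_to_keep=5,  # keep the 5 most recent messages verbatim
        ),
    ],
)
# Long conversations are managed automatically: older messages are summarized, recent ones stay intact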
18.5 Long-term memory (Store)
# Checkpointer = short-term memory (conversation history within one thread)
# Store = long-term memory (knowledge persisted across threads)
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# Read and write long-term memory inside a graph node
def node_with_memory(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    # Read this user's memories (namespace prefix search)
    memories = store.search(("users", user_id))
    # Processing logic...
    # Save a new memory: put(namespace, key, value)
    store.put(
        ("users", user_id, "preferences"),
        "preferences",
        {"language": "Chinese", "expertise": "beginner"},
    )
18.6 Memory management best practices
class MemoryManager:
    """Single entry point for short-term and long-term memory."""

    def __init__(self, checkpointer, store):
        self.checkpointer = checkpointer
        self.store = store

    def get_conversation_history(self, thread_id: str):
        """Return the conversation history (short-term memory)."""
        config = {"configurable": {"thread_id": thread_id}}
        # checkpointer.get returns a checkpoint dict (or None), not a StateSnapshot
        checkpoint = self.checkpointer.get(config)
        if not checkpoint:
            return []
        return checkpoint["channel_values"].get("messages", [])

    def get_user_memories(self, user_id: str):
        """Return the user's long-term memories."""
        return self.store.search(("users", user_id))

    def save_user_fact(self, user_id: str, key: str, value):
        """Save a fact about the user."""
        self.store.put(("users", user_id, "facts"), key, value)

    def build_context(self, thread_id: str, user_id: str):
        """Build the full context from both memory layers."""
        history = self.get_conversation_history(thread_id)
        memories = self.get_user_memories(user_id)
        return {
            "conversation_history": history[-10:],  # 10 most recent messages
            "user_memories": memories,
        }
19. Streaming output in detail
19.1 LCEL streaming
# Synchronous streaming
chain = prompt | model | parser
for chunk in chain.stream({"text": "Describe the history of artificial intelligence"}):
    print(chunk, end="", flush=True)

# Asynchronous streaming
import asyncio

async def stream_chain():
    async for chunk in chain.astream({"text": "Introduce deep learning"}):
        print(chunk, end="", flush=True)

asyncio.run(stream_chain())
19.2 Agent stream modes
# Agents support several stream modes; the most commonly used ones are shown below.
# When stream_mode is a list, each item is a (mode, chunk) tuple.
agent = create_agent(model="openai:gpt-4o", tools=[get_weather, calculate])

# 1. messages mode: token-by-token output plus metadata
for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in Beijing?"}]},
    stream_mode=["messages"],
):
    msg, metadata = chunk
    node = metadata["langgraph_node"]
    if hasattr(msg, "content") and msg.content:
        print(f"[{node}] {msg.content}", end="", flush=True)

# 2. updates mode: state updates after each node
for mode, chunk in agent.stream(
    {"messages": [...]},
    stream_mode=["updates"],
):
    print(f"Node update: {chunk}")

# 3. custom mode: custom data emitted from inside tools
for mode, chunk in agent.stream(
    {"messages": [...]},
    stream_mode=["custom"],
):
    print(f"[tool progress] {chunk}")

# 4. Combining several modes
for mode, chunk in agent.stream(
    {"messages": [...]},
    stream_mode=["messages", "updates", "custom"],
):
    if mode == "messages":
        msg, meta = chunk
        print(msg.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n[state change] {list(chunk.keys())}")
    elif mode == "custom":
        print(f"\n[progress] {chunk}")
19.3 astream_events (detailed events)
import asyncio

# astream_events exposes the most fine-grained execution events
async def print_events():
    async for event in chain.astream_events({"text": "hello"}, version="v2"):
        kind = event["event"]
        name = event.get("name", "unknown")
        if kind == "on_chat_model_start":
            print(f"\n[model start] {name}")
        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)
        elif kind == "on_chat_model_end":
            print("\n[model end]")
        elif kind == "on_tool_start":
            print(f"\n[tool start] {name}")
        elif kind == "on_tool_end":
            # The output may be a message object, so convert before slicing
            output = str(event["data"].get("output", ""))
            print(f"\n[tool end] {name}: {output[:100]}")
        elif kind == "on_chain_start":
            print(f"\n[chain start] {name}")
        elif kind == "on_chain_end":
            print(f"\n[chain end] {name}")

asyncio.run(print_events())
19.4 FastAPI SSE streaming service
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    thread_id: str = "default"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def generate():
        config = {"configurable": {"thread_id": req.thread_id}}
        async for mode, chunk in agent.astream(
            {"messages": [{"role": "user", "content": req.message}]},
            stream_mode=["messages", "custom"],
            config=config,
        ):
            if mode == "messages":
                msg, _ = chunk
                content = msg.content if hasattr(msg, "content") else ""
                if content:
                    yield f"data: {json.dumps({'type': 'text', 'content': content})}\n\n"
            elif mode == "custom":
                # Custom chunks must be JSON-serializable
                yield f"data: {json.dumps({'type': 'progress', 'content': chunk})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable proxy buffering (nginx)
        },
    )
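The endpoint above writes each event as a `data: ...` line followed by a blank line and ends the stream with a `[DONE]` sentinel. The following stdlib-only sketch shows that framing and how a client could decode it; `sse_event` and `parse_sse` are hypothetical helper names, and the parser handles only the single-line `data:` frames used here, not the full SSE specification.

```python
import json
from typing import Any, Dict, List

def sse_event(payload: Dict[str, Any]) -> str:
    """Encode one JSON payload as a Server-Sent Events frame."""
    # json.dumps escapes newlines inside strings, so the payload always
    # fits on a single "data:" line.
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

def parse_sse(stream: str) -> List[Dict[str, Any]]:
    """Decode frames produced by sse_event, stopping at the [DONE] sentinel."""
    events = []
    for frame in stream.split("\n\n"):
        if not frame.startswith("data: "):
            continue  # skip empty trailing segments
        body = frame[len("data: "):]
        if body == "[DONE]":
            break
        events.append(json.loads(body))
    return events

frames = (
    sse_event({"type": "text", "content": "Hello"})
    + sse_event({"type": "progress", "content": "step 1/2\ndone"})
    + "data: [DONE]\n\n"
)
decoded = parse_sse(frames)
print(decoded)
```

Encoding the payload as JSON is what keeps multi-line content safe: a raw newline inside a `data:` line would otherwise split the frame.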
19.5 Streaming progress from inside a tool
from langgraph.config import get_stream_writer

def data_analysis_tool(dataset: str) -> str:
    """Analyze a large dataset and return an analysis report."""
    writer = get_stream_writer()
    writer(f"📊 Starting analysis of {dataset}...")
    # Step 1
    writer("  ├─ [1/4] Loading data...")
    # load...
    # Step 2
    writer("  ├─ [2/4] Cleaning data...")
    # clean...
    # Step 3
    writer("  ├─ [3/4] Running statistics...")
    # analyze...
    # Step 4
    writer("  └─ [4/4] Generating report...")
    # report...
    writer("✅ Analysis complete!")
    return "Analysis report: ..."

# Consume the progress messages
for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the sales_2025 dataset"}]},
    stream_mode=["messages", "custom"],
):
    if mode == "custom":
        print(f"\n{chunk}")
    elif mode == "messages":
        msg, _ = chunk
        if hasattr(msg, "content") and msg.content:
            print(msg.content, end="", flush=True)
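20. Human-in-the-Loop
20.1 interrupt_before / interrupt_after
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver

# create_agent supports HITL natively; an interrupt needs a checkpointer
# so that the paused state can be saved and resumed
agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, transfer_money],
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],  # pause before any tool runs
)
config = {"configurable": {"thread_id": "session-123"}}

# Run: execution pauses before the tool call
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Send a leave request email to admin@company.com"}]},
    config=config,
)

# Inspect the paused state
state = agent.get_state(config)
print(f"Paused before: {state.next}")  # a tuple such as ('tools',)

# Approve: passing None as input resumes from the saved checkpoint.
# (Command(resume=...) is for nodes that call interrupt(), see 20.3.)
agent.invoke(None, config=config)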
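20.2 HumanInTheLoopMiddleware
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, delete_record, transfer_money],
    checkpointer=InMemorySaver(),  # required so a paused run can be resumed
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                },
                "delete_record": {
                    "allowed_decisions": ["approve", "reject"],
                },
                "transfer_money": {
                    "allowed_decisions": ["approve", "reject"],
                    # reviewer comment required (verify this option in your installed version)
                    "require_comment": True,
                },
            },
        ),
    ],
)
config = {"configurable": {"thread_id": "session-123"}}

# Resume with one decision per pending tool call
# Approve
agent.invoke(Command(resume={"decisions": [{"type": "approve"}]}), config)
# Edit, then approve
agent.invoke(Command(resume={"decisions": [{
    "type": "edit",
    "edited_action": {"name": "send_email", "args": {"to": "new_email@company.com"}},
}]}), config)
# Reject
agent.invoke(Command(resume={"decisions": [{
    "type": "reject",
    "message": "Unauthorized operation",
}]}), config)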
20.3 Custom HITL workflow
# Custom HITL in LangGraph
import operator
from typing import Annotated, List, TypedDict

from langchain_core.messages import AIMessage, BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command

class WorkflowState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    approval_status: str

def process_node(state):
    """Processing node."""
    # Processing logic...
    return {"messages": [...], "approval_status": "pending"}

def review_node(state):
    """Review node: pauses here and waits for a human."""
    # interrupt() pauses the graph; its return value is whatever is passed
    # to Command(resume=...) when the run is resumed
    decision = interrupt("Please review this operation")
    return {"approval_status": decision}

def finalize_node(state):
    return {"messages": [AIMessage(content="Operation completed")]}

# Build the graph
workflow = StateGraph(WorkflowState)
workflow.add_node("process", process_node)
workflow.add_node("review", review_node)
workflow.add_node("finalize", finalize_node)
workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "finalize")
workflow.add_edge("finalize", END)
app = workflow.compile(checkpointer=InMemorySaver())

# Execution pauses when it reaches review_node
config = {"configurable": {"thread_id": "workflow-001"}}
app.invoke({"messages": [...]}, config)

# Resume after the human review
app.invoke(Command(resume="approved"), config)
20.4 HITL approval UI example
# Backend: expose the list of pending approvals
@app.get("/pending-approvals")
async def get_pending():
    """Return all sessions paused and waiting for approval."""
    pending = []
    # Query every thread that is currently in an interrupted state
    # ...
    return pending

@app.post("/approve/{thread_id}")
async def approve(thread_id: str, decision: dict):
    """Record the decision and resume execution."""
    config = {"configurable": {"thread_id": thread_id}}
    result = agent.invoke(
        Command(resume=decision),
        config=config,
    )
    return {"status": "resumed", "thread_id": thread_id}
21. Callbacks and monitoring
21.1 A basic callback handler
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any, Dict, List

class LoggingCallback(BaseCallbackHandler):
    """Custom callback that logs every operation."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"[LLM Start] prompts: {len(prompts)}")
        for i, p in enumerate(prompts):
            print(f"  prompt {i+1}: {str(p)[:100]}...")

    def on_llm_end(self, response: LLMResult, **kwargs):
        # llm_output can be None for some providers
        usage = (response.llm_output or {}).get("token_usage", {})
        print(f"[LLM End] Tokens: {usage.get('total_tokens', '?')}")

    def on_llm_error(self, error, **kwargs):
        print(f"[LLM Error] {error}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        name = (serialized or {}).get("name", "unknown")
        print(f"[Tool Start] {name}: {str(input_str)[:80]}")

    def on_tool_end(self, output, **kwargs):
        print(f"[Tool End] {str(output)[:200]}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"[Chain Start] {(serialized or {}).get('name', 'Unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        # outputs is not always a dict (e.g. a parser returns a string)
        keys = list(outputs.keys()) if isinstance(outputs, dict) else type(outputs).__name__
        print(f"[Chain End] keys: {keys}")

# Usage
callback = LoggingCallback()
result = chain.invoke({"input": "hello"}, config={"callbacks": [callback]})
21.2 Cost-tracking callback
from langchain_core.callbacks import BaseCallbackHandler

class CostTracker(BaseCallbackHandler):
    """Tracks the estimated cost of API calls."""

    PRICING = {
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},  # USD per 1M tokens
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "claude-sonnet-4-6": {"prompt": 3.00, "completion": 15.00},
    }

    def __init__(self):
        self.calls = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.total_cost = 0.0

    def on_llm_end(self, response, **kwargs):
        self.calls += 1
        llm_output = response.llm_output or {}  # can be None for some providers
        usage = llm_output.get("token_usage", {})
        prompt_t = usage.get("prompt_tokens", 0)
        completion_t = usage.get("completion_tokens", 0)
        self.prompt_tokens += prompt_t
        self.completion_tokens += completion_t
        model = llm_output.get("model_name", "unknown")
        # Unknown models fall back to gpt-4o pricing
        price = self.PRICING.get(model, {"prompt": 2.50, "completion": 10.00})
        cost = (
            (prompt_t / 1_000_000) * price["prompt"]
            + (completion_t / 1_000_000) * price["completion"]
        )
        self.total_cost += cost

    def report(self):
        return {
            "calls": self.calls,
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
            "total_tokens": self.prompt_tokens + self.completion_tokens,
            "estimated_cost_usd": round(self.total_cost, 6),
        }

tracker = CostTracker()
chain.invoke({"input": "hello"}, config={"callbacks": [tracker]})
print(tracker.report())
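As a worked instance of the cost formula used in `on_llm_end`, the sketch below computes the cost of a single call. The prices are the illustrative per-1M-token figures from the `PRICING` table above, not authoritative rates, and `estimate_cost_usd` is a hypothetical helper.

```python
def estimate_cost_usd(
    prompt_tokens: int,
    completion_tokens: int,
    prompt_price: float,
    completion_price: float,
) -> float:
    """Estimate the cost of one call; prices are USD per 1M tokens."""
    return (
        (prompt_tokens / 1_000_000) * prompt_price
        + (completion_tokens / 1_000_000) * completion_price
    )

# 1,000 prompt tokens and 500 completion tokens at 2.50 / 10.00 USD per 1M:
#   1000 / 1e6 * 2.50 = 0.0025
#    500 / 1e6 * 10.0 = 0.0050
cost = estimate_cost_usd(1_000, 500, prompt_price=2.50, completion_price=10.00)
print(round(cost, 6))  # 0.0075
```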
21.3 LangSmith integration
# Setting these environment variables enables automatic tracing
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "ls_..."
os.environ["LANGCHAIN_PROJECT"] = "my-production-app"

# Every run is now traced to LangSmith
result = agent.invoke({
    "messages": [{"role": "user", "content": "hello"}]
})

# Custom project name, metadata, and tags for a run
from langchain_core.tracers.context import tracing_v2_enabled

with tracing_v2_enabled(project_name="experiment"):
    result = chain.invoke({"input": "test"}, config={
        "metadata": {"experiment_id": "exp-42"},
        "tags": ["evaluation", "v2"],
    })
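21.4 Request caching
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_community.cache import SQLiteCache  # requires langchain-community

# Option A: in-memory cache
set_llm_cache(InMemoryCache())
# Option B: persistent SQLite cache (a later set_llm_cache call replaces the earlier one)
set_llm_cache(SQLiteCache(database_path=".langchain_cache.db"))

# An identical request is served from the cache without calling the API
response1 = model.invoke("What is machine learning?")  # API call
response2 = model.invoke("What is machine learning?")  # cache hit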
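The cache hit above depends on the request being identical. A toy stdlib-only sketch of exact-match caching makes this visible; `ExactMatchCache` and `fake_model` are hypothetical stand-ins and do not reflect how LangChain implements its caches.

```python
from typing import Callable, Dict, List, Tuple

class ExactMatchCache:
    """Toy cache keyed on the exact (model name, prompt) pair."""

    def __init__(self, call_model: Callable[[str, str], str]):
        self._call_model = call_model
        self._store: Dict[Tuple[str, str], str] = {}
        self.hits = 0
        self.misses = 0

    def invoke(self, model_name: str, prompt: str) -> str:
        key = (model_name, prompt)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        result = self._call_model(model_name, prompt)
        self._store[key] = result
        return result

api_calls: List[str] = []

def fake_model(model_name: str, prompt: str) -> str:
    # Stand-in for a real API call; records every invocation.
    api_calls.append(prompt)
    return f"[{model_name}] answer to: {prompt}"

cache = ExactMatchCache(fake_model)
first = cache.invoke("gpt-4o", "What is machine learning?")
second = cache.invoke("gpt-4o", "What is machine learning?")   # served from cache
third = cache.invoke("gpt-4o", "What is machine learning ?")   # one extra space: miss
print(cache.hits, cache.misses, len(api_calls))
```

Even a one-character difference in the prompt produces a miss, so exact-match caching pays off mainly for repeated, templated requests.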
22. Production best practices
22.1 Configuration management
from dataclasses import dataclass, field
from typing import List, Optional
import os

@dataclass
class AppConfig:
    """Application configuration."""
    # LLM
    model_name: str = os.getenv("LLM_MODEL", "gpt-4o")
    model_provider: str = os.getenv("LLM_PROVIDER", "openai")
    temperature: float = 0.0
    max_tokens: int = 4096
    # Retrieval
    chunk_size: int = 500
    chunk_overlap: int = 50
    retriever_k: int = 5
    embedding_model: str = "text-embedding-3-small"
    # Agent
    max_iterations: int = 15
    max_execution_time: int = 60
    # Rate limiting
    requests_per_second: float = 5.0
    max_concurrency: int = 10
    # Persistence
    checkpoint_db_url: str = "sqlite:///./agent.db"
    chroma_persist_dir: str = "./chroma_db"
    # Caching
    cache_enabled: bool = True
    cache_db_path: str = "./llm_cache.db"
    # Security
    pii_detection_enabled: bool = True
    sensitive_tools: List[str] = field(default_factory=lambda: [
        "send_email", "delete_record", "transfer_money"
    ])
    require_approval: bool = True

config = AppConfig()
22.2 Production-grade agent factory
import sqlite3

from langchain.agents import create_agent
from langchain.agents.middleware import (
    HumanInTheLoopMiddleware,
    ModelCallLimitMiddleware,
    ModelFallbackMiddleware,
    PIIMiddleware,
    SummarizationMiddleware,
    ToolCallLimitMiddleware,
)
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.sqlite import SqliteSaver

class ProductionAgent:
    """Production-grade agent wrapper."""

    def __init__(self, config: AppConfig):
        self.config = config
        # Initialize the model
        self.model = init_chat_model(
            f"{config.model_provider}:{config.model_name}",
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )
        # Initialize the checkpointer. SqliteSaver wraps a sqlite3 connection,
        # so strip the "sqlite:///" URL prefix to get a file path.
        db_path = config.checkpoint_db_url.removeprefix("sqlite:///")
        self.checkpointer = SqliteSaver(
            sqlite3.connect(db_path, check_same_thread=False)
        )
        # Build the middleware stack
        self.middleware = self._build_middleware()
        # Create the agent (_get_tools and _get_system_prompt are
        # application-specific and not shown here)
        self.agent = create_agent(
            model=self.model,
            tools=self._get_tools(),
            checkpointer=self.checkpointer,
            middleware=self.middleware,
            system_prompt=self._get_system_prompt(),
        )

    def _build_middleware(self):
        middleware = []
        # Redact or block sensitive data
        if self.config.pii_detection_enabled:
            middleware.append(PIIMiddleware("email", strategy="redact"))
            middleware.append(PIIMiddleware("credit_card", strategy="block"))
        # Call limits
        middleware.append(ModelCallLimitMiddleware(run_limit=15))
        middleware.append(ToolCallLimitMiddleware(
            tool_name="search_web", run_limit=10
        ))
        # Summary compression
        middleware.append(SummarizationMiddleware(
            model="openai:gpt-4o-mini",
            max_tokens_before_summary=3000,
        ))
        # Sensitive tools require approval
        if self.config.require_approval:
            middleware.append(HumanInTheLoopMiddleware(
                interrupt_on={
                    tool: {"allowed_decisions": ["approve", "reject"]}
                    for tool in self.config.sensitive_tools
                },
            ))
        # Fallback: the arguments are the models tried, in order,
        # after the primary model fails
        middleware.append(ModelFallbackMiddleware(
            init_chat_model("openai:gpt-4o-mini"),
        ))
        return middleware

    def invoke(self, user_id: str, session_id: str, message: str):
        config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
        return self.agent.invoke(
            {"messages": [{"role": "user", "content": message}]},
            config=config,
        )

    def stream(self, user_id: str, session_id: str, message: str):
        config = {"configurable": {"thread_id": f"{user_id}:{session_id}"}}
        return self.agent.stream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode=["messages", "custom"],
            config=config,
        )
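22.3 Error-handling strategy
import logging

from tenacity import (
    retry, stop_after_attempt, wait_exponential,
    retry_if_exception_type,
)

logger = logging.getLogger("langchain-app")

@retry(
    retry=retry_if_exception_type((TimeoutError, ConnectionError)),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    stop=stop_after_attempt(3),
)
def robust_invoke(chain, inputs, config=None):
    try:
        return chain.invoke(inputs, config=config)
    except Exception as e:
        # Log the error, then re-raise so tenacity can decide whether to retry
        logger.error(f"Invocation failed: {e}", extra={"extra_data": {
            "inputs": str(inputs)[:200],
            "error_type": type(e).__name__,
        }})
        raise

# Graceful degradation
def safe_invoke(chain, inputs, fallback="Sorry, the service is temporarily unavailable."):
    try:
        return chain.invoke(inputs)
    except Exception as e:
        # Log instead of swallowing the error silently
        logger.warning(f"Falling back after error: {e}")
        return fallback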
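To show what the retry decorator does conceptually, here is a stdlib-only sketch of retrying with doubling delays. It is a generic illustration, not tenacity's implementation; `retry_with_backoff` and its parameters are hypothetical, and the `sleep` argument is injectable so the delays can be observed without waiting.

```python
import time
from typing import Callable, List, Tuple, Type

def retry_with_backoff(
    func: Callable[[], object],
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    max_attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Call func, retrying transient errors with doubling, capped delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            sleep(delay)

attempts: List[int] = []
delays: List[float] = []

def flaky():
    # Fails twice with a transient error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary network failure")
    return "ok"

result = retry_with_backoff(flaky, sleep=delays.append)
print(result, len(attempts), delays)
```

Errors outside `retry_on` (for example a `ValueError` caused by a bug) propagate on the first attempt, which matches the intent of `retry_if_exception_type` above: only transient failures are worth retrying.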
22.4 Logging and observability
import functools
import json
import logging
import time
from datetime import datetime

# Structured logging
logger = logging.getLogger("langchain-app")
logger.setLevel(logging.INFO)

class StructuredLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        # Populated by logger.xxx(..., extra={"extra_data": {...}})
        if hasattr(record, "extra_data"):
            log_entry.update(record.extra_data)
        print(json.dumps(log_entry, ensure_ascii=False))

logger.addHandler(StructuredLogHandler())

# Latency monitoring
def monitor_latency(func):
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = (time.time() - start) * 1000
        logger.info(f"{func.__name__} took {duration:.0f}ms")
        return result
    return wrapper

@monitor_latency
def answer_question(question: str):
    return agent.invoke({
        "messages": [{"role": "user", "content": question}]
    })
22.5 Security best practices
import re

class SecurityMiddleware:
    """Security helpers."""

    # 1. Input validation
    @staticmethod
    def validate_input(message: str, max_length: int = 10000):
        if len(message) > max_length:
            raise ValueError(f"Input too long (>{max_length} characters)")
        # Other checks...

    # 2. Prompt-injection guard (pattern matching is a first line of defense only)
    @staticmethod
    def sanitize_system_prompt(prompt: str) -> str:
        dangerous_patterns = [
            r"忽略.*指令",  # Chinese for "ignore ... instructions"
            r"ignore.*instruction",
            r"system:\s*",
        ]
        for pattern in dangerous_patterns:
            if re.search(pattern, prompt, re.IGNORECASE):
                raise ValueError("Potential prompt injection detected")
        return prompt

    # 3. Tool permission control
    @staticmethod
    def validate_tool_access(user_role: str, tool_name: str) -> bool:
        TOOL_PERMISSIONS = {
            "admin": ["send_email", "delete_record", "transfer_money"],
            "user": ["search_web", "get_weather", "calculate"],
            "guest": ["search_web"],
        }
        allowed = TOOL_PERMISSIONS.get(user_role, [])
        return tool_name in allowed
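One way to apply the permission table is to filter the tool list before the agent is built, so a role never sees tools it may not call. The sketch below is stdlib-only and assumes tools are plain Python functions; `tools_for_role` and the stub tools are hypothetical.

```python
from typing import Callable, Dict, List

TOOL_PERMISSIONS: Dict[str, List[str]] = {
    "admin": ["send_email", "delete_record", "transfer_money"],
    "user": ["search_web", "get_weather", "calculate"],
    "guest": ["search_web"],
}

def tools_for_role(role: str, tools: List[Callable]) -> List[Callable]:
    """Return only the tools whose function name is allowed for this role."""
    allowed = set(TOOL_PERMISSIONS.get(role, []))  # unknown role: no tools
    return [t for t in tools if t.__name__ in allowed]

# Stub tools for illustration only
def search_web(query: str) -> str:
    return f"results for {query}"

def get_weather(city: str) -> str:
    return f"weather in {city}"

def transfer_money(to: str, amount: float) -> str:
    return f"sent {amount} to {to}"

all_tools = [search_web, get_weather, transfer_money]
guest_tools = tools_for_role("guest", all_tools)
print([t.__name__ for t in guest_tools])
```

Filtering up front is deny-by-default: a role missing from the table gets an empty tool list rather than full access.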
22.6 Health checks and monitoring
from fastapi.responses import JSONResponse, Response

# FastAPI health-check endpoint
@app.get("/health")
async def health_check():
    checks = {}
    # Check the LLM connection
    try:
        model.invoke("ping")
        checks["llm"] = "healthy"
    except Exception as e:
        checks["llm"] = f"unhealthy: {e}"
    # Check the vector store
    try:
        vectorstore.similarity_search("test", k=1)
        checks["vectorstore"] = "healthy"
    except Exception as e:
        checks["vectorstore"] = f"unhealthy: {e}"
    # Check the checkpointer
    try:
        checkpointer.get({"configurable": {"thread_id": "health-check"}})
        checks["checkpointer"] = "healthy"
    except Exception as e:
        checks["checkpointer"] = f"unhealthy: {e}"
    all_healthy = all(v == "healthy" for v in checks.values())
    status_code = 200 if all_healthy else 503
    return JSONResponse(
        content={"status": "ok" if all_healthy else "degraded", "checks": checks},
        status_code=status_code,
    )

# Metrics collection
from prometheus_client import Counter, Histogram, generate_latest

request_count = Counter("agent_requests_total", "Total agent requests")
request_duration = Histogram("agent_request_duration_seconds", "Request duration")
tool_call_count = Counter("tool_calls_total", "Total tool calls", ["tool_name"])

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")
23. v0.x → v1.0 migration guide
23.1 Import changes at a glance
# ===== Models =====
# OLD (v0.x)
from langchain.chat_models import ChatOpenAI
from langchain.llms import OpenAI  # deprecated
# NEW (v1.0)
from langchain.chat_models import init_chat_model  # unified entry point
from langchain_openai import ChatOpenAI

# ===== Agents =====
# OLD (v0.x)
from langchain.agents import initialize_agent, AgentExecutor
from langgraph.prebuilt import create_react_agent
# NEW (v1.0)
from langchain.agents import create_agent  # the single recommended entry point

# ===== Chains =====
# OLD (v0.x)
from langchain.chains import LLMChain, ConversationChain
from langchain.chains import RetrievalQA
# NEW (v1.0)
# Use an LCEL pipeline or create_agent
chain = prompt | model | parser

# ===== Memory =====
# OLD (v0.x)
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
# NEW (v1.0)
# Use a LangGraph checkpointer
from langgraph.checkpoint.sqlite import SqliteSaver

# ===== Legacy compatibility =====
# If you still need the old Chains/Agents:
# pip install langchain-classic
from langchain_classic.chains import LLMChain
23.2 Typical migration scenarios
# === Scenario 1: simple conversation chain ===
# OLD
from langchain.chains import ConversationChain
chain = ConversationChain(llm=model, memory=memory)
result = chain.run("Hello")
# NEW: LCEL
chain = prompt | model | parser
result = chain.invoke({"input": "Hello"})
# NEW: create_agent
agent = create_agent(model="openai:gpt-4o")
result = agent.invoke({"messages": [{"role": "user", "content": "Hello"}]})

# === Scenario 2: RAG chain ===
# OLD
from langchain.chains import RetrievalQA
qa = RetrievalQA.from_chain_type(llm=model, retriever=retriever)
result = qa.run("question")
# NEW: LCEL
from langchain_core.runnables import RunnablePassthrough

def format_docs(docs):
    return "\n\n".join(d.page_content for d in docs)

rag = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt | model | parser
)
result = rag.invoke("question")

# === Scenario 3: agent with tools ===
# OLD
from langchain.agents import initialize_agent, AgentType
agent = initialize_agent(tools, model, AgentType.ZERO_SHOT_REACT_DESCRIPTION)
result = agent.run("task")
# NEW
agent = create_agent(model="openai:gpt-4o", tools=tools)
result = agent.invoke({"messages": [{"role": "user", "content": "task"}]})

# === Scenario 4: memory ===
# OLD
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(return_messages=True)
# NEW
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
agent = create_agent(
    model="openai:gpt-4o",
    checkpointer=checkpointer,
)
config = {"configurable": {"thread_id": "user-123"}}
23.3 Migration checklist
# ✅ Checklist
MIGRATION_CHECKLIST = """
1. [ ] Remove all langchain.chains imports
       → replace with LCEL | pipelines or create_agent
2. [ ] Remove AgentExecutor / initialize_agent
       → replace with create_agent
3. [ ] Remove create_react_agent (deprecated)
       → replace with create_agent
4. [ ] Remove RunnableWithMessageHistory
       → replace with a LangGraph checkpointer
5. [ ] Remove ConversationBufferMemory and other memory classes
       → replace with checkpointer + middleware
6. [ ] Check every import path
       → langchain.llms → langchain.chat_models
       → langchain.chains → LCEL or langchain-classic
7. [ ] Create models through init_chat_model
       → instead of importing each vendor's ChatModel separately
8. [ ] Use with_structured_output for structured output
       → instead of combining various OutputParsers
9. [ ] Use content_blocks for cross-vendor response handling
       → instead of vendor-specific response parsing
10. [ ] Add the langchain-classic dependency (if migrating incrementally)
"""
Appendix A: Common code snippets
A.1 Translation chain
translate_prompt = ChatPromptTemplate.from_template(
    "Translate the following {source} text into {target}:\n{text}"
)
translate_chain = translate_prompt | model | StrOutputParser()
A.2 Summarization chain
summarize_prompt = ChatPromptTemplate.from_template(
    "Summarize the following content in at most {max_words} words:\n\n{text}\n\nSummary:"
)
summarize_chain = summarize_prompt | model | StrOutputParser()
A.3 Code review chain
review_prompt = ChatPromptTemplate.from_template("""
Review the following {language} code from these angles:
1. Potential bugs
2. Performance issues
3. Security risks
4. Code style

```{language}
{code}
```

Review report:""")
review_chain = review_prompt | model | StrOutputParser()
A.4 Entity extraction
class Entities(BaseModel):
    people: List[str] = Field(description="List of person names")
    organizations: List[str] = Field(description="List of organization names")
    locations: List[str] = Field(description="List of place names")
    dates: List[str] = Field(description="List of dates")

extractor = model.with_structured_output(Entities)
entities = extractor.invoke("In June 2026, Zhang San attended Alibaba's tech conference in Beijing.")
A.5 Text classification
class Category(str, Enum):
    TECH = "technology"
    SPORTS = "sports"
    FINANCE = "finance"
    ENTERTAINMENT = "entertainment"

class Classification(BaseModel):
    category: Category
    confidence: float = Field(ge=0, le=1)
    keywords: List[str]

classifier = model.with_structured_output(Classification)
result = classifier.invoke("Apple released a new MacBook Pro powered by the M4 chip.")
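Appendix B: Frequently asked questions
Q: What is the difference between create_agent and the old create_react_agent?
A: create_agent is the unified v1.0 entry point. It runs on LangGraph and natively supports middleware, checkpointers, and structured output. create_react_agent (from langgraph.prebuilt) is deprecated as of LangGraph v1.0 in favor of create_agent.
Q: When should I use LCEL, and when create_agent?
A: Use LCEL (| pipelines) for fixed flows, create_agent when you need tool calling and dynamic decisions, and StateGraph for complex workflows.
Q: Which checkpointer should I choose?
A: InMemorySaver for development, SqliteSaver for lightweight production, and PostgresSaver for full production.
Q: How can I reduce token costs?
A: (1) Enable the LLM cache. (2) Use SummarizationMiddleware. (3) Use a small model for summarization and a large model for reasoning. (4) Shorten your prompts. (5) Use a cheaper model for structured output.
Q: How do I migrate a v0.x project incrementally?
A: Install the langchain-classic compatibility package, replace the old imports with from langchain_classic.chains import LLMChain, then move step by step to LCEL and create_agent.
Q: What is the difference between content_blocks and .content?
A: .content returns the provider's native content (often a plain string), whose format varies by vendor. .content_blocks returns a typed list that is consistent across vendors and covers reasoning, tool calls, citations, and similar block types.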
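Suggested learning path:
- Start with LCEL and create_agent
- Get comfortable with with_structured_output and content_blocks
- Understand the middleware system
- Go deeper into LangGraph's StateGraph and checkpointers
- Learn multi-agent orchestration (Supervisor/Swarm)
This document is based on the official LangChain v1.0 documentation and is updated continuously.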