LangGraph 高可用设计:节点故障、服务熔断与自动恢复
LangGraph 高可用设计:节点故障、服务熔断与自动恢复
关键词:LangGraph、高可用设计、节点故障、服务熔断、自动恢复、分布式系统、状态管理
摘要
在构建基于大语言模型的复杂应用时,LangGraph 已经成为开发者手中的强大工具,它允许我们创建具有状态管理、循环和分支的智能工作流。然而,当这些工作流部署到生产环境并处理实际业务流量时,确保系统的高可用性变得至关重要。本文将深入探讨 LangGraph 的高可用设计,重点关注节点故障处理、服务熔断机制和自动恢复策略。我们将从基础概念开始,逐步深入到技术实现,最终通过实际案例展示如何构建一个稳健、可靠的 LangGraph 应用系统。
1. 背景介绍
1.1 主题背景和重要性
在现代软件开发中,高可用性已经成为系统设计的关键指标之一。简单来说,高可用性指的是系统在面对各种故障时仍能保持持续服务的能力。对于基于 LangGraph 构建的应用来说,这一点尤为重要,因为:
-
业务连续性:越来越多的企业开始使用 LangGraph 构建核心业务流程,如客户服务、内容生成、数据分析等。这些系统的中断可能直接导致业务损失。
-
状态复杂性:与传统的无状态服务不同,LangGraph 工作流通常包含复杂的状态管理。节点故障不仅会导致当前请求失败,还可能破坏状态一致性。
-
外部依赖:LangGraph 节点常常依赖外部服务,如大语言模型 API、数据库、第三方服务等。这些依赖的不可预测性增加了系统故障的风险。
让我们通过一个生活化的例子来理解这个问题。想象一家餐厅,它的厨房工作流程就像一个 LangGraph 工作流:
- 服务员(输入节点)接收顾客订单
- 配菜员(中间节点)准备食材
- 厨师(处理节点)烹饪食物
- 传菜员(输出节点)将食物送到顾客桌上
如果其中任何一个环节出现问题,比如配菜员突然生病,整个工作流程就会中断,顾客无法按时获得食物。在 LangGraph 系统中,类似的情况也会发生,我们需要设计机制来处理这些"员工请假"或"设备故障"的情况。
1.2 目标读者
本文主要面向以下读者群体:
- AI 应用开发者:已经使用或计划使用 LangGraph 构建应用的开发者,希望了解如何提高系统的可靠性。
- 系统架构师:负责设计和维护 AI 应用系统架构的专业人士。
- DevOps 工程师:负责部署、监控和维护 LangGraph 应用的运维人员。
- 技术决策者:需要评估 LangGraph 在企业级应用中适用性的技术管理者。
假设读者已经具备基本的 Python 编程知识,对 LangChain 和 LangGraph 有初步了解,并且熟悉分布式系统的基本概念。
1.3 核心问题或挑战
在 LangGraph 高可用设计中,我们主要面临以下几个核心挑战:
- 节点故障检测与处理:如何快速检测工作流中的节点故障,并采取适当的补救措施?
- 状态一致性维护:在故障发生和恢复过程中,如何确保工作流状态的一致性?
- 服务熔断机制:当某个节点或外部依赖持续失败时,如何防止整个系统被拖垮?
- 自动恢复策略:故障发生后,如何自动恢复服务,减少人工干预?
- 分布式协调:在多实例部署的情况下,如何协调不同实例之间的状态和故障处理?
这些问题并非 LangGraph 独有,它们是分布式系统设计中的经典问题。但 LangGraph 的特性(状态管理、有向图结构、节点间依赖)使这些问题的解决具有独特的挑战和机遇。
2. 核心概念解析
2.1 什么是高可用性?
在深入探讨 LangGraph 的高可用设计之前,让我们先明确高可用性的基本概念。
高可用性(High Availability, HA) 是指系统在较长时间内能够持续正常运行的能力。它通常用 uptime(正常运行时间)占总时间的百分比来衡量:
可用性=正常运行时间总时间×100% \text{可用性} = \frac{\text{正常运行时间}}{\text{总时间}} \times 100\% 可用性=总时间正常运行时间×100%
行业中常用"几个9"来描述可用性水平:
- 99% 可用性:每年停机时间约 3.65 天
- 99.9% 可用性:每年停机时间约 8.77 小时
- 99.99% 可用性:每年停机时间约 52.6 分钟
- 99.999% 可用性:每年停机时间约 5.26 分钟
对于关键业务系统,我们通常追求至少 99.9% 或更高的可用性。
2.2 LangGraph 基础回顾
在继续之前,让我们快速回顾一下 LangGraph 的基本概念,以便所有读者都能跟上我们的讨论。
LangGraph 是一个用于构建状态ful、多角色应用程序的库。它的核心概念包括:
- 状态(State):工作流中共享的数据结构,可以在节点间传递和修改。
- 节点(Nodes):工作流中的处理单元,执行特定的功能。
- 边(Edges):连接节点的有向边,定义了工作流的执行顺序。
- 图(Graph):由节点和边组成的完整工作流定义。
让我们用一个简单的客服助手例子来说明 LangGraph 的基本结构:
在这个例子中,每个方框代表一个节点,箭头代表边。整个工作流从"用户输入"开始,根据不同的意图分支到不同的处理节点,最后生成回复并结束。
2.3 故障模型与类型
在设计高可用系统时,我们首先需要了解可能发生的故障类型。对于 LangGraph 应用,常见的故障包括:
-
节点故障:
- 代码错误导致的节点执行异常
- 资源不足(内存、CPU)导致的节点崩溃
- 网络分区导致的节点不可达
-
外部依赖故障:
- LLM API 超时或错误
- 数据库连接失败
- 第三方服务不可用
-
状态相关故障:
- 状态数据损坏
- 状态同步失败
- 状态版本冲突
-
系统级故障:
- 服务器崩溃
- 网络中断
- 数据中心故障
让我们用一个表格来对比这些故障类型的特点:
| 故障类型 | 影响范围 | 检测难度 | 恢复难度 | 典型场景 |
|---|---|---|---|---|
| 节点故障 | 单个请求 | 中 | 中 | 代码异常、资源耗尽 |
| 外部依赖故障 | 多个请求 | 低 | 中 | API 超时、服务降级 |
| 状态相关故障 | 单个或多个请求 | 高 | 高 | 数据损坏、同步失败 |
| 系统级故障 | 所有请求 | 低 | 高 | 服务器宕机、数据中心故障 |
2.4 高可用设计的核心原则
在设计 LangGraph 高可用系统时,我们应遵循以下核心原则:
- 故障检测:快速、准确地发现故障
- 故障隔离:防止故障蔓延到系统的其他部分
- 故障恢复:自动从故障中恢复,减少人工干预
- 冗余设计:通过冗余组件消除单点故障
- 优雅降级:在故障发生时提供有限但可用的服务
- 状态管理:确保状态在故障前后的一致性
这些原则相互关联,共同构成了高可用设计的基础。接下来的章节中,我们将看到如何在 LangGraph 中应用这些原则。
2.5 核心概念关系图
为了更好地理解这些概念之间的关系,让我们创建一个概念结构图:
这张图展示了高可用性设计的核心组件以及它们与 LangGraph 基本元素之间的关系。在接下来的章节中,我们将深入探讨这些组件的具体实现。
3. 技术原理与实现
3.1 节点故障检测机制
故障检测是高可用设计的第一步,只有及时发现故障,才能采取相应的措施。在 LangGraph 中,我们可以通过多种机制来检测节点故障。
3.1.1 基于异常的检测
最直接的故障检测方式是捕获节点执行过程中抛出的异常。LangGraph 提供了内置的异常处理机制,我们可以利用它来检测故障。
让我们看一个简单的例子:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
def safe_node(state: AgentState):
try:
# 节点的正常逻辑
return {"messages": ["处理成功"]}
except Exception as e:
# 捕获异常并记录
print(f"节点执行失败: {e}")
# 可以选择返回错误状态或重新抛出异常
return {"messages": [f"处理失败: {str(e)}"]}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("safe_node", safe_node)
graph.set_entry_point("safe_node")
graph.add_edge("safe_node", END)
app = graph.compile()
这种方法简单直接,但只能检测到节点执行过程中明确抛出的异常,对于某些"静默失败"(如节点无限期挂起)的情况无法检测。
3.1.2 基于超时的检测
为了处理节点挂起的情况,我们可以添加超时机制。如果节点在指定时间内没有完成执行,就认为它发生了故障。
import asyncio
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
from functools import wraps
class AgentState(TypedDict):
messages: Annotated[list, add]
def timeout(seconds):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
except asyncio.TimeoutError:
print(f"节点执行超时 ({seconds}秒)")
return {"messages": [f"处理超时: {seconds}秒"]}
return wrapper
return decorator
@timeout(5) # 设置5秒超时
async def risky_node(state: AgentState):
# 模拟一个可能超时的操作
await asyncio.sleep(10) # 这个操作会超时
return {"messages": ["处理成功"]}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("risky_node", risky_node)
graph.set_entry_point("risky_node")
graph.add_edge("risky_node", END)
app = graph.compile()
这种方法可以有效检测节点挂起的情况,但需要注意设置合理的超时时间,避免误报。
3.1.3 基于心跳的检测
在分布式部署的场景中,我们可以使用心跳机制来检测节点的健康状态。每个工作节点定期向监控中心发送心跳信号,如果监控中心在一定时间内没有收到心跳,就认为该节点发生了故障。
import asyncio
import time
from typing import Dict, Optional
class HeartbeatMonitor:
def __init__(self, timeout: int = 10):
self.timeout = timeout
self.node_heartbeats: Dict[str, float] = {}
self.failed_nodes: set = set()
def register_node(self, node_id: str):
"""注册一个新节点"""
self.node_heartbeats[node_id] = time.time()
if node_id in self.failed_nodes:
self.failed_nodes.remove(node_id)
print(f"节点 {node_id} 已注册")
def receive_heartbeat(self, node_id: str):
"""接收节点心跳"""
if node_id in self.node_heartbeats:
self.node_heartbeats[node_id] = time.time()
if node_id in self.failed_nodes:
self.failed_nodes.remove(node_id)
print(f"节点 {node_id} 已恢复")
async def monitor(self):
"""监控节点健康状态"""
while True:
current_time = time.time()
for node_id, last_heartbeat in list(self.node_heartbeats.items()):
if current_time - last_heartbeat > self.timeout:
if node_id not in self.failed_nodes:
self.failed_nodes.add(node_id)
print(f"警告: 节点 {node_id} 可能已故障")
await asyncio.sleep(1)
class HeartbeatSender:
def __init__(self, node_id: str, monitor: HeartbeatMonitor, interval: int = 5):
self.node_id = node_id
self.monitor = monitor
self.interval = interval
async def send_heartbeats(self):
"""定期发送心跳"""
self.monitor.register_node(self.node_id)
while True:
self.monitor.receive_heartbeat(self.node_id)
await asyncio.sleep(self.interval)
# 使用示例
async def main():
monitor = HeartbeatMonitor(timeout=10)
sender1 = HeartbeatSender("node_1", monitor, interval=5)
sender2 = HeartbeatSender("node_2", monitor, interval=5)
# 启动监控和心跳发送
monitor_task = asyncio.create_task(monitor.monitor())
sender1_task = asyncio.create_task(sender1.send_heartbeats())
sender2_task = asyncio.create_task(sender2.send_heartbeats())
# 模拟运行一段时间
await asyncio.sleep(30)
# 取消任务
monitor_task.cancel()
sender1_task.cancel()
sender2_task.cancel()
asyncio.run(main())
这个例子展示了一个简单的心跳监控系统。在实际生产环境中,我们可以使用更成熟的解决方案,如 etcd、Consul 或 ZooKeeper 来实现服务发现和健康检查。
3.2 服务熔断机制
当某个节点或外部依赖持续失败时,我们需要一种机制来防止整个系统被拖垮,这就是服务熔断机制的作用。熔断器模式(Circuit Breaker Pattern)是一种防止系统在故障点反复尝试的设计模式。
3.2.1 熔断器的基本原理
熔断器可以看作是一个状态机,它有三种状态:
- 关闭(Closed):正常状态,请求可以正常通过。
- 打开(Open):故障状态,请求直接被拒绝,不执行实际操作。
- 半打开(Half-Open):恢复状态,允许少量请求通过,以检查服务是否恢复。
让我们用流程图来表示熔断器的状态转换:
当熔断器处于关闭状态时,它会记录请求的成功和失败次数。如果失败次数超过某个阈值,熔断器就会切换到打开状态。在打开状态下,所有请求都会直接失败,不会执行实际操作。经过一段冷却时间后,熔断器会切换到半打开状态,允许少量请求通过。如果这些请求成功,熔断器就会切换回关闭状态;如果失败,就会再次切换到打开状态。
3.2.2 在 LangGraph 中实现熔断器
现在让我们看看如何在 LangGraph 节点中实现熔断器模式:
import time
from enum import Enum
from functools import wraps
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
expected_exception: tuple = (Exception,),
fallback_function: Callable = None
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.fallback_function = fallback_function
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
def _call(self, func: Callable, *args, **kwargs) -> Any:
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
# 切换到半打开状态
self.state = CircuitState.HALF_OPEN
print("熔断器切换到半打开状态,尝试恢复...")
else:
# 熔断器打开,使用降级方案
print("熔断器处于打开状态,请求被拒绝")
if self.fallback_function:
return self.fallback_function(*args, **kwargs)
raise Exception("服务暂时不可用")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
if self.fallback_function:
return self.fallback_function(*args, **kwargs)
raise
def _on_success(self):
"""请求成功时的处理"""
if self.state == CircuitState.HALF_OPEN:
# 半打开状态下成功,切换到关闭状态
self.state = CircuitState.CLOSED
self.failure_count = 0
print("熔断器切换到关闭状态,服务已恢复")
elif self.state == CircuitState.CLOSED:
# 关闭状态下成功,重置失败计数
self.failure_count = 0
def _on_failure(self):
"""请求失败时的处理"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# 半打开状态下失败,切换回打开状态
self.state = CircuitState.OPEN
print("熔断器切换到打开状态,服务恢复失败")
elif self.state == CircuitState.CLOSED and self.failure_count >= self.failure_threshold:
# 关闭状态下失败次数超过阈值,切换到打开状态
self.state = CircuitState.OPEN
print(f"熔断器切换到打开状态,失败次数: {self.failure_count}")
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
return self._call(func, *args, **kwargs)
return wrapper
# 使用示例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
def fallback_llm_call(state: AgentState) -> AgentState:
"""降级方案:使用简单的回复"""
return {"messages": ["抱歉,服务暂时不可用,请稍后再试。"]}
# 创建熔断器
circuit_breaker = CircuitBreaker(
failure_threshold=3,
recovery_timeout=60,
fallback_function=fallback_llm_call
)
@circuit_breaker
def unreliable_llm_call(state: AgentState) -> AgentState:
"""模拟一个不稳定的LLM调用"""
import random
if random.random() < 0.7: # 70%的概率失败
raise Exception("LLM API调用失败")
return {"messages": ["LLM处理结果"]}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("llm_node", unreliable_llm_call)
graph.set_entry_point("llm_node")
graph.add_edge("llm_node", END)
app = graph.compile()
这个例子实现了一个基本的熔断器,并将其应用到 LangGraph 节点中。当节点失败次数超过阈值时,熔断器会打开,后续请求会直接使用降级方案,而不是继续尝试调用可能失败的服务。
3.2.3 舱壁模式(Bulkhead Pattern)
除了熔断器模式,另一个有用的模式是舱壁模式。这个模式的思想是将系统的不同部分隔离起来,就像船舱的隔舱一样,这样即使一个部分出现问题,也不会影响到其他部分。
在 LangGraph 中,我们可以使用舱壁模式来隔离不同的节点或外部依赖。例如,我们可以为每个外部服务创建独立的线程池或连接池,这样一个服务的问题不会耗尽所有资源。
import asyncio
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
results: Dict[str, Any]
class BulkheadIsolator:
def __init__(self, max_concurrent: int = 10):
self.semaphores: Dict[str, asyncio.Semaphore] = {}
self.max_concurrent = max_concurrent
def _get_semaphore(self, bulkhead_name: str) -> asyncio.Semaphore:
if bulkhead_name not in self.semaphores:
self.semaphores[bulkhead_name] = asyncio.Semaphore(self.max_concurrent)
return self.semaphores[bulkhead_name]
async def execute(self, bulkhead_name: str, func, *args, **kwargs):
semaphore = self._get_semaphore(bulkhead_name)
async with semaphore:
return await func(*args, **kwargs)
# 创建舱壁隔离器
bulkhead = BulkheadIsolator(max_concurrent=5)
async def call_service_a(state: AgentState) -> AgentState:
"""调用服务A,使用舱壁隔离"""
async def _service_call():
# 模拟服务调用
await asyncio.sleep(1)
return "服务A结果"
result = await bulkhead.execute("service_a", _service_call)
return {"results": {"service_a": result}}
async def call_service_b(state: AgentState) -> AgentState:
"""调用服务B,使用舱壁隔离"""
async def _service_call():
# 模拟服务调用
await asyncio.sleep(1)
return "服务B结果"
result = await bulkhead.execute("service_b", _service_call)
current_results = state.get("results", {})
current_results["service_b"] = result
return {"results": current_results}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("call_service_a", call_service_a)
graph.add_node("call_service_b", call_service_b)
graph.set_entry_point("call_service_a")
graph.add_edge("call_service_a", "call_service_b")
graph.add_edge("call_service_b", END)
app = graph.compile()
在这个例子中,我们为服务A和服务B分别创建了独立的舱壁,每个舱壁最多允许5个并发请求。这样即使服务A出现问题,导致大量请求积压,也不会影响到服务B的正常调用。
3.3 状态管理与检查点
在 LangGraph 中,状态是工作流的核心。当故障发生时,我们需要确保状态的一致性,并且能够从故障点恢复,而不是重新开始整个工作流。这就需要使用检查点(Checkpoint)机制。
3.3.1 LangGraph 的检查点机制
LangGraph 内置了检查点机制,可以将工作流的状态保存到持久化存储中。当故障发生时,我们可以从最近的检查点恢复工作流的执行。
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
current_step: int
def step1(state: AgentState) -> AgentState:
print("执行步骤1")
return {
"messages": ["步骤1完成"],
"current_step": 1
}
def step2(state: AgentState) -> AgentState:
print("执行步骤2")
# 模拟可能失败的操作
import random
if random.random() < 0.5:
raise Exception("步骤2执行失败")
return {
"messages": ["步骤2完成"],
"current_step": 2
}
def step3(state: AgentState) -> AgentState:
print("执行步骤3")
return {
"messages": ["步骤3完成"],
"current_step": 3
}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_node("step3", step3)
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
# 创建SQLite检查点存储器
memory = SqliteSaver.from_conn_string(":memory:")
# 编译图,启用检查点
app = graph.compile(checkpointer=memory)
# 执行图,带重试逻辑
def run_with_recovery():
config = {"configurable": {"thread_id": "1"}}
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
# 尝试执行
result = app.invoke({"messages": [], "current_step": 0}, config)
print("工作流执行完成:", result)
return result
except Exception as e:
retry_count += 1
print(f"执行失败 (尝试 {retry_count}/{max_retries}): {e}")
if retry_count >= max_retries:
print("已达到最大重试次数,放弃执行")
raise
# 等待一段时间后重试
import time
wait_time = 2 ** retry_count # 指数退避
print(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
# 运行
run_with_recovery()
这个例子展示了如何使用 LangGraph 的内置检查点机制。当工作流执行失败时,我们可以从最近的检查点恢复,而不是重新开始整个工作流。
3.3.2 自定义检查点策略
虽然 LangGraph 提供了内置的检查点机制,但在某些情况下,我们可能需要更精细的控制。例如,我们可能希望只在关键节点执行后保存检查点,或者使用不同的存储后端。
让我们看看如何实现一个自定义的检查点策略:
import json
import os
from typing import Any, Dict, Optional
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointAt, CheckpointTuple
class FileCheckpointSaver(BaseCheckpointSaver):
"""基于文件系统的检查点存储器"""
def __init__(self, directory: str = "./checkpoints"):
self.directory = directory
os.makedirs(directory, exist_ok=True)
def _get_file_path(self, thread_id: str, checkpoint_ns: str = "") -> str:
"""获取检查点文件路径"""
if checkpoint_ns:
return os.path.join(self.directory, f"{thread_id}_{checkpoint_ns}.json")
return os.path.join(self.directory, f"{thread_id}.json")
def get_tuple(self, config: Dict[str, Any]) -> Optional[CheckpointTuple]:
"""获取检查点"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
file_path = self._get_file_path(thread_id, checkpoint_ns)
if not os.path.exists(file_path):
return None
with open(file_path, "r") as f:
data = json.load(f)
return CheckpointTuple(
config=config,
checkpoint=Checkpoint(**data["checkpoint"]),
metadata=data.get("metadata", {}),
parent_config=data.get("parent_config")
)
def put(self, config: Dict[str, Any], checkpoint: Checkpoint, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""保存检查点"""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
file_path = self._get_file_path(thread_id, checkpoint_ns)
data = {
"checkpoint": checkpoint.dict(),
"metadata": metadata,
"parent_config": config
}
with open(file_path, "w") as f:
json.dump(data, f)
return config
def list(self, config: Dict[str, Any]) -> Any:
"""列出检查点"""
# 简化实现,实际应用中可能需要更复杂的逻辑
thread_id = config["configurable"]["thread_id"]
for filename in os.listdir(self.directory):
if filename.startswith(thread_id):
with open(os.path.join(self.directory, filename), "r") as f:
data = json.load(f)
yield CheckpointTuple(
config=config,
checkpoint=Checkpoint(**data["checkpoint"]),
metadata=data.get("metadata", {}),
parent_config=data.get("parent_config")
)
# 自定义检查点策略
from langgraph.checkpoint.base import CheckpointAt
class CustomCheckpointAt(CheckpointAt):
"""自定义检查点策略:只在特定节点后保存检查点"""
def __init__(self, nodes_to_checkpoint: list):
self.nodes_to_checkpoint = nodes_to_checkpoint
def __call__(self, step: dict) -> bool:
"""决定是否在当前步骤后保存检查点"""
# 检查是否是我们想要保存检查点的节点
if "nodes" in step:
for node_id in step["nodes"]:
if node_id in self.nodes_to_checkpoint:
print(f"在节点 {node_id} 后保存检查点")
return True
return False
# 使用自定义检查点
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
current_step: int
# 创建自定义检查点存储器和策略
checkpoint_saver = FileCheckpointSaver()
checkpoint_at = CustomCheckpointAt(nodes_to_checkpoint=["step2"])
# 定义节点
def step1(state: AgentState) -> AgentState:
print("执行步骤1")
return {
"messages": ["步骤1完成"],
"current_step": 1
}
def step2(state: AgentState) -> AgentState:
print("执行步骤2")
return {
"messages": ["步骤2完成"],
"current_step": 2
}
def step3(state: AgentState) -> AgentState:
print("执行步骤3")
return {
"messages": ["步骤3完成"],
"current_step": 3
}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_node("step3", step3)
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)
# 编译图,使用自定义检查点
app = graph.compile(
checkpointer=checkpoint_saver,
checkpointer_kwargs={"checkpoint_at": checkpoint_at}
)
# 执行图
config = {"configurable": {"thread_id": "custom_checkpoint_example"}}
result = app.invoke({"messages": [], "current_step": 0}, config)
print("工作流执行完成:", result)
这个例子展示了如何实现自定义的检查点存储器和检查点策略。在实际应用中,我们可以根据需求选择合适的存储后端(如 Redis、数据库等)和检查点策略。
3.4 重试与故障转移策略
当故障发生时,我们需要决定如何处理。是重试失败的操作?还是故障转移到备用节点?让我们探讨一些常见的策略。
3.4.1 重试策略
重试是最简单直接的故障处理方式,但需要谨慎使用,避免造成"重试风暴"。常见的重试策略包括:
- 固定间隔重试:每次重试之间等待固定的时间。
- 指数退避重试:每次重试的等待时间指数增长,如 1s, 2s, 4s, 8s…
- 带抖动的指数退避:在指数退避的基础上添加随机抖动,避免多个请求同时重试。
让我们看看如何在 LangGraph 节点中实现这些重试策略:
import asyncio
import random
from functools import wraps
from typing import Callable, Any, Tuple, Type
class RetryPolicy:
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retry_exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
self.retry_exceptions = retry_exceptions
def _calculate_delay(self, attempt: int) -> float:
"""计算重试延迟"""
delay = self.base_delay * (self.exponential_base ** attempt)
delay = min(delay, self.max_delay)
if self.jitter:
# 添加抖动,范围为延迟的±20%
jitter_amount = delay * 0.2
delay += random.uniform(-jitter_amount, jitter_amount)
delay = max(0, delay)
return delay
def __call__(self, func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except self.retry_exceptions as e:
last_exception = e
if attempt == self.max_retries:
print(f"已达到最大重试次数 ({self.max_retries}),放弃重试")
raise
delay = self._calculate_delay(attempt)
print(f"重试 {attempt + 1}/{self.max_retries},等待 {delay:.2f} 秒...")
await asyncio.sleep(delay)
raise last_exception
@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except self.retry_exceptions as e:
last_exception = e
if attempt == self.max_retries:
print(f"已达到最大重试次数 ({self.max_retries}),放弃重试")
raise
delay = self._calculate_delay(attempt)
print(f"重试 {attempt + 1}/{self.max_retries},等待 {delay:.2f} 秒...")
import time
time.sleep(delay)
raise last_exception
import inspect
if inspect.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
# 使用示例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
# 创建重试策略
retry_policy = RetryPolicy(
max_retries=3,
base_delay=1.0,
max_delay=10.0,
exponential_base=2.0,
jitter=True
)
@retry_policy
async def unreliable_node(state: AgentState) -> AgentState:
"""模拟一个不稳定的节点"""
import random
if random.random() < 0.7: # 70%的概率失败
print("节点执行失败")
raise Exception("临时错误")
print("节点执行成功")
return {"messages": ["处理成功"]}
# 构建图
graph = StateGraph(AgentState)
graph.add_node("unreliable_node", unreliable_node)
graph.set_entry_point("unreliable_node")
graph.add_edge("unreliable_node", END)
app = graph.compile()
# 执行图
async def main():
result = await app.ainvoke({"messages": []})
print("最终结果:", result)
asyncio.run(main())
这个例子实现了一个灵活的重试策略装饰器,可以应用于同步或异步函数。它支持指数退避和抖动,可以有效避免重试风暴。
3.4.2 故障转移策略
当重试不能解决问题时,我们可能需要考虑故障转移(Failover)。故障转移是指当主节点发生故障时,自动切换到备用节点的过程。
在 LangGraph 中,我们可以通过多种方式实现故障转移:
- 备用节点:为每个关键节点准备一个备用实现,当主节点失败时,自动切换到备用节点。
- 多路复用:同时调用多个实现,使用第一个成功的结果。
- 分布式部署:将 LangGraph 应用部署到多个实例,通过负载均衡器分配请求。
让我们看看如何实现备用节点的故障转移:
from typing import Callable, List, Any
from functools import wraps
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
messages: Annotated[list, add]
class FailoverHandler:
def __init__(self, primaries: List[Callable], fallbacks: List[Callable]):
self.primaries = primaries
self.fallbacks = fallbacks
def _try_execute(self, funcs: List[Callable], state: AgentState) -> Any:
"""尝试执行一系列函数,返回第一个成功的结果"""
last_exception = None
for func in funcs:
try:
print(f"尝试执行: {func.__name__}")
return func(state)
except Exception as e:
last_exception = e
print(f"{func.__name__} 执行失败: {e}")
raise last_exception or Exception("所有函数都执行失败")
def __call__(self, state: AgentState) -> AgentState:
try:
# 首先尝试主节点
return self._try_execute(self.primaries, state)
except Exception as e:
print(f"所有主节点都失败,尝试备用节点: {e}")
# 主节点都失败,尝试备用节点
return self._try_execute(self.fallbacks, state)
# 定义一些模拟节点
def primary_llm_call(state: AgentState) -> AgentState:
"""主LLM调用节点"""
import random
if random.random() < 0.8: # 80%的概率失败
raise Exception("主LLM API调用失败")
return {"messages": ["主LLM处理结果"]}
def secondary_llm_call(state: AgentState) -> AgentState:
"""备用LLM调用节点"""
import random
if random.random() < 0.5: # 50%的概率失败
raise Exception("备用LLM API调用失败")
return {"messages": ["备用LLM处理结果"]}
def simple_fallback(state: AgentState) -> AgentState:
"""简单降级方案"""
return {"messages": ["抱歉,服务暂时不可用,请使用预定义回复。"]}
# 创建故障转移处理器
failover_handler = FailoverHandler(
primaries=[primary_llm_call],
fallbacks=[secondary_llm_call, simple_fallback]
)
# 构建图
graph = StateGraph(AgentState)
graph.add_node("llm_node", failover_handler)
graph.set_entry_point("llm_node")
graph.add_edge("llm_node", END)
app = graph.compile()
# 执行图多次,观察故障转移效果
for i in range(5):
print(f"\n--- 执行第 {i+1} 次 ---")
try:
result = app.invoke({"messages": []})
print("结果:", result)
except Exception as e:
print("错误:", e)
这个例子实现了一个简单的故障转移机制。当主节点失败时,系统会自动尝试备用节点,最后还有一个简单的降级方案作为最后的保障。
4. 实际应用
4.1 案例研究:构建高可用的客服助手
让我们通过一个实际案例来展示如何将前面介绍的技术整合起来,构建一个高可用的 LangGraph 应用。我们将创建一个客服助手系统,它能够处理用户的各种查询,并在面对各种故障时保持稳定运行。
4.1.1 系统架构设计
首先,让我们设计系统的整体架构:
这个架构包含以下关键组件:
- 负载均衡器:分发用户请求
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)