LangGraph 高可用设计:节点故障、服务熔断与自动恢复

关键词:LangGraph、高可用设计、节点故障、服务熔断、自动恢复、分布式系统、状态管理

摘要

在构建基于大语言模型的复杂应用时,LangGraph 已经成为开发者手中的强大工具,它允许我们创建具有状态管理、循环和分支的智能工作流。然而,当这些工作流部署到生产环境并处理实际业务流量时,确保系统的高可用性变得至关重要。本文将深入探讨 LangGraph 的高可用设计,重点关注节点故障处理、服务熔断机制和自动恢复策略。我们将从基础概念开始,逐步深入到技术实现,最终通过实际案例展示如何构建一个稳健、可靠的 LangGraph 应用系统。

1. 背景介绍

1.1 主题背景和重要性

在现代软件开发中,高可用性已经成为系统设计的关键指标之一。简单来说,高可用性指的是系统在面对各种故障时仍能保持持续服务的能力。对于基于 LangGraph 构建的应用来说,这一点尤为重要,因为:

  1. 业务连续性:越来越多的企业开始使用 LangGraph 构建核心业务流程,如客户服务、内容生成、数据分析等。这些系统的中断可能直接导致业务损失。

  2. 状态复杂性:与传统的无状态服务不同,LangGraph 工作流通常包含复杂的状态管理。节点故障不仅会导致当前请求失败,还可能破坏状态一致性。

  3. 外部依赖:LangGraph 节点常常依赖外部服务,如大语言模型 API、数据库、第三方服务等。这些依赖的不可预测性增加了系统故障的风险。

让我们通过一个生活化的例子来理解这个问题。想象一家餐厅,它的厨房工作流程就像一个 LangGraph 工作流:

  • 服务员(输入节点)接收顾客订单
  • 配菜员(中间节点)准备食材
  • 厨师(处理节点)烹饪食物
  • 传菜员(输出节点)将食物送到顾客桌上

如果其中任何一个环节出现问题,比如配菜员突然生病,整个工作流程就会中断,顾客无法按时获得食物。在 LangGraph 系统中,类似的情况也会发生,我们需要设计机制来处理这些"员工请假"或"设备故障"的情况。

1.2 目标读者

本文主要面向以下读者群体:

  1. AI 应用开发者:已经使用或计划使用 LangGraph 构建应用的开发者,希望了解如何提高系统的可靠性。
  2. 系统架构师:负责设计和维护 AI 应用系统架构的专业人士。
  3. DevOps 工程师:负责部署、监控和维护 LangGraph 应用的运维人员。
  4. 技术决策者:需要评估 LangGraph 在企业级应用中适用性的技术管理者。

假设读者已经具备基本的 Python 编程知识,对 LangChain 和 LangGraph 有初步了解,并且熟悉分布式系统的基本概念。

1.3 核心问题或挑战

在 LangGraph 高可用设计中,我们主要面临以下几个核心挑战:

  1. 节点故障检测与处理:如何快速检测工作流中的节点故障,并采取适当的补救措施?
  2. 状态一致性维护:在故障发生和恢复过程中,如何确保工作流状态的一致性?
  3. 服务熔断机制:当某个节点或外部依赖持续失败时,如何防止整个系统被拖垮?
  4. 自动恢复策略:故障发生后,如何自动恢复服务,减少人工干预?
  5. 分布式协调:在多实例部署的情况下,如何协调不同实例之间的状态和故障处理?

这些问题并非 LangGraph 独有,它们是分布式系统设计中的经典问题。但 LangGraph 的特性(状态管理、有向图结构、节点间依赖)使这些问题的解决具有独特的挑战和机遇。

2. 核心概念解析

2.1 什么是高可用性?

在深入探讨 LangGraph 的高可用设计之前,让我们先明确高可用性的基本概念。

高可用性(High Availability, HA) 是指系统在较长时间内能够持续正常运行的能力。它通常用 uptime(正常运行时间)占总时间的百分比来衡量:

可用性=正常运行时间总时间×100% \text{可用性} = \frac{\text{正常运行时间}}{\text{总时间}} \times 100\% 可用性=总时间正常运行时间×100%

行业中常用"几个9"来描述可用性水平:

  • 99% 可用性:每年停机时间约 3.65 天
  • 99.9% 可用性:每年停机时间约 8.77 小时
  • 99.99% 可用性:每年停机时间约 52.6 分钟
  • 99.999% 可用性:每年停机时间约 5.26 分钟

对于关键业务系统,我们通常追求至少 99.9% 或更高的可用性。

2.2 LangGraph 基础回顾

在继续之前,让我们快速回顾一下 LangGraph 的基本概念,以便所有读者都能跟上我们的讨论。

LangGraph 是一个用于构建状态ful、多角色应用程序的库。它的核心概念包括:

  1. 状态(State):工作流中共享的数据结构,可以在节点间传递和修改。
  2. 节点(Nodes):工作流中的处理单元,执行特定的功能。
  3. 边(Edges):连接节点的有向边,定义了工作流的执行顺序。
  4. 图(Graph):由节点和边组成的完整工作流定义。

让我们用一个简单的客服助手例子来说明 LangGraph 的基本结构:

查询订单

产品咨询

投诉

用户输入

意图识别

订单查询

产品信息检索

投诉处理

生成回复

结束

在这个例子中,每个方框代表一个节点,箭头代表边。整个工作流从"用户输入"开始,根据不同的意图分支到不同的处理节点,最后生成回复并结束。

2.3 故障模型与类型

在设计高可用系统时,我们首先需要了解可能发生的故障类型。对于 LangGraph 应用,常见的故障包括:

  1. 节点故障

    • 代码错误导致的节点执行异常
    • 资源不足(内存、CPU)导致的节点崩溃
    • 网络分区导致的节点不可达
  2. 外部依赖故障

    • LLM API 超时或错误
    • 数据库连接失败
    • 第三方服务不可用
  3. 状态相关故障

    • 状态数据损坏
    • 状态同步失败
    • 状态版本冲突
  4. 系统级故障

    • 服务器崩溃
    • 网络中断
    • 数据中心故障

让我们用一个表格来对比这些故障类型的特点:

故障类型 影响范围 检测难度 恢复难度 典型场景
节点故障 单个请求 代码异常、资源耗尽
外部依赖故障 多个请求 API 超时、服务降级
状态相关故障 单个或多个请求 数据损坏、同步失败
系统级故障 所有请求 服务器宕机、数据中心故障

2.4 高可用设计的核心原则

在设计 LangGraph 高可用系统时,我们应遵循以下核心原则:

  1. 故障检测:快速、准确地发现故障
  2. 故障隔离:防止故障蔓延到系统的其他部分
  3. 故障恢复:自动从故障中恢复,减少人工干预
  4. 冗余设计:通过冗余组件消除单点故障
  5. 优雅降级:在故障发生时提供有限但可用的服务
  6. 状态管理:确保状态在故障前后的一致性

这些原则相互关联,共同构成了高可用设计的基础。接下来的章节中,我们将看到如何在 LangGraph 中应用这些原则。

2.5 核心概念关系图

为了更好地理解这些概念之间的关系,让我们创建一个概念结构图:

includes

includes

includes

includes

includes

includes

uses

uses

uses

uses

uses

uses

uses

uses

contains

contains

contains

may_use

HIGH_AVAILABILITY

FAULT_DETECTION

FAULT_ISOLATION

FAULT_RECOVERY

REDUNDANCY

GRACEFUL_DEGRADATION

STATE_MANAGEMENT

NODE_MONITORING

HEARTBEAT

METRICS

CIRCUIT_BREAKER

BULKHEAD

RETRY

FAILOVER

CHECKPOINT

LANGGRAPH

NODE

STATE

EDGE

EXTERNAL_DEPENDENCY

这张图展示了高可用性设计的核心组件以及它们与 LangGraph 基本元素之间的关系。在接下来的章节中,我们将深入探讨这些组件的具体实现。

3. 技术原理与实现

3.1 节点故障检测机制

故障检测是高可用设计的第一步,只有及时发现故障,才能采取相应的措施。在 LangGraph 中,我们可以通过多种机制来检测节点故障。

3.1.1 基于异常的检测

最直接的故障检测方式是捕获节点执行过程中抛出的异常。LangGraph 提供了内置的异常处理机制,我们可以利用它来检测故障。

让我们看一个简单的例子:

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]

def safe_node(state: AgentState):
    try:
        # 节点的正常逻辑
        return {"messages": ["处理成功"]}
    except Exception as e:
        # 捕获异常并记录
        print(f"节点执行失败: {e}")
        # 可以选择返回错误状态或重新抛出异常
        return {"messages": [f"处理失败: {str(e)}"]}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("safe_node", safe_node)
graph.set_entry_point("safe_node")
graph.add_edge("safe_node", END)

app = graph.compile()

这种方法简单直接,但只能检测到节点执行过程中明确抛出的异常,对于某些"静默失败"(如节点无限期挂起)的情况无法检测。

3.1.2 基于超时的检测

为了处理节点挂起的情况,我们可以添加超时机制。如果节点在指定时间内没有完成执行,就认为它发生了故障。

import asyncio
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
from functools import wraps

class AgentState(TypedDict):
    messages: Annotated[list, add]

def timeout(seconds):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
            except asyncio.TimeoutError:
                print(f"节点执行超时 ({seconds}秒)")
                return {"messages": [f"处理超时: {seconds}秒"]}
        return wrapper
    return decorator

@timeout(5)  # 设置5秒超时
async def risky_node(state: AgentState):
    # 模拟一个可能超时的操作
    await asyncio.sleep(10)  # 这个操作会超时
    return {"messages": ["处理成功"]}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("risky_node", risky_node)
graph.set_entry_point("risky_node")
graph.add_edge("risky_node", END)

app = graph.compile()

这种方法可以有效检测节点挂起的情况,但需要注意设置合理的超时时间,避免误报。

3.1.3 基于心跳的检测

在分布式部署的场景中,我们可以使用心跳机制来检测节点的健康状态。每个工作节点定期向监控中心发送心跳信号,如果监控中心在一定时间内没有收到心跳,就认为该节点发生了故障。

import asyncio
import time
from typing import Dict, Optional

class HeartbeatMonitor:
    def __init__(self, timeout: int = 10):
        self.timeout = timeout
        self.node_heartbeats: Dict[str, float] = {}
        self.failed_nodes: set = set()
    
    def register_node(self, node_id: str):
        """注册一个新节点"""
        self.node_heartbeats[node_id] = time.time()
        if node_id in self.failed_nodes:
            self.failed_nodes.remove(node_id)
        print(f"节点 {node_id} 已注册")
    
    def receive_heartbeat(self, node_id: str):
        """接收节点心跳"""
        if node_id in self.node_heartbeats:
            self.node_heartbeats[node_id] = time.time()
            if node_id in self.failed_nodes:
                self.failed_nodes.remove(node_id)
                print(f"节点 {node_id} 已恢复")
    
    async def monitor(self):
        """监控节点健康状态"""
        while True:
            current_time = time.time()
            for node_id, last_heartbeat in list(self.node_heartbeats.items()):
                if current_time - last_heartbeat > self.timeout:
                    if node_id not in self.failed_nodes:
                        self.failed_nodes.add(node_id)
                        print(f"警告: 节点 {node_id} 可能已故障")
            await asyncio.sleep(1)

class HeartbeatSender:
    def __init__(self, node_id: str, monitor: HeartbeatMonitor, interval: int = 5):
        self.node_id = node_id
        self.monitor = monitor
        self.interval = interval
    
    async def send_heartbeats(self):
        """定期发送心跳"""
        self.monitor.register_node(self.node_id)
        while True:
            self.monitor.receive_heartbeat(self.node_id)
            await asyncio.sleep(self.interval)

# 使用示例
async def main():
    monitor = HeartbeatMonitor(timeout=10)
    sender1 = HeartbeatSender("node_1", monitor, interval=5)
    sender2 = HeartbeatSender("node_2", monitor, interval=5)
    
    # 启动监控和心跳发送
    monitor_task = asyncio.create_task(monitor.monitor())
    sender1_task = asyncio.create_task(sender1.send_heartbeats())
    sender2_task = asyncio.create_task(sender2.send_heartbeats())
    
    # 模拟运行一段时间
    await asyncio.sleep(30)
    
    # 取消任务
    monitor_task.cancel()
    sender1_task.cancel()
    sender2_task.cancel()

asyncio.run(main())

这个例子展示了一个简单的心跳监控系统。在实际生产环境中,我们可以使用更成熟的解决方案,如 etcd、Consul 或 ZooKeeper 来实现服务发现和健康检查。

3.2 服务熔断机制

当某个节点或外部依赖持续失败时,我们需要一种机制来防止整个系统被拖垮,这就是服务熔断机制的作用。熔断器模式(Circuit Breaker Pattern)是一种防止系统在故障点反复尝试的设计模式。

3.2.1 熔断器的基本原理

熔断器可以看作是一个状态机,它有三种状态:

  1. 关闭(Closed):正常状态,请求可以正常通过。
  2. 打开(Open):故障状态,请求直接被拒绝,不执行实际操作。
  3. 半打开(Half-Open):恢复状态,允许少量请求通过,以检查服务是否恢复。

让我们用流程图来表示熔断器的状态转换:

失败次数超过阈值

冷却时间到期

健康检查成功

健康检查失败

Closed

Open

HalfOpen

当熔断器处于关闭状态时,它会记录请求的成功和失败次数。如果失败次数超过某个阈值,熔断器就会切换到打开状态。在打开状态下,所有请求都会直接失败,不会执行实际操作。经过一段冷却时间后,熔断器会切换到半打开状态,允许少量请求通过。如果这些请求成功,熔断器就会切换回关闭状态;如果失败,就会再次切换到打开状态。

3.2.2 在 LangGraph 中实现熔断器

现在让我们看看如何在 LangGraph 节点中实现熔断器模式:

import time
from enum import Enum
from functools import wraps
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        expected_exception: tuple = (Exception,),
        fallback_function: Callable = None
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.fallback_function = fallback_function
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
    
    def _call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                # 切换到半打开状态
                self.state = CircuitState.HALF_OPEN
                print("熔断器切换到半打开状态,尝试恢复...")
            else:
                # 熔断器打开,使用降级方案
                print("熔断器处于打开状态,请求被拒绝")
                if self.fallback_function:
                    return self.fallback_function(*args, **kwargs)
                raise Exception("服务暂时不可用")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            if self.fallback_function:
                return self.fallback_function(*args, **kwargs)
            raise
    
    def _on_success(self):
        """请求成功时的处理"""
        if self.state == CircuitState.HALF_OPEN:
            # 半打开状态下成功,切换到关闭状态
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            print("熔断器切换到关闭状态,服务已恢复")
        elif self.state == CircuitState.CLOSED:
            # 关闭状态下成功,重置失败计数
            self.failure_count = 0
    
    def _on_failure(self):
        """请求失败时的处理"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitState.HALF_OPEN:
            # 半打开状态下失败,切换回打开状态
            self.state = CircuitState.OPEN
            print("熔断器切换到打开状态,服务恢复失败")
        elif self.state == CircuitState.CLOSED and self.failure_count >= self.failure_threshold:
            # 关闭状态下失败次数超过阈值,切换到打开状态
            self.state = CircuitState.OPEN
            print(f"熔断器切换到打开状态,失败次数: {self.failure_count}")
    
    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            return self._call(func, *args, **kwargs)
        return wrapper

# 使用示例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]

def fallback_llm_call(state: AgentState) -> AgentState:
    """降级方案:使用简单的回复"""
    return {"messages": ["抱歉,服务暂时不可用,请稍后再试。"]}

# 创建熔断器
circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=60,
    fallback_function=fallback_llm_call
)

@circuit_breaker
def unreliable_llm_call(state: AgentState) -> AgentState:
    """模拟一个不稳定的LLM调用"""
    import random
    if random.random() < 0.7:  # 70%的概率失败
        raise Exception("LLM API调用失败")
    return {"messages": ["LLM处理结果"]}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("llm_node", unreliable_llm_call)
graph.set_entry_point("llm_node")
graph.add_edge("llm_node", END)

app = graph.compile()

这个例子实现了一个基本的熔断器,并将其应用到 LangGraph 节点中。当节点失败次数超过阈值时,熔断器会打开,后续请求会直接使用降级方案,而不是继续尝试调用可能失败的服务。

3.2.3 舱壁模式(Bulkhead Pattern)

除了熔断器模式,另一个有用的模式是舱壁模式。这个模式的思想是将系统的不同部分隔离起来,就像船舱的隔舱一样,这样即使一个部分出现问题,也不会影响到其他部分。

在 LangGraph 中,我们可以使用舱壁模式来隔离不同的节点或外部依赖。例如,我们可以为每个外部服务创建独立的线程池或连接池,这样一个服务的问题不会耗尽所有资源。

import asyncio
from typing import Dict, Any
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]
    results: Dict[str, Any]

class BulkheadIsolator:
    def __init__(self, max_concurrent: int = 10):
        self.semaphores: Dict[str, asyncio.Semaphore] = {}
        self.max_concurrent = max_concurrent
    
    def _get_semaphore(self, bulkhead_name: str) -> asyncio.Semaphore:
        if bulkhead_name not in self.semaphores:
            self.semaphores[bulkhead_name] = asyncio.Semaphore(self.max_concurrent)
        return self.semaphores[bulkhead_name]
    
    async def execute(self, bulkhead_name: str, func, *args, **kwargs):
        semaphore = self._get_semaphore(bulkhead_name)
        async with semaphore:
            return await func(*args, **kwargs)

# 创建舱壁隔离器
bulkhead = BulkheadIsolator(max_concurrent=5)

async def call_service_a(state: AgentState) -> AgentState:
    """调用服务A,使用舱壁隔离"""
    async def _service_call():
        # 模拟服务调用
        await asyncio.sleep(1)
        return "服务A结果"
    
    result = await bulkhead.execute("service_a", _service_call)
    return {"results": {"service_a": result}}

async def call_service_b(state: AgentState) -> AgentState:
    """调用服务B,使用舱壁隔离"""
    async def _service_call():
        # 模拟服务调用
        await asyncio.sleep(1)
        return "服务B结果"
    
    result = await bulkhead.execute("service_b", _service_call)
    current_results = state.get("results", {})
    current_results["service_b"] = result
    return {"results": current_results}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("call_service_a", call_service_a)
graph.add_node("call_service_b", call_service_b)
graph.set_entry_point("call_service_a")
graph.add_edge("call_service_a", "call_service_b")
graph.add_edge("call_service_b", END)

app = graph.compile()

在这个例子中,我们为服务A和服务B分别创建了独立的舱壁,每个舱壁最多允许5个并发请求。这样即使服务A出现问题,导致大量请求积压,也不会影响到服务B的正常调用。

3.3 状态管理与检查点

在 LangGraph 中,状态是工作流的核心。当故障发生时,我们需要确保状态的一致性,并且能够从故障点恢复,而不是重新开始整个工作流。这就需要使用检查点(Checkpoint)机制。

3.3.1 LangGraph 的检查点机制

LangGraph 内置了检查点机制,可以将工作流的状态保存到持久化存储中。当故障发生时,我们可以从最近的检查点恢复工作流的执行。

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]
    current_step: int

def step1(state: AgentState) -> AgentState:
    print("执行步骤1")
    return {
        "messages": ["步骤1完成"],
        "current_step": 1
    }

def step2(state: AgentState) -> AgentState:
    print("执行步骤2")
    # 模拟可能失败的操作
    import random
    if random.random() < 0.5:
        raise Exception("步骤2执行失败")
    return {
        "messages": ["步骤2完成"],
        "current_step": 2
    }

def step3(state: AgentState) -> AgentState:
    print("执行步骤3")
    return {
        "messages": ["步骤3完成"],
        "current_step": 3
    }

# 构建图
graph = StateGraph(AgentState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_node("step3", step3)
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)

# 创建SQLite检查点存储器
memory = SqliteSaver.from_conn_string(":memory:")

# 编译图,启用检查点
app = graph.compile(checkpointer=memory)

# 执行图,带重试逻辑
def run_with_recovery():
    config = {"configurable": {"thread_id": "1"}}
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            # 尝试执行
            result = app.invoke({"messages": [], "current_step": 0}, config)
            print("工作流执行完成:", result)
            return result
        except Exception as e:
            retry_count += 1
            print(f"执行失败 (尝试 {retry_count}/{max_retries}): {e}")
            
            if retry_count >= max_retries:
                print("已达到最大重试次数,放弃执行")
                raise
            
            # 等待一段时间后重试
            import time
            wait_time = 2 ** retry_count  # 指数退避
            print(f"等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)

# 运行
run_with_recovery()

这个例子展示了如何使用 LangGraph 的内置检查点机制。当工作流执行失败时,我们可以从最近的检查点恢复,而不是重新开始整个工作流。

3.3.2 自定义检查点策略

虽然 LangGraph 提供了内置的检查点机制,但在某些情况下,我们可能需要更精细的控制。例如,我们可能希望只在关键节点执行后保存检查点,或者使用不同的存储后端。

让我们看看如何实现一个自定义的检查点策略:

import json
import os
from typing import Any, Dict, Optional
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointAt, CheckpointTuple

class FileCheckpointSaver(BaseCheckpointSaver):
    """基于文件系统的检查点存储器"""
    
    def __init__(self, directory: str = "./checkpoints"):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)
    
    def _get_file_path(self, thread_id: str, checkpoint_ns: str = "") -> str:
        """获取检查点文件路径"""
        if checkpoint_ns:
            return os.path.join(self.directory, f"{thread_id}_{checkpoint_ns}.json")
        return os.path.join(self.directory, f"{thread_id}.json")
    
    def get_tuple(self, config: Dict[str, Any]) -> Optional[CheckpointTuple]:
        """获取检查点"""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        file_path = self._get_file_path(thread_id, checkpoint_ns)
        
        if not os.path.exists(file_path):
            return None
        
        with open(file_path, "r") as f:
            data = json.load(f)
        
        return CheckpointTuple(
            config=config,
            checkpoint=Checkpoint(**data["checkpoint"]),
            metadata=data.get("metadata", {}),
            parent_config=data.get("parent_config")
        )
    
    def put(self, config: Dict[str, Any], checkpoint: Checkpoint, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """保存检查点"""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        file_path = self._get_file_path(thread_id, checkpoint_ns)
        
        data = {
            "checkpoint": checkpoint.dict(),
            "metadata": metadata,
            "parent_config": config
        }
        
        with open(file_path, "w") as f:
            json.dump(data, f)
        
        return config
    
    def list(self, config: Dict[str, Any]) -> Any:
        """列出检查点"""
        # 简化实现,实际应用中可能需要更复杂的逻辑
        thread_id = config["configurable"]["thread_id"]
        for filename in os.listdir(self.directory):
            if filename.startswith(thread_id):
                with open(os.path.join(self.directory, filename), "r") as f:
                    data = json.load(f)
                yield CheckpointTuple(
                    config=config,
                    checkpoint=Checkpoint(**data["checkpoint"]),
                    metadata=data.get("metadata", {}),
                    parent_config=data.get("parent_config")
                )

# 自定义检查点策略
from langgraph.checkpoint.base import CheckpointAt

class CustomCheckpointAt(CheckpointAt):
    """自定义检查点策略:只在特定节点后保存检查点"""
    
    def __init__(self, nodes_to_checkpoint: list):
        self.nodes_to_checkpoint = nodes_to_checkpoint
    
    def __call__(self, step: dict) -> bool:
        """决定是否在当前步骤后保存检查点"""
        # 检查是否是我们想要保存检查点的节点
        if "nodes" in step:
            for node_id in step["nodes"]:
                if node_id in self.nodes_to_checkpoint:
                    print(f"在节点 {node_id} 后保存检查点")
                    return True
        return False

# 使用自定义检查点
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]
    current_step: int

# 创建自定义检查点存储器和策略
checkpoint_saver = FileCheckpointSaver()
checkpoint_at = CustomCheckpointAt(nodes_to_checkpoint=["step2"])

# 定义节点
def step1(state: AgentState) -> AgentState:
    print("执行步骤1")
    return {
        "messages": ["步骤1完成"],
        "current_step": 1
    }

def step2(state: AgentState) -> AgentState:
    print("执行步骤2")
    return {
        "messages": ["步骤2完成"],
        "current_step": 2
    }

def step3(state: AgentState) -> AgentState:
    print("执行步骤3")
    return {
        "messages": ["步骤3完成"],
        "current_step": 3
    }

# 构建图
graph = StateGraph(AgentState)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_node("step3", step3)
graph.set_entry_point("step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", "step3")
graph.add_edge("step3", END)

# 编译图,使用自定义检查点
app = graph.compile(
    checkpointer=checkpoint_saver,
    checkpointer_kwargs={"checkpoint_at": checkpoint_at}
)

# 执行图
config = {"configurable": {"thread_id": "custom_checkpoint_example"}}
result = app.invoke({"messages": [], "current_step": 0}, config)
print("工作流执行完成:", result)

这个例子展示了如何实现自定义的检查点存储器和检查点策略。在实际应用中,我们可以根据需求选择合适的存储后端(如 Redis、数据库等)和检查点策略。

3.4 重试与故障转移策略

当故障发生时,我们需要决定如何处理。是重试失败的操作?还是故障转移到备用节点?让我们探讨一些常见的策略。

3.4.1 重试策略

重试是最简单直接的故障处理方式,但需要谨慎使用,避免造成"重试风暴"。常见的重试策略包括:

  1. 固定间隔重试:每次重试之间等待固定的时间。
  2. 指数退避重试:每次重试的等待时间指数增长,如 1s, 2s, 4s, 8s…
  3. 带抖动的指数退避:在指数退避的基础上添加随机抖动,避免多个请求同时重试。

让我们看看如何在 LangGraph 节点中实现这些重试策略:

import asyncio
import random
from functools import wraps
from typing import Callable, Any, Tuple, Type

class RetryPolicy:
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retry_exceptions: Tuple[Type[Exception], ...] = (Exception,)
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retry_exceptions = retry_exceptions
    
    def _calculate_delay(self, attempt: int) -> float:
        """计算重试延迟"""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)
        
        if self.jitter:
            # 添加抖动,范围为延迟的±20%
            jitter_amount = delay * 0.2
            delay += random.uniform(-jitter_amount, jitter_amount)
            delay = max(0, delay)
        
        return delay
    
    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(self.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except self.retry_exceptions as e:
                    last_exception = e
                    
                    if attempt == self.max_retries:
                        print(f"已达到最大重试次数 ({self.max_retries}),放弃重试")
                        raise
                    
                    delay = self._calculate_delay(attempt)
                    print(f"重试 {attempt + 1}/{self.max_retries},等待 {delay:.2f} 秒...")
                    await asyncio.sleep(delay)
            
            raise last_exception
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs) -> Any:
            last_exception = None
            
            for attempt in range(self.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except self.retry_exceptions as e:
                    last_exception = e
                    
                    if attempt == self.max_retries:
                        print(f"已达到最大重试次数 ({self.max_retries}),放弃重试")
                        raise
                    
                    delay = self._calculate_delay(attempt)
                    print(f"重试 {attempt + 1}/{self.max_retries},等待 {delay:.2f} 秒...")
                    import time
                    time.sleep(delay)
            
            raise last_exception
        
        import inspect
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

# 使用示例
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]

# 创建重试策略
retry_policy = RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    max_delay=10.0,
    exponential_base=2.0,
    jitter=True
)

@retry_policy
async def unreliable_node(state: AgentState) -> AgentState:
    """模拟一个不稳定的节点"""
    import random
    if random.random() < 0.7:  # 70%的概率失败
        print("节点执行失败")
        raise Exception("临时错误")
    print("节点执行成功")
    return {"messages": ["处理成功"]}

# 构建图
graph = StateGraph(AgentState)
graph.add_node("unreliable_node", unreliable_node)
graph.set_entry_point("unreliable_node")
graph.add_edge("unreliable_node", END)

app = graph.compile()

# 执行图
async def main():
    result = await app.ainvoke({"messages": []})
    print("最终结果:", result)

asyncio.run(main())

这个例子实现了一个灵活的重试策略装饰器,可以应用于同步或异步函数。它支持指数退避和抖动,可以有效避免重试风暴。

3.4.2 故障转移策略

当重试不能解决问题时,我们可能需要考虑故障转移(Failover)。故障转移是指当主节点发生故障时,自动切换到备用节点的过程。

在 LangGraph 中,我们可以通过多种方式实现故障转移:

  1. 备用节点:为每个关键节点准备一个备用实现,当主节点失败时,自动切换到备用节点。
  2. 多路复用:同时调用多个实现,使用第一个成功的结果。
  3. 分布式部署:将 LangGraph 应用部署到多个实例,通过负载均衡器分配请求。

让我们看看如何实现备用节点的故障转移:

from typing import Callable, List, Any
from functools import wraps
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    messages: Annotated[list, add]

class FailoverHandler:
    def __init__(self, primaries: List[Callable], fallbacks: List[Callable]):
        self.primaries = primaries
        self.fallbacks = fallbacks
    
    def _try_execute(self, funcs: List[Callable], state: AgentState) -> Any:
        """尝试执行一系列函数,返回第一个成功的结果"""
        last_exception = None
        
        for func in funcs:
            try:
                print(f"尝试执行: {func.__name__}")
                return func(state)
            except Exception as e:
                last_exception = e
                print(f"{func.__name__} 执行失败: {e}")
        
        raise last_exception or Exception("所有函数都执行失败")
    
    def __call__(self, state: AgentState) -> AgentState:
        try:
            # 首先尝试主节点
            return self._try_execute(self.primaries, state)
        except Exception as e:
            print(f"所有主节点都失败,尝试备用节点: {e}")
            # 主节点都失败,尝试备用节点
            return self._try_execute(self.fallbacks, state)

# 定义一些模拟节点
def primary_llm_call(state: AgentState) -> AgentState:
    """主LLM调用节点"""
    import random
    if random.random() < 0.8:  # 80%的概率失败
        raise Exception("主LLM API调用失败")
    return {"messages": ["主LLM处理结果"]}

def secondary_llm_call(state: AgentState) -> AgentState:
    """备用LLM调用节点"""
    import random
    if random.random() < 0.5:  # 50%的概率失败
        raise Exception("备用LLM API调用失败")
    return {"messages": ["备用LLM处理结果"]}

def simple_fallback(state: AgentState) -> AgentState:
    """简单降级方案"""
    return {"messages": ["抱歉,服务暂时不可用,请使用预定义回复。"]}

# 创建故障转移处理器
failover_handler = FailoverHandler(
    primaries=[primary_llm_call],
    fallbacks=[secondary_llm_call, simple_fallback]
)

# 构建图
graph = StateGraph(AgentState)
graph.add_node("llm_node", failover_handler)
graph.set_entry_point("llm_node")
graph.add_edge("llm_node", END)

app = graph.compile()

# 执行图多次,观察故障转移效果
for i in range(5):
    print(f"\n--- 执行第 {i+1} 次 ---")
    try:
        result = app.invoke({"messages": []})
        print("结果:", result)
    except Exception as e:
        print("错误:", e)

这个例子实现了一个简单的故障转移机制。当主节点失败时,系统会自动尝试备用节点,最后还有一个简单的降级方案作为最后的保障。

4. 实际应用

4.1 案例研究:构建高可用的客服助手

让我们通过一个实际案例来展示如何将前面介绍的技术整合起来,构建一个高可用的 LangGraph 应用。我们将创建一个客服助手系统,它能够处理用户的各种查询,并在面对各种故障时保持稳定运行。

4.1.1 系统架构设计

首先,让我们设计系统的整体架构:

用户请求

负载均衡器

LangGraph应用实例1

LangGraph应用实例2

LangGraph应用实例3

状态存储Redis

监控与告警系统

主LLM API

备用LLM API

客户数据库

这个架构包含以下关键组件:

  1. 负载均衡器:分发用户请求
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐