LangGraph与现有系统集成:从微服务到遗留系统的完整实践指南

LangGraph集成封面


一、引言

钩子

你是否遇到过这样的场景:公司花了几百万搭了Spring Cloud微服务集群,还有用了10年的Struts ERP、.NET CRM遗留系统,现在要做大模型智能助手,老板要求不能重构现有系统、不能影响线上业务、还要3周上线?你尝试用LangChain写了个Chain,结果用户查个订单,调用3个接口中间断了一次就要全部重跑,用户体验极差;你想做个取消订单的功能,要调用订单、积分、财务三个系统,中间积分系统挂了,订单已经取消了,数据直接不一致,运维追着你骂了三天。

定义问题/阐述背景

据Gartner 2024年企业大模型应用调研显示,87%的企业大模型落地失败的核心原因不是大模型效果不好,而是无法和现有IT系统无缝集成。当前企业IT架构普遍存在“新旧混合”的特征:一边是云原生的微服务集群,提供标准化的HTTP/gRPC接口;另一边是运行了5-15年的遗留系统,只有Web表单界面,连API都没有。传统的大模型应用开发框架(比如原生LangChain Chain、CrewAI)普遍存在无状态、不支持事务、可观测性差的问题,完全无法适配复杂的跨系统集成需求。

而LangGraph作为LangChain团队2023年推出的状态化智能体编排框架,天生自带持久化状态管理、循环分支支持、断点续跑、可观测性强的特性,刚好解决了大模型应用和现有系统集成的核心痛点。

亮明观点/文章目标

读完这篇文章,你将:

  1. 完全理解LangGraph和现有系统集成的核心原理、适用边界
  2. 掌握LangGraph对接Spring Cloud微服务、Python FastAPI微服务、Struts遗留ERP、.NET遗留CRM的完整实操流程
  3. 学会跨系统调用的事务一致性保证、性能优化、稳定性保障的最佳实践
  4. 拿到可以直接复用到自己业务的开箱即用集成代码模板

本文会从基础概念讲起,配合完整的实战案例、代码片段、架构图、避坑指南,即使你只有基础的Python能力,也能快速把LangGraph和自己公司的现有系统打通。


二、基础知识/背景铺垫

核心概念定义

1. LangGraph核心概念

LangGraph是一个用于构建状态化、多角色智能体的开源编排框架,核心是将智能体的执行流程抽象为有向状态流,和普通的LangChain Chain最大的区别是支持持久化状态存储、循环、条件分支、断点续跑。其核心组成要素包括:

核心要素 定义 作用
State 智能体运行的全局状态对象 存储所有上下文信息、工具调用结果、用户输入、中间变量
Node 执行节点 可以是大模型调用、工具调用、自定义逻辑处理
Edge 连接边 控制节点之间的跳转逻辑,支持条件分支、循环
Tool 工具 封装外部系统能力的接口,是LangGraph和现有系统对接的核心载体
Checkpointer 状态持久化模块 把State持久化到Redis、数据库等存储,支持断点续跑、故障恢复

LangGraph核心实体的关系如下图所示:

渲染错误: Mermaid 渲染失败: Parse error on line 2: ...e ||--o{ Node : 作为输入/输出 Node ||--o{ -----------------------^ Expecting 'EOF', 'SPACE', 'NEWLINE', 'title', 'acc_title', 'acc_descr', 'acc_descr_multiline_value', 'direction_tb', 'direction_bt', 'direction_rl', 'direction_lr', 'CLASSDEF', 'UNICODE_TEXT', 'CLASS', 'STYLE', 'NUM', 'ENTITY_NAME', 'DECIMAL_NUM', 'ENTITY_ONE', got '/'
2. 现有系统分类定义

我们通常把企业现有IT系统分为两类:

  • 微服务系统:云原生架构,遵循RESTful/gRPC接口规范,有服务发现、权限校验、可观测能力,典型代表是Spring Cloud、Dubbo、FastAPI、Kratos集群
  • 遗留系统:建设周期超过5年,无标准化API接口,仅提供Web表单操作界面,技术栈老旧(Struts、.NET Framework、VB等),无源码或没人敢改,典型代表是传统ERP、CRM、财务系统
3. 集成方案能力对比

我们将LangGraph和常见的大模型应用开发框架的系统集成能力做个对比:

能力维度 LangGraph LangChain Sequential Chain CrewAI AutoGPT
状态持久化 原生支持,可自定义存储 无,仅内存存储上下文 有限支持,仅任务级状态 无,会话级内存存储
循环/分支支持 原生支持任意复杂流程 不支持,仅线性执行 有限支持,固定流程 支持但不可控
事务回滚能力 可通过条件分支灵活实现 完全不支持 完全不支持 完全不支持
跨系统上下文管理 全局State统一管理 上下文传递易丢失 按角色拆分上下文易混乱 上下文无结构化管理
可观测性 原生支持回调、全链路日志 仅支持简单日志 有限日志 日志混乱难排查
生产级可用性 高,可灰度、可降级 低,容错能力差 中,适合固定角色任务 极低,仅适合演示

LangGraph运行核心原理

LangGraph的运行本质是状态的不断变换,我们可以用如下数学公式表示状态的变换过程:
St+1=F(St,Inputt)S_{t+1} = F(S_t, Input_t)St+1=F(St,Inputt)
其中StS_tSt是t时刻的全局状态,InputtInput_tInputt是t时刻的输入(用户输入、工具返回结果、大模型输出等),FFF是节点对应的变换函数(大模型调用、工具调用、自定义逻辑等)。

当涉及到跨系统的事务操作时,我们需要保证状态的最终一致性,对应的数学模型为:
Sfinal=S0⊕∑i=1n(Opi⋅Ci)⊕∑j=1mRollbackj⋅(1−Cj)S_{final} = S_0 \oplus \sum_{i=1}^{n} (Op_i \cdot C_i) \oplus \sum_{j=1}^{m} Rollback_j \cdot (1-C_j)Sfinal=S0i=1n(OpiCi)j=1mRollbackj(1Cj)
其中:

  • S0S_0S0是初始状态
  • OpiOp_iOpi是第i个外部系统操作的状态变换函数
  • CiC_iCi是第i个操作的成功标记,1为成功,0为失败
  • RollbackjRollback_jRollbackj是第j个失败操作对应的回滚函数
  • ⊕\oplus是状态合并算子

LangGraph跨系统调用的核心流程如下图所示:

接收用户请求

初始化State 生成全局TraceID

调度第一个执行节点

节点是否需要调用外部系统?

调用适配层对应Tool

执行节点逻辑 生成新State

调用是否成功?

是否达到重试上限?

执行已成功操作的回滚逻辑

返回错误信息 持久化最终State

所有节点执行完毕?

生成最终响应 持久化State

根据Edge规则跳转到下一个节点


三、核心内容/实战演练

我们以电商场景的智能订单助手为实战案例,完整演示LangGraph和四类现有系统的集成过程:对接Spring Cloud用户微服务、FastAPI订单微服务、Struts遗留ERP系统、.NET遗留CRM系统,实现用户查订单、查物流、申请退货、获取专属优惠的全流程自动化。

整体集成架构设计

我们采用三层架构设计,完全不侵入现有系统,保证现有业务的稳定性:

现有系统层

Nacos注册中心

Spring Cloud用户微服务

FastAPI订单/物流微服务

Struts 遗留ERP采购系统

.NET 遗留CRM系统

集成适配层

微服务适配插件

遗留系统RPA适配插件

权限校验模块

Sentinel熔断限流模块

SkyWalking日志埋点模块

LangGraph智能体层

状态管理模块

节点调度模块

工具调用模块

Redis Checkpointer

用户端/电商APP

LangGraph智能体层

集成适配层

现有系统层

前置环境准备

首先安装依赖包:

pip install langgraph langchain-openai langchain-community pynacos requests playwright sentinel-sdk
playwright install chromium

我们需要提前准备好:

  1. OpenAI API Key(或其他大模型API Key)
  2. Nacos注册中心地址(对接Spring Cloud微服务用)
  3. 遗留ERP系统的账号密码
  4. Redis地址(用于State持久化)

步骤一:集成适配层开发

适配层是LangGraph和现有系统对接的核心,所有外部系统的调用都要经过适配层,实现权限校验、熔断限流、日志埋点、幂等性保证,完全不侵入现有系统。

1. 微服务适配插件开发

我们先封装Spring Cloud用户微服务的调用,通过Nacos获取服务实例,自动负载均衡:

import nacos
import requests
from typing import Dict, Any

class NacosServiceDiscovery:
    def __init__(self, server_addr: str, namespace: str = "public"):
        self.client = nacos.NacosClient(server_addr, namespace=namespace)
    
    def get_service_instance(self, service_name: str) -> str:
        instances = self.client.list_naming_instance(service_name)
        # 简单轮询负载均衡,生产环境可以用更复杂的策略
        if instances and "hosts" in instances and instances["hosts"]:
            instance = instances["hosts"][0]
            return f"http://{instance['ip']}:{instance['port']}"
        raise Exception(f"no available instance for service {service_name}")

# 封装用户服务工具
from langchain.tools import tool
nacos_client = NacosServiceDiscovery(server_addr="192.168.1.100:8848")

@tool
def get_user_info(user_id: str) -> Dict[str, Any]:
    """
    根据用户ID获取用户基本信息,包括用户等级、收货地址、积分余额
    参数:
        user_id: 用户的唯一ID,从用户会话中获取
    """
    service_url = nacos_client.get_service_instance("user-service")
    # 携带TraceID,和现有系统调用链打通
    headers = {"X-Trace-ID": getattr(get_user_info, "trace_id", "")}
    resp = requests.get(f"{service_url}/user/info?userId={user_id}", headers=headers, timeout=10)
    resp.raise_for_status()
    return resp.json()

同理我们可以封装FastAPI订单微服务的工具:

@tool
def get_user_latest_order(user_id: str) -> Dict[str, Any]:
    """
    获取用户最近一笔订单的信息,包括订单号、商品信息、订单状态、物流单号
    参数:
        user_id: 用户的唯一ID
    """
    resp = requests.get(f"http://fastapi-order-service:8000/order/latest?userId={user_id}", timeout=10)
    resp.raise_for_status()
    return resp.json()

@tool
def get_logistics_info(logistics_no: str) -> Dict[str, Any]:
    """
    根据物流单号查询物流实时状态
    参数:
        logistics_no: 物流单号,从订单信息中获取
    """
    resp = requests.get(f"http://fastapi-order-service:8000/logistics?no={logistics_no}", timeout=10)
    resp.raise_for_status()
    return resp.json()
2. 遗留系统RPA适配插件开发

针对没有API的Struts遗留ERP系统,我们用Playwright模拟浏览器操作,封装成工具:

from playwright.sync_api import sync_playwright

@tool
def get_last_month_purchase_cost(dept_name: str) -> float:
    """
    查询指定部门上个月的办公采购总费用,用于回答用户关于采购成本的问题
    参数:
        dept_name: 部门名称,比如“财务部”、“技术部”
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        # 模拟登录ERP
        page.goto("http://legacy-erp.example.com/login")
        page.fill("#username", "langgraph_agent")
        page.fill("#password", "xxxxxx")
        page.click("#submit")
        page.wait_for_navigation()
        # 进入采购查询页面
        page.goto("http://legacy-erp.example.com/purchase/query")
        # 选择部门、时间范围
        page.select_option("#dept", dept_name)
        page.fill("#start_time", "2024-05-01")
        page.fill("#end_time", "2024-05-31")
        page.click("#query_btn")
        page.wait_for_selector("#total_cost")
        # 提取结果
        total_cost = float(page.locator("#total_cost").text_content())
        browser.close()
        return total_cost

针对.NET遗留CRM系统,我们可以用同样的方式封装查询用户专属优惠的工具。

步骤二:LangGraph智能体开发

1. 定义State

我们首先定义全局State,存储所有上下文信息:

from typing import TypedDict, Annotated, Sequence
import operator
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    user_id: str
    user_info: Dict[str, Any]
    latest_order: Dict[str, Any]
    logistics_info: Dict[str, Any]
    purchase_cost: float
    trace_id: str
2. 定义节点和边

然后定义执行节点和跳转逻辑:

from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, END

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
tools = [get_user_info, get_user_latest_order, get_logistics_info, get_last_month_purchase_cost]
llm_with_tools = llm.bind_tools(tools)

# 定义大模型处理节点
def chat_node(state: AgentState):
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

# 定义工具调用节点
tool_node = ToolNode(tools)

# 构建流程图
workflow = StateGraph(AgentState)
workflow.add_node("chat", chat_node)
workflow.add_node("tools", tool_node)

# 定义边:大模型返回工具调用则跳转到工具节点,否则结束
workflow.add_conditional_edges(
    "chat",
    tools_condition,
    {"tools": "tools", END: END}
)
workflow.add_edge("tools", "chat")
workflow.set_entry_point("chat")

# 配置Redis Checkpointer持久化状态
from langgraph.checkpoint.redis import RedisSaver
import redis
redis_client = redis.Redis(host="192.168.1.101", port=6379, db=0)
checkpointer = RedisSaver(redis_client)

app = workflow.compile(checkpointer=checkpointer)

步骤三:测试运行

我们模拟用户查询订单物流的请求:

from langchain_core.messages import HumanMessage
import uuid

config = {"configurable": {"thread_id": str(uuid.uuid4())}}
trace_id = str(uuid.uuid4())
# 给工具注入TraceID
for tool in tools:
    setattr(tool, "trace_id", trace_id)

response = app.invoke(
    {
        "messages": [HumanMessage(content="我最近的订单物流到哪了?")],
        "user_id": "u123456",
        "trace_id": trace_id
    },
    config=config
)

print(response["messages"][-1].content)

运行结果:

你的最近一笔订单(单号:O20240601001)的当前物流状态为【正在派送】,预计今天18:00前送达,派送员电话:138xxxx1234,请注意查收。

整个执行过程会自动调用get_user_info -> get_user_latest_order -> get_logistics_info三个工具,所有中间状态都会持久化到Redis,如果中间某个接口调用失败,会自动重试,达到重试上限后可以手动触发断点续跑,不需要用户重新发起请求。

步骤四:事务一致性实现

我们实现取消订单的事务逻辑,需要调用订单服务、积分服务、财务系统三个接口,任何一个失败都要回滚之前的操作:

# 定义取消订单的三个工具和对应的回滚工具
@tool
def cancel_order(order_no: str) -> bool:
    """取消指定订单"""
    resp = requests.post(f"http://order-service/cancel?orderNo={order_no}")
    return resp.status_code == 200

@tool
def rollback_cancel_order(order_no: str) -> bool:
    """回滚取消订单操作,恢复订单状态"""
    resp = requests.post(f"http://order-service/rollback-cancel?orderNo={order_no}")
    return resp.status_code == 200

@tool
def refund_points(user_id: str, points: int) -> bool:
    """退还用户下单扣除的积分"""
    resp = requests.post(f"http://point-service/refund?userId={user_id}&points={points}")
    return resp.status_code == 200

@tool
def rollback_refund_points(user_id: str, points: int) -> bool:
    """回滚积分退还操作,重新扣除积分"""
    resp = requests.post(f"http://point-service/deduct?userId={user_id}&points={points}")
    return resp.status_code == 200

# 自定义事务处理节点
def cancel_order_transaction_node(state: AgentState):
    order_no = state["latest_order"]["order_no"]
    user_id = state["user_id"]
    points = state["latest_order"]["deduct_points"]
    # 记录已成功的操作,用于回滚
    success_ops = []
    
    # 第一步:取消订单
    if cancel_order.invoke({"order_no": order_no}):
        success_ops.append(("order", order_no))
    else:
        return {"messages": [("assistant", "取消订单失败,请稍后重试")]}
    
    # 第二步:退还积分
    if refund_points.invoke({"user_id": user_id, "points": points}):
        success_ops.append(("point", user_id, points))
    else:
        # 回滚已成功的操作
        for op in success_ops:
            if op[0] == "order":
                rollback_cancel_order.invoke({"order_no": op[1]})
        return {"messages": [("assistant", "退还积分失败,订单已恢复,请稍后重试")]}
    
    # 第三步:生成退款单(调用遗留财务系统)
    if create_refund_order.invoke({"order_no": order_no}):
        return {"messages": [("assistant", "订单取消成功,积分已退还,退款将在1-3个工作日到账")]}
    else:
        # 回滚前两步操作
        rollback_refund_points.invoke({"user_id": user_id, "points": points})
        rollback_cancel_order.invoke({"order_no": order_no})
        return {"messages": [("assistant", "生成退款单失败,订单已恢复,请稍后重试")]}

四、进阶探讨/最佳实践

常见陷阱与避坑指南

  1. 状态膨胀问题:State里不要存储大文件、二进制数据,否则会导致状态序列化/反序列化慢、存储成本高,大文件可以存在对象存储,State里只存URL
  2. 权限越权问题:不要把用户的权限判断交给大模型,适配层要统一校验当前用户是否有权限调用对应的工具,比如普通用户不能调用查询全公司采购成本的工具
  3. 遗留系统稳定性问题:RPA调用遗留系统的时候要加间隔时间,不要频繁请求把系统搞挂,优先找遗留系统的隐藏API(F12可以抓包),比RPA快10倍以上
  4. 幂等性问题:所有工具调用都要携带唯一的RequestID,现有系统的接口要实现幂等性,避免重试的时候产生重复数据
  5. 大模型幻觉问题:工具的参数校验不要交给大模型,适配层要对所有输入参数做校验,比如订单号的格式、用户ID的合法性,防止大模型生成错误参数
  6. 冷启动问题:RPA工具可以提前启动浏览器实例池,避免每次调用都启动浏览器,延迟从5s降到500ms以内

性能优化/成本考量

  1. 缓存优化:适配层对高频查询的结果做缓存,比如用户信息、订单信息,TTL根据业务场景设置,减少重复调用,接口调用成本可以降低70%
  2. 大模型调用优化:用更小的模型做工具调用判断,比如用Llama 3 8B判断是否需要调用工具,用GPT-4生成最终回答,成本可以降低60%
  3. 状态存储优化:运行中的状态存在Redis,历史超过7天的状态归档到对象存储,存储成本降低90%
  4. 并行调用优化:没有依赖关系的工具可以并行调用,比如同时查询订单信息和用户信息,整体响应时间缩短50%

最佳实践总结

最佳实践 具体要求
适配层无侵入 所有适配逻辑都在适配层实现,不要修改现有系统的一行代码
全链路可观测 LangGraph的TraceID和现有系统的调用链ID绑定,所有工具调用日志都要上报到现有可观测平台(SkyWalking、Prometheus)
写操作人工审核 所有涉及修改数据的写操作,都要加人工审核节点,大模型生成的操作经过人工确认后再执行
灰度发布 新的集成逻辑先放量1%的流量,观察24小时没有问题再逐步放量,避免影响线上业务
故障降级 当现有系统不可用时,自动切换到降级逻辑,返回“系统繁忙,请稍后重试”,不要把错误暴露给用户

行业发展与未来趋势

大模型应用与现有系统集成的发展历程如下表所示:

阶段 时间范围 核心方案 典型痛点 适用场景
硬编码集成阶段 2022年以前 业务代码直接调用大模型API,硬编码逻辑调用现有系统接口 逻辑修改难,无上下文管理,调试成本极高 简单单步骤大模型应用,比如FAQ智能客服
链式集成阶段 2022-2023年 用LangChain等框架的Chain编排逻辑,封装系统接口为Tool 不支持循环分支,状态无法持久化,出错需重跑 中等复杂度多步骤应用,比如文档问答
状态化智能体集成阶段 2023年至今 用LangGraph等状态化智能体框架编排流程,自带状态持久化、断点续跑 集成适配成本较高,需要掌握最佳实践 复杂跨系统智能体应用,比如智能运营助手、智能工单处理
自动集成阶段 2025-2027年(预测) 大模型自动识别现有系统的接口/操作,自动生成适配逻辑,零代码完成集成 技术尚未成熟,可靠性待验证 全场景智能体应用,实现业务全流程自动化

五、结论

核心要点回顾

  1. LangGraph天生的状态化特性完美适配现有系统集成的需求,是当前企业大模型落地的首选编排框架
  2. 集成架构采用三层设计:LangGraph智能体层 + 集成适配层 + 现有系统层,完全不侵入现有系统,保证稳定性
  3. 微服务可以通过服务发现直接封装为Tool,遗留系统可以通过RPA/抓包隐藏API封装为Tool
  4. 跨系统事务可以通过LangGraph的条件分支和回滚逻辑实现最终一致性
  5. 上线前必须做权限校验、熔断限流、幂等性保证,避免影响现有业务

展望未来

未来LangGraph会进一步降低集成成本:原生支持Nacos、Eureka等服务发现组件,自动生成微服务的Tool封装;内置RPA适配模块,自动识别遗留系统的操作流程生成工具;和现有工作流系统(Flowable、Activiti)深度集成,实现大模型智能体和传统工作流的无缝协同。

行动号召

  1. 动手试试:把本文的代码模板复制下来,对接你公司的一个现有系统,跑通第一个智能体应用
  2. 交流讨论:如果你在集成过程中遇到问题,欢迎在评论区留言交流
  3. 学习资源:
    • LangGraph官方文档:https://langchain-ai.github.io/langgraph/
    • 本文完整代码仓库:https://github.com/yourname/langgraph-integration-demo
    • 微服务适配最佳实践文档:https://langchain-ai.github.io/langgraph/how-tos/tool-calling/

关注我,后续会分享更多LangGraph生产落地的实战案例,帮你快速把大模型能力落地到自己的业务中。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐