LangChain 的 LCEL:从"俄罗斯套娃"到"流水线魔法",一篇搞懂企业级 AI 工作流编排在这里插入图片描述

文章目录

一、痛点场景:那些年我们踩过的 Chain 坑

各位开发者朋友,你是否有过这样的经历?

场景一:多层嵌套的"代码套娃"

老板让你做一个智能文档处理系统,功能是这样的:用户上传一篇英文论文 -> 检索相关背景知识 -> 总结核心要点 -> 翻译成中文。

满怀信心打开代码,你开始写:

# 旧版 SequentialChain 实现
from langchain.chains import SequentialChain, LLMChain
from langchain.prompts import PromptTemplate

# 第一层:总结链
summarize_chain = LLMChain(
    llm=model,
    prompt=summarize_prompt,
    output_key="summary"
)

# 第二层:翻译链
translate_chain = LLMChain(
    llm=model,
    prompt=translate_prompt,
    output_key="translation"
)

# 第三层:组装
full_chain = SequentialChain(
    chains=[summarize_chain, translate_chain],
    input_variables=["document", "context"],
    output_variables=["summary", "translation"]
)

写完一看,代码像俄罗斯套娃一样,一层套一层。如果要加第四步、第五步呢?光是调试参数传递就让人头秃。

场景二:流式输出?不好意思,旧版不支持

产品经理跑过来:“用户反馈说等待时间太长了,能不能让回答一个字一个字蹦出来?”

你查了查文档,发现旧版的 LLMChain 根本不支持 stream 方法。要做流式输出?要么自己封装异步逻辑,要么等官方更新。一等就是几个月。

场景三:并行调用三个模型对比结果?同步循环等着吧

你做了一个"三模型投票"功能:同一个问题,同时问 GPT-4、Claude 和 Gemini,然后取最佳答案。

结果代码是这样的:

# 旧版实现:串行执行,耗时三倍
result_gpt = gpt_chain.invoke(question)  # 等待 5 秒
result_claude = claude_chain.invoke(question)  # 再等 5 秒
result_gemini = gemini_chain.invoke(question)  # 又等 5 秒
# 总耗时:15 秒

用户:我只是想问个问题,为什么要等这么久?

场景四:生产环境的"惊险时刻"

凌晨两点,你的 AI 客服系统突然挂了。排查发现是 OpenAI API 限流了。但系统没有任何降级策略,只能眼睁睁看着用户流失。

老板:为什么没有备用方案?
你:我写的是同步调用,没考虑这种情况…

旧版 Chain 的致命缺陷总结

缺陷 表现
灵活性差 配置繁琐,参数传递复杂
难以调试 中间结果不透明,错误定位困难
不支持流式 无法实现实时输出,用户体验差
异步支持弱 难以处理高并发场景
状态管理混乱 多步骤任务缺乏统一的上下文传递机制

这些问题,本质上是因为旧版架构是"命令式"的:你告诉程序"一步一步怎么做",而不是"描述最终想要什么"。

好消息是:LangChain 官方已经推出了新一代核心架构——LCEL。


二、痛点解决方案:LCEL 为何而来

LCEL(LangChain Expression Language)的诞生,就是为了解决上述所有问题。

核心理念:声明式编程

想象一下,如果你去奶茶店点单:

  • 命令式(旧版):“请拿一个空杯子,把茶叶放进去,倒热水,等三分钟,拿出茶包,倒入牛奶,搅拌”
  • 声明式(LCEL):“一杯港式奶茶”

LCEL 就是后者——你描述"想要什么结果",LangChain 负责"怎么实现"。

核心语法:管道操作符 |

LCEL 借鉴了 Unix 的管道哲学,用 | 将独立的 Runnable 组件串联起来:

chain = prompt | model | parser

这行代码的含义是:数据从左向右流动,prompt 处理输入 -> model 生成响应 -> parser 解析结果。

LCEL 的三大承诺

  1. 流式支持:天然支持,无需额外适配
  2. 异步支持:同一份代码,同时支持同步/异步调用
  3. 并行执行:RunnableParallel 让多个任务同时运行

用 LCEL 重写"三模型投票":

from langchain_core.runnables import RunnableParallel

# LCEL 实现:并行执行,耗时约等于最慢的一个
parallel_chain = RunnableParallel(
    gpt=full_gpt_chain,
    claude=full_claude_chain,
    gemini=full_gemini_chain
)

result = parallel_chain.invoke(question)  # 约 5 秒搞定

这就是 LCEL 的魔力——让你从"怎么写"中解放出来,专注于"要什么"。


三、是什么:LCEL 极简概念与原理

定义:LCEL 是一种声明式的链式编写方式

LangChain Expression Language(LCEL)采用声明式方法构建新的 Runnables。这意味着你描述的是"应该发生什么",而不是"如何发生"。LangChain 负责优化链的运行时执行。

核心公式

chain = prompt | model | parser

这行代码创建一个 Runnable(链),它包含:

  • 输入:字典(如 {"topic": "人工智能"}
  • 经过 Prompt 模板格式化
  • 发送给模型
  • 模型输出经过 Parser 解析
  • 返回最终结果

Runnable 协议:LCEL 的基石

Runnable 是 LangChain 的核心协议,所有组件(Prompt、Model、Parser、Retriever 等)都实现了这个接口。它定义了四个标准方法:

方法 作用 返回类型
invoke 单次调用 同步返回结果
stream 流式输出 生成器(Generator)
batch 批量处理 列表
ainvoke/abatch 异步版本 Awaitable

四大核心 Runnable 原语

1. RunnablePassthrough(透传数据)

用于保持输入不变,或在管道中传递中间变量。

from langchain_core.runnables import RunnablePassthrough

# 场景:RAG 管道中,question 需要原封不动传递
chain = (
    RunnableParallel(
        context=retriever,           # 检索相关文档
        question=RunnablePassthrough() # 原封不动传递原始问题
    )
    | prompt
    | model
)

大白话:RunnablePassthrough 像一个"透明人",它站在流水线中间,不改变任何东西,只是让原材料继续往前走。

2. RunnableParallel(并行执行)

让多个 Runnable 同时运行,显著降低总耗时。

from langchain_core.runnables import RunnableParallel

# 同时调用三个模型
parallel_chain = RunnableParallel(
    gpt=full_gpt_chain,
    claude=full_claude_chain,
    gemini=full_gemini_chain
)

result = parallel_chain.invoke(question)
# result = {"gpt": "...", "claude": "...", "gemini": "..."}

大白话:RunnableParallel 像一个"三头六臂"的孙悟空,同时处理多件事,效率翻倍。

3. RunnableLambda(自定义函数包装)

将普通 Python 函数转换为 Runnable 组件。

from langchain_core.runnables import RunnableLambda

# 自定义函数:清洗文本
def clean_text(text: str) -> str:
    return text.strip().replace("\n\n", "\n")

# 包装成 Runnable
clean_node = RunnableLambda(clean_text)

# 接入管道
chain = prompt | model | clean_node | parser

大白话:RunnableLambda 像一个"万能转换器",无论你有什么特殊需求(格式转换、数据清洗),都可以用它包装后塞进流水线。

4. RunnableBranch(条件分支)

根据条件选择不同的处理路径。

from langchain_core.runnables import RunnableBranch

branch_chain = RunnableBranch(
    (lambda x: x["language"] == "中文", chinese_chain),
    (lambda x: x["language"] == "English", english_chain),
    default_chain  # 默认路径
)

大白话:RunnableBranch 像流水线上的"分岔路口",根据产品类型自动送到不同的包装车间。

生活案例:LCEL 像自动化流水线

想象一个薯片工厂的自动化流水线:

原材料(土豆)→ 清洗 → 切片 → 油炸 → 调味 → 包装 → 成品
  • 每个车间就是一个 Runnable
  • 传送带就是 | 管道操作符
  • LCEL 链就是整个自动化流水线

你只需要说"我要薯片",工厂自动完成所有步骤。


四、为什么用:LCEL 核心优势对比

对比旧版 SequentialChain

特性 旧版 SequentialChain LCEL
组合方式 构造函数嵌套 管道操作符 `
可读性 步骤多了变得臃肿 保持线性和简洁
中间调试 难以拦截中间步骤 简单支持 RunnableLambda
并行计算 需手动处理多线程 原生支持 RunnableParallel
流式输出 不支持 原生支持
异步支持 较弱 完整支持

LCEL 五大核心特性

1. 流式支持(Streaming)

旧版 Chain 需要额外配置才能实现流式输出,LCEL 原生支持:

# 流式输出,逐字显示
for chunk in chain.stream({"topic": "量子计算"}):
    print(chunk.content, end="", flush=True)

用户看到的是"打字机效果",等待时间感知大幅降低。

2. 异步支持(Async)

同一份代码,支持同步和异步两种调用方式:

# 同步调用
result = chain.invoke({"topic": "AI"})

# 异步调用(同一份代码)
result = await chain.ainvoke({"topic": "AI"})

# 异步批量
results = await chain.abatch([{"topic": "AI"}, {"topic": "ML"}])
3. 并行执行(Parallelism)

RunnableParallel 让独立任务同时执行:

# 原来:串行执行,总耗时 = sum(各任务耗时)
result1 = task1.invoke(input)  # 5秒
result2 = task2.invoke(input)  # 5秒
result3 = task3.invoke(input)  # 5秒
# 总耗时:15秒

# 现在:并行执行,总耗时 = max(各任务耗时)
parallel_chain = RunnableParallel(task1=task1, task2=task2, task3=task3)
result = parallel_chain.invoke(input)
# 总耗时:5秒
4. 自动重试与回退(Fallbacks)

生产环境的必备保障:

from langchain_core.runnables import RunnableLambda

# 配置降级策略
primary_model = ChatOpenAI(model="gpt-4")
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")

chain = (
    prompt
    | primary_model.with_fallbacks([fallback_model])
    | parser
)

# 当 GPT-4 不可用时,自动切换到 GPT-3.5
5. 类型检查(Schema Validation)

运行前就能知道输入输出格式:

chain = prompt | model | parser

# 查看输入输出 Schema
print(chain.input_schema.model_json_schema())
print(chain.output_schema.model_json_schema())

企业级价值:从 Demo 到生产的桥梁

2026 年,LangChain 0.3.0 发布,LCEL v2 带来 40% 执行速度提升。LCEL 已成为 LangChain 官方推荐的构建生产级应用的标准方式。


五、怎么用:保姆级基础教学

环境搭建

pip install langchain langchain-openai langchain-community
pip install faiss-cpu  # 向量数据库
pip install python-dotenv  # 环境变量管理
# .env 文件
OPENAI_API_KEY=sk-your-key-here

核心三件套:Prompt + Model + Parser

1. Prompt 模板
from langchain_core.prompts import ChatPromptTemplate

# 简单占位符
prompt = ChatPromptTemplate.from_template(
    "请用中文简短介绍一下{topic}:"
)

# 支持多变量
complex_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的{domain}顾问。"),
    ("human", "请回答用户的问题:{question}")
])
2. Model 模型
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="gpt-4",
    temperature=0.7,
    streaming=True  # 启用流式
)
3. Output Parser
from langchain_core.output_parsers import StrOutputParser

parser = StrOutputParser()

完整示例:从"Hello World"到流式输出

Step 1: 组装基础链
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 初始化模型
model = ChatOpenAI(model="gpt-4", temperature=0)

# 创建 Prompt
prompt = ChatPromptTemplate.from_template(
    "用一句话解释{topic}的核心概念:"
)

# 组装链:prompt | model | parser
simple_chain = prompt | model | StrOutputParser()

# 调用
result = simple_chain.invoke({"topic": "量子纠缠"})
print(result)
Step 2: 实现流式输出
# 流式调用,逐 token 输出
for chunk in simple_chain.stream({"topic": "量子纠缠"}):
    print(chunk.content, end="", flush=True)

用户看到的效果:

量子纠缠是...
量子纠缠是一种...
量子纠缠是一种量子力学现象...
Step 3: 批量处理
# 同时处理多个问题
questions = [
    {"topic": "人工智能"},
    {"topic": "机器学习"},
    {"topic": "深度学习"}
]

results = simple_chain.batch(questions)
for r in results:
    print(r)

进阶示例:构建完整 RAG 链

RAG(检索增强生成)是企业知识库问答的核心架构。

架构图
用户问题 → 检索器 → 获取相关文档 → 组装 Prompt → 大模型推理 → 返回答案
代码实现
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# 1. 准备向量数据库
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(
    ["LangChain 是一个 AI 编排框架", "LCEL 是其核心语法"],
    embedding=embeddings
)
retriever = vectorstore.as_retriever()

# 2. 定义 Prompt 模板
template = """基于以下上下文回答问题。如果上下文中没有相关信息,请如实说明。

上下文:
{context}

问题:{question}

答案:"""
prompt = ChatPromptTemplate.from_template(template)

# 3. 初始化模型
model = ChatOpenAI(model="gpt-4")

# 4. 组装 RAG 链
rag_chain = (
    RunnableParallel(
        context=retriever,           # 检索相关文档
        question=RunnablePassthrough()  # 原样传递问题
    )
    | prompt                        # 组装完整 Prompt
    | model                         # 调用大模型
    | StrOutputParser()             # 解析输出
)

# 5. 调用
result = rag_chain.invoke("LangChain 是什么?")
print(result)

调试与日志

使用 ConsoleCallbackHandler
from langchain_core.callbacks import ConsoleCallbackHandler

# 启用控制台日志
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [ConsoleCallbackHandler()]}
)
使用 LangSmith(生产推荐)

LangSmith 是 LangChain 官方的可观测性平台,可以追踪每一次调用的完整链路。

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"

# 之后的所有 chain 调用都会自动记录
result = chain.invoke({"topic": "AI"})

迁移指南:从旧版 load_qa_chain 到 LCEL

旧版代码:

from langchain.chains import load_qa_chain

chain = load_qa_chain(llm, chain_type="stuff")
result = chain({"query": question, "input_documents": docs})

LCEL 迁移后:

from langchain_core.runnables import RunnablePassthrough

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
result = chain.invoke(question)

优势:代码更简洁,支持流式,调试更容易。


六、常用场景列举

场景 1:简单问答链

最基础的用法,适合快速原型开发。

chain = prompt | model | StrOutputParser()
result = chain.invoke({"question": user_input})

场景 2:RAG 知识库问答

企业知识库、智能客服、文档助手的核心架构。

# 完整 RAG 链
rag_chain = (
    RunnableParallel(
        context=retriever,
        question=RunnablePassthrough()
    )
    | prompt
    | model
    | StrOutputParser()
)

场景 3:多步骤文章处理流水线

文档摘要 -> 关键词提取 -> 翻译 -> 格式化输出。

# 子链 1:总结
summary_chain = summarize_prompt | model | StrOutputParser()

# 子链 2:翻译
translate_chain = translate_prompt | model | StrOutputParser()

# 子链 3:格式化
format_chain = format_prompt | model | StrOutputParser()

# 组合流水线
full_pipeline = summary_chain | translate_chain | format_chain

场景 4:并行多模型对比

让 GPT-4、Claude、Gemini 同时回答,取最佳或整合。

from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    gpt=full_gpt_chain,
    claude=full_claude_chain,
    gemini=full_gemini_chain
)

results = parallel_chain.invoke(question)
# 整合三个模型的结果
final_answer = integrate_results(results)

场景 5:智能客服系统

意图识别 -> 知识库检索 -> 上下文组装 -> 模型推理。

# 意图识别
intent_chain = intent_prompt | model | intent_parser

# 根据意图路由
routing_chain = RunnableBranch(
    (lambda x: x["intent"] == "咨询", faq_chain),
    (lambda x: x["intent"] == "投诉", complaint_chain),
    default_chain
)

# 组装完整流程
客服链 = intent_chain | routing_chain

场景 6:带工具调用的 Agent

ReAct 模式的 Agent,串联工具选择、参数填充、结果解析。

# 工具调用链
tool_chain = (
    {"tools": tools, "question": RunnablePassthrough()}
    | tool_selection_prompt
    | model.with_fallbacks([simple_fallback_model])
    | tool_parser
)

# 带重试的完整 Agent 链
agent_chain = tool_chain.with_fallbacks([
    fallback_tool_chain,
    RunnableLambda(lambda x: "抱歉,暂时无法处理您的请求")
])

七、专业解释 + 大白话 + 生活案例

专业解释:Runnable 协议的核心机制

1. __or__ 方法实现管道操作符

当你写下 chain = a | b | c 时,Python 实际执行的是:

chain = a.__or__(b)  # 返回 RunnableSequence
chain = chain.__or__(c)  # 继续拼接

这意味着任何实现了 Runnable 接口的组件都可以用 | 串联。

2. 底层是 RunnableSequence 构建过程
# 这两种写法完全等价
chain1 = runnable1 | runnable2
chain2 = RunnableSequence([runnable1, runnable2])

LangChain 在底层自动处理了类型转换和执行优化。

3. 字典自动转换为 RunnableParallel
# 这两种写法完全等价
chain1 = {"context": retriever, "question": RunnablePassthrough()}
chain2 = RunnableParallel(context=retriever, question=RunnablePassthrough())

大白话解读

概念 大白话 生活案例
` ` 管道符 流水线传送带
RunnablePassthrough 透明人/快递员 快递员只管送货,不改变货物
RunnableParallel 三头六臂 孙大圣同时处理多件事
RunnableLambda 万能转换插头 欧标转国标适配器
RunnableBranch 分岔路口 高速路的匝道,根据目的地选择路线

生活案例:点一杯奶茶的 LCEL 表达

# 原材料准备
materials = {"tea": "茶叶", "milk": "牛奶", "sugar": "糖"}

# 流水线
making_chain = (
    RunnableParallel(  # 同时准备所有材料
        tea=tea_prepare,
        milk=milk_prepare,
        sugar=sugar_prepare
    )
    | brewing  # 冲泡
    | mixing   # 混合
    | serving  # 装杯
)

# 点单
order = {"type": "港式奶茶", "ice": "少冰"}
result = making_chain.invoke(order)

你只管点单,工厂自动完成所有步骤。


八、企业级实战指导

LCEL 与 LangGraph:分工定位

这是很多开发者最困惑的问题。2026 年,框架选型直接决定项目成败。

维度 LCEL LangGraph
架构 无状态管道式 有状态循环图
适用场景 单次问答、线性 RAG 多步推理、Agent、工具调用
循环/分支 不支持 原生支持
状态管理 强大的 State 管理
复杂度 中高
学习曲线 平缓 较陡

框架选型建议

是否需要复杂的状态管理、分支、循环或多个 Agent?
    ├── 是 → 使用 LangGraph
    └── 否 → 继续判断
            是否只进行一次 LLM 调用?
                ├── 是 → 直接调用底层模型
                └── 否 → 使用 LCEL

实际选型

场景 推荐方案
单轮 RAG / 纯文档问答 LCEL 或 FastAPI + 向量库
复杂多步推理、工具调用 LangGraph
多智能体协同 / 角色扮演 CrewAI 或 AutoGen
需要循环 + 条件分支 LangGraph(可在节点内使用 LCEL)

LCEL 局限性

  1. 构建有向无环图:LCEL 本质是线性管道,不支持循环
  2. 多轮对话状态管理:需要额外集成 Memory 组件
  3. 复杂条件路由:需要配合 RunnableBranch,但灵活性不如 LangGraph

解决方案:在 LangGraph 的各个节点中继续使用 LCEL,兼顾灵活性和简洁性。

生产环境三件套

1. 全链路 Trace(LangSmith)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"

# 每次调用自动追踪
result = chain.invoke(input)
2. 重试与熔断机制
# 配置多级降级
chain = (
    prompt
    | gpt_4o.with_fallbacks([
        gpt_4o_mini,
        claude_3_5,
        RunnableLambda(lambda x: "服务暂时不可用,请稍后重试")
    ])
    | parser
)
3. 成本与延迟告警
from langchain_core.callbacks import CallbackManager

class CostAlertCallback(BaseCallbackHandler):
    def on_chain_end(self, outputs, **kwargs):
        # 检查 token 消耗
        if outputs.get("usage", {}).get("total_tokens", 0) > 10000:
            send_alert("Token 消耗超标!")

安全与合规

输出结构化校验
from pydantic import BaseModel

class AnswerSchema(BaseModel):
    answer: str
    confidence: float
    source: str

structured_parser = JsonOutputParser(pydantic_object=AnswerSchema)
chain = prompt | model | structured_parser
检索范围按角色权限过滤
# 在检索阶段过滤
def filtered_retriever(query, user_role):
    docs = base_retriever.invoke(query)
    return [d for d in docs if d.metadata.get("role") in user_role.permissions]
Prompt 注入检测
from langchain_core.runnables import RunnableLambda

def inject_checker(input_dict):
    prompt = input_dict.get("prompt", "")
    if any(keyword in prompt.lower() for keyword in ["ignore", "system", "admin"]):
        raise ValueError("检测到潜在 Prompt 注入")
    return input_dict

safe_chain = (
    RunnableLambda(inject_checker)
    | prompt
    | model
)

2026 趋势展望

趋势 影响
MCP 统一工具调用 跨框架工具集成更便捷
RAGAS/DeepEval 进入 CI/CD 自动化评估检索质量与幻觉率
内置可观测性 LangChain 原生支持成本追踪
多模态支持增强 LCEL 链支持图像、音视频处理

九、总结与互动

核心要点回顾

  1. LCEL 是声明式编程:你描述"要什么",不关心"怎么做"
  2. 管道操作符 | 是核心语法:像 Unix 管道一样简洁
  3. Runnable 协议是基石:统一了所有组件的接口
  4. 四大原语要牢记:Passthrough、Parallel、Lambda、Branch
  5. 五大优势:流式、异步、并行、重试、类型检查
  6. 框架选型:简单任务用 LCEL,复杂 Agent 用 LangGraph

快速上手代码模板

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 3 行代码搞定一切
model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("用一句话解释{topic}:")
chain = prompt | model | StrOutputParser()

# 调用
result = chain.invoke({"topic": "LCEL"})

互动引导

你在生产环境中遇到过哪些 LCEL 或 LangChain 的"坑"?

  • 流式输出遇到特殊字符乱码?
  • 并行调用时遇到 API 限流?
  • RAG 检索结果总是不准确?
  • 迁移旧版 Chain 到 LCEL 遇到兼容性问题?

欢迎在评论区分享你的实战经验,一起交流成长!


转载声明

免责声明:本文仅代表作者个人观点,与平台立场无关。本文仅供参考学习,不构成任何投资或使用建议。相关内容如有不实或侵权,请联系删除。


参考链接

  1. LangChain 官方文档 - LCEL
  2. LangChain 中文指南 - LCEL
  3. LangChain Runnable 接口文档
  4. LangChain 0.3.0 Release Notes - March 2026
  5. LangChain Expression Language Cheatsheet
  6. Building LLM-Powered Applications with LangChain: A Production Guide for 2026
  7. LangChain Error Handling Best Practices
  8. Complete LangChain Production Deployment Guide
  9. LangGraph vs LangChain: How to Choose
  10. Building a RAG chain using LCEL

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐