LangChain工作流

第一章 LangChain 提示词工程;第二章 LangChain 工作流



前言

提示:这里可以添加本文要记录的大概内容:

本文主要整理 LangChain 工作流相关内容,包括模型类型与输入输出、LCEL、Runnable 协议、同步/异步流式输出、Chain 链式调用、RunnablePassthrough、RunnableLambda、itemgetter 以及 Stream events 事件流等。


提示:以下是本篇文章正文内容,下面案例可供参考

LangChain工作流

一、模型类型与输入输出

1.Langchain封装了两类模型:大语言模型 (LLM),聊天模型 (Chat Models)

  • 大语言模型 (LLM)
    • 在Langchain中,LLM是指文本补全模型(text completion model)。
    • 文本补全模型:根据上下文的文本,推断出最有可能的下一个文本。
    • 输入:一条文本内容。输出:一条文本内容
  • 聊天模型 (Chat Models)
    • 聊天模型使用语言模型,提供基于"聊天消息"的接口。
    • 输入:聊天消息对象列表。输出:一条聊天消息对象AIMessage
    • 输入:一条文本内容。输出:一条聊天消息对象AIMessage
    • 输出的聊天消息除了消息内容文本,还会包含其他数据,如语境,角色等。

2.大语言模型 (LLM)

  • 使用LLM模型,输入是字符串,输出是字符串

代码

from langchain_ollama import OllamaLLM

model = OllamaLLM(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    keep_alive=600,
    temperature=0,
    num_ctx=8000,
)
llm_response = model.invoke(f"泰国有什么地方好玩?")
print(llm_response)
print("---")
print(type(llm_response))

结果

泰国是一个充满魅力的旅游目的地,拥有丰富的... 一步推荐!
---
<class 'str'>
  • 输入是提示词模板,输出是字符串

代码

from langchain_ollama import OllamaLLM
from langchain_core.prompts import PromptTemplate

model = OllamaLLM(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    keep_alive=600,
    temperature=0,
    num_ctx=8000,
)

prompt_template = PromptTemplate.from_template(
    template="{place}泰国有什么地方好玩?"
)
chain = prompt_template | model
llm_response = chain.invoke(input={"place":"泰国"})
print(llm_response)
print("---")
print(type(llm_response))

结果

泰国是一个充满魅力的旅游目的地,拥有丰富的... 一步推荐!
---
<class 'str'>

3.聊天模型 (Chat Models)

  • 使用聊天模型,输入是提示词模板,输出是AIMessage对象
    • 注意:是一个AIMessage对象,而不会包括之前的对话记录。这个与之后的langgraph的state里的Messages不一样,Messages保存了所有的记录。

代码

from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama

model = ChatOllama(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    keep_alive=600, 
    temperature=0,
    num_ctx=8000,
)
prompt_temp = ChatPromptTemplate.from_template("从下面内容中提取人物信息:{text}")
chain = prompt_temp | model
res = chain.invoke(input={"text": "张三今年18岁。"})
print(res)
print("---")
print(type(res))
print("---")
print(res.content)

结果

content = "**人物信息提取结果:**  \n- **姓名**:张三  \n- **年龄**:18岁)"
additional_kwargs = {}
response_metadata = {
    "model": "qwen3:1.7b",
    "created_at": "2026-04-22T19:03:34.1178123Z",
    "done": True,
    "done_reason": "stop",
    "total_duration": 1471850700,
    "load_duration": 112276000,
    "prompt_eval_count": 25,
    "prompt_eval_duration": 69699600,
    "eval_count": 344,
    "eval_duration": 1192827700,
    "logprobs": None,
    "model_name": "qwen3:1.7b",
    "model_provider": "ollama",
}
id = "lc_run--019db693-7e23-72b1-8338-23818cf360e0-0"
tool_calls = []
invalid_tool_calls = []
usage_metadata = {"input_tokens": 25, "output_tokens": 344, "total_tokens": 369}
---
<class 'langchain_core.messages.ai.AIMessage'>
---
**人物信息提取结果:**  
- **姓名**:张三  
- **年龄**:18岁
  • 使用聊天模型,输入是聊天消息提示词模板,输出是AIMessage对象

代码

from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama

model = ChatOllama(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    keep_alive=600, 
    temperature=0,
    num_ctx=8000,
)
chat_template = ChatPromptTemplate.from_messages(
    messages=[
        ("system", "你是一位人工智能助手,你的名字是{name}。"),
        ("human", "你好"),
        ("ai", "我很好,谢!"),
        ("human", "{user_input}"),
    ]
)
chainx = chat_template | model
res = chainx.invoke(input={"name": "sam", "user_input": "泰国有什么地方好玩?"})
print(res)
print("---")
print(type(res))
print("---")
print(res.content)

结果

content = "泰国有很多好玩的地方,以下是一...! 🌴✨"
additional_kwargs = {}
response_metadata = {
    "model": "qwen3:1.7b",
    "created_at": "2026-04-22T19:08:34.1460898Z",
    "done": True,
    "done_reason": "stop",
    "total_duration": 3010375900,
    "load_duration": 95021700,
    "prompt_eval_count": 46,
    "prompt_eval_duration": 155683100,
    "eval_count": 782,
    "eval_duration": 2520336000,
    "logprobs": None,
    "model_name": "qwen3:1.7b",
    "model_provider": "ollama",
}
id = "lc_run--019db698-0c1d-7842-b059-ee78ded91a68-0"
tool_calls = []
invalid_tool_calls = []
usage_metadata = {"input_tokens": 46, "output_tokens": 782, "total_tokens": 828}
---
<class 'langchain_core.messages.ai.AIMessage'>
---
泰国有很多好玩的地方,以下是一...! 🌴✨

二、LCEL介绍

1.LCEL

  • LCEL(LangChain Expression Language–LangChain表达式语言)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。
  • LCEL支持将原型投入生产,无需更改代码,从最简单的“提示词 + LLM”链到最复杂的链(已在生产中运行了包含数百步的LCEL链)。

2.LCEL优势:

  • 流式支持:使用LCEL构建链时,流式的增量输出块的速率能与LLM 提供输出的速率相同。
  • 异步支持:使用LCEL构建的任何链都可以使用同步API(如,在Jupyter中进行)以及异步API(如,在LangServe服务器中进行)进行调用。这使得可以在原型和生产中使用相同的代码,具有出色的性能,并且能够在同一服务器中处理许多并发请求。
  • 并行执行:当LCEL链具可以并行执行时(如,从多个检索器中获取文档),会自动选择并行执行,无论是在同步接口还是异步接口中,以获得最小延迟。
  • 重试和回退:为LCEL链的任何部分配置重试和回退,使得链在规模上更可靠。并且目前正在尝试为重试/回退添加流式支持,这样就可以获得额外的可靠性而无需任何延迟成本。
  • 访问中间结果:对于更复杂的链,访问中间步骤的结果通常非常有用。也可以流式传输中间结果,并且在每个LangServe服务器上都可以使用。
  • 输入和输出模式:输入和输出模式为每个LCEL链提供了Pydantic和JSONSchema模式。可用于验证输入和输出,并且是LangServe的一个组成部分。

三、Runnable协议(Runable interface)

1.Runnable协议

  • Runnable协议是LangChain内部一些类或工具的标准接口,可以轻松定义链,并以标准方式调用它们。目的是为了尽可能简化创建链的过程。
  • 实现Runnable接口的组件可以使用|连接,拼接成链条,完成一个特定功能。
  • 许多LangChain组件都实现了Runnable协议,包括:提示模板、聊天模型、LLMs、检索器、代理、输出解析器等等。

2.Runnable协议的同步标准接口包括:

  • stream:返回响应的数据块
    • 输入参数input,为输入内容,可以是字符串或字典
  • invoke:对输入调用链
    • 输入参数input,为输入内容,可以是字符串或字典
    • 输入参数config,为设置的字典,字典名可以是"tag",“metadata”,“max_concurrency”(最大并发量)
  • batch:对输入列表调用链
    • 输入参数inputs,为输入内容列表
    • 输入参数config,为设置的字典,字典名可以是"max_concurrency"(最大并发量)

3.Runnable协议的异步标准接口(与asyncio一起使用await语法以实现并发)包括:

  • astream:异步返回响应的数据块
  • ainvoke:异步对输入调用链
  • abatch:异步对输入列表调用链
  • astream_log:异步返回中间步骤,以及最终响应
  • astream_events:beta流式传输链中发生的事件(在langchain-core 0.1.14 中引入)

4.Runnable接口方法(invoke, stream, batch, astream_log等)都接受一个可选的config参数

  • config参数通常是一个字典(RunnableConfig),作用是在不改变Runnable逻辑本身的情况下,从外部控制其运行的行为。
  • config参数的主要作用
    • 递归限制 (recursion_limit)
      • 作用:控制LangGraph或复杂Chain的最大步数,防止无限循环消耗大量Token。达到步数,程序会抛出异常。默认25。
      • 递归限制:系统最多允许执行多少个“节点推进动作”或“推理轮次”
    • 配置标识 (configurable)
      • 允许在运行时动态修改Runnable内部的参数,而不需要重新实例化对象。
      • 场景:切换模型、切换用户ID、修改检索器的k值。
      • 示例:config = {“thread_id”: “session_abc_123”}
    • 追踪与调试 (callbacks, tags, metadata)
      • 用于LangSmith监控、日志分析
      • tags:为这次运行打标签(如"production", “test”),方便在后台筛选。
      • metadata:存储自定义元数据(如user_id, session_id),这些数据会随追踪信息一起上传。
      • callbacks:手动传入回调处理器,用于实时监听运行状态。
    • 并发控制 (max_concurrency)
      • 作用:主要用于batch()方法。限制同时发送给API的请求数量。如有100个任务要跑,但不想被OpenAI封禁,可设置{“max_concurrency”: 5}。
  • 示例

代码

...
# 定义配置
config = {
    "recursion_limit": 48,                 # 增大递归深度
    "configurable": {"user_id": "123"},    # 供内部组件读取的动态参数
    "tags": ["beta_test"],                 # 在 LangSmith 中标记
    "metadata": {"env": "dev"}             # 附加元数据
}

# 所有的 Runnable 方法都支持
result = chain.invoke(input_data, config=config)

# 在流式输出中使用
for chunk in app.stream(initial_input, config=config):
    print(chunk)
...

5.异步作用

  • 并发处理能力
    • 可同时处理多个流式请求,适合高并发场景
    • 在等待网络响应时,程序可以处理其他任务
  • 资源效率
    • 异步可以让单线程处理多个连接,内存占用更少
    • 避免线程切换开销
  • 响应性
    • 程序不会因为单个请求阻塞
    • 支持请求取消和超时控制

6.异步astream

  • 输出顺序不会错乱。
    • 单个model.astream(…)的async for消费顺序,会保持和模型流式返回的顺序一致输出。
    • 多个astream如果并发打印到同一个终端,会串行交错,但是顺序也不会乱。如:ab1c234de…
  • 速度提升
    • 异步不是让单次token生成变快,而是在“等待网络/等模型返回token的空档期”去做别的事,从而提升整体吞吐量、并发能力和响应体验。

7.组件的输入类型和输出类型

组件 输入类型 输出类型
提示词 字典 提示值
聊天模型 单个字符串、聊天消息列表或提示值 聊天消息
LLM 单个字符串、聊天消息列表或提示值 字符串
输出解析器 LLM或聊天模型的输出 取决于解析器
检索器 单个字符串 文档列表
工具 单个字符串或字典,取决于工具 取决于工具

8.所有可运行对象都公开输入和输出模式以检查:

  • input_schema: 从可运行对象结构自动生成的输入Pydantic模型
  • output_schema: 从可运行对象结构自动生成的输出Pydantic模型

四、Stream(流)

1.所有Runnable对象都有stream同步方法和astream异步方法。

2.LLM和聊天模型

  • LLM模型只有一问一答的模型,聊天模型是多轮对话的模型
  • 大型语言模型及其聊天变体模型是基于LLM的应用程序的主要瓶颈。
  • 大型语言模型可能需要几秒钟才能对查询生成完整的响应。这比一般应用程序的响应约200-300毫秒要慢得多。
  • 使应用程序具有更高的响应性的关键策略是显示中间进度;即逐个令牌流式传输模型的输出。也即是说流式响应可以提高响应速度。
  • 同步stream接口:
    • 为了简单直接使用模型作为Runnable。其实Runnable可以是一个完整的两条

代码

from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
    temperature=0.7,
    max_tokens=8000,
)

chunks = []                        # 保存所有输出内容,但程序没有使用
for chunk in model.stream("天空是什么颜色?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

结果

|天空|的颜色|会|因|时间|、天气|和观察|角度而变化|,以下是|常见的几种|情况:

|1.| **晴朗|白天|**  
|   |呈现**|蓝色**|,这是因为|阳光穿过|大气层|时,|较短|波长的|蓝光|被空气|分子散射|(瑞|利散 射|),使得|我们|看到的天空|以蓝色|为主。|
...
|**小|知识**|:在|火星上|,因|稀|薄大气|和尘埃|,|天空通常是|淡粉色|或锈|红色|哦|! 🌌|  
(|具体|颜色还|受观察|者位置|、海拔|等因素|影响。)||
  • 异步astream接口:
    • 为了简单直接使用模型作为Runnable。其实Runnable可以是一个完整的链条

代码

from langchain_openai import ChatOpenAI
import asyncio

async def main():
    model = ChatOpenAI(
        model="Pro/deepseek-ai/DeepSeek-V3",
        openai_api_key="************",
        openai_api_base="https://api.siliconflow.cn/v1",
        temperature=0.7,
        max_tokens=8000,
    )
    chunks = []
    async for chunk in model.astream("天空是什么颜色?"):
        chunks.append(chunk)
        print(chunk.content, end="|", flush=True)
    print(chunks[2])

if __name__ == "__main__":
    asyncio.run(main())

结果

|天空|的颜色|因|时间|、天气|和观察|角度而变化|,以下是|常见|情况|下的|颜色及其|成因:

|1.| **晴朗|白天|**  
|  | - **|蓝色|**:|因|瑞|利散射|(Ray|leigh scattering|)。太阳|光的|蓝|光波长|较短,|被空气|分子散射|更多,|布满整个|天空。
...
|简单|来说|,天空|是|自然的|「|动态|调|色板|」,颜色|背后|是|光与|大气|相互作用的|物理现象|。||

content='的颜色' additional_kwargs={} response_metadata={} id='run--05622231-1875-4958-a072-d0cc1bf884f2' usage_metadata={'input_tokens': 7, 'output_tokens': 2, 'total_tokens': 9, 'input_token_details': {}, 'output_token_details': {}}
  • 多条问题并发、异步流式输出。

代码

from langchain_openai import ChatOpenAI
import asyncio

async def stream_one(model, prompt, idx):
    chunks = []
    async for chunk in model.astream(prompt):
        text = chunk.content or ""
        chunks.append(text)
        print(f"[{idx}] {text}", end="", flush=True)
    print()  # 当前这一条结束后换行
    full_text = "".join(chunks)
    return idx, prompt, full_text

async def main():
    model = ChatOpenAI(
        model="Pro/deepseek-ai/DeepSeek-V3",
        openai_api_key="************",
        openai_api_base="https://api.siliconflow.cn/v1",
        temperature=0.7,
        max_tokens=8000,
    )
    prompts = [
        "为什么天空是蓝颜色?请简短回答。",
        "为什么海水看起来是蓝色的?请简短回答。",
        "树叶为什么通常是绿色的?请简短回答。",
    ]
    tasks = [
        asyncio.create_task(stream_one(model, prompt, i + 1))
        for i, prompt in enumerate(prompts)
    ]
    results = await asyncio.gather(*tasks)
    print("\n\n===== 最终完整结果 =====")
    for idx, prompt, full_text in results:
        print(f"[{idx}] 问题: {prompt}")
        print(f"[{idx}] 回答: {full_text}")
        print()

if __name__ == "__main__":
    asyncio.run(main())

五、Chain(链)

1.使用LangChain表达式语言(LCEL)构建一个简单链,可以结合提示、模型和解析器,并验证流式传输。

2.链条参数(Runnable接口参数)传入第一个组件

  • 使用Runnable协议标准接口(包括:stream、invoke、batch、astream、ainvoke、abatch、astream_log、astream_events)时,接口输入的参数会自动传入链条的第一个组件中
  • 传输过程:
    • LCEL会自动检查链中第一个组件(prompt)需要的参数
    • 发现prompt模板中有占位符(如{topic})
    • 自动将输入的键值对(如{“topic”:“鹦鹉”})匹配到对应的占位符。
  • 自动匹配原理
    • prompt模板在创建时就定义了需要的参数(如{topic})
    • LCEL链会分析每个组件需要的输入参数
    • 当执行时,自动将输入的键值对与组件需要的参数进行匹配
  • 等价写法:
    • 使用RunnablePassthrough()方法,获取Runnable接口的input值的对象
      • 不能直接用字符串来代替RunnablePassthrough()
    • 其实每条链的开头都应该有RunnablePassthrough()方法,只是省略了
    • 等价写法

代码

chain = prompt | model | parser
chain.astream(input={"topic": "鹦鹉"})

等价

from langchain.schema.runnable import RunnablePassthrough
chain = RunnablePassthrough() | prompt | model | parser
chain.astream(input={"topic": "鹦鹉"})

等价(当prompt只有一个参数时)

from langchain.schema.runnable import RunnablePassthrough
chain = RunnablePassthrough() | prompt | model | parser
chain.astream(input="鹦鹉")

等价

from langchain.schema.runnable import RunnablePassthrough
chain = {"topic": RunnablePassthrough()}| prompt | model | parser
chain.astream(input={"topic": "鹦鹉"})

等价

from langchain.schema.runnable import RunnablePassthrough
chain = {"topic": RunnablePassthrough()}| prompt | model | parser
chain.astream(input="鹦鹉")
  • 对于某些方法,会自动将链条参数传入第一个组件
    • 如langserve的add_routes()执行后,会自动的创建链条,并且自动将链条参数传入第一个组件
  • 对于某些方法,默认就是某个输入
    • 如检索中的retriever默认所有输入就是检索内容的输入。
  • 当链的第一个组件对参数有限制时,就不能用字典直接用字符串
    • 1代码,因为retriever需要输入一个字符串,不是{“question”: question}对象,所以输入参数input只能是question字符串。
    • 2代码,因为itemgetter(“question”)将获取输入键名为"question"的数据并返回,所以对于retriever来说也是输入了一个字符串。所以输入参数input只能是{“question”: question}对象

1代码

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 旧版from langchain.text_splitter import RecursiveCharacterTextSplitter

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
    temperature=0.7,
    max_tokens=8000,
)
embeddings = OpenAIEmbeddings(
    model="Pro/BAAI/bge-m3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
)

pdf_loader = PyPDFLoader(r"D:\python\envs\langchain2\pdf\rsgl.pdf")
documents = pdf_loader.load()
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ".", " ", ""],
    chunk_size=1000,
    chunk_overlap=200,
    add_start_index=True,
)
docs = text_splitter.split_documents(documents)
vector_store = FAISS.from_documents(docs, embeddings)
retriever = vector_store.as_retriever()

prompt_template = """
基于以下上下文{context},回答问题。
问题:{question}
答案:"""
prompt = PromptTemplate.from_template(prompt_template)

chain = (
    {
        "context": retriever,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
)

question = "新员工需要经过什么考核才能成为正式员工?"
result = chain.invoke(input=question)
# 不能:result = chain.invoke(input={"question": question})
# 因为input的内容同时会给到retriever,而retriever只接收字符串。
print(result)

2代码

...
prompt_template = """
基于以下上下文{context},回答问题。
问题:{question}
答案:"""
prompt = PromptTemplate.from_template(prompt_template)

chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
)

question = "新员工需要经过什么考核才能成为正式员工?"
result = chain.invoke(input={"question": question})
print(result)

3.输入与prompt的链条位置

  • 当链条最前位置无prompt对象,直接是模型时。输入参数是字符串,或者是提示词对象.format(),没有变量。
  • 当链条最前位置prompt对象时。输入参数是字典,对应的键名就是修改量。
  • 示例

代码

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import  StrOutputParser

class TranslationState(TypedDict):
    translation: str

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
)
prompt = """
Translate the following text to Thai:
{text}
"""

prompt_massage = [HumanMessage(content=prompt.format(text="Hello, world!"))]
chain = model.with_structured_output(TranslationState)
response = chain.invoke(prompt_massage)
print(response)

chain2 = model | StrOutputParser()
response2 = chain2.invoke(prompt_massage)
print(response2)

prompt_template = PromptTemplate(
    input_variables=["text"],
    template=prompt,
)
chain3 = prompt_template | model.with_structured_output(TranslationState)
response3 = chain3.invoke({"text": "Hello, world!"})
print(response3)

LCEL机制

{'translation': 'สวัสดีชาวโลก!'}
สวัสดีชาวโลก
{'translation': 'สวัสดีชาวโลก'}

4.RunnablePassthrough与RunnableLambda与itemgetter

  • RunnablePassthrough()会自动匹配输入字典中的键值对,这是LangChain的LCEL(LangChain Expression Language)的核心机制之一。
    • 使用RunnablePassthrough的assign()方法,在传递数据的同时,动态添加新字段或修改现有字段。
      • 参数为键值对,键就是传入的字段名,值可以是:直接的值(如字符串、数字)或一个函数(接受当前输入数据,返回计算后的值)或另一个Runnable(如LLM、工具、子链等)

代码

from langchain.schema.runnable import RunnablePassthrough
...
chain = (
    {
        "aa": RunnablePassthrough(),
        "bb": RunnablePassthrough()
    }
    | prompt | model | parser
)
async for chunk in chain.astream({"aa": "鹦鹉", "bb": "故事"}):
    print(chunk, end="|", flush=True)
...

LCEL机制

输入字典{"aa":"鹦鹉","bb":"故事"}进入链的第一个环节。
LCEL会检查输入字典的键是否与当前环节的键一致:
发现 "aa" 键存在 → 将 "鹦鹉" 传递给 RunnablePassthrough()
发现 "bb" 键存在 → 将 "故事" 传递给 RunnablePassthrough()
  • 使用RunnablePassthrough的assign()方法,新增变量。也可以修改输入,有点像RunnableLambda

代码

from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
    temperature=0.7,
    max_tokens=8000,
)

# 1. 创建查询处理链
query_prompt = ChatPromptTemplate.from_template(
    "根据用户输入生成一个更好的搜索查询。用户输入: {input}"
)
query_chain = query_prompt | model | StrOutputParser()

# 2. 创建回答生成链
answer_prompt = ChatPromptTemplate.from_template(
    "回答以下问题:\n问题: {query}\n\n请提供详细的回答:"
)
answer_chain = answer_prompt | model | StrOutputParser()

# 3. 构建完整链条
chain = (
    # 保留原始输入
    RunnablePassthrough.assign(
        # 使用query_chain处理输入并赋值给query字段
        # x为从输入获取到的{"input": "如何学习Python编程?"}
        query=lambda x: query_chain.invoke(input={"input": x["input"]})
        # 可以直接query=lambda x: query_chain.invoke(input=x)
    )
    # 将query传递给answer_chain
    | answer_chain
)

# 4. 使用链条
result = chain.invoke(input={"input": "如何学习Python编程?"})
print(result)

LCEL机制

以下是针对Python学习不同需求的详细解答,包含搜索建议、资源推荐和学习策略:

### 一、基础语法学习
**搜索建议**:
- "Python 3.10核心语法速成教程"
- "Python常见数据类型与函数用法图解"
...
  • RunnableLambda(func)能够获取输入内容,并通过func参数传入的匿名函数来修改输入的内容

代码

from langchain_core.runnables import RunnableLambda
...
chain = (
    {
        "aa": RunnableLambda(func=lambda x: f"肥胖的{x["aa"]}"),
        "bb": RunnableLambda(func=lambda x: f"开心的{x["bb"]}")
    }
    | prompt | model | parser
)
async for chunk in chain.astream({"aa": "鹦鹉", "bb": "故事"}):
    print(chunk, end="|", flush=True)
...

LCEL机制

输入字典{"aa":"鹦鹉","bb":"故事"}进入链的第一个环节。
相当于{"aa":"肥胖的鹦鹉","bb":"开心的故事"}
  • 使用itemgetter(),手动映射指定键的值到指定的变量中。
    • 第一个参数为需用的键名。

代码

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from operator import itemgetter

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    # model="Qwen/Qwen2.5-VL-32B-Instruct",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
    temperature=0.7,
    max_tokens=8000,
)
prompt = PromptTemplate.from_template("讲一个关于{aa}的{bb}。")
parser = StrOutputParser()
chain = (
    {
        "aa": itemgetter("vv"),    # 输入中的"vv"的值 -> 链中的 "aa"
        "bb": itemgetter("xx"),    # 输入中的"xx"的值 -> 链中的 "bb"
    }
    | prompt
    | model
    | parser
)

res = chain.invoke({"vv": "鹦鹉", "xx": "故事"})
print(res)

结果

好的,我将按照您的要求创作一个关于鹦鹉和自闭症儿童的故事。这是一个温暖人心的故事,讲述了一只聪明鹦鹉如何帮助一个封闭心灵的小女孩重新与世界建立联系。以下是我整理的故事框架和正文内容:
### 故事梗概
- **孤独的小世界**:八岁的林小雨被诊断患有自闭症,生活在自己的小世界里,对外界几乎毫无反应,让母亲林雪既心疼又无助。
...
希望这个关于鹦鹉与自闭症女孩的故事能满足您的要求。如果需要更多细节或调整,请随时告诉我。

5.链条传输|符号

  • 创建链条时使用|符号,就是将左边方法的结果(或者值),传到右边方法中作为参数输入。类似于shell里的管道。
  • 完整链条可以使用()符号括起来,也可以不用。

6.使用StrOutputParser()来解析模型的输出

  • StrOutputParser()是一个简单解析器,从模型返回结果中的AIMessageChunk,提取content字段,作为输出。

7.使用astream()方法异步流式输出

  • 使用LCEL创建的链会自动提供stream和astream方法,从而实现对最终输出的流式传输。事实上,使用LCEL创建的链实现了整个标准Runnable接口。
    • 即使在链条末尾使用了parser解析器,仍然可以获得流式输出。
    • parser解析器会对每个流式块进行操作。许多LCEL基元也支持这种转换式的流式传递。

代码

from langchain_openai import ChatOpenAI
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

async def main():
    model = ChatOpenAI(
        model="Pro/deepseek-ai/DeepSeek-V3",
        openai_api_key="************",
        openai_api_base="https://api.siliconflow.cn/v1",
        temperature=0.7,
        max_tokens=8000,
    )
    prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
    parser = StrOutputParser()
    # 创建LCEL链
    chain = prompt | model | parser
    async for chunk in chain.astream({"topic": "鹦鹉"}):
        print(chunk, end="|", flush=True)

if __name__ == "__main__":
    asyncio.run(main())

结果

|好的|!|这里有一个|经典的|鹦鹉笑话|:

---

|**顾客|**|走进|宠物店|,想|买一只|会|说话的鹦鹉|。
|店主指着|三|只鹦鹉|说:|
“|这只卖|500块|,它会|说10|个|词|;那只|卖100|0块|,它会|说50|个词|;最|右边那只|卖|300|0块|。”|  

|**|顾客**|很|惊讶:“|它|难道|会说|100个|词?”|
店主|摇摇头:“|不,|它一个字|都不会说|……|但另外|两只鹦鹉|都叫|它‘|老板’|。”

|---|

(|寓意|:真正的|“|老大”|往往不用|亲自|干活儿|……|😂)||
  • 使用输入流

代码

from langchain_openai import ChatOpenAI
import asyncio
from langchain_core.output_parsers import JsonOutputParser

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
    temperature=0.7,
    max_tokens=8000,
)
chain = model | JsonOutputParser()
async def async_stream():
    async for text in chain.astream(
        "以JSON格式输出法国、西班牙和日本的国家及其人口列表。"
        "使用一个带有“countries”外部键的字典,其中包含国家列表。"
        "每个国家都应该有键`name`和`population`"
    ):
        print(text, flush=True)

if __name__ == "__main__":
    asyncio.run(async_stream())

结果

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 677}]}
{'countries': [{'name': 'France', 'population': 677500}]}
{'countries': [{'name': 'France', 'population': 67750000}]}
{'countries': [{'name': 'France', 'population': 67750000}, {}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 473}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125700}]}
{'countries': [{'name': 'France', 'population': 67750000}, {'name': 'Spain', 'population': 47350000}, {'name': 'Japan', 'population': 125700000}]}

8.使用链与不使用链的invoke

  • 以下对比(prompt_temp和data和schema相同的情况下):
    • 有链invoke:
chain = prompt_temp | model.with_structured_output(schema)
res = chain.invoke(data)
  • 无链invoke:
formatted_prompt = prompt_temp.invoke(data)
res2 = model.with_structured_output(schema).invoke(formatted_prompt)
  • 就是LCEL组合方式和直接调用方式的区别
    • 有链invoke一般可以多次调用,无链invoke一般单次使用
    • 条件相同的情况下,最终结果大概一样
    • 正式项目建议有链invoke,原因:Prompt和Model解耦,可维护,可继续接
  • 示例

LCEL 链式写法代码

from pydantic import BaseModel, Field
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate

class Person(BaseModel):
    name: str = Field(description="姓名")
    age: int = Field(description="年龄")

model = ChatOllama(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    temperature=0,
    num_ctx=8000,
)

prompt_temp = ChatPromptTemplate.from_template("从下面内容中提取人物信息:{text}")
data = {"text": "张三今年18岁。"}

chain = prompt_temp | model.with_structured_output(Person)
res1 = chain.invoke(data)
print( res1)

直接 invoke 写法代码

from pydantic import BaseModel, Field
from langchain_ollama import ChatOllama
from langchain_core.prompts import ChatPromptTemplate

class Person(BaseModel):
    name: str = Field(description="姓名")
    age: int = Field(description="年龄")

model = ChatOllama(
    model="qwen3:1.7b",
    base_url="http://127.0.0.1:11434",
    temperature=0,
    num_ctx=8000,
)

prompt_temp = ChatPromptTemplate.from_template("从下面内容中提取人物信息:{text}")
data = {"text": "张三今年18岁。"}

# 将prompt_template对象转为message列表
formatted_prompt = prompt_temp.invoke(data) 
# 或者 formatted_prompt = prompt_temp.fromat_messages(text=data) 
res2 = model.with_structured_output(Person).invoke(formatted_prompt)
print( res2)

结果一样

name='张三' age=18

六、Stream events(事件流)

1.需要langchain-core >= 0.2。

  • 查看langchain-core版本

代码

import langchain_core
print(langchain_core.__version__)

结果

0.3.60

2.运行astream_events()条件

  • 在代码中尽可能使用async(如异步工具等)
  • 如果使用自定义函数或可运行项,需要传播回调
  • 在没有LCEL的情况下使用可运行项时,需要确保在LLMs上调用.astream()而不是.ainvoke,以强制LLM流式传输令牌

3.事件参考

  • 各种可运行对象可能发出的一些事件。
  • 当流式传输正确实现时,对于可运行项的输入直到输入流完全消耗后才会知道。这意味着inputs通常仅包括end事件,而不包括start事件。
事件 名称 输入 输出
on_chat_model_start [模型名称] {“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_end [模型名称] {“messages”: [[SystemMessage, HumanMessage]]} AIMessageChunk(content=“hello world”)
on_llm_start [模型名称] {‘input’: ‘hello’}
on_llm_stream [模型名称] ‘Hello’
on_llm_end [模型名称] ‘Hello human!’
on_chain_start format_docs
on_chain_stream format_docs “hello world!, goodbye world!”
on_chain_end format_docs [Document(…)] “hello world!, goodbye world!”
on_tool_start some_tool {“x”: 1, “y”: “2”}
on_tool_end some_tool {“x”: 1, “y”: “2”}
on_retriever_start [检索器名称] {“query”: “hello”}
on_retriever_end [检索器名称] {“query”: “hello”} [Document(…), …]
on_prompt_start [模板名称] {“question”: “hello”}
on_prompt_end [模板名称] {“question”: “hello”} ChatPromptValue(messages: [SystemMessage, …])

4.聊天模型

  • 输出事件

代码

from langchain_openai import ChatOpenAI
import asyncio
model = ChatOpenAI(
    model='Pro/deepseek-ai/DeepSeek-V3',
    openai_api_key="************",
    openai_api_base='https://api.siliconflow.cn/v1',
    temperature=0.7,
    max_tokens=8000
)
events = []
async def test():
    async for event in model.astream_events("hello", version="v2"):
        events.append(event)
    print(events)

if __name__ == '__main__':
    asyncio.run(test())

结果

[
    {
        "event": "on_chat_model_start",
        "data": {"input": "hello"},
        "name": "ChatOpenAI",
        "tags": [],
        "run_id": "ad8d363e-142c-4ecf-b49d-37a790017f68",
        "metadata": {
            "ls_provider": "openai",
            "ls_model_name": "Pro/deepseek-ai/DeepSeek-V3",
            "ls_model_type": "chat",
            "ls_temperature": 0.7,
            "ls_max_tokens": 8000,
        },
        "parent_ids": [],
    },
    {
        "event": "on_chat_model_stream",
        "run_id": "ad8d363e-142c-4ecf-b49d-37a790017f68",
        "name": "ChatOpenAI",
        "tags": [],
        "metadata": {
            "ls_provider": "openai",
            "ls_model_name": "Pro/deepseek-ai/DeepSeek-V3",
            "ls_model_type": "chat",
            "ls_temperature": 0.7,
            "ls_max_tokens": 8000,
        },
        "data": {
            "chunk": AIMessageChunk(
                content="",
                additional_kwargs={},
                response_metadata={},
                id="run--ad8d363e-142c-4ecf-b49d-37a790017f68",
                usage_metadata={
                    "input_tokens": 4,
                    "output_tokens": 0,
                    "total_tokens": 4,
                    "input_token_details": {},
                    "output_token_details": {},
                },
            )
        },
        "parent_ids": [],
    },
    {
        "event": "on_chat_model_stream",
        "run_id": "ad8d363e-142c-4ecf-b49d-37a790017f68",
        "name": "ChatOpenAI",
        "tags": [],
        "metadata": {
            "ls_provider": "openai",
            "ls_model_name": "Pro/deepseek-ai/DeepSeek-V3",
            "ls_model_type": "chat",
            "ls_temperature": 0.7,
            "ls_max_tokens": 8000,
        },
        "data": {
            "chunk": AIMessageChunk(
                content="Hello",
                additional_kwargs={},
                response_metadata={},
                id="run--ad8d363e-142c-4ecf-b49d-37a790017f68",
                usage_metadata={
                    "input_tokens": 4,
                    "output_tokens": 1,
                    "total_tokens": 5,
                    "input_token_details": {},
                    "output_token_details": {},
                },
            )
        },
        "parent_ids": [],
    },
...
    {
        "event": "on_chat_model_stream",
        "run_id": "ad8d363e-142c-4ecf-b49d-37a790017f68",
        "name": "ChatOpenAI",
        "tags": [],
        "metadata": {
            "ls_provider": "openai",
            "ls_model_name": "Pro/deepseek-ai/DeepSeek-V3",
            "ls_model_type": "chat",
            "ls_temperature": 0.7,
            "ls_max_tokens": 8000,
        },
        "data": {
            "chunk": AIMessageChunk(
                content="",
                additional_kwargs={},
                response_metadata={
                    "finish_reason": "stop",
                    "model_name": "Pro/deepseek-ai/DeepSeek-V3",
                },
                id="run--ad8d363e-142c-4ecf-b49d-37a790017f68",
                usage_metadata={
                    "input_tokens": 4,
                    "output_tokens": 11,
                    "total_tokens": 15,
                    "input_token_details": {},
                    "output_token_details": {},
                },
            )
        },
        "parent_ids": [],
    },
    {
        "event": "on_chat_model_end",
        "data": {
            "output": AIMessageChunk(
                content="Hello! 😊 How can I assist you today?",
                additional_kwargs={},
                response_metadata={
                    "finish_reason": "stop",
                    "model_name": "Pro/deepseek-ai/DeepSeek-V3",
                },
                id="run--ad8d363e-142c-4ecf-b49d-37a790017f68",
                usage_metadata={
                    "input_tokens": 36,
                    "output_tokens": 53,
                    "total_tokens": 89,
                    "input_token_details": {},
                    "output_token_details": {},
                },
            )
        },
        "run_id": "ad8d363e-142c-4ecf-b49d-37a790017f68",
        "name": "ChatOpenAI",
        "tags": [],
        "metadata": {
            "ls_provider": "openai",
            "ls_model_name": "Pro/deepseek-ai/DeepSeek-V3",
            "ls_model_type": "chat",
            "ls_temperature": 0.7,
            "ls_max_tokens": 8000,
        },
        "parent_ids": [],
    },
]
  • 输出结果

代码

from langchain_openai import ChatOpenAI
import asyncio

model = ChatOpenAI(
    model="Pro/deepseek-ai/DeepSeek-V3",
    openai_api_key="************",
    openai_api_base="https://api.siliconflow.cn/v1",
)

async def main():
    async for event in model.astream_events(
        input="你好,请用20个字说说你自己",
    ):
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content)

if __name__ == "__main__":
    asyncio.run(main())

结果

 AI
助手
,智能
高效,
有
问必
答,
陪你
探索未知
!
 🌟

总结

本文围绕 LangChain 工作流的核心概念进行了整理,重点包括 LLM 与 Chat Model 的输入输出差异、LCEL 链式编排方式、Runnable 标准接口、流式输出、异步并发、链条输入映射、输出解析器以及事件流调试方法。实际项目中,建议优先使用 LCEL 链式写法,将 Prompt、Model、Parser、Retriever 等组件解耦,便于后续维护、扩展和调试。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐