基于 LangGraph 实现长文本总结的多阶段工作流

前言

大语言模型(LLM)在短文本处理上表现出色,但面对长文本总结(如文章摘要、报告提炼)时,往往受限于上下文窗口和输出长度,难以一次性完成高质量的压缩与重构。

解决思路并非等待更强的模型,而是改变架构:将长文本拆解为可处理的小单元,通过「分块 → 摘要 → 规划 → 重写」四阶段工作流,让 LLM 在每个环节只处理它擅长的粒度,最终由状态机编排完成全局协同。


1. 整体架构

1.1 工作流流程

四个阶段各司其职:

阶段 输入 输出 核心作用
分块 原始文本 文本块列表 将长文拆为可处理的小单元
摘要+记忆 文本块 摘要列表 + 向量库 压缩信息,构建可检索的记忆
规划 摘要列表 文章结构树 确定报告的章节框架
生成 结构树 + 向量库 最终报告 逐章节生成内容

1.2 技术栈

  • LangGraph:状态机工作流框架
  • FAISS:向量数据库,用于语义检索
  • DashScope(通义千问):LLM 和 Embedding 接口
  • OpenAI SDK:统一调用接口

2. 环境准备

2.1 依赖安装

pip install langgraph langchain-community faiss-cpu openai python-dotenv

注意:macOS 用户请安装 faiss-cpu,不要安装 faiss-gpu(不支持)。

2.2 环境变量

在项目根目录创建 .env 文件:

DASHCOPE_KEY=sk-your-api-key
BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1

2.3 初始化客户端

from openai import OpenAI
from dotenv import load_dotenv
import os

load_dotenv()
client = OpenAI(
    base_url=os.getenv('BASE_URL'),
    api_key=os.getenv('DASHCOPE_KEY')
)

DashScope 兼容 OpenAI 接口协议,因此可以直接使用 OpenAI SDK 调用。


3. 状态定义:工作流的“数据总线”

LangGraph 的核心思想是基于状态的工作流。所有节点(函数)通过一个共享的 State 对象传递数据。

from typing import Dict, List, TypedDict

class GenerationState(TypedDict):
    original_text: str       # 原始输入文本
    chunks: List[str]        # 切分后的文本块
    summaries: List[str]     # 每个文本块的摘要
    planning_tree: Dict      # 文章结构树(JSON)
    final_output: str        # 最终生成的报告
    vectorstore: FAISS       # 向量数据库

为什么需要状态定义?

  • 每个节点只读取自己需要的字段,写入自己产出的字段
  • 类型安全:TypedDict 提供了编译期检查
  • 数据流向清晰:从字段定义就能看出整个流程的数据变换

4. 文本分块:化整为零

4.1 为什么要分块?

长文本直接喂给 LLM 会面临:

  • 超出上下文窗口限制
  • 信息密度不均,关键内容被稀释
  • 摘要质量随长度下降

分块的目的是将长文拆为语义完整的小单元,便于后续逐块处理。

4.2 分块策略

def split_text(text: str) -> List[str]:
    """语义化文本分块,目标2-10块"""
    # 按双换行符(段落)分割
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    
    # 情况1:段落数在目标范围内,直接返回
    if 2 <= len(paragraphs) <= 10:
        return paragraphs
    
    # 情况2:段落太少(<2),按句子进一步拆分
    if len(paragraphs) < 2:
        sentences = []
        for para in paragraphs:
            sent_list = re.split(r'[。!?]', para)
            sentences.extend([s.strip() for s in sent_list if s.strip()])
        # 将句子重新组合成块
        ...
    
    # 情况3:段落太多(>10),合并相邻段落
    if len(paragraphs) > 10:
        chunk_size = len(paragraphs) // 8  # 目标8块左右
        chunks = []
        for i in range(0, len(paragraphs), chunk_size):
            chunk_paras = paragraphs[i:i+chunk_size]
            chunks.append("\n\n".join(chunk_paras))
        return chunks

设计思路

  1. 先按段落自然分割
  2. 根据段落数量决定策略:合适则直接返回,太少则按句子拆分,太多则合并
  3. 最终控制在 2-10 块的范围内

4.3 实际效果

以《老人与海》读后感(2678 字符)为例,分块结果:

切分块数: 15 块
平均块长度: 176 字符
最大块: 237 字符
最小块: 24 字符

5. 摘要与记忆:信息压缩

5.1 LLM 摘要生成

对每个文本块调用 LLM 生成精简摘要,目标压缩到原文的 30%:

def generate_summary(chunk: str) -> str:
    target_length = int(len(chunk) * 0.3)
    
    response = client.chat.completions.create(
        model="qwen-plus",
        messages=[{
            "role": "system",
            "content": f"请对以下内容进行高度精简的摘要。要求:\n"
                       f"1. 摘要长度不超过{target_length}字符\n"
                       f"2. 只保留最核心的观点和关键信息"
        }, {
            "role": "user",
            "content": chunk
        }],
        temperature=0
    )
    return response.choices[0].message.content

二次压缩机制:如果首次摘要仍然超长,会再调用一次 LLM 进行压缩:

if len(summary) > target_length:
    # 再次调用 LLM 压缩
    ...

5.2 向量数据库:构建可检索记忆

摘要生成后,将其存入 FAISS 向量数据库:

state["vectorstore"] = FAISS.from_texts(summaries, embedding=embeddings)

Embedding 模型使用 DashScope 的 text-embedding-v4

class OpenAIEmbeddings(Embeddings):
    def __init__(self, model: str = "text-embedding-v4"):
        self.model = model
        self.client = client
        
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        # DashScope 限制单次最多 10 条,分批处理
        batch_size = 10
        all_embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(model=self.model, input=batch)
            all_embeddings.extend([data.embedding for data in response.data])
        return all_embeddings

踩坑记录:DashScope 的 Embedding 接口限制单次最多 10 条文本,超过会报 400 错误。必须实现分批处理逻辑。


6. 规划阶段:结构设计

6.1 从摘要到结构树

将所有摘要汇总,让 LLM 生成文章结构大纲:

def build_planning_tree(summaries: List[str]) -> Dict:
    combined = "\n\n".join(f"Block {i+1}: {s}" for i, s in enumerate(summaries))
    prompt = f"""
    请根据以下文本块摘要,生成一份精简的综合报告结构大纲。
    要求:
    - 总共只生成3-4个主要章节
    - 输出为严格JSON格式
    
    摘要汇总:
    {combined}
    """
    response = client.chat.completions.create(
        model="qwen-plus",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )
    return json.loads(response.choices[0].message.content)

6.2 输出格式

结构树是一个 JSON 对象:

{
  "title": "《老人与海》精神内核综合报告",
  "sections": [
    {"title": "核心人物与精神象征", "subsections": []},
    {"title": "主题哲思与存在叩问", "subsections": []},
    {"title": "叙事美学与生命启示", "subsections": []}
  ]
}

subsections 为空数组,表示每个章节作为一个完整段落输出,避免过度碎片化。


7. 生成阶段:逐章撰写

7.1 语义检索 + 内容生成

对每个章节,先从向量库中检索相关内容,再交给 LLM 撰写:

def generate_node(state: GenerationState) -> GenerationState:
    tree = state["planning_tree"]
    content_parts = []
    
    for section in tree["sections"]:
        sec_title = section["title"]
        
        # 从向量库检索相关上下文
        context = retrieve_relevant_memory(sec_title, state["vectorstore"])
        
        # LLM 生成章节内容
        content = generate_section_content(sec_title, context)
        content_parts.append(content)
        
        # 将生成内容回写向量库(增量记忆)
        state["vectorstore"].add_texts([content])
    
    state["final_output"] = "\n\n".join(content_parts)
    return state

关键设计:生成的内容会回写到向量库,后续章节可以检索到前面已生成的内容,实现上下文连贯。

7.2 语义检索

def retrieve_relevant_memory(query: str, vectorstore: FAISS, k: int = 3) -> str:
    docs = vectorstore.similarity_search(query, k=k)
    return "\n".join(d.page_content for d in docs)

用章节标题作为 query,检索最相关的 k 条摘要/内容,为 LLM 提供写作素材。


8. 工作流编排:LangGraph 状态图

8.1 定义节点和边

from langgraph.graph import StateGraph, END

def create_generation_workflow():
    workflow = StateGraph(GenerationState)
    
    # 添加节点
    workflow.add_node("split", split_node)
    workflow.add_node("summarize_and_memorize", summarize_and_memorize_node)
    workflow.add_node("plan", planning_node)
    workflow.add_node("generate", generate_node)
    
    # 定义执行顺序(线性流水线)
    workflow.set_entry_point("split")
    workflow.add_edge("split", "summarize_and_memorize")
    workflow.add_edge("summarize_and_memorize", "plan")
    workflow.add_edge("plan", "generate")
    workflow.add_edge("generate", END)
    
    return workflow.compile()

8.2 执行工作流

app = create_generation_workflow()
result = app.invoke({"original_text": sample_text})
print(result["final_output"])

8.3 数据流示意


9. 运行效果

以《老人与海》读后感(2678 字符)为输入:

分块阶段:15 个文本块,平均 176 字符/块

摘要阶段:15 条摘要,平均压缩率约 16%

规划阶段:生成 3-4 章节结构

生成阶段:输出约 1400-1800 字符的综合报告






10. 总结与扩展

核心设计要点

  1. 分而治之:长文先拆块,逐块处理,避免超出 LLM 能力边界
  2. 记忆机制:向量数据库作为"外部记忆",让生成过程可以检索和引用
  3. 结构先行:先规划文章结构,再逐章填充,保证输出有条理
  4. 增量更新:生成的内容回写向量库,后续章节可以感知前文

可扩展方向

  • 并行分块处理:利用 LangGraph 的 Send API 并行生成摘要
  • 质量评估节点:在生成后加入评估-反思节点,自动修正
  • 多轮迭代:对生成结果再次摘要、对比,检查信息丢失
  • 流式输出:结合 LangGraph 的 stream 模式,实时展示进度

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐