在这里插入图片描述


本节学习目标

通过本节课的学习,你将全面掌握:

  1. RAG+LangGraph架构设计:理解RAG原理,设计可维护、可扩展的图结构
  2. 知识库文档加载:从本地文件、URL、数据库加载多种格式文档(PDF、Markdown、TXT)
  3. 向量化与向量库配置:使用嵌入模型和Chroma/FAISS构建向量索引
  4. 检索节点编写:实现语义检索、关键词检索、混合检索
  5. 生成回答节点编排:基于检索到的文档合成高质量答案
  6. RAG流程分支控制:处理无结果、低质量结果、多轮检索
  7. 问答优化策略:HyDE、查询改写、上下文压缩、重排序
  8. 完整RAG项目代码:端到端可运行的企业级知识问答系统,可直接替换为私有知识库

RAG+LangGraph架构设计

为什么要在LangGraph中实现RAG?

传统RAG流程是线性的:检索 → 生成 → 结束。但企业级场景需要更复杂的控制:

  • 多路检索:同时从多个知识库检索,融合结果
  • 结果评估:如果检索结果质量低,进行二次检索或查询改写
  • 多轮对话:结合历史对话上下文进行检索
  • 兜底逻辑:无结果时调用Web搜索或生成通用回答
  • 人工介入:敏感问题转为人工

LangGraph将RAG改造为有状态的循环图,每个环节都可控、可观测、可干预。

RAG智能体的宏观架构

语义

关键词

足够好

不足

START

查询处理

检索方式

向量检索

BM25检索

结果融合

结果评估

生成答案

改写查询

END

状态设计

我们需要在状态中存储:对话历史、检索到的文档、原始查询、改写后查询、最终答案、迭代计数等。

from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage

class RAGState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]   # 对话历史
    query: str                                             # 当前查询
    rewritten_query: Optional[str]                         # 改写后的查询
    retrieved_docs: List[Document]                         # 检索到的文档
    answer: Optional[str]                                  # 生成的答案
    iteration: int                                         # 当前迭代次数(用于改写循环)
    max_iterations: int                                    # 最大改写次数
    confidence: float                                      # 检索结果置信度

图构建预览

from langgraph.graph import StateGraph, START, END

builder = StateGraph(RAGState)
builder.add_node("query_processor", query_processor_node)
builder.add_node("retrieve", retrieve_node)
builder.add_node("generate", generate_node)
builder.add_node("rewrite", rewrite_query_node)

builder.add_edge(START, "query_processor")
builder.add_edge("query_processor", "retrieve")
builder.add_conditional_edges("retrieve", should_continue)
builder.add_edge("generate", END)

知识库文档加载

支持的文件格式

企业知识库通常包含:PDF、Word、Markdown、纯文本、HTML等。LangChain的UnstructuredLoader支持多种格式,也可以使用专用加载器。

安装依赖

pip install langchain-community chromadb langchain-chroma sentence-transformers

加载PDF文档

from langchain_community.document_loaders import PyPDFLoader

def load_pdfs(pdf_paths: List[str]):
    all_docs = []
    for path in pdf_paths:
        loader = PyPDFLoader(path)
        docs = loader.load()
        all_docs.extend(docs)
    return all_docs

加载Markdown/文本文件

from langchain_community.document_loaders import TextLoader, UnstructuredMarkdownLoader

def load_text_files(file_paths: List[str]):
    docs = []
    for path in file_paths:
        if path.endswith('.md'):
            loader = UnstructuredMarkdownLoader(path)
        else:
            loader = TextLoader(path, encoding='utf-8')
        docs.extend(loader.load())
    return docs

加载目录下所有受支持文件

import os
from pathlib import Path

def load_knowledge_base(kb_dir: str):
    all_docs = []
    for ext in ['*.pdf', '*.txt', '*.md']:
        for file_path in Path(kb_dir).glob(ext):
            if ext == '*.pdf':
                loader = PyPDFLoader(str(file_path))
            elif ext == '*.md':
                loader = UnstructuredMarkdownLoader(str(file_path))
            else:
                loader = TextLoader(str(file_path), encoding='utf-8')
            all_docs.extend(loader.load())
    return all_docs

文档分块(Chunking)

大文档需要分割成块才能有效检索。常用的RecursiveCharacterTextSplitter可以按语义边界分割。

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
)

def chunk_documents(docs):
    return text_splitter.split_documents(docs)

向量化与向量库配置

选择嵌入模型

对于中文企业级应用,推荐使用BAAI/bge-large-zh-v1.5text2vec-large-chinese。为了简化示例,我们使用HuggingFace的轻量模型。

from langchain_community.embeddings import HuggingFaceEmbeddings

embedding_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    model_kwargs={'device': 'cpu'},  # 可改为'cuda'
    encode_kwargs={'normalize_embeddings': True}
)

初始化向量数据库(Chroma)

Chroma是轻量级向量数据库,适合本地开发和中小规模生产。

from langchain_chroma import Chroma

def create_vectorstore(docs, embedding_model, persist_directory="./chroma_db"):
    vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=embedding_model,
        persist_directory=persist_directory
    )
    return vectorstore

# 加载或重建
def get_vectorstore(persist_directory="./chroma_db", embedding_model=None):
    return Chroma(
        persist_directory=persist_directory,
        embedding_function=embedding_model
    )

高级:混合检索(向量+BM25)

纯向量检索可能漏掉关键词精确匹配。可以使用EnsembleRetriever结合BM25。

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# 构建BM25检索器需要原始文档列表
def create_hybrid_retriever(vectorstore, docs, embedding_model):
    bm25_retriever = BM25Retriever.from_documents(docs)
    bm25_retriever.k = 4
    vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    
    ensemble_retriever = EnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.3, 0.7]
    )
    return ensemble_retriever

检索节点编写

基础检索节点

def retrieve_node(state: RAGState, retriever) -> dict:
    """检索节点:根据查询从向量库中检索相关文档"""
    query = state.get("rewritten_query") or state["query"]
    docs = retriever.invoke(query)
    # 可选:过滤低分文档
    docs = [doc for doc in docs if getattr(doc, 'score', 1.0) > 0.5]
    return {
        "retrieved_docs": docs,
        "confidence": len(docs) / 5.0 if len(docs) < 5 else 1.0  # 简单置信度
    }

多路检索(多个知识库)

def multi_source_retrieve_node(state: RAGState, retrievers: dict) -> dict:
    """从多个检索器并行检索并合并结果"""
    query = state.get("rewritten_query") or state["query"]
    all_docs = []
    for name, retriever in retrievers.items():
        docs = retriever.invoke(query)
        all_docs.extend(docs)
    # 去重(基于内容哈希或page_content)
    unique_docs = []
    seen = set()
    for doc in all_docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_docs.append(doc)
    # 按相似度排序(如果有)
    unique_docs.sort(key=lambda x: getattr(x, 'score', 0), reverse=True)
    return {"retrieved_docs": unique_docs[:10]}

检索结果评估节点

判断检索结果是否足够生成高质量答案。

def evaluate_retrieval(state: RAGState) -> dict:
    """评估检索结果质量,决定是否需要改写查询"""
    docs = state["retrieved_docs"]
    if not docs:
        return {"need_rewrite": True, "reason": "no_docs"}
    # 简单启发式:如果所有文档都太短或者与查询的语义相似度过低
    avg_len = sum(len(d.page_content) for d in docs) / len(docs)
    if avg_len < 20:
        return {"need_rewrite": True, "reason": "docs_too_short"}
    
    # 更复杂的计算:调用LLM评估相关性
    # 这里简化为置信度阈值
    if state.get("confidence", 0) < 0.3:
        return {"need_rewrite": True, "reason": "low_confidence"}
    return {"need_rewrite": False}

生成回答节点编排

基础生成节点

将检索到的文档和对话历史作为上下文,调用LLM生成答案。

from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)

def generate_node(state: RAGState) -> dict:
    """生成最终答案"""
    context = "\n\n".join([doc.page_content for doc in state["retrieved_docs"]])
    # 构建提示
    system_prompt = f"""你是企业知识库助手。请基于以下参考内容回答用户问题。
如果参考内容不足以回答问题,请如实告知,不要编造信息。

参考内容:
{context}
"""
    messages = [
        SystemMessage(content=system_prompt),
        *state["messages"],  # 包含历史对话
    ]
    response = model.invoke(messages)
    return {"messages": [response], "answer": response.content}

带引用的生成节点(附来源)

def generate_with_citations(state: RAGState) -> dict:
    docs = state["retrieved_docs"]
    citations = []
    context_parts = []
    for i, doc in enumerate(docs):
        source = doc.metadata.get("source", f"文档{i}")
        citations.append(f"[{i+1}] {source}")
        context_parts.append(f"【文档{i+1}】\n{doc.page_content}")
    
    full_context = "\n\n".join(context_parts)
    prompt = f"""基于以下文档回答问题。在答案中,请在引用内容后标注文档编号,例如[1]。

文档内容:
{full_context}

问题:{state['query']}

答案:"""
    response = model.invoke([HumanMessage(content=prompt)])
    answer = response.content
    # 附加引用列表
    answer_with_citations = answer + "\n\n**参考资料:**\n" + "\n".join(citations)
    return {"messages": [AIMessage(content=answer_with_citations)], "answer": answer_with_citations}

RAG流程分支控制

条件边:是否需要改写查询

def should_continue_retrieval(state: RAGState) -> str:
    """决定是直接生成答案还是改写查询重新检索"""
    if state.get("iteration", 0) >= state.get("max_iterations", 2):
        # 超过最大迭代次数,即使结果差也生成
        return "generate"
    
    if not state["retrieved_docs"]:
        return "rewrite"
    
    # 如果置信度低,且迭代次数未超限
    if state.get("confidence", 1.0) < 0.5 and state["iteration"] < 2:
        return "rewrite"
    
    return "generate"

查询改写节点

当检索结果不佳时,使用LLM改写查询以获取更好结果。

def rewrite_query_node(state: RAGState) -> dict:
    """改写用户查询,提高检索质量"""
    original_query = state["query"]
    iteration = state["iteration"] + 1
    
    if iteration == 1:
        rewrite_prompt = f"""原始查询:{original_query}
问题:检索结果不理想。请将查询改写得更具体、更正式,添加关键词以提高检索准确率。
只输出改写后的查询,不要添加额外解释。"""
    else:
        # 第二次改写,尝试不同表达
        rewrite_prompt = f"""原始查询:{original_query}
上一次改写后仍未找到足够信息。请从不同角度改写查询,或者拆解为多个子查询用AND连接。
只输出改写后的查询。"""
    
    response = model.invoke([HumanMessage(content=rewrite_prompt)])
    rewritten = response.content.strip()
    print(f"查询改写: {original_query} -> {rewritten}")
    return {
        "rewritten_query": rewritten,
        "iteration": iteration,
        "query": original_query  # 保留原始查询
    }

无结果兜底处理

def fallback_node(state: RAGState) -> dict:
    """当多次检索无果时,给出通用回复或提示人工"""
    fallback_answer = "抱歉,知识库中没有找到与您问题相关的信息。请您换个方式提问,或联系人工客服。"
    return {"messages": [AIMessage(content=fallback_answer)], "answer": fallback_answer}

完整图构建代码

def build_rag_graph(retriever):
    builder = StateGraph(RAGState)
    builder.add_node("retrieve", lambda s: retrieve_node(s, retriever))
    builder.add_node("rewrite", rewrite_query_node)
    builder.add_node("generate", generate_node)
    builder.add_node("fallback", fallback_node)
    
    builder.add_edge(START, "retrieve")
    builder.add_conditional_edges(
        "retrieve",
        should_continue_retrieval,
        {
            "generate": "generate",
            "rewrite": "rewrite",
            "fallback": "fallback"
        }
    )
    builder.add_edge("rewrite", "retrieve")  # 循环
    builder.add_edge("generate", END)
    builder.add_edge("fallback", END)
    
    return builder.compile()

问答优化策略

1. HyDE(假设性文档嵌入)

HyDE生成一个假设性的答案文档,用这个文档的向量去检索,提高召回率。

def hyde_retrieve(state: RAGState, retriever, model) -> dict:
    """使用HyDE改进检索"""
    query = state["query"]
    # 生成假设性答案
    hyde_prompt = f"请写一段可能回答以下问题的文档内容:{query}"
    hypothetical_doc = model.invoke([HumanMessage(content=hyde_prompt)]).content
    # 用假设文档检索
    docs = retriever.invoke(hypothetical_doc)
    return {"retrieved_docs": docs}

2. 查询改写(Expand Query)

使用LLM生成多个同义查询,分别检索后合并结果。

def expand_query(state: RAGState) -> dict:
    query = state["query"]
    expansion_prompt = f"""请为以下查询生成3个不同表述的同义查询,每个一行:{query}"""
    response = model.invoke([HumanMessage(content=expansion_prompt)])
    queries = [query] + [q.strip() for q in response.content.strip().split("\n") if q.strip()]
    # 分别检索,合并结果
    all_docs = []
    for q in queries:
        docs = retriever.invoke(q)
        all_docs.extend(docs)
    # 去重、排序
    unique_docs = {doc.page_content: doc for doc in all_docs}.values()
    return {"retrieved_docs": list(unique_docs)}

3. 上下文压缩(Contextual Compression)

对检索到的长文档进行压缩,只保留与问题最相关的部分。

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

compressor = LLMChainExtractor.from_llm(model)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vectorstore.as_retriever()
)

4. 重排序(Re-ranking)

使用更强大的交叉编码器对检索结果重新排序,提升精度。

from sentence_transformers import CrossEncoder

cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_docs(query, docs, top_k=3):
    pairs = [[query, doc.page_content] for doc in docs]
    scores = cross_encoder.predict(pairs)
    scored_docs = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in scored_docs[:top_k]]

完整RAG项目源码

以下是一个可直接运行的企业级知识库问答系统,采用LangGraph编排,支持多轮对话、检索生成、查询改写、超时重试。

项目结构

rag_agent/
├── main.py                 # 主程序
├── config.py               # 配置
├── loader.py               # 文档加载与分块
├── vectorstore.py          # 向量库管理
├── graph.py                # LangGraph图定义
├── requirements.txt
└── data/                   # 存放知识库文档

config.py

import os
from dotenv import load_dotenv
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
VECTOR_DB_PATH = "./chroma_db"
KNOWLEDGE_BASE_DIR = "./data"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
TOP_K_RETRIEVAL = 5
MAX_ITERATIONS = 2

loader.py

from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pathlib import Path
from config import CHUNK_SIZE, CHUNK_OVERLAP, KNOWLEDGE_BASE_DIR

def load_and_chunk():
    all_docs = []
    for file_path in Path(KNOWLEDGE_BASE_DIR).glob("*"):
        if file_path.suffix == ".pdf":
            loader = PyPDFLoader(str(file_path))
        elif file_path.suffix in [".txt", ".md"]:
            loader = TextLoader(str(file_path), encoding="utf-8")
        else:
            continue
        docs = loader.load()
        all_docs.extend(docs)
    
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]
    )
    chunks = splitter.split_documents(all_docs)
    return chunks

vectorstore.py

from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from config import VECTOR_DB_PATH

def get_embeddings():
    return HuggingFaceEmbeddings(
        model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': True}
    )

def create_vectorstore(docs):
    embeddings = get_embeddings()
    vectordb = Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
        persist_directory=VECTOR_DB_PATH
    )
    return vectordb

def load_vectorstore():
    embeddings = get_embeddings()
    return Chroma(persist_directory=VECTOR_DB_PATH, embedding_function=embeddings)

graph.py

from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, START, END, add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from config import TOP_K_RETRIEVAL, MAX_ITERATIONS, OPENAI_API_KEY
import os

os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

class RAGState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    query: str
    rewritten_query: Optional[str]
    retrieved_docs: List[Document]
    answer: Optional[str]
    iteration: int
    max_iterations: int

model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)

def retrieve_node(state: RAGState, retriever) -> dict:
    query = state.get("rewritten_query") or state["query"]
    docs = retriever.invoke(query)
    # 限制数量
    docs = docs[:TOP_K_RETRIEVAL]
    return {"retrieved_docs": docs}

def evaluate_retrieval(state: RAGState) -> str:
    if state["iteration"] >= state["max_iterations"]:
        return "generate"
    if not state["retrieved_docs"]:
        return "rewrite"
    # 简单启发:文档总长度太短
    total_len = sum(len(d.page_content) for d in state["retrieved_docs"])
    if total_len < 50:
        return "rewrite"
    return "generate"

def rewrite_query_node(state: RAGState) -> dict:
    original = state["query"]
    iteration = state["iteration"] + 1
    prompt = f"""原始查询:{original}
检索结果不理想。请改写查询,使其更具体、更准确。只输出改写后的查询。"""
    response = model.invoke([HumanMessage(content=prompt)])
    rewritten = response.content.strip()
    return {"rewritten_query": rewritten, "iteration": iteration}

def generate_node(state: RAGState) -> dict:
    context = "\n\n".join([doc.page_content for doc in state["retrieved_docs"]])
    system = f"""基于以下参考内容回答用户的问题。如果参考内容不足,请告知。

参考内容:
{context}
"""
    messages = [SystemMessage(content=system)] + state["messages"]
    response = model.invoke(messages)
    return {"messages": [response], "answer": response.content}

def build_rag_graph(retriever):
    builder = StateGraph(RAGState)
    builder.add_node("retrieve", lambda s: retrieve_node(s, retriever))
    builder.add_node("rewrite", rewrite_query_node)
    builder.add_node("generate", generate_node)

    builder.add_edge(START, "retrieve")
    builder.add_conditional_edges(
        "retrieve",
        evaluate_retrieval,
        {
            "generate": "generate",
            "rewrite": "rewrite"
        }
    )
    builder.add_edge("rewrite", "retrieve")
    builder.add_edge("generate", END)
    return builder.compile()

main.py

from loader import load_and_chunk
from vectorstore import create_vectorstore, load_vectorstore
from graph import build_rag_graph
from langgraph.checkpoint.memory import MemorySaver
import sys

def main():
    # 1. 构建向量库(首次运行)
    try:
        vectorstore = load_vectorstore()
        print("加载已有向量库")
    except Exception:
        print("未找到向量库,开始构建...")
        docs = load_and_chunk()
        vectorstore = create_vectorstore(docs)
        print("向量库构建完成")
    
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    graph = build_rag_graph(retriever)
    memory = MemorySaver()
    graph_with_memory = graph.compile(checkpointer=memory)

    print("知识库问答系统已启动,输入 'exit' 退出")
    thread_id = "user_001"
    config = {"configurable": {"thread_id": thread_id}}

    while True:
        user_input = input("\n用户: ")
        if user_input.lower() == "exit":
            break
        state = {
            "messages": [HumanMessage(content=user_input)],
            "query": user_input,
            "rewritten_query": None,
            "retrieved_docs": [],
            "answer": None,
            "iteration": 0,
            "max_iterations": 2
        }
        result = graph_with_memory.invoke(state, config=config)
        last_message = result["messages"][-1]
        print(f"助手: {last_message.content}")

if __name__ == "__main__":
    main()

requirements.txt

langgraph>=0.2.0
langchain>=0.3.0
langchain-openai>=0.2.0
langchain-community>=0.3.0
langchain-chroma>=0.1.0
sentence-transformers>=2.2.0
chromadb>=0.5.0
pypdf>=4.0.0
python-dotenv>=1.0.0

本节总结+思考题

核心知识点总结

组件 作用 关键代码
文档加载 读取多种格式文件 PyPDFLoader, TextLoader
文本分块 分割长文档 RecursiveCharacterTextSplitter
嵌入模型 将文本转为向量 HuggingFaceEmbeddings
向量库 存储和检索向量 Chroma
检索节点 根据查询查找文档 retriever.invoke()
生成节点 基于上下文生成答案 model.invoke()
查询改写 优化检索效果 LLM改写
条件边 分支控制 add_conditional_edges
检查点 支持多轮对话 MemorySaver

课后思考题

基础题

  1. 为什么需要对文档进行分块?chunk_sizechunk_overlap参数如何影响检索效果?
  2. 在LangGraph中构建RAG时,多轮对话的上下文是如何通过messages字段传递的?
  3. 简述HyDE的原理,它与传统向量检索有何不同?

进阶题

  1. 设计一个评估函数,使用LLM自动判断检索到的文档与用户问题的相关性程度(输出0-100分),并根据分数决定是否需要重新检索。
  2. 如何实现多模态RAG(同时检索文本和图片)?需要哪些额外的组件?
  3. 如果你的知识库包含超过100万篇文档,Chroma可能成为瓶颈。请设计一个分布式检索方案(如使用Elasticsearch或Pinecone)并集成到LangGraph中。

实践题

  1. 运行本节课的完整RAG项目,用自己的PDF或TXT文件替换./data目录下的内容,测试至少5个不同的问题,记录回答质量。
  2. 在RAG图中增加一个“重排序”节点,使用cross-encoder对检索结果重新排序,然后传递给生成节点。实现并对比效果。
  3. 将查询改写节点改为多查询检索(生成3个同义查询,合并结果),然后评估是否提升了召回率。

思考题

  1. 企业级RAG常面临知识更新问题:文档库每天增加新文件,如何在不重建整个向量库的情况下增量更新?LangGraph的哪个特性可以帮助实现增量更新流程?(提示:检查点与子图)

下节课预告

第14节课:多Agent协作架构设计与LangGraph工程实现

单Agent RAG已经很强,但复杂任务需要多个专业Agent协同工作。下一节课我们将学习:

  • 多Agent应用场景:分工协作、辩论、角色扮演
  • 主从Agent架构:一个主管Agent分配任务给多个专家Agent
  • 分工Agent架构:各Agent独立处理子任务,最终汇总
  • Agent间通信机制:通过全局状态、消息队列、事件总线
  • 状态共享与隔离:如何避免Agent间状态污染
  • 多节点任务分发:并行执行多个Agent
  • 协作流程编排:串行、并行、竞速等模式
  • 完整多Agent实战项目:代码审查系统、研究助理团队

课前准备

  • 复习本节课的RAG流程
  • 阅读LangGraph官方文档中的Send API用于并行任务分发
  • 思考一个适合多Agent分工的场景(如写一份包含市场调查+技术分析+财务评估的综合报告)

下一节课后,你将能够设计复杂的多Agent系统,迈向真正的AI工程架构师!


🔗《20节课 LangGraph 从入门到精通》系列课程导航

去订阅

🌟 感谢您耐心阅读到这里!
💡 如果本文对您有所启发欢迎:
👍 点赞📌 收藏 📤 分享给更多需要的伙伴。
🗣️ 期待在评论区看到您的想法, 共同进步。
🔔 关注我,持续获取更多干货内容~
🤗 我们下篇文章见~

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐