前面四篇分别讲解了会话级、任务级、长期级、知识级记忆的核心实现和实操要点,但生产级智能体的核心的是“五大模块协同工作”(新增:记忆清理与优化模块),而非单独使用某一种记忆。

很多开发者的误区是“单独开发某一种记忆,忽略模块间的协同”,导致智能体出现“对话不连贯、无法自主推进任务、专业度不足、记忆冗余、Token消耗过高”等问题。比如,只使用会话记忆,智能体重启后遗忘用户偏好;只使用知识记忆,无法实现个性化交互。

本章将整合五大记忆模块(会话+任务+长期+知识+记忆清理),讲解模块协同逻辑、工业级落地完整流程,附可直接复用的工程化代码,解决生产环境中的核心痛点(Token控制、记忆精度、持久化、可维护性)。

一、五大记忆模块协同逻辑(工业级标准)

五大模块的核心协同逻辑是“分层存储、按需检索、动态优化”,各模块的定位和协同关系如下:

  1. 会话级记忆:底层基础,负责当前对话上下文衔接,为所有模块提供“即时交互语境”;
  2. 任务级记忆:核心驱动,负责自主推进任务,调用其他模块的记忆(如从知识记忆获取专业知识,从长期记忆获取用户偏好);
  3. 长期级记忆:个性化支撑,负责跨会话、跨任务复用用户相关关键信息,为任务推进和知识调用提供个性化适配;
  4. 知识级记忆:专业支撑,负责提供行业知识、外部文档、结构化关联知识,确保任务推进的专业性和准确性;
  5. 记忆清理与优化模块:保障性能,负责清理过期记忆、冗余记忆,控制Token消耗,优化检索精度,避免记忆膨胀。

协同流程:用户发起指令 → 会话记忆记录当前语境 → 任务记忆规划任务步骤 → 按需检索长期记忆(用户偏好)和知识记忆(专业知识) → 完成任务/生成回复 → 记忆清理模块清理冗余信息 → 长期记忆/知识记忆更新关键内容。

二、工业级落地完整代码(可直接复用,养老智能体示例)

以下代码整合五大记忆模块,实现“用户个性化需求+自主任务推进+专业知识支撑+性能优化”,适配生产环境,包含完整的持久化、记忆清理、多模块协同逻辑。

from langchain.memory import (
    ConversationBufferWindowMemory,
    VectorStoreRetrieverMemory,
    CombinedMemory,
    ConversationTokenBufferMemory
)
from langchain_community.vectorstores import Chroma
from langchain_community.graphs import Neo4jGraph
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.chains import RetrievalQA, GraphCypherQAChain, AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate
from langchain.tools import Tool
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader
import time

# ------------------- 1. 初始化基础组件(大模型、Embedding、文本分割器)-------------------
llm = ChatOpenAI(model="gpt-3.5-turbo", api_key="你的API密钥")
embeddings = OpenAIEmbeddings(api_key="你的API密钥")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)

# ------------------- 2. 五大记忆模块实现 -------------------
# 2.1 会话级记忆(滑动窗口+Token控制,平衡连贯与性能)
short_term_memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=1000,  # 精准控制Token消耗
    return_messages=True
)

# 2.2 长期级记忆(向量检索,持久化存储用户偏好)
long_term_vectorstore = Chroma(
    embedding_function=embeddings,
    persist_directory="./long_term_memory_db"
)
long_term_memory = VectorStoreRetrieverMemory(
    retriever=long_term_vectorstore.as_retriever(search_kwargs={"k": 2})
)

# 2.3 知识级记忆(RAG+知识图谱,专业知识支撑)
# 2.3.1 RAG记忆(外部文档:养老政策、操作规范)
loader = TextLoader("养老行业知识库.txt")
documents = loader.load()
splits = text_splitter.split_documents(documents)
knowledge_vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory="./knowledge_memory_db"
)
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=knowledge_vectorstore.as_retriever(search_kwargs={"k": 2})
)

# 2.3.2 知识图谱记忆(结构化关联知识)
graph = Neo4jGraph(
    url="bolt://localhost:7687",
    username="neo4j",
    password="你的数据库密码"
)
# 初始化知识图谱(若已存在可跳过)
graph.query("""
CREATE 
    (a:养老智能体 {name: "养老智能体"}),
    (b:核心功能 {name: "照护提醒"}),
    (c:核心功能 {name: "用药提醒"}),
    (d:核心功能 {name: "健康数据监测"}),
    (e:依赖设备 {name: "传感器"}),
    (f:依赖设备 {name: "血压计"}),
    (g:用户需求 {name: "语音控制"}),
    (h:政策要求 {name: "操作便捷性"}),
    (a)-[:包含]->(b), (a)-[:包含]->(c), (a)-[:包含]->(d),
    (b)-[:依赖]->(e), (d)-[:依赖]->(f),
    (a)-[:满足]->(g), (a)-[:符合]->(h)
""")
graph_chain = GraphCypherQAChain.from_llm(llm=llm, graph=graph, verbose=False)

# 2.4 任务级记忆(自定义TaskMemory类,自主推进任务)
class TaskMemory:
    def __init__(self, task_name: str):
        self.task_name = task_name
        self.steps = []
        self.current_step = 0
        self.results = []
        self.tools_used = []
        self.status = "pending"
        self.exceptions = []

    def add_step(self, step: str):
        self.steps.append(step)

    def update_step(self, step_index: int, new_step: str):
        if 0 <= step_index < len(self.steps):
            self.steps[step_index] = new_step
        else:
            raise IndexError("步骤索引超出范围")

    def add_result(self, result: str):
        self.results.append(result)

    def add_tool_usage(self, tool_name: str, tool_params: dict, tool_result: str):
        self.tools_used.append({
            "tool_name": tool_name,
            "tool_params": tool_params,
            "tool_result": tool_result
        })

    def update_status(self, status: str):
        valid_status = ["pending", "processing", "completed", "failed"]
        if status in valid_status:
            self.status = status
        else:
            raise ValueError(f"非法状态值,仅支持{valid_status}")

    def add_exception(self, exception_msg: str, handle_method: str):
        self.exceptions.append({
            "exception_msg": exception_msg,
            "handle_method": handle_method
        })

    def to_prompt(self) -> str:
        steps_str = [f"{i+1}. {step}" for i, step in enumerate(self.steps)] if self.steps else "无"
        results_str = [f"步骤{i+1}:{result}" for i, result in enumerate(self.results)] if self.results else "无"
        tools_str = ""
        if self.tools_used:
            for idx, tool in enumerate(self.tools_used, 1):
                tools_str += f"{idx}. 工具:{tool['tool_name']},参数:{tool['tool_params']},结果:{tool['tool_result']}\n"
        else:
            tools_str = "无"
        exceptions_str = ""
        if self.exceptions:
            for idx, exc in enumerate(self.exceptions, 1):
                exceptions_str += f"{idx}. 错误:{exc['exception_msg']},处理:{exc['handle_method']}\n"
        else:
            exceptions_str = "无"
        return f"""
        【任务记忆】
        任务名称:{self.task_name}
        任务步骤:{steps_str}
        当前步骤:{self.current_step + 1}. {self.steps[self.current_step] if self.steps else "无"}
        执行结果:{results_str}
        工具调用:{tools_str}
        任务状态:{self.status}
        异常记录:{exceptions_str}
        请结合会话记忆、长期记忆、知识记忆,自主推进任务,确保专业、贴合用户需求。
        """

# 2.5 记忆清理与优化模块(定期清理冗余、过期记忆)
class MemoryCleaner:
    def __init__(self, long_term_vectorstore, knowledge_vectorstore):
        self.long_term_db = long_term_vectorstore
        self.knowledge_db = knowledge_vectorstore

    def clean_short_term_memory(self, memory, max_tokens=1000):
        """清理会话记忆,确保不超过Token限制"""
        current_tokens = self._count_tokens(memory.load_memory_variables({})["history"])
        if current_tokens > max_tokens:
            # 裁剪会话记忆,保留最近3轮
            memory.k = 3
            memory.load_memory_variables({})
            print(f"会话记忆已清理,当前Token数:{self._count_tokens(memory.load_memory_variables({})['history'])}")

    def clean_long_term_memory(self, expired_days=30):
        """清理长期记忆中过期的内容(如30天未访问的记忆)"""
        # 模拟过期记忆清理(实际可结合记忆的访问时间戳实现)
        all_memories = self.long_term_db.get()["documents"]
        if len(all_memories) > 100:  # 当记忆数量超过100条,清理前50条过期内容
            self.long_term_db.delete(ids=list(range(50)))
            self.long_term_db.persist()
            print("长期记忆已清理过期内容")

    def clean_knowledge_memory(self):
        """清理知识记忆中过期的文档(如失效政策)"""
        # 模拟清理,实际可根据文档更新时间筛选
        expired_docs = ["养老行业旧政策.txt"]
        for doc in expired_docs:
            self.knowledge_db.delete(documents=[doc])
        self.knowledge_db.persist()
        print("知识记忆已清理过期文档")

    def _count_tokens(self, text):
        """统计Token数量(简化版,实际可使用tiktoken库)"""
        return len(text.split())

# ------------------- 3. 模块协同与工具定义 -------------------
# 3.1 定义工具(任务推进中调用,关联知识记忆)
def knowledge_retrieval_tool(query: str) -> str:
    """知识检索工具:调用RAG和知识图谱,获取专业知识"""
    rag_response = rag_chain.invoke(query)
    graph_response = graph_chain.invoke(query)
    return f"RAG知识:{rag_response['result']}\n知识图谱关联:{graph_response['result']}"

def memory_clean_tool() -> str:
    """记忆清理工具:清理冗余、过期记忆"""
    cleaner = MemoryCleaner(long_term_vectorstore, knowledge_vectorstore)
    cleaner.clean_short_term_memory(short_term_memory)
    cleaner.clean_long_term_memory()
    cleaner.clean_knowledge_memory()
    return "所有记忆模块已清理优化,Token消耗和检索精度已达标"

# 注册工具
tools = [
    Tool(
        name="知识检索工具",
        func=knowledge_retrieval_tool,
        description="用于获取养老行业专业知识、政策、设备关联等信息,参数为查询语句"
    ),
    Tool(
        name="记忆清理工具",
        func=memory_clean_tool,
        description="用于清理冗余、过期记忆,优化Token消耗和检索精度,无需参数"
    )
]

# 3.2 组合记忆(会话+长期)
combined_memory = CombinedMemory(memories=[short_term_memory, long_term_memory])

# 3.3 定义Prompt模板(整合所有记忆模块)
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是专业的养老智能体开发助手,需结合会话记忆、长期记忆、任务记忆、知识记忆,自主推进任务,确保回复个性化、专业、连贯。"),
    ("placeholder", "{chat_history}"),  # 组合记忆(会话+长期)
    ("placeholder", "{task_memory}"),   # 任务记忆
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}")  # Agent推理过程
])

# ------------------- 4. 生产级智能体启动与测试 -------------------
if __name__ == "__main__":
    # 初始化任务记忆、记忆清理器、Agent
    task_memory = TaskMemory(task_name="开发社区养老服务中心AI智能体")
    cleaner = MemoryCleaner(long_term_vectorstore, knowledge_vectorstore)
    agent = create_openai_tools_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        memory=combined_memory,
        verbose=True,
        extra_context={"task_memory": task_memory.to_prompt()}  # 注入任务记忆
    )

    # 步骤1:存入长期记忆(用户个性化需求)
    long_term_memory.save_context(
        {"input": "我是社区养老服务中心的,需要开发适配老年用户的智能体,核心需求是照护提醒、用药提醒、健康数据监测,要求操作简单、支持语音控制,符合行业政策"},
        {"output": "已记录你的需求和场景,后续将优先适配"}
    )

    # 步骤2:添加任务步骤,启动任务
    task_memory.add_step("需求分析:结合用户需求和行业知识,梳理详细需求清单")
    task_memory.add_step("架构设计:设计五大记忆模块和核心功能的架构")
    task_memory.add_step("代码开发:实现核心功能和记忆模块的集成")
    task_memory.add_step("测试优化:验证功能和记忆模块协同效果,清理冗余记忆")
    task_memory.update_status("processing")

    # 步骤3:启动智能体,自主推进任务
    print("=== 生产级养老智能体启动,开始自主推进任务 ===\n")
    # 第一次交互:推进需求分析步骤
    agent_executor.invoke({
        "input": "开始推进任务,先完成需求分析步骤,结合我的需求和行业知识,梳理详细的需求清单",
        "task_memory": task_memory.to_prompt()
    })

    # 模拟任务推进:完成需求分析,记录结果,推进到架构设计
    task_memory.add_result("需求分析完成:核心需求包括照护提醒(定时、重复、异常预警)、用药提醒(剂量、禁忌、记录)、健康数据监测(实时采集、异常预警、报告生成);用户需求:语音控制、字体放大、操作简单;政策要求:符合老年用户操作标准,可申请研发补贴;依赖设备:传感器、血压计、电子病历系统。")
    task_memory.current_step = 1
    task_memory.add_tool_usage(
        tool_name="知识检索工具",
        tool_params={"query": "养老智能体需求分析的行业标准和政策要求"},
        tool_result="已获取相关知识,需求分析符合行业规范"
    )

    # 第二次交互:推进架构设计步骤,调用记忆清理工具
    print("\n=== 推进到架构设计步骤 ===\n")
    agent_executor.invoke({
        "input": "推进到架构设计步骤,设计五大记忆模块和核心功能的架构,同时调用记忆清理工具优化性能",
        "task_memory": task_memory.to_prompt()
    })

    # 步骤4:任务总结,更新长期记忆
    task_memory.update_status("completed")
    long_term_memory.save_context(
        {"input": "养老智能体开发任务总结:已完成需求分析和架构设计,核心功能明确,记忆模块协同正常,性能达标"},
        {"output": "已保存任务总结,后续开发可直接复用"}
    )

    print("\n=== 任务推进完成,所有记忆模块协同正常 ===")
    print("长期记忆已更新,知识记忆可复用,会话记忆已清理优化")

三、生产环境落地关键技巧(避坑重点)

1. Token控制技巧(核心痛点解决)

  • 会话记忆:使用ConversationTokenBufferMemory,精准控制max_token_limit(结合大模型上下文窗口,如GPT-3.5设为1000-2000);
  • 长期/知识记忆:控制检索数量(k=2-3),避免检索过多无关记忆导致Prompt冗余;
  • 定时清理:通过MemoryCleaner模块,定期清理会话记忆、过期长期记忆、失效知识,避免Token消耗过高。

2. 记忆精度优化

  • 长期记忆:只存用户偏好、任务总结等关键信息,不存无关闲聊;
  • 知识记忆:定期更新知识库/知识图谱,删除过期内容,新增最新行业知识;
  • 检索优化:向量数据库选择适配场景(中小规模用Chroma,大规模用Pinecone/FAISS),调整检索参数(k值、相似度阈值)。

3. 持久化与可维护性

  • 所有向量数据库(长期、知识)均设置persist_directory,确保重启应用后记忆不丢失;
  • 建立记忆更新机制:任务完成后自动更新长期记忆,定期手动更新知识记忆;
  • 日志记录:为每个记忆模块添加操作日志,便于排查记忆检索、存储、清理中的问题。

4. 模块协同优化

  • 任务记忆优先调用知识记忆获取专业支撑,再调用长期记忆适配用户个性化需求;
  • 会话记忆为所有模块提供即时语境,避免记忆脱节;
  • 记忆清理模块定期运行(如每天凌晨),不影响智能体正常运行。

四、五大记忆模块整体总结

大模型智能体的记忆体系,核心是“分层存储、按需检索、协同工作”,五大模块各司其职、相互支撑,共同实现智能体的“连贯交互、自主推进、个性化、专业化”:

记忆模块

核心定位

核心实现

适用场景

会话级记忆

对话连贯

滑动窗口、Token缓冲

所有即时交互场景

任务级记忆

自主推进任务

自定义TaskMemory、LangChain Agent

复杂任务、自动执行场景

长期级记忆

跨会话个性化

向量检索(Chroma/FAISS)

需要长期记住用户偏好的场景

知识级记忆

专业知识支撑

RAG、知识图谱(Neo4j)

专业问答、政策解读、文档咨询

记忆清理与优化

保障性能

定期清理、Token控制

生产环境、大规模记忆存储

落地建议:新手可从“会话+长期+RAG知识”三个基础模块入手,快速验证智能体功能;中大型项目需整合五大模块,重点优化Token控制、记忆精度和协同逻辑,确保智能体适配生产环境,实现“真正的智能”。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐