AI Agent Harness Engineering 的短期记忆与长期记忆机制解析

作者:15年经验资深软件架构师 | AI Agent基建领域专家
预计阅读时间:45分钟 | 适合读者:AI应用开发工程师、大模型架构师、产品技术负责人


引子:你的AI Agent是不是“金鱼脑”?

2024年是AI Agent大规模落地的元年,不管是企业级客服机器人、个人智能助理,还是研发Copilot、医疗问诊Agent,都在快速渗透到各行各业。但你一定遇到过这些让人抓狂的场景:

  • 你和客服Agent说了你上周刚买的手机出了故障,转人工前它还让你重复报一遍订单号
  • 你让个人助理帮你订下周三去北京的机票,过了10分钟问它“我下周的行程”,它完全忘了机票的事
  • 研发Copilot刚帮你改完代码的bug,你再问它刚才改了什么,它给的回答和实际修改完全对不上

这些问题的核心根源,不是大模型不够聪明,而是Agent的记忆机制设计存在根本性缺陷:绝大多数开发者只会简单拼接会话上下文+用向量库存历史记录,根本没有分层记忆的工程化设计,更没有统一的记忆管控体系。而AI Agent Harness Engineering(Agent执缰工程)正是解决这类问题的核心基建,其中分层记忆机制(短期记忆+长期记忆)是Harness体系中最核心的模块之一。

本文将从核心概念、数学模型、算法实现、项目实战、最佳实践等多个维度,系统解析AI Agent Harness中的记忆机制,帮助你彻底解决Agent“失忆”“幻觉”“上下文溢出”等痛点。


一、核心概念与问题背景

1.1 核心概念定义

我们先把本文涉及的核心概念做清晰的界定,避免歧义:

概念 定义
AI Agent Harness Engineering 又称Agent执缰工程,是管控Agent生命周期、记忆、工具调用、安全、可观测性的整套工程化基建体系,核心目标是将Agent的通用能力从业务逻辑中抽离,提供标准化、可复用、可管控的Agent运行底座,避免业务层重复造轮子。
短期记忆(Short-Term Memory, STM) 又称工作记忆,是Agent在单次会话/任务周期内的上下文缓存,类似人类的工作记忆,特点是容量有限、访问速度极快、生命周期和会话绑定,任务结束后可清空或归档到长期记忆。
长期记忆(Long-Term Memory, LTM) 又称持久化记忆,是Agent跨任务、跨会话的持久化存储的记忆,类似人类的长时记忆,特点是容量近乎无限、访问速度稍慢、需要主动召回机制,生命周期独立于会话。
记忆编排引擎 Harness层的核心组件,负责协调短期记忆和长期记忆的读写、召回、合并、归档、消歧等全流程操作,是记忆机制的“大脑”。
记忆召回引擎 负责从长期记忆中快速筛选和当前请求相关的内容,支持向量召回、关键词召回、元数据过滤、重排序等能力。

1.2 问题背景:为什么我们需要分层记忆机制?

很多开发者会问:现在大模型的上下文窗口越来越大,GPT-4o已经支持2M Token, Claude 3 Opus支持200K Token,直接把所有历史记录都塞到上下文里不行吗?为什么还要做复杂的分层记忆设计?

答案非常明确:上下文窗口的增长永远赶不上业务对记忆的需求,而且上下文越长,大模型的推理成本越高、准确率越低。我们可以从三个维度来看这个问题:

1.2.1 上下文窗口的物理限制与成本约束

即使是2M Token的上下文,换算成中文也只有约150万字,仅相当于一本普通长篇小说的内容。而企业级Agent的需求是什么?一个客服Agent需要存储每个用户数年的历史咨询记录、订单记录、售后记录;一个研发Copilot需要存储整个代码仓库的所有代码、提交记录、需求文档;一个医疗Agent需要存储每个病人几年的病史、检查报告、诊疗记录。这些数据量动辄几十GB、几百GB,根本不可能全部塞到上下文里。
同时,大模型的推理成本和上下文长度是线性相关的,2M Token的输入成本是4K Token的500倍,每次请求都传全量历史记录,成本是完全不可接受的。

1.2.2 大模型的“中间迷失效应”

大量研究已经证明:当上下文长度超过大模型的最优窗口时,大模型对上下文中间部分的信息召回准确率会急剧下降,甚至完全忽略。OpenAI的官方文档也明确建议,即使是支持128K Token的GPT-4 Turbo,核心信息也尽量放在上下文的开头和结尾,中间部分的信息召回率只有约60%。如果不加筛选地把所有历史记录塞到上下文里,大模型大概率会“看不到”关键信息,出现“失忆”“幻觉”等问题。

1.2.3 跨会话记忆的需求

大模型的上下文是单次请求级的,会话结束后就会丢失,如果没有持久化的长期记忆,Agent根本不可能实现跨会话的个性化服务。比如用户上周和客服Agent说过自己对芒果过敏,这周再问“你们的蛋糕有没有芒果成分”,如果Agent没有长期记忆,就不可能主动提醒用户过敏风险。

1.3 常见的记忆机制痛点

目前行业内Agent记忆机制的常见问题可以总结为以下6类:

  1. 上下文溢出:会话内容超过大模型窗口长度,直接被截断,导致关键信息丢失
  2. 跨会话遗忘:会话结束后记忆全部丢失,无法实现个性化、跨任务服务
  3. 召回噪音大:长期记忆召回的内容和当前请求无关,反而干扰大模型推理
  4. 记忆幻觉:记忆和事实不符,或者多个记忆冲突,导致大模型输出错误结果
  5. 一致性差:不同业务模块的记忆逻辑独立,同一个用户的记忆在不同Agent中不一致
  6. 合规风险:敏感信息未脱敏存储,无法满足GDPR等法规的“被遗忘权”要求

而AI Agent Harness的分层记忆机制,正是为了系统性解决以上所有痛点而设计的。


二、记忆机制的核心结构与要素

2.1 短期记忆(STM)的核心要素

短期记忆的核心定位是为当前会话/任务提供高速的上下文缓存,它的核心要素包括:

要素 说明
会话绑定 每个短期记忆实例和唯一的会话ID绑定,不同会话的记忆完全隔离
容量限制 一般设置为大模型上下文窗口的70%~80%,保留足够空间给当前请求和工具返回结果
高速存储 一般用Redis等内存数据库存储,读写延迟在毫秒级
生命周期管理 自带TTL过期机制,会话结束后自动过期,避免存储浪费
重要性评分 每条记忆都有重要性得分,截断时优先保留高得分内容
元数据 包括角色(用户/助理/工具)、时间戳、Token数、关联的工具调用ID等

2.2 长期记忆(LTM)的核心要素

长期记忆的核心定位是跨会话、跨任务的持久化知识存储,按照记忆类型可以分为4大类:

记忆类型 定义 示例
情景记忆(Episodic Memory) 存储用户的历史交互事件、会话记录、操作行为等时序数据 用户上周三买了一部iPhone 15、用户上个月咨询过退货政策
事实记忆(Factual Memory) 存储客观事实、用户属性、业务规则等静态数据 用户的收货地址是北京市朝阳区、用户对芒果过敏、订单超过7天不能退货
程序记忆(Procedural Memory) 存储Agent的技能、工作流、工具使用方法等过程性知识 退货的流程是先查订单→确认是否符合条件→生成退货地址→通知用户
语义记忆(Semantic Memory) 存储通用知识、行业知识库、产品手册等结构化/非结构化知识 iPhone 15的保修期是1年、芒果过敏的症状是皮疹、呼吸困难

长期记忆的核心要素包括:

要素 说明
主体绑定 每个记忆实例和唯一的主体ID(用户ID/设备ID/企业ID等)绑定,不同主体的记忆完全隔离
无限容量 支持TB级别的存储规模,可水平扩展
持久化存储 一般用向量库+关系型数据库+对象存储组合存储,数据永久保存(除非主动删除)
召回机制 支持向量召回、关键词召回、元数据过滤、重排序等多层召回逻辑
置信度评分 每条记忆都有置信度得分,冲突时优先保留高置信度内容
元数据 包括记忆类型、主体ID、标签、创建时间、更新时间、访问次数、来源、权限等级等

2.3 短期记忆 vs 长期记忆核心属性对比

我们从多个维度对两类记忆做详细对比,帮助大家更清晰地理解差异:

对比维度 短期记忆(STM) 长期记忆(LTM)
核心定位 单次会话的上下文缓存 跨会话的持久化知识存储
容量限制 一般4K~128K Token,和大模型窗口匹配 无上限,支持TB级扩展
持久性 临时存储,TTL一般1~24小时,会话结束可清空 永久存储,除非主动删除
访问速度 毫秒级,内存数据库存储 几十毫秒~几百毫秒,向量库+磁盘存储
存储介质 Redis、本地内存等 FAISS、Pinecone、PGVector、MongoDB、对象存储等
召回方式 直接读取全量(截断后) 混合召回(向量+关键词+元数据过滤+重排序)
核心用途 为当前会话提供上下文,避免重复交互 为Agent提供历史知识、个性化信息、业务规则
生命周期 和会话绑定,会话结束后归档或删除 独立于会话,永久有效
写入逻辑 每次交互实时写入 会话结束后批量归档,或主动触发写入
一致性要求 最终一致性即可 强一致性,避免记忆冲突

2.4 记忆机制的实体关系(ER)架构

我们用Mermaid ER图来清晰展示Harness记忆体系的实体关系:

controls

controls

reads_writes

reads_writes

queries

owns

binds

owns

interacts

calls

Harness_Controller

string

harness_id

PK

string

version

json

memory_config

json

security_config

Short_Term_Memory

string

stm_id

PK

string

session_id

FK

string

content

float

importance_score

int

token_count

datetime

create_time

int

ttl

Long_Term_Memory

string

ltm_id

PK

string

subject_id

FK

enum

memory_type

EPISODIC/FACTUAL/PROCEDURAL/SEMANTIC

vector

embedding

json

content

float

confidence

json

tags

datetime

create_time

datetime

update_time

int

access_count

enum

permission_level

PUBLIC/INTERNAL/PRIVATE

Memory_Recall_Engine

string

engine_id

PK

enum

recall_strategy

HYBRID/VECTOR/KEYWORD

float

similarity_threshold

int

top_k

json

rerank_config

Memory_Orchestration_Engine

string

orchestration_id

PK

json

merge_rule

json

archive_rule

json

deduplication_rule

json

disambiguation_rule

LLM

string

model_id

PK

string

model_name

int

context_window

float

input_cost_per_k_token

float

output_cost_per_k_token

Subject

string

subject_id

PK

enum

subject_type

USER/DEVICE/ENTERPRISE

json

permission

datetime

create_time

Session

string

session_id

PK

string

subject_id

FK

datetime

start_time

datetime

end_time

enum

status

ACTIVE/ENDED/EXPIRED

Tool

string

tool_id

PK

string

tool_name

json

parameters

string

endpoint

2.5 记忆机制的交互流程

我们用Mermaid流程图来展示一次用户请求的完整记忆交互流程:

用户请求

Harness 会话校验

会话存在?

读取当前会话的短期记忆STM

创建新会话 初始化空STM

生成用户请求的Embedding向量

调用记忆召回引擎

长期记忆LTM召回:元数据过滤→向量召回→关键词召回→RRF重排序

记忆合并:将STM内容和召回的LTM内容按重要性排序拼接

上下文窗口校验:总Token数是否超过大模型窗口阈值?

截断低重要性内容,直到符合窗口要求

组装LLM请求上下文

调用LLM生成回复草稿

需要调用工具?

调用对应工具获取结果

将工具结果写入STM

将用户请求、LLM回复写入STM

会话是否结束?

STM归档到LTM:过滤临时内容→去重→消歧→分类打标签→写入LTM

返回最终回复给用户

2.6 边界与外延:什么内容不需要存到记忆里?

很多开发者容易走入一个误区:把所有交互内容都存到记忆里,导致记忆库越来越臃肿,召回噪音越来越大。我们需要明确,以下内容不需要存储到记忆中:

  1. 实时工具调用结果:比如实时查询的天气、股票价格、物流信息等,这些数据时效性极强,下次查询需要重新获取,不需要存到记忆里
  2. 临时计算中间结果:比如大模型推理过程中的中间步骤、临时变量等,用完即可丢弃
  3. 低价值的交互内容:比如用户的无效输入、重复的无意义内容、测试数据等,不需要归档到长期记忆
  4. 敏感未脱敏内容:比如用户的银行卡号、身份证号、密码等,必须脱敏后才能存储,或者根本不存储

三、记忆机制的数学模型

记忆机制的核心算法都有严谨的数学支撑,我们来逐一解析核心公式和模型。

3.1 短期记忆重要性评分模型

短期记忆截断的核心是优先保留高重要性的内容,我们用加权求和的方式计算每条记忆的重要性得分:
Si=α⋅Ri+β⋅Ti+γ⋅Roi S_i = \alpha \cdot R_i + \beta \cdot T_i + \gamma \cdot Ro_i Si=αRi+βTi+γRoi
其中:

  • SiS_iSi:第i条记忆的重要性得分,取值范围[0,1]
  • RiR_iRi:第i条记忆和当前查询的相关性得分,取值范围[0,1],一般用向量余弦相似度计算
  • TiT_iTi:第i条记忆的时间衰减得分,取值范围[0,1],越新的记忆得分越高,计算公式为 Ti=11+λ⋅ΔtT_i = \frac{1}{1 + \lambda \cdot \Delta t}Ti=1+λΔt1,其中Δt\Delta tΔt是记忆创建时间到当前时间的小时数,λ\lambdaλ是衰减系数,一般取0.1
  • RoiRo_iRoi:第i条记忆的角色权重得分,取值范围[0,1],一般用户输入为1.0,助理回复为0.8,工具结果为0.6
  • α、β、γ\alpha、\beta、\gammaαβγ是权重系数,满足α+β+γ=1\alpha + \beta + \gamma = 1α+β+γ=1,一般取α=0.5,β=0.3,γ=0.2\alpha=0.5, \beta=0.3, \gamma=0.2α=0.5,β=0.3,γ=0.2,可根据业务场景调整

3.2 长期记忆向量召回模型

长期记忆的第一层召回是向量召回,用查询向量和记忆向量的余弦相似度来衡量相关性:
sim(vq,vm)=vq⋅vm∣∣vq∣∣⋅∣∣vm∣∣ sim(v_q, v_m) = \frac{v_q \cdot v_m}{||v_q|| \cdot ||v_m||} sim(vq,vm)=∣∣vq∣∣∣∣vm∣∣vqvm
其中:

  • vqv_qvq:用户查询的Embedding向量
  • vmv_mvm:长期记忆的Embedding向量
  • sim(vq,vm)sim(v_q, v_m)sim(vq,vm):两个向量的余弦相似度,取值范围[-1,1],值越大相关性越高,一般设置阈值0.6~0.8,超过阈值的记忆才会进入下一阶段

3.3 混合召回重排序模型(RRF倒数秩融合)

为了兼顾向量召回的语义相关性和关键词召回的字面相关性,我们用RRF(Reciprocal Rank Fusion)算法对多路召回的结果进行重排序:
RRF(d)=∑r∈R(d)1k+r RRF(d) = \sum_{r \in R(d)} \frac{1}{k + r} RRF(d)=rR(d)k+r1
其中:

  • RRF(d)RRF(d)RRF(d):文档d的最终重排序得分
  • R(d)R(d)R(d):文档d在各路召回结果中的排名集合
  • rrr:文档d在某一路召回结果中的排名(从1开始)
  • kkk:常数,一般取60,用来避免排名靠前的结果权重过高

RRF算法的优势是不需要训练,不需要统一各路召回的得分范围,直接基于排名融合,效果稳定,是工业界最常用的重排序算法之一。

3.4 记忆衰减模型(艾宾浩斯遗忘曲线的应用)

长期记忆的置信度会随时间衰减,我们参考艾宾浩斯遗忘曲线设计了记忆衰减模型:
C(t)=C0⋅e−λ⋅t C(t) = C_0 \cdot e^{-\lambda \cdot t} C(t)=C0eλt
其中:

  • C(t)C(t)C(t):记忆在时间t的置信度
  • C0C_0C0:记忆的初始置信度,取值范围[0,1],人工录入的知识初始置信度为1.0,从交互中提取的记忆初始置信度为0.7~0.9
  • λ\lambdaλ:遗忘系数,不同类型的记忆取值不同,情景记忆取0.01(每天衰减1%),事实记忆取0.001(每天衰减0.1%),程序记忆和语义记忆取0(永久有效)
  • ttt:记忆更新后经过的天数

当记忆的置信度衰减到低于0.5时,会触发记忆校验机制,重新确认记忆的有效性,避免过时记忆干扰Agent推理。


四、核心算法原理与实现

4.1 短期记忆管理算法

4.1.1 算法流程图

触发短期记忆截断:总Token数超过阈值

计算每条记忆的重要性得分S_i

按S_i降序排序,得分相同按时间降序

累加排序后记忆的Token数,直到达到窗口阈值

保留累加范围内的记忆,删除其余记忆

更新STM存储

4.1.2 算法源代码(Python)
import redis
import tiktoken
import json
from typing import List, Dict, Optional
from pydantic import BaseModel
from datetime import datetime, timedelta

class STMMessage(BaseModel):
    role: str
    content: str
    timestamp: datetime
    importance_score: float = 0.5
    token_count: int = 0

class ShortTermMemory:
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379/0",
        max_context_tokens: int = 8000,
        ttl: int = 86400,
        alpha: float = 0.5,
        beta: float = 0.3,
        gamma: float = 0.2,
        decay_lambda: float = 0.1
    ):
        self.redis = redis.from_url(redis_url)
        self.max_context_tokens = max_context_tokens
        self.ttl = ttl  # 短期记忆默认24小时过期
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        self.alpha = alpha  # 相关性权重
        self.beta = beta    # 时间衰减权重
        self.gamma = gamma  # 角色权重
        self.decay_lambda = decay_lambda  # 时间衰减系数

    def _calculate_token_count(self, content: str) -> int:
        """计算文本的Token数"""
        return len(self.tokenizer.encode(content))
    
    def _calculate_importance(self, message: STMMessage, query: Optional[str] = None, query_embedding: Optional[List[float]] = None) -> float:
        """计算记忆的重要性得分"""
        # 1. 时间衰减得分
        time_diff_hours = (datetime.now() - message.timestamp).total_seconds() / 3600
        time_score = 1.0 / (1 + self.decay_lambda * time_diff_hours)
        
        # 2. 角色权重得分
        role_score_map = {"user": 1.0, "assistant": 0.8, "tool": 0.6, "system": 0.9}
        role_score = role_score_map.get(message.role, 0.5)
        
        # 3. 相关性得分
        relevance_score = 0.5
        if query_embedding and hasattr(self, 'embedding_model'):
            # 实际生产环境用Embedding模型计算余弦相似度
            message_embedding = self.embedding_model.encode(message.content)
            dot_product = sum(a*b for a,b in zip(query_embedding, message_embedding))
            norm_q = sum(a*a for a in query_embedding) ** 0.5
            norm_m = sum(a*a for a in message_embedding) ** 0.5
            relevance_score = dot_product / (norm_q * norm_m) if norm_q * norm_m != 0 else 0
        elif query:
            # 简化版:关键词匹配
            query_words = set(query.lower().split())
            content_words = set(message.content.lower().split())
            if len(query_words) > 0:
                relevance_score = len(query_words & content_words) / len(query_words)
        
        return self.alpha * relevance_score + self.beta * time_score + self.gamma * role_score

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        importance_score: Optional[float] = None,
        query: Optional[str] = None,
        query_embedding: Optional[List[float]] = None
    ) -> None:
        """添加消息到短期记忆"""
        message = STMMessage(
            role=role,
            content=content,
            timestamp=datetime.now(),
            token_count=self._calculate_token_count(content)
        )
        if importance_score:
            message.importance_score = importance_score
        else:
            message.importance_score = self._calculate_importance(message, query, query_embedding)
        
        # 写入Redis
        key = f"stm:{session_id}"
        self.redis.rpush(key, json.dumps(message.model_dump()))
        self.redis.expire(key, self.ttl)
        
        # 检查是否需要截断上下文
        self._truncate_context(session_id)
    
    def get_context(
        self,
        session_id: str,
        query: Optional[str] = None,
        query_embedding: Optional[List[float]] = None
    ) -> List[Dict]:
        """获取当前会话的上下文,按重要性排序"""
        key = f"stm:{session_id}"
        messages = self.redis.lrange(key, 0, -1)
        if not messages:
            return []
        
        # 反序列化
        stm_messages = [STMMessage(**json.loads(m)) for m in messages]
        
        # 如果有查询,重新计算重要性得分
        if query or query_embedding:
            for msg in stm_messages:
                msg.importance_score = self._calculate_importance(msg, query, query_embedding)
        
        # 按重要性降序,相同得分按时间降序
        stm_messages.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
        
        return [{"role": msg.role, "content": msg.content} for msg in stm_messages]
    
    def _truncate_context(self, session_id: str) -> None:
        """截断上下文,确保不超过最大Token数"""
        key = f"stm:{session_id}"
        messages = self.redis.lrange(key, 0, -1)
        if not messages:
            return
        
        stm_messages = [STMMessage(**json.loads(m)) for m in messages]
        total_tokens = sum(msg.token_count for msg in stm_messages)
        
        if total_tokens <= self.max_context_tokens:
            return
        
        # 按重要性排序,保留高得分的
        stm_messages.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
        current_tokens = 0
        retained_messages = []
        
        for msg in stm_messages:
            if current_tokens + msg.token_count <= self.max_context_tokens:
                retained_messages.append(msg)
                current_tokens += msg.token_count
            else:
                break
        
        # 写回Redis
        self.redis.delete(key)
        for msg in retained_messages:
            self.redis.rpush(key, json.dumps(msg.model_dump()))
        self.redis.expire(key, self.ttl)
    
    def archive_session(self, session_id: str) -> List[Dict]:
        """归档会话,返回需要写入长期记忆的内容"""
        context = self.get_context(session_id)
        # 过滤临时内容,比如工具返回的实时数据
        filtered_context = [msg for msg in context if msg["role"] != "tool" or "real_time" not in msg["content"]]
        # 清空短期记忆
        self.redis.delete(f"stm:{session_id}")
        return filtered_context

4.2 长期记忆管理算法

4.2.1 算法流程图

触发长期记忆归档:会话结束

提取STM中的记忆,过滤临时内容

生成每条记忆的Embedding向量

重复记忆检测:计算与现有LTM的相似度,超过0.9则合并

记忆消歧:冲突记忆对比置信度,高置信度保留

记忆分类:打标签,分为情景/事实/程序/语义记忆

写入LTM存储:向量存入向量库,元数据存入PG

更新记忆索引

4.2.2 算法源代码(Python)
import faiss
import numpy as np
import psycopg2
from psycopg2.extras import Json
from typing import List, Dict, Optional
from sentence_transformers import SentenceTransformer
import json
from datetime import datetime

class LongTermMemory:
    def __init__(
        self,
        pg_config: Dict,
        embedding_model_name: str = "all-MiniLM-L6-v2",
        similarity_threshold: float = 0.7,
        top_k: int = 5,
        rrf_k: int = 60
    ):
        # 初始化PG连接,存储元数据
        self.pg_conn = psycopg2.connect(**pg_config)
        self._init_pg_table()
        
        # 初始化Embedding模型
        self.embedding_model = SentenceTransformer(embedding_model_name)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
        
        # 初始化FAISS向量库,实际生产环境用Pinecone/PGVector等分布式向量库
        self.index = faiss.IndexFlatL2(self.embedding_dim)
        self.memory_id_map = {}  # 向量索引到记忆ID的映射
        self._load_existing_memories()
        
        self.similarity_threshold = similarity_threshold
        self.top_k = top_k
        self.rrf_k = rrf_k

    def _init_pg_table(self):
        """初始化PG表结构"""
        with self.pg_conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS long_term_memory (
                    ltm_id VARCHAR(64) PRIMARY KEY,
                    subject_id VARCHAR(64) NOT NULL,
                    memory_type VARCHAR(32) NOT NULL,
                    content JSONB NOT NULL,
                    confidence FLOAT NOT NULL DEFAULT 0.8,
                    tags JSONB NOT NULL DEFAULT '[]'::JSONB,
                    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    access_count INT NOT NULL DEFAULT 0,
                    permission_level VARCHAR(32) NOT NULL DEFAULT 'PRIVATE',
                    embedding BYTEA NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_ltm_subject_id ON long_term_memory(subject_id);
                CREATE INDEX IF NOT EXISTS idx_ltm_memory_type ON long_term_memory(memory_type);
                CREATE INDEX IF NOT EXISTS idx_ltm_tags ON long_term_memory USING GIN(tags);
            """)
            self.pg_conn.commit()
    
    def _load_existing_memories(self):
        """加载现有记忆到FAISS索引"""
        with self.pg_conn.cursor() as cur:
            cur.execute("SELECT ltm_id, embedding FROM long_term_memory")
            rows = cur.fetchall()
            for ltm_id, embedding_bytes in rows:
                embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
                self.index.add(embedding.reshape(1, -1))
                self.memory_id_map[len(self.memory_id_map)] = ltm_id
    
    def _cosine_similarity(self, v1: np.ndarray, v2: np.ndarray) -> float:
        """计算余弦相似度"""
        dot_product = np.dot(v1, v2)
        norm_v1 = np.linalg.norm(v1)
        norm_v2 = np.linalg.norm(v2)
        return dot_product / (norm_v1 * norm_v2) if norm_v1 * norm_v2 != 0 else 0

    def add_memory(
        self,
        subject_id: str,
        content: Dict,
        memory_type: str = "EPISODIC",
        confidence: float = 0.8,
        tags: List[str] = None,
        permission_level: str = "PRIVATE"
    ) -> str:
        """添加记忆到长期记忆库"""
        tags = tags or []
        content_str = json.dumps(content, ensure_ascii=False)
        embedding = self.embedding_model.encode(content_str).astype(np.float32)
        
        # 重复记忆检测
        if self.index.ntotal > 0:
            distances, indices = self.index.search(embedding.reshape(1, -1), 1)
            if indices[0][0] != -1:
                existing_embedding = self.index.reconstruct(int(indices[0][0]))
                similarity = self._cosine_similarity(embedding, existing_embedding)
                if similarity > 0.9:
                    # 重复记忆,更新现有记忆的置信度和访问时间
                    existing_ltm_id = self.memory_id_map[int(indices[0][0])]
                    with self.pg_conn.cursor() as cur:
                        cur.execute("""
                            UPDATE long_term_memory 
                            SET confidence = GREATEST(confidence, %s), update_time = CURRENT_TIMESTAMP, access_count = access_count + 1
                            WHERE ltm_id = %s
                        """, (confidence, existing_ltm_id))
                        self.pg_conn.commit()
                    return existing_ltm_id
        
        # 生成新的记忆ID
        ltm_id = f"ltm_{int(datetime.now().timestamp())}_{subject_id}"
        
        # 写入PG
        with self.pg_conn.cursor() as cur:
            cur.execute("""
                INSERT INTO long_term_memory (ltm_id, subject_id, memory_type, content, confidence, tags, permission_level, embedding)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """, (ltm_id, subject_id, memory_type, Json(content), confidence, Json(tags), permission_level, embedding.tobytes()))
            self.pg_conn.commit()
        
        # 添加到FAISS索引
        self.index.add(embedding.reshape(1, -1))
        self.memory_id_map[len(self.memory_id_map) - 1] = ltm_id
        
        return ltm_id
    
    def recall_memories(
        self,
        subject_id: str,
        query: str,
        memory_types: List[str] = None,
        tags: List[str] = None,
        top_k: int = None
    ) -> List[Dict]:
        """召回相关记忆"""
        top_k = top_k or self.top_k
        memory_types = memory_types or ["EPISODIC", "FACTUAL", "PROCEDURAL", "SEMANTIC"]
        
        # 1. 生成查询向量
        query_embedding = self.embedding_model.encode(query).astype(np.float32)
        
        # 2. 向量召回
        distances, indices = self.index.search(query_embedding.reshape(1, -1), top_k * 2)
        vector_results = []
        for idx, dist in zip(indices[0], distances[0]):
            if idx == -1:
                continue
            ltm_id = self.memory_id_map[int(idx)]
            similarity = 1 - dist / (np.linalg.norm(query_embedding) * np.linalg.norm(self.index.reconstruct(int(idx))))
            if similarity >= self.similarity_threshold:
                vector_results.append((ltm_id, similarity))
        
        # 3. 关键词召回(简化版,实际生产环境用ES)
        keyword_results = []
        query_words = set(query.lower().split())
        with self.pg_conn.cursor() as cur:
            cur.execute("""
                SELECT ltm_id, content FROM long_term_memory
                WHERE subject_id = %s AND memory_type = ANY(%s)
            """, (subject_id, memory_types))
            rows = cur.fetchall()
            for ltm_id, content in rows:
                content_str = json.dumps(content, ensure_ascii=False).lower()
                match_count = sum(1 for word in query_words if word in content_str)
                if match_count > 0:
                    keyword_results.append((ltm_id, match_count / len(query_words)))
        
        # 4. RRF重排序
        rank_map = {}
        # 向量召回排名
        for rank, (ltm_id, score) in enumerate(sorted(vector_results, key=lambda x: -x[1]), 1):
            if ltm_id not in rank_map:
                rank_map[ltm_id] = []
            rank_map[ltm_id].append(rank)
        # 关键词召回排名
        for rank, (ltm_id, score) in enumerate(sorted(keyword_results, key=lambda x: -x[1]), 1):
            if ltm_id not in rank_map:
                rank_map[ltm_id] = []
            rank_map[ltm_id].append(rank)
        
        # 计算RRF得分
        rrf_scores = []
        for ltm_id, ranks in rank_map.items():
            rrf_score = sum(1 / (self.rrf_k + r) for r in ranks)
            rrf_scores.append((ltm_id, rrf_score))
        
        # 取Top K
        rrf_scores.sort(key=lambda x: -x[1])
        top_ltm_ids = [ltm_id for ltm_id, score in rrf_scores[:top_k]]
        
        # 查询记忆详情
        with self.pg_conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT ltm_id, memory_type, content, confidence, tags, create_time, update_time
                FROM long_term_memory
                WHERE ltm_id = ANY(%s)
            """, (top_ltm_ids,))
            memories = [dict(row) for row in cur.fetchall()]
        
        # 更新访问次数
        with self.pg_conn.cursor() as cur:
            cur.execute("""
                UPDATE long_term_memory
                SET access_count = access_count + 1
                WHERE ltm_id = ANY(%s)
            """, (top_ltm_ids,))
            self.pg_conn.commit()
        
        return memories
    
    def delete_memories_by_subject(self, subject_id: str) -> int:
        """删除指定主体的所有记忆,满足GDPR被遗忘权要求"""
        with self.pg_conn.cursor() as cur:
            cur.execute("DELETE FROM long_term_memory WHERE subject_id = %s RETURNING ltm_id", (subject_id,))
            deleted_ids = [row[0] for row in cur.fetchall()]
            self.pg_conn.commit()
        
        # 重建FAISS索引(生产环境用支持删除的向量库)
        self.index.reset()
        self.memory_id_map = {}
        self._load_existing_memories()
        
        return len(deleted_ids)

五、项目实战:企业级客服Agent的记忆系统实现

我们以一个实际的企业级客服Agent为例,完整展示Harness记忆机制的落地过程。

5.1 项目背景与需求

某电商企业需要搭建一个智能客服Agent,核心需求如下:

  1. 跨会话记忆用户的历史订单、历史咨询记录、个人偏好,不需要用户重复说明
  2. 支持100万+用户,每个用户的记忆存储周期为3年
  3. 上下文自动压缩,避免溢出,单次请求的上下文Token数不超过8K
  4. 支持用户主动删除所有个人记忆,满足合规要求
  5. 记忆召回准确率不低于90%

5.2 开发环境搭建

5.2.1 依赖安装
# 安装核心依赖
pip install redis tiktoken pydantic sentence-transformers faiss-cpu psycopg2-binary openai fastapi uvicorn

# 启动依赖服务
docker run -d -p 6379:6379 redis:7-alpine
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_DB=agent_harness postgres:15-alpine
5.2.2 系统架构设计

我们的客服Agent系统分为四层架构:

接入层:API网关/网页端/APP端

Harness层:记忆编排引擎/STM管理器/LTM管理器/召回引擎/安全管控

模型层:OpenAI GPT-3.5-Turbo/Embedding模型

工具层:订单查询接口/物流查询接口/退货申请接口

存储层:Redis(STM)/PostgreSQL(元数据)/FAISS(向量库)

5.3 核心功能实现

5.3.1 记忆编排引擎实现
from typing import List, Dict
from openai import OpenAI

class MemoryOrchestrator:
    def __init__(self, stm: ShortTermMemory, ltm: LongTermMemory, openai_api_key: str):
        self.stm = stm
        self.ltm = ltm
        self.llm_client = OpenAI(api_key=openai_api_key)
        self.system_prompt = """
你是电商平台的智能客服,你需要基于用户的历史记忆和当前请求回答问题,回答要简洁准确,不要编造信息。
如果需要调用工具,请明确说明需要调用的工具名称和参数。
以下是用户的相关记忆:
{memory_context}
"""

    def process_request(self, session_id: str, subject_id: str, user_query: str) -> str:
        # 1. 生成查询Embedding
        query_embedding = self.ltm.embedding_model.encode(user_query)
        
        # 2. 获取短期记忆
        stm_context = self.stm.get_context(session_id, user_query, query_embedding)
        
        # 3. 召回长期记忆
        ltm_memories = self.ltm.recall_memories(subject_id, user_query)
        ltm_context = "\n".join([f"- {json.dumps(mem['content'], ensure_ascii=False)}" for mem in ltm_memories])
        
        # 4. 合并上下文
        memory_context = f"短期记忆:\n{json.dumps(stm_context, ensure_ascii=False)}\n长期记忆:\n{ltm_context}"
        system_prompt = self.system_prompt.format(memory_context=memory_context)
        
        # 5. 调用LLM
        messages = [
            {"role": "system", "content": system_prompt},
            *stm_context,
            {"role": "user", "content": user_query}
        ]
        
        response = self.llm_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=1024,
            temperature=0.3
        )
        reply = response.choices[0].message.content
        
        # 6. 更新短期记忆
        self.stm.add_message(session_id, "user", user_query, query=user_query, query_embedding=query_embedding)
        self.stm.add_message(session_id, "assistant", reply)
        
        # 7. 检查是否需要调用工具(简化版,实际生产环境用函数调用)
        if "订单查询" in reply:
            # 调用订单查询工具,获取结果,更新STM
            order_info = {"order_id": "123456", "product": "iPhone 15", "status": "已发货", "logistics_no": "SF123456"}
            self.stm.add_message(session_id, "tool", json.dumps(order_info, ensure_ascii=False))
            # 重新调用LLM生成回复
            messages.append({"role": "assistant", "content": reply})
            messages.append({"role": "tool", "content": json.dumps(order_info, ensure_ascii=False)})
            response = self.llm_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=1024,
                temperature=0.3
            )
            reply = response.choices[0].message.content
            self.stm.add_message(session_id, "assistant", reply)
        
        return reply
    
    def end_session(self, session_id: str, subject_id: str) -> None:
        """结束会话,归档短期记忆到长期记忆"""
        archived_context = self.stm.archive_session(session_id)
       
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐