AI Agent Harness Engineering 的短期记忆与长期记忆机制解析
AI Agent Harness Engineering 的短期记忆与长期记忆机制解析
作者:15年经验资深软件架构师 | AI Agent基建领域专家
预计阅读时间:45分钟 | 适合读者:AI应用开发工程师、大模型架构师、产品技术负责人
引子:你的AI Agent是不是“金鱼脑”?
2024年是AI Agent大规模落地的元年,不管是企业级客服机器人、个人智能助理,还是研发Copilot、医疗问诊Agent,都在快速渗透到各行各业。但你一定遇到过这些让人抓狂的场景:
- 你和客服Agent说了你上周刚买的手机出了故障,转人工前它还让你重复报一遍订单号
- 你让个人助理帮你订下周三去北京的机票,过了10分钟问它“我下周的行程”,它完全忘了机票的事
- 研发Copilot刚帮你改完代码的bug,你再问它刚才改了什么,它给的回答和实际修改完全对不上
这些问题的核心根源,不是大模型不够聪明,而是Agent的记忆机制设计存在根本性缺陷:绝大多数开发者只会简单拼接会话上下文+用向量库存历史记录,根本没有分层记忆的工程化设计,更没有统一的记忆管控体系。而AI Agent Harness Engineering(Agent执缰工程)正是解决这类问题的核心基建,其中分层记忆机制(短期记忆+长期记忆)是Harness体系中最核心的模块之一。
本文将从核心概念、数学模型、算法实现、项目实战、最佳实践等多个维度,系统解析AI Agent Harness中的记忆机制,帮助你彻底解决Agent“失忆”“幻觉”“上下文溢出”等痛点。
一、核心概念与问题背景
1.1 核心概念定义
我们先把本文涉及的核心概念做清晰的界定,避免歧义:
| 概念 | 定义 |
|---|---|
| AI Agent Harness Engineering | 又称Agent执缰工程,是管控Agent生命周期、记忆、工具调用、安全、可观测性的整套工程化基建体系,核心目标是将Agent的通用能力从业务逻辑中抽离,提供标准化、可复用、可管控的Agent运行底座,避免业务层重复造轮子。 |
| 短期记忆(Short-Term Memory, STM) | 又称工作记忆,是Agent在单次会话/任务周期内的上下文缓存,类似人类的工作记忆,特点是容量有限、访问速度极快、生命周期和会话绑定,任务结束后可清空或归档到长期记忆。 |
| 长期记忆(Long-Term Memory, LTM) | 又称持久化记忆,是Agent跨任务、跨会话的持久化存储的记忆,类似人类的长时记忆,特点是容量近乎无限、访问速度稍慢、需要主动召回机制,生命周期独立于会话。 |
| 记忆编排引擎 | Harness层的核心组件,负责协调短期记忆和长期记忆的读写、召回、合并、归档、消歧等全流程操作,是记忆机制的“大脑”。 |
| 记忆召回引擎 | 负责从长期记忆中快速筛选和当前请求相关的内容,支持向量召回、关键词召回、元数据过滤、重排序等能力。 |
1.2 问题背景:为什么我们需要分层记忆机制?
很多开发者会问:现在大模型的上下文窗口越来越大,GPT-4o已经支持2M Token, Claude 3 Opus支持200K Token,直接把所有历史记录都塞到上下文里不行吗?为什么还要做复杂的分层记忆设计?
答案非常明确:上下文窗口的增长永远赶不上业务对记忆的需求,而且上下文越长,大模型的推理成本越高、准确率越低。我们可以从三个维度来看这个问题:
1.2.1 上下文窗口的物理限制与成本约束
即使是2M Token的上下文,换算成中文也只有约150万字,仅相当于一本普通长篇小说的内容。而企业级Agent的需求是什么?一个客服Agent需要存储每个用户数年的历史咨询记录、订单记录、售后记录;一个研发Copilot需要存储整个代码仓库的所有代码、提交记录、需求文档;一个医疗Agent需要存储每个病人几年的病史、检查报告、诊疗记录。这些数据量动辄几十GB、几百GB,根本不可能全部塞到上下文里。
同时,大模型的推理成本和上下文长度是线性相关的,2M Token的输入成本是4K Token的500倍,每次请求都传全量历史记录,成本是完全不可接受的。
1.2.2 大模型的“中间迷失效应”
大量研究已经证明:当上下文长度超过大模型的最优窗口时,大模型对上下文中间部分的信息召回准确率会急剧下降,甚至完全忽略。OpenAI的官方文档也明确建议,即使是支持128K Token的GPT-4 Turbo,核心信息也尽量放在上下文的开头和结尾,中间部分的信息召回率只有约60%。如果不加筛选地把所有历史记录塞到上下文里,大模型大概率会“看不到”关键信息,出现“失忆”“幻觉”等问题。
1.2.3 跨会话记忆的需求
大模型的上下文是单次请求级的,会话结束后就会丢失,如果没有持久化的长期记忆,Agent根本不可能实现跨会话的个性化服务。比如用户上周和客服Agent说过自己对芒果过敏,这周再问“你们的蛋糕有没有芒果成分”,如果Agent没有长期记忆,就不可能主动提醒用户过敏风险。
1.3 常见的记忆机制痛点
目前行业内Agent记忆机制的常见问题可以总结为以下6类:
- 上下文溢出:会话内容超过大模型窗口长度,直接被截断,导致关键信息丢失
- 跨会话遗忘:会话结束后记忆全部丢失,无法实现个性化、跨任务服务
- 召回噪音大:长期记忆召回的内容和当前请求无关,反而干扰大模型推理
- 记忆幻觉:记忆和事实不符,或者多个记忆冲突,导致大模型输出错误结果
- 一致性差:不同业务模块的记忆逻辑独立,同一个用户的记忆在不同Agent中不一致
- 合规风险:敏感信息未脱敏存储,无法满足GDPR等法规的“被遗忘权”要求
而AI Agent Harness的分层记忆机制,正是为了系统性解决以上所有痛点而设计的。
二、记忆机制的核心结构与要素
2.1 短期记忆(STM)的核心要素
短期记忆的核心定位是为当前会话/任务提供高速的上下文缓存,它的核心要素包括:
| 要素 | 说明 |
|---|---|
| 会话绑定 | 每个短期记忆实例和唯一的会话ID绑定,不同会话的记忆完全隔离 |
| 容量限制 | 一般设置为大模型上下文窗口的70%~80%,保留足够空间给当前请求和工具返回结果 |
| 高速存储 | 一般用Redis等内存数据库存储,读写延迟在毫秒级 |
| 生命周期管理 | 自带TTL过期机制,会话结束后自动过期,避免存储浪费 |
| 重要性评分 | 每条记忆都有重要性得分,截断时优先保留高得分内容 |
| 元数据 | 包括角色(用户/助理/工具)、时间戳、Token数、关联的工具调用ID等 |
2.2 长期记忆(LTM)的核心要素
长期记忆的核心定位是跨会话、跨任务的持久化知识存储,按照记忆类型可以分为4大类:
| 记忆类型 | 定义 | 示例 |
|---|---|---|
| 情景记忆(Episodic Memory) | 存储用户的历史交互事件、会话记录、操作行为等时序数据 | 用户上周三买了一部iPhone 15、用户上个月咨询过退货政策 |
| 事实记忆(Factual Memory) | 存储客观事实、用户属性、业务规则等静态数据 | 用户的收货地址是北京市朝阳区、用户对芒果过敏、订单超过7天不能退货 |
| 程序记忆(Procedural Memory) | 存储Agent的技能、工作流、工具使用方法等过程性知识 | 退货的流程是先查订单→确认是否符合条件→生成退货地址→通知用户 |
| 语义记忆(Semantic Memory) | 存储通用知识、行业知识库、产品手册等结构化/非结构化知识 | iPhone 15的保修期是1年、芒果过敏的症状是皮疹、呼吸困难 |
长期记忆的核心要素包括:
| 要素 | 说明 |
|---|---|
| 主体绑定 | 每个记忆实例和唯一的主体ID(用户ID/设备ID/企业ID等)绑定,不同主体的记忆完全隔离 |
| 无限容量 | 支持TB级别的存储规模,可水平扩展 |
| 持久化存储 | 一般用向量库+关系型数据库+对象存储组合存储,数据永久保存(除非主动删除) |
| 召回机制 | 支持向量召回、关键词召回、元数据过滤、重排序等多层召回逻辑 |
| 置信度评分 | 每条记忆都有置信度得分,冲突时优先保留高置信度内容 |
| 元数据 | 包括记忆类型、主体ID、标签、创建时间、更新时间、访问次数、来源、权限等级等 |
2.3 短期记忆 vs 长期记忆核心属性对比
我们从多个维度对两类记忆做详细对比,帮助大家更清晰地理解差异:
| 对比维度 | 短期记忆(STM) | 长期记忆(LTM) |
|---|---|---|
| 核心定位 | 单次会话的上下文缓存 | 跨会话的持久化知识存储 |
| 容量限制 | 一般4K~128K Token,和大模型窗口匹配 | 无上限,支持TB级扩展 |
| 持久性 | 临时存储,TTL一般1~24小时,会话结束可清空 | 永久存储,除非主动删除 |
| 访问速度 | 毫秒级,内存数据库存储 | 几十毫秒~几百毫秒,向量库+磁盘存储 |
| 存储介质 | Redis、本地内存等 | FAISS、Pinecone、PGVector、MongoDB、对象存储等 |
| 召回方式 | 直接读取全量(截断后) | 混合召回(向量+关键词+元数据过滤+重排序) |
| 核心用途 | 为当前会话提供上下文,避免重复交互 | 为Agent提供历史知识、个性化信息、业务规则 |
| 生命周期 | 和会话绑定,会话结束后归档或删除 | 独立于会话,永久有效 |
| 写入逻辑 | 每次交互实时写入 | 会话结束后批量归档,或主动触发写入 |
| 一致性要求 | 最终一致性即可 | 强一致性,避免记忆冲突 |
2.4 记忆机制的实体关系(ER)架构
我们用Mermaid ER图来清晰展示Harness记忆体系的实体关系:
2.5 记忆机制的交互流程
我们用Mermaid流程图来展示一次用户请求的完整记忆交互流程:
2.6 边界与外延:什么内容不需要存到记忆里?
很多开发者容易走入一个误区:把所有交互内容都存到记忆里,导致记忆库越来越臃肿,召回噪音越来越大。我们需要明确,以下内容不需要存储到记忆中:
- 实时工具调用结果:比如实时查询的天气、股票价格、物流信息等,这些数据时效性极强,下次查询需要重新获取,不需要存到记忆里
- 临时计算中间结果:比如大模型推理过程中的中间步骤、临时变量等,用完即可丢弃
- 低价值的交互内容:比如用户的无效输入、重复的无意义内容、测试数据等,不需要归档到长期记忆
- 敏感未脱敏内容:比如用户的银行卡号、身份证号、密码等,必须脱敏后才能存储,或者根本不存储
三、记忆机制的数学模型
记忆机制的核心算法都有严谨的数学支撑,我们来逐一解析核心公式和模型。
3.1 短期记忆重要性评分模型
短期记忆截断的核心是优先保留高重要性的内容,我们用加权求和的方式计算每条记忆的重要性得分:
Si=α⋅Ri+β⋅Ti+γ⋅Roi S_i = \alpha \cdot R_i + \beta \cdot T_i + \gamma \cdot Ro_i Si=α⋅Ri+β⋅Ti+γ⋅Roi
其中:
- SiS_iSi:第i条记忆的重要性得分,取值范围[0,1]
- RiR_iRi:第i条记忆和当前查询的相关性得分,取值范围[0,1],一般用向量余弦相似度计算
- TiT_iTi:第i条记忆的时间衰减得分,取值范围[0,1],越新的记忆得分越高,计算公式为 Ti=11+λ⋅ΔtT_i = \frac{1}{1 + \lambda \cdot \Delta t}Ti=1+λ⋅Δt1,其中Δt\Delta tΔt是记忆创建时间到当前时间的小时数,λ\lambdaλ是衰减系数,一般取0.1
- RoiRo_iRoi:第i条记忆的角色权重得分,取值范围[0,1],一般用户输入为1.0,助理回复为0.8,工具结果为0.6
- α、β、γ\alpha、\beta、\gammaα、β、γ是权重系数,满足α+β+γ=1\alpha + \beta + \gamma = 1α+β+γ=1,一般取α=0.5,β=0.3,γ=0.2\alpha=0.5, \beta=0.3, \gamma=0.2α=0.5,β=0.3,γ=0.2,可根据业务场景调整
3.2 长期记忆向量召回模型
长期记忆的第一层召回是向量召回,用查询向量和记忆向量的余弦相似度来衡量相关性:
sim(vq,vm)=vq⋅vm∣∣vq∣∣⋅∣∣vm∣∣ sim(v_q, v_m) = \frac{v_q \cdot v_m}{||v_q|| \cdot ||v_m||} sim(vq,vm)=∣∣vq∣∣⋅∣∣vm∣∣vq⋅vm
其中:
- vqv_qvq:用户查询的Embedding向量
- vmv_mvm:长期记忆的Embedding向量
- sim(vq,vm)sim(v_q, v_m)sim(vq,vm):两个向量的余弦相似度,取值范围[-1,1],值越大相关性越高,一般设置阈值0.6~0.8,超过阈值的记忆才会进入下一阶段
3.3 混合召回重排序模型(RRF倒数秩融合)
为了兼顾向量召回的语义相关性和关键词召回的字面相关性,我们用RRF(Reciprocal Rank Fusion)算法对多路召回的结果进行重排序:
RRF(d)=∑r∈R(d)1k+r RRF(d) = \sum_{r \in R(d)} \frac{1}{k + r} RRF(d)=r∈R(d)∑k+r1
其中:
- RRF(d)RRF(d)RRF(d):文档d的最终重排序得分
- R(d)R(d)R(d):文档d在各路召回结果中的排名集合
- rrr:文档d在某一路召回结果中的排名(从1开始)
- kkk:常数,一般取60,用来避免排名靠前的结果权重过高
RRF算法的优势是不需要训练,不需要统一各路召回的得分范围,直接基于排名融合,效果稳定,是工业界最常用的重排序算法之一。
3.4 记忆衰减模型(艾宾浩斯遗忘曲线的应用)
长期记忆的置信度会随时间衰减,我们参考艾宾浩斯遗忘曲线设计了记忆衰减模型:
C(t)=C0⋅e−λ⋅t C(t) = C_0 \cdot e^{-\lambda \cdot t} C(t)=C0⋅e−λ⋅t
其中:
- C(t)C(t)C(t):记忆在时间t的置信度
- C0C_0C0:记忆的初始置信度,取值范围[0,1],人工录入的知识初始置信度为1.0,从交互中提取的记忆初始置信度为0.7~0.9
- λ\lambdaλ:遗忘系数,不同类型的记忆取值不同,情景记忆取0.01(每天衰减1%),事实记忆取0.001(每天衰减0.1%),程序记忆和语义记忆取0(永久有效)
- ttt:记忆更新后经过的天数
当记忆的置信度衰减到低于0.5时,会触发记忆校验机制,重新确认记忆的有效性,避免过时记忆干扰Agent推理。
四、核心算法原理与实现
4.1 短期记忆管理算法
4.1.1 算法流程图
4.1.2 算法源代码(Python)
import redis
import tiktoken
import json
from typing import List, Dict, Optional
from pydantic import BaseModel
from datetime import datetime, timedelta
class STMMessage(BaseModel):
role: str
content: str
timestamp: datetime
importance_score: float = 0.5
token_count: int = 0
class ShortTermMemory:
def __init__(
self,
redis_url: str = "redis://localhost:6379/0",
max_context_tokens: int = 8000,
ttl: int = 86400,
alpha: float = 0.5,
beta: float = 0.3,
gamma: float = 0.2,
decay_lambda: float = 0.1
):
self.redis = redis.from_url(redis_url)
self.max_context_tokens = max_context_tokens
self.ttl = ttl # 短期记忆默认24小时过期
self.tokenizer = tiktoken.get_encoding("cl100k_base")
self.alpha = alpha # 相关性权重
self.beta = beta # 时间衰减权重
self.gamma = gamma # 角色权重
self.decay_lambda = decay_lambda # 时间衰减系数
def _calculate_token_count(self, content: str) -> int:
"""计算文本的Token数"""
return len(self.tokenizer.encode(content))
def _calculate_importance(self, message: STMMessage, query: Optional[str] = None, query_embedding: Optional[List[float]] = None) -> float:
"""计算记忆的重要性得分"""
# 1. 时间衰减得分
time_diff_hours = (datetime.now() - message.timestamp).total_seconds() / 3600
time_score = 1.0 / (1 + self.decay_lambda * time_diff_hours)
# 2. 角色权重得分
role_score_map = {"user": 1.0, "assistant": 0.8, "tool": 0.6, "system": 0.9}
role_score = role_score_map.get(message.role, 0.5)
# 3. 相关性得分
relevance_score = 0.5
if query_embedding and hasattr(self, 'embedding_model'):
# 实际生产环境用Embedding模型计算余弦相似度
message_embedding = self.embedding_model.encode(message.content)
dot_product = sum(a*b for a,b in zip(query_embedding, message_embedding))
norm_q = sum(a*a for a in query_embedding) ** 0.5
norm_m = sum(a*a for a in message_embedding) ** 0.5
relevance_score = dot_product / (norm_q * norm_m) if norm_q * norm_m != 0 else 0
elif query:
# 简化版:关键词匹配
query_words = set(query.lower().split())
content_words = set(message.content.lower().split())
if len(query_words) > 0:
relevance_score = len(query_words & content_words) / len(query_words)
return self.alpha * relevance_score + self.beta * time_score + self.gamma * role_score
def add_message(
self,
session_id: str,
role: str,
content: str,
importance_score: Optional[float] = None,
query: Optional[str] = None,
query_embedding: Optional[List[float]] = None
) -> None:
"""添加消息到短期记忆"""
message = STMMessage(
role=role,
content=content,
timestamp=datetime.now(),
token_count=self._calculate_token_count(content)
)
if importance_score:
message.importance_score = importance_score
else:
message.importance_score = self._calculate_importance(message, query, query_embedding)
# 写入Redis
key = f"stm:{session_id}"
self.redis.rpush(key, json.dumps(message.model_dump()))
self.redis.expire(key, self.ttl)
# 检查是否需要截断上下文
self._truncate_context(session_id)
def get_context(
self,
session_id: str,
query: Optional[str] = None,
query_embedding: Optional[List[float]] = None
) -> List[Dict]:
"""获取当前会话的上下文,按重要性排序"""
key = f"stm:{session_id}"
messages = self.redis.lrange(key, 0, -1)
if not messages:
return []
# 反序列化
stm_messages = [STMMessage(**json.loads(m)) for m in messages]
# 如果有查询,重新计算重要性得分
if query or query_embedding:
for msg in stm_messages:
msg.importance_score = self._calculate_importance(msg, query, query_embedding)
# 按重要性降序,相同得分按时间降序
stm_messages.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
return [{"role": msg.role, "content": msg.content} for msg in stm_messages]
def _truncate_context(self, session_id: str) -> None:
"""截断上下文,确保不超过最大Token数"""
key = f"stm:{session_id}"
messages = self.redis.lrange(key, 0, -1)
if not messages:
return
stm_messages = [STMMessage(**json.loads(m)) for m in messages]
total_tokens = sum(msg.token_count for msg in stm_messages)
if total_tokens <= self.max_context_tokens:
return
# 按重要性排序,保留高得分的
stm_messages.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
current_tokens = 0
retained_messages = []
for msg in stm_messages:
if current_tokens + msg.token_count <= self.max_context_tokens:
retained_messages.append(msg)
current_tokens += msg.token_count
else:
break
# 写回Redis
self.redis.delete(key)
for msg in retained_messages:
self.redis.rpush(key, json.dumps(msg.model_dump()))
self.redis.expire(key, self.ttl)
def archive_session(self, session_id: str) -> List[Dict]:
"""归档会话,返回需要写入长期记忆的内容"""
context = self.get_context(session_id)
# 过滤临时内容,比如工具返回的实时数据
filtered_context = [msg for msg in context if msg["role"] != "tool" or "real_time" not in msg["content"]]
# 清空短期记忆
self.redis.delete(f"stm:{session_id}")
return filtered_context
4.2 长期记忆管理算法
4.2.1 算法流程图
4.2.2 算法源代码(Python)
import faiss
import numpy as np
import psycopg2
from psycopg2.extras import Json
from typing import List, Dict, Optional
from sentence_transformers import SentenceTransformer
import json
from datetime import datetime
class LongTermMemory:
def __init__(
self,
pg_config: Dict,
embedding_model_name: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.7,
top_k: int = 5,
rrf_k: int = 60
):
# 初始化PG连接,存储元数据
self.pg_conn = psycopg2.connect(**pg_config)
self._init_pg_table()
# 初始化Embedding模型
self.embedding_model = SentenceTransformer(embedding_model_name)
self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()
# 初始化FAISS向量库,实际生产环境用Pinecone/PGVector等分布式向量库
self.index = faiss.IndexFlatL2(self.embedding_dim)
self.memory_id_map = {} # 向量索引到记忆ID的映射
self._load_existing_memories()
self.similarity_threshold = similarity_threshold
self.top_k = top_k
self.rrf_k = rrf_k
def _init_pg_table(self):
"""初始化PG表结构"""
with self.pg_conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS long_term_memory (
ltm_id VARCHAR(64) PRIMARY KEY,
subject_id VARCHAR(64) NOT NULL,
memory_type VARCHAR(32) NOT NULL,
content JSONB NOT NULL,
confidence FLOAT NOT NULL DEFAULT 0.8,
tags JSONB NOT NULL DEFAULT '[]'::JSONB,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
access_count INT NOT NULL DEFAULT 0,
permission_level VARCHAR(32) NOT NULL DEFAULT 'PRIVATE',
embedding BYTEA NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ltm_subject_id ON long_term_memory(subject_id);
CREATE INDEX IF NOT EXISTS idx_ltm_memory_type ON long_term_memory(memory_type);
CREATE INDEX IF NOT EXISTS idx_ltm_tags ON long_term_memory USING GIN(tags);
""")
self.pg_conn.commit()
def _load_existing_memories(self):
"""加载现有记忆到FAISS索引"""
with self.pg_conn.cursor() as cur:
cur.execute("SELECT ltm_id, embedding FROM long_term_memory")
rows = cur.fetchall()
for ltm_id, embedding_bytes in rows:
embedding = np.frombuffer(embedding_bytes, dtype=np.float32)
self.index.add(embedding.reshape(1, -1))
self.memory_id_map[len(self.memory_id_map)] = ltm_id
def _cosine_similarity(self, v1: np.ndarray, v2: np.ndarray) -> float:
"""计算余弦相似度"""
dot_product = np.dot(v1, v2)
norm_v1 = np.linalg.norm(v1)
norm_v2 = np.linalg.norm(v2)
return dot_product / (norm_v1 * norm_v2) if norm_v1 * norm_v2 != 0 else 0
def add_memory(
self,
subject_id: str,
content: Dict,
memory_type: str = "EPISODIC",
confidence: float = 0.8,
tags: List[str] = None,
permission_level: str = "PRIVATE"
) -> str:
"""添加记忆到长期记忆库"""
tags = tags or []
content_str = json.dumps(content, ensure_ascii=False)
embedding = self.embedding_model.encode(content_str).astype(np.float32)
# 重复记忆检测
if self.index.ntotal > 0:
distances, indices = self.index.search(embedding.reshape(1, -1), 1)
if indices[0][0] != -1:
existing_embedding = self.index.reconstruct(int(indices[0][0]))
similarity = self._cosine_similarity(embedding, existing_embedding)
if similarity > 0.9:
# 重复记忆,更新现有记忆的置信度和访问时间
existing_ltm_id = self.memory_id_map[int(indices[0][0])]
with self.pg_conn.cursor() as cur:
cur.execute("""
UPDATE long_term_memory
SET confidence = GREATEST(confidence, %s), update_time = CURRENT_TIMESTAMP, access_count = access_count + 1
WHERE ltm_id = %s
""", (confidence, existing_ltm_id))
self.pg_conn.commit()
return existing_ltm_id
# 生成新的记忆ID
ltm_id = f"ltm_{int(datetime.now().timestamp())}_{subject_id}"
# 写入PG
with self.pg_conn.cursor() as cur:
cur.execute("""
INSERT INTO long_term_memory (ltm_id, subject_id, memory_type, content, confidence, tags, permission_level, embedding)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (ltm_id, subject_id, memory_type, Json(content), confidence, Json(tags), permission_level, embedding.tobytes()))
self.pg_conn.commit()
# 添加到FAISS索引
self.index.add(embedding.reshape(1, -1))
self.memory_id_map[len(self.memory_id_map) - 1] = ltm_id
return ltm_id
def recall_memories(
self,
subject_id: str,
query: str,
memory_types: List[str] = None,
tags: List[str] = None,
top_k: int = None
) -> List[Dict]:
"""召回相关记忆"""
top_k = top_k or self.top_k
memory_types = memory_types or ["EPISODIC", "FACTUAL", "PROCEDURAL", "SEMANTIC"]
# 1. 生成查询向量
query_embedding = self.embedding_model.encode(query).astype(np.float32)
# 2. 向量召回
distances, indices = self.index.search(query_embedding.reshape(1, -1), top_k * 2)
vector_results = []
for idx, dist in zip(indices[0], distances[0]):
if idx == -1:
continue
ltm_id = self.memory_id_map[int(idx)]
similarity = 1 - dist / (np.linalg.norm(query_embedding) * np.linalg.norm(self.index.reconstruct(int(idx))))
if similarity >= self.similarity_threshold:
vector_results.append((ltm_id, similarity))
# 3. 关键词召回(简化版,实际生产环境用ES)
keyword_results = []
query_words = set(query.lower().split())
with self.pg_conn.cursor() as cur:
cur.execute("""
SELECT ltm_id, content FROM long_term_memory
WHERE subject_id = %s AND memory_type = ANY(%s)
""", (subject_id, memory_types))
rows = cur.fetchall()
for ltm_id, content in rows:
content_str = json.dumps(content, ensure_ascii=False).lower()
match_count = sum(1 for word in query_words if word in content_str)
if match_count > 0:
keyword_results.append((ltm_id, match_count / len(query_words)))
# 4. RRF重排序
rank_map = {}
# 向量召回排名
for rank, (ltm_id, score) in enumerate(sorted(vector_results, key=lambda x: -x[1]), 1):
if ltm_id not in rank_map:
rank_map[ltm_id] = []
rank_map[ltm_id].append(rank)
# 关键词召回排名
for rank, (ltm_id, score) in enumerate(sorted(keyword_results, key=lambda x: -x[1]), 1):
if ltm_id not in rank_map:
rank_map[ltm_id] = []
rank_map[ltm_id].append(rank)
# 计算RRF得分
rrf_scores = []
for ltm_id, ranks in rank_map.items():
rrf_score = sum(1 / (self.rrf_k + r) for r in ranks)
rrf_scores.append((ltm_id, rrf_score))
# 取Top K
rrf_scores.sort(key=lambda x: -x[1])
top_ltm_ids = [ltm_id for ltm_id, score in rrf_scores[:top_k]]
# 查询记忆详情
with self.pg_conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""
SELECT ltm_id, memory_type, content, confidence, tags, create_time, update_time
FROM long_term_memory
WHERE ltm_id = ANY(%s)
""", (top_ltm_ids,))
memories = [dict(row) for row in cur.fetchall()]
# 更新访问次数
with self.pg_conn.cursor() as cur:
cur.execute("""
UPDATE long_term_memory
SET access_count = access_count + 1
WHERE ltm_id = ANY(%s)
""", (top_ltm_ids,))
self.pg_conn.commit()
return memories
def delete_memories_by_subject(self, subject_id: str) -> int:
"""删除指定主体的所有记忆,满足GDPR被遗忘权要求"""
with self.pg_conn.cursor() as cur:
cur.execute("DELETE FROM long_term_memory WHERE subject_id = %s RETURNING ltm_id", (subject_id,))
deleted_ids = [row[0] for row in cur.fetchall()]
self.pg_conn.commit()
# 重建FAISS索引(生产环境用支持删除的向量库)
self.index.reset()
self.memory_id_map = {}
self._load_existing_memories()
return len(deleted_ids)
五、项目实战:企业级客服Agent的记忆系统实现
我们以一个实际的企业级客服Agent为例,完整展示Harness记忆机制的落地过程。
5.1 项目背景与需求
某电商企业需要搭建一个智能客服Agent,核心需求如下:
- 跨会话记忆用户的历史订单、历史咨询记录、个人偏好,不需要用户重复说明
- 支持100万+用户,每个用户的记忆存储周期为3年
- 上下文自动压缩,避免溢出,单次请求的上下文Token数不超过8K
- 支持用户主动删除所有个人记忆,满足合规要求
- 记忆召回准确率不低于90%
5.2 开发环境搭建
5.2.1 依赖安装
# 安装核心依赖
pip install redis tiktoken pydantic sentence-transformers faiss-cpu psycopg2-binary openai fastapi uvicorn
# 启动依赖服务
docker run -d -p 6379:6379 redis:7-alpine
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_DB=agent_harness postgres:15-alpine
5.2.2 系统架构设计
我们的客服Agent系统分为四层架构:
5.3 核心功能实现
5.3.1 记忆编排引擎实现
from typing import List, Dict
from openai import OpenAI
class MemoryOrchestrator:
def __init__(self, stm: ShortTermMemory, ltm: LongTermMemory, openai_api_key: str):
self.stm = stm
self.ltm = ltm
self.llm_client = OpenAI(api_key=openai_api_key)
self.system_prompt = """
你是电商平台的智能客服,你需要基于用户的历史记忆和当前请求回答问题,回答要简洁准确,不要编造信息。
如果需要调用工具,请明确说明需要调用的工具名称和参数。
以下是用户的相关记忆:
{memory_context}
"""
def process_request(self, session_id: str, subject_id: str, user_query: str) -> str:
# 1. 生成查询Embedding
query_embedding = self.ltm.embedding_model.encode(user_query)
# 2. 获取短期记忆
stm_context = self.stm.get_context(session_id, user_query, query_embedding)
# 3. 召回长期记忆
ltm_memories = self.ltm.recall_memories(subject_id, user_query)
ltm_context = "\n".join([f"- {json.dumps(mem['content'], ensure_ascii=False)}" for mem in ltm_memories])
# 4. 合并上下文
memory_context = f"短期记忆:\n{json.dumps(stm_context, ensure_ascii=False)}\n长期记忆:\n{ltm_context}"
system_prompt = self.system_prompt.format(memory_context=memory_context)
# 5. 调用LLM
messages = [
{"role": "system", "content": system_prompt},
*stm_context,
{"role": "user", "content": user_query}
]
response = self.llm_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1024,
temperature=0.3
)
reply = response.choices[0].message.content
# 6. 更新短期记忆
self.stm.add_message(session_id, "user", user_query, query=user_query, query_embedding=query_embedding)
self.stm.add_message(session_id, "assistant", reply)
# 7. 检查是否需要调用工具(简化版,实际生产环境用函数调用)
if "订单查询" in reply:
# 调用订单查询工具,获取结果,更新STM
order_info = {"order_id": "123456", "product": "iPhone 15", "status": "已发货", "logistics_no": "SF123456"}
self.stm.add_message(session_id, "tool", json.dumps(order_info, ensure_ascii=False))
# 重新调用LLM生成回复
messages.append({"role": "assistant", "content": reply})
messages.append({"role": "tool", "content": json.dumps(order_info, ensure_ascii=False)})
response = self.llm_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1024,
temperature=0.3
)
reply = response.choices[0].message.content
self.stm.add_message(session_id, "assistant", reply)
return reply
def end_session(self, session_id: str, subject_id: str) -> None:
"""结束会话,归档短期记忆到长期记忆"""
archived_context = self.stm.archive_session(session_id)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)