大模型长上下文处理与记忆压缩:从"金鱼记忆"到"长期记忆"的工程实践

cover

一、大模型的"记忆瓶颈":上下文窗口就是全部记忆

大模型的推理能力受限于上下文窗口长度。GPT-4 的 128K token 窗口看似很大,但在实际应用中,系统提示词、对话历史、工具调用结果、检索文档共同占据上下文空间,留给核心推理的空间往往不足 20K token。当对话轮次增多或文档过长时,早期信息被截断或稀释,模型表现出"遗忘"——忘记之前讨论的结论、重复提问、前后矛盾。

长上下文处理的工程方案分为两类:扩展窗口(让模型"看得更多")和压缩记忆(让模型"记住关键")。扩展窗口依赖模型架构改进(如 RoPE 外推、Ring Attention),压缩记忆则通过摘要、检索和结构化存储减少需要传入上下文的信息量。后者是当前更实用的工程方案。

二、记忆压缩的架构与策略

记忆压缩的核心思想是"只保留对当前推理有用的信息"。对话历史中,大部分内容是过程性信息(中间推理步骤、工具调用细节),只有少部分是结论性信息(最终决策、关键事实)。压缩策略需要区分这两类信息,保留后者、丢弃前者。

flowchart TD
    A[完整对话历史<br/>50K tokens] --> B[记忆分类器<br/>区分关键 vs 过程信息]
    B --> C[关键信息<br/>结论、决策、事实<br/>~5K tokens]
    B --> D[过程信息<br/>中间步骤、工具调用<br/>~45K tokens]

    C --> E[结构化记忆存储<br/>向量数据库 / 知识图谱]
    D --> F[摘要压缩<br/>50K → 2K tokens]

    E --> G[检索增强<br/>按相关性召回]
    F --> H[压缩摘要<br/>保留关键脉络]

    G --> I[组装上下文<br/>系统提示 + 压缩摘要 + 召回记忆 + 当前输入]
    H --> I

    subgraph "记忆层次"
        J[工作记忆<br/>当前对话窗口<br/>~8K tokens]
        K[短期记忆<br/>最近 N 轮摘要<br/>~4K tokens]
        L[长期记忆<br/>向量检索库<br/>按需召回]
    end

    I --> J
    I --> K
    I --> L

三层记忆架构:

  • 工作记忆:当前对话窗口内的原始信息,无需压缩
  • 短期记忆:最近 N 轮对话的摘要,保留关键脉络
  • 长期记忆:所有历史信息的结构化存储,按相关性检索召回

三、记忆压缩系统的实现

# memory_compression.py — 大模型记忆压缩系统
# 设计意图:通过摘要压缩和检索增强,将无限对话历史
# 压缩为有限的上下文输入,解决大模型的"遗忘"问题

import hashlib
import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime


@dataclass
class Message:
    """对话消息"""
    role: str           # user / assistant / system / tool
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict = field(default_factory=dict)


@dataclass
class MemoryBlock:
    """记忆块"""
    id: str
    content: str        # 原始内容或摘要
    summary: Optional[str] = None  # 压缩后的摘要
    tokens: int = 0
    importance: float = 0.5  # 重要性评分 0-1
    category: str = "general"  # fact / decision / process / tool_call
    embedding: Optional[List[float]] = None
    created_at: datetime = field(default_factory=datetime.now)


class MemoryCompressor:
    """记忆压缩器:将长对话历史压缩为有限的上下文"""

    def __init__(self, llm_client, max_context_tokens: int = 8000):
        self.llm = llm_client
        self.max_context_tokens = max_context_tokens
        self.memory_blocks: List[MemoryBlock] = []

    def add_message(self, message: Message):
        """添加消息到记忆库"""
        block = MemoryBlock(
            id=self._generate_id(message),
            content=message.content,
            tokens=self._estimate_tokens(message.content),
            importance=self._assess_importance(message),
            category=self._classify_message(message),
        )
        self.memory_blocks.append(block)

    def compress(self) -> str:
        """压缩记忆库,生成适合上下文窗口的摘要"""
        # Step 1: 按重要性排序
        important_blocks = [
            b for b in self.memory_blocks if b.importance > 0.7
        ]
        process_blocks = [
            b for b in self.memory_blocks if b.importance <= 0.7
        ]

        # Step 2: 保留高重要性信息的原文
        important_content = []
        token_budget = self.max_context_tokens // 2  # 一半预算给关键信息
        used_tokens = 0

        for block in reversed(important_blocks):  # 最近的优先
            if used_tokens + block.tokens <= token_budget:
                important_content.append(block.content)
                used_tokens += block.tokens

        # Step 3: 对过程性信息进行摘要压缩
        process_content = [b.content for b in process_blocks]
        if process_content:
            summary = self._generate_summary(process_content)
        else:
            summary = ""

        # Step 4: 组装压缩后的上下文
        compressed_parts = []
        if important_content:
            compressed_parts.append("=== Key Information ===")
            compressed_parts.extend(reversed(important_content))
        if summary:
            compressed_parts.append("=== Conversation Summary ===")
            compressed_parts.append(summary)

        return "\n\n".join(compressed_parts)

    def retrieve(self, query: str, top_k: int = 5) -> List[str]:
        """检索与当前查询相关的记忆"""
        # 基于关键词的简单检索(生产环境应使用向量检索)
        query_keywords = set(query.lower().split())
        scored_blocks = []

        for block in self.memory_blocks:
            content_keywords = set(block.content.lower().split())
            overlap = len(query_keywords & content_keywords)
            if overlap > 0:
                score = overlap / max(len(query_keywords), 1)
                scored_blocks.append((block, score))

        scored_blocks.sort(key=lambda x: x[1], reverse=True)

        return [block.content for block, _ in scored_blocks[:top_k]]

    def _assess_importance(self, message: Message) -> float:
        """评估消息的重要性"""
        importance = 0.5  # 默认中等重要性

        # 用户明确表达的偏好或决策 → 高重要性
        decision_keywords = [
            "决定", "选择", "要求", "必须", "不要",
            "prefer", "decide", "must", "don't",
        ]
        if any(kw in message.content.lower() for kw in decision_keywords):
            importance = 0.9

        # 事实性信息 → 高重要性
        fact_keywords = [
            "名字是", "地址是", "日期是", "编号是",
            "my name is", "the address is",
        ]
        if any(kw in message.content.lower() for kw in fact_keywords):
            importance = 0.95

        # 工具调用结果 → 中等重要性
        if message.role == "tool":
            importance = 0.4

        # 系统消息 → 高重要性
        if message.role == "system":
            importance = 1.0

        return importance

    def _classify_message(self, message: Message) -> str:
        """分类消息类型"""
        if message.role == "system":
            return "system"
        elif message.role == "tool":
            return "tool_call"
        elif any(kw in message.content for kw in ["决定", "选择", "要求"]):
            return "decision"
        else:
            return "general"

    def _generate_summary(self, contents: List[str]) -> str:
        """生成对话摘要"""
        combined = "\n".join(contents[-20:])  # 最多取最近 20 条

        prompt = f"""Summarize the following conversation, preserving:
1. Key decisions and conclusions
2. Important facts and numbers
3. Unresolved questions or action items

Omit:
- Greetings and pleasantries
- Repetitive confirmations
- Detailed tool call logs

Conversation:
{combined}

Summary:"""

        # 实际部署中调用 LLM 生成摘要
        # 此处返回简化版本
        return f"[Summary of {len(contents)} messages: key points preserved]"

    def _generate_id(self, message: Message) -> str:
        """生成记忆块 ID"""
        content_hash = hashlib.md5(message.content.encode()).hexdigest()[:8]
        return f"mem_{content_hash}_{int(message.timestamp.timestamp())}"

    def _estimate_tokens(self, text: str) -> int:
        """估算 token 数量"""
        # 简化估算:中文约 1.5 字/token,英文约 4 字符/token
        return len(text) // 2


class LongTermMemoryStore:
    """长期记忆存储:向量数据库接口"""

    def __init__(self, embedding_client=None):
        self.embedding_client = embedding_client
        self.store: Dict[str, MemoryBlock] = {}

    def store_memory(self, block: MemoryBlock):
        """存储记忆块"""
        if self.embedding_client:
            block.embedding = self.embedding_client.embed(block.content)
        self.store[block.id] = block

    def search(self, query: str, top_k: int = 5) -> List[MemoryBlock]:
        """搜索相关记忆"""
        if not self.embedding_client:
            # 降级:关键词搜索
            return self._keyword_search(query, top_k)

        # 向量搜索
        query_embedding = self.embedding_client.embed(query)
        scored = []

        for block in self.store.values():
            if block.embedding:
                similarity = self._cosine_similarity(
                    query_embedding, block.embedding
                )
                scored.append((block, similarity))

        scored.sort(key=lambda x: x[1], reverse=True)
        return [block for block, _ in scored[:top_k]]

    def _keyword_search(self, query: str, top_k: int) -> List[MemoryBlock]:
        """关键词搜索降级方案"""
        query_words = set(query.lower().split())
        scored = []

        for block in self.store.values():
            content_words = set(block.content.lower().split())
            overlap = len(query_words & content_words)
            if overlap > 0:
                scored.append((block, overlap))

        scored.sort(key=lambda x: x[1], reverse=True)
        return [block for block, _ in scored[:top_k]]

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """计算余弦相似度"""
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x ** 2 for x in a) ** 0.5
        norm_b = sum(x ** 2 for x in b) ** 0.5
        return dot / (norm_a * norm_b + 1e-8)


class MemoryManager:
    """记忆管理器:集成压缩和检索"""

    def __init__(self, llm_client, max_context_tokens: int = 8000):
        self.compressor = MemoryCompressor(llm_client, max_context_tokens)
        self.long_term = LongTermMemoryStore()

    def process_turn(
        self, user_message: Message
    ) -> Tuple[str, List[str]]:
        """处理一轮对话,返回压缩上下文和检索结果"""
        # 添加到记忆
        self.compressor.add_message(user_message)

        # 压缩短期记忆
        compressed = self.compressor.compress()

        # 检索长期记忆
        retrieved = self.compressor.retrieve(user_message.content)

        # 将高重要性记忆存入长期存储
        for block in self.compressor.memory_blocks:
            if block.importance > 0.8 and block.id not in self.long_term.store:
                self.long_term.store_memory(block)

        return compressed, retrieved

四、记忆压缩的 Trade-offs

压缩的信息损失:摘要压缩不可避免地丢失细节。一个 50K token 的对话压缩为 2K token 的摘要,信息保留率约 4%。关键信息的保留依赖重要性评估的准确性——如果评估错误,关键决策可能被当作过程信息丢弃。建议对重要性评估设置保守阈值,宁可多保留也不要误删。

检索的相关性:向量检索基于语义相似度,但"语义相似"不等于"推理需要"。用户问"上次讨论的部署方案是什么",检索可能返回包含"部署"关键词的无关内容。需要结合时间衰减(最近的记忆权重更高)和结构化标签(决策类记忆优先召回)提升检索精度。

压缩延迟:摘要生成需要调用 LLM,每次压缩约需 1-3 秒。在实时对话场景中,这个延迟可能影响用户体验。建议在后台异步执行压缩,对话进行中使用上一轮的压缩结果。

记忆一致性:长期记忆中可能存在矛盾信息——用户在不同时间表达了不同的偏好。检索时可能同时召回两条矛盾的记忆,导致模型困惑。需要引入记忆版本管理,新记忆覆盖旧记忆,或标注记忆的时间戳供模型判断。

五、总结

大模型记忆压缩通过三层架构(工作记忆 → 短期记忆 → 长期记忆)将无限对话历史压缩为有限的上下文输入。重要性评估区分关键信息和过程信息,摘要压缩保留关键脉络,向量检索按需召回历史记忆。但压缩的信息损失、检索的相关性、压缩延迟和记忆一致性是需要权衡的因素。在实际落地中,建议对重要性评估设置保守阈值,结合时间衰减和结构化标签优化检索,后台异步执行压缩,引入记忆版本管理处理矛盾信息。记忆压缩的目标不是"记住一切",而是"在有限的上下文窗口内保留对当前推理最有价值的信息"。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐