文章目录

第 0 章 引子:为什么 LLM 需要一个"记忆层"?

大语言模型有两个先天限制:

  1. 无状态:每次 API 调用之间不共享任何信息,所有"上下文"必须显式写进 messages 数组;
  2. 上下文窗口有限:即便是 200K token 的模型,把所有历史塞进去也会导致延迟、成本、注意力稀释一起恶化。

最朴素的解法是"把对话写进向量库,查询时召回 Top-K",这就是早期的 RAG-style memory。但实践中很快暴露出几个新问题:

  • 粒度太粗:一条用户消息可能同时包含"姓名"、“偏好”、“日程”,向量召回时只能整段返回,无法精确利用;
  • 结构化缺失:用户说过"我喜欢川菜"和"我讨厌香菜",应该作为可独立查询、可独立删除的两条事实,而不是绑在同一段文本里;
  • 时间线丢失:对话是有顺序的,"昨天用户改主意了"这种信息必须保留顺序;
  • 多租户隔离:一个 Agent 通常要同时服务多个用户/会话,记忆必须可按维度切割;
  • 模型可替换:今天用 OpenAI,明天换 Bedrock,记忆不能跟着丢。

MemMachine 的设计哲学,就是模拟人类的认知分层来解决这些问题:

┌────────────────────────────────────────────────────────────┐
│                        人类大脑                              │
├────────────────────────────────────────────────────────────┤
│  工作记忆      :  正在想的事     (秒级)                       │
│  情景记忆      :  发生过什么     (按时间,可回放)              │
│  语义记忆      :  我知道什么     (脱离时间的事实/常识)          │
└────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                       MemMachine                            │
├────────────────────────────────────────────────────────────┤
│  Working Memory   :  当前会话内存(短期上下文)              │
│  Episodic Memory  :  对话/事件流(时间序列,可检索)          │
│  Semantic Memory  :  用户画像(LLM 抽取的结构化事实)         │
└────────────────────────────────────────────────────────────┘

接下来的所有章节,都是围绕这三种记忆,以及围绕它们的存储、检索、生命周期管理展开的。


第 1 章 30 秒看懂 MemMachine

1.1 一张图看懂

存储与模型

EpisodicMemory 单实例

核心服务层

摘要

API 层 FastAPI

projects

memories

metrics, /health

客户端层

Python SDK

TS SDK

REST

MCP

MemMachine 主类

EpisodicMemoryManager

SemanticSessionManager

RetrievalAgent

ResourceManager

ShortTermMemory
滚动摘要

LongTermMemory
声明式记忆

EventMemory
v0.3.x 新增

EpisodeStore
PostgreSQL/SQLite

VectorGraphStore
Neo4j/NebulaGraph

VectorStore
Qdrant/SQLite/sqlite-vec

SemanticStorage
pgvector/Neo4j

LLM / Embedder / Reranker

1.2 关键模块的源码位置

packages/server/src/memmachine_server/
├── main/memmachine.py             # 总入口 MemMachine 主类
├── episodic_memory/
│   ├── episodic_memory.py         # 单会话情景记忆 (STM+LTM 编排)
│   ├── episodic_memory_manager.py # 多会话/LRU 缓存
│   ├── short_term_memory/         # deque + 滚动摘要
│   ├── long_term_memory/          # 基于 VectorGraphStore 的 LTM
│   ├── declarative_memory/        # LTM 的底层:派生 + 嵌入
│   └── event_memory/              # v0.3.x 新一代 LTM
│       ├── event_memory.py
│       ├── segmenter/             # Event → Segment
│       ├── deriver/               # Segment → Derivative
│       └── segment_store/         # Segment 持久化
├── semantic_memory/
│   ├── semantic_memory.py         # 后台抽取服务
│   ├── semantic_ingestion.py      # 单 set 的处理循环
│   ├── semantic_llm.py            # LLM 抽取/巩固调用
│   ├── cluster_manager.py         # 相邻语义特征聚类
│   └── util/semantic_prompt_template.py  # Profile 提示词
├── retrieval_agent/
│   ├── service_locator.py         # 工厂函数
│   └── agents/
│       ├── tool_select_agent.py   # LLM 路由
│       ├── coq_agent.py           # Chain-of-Query 迭代式
│       ├── split_query_agent.py   # 多实体并发查询
│       └── memmachine_retriever.py# 直接查 LTM
├── common/
│   ├── vector_store/              # 向量库统一抽象
│   ├── vector_graph_store/        # 图+向量统一抽象
│   ├── episode_store/             # 原始 Episode 持久化
│   ├── embedder/, reranker/, language_model/
│   ├── filter/                    # FilterExpr DSL
│   └── metrics_factory/           # Prometheus
└── server/                        # FastAPI + MCP HTTP/Stdio

后续所有章节都会回链到具体文件与行号。


第 2 章 核心数据模型

理解 MemMachine 之前,先认清下面几个数据类型。

2.1 Episode:一切的起点

所有写入 MemMachine 的最小单位都是 Episode。它定义在 packages/server/src/memmachine_server/common/episode_store/ 之下:

class Episode:
    uid: str                    # 唯一标识符
    content: str                # 消息内容
    session_key: str            # 所属会话,例如 "my_org/my_project"
    producer_id: str            # 内容产生者,例如 "alice"
    producer_role: str          # 角色:user / assistant / system
    produced_for_id: str        # 内容接收者,例如 "travel_agent"
    episode_type: EpisodeType   # MESSAGE / TEXT / ...
    content_type: ContentType   # MESSAGE / TEXT / ...
    sequence_num: int           # 会话内的递增序号
    created_at: datetime
    metadata: dict              # 业务自定义元数据
    filterable_metadata: dict   # 用于检索过滤的元数据子集

Episode 与一个 OpenAI/Anthropic 消息几乎一一对应;但相比裸消息,它额外携带了 session_keyfilterable_metadatasequence_num,这些字段是后续多租户隔离、按维度过滤、跨会话排序的关键。

2.2 Segment / Derivative:事件记忆里的派生单元

EventMemory 引入了"事件 → 片段 → 派生"的三级抽象,源码:packages/server/src/memmachine_server/episodic_memory/event_memory/data_types.py:96-188

class Event:
    uuid: UUID
    timestamp: datetime
    context: Context            # ProducerContext | NullContext
    blocks: list[Block]         # 一条事件可包含多个 Block

class Segment:                  # Event 的最小可检索片段
    uuid: UUID
    event_uuid: UUID
    index: int                  # 在 event.blocks 中的位置
    offset: int                 # 在 block 内的切分偏移
    timestamp: datetime
    block: Block

class Derivative:               # Segment 派生出的可向量化文本
    uuid: UUID
    segment_uuid: UUID
    block: Block                # 通常是 TextBlock

为什么要分这么细?

真正写进向量库的是 Derivative,不是 Event 本身。一条 Event 可能展开为 5 个 Segment、10 个 Derivative,从而把召回粒度精细到句子级别。

2.3 SemanticFeature:语义记忆的最小单元

# packages/server/src/memmachine_server/semantic_memory/semantic_model.py:70-103
class SemanticFeature:
    set_id: SetIdT              # 所属"集合"(典型为 org/project 或 user)
    category: str               # 分类,如 "profile_prompt"
    tag: str                    # 顶级标签,如 "Demographic Information"
    feature_name: str           # 二级特征名,如 "name"
    value: str                  # 实际值,如 "张三"
    metadata: Metadata          # 包含 id、citations(关联 Episode UID)

注意它是 三元组 + 引用(tag, feature_name, value) 描述事实,metadata.citations 反向链回原始 Episode,便于做"为什么记忆里有这条?"的可追溯审计。

2.4 三层命名空间

MemMachine 使用三层结构隔离记忆:

Organization (org_id)
  └── Project (project_id)
        └── Session (session_id)
              └── 具体的 Episode / Feature

源码里它们被合并成一个字符串:

session_key = f"{org_id}/{project_id}"  # 形式由配置决定

session_key 是所有底层存储的过滤主键。多租户场景下:

  • org_id 用来隔离组织(一般对应公司/客户);
  • project_id 用来隔离产品/应用(不同 Agent 互不可见);
  • metadata.user_id / metadata.agent_id / metadata.session_id 在同一个 (org, project) 内做更细粒度切分。

提示:SDK 里 Project.memory(user_id=..., agent_id=..., session_id=...) 调用,最终都会把这些字段写到 Episode 的 metadatafilterable_metadata,从而保证后续检索可以按它们过滤。


第 3 章 写入路径:一条消息进来之后发生了什么

这是 MemMachine 工作流里最关键的一段代码。理解它,你就理解了 80% 的 MemMachine。

3.1 入口:MemMachine.add_episodes()

代码位置:packages/server/src/memmachine_server/main/memmachine.py:649-703。删掉日志、计数器之后的核心逻辑:

async def add_episodes(
    self,
    session_data,
    episode_entries,
    *,
    target_memories=ALL_MEMORY_TYPES,
) -> list[EpisodeIdT]:
    # 1) 原始 Episode 永久落盘(关系型/嵌入式存储)
    episode_storage = await self._resources.get_episode_storage()
    episodes = await episode_storage.add_episodes(
        session_data.session_key, episode_entries,
    )

    tasks = []

    # 2) 情景记忆(STM + LTM/EventMemory)
    if MemoryType.Episodic in target_memories:
        async with episodic_memory_manager.open_or_create_episodic_memory(...) as ep:
            tasks.append(ep.add_memory_episodes(episodes))

    # 3) 语义记忆(异步排队,等待后台抽取)
    if MemoryType.Semantic in target_memories:
        tasks.append(
            semantic_session_manager.add_message(
                episodes=episodes, session_data=session_data,
            )
        )

    await asyncio.gather(*tasks)      # 步骤 2、3 并发执行
    return [e.uid for e in episodes]

划重点:

  1. 始终先落 EpisodeStore:哪怕你显式禁用了 Episodic/Semantic,原始事件也已经持久化了,这是事后审计与回溯的基础;
  2. Episodic / Semantic 写入并发:通过 asyncio.gather,不会互相阻塞;
  3. Semantic 不是同步抽取add_message 只是把 Episode 加入"待抽取"队列,真正的 LLM 调用发生在 SemanticService 的后台任务里(见 3.5 节)。

3.2 Episodic 路径:EpisodicMemory.add_memory_episodes()

源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:208-242

async def add_memory_episodes(self, episodes: list[Episode]) -> None:
    # 把 metadata 中所有"标量"提取到 filterable_metadata,
    # 这是后续按用户/会话/标签过滤的关键
    for episode in episodes:
        if episode.metadata is not None and episode.filterable_metadata is None:
            episode.filterable_metadata = {
                k: v for k, v in episode.metadata.items()
                if isinstance(v, get_args(PropertyValue))
            }

    tasks: list[Coroutine] = []
    if self._short_term_memory:
        tasks.append(self._short_term_memory.add_episodes(episodes))
    if self._long_term_memory:
        tasks.append(self._long_term_memory.add_episodes(episodes))
    await asyncio.gather(*tasks)

注意 STM 与 LTM 是并发写入 的。它们彼此独立、互不依赖;这意味着你可以单独开启 STM(最便宜,只有内存 + 偶尔的 LLM 摘要)或单独开启 LTM(重一些,需要向量库与图库),也可以同时启用。

3.3 ShortTermMemory:deque + 滚动摘要

源码:packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py

它的逻辑非常直观——一个带容量的双端队列

# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:187-209
async def add_episodes(self, episodes: list[Episode]) -> bool:
    async with self._lock.write_lock():
        self._memory.extend(episodes)
        self._current_episode_count += len(episodes)
        self._current_message_len += sum(len(e.content) for e in episodes)
        full = await self._is_full()
        if full:
            await self._do_evict()
        return full

当总长度 = 已存 episode 字符数 + 摘要字符数 超过 message_capacity(默认 64000,单位为字符)时,触发 eviction

# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:222-240
async def _do_evict(self) -> None:
    # 1) 先把已经被摘要过的旧 episode 直接淘汰
    while len(self._memory) > self._current_episode_count and await self._is_full():
        self._current_message_len -= len(self._memory[0].content)
        self._memory.popleft()

    # 2) 还是满,就把所有 episode 异步压缩成一份新摘要
    if len(self._memory) > 0 and await self._is_full():
        result = list(self._memory)
        self._current_episode_count = 0
        await self._consolidator.summarize(result)

摘要的提示词模板由 packages/server/src/memmachine_server/common/configuration/default_episode_summary_system_prompt.txt 等文件提供,包含"保留实体/事件/未决问题"等关键约束。

为什么用滚动摘要而不是滑动窗口?

  • 单纯丢弃旧 Episode 会导致长会话失忆;
  • 全部塞入上下文会爆 token;
  • 滚动摘要做"压缩 + 增量重写",让短期记忆始终保持一个有界但语义完整的视图。

3.4 LongTermMemory:声明式记忆 + 图存储

LTM 的核心抽象是 DeclarativeMemory:把 Episode 派生为可向量召回的 Derivative,再落到一个同时具备图能力与向量能力的存储(默认 Neo4j)。

源码:packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py:99-211

async def add_episodes(self, episodes: Iterable[Episode]) -> None:
    episodes = sorted(episodes, key=lambda e: (e.timestamp, e.uid))

    # 1) Episode → Node:保留原始字段 + 业务可过滤字段
    episode_nodes = [Node(uid=e.uid, properties={...}) for e in episodes]

    # 2) 并发派生 Derivative(默认 1 episode → N derivative)
    episodes_derivatives = await asyncio.gather(
        *[self._derive_derivatives(e) for e in episodes]
    )
    derivatives = [d for lst in episodes_derivatives for d in lst]

    # 3) 批量嵌入 Derivative 文本
    derivative_embeddings = await self._embedder.ingest_embed(
        [d.content for d in derivatives],
    )

    # 4) 写入两类节点
    derivative_nodes = [
        Node(uid=d.uid, properties={...},
             embeddings={emb_name: (embedding, similarity_metric)})
        for d, embedding in zip(derivatives, derivative_embeddings)
    ]

    await asyncio.gather(
        self._vector_graph_store.add_nodes(self._episode_collection, episode_nodes),
        self._vector_graph_store.add_nodes(self._derivative_collection, derivative_nodes),
    )

    # 5) 建立 (Derivative)-[DERIVED_FROM]->(Episode) 关系
    await self._vector_graph_store.add_edges(
        relation=self._derived_from_relation,
        source_collection=self._derivative_collection,
        target_collection=self._episode_collection,
        edges=derivative_episode_edges,
    )

派生逻辑见 _derive_derivatives

# packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py:213-272
match episode.content_type:
    case ContentType.MESSAGE:
        if not self._message_sentence_chunking:
            return [Derivative(content=f"{episode.source}: {episode.content}", ...)]
        sentences = extract_sentences(episode.content)
        return [
            Derivative(content=f"{episode.source}: {sentence}", ...)
            for sentence in sentences
        ]
    case ContentType.TEXT:
        return [Derivative(content=episode.content, ...)]

两个值得关注的设计:

  1. 每条 derivative 文本前会拼接 {producer}:。这是给向量模型"加 context"的一种廉价手法,让 “alice: 我喜欢川菜” 与 “bob: 我讨厌川菜” 在嵌入空间天然分开。
  2. 可选 message_sentence_chunking:开启后,按句子切分嵌入;适合"一条消息里夹杂多种事实"的场景。

最终的 Neo4j 图结构如下:

(:Episode {uid, content, producer_id, ...})
   ▲
   │ [DERIVED_FROM]
   │
(:Derivative {uid, content, embedding, ...})

向量索引建立在 Derivative 上;命中之后通过 DERIVED_FROM 反向找回 Episode。

3.5 EventMemory:v0.3.x 引入的新一代情景记忆

EventMemory 与 LongTermMemory 在功能上等价(都是"长期、可检索的情景记忆"),但实现思路不同。它的核心在于:

  • 把"图能力"从存储底座中剥离(不再强依赖 Neo4j);
  • 用通用的 VectorStore(Qdrant / SQLite-USearch / SQLite-vec)做向量召回;
  • 用独立的 SegmentStore 维护片段邻接关系,实现"以 seed 为中心扩展前后文"。

源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:204-307

async def _encode_events(self, events: Iterable[Event]) -> None:
    events = list(events); self._validate_events(events)

    # 阶段 1:Segmenter 把 Event 切成 Segment
    segment_lists = await asyncio.gather(
        *(self._segmenter.segment(event) for event in events)
    )
    segments = [s for lst in segment_lists for s in lst]

    # 阶段 2:Deriver 从 Segment 派生 Derivative
    derivative_lists = await asyncio.gather(
        *(self._deriver.derive(seg) for seg in segments)
    )
    segments_to_derivatives = dict(zip(segments, derivative_lists))
    derivatives = [d for lst in derivative_lists for d in lst]

    # 阶段 3:批量嵌入
    derivative_texts = [EventMemory._extract_text(d.block) for d in derivatives]
    derivative_embeddings = await self._embedder.ingest_embed(derivative_texts)

    # 阶段 4:写 SegmentStore(保留 Segment 之间的邻接顺序)
    await self._segment_store_partition.add_segments({
        seg: [d.uuid for d in ds] for seg, ds in segments_to_derivatives.items()
    })

    # 阶段 5:写 VectorStore(按 Derivative 维度)
    derivative_records = [EventMemory._build_derivative_record(d, e)
                          for d, e in zip(derivatives, derivative_embeddings)]
    if derivative_records:
        await self._vector_store_collection.upsert(records=derivative_records)

更进一步:

  • 五个阶段都在 Prometheus 上以 histogram 形式上报(标签 phase=segmentation/derivation/embedding/segment_store/vector_store),便于运维定位性能瓶颈;
  • _validate_events 会拒绝带保留字段的事件_segment_uuid_timestamp 是 EventMemory 占用的保留字段);
  • 写入 derivative 时同时写两类属性:系统属性(_segment_uuid_timestamp)与用户自定义属性(直接落到向量库 record 的 properties)。

EventMemory vs LongTermMemory 怎么选?

  • 想用 Neo4j 一体化做"图遍历 + 向量召回",选 LTM;
  • 想跑在轻量级单机(SQLite/sqlite-vec),或想把向量库换成 Qdrant 集群、不引入图数据库,选 EventMemory;
  • 两者可以并存,但同一会话通常二选一即可。

3.6 SemanticMemory:把对话蒸馏成"用户画像"

写入路径里最有意思的一段是语义记忆。它不是同步抽取,而是依赖一个后台轮询任务

# packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:783-836
async def _background_ingestion_task(self) -> None:
    ingestion_service = IngestionService(
        params=IngestionService.Params(
            semantic_storage=self._semantic_storage,
            resource_retriever=self._set_id_resource,
            history_store=self._episode_storage,
            max_features_per_update=self._max_features_per_update,
        ),
    )
    backoff_sec = self._background_ingestion_interval_sec

    while not self._is_shutting_down:
        # 1) 找出"有未抽取消息 / 距上次抽取已经超时"的 set
        dirty_sets = [
            s async for s in self._semantic_storage.get_history_set_ids(
                min_uningested_messages=self._feature_update_message_limit,
                older_than=datetime.now(tz=UTC) - self._feature_time_limit,
            )
        ]
        if len(dirty_sets) == 0:
            await self._interruptible_sleep(self._background_ingestion_interval_sec)
            continue

        # 2) 批量处理,失败则指数退避
        try:
            await ingestion_service.process_set_ids(dirty_sets)
        except Exception:
            had_errors = True
            ...

        # 3) 清理已被消化的历史行
        purged = await self._semantic_storage.purge_ingested_rows(dirty_sets)

每个 set 的处理逻辑:

# packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py:122-278
async def _process_single_set(self, set_id: str) -> None:
    resources = await self._resource_retriever(set_id)         # 拿 LLM / Embedder / Category
    history_ids = [h async for h in self._semantic_storage.get_history_messages(
        set_ids=[set_id], limit=5, is_ingested=False,
    )]
    ... # 加载原始 Episode

    async def process_semantic_type(semantic_category):
        for message in messages:
            features = [...]   # 已有特征作为"旧 profile"喂给 LLM
            commands = await llm_feature_update(
                features=features,
                message_content=message.content,
                model=resources.language_model,
                update_prompt=semantic_category.prompt.update_prompt,
            )
            await self._apply_commands(commands=commands, ...)

    await asyncio.gather(*[
        process_semantic_type(t) for t in resources.semantic_categories
    ])

    await self._semantic_storage.mark_messages_ingested(...)
    await self._consolidate_set_memories_if_applicable(set_id=set_id, resources=resources)

LLM 调用本身在 llm_feature_update 中(packages/server/src/memmachine_server/semantic_memory/semantic_llm.py:69-101),输出格式严格约束为:

{ "commands": [
    {"command": "add",    "tag": "Demographic Information", "feature": "name",  "value": "张三"},
    {"command": "delete", "tag": "Hobbies & Interests",     "feature": "smoker"}
]}

为什么要走"命令"而不是"覆盖式更新"?

  • 命令易于撤销与回放;
  • 命令天然支持并发:多个 category 各自独立产出命令;
  • delete 命令使得"用户改主意"这种场景可以优雅落地:旧值删除 + 新值新增。

Profile 抽取提示词的完整骨架在 packages/server/src/memmachine_server/semantic_memory/util/semantic_prompt_template.py:6-153。它把语义记忆设计成两级 K-V

tag (大类,如 "Demographic Information")
   └─ feature (具体特征名,如 "name")
        └─ value (值,如 "张三")

预置的 6 个 category(profile_prompt / coding_prompt / writing_assistant_prompt / financial_analyst_prompt / health_assistant_prompt / crm_prompt)都基于这套骨架定制(见 packages/server/src/memmachine_server/server/prompt/default_prompts.py:19-26)。

最后一步是巩固(consolidation):当语义特征积累到阈值(默认 20),就调用 llm_consolidate_features 把"重复 / 关联 / 矛盾"的特征合并、改写、删除。提示词在同一文件的 build_consolidation_prompt,思路类似神经网络的"长时程巩固"——把"原始记忆矿石 → 纯净的记忆颗粒 → 分箱 → 合金记忆"。

3.7 写入路径全景图

VectorGraphStore / VectorStore LLM (抽取) SemanticService 后台任务 SemanticSessionManager LongTermMemory / EventMemory ShortTermMemory EpisodicMemoryManager EpisodeStore MemMachine 主类 FastAPI Router Client VectorGraphStore / VectorStore LLM (抽取) SemanticService 后台任务 SemanticSessionManager LongTermMemory / EventMemory ShortTermMemory EpisodicMemoryManager EpisodeStore MemMachine 主类 FastAPI Router Client par [并发] 每 N 秒轮询一次 POST /api/v2/memories add_episodes(session, entries) add_episodes() open_or_create_episodic_memory() add_episodes() add_episodes() add_nodes + add_edges add_message() episode_ids 200 OK get_history_messages() llm_feature_update() SemanticCommand[] apply_commands() (写 pgvector) llm_consolidate_features()

到这里,我们已经看完了"写"的完整故事。接下来是"读"。


第 4 章 读取路径:一次查询是如何被回答的

4.1 入口:MemMachine.query_search()

源码:packages/server/src/memmachine_server/main/memmachine.py:916-986

async def query_search(self, session_data, *, target_memories, query, limit, expand_context,
                      score_threshold, search_filter, agent_mode=False, ...) -> SearchResponse:
    property_filter = parse_filter(search_filter) if search_filter else None
    episodic_task = None
    semantic_task = None

    if MemoryType.Episodic in target_memories:
        retrieval_agent = await self._get_retrieval_agent() if agent_mode else None
        episodic_task = asyncio.create_task(self._search_episodic_memory(
            session_data=session_data, query=query, limit=limit, ...,
            retrieval_agent=retrieval_agent,
        ))

    if MemoryType.Semantic in target_memories:
        semantic_session = await self._resources.get_semantic_session_manager()
        async def _collect_semantic_results():
            return [f async for f in semantic_session.search(
                message=query, session_data=session_data, ..., search_filter=property_filter,
            )]
        semantic_task = asyncio.create_task(_collect_semantic_results())

    return MemMachine.SearchResponse(
        episodic_memory=await episodic_task if episodic_task else None,
        semantic_memory=await semantic_task if semantic_task else None,
    )

要点:

  • 情景与语义查询并发:通过两个 asyncio.create_task
  • agent_mode=True 时启用 RetrievalAgent(第 5 章详述);
  • search_filter 是 DSL 字符串parse_filter 把它解析成 FilterExpr(见 6.4 节)。

4.2 Episodic 查询的双路径

源码:packages/server/src/memmachine_server/main/memmachine.py:711-767

async with episodic_memory_manager.open_or_create_episodic_memory(...) as episodic_session:
    if retrieval_agent is None or episodic_session.long_term_memory is None:
        # 普通路径:直接打底层 query_memory
        response = await episodic_session.query_memory(
            query=query, limit=limit, expand_context=expand_context,
            score_threshold=score_threshold, property_filter=search_filter,
        )
    else:
        # Agent 路径:由 RetrievalAgent 编排 LLM + 多个子代理
        response = await self._query_episodic_with_retrieval_agent(
            episodic_session=episodic_session, retrieval_agent=retrieval_agent, ...,
        )
return response

4.3 普通路径:EpisodicMemory.query_memory()

源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:352-476。简化后的核心:

async def query_memory(self, query, *, limit=None, expand_context=0,
                      score_threshold=-inf, property_filter=None,
                      mode=QueryMode.BOTH) -> QueryResponse | None:
    search_limit = limit if limit is not None else 20

    # 1) STM + LTM 并发查询
    session_result, scored_long_episodes = await asyncio.gather(
        self._query_short_term_memory(query=query, limit=search_limit, ...),
        self._query_long_term_memory(query=query, limit=search_limit,
                                     expand_context=expand_context, ...),
    )
    short_episode, short_summary = session_result

    # 2) 去重:STM 优先(命中的 LTM 如果 uid 已经在 STM,则跳过)
    episode_uid_set = {e.uid for e in short_episode}
    unique_scored_long_episodes = []
    for score, episode in scored_long_episodes:
        if episode.uid not in episode_uid_set:
            episode_uid_set.add(episode.uid)
            unique_scored_long_episodes.append((score, episode))

    # 3) 组装 QueryResponse(短期 + 长期 + 摘要)
    return EpisodicMemory.QueryResponse(
        short_term_memory=ShortTermMemoryResponse(
            episodes=[...], episode_summary=[short_summary],
        ),
        long_term_memory=LongTermMemoryResponse(
            episodes=[EpisodeResponse(score=score, **e.model_dump())
                      for score, e in unique_scored_long_episodes],
        ),
    )

注意几个细节:

  • STM 优先:同一条 Episode 如果 STM 已经包含,就不会再从 LTM 返回。这避免了"近期会话里说过的话被重复列出"。
  • QueryMode 支持三种模式BOTH / LONG_TERM_ONLY / SHORT_TERM_ONLY,方便调用方做"只看长期 / 只看短期"。
  • STM 不仅返回 episode,还返回 summary:当用户问"刚刚我们聊了啥",summary 就是天然的答案。

4.4 EventMemory 的查询流水线

如果情景记忆走 EventMemory,调用链最终落到 EventMemory.query(),源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:344-524。它分四个阶段:

query: str

阶段 1: search_embed

阶段 2: VectorStore.query

抽取 seed_segment_uuids

阶段 3: get_segment_contexts

阶段 4: 评分 + 排序

ScoredSegmentContext 列表

代码骨架:

async def _query(self, query, *, vector_search_limit, expand_context,
                 property_filter, format_options) -> QueryResult:
    # 阶段 1: 嵌入查询
    query_embedding = (await self._embedder.search_embed([query]))[0]

    # 阶段 2: 在 derivative 集合上做向量召回
    [query_result] = await self._vector_store_collection.query(
        query_vectors=[query_embedding], limit=vector_search_limit,
        property_filter=collection_filter, return_vector=False, return_properties=True,
    )

    # 多个 derivative 可能指向同一个 segment,按 segment 去重
    seed_embedding_scores: dict[UUID, float] = {}
    for match in query_result.matches:
        segment_uuid = UUID(str(match.record.properties[_SEGMENT_UUID_FIELD_NAME]))
        if segment_uuid not in seed_embedding_scores:
            seed_embedding_scores[segment_uuid] = match.score
    seed_segment_uuids = list(seed_embedding_scores)

    # 阶段 3: 以 seed 为中心向前/向后扩展上下文
    max_backward_segments = expand_context // 3
    max_forward_segments = expand_context - max_backward_segments
    segment_contexts_by_seed = await self._segment_store_partition.get_segment_contexts(
        seed_segment_uuids=seed_segment_uuids,
        max_backward_segments=max_backward_segments,
        max_forward_segments=max_forward_segments,
        property_filter=property_filter,
    )

    # 阶段 4: 评分。无 reranker 用 embedding 分数,有 reranker 用语义重排
    if self._reranker is None:
        scores = [seed_embedding_scores[u] for u in kept_seed_segment_uuids]
    else:
        scores = await self._score_segment_contexts(query, segment_contexts, format_options)

    # 排序后返回 ScoredSegmentContext 列表
    return QueryResult(scored_segment_contexts=[...])

两个值得展开的点:

  1. expand_context 1:2 倾斜:源码里 max_backward_segments = expand_context // 3,剩下的给前向。原因是大多数对话场景下,后文比前文更能解释当前段(用户提到的"它"通常指代后面才出现的实体)。
  2. property_filter 的字段名转换:用户写 filter_dict={"user_id": "alice"},MemMachine 内部会把 user_id 翻译为 _user_idm.user_id(依赖于是否为系统字段),具体在 _to_vector_record_property 方法里。这套规则使得"用户元数据"和"系统元数据"在同一个向量库 record 上共存而不冲突。

4.5 Semantic 查询:向量召回 + 跨 set 过滤

源码:packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:169-200

async def _set_id_search(self, *, set_id, embedding, min_distance=None, limit=30,
                         load_citations=False, filter_expr=None) -> AsyncIterator[SemanticFeature]:
    filter_expr = _with_has_set_ids(set_ids=[set_id], filter_expr=filter_expr)
    async for feature in self._semantic_storage.get_feature_set(
        filter_expr=filter_expr, page_size=limit,
        vector_search_opts=SemanticStorage.VectorSearchOpts(
            query_embedding=np.array(embedding), min_distance=min_distance,
        ),
        load_citations=load_citations,
    ):
        yield feature

特点:

  • 语义查询走的是pgvector(默认)或 Neo4j 的向量索引;
  • 一次查询可以跨多个 set_id(用 merge_async_iterators 合并多个 set 的结果);
  • load_citations=True 时,返回的特征会带上原始 Episode UID,便于上层做"引用展示"。

4.6 读取路径全景图

Reranker Embedder SemanticSessionManager RetrievalAgent (可选) LongTermMemory / EventMemory ShortTermMemory EpisodicMemory EpisodicMemoryManager MemMachine Client Reranker Embedder SemanticSessionManager RetrievalAgent (可选) LongTermMemory / EventMemory ShortTermMemory EpisodicMemory EpisodicMemoryManager MemMachine Client par [STM/LTM 并发] alt [agent_mode=False] [agent_mode=True] par [情景检索] [语义检索] query_search(query, agent_mode=?) open_or_create() query_memory(query) get_short_term_memory_context() search_scored() search_embed() rerank() QueryResponse(STM+LTM) do_query() list[Episode] QueryResponse search(query) search_embed() list[SemanticFeature] SearchResponse(episodic + semantic)

第 5 章 RetrievalAgent:Agent Lightning 思路下的智能检索

RetrievalAgent 是 MemMachine 在 v0.3.x 里引入的一个新功能:把"检索"本身视为一个 Agent,而不是一次确定性的向量查询。它的灵感来自 Agent Lightning: Train ANY AI Agents with Reinforcement Learning(arXiv: 2508.03680,见仓库 README.md 的 BibTeX)。

5.1 角色卡:三个 child agent 加一个路由器

源码:packages/server/src/memmachine_server/retrieval_agent/service_locator.py:17-57

def create_retrieval_agent(*, model, reranker, agent_name="ToolSelectAgent"):
    memory_agent = MemMachineAgent(...)
    if agent_name == memory_agent.agent_name:
        return memory_agent

    coq_agent = ChainOfQueryAgent(...)
    split_agent = SplitQueryAgent(...)
    if agent_name == coq_agent.agent_name:
        return coq_agent
    if agent_name == split_agent.agent_name:
        return split_agent

    return ToolSelectAgent(AgentToolBaseParam(
        model=model,
        children_tools=[split_agent, coq_agent, memory_agent],
        extra_params={"default_tool_name": coq_agent.agent_name},
        reranker=reranker,
    ))

四种 agent 的分工:

Agent 思路 accuracy token_cost time_cost 适用查询
MemMachineAgent 直接调 EpisodicMemory.query_memory(LONG_TERM_ONLY) 0 0 0 单跳、直接的事实型查询
SplitQueryAgent 把"包含多个实体的单跳查询"拆成 N 个独立查询并行执行 - “比较 A 和 B”、“列出 X、Y、Z”
ChainOfQueryAgent 迭代式:查 → 判断证据是否充分 → 重写 → 再查 10 9 10 多跳推理、必须 hop 才能回答
ToolSelectAgent LLM 路由器,决定具体调哪一个 child agent 8 6 8 默认入口;不知道查询类型时

5.2 ToolSelectAgent:用 LLM 做路由

核心提示词(节选自 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:21-81):

You are a tool router. ...
GOAL
- Choose exactly one of: {coq}, {split_query}, {memory_retrieval}
- Output NONE only when the query type cannot be determined...

MECHANISM
1) Validate input
2) Classify the query type:
   A) MULTI-HOP            -> {coq}
   B) SINGLE-HOP MULTI ENTITY  -> {split_query}
   C) SINGLE-HOP / DIRECT  -> {memory_retrieval}

它在调用链上的角色:

多跳

多实体

单跳

无法判定

原始 query

ToolSelectAgent.LLM

ChainOfQueryAgent

SplitQueryAgent

MemMachineAgent

default_tool = COQ

do_query 实现见 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:179-197,会把所选 agent 的名字写回 perf_metrics["selected_tool"],方便观测。

5.3 ChainOfQueryAgent:迭代式查询

ChainOfQuery 的关键创新是用 单个 LLM 调用 同时完成"充分性判断 + 查询重写"。看核心提示词(packages/server/src/memmachine_server/retrieval_agent/agents/coq_agent.py:25-125)的输出契约:

{
  "is_sufficient": true | false,
  "evidence_indices": [0, 2, 5],
  "new_query": "...",
  "confidence_score": 0.0 ~ 1.0
}

调用流程:

attempt = 0
while attempt < max_attempts:
    docs = memmachine_agent.do_query(current_query)
    rsp  = LLM(COMBINED_SUFFICIENCY_AND_REWRITE_PROMPT,
               original_query, used_queries, retrieved_docs=docs)
    if rsp.is_sufficient and rsp.confidence_score >= 0.8:
        break
    current_query = rsp.new_query
    attempt += 1
return aggregated_docs

实践中你会看到这种回放:

[CoQ #0] query="Marie Curie 的丈夫从事什么领域?"  -> 不充分
[CoQ #1] query="Pierre Curie 的研究领域"         -> 充分,置信 0.92

这正是 Agent Lightning 论文中的"小模型迭代式 RAG"思路。

5.4 SplitQueryAgent:单跳多实体并发

ToolSelectAgent 判定 query 是"单跳但包含多个独立实体"(如 Compare GDP of Japan and South Korea)时,会路由到 SplitQueryAgent。它会用 LLM 把 query 拆成多个子查询并并发执行,再用 reranker 合并去重。源码在 packages/server/src/memmachine_server/retrieval_agent/agents/split_query_agent.py

5.5 MemMachineAgent:直查 LTM

最便宜的 agent,定义见 packages/server/src/memmachine_server/retrieval_agent/agents/memmachine_retriever.py:47-91。它直接调用 EpisodicMemory.query_memory(mode=LONG_TERM_ONLY),不做任何重写、不调任何额外 LLM。

async def do_query(self, policy, query):
    query_response = await query.memory.query_memory(
        query=query.query, limit=query.limit,
        expand_context=query.expand_context,
        score_threshold=query.score_threshold,
        property_filter=query.property_filter,
        mode=EpisodicMemory.QueryMode.LONG_TERM_ONLY,
    )
    return [Episode(...) for episode in query_response.long_term_memory.episodes], perf

5.6 Reranker 在 RetrievalAgent 中的作用

AgentToolBase._do_rerank 是所有 child agent 在返回之前的最后一道工序(packages/server/src/memmachine_server/retrieval_agent/common/agent_api.py:97-137):

async def _do_rerank(self, query, episodes):
    if query.limit <= 0:
        return sorted(episodes, key=lambda x: x.created_at)
    if len(episodes) <= query.limit or self._reranker is None:
        return sorted(episodes[:query.limit], key=lambda x: x.created_at)

    contents = [episodes_to_string([e]) for e in episodes]
    scores  = await self._reranker.score(query.query, contents)  # 带重试 + 限流
    result  = sorted(zip(episodes, scores), key=lambda x: x[1], reverse=True)
    return sorted([r[0] for r in result[:query.limit]], key=lambda x: x.created_at)

排序策略很务实:

  1. 先按重排分数挑 Top-K,保证质量;
  2. 再按 created_at 升序输出,让上层把"语义最相关但时间正确"的 Episode 串起来;
  3. 重排出错且是限流时,自动 await sleep(5) 后重试,最多 60 次。

可选的 reranker(在 packages/server/src/memmachine_server/common/reranker/ 下):

  • identity:不改变顺序,作为占位;
  • bm25:纯文本匹配;
  • rrf-hybrid:Reciprocal Rank Fusion 把 embedding 分与 BM25 分融合;
  • cohere:Cohere Rerank API;
  • amazon-bedrock:AWS Bedrock Rerank API。

第 6 章 存储抽象:MemMachine 如何在不同后端间自由切换

MemMachine 的"可移植性"来自三个核心抽象:EpisodeStoreVectorStoreVectorGraphStore。它们各自承担不同的职责。

6.1 三类存储抽象

EpisodicMemory
EventMemory
SemanticMemory

EpisodeStore

VectorStore

VectorGraphStore

SemanticStorage

PostgreSQL

SQLite

Qdrant

SQLite + USearch/hnswlib

SQLite-vec

Neo4j

NebulaGraph

PostgreSQL + pgvector

Neo4j

抽象 主要职责 典型实现 适用场景
EpisodeStore 永久持久化原始 Episode PostgreSQL / SQLite 关系/SQL 风格的事件存档
VectorStore 纯向量检索 + 属性过滤 Qdrant、SQLite-USearch、SQLite-vec EventMemory、可独立扩展的向量层
VectorGraphStore 向量 + 图遍历一体 Neo4j、NebulaGraph 经典长期情景记忆(DERIVED_FROM 等关系)
SemanticStorage 结构化特征 + 向量索引 PostgreSQL + pgvector、Neo4j 语义记忆(Profile)

6.2 VectorStore 抽象

源码:packages/server/src/memmachine_server/common/vector_store/vector_store.py

核心接口(伪代码):

class VectorStore:
    async def create_collection(namespace: str, name: str,
                                config: VectorStoreCollectionConfig) -> VectorStoreCollection
    async def get_collection(...)  -> VectorStoreCollection | None
    async def drop_collection(...) -> None

class VectorStoreCollection:
    async def upsert(records: list[Record]) -> None
    async def query(query_vectors, *, limit, property_filter,
                    return_vector, return_properties) -> list[QueryResult]
    async def get(uuids) -> list[Record]
    async def delete(uuids) -> None

约束:

  • 每个 Collection 在创建时固定 dimensionssimilarity_metricindexed_properties_schema。换嵌入模型要么用新 Collection,要么用别名机制。
  • 命名约束 [a-z0-9_]+、单段 ≤ 32 字节,是为了和 PostgreSQL 表名、Neo4j label、Qdrant collection 名等都兼容。
  • Record.properties 同时支持系统属性(如 _segment_uuid)与用户属性(直接进入 record)。EventMemory 利用这一点把"切片元数据"和"业务元数据"放在一张表里。

6.3 VectorGraphStore 抽象

源码:packages/server/src/memmachine_server/common/vector_graph_store/

这一抽象的特点是:节点既能向量检索,又能图遍历。Neo4j 的 vector index + Cypher 是天然实现;NebulaGraph 5.2+ 也内置了向量类型。

它的主要接口可以理解为 VectorStore 的超集:

  • add_nodes / add_edges:写节点与关系;
  • query_nodes:向量召回 + 属性过滤;
  • traverse:从命中的节点出发做图遍历(如沿 [DERIVED_FROM] 拿回 Episode)。

这就是为什么 LongTermMemory 用一个抽象解决了"召回 + 关联",不需要再调用 EpisodeStore。

6.4 FilterExpr:跨存储的过滤 DSL

源码:packages/server/src/memmachine_server/common/filter/filter_parser.py

用户写:

user_id = "alice" AND (tag IN ("flight", "hotel") OR session_id = "s001")

被解析成 FilterExpr 树:

And(
  left=Comparison(field="user_id", op="=", value="alice"),
  right=Or(
    left=In(field="tag", values=["flight", "hotel"]),
    right=Comparison(field="session_id", op="=", value="s001"),
  )
)

然后由每种存储实现各自把这棵树翻译成原生过滤:

存储 翻译目标
PostgreSQL WHERE 子句 + JSONB 操作符
Qdrant Filter 嵌套 must / should
Neo4j Cypher WHERE 子句
SQLite-vec WHERE 子句

这样调用方写一份 filter 字符串,无论后端怎么切换都能工作。

6.5 公共组件总览

packages/server/src/memmachine_server/common/ 下的组件构成了 MemMachine 的"标准件库":

组件 作用 支持的实现
embedder/ 嵌入模型抽象 OpenAI、Bedrock、Ollama、任意 OpenAI 兼容
language_model/ LLM 抽象,统一 generate_response / generate_parsed_response OpenAI、Anthropic、Bedrock、Ollama、OpenAI 兼容
reranker/ 重排序器 identity、bm25、rrf-hybrid、cohere、amazon-bedrock
metrics_factory/ Prometheus 指标 Counter / Summary / Histogram
resource_manager/ 按名字索取上述资源 + 缓存 -
session_manager/ 会话信息存取 PostgreSQL / SQLite
episode_store/ 原始 Episode 持久化 PostgreSQL / SQLite

ResourceManager 是这套组件的"中央集线器":MemMachine 主类里所有 await self._resources.get_embedder(...) 都从它出。


第 7 章 工程细节:决定生产可用的关键

7.1 实例 LRU 缓存

每个 (org_id, project_id) 都需要一个 EpisodicMemory 实例(持有 STM 内存、LTM 连接),频繁创建/销毁会拖慢响应。源码:packages/server/src/memmachine_server/episodic_memory/instance_lru_cache.pyepisodic_memory_manager.py

# packages/server/src/memmachine_server/episodic_memory/episodic_memory_manager.py:43-52
class EpisodicMemoryManagerParams(BaseModel):
    instance_cache_size: int = 100   # 同时驻留的最大实例数
    max_life_time: int = 600         # 空闲多久后回收(秒)
    ...

机制要点:

  • 引用计数:通过 release_ref 跟踪有多少协程正在访问,正在用的不淘汰;
  • 后台清理协程:每 2 秒扫一次,把空闲超时的实例 close 掉;
  • session-level 读写锁:保证同一会话同时只有一个 create 在跑。

这套机制让你可以同时服务上万会话,而 Neo4j / Qdrant 的连接池不会爆。

7.2 全异步 IO 模型

整个 MemMachine 服务端是async def

  • asyncio.gather 用来 fan-out(写入 STM/LTM/Semantic 并发、STM/LTM 并发查询);
  • asyncio.TaskGroup 用在 ingestion 时收集 episode(Python 3.12+);
  • 所有外部 IO(数据库、嵌入、LLM)都是协程友好的库;
  • rw_locks.AsyncRWLock 实现了非阻塞读写锁(一写多读)。

理解这一点之后就不会惊讶为什么 MemMachine 在单 worker 下也能撑住几千 QPS——FastAPI 默认 single-process + uvicorn 已经够了。

7.3 多 Worker 部署

packages/server/src/memmachine_server/server/app.py:100-144 给了如何起多 worker:

workers_env = os.getenv("MEMMACHINE_WORKERS")
if workers_env:
    workers = int(workers_env)
else:
    workers = 1   # 容器化环境默认 1,避免误用宿主 CPU 数

uvicorn.run("memmachine_server.server.app:app",
            host=config.server.host, port=config.server.port,
            workers=workers, access_log=True, log_level=...)

提醒:

  • 不要无脑设 os.cpu_count():容器里这个值经常是宿主机的核数,不是 cgroup 限额;
  • 多 worker 共用同一份 Postgres/Neo4j 后端:每个 worker 自己起一个 Python 进程,不共享内存中的 LRU 缓存,因此实际容量是 workers × instance_cache_size

7.4 会话删除的异步队列

源码:packages/server/src/memmachine_server/main/memmachine.py:309-359。删除流程是软删除 + 异步清理

async def delete_session(self, session_data) -> None:
    await session_manager.update_session_status(
        session_key=session_data.session_key,
        status=SessionDataManager.SessionStatus.Deleted,   # 先打软删除标
    )
    self._deletion_queue.put_nowait(session_data)          # 进入清理队列

后台 _delete_session_worker 依次:

  1. 删 EpisodeStore 里的所有 episode(分批 EPISODE_DELETE_BATCH_SIZE=1000);
  2. 调用 _cleanup_semantic_history 清掉对应的语义引用;
  3. 删 episodic memory;
  4. 删 semantic memory;
  5. 最后把 session 行从 session 管理器移除。

为什么要异步?

  • 大会话动辄 10 万 episode,同步删会让 API 超时;
  • 删除失败时实例仍然是"软删除"状态,可以重试;
  • 服务重启时通过 get_sessions_by_status(Deleted) 还能把未完成的清理恢复出来(见 start() 内的 key_to_session 钩子)。

7.5 配置合并:分阶段使用不同模型

源码:packages/server/src/memmachine_server/main/memmachine.py:420-460

_with_default_episodic_memory_conf 把"全局默认"和"用户 per-session 配置"做深度合并。这使得:

  • 全局可以指定 default_long_term_memory_embedder = openai_largedefault_long_term_memory_reranker = cohere
  • 单会话可以覆盖:{"long_term_memory": {"embedder": "bedrock_titan"}}
  • 三类 LLM(agent / answer / judge)可以各自使用不同模型,节省成本(便宜模型做路由 + 重写,贵模型只在最后一步)。

7.6 可观测性

MemMachine 内嵌 Prometheus 指标,几个最有用的指标:

指标名 含义
Ingestion_latency EpisodicMemory 写入延迟(毫秒,Summary)
query_latency EpisodicMemory 查询延迟
Ingestion_count / query_count 调用次数
event_memory_encode_events_phase_seconds{phase=...} EventMemory 五阶段时序
event_memory_query_phase_seconds{phase=...} EventMemory 查询四阶段时序

直接挂 Grafana 即可。配合 event_memory_* 系列指标,你可以一眼看出"是嵌入慢还是 segment_store 慢"。


第 8 章 端到端实战:把所有知识拼成一张图

8.1 写入:memory.add("我喜欢靠窗的座位") 究竟做了多少事?

client = MemMachineClient(base_url="http://localhost:8080")
project = client.get_or_create_project(org_id="acme", project_id="travel")
memory = project.memory(user_id="alice", agent_id="travel_agent", session_id="s001")

memory.add("我喜欢靠窗的座位,最好是商务舱。", metadata={"category": "travel"})

服务端发生了:

  1. REST 解析AddMemoriesSpecEpisodeEntry
  2. 路由到 MemMachine 主类add_episodes(session_data, episode_entries)
  3. EpisodeStore 写入 → 一行 episodes 落库,返回新分配的 uid
  4. 并发开启两个任务:
    • EpisodicMemory.add_memory_episodes
      • category=travel 拷贝到 filterable_metadata
      • 并发跑 STM 与 LTM:
        • STM:append 到 deque;若总长度超阈值,弹出旧 episode + 异步走滚动摘要;
        • LTM/EventMemory:把消息切成 segment / derivative,嵌入后写 Neo4j 或 VectorStore;
    • SemanticSessionManager.add_message:仅把 episode_id 写入"待抽取"表;
  5. API 立即返回 episode_ids
  6. 若干秒后,后台的 _background_ingestion_task 扫到该 set “脏了”,触发:
    • 拉取 5 条未抽取 episode;
    • 对每个语义 category(如 profile_promptcrm_prompt),调 LLM 产出 SemanticCommand:
      [{"command":"add", "tag":"Travel Preferences",
        "feature":"seat_preference", "value":"User prefers window seat in business class"}]
      
    • apply commands → 嵌入 → 写 pgvector,并把这条 episode 标记为 is_ingested=True
    • 若特征数 ≥ 20,触发巩固 LLM 调用,合并/删除冗余特征。

8.2 读取:memory.search("用户的座位偏好?")

results = memory.search("用户的座位偏好?")
print(results.content.episodic_memory.long_term_memory.episodes[0].content)
# → "我喜欢靠窗的座位,最好是商务舱。"

print(results.content.semantic_memory[0])
# → SemanticFeature(tag="Travel Preferences", feature_name="seat_preference",
#                   value="User prefers window seat in business class", ...)

服务端:

  1. MemMachine.query_search(session_data, query=...) 并发开两个 task;
  2. Episodic task
    • 打开 EpisodicMemory 实例(从 LRU 缓存);
    • 普通模式:并发查 STM + LTM;STM 优先去重;按嵌入分或 reranker 分数排序;
    • Agent 模式:ToolSelectAgent 用 LLM 判定"这是单跳直接查询" → 路由到 MemMachineAgent → 直查 LTM;
  3. Semantic task
    • 嵌入 query;
    • 在 pgvector 上做向量召回(按 set_id 过滤、min_distance 阈值);
    • 返回 SemanticFeature 列表(含 citations 回链到原 Episode);
  4. 合并成 SearchResponse 返回。

8.3 跟踪建议

当你想看清楚一条请求的实际行为:

# 1) 提高 server 日志级别
export MEMMACHINE_LOG_LEVEL=DEBUG

# 2) 实时看 EventMemory 五阶段时序
curl http://localhost:8080/metrics | grep event_memory_

# 3) 在 client 端开 timing
import time; t=time.time(); memory.search("..."); print(time.time()-t)

EventMemory 的 encode_events timing: segmentation=... derivation=... ... 日志非常直观,定位瓶颈一目了然。


第 9 章 把 MemMachine 接到你的 Agent

这里只给出"原理视角的决策表"。具体的 SDK / REST / MCP 接入示例可在 MemMachine 仓库的 README.mdexamples/ 目录中找到。

接入方式 推荐场景 关键点
Python SDK 内嵌 你的 Agent 是 Python 写的,希望最低延迟 直接调用 memmachine-client;与 LangChain / LangGraph / CrewAI / LlamaIndex 都有现成 wrapper
REST API 跨语言、跨服务部署(Go / Node / Rust 等);多个 Agent 共享一个 Memory 服务 /api/v2/memories/*;注意 agent_mode 参数选 RetrievalAgent 与否
MCP Server Claude Desktop、Cursor 等 MCP 客户端,或想把记忆作为标准 MCP 工具暴露 启动 memmachine-mcp-stdiomcp-http,工具签名即记忆 CRUD
Function Calling Tool 在 OpenAI Function Calling、Anthropic Tool Use 框架下显式调用 add / search 包装成 tool spec;让 LLM 自己决定何时调用
隐式注入(Sidecar/Hook) 想让旧 Agent "零侵入"获得记忆 在 LLM 请求前 hook,把 memory.search(query) 的结果拼到 system prompt

原理层面上要选对方式,只需要回答两个问题:

  1. 你希望 LLM 主动决定何时记忆 / 何时回忆吗?
    • 是 → 用 Function Calling Tool 或 MCP;
    • 否 → 用 SDK / REST 在应用代码里"埋点"。
  2. 你能容忍多少端到端延迟?
    • <100ms 的本地 Agent → SDK + 嵌入式 SQLite 后端;
    • 多租户云服务 → REST + Qdrant/Neo4j;
    • 离线分析 → 全部走 REST,启用 RetrievalAgent + 大模型重排。

附录 A:源码路径速查表

你想理解 看这个文件
HTTP 入口 / FastAPI 装配 packages/server/src/memmachine_server/server/app.py
REST 路由 (v2) packages/server/src/memmachine_server/server/api_v2/router.py
MemMachine 主类 packages/server/src/memmachine_server/main/memmachine.py
单会话情景记忆 packages/server/src/memmachine_server/episodic_memory/episodic_memory.py
会话级 LRU packages/server/src/memmachine_server/episodic_memory/episodic_memory_manager.py
短期记忆 + 滚动摘要 packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py
长期记忆(图存储) packages/server/src/memmachine_server/episodic_memory/long_term_memory/long_term_memory.py
声明式记忆底座 packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py
EventMemory 编排 packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py
Event/Segment/Derivative 数据模型 packages/server/src/memmachine_server/episodic_memory/event_memory/data_types.py
文本切分器 packages/server/src/memmachine_server/episodic_memory/event_memory/segmenter/text_segmenter.py
派生器 packages/server/src/memmachine_server/episodic_memory/event_memory/deriver/text_deriver.py
语义记忆主服务 packages/server/src/memmachine_server/semantic_memory/semantic_memory.py
语义抽取循环 packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py
语义 LLM 调用 packages/server/src/memmachine_server/semantic_memory/semantic_llm.py
Profile 提示词框架 packages/server/src/memmachine_server/semantic_memory/util/semantic_prompt_template.py
预置语义 Category packages/server/src/memmachine_server/server/prompt/default_prompts.py
检索代理工厂 packages/server/src/memmachine_server/retrieval_agent/service_locator.py
Chain-of-Query packages/server/src/memmachine_server/retrieval_agent/agents/coq_agent.py
Tool 路由 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py
直接检索 Agent packages/server/src/memmachine_server/retrieval_agent/agents/memmachine_retriever.py
检索代理基类 packages/server/src/memmachine_server/retrieval_agent/common/agent_api.py
向量库抽象 packages/server/src/memmachine_server/common/vector_store/vector_store.py
过滤 DSL packages/server/src/memmachine_server/common/filter/filter_parser.py
语义聚类 packages/server/src/memmachine_server/semantic_memory/cluster_manager.py

附录 B:术语对照

中文 英文 在 MemMachine 中的含义
事件 Episode 写入 MemMachine 的最小单位,对应一条消息或一段文本
工作记忆 Working Memory 单次请求的临时上下文,不持久化
短期记忆 Short-Term Memory (STM) 当前会话的滚动摘要 + 最近 episode
长期记忆 Long-Term Memory (LTM) 经过派生 / 嵌入后落到图存储的情景记忆
事件记忆 Event Memory v0.3.x 引入的新一代 LTM,基于 VectorStore + SegmentStore
语义记忆 Semantic Memory LLM 抽取的结构化用户画像 (tag/feature/value)
派生 Derivative 由 Segment 派生出的"可向量化文本"
片段 Segment 由 Segmenter 把 Event 切分而成的最小检索单元
集合 Set 语义记忆里的逻辑分组,通常对应 (org_id, project_id) 或某个 user
巩固 Consolidation 语义记忆里的"合并/删除/重写"过程
检索代理 RetrievalAgent 把检索看作 Agent 的智能编排模块

结语

如果只用一句话总结 MemMachine 的设计:

它把"AI 长期记忆"建模成多层的、可分离的、可观测的存储抽象,再用 LLM 把对话蒸馏成结构化事实,最后用 Agent 化的检索代理把找回来的内容做精准编排。

它不是简单的"向量库套一层 API",而是把人类认知科学(工作 / 情景 / 语义记忆)与现代 LLM 工程实践(异步抽取、Agent Lightning、可插拔后端)有机融合的工程作品。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐