本系列共 2 部分,从抽象接口到向量存储,完整拆解 LangChain Embeddings 的设计与实现。

  • 第 1 部分:Embeddings 接口——向量化的抽象(本文)
  • 第 2 部分:VectorStore——Embeddings 的消费者与 RAG 完整链路

LangChain Embeddings 深度解析(一):Embeddings 接口——向量化的抽象

一个能跑的例子

from langchain_core.embeddings import Embeddings, DeterministicFakeEmbedding

# ---- 用确定性假模型演示接口 ----
embed = DeterministicFakeEmbedding(size=5)

# 单条查询
vec = embed.embed_query("LangChain 是什么?")
print(f"维度: {len(vec)}")           # 维度: 5
print(f"向量: {[round(v, 4) for v in vec]}")  # 每次运行相同

# 批量文档
docs = ["苹果是水果", "Python 是语言", "LangChain 是框架"]
vecs = embed.embed_documents(docs)
print(f"文档数: {len(vecs)}")        # 文档数: 3
print(f"每条维度: {len(vecs[0])}")   # 每条维度: 5

# 相同文本 → 相同向量(确定性)
vec_again = embed.embed_query("LangChain 是什么?")
print(f"确定性: {vec == vec_again}") # 确定性: True

Embeddings 是 LangChain 中最小的核心抽象之一——整个模块只有 3 个文件、约 240 行代码。但它是 RAG 管线的关键枢纽:把自然语言文本映射到向量空间,让"语义相似度"变成可计算的数学距离。


Embeddings ABC——四个方法,两个抽象

打开 embeddings/embeddings.py:8

# embeddings/embeddings.py:8-78
class Embeddings(ABC):
    """Interface for embedding models.

    Text embedding models are used to map text to a vector
    (a point in n-dimensional space).
    """

    @abstractmethod
    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed search docs."""

    @abstractmethod
    def embed_query(self, text: str) -> list[float]:
        """Embed query text."""

    async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
        return await run_in_executor(None, self.embed_documents, texts)

    async def aembed_query(self, text: str) -> list[float]:
        return await run_in_executor(None, self.embed_query, text)

整个接口只有 4 个方法

方法 签名 是否抽象 说明
embed_documents (texts: list[str]) → list[list[float]] 抽象 批量嵌入文档
embed_query (text: str) → list[float] 抽象 单条查询嵌入
aembed_documents (texts: list[str]) → list[list[float]] 非抽象 异步版,默认用 run_in_executor
aembed_query (text: str) → list[float] 非抽象 异步版,默认用 run_in_executor

为什么分 embed_documentsembed_query

表面上看两者都是"文本 → 向量",为什么要分开?

原因 1:某些模型对查询和文档使用不同的编码策略
─────────────────────────────────────────────
例如 Google 的 text-embedding-004 在 embed 时需要指定 task_type:
  - RETRIEVAL_DOCUMENT  → 用于被检索的文档
  - RETRIEVAL_QUERY     → 用于查询

原因 2:批量 vs 单条的性能优化
─────────────────────────────
  embed_documents → 接收 list[str],可以做 batching/GPU 并行
  embed_query    → 接收单个 str,查询通常只有一条

原因 3:绝大多数实现中两者结果相同
─────────────────────────────────
  OpenAIEmbeddings.embed_query 内部就是调用
  embed_documents([text])[0]

异步默认实现——run_in_executor 模式

# embeddings/embeddings.py:58-67
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
    return await run_in_executor(None, self.embed_documents, texts)

这是 LangChain 中非常常见的模式(Messages、Runnables、Loaders 系列都见过):

同步方法已实现? ─── Yes ──→ 异步版自动获得
     │                        (run_in_executor 包装)
     No
     ↓
子类需要同时实现 sync + async

run_in_executor(None, ...) 表示使用默认线程池执行器,把同步阻塞调用放到线程池中运行。合约集成(如 langchain-openai)可以覆盖异步方法提供原生 async 实现。


Embeddings 不继承 Pydantic 也不继承 Runnable

注意一个重要细节——Embeddings 只继承 ABC

# 对比其他核心抽象
class BaseChatModel(BaseLanguageModel):  # → Runnable 子类
class BaseRetriever(RunnableSerializable): ...
class BaseLoader: ...                     # 无基类(除 object)
class Embeddings(ABC): ...               # 纯 ABC,最轻量

这意味着:

  1. 不是 Runnable——不能直接用 | 管道组合,也没有 .invoke()
  2. 不是 Pydantic Model——没有序列化、验证、schema 生成
  3. 最大灵活性——具体实现可以自由选择继承 BaseModel 或不继承

实际中,大多数合约实现(如 OpenAIEmbeddings)都会同时继承 EmbeddingsBaseModel

# 典型的 partner 实现模式
class OpenAIEmbeddings(BaseModel, Embeddings):
    model: str = "text-embedding-ada-002"
    ...

FakeEmbeddings——测试用的随机向量

打开 embeddings/fake.py:16

# embeddings/fake.py:16-67
class FakeEmbeddings(Embeddings, BaseModel):
    """Fake embedding model for unit testing purposes."""

    size: int
    """The size of the embedding vector."""

    def _get_embedding(self) -> list[float]:
        return list(np.random.default_rng().normal(size=self.size))

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [self._get_embedding() for _ in texts]

    def embed_query(self, text: str) -> list[float]:
        return self._get_embedding()

关键特征:

  • 随机生成:每次调用 np.random.default_rng().normal() 产生不同向量
  • 忽略文本内容texts 参数完全不使用,只决定返回多少个向量
  • 用途:单元测试中占位,验证管道连通性

DeterministicFakeEmbedding——确定性的假模型

打开 embeddings/fake.py:70

# embeddings/fake.py:70-129
class DeterministicFakeEmbedding(Embeddings, BaseModel):
    """Deterministic fake embedding model for unit testing purposes."""

    size: int

    def _get_embedding(self, seed: int) -> list[float]:
        # set the seed for the random generator
        rng = np.random.default_rng(seed)          # ← 固定种子
        return list(rng.normal(size=self.size))

    @staticmethod
    def _get_seed(text: str) -> int:
        """Get a seed for the random generator, using the hash of the text."""
        return int(hashlib.sha256(text.encode("utf-8")).hexdigest(), 16) % 10**8

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [self._get_embedding(seed=self._get_seed(_)) for _ in texts]

    def embed_query(self, text: str) -> list[float]:
        return self._get_embedding(seed=self._get_seed(text))

核心区别在于种子生成策略

文本 "hello"
   │
   ↓ SHA-256
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
   │
   ↓ int(..., 16) % 10**8
42596730                    ← 确定性种子
   │
   ↓ np.random.default_rng(42596730).normal(size=5)
[-0.3021, 1.4528, ...]     ← 每次运行结果相同

两种假模型对比

特性 FakeEmbeddings DeterministicFakeEmbedding
文本敏感 否(忽略输入) 是(SHA-256 → 种子)
确定性 否(每次随机) 是(相同文本→相同向量)
适用场景 管道连通性测试 需要可复现结果的测试
相似度检索 无意义 有限意义(同文本→同向量)

模块导出——延迟导入模式

打开 embeddings/__init__.py

# embeddings/__init__.py:1-31
from typing import TYPE_CHECKING
from langchain_core._import_utils import import_attr

if TYPE_CHECKING:
    from langchain_core.embeddings.embeddings import Embeddings
    from langchain_core.embeddings.fake import (
        DeterministicFakeEmbedding,
        FakeEmbeddings,
    )

__all__ = ("DeterministicFakeEmbedding", "Embeddings", "FakeEmbeddings")

_dynamic_imports = {
    "Embeddings": "embeddings",
    "DeterministicFakeEmbedding": "fake",
    "FakeEmbeddings": "fake",
}

def __getattr__(attr_name: str) -> object:
    module_name = _dynamic_imports.get(attr_name)
    result = import_attr(attr_name, module_name, __spec__.parent)
    globals()[attr_name] = result
    return result

这是 LangChain Core 的标准模式——利用 __getattr__ 实现延迟导入:

import langchain_core.embeddings
         │
         ↓ 此时只执行 __init__.py 的模块级代码
         ↓ Embeddings / FakeEmbeddings 类还没加载

langchain_core.embeddings.Embeddings
         │
         ↓ 触发 __getattr__("Embeddings")
         ↓ import_attr 从 "embeddings" 子模块动态导入
         ↓ 缓存到 globals() 避免重复导入

自己实现一个 Embeddings

理解了接口后,实现一个简单的 Embeddings 非常直接:

from langchain_core.embeddings import Embeddings

class BagOfWordsEmbeddings(Embeddings):
    """极简的词袋模型嵌入——仅用于教学。"""

    def __init__(self, vocab: list[str]):
        self.vocab = vocab

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [self._encode(t) for t in texts]

    def embed_query(self, text: str) -> list[float]:
        return self._encode(text)

    def _encode(self, text: str) -> list[float]:
        words = text.lower().split()
        return [float(words.count(w)) for w in self.vocab]

# 测试
bow = BagOfWordsEmbeddings(vocab=["python", "java", "is", "great"])
v1 = bow.embed_query("Python is great")
v2 = bow.embed_query("Java is great")
print(v1)  # [1.0, 0.0, 1.0, 1.0]
print(v2)  # [0.0, 1.0, 1.0, 1.0]

只需实现 embed_documentsembed_query 两个方法。异步版本自动获得。


Embeddings 在 LangChain 生态中的位置

                    ┌──────────────────────────────────┐
                    │         Partner Integrations       │
                    │  OpenAIEmbeddings                  │
                    │  HuggingFaceEmbeddings             │
                    │  GoogleGenerativeAIEmbeddings      │
                    │  OllamaEmbeddings                  │
                    │  ...                               │
                    └──────────┬───────────────────────┘
                               │ 实现
                               ↓
                    ┌──────────────────────┐
                    │   Embeddings (ABC)    │ ← embeddings/embeddings.py
                    │                      │
                    │  embed_documents()   │    文本 → 向量(批量)
                    │  embed_query()       │    文本 → 向量(单条)
                    │  aembed_documents()  │    异步批量
                    │  aembed_query()      │    异步单条
                    └──────────┬───────────┘
                               │ 被注入
                    ┌──────────▼───────────┐
                    │  VectorStore (ABC)    │ ← vectorstores/base.py
                    │                      │
                    │  from_texts(embed)   │    构造时注入 Embeddings
                    │  add_texts()         │    内部调 embed_documents
                    │  similarity_search() │    内部调 embed_query
                    │  as_retriever()      │    → VectorStoreRetriever
                    └──────────┬───────────┘
                               │ 适配
                    ┌──────────▼───────────┐
                    │ VectorStoreRetriever  │ ← BaseRetriever 子类
                    │                      │    → 可进入 LCEL 管道
                    │  invoke(query)       │    query → Documents
                    └──────────────────────┘

小结

要点 内容
文件 embeddings/embeddings.py(79 行)
核心类 Embeddings(ABC)
抽象方法 embed_documents + embed_query
异步 默认 run_in_executor,可覆盖
继承 纯 ABC,不是 Runnable、不是 Pydantic
测试替身 FakeEmbeddings(随机)+ DeterministicFakeEmbedding(确定性)

下一篇问题:Embeddings 把文本变成了向量,但向量需要存储、索引、检索。VectorStore 是如何消费 Embeddings 的?InMemoryVectorStore 用什么算法计算相似度?as_retriever() 又是如何把 VectorStore 接入 LCEL 管道的?


LangChain Embeddings 深度解析(二):VectorStore——Embeddings 的消费者与 RAG 完整链路

一个能跑的例子

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore

# ---- 构建向量存储 ----
embed = DeterministicFakeEmbedding(size=50)
store = InMemoryVectorStore(embedding=embed)

# 添加文档
docs = [
    Document(page_content="Python 是一门解释型编程语言", metadata={"topic": "python"}),
    Document(page_content="Java 是一门编译型编程语言", metadata={"topic": "java"}),
    Document(page_content="苹果是一种常见的水果", metadata={"topic": "fruit"}),
    Document(page_content="香蕉富含钾元素", metadata={"topic": "fruit"}),
]
ids = store.add_documents(docs)
print(f"添加了 {len(ids)} 条文档")  # 添加了 4 条文档

# ---- 相似度搜索 ----
results = store.similarity_search("编程语言", k=2)
for doc in results:
    print(f"  [{doc.metadata['topic']}] {doc.page_content}")

# ---- 转为 Retriever(接入 LCEL 管道) ----
retriever = store.as_retriever(search_kwargs={"k": 2})
docs = retriever.invoke("水果")
for doc in docs:
    print(f"  [{doc.metadata['topic']}] {doc.page_content}")

VectorStore 是 Embeddings 的直接消费者——它在写入时调用 embed_documents() 将文本转为向量,在查询时调用 embed_query() 将查询转为向量,然后通过相似度计算返回最相关的文档。


VectorStore ABC——接口全景

打开 vectorstores/base.py:43

# vectorstores/base.py:43-44
class VectorStore(ABC):
    """Interface for vector store."""

VectorStore 有 1111 行代码,但核心结构清晰。按功能分组:

写入方法

# vectorstores/base.py:46-98  ← add_texts
def add_texts(self, texts, metadatas=None, *, ids=None, **kwargs) -> list[str]:
    """Run more texts through the embeddings and add to the VectorStore."""
    # 如果子类实现了 add_documents,则转发
    if type(self).add_documents != VectorStore.add_documents:
        docs = [Document(id=id_, page_content=text, metadata=metadata_)
                for text, metadata_, id_ in zip(texts, metadatas_, ids_)]
        return self.add_documents(docs, **kwargs)
    raise NotImplementedError(...)

# vectorstores/base.py:234-263  ← add_documents
def add_documents(self, documents, **kwargs) -> list[str]:
    """Add or update documents in the VectorStore."""
    # 如果子类实现了 add_texts,则转发
    if type(self).add_texts != VectorStore.add_texts:
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        return self.add_texts(texts, metadatas, **kwargs)
    raise NotImplementedError(...)

这是一个双向委托设计——子类只需实现 add_textsadd_documents 其中之一:

子类实现了 add_texts?
     │
    Yes → add_documents 自动转发到 add_texts
     │
    No → 子类实现了 add_documents?
          │
         Yes → add_texts 自动转发到 add_documents
          │
         No → 两者都抛 NotImplementedError

Embeddings 属性

# vectorstores/base.py:99-106
@property
def embeddings(self) -> Embeddings | None:
    """Access the query embedding object if available."""
    logger.debug(
        "The embeddings property has not been implemented for %s",
        self.__class__.__name__,
    )
    return None

基类默认返回 None,子类覆盖返回实际的 Embeddings 实例。这个属性被 VectorStoreRetriever 用于 LangSmith 追踪。

搜索方法

VectorStore 定义了三种搜索策略:

# vectorstores/base.py:293-324  ← search 统一入口
def search(self, query: str, search_type: str, **kwargs) -> list[Document]:
    if search_type == "similarity":
        return self.similarity_search(query, **kwargs)
    if search_type == "similarity_score_threshold":
        docs_and_similarities = self.similarity_search_with_relevance_scores(...)
        return [doc for doc, _ in docs_and_similarities]
    if search_type == "mmr":
        return self.max_marginal_relevance_search(query, **kwargs)
搜索类型 方法 说明
similarity similarity_search() 余弦相似度 top-k,唯一的抽象方法
similarity_score_threshold similarity_search_with_relevance_scores() 带分数阈值过滤
mmr max_marginal_relevance_search() 最大边际相关——兼顾相似度与多样性
# vectorstores/base.py:360-373  ← 唯一的抽象方法
@abstractmethod
def similarity_search(
    self, query: str, k: int = 4, **kwargs: Any
) -> list[Document]:
    """Return docs most similar to query."""

工厂方法

# vectorstores/base.py:846-868  ← from_texts(抽象)
@classmethod
@abstractmethod
def from_texts(cls, texts, embedding, metadatas=None, *, ids=None, **kwargs) -> VST:
    """Return VectorStore initialized from texts and embeddings."""

# vectorstores/base.py:786-814  ← from_documents(基于 from_texts)
@classmethod
def from_documents(cls, documents, embedding, **kwargs) -> Self:
    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    return cls.from_texts(texts, embedding, metadatas=metadatas, **kwargs)

from_texts 是第二个抽象方法。from_documents 是便捷包装。两者都接收 embedding: Embeddings 参数——这是 Embeddings 被注入 VectorStore 的主要入口。

相关度分数转换

VectorStore 提供三种距离到相似度的转换函数:

# vectorstores/base.py:375-401
@staticmethod
def _euclidean_relevance_score_fn(distance: float) -> float:
    return 1.0 - distance / math.sqrt(2)   # [0, sqrt(2)] → [1, 0]

@staticmethod
def _cosine_relevance_score_fn(distance: float) -> float:
    return 1.0 - distance                   # cosine distance → similarity

@staticmethod
def _max_inner_product_relevance_score_fn(distance: float) -> float:
    if distance > 0:
        return 1.0 - distance
    return -1.0 * distance

子类通过覆盖 _select_relevance_score_fn() 选择适合自己后端的转换函数。

as_retriever——桥接到 LCEL

# vectorstores/base.py:905-961
def as_retriever(self, **kwargs) -> VectorStoreRetriever:
    tags = kwargs.pop("tags", None) or [*self._get_retriever_tags()]
    return VectorStoreRetriever(vectorstore=self, tags=tags, **kwargs)

一行代码,把 VectorStore 包装成 BaseRetriever 子类,接入 LCEL 管道。


VectorStoreRetriever——从 VectorStore 到 Runnable

打开 vectorstores/base.py:964

# vectorstores/base.py:964-985
class VectorStoreRetriever(BaseRetriever):
    """Base Retriever class for VectorStore."""

    vectorstore: VectorStore
    search_type: str = "similarity"
    search_kwargs: dict = Field(default_factory=dict)

    allowed_search_types: ClassVar[Collection[str]] = (
        "similarity",
        "similarity_score_threshold",
        "mmr",
    )

输入验证

# vectorstores/base.py:986-1016
@model_validator(mode="before")
@classmethod
def validate_search_type(cls, values: dict) -> Any:
    search_type = values.get("search_type", "similarity")
    if search_type not in cls.allowed_search_types:
        raise ValueError(...)
    if search_type == "similarity_score_threshold":
        score_threshold = values.get("search_kwargs", {}).get("score_threshold")
        if (score_threshold is None) or (not isinstance(score_threshold, float)):
            raise ValueError("`score_threshold` is not specified...")
    return values

如果用 similarity_score_threshold 搜索类型,必须提供 score_threshold 参数。

核心检索逻辑

# vectorstores/base.py:1039-1058
def _get_relevant_documents(self, query, *, run_manager, **kwargs):
    kwargs_ = self.search_kwargs | kwargs
    if self.search_type == "similarity":
        docs = self.vectorstore.similarity_search(query, **kwargs_)
    elif self.search_type == "similarity_score_threshold":
        docs_and_similarities = (
            self.vectorstore.similarity_search_with_relevance_scores(query, **kwargs_)
        )
        docs = [doc for doc, _ in docs_and_similarities]
    elif self.search_type == "mmr":
        docs = self.vectorstore.max_marginal_relevance_search(query, **kwargs_)
    return docs

_get_relevant_documentsBaseRetriever 的抽象方法——实现后,retriever.invoke(query) 就能工作了。

LangSmith 追踪集成

# vectorstores/base.py:1018-1037
def _get_ls_params(self, **kwargs) -> LangSmithRetrieverParams:
    ls_params = super()._get_ls_params(**kwargs_)
    ls_params["ls_vector_store_provider"] = self.vectorstore.__class__.__name__
    if self.vectorstore.embeddings:
        ls_params["ls_embedding_provider"] = (
            self.vectorstore.embeddings.__class__.__name__
        )
    return ls_params

追踪记录了 VectorStore 和 Embeddings 的类名,方便在 LangSmith 中观测。


InMemoryVectorStore——从零到一的完整实现

打开 vectorstores/in_memory.py:34。这是 LangChain Core 内置的唯一具体 VectorStore 实现,用纯 Python + NumPy 实现,适合教学和小规模场景。

构造器——注入 Embeddings

# vectorstores/in_memory.py:161-170
def __init__(self, embedding: Embeddings) -> None:
    self.store: dict[str, dict[str, Any]] = {}  # id → {id, vector, text, metadata}
    self.embedding = embedding

@property
def embeddings(self) -> Embeddings:
    return self.embedding

存储就是一个字典。每条记录包含 4 个字段:idvectortextmetadata

写入——embed_documents 的调用点

# vectorstores/in_memory.py:188-221
def add_documents(self, documents, ids=None, **kwargs) -> list[str]:
    texts = [doc.page_content for doc in documents]
    vectors = self.embedding.embed_documents(texts)  # ← 批量嵌入

    id_iterator = iter(ids) if ids else iter(doc.id for doc in documents)
    ids_ = []
    for doc, vector in zip(documents, vectors):
        doc_id = next(id_iterator)
        doc_id_ = doc_id or str(uuid.uuid4())
        ids_.append(doc_id_)
        self.store[doc_id_] = {
            "id": doc_id_,
            "vector": vector,       # ← 向量存入字典
            "text": doc.page_content,
            "metadata": doc.metadata,
        }
    return ids_

写入流程:

documents: [Doc1, Doc2, Doc3]
     │
     ↓ .page_content
texts: ["text1", "text2", "text3"]
     │
     ↓ embedding.embed_documents(texts)
vectors: [[0.1, 0.3, ...], [0.2, 0.5, ...], [0.7, 0.1, ...]]
     │
     ↓ zip(documents, vectors) → 逐条存入 self.store
store: {
    "uuid-1": {"id": "uuid-1", "vector": [0.1, 0.3, ...], "text": "text1", "metadata": {...}},
    "uuid-2": {"id": "uuid-2", "vector": [0.2, 0.5, ...], "text": "text2", "metadata": {...}},
    "uuid-3": {"id": "uuid-3", "vector": [0.7, 0.1, ...], "text": "text3", "metadata": {...}},
}

搜索——embed_query 的调用点

# vectorstores/in_memory.py:358-370
def similarity_search_with_score(self, query, k=4, **kwargs):
    embedding = self.embedding.embed_query(query)    # ← 查询嵌入
    return self.similarity_search_with_score_by_vector(embedding, k, **kwargs)

查询流程:

query: "编程语言"
     │
     ↓ embedding.embed_query(query)
query_vec: [0.15, 0.4, ...]
     │
     ↓ cosine_similarity([query_vec], [所有文档向量])
similarities: [0.92, 0.88, 0.31, 0.25]
     │
     ↓ argsort()[::-1][:k]  ← top-k 索引
top_k_idx: [0, 1]
     │
     ↓ 构造 Document + score 返回
[(Doc1, 0.92), (Doc2, 0.88)]

核心搜索实现

# vectorstores/in_memory.py:291-332
def _similarity_search_with_score_by_vector(self, embedding, k=4, filter=None):
    docs = list(self.store.values())

    if filter is not None:
        docs = [doc for doc in docs
                if filter(Document(id=doc["id"], page_content=doc["text"],
                                   metadata=doc["metadata"]))]

    if not docs:
        return []

    similarity = cosine_similarity(         # ← vectorstores/utils.py
        [embedding], [doc["vector"] for doc in docs]
    )[0]

    top_k_idx = similarity.argsort()[::-1][:k]  # ← 降序取前 k

    return [
        (Document(id=doc_dict["id"], page_content=doc_dict["text"],
                  metadata=doc_dict["metadata"]),
         float(similarity[idx].item()),
         doc_dict["vector"])
        for idx in top_k_idx
        if (doc_dict := docs[idx])
    ]

注意 filter 参数接受一个 Callable[[Document], bool]——这是内存实现的灵活之处,可以用 Python 函数做任意过滤。

持久化——dump / load

# vectorstores/in_memory.py:537-546
def dump(self, path: str) -> None:
    path_ = Path(path)
    path_.parent.mkdir(exist_ok=True, parents=True)
    with path_.open("w", encoding="utf-8") as f:
        json.dump(dumpd(self.store), f, indent=2)

@classmethod
def load(cls, path, embedding, **kwargs) -> InMemoryVectorStore:
    with Path(path).open("r", encoding="utf-8") as f:
        store = load(json.load(f), allowed_objects=[Document])
    vectorstore = cls(embedding=embedding, **kwargs)
    vectorstore.store = store
    return vectorstore

使用 LangChain 的 dumpd / load 序列化工具,将 store 字典存为 JSON。注意加载时仍需传入 embedding——向量已存在文件中,但 Embeddings 实例不会被序列化。


余弦相似度与 MMR 算法

打开 vectorstores/utils.py。这是 VectorStore 的数学引擎。

余弦相似度

# vectorstores/utils.py:35-103
def _cosine_similarity(x: Matrix, y: Matrix) -> np.ndarray:
    """Row-wise cosine similarity between two equal-width matrices."""
    x = np.array(x)
    y = np.array(y)

    # NaN / Inf 检查(仅警告,不阻断)
    if np.any(np.isnan(x)) or np.any(np.isnan(y)):
        warnings.warn("NaN found in input arrays...")
    if np.any(np.isinf(x)) or np.any(np.isinf(y)):
        warnings.warn("Inf found in input arrays...")

    if x.shape[1] != y.shape[1]:
        raise ValueError(f"Number of columns in X and Y must be the same...")

    if not _HAS_SIMSIMD:
        # NumPy 实现
        x_norm = np.linalg.norm(x, axis=1)
        y_norm = np.linalg.norm(y, axis=1)
        similarity = np.dot(x, y.T) / np.outer(x_norm, y_norm)
        similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
        return similarity
    else:
        # SimSIMD 加速实现
        return 1 - np.array(simd.cdist(x, y, metric="cosine"))

计算公式:

cosine_similarity(A, B) = (A · B) / (||A|| × ||B||)

其中:
  A · B    = Σ(ai × bi)     ← 内积
  ||A||    = √Σ(ai²)        ← L2 范数

两层实现策略:

  1. SimSIMD 可用 → 使用 SIMD 指令加速,1 - cdist(cosine)
  2. 仅 NumPy → 手动计算 dot(x, y.T) / outer(x_norm, y_norm)

最大边际相关性(MMR)

# vectorstores/utils.py:106-157
def maximal_marginal_relevance(
    query_embedding: np.ndarray,
    embedding_list: list,
    lambda_mult: float = 0.5,
    k: int = 4,
) -> list[int]:
    if min(k, len(embedding_list)) <= 0:
        return []

    similarity_to_query = _cosine_similarity(query_embedding, embedding_list)[0]
    most_similar = int(np.argmax(similarity_to_query))
    idxs = [most_similar]
    selected = np.array([embedding_list[most_similar]])

    while len(idxs) < min(k, len(embedding_list)):
        best_score = -np.inf
        idx_to_add = -1
        similarity_to_selected = _cosine_similarity(embedding_list, selected)
        for i, query_score in enumerate(similarity_to_query):
            if i in idxs:
                continue
            redundant_score = max(similarity_to_selected[i])
            equation_score = (
                lambda_mult * query_score - (1 - lambda_mult) * redundant_score
            )
            if equation_score > best_score:
                best_score = equation_score
                idx_to_add = i
        idxs.append(idx_to_add)
        selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
    return idxs

MMR 的核心思想——贪心选择,平衡相关性和多样性

MMR(Di) = λ × Sim(Di, Q) - (1-λ) × max[Sim(Di, Dj)]
                                      j∈已选集

λ = 1.0 → 纯相似度排序(最相关)
λ = 0.0 → 纯多样性排序(最不像已选的)
λ = 0.5 → 平衡(默认)

算法步骤:

1. 计算所有文档与查询的相似度
2. 选出最相似的文档放入结果集
3. 循环 k-1 次:
   a. 对每个未选文档计算 MMR 分数
   b. MMR = λ × query_sim - (1-λ) × max(与已选文档的相似度)
   c. 选 MMR 最高的文档加入结果集
4. 返回选中的文档索引

InMemoryVectorStore 中的 MMR 调用

# vectorstores/in_memory.py:418-448
def max_marginal_relevance_search_by_vector(
    self, embedding, k=4, fetch_k=20, lambda_mult=0.5, *, filter=None, **kwargs
):
    prefetch_hits = self._similarity_search_with_score_by_vector(
        embedding=embedding, k=fetch_k, filter=filter   # 先取 fetch_k 个候选
    )
    mmr_chosen_indices = maximal_marginal_relevance(
        np.array(embedding, dtype=np.float32),
        [vector for _, _, vector in prefetch_hits],       # 候选向量
        k=k, lambda_mult=lambda_mult,
    )
    return [prefetch_hits[idx][0] for idx in mmr_chosen_indices]

两阶段策略:先用余弦相似度取 fetch_k(默认 20)个候选,再用 MMR 从中选 k(默认 4)个。


RAG 完整链路——从文本到答案

把所有组件串起来:

                   RAG 数据流全景图
═══════════════════════════════════════════════════════════

 原始文本              嵌入               向量存储
┌─────────┐     ┌──────────────┐     ┌──────────────┐
│ "Python  │     │              │     │ store:       │
│  是一门  │ ──→ │  Embeddings  │ ──→ │ {id: vec,    │
│  编程语言│     │ .embed_docs()│     │  text, meta} │
│ "        │     │              │     │              │
└─────────┘     └──────────────┘     └──────┬───────┘
                                            │
                                     .as_retriever()
                                            │
 用户查询              嵌入               ┌──▼───────────┐
┌─────────┐     ┌──────────────┐     │ VectorStore-  │
│"编程语言 │     │              │     │ Retriever     │
│  有哪些" │ ──→ │  Embeddings  │ ──→ │               │
│          │     │ .embed_query │     │ _get_relevant │
└─────────┘     │ ()           │     │ _documents()  │
                └──────────────┘     └──────┬───────┘
                                            │
                                   cosine_similarity
                                   / MMR 选择 top-k
                                            │
                                     ┌──────▼───────┐
                                     │ [Document,   │
                                     │  Document,   │
                                     │  ...]        │
                                     └──────┬───────┘
                                            │
                                     可接入 LLM 生成回答
                                     retriever | prompt | llm

LCEL 管道示例(概念)

# 概念代码(需要真实 LLM 才能运行)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

prompt = ChatPromptTemplate.from_template(
    "根据以下上下文回答问题:\n{context}\n\n问题:{question}"
)

# retriever 是 Runnable,可以用 | 组合
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
)
# chain.invoke("Python 是什么?")

这就是为什么 VectorStoreRetriever 继承 BaseRetriever(继承 RunnableSerializable)——它让向量检索可以无缝融入 LCEL 管道。


VectorStore 方法对比表

方法 类型 说明
similarity_search(query, k) 抽象 余弦相似度 top-k
similarity_search_by_vector(embedding, k) 可选 直接用向量搜索
similarity_search_with_score(query, k) 可选 返回 (doc, score)
similarity_search_with_relevance_scores(query, k) 内置 分数归一到 [0,1]
max_marginal_relevance_search(query, k, fetch_k, λ) 可选 MMR 搜索
from_texts(texts, embedding) 抽象类方法 构造 + 批量添加
from_documents(docs, embedding) 类方法 基于 from_texts
add_texts(texts, metadatas) 可选 增量添加文本
add_documents(docs) 可选 增量添加文档
delete(ids) 可选 按 ID 删除
get_by_ids(ids) 可选 按 ID 获取
as_retriever(**kwargs) 内置 转为 Retriever

“抽象” = 子类必须实现;“可选” = 基类有默认(通常是 raise NotImplementedError);“内置” = 基类有完整实现。


八个系列总览

系列 核心类 源码位置 行数
Messages BaseMessage / AIMessage / HumanMessage / ToolMessage messages/ ~1500
Runnables Runnable / RunnableSequence / RunnableLambda runnables/ ~5000
ChatModel BaseChatModel / generate / stream language_models/ ~1400
Outputs Generation / ChatResult / AIMessageChunk outputs/ ~600
Prompts BasePromptTemplate / ChatPromptTemplate prompts/ ~2200
Tools BaseTool / StructuredTool / tool 装饰器 tools/ ~1600
Tracers BaseTracer / LangSmithTracer / ConsoleCallbackHandler tracers/ + callbacks/ ~2000
Embeddings Embeddings / VectorStore / InMemoryVectorStore embeddings/ + vectorstores/ ~1900

小结

要点 内容
VectorStore 抽象方法 similarity_search() + from_texts()
写入流程 add_documents()embed_documents() → 存储向量
查询流程 similarity_search()embed_query() → 余弦相似度 → top-k
MMR 算法 贪心选择,λ × 相关性 - (1-λ) × 冗余性
InMemoryVectorStore 字典存储 + NumPy 余弦相似度,内置 dump/load
VectorStoreRetriever BaseRetriever 子类,as_retriever() 一键转换
完整链路 文本 → Embeddings → 向量 → VectorStore → Retriever → Documents
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐