检索器(Retrievers)

相关概念
检索系统

检索系统(Information Retrieval System, IR System)是一个为了满足用户信息需求,从大规模、非结构化的数据集合中,自动、高效地查找、排序并返回相关信息的计算机系统。

它的核心任务是:在正确的时间,以正确的方式,将正确的信息传递给正确的人。最常见的例子就是:搜索引擎(如Google百度)。

随着大型语言模型的流行,检索系统已成为人工智能应用(例如 RAG)的重要组成部分。且存在多种【不同类型】的检索系统,包括:

  • 关系数据库: 关系数据库是许多应用程序中使用的结构化数据存储的基本类型。数据存储在行(记录)和列(属性)中,可以通过 SQL(结构化查询语言)进行高效的查询合作。关系数据库擅长维护数据完整性、支持复杂查询以及处理不同数据实体之间的关系。

  • 词法搜索索引: 许多搜索引擎基于将查询中的单词与每个文档中的单词进行匹配,这种方法称为词法检索。即一个单词经常出现在用户的查询和特定文档中,那么这个文档可能是一个很好的匹配。这通常使用【倒排索引】实现。

  • 向量数据库: 向量存储不使用词频,而是使用【嵌入模型】将文档转换为高维向量表示。这允许使用余弦相似度等数学运算对嵌入向量进行有效的相似性搜索。


检索器

检索器是检索系统中的一个核心组件,它接收来自用户接口的查询(Query),检索出包含查询关键词的候选文档集合。

我们可以使用上面提到的任何检索系统实现方式创建检索器!如关系数据库、向量数据库等。由于其重要性和多样性,LangChain 提供了一个统一的接口来与不同类型的检索系统进行交互。LangChain 的检索器接口非常简单:

  1. 输入:查询字符串
  2. 输出:文档列表(标准化的 LangChain 文档对象 Document

例如,使用关系数据库的检索系统,检索器可以将问题转换为 SQL 语句,并执行查询,最后将查询结果响应用户。(该示例将来会在 LangGraph 中讲解,在这里帮助理解概念即可)

下面,我们来演示如何使用向量存储作为检索器。


使用向量数据库作为检索器

基本使用

向量存储是索引和检索非结构化数据的一种强大而有效的方法。可以通过调用向量数据库的 as_retriever 方法,将向量存储用作检索器。在这里我们使用 Redis 向量存储。

"""
LangChain + Redis 向量检索示例
功能:使用 OpenAI 嵌入模型将文档向量化,存储在 Redis 中,并进行相似性检索

依赖库:
    pip install langchain-openai langchain-redis redis openai
"""

# ==================== 导入必要的库 ====================
from langchain_openai import OpenAIEmbeddings
from langchain_redis import RedisConfig, RedisVectorStore

# ==================== 1. 定义嵌入模型 ====================
# OpenAIEmbeddings: 将文本转换为向量(embedding)
# model="text-embedding-3-large": 使用 OpenAI 最新的 embedding 模型 v3 大版本
#   - 维度:3072 维
#   - 性能:比 v2 更好,支持降低维度(通过 dimensions 参数)
#   - 成本:相比 text-embedding-ada-002 略高
# 
# 其他可选模型:
#   - "text-embedding-3-small": 小模型,1536 维,成本更低
#   - "text-embedding-ada-002": 老版本,1536 维
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# ==================== 2. 配置 Redis 客户端 ====================
# Redis 连接 URL 格式:redis://[用户名:密码@]主机:端口/数据库号
redis_url = "redis://192.168.100.238:6379"

# RedisConfig: Redis 向量存储的配置类
# 参数说明:
#   - index_name: Redis 索引名称(必须唯一,用于标识不同的向量集合)
#   - redis_url: Redis 连接地址
#   - metadata_schema: 元数据字段的 schema 定义(用于过滤和精确查询)
config = RedisConfig(
    index_name="qa",                                    # 索引名称,用于区分不同的业务场景
    redis_url=redis_url,                                # Redis 服务器地址
    metadata_schema=[                                   # 元数据字段定义(可选)
        {
            "name": "category",                         # 字段名称
            "type": "tag"                               # 类型:tag(标签,用于分类过滤)
        },
        {
            "name": "num",                              # 字段名称
            "type": "numeric"                           # 类型:numeric(数字,用于范围过滤)
        },
    ],
)

# 元数据类型说明:
#   - tag: 标签类型,适合枚举值过滤(如 category="AI")
#   - numeric: 数字类型,适合范围查询(如 num >= 10 and num <= 100)
#   - text: 文本类型,支持全文搜索(较少用于向量检索的元数据)
#   - geo: 地理位置类型(如坐标点)

# ==================== 3. Redis 向量存储初始化 ====================
# RedisVectorStore: 创建 Redis 向量存储实例
# 功能:
#   1. 管理文档的向量化存储
#   2. 支持向量相似性检索
#   3. 支持元数据过滤
#   4. 支持批量操作
vector_store = RedisVectorStore(embeddings, config=config)

# ==================== 4. 获取检索器并执行查询 ====================
# as_retriever(): 将向量存储转换为检索器对象
# 检索器提供统一的接口:invoke() 方法进行检索
# 
# 可选参数(可以在 as_retriever 中配置):
#   - search_type: 检索类型
#     * "similarity": 纯向量相似度(默认)
#     * "mmr": 最大边际相关性(多样性)
#     * "similarity_score_threshold": 带阈值过滤
#   - search_kwargs: 检索参数
#     * k: 返回文档数量(默认 4)
#     * filter: 元数据过滤条件
#     * score_threshold: 相似度阈值
#
# 示例:
#   retriever = vector_store.as_retriever(
#       search_type="similarity",
#       search_kwargs={"k": 5, "filter": {"category": "AI"}}
#   )
retriever = vector_store.as_retriever()

# invoke(): 执行相似性检索
# 参数:查询字符串(自然语言问题)
# 返回:Document 对象列表,按相似度降序排列
#   - Document.page_content: 文本内容
#   - Document.metadata: 元数据(包含 category, num 等)
docs = retriever.invoke("数据库表怎么设计的? ")

# ==================== 5. 输出检索结果 ====================
# 遍历检索结果并打印
for idx, doc in enumerate(docs, 1):
    print("*" * 30)                      # 分隔线
    print(f"结果 {idx}:")                 # 结果序号
    print(f"内容: {doc.page_content[:100]}...")  # 只打印前100个字符
    print(f"元数据: {doc.metadata}")      # 打印元数据信息
    # 如果只想打印原始的前30个字符(保持原代码风格)
    # print(doc.page_content[:30])

# ==================== 扩展知识 ====================
# 
# 1. 如何添加文档到向量存储?
#    texts = ["文档1内容", "文档2内容"]
#    metadatas = [{"category": "AI", "num": 1}, {"category": "DB", "num": 2}]
#    vector_store.add_texts(texts, metadatas=metadatas)
#
# 2. 如何删除文档?
#    vector_store.delete(ids=["doc_id_1", "doc_id_2"])
#
# 3. 如何使用元数据过滤?
#    retriever = vector_store.as_retriever(
#        search_kwargs={"filter": {"category": "AI"}}  # 只检索 category="AI" 的文档
#    )
#
# 4. 如何设置相似度阈值?
#    retriever = vector_store.as_retriever(
#        search_type="similarity_score_threshold",
#        search_kwargs={"score_threshold": 0.8}  # 只返回相似度 > 0.8 的结果
#    )
#
# 5. Redis 索引查看命令(redis-cli 中执行):
#    FT.INFO qa                    # 查看索引信息
#    FT.SEARCH qa "*" LIMIT 0 10   # 搜索所有文档
#    FT.DROPINDEX qa               # 删除索引(不删除数据)
#    DEL qa:docs                   # 删除索引和数据
#
# 6. 环境变量配置(推荐):
#    export OPENAI_API_KEY="your-api-key"
#    export OPENAI_BASE_URL="https://your-proxy.com/v1"  # 如有代理

LangChain 检索器是一个 Runnable 的对象,它是 LangChain 组件的标准接口。这意味着它有一些常用方法,包括 invoke,用于与其交互。默认情况下,向量存储检索器使用相似性搜索。

输出结果如下:

******************************
提供两种方案有以下好处:
降低门槛: 单机版让用户快速体验核
******************************
数据库: MySQL + MyBatis

MySQL 和 M
******************************
不同状态(上架/已租/下架)的查询频率不同,拆分可减少锁争用
******************************
分库分表: 按user_id哈希分库,按月份分表(chat_

as_retriever 方法也支持我们传递相关参数修改搜索结果,如:

search_type: 设置相似算法,包括: "similarity"(默认)、"mmr""similarity_score_threshold"(相似性分数阈值)

search_kwargs:

  • k: 限制检索器返回的文档 k 数量。
retriever = vector_store.as_retriever(search_kwargs={"k": 2})
  • fetch_k: 要传递给 MMR 算法的文档量。
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 2,
        "fetch_k": 10
    }
)

要注意的是,Retrievers 检索器虽然是 Runnable 对象,但其不提供任何流式处理,因为它本身通常是同步的、阻塞的操作。如下所示:

chunks = []
for chunk in retriever.stream("数据库表怎么设计的? "):
    chunks.append(chunk)
    print(chunk, end="|", flush=True)

调用可以发现,它是一次性返回,与 invoke 调用无异。

其实这也不难理解:检索器的本质是"查询"而非"生成"


使用 @chain 创建 “检索器”

除了使用 as_retriever 方法,我们还可以自行创建一个 “检索器”。回想一下检索器的特点:

  1. LangChain 检索器是一个 Runnable 的对象
  2. LangChain 检索器输入为查询字符串,输出为文档列表(标准化的 LangChain 文档对象 Document

这个就是我们需要修饰函数的函数本身的写法限制【输入输出,返回参数】

综上所述,我们可以:

"""
LangChain 自定义检索器示例
功能:使用 @chain 装饰器将普通函数转换为 Runnable 对象

依赖库:
    pip install langchain langchain-core langchain-openai langchain-redis redis openai
"""

# ==================== 导入必要的库 ====================
from langchain_core.runnables import chain
from typing import List
from langchain_core.documents import Document

# ==================== @chain 装饰器详解 ====================
# @chain 是 LangChain 提供的一个装饰器,用于将普通函数转换为 Runnable 对象
# 
# 装饰前后的对比:
# 
# 装饰前(普通函数):
#   def my_func(x):
#       return x * 2
#   result = my_func(5)  # 只能直接调用
#
# 装饰后(Runnable 对象):
#   @chain
#   def my_func(x):
#       return x * 2
#   result = my_func.invoke(5)  # 支持 Runnable 接口
#   result = my_func.batch([1, 2, 3])  # 支持批量
#   my_func | another_runnable  # 支持管道操作
#   my_func.bind()  # 支持绑定参数
#   my_func.with_config()  # 支持配置
# 
# Runnable 接口的优势:
#   1. 统一接口:与 LCEL(LangChain Expression Language)无缝集成
#   2. 可组合性:可以使用管道运算符 | 连接多个 Runnable
#   3. 批处理:自动支持 batch() 方法
#   4. 异步支持:自动支持 ainvoke(), abatch() 等方法
#   5. 可观测性:自动集成 LangSmith 等追踪工具

@chain
def retriever(query: str) -> List[Document]:
    """
    自定义检索器函数
    
    经过 @chain 装饰后,这个函数变成了一个 Runnable 对象,具有以下能力:
        - .invoke(query): 同步调用
        - .ainvoke(query): 异步调用
        - .batch([query1, query2]): 批量调用
        - .stream(query): 流式输出(虽然检索器不支持真正的流式)
        - .bind(**kwargs): 绑定参数
        - | 运算符: 与其他 Runnable 组合
    
    参数说明:
        query (str): 查询字符串,用户的自然语言问题
    
    返回值说明:
        List[Document]: 检索到的文档列表,每个 Document 包含:
            - page_content: 文档的文本内容
            - metadata: 文档的元数据(如来源、作者、时间戳等)
    
    函数内部逻辑:
        1. 调用 vector_store.similarity_search() 执行向量相似度检索
        2. 指定 k=2 表示返回最相似的 2 个文档
        3. 返回检索到的文档列表
    """
    # similarity_search(): 向量相似度检索方法
    #   工作原理:
    #     1. 将查询文本转换为向量(使用 OpenAIEmbeddings)
    #     2. 在向量数据库中计算与所有文档向量的余弦相似度
    #     3. 按相似度降序排序
    #     4. 返回前 k 个最相似的文档
    # 
    # 参数说明:
    #   - query: 查询字符串
    #   - k: 返回的文档数量(默认为 4)
    #   - filter: 元数据过滤条件(可选)
    #   - score_threshold: 相似度阈值(可选)
    #
    # 返回值:
    #   List[Document]: 文档列表,按相似度从高到低排序
    return vector_store.similarity_search(query, k=2)

# ==================== 使用自定义检索器 ====================
# 调用方式1: 使用 .invoke() 方法(推荐,符合 Runnable 规范)
# invoke 是 Runnable 接口的标准方法,会调用函数并返回结果
docs = retriever.invoke("数据库表怎么设计的? ")

# 其他调用方式示例:
# ====================
# 1. 直接调用(也可以,但不符合 Runnable 规范)
# docs = retriever("数据库表怎么设计的? ")
#
# 2. 异步调用
# import asyncio
# async def main():
#     docs = await retriever.ainvoke("数据库表怎么设计的? ")
# asyncio.run(main())
#
# 3. 批量调用(自动处理多个查询)
# queries = ["数据库表怎么设计的?", "什么是索引?", "如何优化SQL?"]
# results = retriever.batch(queries)
# for docs in results:
#     print(f"查询返回 {len(docs)} 个文档")
#
# 4. 带配置的调用
# docs = retriever.with_config({"run_name": "my_retriever"}).invoke("问题")
#
# 5. 使用管道操作符(LCEL)
# from langchain_core.prompts import PromptTemplate
# from langchain_openai import ChatOpenAI
# 
# prompt = PromptTemplate.from_template("基于以下文档回答问题:{context}\n问题:{question}")
# llm = ChatOpenAI(model="gpt-4")
# 
# # 构建 RAG 链
# rag_chain = retriever | prompt | llm
# 
# # 执行
# result = rag_chain.invoke("数据库表怎么设计的? ")
#
# 6. 使用 bind() 绑定参数
# bound_retriever = retriever.bind(k=5)  # 默认返回 5 个文档
# docs = bound_retriever.invoke("问题")

# ==================== 输出检索结果 ====================
# 遍历并打印检索到的文档
print("=" * 50)
print(f"检索查询: 数据库表怎么设计的?")
print(f"返回文档数量: {len(docs)}")
print("=" * 50)

for idx, doc in enumerate(docs, 1):
    print("*" * 30)
    print(f"文档 {idx}:")
    print(f"内容预览: {doc.page_content[:100]}...")  # 打印前100个字符
    print(f"元数据: {doc.metadata}")                  # 打印元数据信息
    print(f"完整内容长度: {len(doc.page_content)} 字符")
    
    # ==================== 原始代码(保持不变)====================
    # 这段原始代码只打印前30个字符
    # print(doc.page_content[:30])

# ==================== @chain 装饰器的高级用法 ====================

# 1. 带参数绑定的函数
@chain
def retriever_with_score(query: str) -> List[tuple]:
    """
    返回带相似度分数的检索结果
    """
    return vector_store.similarity_search_with_score(query, k=2)

# 使用方式
# docs_with_scores = retriever_with_score.invoke("问题")
# for doc, score in docs_with_scores:
#     print(f"相似度: {score}, 内容: {doc.page_content[:50]}")

# 2. 带元数据过滤的检索器
@chain
def filtered_retriever(query: str, category: str = None) -> List[Document]:
    """
    支持元数据过滤的检索器
    """
    filter_dict = {}
    if category:
        filter_dict["category"] = category
    
    if filter_dict:
        return vector_store.similarity_search(query, k=2, filter=filter_dict)
    else:
        return vector_store.similarity_search(query, k=2)

# 使用方式
# docs = filtered_retriever.invoke("问题", {"category": "AI"})

# 3. 组合多个检索器
@chain
def ensemble_retriever(query: str) -> List[Document]:
    """
    组合多个检索策略的结果
    """
    # 向量检索
    vector_docs = vector_store.similarity_search(query, k=1)
    
    # 可以添加其他检索方式
    # keyword_docs = keyword_search(query, k=1)
    # bm25_docs = bm25_search(query, k=1)
    
    # 去重并返回
    return vector_docs  # + keyword_docs + bm25_docs

# ==================== 装饰前后的类型变化 ====================
def without_decorator(query: str) -> List[Document]:
    """普通函数"""
    return vector_store.similarity_search(query, k=2)

# 类型对比:
# type(without_decorator)  -> <class 'function'>          # 普通函数
# type(retriever)           -> <class 'RunnableBinding'>    # Runnable 对象

# 普通函数只能直接调用:
result1 = without_decorator("问题")

# Runnable 对象支持多种调用方式:
result2 = retriever.invoke("问题")              # 标准调用
result3 = retriever.batch(["问题1", "问题2"])   # 批量调用
result4 = await retriever.ainvoke("问题")      # 异步调用(需要在 async 函数中)

# ==================== 实际应用示例 ====================
# 构建一个完整的 RAG 问答链
if __name__ == "__main__":
    # 假设已经初始化了 vector_store
    # from your_config import vector_store
    
    # 定义检索器
    @chain
    def my_retriever(query: str) -> List[Document]:
        return vector_store.similarity_search(query, k=3)
    
    # 测试检索
    question = "数据库表怎么设计的?"
    print(f"\n查询: {question}")
    print("正在检索...")
    
    docs = my_retriever.invoke(question)
    
    print(f"\n找到 {len(docs)} 个相关文档:")
    for i, doc in enumerate(docs, 1):
        print(f"\n文档 {i}:")
        print(f"  {doc.page_content[:150]}...")
        if doc.metadata:
            print(f"  元数据: {doc.metadata}")

上面定义了一个函数,使用 @chain 修饰,该修饰可以使其成为 Runnable 函数,且满足检索器输入输出的要求。在函数中,我们依旧使用向量数据库的相似性搜索方法,这样灵活性也更高,想要进行元数据筛选也更方便。

注意,这并不是真正的检索器,检索器是一个 Runnable 对象,而我们定义的只是一个函数,具备其特点罢了。

RAG 案例

RAG 是当前大语言模型应用的核心模式。当用户向 LLM 提问时,系统首先使用嵌入模型在知识库中进行语义搜索,找到最相关的内容,然后将这些内容和问题一起交给 LLM 来生成答案。这极大地提高了答案的准确性和时效性。

由于 LangChain 检索器是一个 Runnable 的对象,我们便可以方便的使用链完成相关的调用。下面我们来完成一个最简单的 RAG 案例,将会完成:

  1. 根据 Query 搜索最相关的 4 篇文档。
  2. 将相关的文档转换为字符串,以便后续发送给聊天模型。
  3. 将 Query 与文档字符串发送给聊天模型。
  4. 聊天模型依据输出解析器格式输出内容。
# 导入OpenAI相关模块:嵌入模型(生成向量) + 聊天大模型(生成回答)
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
# 导入Redis向量数据库相关配置与存储类
from langchain_redis import RedisConfig, RedisVectorStore
# 导入输出解析器:把LLM返回结果转成普通字符串
from langchain_core.output_parsers import StrOutputParser
# 导入提示词模板:构造给大模型的提问格式
from langchain_core.prompts import ChatPromptTemplate
# 导入数据透传工具:在链中直接传递原始问题
from langchain_core.runnables import RunnablePassthrough

# ===================== 1. 初始化大模型 =====================
# 定义用于最终回答生成的聊天模型(gpt-4o-mini)
model = ChatOpenAI(model="gpt-4o-mini")
# 定义用于把文本转成向量的嵌入模型(生成向量用于检索)
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# ===================== 2. 配置Redis向量数据库 =====================
# Redis服务地址(IP + 端口)
redis_url = "redis://192.168.100.238:6379"
# 创建Redis配置对象:指定索引名、连接地址、元数据字段结构
config = RedisConfig(
    index_name="qa",                # 向量数据库索引名称
    redis_url=redis_url,            # Redis连接地址
    metadata_schema=[               # 元数据字段定义(用于过滤检索)
        {"name": "category", "type": "tag"},
        {"name": "num", "type": "numeric"},
    ],
)

# ===================== 3. 创建向量存储 & 检索器 =====================
# 初始化Redis向量存储:绑定嵌入模型 + 配置
vector_store = RedisVectorStore(embeddings, config=config)
# 把向量库转换成LangChain标准检索器(用于根据问题查相关文档)
retriever = vector_store.as_retriever()

# ===================== 4. 构造提示词模板 =====================
# 创建提示词模板:规定AI如何回答、使用什么上下文、遵守什么规则
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "human",
            # 提示词内容:约束AI行为 + 传入问题 + 传入检索到的上下文
            """你是负责回答问题的助手。使用以下检索到的上下文片段来回答问题。如果你不知道答案,就说你不知道。最多只用三句话,回答要简明扼要。
Question: {question}    # 用户的问题
Context: {context}      # 检索出来的相关文档内容
Answer:""",             # AI回答开始标记
        ),
    ]
)

# ===================== 5. 文档格式化工具 =====================
# 定义函数:把检索出来的多个文档对象 拼接成一段纯文本
def format_docs(docs):
    # 用两个换行分隔多个文档内容,方便AI阅读
    return "\n\n".join(doc.page_content for doc in docs)

# ===================== 6. 组装RAG链(核心流程) =====================
# 定义完整RAG链:检索 → 格式化 → 提示词 → 模型 → 解析输出
rag_chain = (
    # 第一步:构造输入字典
    # context:检索器查询 → 文档转字符串
    # question:透传用户原始问题
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    # 第二步:把上下文+问题填入提示词模板
    | prompt
    # 第三步:把提示词送给大模型生成答案
    | model
    # 第四步:把模型输出解析成字符串
    | StrOutputParser()
)

# ===================== 7. 执行RAG链并流式输出结果 =====================
# 调用RAG链,传入问题,流式逐字输出答案
for chunk in rag_chain.stream("数据库表怎么设计的? "):
    # end="|" 用于分隔每个输出片段;flush=True 实时打印
    print(chunk, end="|", flush=True)

上述代码中,RunnablePassthrough 我们之前还没有见过,简单来说,RunnablePassthrough 是一个 “伪” Runnable,它的主要作用是在链(Chain)中透明地传递输入数据,而不做任何修改。

当我们需要将原始输入和另一个处理过程的输出一起传递给下一个步骤时,就需要 RunnablePassthrough。就例如代码中,我们需要将【Query】与【通过检索出来的文档转换的字符串】同时发送给提示词模板。


最终打印结果如下:

|数据库|表|设计|可以|通过|水平|分|表|来|优化|性能|,|特别|是|当|单|表|数据|量|过|大|时|,|可以|减少|锁|争|用|和|提升|查询|效率|。|实现|方式|是|将|状态|字段|独|立|存|储|在|house|_status|表|中|,|并|通过|house|_|id|进行|关联|查询|。|使用|My|SQL|和|My|Batis|进行|数据|存|储|与|访问|,|同时|可以|利用|Redis|缓存|来|进一步|提高|性|能|。|||

上面讲过检索器不提供任何流式处理。但这里可以使用流式输出。我们可以把像 RAG 这样的链的执行过程想象成两个阶段:

  • 阶段一:准备阶段(同步、阻塞):接收输入、检索文档、构建提示
  • 阶段二:生成阶段(可流式):调用 LLM

流式输出的是最终答案,而非中间过程。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐