前言与学习路线

什么是向量数据库?

向量数据库是专门为存储、索引和查询高维向量数据而设计的数据库。与传统的关系型数据库不同,向量数据库专注于相似性搜索,能够在毫秒级时间内从数十亿条向量中找到最相似的结果。

核心应用场景

  • 语义搜索和问答系统
  • 推荐系统
  • 图像/视频/音频检索
  • 异常检测
  • 生物信息学

为什么选择 Milvus?

Milvus 是 LF AI & Data Foundation 旗下的开源向量数据库,具有以下优势:

特性 说明
高性能 支持十亿级向量的毫秒级检索
高可用 分布式架构,支持水平扩展
丰富的索引 支持 FLAT、IVF、HNSW、ANNOY、DISKANN 等多种索引
多云支持 支持本地部署、Docker、Kubernetes、云服务
多语言 SDK Python、Java、Go、Node.js、RESTful API
企业级特性 多租户、TTL、备份恢复、监控告警

Milvus 2.x 架构升级

Milvus 2.3 新特性

  • GPU 支持,查询性能提升 3-10 倍
  • Arm64 架构支持
  • QueryNode V2 重构,性能和稳定性大幅提升
  • IndexCoord 和 DataCoord 合并,简化部署
  • Upsert 功能支持
  • Range Search 范围搜索
  • Count 接口,实时统计数据行数
  • Cosine 相似度原生支持
  • 查询返回原始向量
  • ScaNN 索引支持

Milvus 2.4 新特性

  • 稀疏向量支持(SPARSE_FLOAT_VECTOR)
  • 多向量混合检索(Hybrid Search)
  • 倒排索引和模糊查询
  • 分组检索(Grouping Search)
  • Float16 和 BFloat16 数据类型支持
  • 动态字段增强

学习路线图

入门阶段                  进阶阶段                  高级阶段
┌───────────────┐        ┌───────────────┐        ┌───────────────┐
│ 1. 基础概念   │        │ 1. 索引原理   │        │ 1. 架构设计   │
│ 2. 环境搭建   │───────▶│ 2. 性能调优   │───────▶│ 2. 源码解读   │
│ 3. Python SDK │        │ 3. 高级特性   │        │ 3. 二次开发   │
│ 4. CRUD操作   │        │ 4. 故障排查   │        │ 4. 贡献社区   │
└───────────────┘        └───────────────┘        └───────────────┘

第1章 Milvus基础概念

1.1 Milvus 整体架构

Milvus 采用共享存储的分布式架构,分为三个层次:

┌─────────────────────────────────────────────────────────┐
│                    接入层 (Access Layer)                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐│
│  │  Proxy   │  │  Proxy   │  │  Proxy   │  │  Proxy   ││
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘│
└─────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────┐
│                  协调服务层 (Coord Layer)                │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐  │
│  │ Root Coord  │  │ Query Coord │  │ Data/Index Coord│  │
│  └─────────────┘  └─────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────┐
│                  工作节点层 (Worker Layer)               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────┐  │
│  │ Query Node  │  │  Data Node  │  │  Index Node     │  │
│  └─────────────┘  └─────────────┘  └─────────────────┘  │
└─────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────┐
│                    存储层 (Storage Layer)                │
│  ┌─────────┐     ┌──────────────┐     ┌──────────────┐  │
│  │  Meta   │     │  Message     │     │   Object     │  │
│  │  Store  │     │  Broker      │     │   Storage    │  │
│  │ (etcd)  │     │ (Pulsar/Kafka)│    │  (S3/MinIO)  │  │
│  └─────────┘     └──────────────┘     └──────────────┘  │
└─────────────────────────────────────────────────────────┘

1.2 核心组件详解

1.2.1 Proxy(代理节点)

功能

  • 客户端请求的统一入口
  • 请求验证和解析
  • DDL 请求转发给 Root Coord
  • DML 请求转发给 Data Node
  • 查询请求转发给 Query Coord
  • 结果合并和返回

特性

  • 无状态设计,可水平扩展
  • 支持负载均衡
  • 连接池管理
1.2.2 Root Coord(根协调器)

功能

  • 元数据管理(Collection、Partition、Schema)
  • TSO(时间戳分配器),保证全局有序
  • Collection/Partition 的创建和删除
  • 负载均衡管理
1.2.3 Query Coord(查询协调器)

功能

  • Query Node 的生命周期管理
  • 分片(Segment)的加载和卸载调度
  • 查询请求的路由和分发
  • 全局查询结果合并
1.2.4 Data Coord(数据协调器)

功能

  • Data Node 的生命周期管理
  • Segment 的创建、flush、compaction
  • 数据写入的路由调度
  • 与 Index Coord 协作构建索引
1.2.5 Index Coord(索引协调器)

功能

  • Index Node 的生命周期管理
  • 索引构建任务的调度
  • 索引元数据管理

注意:Milvus 2.3 已将 Data Coord 和 Index Coord 合并,简化部署。

1.2.6 Query Node(查询节点)

功能

  • 执行向量相似度搜索
  • 执行标量过滤和混合查询
  • 增量索引(Growing Segment)查询
  • 历史索引(Sealed Segment)查询

优化

  • QueryNode V2(Milvus 2.3):无状态设计,消息队列优化,稳定性大幅提升
1.2.7 Data Node(数据节点)

功能

  • 消费消息队列中的写入日志
  • 构建 Growing Segment
  • 执行 Flush 操作,将数据持久化到对象存储
  • 执行 Compaction 操作,合并小 Segment
1.2.8 Index Node(索引节点)

功能

  • 接收索引构建任务
  • 执行向量索引构建(HNSW、IVF 等)
  • 将构建好的索引保存到对象存储

1.3 数据模型

1.3.1 核心概念
概念 说明 类比关系型数据库
Collection 最顶层的数据容器,包含多个字段 表(Table)
Partition Collection 的子集,用于数据隔离 分区表
Field 数据字段,支持向量和标量类型 列(Column)
Entity 一条完整的数据记录 行(Row)
Segment Milvus 内部的数据分片单位 数据文件
Shard 数据分片,用于分布式写入 分片(Shard)
1.3.2 支持的数据类型

向量类型

FLOAT_VECTOR      # 单精度浮点向量(最常用)
FLOAT16_VECTOR    # 半精度浮点向量(Milvus 2.4+)
BFLOAT16_VECTOR   # BFloat16 向量(Milvus 2.4+)
SPARSE_FLOAT_VECTOR  # 稀疏向量(Milvus 2.4+)
BINARY_VECTOR     # 二值向量

标量类型

BOOL              # 布尔值
INT8              # 8位整数
INT16             # 16位整数
INT32             # 32位整数
INT64             # 64位整数(主键常用)
FLOAT             # 单精度浮点数
DOUBLE            # 双精度浮点数
VARCHAR           # 变长字符串
JSON              # JSON 类型
ARRAY             # 数组类型(Milvus 2.2+)
1.3.3 Schema 定义示例
from pymilvus import FieldSchema, CollectionSchema, DataType

# 定义字段
fields = [
    # 主键字段(必须)
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    # 向量字段(必须)
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    # 标量字段
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="category", dtype=DataType.INT32),
    FieldSchema(name="create_time", dtype=DataType.INT64),
]

# 创建 Schema,开启动态字段支持
schema = CollectionSchema(
    fields=fields,
    description="文档知识库",
    enable_dynamic_field=True  # Milvus 2.3+,支持非预定义字段
)

1.4 一致性级别

Milvus 支持四种一致性级别,适用于不同场景:

级别 说明 适用场景 性能影响
Strong 强一致,确保读取最新写入的数据 金融、对账 最慢
Bounded bounded staleness,允许一定时间窗口的不一致 大多数业务场景 中等
Session 会话级一致,同一个客户端会话内读写一致 单用户操作
Eventually 最终一致,只保证数据最终同步 日志、离线分析 最快

默认值:Milvus 默认使用 Bounded 一致性级别,在性能和一致性之间取得平衡。


第2章 环境搭建与安装

2.1 Docker Compose 安装(推荐)

2.1.1 单机版(Standalone)

适合开发测试和小规模生产环境。

步骤 1:下载 docker-compose.yml

wget https://github.com/milvus-io/milvus/releases/download/v2.3.15/milvus-standalone-docker-compose.yml -O docker-compose.yml

步骤 2:启动服务

docker-compose up -d

步骤 3:验证安装

docker-compose ps
# 应该看到 3 个容器:milvus-standalone, etcd, minio

docker-compose.yml 内容说明

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.15
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"  # gRPC 端口
      - "9091:9091"    # RESTful API 端口
    depends_on:
      - "etcd"
      - "minio"
2.1.2 GPU 版本(Milvus 2.3+)
wget https://github.com/milvus-io/milvus/releases/download/v2.3.15/milvus-standalone-gpu-docker-compose.yml -O docker-compose.yml
docker-compose up -d

2.2 Milvus Lite(Python 嵌入式)

Milvus Lite 是轻量级嵌入式版本,无需 Docker,直接作为 Python 库运行。

# 安装
pip install pymilvus

# 使用
from pymilvus import MilvusClient

# 数据存储在本地文件
client = MilvusClient(uri="./milvus_demo.db")

适用场景

  • 本地开发和调试
  • 小型应用
  • Jupyter Notebook 演示

2.3 二进制安装

适合需要自定义配置的生产环境。

# 下载二进制包
wget https://github.com/milvus-io/milvus/releases/download/v2.3.15/milvus_2.3.15_Linux_x86_64.tar.gz
tar -xzf milvus_2.3.15_Linux_x86_64.tar.gz

# 配置文件
cp configs/milvus.yaml default.yaml

# 启动
./milvus run standalone

2.4 Kubernetes 集群部署

2.4.1 使用 Helm Chart
# 添加 Milvus Helm 仓库
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update

# 安装单机版
helm install my-milvus milvus/milvus --set cluster.enabled=false

# 安装集群版
helm install my-milvus milvus/milvus --set cluster.enabled=true

# 查看 Pods
kubectl get pods
2.4.2 关键配置参数
# values.yaml 关键配置
cluster:
  enabled: true
  replica:
    rootCoord: 1
    dataCoord: 1
    queryCoord: 1
    indexCoord: 1
    dataNode: 2
    queryNode: 2
    indexNode: 1
    proxy: 2

# 资源限制
resources:
  limits:
    cpu: "4"
    memory: "8Gi"
  requests:
    cpu: "2"
    memory: "4Gi"

# 存储配置
persistence:
  enabled: true
  storageClass: "standard"
  accessMode: ReadWriteOnce
  size: 50Gi

2.5 云服务选项

服务商 产品名称 特点
Zilliz Cloud Zilliz Cloud Milvus 官方托管服务,全托管、自动扩缩容
阿里云 向量检索服务 与阿里云生态深度集成
腾讯云 向量数据库 支持 Milvus 引擎
AWS Amazon OpenSearch Serverless 内置向量检索

Zilliz Cloud 使用示例

from pymilvus import connections

connections.connect(
    alias="default",
    uri="https://your-cluster-endpoint",
    token="api-key"
)

2.6 端口说明

端口 协议 用途
19530 gRPC 主要客户端通信端口
9091 HTTP RESTful API 端口
2379 gRPC etcd 客户端端口
9000 HTTP MinIO/S3 对象存储端口
6650 TCP Pulsar 消息队列端口

第3章 快速上手(Python SDK)

3.1 安装 PyMilvus

# 安装最新版本
pip install pymilvus

# 安装指定版本(推荐与服务端版本匹配)
pip install pymilvus==2.3.7

# 验证安装
python -c "from pymilvus import __version__; print(__version__)"

3.2 建立连接

3.2.1 传统连接方式
from pymilvus import connections, utility

# 连接本地 Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530"
)

# 连接带认证的 Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="root",
    password="Milvus"
)

# 检查连接状态
print(utility.get_server_version())

# 断开连接
connections.disconnect("default")
3.2.2 MilvusClient 方式(推荐,2.3+)

MilvusClient 是简化的封装,更易于使用:

from pymilvus import MilvusClient

# 本地文件模式(Milvus Lite)
client = MilvusClient(uri="./milvus_demo.db")

# 连接远程服务
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"
)

# Zilliz Cloud
client = MilvusClient(
    uri="https://your-cluster.zillizcloud.com:19530",
    token="your-api-key"
)

3.3 创建集合(Collection)

3.3.1 快速创建(MilvusClient)
collection_name = "quick_start"

# 删除已存在的集合
if client.has_collection(collection_name):
    client.drop_collection(collection_name)

# 快速创建(自动生成 Schema)
client.create_collection(
    collection_name=collection_name,
    dimension=768,  # 向量维度
    metric_type="COSINE",  # 相似度度量
    auto_id=True,  # 自动生成主键
    enable_dynamic_field=True  # 支持动态字段
)

print(f"集合 {collection_name} 创建成功")
3.3.2 自定义 Schema 创建
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

# 定义字段
fields = [
    FieldSchema(
        name="book_id",
        dtype=DataType.INT64,
        is_primary=True,
        auto_id=False,
        description="图书ID"
    ),
    FieldSchema(
        name="word_count",
        dtype=DataType.INT64,
        description="字数"
    ),
    FieldSchema(
        name="book_intro",
        dtype=DataType.FLOAT_VECTOR,
        dim=768,
        description="图书简介向量"
    ),
    FieldSchema(
        name="title",
        dtype=DataType.VARCHAR,
        max_length=256,
        description="书名"
    )
]

# 创建 Schema
schema = CollectionSchema(
    fields=fields,
    description="图书知识库",
    enable_dynamic_field=True
)

# 创建集合
collection = Collection(
    name="book_collection",
    schema=schema,
    using="default"  # 使用的连接别名
)

3.4 插入数据

3.4.1 MilvusClient 方式
import numpy as np

# 生成模拟数据
data = []
for i in range(1000):
    data.append({
        "book_id": i,
        "word_count": i * 100 + 5000,
        "book_intro": np.random.rand(768).tolist(),
        "title": f"Book Title #{i}",
        "author": f"Author {i % 10}",  # 动态字段
        "publish_year": 2020 + (i % 5)  # 动态字段
    })

# 批量插入
res = client.insert(
    collection_name="quick_start",
    data=data
)

print(f"插入成功: {res['insert_count']} 条")
print(f"插入耗时: {res['cost']} ms")
3.4.2 传统 Collection 方式
import random

# 按列组织数据(Milvus 列存格式)
entities = [
    [i for i in range(1000)],  # book_id
    [random.randint(1000, 10000) for _ in range(1000)],  # word_count
    [[random.random() for _ in range(768)] for _ in range(1000)],  # book_intro
    [f"Book {i}" for i in range(1000)]  # title
]

# 插入数据
insert_result = collection.insert(entities)
print(f"插入了 {insert_result.insert_count} 条数据")

# 手动 flush(确保数据可查询)
collection.flush()

3.5 创建索引

# 定义索引参数
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {
        "M": 16,  # 每层最大邻居数
        "efConstruction": 256  # 构建时搜索邻居的范围
    }
}

# 创建索引
client.create_index(
    collection_name="quick_start",
    index_params=index_params
)

# 查看索引信息
index_info = client.describe_index("quick_start")
print(index_info)

3.6 向量相似度搜索

3.6.1 基础搜索
# 准备查询向量
query_vector = np.random.rand(768).tolist()

# 加载集合到内存(必须)
client.load_collection("quick_start")

# 执行搜索
search_result = client.search(
    collection_name="quick_start",
    data=[query_vector],  # 查询向量列表
    limit=10,  # 返回 Top 10
    output_fields=["book_id", "title", "word_count"],  # 返回的字段
    search_params={
        "metric_type": "COSINE",
        "params": {"ef": 128}  # HNSW 搜索参数
    }
)

# 解析结果
print("搜索结果:")
for hits in search_result:
    for hit in hits:
        print(f"ID: {hit['id']}, Score: {hit['distance']:.4f}, "
              f"Title: {hit['entity']['title']}")
3.6.2 带标量过滤的混合搜索
# 只搜索 2022 年以后出版,且字数大于 5000 的图书
search_result = client.search(
    collection_name="quick_start",
    data=[query_vector],
    limit=10,
    filter="publish_year >= 2022 and word_count > 5000",
    output_fields=["book_id", "title", "word_count", "publish_year"]
)

print("过滤后的搜索结果:")
for hits in search_result:
    for hit in hits:
        print(f"ID: {hit['id']}, Score: {hit['distance']:.4f}, "
              f"Year: {hit['entity']['publish_year']}, "
              f"Word Count: {hit['entity']['word_count']}")

3.7 标量查询(Query)

# 查询特定条件的实体
query_result = client.query(
    collection_name="quick_start",
    filter="word_count > 8000 and publish_year == 2023",
    output_fields=["book_id", "title", "word_count"],
    limit=100
)

print(f"查询到 {len(query_result)} 条结果")
for item in query_result:
    print(f"ID: {item['book_id']}, Title: {item['title']}, "
          f"Word Count: {item['word_count']}")

3.8 Upsert 操作(Milvus 2.3+)

Upsert = Update + Insert,如果主键存在则更新,不存在则插入。

# 准备 upsert 数据
upsert_data = [
    {
        "book_id": 999,  # 已存在的 ID
        "word_count": 99999,  # 更新字段
        "book_intro": np.random.rand(768).tolist(),
        "title": "Updated Book Title"
    },
    {
        "book_id": 1000,  # 新 ID
        "word_count": 10000,
        "book_intro": np.random.rand(768).tolist(),
        "title": "New Book"
    }
]

# 执行 Upsert
res = client.upsert(
    collection_name="quick_start",
    data=upsert_data
)

print(f"Upsert 完成,影响 {res['upsert_count']} 条数据")

3.9 删除数据

# 按表达式删除
client.delete(
    collection_name="quick_start",
    filter="book_id < 100"
)

# 按主键删除
client.delete(
    collection_name="quick_start",
    pks=[100, 101, 102]
)

3.10 获取实体(Get)

# 按主键获取实体
entities = client.get(
    collection_name="quick_start",
    ids=[500, 501, 502],
    output_fields=["book_id", "title", "word_count"]
)

print(f"获取到 {len(entities)} 条实体")
for entity in entities:
    print(entity)

第4章 核心功能详解

4.1 集合管理

4.1.1 查看和列出集合
# 列出所有集合
collections = client.list_collections()
print("所有集合:", collections)

# 检查集合是否存在
exists = client.has_collection("quick_start")
print(f"集合是否存在: {exists}")

# 获取集合详情
desc = client.describe_collection("quick_start")
print(f"集合详情: {desc}")

# 获取集合统计信息
stats = client.get_collection_stats("quick_start")
print(f"集合统计: {stats}")
4.1.2 加载和释放集合
# 加载到内存(查询前必须)
client.load_collection("quick_start")

# 异步加载
client.load_collection("quick_start", _async=True)

# 加载指定分区
client.load_partitions("quick_start", ["partition_1"])

# 从内存释放(节省内存)
client.release_collection("quick_start")
4.1.3 重命名和删除
# 重命名集合(Milvus 2.3+)
client.rename_collection("old_name", "new_name")

# 删除集合
client.drop_collection("collection_to_delete")

4.2 索引类型对比

Milvus 支持多种索引类型,各有优劣:

索引类型 查询速度 召回率 内存占用 构建时间 适用场景
FLAT ⭐⭐ ⭐⭐⭐⭐⭐ 100% ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐⭐⭐ 最快 小数据集、需要 100% 召回率
IVF_FLAT ⭐⭐⭐⭐ ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐ 快 中大规模数据、平衡性能
IVF_PQ ⭐⭐⭐⭐⭐ 极快 ⭐⭐⭐ 中 ⭐⭐ 极低 ⭐⭐⭐ 中 超大规模数据、内存受限
IVF_SQ8 ⭐⭐⭐⭐⭐ 极快 ⭐⭐⭐⭐ 高 ⭐⭐⭐ 低 ⭐⭐⭐ 中 内存受限、可接受精度损失
HNSW ⭐⭐⭐⭐⭐ 极快 ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐⭐⭐ 极高 ⭐⭐ 慢 高精度、低延迟场景
ANNOY ⭐⭐⭐ 中 ⭐⭐⭐ 中 ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐ 快 中等规模、简单易用
DISKANN ⭐⭐⭐ 中 ⭐⭐⭐⭐ 高 ⭐ 极低 ⭐⭐ 慢 超大规模、磁盘存储
SCANN ⭐⭐⭐⭐⭐ 极快 ⭐⭐⭐⭐ 高 ⭐⭐⭐ 中 ⭐⭐⭐ 中 Milvus 2.3+,快速检索

详细对比表

索引 index_type metric_type 主要参数 数据量建议
FLAT “FLAT” L2/IP/COSINE < 100万
IVF_FLAT “IVF_FLAT” L2/IP/COSINE nlist, nprobe 100万-1亿
IVF_PQ “IVF_PQ” L2/IP nlist, m, nbits > 1亿
IVF_SQ8 “IVF_SQ8” L2 nlist > 1亿
HNSW “HNSW” L2/IP/COSINE M, efConstruction, ef 100万-10亿
ANNOY “ANNOY” L2/IP n_trees 10万-1000万
DISKANN “DISKANN” L2 search_list > 10亿

4.3 索引创建示例

4.3.1 HNSW 索引(最常用,高精度)
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {
        "M": 16,           # 每层邻居数,8-64,越大精度越高但内存越大
        "efConstruction": 256  # 构建时搜索范围,100-500,越大构建越慢精度越高
    }
}

HNSW 参数调优

  • M:影响内存占用和构建时间,通常 16(平衡)或 32(高精度)
  • efConstruction:影响构建时间和索引质量,通常 200-500
  • ef(搜索时):影响查询速度和召回率,>= limit,通常 100-500
4.3.2 IVF_FLAT 索引(快速)
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {
        "nlist": 1024  # 聚类中心数量,sqrt(N) ~ 4*sqrt(N)
    }
}

# 搜索时参数
search_params = {
    "metric_type": "L2",
    "params": {
        "nprobe": 32  # 搜索的聚类数量,1-nlist,越大越准越慢
    }
}

IVF 参数调优

  • nlist:建议 4 * sqrt(N),N 为向量数量
  • nprobe:建议从 10 开始,根据召回率调整
4.3.3 IVF_PQ 索引(极致压缩)
index_params = {
    "index_type": "IVF_PQ",
    "metric_type": "L2",
    "params": {
        "nlist": 1024,
        "m": 16,  # 子向量数量,必须能整除维度
        "nbits": 8  # 每个子向量的比特数,通常 8
    }
}

4.4 标量过滤(Filter)

4.4.1 支持的运算符
# 比较运算符
"field > value"
"field >= value"
"field < value"
"field <= value"
"field == value"
"field != value"

# 逻辑运算符
"cond1 and cond2"
"cond1 or cond2"
"not cond"

# 集合运算符
"field in [value1, value2, value3]"
"field not in [value1, value2]"

# 字符串匹配(Milvus 2.4+)
"field like 'prefix%'"
"field like '%suffix'"
"field like '%contains%'"
4.4.2 过滤示例
# 复杂过滤示例
filter_expr = """
    (category == 1 or category == 2) and 
    view_count > 1000 and 
    create_time >= 1700000000 and 
    title like 'Tech%'
"""

results = client.search(
    collection_name="articles",
    data=[query_vector],
    limit=20,
    filter=filter_expr,
    output_fields=["title", "view_count", "category"]
)
4.4.3 JSON 字段过滤(动态字段)
# 动态字段中的 JSON 过滤(Milvus 2.4+)
filter_expr = "meta['tags'] contains 'python' and meta['rating'] > 4.5"

results = client.search(
    collection_name="articles",
    data=[query_vector],
    limit=10,
    filter=filter_expr
)

4.5 分页查询

4.5.1 Limit + Offset 方式
# 第一页
page1 = client.query(
    collection_name="quick_start",
    filter="word_count > 5000",
    output_fields=["book_id", "title"],
    limit=10,
    offset=0
)

# 第二页
page2 = client.query(
    collection_name="quick_start",
    filter="word_count > 5000",
    output_fields=["book_id", "title"],
    limit=10,
    offset=10
)
4.5.2 游标方式(推荐,大数据集)
# 使用游标分页
all_results = []
cursor = None

while True:
    batch = client.query(
        collection_name="quick_start",
        filter="word_count > 5000",
        output_fields=["book_id", "title"],
        limit=100,
        cursor=cursor
    )
    
    if not batch:
        break
    
    all_results.extend(batch)
    # 获取下一页游标
    cursor = batch[-1]['book_id']

print(f"总共获取 {len(all_results)} 条记录")

4.6 批量操作优化

4.6.1 批量插入最佳实践
# 建议单次批量大小:1000-5000 条
BATCH_SIZE = 2000

# 模拟大量数据
total_data = []
for i in range(100000):
    total_data.append({
        "id": i,
        "vector": np.random.rand(768).tolist(),
        "text": f"Document {i}"
    })

# 分批插入
for i in range(0, len(total_data), BATCH_SIZE):
    batch = total_data[i:i + BATCH_SIZE]
    res = client.insert("large_collection", batch)
    print(f"已插入 {i + len(batch)}/{len(total_data)} 条")

# 最后手动 flush
client.flush("large_collection")
4.6.2 Bulk Insert(导入工具)

对于 TB 级数据,建议使用 Milvus Bulk Insert 工具:

# 使用 Milvus 官方 bulk load 工具
milvus_cli tools import \
    --path ./data.json \
    --collection large_collection \
    --batch-size 10000

4.7 Count 操作(Milvus 2.3+)

# 实时统计集合中的实体数量(无需 flush)
count = client.query(
    collection_name="quick_start",
    filter="",
    output_fields=["count(*)"]
)
print(f"集合总数据量: {count[0]['count(*)']}")

# 带条件统计
count_filtered = client.query(
    collection_name="quick_start",
    filter="word_count > 5000",
    output_fields=["count(*)"]
)
print(f"满足条件的数据量: {count_filtered[0]['count(*)']}")

第5章 索引原理与性能优化

5.1 向量索引核心原理

向量索引的核心目标是:在可接受的精度损失下,将 O(N) 的暴力搜索降低到 O(log N) 或更低

5.1.1 近似最近邻搜索(ANN)
精确最近邻(NN):100% 准确率,但搜索速度 O(N)
         ↓
近似最近邻(ANN):90%-99% 准确率,搜索速度 O(log N)

ANN 的核心思想

  • 空间划分:将向量空间划分为多个子空间
  • 量化:用更少的比特表示向量
  • 图结构:构建邻居关系图进行导航

5.2 HNSW 索引原理解析

HNSW(Hierarchical Navigable Small World)是目前工业界最流行的向量索引算法,以高召回率和快速查询著称。

5.2.1 核心思想
Layer 2 (顶层):  A─────B           稀疏连接,用于快速导航
                   │     │
Layer 1:         A─────B─────C     中等连接密度
                   │     │     │
Layer 0 (底层):  A──D──B──E──C    稠密连接,存储所有点

多层图结构

  • 顶层(Layer N):节点最少,连接最稀疏,用于快速定位大致区域
  • 中间层(Layer N-1 … 1):中等节点密度
  • 底层(Layer 0):包含所有节点,连接最稠密,用于精确搜索
5.2.2 搜索过程
1. 从顶层的一个入口点开始
2. 在当前层进行贪心搜索,找到最近的节点
3. 下降到下一层,以上一层的最近节点为起点
4. 重复步骤 2-3,直到底层
5. 在底层扩展搜索,返回 Top K 结果
5.2.3 参数深度解析
参数 作用 推荐值 影响
M 每层每个节点的最大邻居数 8-64 越大:精度↑、内存↑、构建慢
efConstruction 构建时搜索邻居的候选数量 100-500 越大:精度↑、构建慢
ef(搜索时) 查询时的候选数量 >= limit,100-500 越大:精度↑、查询慢

经验公式

召回率要求 90%: ef ≈ 100
召回率要求 95%: ef ≈ 200
召回率要求 99%: ef ≈ 500

5.3 IVF 索引原理解析

IVF(Inverted File)基于聚类的倒排索引。

5.3.1 构建过程
1. 使用 K-Means 将所有向量聚类为 nlist 个中心
   ┌─────────────────────────────────────────┐
   │    •     •     •     •     •     •      │ 向量空间
   │      •    ○    •    ○    •    ○        │ ○ = 聚类中心
   │    •     •     •     •     •     •      │
   └─────────────────────────────────────────┘

2. 为每个聚类中心维护一个倒排列表
   Cluster 0: [vec1, vec5, vec9, ...]
   Cluster 1: [vec2, vec3, vec7, ...]
   ...
   Cluster N: [vec4, vec6, vec8, ...]
5.3.2 搜索过程
1. 计算查询向量与所有 nlist 个聚类中心的距离
2. 选择距离最近的 nprobe 个聚类
3. 只在这 nprobe 个聚类内进行精确搜索
5.3.3 参数深度解析
参数 推荐公式 说明
nlist 4 * sqrt(N) ~ 16 * sqrt(N) 聚类中心数量
nprobe max(10, nlist / 100) 搜索的聚类数量

示例

  • N = 1,000,000 向量
  • nlist = 4 * sqrt(1,000,000) = 4000
  • nprobe = 40 (nlist / 100)

5.4 PQ 量化原理

PQ(Product Quantization)乘积量化是一种极致压缩技术。

5.4.1 核心思想
原始向量 (128维 × 4字节) = 512字节
     ↓ 切分为 m 个子向量
子向量0 (16维) → 聚类 → 码本索引 (1字节)
子向量1 (16维) → 聚类 → 码本索引 (1字节)
...
子向量m-1 (16维) → 聚类 → 码本索引 (1字节)
     ↓
压缩后大小 = m 字节 (m=16 时,压缩 32 倍!)
5.4.2 距离计算优化

PQ 使用查表法快速计算近似距离:

预计算:查询子向量 vs 码本中所有中心的距离
查表:通过码本索引快速获取距离值
求和:所有子向量的距离之和即为近似距离

5.5 Milvus 性能基准测试

5.5.1 典型性能数据
数据集 向量数 维度 索引 QPS P99 延迟 召回率
SIFT1M 100万 128 HNSW 10,000+ < 10ms 99%
SIFT1M 100万 128 IVF_PQ 20,000+ < 5ms 95%
GIST1M 100万 960 HNSW 2,000+ < 50ms 98%
Deep1B 10亿 96 IVF_PQ 10,000+ < 20ms 95%
5.5.2 性能测试代码
import time
import numpy as np

def benchmark_search(client, collection_name, nq=1000, limit=10):
    """性能测试工具"""
    
    # 生成查询向量
    query_vectors = np.random.rand(nq, 768).tolist()
    
    # 预热
    client.search(collection_name, [query_vectors[0]], limit=limit)
    
    # 开始测试
    start_time = time.time()
    
    for vec in query_vectors:
        client.search(
            collection_name,
            [vec],
            limit=limit,
            search_params={"params": {"ef": 128}}
        )
    
    total_time = time.time() - start_time
    qps = nq / total_time
    avg_latency = total_time * 1000 / nq
    
    print(f"QPS: {qps:.2f}")
    print(f"Average Latency: {avg_latency:.2f} ms")
    print(f"Total Time: {total_time:.2f} s")
    
    return qps, avg_latency

5.6 性能优化最佳实践

5.6.1 硬件配置建议
组件 最低配置 推荐配置 高性能配置
CPU 8 核 16 核 32+ 核
内存 16 GB 64 GB 256+ GB
磁盘 SATA SSD NVMe SSD 高性能 NVMe
网络 1 Gbps 10 Gbps 25+ Gbps

内存估算公式

索引内存 ≈ 向量数 × (维度 × 4字节 + 索引开销)

HNSW 开销: ~ 8-10 × 向量大小
IVF_FLAT 开销: ~ 1.2 × 向量大小
IVF_PQ 开销: 极低,取决于压缩率

示例:100万 × 768维 × HNSW
内存 ≈ 1,000,000 × (768 × 4) × 10 ≈ 30 GB
5.6.2 索引选择策略

决策树

数据规模?
├─ < 100万 → 选择 FLAT(100%召回)或 HNSW(高性能)
├─ 100万 - 1亿 → 选择 HNSW(首选)或 IVF_FLAT
└─ > 1亿 → 
    内存充足?
    ├─ 是 → HNSW
    └─ 否 → 
        能接受精度损失?
        ├─ 是 → IVF_PQ / IVF_SQ8
        └─ 否 → DISKANN(基于磁盘)
5.6.3 查询优化

1. 批量查询

# 批量查询比单条查询快 2-5 倍
results = client.search(
    collection_name="my_collection",
    data=[vec1, vec2, vec3, ..., vec100],  # 一次查 100 个向量
    limit=10
)

2. 合理设置一致性级别

# 对一致性要求不高的场景,使用最终一致可提升 30% 性能
search_result = client.search(
    collection_name="my_collection",
    data=[query_vector],
    limit=10,
    consistency_level="Eventually"  # 最快
)

3. 利用分区进行数据隔离

# 将数据按时间或类别分区,查询时只扫描相关分区
results = client.search(
    collection_name="logs",
    data=[query_vector],
    limit=10,
    partition_names=["2024_05"]  # 只查 5 月份的数据
)
5.6.4 写入优化

1. 批量写入

  • 建议单次写入 1000-5000 条
  • 避免逐条写入(性能下降 10-100 倍)

2. 控制 flush 频率

  • 不要频繁手动 flush
  • 让 Milvus 自动管理(默认 1 秒)

3. 索引构建时机

  • 先写入所有数据,再创建索引
  • 增量写入会触发增量索引,性能较低

5.7 常见性能问题排查

问题 可能原因 解决方案
查询慢 ef 太大,nprobe 太大,内存不足 降低 ef/nprobe,增加内存,使用更快的索引
QPS 低 并发不足,CPU 核数不够 增加并发,升级 CPU,批量查询
写入慢 批量太小,数据节点不足,网络慢 增大批量,扩容 DataNode,检查网络
索引构建慢 M/efConstruction 太大,CPU 不足 降低参数,升级 CPU,使用更快磁盘
OOM 集合太多,索引太大,内存不足 释放不常用集合,使用 PQ/SQ8 压缩索引

第6章 高级特性

6.1 分区(Partition)

分区是 Collection 的子集,用于数据隔离和查询优化。

6.1.1 创建和使用分区
# 创建分区
client.create_partition(
    collection_name="article_collection",
    partition_name="2024_01"
)

# 列出所有分区
partitions = client.list_partitions("article_collection")
print(partitions)

# 插入到指定分区
client.insert(
    collection_name="article_collection",
    data=data_batch,
    partition_name="2024_01"
)

# 只查询指定分区(大幅提升性能)
results = client.search(
    collection_name="article_collection",
    data=[query_vector],
    limit=10,
    partition_names=["2024_01", "2024_02"]  # 只查这两个月
)
6.1.2 分区最佳实践

适用场景

  • 时间序列数据:按年/月/日分区
  • 多租户数据:按租户 ID 分区
  • 类别数据:按业务类别分区

建议

  • 分区数量控制在 4096 以内
  • 每个分区数据量建议 100万-1亿 条
  • 查询时尽量指定分区,避免全表扫描

6.2 TTL(Time-To-Live)

Milvus 2.3+ 支持数据自动过期。

# 创建集合时设置 TTL
client.create_collection(
    collection_name="temp_logs",
    dimension=512,
    ttl=3600  # 1 小时后自动过期(秒)
)

# 修改现有集合的 TTL
client.alter_collection(
    collection_name="temp_logs",
    ttl=86400  # 改为 1 天
)

使用场景

  • 临时日志数据
  • 会话缓存
  • 时效性推荐数据

6.3 多租户(Multi-Tenancy)

6.3.1 三种多租户方案
方案 隔离级别 性能 资源利用率 适用场景
数据库隔离 最高 大型企业,严格隔离
Collection 隔离 中型应用
分区隔离 大规模多租户 SaaS
6.3.2 分区隔离实现
class MultiTenantMilvus:
    def __init__(self, client, base_collection):
        self.client = client
        self.base_collection = base_collection
    
    def _get_partition_name(self, tenant_id):
        return f"tenant_{tenant_id}"
    
    def insert_for_tenant(self, tenant_id, data):
        partition = self._get_partition_name(tenant_id)
        if not self.client.has_partition(self.base_collection, partition):
            self.client.create_partition(self.base_collection, partition)
        
        return self.client.insert(
            self.base_collection,
            data,
            partition_name=partition
        )
    
    def search_for_tenant(self, tenant_id, query_vector, limit=10):
        partition = self._get_partition_name(tenant_id)
        return self.client.search(
            self.base_collection,
            [query_vector],
            limit=limit,
            partition_names=[partition]
        )

6.4 动态字段(Dynamic Field)

Milvus 2.3+ 支持动态字段,无需预定义 Schema 即可插入任意字段。

# 创建集合时开启动态字段
client.create_collection(
    collection_name="flexible_data",
    dimension=768,
    enable_dynamic_field=True  # 关键参数
)

# 插入包含任意字段的数据
data = [
    {
        "id": 1,
        "vector": [...],
        "name": "Alice",  # 动态字段
        "age": 30,        # 动态字段
        "tags": ["a", "b"],  # 动态字段
        "extra": {"k": "v"}  # 嵌套 JSON
    }
]

client.insert("flexible_data", data)

# 动态字段也可以用于过滤
results = client.search(
    collection_name="flexible_data",
    data=[query_vector],
    limit=10,
    filter="age > 25 and tags contains 'a'",  # 使用动态字段过滤
    output_fields=["name", "age", "tags"]  # 返回动态字段
)

6.5 数据备份与恢复

6.5.1 使用 Milvus Backup 工具
# 安装备份工具
pip install milvus-backup

# 创建备份
milvus-backup create \
    --name backup_20240519 \
    --collections collection1,collection2 \
    --path ./backups

# 恢复备份
milvus-backup restore \
    --name backup_20240519 \
    --path ./backups

# 列出备份
milvus-backup list
6.5.2 导出和导入数据
# 查询并导出所有数据
def export_collection(client, collection_name, batch_size=1000):
    """导出集合数据"""
    all_data = []
    offset = 0
    
    while True:
        batch = client.query(
            collection_name=collection_name,
            filter="",
            output_fields=["*"],
            limit=batch_size,
            offset=offset
        )
        
        if not batch:
            break
        
        all_data.extend(batch)
        offset += len(batch)
    
    return all_data

# 导出到 JSON
import json
data = export_collection(client, "my_collection")
with open("backup.json", "w") as f:
    json.dump(data, f)

6.6 监控与告警

6.6.1 Prometheus 监控指标

Milvus 暴露了丰富的 Prometheus 指标:

# Query 相关指标
milvus_query_latency_ms         # 查询延迟分布
milvus_query_qps                # 查询 QPS
milvus_query_recall_rate        # 召回率(需自行计算)

# 写入相关指标
milvus_insert_latency_ms        # 插入延迟
milvus_insert_throughput        # 写入吞吐量(条/秒)

# 资源相关
milvus_memory_usage_bytes       # 内存使用
milvus_cpu_usage_percent        # CPU 使用率
milvus_segment_count            # Segment 数量
milvus_vector_count             # 向量总数
6.6.2 Grafana 面板配置

可导入 Milvus 官方 Grafana Dashboard:

  • ID: 15380(Milvus 2.x Overview)
  • 包含集群状态、查询性能、写入性能、资源使用等面板
6.6.3 关键告警规则
# Prometheus Alert Rules
groups:
  - name: milvus_alerts
    rules:
      - alert: MilvusHighLatency
        expr: milvus_query_latency_ms{quantile="0.99"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Milvus 查询 P99 延迟 > 1s"

      - alert: MilvusHighMemoryUsage
        expr: milvus_memory_usage_bytes / 1024^3 > 0.9 * total_memory_gb
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Milvus 内存使用率 > 90%"

      - alert: MilvusSegmentGrowing
        expr: milvus_segment_count{type="growing"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Growing Segment 数量过多,可能存在 Flush 问题"

6.7 增量索引和实时搜索

Milvus 支持实时搜索的核心机制:

写入数据流
    ↓
Growing Segment(增量索引)
    ↓ 达到大小阈值(默认 512MB)
Flush → Sealed Segment
    ↓
后台构建索引
    ↓
Sealed Segment(历史索引)

实时搜索特性

  • 写入后 1 秒内即可搜索到
  • Growing Segment 使用暴力搜索保证精度
  • Sealed Segment 使用 ANN 索引保证性能

第7章 典型应用场景

7.1 RAG 系统构建(检索增强生成)

RAG 是向量数据库最热门的应用场景,结合检索和大语言模型。

7.1.1 基础 RAG 架构
用户问题
    ↓
Embedding 模型 → 查询向量
    ↓
Milvus 向量检索 → Top K 相关文档
    ↓
Prompt 组装:问题 + 上下文
    ↓
大语言模型 → 生成答案
7.1.2 完整实现代码
from pymilvus import MilvusClient
from sentence_transformers import SentenceTransformer
import openai
import numpy as np

class RAGSystem:
    def __init__(
        self,
        milvus_uri="./milvus_rag.db",
        collection_name="knowledge_base",
        embedding_model="BAAI/bge-small-zh-v1.5",
        openai_api_key="your-api-key"
    ):
        # 初始化 Milvus
        self.client = MilvusClient(milvus_uri)
        self.collection_name = collection_name
        
        # 初始化 Embedding 模型
        self.embedding_model = SentenceTransformer(embedding_model)
        self.dim = self.embedding_model.get_sentence_embedding_dimension()
        
        # 初始化 OpenAI
        openai.api_key = openai_api_key
        
        # 初始化集合
        self._init_collection()
    
    def _init_collection(self):
        if not self.client.has_collection(self.collection_name):
            self.client.create_collection(
                collection_name=self.collection_name,
                dimension=self.dim,
                metric_type="COSINE",
                enable_dynamic_field=True
            )
            
            # 创建索引
            self.client.create_index(
                self.collection_name,
                {
                    "index_type": "HNSW",
                    "metric_type": "COSINE",
                    "params": {"M": 16, "efConstruction": 256}
                }
            )
        
        self.client.load_collection(self.collection_name)
    
    def add_documents(self, documents, batch_size=32):
        """添加文档到知识库"""
        data = []
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # 批量生成 Embedding
            embeddings = self.embedding_model.encode(
                [doc["content"] for doc in batch],
                normalize_embeddings=True
            )
            
            for j, doc in enumerate(batch):
                data.append({
                    "id": i + j,
                    "vector": embeddings[j].tolist(),
                    "title": doc["title"],
                    "content": doc["content"],
                    "source": doc.get("source", ""),
                    "category": doc.get("category", "general")
                })
        
        # 批量插入
        res = self.client.insert(self.collection_name, data)
        print(f"成功添加 {res['insert_count']} 条文档")
        return res
    
    def retrieve(self, query, top_k=5, filter_expr=None):
        """检索相关文档"""
        # 生成查询向量
        query_vec = self.embedding_model.encode(
            query,
            normalize_embeddings=True
        ).tolist()
        
        # 检索
        results = self.client.search(
            collection_name=self.collection_name,
            data=[query_vec],
            limit=top_k,
            filter=filter_expr,
            output_fields=["title", "content", "source", "category"],
            search_params={"params": {"ef": 128}}
        )
        
        return results[0]
    
    def generate_answer(self, query, retrieved_docs):
        """基于检索结果生成答案"""
        # 构建上下文
        context = "\n\n".join([
            f"文档 {i+1} (相似度: {hit['distance']:.3f}):\n"
            f"标题: {hit['entity']['title']}\n"
            f"内容: {hit['entity']['content']}"
            for i, hit in enumerate(retrieved_docs)
        ])
        
        # 构建 Prompt
        prompt = f"""你是一个专业的问答助手。请基于以下文档回答用户的问题。
如果文档中没有相关信息,请回答"根据现有资料无法回答"。

参考文档:
{context}

用户问题:{query}

回答:"""
        
        # 调用 LLM
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=1024
        )
        
        return response.choices[0].message.content
    
    def query(self, question, top_k=5, filter_expr=None):
        """完整的 RAG 流程"""
        # 1. 检索
        retrieved = self.retrieve(question, top_k, filter_expr)
        
        # 2. 生成答案
        answer = self.generate_answer(question, retrieved)
        
        return {
            "answer": answer,
            "sources": [
                {
                    "title": hit["entity"]["title"],
                    "source": hit["entity"]["source"],
                    "similarity": hit["distance"]
                }
                for hit in retrieved
            ]
        }


# 使用示例
if __name__ == "__main__":
    # 初始化 RAG 系统
    rag = RAGSystem(
        milvus_uri="./milvus_rag.db",
        openai_api_key="your-api-key"
    )
    
    # 添加示例文档
    sample_docs = [
        {
            "title": "Milvus 简介",
            "content": "Milvus 是一个开源的向量数据库,专为 AI 应用设计,支持十亿级向量的毫秒级检索。",
            "source": "官网",
            "category": "技术"
        },
        {
            "title": "RAG 技术原理",
            "content": "检索增强生成(RAG)结合了向量检索和大语言模型,通过检索相关文档来增强生成答案的准确性。",
            "source": "技术博客",
            "category": "AI"
        }
    ]
    
    rag.add_documents(sample_docs)
    
    # 提问
    result = rag.query("什么是 Milvus 和 RAG?")
    print("回答:", result["answer"])
    print("\n参考来源:")
    for src in result["sources"]:
        print(f"- {src['title']} (相似度: {src['similarity']:.3f})")
7.1.3 混合检索(Hybrid Search)Milvus 2.4+

结合稠密向量(语义)和稀疏向量(关键词)的优势。

from pymilvus import AnnSearchRequest, WeightedRanker

# 构建两个检索请求
dense_req = AnnSearchRequest(
    data=[dense_vector],
    ann_field="dense_vector",
    param={"metric_type": "COSINE", "params": {"ef": 128}},
    limit=100
)

sparse_req = AnnSearchRequest(
    data=[sparse_vector],
    ann_field="sparse_vector",
    param={"metric_type": "IP", "params": {"drop_ratio_search": 0.2}},
    limit=100
)

# 加权融合重排序
rerank = WeightedRanker(0.7, 0.3)  # 70% 语义 + 30% 关键词

# 执行混合检索
results = client.hybrid_search(
    collection_name="hybrid_rag",
    reqs=[dense_req, sparse_req],
    ranker=rerank,
    limit=10,
    output_fields=["title", "content"]
)

7.2 推荐系统

基于用户行为和物品特征的向量召回。

class RecommendationSystem:
    def __init__(self, client):
        self.client = client
        self.item_collection = "item_vectors"
        self.user_collection = "user_vectors"
    
    def get_user_recommendations(self, user_id, top_n=20):
        """基于用户向量获取推荐"""
        # 获取用户向量
        user = self.client.get(
            self.user_collection,
            ids=[user_id],
            output_fields=["user_vector"]
        )
        
        if not user:
            return []
        
        user_vector = user[0]["user_vector"]
        
        # 检索相似物品
        results = self.client.search(
            collection_name=self.item_collection,
            data=[user_vector],
            limit=top_n,
            filter="is_active == true",  # 只推荐上架的商品
            output_fields=["item_id", "name", "category", "price"]
        )
        
        return results[0]
    
    def get_similar_items(self, item_id, top_n=10):
        """相似物品推荐"""
        item = self.client.get(
            self.item_collection,
            ids=[item_id],
            output_fields=["item_vector", "category"]
        )
        
        if not item:
            return []
        
        # 同类目优先推荐
        category_filter = f"category == '{item[0]['category']}'"
        
        results = self.client.search(
            collection_name=self.item_collection,
            data=[item[0]["item_vector"]],
            limit=top_n + 1,  # +1 排除自己
            filter=category_filter,
            output_fields=["item_id", "name", "price"]
        )
        
        # 排除查询的物品自身
        return [hit for hit in results[0] if hit["id"] != item_id][:top_n]

7.3 图像检索系统

以图搜图,支持海量图像库的相似图像检索。

from PIL import Image
import torch
from torchvision import models, transforms

class ImageSearchEngine:
    def __init__(self, milvus_client, collection_name="images"):
        self.client = milvus_client
        self.collection_name = collection_name
        
        # 加载预训练的 ResNet 模型
        self.model = models.resnet50(pretrained=True)
        self.model.eval()
        
        # 图像预处理
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
        
        self.dim = 2048  # ResNet50 特征维度
        self._init_collection()
    
    def _init_collection(self):
        if not self.client.has_collection(self.collection_name):
            fields = [
                FieldSchema("image_id", DataType.INT64, is_primary=True),
                FieldSchema("vector", DataType.FLOAT_VECTOR, dim=self.dim),
                FieldSchema("path", DataType.VARCHAR, max_length=512),
                FieldSchema("category", DataType.VARCHAR, max_length=128)
            ]
            schema = CollectionSchema(fields)
            self.client.create_collection(self.collection_name, schema=schema)
            
            # 创建索引
            self.client.create_index(
                self.collection_name,
                {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 32}}
            )
        
        self.client.load_collection(self.collection_name)
    
    def extract_feature(self, image_path):
        """提取图像特征向量"""
        img = Image.open(image_path).convert('RGB')
        img_tensor = self.transform(img).unsqueeze(0)
        
        with torch.no_grad():
            features = self.model(img_tensor)
            features = torch.nn.functional.normalize(features, p=2, dim=1)
        
        return features.squeeze().numpy().tolist()
    
    def add_image(self, image_id, image_path, category="general"):
        """添加图像到索引"""
        vector = self.extract_feature(image_path)
        
        self.client.insert(
            self.collection_name,
            [{
                "image_id": image_id,
                "vector": vector,
                "path": image_path,
                "category": category
            }]
        )
    
    def search_similar(self, query_image_path, top_k=10, category=None):
        """搜索相似图像"""
        query_vector = self.extract_feature(query_image_path)
        
        filter_expr = f"category == '{category}'" if category else None
        
        results = self.client.search(
            collection_name=self.collection_name,
            data=[query_vector],
            limit=top_k,
            filter=filter_expr,
            output_fields=["path", "category"],
            search_params={"params": {"ef": 128}}
        )
        
        return results[0]

7.4 语义搜索

基于语义理解的全文检索,超越传统关键词匹配。

class SemanticSearch:
    def __init__(self, client, embedding_model_name):
        self.client = client
        self.collection_name = "semantic_docs"
        self.embedding_model = SentenceTransformer(embedding_model_name)
    
    def search(self, query, top_k=20, rerank=True):
        """语义搜索 + 重排序"""
        # 1. 向量召回
        query_vector = self.embedding_model.encode(query).tolist()
        
        candidates = self.client.search(
            collection_name=self.collection_name,
            data=[query_vector],
            limit=100,  # 召回更多候选
            output_fields=["title", "content"]
        )[0]
        
        if not rerank:
            return candidates[:top_k]
        
        # 2. Cross-Encoder 精排(可选)
        from sentence_transformers import CrossEncoder
        reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
        
        # 构建查询-文档对
        pairs = [(query, hit['entity']['content']) for hit in candidates]
        scores = reranker.predict(pairs)
        
        # 重新排序
        reranked = sorted(
            zip(candidates, scores),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [item[0] for item in reranked[:top_k]]

第8章 常见问题与排障指南

8.1 连接问题

Q1: 无法连接到 Milvus 服务器

可能原因

  1. Milvus 服务未启动
  2. 防火墙阻挡了端口
  3. 主机地址或端口错误
  4. etcd 或 MinIO 依赖服务异常

排查步骤

# 1. 检查容器状态
docker-compose ps

# 2. 查看日志
docker-compose logs milvus-standalone

# 3. 测试端口连通性
telnet localhost 19530

# 4. 检查 etcd 状态
docker exec milvus-etcd etcdctl endpoint health

解决方案

# 重启服务
docker-compose down
docker-compose up -d

# 等待服务就绪(约 30-60 秒)
sleep 30
Q2: 认证失败

错误信息authentication failed

解决方案

# 确认用户名密码正确
connections.connect(
    host="localhost",
    port="19530",
    user="root",
    password="Milvus"  # 默认密码
)

# 重置 root 密码(需要进入容器)
docker exec -it milvus-standalone /milvus/bin/milvus-cli reset-root-password

8.2 查询问题

Q3: 查询不到刚插入的数据

原因:Milvus 的近实时特性,数据写入后需要几秒才能搜索到。

解决方案

# 1. 手动 flush(不建议频繁调用)
collection.flush()

# 2. 等待一小段时间
import time
time.sleep(2)  # 等待 2 秒

# 3. 调整一致性级别
results = client.search(
    ...,
    consistency_level="Strong"  # 强一致,确保能查到最新数据
)

注意Strong 一致性会显著降低查询性能,仅在必要时使用。

Q4: 搜索结果为空或数量不足

可能原因

  1. 集合未加载到内存
  2. 索引未构建完成
  3. 过滤条件太严格
  4. 向量维度不匹配

排查

# 1. 检查集合是否加载
load_state = client.get_load_state("my_collection")
print(f"加载状态: {load_state}")

# 2. 检查索引状态
index_info = client.describe_index("my_collection")
print(f"索引状态: {index_info}")

# 3. 先查询看是否有数据
count = client.query("my_collection", "", ["count(*)"])
print(f"数据量: {count}")

# 4. 验证向量维度
desc = client.describe_collection("my_collection")
print(f"向量维度: {desc['schema']['fields'][1]['params']['dim']}")
Q5: 搜索速度很慢

优化方向

# 1. 检查索引类型(FLAT 比 HNSW 慢很多)
# 确保使用了 HNSW 或 IVF 索引

# 2. 调整搜索参数
search_params = {
    "metric_type": "COSINE",
    "params": {
        "ef": 64,  # 降低 ef 可提速,默认 128
        # 或
        "nprobe": 16  # IVF 索引降低 nprobe
    }
}

# 3. 使用更低的一致性级别
results = client.search(
    ...,
    consistency_level="Eventually"
)

# 4. 检查内存是否足够(OOM 会导致严重降速)
# free -h

8.3 内存问题

Q6: OOM(内存溢出)

错误信息Out of Memorymemory allocation failed

原因分析

  • 集合太大,索引全部加载超过内存
  • 同时加载了太多集合
  • HNSW 索引内存占用过高

解决方案

# 1. 释放不常用的集合
client.release_collection("unused_collection")

# 2. 使用更省内存的索引
# HNSW → IVF_SQ8 → IVF_PQ
index_params = {
    "index_type": "IVF_PQ",  # PQ 压缩比最高
    "metric_type": "L2",
    "params": {"nlist": 1024, "m": 16}
}

# 3. 使用分区,只加载需要的分区
client.release_partitions("large_collection", ["old_partition"])

# 4. 开启 MMap(Milvus 2.3+)
# 修改配置文件:
# queryNode:
#   mmap:
#     enabled: true

内存估算工具

def estimate_memory(num_vectors, dim, index_type="HNSW"):
    """估算索引内存占用(GB)"""
    vector_size = num_vectors * dim * 4  # 4 bytes per float32
    
    if index_type == "FLAT":
        overhead = 1.0
    elif index_type == "IVF_FLAT":
        overhead = 1.1
    elif index_type == "IVF_SQ8":
        overhead = 0.28  # 约 1/4
    elif index_type == "IVF_PQ":
        overhead = 0.125  # m=16, dim 倍数时约 1/8
    elif index_type == "HNSW":
        overhead = 10.0  # HNSW 内存开销大
    else:
        overhead = 1.0
    
    return vector_size * overhead / 1024**3

# 示例:100万 × 768维 HNSW 索引
print(f"预估内存: {estimate_memory(1000000, 768, 'HNSW'):.2f} GB")
# 输出: 预估内存: 28.61 GB

8.4 性能问题

Q7: 写入速度慢

优化建议

  1. 增大批量大小:单次写入 1000-5000 条
  2. 并发写入:使用多线程/多进程写入
  3. 扩容 DataNode:集群模式增加 DataNode 数量
  4. 检查磁盘:使用 NVMe SSD
# 并发写入示例
from concurrent.futures import ThreadPoolExecutor

def insert_batch(batch):
    return client.insert("my_collection", batch)

# 使用 8 线程并发写入
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(insert_batch, batch) for batch in batches]
    results = [f.result() for f in futures]
Q8: 索引构建太慢

优化建议

  1. 降低 MefConstruction 参数
  2. 增加 CPU 核数
  3. 使用更快的磁盘
  4. 先导入所有数据再建索引(避免增量索引)
# 更快的索引参数(牺牲一些精度)
index_params = {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {
        "M": 8,              # 从 16 降到 8
        "efConstruction": 128  # 从 256 降到 128
    }
}

8.5 数据问题

Q9: 删除数据后空间没有释放

原因:Milvus 使用 LSM 类似的架构,删除只是标记墓碑,不会立即释放空间。

解决方案

# 触发 Compaction(Milvus 2.3+)
client.compact_collection("my_collection")

# 等待 Compaction 完成
import time
time.sleep(60)  # 等待 1 分钟

# 查看 Segment 数量变化
stats = client.get_collection_stats("my_collection")
print(stats)

注意:Compaction 是后台异步操作,可能需要较长时间。

Q10: 数据损坏或丢失

恢复方案

  1. 从备份恢复(第 6.5 节)
  2. 重新导入原始数据
  3. 检查 etcd 和 MinIO 是否正常

预防措施

  • 定期备份
  • 使用多副本部署
  • 监控 etcd 和 MinIO 状态

8.6 兼容性问题

Q11: PyMilvus 与服务端版本不兼容

错误信息version mismatch 或 API 调用失败

解决方案

# 查看服务端版本
python -c "from pymilvus import utility; print(utility.get_server_version())"

# 安装匹配版本
pip install pymilvus==2.3.7  # 对应服务端 2.3.x

版本匹配关系

  • PyMilvus 2.3.x → Milvus 2.3.x
  • PyMilvus 2.4.x → Milvus 2.4.x
  • PyMilvus 2.5.x → Milvus 2.5.x

8.7 日志分析

关键日志位置
# Docker 部署
docker logs milvus-standalone

# 查看最近 100 行错误日志
docker logs milvus-standalone --tail 100 2>&1 | grep -i error

# 进入容器查看详细日志
docker exec -it milvus-standalone bash
cd /var/lib/milvus/logs/
ls -la
常见错误日志解读
错误关键词 含义 解决方案
timeout 操作超时 增加超时时间,检查网络
deadline exceeded gRPC 超时 同上
not found 资源不存在 检查集合/分区名称
permission denied 权限不足 检查用户名密码
already exists 重复创建 先删除再创建
channel is full 消息队列满 扩容或降低写入速度

第9章 面试高频题汇总(35 题)

基础概念(10 题)

1. 什么是向量数据库?它和传统数据库有什么区别?

回答要点

  • 向量数据库专门存储和检索高维向量
  • 核心是相似性搜索而非精确匹配
  • 使用**近似最近邻(ANN)**算法,在精度和速度间权衡
  • 支持相似度度量:L2(欧氏距离)、IP(内积)、COSINE(余弦)
  • 区别:传统 DB 是精确匹配(B+树、哈希),向量 DB 是近似搜索(ANN索引)
2. 为什么需要向量数据库?不能用传统数据库做相似搜索吗?

回答要点

  • 传统数据库暴力搜索是 O(N),100万向量需要几秒甚至几分钟
  • 向量数据库用 ANN 索引,将复杂度降到 O(log N),毫秒级返回
  • 向量数据库专门优化了高维向量的存储和检索
  • 支持动态插入、删除、分区、多租户等企业级特性
3. Milvus 的整体架构是怎样的?有哪些核心组件?

回答要点

  • 接入层:Proxy,无状态,负责请求转发和合并
  • 协调层:RootCoord、QueryCoord、DataCoord、IndexCoord
  • 工作节点:QueryNode(查询)、DataNode(写入)、IndexNode(建索引)
  • 存储引擎:etcd(元数据)、Pulsar/Kafka(消息队列)、S3/MinIO(对象存储)
  • 共享存储架构:计算存储分离,支持独立弹性扩缩容
4. Milvus 支持哪些索引类型?各自的适用场景是什么?

回答要点

  • FLAT:暴力搜索,100%召回,适合小数据集(< 100万)
  • IVF_FLAT:倒排文件,平衡精度和速度,适合中大规模数据
  • IVF_PQ/SQ8:量化压缩,极致节省内存,适合超大规模但可接受精度损失
  • HNSW:多层图索引,召回率高、查询快,内存占用高,是目前主流
  • DISKANN:基于磁盘,适合 TB 级数据,内存占用极低
  • ANNOY:树状索引,简单易理解,适合中小规模
5. 什么是一致性级别?Milvus 有哪几种一致性级别?

回答要点

  • 一致性级别决定了"读能看到多久之前的写"
  • Strong(强一致):读到最新写入,性能最差
  • Bounded(有界一致):容忍一定时间窗口不一致,默认,性能中等
  • Session(会话一致):同一客户端读写一致,性能较好
  • Eventually(最终一致):只保证最终同步,性能最好
6. 向量的相似度度量有哪些?分别适用于什么场景?

回答要点

  • L2(欧氏距离):衡量空间绝对距离,适合图像、空间坐标等
  • IP(内积):衡量向量投影长度,适合归一化后的向量
  • COSINE(余弦相似度):衡量方向相似性,不关心长度,NLP 最常用
  • 选择建议:文本语义匹配用 COSINE,图像匹配用 L2
7. 什么是 Embedding?它和向量数据库是什么关系?

回答要点

  • Embedding 是将非结构化数据(文本、图像、音频)映射到高维稠密向量的过程
  • 语义相似的内容在向量空间中距离更近
  • 向量数据库是 Embedding 的"存储器"和"搜索引擎"
  • Embedding 模型 + 向量数据库 = AI 应用的基础设施
8. Milvus 中的 Collection、Partition、Segment 分别是什么?

回答要点

  • Collection:最顶层容器,类比关系型数据库的表
  • Partition:Collection 的子集,用于数据隔离,查询可以指定分区提高性能
  • Segment:Milvus 内部数据组织单位,数据插入后先写入 Growing Segment,达到阈值后 Flush 为 Sealed Segment
9. 什么是 ANN(近似最近邻)?为什么不使用精确最近邻?

回答要点

  • 精确最近邻(KNN)需要计算与所有向量的距离,复杂度 O(N)
  • ANN 通过索引结构将复杂度降到 O(log N),牺牲少量精度换取巨大性能提升
  • 实际应用中 95%+ 的召回率完全够用
  • ANN 是向量数据库的核心技术
10. Milvus 支持哪些数据类型?

回答要点

  • 向量类型:FLOAT_VECTOR、FLOAT16_VECTOR、BFLOAT16_VECTOR、SPARSE_FLOAT_VECTOR、BINARY_VECTOR
  • 标量类型:BOOL、INT8/16/32/64、FLOAT、DOUBLE、VARCHAR、JSON、ARRAY
  • 2.4+ 新增稀疏向量支持,2.3+ 新增动态字段支持

架构原理(10 题)

11. HNSW 索引的原理是什么?

回答要点

  • 多层图结构:顶层稀疏连接(快速导航),底层稠密连接(精确搜索)
  • 搜索过程:从顶层贪心搜索到最近点,逐层下降,到底层后扩展搜索
  • 三个关键参数
    • M:每层每个节点的最大邻居数,影响内存和精度
    • efConstruction:构建时搜索候选数,影响构建时间和精度
    • ef:搜索时的候选数,影响查询速度和召回率
  • 优点:召回率高、查询快、支持动态插入
  • 缺点:内存占用大、构建慢
12. IVF 索引的原理是什么?nlist 和 nprobe 的关系?

回答要点

  • 两步搜索
    1. 构建时:K-Means 聚类得到 nlist 个中心
    2. 搜索时:找 nprobe 个最近的聚类中心,只在这些聚类内精确搜索
  • nlist:聚类中心数量,建议 4*sqrt(N)16*sqrt(N),N 为向量数
  • nprobe:搜索的聚类数,越大召回率越高,速度越慢
  • 权衡:nlist 大则每个聚类小,nprobe 大则搜索范围大
13. PQ(乘积量化)的原理是什么?为什么能大幅压缩?

回答要点

  • 切分子向量:将 D 维向量切为 m 个 D/m 维的子向量
  • 分别聚类:每个子向量空间单独聚类,生成码本
  • 编码存储:每个子向量用码本索引表示(通常 8 bit = 1 byte)
  • 压缩比:原始 D×4 字节 → m 字节,m=16 时压缩 32 倍!
  • 距离计算:预计算查询子向量与码本中心的距离表,查表快速求和
14. Milvus 的数据写入流程是怎样的?

回答要点

客户端 → Proxy → 消息队列(Pulsar)→ DataNode
                                          ↓
                                   写入 Growing Segment
                                          ↓
                                 达到阈值触发 Flush
                                          ↓
                              写入对象存储(MinIO)→ Sealed Segment
                                          ↓
                              IndexNode 异步构建索引
  • 写入是异步的,近实时(1 秒左右可搜索)
  • WAL 保证数据不丢
15. Milvus 的查询流程是怎样的?

回答要点

客户端 → Proxy → QueryCoord(分发)→ QueryNode
                                          ↓
                              Growing Segment(暴力搜)
                              Sealed Segment(ANN索引搜)
                                          ↓
                                    结果合并返回
  • Growing Segment 保证实时性,Sealed Segment 保证性能
16. 什么是时间旅行(Time Travel)查询?

回答要点

  • Milvus 保存了数据的多个版本,可以查询历史某个时间点的数据状态
  • 基于 MVCC(多版本并发控制)实现
  • 使用场景:数据恢复、A/B 测试、审计追溯
  • 通过 travel_timestamp 参数指定查询时间点
17. Milvus 如何实现高可用?

回答要点

  • 无状态组件:Proxy、QueryNode、DataNode、IndexNode 可多副本部署
  • 有状态组件:etcd、Pulsar、MinIO 本身支持集群部署
  • 故障自动转移:Coord 监控节点状态,故障时自动迁移
  • 读写分离:查询和写入链路分离,互不影响
18. 什么是动态字段?它的实现原理是什么?

回答要点

  • Milvus 2.3+ 特性,无需预定义 Schema 即可插入任意字段
  • 实现:所有动态字段打包成一个 JSON 存储在 $meta 字段
  • 支持动态字段的过滤和返回
  • 优点:灵活,适合 Schema 多变的场景
  • 缺点:动态字段过滤性能略低于预定义字段
19. Milvus 的 Compaction 机制是怎样的?

回答要点

  • Compaction 是合并小 Segment、清理删除数据的过程
  • 删除操作只是打墓碑标记,不会立即删除
  • Compaction 会:
    1. 合并小的 Segment 减少碎片
    2. 清理已删除的数据(墓碑)
    3. 释放磁盘和内存空间
  • 后台自动触发,也可手动调用
20. Milvus 如何支持多租户?有哪几种方案?

回答要点

  • 数据库隔离:每个租户一个数据库,隔离最高,资源利用率低
  • Collection 隔离:每个租户一个 Collection,隔离中等,资源中等
  • Partition 隔离:所有租户一个 Collection,每个租户一个 Partition,隔离最低,资源利用率最高
  • 选择建议:大型企业用数据库,中型用 Collection,SaaS 大规模用 Partition

性能优化(7 题)

21. 如何选择合适的索引类型?

回答要点

数据量小 (< 100万) → FLAT(100%召回)
数据量大 →
    内存充足?
    ├─ 是 → HNSW(首选,高性能高精度)
    └─ 否 →
        能接受精度损失?
        ├─ 是 → IVF_PQ / IVF_SQ8
        └─ 否 → 加内存用 HNSW 或 DISKANN
22. HNSW 的参数如何调优?

回答要点

  • M:8-64,越大精度越高、内存越大、构建越慢,16 是常用平衡值
  • efConstruction:100-500,越大构建越慢精度越高,256 是常用值
  • ef:>= limit,越大查询越慢召回率越高,按需调整:
    • 90% 召回:ef ≈ 100
    • 95% 召回:ef ≈ 200
    • 99% 召回:ef ≈ 500
23. Milvus 的内存如何估算?内存不足时有哪些优化手段?

回答要点

  • 估算公式内存 ≈ 向量数 × 维度 × 4字节 × 索引开销系数
  • 索引开销系数:FLAT=1,IVF_FLAT=1.1,HNSW=8-10,IVF_PQ=0.1
  • 内存不足优化
    1. 使用 IVF_PQ / IVF_SQ8 压缩索引
    2. 释放不常用的 Collection
    3. 开启 MMap(Milvus 2.3+)
    4. 使用 Partition,只加载需要的分区
    5. 升级内存
24. 如何提高查询 QPS?

回答要点

  • 批量查询:一次查多个向量,QPS 提升 2-5 倍
  • 降低一致性级别:Strong → Eventually,性能提升 30%+
  • 调整搜索参数:降低 ef/nprobe
  • 增加 QueryNode:横向扩展查询节点
  • 使用更快的索引:HNSW > IVF_FLAT > IVF_PQ
  • CPU 绑定:QueryNode 绑核减少上下文切换
25. 如何提高写入吞吐量?

回答要点

  • 批量写入:单次 1000-5000 条,避免逐条写
  • 并发写入:多线程/多进程写入
  • 增加 DataNode:横向扩展写入节点
  • 优化消息队列:Pulsar 性能优于 Kafka
  • 先写后建索引:避免增量索引的开销
26. 召回率不达标怎么办?

回答要点

  • 提高搜索参数:增大 ef(HNSW)或 nprobe(IVF)
  • 更换索引类型:IVF_PQ → IVF_FLAT → HNSW → FLAT
  • 重新构建索引:增大构建时参数(M、efConstruction、nlist)
  • 检查向量质量:Embedding 模型质量是否够高
  • 增加返回候选数:limit 设大一点,后处理再截断
27. Milvus 的性能瓶颈通常在哪里?如何排查?

回答要点

  • 常见瓶颈
    1. 内存:HNSW 索引内存占用大,OOM 导致降速
    2. CPU:查询是计算密集型,核数不够导致 QPS 上不去
    3. 磁盘:索引构建和 Flush 是 IO 密集型
  • 排查方法
    1. 看监控:CPU、内存、磁盘 IO、网络
    2. 看日志:慢查询日志、错误日志
    3. Profile:topiostatvmstat

实战与排障(8 题)

28. 为什么查询不到刚插入的数据?如何保证能查到最新数据?

回答要点

  • Milvus 是近实时系统,写入到可搜索有 1 秒左右延迟
  • 原因
    1. 数据先写 WAL,再到内存,需要时间
    2. Growing Segment 构建需要时间
  • 解决方案
    1. 等待几秒再查
    2. 使用 Strong 一致性级别(性能代价大)
    3. 手动 flush(不推荐频繁调用)
29. 删除数据后为什么内存没有释放?

回答要点

  • Milvus 删除是标记删除,不会立即物理删除
  • 需要等待 Compaction 进程:
    1. 合并小 Segment
    2. 清理已删除的向量
    3. 释放空间
  • 可以手动触发 compact_collection()
  • Compaction 是后台异步操作,需要时间
30. Milvus 启动失败怎么办?常见原因有哪些?

回答要点

  • 检查依赖服务:etcd、MinIO/Pulsar 是否正常
  • 检查端口:19530、9091、2379、9000 是否被占用
  • 检查磁盘:是否有剩余空间,磁盘是否只读
  • 查看日志
    docker logs milvus-standalone --tail 100
    
  • 常见错误
    • etcd 连接失败 → 检查 etcd 状态
    • MinIO 连接失败 → 检查对象存储配置
    • 端口被占用 → lsof -i :19530 找占用进程
31. 如何备份和恢复 Milvus 数据?

回答要点

  • 官方工具:milvus-backup
    milvus-backup create --name my_backup
    milvus-backup restore --name my_backup
    
  • 导出导入:query 导出所有数据到 JSON,重新 insert
  • 磁盘快照:虚拟机或云服务器做磁盘快照
  • 注意:备份时尽量停止写入,避免备份不一致
32. 索引构建失败或卡住怎么办?

回答要点

  • 检查磁盘空间:索引构建需要额外临时空间
  • 检查内存:内存不足会导致构建失败或极慢
  • 查看 IndexNode 日志:找错误信息
  • 降低索引参数:M 和 efConstruction 太大会导致 OOM
  • 分批构建:数据量太大时分批插入分批构建
33. 生产环境部署 Milvus 需要注意什么?

回答要点

  • 集群部署:不要用 standalone,用 Kubernetes 集群
  • 资源配置
    • QueryNode:高 CPU、大内存
    • DataNode/IndexNode:高 CPU、快磁盘
  • 监控告警:Prometheus + Grafana,监控延迟、QPS、内存、CPU
  • 备份策略:定期全量备份 + 增量备份
  • 安全:启用认证、TLS 加密、网络隔离
  • 滚动升级:升级时先升级 Coord,再升级 Node
34. 如何做 Milvus 的容量规划?

回答要点

  • 存储
    • 原始向量:数量 × 维度 × 4 字节
    • 索引开销:HNSW × 10,IVF_FLAT × 1.1,IVF_PQ × 0.1
    • 冗余:预留 30% buffer
  • 计算
    • 查询:每核每秒约 500-1000 次 HNSW 查询
    • 写入:每核每秒约 1000-5000 条向量
  • 示例:1 亿 × 768 维 HNSW
    • 内存:100M × 768 × 4 × 10 / 1024³ ≈ 286 GB
    • 查询 QPS 1万:需要 10-20 核 CPU
35. 对比 Milvus、Pinecone、Weaviate、Chroma 的优劣?

回答要点

  • Milvus
    • 优点:开源免费、性能好、功能全、社区活跃、支持大规模
    • 缺点:需要自己部署运维
    • 适用:有运维能力、大规模、定制化需求
  • Pinecone
    • 优点:全托管、Serverless、零运维、性能好
    • 缺点:闭源、贵、数据锁定、定制化能力弱
    • 适用:不想运维、预算充足、中小规模
  • Weaviate
    • 优点:GraphQL 接口、内置向量模型、语义搜索能力强
    • 缺点:性能略逊于 Milvus
  • Chroma
    • 优点:轻量、Python 友好、零配置
    • 缺点:仅支持小规模、功能简单
    • 适用:原型验证、个人开发

总结:生产环境大规模用 Milvus,快速原型用 Chroma,不想运维用 Pinecone。


附录 A:API 速查表

Python SDK 2.3.x

连接
from pymilvus import connections, utility, MilvusClient

# 传统方式
connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="root",
    password="Milvus"
)

# MilvusClient 方式
client = MilvusClient(uri="http://localhost:19530", token="root:Milvus")
集合管理
# 创建
client.create_collection(collection_name, dimension)

# 删除
client.drop_collection(collection_name)

# 检查存在
client.has_collection(collection_name)

# 列出所有
client.list_collections()

# 加载/释放
client.load_collection(collection_name)
client.release_collection(collection_name)
数据操作
# 插入
client.insert(collection_name, data)

# 更新/插入
client.upsert(collection_name, data)

# 删除
client.delete(collection_name, filter="id > 100")

# 查询
client.query(collection_name, filter, output_fields, limit)

# 搜索
client.search(collection_name, data, limit, filter, output_fields)

# 获取
client.get(collection_name, ids)
索引管理
# 创建
client.create_index(
    collection_name,
    index_params={
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 16, "efConstruction": 256}
    }
)

# 查看
client.describe_index(collection_name)

# 删除
client.drop_index(collection_name)

附录 B:资源链接

官方资源

  • Milvus 官网:https://milvus.io
  • GitHub:https://github.com/milvus-io/milvus
  • 官方文档:https://milvus.io/docs
  • PyMilvus 文档:https://milvus.io/api-reference/pymilvus/v2.3.x

学习资源

  • Bootcamp:https://github.com/milvus-io/bootcamp(大量示例代码)
  • Milvus 教程:https://milvus.io/docs/tutorials
  • 向量数据库课程:https://learn.zilliz.com

社区

  • Slack:https://milvusio.slack.com
  • Discord:https://discord.gg/FGNR2sWNfA
  • 论坛:https://discuss.milvus.io

托管服务

  • Zilliz Cloud:https://cloud.zilliz.com(Milvus 官方云服务)

相关项目

  • towhee:https://towhee.io(数据处理 ETL 框架)
  • GPTCache:https://github.com/zilliztech/GPTCache(LLM 响应缓存)
  • Milvus Lite:https://github.com/milvus-io/milvus-lite(轻量级嵌入式版本)
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐