第一章 Milvus 核心概念与基础

1.1 向量数据库的概念与价值

向量数据库是一种专门用于存储和检索向量数据的数据库系统。与传统关系型数据库不同,向量数据库能够高效处理高维向量数据,并支持相似度搜索。

核心价值

  • 相似度搜索:快速找到与查询向量最相似的向量
  • 高维数据处理:高效处理数百甚至数千维的向量数据
  • 实时性:支持毫秒级的查询响应
  • 可扩展性:能够处理大规模向量数据集

1.2 Milvus 定位与特点

Milvus 是一个开源的向量数据库,专为 AI 应用场景设计,提供高性能的向量相似度搜索服务。

主要特点

  • 高性能:采用先进的索引技术,支持每秒百万级的查询
  • 可扩展:支持水平扩展,轻松处理大规模数据集
  • 多语言支持:提供 Python、Java、Go、C++ 等多语言 SDK
  • 丰富的索引类型:支持 IVF、HNSW 等多种索引类型
  • 混合检索:支持向量与标量的混合查询

1.3 核心术语解释

术语 解释
向量 由多个数值组成的数组,用于表示数据的特征
维度 向量中元素的数量,如 128 维、256 维等
距离度量 衡量两个向量相似度的方法,如欧氏距离、余弦相似度等
索引 用于加速向量搜索的数据结构
Collection Milvus 中的数据集合,类似于传统数据库中的表
Partition Collection 的分区,用于数据管理和查询优化
Embedding 将非结构化数据转换为向量的过程

1.4 向量计算数学基础

距离度量方法

  1. 欧氏距离 (Euclidean Distance)

    • 计算两个向量之间的直线距离
    • 适用场景:向量空间为欧几里得空间的情况
  2. 余弦相似度 (Cosine Similarity)

    • 计算两个向量的夹角余弦值
    • 适用场景:关注方向而非大小的场景,如文本相似度
  3. 曼哈顿距离 (Manhattan Distance)

    • 计算两个向量对应维度差的绝对值之和
    • 适用场景:高维稀疏向量

1.5 向量嵌入(Embedding)技术

向量嵌入是将非结构化数据(如文本、图像、音频)转换为数值向量的过程。常用的嵌入模型包括:

  • 文本嵌入:BERT、Sentence-BERT、GPT 等
  • 图像嵌入:ResNet、CLIP、ViT 等
  • 音频嵌入:VGGish、Whisper 等

嵌入模型选择考量

  • 模型大小与性能
  • 嵌入维度
  • 计算资源需求
  • 特定领域的表现

1.6 与传统数据库的对比

特性 传统关系型数据库 向量数据库
数据类型 结构化数据 高维向量
查询类型 精确匹配 相似度搜索
索引结构 B树、哈希等 IVF、HNSW等
适用场景 事务处理、报表 推荐系统、图像搜索
扩展性 垂直扩展为主 水平扩展为主

第二章 Milvus 架构设计与组件

2.1 整体架构 overview

Milvus 是一个开源的向量数据库,其架构设计为分布式、高性能的向量检索系统。以下是其核心组件和架构图的分解说明:

核心组件分层

接入层(Access Layer)

  • Proxy:接收客户端请求,负责路由、负载均衡和查询协调。
  • SDK/API:提供多种语言的客户端接口(Python、Java、Go等),支持 REST 和 gRPC 协议。

协调服务层(Coordinator Service)

  • Root Coordinator:管理元数据(集合、分区、索引等)和全局时间戳分配。
  • Query Coordinator:协调查询任务,优化执行计划。
  • Data Coordinator:管理数据节点的数据分布与均衡。
  • Index Coordinator:协调索引构建与更新。

执行层(Worker Node)

  • Query Node:执行向量检索和标量过滤,支持近实时搜索。
  • Data Node:处理数据插入、删除和持久化,写入日志并同步到对象存储。
  • Index Node:构建和管理向量索引(如 IVF、HNSW、ANNOY)。

存储层(Storage)

  • 对象存储(Object Storage):持久化存储原始向量和日志(如 S3、MinIO)。
  • 日志存储(Log Broker):流式日志队列(如 Kafka、Pulsar),用于实时数据同步。
  • 元数据存储(Meta Store):存储系统元数据(如 etcd、MySQL)。
数据流与协作
  1. 写入流程
    客户端通过 Proxy 发送写入请求,Data Node 将数据写入日志存储并同步到对象存储。Index Node 异步构建索引。

  2. 查询流程
    Query Node 从对象存储加载数据,结合索引快速检索,通过 Proxy 返回结果。

可视化架构图

以下是文字描述的架构图关键点:

> 客户端 → Proxy → Coordinator Service  
>                 ↓  
>         Query/Data/Index Nodes  
>                 ↓  
>         Log Broker → Object Storage  
>                 ↑  
>         Meta Store (etcd/MySQL)
扩展性设计
  • 分片(Sharding):数据水平分片,支持横向扩展。
  • 读写分离:Query Node 和 Data Node 独立扩展,适应不同负载。
  • 多云支持:存储层兼容主流对象存储服务。

2.2 核心组件详解

2.2.1 Proxy
  • 功能:接收客户端请求,进行负载均衡,分发到相应的节点
  • 特点:无状态设计,可水平扩展
  • 作用:作为系统的入口点,处理所有客户端请求
2.2.2 Query Node
  • 功能:处理向量搜索请求
  • 特点:缓存索引数据,提供快速查询
  • 作用:执行相似度搜索,返回查询结果
2.2.3 Data Node
  • 功能:处理数据写入请求
  • 特点:批量处理写入操作,提高性能
  • 作用:将数据持久化到存储层,构建索引
2.2.4 Index Node
  • 功能:构建和维护向量索引
  • 特点:后台异步处理,不影响查询性能
  • 作用:优化查询性能,加速相似度搜索

2.3 存储架构

Milvus 使用分层存储架构,包括:

  • 元数据存储:使用 etcd 存储元数据,如 Collection、Partition 信息
  • 对象存储:使用 S3、MinIO 等存储原始数据和索引文件
  • 缓存:Query Node 本地缓存热数据,提高查询性能

2.4 集群部署模式

部署模式

  1. 单机部署:适用于开发和测试环境
  2. 集群部署:适用于生产环境,提供高可用性和可扩展性

集群配置

  • Proxy 集群:多个 Proxy 节点,实现负载均衡
  • Query Node 集群:多个 Query Node,提高查询吞吐量
  • Data Node 集群:多个 Data Node,提高写入吞吐量
  • Index Node 集群:多个 Index Node,加速索引构建

2.5 高可用性设计

  • 数据冗余:数据多副本存储
  • 节点故障自动恢复:当节点故障时,其他节点接管其工作
  • 负载均衡:自动将请求分发到健康节点
  • 监控告警:实时监控系统状态,及时发现问题

第三章 向量索引原理与实现

3.1 索引类型与适用场景

索引类型 适用场景 优势 劣势
FLAT 小数据集,追求精确性 100% 召回率 查询速度慢
IVF_FLAT 中大型数据集 平衡速度和精度 需要调优 nlist 参数
IVF_SQ8 内存有限场景 内存占用小 精度略有损失
IVF_PQ 超大规模数据集 内存占用极小 精度损失较大
HNSW 追求查询速度 查询速度快 索引构建慢,内存占用大
DISKANN 超大规模数据集 存储效率高 查询速度相对较慢

3.2 IVF 系列索引原理与参数

IVF (Inverted File) 原理

  1. 将向量空间划分为多个聚类(cluster)
  2. 为每个向量分配到最近的聚类中心
  3. 查询时,只搜索最相似的几个聚类

核心参数

  • nlist:聚类数量

    • 取值范围:16 - 65536
    • 推荐值:数据量的平方根
    • 作用:影响索引精度和查询速度
  • nprobe:查询时搜索的聚类数量

    • 取值范围:1 - nlist
    • 推荐值:16 - 128
    • 作用:nprobe 越大,精度越高,但速度越慢

3.3 HNSW 索引原理与参数

HNSW (Hierarchical Navigable Small World) 原理

  1. 构建多层图结构
  2. 每层都是一个近似最近邻图
  3. 高层图作为快速导航,低层图提供精确结果
  4. 查询时从顶层开始,逐层细化搜索

核心参数

  • M:每个节点的最大邻居数

    • 取值范围:4 - 64
    • 推荐值:16 - 32
    • 作用:M 越大,索引质量越高,但内存占用越大
  • efConstruction:索引构建时的搜索宽度

    • 取值范围:100 - 2000
    • 推荐值:200 - 400
    • 作用:影响索引构建质量和速度
  • ef:查询时的搜索宽度

    • 取值范围:10 - 1000
    • 推荐值:50 - 200
    • 作用:ef 越大,查询精度越高,但速度越慢

3.4 其他索引类型

3.4.1 ANNOY
  • 基于树结构的索引
  • 适合中小规模数据集
  • 支持余弦相似度
3.4.2 NGT
  • 基于图结构的索引
  • 平衡了查询速度和内存占用
  • 适合中等规模数据集

3.5 索引构建与优化策略

索引构建流程

  1. 数据准备:收集和预处理向量数据
  2. 选择索引类型:根据数据特点和查询需求
  3. 配置索引参数:根据数据规模和性能要求
  4. 执行索引构建:后台异步处理
  5. 验证索引质量:评估查询性能和精度

优化策略

  • 批量构建:批量处理提高构建速度
  • 增量更新:支持新数据的增量索引
  • 并行构建:利用多线程加速索引构建
  • 索引压缩:减少内存占用

3.6 索引参数调优指南

调优步骤

  1. 确定目标:明确是追求速度还是精度
  2. 基准测试:使用默认参数进行测试
  3. 参数调整:根据测试结果调整参数
  4. 性能评估:评估调整后的性能
  5. 持续优化:根据实际使用情况持续调整

调优建议

  • 对于 IVF 索引,nlist 一般设置为数据量的平方根
  • 对于 HNSW 索引,M 一般设置为 16-32,efConstruction 设置为 200-400
  • 查询参数 nprobe 和 ef 应根据延迟要求进行调整

3.7 索引评估指标

评估指标

  • Recall@k:所有相关结果中,被检索到的比例(召回率)
  • Precision@k:前 k 个结果中,相关结果的比例(精确率)
  • F1 Score:Recall 和 Precision 的调和平均
  • 查询延迟:从查询到结果返回的时间
  • 吞吐量:每秒处理的查询数

评估方法

  1. 准备测试数据集和查询集
  2. 构建不同参数的索引
  3. 执行查询并记录结果
  4. 计算评估指标
  5. 分析结果并选择最优参数

第四章 Milvus 核心 API 与 SDK

4.1 连接与配置

Python SDK 示例

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

# 连接到 Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530"
)

# 连接参数说明
# host: Milvus 服务地址
# port: Milvus 服务端口
# alias: 连接别名,用于后续操作

配置选项

  • timeout:连接超时时间
  • retry:连接失败后的重试次数
  • secure:是否使用安全连接

4.2 集合(Collection)操作

创建集合

# 定义字段
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512)
]

# 创建集合 schema
schema = CollectionSchema(fields, description="文本向量集合")

# 创建集合
collection = Collection(name="text_embeddings", schema=schema)

# 集合参数说明
# name: 集合名称
# schema: 集合 schema
# shards_num: 分片数量(集群模式)

集合操作

  • load:将集合加载到内存
  • release:从内存中释放集合
  • drop:删除集合
  • describe:查看集合信息
  • has_collection:检查集合是否存在

4.3 分区(Partition)管理

创建分区

# 创建分区
collection.create_partition(partition_name="partition_2024")

# 分区操作
# 插入数据到指定分区
collection.insert(data, partition_name="partition_2024")

# 删除分区
collection.drop_partition(partition_name="partition_2024")

# 列出所有分区
partitions = collection.list_partitions()

分区策略

  • 时间分区:按时间范围分区
  • 地域分区:按地理位置分区
  • 业务分区:按业务类型分区

4.4 向量插入与删除

插入数据

# 准备数据
import random

# 生成 1000 条数据
ids = [i for i in range(1000)]
vectors = [[random.random() for _ in range(128)] for _ in range(1000)]
texts = [f"Text {i}" for i in range(1000)]

data = [ids, vectors, texts]

# 插入数据
collection.insert(data)

# 插入参数说明
# data: 数据列表,顺序与字段定义一致
# partition_name: 目标分区名称

删除数据

# 根据主键删除
collection.delete("id in [1, 2, 3]")

# 删除条件
# 支持 in、and、or 等操作符

4.5 相似度搜索 API

向量搜索

# 准备查询向量
query_vector = [[random.random() for _ in range(128)]]

# 执行搜索
results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=10,
    expr=None,
    output_fields=["text"]
)

# 搜索参数说明
# data: 查询向量
# anns_field: 向量字段名
# param: 搜索参数
# limit: 返回结果数量
# expr: 标量过滤条件
# output_fields: 需要返回的字段

# 处理搜索结果
for i, result in enumerate(results[0]):
    print(f"Rank {i+1}: ID={result.id}, Distance={result.distance}, Text={result.entity.get('text')}")

搜索参数

  • metric_type:距离度量类型(L2、IP、COSINE)
  • nprobe:搜索的聚类数量(IVF 索引)
  • ef:搜索宽度(HNSW 索引)

4.6 混合检索(标量+向量)

混合搜索

# 标量过滤条件
expr = "text like '%important%'"

# 执行混合搜索
results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=10,
    expr=expr,
    output_fields=["text"]
)

# 过滤条件语法
# 支持 like、in、>、<、= 等操作符
# 支持 and、or、not 逻辑操作

4.7 可视化管理系统 Attu

Attu 是 Milvus 的官方图形化管理工具,提供直观的 Web 界面来管理和监控 Milvus 集群。

主要功能

  • 集合管理:创建、删除、查看 Collection 和 Partition
  • 数据操作:插入、查询、删除向量数据
  • 索引管理:创建和监控索引构建进度
  • 系统监控:查看系统状态、节点健康度、性能指标
  • 向量搜索:可视化执行相似度搜索
  • 用户管理:管理用户和权限(RBAC)

安装方式

# Docker 安装(推荐)
docker run -p 8000:3000 -e MILVUS_URL={milvus_server_ip}:19530 zilliz/attu:latest

# 访问地址
http://localhost:8000

Attu 界面说明

使用示例

  1. 创建集合

    • 点击"创建集合"按钮
    • 输入集合名称和描述
    • 定义字段(向量维度、数据类型等)
    • 选择索引类型和参数
    • 点击"创建"
  2. 插入数据

    • 选择目标集合
    • 点击"插入数据"
    • 上传 JSON/CSV 文件或手动输入
    • 确认数据格式正确后提交
  3. 执行搜索

    • 进入"向量搜索"页面
    • 选择集合和索引
    • 输入查询向量或上传查询文件
    • 设置搜索参数(topk、metric_type 等)
    • 执行搜索并查看结果
  4. 监控系统

    • 查看"概览"页面的系统状态
    • 监控 CPU、内存、磁盘使用率
    • 查看查询 QPS 和延迟指标
    • 检查节点健康状态

4.8 命令行工具 Milvus CLI

Milvus CLI 是一个命令行工具,用于通过终端与 Milvus 进行交互。

安装方式

# pip 安装
pip install milvus-cli

# 或者从源码安装
git clone https://github.com/zilliztech/milvus-cli.git
cd milvus-cli
pip install -e .

基本命令

# 启动 CLI
milvus-cli

# 或者直接执行命令
milvus-cli <command>

连接管理

# 连接到 Milvus 服务器
milvus_cli > connect -h localhost -p 19530 -a default

# 使用用户名密码连接
milvus_cli > connect -h localhost -p 19530 -u root -w password

# 查看连接状态
milvus_cli > connection

# 断开连接
milvus_cli > exit

集合操作

# 列出所有集合
milvus_cli > list collections

# 创建集合
milvus_cli > create collection -c my_collection -f id:INT64:primary -f vector:FLOAT_VECTOR:128 -d "My collection"

# 查看集合详情
milvus_cli > describe collection -c my_collection

# 删除集合
milvus_cli > delete collection -c my_collection

# 加载集合到内存
milvus_cli > load collection -c my_collection

# 从内存释放集合
milvus_cli > release collection -c my_collection

数据操作

# 插入数据
milvus_cli > insert -c my_collection -d '[{"id": 1, "vector": [0.1, 0.2, ...]}]'

# 查询数据
milvus_cli > query -c my_collection -e "id in [1, 2, 3]"

# 删除数据
milvus_cli > delete -c my_collection -e "id in [1, 2, 3]"

# 搜索相似向量
milvus_cli > search -c my_collection -v "[0.1, 0.2, ...]" -k 10

索引管理

# 创建索引
milvus_cli > create index -c my_collection -f vector -i HNSW -p "{\"M\": 16, \"efConstruction\": 200}"

# 查看索引信息
milvus_cli > describe index -c my_collection

# 删除索引
milvus_cli > delete index -c my_collection -f vector

分区管理

# 列出分区
milvus_cli > list partitions -c my_collection

# 创建分区
milvus_cli > create partition -c my_collection -p partition_2024

# 删除分区
milvus_cli > delete partition -c my_collection -p partition_2024

导入导出

# 从文件导入数据
milvus_cli > import -c my_collection -f data.json

# 导出数据到文件
milvus_cli > export -c my_collection -f output.json -e "id >= 0"

用户和权限管理(企业版)

# 列出用户
milvus_cli > list users

# 创建用户
milvus_cli > create user -u new_user -w password123

# 删除用户
milvus_cli > delete user -u new_user

# 修改密码
milvus_cli > update password -u new_user -o old_password -n new_password

# 创建角色
milvus_cli > create role -r data_reader

# 授予权限
milvus_cli > grant privilege -r data_reader -o Collection -p Search -c "*"

# 分配角色给用户
milvus_cli > assign role -r data_reader -u new_user

脚本示例:批量创建集合

#!/bin/bash
# create_collections.sh - 批量创建集合

collections=("image_vectors" "text_vectors" "audio_vectors")
dims=(512 384 256)

for i in "${!collections[@]}"; do
    collection="${collections[$i]}"
    dim="${dims[$i]}"
    
    echo "创建集合: $collection (维度: $dim)"
    milvus-cli create collection \
        -c "$collection" \
        -f "id:INT64:primary" \
        -f "vector:FLOAT_VECTOR:$dim" \
        -f "create_time:TIMESTAMP" \
        -d "Collection for $collection"
    
    echo "创建索引..."
    milvus-cli create index \
        -c "$collection" \
        -f "vector" \
        -i "HNSW" \
        -p '{"M": 16, "efConstruction": 200}'
done

echo "所有集合创建完成!"

CLI 与 Attu 对比

特性 Milvus CLI Attu
使用方式 命令行 Web 界面
适用场景 自动化脚本、批量操作 可视化监控、交互式操作
学习曲线 需要记忆命令 直观易用
功能覆盖 完整 完整
批量操作 适合 适合
实时监控 有限 优秀

4.9 批量操作 API

批量插入

# 批量插入数据
batch_size = 1000
for i in range(0, total_data_size, batch_size):
    batch_data = data[i:i+batch_size]
    collection.insert(batch_data)

# 批量插入建议
# 批次大小:1000-10000
# 并发控制:避免同时插入过多批次

批量搜索

# 批量搜索
query_vectors = [[random.random() for _ in range(128)] for _ in range(10)]

results = collection.search(
    data=query_vectors,
    anns_field="vector",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=10
)

# 处理批量搜索结果
for i, query_result in enumerate(results):
    print(f"Query {i+1} results:")
    for j, result in enumerate(query_result):
        print(f"  Rank {j+1}: ID={result.id}, Distance={result.distance}")

4.10 Docker 部署 Milvus

Docker 是部署 Milvus 最简单、最快速的方式,适合开发测试环境和快速原型验证。

4.10.1 环境准备

系统要求

  • Docker Engine 19.03 或更高版本
  • Docker Compose 1.25.1 或更高版本
  • 至少 8GB 内存
  • 至少 50GB 磁盘空间

安装 Docker

# Ubuntu/Debian
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $USER

# macOS
brew install --cask docker

# Windows
# 下载 Docker Desktop 并安装
4.10.2 单机部署(Standalone)

下载配置文件

# 创建 Milvus 工作目录
mkdir -p milvus-docker
cd milvus-docker

# 下载官方配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml -O docker-compose.yml

docker-compose.yml 配置说明

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.3
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - etcd
      - minio

启动 Milvus

# 启动服务
docker-compose up -d

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f standalone

# 停止服务
docker-compose down

# 停止并删除数据卷
docker-compose down -v

验证部署

from pymilvus import connections, utility

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 检查版本
print(f"Milvus 版本: {utility.get_server_version()}")

# 检查服务状态
print(f"服务器状态: {utility.get_server_status()}")
4.10.3 集群部署(Cluster)

集群架构

┌───────────────────────────────────────────────────────────────────────┐
│                         Docker 集群部署                              │
├───────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐               │
│  │   Proxy     │    │   Proxy     │    │   Proxy     │               │
│  │   x 2       │    │             │    │             │               │
│  └──────┬──────┘    └─────────────┘    └─────────────┘               │
│         │                                                            │
│  ┌──────┴──────────────────────────────────────────────────────┐     │
│  │                      Coordinator                            │     │
│  │  (Root Coord, Query Coord, Data Coord, Index Coord)        │     │
│  └─────────────────────────────────────────────────────────────┘     │
│         │                                                            │
│  ┌──────┴──────────────────────────────────────────────────────┐     │
│  │                      Worker Nodes                           │     │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │     │
│  │  │ Query Node  │  │  Data Node  │  │ Index Node  │         │     │
│  │  │   x 2       │  │    x 2      │  │    x 2      │         │     │
│  │  └─────────────┘  └─────────────┘  └─────────────┘         │     │
│  └─────────────────────────────────────────────────────────────┘     │
│         │                                                            │
│  ┌──────┴──────────────────────────────────────────────────────┐     │
│  │                      Storage Layer                          │     │
│  │  ┌─────────────┐  ┌─────────────┐                           │     │
│  │  │    etcd     │  │    MinIO    │                           │     │
│  │  │  (Meta)     │  │  (Object)   │                           │     │
│  │  └─────────────┘  └─────────────┘                           │     │
│  └─────────────────────────────────────────────────────────────┘     │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

下载集群配置文件

# 下载集群配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-cluster-docker-compose.yml -O docker-compose-cluster.yml

启动集群

# 使用集群配置文件启动
docker-compose -f docker-compose-cluster.yml up -d

# 查看所有服务
docker-compose -f docker-compose-cluster.yml ps

# 查看特定服务日志
docker-compose -f docker-compose-cluster.yml logs -f proxy
4.10.4 自定义配置

修改资源限制

services:
  standalone:
    deploy:
      resources:
        limits:
          cpus: '4.0'
          memory: 8G
        reservations:
          cpus: '2.0'
          memory: 4G

持久化存储配置

services:
  standalone:
    volumes:
      # 使用命名卷
      - milvus-data:/var/lib/milvus
      # 或使用本地路径
      - /data/milvus:/var/lib/milvus
volumes:
  milvus-data:
    driver: local

网络配置

services:
  standalone:
    networks:
      - milvus-network
      - external-network

networks:
  milvus-network:
    driver: bridge
  external-network:
    external: true
4.10.5 常用操作命令

容器管理

# 查看运行中的容器
docker ps | grep milvus

# 进入 Milvus 容器
docker exec -it milvus-standalone bash

# 查看容器资源使用
docker stats milvus-standalone

# 重启服务
docker-compose restart standalone

# 查看容器日志
docker logs -f --tail=100 milvus-standalone

数据备份

# 备份数据卷
docker run --rm -v milvus-docker_milvus-data:/source -v $(pwd)/backup:/backup alpine tar czf /backup/milvus-backup.tar.gz -C /source .

# 恢复数据卷
docker run --rm -v milvus-docker_milvus-data:/target -v $(pwd)/backup:/backup alpine sh -c "rm -rf /target/* && tar xzf /backup/milvus-backup.tar.gz -C /target"

性能调优

services:
  standalone:
    environment:
      # 调整线程池大小
      COMMON_SECURITY_AUTHORIZATIONENABLED: "false"
      # 调整日志级别
      LOG_LEVEL: "info"
    sysctls:
      # 增加文件描述符限制
      - fs.file-max=65536
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
4.10.6 与 Attu 集成

完整部署方案(Milvus + Attu)

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.3
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
    depends_on:
      - etcd
      - minio

  attu:
    container_name: milvus-attu
    image: zilliz/attu:latest
    environment:
      MILVUS_URL: standalone:19530
    ports:
      - "8000:3000"
    depends_on:
      - standalone

启动完整环境

# 启动 Milvus 和 Attu
docker-compose up -d

# 访问 Attu
open http://localhost:8000

# 连接信息
# Milvus Address: standalone:19530
4.10.7 故障排查

常见问题

问题 原因 解决方案
端口冲突 19530 或 9000 被占用 修改 docker-compose.yml 中的端口映射
内存不足 系统内存不足 增加 Docker 内存限制或关闭其他服务
权限错误 数据卷权限问题 检查 volumes 目录权限
连接失败 服务未完全启动 等待 30 秒后重试

排查命令

# 检查服务健康状态
docker-compose ps

# 查看详细日志
docker-compose logs --tail=200

# 检查端口占用
netstat -tuln | grep 19530

# 检查资源使用
docker system df
docker stats

# 清理未使用的资源
docker system prune -a

第五章 数据管理与操作

5.1 数据模型设计

设计原则

  • 向量维度:根据嵌入模型选择合适的维度
  • 标量字段:添加必要的标量字段用于过滤和排序
  • 分区策略:根据数据特点选择合适的分区策略
  • 索引选择:根据查询需求选择合适的索引类型

示例数据模型

字段名 数据类型 描述
id INT64 主键,自增
vector FLOAT_VECTOR 128 维向量
text VARCHAR 原始文本
category VARCHAR 文本分类
create_time TIMESTAMP 创建时间
user_id INT64 用户ID

5.2 向量数据预处理

预处理步骤

  1. 数据清洗:去除噪声和异常值
  2. 特征提取:使用嵌入模型生成向量
  3. 向量归一化:确保向量具有相同的尺度
  4. 数据验证:检查向量维度和质量

归一化示例

import numpy as np

def normalize_vector(vector):
    """
    向量归一化
    
    Args:
        vector: 原始向量
    
    Returns:
        归一化后的向量
    """
    norm = np.linalg.norm(vector)
    if norm == 0:
        return vector
    return vector / norm

# 批量归一化
vectors = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
normalized_vectors = [normalize_vector(v) for v in vectors]

5.3 批量操作与性能

批量操作优化

  • 批量大小:根据内存和网络带宽调整
  • 并发控制:使用线程池或异步操作
  • 批量写入:减少网络往返次数
  • 批量读取:提高查询效率

性能调优

  • 批量插入:每次插入 1000-10000 条数据
  • 批量搜索:每次搜索 10-100 个查询向量
  • 并发数:根据服务器性能调整

5.4 数据备份与恢复

备份策略

  • 定期备份:按计划执行备份
  • 增量备份:只备份变更数据
  • 跨区域备份:提高数据安全性

备份操作

# 创建备份
from pymilvus import utility

utility.create_backup(
    collection_name="text_embeddings",
    backup_name="backup_20240313"
)

# 列出备份
backups = utility.list_backups()

# 恢复备份
utility.restore_backup(
    backup_name="backup_20240313",
    collection_name="text_embeddings_restored"
)

5.5 数据一致性保证

一致性级别

  • 强一致性:写入后立即可读
  • 最终一致性:写入后一段时间内可读

保证机制

  • 事务支持:确保操作的原子性
  • 版本控制:跟踪数据变更
  • 冲突解决:处理并发写入冲突

5.6 数据导入导出

数据导入

# 从文件导入
import csv

# 读取 CSV 文件
with open('data.csv', 'r') as f:
    reader = csv.reader(f)
    next(reader)  # 跳过表头
    data = []
    for row in reader:
        id = int(row[0])
        vector = list(map(float, row[1].split(',')))
        text = row[2]
        data.append([id, vector, text])

# 导入数据
collection.insert(data)

数据导出

# 导出数据
results = collection.query(
    expr="id >= 0",
    output_fields=["id", "vector", "text"]
)

# 保存到文件
import json

with open('exported_data.json', 'w') as f:
    json.dump(results, f)

5.7 实际开发流程

开发流程

  1. 需求分析:明确业务需求和数据特点
  2. 数据准备:收集和预处理数据
  3. 模型选择:选择合适的嵌入模型
  4. 索引设计:选择索引类型和参数
  5. 性能测试:评估系统性能
  6. 部署上线:部署到生产环境
  7. 监控维护:监控系统运行状态

最佳实践

  • 从小规模开始,逐步扩展
  • 定期评估和优化系统性能
  • 建立完善的监控和告警机制
  • 制定数据备份和恢复策略

第六章 性能优化策略

6.1 硬件配置优化

硬件选择

  • CPU:多核心、高主频
  • 内存:足够大的内存,建议至少 32GB
  • 存储:SSD 存储,提高 I/O 性能
  • 网络:高速网络,建议 10Gbps 以上

配置建议

数据规模 CPU 内存 存储
100 万向量 8 核 32GB 100GB SSD
1000 万向量 16 核 64GB 1TB SSD
1 亿向量 32 核 128GB+ 10TB SSD

6.2 查询参数优化

参数调优

  • nprobe:根据查询延迟要求调整

    • 低延迟场景:nprobe = 10-20
    • 高精度场景:nprobe = 50-100
  • ef:HNSW 索引的搜索宽度

    • 低延迟场景:ef = 50-100
    • 高精度场景:ef = 200-300
  • limit:返回结果数量

    • 根据业务需求设置,不要返回过多结果

6.3 并发与缓存优化

并发优化

  • 连接池:使用连接池管理数据库连接
  • 异步操作:使用异步 API 提高并发性能
  • 负载均衡:在集群模式下合理分配请求

缓存优化

  • 内存缓存:Query Node 会自动缓存热数据
  • 客户端缓存:缓存频繁查询的结果
  • TTL 设置:合理设置缓存过期时间

6.4 内存与存储优化

内存优化

  • 索引选择:根据内存情况选择合适的索引类型
  • 数据压缩:使用 IVF_SQ8 或 IVF_PQ 减少内存占用
  • 内存限制:合理设置服务的内存限制

存储优化

  • 压缩存储:启用数据压缩
  • 分层存储:热数据存储在 SSD,冷数据存储在 HDD
  • 清理策略:定期清理过期数据

6.5 监控指标与调优

关键指标

  • 查询延迟:P95、P99 延迟
  • 查询吞吐量:每秒查询数 (QPS)
  • 索引构建时间:索引构建的耗时
  • 内存使用率:内存使用情况
  • 磁盘使用率:磁盘空间使用情况
  • CPU 使用率:CPU 负载情况

监控工具

  • Prometheus:收集监控指标
  • Grafana:可视化监控数据
  • Alertmanager:设置告警规则

6.6 性能压测与评估

压测方法

  1. 准备测试数据:生成符合真实场景的测试数据
  2. 设计测试用例:覆盖不同查询场景
  3. 执行压测:使用压测工具执行测试
  4. 分析结果:分析性能瓶颈
  5. 优化调整:根据测试结果进行优化

压测工具

  • milvus-benchmark:Milvus 官方压测工具
  • Locust:开源压测工具
  • JMeter:功能强大的压测工具

milvus-benchmark 使用示例

# 安装 milvus-benchmark
# pip install milvus-benchmark

# 配置文件 config.yaml
"""
milvus:
  host: localhost
  port: 19530

collection:
  collection_name: benchmark_test
  dim: 128
  index_type: HNSW
  metric_type: L2

load:
  nb: 100000  # 每次插入的向量数量
  total: 1000000  # 总向量数量

search:
  nq: 100  # 查询数量
  topk: 10  # 返回结果数量
  threads: 10  # 并发线程数
"""

# 运行压测
# milvus-benchmark -c config.yaml

# Python 压测脚本示例
import time
import numpy as np
from pymilvus import connections, Collection
from concurrent.futures import ThreadPoolExecutor

def benchmark_search(collection_name, num_queries=1000, concurrency=10):
    """
    压测搜索性能
    
    Args:
        collection_name: 集合名称
        num_queries: 查询次数
        concurrency: 并发数
    
    Returns:
        压测结果统计
    """
    # 连接Milvus
    connections.connect("default", host="localhost", port="19530")
    collection = Collection(collection_name)
    collection.load()
    
    # 生成查询向量
    dim = 128
    query_vectors = [np.random.rand(dim).tolist() for _ in range(num_queries)]
    
    # 记录延迟
    latencies = []
    
    def search_task(vector):
        start_time = time.time()
        collection.search(
            data=[vector],
            anns_field="vector",
            param={"metric_type": "L2", "params": {"ef": 64}},
            limit=10
        )
        end_time = time.time()
        return (end_time - start_time) * 1000  # 转换为毫秒
    
    # 并发执行查询
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        latencies = list(executor.map(search_task, query_vectors))
    total_time = time.time() - start_time
    
    # 计算统计指标
    latencies.sort()
    avg_latency = np.mean(latencies)
    p50_latency = np.percentile(latencies, 50)
    p95_latency = np.percentile(latencies, 95)
    p99_latency = np.percentile(latencies, 99)
    qps = num_queries / total_time
    
    print(f"压测结果:")
    print(f"总查询数: {num_queries}")
    print(f"并发数: {concurrency}")
    print(f"总耗时: {total_time:.2f} 秒")
    print(f"QPS: {qps:.2f}")
    print(f"平均延迟: {avg_latency:.2f} ms")
    print(f"P50延迟: {p50_latency:.2f} ms")
    print(f"P95延迟: {p95_latency:.2f} ms")
    print(f"P99延迟: {p99_latency:.2f} ms")
    
    return {
        "qps": qps,
        "avg_latency": avg_latency,
        "p50_latency": p50_latency,
        "p95_latency": p95_latency,
        "p99_latency": p99_latency
    }

# 运行压测
# results = benchmark_search("my_collection", num_queries=1000, concurrency=10)

6.7 冷启动问题解决方案

冷启动挑战

  • 新系统缺乏足够的向量数据
  • 索引质量不高,查询性能差
  • 系统需要时间预热

解决方案

  • 预加载数据:在系统上线前加载足够的测试数据
  • 增量索引:边写入边构建索引
  • 缓存预热:系统启动后执行预热查询
  • 渐进式扩展:从小规模开始,逐步增加数据量

第七章 常见应用场景实现

7.1 图像相似度搜索

实现步骤

  1. 图像预处理:调整大小、归一化
  2. 特征提取:使用 ResNet、CLIP 等模型生成向量
  3. 向量存储:将向量存储到 Milvus
  4. 相似搜索:根据查询图像的向量搜索相似图像

代码示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
from PIL import Image
import torch
from torchvision import models, transforms

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),
    FieldSchema(name="image_path", dtype=DataType.VARCHAR, max_length=512)
]
schema = CollectionSchema(fields, "图像向量集合")
collection = Collection("image_embeddings", schema)

# 加载预训练模型
model = models.resnet50(pretrained=True)
model.eval()
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# 提取图像特征
def extract_features(image_path):
    """
    提取图像特征
    
    Args:
        image_path: 图像路径
    
    Returns:
        图像特征向量,如果文件不存在则返回None
    """
    try:
        image = Image.open(image_path)
        image = transform(image).unsqueeze(0)
        with torch.no_grad():
            features = model(image)
        return features.squeeze().numpy().tolist()
    except FileNotFoundError:
        print(f"错误:图像文件 {image_path} 不存在")
        return None
    except Exception as e:
        print(f"处理图像时发生错误:{e}")
        return None

# 插入图像向量
image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
vectors = [extract_features(path) for path in image_paths]
data = [vectors, image_paths]
collection.insert(data)

# 创建索引
collection.create_index("vector", {
    "index_type": "HNSW",
    "metric_type": "L2",
    "params": {"M": 16, "efConstruction": 200}
})

# 加载集合
collection.load()

# 搜索相似图像
query_image = "query_image.jpg"
query_vector = [extract_features(query_image)]
results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "L2", "params": {"ef": 100}},
    limit=5,
    output_fields=["image_path"]
)

# 打印结果
print("相似图像:")
for i, result in enumerate(results[0]):
    print(f"Rank {i+1}: {result.entity.get('image_path')}, 距离: {result.distance}")

7.2 文本语义搜索

实现步骤

  1. 文本预处理:分词、去停用词
  2. 语义嵌入:使用 BERT、Sentence-BERT 等模型生成向量
  3. 向量存储:将向量存储到 Milvus
  4. 语义搜索:根据查询文本的向量搜索相似文本

代码示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from sentence_transformers import SentenceTransformer

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=384),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1024),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=128)
]
schema = CollectionSchema(fields, "文本向量集合")
collection = Collection("text_embeddings", schema)

# 加载预训练模型
model = SentenceTransformer('all-MiniLM-L6-v2')

# 生成文本向量
def generate_embedding(text):
    """
    生成文本嵌入向量
    
    Args:
        text: 文本内容
    
    Returns:
        文本嵌入向量
    """
    return model.encode(text).tolist()

# 插入文本向量
texts = [
    "Milvus 是一个开源的向量数据库",
    "向量数据库用于相似度搜索",
    "FastMCP 是一个 MCP 协议的实现",
    "Python 是一种流行的编程语言"
]
categories = ["数据库", "数据库", "编程", "编程"]
vectors = [generate_embedding(text) for text in texts]
data = [vectors, texts, categories]
collection.insert(data)

# 创建索引
collection.create_index("vector", {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200}
})

# 加载集合
collection.load()

# 语义搜索
query_text = "什么是向量数据库"
query_vector = [generate_embedding(query_text)]

# 混合搜索(按类别过滤)
expr = "category == '数据库'"
results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"ef": 100}},
    limit=3,
    expr=expr,
    output_fields=["text", "category"]
)

# 打印结果
print("搜索结果:")
for i, result in enumerate(results[0]):
    print(f"Rank {i+1}: 相似度: {result.distance:.4f}")
    print(f"   文本: {result.entity.get('text')}")
    print(f"   类别: {result.entity.get('category')}")

7.3 推荐系统

实现步骤

  1. 用户和物品嵌入:生成用户和物品的向量表示
  2. 向量存储:将向量存储到 Milvus
  3. 相似推荐:根据用户向量搜索相似物品
  4. 协同过滤:结合用户历史行为

代码示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
import time

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建物品集合
item_fields = [
    FieldSchema(name="item_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="item_vector", dtype=DataType.FLOAT_VECTOR, dim=128),
    FieldSchema(name="item_name", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=128)
]
item_schema = CollectionSchema(item_fields, "物品向量集合")
item_collection = Collection("items", item_schema)

# 创建用户集合
user_fields = [
    FieldSchema(name="user_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="user_vector", dtype=DataType.FLOAT_VECTOR, dim=128)
]
user_schema = CollectionSchema(user_fields, "用户向量集合")
user_collection = Collection("users", user_schema)

# 生成模拟数据
np.random.seed(42)

# 生成物品数据
item_ids = list(range(1000))
item_vectors = [np.random.rand(128).tolist() for _ in range(1000)]
item_names = [f"Item {i}" for i in range(1000)]
categories = [["Electronics", "Clothing", "Books", "Home"][i % 4] for i in range(1000)]
item_data = [item_ids, item_vectors, item_names, categories]
item_collection.insert(item_data)

# 生成用户数据
user_ids = list(range(100))
user_vectors = [np.random.rand(128).tolist() for _ in range(100)]
user_data = [user_ids, user_vectors]
user_collection.insert(user_data)

# 创建索引
item_collection.create_index("item_vector", {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200}
})

user_collection.create_index("user_vector", {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200}
})

# 加载集合
item_collection.load()
user_collection.load()

# 推荐物品给用户
def recommend_items(user_id, top_k=10):
    """
    为用户推荐物品
    
    Args:
        user_id: 用户ID
        top_k: 推荐数量
    
    Returns:
        推荐物品列表
    """
    # 获取用户向量
    user_result = user_collection.query(
        expr=f"user_id == {user_id}",
        output_fields=["user_vector"]
    )
    
    if not user_result:
        return []
    
    user_vector = user_result[0]["user_vector"]
    
    # 搜索相似物品
    results = item_collection.search(
        data=[user_vector],
        anns_field="item_vector",
        param={"metric_type": "COSINE", "params": {"ef": 100}},
        limit=top_k,
        output_fields=["item_name", "category"]
    )
    
    # 处理推荐结果
    recommendations = []
    for result in results[0]:
        recommendations.append({
            "item_id": result.id,
            "item_name": result.entity.get("item_name"),
            "category": result.entity.get("category"),
            "similarity": result.distance
        })
    
    return recommendations

# 测试推荐
user_id = 1
recommendations = recommend_items(user_id)
print(f"为用户 {user_id} 推荐的物品:")
for i, item in enumerate(recommendations):
    print(f"Rank {i+1}: {item['item_name']} (类别: {item['category']}, 相似度: {item['similarity']:.4f})")

7.4 异常检测

实现步骤

  1. 正常数据嵌入:收集正常数据并生成向量
  2. 向量存储:将向量存储到 Milvus
  3. 异常检测:计算查询向量与正常向量的距离,超过阈值则为异常
    代码示例
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=64),
    FieldSchema(name="timestamp", dtype=DataType.TIMESTAMP)
]
schema = CollectionSchema(fields, "异常检测向量集合")
collection = Collection("anomaly_detection", schema)

# 生成正常数据
np.random.seed(42)
normal_data = []
for i in range(1000):
    # 生成正态分布的向量
    vector = np.random.normal(0, 1, 64).tolist()
    timestamp = int(time.time()) - i * 3600
    normal_data.append([vector, timestamp])

# 插入正常数据
collection.insert(normal_data)

# 创建索引
collection.create_index("vector", {
    "index_type": "HNSW",
    "metric_type": "L2",
    "params": {"M": 16, "efConstruction": 200}
})

# 加载集合
collection.load()

# 异常检测
def detect_anomaly(vector, threshold=3.0):
    """
    检测异常
    
    Args:
        vector: 待检测的向量
        threshold: 异常阈值
    
    Returns:
        (是否异常, 最小距离)
    """
    results = collection.search(
        data=[vector],
        anns_field="vector",
        param={"metric_type": "L2", "params": {"ef": 100}},
        limit=1
    )
    
    min_distance = results[0][0].distance
    is_anomaly = min_distance > threshold
    
    return is_anomaly, min_distance

# 测试异常检测
# 正常数据
normal_vector = np.random.normal(0, 1, 64).tolist()
is_anomaly, distance = detect_anomaly(normal_vector)
print(f"正常数据 - 异常: {is_anomaly}, 距离: {distance:.4f}")

# 异常数据
anomaly_vector = np.random.normal(5, 1, 64).tolist()
is_anomaly, distance = detect_anomaly(anomaly_vector)
print(f"异常数据 - 异常: {is_anomaly}, 距离: {distance:.4f}")

7.5 多模态检索

实现步骤

  1. 多模态嵌入:使用 CLIP 等模型生成跨模态向量
  2. 向量存储:将不同模态的向量存储到 Milvus
  3. 跨模态搜索:使用一种模态的向量搜索另一种模态

代码示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),
    FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=16),  # "image" or "text"
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=1024)
]
schema = CollectionSchema(fields, "多模态向量集合")
collection = Collection("multimodal", schema)

# 加载 CLIP 模型
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# 生成图像嵌入
def get_image_embedding(image_path):
    """
    生成图像嵌入
    
    Args:
        image_path: 图像路径
    
    Returns:
        图像嵌入向量
    """
    image = Image.open(image_path)
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        embedding = model.get_image_features(**inputs)
    return embedding.squeeze().numpy().tolist()

# 生成文本嵌入
def get_text_embedding(text):
    """
    生成文本嵌入
    
    Args:
        text: 文本内容
    
    Returns:
        文本嵌入向量
    """
    inputs = processor(text=text, return_tensors="pt")
    with torch.no_grad():
        embedding = model.get_text_features(**inputs)
    return embedding.squeeze().numpy().tolist()

# 插入数据
# 图像数据
image_paths = ["cat.jpg", "dog.jpg", "car.jpg"]
for path in image_paths:
    vector = get_image_embedding(path)
    collection.insert([[vector], ["image"], [path]])

# 文本数据
texts = ["a cat sitting on a couch", "a dog running in the park", "a red car"]
for text in texts:
    vector = get_text_embedding(text)
    collection.insert([[vector], ["text"], [text]])

# 创建索引
collection.create_index("vector", {
    "index_type": "HNSW",
    "metric_type": "COSINE",
    "params": {"M": 16, "efConstruction": 200}
})

# 加载集合
collection.load()

# 跨模态搜索 - 文本搜索图像
query_text = "a feline animal"
query_vector = [get_text_embedding(query_text)]

results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"ef": 100}},
    limit=3,
    expr="type == 'image'",
    output_fields=["content", "type"]
)

print("文本搜索图像结果:")
for i, result in enumerate(results[0]):
    print(f"Rank {i+1}: {result.entity.get('content')}, 相似度: {result.distance:.4f}")

# 跨模态搜索 - 图像搜索文本
query_image = "cat.jpg"
query_vector = [get_image_embedding(query_image)]

results = collection.search(
    data=query_vector,
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"ef": 100}},
    limit=3,
    expr="type == 'text'",
    output_fields=["content", "type"]
)

print("\n图像搜索文本结果:")
for i, result in enumerate(results[0]):
    print(f"Rank {i+1}: {result.entity.get('content')}, 相似度: {result.distance:.4f}")

7.6 地理空间向量搜索

实现步骤

  1. 地理坐标转换:将经纬度转换为向量
  2. 向量存储:将向量存储到 Milvus
  3. 距离搜索:根据地理距离搜索附近的点

代码示例

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
from math import radians, cos, sin, asin, sqrt

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=2),  # 经纬度
    FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="latitude", dtype=DataType.FLOAT),
    FieldSchema(name="longitude", dtype=DataType.FLOAT)
]
schema = CollectionSchema(fields, "地理空间向量集合")
collection = Collection("geospatial", schema)

# 计算两个经纬度之间的距离(Haversine公式)
def haversine(lon1, lat1, lon2, lat2):
    """
    计算两个经纬度之间的距离
    
    Args:
        lon1, lat1: 第一个点的经纬度
        lon2, lat2: 第二个点的经纬度
    
    Returns:
        距离(公里)
    """
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    r = 6371  # 地球半径(公里)
    return c * r

# 插入数据
locations = [
    {"name": "北京", "lat": 39.9042, "lon": 116.4074},
    {"name": "上海", "lat": 31.2304, "lon": 121.4737},
    {"name": "广州", "lat": 23.1291, "lon": 113.2644},
    {"name": "深圳", "lat": 22.5431, "lon": 114.0579},
    {"name": "杭州", "lat": 30.2741, "lon": 120.1551}
]

data = []
for loc in locations:
    vector = [loc["lon"], loc["lat"]]
    data.append([vector, loc["name"], loc["lat"], loc["lon"]])

collection.insert(data)

# 创建索引
collection.create_index("vector", {
    "index_type": "FLAT",  # 对于地理空间数据,FLAT 索引更准确
    "metric_type": "L2",
    "params": {}
})

# 加载集合
collection.load()

# 搜索附近的地点
def search_nearby(lat, lon, radius=1000):  # radius 单位:公里
    """
    搜索附近的地点
    
    Args:
        lat: 纬度
        lon: 经度
        radius: 搜索半径(公里)
    
    Returns:
        附近的地点列表
    """
    query_vector = [[lon, lat]]
    
    # 先进行向量搜索
    results = collection.search(
        data=query_vector,
        anns_field="vector",
        param={"metric_type": "L2", "params": {}},
        limit=10,
        output_fields=["name", "latitude", "longitude"]
    )
    
    # 过滤距离
    nearby_locations = []
    for result in results[0]:
        loc_lat = result.entity.get("latitude")
        loc_lon = result.entity.get("longitude")
        distance = haversine(lon, lat, loc_lon, loc_lat)
        
        if distance <= radius:
            nearby_locations.append({
                "name": result.entity.get("name"),
                "latitude": loc_lat,
                "longitude": loc_lon,
                "distance": distance
            })
    
    # 按距离排序
    nearby_locations.sort(key=lambda x: x["distance"])
    
    return nearby_locations

# 测试搜索
current_lat = 39.9042  # 北京
current_lon = 116.4074

nearby = search_nearby(current_lat, current_lon, 500)
print(f"北京附近 500 公里内的城市:")
for loc in nearby:
    print(f"{loc['name']}: 距离 {loc['distance']:.2f} 公里")

7.7 向量模型选择与评估

模型选择考量

  • 模型大小:小型模型适合边缘设备,大型模型适合服务器
  • 嵌入维度:维度越高,表达能力越强,但存储和计算成本也越高
  • 推理速度:实时应用需要快速的推理速度
  • 领域适配:特定领域的模型效果更好

常用模型

  • 文本:BERT、Sentence-BERT、GPT、CLIP
  • 图像:ResNet、ViT、CLIP、DINO
  • 音频:VGGish、Whisper、Wav2Vec2

模型评估

  • 指标:Recall@k、Precision@k、F1 Score
  • 数据集:使用领域相关的数据集
  • 测试方法:构建测试集,计算评估指标

第八章 监控与运维

8.1 监控指标体系

核心指标

  • 查询指标:查询延迟(P95、P99)、查询吞吐量(QPS)、查询错误率、
  • 写入指标:写入吞吐量、写入延迟、写入错误率
  • 系统指标:CPU 使用率、内存使用率、磁盘使用率、网络流量
  • 索引指标:索引构建时间、索引大小、索引质量

8.2 常见故障排查

常见问题

问题 可能原因 解决方案
查询超时 索引未加载、参数设置不当 加载索引、调整查询参数
写入失败 内存不足、磁盘空间不足 增加内存、清理磁盘空间
服务崩溃 内存溢出、硬件故障 增加内存、检查硬件
索引构建失败 数据量过大、内存不足 分批构建、增加内存
连接失败 网络问题、服务未启动 检查网络、启动服务

排查步骤:1. 查看日志文件;2. 检查系统资源;3. 验证网络连接;4. 检查配置文件;5. 测试基本功能;6. 分析性能指标

8.3 常见问题与解决方案

Q: Milvus 服务启动失败怎么办?
A: 检查端口是否被占用,检查配置文件是否正确,查看日志文件了解具体错误原因。

Q: 查询速度慢怎么办?
A: 检查索引是否已加载,调整查询参数(如 nprobe、ef),考虑使用更适合的索引类型。

Q: 内存使用过高怎么办?
A: 调整索引类型(如使用 IVF_SQ8),限制加载的集合数量,增加服务器内存。

Q: 数据导入失败怎么办?
A: 检查数据格式是否正确,检查服务器资源是否充足,尝试分批导入。

Q: 如何提高写入性能?
A: 使用批量写入,调整批量大小,增加 Data Node 数量。

第九章 生产环境最佳实践

9.1 部署架构设计

单节点部署

  • 适用场景:开发、测试、小规模生产
  • 配置:8 核 CPU、32GB 内存、100GB SSD
  • 优势:部署简单,维护成本低
  • 劣势:不具备高可用性

集群部署

  • 适用场景:大规模生产环境
  • 配置
    • Proxy:2-4 节点
    • Query Node:4-8 节点
    • Data Node:2-4 节点
    • Index Node:2-4 节点
  • 优势:高可用性,可扩展性强
  • 劣势:部署复杂,维护成本高

云服务部署

  • 适用场景:快速部署,按需扩展
  • 选项
    • Milvus Cloud
    • 云厂商托管服务
    • 容器服务(EKS、GKE、ACK)
  • 优势:无需维护基础设施,按需付费
  • 劣势:成本较高,定制性受限

9.2 安全配置

import os
from pymilvus import connections

# 从环境变量读取敏感信息
MILVUS_HOST = os.environ.get('MILVUS_HOST', 'localhost')
MILVUS_PORT = os.environ.get('MILVUS_PORT', '19530')
MILVUS_USER = os.environ.get('MILVUS_USER', 'root')
MILVUS_PASSWORD = os.environ.get('MILVUS_PASSWORD', '')

# 使用TLS/SSL连接
connections.connect(
    alias="default",
    host=MILVUS_HOST,
    port=MILVUS_PORT,
    user=MILVUS_USER,
    password=MILVUS_PASSWORD,
    secure=True,  # 启用TLS
    server_pem_path="/path/to/server.pem",  # 服务器证书路径
    server_name="localhost"  # 服务器名称
)

# RBAC权限管理示例
from pymilvus import utility

# 创建角色
utility.create_role("data_reader")

# 授予权限
utility.grant_privilege("data_reader", "Collection", "Search", "*")
utility.grant_privilege("data_reader", "Collection", "Query", "*")

# 创建用户
utility.create_user("analyst", "secure_password_123")

# 将角色分配给用户
utility.add_user_to_role("analyst", "data_reader")

# 验证用户权限
connections.connect(
    alias="secure_connection",
    host=MILVUS_HOST,
    port=MILVUS_PORT,
    user="analyst",
    password="secure_password_123"
)

Docker Compose安全配置示例

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    # 安全:限制网络访问
    networks:
      - milvus-internal

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY:-minioadmin}
      MINIO_SECRET_KEY: ${MINIO_SECRET_KEY:-minioadmin}
    # 安全:使用强密码
    command: minio server /minio_data --console-address ":9001"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    networks:
      - milvus-internal

  milvus:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.3
    command: ["milvus", "run", "standalone"]
    security_opt:
      - seccomp:unconfined
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
      # 安全:启用认证
      COMMON_SECURITY_AUTHORIZATIONENABLED: "true"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    networks:
      - milvus-internal
      - milvus-external
    # 安全:资源限制
    deploy:
      resources:
        limits:
          cpus: '4.0'
          memory: 8G
        reservations:
          cpus: '2.0'
          memory: 4G

networks:
  milvus-internal:
    internal: true  # 内部网络,不暴露到外部
  milvus-external:
    driver: bridge
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐