Milvus详解
第一章 Milvus 核心概念与基础
1.1 向量数据库的概念与价值
向量数据库是一种专门用于存储和检索向量数据的数据库系统。与传统关系型数据库不同,向量数据库能够高效处理高维向量数据,并支持相似度搜索。
核心价值:
- 相似度搜索:快速找到与查询向量最相似的向量
- 高维数据处理:高效处理数百甚至数千维的向量数据
- 实时性:支持毫秒级的查询响应
- 可扩展性:能够处理大规模向量数据集
1.2 Milvus 定位与特点
Milvus 是一个开源的向量数据库,专为 AI 应用场景设计,提供高性能的向量相似度搜索服务。
主要特点:
- 高性能:采用先进的索引技术,支持每秒百万级的查询
- 可扩展:支持水平扩展,轻松处理大规模数据集
- 多语言支持:提供 Python、Java、Go、C++ 等多语言 SDK
- 丰富的索引类型:支持 IVF、HNSW 等多种索引类型
- 混合检索:支持向量与标量的混合查询
1.3 核心术语解释
| 术语 | 解释 |
|---|---|
| 向量 | 由多个数值组成的数组,用于表示数据的特征 |
| 维度 | 向量中元素的数量,如 128 维、256 维等 |
| 距离度量 | 衡量两个向量相似度的方法,如欧氏距离、余弦相似度等 |
| 索引 | 用于加速向量搜索的数据结构 |
| Collection | Milvus 中的数据集合,类似于传统数据库中的表 |
| Partition | Collection 的分区,用于数据管理和查询优化 |
| Embedding | 将非结构化数据转换为向量的过程 |
1.4 向量计算数学基础
距离度量方法:
-
欧氏距离 (Euclidean Distance)
- 计算两个向量之间的直线距离
- 适用场景:向量空间为欧几里得空间的情况
-
余弦相似度 (Cosine Similarity)
- 计算两个向量的夹角余弦值
- 适用场景:关注方向而非大小的场景,如文本相似度
-
曼哈顿距离 (Manhattan Distance)
- 计算两个向量对应维度差的绝对值之和
- 适用场景:高维稀疏向量
1.5 向量嵌入(Embedding)技术
向量嵌入是将非结构化数据(如文本、图像、音频)转换为数值向量的过程。常用的嵌入模型包括:
- 文本嵌入:BERT、Sentence-BERT、GPT 等
- 图像嵌入:ResNet、CLIP、ViT 等
- 音频嵌入:VGGish、Whisper 等
嵌入模型选择考量:
- 模型大小与性能
- 嵌入维度
- 计算资源需求
- 特定领域的表现
1.6 与传统数据库的对比
| 特性 | 传统关系型数据库 | 向量数据库 |
|---|---|---|
| 数据类型 | 结构化数据 | 高维向量 |
| 查询类型 | 精确匹配 | 相似度搜索 |
| 索引结构 | B树、哈希等 | IVF、HNSW等 |
| 适用场景 | 事务处理、报表 | 推荐系统、图像搜索 |
| 扩展性 | 垂直扩展为主 | 水平扩展为主 |
第二章 Milvus 架构设计与组件
2.1 整体架构 overview
Milvus 是一个开源的向量数据库,其架构设计为分布式、高性能的向量检索系统。以下是其核心组件和架构图的分解说明:
核心组件分层
接入层(Access Layer)
- Proxy:接收客户端请求,负责路由、负载均衡和查询协调。
- SDK/API:提供多种语言的客户端接口(Python、Java、Go等),支持 REST 和 gRPC 协议。
协调服务层(Coordinator Service)
- Root Coordinator:管理元数据(集合、分区、索引等)和全局时间戳分配。
- Query Coordinator:协调查询任务,优化执行计划。
- Data Coordinator:管理数据节点的数据分布与均衡。
- Index Coordinator:协调索引构建与更新。
执行层(Worker Node)
- Query Node:执行向量检索和标量过滤,支持近实时搜索。
- Data Node:处理数据插入、删除和持久化,写入日志并同步到对象存储。
- Index Node:构建和管理向量索引(如 IVF、HNSW、ANNOY)。
存储层(Storage)
- 对象存储(Object Storage):持久化存储原始向量和日志(如 S3、MinIO)。
- 日志存储(Log Broker):流式日志队列(如 Kafka、Pulsar),用于实时数据同步。
- 元数据存储(Meta Store):存储系统元数据(如 etcd、MySQL)。
数据流与协作
-
写入流程
客户端通过 Proxy 发送写入请求,Data Node 将数据写入日志存储并同步到对象存储。Index Node 异步构建索引。 -
查询流程
Query Node 从对象存储加载数据,结合索引快速检索,通过 Proxy 返回结果。
可视化架构图
以下是文字描述的架构图关键点:
> 客户端 → Proxy → Coordinator Service
> ↓
> Query/Data/Index Nodes
> ↓
> Log Broker → Object Storage
> ↑
> Meta Store (etcd/MySQL)
扩展性设计
- 分片(Sharding):数据水平分片,支持横向扩展。
- 读写分离:Query Node 和 Data Node 独立扩展,适应不同负载。
- 多云支持:存储层兼容主流对象存储服务。
2.2 核心组件详解
2.2.1 Proxy
- 功能:接收客户端请求,进行负载均衡,分发到相应的节点
- 特点:无状态设计,可水平扩展
- 作用:作为系统的入口点,处理所有客户端请求
2.2.2 Query Node
- 功能:处理向量搜索请求
- 特点:缓存索引数据,提供快速查询
- 作用:执行相似度搜索,返回查询结果
2.2.3 Data Node
- 功能:处理数据写入请求
- 特点:批量处理写入操作,提高性能
- 作用:将数据持久化到存储层,构建索引
2.2.4 Index Node
- 功能:构建和维护向量索引
- 特点:后台异步处理,不影响查询性能
- 作用:优化查询性能,加速相似度搜索
2.3 存储架构
Milvus 使用分层存储架构,包括:
- 元数据存储:使用 etcd 存储元数据,如 Collection、Partition 信息
- 对象存储:使用 S3、MinIO 等存储原始数据和索引文件
- 缓存:Query Node 本地缓存热数据,提高查询性能
2.4 集群部署模式
部署模式:
- 单机部署:适用于开发和测试环境
- 集群部署:适用于生产环境,提供高可用性和可扩展性
集群配置:
- Proxy 集群:多个 Proxy 节点,实现负载均衡
- Query Node 集群:多个 Query Node,提高查询吞吐量
- Data Node 集群:多个 Data Node,提高写入吞吐量
- Index Node 集群:多个 Index Node,加速索引构建
2.5 高可用性设计
- 数据冗余:数据多副本存储
- 节点故障自动恢复:当节点故障时,其他节点接管其工作
- 负载均衡:自动将请求分发到健康节点
- 监控告警:实时监控系统状态,及时发现问题
第三章 向量索引原理与实现
3.1 索引类型与适用场景
| 索引类型 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| FLAT | 小数据集,追求精确性 | 100% 召回率 | 查询速度慢 |
| IVF_FLAT | 中大型数据集 | 平衡速度和精度 | 需要调优 nlist 参数 |
| IVF_SQ8 | 内存有限场景 | 内存占用小 | 精度略有损失 |
| IVF_PQ | 超大规模数据集 | 内存占用极小 | 精度损失较大 |
| HNSW | 追求查询速度 | 查询速度快 | 索引构建慢,内存占用大 |
| DISKANN | 超大规模数据集 | 存储效率高 | 查询速度相对较慢 |
3.2 IVF 系列索引原理与参数
IVF (Inverted File) 原理:
- 将向量空间划分为多个聚类(cluster)
- 为每个向量分配到最近的聚类中心
- 查询时,只搜索最相似的几个聚类
核心参数:
-
nlist:聚类数量
- 取值范围:16 - 65536
- 推荐值:数据量的平方根
- 作用:影响索引精度和查询速度
-
nprobe:查询时搜索的聚类数量
- 取值范围:1 - nlist
- 推荐值:16 - 128
- 作用:nprobe 越大,精度越高,但速度越慢
3.3 HNSW 索引原理与参数
HNSW (Hierarchical Navigable Small World) 原理:
- 构建多层图结构
- 每层都是一个近似最近邻图
- 高层图作为快速导航,低层图提供精确结果
- 查询时从顶层开始,逐层细化搜索
核心参数:
-
M:每个节点的最大邻居数
- 取值范围:4 - 64
- 推荐值:16 - 32
- 作用:M 越大,索引质量越高,但内存占用越大
-
efConstruction:索引构建时的搜索宽度
- 取值范围:100 - 2000
- 推荐值:200 - 400
- 作用:影响索引构建质量和速度
-
ef:查询时的搜索宽度
- 取值范围:10 - 1000
- 推荐值:50 - 200
- 作用:ef 越大,查询精度越高,但速度越慢
3.4 其他索引类型
3.4.1 ANNOY
- 基于树结构的索引
- 适合中小规模数据集
- 支持余弦相似度
3.4.2 NGT
- 基于图结构的索引
- 平衡了查询速度和内存占用
- 适合中等规模数据集
3.5 索引构建与优化策略
索引构建流程:
- 数据准备:收集和预处理向量数据
- 选择索引类型:根据数据特点和查询需求
- 配置索引参数:根据数据规模和性能要求
- 执行索引构建:后台异步处理
- 验证索引质量:评估查询性能和精度
优化策略:
- 批量构建:批量处理提高构建速度
- 增量更新:支持新数据的增量索引
- 并行构建:利用多线程加速索引构建
- 索引压缩:减少内存占用
3.6 索引参数调优指南
调优步骤:
- 确定目标:明确是追求速度还是精度
- 基准测试:使用默认参数进行测试
- 参数调整:根据测试结果调整参数
- 性能评估:评估调整后的性能
- 持续优化:根据实际使用情况持续调整
调优建议:
- 对于 IVF 索引,nlist 一般设置为数据量的平方根
- 对于 HNSW 索引,M 一般设置为 16-32,efConstruction 设置为 200-400
- 查询参数 nprobe 和 ef 应根据延迟要求进行调整
3.7 索引评估指标
评估指标:
- Recall@k:所有相关结果中,被检索到的比例(召回率)
- Precision@k:前 k 个结果中,相关结果的比例(精确率)
- F1 Score:Recall 和 Precision 的调和平均
- 查询延迟:从查询到结果返回的时间
- 吞吐量:每秒处理的查询数
评估方法:
- 准备测试数据集和查询集
- 构建不同参数的索引
- 执行查询并记录结果
- 计算评估指标
- 分析结果并选择最优参数
第四章 Milvus 核心 API 与 SDK
4.1 连接与配置
Python SDK 示例:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
# 连接到 Milvus
connections.connect(
alias="default",
host="localhost",
port="19530"
)
# 连接参数说明
# host: Milvus 服务地址
# port: Milvus 服务端口
# alias: 连接别名,用于后续操作
配置选项:
- timeout:连接超时时间
- retry:连接失败后的重试次数
- secure:是否使用安全连接
4.2 集合(Collection)操作
创建集合:
# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512)
]
# 创建集合 schema
schema = CollectionSchema(fields, description="文本向量集合")
# 创建集合
collection = Collection(name="text_embeddings", schema=schema)
# 集合参数说明
# name: 集合名称
# schema: 集合 schema
# shards_num: 分片数量(集群模式)
集合操作:
- load:将集合加载到内存
- release:从内存中释放集合
- drop:删除集合
- describe:查看集合信息
- has_collection:检查集合是否存在
4.3 分区(Partition)管理
创建分区:
# 创建分区
collection.create_partition(partition_name="partition_2024")
# 分区操作
# 插入数据到指定分区
collection.insert(data, partition_name="partition_2024")
# 删除分区
collection.drop_partition(partition_name="partition_2024")
# 列出所有分区
partitions = collection.list_partitions()
分区策略:
- 时间分区:按时间范围分区
- 地域分区:按地理位置分区
- 业务分区:按业务类型分区
4.4 向量插入与删除
插入数据:
# 准备数据
import random
# 生成 1000 条数据
ids = [i for i in range(1000)]
vectors = [[random.random() for _ in range(128)] for _ in range(1000)]
texts = [f"Text {i}" for i in range(1000)]
data = [ids, vectors, texts]
# 插入数据
collection.insert(data)
# 插入参数说明
# data: 数据列表,顺序与字段定义一致
# partition_name: 目标分区名称
删除数据:
# 根据主键删除
collection.delete("id in [1, 2, 3]")
# 删除条件
# 支持 in、and、or 等操作符
4.5 相似度搜索 API
向量搜索:
# 准备查询向量
query_vector = [[random.random() for _ in range(128)]]
# 执行搜索
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=10,
expr=None,
output_fields=["text"]
)
# 搜索参数说明
# data: 查询向量
# anns_field: 向量字段名
# param: 搜索参数
# limit: 返回结果数量
# expr: 标量过滤条件
# output_fields: 需要返回的字段
# 处理搜索结果
for i, result in enumerate(results[0]):
print(f"Rank {i+1}: ID={result.id}, Distance={result.distance}, Text={result.entity.get('text')}")
搜索参数:
- metric_type:距离度量类型(L2、IP、COSINE)
- nprobe:搜索的聚类数量(IVF 索引)
- ef:搜索宽度(HNSW 索引)
4.6 混合检索(标量+向量)
混合搜索:
# 标量过滤条件
expr = "text like '%important%'"
# 执行混合搜索
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=10,
expr=expr,
output_fields=["text"]
)
# 过滤条件语法
# 支持 like、in、>、<、= 等操作符
# 支持 and、or、not 逻辑操作
4.7 可视化管理系统 Attu
Attu 是 Milvus 的官方图形化管理工具,提供直观的 Web 界面来管理和监控 Milvus 集群。
主要功能:
- 集合管理:创建、删除、查看 Collection 和 Partition
- 数据操作:插入、查询、删除向量数据
- 索引管理:创建和监控索引构建进度
- 系统监控:查看系统状态、节点健康度、性能指标
- 向量搜索:可视化执行相似度搜索
- 用户管理:管理用户和权限(RBAC)
安装方式:
# Docker 安装(推荐)
docker run -p 8000:3000 -e MILVUS_URL={milvus_server_ip}:19530 zilliz/attu:latest
# 访问地址
http://localhost:8000
Attu 界面说明:
使用示例:
-
创建集合:
- 点击"创建集合"按钮
- 输入集合名称和描述
- 定义字段(向量维度、数据类型等)
- 选择索引类型和参数
- 点击"创建"
-
插入数据:
- 选择目标集合
- 点击"插入数据"
- 上传 JSON/CSV 文件或手动输入
- 确认数据格式正确后提交
-
执行搜索:
- 进入"向量搜索"页面
- 选择集合和索引
- 输入查询向量或上传查询文件
- 设置搜索参数(topk、metric_type 等)
- 执行搜索并查看结果
-
监控系统:
- 查看"概览"页面的系统状态
- 监控 CPU、内存、磁盘使用率
- 查看查询 QPS 和延迟指标
- 检查节点健康状态
4.8 命令行工具 Milvus CLI
Milvus CLI 是一个命令行工具,用于通过终端与 Milvus 进行交互。
安装方式:
# pip 安装
pip install milvus-cli
# 或者从源码安装
git clone https://github.com/zilliztech/milvus-cli.git
cd milvus-cli
pip install -e .
基本命令:
# 启动 CLI
milvus-cli
# 或者直接执行命令
milvus-cli <command>
连接管理:
# 连接到 Milvus 服务器
milvus_cli > connect -h localhost -p 19530 -a default
# 使用用户名密码连接
milvus_cli > connect -h localhost -p 19530 -u root -w password
# 查看连接状态
milvus_cli > connection
# 断开连接
milvus_cli > exit
集合操作:
# 列出所有集合
milvus_cli > list collections
# 创建集合
milvus_cli > create collection -c my_collection -f id:INT64:primary -f vector:FLOAT_VECTOR:128 -d "My collection"
# 查看集合详情
milvus_cli > describe collection -c my_collection
# 删除集合
milvus_cli > delete collection -c my_collection
# 加载集合到内存
milvus_cli > load collection -c my_collection
# 从内存释放集合
milvus_cli > release collection -c my_collection
数据操作:
# 插入数据
milvus_cli > insert -c my_collection -d '[{"id": 1, "vector": [0.1, 0.2, ...]}]'
# 查询数据
milvus_cli > query -c my_collection -e "id in [1, 2, 3]"
# 删除数据
milvus_cli > delete -c my_collection -e "id in [1, 2, 3]"
# 搜索相似向量
milvus_cli > search -c my_collection -v "[0.1, 0.2, ...]" -k 10
索引管理:
# 创建索引
milvus_cli > create index -c my_collection -f vector -i HNSW -p "{\"M\": 16, \"efConstruction\": 200}"
# 查看索引信息
milvus_cli > describe index -c my_collection
# 删除索引
milvus_cli > delete index -c my_collection -f vector
分区管理:
# 列出分区
milvus_cli > list partitions -c my_collection
# 创建分区
milvus_cli > create partition -c my_collection -p partition_2024
# 删除分区
milvus_cli > delete partition -c my_collection -p partition_2024
导入导出:
# 从文件导入数据
milvus_cli > import -c my_collection -f data.json
# 导出数据到文件
milvus_cli > export -c my_collection -f output.json -e "id >= 0"
用户和权限管理(企业版):
# 列出用户
milvus_cli > list users
# 创建用户
milvus_cli > create user -u new_user -w password123
# 删除用户
milvus_cli > delete user -u new_user
# 修改密码
milvus_cli > update password -u new_user -o old_password -n new_password
# 创建角色
milvus_cli > create role -r data_reader
# 授予权限
milvus_cli > grant privilege -r data_reader -o Collection -p Search -c "*"
# 分配角色给用户
milvus_cli > assign role -r data_reader -u new_user
脚本示例:批量创建集合
#!/bin/bash
# create_collections.sh - 批量创建集合
collections=("image_vectors" "text_vectors" "audio_vectors")
dims=(512 384 256)
for i in "${!collections[@]}"; do
collection="${collections[$i]}"
dim="${dims[$i]}"
echo "创建集合: $collection (维度: $dim)"
milvus-cli create collection \
-c "$collection" \
-f "id:INT64:primary" \
-f "vector:FLOAT_VECTOR:$dim" \
-f "create_time:TIMESTAMP" \
-d "Collection for $collection"
echo "创建索引..."
milvus-cli create index \
-c "$collection" \
-f "vector" \
-i "HNSW" \
-p '{"M": 16, "efConstruction": 200}'
done
echo "所有集合创建完成!"
CLI 与 Attu 对比:
| 特性 | Milvus CLI | Attu |
|---|---|---|
| 使用方式 | 命令行 | Web 界面 |
| 适用场景 | 自动化脚本、批量操作 | 可视化监控、交互式操作 |
| 学习曲线 | 需要记忆命令 | 直观易用 |
| 功能覆盖 | 完整 | 完整 |
| 批量操作 | 适合 | 适合 |
| 实时监控 | 有限 | 优秀 |
4.9 批量操作 API
批量插入:
# 批量插入数据
batch_size = 1000
for i in range(0, total_data_size, batch_size):
batch_data = data[i:i+batch_size]
collection.insert(batch_data)
# 批量插入建议
# 批次大小:1000-10000
# 并发控制:避免同时插入过多批次
批量搜索:
# 批量搜索
query_vectors = [[random.random() for _ in range(128)] for _ in range(10)]
results = collection.search(
data=query_vectors,
anns_field="vector",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=10
)
# 处理批量搜索结果
for i, query_result in enumerate(results):
print(f"Query {i+1} results:")
for j, result in enumerate(query_result):
print(f" Rank {j+1}: ID={result.id}, Distance={result.distance}")
4.10 Docker 部署 Milvus
Docker 是部署 Milvus 最简单、最快速的方式,适合开发测试环境和快速原型验证。
4.10.1 环境准备
系统要求:
- Docker Engine 19.03 或更高版本
- Docker Compose 1.25.1 或更高版本
- 至少 8GB 内存
- 至少 50GB 磁盘空间
安装 Docker:
# Ubuntu/Debian
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $USER
# macOS
brew install --cask docker
# Windows
# 下载 Docker Desktop 并安装
4.10.2 单机部署(Standalone)
下载配置文件:
# 创建 Milvus 工作目录
mkdir -p milvus-docker
cd milvus-docker
# 下载官方配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-standalone-docker-compose.yml -O docker-compose.yml
docker-compose.yml 配置说明:
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- etcd
- minio
启动 Milvus:
# 启动服务
docker-compose up -d
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f standalone
# 停止服务
docker-compose down
# 停止并删除数据卷
docker-compose down -v
验证部署:
from pymilvus import connections, utility
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 检查版本
print(f"Milvus 版本: {utility.get_server_version()}")
# 检查服务状态
print(f"服务器状态: {utility.get_server_status()}")
4.10.3 集群部署(Cluster)
集群架构:
┌───────────────────────────────────────────────────────────────────────┐
│ Docker 集群部署 │
├───────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Proxy │ │ Proxy │ │ Proxy │ │
│ │ x 2 │ │ │ │ │ │
│ └──────┬──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ┌──────┴──────────────────────────────────────────────────────┐ │
│ │ Coordinator │ │
│ │ (Root Coord, Query Coord, Data Coord, Index Coord) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────┴──────────────────────────────────────────────────────┐ │
│ │ Worker Nodes │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Query Node │ │ Data Node │ │ Index Node │ │ │
│ │ │ x 2 │ │ x 2 │ │ x 2 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────┴──────────────────────────────────────────────────────┐ │
│ │ Storage Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ etcd │ │ MinIO │ │ │
│ │ │ (Meta) │ │ (Object) │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────┘
下载集群配置文件:
# 下载集群配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.3.3/milvus-cluster-docker-compose.yml -O docker-compose-cluster.yml
启动集群:
# 使用集群配置文件启动
docker-compose -f docker-compose-cluster.yml up -d
# 查看所有服务
docker-compose -f docker-compose-cluster.yml ps
# 查看特定服务日志
docker-compose -f docker-compose-cluster.yml logs -f proxy
4.10.4 自定义配置
修改资源限制:
services:
standalone:
deploy:
resources:
limits:
cpus: '4.0'
memory: 8G
reservations:
cpus: '2.0'
memory: 4G
持久化存储配置:
services:
standalone:
volumes:
# 使用命名卷
- milvus-data:/var/lib/milvus
# 或使用本地路径
- /data/milvus:/var/lib/milvus
volumes:
milvus-data:
driver: local
网络配置:
services:
standalone:
networks:
- milvus-network
- external-network
networks:
milvus-network:
driver: bridge
external-network:
external: true
4.10.5 常用操作命令
容器管理:
# 查看运行中的容器
docker ps | grep milvus
# 进入 Milvus 容器
docker exec -it milvus-standalone bash
# 查看容器资源使用
docker stats milvus-standalone
# 重启服务
docker-compose restart standalone
# 查看容器日志
docker logs -f --tail=100 milvus-standalone
数据备份:
# 备份数据卷
docker run --rm -v milvus-docker_milvus-data:/source -v $(pwd)/backup:/backup alpine tar czf /backup/milvus-backup.tar.gz -C /source .
# 恢复数据卷
docker run --rm -v milvus-docker_milvus-data:/target -v $(pwd)/backup:/backup alpine sh -c "rm -rf /target/* && tar xzf /backup/milvus-backup.tar.gz -C /target"
性能调优:
services:
standalone:
environment:
# 调整线程池大小
COMMON_SECURITY_AUTHORIZATIONENABLED: "false"
# 调整日志级别
LOG_LEVEL: "info"
sysctls:
# 增加文件描述符限制
- fs.file-max=65536
ulimits:
nofile:
soft: 65536
hard: 65536
4.10.6 与 Attu 集成
完整部署方案(Milvus + Attu):
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
depends_on:
- etcd
- minio
attu:
container_name: milvus-attu
image: zilliz/attu:latest
environment:
MILVUS_URL: standalone:19530
ports:
- "8000:3000"
depends_on:
- standalone
启动完整环境:
# 启动 Milvus 和 Attu
docker-compose up -d
# 访问 Attu
open http://localhost:8000
# 连接信息
# Milvus Address: standalone:19530
4.10.7 故障排查
常见问题:
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 端口冲突 | 19530 或 9000 被占用 | 修改 docker-compose.yml 中的端口映射 |
| 内存不足 | 系统内存不足 | 增加 Docker 内存限制或关闭其他服务 |
| 权限错误 | 数据卷权限问题 | 检查 volumes 目录权限 |
| 连接失败 | 服务未完全启动 | 等待 30 秒后重试 |
排查命令:
# 检查服务健康状态
docker-compose ps
# 查看详细日志
docker-compose logs --tail=200
# 检查端口占用
netstat -tuln | grep 19530
# 检查资源使用
docker system df
docker stats
# 清理未使用的资源
docker system prune -a
第五章 数据管理与操作
5.1 数据模型设计
设计原则:
- 向量维度:根据嵌入模型选择合适的维度
- 标量字段:添加必要的标量字段用于过滤和排序
- 分区策略:根据数据特点选择合适的分区策略
- 索引选择:根据查询需求选择合适的索引类型
示例数据模型:
| 字段名 | 数据类型 | 描述 |
|---|---|---|
| id | INT64 | 主键,自增 |
| vector | FLOAT_VECTOR | 128 维向量 |
| text | VARCHAR | 原始文本 |
| category | VARCHAR | 文本分类 |
| create_time | TIMESTAMP | 创建时间 |
| user_id | INT64 | 用户ID |
5.2 向量数据预处理
预处理步骤:
- 数据清洗:去除噪声和异常值
- 特征提取:使用嵌入模型生成向量
- 向量归一化:确保向量具有相同的尺度
- 数据验证:检查向量维度和质量
归一化示例:
import numpy as np
def normalize_vector(vector):
"""
向量归一化
Args:
vector: 原始向量
Returns:
归一化后的向量
"""
norm = np.linalg.norm(vector)
if norm == 0:
return vector
return vector / norm
# 批量归一化
vectors = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
normalized_vectors = [normalize_vector(v) for v in vectors]
5.3 批量操作与性能
批量操作优化:
- 批量大小:根据内存和网络带宽调整
- 并发控制:使用线程池或异步操作
- 批量写入:减少网络往返次数
- 批量读取:提高查询效率
性能调优:
- 批量插入:每次插入 1000-10000 条数据
- 批量搜索:每次搜索 10-100 个查询向量
- 并发数:根据服务器性能调整
5.4 数据备份与恢复
备份策略:
- 定期备份:按计划执行备份
- 增量备份:只备份变更数据
- 跨区域备份:提高数据安全性
备份操作:
# 创建备份
from pymilvus import utility
utility.create_backup(
collection_name="text_embeddings",
backup_name="backup_20240313"
)
# 列出备份
backups = utility.list_backups()
# 恢复备份
utility.restore_backup(
backup_name="backup_20240313",
collection_name="text_embeddings_restored"
)
5.5 数据一致性保证
一致性级别:
- 强一致性:写入后立即可读
- 最终一致性:写入后一段时间内可读
保证机制:
- 事务支持:确保操作的原子性
- 版本控制:跟踪数据变更
- 冲突解决:处理并发写入冲突
5.6 数据导入导出
数据导入:
# 从文件导入
import csv
# 读取 CSV 文件
with open('data.csv', 'r') as f:
reader = csv.reader(f)
next(reader) # 跳过表头
data = []
for row in reader:
id = int(row[0])
vector = list(map(float, row[1].split(',')))
text = row[2]
data.append([id, vector, text])
# 导入数据
collection.insert(data)
数据导出:
# 导出数据
results = collection.query(
expr="id >= 0",
output_fields=["id", "vector", "text"]
)
# 保存到文件
import json
with open('exported_data.json', 'w') as f:
json.dump(results, f)
5.7 实际开发流程
开发流程:
- 需求分析:明确业务需求和数据特点
- 数据准备:收集和预处理数据
- 模型选择:选择合适的嵌入模型
- 索引设计:选择索引类型和参数
- 性能测试:评估系统性能
- 部署上线:部署到生产环境
- 监控维护:监控系统运行状态
最佳实践:
- 从小规模开始,逐步扩展
- 定期评估和优化系统性能
- 建立完善的监控和告警机制
- 制定数据备份和恢复策略
第六章 性能优化策略
6.1 硬件配置优化
硬件选择:
- CPU:多核心、高主频
- 内存:足够大的内存,建议至少 32GB
- 存储:SSD 存储,提高 I/O 性能
- 网络:高速网络,建议 10Gbps 以上
配置建议:
| 数据规模 | CPU | 内存 | 存储 |
|---|---|---|---|
| 100 万向量 | 8 核 | 32GB | 100GB SSD |
| 1000 万向量 | 16 核 | 64GB | 1TB SSD |
| 1 亿向量 | 32 核 | 128GB+ | 10TB SSD |
6.2 查询参数优化
参数调优:
-
nprobe:根据查询延迟要求调整
- 低延迟场景:nprobe = 10-20
- 高精度场景:nprobe = 50-100
-
ef:HNSW 索引的搜索宽度
- 低延迟场景:ef = 50-100
- 高精度场景:ef = 200-300
-
limit:返回结果数量
- 根据业务需求设置,不要返回过多结果
6.3 并发与缓存优化
并发优化:
- 连接池:使用连接池管理数据库连接
- 异步操作:使用异步 API 提高并发性能
- 负载均衡:在集群模式下合理分配请求
缓存优化:
- 内存缓存:Query Node 会自动缓存热数据
- 客户端缓存:缓存频繁查询的结果
- TTL 设置:合理设置缓存过期时间
6.4 内存与存储优化
内存优化:
- 索引选择:根据内存情况选择合适的索引类型
- 数据压缩:使用 IVF_SQ8 或 IVF_PQ 减少内存占用
- 内存限制:合理设置服务的内存限制
存储优化:
- 压缩存储:启用数据压缩
- 分层存储:热数据存储在 SSD,冷数据存储在 HDD
- 清理策略:定期清理过期数据
6.5 监控指标与调优
关键指标:
- 查询延迟:P95、P99 延迟
- 查询吞吐量:每秒查询数 (QPS)
- 索引构建时间:索引构建的耗时
- 内存使用率:内存使用情况
- 磁盘使用率:磁盘空间使用情况
- CPU 使用率:CPU 负载情况
监控工具:
- Prometheus:收集监控指标
- Grafana:可视化监控数据
- Alertmanager:设置告警规则
6.6 性能压测与评估
压测方法:
- 准备测试数据:生成符合真实场景的测试数据
- 设计测试用例:覆盖不同查询场景
- 执行压测:使用压测工具执行测试
- 分析结果:分析性能瓶颈
- 优化调整:根据测试结果进行优化
压测工具:
- milvus-benchmark:Milvus 官方压测工具
- Locust:开源压测工具
- JMeter:功能强大的压测工具
milvus-benchmark 使用示例:
# 安装 milvus-benchmark
# pip install milvus-benchmark
# 配置文件 config.yaml
"""
milvus:
host: localhost
port: 19530
collection:
collection_name: benchmark_test
dim: 128
index_type: HNSW
metric_type: L2
load:
nb: 100000 # 每次插入的向量数量
total: 1000000 # 总向量数量
search:
nq: 100 # 查询数量
topk: 10 # 返回结果数量
threads: 10 # 并发线程数
"""
# 运行压测
# milvus-benchmark -c config.yaml
# Python 压测脚本示例
import time
import numpy as np
from pymilvus import connections, Collection
from concurrent.futures import ThreadPoolExecutor
def benchmark_search(collection_name, num_queries=1000, concurrency=10):
"""
压测搜索性能
Args:
collection_name: 集合名称
num_queries: 查询次数
concurrency: 并发数
Returns:
压测结果统计
"""
# 连接Milvus
connections.connect("default", host="localhost", port="19530")
collection = Collection(collection_name)
collection.load()
# 生成查询向量
dim = 128
query_vectors = [np.random.rand(dim).tolist() for _ in range(num_queries)]
# 记录延迟
latencies = []
def search_task(vector):
start_time = time.time()
collection.search(
data=[vector],
anns_field="vector",
param={"metric_type": "L2", "params": {"ef": 64}},
limit=10
)
end_time = time.time()
return (end_time - start_time) * 1000 # 转换为毫秒
# 并发执行查询
start_time = time.time()
with ThreadPoolExecutor(max_workers=concurrency) as executor:
latencies = list(executor.map(search_task, query_vectors))
total_time = time.time() - start_time
# 计算统计指标
latencies.sort()
avg_latency = np.mean(latencies)
p50_latency = np.percentile(latencies, 50)
p95_latency = np.percentile(latencies, 95)
p99_latency = np.percentile(latencies, 99)
qps = num_queries / total_time
print(f"压测结果:")
print(f"总查询数: {num_queries}")
print(f"并发数: {concurrency}")
print(f"总耗时: {total_time:.2f} 秒")
print(f"QPS: {qps:.2f}")
print(f"平均延迟: {avg_latency:.2f} ms")
print(f"P50延迟: {p50_latency:.2f} ms")
print(f"P95延迟: {p95_latency:.2f} ms")
print(f"P99延迟: {p99_latency:.2f} ms")
return {
"qps": qps,
"avg_latency": avg_latency,
"p50_latency": p50_latency,
"p95_latency": p95_latency,
"p99_latency": p99_latency
}
# 运行压测
# results = benchmark_search("my_collection", num_queries=1000, concurrency=10)
6.7 冷启动问题解决方案
冷启动挑战:
- 新系统缺乏足够的向量数据
- 索引质量不高,查询性能差
- 系统需要时间预热
解决方案:
- 预加载数据:在系统上线前加载足够的测试数据
- 增量索引:边写入边构建索引
- 缓存预热:系统启动后执行预热查询
- 渐进式扩展:从小规模开始,逐步增加数据量
第七章 常见应用场景实现
7.1 图像相似度搜索
实现步骤:
- 图像预处理:调整大小、归一化
- 特征提取:使用 ResNet、CLIP 等模型生成向量
- 向量存储:将向量存储到 Milvus
- 相似搜索:根据查询图像的向量搜索相似图像
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
from PIL import Image
import torch
from torchvision import models, transforms
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),
FieldSchema(name="image_path", dtype=DataType.VARCHAR, max_length=512)
]
schema = CollectionSchema(fields, "图像向量集合")
collection = Collection("image_embeddings", schema)
# 加载预训练模型
model = models.resnet50(pretrained=True)
model.eval()
transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 提取图像特征
def extract_features(image_path):
"""
提取图像特征
Args:
image_path: 图像路径
Returns:
图像特征向量,如果文件不存在则返回None
"""
try:
image = Image.open(image_path)
image = transform(image).unsqueeze(0)
with torch.no_grad():
features = model(image)
return features.squeeze().numpy().tolist()
except FileNotFoundError:
print(f"错误:图像文件 {image_path} 不存在")
return None
except Exception as e:
print(f"处理图像时发生错误:{e}")
return None
# 插入图像向量
image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]
vectors = [extract_features(path) for path in image_paths]
data = [vectors, image_paths]
collection.insert(data)
# 创建索引
collection.create_index("vector", {
"index_type": "HNSW",
"metric_type": "L2",
"params": {"M": 16, "efConstruction": 200}
})
# 加载集合
collection.load()
# 搜索相似图像
query_image = "query_image.jpg"
query_vector = [extract_features(query_image)]
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "L2", "params": {"ef": 100}},
limit=5,
output_fields=["image_path"]
)
# 打印结果
print("相似图像:")
for i, result in enumerate(results[0]):
print(f"Rank {i+1}: {result.entity.get('image_path')}, 距离: {result.distance}")
7.2 文本语义搜索
实现步骤:
- 文本预处理:分词、去停用词
- 语义嵌入:使用 BERT、Sentence-BERT 等模型生成向量
- 向量存储:将向量存储到 Milvus
- 语义搜索:根据查询文本的向量搜索相似文本
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from sentence_transformers import SentenceTransformer
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=384),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1024),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=128)
]
schema = CollectionSchema(fields, "文本向量集合")
collection = Collection("text_embeddings", schema)
# 加载预训练模型
model = SentenceTransformer('all-MiniLM-L6-v2')
# 生成文本向量
def generate_embedding(text):
"""
生成文本嵌入向量
Args:
text: 文本内容
Returns:
文本嵌入向量
"""
return model.encode(text).tolist()
# 插入文本向量
texts = [
"Milvus 是一个开源的向量数据库",
"向量数据库用于相似度搜索",
"FastMCP 是一个 MCP 协议的实现",
"Python 是一种流行的编程语言"
]
categories = ["数据库", "数据库", "编程", "编程"]
vectors = [generate_embedding(text) for text in texts]
data = [vectors, texts, categories]
collection.insert(data)
# 创建索引
collection.create_index("vector", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200}
})
# 加载集合
collection.load()
# 语义搜索
query_text = "什么是向量数据库"
query_vector = [generate_embedding(query_text)]
# 混合搜索(按类别过滤)
expr = "category == '数据库'"
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=3,
expr=expr,
output_fields=["text", "category"]
)
# 打印结果
print("搜索结果:")
for i, result in enumerate(results[0]):
print(f"Rank {i+1}: 相似度: {result.distance:.4f}")
print(f" 文本: {result.entity.get('text')}")
print(f" 类别: {result.entity.get('category')}")
7.3 推荐系统
实现步骤:
- 用户和物品嵌入:生成用户和物品的向量表示
- 向量存储:将向量存储到 Milvus
- 相似推荐:根据用户向量搜索相似物品
- 协同过滤:结合用户历史行为
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
import time
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建物品集合
item_fields = [
FieldSchema(name="item_id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="item_vector", dtype=DataType.FLOAT_VECTOR, dim=128),
FieldSchema(name="item_name", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=128)
]
item_schema = CollectionSchema(item_fields, "物品向量集合")
item_collection = Collection("items", item_schema)
# 创建用户集合
user_fields = [
FieldSchema(name="user_id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="user_vector", dtype=DataType.FLOAT_VECTOR, dim=128)
]
user_schema = CollectionSchema(user_fields, "用户向量集合")
user_collection = Collection("users", user_schema)
# 生成模拟数据
np.random.seed(42)
# 生成物品数据
item_ids = list(range(1000))
item_vectors = [np.random.rand(128).tolist() for _ in range(1000)]
item_names = [f"Item {i}" for i in range(1000)]
categories = [["Electronics", "Clothing", "Books", "Home"][i % 4] for i in range(1000)]
item_data = [item_ids, item_vectors, item_names, categories]
item_collection.insert(item_data)
# 生成用户数据
user_ids = list(range(100))
user_vectors = [np.random.rand(128).tolist() for _ in range(100)]
user_data = [user_ids, user_vectors]
user_collection.insert(user_data)
# 创建索引
item_collection.create_index("item_vector", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200}
})
user_collection.create_index("user_vector", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200}
})
# 加载集合
item_collection.load()
user_collection.load()
# 推荐物品给用户
def recommend_items(user_id, top_k=10):
"""
为用户推荐物品
Args:
user_id: 用户ID
top_k: 推荐数量
Returns:
推荐物品列表
"""
# 获取用户向量
user_result = user_collection.query(
expr=f"user_id == {user_id}",
output_fields=["user_vector"]
)
if not user_result:
return []
user_vector = user_result[0]["user_vector"]
# 搜索相似物品
results = item_collection.search(
data=[user_vector],
anns_field="item_vector",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=top_k,
output_fields=["item_name", "category"]
)
# 处理推荐结果
recommendations = []
for result in results[0]:
recommendations.append({
"item_id": result.id,
"item_name": result.entity.get("item_name"),
"category": result.entity.get("category"),
"similarity": result.distance
})
return recommendations
# 测试推荐
user_id = 1
recommendations = recommend_items(user_id)
print(f"为用户 {user_id} 推荐的物品:")
for i, item in enumerate(recommendations):
print(f"Rank {i+1}: {item['item_name']} (类别: {item['category']}, 相似度: {item['similarity']:.4f})")
7.4 异常检测
实现步骤:
- 正常数据嵌入:收集正常数据并生成向量
- 向量存储:将向量存储到 Milvus
- 异常检测:计算查询向量与正常向量的距离,超过阈值则为异常
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=64),
FieldSchema(name="timestamp", dtype=DataType.TIMESTAMP)
]
schema = CollectionSchema(fields, "异常检测向量集合")
collection = Collection("anomaly_detection", schema)
# 生成正常数据
np.random.seed(42)
normal_data = []
for i in range(1000):
# 生成正态分布的向量
vector = np.random.normal(0, 1, 64).tolist()
timestamp = int(time.time()) - i * 3600
normal_data.append([vector, timestamp])
# 插入正常数据
collection.insert(normal_data)
# 创建索引
collection.create_index("vector", {
"index_type": "HNSW",
"metric_type": "L2",
"params": {"M": 16, "efConstruction": 200}
})
# 加载集合
collection.load()
# 异常检测
def detect_anomaly(vector, threshold=3.0):
"""
检测异常
Args:
vector: 待检测的向量
threshold: 异常阈值
Returns:
(是否异常, 最小距离)
"""
results = collection.search(
data=[vector],
anns_field="vector",
param={"metric_type": "L2", "params": {"ef": 100}},
limit=1
)
min_distance = results[0][0].distance
is_anomaly = min_distance > threshold
return is_anomaly, min_distance
# 测试异常检测
# 正常数据
normal_vector = np.random.normal(0, 1, 64).tolist()
is_anomaly, distance = detect_anomaly(normal_vector)
print(f"正常数据 - 异常: {is_anomaly}, 距离: {distance:.4f}")
# 异常数据
anomaly_vector = np.random.normal(5, 1, 64).tolist()
is_anomaly, distance = detect_anomaly(anomaly_vector)
print(f"异常数据 - 异常: {is_anomaly}, 距离: {distance:.4f}")
7.5 多模态检索
实现步骤:
- 多模态嵌入:使用 CLIP 等模型生成跨模态向量
- 向量存储:将不同模态的向量存储到 Milvus
- 跨模态搜索:使用一种模态的向量搜索另一种模态
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),
FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=16), # "image" or "text"
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=1024)
]
schema = CollectionSchema(fields, "多模态向量集合")
collection = Collection("multimodal", schema)
# 加载 CLIP 模型
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# 生成图像嵌入
def get_image_embedding(image_path):
"""
生成图像嵌入
Args:
image_path: 图像路径
Returns:
图像嵌入向量
"""
image = Image.open(image_path)
inputs = processor(images=image, return_tensors="pt")
with torch.no_grad():
embedding = model.get_image_features(**inputs)
return embedding.squeeze().numpy().tolist()
# 生成文本嵌入
def get_text_embedding(text):
"""
生成文本嵌入
Args:
text: 文本内容
Returns:
文本嵌入向量
"""
inputs = processor(text=text, return_tensors="pt")
with torch.no_grad():
embedding = model.get_text_features(**inputs)
return embedding.squeeze().numpy().tolist()
# 插入数据
# 图像数据
image_paths = ["cat.jpg", "dog.jpg", "car.jpg"]
for path in image_paths:
vector = get_image_embedding(path)
collection.insert([[vector], ["image"], [path]])
# 文本数据
texts = ["a cat sitting on a couch", "a dog running in the park", "a red car"]
for text in texts:
vector = get_text_embedding(text)
collection.insert([[vector], ["text"], [text]])
# 创建索引
collection.create_index("vector", {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 200}
})
# 加载集合
collection.load()
# 跨模态搜索 - 文本搜索图像
query_text = "a feline animal"
query_vector = [get_text_embedding(query_text)]
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=3,
expr="type == 'image'",
output_fields=["content", "type"]
)
print("文本搜索图像结果:")
for i, result in enumerate(results[0]):
print(f"Rank {i+1}: {result.entity.get('content')}, 相似度: {result.distance:.4f}")
# 跨模态搜索 - 图像搜索文本
query_image = "cat.jpg"
query_vector = [get_image_embedding(query_image)]
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=3,
expr="type == 'text'",
output_fields=["content", "type"]
)
print("\n图像搜索文本结果:")
for i, result in enumerate(results[0]):
print(f"Rank {i+1}: {result.entity.get('content')}, 相似度: {result.distance:.4f}")
7.6 地理空间向量搜索
实现步骤:
- 地理坐标转换:将经纬度转换为向量
- 向量存储:将向量存储到 Milvus
- 距离搜索:根据地理距离搜索附近的点
代码示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
from math import radians, cos, sin, asin, sqrt
# 连接 Milvus
connections.connect("default", host="localhost", port="19530")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=2), # 经纬度
FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="latitude", dtype=DataType.FLOAT),
FieldSchema(name="longitude", dtype=DataType.FLOAT)
]
schema = CollectionSchema(fields, "地理空间向量集合")
collection = Collection("geospatial", schema)
# 计算两个经纬度之间的距离(Haversine公式)
def haversine(lon1, lat1, lon2, lat2):
"""
计算两个经纬度之间的距离
Args:
lon1, lat1: 第一个点的经纬度
lon2, lat2: 第二个点的经纬度
Returns:
距离(公里)
"""
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # 地球半径(公里)
return c * r
# 插入数据
locations = [
{"name": "北京", "lat": 39.9042, "lon": 116.4074},
{"name": "上海", "lat": 31.2304, "lon": 121.4737},
{"name": "广州", "lat": 23.1291, "lon": 113.2644},
{"name": "深圳", "lat": 22.5431, "lon": 114.0579},
{"name": "杭州", "lat": 30.2741, "lon": 120.1551}
]
data = []
for loc in locations:
vector = [loc["lon"], loc["lat"]]
data.append([vector, loc["name"], loc["lat"], loc["lon"]])
collection.insert(data)
# 创建索引
collection.create_index("vector", {
"index_type": "FLAT", # 对于地理空间数据,FLAT 索引更准确
"metric_type": "L2",
"params": {}
})
# 加载集合
collection.load()
# 搜索附近的地点
def search_nearby(lat, lon, radius=1000): # radius 单位:公里
"""
搜索附近的地点
Args:
lat: 纬度
lon: 经度
radius: 搜索半径(公里)
Returns:
附近的地点列表
"""
query_vector = [[lon, lat]]
# 先进行向量搜索
results = collection.search(
data=query_vector,
anns_field="vector",
param={"metric_type": "L2", "params": {}},
limit=10,
output_fields=["name", "latitude", "longitude"]
)
# 过滤距离
nearby_locations = []
for result in results[0]:
loc_lat = result.entity.get("latitude")
loc_lon = result.entity.get("longitude")
distance = haversine(lon, lat, loc_lon, loc_lat)
if distance <= radius:
nearby_locations.append({
"name": result.entity.get("name"),
"latitude": loc_lat,
"longitude": loc_lon,
"distance": distance
})
# 按距离排序
nearby_locations.sort(key=lambda x: x["distance"])
return nearby_locations
# 测试搜索
current_lat = 39.9042 # 北京
current_lon = 116.4074
nearby = search_nearby(current_lat, current_lon, 500)
print(f"北京附近 500 公里内的城市:")
for loc in nearby:
print(f"{loc['name']}: 距离 {loc['distance']:.2f} 公里")
7.7 向量模型选择与评估
模型选择考量:
- 模型大小:小型模型适合边缘设备,大型模型适合服务器
- 嵌入维度:维度越高,表达能力越强,但存储和计算成本也越高
- 推理速度:实时应用需要快速的推理速度
- 领域适配:特定领域的模型效果更好
常用模型:
- 文本:BERT、Sentence-BERT、GPT、CLIP
- 图像:ResNet、ViT、CLIP、DINO
- 音频:VGGish、Whisper、Wav2Vec2
模型评估:
- 指标:Recall@k、Precision@k、F1 Score
- 数据集:使用领域相关的数据集
- 测试方法:构建测试集,计算评估指标
第八章 监控与运维
8.1 监控指标体系
核心指标:
- 查询指标:查询延迟(P95、P99)、查询吞吐量(QPS)、查询错误率、
- 写入指标:写入吞吐量、写入延迟、写入错误率
- 系统指标:CPU 使用率、内存使用率、磁盘使用率、网络流量
- 索引指标:索引构建时间、索引大小、索引质量
8.2 常见故障排查
常见问题:
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 查询超时 | 索引未加载、参数设置不当 | 加载索引、调整查询参数 |
| 写入失败 | 内存不足、磁盘空间不足 | 增加内存、清理磁盘空间 |
| 服务崩溃 | 内存溢出、硬件故障 | 增加内存、检查硬件 |
| 索引构建失败 | 数据量过大、内存不足 | 分批构建、增加内存 |
| 连接失败 | 网络问题、服务未启动 | 检查网络、启动服务 |
排查步骤:1. 查看日志文件;2. 检查系统资源;3. 验证网络连接;4. 检查配置文件;5. 测试基本功能;6. 分析性能指标
8.3 常见问题与解决方案
Q: Milvus 服务启动失败怎么办?
A: 检查端口是否被占用,检查配置文件是否正确,查看日志文件了解具体错误原因。
Q: 查询速度慢怎么办?
A: 检查索引是否已加载,调整查询参数(如 nprobe、ef),考虑使用更适合的索引类型。
Q: 内存使用过高怎么办?
A: 调整索引类型(如使用 IVF_SQ8),限制加载的集合数量,增加服务器内存。
Q: 数据导入失败怎么办?
A: 检查数据格式是否正确,检查服务器资源是否充足,尝试分批导入。
Q: 如何提高写入性能?
A: 使用批量写入,调整批量大小,增加 Data Node 数量。
第九章 生产环境最佳实践
9.1 部署架构设计
单节点部署:
- 适用场景:开发、测试、小规模生产
- 配置:8 核 CPU、32GB 内存、100GB SSD
- 优势:部署简单,维护成本低
- 劣势:不具备高可用性
集群部署:
- 适用场景:大规模生产环境
- 配置:
- Proxy:2-4 节点
- Query Node:4-8 节点
- Data Node:2-4 节点
- Index Node:2-4 节点
- 优势:高可用性,可扩展性强
- 劣势:部署复杂,维护成本高
云服务部署:
- 适用场景:快速部署,按需扩展
- 选项:
- Milvus Cloud
- 云厂商托管服务
- 容器服务(EKS、GKE、ACK)
- 优势:无需维护基础设施,按需付费
- 劣势:成本较高,定制性受限
9.2 安全配置
import os
from pymilvus import connections
# 从环境变量读取敏感信息
MILVUS_HOST = os.environ.get('MILVUS_HOST', 'localhost')
MILVUS_PORT = os.environ.get('MILVUS_PORT', '19530')
MILVUS_USER = os.environ.get('MILVUS_USER', 'root')
MILVUS_PASSWORD = os.environ.get('MILVUS_PASSWORD', '')
# 使用TLS/SSL连接
connections.connect(
alias="default",
host=MILVUS_HOST,
port=MILVUS_PORT,
user=MILVUS_USER,
password=MILVUS_PASSWORD,
secure=True, # 启用TLS
server_pem_path="/path/to/server.pem", # 服务器证书路径
server_name="localhost" # 服务器名称
)
# RBAC权限管理示例
from pymilvus import utility
# 创建角色
utility.create_role("data_reader")
# 授予权限
utility.grant_privilege("data_reader", "Collection", "Search", "*")
utility.grant_privilege("data_reader", "Collection", "Query", "*")
# 创建用户
utility.create_user("analyst", "secure_password_123")
# 将角色分配给用户
utility.add_user_to_role("analyst", "data_reader")
# 验证用户权限
connections.connect(
alias="secure_connection",
host=MILVUS_HOST,
port=MILVUS_PORT,
user="analyst",
password="secure_password_123"
)
Docker Compose安全配置示例:
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
# 安全:限制网络访问
networks:
- milvus-internal
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY:-minioadmin}
MINIO_SECRET_KEY: ${MINIO_SECRET_KEY:-minioadmin}
# 安全:使用强密码
command: minio server /minio_data --console-address ":9001"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
networks:
- milvus-internal
milvus:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
security_opt:
- seccomp:unconfined
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
# 安全:启用认证
COMMON_SECURITY_AUTHORIZATIONENABLED: "true"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091"
networks:
- milvus-internal
- milvus-external
# 安全:资源限制
deploy:
resources:
limits:
cpus: '4.0'
memory: 8G
reservations:
cpus: '2.0'
memory: 4G
networks:
milvus-internal:
internal: true # 内部网络,不暴露到外部
milvus-external:
driver: bridge
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)