(LangChain)RAG系统链路向量存储之Milvus(四)
1.RAG数据流水线示意图

构建RAG系统:涉及的技术链路环节: 文档加载器->文档转换器->文本嵌入模型->向量存储->检索器
2.为什么要⽤向量数据库,不能⽤MySQL存储?
文档块通过嵌入模型处理后得到对应向量,下一步就是将向量存储到数据库中,方便后续进行检索使用
传统数据库的局限性:
维度灾难:传统索引(B-Tree/Hash)在100+维度时效率断崖式下降,无法高效处理高维向量(常达768-1536维)
相似度计算:无法高效处理余弦相似度/Euclidean距离等复杂运算
实时性要求:亿级向量场景下传统方案响应延迟高达秒级,暴力搜索时间复杂度O(N)// 传统关系型数据库查询(精确匹配)
SELECT * FROM products WHERE category = 'electronics';
// 向量数据库查询(相似度匹配)
Find top5 similar_products where description ≈ '高性能游戏本'向量数据库的核心能力:
相似性搜索:余弦相似度/欧式距离
混合查询:向量搜索 + 传统条件过滤
动态扩展:支持实时数据更新
高效存储:压缩向量存储技术
3.Milvus优缺点
核心优势:分布式架构支持千亿级向量规模,QPS超百万级,提供HNSW、IVF-PQ等多样化索引算法,支持高并发场景如金融风控、生物医药分子库检索。
优点:高扩展性、多租户支持、完整的API生态(Python/Java/Go等)。
缺点:部署复杂度高,运维成本较大,适合有专业团队的企业。
4.向量数据库对比关系型数据库
Milvus 向量数据库Collection相当于关系型数据库表
Milvus 向量数据库Entity相当于关系型数据库行
Milvus 向量数据库Field相当于关系型数据库表字段
5.Milvus Docker部署
部署脚本:
#!/usr/bin/env bash
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
run_embed() {
cat << EOF > embedEtcd.yaml
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://0.0.0.0:2379
quota-backend-bytes: 4294967296
auto-compaction-mode: revision
auto-compaction-retention: '1000'
EOF
cat << EOF > user.yaml
# Extra config to override default milvus.yaml
EOF
if [ ! -f "./embedEtcd.yaml" ]
then
echo "embedEtcd.yaml file does not exist. Please try to create it in the current directory."
exit 1
fi
if [ ! -f "./user.yaml" ]
then
echo "user.yaml file does not exist. Please try to create it in the current directory."
exit 1
fi
sudo docker run -d \
--name milvus-standalone \
--security-opt seccomp:unconfined \
-e ETCD_USE_EMBED=true \
-e ETCD_DATA_DIR=/var/lib/milvus/etcd \
-e ETCD_CONFIG_PATH=/milvus/configs/embedEtcd.yaml \
-e COMMON_STORAGETYPE=local \
-v $(pwd)/volumes/milvus:/var/lib/milvus \
-v $(pwd)/embedEtcd.yaml:/milvus/configs/embedEtcd.yaml \
-v $(pwd)/user.yaml:/milvus/configs/user.yaml \
-p 19530:19530 \
-p 9091:9091 \
-p 2379:2379 \
--health-cmd="curl -f http://localhost:9091/healthz" \
--health-interval=30s \
--health-start-period=90s \
--health-timeout=20s \
--health-retries=3 \
milvusdb/milvus:v2.5.6 \
milvus run standalone 1> /dev/null
}
wait_for_milvus_running() {
echo "Wait for Milvus Starting..."
while true
do
res=`sudo docker ps|grep milvus-standalone|grep healthy|wc -l`
if [ $res -eq 1 ]
then
echo "Start successfully."
echo "To change the default Milvus configuration, add your settings to the user.yaml file and then restart the service."
break
fi
sleep 1
done
}
start() {
res=`sudo docker ps|grep milvus-standalone|grep healthy|wc -l`
if [ $res -eq 1 ]
then
echo "Milvus is running."
exit 0
fi
res=`sudo docker ps -a|grep milvus-standalone|wc -l`
if [ $res -eq 1 ]
then
sudo docker start milvus-standalone 1> /dev/null
else
run_embed
fi
if [ $? -ne 0 ]
then
echo "Start failed."
exit 1
fi
wait_for_milvus_running
}
stop() {
sudo docker stop milvus-standalone 1> /dev/null
if [ $? -ne 0 ]
then
echo "Stop failed."
exit 1
fi
echo "Stop successfully."
}
delete_container() {
res=`sudo docker ps|grep milvus-standalone|wc -l`
if [ $res -eq 1 ]
then
echo "Please stop Milvus service before delete."
exit 1
fi
sudo docker rm milvus-standalone 1> /dev/null
if [ $? -ne 0 ]
then
echo "Delete milvus container failed."
exit 1
fi
echo "Delete milvus container successfully."
}
delete() {
delete_container
sudo rm -rf $(pwd)/volumes
sudo rm -rf $(pwd)/embedEtcd.yaml
sudo rm -rf $(pwd)/user.yaml
echo "Delete successfully."
}
upgrade() {
read -p "Please confirm if you'd like to proceed with the upgrade. The default will be to the latest version. Confirm with 'y' for yes or 'n' for no. > " check
if [ "$check" == "y" ] ||[ "$check" == "Y" ];then
res=`sudo docker ps -a|grep milvus-standalone|wc -l`
if [ $res -eq 1 ]
then
stop
delete_container
fi
curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed_latest.sh && \
bash standalone_embed_latest.sh start 1> /dev/null && \
echo "Upgrade successfully."
else
echo "Exit upgrade"
exit 0
fi
}
case $1 in
restart)
stop
start
;;
start)
start
;;
stop)
stop
;;
upgrade)
upgrade
;;
delete)
delete
;;
*)
echo "please use bash standalone_embed.sh restart|start|stop|upgrade|delete"
;;
esac
启动命令:
#启动
bash standalone_embed.sh start#停止
bash standalone_embed.sh stop#删除
bash standalone_embed.sh delete#升级
bash standalone_embed.sh upgrade
首次使用会开始拉取镜像 速度比较慢.本文使用的是Milvus版本2.5.6
阿里云网络安全组记得开放端口 2379、9091, 19530

访问地址:http://121.11.11111:9091/webui/

Milvus Web UI 与 Attu等可视化工具 不同,它是一个内置工具,只是提供简单直观的界面,查看系统的基本信息
主要功能:运行环境、数据库/ Collections 详情、任务和慢查询请求,不支持数据库管理和操作任务
6.Milvus可视化客户端Attu安装
下载地址:https://github.com/zilliztech/attu/releases/tag/v2.5.6

下载后 直接双击安装就可以。

7.Python操作Milvus-增删改
安装依赖:pip install pymilvus==2.5.5
Milvus字段类型
Milvus索引类型

通过python创建表、列、索引

# 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型
from pymilvus import MilvusClient, DataType
# 实例化MilvusClient以连接到指定的Milvus服务器
client = MilvusClient(uri="http://121.11.11.111:19530")
# 定义Schema
schema = client.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128)
schema.verify() # 验证Schema
# 定义索引参数
index_params = client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="IVF_FLAT", # 量化索引,平衡速度与精度
metric_type="L2", # 相似性度量标准(欧式距离)
params={"nlist": 1024}, # 聚类中心数
)
# 创建集合
client.create_collection(
collection_name="nannanw_collection", schema=schema, index_params=index_params
)
参数说明:

#列出索引名称
res = client.list_indexes(
collection_name="nannanw_collection"
)
print(res)
#获取索引详细信息
res = client.describe_index(
collection_name="nannanw_collection",
index_name="vector"
)
print(res)
#如果不再需要索引,可以直接将其删除。
client.drop_index(
collection_name="nannanw_collection",
index_name="vector"
)
print("索引已删除")
data = [
{"id": 1, "vector": [0.1] * 128, "text": "Sample text 1"},
{"id": 2, "vector": [0.2] * 128, "text": "Sample text 2"},
]
# 插入数据
insert_result = client.insert(collection_name="nannanw_collection", data=data)
print("插入ID列表:", insert_result["ids"]) # 返回主键ID
#删除数据(Delete)通过主键或条件表达式删除
# 按主键删除
client.delete(
collection_name="nannanw_collection",
ids=[1, 2] # 主键列表
)
# 按条件删除(如删除text字段为空的记录)
client.delete(
collection_name="nannanw_collection",
filter="text == ''"
)
#更新数据(Update)Milvus不支持直接更新,需通过“删除+插入”实现:
# 删除旧数据
client.delete(collection_name="nannanw_collection", ids=[3])
# 插入新数据
client.insert(
collection_name="nannanw_collection",
data=[{"id": 3, "vector": [0.3]*128, "text": "Updated text"}]
)

8.Python操作Milvus-CRUD
创建包含混合数据类型(标量+向量)的集合
批量插入结构化和非结构化数据
实现带过滤条件的混合查询
验证端到端的向量搜索流程
8.1 准备数据
from pymilvus import (
connections,
MilvusClient,
FieldSchema,
CollectionSchema,
DataType,
Collection,
utility,
)
import random
# # 创建Milvus客户端
client = MilvusClient(
uri="http://121.11.111.111:19530",
)
# 删除已存在的同名集合
if client.has_collection("wnnBook"):
client.drop_collection("wnnBook")
# 定义字段
fields = [
FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),
FieldSchema(name="price", dtype=DataType.DOUBLE),
FieldSchema(name="book_intro", dtype=DataType.FLOAT_VECTOR, dim=1536),
]
# 创建集合Schema
schema = CollectionSchema(fields=fields, description="Book search collection")
# 创建集合
client.create_collection(collection_name="wnnBook", schema=schema)
# 生成测试数据
num_books = 1000
categories = ["科幻", "科技", "文学", "历史"]
titles = ["量子世界", "AI简史", "时光之轮", "文明起源", "未来简史", "数据科学"]
data = []
for i in range(num_books):
data.append(
{
"title": f"{random.choice(titles)}_{i}",
"category": random.choice(categories),
"price": round(random.uniform(10, 1000), 2),
"book_intro": [random.random() for _ in range(1536)], # 1536维向量
}
)
# 批量插入
insert_result = client.insert(collection_name="wnnBook", data=data)
print(f"插入数据量:{len(insert_result['ids'])}")
8.2 创建索引
# 准备索引参数,为"vector"字段创建索引
index_params = MilvusClient.prepare_index_params()
# 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数
index_params.add_index(
field_name="book_intro",
metric_type="L2", # 距离计算方式 (L2/IP/COSINE)
index_type="IVF_FLAT",
index_name="vector_index",
params={"nlist": 128}, # 聚类中心数 (建议值:sqrt(数据量))
)
# 创建索引,不等待索引创建完成即返回
client.create_index(collection_name="wnnBook", index_params=index_params)
print("索引创建完成")
索引创建好之后加载

8.3 执行查询
from langchain_community.embeddings import DashScopeEmbeddings
# client.load_collection(collection_name="book") # 加载集合到内存
ali_embeddings = DashScopeEmbeddings(
model="text-embedding-v2", # 第二代通用模型
max_retries=3,
dashscope_api_key="sk-xxxxxxx",
)
query_vector_title = ["文明起源_1"]
embeddings = ali_embeddings.embed_documents(query_vector_title)
# 执行带过滤条件的向量搜索
results = client.search(
collection_name="wnnBook",
data=embeddings, # 支持批量查询
filter="category == '科幻'",#标量过滤
output_fields=["title", "category", "price"],
limit=3,
search_params={"nprobe": 10},
)
# 解析结果
print("\n科幻类搜索结果:")
for result in results[0]: # 第一个查询结果集
print(f"ID: {result['book_id']}")
print(f"距离: {result['distance']:.4f}")
print(f"标题: {result['entity']['title']}")
print(f"价格: {result['entity']['price']:.2f}")
print("-" * 30)

9.LangChain整合Milvus-CRUD
LangChain设计抽象类VectorStore,统一接口,具体的实现由各自数据库负责
安装依赖:pip install langchain-milvus
9.1准备数据
from langchain_community.embeddings import DashScopeEmbeddings
# from langchain.vectorstores import Milvus
from langchain_milvus import Milvus
from langchain_core.documents import Document
from uuid import uuid4
# 初始化模型
embeddings = DashScopeEmbeddings(
model="text-embedding-v2", # 第二代通用模型
max_retries=3,
dashscope_api_key="sk-005c3c11111111111111",
)
vector_store = Milvus(
embeddings,
connection_args={"uri": "http://121.11.111.111:19530"},
collection_name="nnw_langchain_example",
)
document_1 = Document(
page_content="今天早上我吃了巧克力豆煎饼和炒鸡蛋当早餐。",
metadata={"source": "tweet"},
)
document_2 = Document(
page_content="明天的天气预报是多云和阴天,最高气温62华氏度。",
metadata={"source": "news"},
)
document_3 = Document(
page_content="正在用LangChain构建一个令人兴奋的新项目——快来看看!",
metadata={"source": "tweet"},
)
document_4 = Document(
page_content="劫匪闯入城市银行,抢走了100万美元现金。",
metadata={"source": "news"},
)
document_5 = Document(
page_content="哇!那是一部令人惊叹的电影。我迫不及待想再看一次。",
metadata={"source": "tweet"},
)
document_6 = Document(
page_content="新的iPhone值这个价格吗?阅读这篇评论了解详情。",
metadata={"source": "website"},
)
document_7 = Document(
page_content="目前世界上排名前十的足球运动员。",
metadata={"source": "website"},
)
document_8 = Document(
page_content="LangGraph是构建有状态、智能应用的最佳框架!",
metadata={"source": "tweet"},
)
document_9 = Document(
page_content="由于对经济衰退的担忧,今天股市下跌了500点。",
metadata={"source": "news"},
)
document_10 = Document(
page_content="我有一种不好的预感,我可能会被删除 :(",
metadata={"source": "tweet"},
)
documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
ids = [str(i + 1) for i in range(len(documents))]
print(ids)
result = vector_store.add_documents(documents=documents, ids=ids)
print(result)

删除数据:
result = vector_store.delete(ids=["1"])
print(result)
9.2查询数据
# 相似性搜索
query = "早上你吃了什么?"
results = vector_store.similarity_search(query, k=1)
for doc in results:
print(f"内容:{doc.page_content}\n元数据:{doc.metadata}\n")
# 混合搜索(结合元数据过滤)
results = vector_store.similarity_search(query, k=1, expr='source == "tweet"')
print(results)

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)