1.RAG数据流水线示意图

 构建RAG系统:涉及的技术链路环节: 文档加载器->文档转换器->文本嵌入模型->向量存储->检索器

2.为什么要⽤向量数据库,不能⽤MySQL存储?

文档块通过嵌入模型处理后得到对应向量,下一步就是将向量存储到数据库中,方便后续进行检索使用

传统数据库的局限性:
维度灾难:传统索引(B-Tree/Hash)在100+维度时效率断崖式下降,无法高效处理高维向量(常达768-1536维)
相似度计算:无法高效处理余弦相似度/Euclidean距离等复杂运算
实时性要求:亿级向量场景下传统方案响应延迟高达秒级,暴力搜索时间复杂度O(N)

// 传统关系型数据库查询(精确匹配)
SELECT * FROM products WHERE category = 'electronics';
// 向量数据库查询(相似度匹配)
Find top5 similar_products where description ≈ '高性能游戏本'

向量数据库的核心能力:
相似性搜索:余弦相似度/欧式距离
混合查询:向量搜索 + 传统条件过滤
动态扩展:支持实时数据更新
高效存储:压缩向量存储技术

3.Milvus优缺点

核心优势:分布式架构支持千亿级向量规模,QPS超百万级,提供HNSW、IVF-PQ等多样化索引算法,支持高并发场景如金融风控、生物医药分子库检索。  
优点:高扩展性、多租户支持、完整的API生态(Python/Java/Go等)。  
缺点:部署复杂度高,运维成本较大,适合有专业团队的企业。

4.向量数据库对比关系型数据库

Milvus 向量数据库Collection相当于关系型数据库表
Milvus 向量数据库Entity相当于关系型数据库行
Milvus 向量数据库Field相当于关系型数据库表字段

5.Milvus Docker部署

部署脚本:

#!/usr/bin/env bash

# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

run_embed() {
    cat << EOF > embedEtcd.yaml
listen-client-urls: http://0.0.0.0:2379
advertise-client-urls: http://0.0.0.0:2379
quota-backend-bytes: 4294967296
auto-compaction-mode: revision
auto-compaction-retention: '1000'
EOF

    cat << EOF > user.yaml
# Extra config to override default milvus.yaml
EOF
    if [ ! -f "./embedEtcd.yaml" ]
    then
        echo "embedEtcd.yaml file does not exist. Please try to create it in the current directory."
        exit 1
    fi

    if [ ! -f "./user.yaml" ]
    then
        echo "user.yaml file does not exist. Please try to create it in the current directory."
        exit 1
    fi

    sudo docker run -d \
        --name milvus-standalone \
        --security-opt seccomp:unconfined \
        -e ETCD_USE_EMBED=true \
        -e ETCD_DATA_DIR=/var/lib/milvus/etcd \
        -e ETCD_CONFIG_PATH=/milvus/configs/embedEtcd.yaml \
        -e COMMON_STORAGETYPE=local \
        -v $(pwd)/volumes/milvus:/var/lib/milvus \
        -v $(pwd)/embedEtcd.yaml:/milvus/configs/embedEtcd.yaml \
        -v $(pwd)/user.yaml:/milvus/configs/user.yaml \
        -p 19530:19530 \
        -p 9091:9091 \
        -p 2379:2379 \
        --health-cmd="curl -f http://localhost:9091/healthz" \
        --health-interval=30s \
        --health-start-period=90s \
        --health-timeout=20s \
        --health-retries=3 \
        milvusdb/milvus:v2.5.6 \
        milvus run standalone  1> /dev/null
}

wait_for_milvus_running() {
    echo "Wait for Milvus Starting..."
    while true
    do
        res=`sudo docker ps|grep milvus-standalone|grep healthy|wc -l`
        if [ $res -eq 1 ]
        then
            echo "Start successfully."
            echo "To change the default Milvus configuration, add your settings to the user.yaml file and then restart the service."
            break
        fi
        sleep 1
    done
}

start() {
    res=`sudo docker ps|grep milvus-standalone|grep healthy|wc -l`
    if [ $res -eq 1 ]
    then
        echo "Milvus is running."
        exit 0
    fi

    res=`sudo docker ps -a|grep milvus-standalone|wc -l`
    if [ $res -eq 1 ]
    then
        sudo docker start milvus-standalone 1> /dev/null
    else
        run_embed
    fi

    if [ $? -ne 0 ]
    then
        echo "Start failed."
        exit 1
    fi

    wait_for_milvus_running
}

stop() {
    sudo docker stop milvus-standalone 1> /dev/null

    if [ $? -ne 0 ]
    then
        echo "Stop failed."
        exit 1
    fi
    echo "Stop successfully."

}

delete_container() {
    res=`sudo docker ps|grep milvus-standalone|wc -l`
    if [ $res -eq 1 ]
    then
        echo "Please stop Milvus service before delete."
        exit 1
    fi
    sudo docker rm milvus-standalone 1> /dev/null
    if [ $? -ne 0 ]
    then
        echo "Delete milvus container failed."
        exit 1
    fi
    echo "Delete milvus container successfully."
}

delete() {
    delete_container
    sudo rm -rf $(pwd)/volumes
    sudo rm -rf $(pwd)/embedEtcd.yaml
    sudo rm -rf $(pwd)/user.yaml
    echo "Delete successfully."
}

upgrade() {
    read -p "Please confirm if you'd like to proceed with the upgrade. The default will be to the latest version. Confirm with 'y' for yes or 'n' for no. > " check
    if [ "$check" == "y" ] ||[ "$check" == "Y" ];then
        res=`sudo docker ps -a|grep milvus-standalone|wc -l`
        if [ $res -eq 1 ]
        then
            stop
            delete_container
        fi

        curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed_latest.sh && \
        bash standalone_embed_latest.sh start 1> /dev/null && \
        echo "Upgrade successfully."
    else
        echo "Exit upgrade"
        exit 0
    fi
}

case $1 in
    restart)
        stop
        start
        ;;
    start)
        start
        ;;
    stop)
        stop
        ;;
    upgrade)
        upgrade
        ;;
    delete)
        delete
        ;;
    *)
        echo "please use bash standalone_embed.sh restart|start|stop|upgrade|delete"
        ;;
esac

启动命令:

#启动
bash standalone_embed.sh start

#停止
bash standalone_embed.sh stop

#删除
bash standalone_embed.sh delete

#升级
bash standalone_embed.sh upgrade 

首次使用会开始拉取镜像  速度比较慢.本文使用的是Milvus版本2.5.6

阿里云网络安全组记得开放端口 2379、9091, 19530

访问地址:http://121.11.11111:9091/webui/

Milvus Web UI 与 Attu等可视化工具 不同,它是一个内置工具,只是提供简单直观的界面,查看系统的基本信息
主要功能:运行环境、数据库/ Collections 详情、任务和慢查询请求,不支持数据库管理和操作任务

6.Milvus可视化客户端Attu安装

下载地址:https://github.com/zilliztech/attu/releases/tag/v2.5.6

下载后 直接双击安装就可以。

7.Python操作Milvus-增删改

安装依赖:pip install pymilvus==2.5.5

Milvus字段类型

Milvus索引类型

通过python创建表、列、索引

# 导入MilvusClient和DataType模块,用于连接Milvus服务器并操作数据类型
from pymilvus import MilvusClient, DataType

# 实例化MilvusClient以连接到指定的Milvus服务器
client = MilvusClient(uri="http://121.11.11.111:19530")
# 定义Schema
schema = client.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=128)
schema.verify()  # 验证Schema

# 定义索引参数
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="vector",
    index_type="IVF_FLAT",  # 量化索引,平衡速度与精度
    metric_type="L2",  # 相似性度量标准(欧式距离)
    params={"nlist": 1024},  # 聚类中心数
)

# 创建集合
client.create_collection(
    collection_name="nannanw_collection", schema=schema, index_params=index_params
)

参数说明:

#列出索引名称
res = client.list_indexes(
    collection_name="nannanw_collection"
)
print(res)

#获取索引详细信息
res = client.describe_index(
    collection_name="nannanw_collection",
    index_name="vector"
)
print(res)

#如果不再需要索引,可以直接将其删除。
client.drop_index(
    collection_name="nannanw_collection",
    index_name="vector"
)
print("索引已删除")

data = [
    {"id": 1, "vector": [0.1] * 128, "text": "Sample text 1"},
    {"id": 2, "vector": [0.2] * 128, "text": "Sample text 2"},
]

# 插入数据
insert_result = client.insert(collection_name="nannanw_collection", data=data)
print("插入ID列表:", insert_result["ids"])  # 返回主键ID

#删除数据(Delete)通过主键或条件表达式删除
# 按主键删除
client.delete(
    collection_name="nannanw_collection",
    ids=[1, 2]  # 主键列表
)

# 按条件删除(如删除text字段为空的记录)
client.delete(
    collection_name="nannanw_collection",
    filter="text == ''"
)


#更新数据(Update)Milvus不支持直接更新,需通过“删除+插入”实现:
# 删除旧数据
client.delete(collection_name="nannanw_collection", ids=[3])

# 插入新数据
client.insert(
    collection_name="nannanw_collection",
    data=[{"id": 3, "vector": [0.3]*128, "text": "Updated text"}]
)

8.Python操作Milvus-CRUD

创建包含混合数据类型(标量+向量)的集合
批量插入结构化和非结构化数据
实现带过滤条件的混合查询
验证端到端的向量搜索流程

8.1 准备数据

from pymilvus import (
    connections,
    MilvusClient,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    utility,
)
import random

# # 创建Milvus客户端
client = MilvusClient(
    uri="http://121.11.111.111:19530",
)

# 删除已存在的同名集合
if client.has_collection("wnnBook"):
    client.drop_collection("wnnBook")

# 定义字段
fields = [
    FieldSchema(name="book_id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=200),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),
    FieldSchema(name="price", dtype=DataType.DOUBLE),
    FieldSchema(name="book_intro", dtype=DataType.FLOAT_VECTOR, dim=1536),
]

# 创建集合Schema
schema = CollectionSchema(fields=fields, description="Book search collection")

# 创建集合
client.create_collection(collection_name="wnnBook", schema=schema)

# 生成测试数据
num_books = 1000
categories = ["科幻", "科技", "文学", "历史"]
titles = ["量子世界", "AI简史", "时光之轮", "文明起源", "未来简史", "数据科学"]

data = []
for i in range(num_books):
    data.append(
        {
            "title": f"{random.choice(titles)}_{i}",
            "category": random.choice(categories),
            "price": round(random.uniform(10, 1000), 2),
            "book_intro": [random.random() for _ in range(1536)],  # 1536维向量
        }
    )

# 批量插入
insert_result = client.insert(collection_name="wnnBook", data=data)

print(f"插入数据量:{len(insert_result['ids'])}")

8.2 创建索引


# 准备索引参数,为"vector"字段创建索引
index_params = MilvusClient.prepare_index_params()

# 添加索引配置,指定字段名、度量类型、索引类型、索引名和参数
index_params.add_index(
    field_name="book_intro",
    metric_type="L2",  # 距离计算方式 (L2/IP/COSINE)
    index_type="IVF_FLAT",
    index_name="vector_index",
    params={"nlist": 128},  # 聚类中心数 (建议值:sqrt(数据量))
)

# 创建索引,不等待索引创建完成即返回
client.create_index(collection_name="wnnBook", index_params=index_params)
print("索引创建完成")

索引创建好之后加载

8.3 执行查询


from langchain_community.embeddings import DashScopeEmbeddings

# client.load_collection(collection_name="book")  # 加载集合到内存
ali_embeddings = DashScopeEmbeddings(
    model="text-embedding-v2",  # 第二代通用模型
    max_retries=3,
    dashscope_api_key="sk-xxxxxxx",
)
query_vector_title = ["文明起源_1"]
embeddings = ali_embeddings.embed_documents(query_vector_title)

# 执行带过滤条件的向量搜索
results = client.search(
    collection_name="wnnBook",
    data=embeddings,  # 支持批量查询
    filter="category == '科幻'",#标量过滤
    output_fields=["title", "category", "price"],
    limit=3,
    search_params={"nprobe": 10},
)
# 解析结果
print("\n科幻类搜索结果:")
for result in results[0]:  # 第一个查询结果集
    print(f"ID: {result['book_id']}")
    print(f"距离: {result['distance']:.4f}")
    print(f"标题: {result['entity']['title']}")
    print(f"价格: {result['entity']['price']:.2f}")
    print("-" * 30)

 9.LangChain整合Milvus-CRUD

LangChain设计抽象类VectorStore,统一接口,具体的实现由各自数据库负责

安装依赖:pip install langchain-milvus

9.1准备数据

from langchain_community.embeddings import DashScopeEmbeddings

# from langchain.vectorstores import Milvus
from langchain_milvus import Milvus
from langchain_core.documents import Document
from uuid import uuid4

# 初始化模型
embeddings = DashScopeEmbeddings(
    model="text-embedding-v2",  # 第二代通用模型
    max_retries=3,
    dashscope_api_key="sk-005c3c11111111111111",
)
vector_store = Milvus(
    embeddings,
    connection_args={"uri": "http://121.11.111.111:19530"},
    collection_name="nnw_langchain_example",
)
document_1 = Document(
    page_content="今天早上我吃了巧克力豆煎饼和炒鸡蛋当早餐。",
    metadata={"source": "tweet"},
)

document_2 = Document(
    page_content="明天的天气预报是多云和阴天,最高气温62华氏度。",
    metadata={"source": "news"},
)

document_3 = Document(
    page_content="正在用LangChain构建一个令人兴奋的新项目——快来看看!",
    metadata={"source": "tweet"},
)

document_4 = Document(
    page_content="劫匪闯入城市银行,抢走了100万美元现金。",
    metadata={"source": "news"},
)

document_5 = Document(
    page_content="哇!那是一部令人惊叹的电影。我迫不及待想再看一次。",
    metadata={"source": "tweet"},
)

document_6 = Document(
    page_content="新的iPhone值这个价格吗?阅读这篇评论了解详情。",
    metadata={"source": "website"},
)

document_7 = Document(
    page_content="目前世界上排名前十的足球运动员。",
    metadata={"source": "website"},
)

document_8 = Document(
    page_content="LangGraph是构建有状态、智能应用的最佳框架!",
    metadata={"source": "tweet"},
)

document_9 = Document(
    page_content="由于对经济衰退的担忧,今天股市下跌了500点。",
    metadata={"source": "news"},
)

document_10 = Document(
    page_content="我有一种不好的预感,我可能会被删除 :(",
    metadata={"source": "tweet"},
)
documents = [
    document_1,
    document_2,
    document_3,
    document_4,
    document_5,
    document_6,
    document_7,
    document_8,
    document_9,
    document_10,
]


ids = [str(i + 1) for i in range(len(documents))]
print(ids)
result = vector_store.add_documents(documents=documents, ids=ids)
print(result)

删除数据:

result = vector_store.delete(ids=["1"])
print(result)

9.2查询数据

# 相似性搜索
query = "早上你吃了什么?"
results = vector_store.similarity_search(query, k=1)
for doc in results:
    print(f"内容:{doc.page_content}\n元数据:{doc.metadata}\n")

# 混合搜索(结合元数据过滤)
results = vector_store.similarity_search(query, k=1, expr='source == "tweet"')
print(results)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐