欢迎来到Milvus高效数据导入与预处理的世界!在本文,我将带你深入了解Milvus的批量数据导入方式和数据预处理技巧。通过这篇博客,你将学会如何高效地将大规模数据导入Milvus,并对数据进行预处理,以确保高效的检索和分析。准备好了吗?让我们开始这段知识之旅吧!

Milvus的批量数据导入

批量数据导入的方式

Milvus支持多种批量数据导入方式,以适应不同的应用场景和需求。主要方式包括:

  1. 直接插入:通过API接口一次性插入大批量数据。
  2. 文件导入:将数据保存到文件中,然后批量导入Milvus。
  3. 分布式导入:利用分布式系统并行导入大规模数据。
直接插入

直接插入方式适用于数据量较小或中等的数据集,数据可以通过API一次性传输到Milvus。

文件导入

文件导入方式适用于数据量较大、需要批量处理的数据集。数据可以先保存到文件(如CSV、JSON)中,然后通过脚本或工具批量导入Milvus。

分布式导入

分布式导入方式适用于超大规模数据集,通过分布式系统并行处理和导入数据,提高导入效率。

批量数据导入方式
直接插入
文件导入
分布式导入
小规模数据集
大规模数据集
超大规模数据集

数据预处理技巧

数据预处理是数据导入过程中的重要环节,通过合理的数据预处理,可以显著提高数据导入和检索的效率。主要预处理技巧包括:

  1. 数据清洗:去除无效数据、修复缺失值等。
  2. 数据标准化:将数据转换到统一的尺度或分布。
  3. 数据降维:使用PCA、t-SNE等方法降低数据维度。
  4. 特征提取:从原始数据中提取重要特征。
数据预处理技巧
数据清洗
数据标准化
数据降维
特征提取

数据清洗

数据清洗适用于数据质量较差、存在大量缺失值或噪声的数据集。

数据标准化

数据标准化适用于不同尺度或分布的数据集,通过标准化处理可以提高算法的性能和稳定性。

数据降维

数据降维适用于高维数据集,通过降维可以减少数据的冗余,提高计算效率。

特征提取

特征提取适用于复杂的原始数据集,通过提取重要特征,可以提高数据的可解释性和检索性能。

批量数据导入的注意点

注意点

  1. 数据格式:确保导入的数据格式与Milvus兼容,如向量数据应为浮点数。
  2. 数据量控制:根据系统资源合理控制单次导入的数据量,避免资源耗尽。
  3. 错误处理:设置错误处理机制,保证导入过程的稳定性和可靠性。

批量数据导入示例

直接插入示例
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.util.Arrays;
import java.util.List;

public class MilvusBatchInsertExample {
    public static void main(String[] args) {
        // 连接到Milvus服务器
        MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                .withHost("localhost")  // Milvus服务器地址
                .withPort(19530)  // Milvus服务器端口
                .build());

        // 准备插入的数据
        List<Long> idList = Arrays.asList(1L, 2L, 3L);
        List<List<Float>> vectorList = Arrays.asList(
                Arrays.asList(0.1f, 0.2f, 0.3f, 0.4f),
                Arrays.asList(0.5f, 0.6f, 0.7f, 0.8f),
                Arrays.asList(0.9f, 1.0f, 1.1f, 1.2f)
        );

        // 创建插入参数
        InsertParam insertParam = InsertParam.newBuilder()
                .withCollectionName("example_collection")  // 指定集合名称
                .withFields(Arrays.asList(
                        InsertParam.Field.newBuilder()
                                .withName("id")  // 字段名称
                                .withValues(idList)  // 字段值
                                .build(),
                        InsertParam.Field.newBuilder()
                                .withName("vector")  // 字段名称
                                .withValues(vectorList)  // 字段值
                                .build()
                ))
                .build();

        // 执行数据插入
        MutationResult insertResult = client.insert(insertParam);
        System.out.println("Data inserted successfully!");
    }
}

文件导入示例

假设我们有一个CSV文件data.csv,内容如下:

id,vector
1,0.1 0.2 0.3 0.4
2,0.5 0.6 0.7 0.8
3,0.9 1.0 1.1 1.2

可以使用以下Java代码读取CSV文件并导入数据:

import com.opencsv.CSVReader;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MilvusCSVImportExample {
    public static void main(String[] args) {
        try {
            // 连接到Milvus服务器
            MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                    .withHost("localhost")  // Milvus服务器地址
                    .withPort(19530)  // Milvus服务器端口
                    .build());

            // 读取CSV文件
            CSVReader reader = new CSVReader(new FileReader("data.csv"));
            String[] nextLine;
            List<Long> idList = new ArrayList<>();
            List<List<Float>> vectorList = new ArrayList<>();
            reader.readNext(); // 跳过头行
            while ((nextLine = reader.readNext()) != null) {
                idList.add(Long.parseLong(nextLine[0]));
                List<Float> vector = new ArrayList<>();
                for (String value : nextLine[1].split(" ")) {
                    vector.add(Float.parseFloat(value));
                }
                vectorList.add(vector);
            }
            reader.close();

            // 创建插入参数
            InsertParam insertParam = InsertParam.newBuilder()
                    .withCollectionName("example_collection")  // 指定集合名称
                    .withFields(Arrays.asList(
                            InsertParam.Field.newBuilder()
                                    .withName("id")  // 字段名称
                                    .withValues(idList)  // 字段值
                                    .build(),
                            InsertParam.Field.newBuilder()
                                    .withName("vector")  // 字段名称
                                    .withValues(vectorList)  // 字段值
                                    .build()
                    ))
                    .build();

            // 执行数据插入
            MutationResult insertResult = client.insert(insertParam);
            System.out.println("Data imported successfully from CSV file!");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

分布式导入示例

假设我们使用Apache Spark进行分布式数据处理和导入:

首先,引入依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.5.2</version>
</dependency>

然后,使用以下Java代码进行分布式

数据处理和导入:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Iterator;

public class MilvusSparkImportExample {
    public static void main(String[] args) {
        // 创建SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("MilvusBatchInsert")
                .master("local[*]") // 本地模式,适用于测试
                .getOrCreate();

        // 读取CSV文件
        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .load("data.csv");

        // 分布式处理和导入
        df.foreachPartition((ForeachPartitionFunction<Row>) partition -> {
            // 连接到Milvus服务器
            MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                    .withHost("localhost")  // Milvus服务器地址
                    .withPort(19530)  // Milvus服务器端口
                    .build());

            List<Long> idList = new ArrayList<>();
            List<List<Float>> vectorList = new ArrayList<>();

            // 处理分区数据
            while (partition.hasNext()) {
                Row row = partition.next();
                idList.add(Long.parseLong(row.getString(0)));
                List<Float> vector = new ArrayList<>();
                for (String value : row.getString(1).split(" ")) {
                    vector.add(Float.parseFloat(value));
                }
                vectorList.add(vector);
            }

            // 创建插入参数
            InsertParam insertParam = InsertParam.newBuilder()
                    .withCollectionName("example_collection")  // 指定集合名称
                    .withFields(Arrays.asList(
                            InsertParam.Field.newBuilder()
                                    .withName("id")  // 字段名称
                                    .withValues(idList)  // 字段值
                                    .build(),
                            InsertParam.Field.newBuilder()
                                    .withName("vector")  // 字段名称
                                    .withValues(vectorList)  // 字段值
                                    .build()
                    ))
                    .build();

            // 执行数据插入
            MutationResult insertResult = client.insert(insertParam);
            System.out.println("Data imported successfully from Spark partition!");

            client.close();
        });

        spark.stop();
    }
}

数据预处理技巧

数据清洗示例

数据清洗是数据预处理的重要步骤,通过去除无效数据和修复缺失值,可以提高数据质量和系统性能。以下是Java实现的数据清洗示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

public class DataCleaningExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            // 去除无效数据和修复缺失值
            List<String> cleanedData = new ArrayList<>();
            cleanedData.add(lines.get(0)); // 保留头行
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                if (parts.length == 2 && !parts[1].isEmpty()) {
                    cleanedData.add(line);
                }
            }

            // 保存清洗后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("cleaned_data.csv"));
            for (String line : cleanedData) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Data cleaned successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

数据标准化示例

数据标准化是将数据转换到统一的尺度或分布,有助于提高算法的性能和稳定性。以下是Java实现的数据标准化示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

public class DataStandardizationExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("cleaned_data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            List<List<Double>> vectors = new ArrayList<>();
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                List<Double> vector = Arrays.stream(parts[1].split(" "))
                        .map(Double::parseDouble)
                        .collect(Collectors.toList());
                vectors.add(vector);
            }

            // 计算均值和标准差
            int dim = vectors.get(0).size();
            double[] means = new double[dim];
            double[] stds = new double[dim];
            for (int i = 0; i < dim; i++) {
                DescriptiveStatistics stats = new DescriptiveStatistics();
                for (List<Double> vector : vectors) {
                    stats.addValue(vector.get(i));
                }
                means[i] = stats.getMean();
                stds[i] = stats.getStandardDeviation();
            }

            // 标准化数据
            List<String> standardizedData = new ArrayList<>();
            standardizedData.add(lines.get(0)); // 保留头行
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                List<Double> vector = Arrays.stream(parts[1].split(" "))
                        .map(Double::parseDouble)
                        .collect(Collectors.toList());
                for (int i = 0; i < dim; i++) {
                    vector.set(i, (vector.get(i) - means[i]) / stds[i]);
                }
                standardizedData.add(parts[0] + "," + vector.stream()
                        .map(String::valueOf)
                        .collect(Collectors.joining(" ")));
            }

            // 保存标准化后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("standardized_data.csv"));
            for (String line : standardizedData) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Data standardized successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

数据降维示例

数据降维是减少数据维度的一种技术,通过PCA等方法,可以降低数据的冗余,提高计算效率。以下是Java实现的数据降维示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.linear.*;

public class DataDimensionalityReductionExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("standardized_data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            // 转换为矩阵
            List<double[]> vectors = new ArrayList<>();
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                double[] vector = Arrays.stream(parts[1].split(" "))
                        .mapToDouble(Double::parseDouble)
                        .toArray();
                vectors.add(vector);
            }
            RealMatrix matrix = new Array2DRowRealMatrix(vectors.toArray(new double[0][0]));

            // 计算协方差矩阵
            RealMatrix covarianceMatrix = new Covariance(matrix).getCovarianceMatrix();

            // 计算特征值和特征向量
            EigenDecomposition ed = new EigenDecomposition(covarianceMatrix);
            RealMatrix eigenVectors = ed.getV();

            // 选择前两个主成分
            RealMatrix pcaMatrix = matrix.multiply(eigenVectors.getSubMatrix(0, eigenVectors.getRowDimension() - 1, 0, 1));

            // 保存降维后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("reduced_data.csv"));
            writer.write(lines.get(0)); // 保留头行
            writer.newLine();
            for (int i = 0; i < pcaMatrix.getRowDimension(); i++) {
                writer.write(i + 1 + "," + Arrays.stream(pcaMatrix.getRow(i))
                        .mapToObj(String::valueOf)
                        .collect(Collectors.joining(" ")));
                writer.newLine();
            }
            writer.close();

            System.out.println("Data reduced successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

特征提取示例

特征提取是从原始数据中提取重要特征的一种技术,通过特征提取,可以提高数据的可解释性和检索性能。以下是Java实现的特征提取示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.linear.*;

public class DataFeatureExtractionExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("reduced_data.csv"))
                    .lines()


                    .collect(Collectors.toList());

            // 假设我们已经知道哪些特征是重要的(例如通过领域知识或特征选择算法确定)
            // 这里只是简单示例,选择前两个特征
            List<String> extractedFeatures = lines.stream()
                    .map(line -> {
                        String[] parts = line.split(",");
                        return parts[0] + "," + parts[1] + " " + parts[2];
                    })
                    .collect(Collectors.toList());

            // 保存提取后的特征数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("extracted_features.csv"));
            for (String line : extractedFeatures) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Features extracted successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

完整的 pom.xml 文件依赖包

以下是一个完整的 pom.xml 文件示例,包含上述所有依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>milvus-data-import</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Milvus SDK -->
        <dependency>
            <groupId>io.milvus</groupId>
            <artifactId>milvus-sdk-java</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- OpenCSV -->
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.5.2</version>
        </dependency>

        <!-- Apache Commons Math -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!-- Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>
</project>

其他注意事项

  1. 确保网络连接:下载这些依赖包需要网络连接,请确保Maven能够连接到中央仓库或配置了镜像。
  2. 环境配置:确保本地环境已配置好Java开发环境(JDK 8或更高版本)。
  3. 数据文件准备:确保本地有准备好的CSV文件(如data.csv)用于测试数据导入。

总结

通过这篇博客,我们详细介绍了Milvus的批量数据导入方式和数据预处理技巧。我们探讨了批量数据导入的多种方式,包括直接插入、文件导入和分布式导入,并详细讲解了数据预处理的各种技巧,如数据清洗、数据标准化、数据降维和特征提取。通过具体的Java代码示例,我们展示了如何在实际应用中实现这些技巧。

如果你喜欢这篇文章,别忘了收藏文章、关注作者、订阅专栏,感激不尽。

GitHub 加速计划 / mi / milvus
28.68 K
2.76 K
下载
A cloud-native vector database, storage for next generation AI applications
最近提交(Master分支:3 个月前 )
92e6ee62 Related to #37895 Only resolves the starving issue which caused goroutine leakage Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> 5 小时前
83df7251 Related to #37767 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> 6 小时前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐