spirng boot 整合 InfluxDB2

InfluxDB是一个由 InfluxData 开发的开源时序型数据。它由 Go 写成,着力于高性能地查询与存储时序型数据。InfluxDB 被广泛应用于存储系统的监控数据,IoT 行业的实时数据等场景。

在docker 中安装InfluxDB2

  1. 下载镜像

    docker pull tutum/influxdb2
    
  2. 查询influxdb2是否下载成功

    docker images
    
  3. 打开8083、8086 端口

    firewall-cmd --zone=public --add-port=8083-8086/tcp --permanent
    
  4. 启动influxdb2

    docker run -d -p 8083:8083 -p 8086:8086 --name my_influxdb2 influxdb2
    

    –name就是把influxdb容器命名为 “my_influxdb”

  5. 查看是否启动

    docker ps
    

使用 influxdb2

  1. 访问 http://localhost:8086/ 或者 http://127.0.0.1:8086/ 如果是服务器地址 把localhost换为 ip 即可。点击回车键进入influxDB浏览器访问首页。首次登录会设置用户名,密码,设置完成后点击sign in完成登录。
    在这里插入图片描述

  2. 创建bucket,存储数据

    依次点击Load Data → Bucket → Create Bucket,在弹框中输入bucket名字,然后点击创建,完成bucket的创建,此处创建了一个name为ito的 bucket。
    在这里插入图片描述

详细使用就不多说了,可自行查询资料

spring boot 整合 influxdb2

引入依赖

maven中引入

<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-java</artifactId>
  <version>3.1.0</version>
</dependency>

Grade 中引入

dependencies {
  compile "com.influxdb:influxdb-client-java:3.1.0"
}
配置influxdb

新建 influx2.properties 加入influxdb 配置,也可以在yml 中直接加入配置

influx2.properties版

influx2.url= http://localhost:8086
influx2.org= my_influxdb
influx2.token= v16ZLzHrfcn9Qd54chI-sisIGdIe7qP78jz6yKQT8O2bvB74wnaOwQmjH_YEkRUjhuv7i6rYtyupWTrKUy07qw==
influx2.bucket= ito

yml版

spring:
  influx:
    url: http://localhost:8086
    token: v16ZLzHrfcn9Qd54chI-sisIGdIe7qP78jz6yKQT8O2bvB74wnaOwQmjH_YEkRUjhuv7i6rYtyupWTrKUy07qw==
    org: my_influxdb
    bucket: ito

这里跟1.x版本已经有了区别,2.x不再使用user、password作为连接信息,而是改成了token和org,database也变成了bucket

token可在Load Data → API TOKENS→ XXXS Token→编辑 获取token
在这里插入图片描述

或者可通过Load Data → Sources→ java,可看到连接 Java 代码,在代码内复制,推荐使用该方式
在这里插入图片描述

创建InfluxDBProperties

用于获取yml 文件中的配置

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * @author skies
 */
@Component
@ConfigurationProperties(prefix = "influx2")
@PropertySource(value = "influx2.properties")
public class InfluxDBProperties {

    private String url;

    private String token;

    private String org;

    private String bucket;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getToken() {
        return token;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public String getOrg() {
        return org;
    }

    public void setOrg(String org) {
        this.org = org;
    }

    public String getBucket() {
        return bucket;
    }

    public void setBucket(String bucket) {
        this.bucket = bucket;
    }

}
创建InfluxDBConfig
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author skies
 */
@Configuration
public class InfluxDBConfig {

    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create();
    }
}

时序库操作

influxdb-client-java 提供了系列的工具类,推荐使用

    @Autowired
    InfluxDBClient influxDBClient;
    
    @Autowired
    private InfluxDBProperties properties;
    
    /**
     * 批量数据写入
     */
    public void write(){
        List<Point> list = new ArrayList<>();
        Point point = Point
                .measurement("mem")
                .addTag("host", "host1")
                .addField("used_percent", 23.43234543)
                .time(Instant.now(), WritePrecision.NS); 
        list.add(point);
        influxDBClient.getWriteApi().writePoints(list);
    }
    
    
    /**
     * 单条数据写入
     */
    public void write(){
        List<Point> list = new ArrayList<>();
        Point point = Point
                .measurement("mem")
                .addTag("host", "host1")
                .addField("used_percent", 23.43234543)
                .time(Instant.now(), WritePrecision.NS);
        list.add(point);
        influxDBClient.getWriteApi().writePoint(point);
    }
    /**
     * 删除数据
     */
    public void delete() {
        DeletePredicateRequest deletePredicateRequest = new DeletePredicateRequest();
        deletePredicateRequest.start(LocalDateTime.now().atOffset(ZoneOffset.ofHours(0)).minusDays(5));
        deletePredicateRequest.stop(LocalDateTime.now().atOffset(ZoneOffset.ofHours(0)));

        influxDBClient.getDeleteApi().delete(deletePredicateRequest, properties.getBucket(), properties.getOrg());
    }

	  /**
		 *查询数据
     */
   public void select(){
 		StringBuffer stringBuilder = new StringBuffer();
        InfluxDBFluxExpression.appendCommonFlux(stringBuilder, properties.getBucket(), sn, DateUtils.UTCTime(start), DateUtils.UTCTime(stop));
        InfluxDBFluxExpression.appendTagFlux(stringBuilder, map.get("sn").toString());
        InfluxDBFluxExpression.appendTimeShiftFlux(stringBuilder);
        log.info("查询sql :{}", stringBuilder.toString());
        // 通过时间分组  查询时间段的数据
        List<FluxTable> tables = influxDBClient.getQueryApi().query(stringBuilder.toString());
        List<Map<String, Object>> list = new ArrayList<>();
        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (FluxRecord record : records) {
                log.info("{}---{}---{}---{}", record.getMeasurement(),record.getField(),record.getValue(),record.getTime());
            }
        }
   }
    
InfluxDBFluxExpression
import java.util.*;
import java.util.Map.Entry;

/**
 * @Description:
 * @author: skies
 * @date:
 */
public class InfluxDBFluxExpression {

    /**
     * 通用表达式
     *
     * @param buffer
     * @param bucketName 名称
     * @param tableName  表名
     * @param start      开始时间
     * @param stop       结束时间
     */
    public static void appendCommonFlux(StringBuffer buffer, String bucketName, String tableName,
                                        String start, String stop) {
        appendBucketFlux(buffer, bucketName);
        appendTimeRangeFlux(buffer, start, stop);
        appendTableFlux(buffer, tableName);
//		if(timestampFlag) {
//			appendTimestampFlux(buffer);
//		}
//		if(dropDefaultFlag) {
//			appendDropFlux(buffer);
//		}

    }


    /**
     * 数据源(桶)表达式
     *
     * @param buffer
     * @param bucketName 名称
     */
    public static void appendBucketFlux(StringBuffer buffer, String bucketName) {
        buffer.append("from(bucket: \"" + bucketName + "\") ");
    }

    /**
     * 表名表达式
     *
     * @param buffer
     * @param tableName 名称
     */
    public static void appendTableFlux(StringBuffer buffer, String tableName) {
        buffer.append("|> filter(fn: (r) => r._measurement == \"" + tableName + "\") ");
    }

    /**
     * 表名表达式
     *
     * @param buffer
     * @param tag    名称
     */
    public static void appendTagFlux(StringBuffer buffer, String tag) {
        buffer.append("|> filter(fn: (r) => r.tag == \"" + tag + "\") ");
    }

    /**
     * field表达式
     *
     * @param buffer
     * @param field    名称
     */
    public static void appendTagField(StringBuffer buffer, String field) {
        buffer.append("|> filter(fn: (r) => r._field == \"" + field + "\") ");
    }


    /**
     * 时间范围表达式  UTC时间
     *
     * @param buffer
     * @param start  开始时间
     * @param stop   结束时间
     */
    public static void appendTimeRangeFlux(StringBuffer buffer, String start, String stop) {
        if (StringUtils.isBlank(start)) {
            start = "1970-01-01T00:00:00.000Z";
        }
        if (StringUtils.isBlank(stop)) {
            buffer.append("|> range(start:" + start + ") ");
        } else {
            buffer.append("|> range(start:" + start + ", stop:" + stop + ") ");
        }
    }

    /**
     * 删除字段表达式
     *
     * @param buffer
     * @param args   需要删除的字段【 参数为空的话删除host字段】
     */
    public static void appendDropFlux(StringBuffer buffer, String... args) {
        if (args.length == 0) {
            buffer.append("|> drop(columns: [\"host\"]) ");
            return;
        }
        buffer.append("|> drop(columns: [");
        for (int i = 0; i < args.length; i++) {
            if (i != 0) {
                buffer.append(",");
            }
            buffer.append("\"" + args[i] + "\"");
        }
        buffer.append("]) ");
    }

    /**
     * 复制属性列表达式
     *
     * @param buffer
     * @param oldField 原来的字段名称
     * @param newField 新的字段名称
     */
    public static void appendDuplicateFlux(StringBuffer buffer, String oldField, String newField) {
        buffer.append("|> duplicate(column: \"" + oldField + "\", as: \"" + newField + "\") ");
    }

    /**
     * 重命名属性列表达式
     *
     * @param buffer
     * @param oldField 原来的字段名称
     * @param newField 新的字段名称
     */
    public static void appendRenameFlux(StringBuffer buffer, String oldField, String newField) {
        buffer.append(" |> rename(columns: {" + oldField + ": \"" + newField + "\"}) ");
    }


    /**
     * 最新一条数据表达式
     *
     * @param buffer
     */
    public static void appendLastFlux(StringBuffer buffer) {
        buffer.append("|> last() ");
    }

    /**
     * 分页查询
     *
     * @param buffer
     * @param n
     * @param offset
     */
    public static void appendLimitFlux(StringBuffer buffer, int n, int offset) {
        buffer.append("|> limit(n:" + n + ", offset: " + offset + ") ");
    }

    /**
     * 分组表达式
     *
     * @param buffer
     */
    public static void appendGroupFlux(StringBuffer buffer, String... columns) {
        if (columns.length == 0) {
            buffer.append("|> group() ");
        } else {
            buffer.append("|> group(columns:[ ");
            for (int i = 0; i < columns.length; i++) {
                if (i != 0) {
                    buffer.append(",");
                }
                buffer.append("\"" + columns[i] + "\"");
            }
            buffer.append("]) ");
        }

    }

    /**
     * 去重表达式
     *
     * @param buffer
     */
    public static void appendDistinctFlux(StringBuffer buffer, String... columns) {
        if (columns.length == 0) {
            buffer.append("|> distinct() ");
        } else {
            buffer.append("|> distinct(column:\"" + columns[0] + "\") ");
        }

    }

    /**
     * 总数表达式
     *
     * @param buffer
     */
    public static void appendCountFlux(StringBuffer buffer) {
        buffer.append("|> count() ");
    }

    /**
     * 前几条数据
     *
     * @param buffer
     * @param n
     */
    public static void appendTopFlux(StringBuffer buffer, int n) {
        buffer.append("|> top(n:" + n + ") ");
    }

    public static void appendBottomFlux(StringBuffer buffer, int n) {
        buffer.append("|> bottom(n:" + n + ") ");
    }

    /**
     * 排序
     *
     * @param buffer
     * @param descFlag true 降序 ;false 升序
     * @param columns
     */
    public static void appendSortFlux(StringBuffer buffer, boolean descFlag, String... columns) {
        if (columns.length == 0) {
            buffer.append("|> sort(columns: [\"_value\"], desc: " + descFlag + ")");
        } else {
            buffer.append("|> sort(columns:[ ");
            for (int i = 0; i < columns.length; i++) {
                if (i != 0) {
                    buffer.append(",");
                }
                buffer.append("\"" + columns[i] + "\"");
            }
            buffer.append("], desc: " + descFlag + ") ");
        }
    }


    /**
     * 时移八小时
     *
     * @param buffer
     */
    public static void appendTimeShiftFlux(StringBuffer buffer) {
        buffer.append("|> timeShift(duration: 8h) ");
    }

    /**
     * 过滤单个字符表达式
     *
     * @param buffer
     * @param list
     * @param operator  【== != 】
     * @param join      【and or】
     * @param fieldName
     */
    public static void appendFilterFlux(StringBuffer buffer, List<String> list, String operator, String join, String fieldName) {
        if (list == null || list.size() == 0) {
            return;
        }
        for (int i = 0, size = list.size(); i < size; i++) {
            if (i == 0) {
                buffer.append("|> filter(fn: (r) =>");
            } else {
                buffer.append(join);
            }
            buffer.append(" r." + fieldName + " " + operator + " \"" + list.get(i) + "\" ");
        }
        buffer.append(")  ");
    }

    /**
     * 过滤表达式
     *
     * @param buffer
     * @param map
     * @param operator 【== != 】
     * @param join     【and or】
     */
    public static void appendFilterFlux(StringBuffer buffer, Map<String, Object> map, String operator, String join) {
        Set<Entry<String, Object>> entrySet = map.entrySet();
        Iterator<Entry<String, Object>> iterator = entrySet.iterator();
        boolean flag = true;
        while (iterator.hasNext()) {
            Entry<String, Object> next = iterator.next();
            String key = next.getKey();
            Object value = next.getValue();
            if (flag) {
                buffer.append("|> filter(fn: (r) =>");
                flag = false;
            } else {
                buffer.append(join);
            }
            buffer.append(" r." + key + " " + operator + " \"" + value + "\" ");
        }
        if (!flag) {
            buffer.append(")  ");
        }

    }

    /**
     * 过滤多个字段表达式
     *
     * @param buffer
     * @param list
     * @param innerJoin 【and or】
     * @param operator  【== != 】
     * @param outerJoin 【and or】
     */
    public static void appendMulFilterFlux(StringBuffer buffer, List<Map<String, Object>> list, String innerJoin, String operator, String outerJoin) {
        if (list == null || list.size() == 0) {
            return;
        }
        buffer.append("|> filter(fn: (r) => ");
        boolean outerFlag = true;
        for (int i = 0; i < list.size(); i++) {
            Map<String, Object> map = list.get(i);
            Set<Entry<String, Object>> entrySet = map.entrySet();
            Iterator<Entry<String, Object>> iterator = entrySet.iterator();
            boolean innerFlag = true;
            while (iterator.hasNext()) {
                Entry<String, Object> next = iterator.next();
                String key = next.getKey();
                Object value = next.getValue();
                if (innerFlag) {
                    if (outerFlag) {
                        outerFlag = false;
                    } else {
                        buffer.append(outerJoin);
                    }
                    buffer.append(" ( ");
                    innerFlag = false;
                } else {
                    buffer.append(innerJoin);
                }
                buffer.append("  r." + key + " " + operator + " \"" + value + "\"  ");
            }
            if (!innerFlag) {
                buffer.append(" )  ");
            }
        }
        buffer.append(" )  ");

    }

    /**
     * 时间窗口统计
     *
     * @param buffer
     * @param step    步长值【10m,1h,1d...】
     * @param aggType 统计类型【sum,count,min,max...)
     */
    public static void appendAggregateWindowFlux(StringBuffer buffer, String step, String aggType) {
        buffer.append("|> aggregateWindow(every: " + step + ", fn: " + aggType + ") ");
    }

    public static void appendWindowFlux(StringBuffer buffer, String step) {
        buffer.append("|> window(every: " + step + ") ");
    }

    /**
     * 不带时间窗口统计
     *
     * @param buffer
     * @param aggType 统计类型【sum,count,min,max...)
     */
    public static void appendAggregateFlux(StringBuffer buffer, String aggType) {
        buffer.append("|> " + aggType + "() ");
    }


    /**
     * 多个查询结果需要指定每个输出结果名称
     *
     * @param buffer
     * @param name
     */
    public static void appendYieldFlux(StringBuffer buffer, String name) {
        buffer.append("|> yield(name: \"" + name + "\") ");
    }

    /**
     * 将时间指定为某单位
     *
     * @param buffer
     * @param step
     */
    public static void appendTruncateTimeColumn(StringBuffer buffer, String step) {
        buffer.append("|> truncateTimeColumn(unit: " + step + ") ");
    }

    /**
     * 导入包名
     *
     * @param buffer
     * @param name   包名
     */
    public static void appendImportFlux(StringBuffer buffer, String name) {
        buffer.append("import \"" + name + "\" ");
    }

    /**
     * 过滤空值
     *
     * @param buffer
     */
    public static void appendExistsFlux(StringBuffer buffer) {
        buffer.append("|> filter(fn: (r) => exists r._value ) ");
    }


    /**
     * 过滤0值
     *
     * @param buffer
     */
    public static void appendZeroFlux(StringBuffer buffer) {
        buffer.append("|> filter(fn: (r) => r._value > 0) ");
    }

}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐