# 情景介绍
目前要对flink进行多数据源适配工作,需要支持的有clickhouse,elasticsearch


# 版本介绍
flink:1.13.1
elasticsearch:7.6.2
clickhouse:21.9.3.30


# 参考文献
# github
flink全量案例demo:https://github.com/zhisheng17/flink-learning
flink-connector连接器:https://github.com/DTStack/flinkx
flink-sink-clickhouse:https://github.com/ivi-ru/flink-clickhouse-sink

# csdn
jdbc模式flink写入clickhouse:https://blog.csdn.net/weixin_32265569/article/details/112133937
中间件模式flink写入clickhouse:https://blog.csdn.net/weixin_32265569/article/details/112133937


注:以防原文链接丢失,此处综合两种方式的内容如下所示


flink-sink-clickhouse/elasticsearch
我亲测的github项目地址:https://github.com/ainusers/flink-adapter-datasource

# jdbc模式 - flink写入clickhouse


1. clickhouse jdbc 依赖
<!-- ClickHouse JDBC driver, used to write data into ClickHouse -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.1.54</version>
</dependency>


2. User实体类
package com.lei.domain;
 
/**
 * Plain user record that flows through the Flink job and is written to ClickHouse.
 *
 * <p>The fields are public and a public no-argument constructor is provided so the
 * class follows Flink's POJO rules instead of falling back to generic serialization.
 * {@link #toString()} is overridden so that printing a stream of users shows the
 * field values rather than an identity hash.
 */
public class J_User {
    /** User identifier. */
    public int id;
    /** User name; may be {@code null}. */
    public String name;
    /** User age. */
    public int age;

    /** Creates an empty user with default field values. */
    public J_User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     */
    public J_User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /**
     * Static factory equivalent to {@link #J_User(int, String, int)}.
     *
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     * @return a new user holding the given values
     */
    public static J_User of(int id, String name, int age) {
        return new J_User(id, name, age);
    }

    /**
     * Returns a human-readable representation listing all three fields.
     *
     * @return text of the form {@code J_User{id=1, name='tom', age=18}}
     */
    @Override
    public String toString() {
        return "J_User{id=" + id + ", name='" + name + "', age=" + age + '}';
    }
}


3. 工具类
package com.lei.util;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
 
 
/**
 * Helper for opening JDBC connections to ClickHouse.
 *
 * <p>The most recently opened connection is remembered in a static field so that
 * {@link #close()} can release it. Every {@code getConn} call replaces that reference
 * without closing the previous connection, so callers remain responsible for the
 * connections they receive.
 */
public class ClickHouseUtil {
    private static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";
    private static final String DEFAULT_HOST = "node-01";
    private static final int DEFAULT_PORT = 8123;
    private static final String DEFAULT_DATABASE = "default";

    /** Connection returned by the latest {@code getConn} call, or {@code null} if none. */
    private static Connection connection;

    /**
     * Opens a connection to the given ClickHouse server and database.
     *
     * @param host     server host name
     * @param port     server port
     * @param database database name appended to the JDBC URL
     * @return the newly opened connection
     * @throws SQLException           if the driver manager cannot open the connection
     * @throws ClassNotFoundException if the ClickHouse JDBC driver class is not on the classpath
     */
    public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName(DRIVER_CLASS);
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        connection = DriverManager.getConnection(address);
        return connection;
    }

    /**
     * Opens a connection to the {@code default} database on the given server.
     *
     * @param host server host name
     * @param port server port
     * @return the newly opened connection
     * @throws SQLException           if the driver manager cannot open the connection
     * @throws ClassNotFoundException if the ClickHouse JDBC driver class is not on the classpath
     */
    public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
        return getConn(host, port, DEFAULT_DATABASE);
    }

    /**
     * Opens a connection to the {@code default} database on {@code node-01:8123}.
     *
     * @return the newly opened connection
     * @throws SQLException           if the driver manager cannot open the connection
     * @throws ClassNotFoundException if the ClickHouse JDBC driver class is not on the classpath
     */
    public static Connection getConn() throws SQLException, ClassNotFoundException {
        return getConn(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Closes the most recently opened connection, if any.
     *
     * <p>Calling this before any connection has been opened is a no-op rather than a
     * {@link NullPointerException}.
     *
     * @throws SQLException if closing the connection fails
     */
    public void close() throws SQLException {
        Connection current = connection;
        if (current == null) {
            return;
        }
        // Drop the shared reference first so a failed close does not leave a stale handle behind.
        connection = null;
        current.close();
    }
}


4. 自定义clickhouse-connector
package com.lei.util;
 
import com.lei.domain.J_User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
import java.sql.Connection;
import java.sql.PreparedStatement;
 
 
/**
 * Flink sink that writes each {@link J_User} to ClickHouse over JDBC.
 *
 * <p>The connection is opened once in {@link #open(Configuration)} and released in
 * {@link #close()}. Every record is sent as its own single-row batch using the
 * parameterized SQL passed to the constructor.
 */
public class J_MyClickHouseUtil extends RichSinkFunction<J_User> {
    // transient: the connection is created in open() and must never be part of the
    // serialized function state.
    transient Connection connection = null;

    /** Parameterized INSERT statement with three placeholders: id, name, age. */
    String sql;

    /**
     * @param sql INSERT statement whose three {@code ?} placeholders receive id, name and age, in that order
     */
    public J_MyClickHouseUtil(String sql) {
        this.sql = sql;
    }

    /**
     * Opens the JDBC connection used for all subsequent writes.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass fails to open or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = ClickHouseUtil.getConn("node-01", 8123, "default");
    }

    /**
     * Releases the JDBC connection. The connection is closed even when the superclass
     * {@code close()} throws.
     *
     * @throws Exception if the superclass or the connection fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

    /**
     * Inserts one user as a single-row batch and prints the elapsed time.
     *
     * @param user    record to insert
     * @param context sink context (unused)
     * @throws Exception if preparing, executing or committing the statement fails
     */
    @Override
    public void invoke(J_User user, Context context) throws Exception {
        // try-with-resources: a statement is created per record, so it must be closed
        // per record as well, otherwise statements leak for the lifetime of the job.
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            preparedStatement.setLong(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setLong(3, user.age);
            preparedStatement.addBatch();

            long startTime = System.currentTimeMillis();
            int[] ints = preparedStatement.executeBatch();
            // NOTE(review): kept from the original; presumably a no-op with this driver — confirm.
            connection.commit();
            long endTime = System.currentTimeMillis();
            System.out.println("批量插入完毕用时:" + (endTime - startTime) + " -- 插入数据 = " + ints.length);
        }
    }
}


5. 主程序测试类
package com.lei.sinktest;
 
import com.lei.domain.J_User;
import com.lei.util.J_MyClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
/**
 * Demo job: reads comma-separated {@code id,name,age} lines from a local socket,
 * converts them to {@link J_User} objects and writes them to ClickHouse through
 * {@link J_MyClickHouseUtil}.
 *
 * <p>Before running, create the target table from clickhouse-client:
 * <pre>
 *     use default;
 *     drop table if exists user_table;
 *     CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();
 * </pre>
 */
public class J05_ClickHouseSinkTest {
    /** Parameterized insert executed by the sink for every user. */
    private static final String INSERT_SQL = "INSERT INTO default.user_table (id, name, age) VALUES (?,?,?)";

    /**
     * Builds and runs the streaming job in a local environment with parallelism 1.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        // Source: raw text lines from localhost:7777.
        DataStream<String> lines = env.socketTextStream("localhost", 7777);

        // Transform: one J_User per input line.
        SingleOutputStreamOperator<J_User> users = lines.map(new CsvLineToUser());

        // Sinks: ClickHouse via JDBC, plus stdout.
        users.addSink(new J_MyClickHouseUtil(INSERT_SQL));
        users.print();

        env.execute("clickhouse sink test");
    }

    /** Parses a comma-separated line into a user; fields are id, name, age in that order. */
    private static final class CsvLineToUser implements MapFunction<String, J_User> {
        @Override
        public J_User map(String data) throws Exception {
            String[] fields = data.split(",");
            int id = Integer.parseInt(fields[0]);
            String name = fields[1];
            int age = Integer.parseInt(fields[2]);
            return J_User.of(id, name, age);
        }
    }
}


6. 创建clickhouse表
-- Run these statements inside clickhouse-client.
-- Switch to the default database.
use default;
-- Drop any earlier copy of the table before recreating it.
drop table if exists user_table;
 
-- Target table for the sink: id, name and age columns on the TinyLog engine.
CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();

 

# 中间件模式 - flink写入clickhouse


1. 添加clickhouse-maven依赖
<!-- flink-clickhouse-sink library; provides the ClickHouseSink class used by the test job -->
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.2.0</version>
</dependency>


2. User实体类
package com.lei.domain;
 
/**
 * Plain user record that is converted to a textual row before being handed to the
 * ClickHouse sink.
 */
public class J_User {
    /** User identifier. */
    public int id;
    /** User name; may be {@code null}. */
    public String name;
    /** User age. */
    public int age;

    /**
     * Creates a fully populated user.
     *
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     */
    public J_User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /**
     * Static factory equivalent to the constructor.
     *
     * @param id   user identifier
     * @param name user name
     * @param age  user age
     * @return a new user holding the given values
     */
    public static J_User of(int id, String name, int age) {
        return new J_User(id, name, age);
    }

    /**
     * Renders the user as one parenthesized row of the form {@code (id, 'name', age )}.
     *
     * <p>Backslashes and single quotes inside the name are backslash-escaped so that a
     * value such as {@code O'Brien} cannot terminate the quoted literal early. A
     * {@code null} name is rendered as the text {@code 'null'}.
     *
     * @param user user to render; must not be {@code null}
     * @return the row text
     */
    public static String convertToCsv(J_User user) {
        StringBuilder builder = new StringBuilder();
        builder.append("(");

        // id
        builder.append(user.id);
        builder.append(", ");

        // name, quoted and escaped
        builder.append("'");
        builder.append(escapeStringLiteral(String.valueOf(user.name)));
        builder.append("', ");

        // age
        builder.append(user.age);

        builder.append(" )");
        return builder.toString();
    }

    /** Backslash-escapes every backslash and single quote in {@code value}. */
    private static String escapeStringLiteral(String value) {
        StringBuilder escaped = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\' || c == '\'') {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }
}


3. flink测试方法
package com.lei.sinktest;
 
import com.lei.domain.J_User;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
/**
 * Demo job: reads comma-separated {@code id,name,age} lines from a local socket,
 * renders each one as a row string via {@link J_User#convertToCsv(J_User)} and writes
 * the rows to ClickHouse through the flink-clickhouse-sink library.
 *
 * <p>Before running, create the target table from clickhouse-client:
 * <pre>
 *     use default;
 *     drop table if exists user_table;
 *     CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog();
 * </pre>
 */
public class J05_ClickHouseSinkTestByLib {
    /**
     * Builds and runs the streaming job in a local environment with parallelism 1.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Register the sink settings as global job parameters.
        ParameterTool parameters = ParameterTool.fromMap(clickHouseGlobalParameters());
        env.getConfig().setGlobalJobParameters(parameters);

        env.setParallelism(1);

        // Source: raw text lines from localhost:7777.
        DataStream<String> lines = env.socketTextStream("localhost", 7777);

        // Transform: one row string per input line.
        SingleOutputStreamOperator<String> rows = lines.map(new CsvLineToRow());

        // Sinks: ClickHouse via the library sink, plus stdout.
        rows.addSink(new ClickHouseSink(sinkProperties()));
        rows.print();

        env.execute("clickhouse sink test");
    }

    /** Collects the cluster address and the common sink settings into one map. */
    private static Map<String, String> clickHouseGlobalParameters() {
        Map<String, String> settings = new HashMap<>();

        // ClickHouse cluster properties. Credentials are not set here; the
        // CLICKHOUSE_USER and CLICKHOUSE_PASSWORD keys can be added when needed.
        settings.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://node-01:8123/");

        // Settings shared by all sinks.
        settings.put(ClickHouseSinkConst.TIMEOUT_SEC, "1");
        settings.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/");
        settings.put(ClickHouseSinkConst.NUM_WRITERS, "2");
        settings.put(ClickHouseSinkConst.NUM_RETRIES, "2");
        settings.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2");
        settings.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false");
        return settings;
    }

    /** Per-sink properties: target table and buffer size. */
    private static Properties sinkProperties() {
        Properties props = new Properties();
        props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "default.user_table");
        props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
        return props;
    }

    /** Parses a comma-separated line (id, name, age) and renders it as a row string. */
    private static final class CsvLineToRow implements MapFunction<String, String> {
        @Override
        public String map(String data) throws Exception {
            String[] fields = data.split(",");
            int id = Integer.parseInt(fields[0]);
            String name = fields[1];
            int age = Integer.parseInt(fields[2]);
            return J_User.convertToCsv(J_User.of(id, name, age));
        }
    }
}


4. 结果验证

 

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐