All of my real development so far has used the blink-sql mode; I had not yet built a job in streaming (DataStream) mode, so here is a simple example, written down for future reference.

Our downstream is ODPS, but I don't have that jar yet; I'll add that part once it has been wired up and tested.

Job description:

     The upstream is Kafka. The raw data is a JSON array, and the output is one JSON object per array element, printed to the local log; the source connection follows Alibaba Cloud's example. For instance, an input record such as [{"id":1},{"id":2}] is flattened into the two output records {"id":1} and {"id":2}.
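
The code below reads its connection settings from a kafka.properties file on the classpath, loaded through the JavaKafkaConfigurer helper that ships with the Alibaba Cloud Kafka demo. A minimal sketch of that file with placeholder values (only bootstrap.servers, topic and group.id are actually read by the code; the values here are just stand-ins):

#access point of the Kafka instance, from the Alibaba Cloud console
bootstrap.servers=ip1:port1,ip2:port2
#topic to consume from, created in the console
topic=your_topic
#consumer group, applied for in the console
group.id=your_group_id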

package com.alibaba.blink.datastreaming;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.Serializable;
import java.util.Properties;

public class AliKafkaConsumerDemo implements Serializable {
    public static void main(String[] args) throws Exception {
        AliKafkaConsumerDemo aliKafkaConsumerDemo = new AliKafkaConsumerDemo();
        aliKafkaConsumerDemo.runExample();
    }
    public void runExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //set the parallelism globally
        env.setParallelism(1);


        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
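        //note: event time is enabled above, but this job defines no windows or
        //watermark assigner, so records simply flow through one by one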

        //load kafka.properties (see the sample properties file above)
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //tune this according to the actual consumption rate and the client version; the default is 30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //maximum number of records returned by a single poll
        //do not set this too high: if a poll fetches more data than can be consumed before the next poll, a rebalance is triggered and consumption stalls
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //consumer group this instance belongs to; apply for it in the console first
        //instances in the same group share the messages between them (load-balanced consumption)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<String>(kafkaProperties.getProperty("topic"), new SimpleStringSchema(), props);



        DataStreamSource<String> stringDataStream = env.addSource(kafkaConsumer);

        //parse each Kafka record (a JSON array string) into a JsonArray
        DataStream<JsonArray> mapStream = stringDataStream.map(new MapFunction<String, JsonArray>() {
            @Override
            public JsonArray map(String value) throws Exception {
                return new JsonParser().parse(value).getAsJsonArray();
            }
        });

        //flatten the array: emit each element as its own JSON object string
        SingleOutputStreamOperator<String> flatMapStream = mapStream.flatMap(new FlatMapFunction<JsonArray, String>() {
            @Override
            public void flatMap(JsonArray value, Collector<String> out) throws Exception {
                for (int i = 0; i < value.size(); i++) {
                    String field = value.get(i).getAsJsonObject().toString();
                    out.collect(field);
                }
            }
        });

        //print to the local log
        flatMapStream.print();

        //write to Kafka: the extracted JsonObjects are also sent to a sink topic
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ip:port");
        flatMapStream.addSink(new FlinkKafkaProducer010<String>("dflink_sink_tst", (SerializationSchema<String>) new SimpleStringSchema(), properties));


        env.execute("alikafkaconsumerdemo");
    }
}
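
The JavaKafkaConfigurer referenced above comes from the Alibaba Cloud Kafka demo project and is not included here. As a rough idea, a minimal sketch of what it has to provide for this job might look like the following, assuming kafka.properties is on the classpath (the actual demo class may do more, such as caching the properties or configuring SASL access):

package com.alibaba.blink.datastreaming;

import java.util.Properties;

public class JavaKafkaConfigurer {
    //load kafka.properties from the classpath and return it as a Properties object
    public static Properties getKafkaProperties() {
        Properties kafkaProperties = new Properties();
        try {
            kafkaProperties.load(JavaKafkaConfigurer.class.getClassLoader()
                    .getResourceAsStream("kafka.properties"));
        } catch (Exception e) {
            //fail fast if the file is missing or unreadable
            throw new RuntimeException("failed to load kafka.properties from the classpath", e);
        }
        return kafkaProperties;
    }
}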

Result:

Running locally, the print() sink writes one line per flattened element to the console, i.e. one JSON object for each element of every incoming array.
