本文介绍spring中使用Kafka的三种方式,其中container方式最灵活,但是开发相对较复杂,stream方式使用最简便,listener方式由于提供的最早,使用的较普遍。
具体的代码参照 示例项目 https://github.com/qihaiyan/springcamp/tree/master/spring-kafka

一、概述

在实际项目中,用到kafka的场景非常普遍,特别是事件驱动的编程模式,kafka基本是标配。

二、KafkaListener

KafkaListener应该是目前使用比较多的一种方式,开发简单,易于理解。但是从易用性的角度,应该会逐步被spring-cloud-stream所替代。
KafkaListener是一个注解,在对应的方法上加上这个注解,方法就可以处理接收到的kakfa消息,注解中通过topics参数指定需要消费的kakfa的topic,topics参数支持SPEL表达式,可以同时消费多个kafka topic:

@KafkaListener(topics = "test-topic")
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        this.payload = consumerRecord.value();
        log.info("received payload='{}'", payload);
    }

带注解的方法的入参是一个ConsumerRecord变量,存放了kafka中接收到的消息。

同时需要对kafka进行配置,可以指定kakfa服务器的地址,以及序列化方式:

spring.kafka:
    bootstrap-servers: 192.168.1.1:2181
    consumer:
      group-id: utgroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

三、ConcurrentMessageListenerContainer

通过ConcurrentMessageListenerContainer可以已可编程的方式来处理kafka消息,这种方式的好处是topic是在程序中指定的,这样可以将topic的配置存贮在任何地方,比如数据库中,也可以按照不同的条件分支指定不同的topic,非常灵活。这是其它两种方式做不到的。

ConcurrentMessageListenerContainer的配置:

@Component
public class MessageListenerContainerConsumer {

    public static final String LISTENER_CONTAINER_TOPIC = "container-topic";

    public Set<String> consumedMessages = new HashSet<>();

    @PostConstruct
    void start() {
        MessageListener<String, String> messageListener = record -> {
            System.out.println("MessageListenerContainerConsumer received message: " + record.value());
            consumedMessages.add(record.value());
        };

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(
                        consumerFactory(),
                        containerProperties(LISTENER_CONTAINER_TOPIC, messageListener));

        container.start();
    }

    private DefaultKafkaConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                new HashMap<String, Object>() {
                    {
                        put(BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.kafka.bootstrap-servers"));
                        put(GROUP_ID_CONFIG, "groupId");
                        put(AUTO_OFFSET_RESET_CONFIG, "earliest");
                    }
                },
                new StringDeserializer(),
                new StringDeserializer());
    }

    private ContainerProperties containerProperties(String topic, MessageListener<String, String> messageListener) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(messageListener);
        return containerProperties;
    }
}

以上代码定义了MessageListenerContainerConsumer这个类,是一个spring的bean,在PostConstruct这个bean的初始化代码中,我们使用了ConcurrentMessageListenerContainer,并指定了topic public static final String LISTENER_CONTAINER_TOPIC = "container-topic",在这个为了便于演示我们使用了一个常量,实际上这个topic的值可以是任意变量,可以从数据库中读取,也可以通过实际的场景动态计算,这样就做到了topic的灵活配置。

四、spring-cloud-stream

spring-cloud-stream是springcloud的一个子项目,这个项目的目标是一个事件驱动(Event-Driven)的编程框架。spring-cloud-stream对kafka进行了非常好的抽象,除了kakfa,还支持RabbitMQ,程序中除了配置文件外,完全看不到kafka的痕迹,意味着我们在开发的时候不需要关心底层的kakfa的细节,如果像从kafka切换到RabbitMQ,只需要修改一下引入的jar包和配置文件。

详细介绍见spring的官方文档: https://spring.io/projects/spring-cloud-stream

spring-cloud-stream是用了spring-cloud-function,我们只需要在程序中实现一个function接口,就可以处理kafka消息,编写非常简单。

i@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Function<String, Object> handle() {
        return String::toUpperCase;
    }
}

除了配置文件外,代码中只需要一行代码public Function<String, Object> handle()就实现了kafka消息的处理,这行代码完全看不出跟kakfa有什么关系,就是一个普通的方法,把抽象做到了极致。
需要注意方法的名字于配置文件中的名字要匹配。

配置:

spring:
  cloud.stream:
    bindings:
      handle-in-0:
        destination: testEmbeddedIn
        content-type: text/plain
        group: utgroup
      handle-out-0:
        destination: testEmbeddedOut
    kafka:
      binder:
        brokers: 192.168.1.1:2181
        configuration:
          key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
          value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer

注意配置文件中 handle-in-0handle-out-0这2行配置,handle指的就是前面代码中的public Function<String, Object> handle()handle这个方法。spring-cloud-stream就是通过方法名和配置文件中配置项的名字,来确立代码和配置的匹配关系,这也是约定优于配置的编程思想的体现。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐