1、下载安装zk,kafka...(大把教程,不在这里过多阐述)

2、引入pom

		<!--kafka-->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

3、Kafka配置

# kafka bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
spring.kafka.bootstrap-servers=localhost:9092
# 设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=single
## 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
spring.kafka.listener.ack-mode=batch

4、生产者配置

# producer 消费生产者配置-----

# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all

# 重试次数 若设置大于0的值,客户端会将发送失败的记录重新发送
spring.kafka.producer.retries=3

# 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少de请求中。这有助于提升客户端和服务器端的性能。
# 这个配置控制一个批次的默认大小(单位 byte)。16384是缺省的配置
spring.kafka.producer.batch-size=16384

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka,33554432是缺省配置
spring.kafka.producer.buffer-memory=33554432

# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000

# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

# 关键字的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# 值的序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

5、生产者发消息的工具类


import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 生产者发送消息工具类
 *
 * @ClassName KafkaSenderUtils
 * @Author destiny
 * @Date 2023/4/21 11:04
 */
@Component
@Slf4j
public class KafkaSenderUtils {
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 构造器方式注入 kafkaTemplate
     */
    public KafkaSenderUtils(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topicName, String msg) {
        try {

            ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, msg);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("发送成功回调:{}", JSONUtil.toJsonStr(result.getProducerRecord().value()));
                }

                @Override
                public void onFailure(@NonNull Throwable ex) {
                    log.error(">>>>失败原因:{}", ex.getMessage());
                    log.info("发送失败回调");
                }
            });
        } catch (Exception e) {
            log.info("发送异常");
            e.printStackTrace();
        }

    }
}

6、消费着配置

# consumer 消费端的配置,需要给consumer配置一个group-id
spring.kafka.consumer.group-id=test
# key的编解码方法
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value的编解码方法
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
# earliest:无提交记录,从头开始消费
# latest:无提交记录,从最新的消息下一条开始消费
spring.kafka.consumer.auto-offset-reset=latest
# 是否自动提交偏移量offset 默认 true
spring.kafka.consumer.enable-auto-commit=false
# 自动提交的频率。前提是 enable-auto-commit=true 单位 ms
spring.kafka.consumer.auto-commit-interval=100ms
# 一次调用poll()返回的最大记录数,默认是500(批量消费的数量)
spring.kafka.consumer.max-poll-records=5
# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
spring.kafka.consumer.properties.session.timeout.ms=120000
# 请求超时 单位 ms
spring.kafka.consumer.properties.request.timeout.ms=120000

7、消费者配置类(配置批量消费)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * 卡夫卡消费者配置
 *
 * @ClassName KafkaConsumerConfig
 * @Author destiny
 * @Date 2023/2/2 18:53
 */
@Slf4j
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * kafka 集群,broker-list
     */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    /**
     * 开启自动提交
     */
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /**
     * 消费者组
     */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    /**
     * 重置消费者的offset
     */
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * 批量拉取个数
     */
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;
    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private String sessionTimeout;

    @Value("${spring.kafka.consumer.properties.request.timeout.ms}")
    private String requestTimeout;

   
    /**
     * 卡夫卡侦听器容器工厂
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link String}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);
        // 设置轮询超时
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * 消费者工厂
     *
     * @return {@link ConsumerFactory}<{@link Integer}, {@link String}>
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * 消费者配置
     *
     * @return {@link Map}<{@link String}, {@link Object}>
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // 自动提交 offset 默认 true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // 自动提交的频率 单位 ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // 批量消费最大数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // 消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // 请求超时
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        // Key 反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value 反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 当kafka中没有初始offset或offset超出范围时将自动重置offset
        // earliest:重置为分区中最小的offset
        // latest:重置为分区中最新的offset(消费分区中新产生的数据)
        // none:只要有一个分区不存在已提交的offset,就抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

       
        return props;
    }

    /**
     * kafka批量监听
     *
     * @return {@link KafkaListenerContainerFactory}<{@link ConcurrentMessageListenerContainer}<{@link Integer}, {@link String}>>
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        // 设置是否开启批量监听
        factory.setBatchListener(true);
        // 设置消费者组中的线程数量
        factory.setConcurrency(3);

        return factory;
    }

    /**
     * 消费异常处理器
     *
     * @return {@link ConsumerAwareListenerErrorHandler}
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return (message, exception, consumer) -> {
            // 打印消费异常的消息和异常信息
            log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
            return exception;
        };
    }
}

8、测试类分别测试单条消费以及批量消费


import cn.hutool.json.JSONUtil;
import com.google.common.collect.Maps;
import com.xiaoju.framework.entity.common.ResultMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * @ClassName KafkaDemo
 * @Author destiny
 * @Date 2023/2/1 16:32
 */
@RestController
@Slf4j
@RequestMapping(value = {"/api/kafka"})
public class KafkaDemo {

    @Resource
    private KafkaSenderUtils kafkaSenderUtils;

    @GetMapping(value = {"/test"})
    public void test() {
        // 单条消费测试
        String message = "message";
        kafkaSenderUtils.send("testSingle", message);
    }

    @GetMapping(value = {"/testBatch"})
    public void testBatch() {
        // 批量消费测试
        String message = "message";
        for (int i = 1; i <= 20; i++) {
            HashMap<String, Object> map = Maps.newHashMap();
            map.put("id", i);
            map.put("message", i + ":" + message + System.currentTimeMillis());
            kafkaSenderUtils.send("batchTest", JSONUtil.toJsonStr(map));
        }
    }

}

9、消费者消费

package cn.ctyuncdn.consumer.kafka;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Kafka消费者
 *
 * @author destiny
 * @date 2023/02/06
 */
@Component
@Slf4j
public class KafkaConsumer {

    // 单条消费
    @KafkaListener(id = "testSingle", topics = {"testSingle"}, groupId = "${spring.kafka.consumer.group-id}")
    public void testSingle(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String msg = record.value();
        log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);
    }

    // 批量消费
    @KafkaListener(id = "batchTest", topics = {"batchTest"}, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "batchFactory")
    public void batchTest(List<ConsumerRecord<String, String>> records) {
        log.info(">>>consumer batch size ===>>{}", records.size());
        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            String msg = record.value();
            log.info("消费者接受消息:topic-->" + topic + ",msg->>" + msg);
        }
    }

}

完结。。。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐