GitCode 开源社区 Kafka生产者流程分析
cover

Kafka生产者流程分析

1.数据生成流程解析

Kafka生产者流程分析_kafka

步骤如下:

  • 1.Producer创建时,会创建一个Sender线程并设置为守护线程
  • 2.生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
  • 3.批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个
  • 4.批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试
  • 5.落盘到broker成功,返回生产元数据给生产者
  • 6.元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

2. 必要参数配置

2.1. Broker配置

Properties props = new Properties();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");


        // acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
        //acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
        //这是Kafka最强的可靠性保证,等效于 acks=-1
        props.put("acks", "all");
        // 设置key的序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置value的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries",3);
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

特别说明

  • 生产者发送消息到 Kafka 集群,是以推送的方式发送的
  • 生产者只需要连接任意一台 Kafka 节点即可
  • 生产者发送的消息会被封装成为一个 record 对象,其中包含了发送的主题,分区,key,value

3.序列化器

Kafka生产者流程分析_序列化_02

Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。序列化器的作用就是用于序列化要发送的消息

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。

/**
 *将对象转换为byte数组的接口
 *
 * 该接口的实现类需要提供无参构造器
 */
public interface Serializer<T>  extends Closeable
{
    /**
     *  类的配置信息
     * @param configs key/value pairs
     * @param isKey key的序列化还是value的序列化
     */
    default void configure(Map<String, ?> configs, boolean isKey) {}

    /**
     *  将对象转换为字节数组
     * @param paramString
     * @param paramT
     * @return
     */
    byte[] serialize(String paramString, T paramT);

    /**
     * 将对象转换为字节数组
     * @param topic 主题名称
     * @param headers
     * @param data 需要转换的对象
     * @return
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * 关闭序列化器
     * 该方法需要提供幂等性,因为可能调用多次。
     */
    default void close() {}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.

系统提供了该接口的子接口以及实现类:实现类如下

  • org.apache.kafka.common.serialization.ByteArraySerializer
public class ByteArraySerializer implements Serializer<byte[]>
{
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

-org.apache.kafka.common.serialization.ByteBufferSerializer

public class ByteBufferSerializer implements Serializer<ByteBuffer>
{
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        data.rewind();
        
        if (data.hasArray()) {
            byte[] arr = data.array();
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        } 
        
        byte[] ret = new byte[data.remaining()];
        data.get(ret, 0, ret.length);
        data.rewind();
        return ret;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • org.apache.kafka.common.serialization.BytesSerializer
public class BytesSerializer implements Serializer<Bytes>
{
    public byte[] serialize(String topic, Bytes data) {
        if (data == null) {
            return null;
        }
        return data.get();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • org.apache.kafka.common.serialization.IntegerSerializer
public class IntegerSerializer implements Serializer<Integer>
{
    public byte[] serialize(String topic, Integer data) {
        if (data == null) {
            return null;
        }
        return new byte[] {
                (byte)(data.intValue() >>> 24), 
                (byte)(data.intValue() >>> 16), 
                (byte)(data.intValue() >>> 8), data
                .byteValue()
            };
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • org.apache.kafka.common.serialization.StringSerializer
public class StringSerializer implements Serializer<String>
{
    private String encoding = StandardCharsets.UTF_8.name();

    
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding"); 
        if (encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }
    }
    
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null) {
                return null;
            }
            return data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        } 
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

3.1.自定义序列化器

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的 serialize 方法

生产者定义自定义序列化器

public class MyUserProducer {
    public static void main(String[] args) {

        Properties props = new Properties();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");


        // acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
        //acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
        //这是Kafka最强的可靠性保证,等效于 acks=-1
        props.put("acks", "all");
        // 设置key的序列化器
        props.put("key.serializer", StringSerializer.class);
        // 设置value的序列化器
        props.put("value.serializer",UserSerializer.class);
        props.put("retries",3);
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(props);

        User user = new User();
        user.setUserId(1001);
        user.setUsername("张三");

        ProducerRecord<String, User> record = new ProducerRecord<>(
                "zbbmeta-02",
                0,
                user.getUsername(),
                user
        );

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("消息发送成功:"
                        + metadata.topic() + "\t"
                        + metadata.partition() + "\t"
                        + metadata.offset());
            } else {
                System.out.println("消息发送异常");
            }
        });
        // 关闭生产者
        producer.close();
    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.

4.分区器

Kafka生产者流程分析_apache_03

默认(DefaultPartitioner)分区计算:思考:发送出去的消息是如何进行分区的呢?

  • 1.如果是指定了分区,那么就会按照指定的分区将所有的 value 都发送到一个分区中(优先级最高)

  1. 如果是指定了 key 但是没有指定分区,那么就会按照 DefaultPartitioner 这个类进行分区,底层使用 hash 取模的方式进行分区

  • 1.会首先在可用的分区中分配分区号

  • 2.如果没有可用的分区,则在该主题所有分区中分配分区号

  1. 如果是没有指定分区和 key,那么就会按照轮循的方式进行循环分区

  • 4.可以自定义分区类(根据 DefaultPartitioner 来进行模仿即可)

在这个过程中的 key 只是逻辑上的一个业务标记(key 是可以重复的),而 value 才是消息中的真正内容

4.1. 自定义分区器

自定义分区器步骤;


  1. 首先开发Partitioner接口的实现类

  1. 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")

位于 org.apache.kafka.clients.producer 中的分区器接口:

/**
 * 分区器接口
 */
public interface Partitioner extends Configurable, Closeable {
    /**
     * 为指定的消息记录计算分区值
     * @param paramString
     * @param paramObject1
     * @param paramArrayOfbyte1
     * @param paramObject2
     * @param paramArrayOfbyte2
     * @param paramCluster
     * @return
     */
    int partition(String paramString, Object paramObject1, byte[] paramArrayOfbyte1, Object paramObject2, byte[] paramArrayOfbyte2, Cluster paramCluster);

    /**
     * 关闭分区器的时候调用该方法
     */
    void close();
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.

仿照org.apache.kafka.clients.producer.internals 中分区器的默认实现进行实现自定义分区器

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 自定义分区器
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 此处可以计算分区的数字。
        // 我们直接指定为2
        return 2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

-生产者

Properties props = new Properties();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        // 指定自定义的分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        // acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
        //acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
        //这是Kafka最强的可靠性保证,等效于 acks=-1
        props.put("acks", "all");
        // 设置key的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置value的序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("retries",3);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);


        // 此处不要设置partition的值
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                "zbbmeta-01",
                "mykey",
                "myvalue"
        );

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("消息发送失败");
                } else {
                    System.out.println(metadata.topic());
                    System.out.println(metadata.partition());
                    System.out.println(metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.

5. 拦截器

Kafka生产者流程分析_apache_04

image

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

5.1 自定义拦截器

自定义拦截器步骤:


  1. 实现ProducerInterceptor接口

  1. 在KafkaProducer的设置中设置自定义的拦截器

自定义拦截器1

ublic class MyInterceptorOne<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {


    @Override
    public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {
        System.out.println("拦截器1---go");
// 此处根据业务需要对相关的数据作修改
        String topic = record.topic();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        KEY key = record.key();
        VALUE value = record.value();
        Headers headers = record.headers();
// 添加消息头
        headers.add("interceptor", "MyInterceptorOne".getBytes());
        ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,
                VALUE>(
                topic,
                partition,
                timestamp,
                key,value,
                headers
        );
        return newRecord;
    }
    

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器1---back");
        if (exception != null) {
// 如果发生异常,记录日志中
            System.err.println(exception.getMessage());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.

自定义拦截器2

public class MyInterceptorTwo<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {


    @Override
    public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {
        System.out.println("拦截器2---go");
// 此处根据业务需要对相关的数据作修改
        String topic = record.topic();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        KEY key = record.key();
        VALUE value = record.value();
        Headers headers = record.headers();
// 添加消息头
        headers.add("interceptor", "MyInterceptorTwo".getBytes());
        ProducerRecord<KEY, VALUE> newRecord = new ProducerRecord<KEY,
                VALUE>(
                topic,
                partition,
                timestamp,
                key,value,
                headers
        );
        return newRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("拦截器2---back");
        if (exception != null) {
// 如果发生异常,记录日志中
            System.err.println(exception.getMessage());
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.

生产者

Properties props = new Properties();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        // 指定自定义的分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        // acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
        //acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
        //这是Kafka最强的可靠性保证,等效于 acks=-1
        props.put("acks", "all");
        // 设置key的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置拦截器
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "MyInterceptorOne," +
                        "MyInterceptorTwo"
        );

        // 设置value的序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("retries",3);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);


        // 此处不要设置partition的值
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                "zbbmeta-01",
                "mykey",
                "myvalue"
        );

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("消息发送失败");
                } else {
                    System.out.println(metadata.topic());
                    System.out.println(metadata.partition());
                    System.out.println(metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.

运行结果: 说明拦截器生效

Kafka生产者流程分析_序列化_05

原创作者: maguobin 转载于: https://blog.51cto.com/maguobin/11715425
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐

  • 浏览量 106
  • 收藏 0
  • 0

所有评论(0)

查看更多评论 
已为社区贡献1条内容