Apache Kafka流处理架构:分区与消费者组深度解析

源码版本:Apache Kafka 3.6.x(kafka/core/src/main/scala/kafka/cluster/Partition.scala)
阅读时间:约 25 分钟
难度等级:⭐⭐⭐⭐☆


📚 前言:为什么 Kafka 如此重要?

在当今流数据爆炸的时代,Apache Kafka 已成为企业级流处理平台的事实标准。从 LinkedIn 的 1 万亿级消息日处理量,到 Uber 的实时事件系统,Kafka 凭借其高吞吐、低延迟、可扩展的特性,支撑着全球最严苛的生产环境。

但很多开发者对 Kafka 的理解停留在"会调用 API"层面:

  • ❌ 不清楚为什么需要消费者组
  • ❌ 不理解分区如何影响性能?
  • ❌ 不掌握再平衡的触发时机?
  • ❌ 更不懂得如何保证Exactly Once 语义?

本文将从架构设计的角度,深入剖析 Kafka 的核心机制,帮助你构建完整的知识体系。


🔥 一、Kafka 架构与核心概念:全景视图

1.1 整体架构图

渲染错误: Mermaid 渲染失败: Parse error on line 43: ...oKeeper <--> Broker 1 ZooKeeper <--> -----------------------^ Expecting 'SEMI', 'NEWLINE', 'EOF', 'AMP', 'START_LINK', 'LINK', 'LINK_ID', got 'NUM'

1.2 核心概念对比

概念 职责 关键特性 生产建议
Broker Kafka 服务器节点 存储+转发消息,水平扩展 至少 3 台,奇数台便于选举
Topic 逻辑消息分类 逻辑概念,非物理存储 按业务域划分,避免过大
Partition 物理存储单元 顺序写入,并行消费 根据吞吐量设置,初始保守
Replica 数据副本机制 Leader 读写,Follower 同步 生产环境至少 3 副本
Consumer Group 逻辑消费者组 组内单消费,组间广播 每个独立消费者应用使用独立组

1.3 核心概念详解

1.3.1 Broker(代理服务器)

Broker 是 Kafka 集群的核心节点,每个 Broker 都有唯一标识(broker.id),主要职责:

  • 存储消息:消息以追加日志(Append-Only Log)形式持久化到磁盘
  • 响应请求:处理生产者的写入请求和消费者的读取请求
  • 副本同步:作为 Leader 时处理读写,作为 Follower 时从 Leader 拉取数据
// 源码位置:kafka/core/src/main/scala/kafka/server/KafkaServer.scala
// Kafka Broker 启动入口
def startup(): Unit = {
  try {
    // 1. 启动定时任务管理器
    kafkaScheduler.startup()
    
    // 2. 启动日志管理器(负责消息存储)
    logManager.startup()
    
    // 3. 启动副本管理器(负责副本同步)
    replicaManager.startup()
    
    // 4. 启动网络处理器(处理客户端请求)
    socketServer.startup()
    
    // 5. 启动请求处理模块
    requestHandlerPool.startup()
    
    info("Kafka Server started")
  }
}
1.3.2 Topic(主题)

Topic 是逻辑上的消息分类,本质上是分区的命名容器

  • 命名规范:建议使用 . 分隔业务域,如 order.createduser.login
  • 保留策略:基于时间(7 天)或大小(1GB)自动清理
  • 分区数:创建后可增加但不减少(因为涉及哈希重分配)
1.3.3 Partition(分区)

分区是 Kafka 并行能力的核心,每个分区是一个有序的、不可变的消息序列

// 源码位置:kafka/core/src/main/scala/kafka/cluster/Partition.scala
class Partition(val topic: String, 
                val partitionId: Int,
                time: Time) extends Logging {
  
  // 分区的副本集合
  private val replicaStateMachine = new ReplicaStateMachine
  
  // 分区的 Leader 副本
  private var leaderReplicaIdOpt: Option[Int] = None
  
  // 分区的所有消息日志
  private val log: Log = logManager.getOrCreateLog(
    new TopicPartition(topic, partitionId)
  )
  
  /**
   * 处理生产者的写入请求
   * @param messages 消息集合
   * @return 写入的起始位移
   */
  def appendRecordsToLeader(messages: MemoryRecords): LogAppendInfo = {
    // 1. 检查是否为 Leader
    if (!isLeader) {
      throw new NotLeaderForPartitionException
    }
    
    // 2. 写入本地日志
    val appendInfo = log.append(messages, leaderHwChangeMessage)
    
    // 3. 更新高水位(High Watermark)
    // 高水位 = 所有 ISR(同步副本集合)都已同步的位移
    updateLeaderHW(leaderReplicaIdOpt.get)
    
    appendInfo
  }
}
1.3.4 Replica(副本)

副本是 Kafka 高可用的基石,通过主从复制机制保证数据不丢失:

副本类型 职责 读写支持 数据同步
Leader 处理所有读写请求 ✅ 读写 -
Follower 从 Leader 同步数据 ❌ 只读 定时拉取 Leader 数据

关键概念:ISR(In-Sync Replicas):与 Leader 保持同步的副本集合,只有 ISR 中的副本才有资格被选为新 Leader。


🎯 二、分区机制与数据分发策略

2.1 分区写入流程

ZooKeeper Follower Partition Leader Partition Partitioner Producer ZooKeeper Follower Partition Leader Partition Partitioner Producer alt [ACKs=all] [ACKs=1] 如果网络超时,Producer 会重试 计算分区号(消息key哈希) 返回目标分区 发送消息(带 ACKs=1) 写入本地日志 推送消息到 ISR 确认已同步 更新高水位 返回成功(带位移) 返回成功(仅写入 Leader)

2.2 默认分区器源码解析

Kafka 默认使用 DefaultPartitioner,分区策略:

  1. 指定分区:直接使用(不常见)
  2. 有 Keyhash(key) % partition_count(保证相同 Key 到同一分区)
  3. 无 KeySticky Partitioning(粘性分区,随机选分区后用满 batch)
// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/producer/DefaultPartitioner.java
public class DefaultPartitioner implements Partitioner {
    
    // 粘性分区缓存(每个 Topic 维护一个当前分区)
    private final ConcurrentMap<String, Integer> stickyPartitionCache = new ConcurrentHashMap<>();

    /**
     * 计算消息应该发送到哪个分区
     * @param topic 主题名称
     * @param key 消息键(可为 null)
     * @param keyBytes 消息键序列化后的字节数组
     * @param value 消息值
     * @param valueBytes 消息值序列化后的字节数组
     * @param cluster 集群元数据(包含分区信息)
     * @return 目标分区号
     */
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes,
                        Cluster cluster) {
        
        // 1. 获取主题的所有可用分区
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 2. 如果有 Key,使用哈希取模算法(保证顺序性)
        if (keyBytes != null) {
            // 使用 murmur2 哈希算法(均匀性优于 Java hashCode)
            int hash = Utils.murmur2(keyBytes);
            // 哈希值可能为负数,需要取绝对值后取模
            return Math.abs(hash) % numPartitions;
        }
        
        // 3. 如果没有 Key,使用粘性分区策略(Kafka 2.4+ 优化)
        // 粘性分区:随机选择一个分区,发送满一个 batch 后再切换
        // 优点:减少批量请求次数,提升吞吐量
        Integer cachedPartition = stickyPartitionCache.get(topic);
        if (cachedPartition == null) {
            // 首次发送,随机选择一个分区
            cachedPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) 
                            % numPartitions;
            stickyPartitionCache.put(topic, cachedPartition);
        }
        
        return cachedPartition;
    }
    
    /**
     * 当一个 batch 填满时,调用此方法切换分区
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // 移除缓存,下次 partition() 时会重新随机选择
        stickyPartitionCache.remove(topic);
    }
}

2.3 自定义分区器实战

场景:日志采集系统,需要根据日志级别将 ERROR 日志发送到独立分区便于优先处理。

// 文件:LogLevelPartitioner.java
package com.example.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

/**
 * 日志级别分区器
 * 
 * 分区策略:
 * - ERROR 级别日志 → 前 10% 分区(高优先级)
 * - WARN 级别日志 → 中间 20% 分区
 * - INFO/DEBUG 级别日志 → 后 70% 分区
 * 
 * 使用方式:
 * props.put("partitioner.class", "com.example.kafka.partitioner.LogLevelPartitioner");
 */
public class LogLevelPartitioner implements Partitioner {
    
    // 分区比例配置
    private static final float ERROR_RATIO = 0.1f;  // ERROR 占 10%
    private static final float WARN_RATIO = 0.2f;   // WARN 占 20%
    
    private int errorEndIndex;   // ERROR 分区结束索引
    private int warnEndIndex;    // WARN 分区结束索引

    /**
     * 初始化分区器(在 Producer 启动时调用一次)
     * @param config 生产者配置
     * @param appName 应用名称
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // 可以从配置中读取自定义参数
        // String errorRatioStr = (String) configs.get("log.partition.error.ratio");
        // this.errorRatio = errorRatioStr != null ? Float.parseFloat(errorRatioStr) : 0.1f;
    }

    /**
     * 核心分区逻辑
     * @param topic 主题名称
     * @param key 日志级别(如 "ERROR"、"WARN"、"INFO")
     * @param keyBytes 日志级别字节数组
     * @param value 日志内容
     * @param valueBytes 日志内容字节数组
     * @param cluster 集群元数据
     * @return 目标分区号
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes,
                        Cluster cluster) {
        
        // 1. 获取主题的分区信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 2. 如果分区数小于 10,退回到默认分区器
        if (numPartitions < 10) {
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        }
        
        // 3. 计算分区边界(只计算一次)
        if (errorEndIndex == 0) {
            errorEndIndex = (int) (numPartitions * ERROR_RATIO);
            warnEndIndex = errorEndIndex + (int) (numPartitions * WARN_RATIO);
        }
        
        // 4. 根据日志级别选择分区
        String logLevel = (key != null) ? key.toString() : "INFO";
        
        int targetPartition;
        switch (logLevel.toUpperCase()) {
            case "ERROR":
                // ERROR 日志:前 10% 分区(随机选择)
                targetPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) 
                                % errorEndIndex;
                break;
                
            case "WARN":
                // WARN 日志:中间 20% 分区
                targetPartition = errorEndIndex + 
                                (Utils.toPositive(ThreadLocalRandom.current().nextInt()) 
                                % (warnEndIndex - errorEndIndex));
                break;
                
            case "INFO":
            case "DEBUG":
            default:
                // INFO/DEBUG:后 70% 分区
                targetPartition = warnEndIndex + 
                                (Utils.toPositive(ThreadLocalRandom.current().nextInt()) 
                                % (numPartitions - warnEndIndex));
                break;
        }
        
        return targetPartition;
    }

    @Override
    public void close() {
        // 清理资源(如果有)
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // 不需要实现粘性分区切换(已按级别分配)
    }
}

完整生产者配置示例

// 文件:KafkaProducerDemo.java
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {
    
    public static void main(String[] args) {
        // 1. 配置生产者参数
        Properties props = new Properties();
        
        // ===== 必须配置 =====
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());
        
        // ===== 自定义分区器 =====
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
                  "com.example.kafka.partitioner.LogLevelPartitioner");
        
        // ===== 可靠性配置 =====
        // ACKs=all(或 -1):Leader + 所有 ISR 副本都确认
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试次数(生产环境建议 3+)
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 启用幂等生产者(Exactly Once 的基础)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        // ===== 性能调优 =====
        // 批量发送大小(默认 16KB,可提升到 32KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        // 等待时间(默认 0,可设置 10ms 提升批量效果)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // 缓冲区大小(默认 32MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        
        // 2. 创建生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // 3. 发送消息(同步方式)
        try {
            for (int i = 0; i < 100; i++) {
                String logLevel = i % 10 == 0 ? "ERROR" : 
                                 i % 5 == 0 ? "WARN" : "INFO";
                String message = String.format("[%s] Log message %d: System check", 
                                               logLevel, i);
                
                // 构建消息对象
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("application-logs", logLevel, message);
                
                // 同步发送(阻塞等待结果)
                RecordMetadata metadata = producer.send(record).get();
                
                System.out.printf(
                    "Sent message: %s → Topic: %s, Partition: %d, Offset: %d%n",
                    message, metadata.topic(), metadata.partition(), 
                    metadata.offset()
                );
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 4. 关闭生产者(会 flush 所有缓冲数据)
            producer.close();
        }
    }
}

2.4 分区策略对比

策略 实现方式 优点 缺点 适用场景
轮询 round-robin 最大均衡 丢失顺序 无 Key、高吞吐场景
哈希 hash(key) % N 保顺序 扩容需重分区 有 Key、需保序场景
随机 random 简单 不均衡 测试环境
粘性分区 sticky(随机填满) 高吞吐 短期不均衡 高吞吐、无 Key 场景
自定义 用户实现 灵活 复杂度高 特殊业务逻辑

🔄 三、消费者组与再平衡(Rebalance)原理

3.1 消费者组架构

消费者组是 Kafka 实现单播和广播的关键机制

  • 组内单播:同一组内的消费者,每个分区只能被其中一个消费
  • 组间广播:不同组的消费者可以独立消费同一分区的数据

消费者组B(离线分析)

消费者组A(实时处理)

Kafka Topic: order-events(3个分区)

Partition 0

Partition 1

Partition 2

Consumer 1
负责: P0

Consumer 2
负责: P1, P2

Consumer 1
负责: P0, P1, P2

3.2 再平衡(Rebalance)触发时机与流程

再平衡是消费者组重新分配分区的过程,期间会暂停消费

3.2.1 触发条件
触发条件 说明 影响
消费者数变化 新消费者加入/退出/崩溃 分区重新分配
分区数变化 Topic 分区数增加 分区重新分配
消费者主动离组 调用 unsubscribe() 触发再平衡
会话超时 max.poll.interval.ms 内未调用 poll 消费者被踢出
3.2.2 再平衡流程(2.4+ 协调器版本)
Leader Consumer Group Coordinator (Broker) 消费者 Leader Consumer Group Coordinator (Broker) 消费者 第一阶段:加入组 第二阶段:分配分区 第三阶段:同步方案 如果超时未提交位移,会触发再平衡 JoinGroup 请求(发送订阅信息) 选举 Leader(第一个加入的) 返回 Leader ID + 成员列表 执行分区分配策略 SyncGroup 请求(提交分配方案) 返回最终分配结果 开始拉取消息
3.2.3 分区分配策略对比
策略 Range RoundRobin Sticky Cooperative Sticky
分配算法 按范围分段 轮询分配 保序 + 均衡 渐进式再平衡
均衡度 ⭐⭐⭐(可能不均) ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
保序性
再平衡代价 高(全量停止) 中(部分保留) 低(渐进式)
推荐场景 默认 无 Key 优先 保序优先 生产环境首选

3.3 消费者源码解析

// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
public class KafkaConsumer<K, V> implements Consumer<K, V> {
    
    // 消费者协调器(负责再平衡、位移提交)
    private final ConsumerCoordinator coordinator;
    
    // 拉取器(负责从 Broker 拉取数据)
    private final Fetcher<K, V> fetcher;

    /**
     * 核心方法:拉取消息
     * @param timeout 超时时间
     * @return 消息记录集合
     */
    public ConsumerRecords<K, V> poll(Duration timeout) {
        // 1. 确保消费者组已加入
        if (!this.subscription.hasAssignedSubscriptions()) {
            throw new IllegalStateException("No subscription assigned");
        }
        
        // 2. 触发再平衡(如果需要)
        this.coordinator.poll(timeout.toMillis());
        
        // 3. 更新消费位移(如果自动提交开启)
        if (this.autoCommitEnabled) {
            this.coordinator.maybeAutoCommitOffsetsAsync(now);
        }
        
        // 4. 从 Broker 拉取数据
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = 
            this.fetcher.fetchRecords(timeout.toMillis());
        
        return new ConsumerRecords<>(records);
    }

    /**
     * 提交当前位移(手动提交)
     */
    public void commitSync() {
        this.coordinator.commitOffsetsSync(
            this.subscription.allConsumed(), 
            Long.MAX_VALUE
        );
    }
}

3.4 完整消费者实战代码

// 文件:KafkaConsumerDemo.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumerDemo {
    
    public static void main(String[] args) {
        // 1. 配置消费者参数
        Properties props = new Properties();
        
        // ===== 必须配置 =====
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        
        // ===== 消费者组配置 =====
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group");
        
        // ===== 位移提交策略 =====
        // 手动提交(生产环境推荐)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        // ===== 再平衡配置 =====
        // 会话超时:Broker 检测消费者崩溃的时间
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 30s
        // 最大轮询间隔:两次 poll() 调用的最大间隔
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5min
        // 分区分配策略
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
                  "org.apache.kafka.clients.consumer.StickyAssignor");
        
        // ===== 性能调优 =====
        // 单次拉取最大字节数(默认 1MB)
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
        // 单次拉取最大记录数(默认 500)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        // 2. 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // 3. 订阅主题(支持正则表达式)
        consumer.subscribe(Collections.singletonList("application-logs"));
        
        // 4. 注册再平衡监听器(用于优雅停机)
        consumer.subscribe(Collections.singletonList("application-logs"),
            new ConsumerRebalanceListener() {
                /**
                 * 再平衡开始前调用(提交位移)
                 */
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions revoked: " + partitions);
                    // 提交当前位移,避免重复消费
                    consumer.commitSync();
                }

                /**
                 * 再平衡完成后调用(重新初始化)
                 */
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Partitions assigned: " + partitions);
                    // 可以在这里读取之前保存的位移
                    // consumer.seekToBeginning(partitions);
                }
            }
        );
        
        // 5. 消费消息主循环
        try {
            while (true) {
                // 拉取消息(超时 100ms)
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));
                
                // 处理消息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                        "Received: Topic=%s, Partition=%d, Offset=%d, Key=%s, Value=%s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value()
                    );
                    
                    // 业务逻辑:例如写入数据库
                    processRecord(record);
                }
                
                // 手动提交位移(处理完当前批次后)
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6. 关闭消费者
            consumer.close();
        }
    }
    
    /**
     * 模拟业务处理
     */
    private static void processRecord(ConsumerRecord<String, String> record) {
        // 实际场景:解析日志、过滤、存储到数据库等
        // 这里简化为打印
        String logLevel = record.key();
        String message = record.value();
        
        // 示例:统计 ERROR 级别日志
        if ("ERROR".equals(logLevel)) {
            // 发送告警通知...
            System.err.println("ERROR detected: " + message);
        }
    }
}

💎 四、消息语义保证:从 At Most Once 到 Exactly Once

4.1 三种语义对比

语义 含义 实现方式 代价 适用场景
At Most Once 最多一次 消费后不提交位移 可能丢数据 可容忍丢失(日志统计)
At Least Once 至少一次 消费后提交位移 可能重复消费 默认语义,推荐
Exactly Once 精确一次 事务 + 幂等 性能降低 金融交易、库存扣减

4.2 Exactly Once 的三层保障

数据流转

Exactly Once 三层保障

只读已提交消息

Producer 幂等性
enable.idempotence=true

Transactional API
initTransaction/commit

Consumer 事务隔离
isolation.level=read_committed

消息写入

Offset 更新

数据库事务

4.2.1 Producer 幂等性
// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
class Sender implements Runnable {
    
    // 幂等序列号(每个分区维护一个)
    private final ProducerIdAndEpoch producerIdAndEpoch;
    private final Map<TopicPartition, Integer> sequenceNumbers;
    
    /**
     * 发送消息时自动附加序列号
     */
    private void sendProducerData() {
        for (ProducerBatch batch : batches) {
            // 获取分区当前序列号
            int sequence = sequenceNumbers.get(batch.topicPartition);
            
            // 构建请求(附加 PID 和序列号)
            ProduceRequest request = new ProduceRequest.Builder(
                producerIdAndEpoch.producerId,
                producerIdAndEpoch.epoch,
                sequence,
                batch.records
            ).build();
            
            // 序列号自增
            sequenceNumbers.put(batch.topicPartition, sequence + 1);
        }
    }
}

幂等性配置示例

Properties props = new Properties();
// 启用幂等生产者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 以下参数会自动配置,无需手动设置:
// - ACKS_CONFIG = "all"
// - RETRIES_CONFIG = Integer.MAX_VALUE
// - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
4.2.2 跨分区事务
// 文件:TransactionalProducerDemo.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 跨分区事务示例
 * 
 * 场景:从源 Topic 读取数据,处理后写入目标 Topic
 * 保证:源 Topic 消费 + 目标 Topic 写入 + 位移提交 三者原子性
 */
public class TransactionalProducerDemo {
    
    public static void main(String[] args) {
        // 1. 配置生产者(事务模式)
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                          "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                          StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                          StringSerializer.class.getName());
        
        // ===== 事务配置 =====
        // 设置事务 ID(必须唯一)
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
                          "order-processor-1");
        // ACKs 必须为 all
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        // 启用幂等性(事务的前提)
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 2. 配置消费者(只读已提交消息)
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                          "localhost:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                          StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                          StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
                          "order-processor-group");
        // 只读取已提交的消息(过滤掉回滚的事务)
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
                          "read_committed");
        // 禁用自动提交(由生产者事务控制)
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 3. 创建生产者和消费者
        KafkaProducer<String, String> producer = 
            new KafkaProducer<>(producerProps);
        KafkaConsumer<String, String> consumer = 
            new KafkaConsumer<>(consumerProps);

        try {
            // 4. 初始化事务(必须首先调用)
            producer.initTransactions();
            
            // 5. 订阅源 Topic
            consumer.subscribe(Collections.singletonList("orders"));

            while (true) {
                // 6. 拉取消息
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));

                if (records.isEmpty()) {
                    continue;
                }

                // 7. 开始事务
                producer.beginTransaction();

                try {
                    // 8. 处理消息并写入目标 Topic
                    for (ConsumerRecord<String, String> record : records) {
                        String orderId = record.key();
                        String orderData = record.value();

                        // 业务逻辑:例如金额计算、库存检查
                        String processedData = processOrder(orderData);

                        // 写入目标 Topic
                        ProducerRecord<String, String> outputRecord =
                            new ProducerRecord<>("validated-orders",
                                                orderId, processedData);
                        producer.send(outputRecord);
                    }

                    // 9. 提交消费位移(包含在事务中)
                    Map<TopicPartition, OffsetAndMetadata> offsets =
                        Collections.singletonMap(
                            new TopicPartition("orders", 0),
                            new OffsetAndMetadata(records.offset() + 1)
                        );

                    // 10. 发送位移到事务(关键步骤)
                    producer.sendOffsetsToTransaction(
                        offsets,
                        consumer.groupMetadata()
                    );

                    // 11. 提交事务(消息 + 位移原子性提交)
                    producer.commitTransaction();

                    System.out.println("Transaction committed successfully");

                } catch (Exception e) {
                    // 12. 回滚事务(消息不发送,位移不提交)
                    producer.abortTransaction();
                    System.err.println("Transaction aborted: " + e.getMessage());
                }
            }

        } finally {
            producer.close();
            consumer.close();
        }
    }

    /**
     * 模拟订单处理逻辑
     */
    private static String processOrder(String orderData) {
        // 实际场景:金额计算、库存检查、风控规则等
        return "PROCESSED:" + orderData;
    }
}

4.3 语义保证机制对比

机制 Producer 幂等 跨分区事务 消费者隔离 数据库事务
保证级别 单分区 Exactly Once 多分区 Exactly Once 防读未提交 端到端 Exactly Once
性能开销
配置复杂度 简单 中等 简单 复杂
典型场景 单分区日志 数据流转 消费过滤 金融交易

🚀 五、生产实战案例:高吞吐与顺序保证

5.1 场景一:高吞吐日志采集系统

需求:每秒处理 100 万条日志,容忍少量乱序,优先保证吞吐量。

消费者层

Kafka 集群

生产者层

应用1
日志

应用2
日志

应用N
日志

Topic: logs
100个分区

消费者组1
实时告警
10个消费者

消费者组2
离线分析
20个消费者

消费者组3
归档存储
5个消费者

5.1.1 生产者配置(高吞吐优化)
// 文件:HighThroughputProducer.java
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 高吞吐生产者
 * 
 * 优化策略:
 * 1. 粘性分区(Sticky Partitioning)
 * 2. 增大 batch.size 和 linger.ms
 * 3. 异步发送 + 批量回调
 * 4. 压缩(compression.type=snappy)
 */
public class HighThroughputProducer {
    
    private final KafkaProducer<String, String> producer;
    private final AtomicInteger successCounter = new AtomicInteger(0);
    private final AtomicInteger failureCounter = new AtomicInteger(0);

    public HighThroughputProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092,localhost:9093,localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());

        // ===== 高吞吐优化配置 =====
        // 1. 批量大小:32KB(默认 16KB)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
        // 2. 等待时间:20ms(默认 0,允许更大数据批量)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        // 3. 压缩类型:Snappy(CPU 消耗低,压缩比适中)
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // 4. 缓冲区大小:64MB(默认 32MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        // 5. 最大并发请求数:5(默认 5)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // 6. ACKs=1(Leader 确认即可,容忍少量丢失)
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // 7. 重试次数:3(避免无限重试阻塞)
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        this.producer = new KafkaProducer<>(props);
    }

    /**
     * 异步发送消息(批量回调处理)
     */
    public void sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // 成功:计数
                    int count = successCounter.incrementAndGet();
                    if (count % 10000 == 0) {
                        System.out.println("Successfully sent: " + count);
                    }
                } else {
                    // 失败:记录错误
                    failureCounter.incrementAndGet();
                    System.err.println("Failed to send: " + exception.getMessage());
                }
            }
        });
    }

    /**
     * 批量发送接口
     */
    public void sendBatch(String topic, 
                         java.util.List<String> messages) {
        for (String msg : messages) {
            sendAsync(topic, null, msg);
        }
    }

    public void close() {
        producer.close();
        System.out.printf(
            "Final stats: Success=%d, Failure=%d%n",
            successCounter.get(), failureCounter.get()
        );
    }

    public static void main(String[] args) throws InterruptedException {
        HighThroughputProducer producer = new HighThroughputProducer();
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // 模拟 10 个线程并发发送
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            executor.submit(() -> {
                for (int j = 0; j < 100000; j++) {
                    String msg = String.format(
                        "[Thread-%d] Log message %d: timestamp=%d",
                        threadId, j, System.currentTimeMillis()
                    );
                    producer.sendAsync("logs", null, msg);
                }
            });
        }

        Thread.sleep(60000); // 运行 1 分钟
        producer.close();
        executor.shutdown();
    }
}
5.1.2 消费者配置(多线程并发消费)
// 文件:HighThroughputConsumer.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 高吞吐消费者
 * 
 * 优化策略:
 * 1. 增大 max.poll.records(减少 poll 调用频率)
 * 2. 多线程处理(消费者线程 + 业务线程池)
 * 3. 手动提交位移(精确控制提交时机)
 */
public class HighThroughputConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final int threadPoolSize = 16;

    public HighThroughputConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
                  "log-processor-group");

        // ===== 高吞吐优化配置 =====
        // 1. 单次拉取最大记录数:1000(默认 500)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        // 2. 单次拉取最大字节数:2MB(默认 1MB)
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
                  2097152);
        // 3. 会话超时:60s(避免网络抖动触发再平衡)
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        // 4. 心跳间隔:10s(加快故障检测)
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
        // 5. 手动提交位移
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        this.consumer = new KafkaConsumer<>(props);
        // 业务处理线程池
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
    }

    public void consume() {
        consumer.subscribe(Collections.singletonList("logs"));

        try {
            while (true) {
                // 拉取消息
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));

                if (records.isEmpty()) {
                    continue;
                }

                // 提交到线程池异步处理
                records.forEach(record -> {
                    executorService.submit(() -> {
                        try {
                            // 业务处理(例如写入 ES)
                            processRecord(record);
                        } catch (Exception e) {
                            System.err.println("Process failed: " + e.getMessage());
                        }
                    });
                });

                // 手动提交位移(处理完成后)
                consumer.commitSync();
            }
        } finally {
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
            consumer.close();
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // 实际场景:解析日志、索引、存储等
        // 这里简化为打印
        System.out.println("Processing: " + record.value());
    }

    public static void main(String[] args) {
        HighThroughputConsumer consumer = new HighThroughputConsumer();
        consumer.consume();
    }
}

5.2 场景二:订单状态变更(严格顺序保证)

需求:订单状态必须按时间顺序处理(创建→支付→发货→完成)。

订单消费者 Kafka Cluster 订单系统 订单消费者 Kafka Cluster 订单系统 相同 orderId 路由到同一分区 单线程消费保证顺序处理 发送消息(Key=orderId) 分区内的消息有序 处理订单1(创建) 处理订单1(支付) 处理订单1(发货)
5.2.1 生产者代码(使用 Key 保证顺序)
// 文件:OrderedProducer.java
package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 顺序保证生产者
 * 
 * 关键点:
 * 1. 使用 orderId 作为 Key
 * 2. 相同 orderId 的消息会路由到同一分区
 * 3. 分区内保证顺序性
 */
public class OrderedProducer {
    
    private final KafkaProducer<String, String> producer;

    public OrderedProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                  StringSerializer.class.getName());

        // ===== 顺序保证配置 =====
        // 1. ACKs=all(确保不丢失)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // 2. 重试次数:Integer.MAX_VALUE(确保不因临时故障失败)
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // 3. 最大并发请求数:1(避免乱序)
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // 4. 启用幂等性(防止重试导致重复)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        this.producer = new KafkaProducer<>(props);
    }

    /**
     * 发送订单状态变更(同步方式)
     * 
     * @param orderId 订单ID(作为 Key 保证顺序)
     * @param status 订单状态
     */
    public void sendOrderStatus(String orderId, String status) {
        // 构建消息(使用 orderId 作为 Key)
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "order-status-changes", // Topic
            orderId,                 // Key(决定分区)
            status                   // Value
        );

        try {
            // 同步发送(阻塞等待确认)
            RecordMetadata metadata = producer.send(record).get();

            System.out.printf(
                "Sent: OrderId=%s, Status=%s → Partition=%d, Offset=%d%n",
                orderId, status, metadata.partition(), metadata.offset()
            );

        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Failed to send: " + e.getMessage());
        }
    }

    public void close() {
        producer.close();
    }

    public static void main(String[] args) {
        OrderedProducer producer = new OrderedProducer();

        // 模拟订单状态流转
        String orderId = "ORD-20240426-001";
        producer.sendOrderStatus(orderId, "CREATED");
        producer.sendOrderStatus(orderId, "PAID");
        producer.sendOrderStatus(orderId, "SHIPPED");
        producer.sendOrderStatus(orderId, "COMPLETED");

        producer.close();
    }
}
5.2.2 消费者代码(单线程消费)
// 文件:OrderedConsumer.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 顺序保证消费者
 * 
 * 关键点:
 * 1. 单线程消费(避免多线程乱序)
 * 2. 同步处理(确保当前消息处理完成后再处理下一条)
 * 3. 手动提交位移(处理完成后才提交)
 */
public class OrderedConsumer {
    
    private final KafkaConsumer<String, String> consumer;

    public OrderedConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                  "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, 
                  "order-processor-group");

        // ===== 顺序保证配置 =====
        // 1. 手动提交位移
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 2. 会话超时:30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // 3. 最大轮询间隔:5分钟
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

        this.consumer = new KafkaConsumer<>(props);
    }

    public void consume() {
        // 订阅 Topic
        consumer.subscribe(Collections.singletonList("order-status-changes"));

        try {
            while (true) {
                // 拉取消息
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    String orderId = record.key();
                    String status = record.value();

                    System.out.printf(
                        "Received: OrderId=%s, Status=%s, Partition=%d, Offset=%d%n",
                        orderId, status, record.partition(), record.offset()
                    );

                    // 同步处理订单状态(确保顺序)
                    processOrderStatus(orderId, status);
                }

                // 批次处理完成后提交位移
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * 处理订单状态(同步)
     */
    private void processOrderStatus(String orderId, String status) {
        // 实际场景:更新数据库、调用下游服务等
        System.out.println("Processing order " + orderId + " status: " + status);

        // 模拟业务处理耗时
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OrderedConsumer consumer = new OrderedConsumer();
        consumer.consume();
    }
}

5.3 性能优化对比总结

优化维度 高吞吐场景 顺序保证场景
分区策略 无 Key + Sticky 有 Key + Hash
ACKs 1(Leader) all(全部副本)
重试 有限次数 无限重试 + 幂等
并发数 max.in.flight=5 max.in.flight=1
发送方式 异步批量 同步单条
消费模型 多线程 单线程
吞吐量 ⭐⭐⭐⭐⭐ ⭐⭐⭐
顺序性 ❌ 不保证 ✅ 严格保证

📌 六、生产环境最佳实践

6.1 分区数规划

分区数决策流程

约 10MB/s

计算目标吞吐量
例如: 100万条/秒

单分区最大吞吐

计算所需分区数
目标吞吐 / 单分区吞吐

考虑未来增长
乘以 1.5-2 倍

向上取 2 的幂次
例如: 12 → 16

监控实际性能
动态调整

经验公式

分区数 = max(
    目标吞吐量 / 单分区最大吞吐,
    消费者数量
)

示例

  • 目标吞吐:100 万条/秒
  • 单分区吞吐:10 万条/秒
  • 消费者数:8 个

分区数 = max(100/10, 8) = max(10, 8) = 10 个

实际设置为 16 个(2 的幂次,便于扩展)

6.2 监控指标一览

指标类别 关键指标 告警阈值 含义
生产者 record-send-rate < 目标值的 50% 发送速率过低
生产者 request-latency-avg > 100ms 延迟过高
消费者 records-lag-max > 10000 消费堆积严重
消费者 fetch-rate < 目标值的 50% 消费速率过低
Broker UnderReplicatedPartitions > 0 副本同步异常
Broker ActiveControllerCount ≠ 1 控制器异常

6.3 常见生产问题排查

问题 1:消息丢失

原因排查

  1. 生产者 ACKs=1(Leader 确认后即认为成功,Follower 未同步)
  2. 副本数 < 3(单点故障风险)
  3. min.insync.replicas 设置过低(默认 1)

解决方案

# 生产者配置
acks=all
retries=3

# Broker 配置
min.insync.replicas=2  # 至少 2 个副本同步才算成功
default.replication.factor=3  # 默认副本数
问题 2:消费堆积

原因排查

  1. 消费者处理速度 < 生产速度
  2. 消费者线程数不足
  3. 数据库/外部服务慢

解决方案

// 1. 增加消费者数量(不超过分区数)
// 2. 增大线程池
executorService = Executors.newFixedThreadPool(32);

// 3. 批量处理
consumer.poll(Duration.ofMillis(1000)).forEach(record -> {
    // 批量写入数据库
    batch.add(record);
    if (batch.size() >= 1000) {
        database.batchInsert(batch);
        batch.clear();
    }
});
问题 3:再平衡频繁

原因排查

  1. max.poll.interval.ms 过短(默认 5 分钟)
  2. 消费者 GC 停顿时间过长
  3. 网络抖动导致心跳超时

解决方案

# 增大最大轮询间隔
max.poll.interval.ms=300000  # 5 分钟

# 减小单次拉取记录数(加快处理速度)
max.poll.records=100

# 增大会话超时时间
session.timeout.ms=30000  # 30 秒

🎓 七、总结与进阶学习

7.1 核心知识点回顾

主题 核心要点 生产建议
分区机制 并行能力的基础 根据吞吐量规划,初始保守设置
消费者组 单播 + 广播的关键 每个独立应用使用独立组
再平衡 分区重新分配过程 使用 Sticky 策略减少影响
语义保证 幂等 + 事务 + 隔离 金融场景开启 Exactly Once
性能优化 批量 + 压缩 + 多线程 监控指标驱动优化

7.2 进阶学习路径

掌握基础
生产消费模型

理解核心
分区与再平衡

精通高阶
事务与 Exactly Once

生产实战
性能调优与监控

源码级
深入 Kafka 内核

推荐资源

  1. 官方文档

  2. 源码阅读

    • kafka/core/src/main/scala/kafka/cluster/Partition.scala
    • kafka-clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
    • kafka-clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  3. 实战项目

    • 搭建 3 节点 Kafka 集群
    • 实现 Exactly Once 事务
    • 编写自定义分区器
    • 监控系统搭建(Kafka Manager + Prometheus + Grafana)

💬 互动与讨论

思考题

  1. ❓ 如果你的 Kafka 集群有 100 个分区,但只有 3 个消费者,如何保证负载均衡?
  2. ❓ 在订单状态流转场景中,如果某个订单的处理逻辑特别慢(例如调用第三方 API 超时),如何避免阻塞其他订单的处理?
  3. ❓ Kafka 的 Exactly Once 能否保证跨多个 Topic 的端到端一致性?为什么?

欢迎在评论区分享你的见解! 如果本文对你有帮助,请点赞、收藏、转发三连支持~关注我,获取更多 Kafka 及大数据技术干货!


参考资料

  1. Apache Kafka 官方文档 (v3.6)
  2. 《Kafka 权威指南》第 2 版
  3. Kafka 源码仓库

源码版本标注

  • Kafka Core:3.6.2 (kafka/cluster/Partition.scala)
  • Kafka Clients:3.6.2 (clients/producer/KafkaProducer.java, clients/consumer/KafkaConsumer.java)
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐