Apache Iceberg 数据湖:表格式与时间旅行
Apache Kafka流处理架构:分区与消费者组深度解析
源码版本:Apache Kafka 3.6.x(kafka/core/src/main/scala/kafka/cluster/Partition.scala)
阅读时间:约 25 分钟
难度等级:⭐⭐⭐⭐☆
📚 前言:为什么 Kafka 如此重要?
在当今流数据爆炸的时代,Apache Kafka 已成为企业级流处理平台的事实标准。从 LinkedIn 的 1 万亿级消息日处理量,到 Uber 的实时事件系统,Kafka 凭借其高吞吐、低延迟、可扩展的特性,支撑着全球最严苛的生产环境。
但很多开发者对 Kafka 的理解停留在"会调用 API"层面:
- ❌ 不清楚为什么需要消费者组?
- ❌ 不理解分区如何影响性能?
- ❌ 不掌握再平衡的触发时机?
- ❌ 更不懂得如何保证Exactly Once 语义?
本文将从架构设计的角度,深入剖析 Kafka 的核心机制,帮助你构建完整的知识体系。
🔥 一、Kafka 架构与核心概念:全景视图
1.1 整体架构图
1.2 核心概念对比
| 概念 | 职责 | 关键特性 | 生产建议 |
|---|---|---|---|
| Broker | Kafka 服务器节点 | 存储+转发消息,水平扩展 | 至少 3 台,奇数台便于选举 |
| Topic | 逻辑消息分类 | 逻辑概念,非物理存储 | 按业务域划分,避免过大 |
| Partition | 物理存储单元 | 顺序写入,并行消费 | 根据吞吐量设置,初始保守 |
| Replica | 数据副本机制 | Leader 读写,Follower 同步 | 生产环境至少 3 副本 |
| Consumer Group | 逻辑消费者组 | 组内单消费,组间广播 | 每个独立消费者应用使用独立组 |
1.3 核心概念详解
1.3.1 Broker(代理服务器)
Broker 是 Kafka 集群的核心节点,每个 Broker 都有唯一标识(broker.id),主要职责:
- 存储消息:消息以追加日志(Append-Only Log)形式持久化到磁盘
- 响应请求:处理生产者的写入请求和消费者的读取请求
- 副本同步:作为 Leader 时处理读写,作为 Follower 时从 Leader 拉取数据
// 源码位置:kafka/core/src/main/scala/kafka/server/KafkaServer.scala
// Kafka Broker 启动入口
def startup(): Unit = {
try {
// 1. 启动定时任务管理器
kafkaScheduler.startup()
// 2. 启动日志管理器(负责消息存储)
logManager.startup()
// 3. 启动副本管理器(负责副本同步)
replicaManager.startup()
// 4. 启动网络处理器(处理客户端请求)
socketServer.startup()
// 5. 启动请求处理模块
requestHandlerPool.startup()
info("Kafka Server started")
}
}
1.3.2 Topic(主题)
Topic 是逻辑上的消息分类,本质上是分区的命名容器:
- 命名规范:建议使用
.分隔业务域,如order.created、user.login - 保留策略:基于时间(7 天)或大小(1GB)自动清理
- 分区数:创建后可增加但不减少(因为涉及哈希重分配)
1.3.3 Partition(分区)
分区是 Kafka 并行能力的核心,每个分区是一个有序的、不可变的消息序列:
// 源码位置:kafka/core/src/main/scala/kafka/cluster/Partition.scala
class Partition(val topic: String,
val partitionId: Int,
time: Time) extends Logging {
// 分区的副本集合
private val replicaStateMachine = new ReplicaStateMachine
// 分区的 Leader 副本
private var leaderReplicaIdOpt: Option[Int] = None
// 分区的所有消息日志
private val log: Log = logManager.getOrCreateLog(
new TopicPartition(topic, partitionId)
)
/**
* 处理生产者的写入请求
* @param messages 消息集合
* @return 写入的起始位移
*/
def appendRecordsToLeader(messages: MemoryRecords): LogAppendInfo = {
// 1. 检查是否为 Leader
if (!isLeader) {
throw new NotLeaderForPartitionException
}
// 2. 写入本地日志
val appendInfo = log.append(messages, leaderHwChangeMessage)
// 3. 更新高水位(High Watermark)
// 高水位 = 所有 ISR(同步副本集合)都已同步的位移
updateLeaderHW(leaderReplicaIdOpt.get)
appendInfo
}
}
1.3.4 Replica(副本)
副本是 Kafka 高可用的基石,通过主从复制机制保证数据不丢失:
| 副本类型 | 职责 | 读写支持 | 数据同步 |
|---|---|---|---|
| Leader | 处理所有读写请求 | ✅ 读写 | - |
| Follower | 从 Leader 同步数据 | ❌ 只读 | 定时拉取 Leader 数据 |
关键概念:ISR(In-Sync Replicas):与 Leader 保持同步的副本集合,只有 ISR 中的副本才有资格被选为新 Leader。
🎯 二、分区机制与数据分发策略
2.1 分区写入流程
2.2 默认分区器源码解析
Kafka 默认使用 DefaultPartitioner,分区策略:
- 指定分区:直接使用(不常见)
- 有 Key:
hash(key) % partition_count(保证相同 Key 到同一分区) - 无 Key:Sticky Partitioning(粘性分区,随机选分区后用满 batch)
// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/producer/DefaultPartitioner.java
public class DefaultPartitioner implements Partitioner {
// 粘性分区缓存(每个 Topic 维护一个当前分区)
private final ConcurrentMap<String, Integer> stickyPartitionCache = new ConcurrentHashMap<>();
/**
* 计算消息应该发送到哪个分区
* @param topic 主题名称
* @param key 消息键(可为 null)
* @param keyBytes 消息键序列化后的字节数组
* @param value 消息值
* @param valueBytes 消息值序列化后的字节数组
* @param cluster 集群元数据(包含分区信息)
* @return 目标分区号
*/
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
// 1. 获取主题的所有可用分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 2. 如果有 Key,使用哈希取模算法(保证顺序性)
if (keyBytes != null) {
// 使用 murmur2 哈希算法(均匀性优于 Java hashCode)
int hash = Utils.murmur2(keyBytes);
// 哈希值可能为负数,需要取绝对值后取模
return Math.abs(hash) % numPartitions;
}
// 3. 如果没有 Key,使用粘性分区策略(Kafka 2.4+ 优化)
// 粘性分区:随机选择一个分区,发送满一个 batch 后再切换
// 优点:减少批量请求次数,提升吞吐量
Integer cachedPartition = stickyPartitionCache.get(topic);
if (cachedPartition == null) {
// 首次发送,随机选择一个分区
cachedPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt())
% numPartitions;
stickyPartitionCache.put(topic, cachedPartition);
}
return cachedPartition;
}
/**
* 当一个 batch 填满时,调用此方法切换分区
*/
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
// 移除缓存,下次 partition() 时会重新随机选择
stickyPartitionCache.remove(topic);
}
}
2.3 自定义分区器实战
场景:日志采集系统,需要根据日志级别将 ERROR 日志发送到独立分区便于优先处理。
// 文件:LogLevelPartitioner.java
package com.example.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
/**
* 日志级别分区器
*
* 分区策略:
* - ERROR 级别日志 → 前 10% 分区(高优先级)
* - WARN 级别日志 → 中间 20% 分区
* - INFO/DEBUG 级别日志 → 后 70% 分区
*
* 使用方式:
* props.put("partitioner.class", "com.example.kafka.partitioner.LogLevelPartitioner");
*/
public class LogLevelPartitioner implements Partitioner {
// 分区比例配置
private static final float ERROR_RATIO = 0.1f; // ERROR 占 10%
private static final float WARN_RATIO = 0.2f; // WARN 占 20%
private int errorEndIndex; // ERROR 分区结束索引
private int warnEndIndex; // WARN 分区结束索引
/**
* 初始化分区器(在 Producer 启动时调用一次)
* @param config 生产者配置
* @param appName 应用名称
*/
@Override
public void configure(Map<String, ?> configs) {
// 可以从配置中读取自定义参数
// String errorRatioStr = (String) configs.get("log.partition.error.ratio");
// this.errorRatio = errorRatioStr != null ? Float.parseFloat(errorRatioStr) : 0.1f;
}
/**
* 核心分区逻辑
* @param topic 主题名称
* @param key 日志级别(如 "ERROR"、"WARN"、"INFO")
* @param keyBytes 日志级别字节数组
* @param value 日志内容
* @param valueBytes 日志内容字节数组
* @param cluster 集群元数据
* @return 目标分区号
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
// 1. 获取主题的分区信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 2. 如果分区数小于 10,退回到默认分区器
if (numPartitions < 10) {
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
// 3. 计算分区边界(只计算一次)
if (errorEndIndex == 0) {
errorEndIndex = (int) (numPartitions * ERROR_RATIO);
warnEndIndex = errorEndIndex + (int) (numPartitions * WARN_RATIO);
}
// 4. 根据日志级别选择分区
String logLevel = (key != null) ? key.toString() : "INFO";
int targetPartition;
switch (logLevel.toUpperCase()) {
case "ERROR":
// ERROR 日志:前 10% 分区(随机选择)
targetPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt())
% errorEndIndex;
break;
case "WARN":
// WARN 日志:中间 20% 分区
targetPartition = errorEndIndex +
(Utils.toPositive(ThreadLocalRandom.current().nextInt())
% (warnEndIndex - errorEndIndex));
break;
case "INFO":
case "DEBUG":
default:
// INFO/DEBUG:后 70% 分区
targetPartition = warnEndIndex +
(Utils.toPositive(ThreadLocalRandom.current().nextInt())
% (numPartitions - warnEndIndex));
break;
}
return targetPartition;
}
@Override
public void close() {
// 清理资源(如果有)
}
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
// 不需要实现粘性分区切换(已按级别分配)
}
}
完整生产者配置示例:
// 文件:KafkaProducerDemo.java
package com.example.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerDemo {
public static void main(String[] args) {
// 1. 配置生产者参数
Properties props = new Properties();
// ===== 必须配置 =====
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// ===== 自定义分区器 =====
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
"com.example.kafka.partitioner.LogLevelPartitioner");
// ===== 可靠性配置 =====
// ACKs=all(或 -1):Leader + 所有 ISR 副本都确认
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试次数(生产环境建议 3+)
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 启用幂等生产者(Exactly Once 的基础)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// ===== 性能调优 =====
// 批量发送大小(默认 16KB,可提升到 32KB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
// 等待时间(默认 0,可设置 10ms 提升批量效果)
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// 缓冲区大小(默认 32MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
// 2. 创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 3. 发送消息(同步方式)
try {
for (int i = 0; i < 100; i++) {
String logLevel = i % 10 == 0 ? "ERROR" :
i % 5 == 0 ? "WARN" : "INFO";
String message = String.format("[%s] Log message %d: System check",
logLevel, i);
// 构建消息对象
ProducerRecord<String, String> record =
new ProducerRecord<>("application-logs", logLevel, message);
// 同步发送(阻塞等待结果)
RecordMetadata metadata = producer.send(record).get();
System.out.printf(
"Sent message: %s → Topic: %s, Partition: %d, Offset: %d%n",
message, metadata.topic(), metadata.partition(),
metadata.offset()
);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 4. 关闭生产者(会 flush 所有缓冲数据)
producer.close();
}
}
}
2.4 分区策略对比
| 策略 | 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 轮询 | round-robin | 最大均衡 | 丢失顺序 | 无 Key、高吞吐场景 |
| 哈希 | hash(key) % N | 保顺序 | 扩容需重分区 | 有 Key、需保序场景 |
| 随机 | random | 简单 | 不均衡 | 测试环境 |
| 粘性分区 | sticky(随机填满) | 高吞吐 | 短期不均衡 | 高吞吐、无 Key 场景 |
| 自定义 | 用户实现 | 灵活 | 复杂度高 | 特殊业务逻辑 |
🔄 三、消费者组与再平衡(Rebalance)原理
3.1 消费者组架构
消费者组是 Kafka 实现单播和广播的关键机制:
- 组内单播:同一组内的消费者,每个分区只能被其中一个消费
- 组间广播:不同组的消费者可以独立消费同一分区的数据
3.2 再平衡(Rebalance)触发时机与流程
再平衡是消费者组重新分配分区的过程,期间会暂停消费。
3.2.1 触发条件
| 触发条件 | 说明 | 影响 |
|---|---|---|
| 消费者数变化 | 新消费者加入/退出/崩溃 | 分区重新分配 |
| 分区数变化 | Topic 分区数增加 | 分区重新分配 |
| 消费者主动离组 | 调用 unsubscribe() | 触发再平衡 |
| 会话超时 | max.poll.interval.ms 内未调用 poll | 消费者被踢出 |
3.2.2 再平衡流程(2.4+ 协调器版本)
3.2.3 分区分配策略对比
| 策略 | Range | RoundRobin | Sticky | Cooperative Sticky |
|---|---|---|---|---|
| 分配算法 | 按范围分段 | 轮询分配 | 保序 + 均衡 | 渐进式再平衡 |
| 均衡度 | ⭐⭐⭐(可能不均) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 保序性 | ✅ | ❌ | ✅ | ✅ |
| 再平衡代价 | 高(全量停止) | 高 | 中(部分保留) | 低(渐进式) |
| 推荐场景 | 默认 | 无 Key 优先 | 保序优先 | 生产环境首选 |
3.3 消费者源码解析
// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
public class KafkaConsumer<K, V> implements Consumer<K, V> {
// 消费者协调器(负责再平衡、位移提交)
private final ConsumerCoordinator coordinator;
// 拉取器(负责从 Broker 拉取数据)
private final Fetcher<K, V> fetcher;
/**
* 核心方法:拉取消息
* @param timeout 超时时间
* @return 消息记录集合
*/
public ConsumerRecords<K, V> poll(Duration timeout) {
// 1. 确保消费者组已加入
if (!this.subscription.hasAssignedSubscriptions()) {
throw new IllegalStateException("No subscription assigned");
}
// 2. 触发再平衡(如果需要)
this.coordinator.poll(timeout.toMillis());
// 3. 更新消费位移(如果自动提交开启)
if (this.autoCommitEnabled) {
this.coordinator.maybeAutoCommitOffsetsAsync(now);
}
// 4. 从 Broker 拉取数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
this.fetcher.fetchRecords(timeout.toMillis());
return new ConsumerRecords<>(records);
}
/**
* 提交当前位移(手动提交)
*/
public void commitSync() {
this.coordinator.commitOffsetsSync(
this.subscription.allConsumed(),
Long.MAX_VALUE
);
}
}
3.4 完整消费者实战代码
// 文件:KafkaConsumerDemo.java
package com.example.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
// 1. 配置消费者参数
Properties props = new Properties();
// ===== 必须配置 =====
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
// ===== 消费者组配置 =====
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor-group");
// ===== 位移提交策略 =====
// 手动提交(生产环境推荐)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// ===== 再平衡配置 =====
// 会话超时:Broker 检测消费者崩溃的时间
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 30s
// 最大轮询间隔:两次 poll() 调用的最大间隔
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5min
// 分区分配策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
// ===== 性能调优 =====
// 单次拉取最大字节数(默认 1MB)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
// 单次拉取最大记录数(默认 500)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 2. 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅主题(支持正则表达式)
consumer.subscribe(Collections.singletonList("application-logs"));
// 4. 注册再平衡监听器(用于优雅停机)
consumer.subscribe(Collections.singletonList("application-logs"),
new ConsumerRebalanceListener() {
/**
* 再平衡开始前调用(提交位移)
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// 提交当前位移,避免重复消费
consumer.commitSync();
}
/**
* 再平衡完成后调用(重新初始化)
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
// 可以在这里读取之前保存的位移
// consumer.seekToBeginning(partitions);
}
}
);
// 5. 消费消息主循环
try {
while (true) {
// 拉取消息(超时 100ms)
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"Received: Topic=%s, Partition=%d, Offset=%d, Key=%s, Value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value()
);
// 业务逻辑:例如写入数据库
processRecord(record);
}
// 手动提交位移(处理完当前批次后)
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6. 关闭消费者
consumer.close();
}
}
/**
* 模拟业务处理
*/
private static void processRecord(ConsumerRecord<String, String> record) {
// 实际场景:解析日志、过滤、存储到数据库等
// 这里简化为打印
String logLevel = record.key();
String message = record.value();
// 示例:统计 ERROR 级别日志
if ("ERROR".equals(logLevel)) {
// 发送告警通知...
System.err.println("ERROR detected: " + message);
}
}
}
💎 四、消息语义保证:从 At Most Once 到 Exactly Once
4.1 三种语义对比
| 语义 | 含义 | 实现方式 | 代价 | 适用场景 |
|---|---|---|---|---|
| At Most Once | 最多一次 | 消费后不提交位移 | 可能丢数据 | 可容忍丢失(日志统计) |
| At Least Once | 至少一次 | 消费后提交位移 | 可能重复消费 | 默认语义,推荐 |
| Exactly Once | 精确一次 | 事务 + 幂等 | 性能降低 | 金融交易、库存扣减 |
4.2 Exactly Once 的三层保障
4.2.1 Producer 幂等性
// 源码位置:kafka-clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
class Sender implements Runnable {
// 幂等序列号(每个分区维护一个)
private final ProducerIdAndEpoch producerIdAndEpoch;
private final Map<TopicPartition, Integer> sequenceNumbers;
/**
* 发送消息时自动附加序列号
*/
private void sendProducerData() {
for (ProducerBatch batch : batches) {
// 获取分区当前序列号
int sequence = sequenceNumbers.get(batch.topicPartition);
// 构建请求(附加 PID 和序列号)
ProduceRequest request = new ProduceRequest.Builder(
producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch,
sequence,
batch.records
).build();
// 序列号自增
sequenceNumbers.put(batch.topicPartition, sequence + 1);
}
}
}
幂等性配置示例:
Properties props = new Properties();
// 启用幂等生产者
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 以下参数会自动配置,无需手动设置:
// - ACKS_CONFIG = "all"
// - RETRIES_CONFIG = Integer.MAX_VALUE
// - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
4.2.2 跨分区事务
// 文件:TransactionalProducerDemo.java
package com.example.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 跨分区事务示例
*
* 场景:从源 Topic 读取数据,处理后写入目标 Topic
* 保证:源 Topic 消费 + 目标 Topic 写入 + 位移提交 三者原子性
*/
public class TransactionalProducerDemo {
public static void main(String[] args) {
// 1. 配置生产者(事务模式)
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// ===== 事务配置 =====
// 设置事务 ID(必须唯一)
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"order-processor-1");
// ACKs 必须为 all
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
// 启用幂等性(事务的前提)
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 2. 配置消费者(只读已提交消息)
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
"order-processor-group");
// 只读取已提交的消息(过滤掉回滚的事务)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed");
// 禁用自动提交(由生产者事务控制)
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 3. 创建生产者和消费者
KafkaProducer<String, String> producer =
new KafkaProducer<>(producerProps);
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProps);
try {
// 4. 初始化事务(必须首先调用)
producer.initTransactions();
// 5. 订阅源 Topic
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
// 6. 拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) {
continue;
}
// 7. 开始事务
producer.beginTransaction();
try {
// 8. 处理消息并写入目标 Topic
for (ConsumerRecord<String, String> record : records) {
String orderId = record.key();
String orderData = record.value();
// 业务逻辑:例如金额计算、库存检查
String processedData = processOrder(orderData);
// 写入目标 Topic
ProducerRecord<String, String> outputRecord =
new ProducerRecord<>("validated-orders",
orderId, processedData);
producer.send(outputRecord);
}
// 9. 提交消费位移(包含在事务中)
Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
new TopicPartition("orders", 0),
new OffsetAndMetadata(records.offset() + 1)
);
// 10. 发送位移到事务(关键步骤)
producer.sendOffsetsToTransaction(
offsets,
consumer.groupMetadata()
);
// 11. 提交事务(消息 + 位移原子性提交)
producer.commitTransaction();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
// 12. 回滚事务(消息不发送,位移不提交)
producer.abortTransaction();
System.err.println("Transaction aborted: " + e.getMessage());
}
}
} finally {
producer.close();
consumer.close();
}
}
/**
* 模拟订单处理逻辑
*/
private static String processOrder(String orderData) {
// 实际场景:金额计算、库存检查、风控规则等
return "PROCESSED:" + orderData;
}
}
4.3 语义保证机制对比
| 机制 | Producer 幂等 | 跨分区事务 | 消费者隔离 | 数据库事务 |
|---|---|---|---|---|
| 保证级别 | 单分区 Exactly Once | 多分区 Exactly Once | 防读未提交 | 端到端 Exactly Once |
| 性能开销 | 低 | 中 | 低 | 高 |
| 配置复杂度 | 简单 | 中等 | 简单 | 复杂 |
| 典型场景 | 单分区日志 | 数据流转 | 消费过滤 | 金融交易 |
🚀 五、生产实战案例:高吞吐与顺序保证
5.1 场景一:高吞吐日志采集系统
需求:每秒处理 100 万条日志,容忍少量乱序,优先保证吞吐量。
5.1.1 生产者配置(高吞吐优化)
// 文件:HighThroughputProducer.java
package com.example.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 高吞吐生产者
*
* 优化策略:
* 1. 粘性分区(Sticky Partitioning)
* 2. 增大 batch.size 和 linger.ms
* 3. 异步发送 + 批量回调
* 4. 压缩(compression.type=snappy)
*/
public class HighThroughputProducer {
private final KafkaProducer<String, String> producer;
private final AtomicInteger successCounter = new AtomicInteger(0);
private final AtomicInteger failureCounter = new AtomicInteger(0);
public HighThroughputProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092,localhost:9093,localhost:9094");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// ===== 高吞吐优化配置 =====
// 1. 批量大小:32KB(默认 16KB)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
// 2. 等待时间:20ms(默认 0,允许更大数据批量)
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
// 3. 压缩类型:Snappy(CPU 消耗低,压缩比适中)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// 4. 缓冲区大小:64MB(默认 32MB)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
// 5. 最大并发请求数:5(默认 5)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// 6. ACKs=1(Leader 确认即可,容忍少量丢失)
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 7. 重试次数:3(避免无限重试阻塞)
props.put(ProducerConfig.RETRIES_CONFIG, 3);
this.producer = new KafkaProducer<>(props);
}
/**
* 异步发送消息(批量回调处理)
*/
public void sendAsync(String topic, String key, String value) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 成功:计数
int count = successCounter.incrementAndGet();
if (count % 10000 == 0) {
System.out.println("Successfully sent: " + count);
}
} else {
// 失败:记录错误
failureCounter.incrementAndGet();
System.err.println("Failed to send: " + exception.getMessage());
}
}
});
}
/**
* 批量发送接口
*/
public void sendBatch(String topic,
java.util.List<String> messages) {
for (String msg : messages) {
sendAsync(topic, null, msg);
}
}
public void close() {
producer.close();
System.out.printf(
"Final stats: Success=%d, Failure=%d%n",
successCounter.get(), failureCounter.get()
);
}
public static void main(String[] args) throws InterruptedException {
HighThroughputProducer producer = new HighThroughputProducer();
ExecutorService executor = Executors.newFixedThreadPool(10);
// 模拟 10 个线程并发发送
for (int i = 0; i < 10; i++) {
final int threadId = i;
executor.submit(() -> {
for (int j = 0; j < 100000; j++) {
String msg = String.format(
"[Thread-%d] Log message %d: timestamp=%d",
threadId, j, System.currentTimeMillis()
);
producer.sendAsync("logs", null, msg);
}
});
}
Thread.sleep(60000); // 运行 1 分钟
producer.close();
executor.shutdown();
}
}
5.1.2 消费者配置(多线程并发消费)
// 文件:HighThroughputConsumer.java
package com.example.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 高吞吐消费者
*
* 优化策略:
* 1. 增大 max.poll.records(减少 poll 调用频率)
* 2. 多线程处理(消费者线程 + 业务线程池)
* 3. 手动提交位移(精确控制提交时机)
*/
public class HighThroughputConsumer {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executorService;
private final int threadPoolSize = 16;
public HighThroughputConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"log-processor-group");
// ===== 高吞吐优化配置 =====
// 1. 单次拉取最大记录数:1000(默认 500)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
// 2. 单次拉取最大字节数:2MB(默认 1MB)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
2097152);
// 3. 会话超时:60s(避免网络抖动触发再平衡)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
// 4. 心跳间隔:10s(加快故障检测)
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
// 5. 手动提交位移
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.consumer = new KafkaConsumer<>(props);
// 业务处理线程池
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}
public void consume() {
consumer.subscribe(Collections.singletonList("logs"));
try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
continue;
}
// 提交到线程池异步处理
records.forEach(record -> {
executorService.submit(() -> {
try {
// 业务处理(例如写入 ES)
processRecord(record);
} catch (Exception e) {
System.err.println("Process failed: " + e.getMessage());
}
});
});
// 手动提交位移(处理完成后)
consumer.commitSync();
}
} finally {
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
consumer.close();
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// 实际场景:解析日志、索引、存储等
// 这里简化为打印
System.out.println("Processing: " + record.value());
}
public static void main(String[] args) {
HighThroughputConsumer consumer = new HighThroughputConsumer();
consumer.consume();
}
}
5.2 场景二:订单状态变更(严格顺序保证)
需求:订单状态必须按时间顺序处理(创建→支付→发货→完成)。
5.2.1 生产者代码(使用 Key 保证顺序)
// 文件:OrderedProducer.java
package com.example.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 顺序保证生产者
*
* 关键点:
* 1. 使用 orderId 作为 Key
* 2. 相同 orderId 的消息会路由到同一分区
* 3. 分区内保证顺序性
*/
public class OrderedProducer {
private final KafkaProducer<String, String> producer;
public OrderedProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// ===== 顺序保证配置 =====
// 1. ACKs=all(确保不丢失)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 2. 重试次数:Integer.MAX_VALUE(确保不因临时故障失败)
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// 3. 最大并发请求数:1(避免乱序)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// 4. 启用幂等性(防止重试导致重复)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
this.producer = new KafkaProducer<>(props);
}
/**
* 发送订单状态变更(同步方式)
*
* @param orderId 订单ID(作为 Key 保证顺序)
* @param status 订单状态
*/
public void sendOrderStatus(String orderId, String status) {
// 构建消息(使用 orderId 作为 Key)
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-status-changes", // Topic
orderId, // Key(决定分区)
status // Value
);
try {
// 同步发送(阻塞等待确认)
RecordMetadata metadata = producer.send(record).get();
System.out.printf(
"Sent: OrderId=%s, Status=%s → Partition=%d, Offset=%d%n",
orderId, status, metadata.partition(), metadata.offset()
);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Failed to send: " + e.getMessage());
}
}
public void close() {
producer.close();
}
public static void main(String[] args) {
OrderedProducer producer = new OrderedProducer();
// 模拟订单状态流转
String orderId = "ORD-20240426-001";
producer.sendOrderStatus(orderId, "CREATED");
producer.sendOrderStatus(orderId, "PAID");
producer.sendOrderStatus(orderId, "SHIPPED");
producer.sendOrderStatus(orderId, "COMPLETED");
producer.close();
}
}
5.2.2 消费者代码(单线程消费)
// 文件:OrderedConsumer.java
package com.example.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 顺序保证消费者
*
* 关键点:
* 1. 单线程消费(避免多线程乱序)
* 2. 同步处理(确保当前消息处理完成后再处理下一条)
* 3. 手动提交位移(处理完成后才提交)
*/
public class OrderedConsumer {
private final KafkaConsumer<String, String> consumer;
public OrderedConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"order-processor-group");
// ===== 顺序保证配置 =====
// 1. 手动提交位移
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 2. 会话超时:30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// 3. 最大轮询间隔:5分钟
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
this.consumer = new KafkaConsumer<>(props);
}
public void consume() {
// 订阅 Topic
consumer.subscribe(Collections.singletonList("order-status-changes"));
try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String orderId = record.key();
String status = record.value();
System.out.printf(
"Received: OrderId=%s, Status=%s, Partition=%d, Offset=%d%n",
orderId, status, record.partition(), record.offset()
);
// 同步处理订单状态(确保顺序)
processOrderStatus(orderId, status);
}
// 批次处理完成后提交位移
consumer.commitSync();
}
} finally {
consumer.close();
}
}
/**
* 处理订单状态(同步)
*/
private void processOrderStatus(String orderId, String status) {
// 实际场景:更新数据库、调用下游服务等
System.out.println("Processing order " + orderId + " status: " + status);
// 模拟业务处理耗时
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
OrderedConsumer consumer = new OrderedConsumer();
consumer.consume();
}
}
5.3 性能优化对比总结
| 优化维度 | 高吞吐场景 | 顺序保证场景 |
|---|---|---|
| 分区策略 | 无 Key + Sticky | 有 Key + Hash |
| ACKs | 1(Leader) | all(全部副本) |
| 重试 | 有限次数 | 无限重试 + 幂等 |
| 并发数 | max.in.flight=5 | max.in.flight=1 |
| 发送方式 | 异步批量 | 同步单条 |
| 消费模型 | 多线程 | 单线程 |
| 吞吐量 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 顺序性 | ❌ 不保证 | ✅ 严格保证 |
📌 六、生产环境最佳实践
6.1 分区数规划
经验公式:
分区数 = max(
目标吞吐量 / 单分区最大吞吐,
消费者数量
)
示例:
- 目标吞吐:100 万条/秒
- 单分区吞吐:10 万条/秒
- 消费者数:8 个
分区数 = max(100/10, 8) = max(10, 8) = 10 个
实际设置为 16 个(2 的幂次,便于扩展)
6.2 监控指标一览
| 指标类别 | 关键指标 | 告警阈值 | 含义 |
|---|---|---|---|
| 生产者 | record-send-rate | < 目标值的 50% | 发送速率过低 |
| 生产者 | request-latency-avg | > 100ms | 延迟过高 |
| 消费者 | records-lag-max | > 10000 | 消费堆积严重 |
| 消费者 | fetch-rate | < 目标值的 50% | 消费速率过低 |
| Broker | UnderReplicatedPartitions | > 0 | 副本同步异常 |
| Broker | ActiveControllerCount | ≠ 1 | 控制器异常 |
6.3 常见生产问题排查
问题 1:消息丢失
原因排查:
- 生产者 ACKs=1(Leader 确认后即认为成功,Follower 未同步)
- 副本数 < 3(单点故障风险)
min.insync.replicas设置过低(默认 1)
解决方案:
# 生产者配置
acks=all
retries=3
# Broker 配置
min.insync.replicas=2 # 至少 2 个副本同步才算成功
default.replication.factor=3 # 默认副本数
问题 2:消费堆积
原因排查:
- 消费者处理速度 < 生产速度
- 消费者线程数不足
- 数据库/外部服务慢
解决方案:
// 1. 增加消费者数量(不超过分区数)
// 2. 增大线程池
executorService = Executors.newFixedThreadPool(32);
// 3. 批量处理
consumer.poll(Duration.ofMillis(1000)).forEach(record -> {
// 批量写入数据库
batch.add(record);
if (batch.size() >= 1000) {
database.batchInsert(batch);
batch.clear();
}
});
问题 3:再平衡频繁
原因排查:
max.poll.interval.ms过短(默认 5 分钟)- 消费者 GC 停顿时间过长
- 网络抖动导致心跳超时
解决方案:
# 增大最大轮询间隔
max.poll.interval.ms=300000 # 5 分钟
# 减小单次拉取记录数(加快处理速度)
max.poll.records=100
# 增大会话超时时间
session.timeout.ms=30000 # 30 秒
🎓 七、总结与进阶学习
7.1 核心知识点回顾
| 主题 | 核心要点 | 生产建议 |
|---|---|---|
| 分区机制 | 并行能力的基础 | 根据吞吐量规划,初始保守设置 |
| 消费者组 | 单播 + 广播的关键 | 每个独立应用使用独立组 |
| 再平衡 | 分区重新分配过程 | 使用 Sticky 策略减少影响 |
| 语义保证 | 幂等 + 事务 + 隔离 | 金融场景开启 Exactly Once |
| 性能优化 | 批量 + 压缩 + 多线程 | 监控指标驱动优化 |
7.2 进阶学习路径
推荐资源:
-
官方文档:
-
源码阅读:
kafka/core/src/main/scala/kafka/cluster/Partition.scalakafka-clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.javakafka-clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
-
实战项目:
- 搭建 3 节点 Kafka 集群
- 实现 Exactly Once 事务
- 编写自定义分区器
- 监控系统搭建(Kafka Manager + Prometheus + Grafana)
💬 互动与讨论
思考题:
- ❓ 如果你的 Kafka 集群有 100 个分区,但只有 3 个消费者,如何保证负载均衡?
- ❓ 在订单状态流转场景中,如果某个订单的处理逻辑特别慢(例如调用第三方 API 超时),如何避免阻塞其他订单的处理?
- ❓ Kafka 的 Exactly Once 能否保证跨多个 Topic 的端到端一致性?为什么?
欢迎在评论区分享你的见解! 如果本文对你有帮助,请点赞、收藏、转发三连支持~关注我,获取更多 Kafka 及大数据技术干货!
参考资料:
- Apache Kafka 官方文档 (v3.6)
- 《Kafka 权威指南》第 2 版
- Kafka 源码仓库
源码版本标注:
- Kafka Core:3.6.2 (kafka/cluster/Partition.scala)
- Kafka Clients:3.6.2 (clients/producer/KafkaProducer.java, clients/consumer/KafkaConsumer.java)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)