【Kafka源码解读和使用指南】第57篇:Kafka身份认证源码解析——SASL/PLAIN认证是怎么实现的
上一篇【第56篇】GroupCoordinator源码解析(二)——GroupMetadataManager持久化
下一篇【第58篇】Kafka权限控制ACL源码解析——谁能读,谁能写,谁说了算
摘要
Kafka从0.9版本开始支持身份认证和权限控制,SASL/PLAIN是最简单(也是最危险——明文传输)的认证方式。它的源码实现涉及JAAS架构、ChannelBuilder体系、Authenticator状态机、Challenge-Response协议等多个组件。
这篇文章从配置入手,一路追到认证完成的最后一行代码:PlainLoginModule怎么从JAAS配置文件读取用户名密码、ChannelBuilder怎么根据security.protocol创建正确的通道、SaslClientAuthenticator的状态机怎么驱动握手和挑战应答流程,以及SaslServerAuthenticator在服务端如何验证客户端身份。读完这篇,你就能理解Kafka身份认证的完整底层逻辑。
一、身份认证 vs 权限控制——两个容易被搞混的概念
在开始源码之前,先把两个概念搞清楚:
| 概念 | 通俗解释 | 在Kafka中的体现 |
|---|---|---|
| 身份认证 (Authentication) | “你是谁?”——验证你的身份 | 客户端和服务端之间建立连接时,验证用户名和密码 |
| 权限控制 (Authorization) | “你能干嘛?”——决定你的权限 | 认证通过后,检查用户有没有读写某个Topic的权限 |
【认证 + 授权 = 完整安全链条】
客户端 服务端
│ │
│ ① "我是xiaoming,密码123" │ (身份认证)
│ ───────────────────────► │
│ ├─ 去JAAS配置文件查
│ │ "xiaoming:/123" 存在吗?
│ ├─ 存在!认证通过
│ │
│ ② "我要读topic test" │ (权限控制)
│ ───────────────────────► │
│ ├─ 查ACL规则
│ │ xiaoming有Read权限?
│ ├─ 有!允许操作
│ │
│◄─────────────────────── │ 返回数据
▼ ▼
简单说:认证是"进门",授权是"拿东西"。先进门,再拿东西。
二、SASL/PLAIN配置——先跑通再用源码拆
在看源码之前,先把环境搭起来。SASL/PLAIN需要配置两端:
2.1 服务端配置
修改config/server.properties:
# 启用SASL/PLAIN认证
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 可选:启用ACL权限控制
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
创建kafka_server_jaas.conf——服务端的"用户密码本":
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_xiaoming="xiaoming";
};
启动时加载JAAS配置:
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties
2.2 客户端配置
创建kafka_client_jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xiaoming"
password="xiaoming";
};
Java代码中加载:
// 方式1:JVM参数
System.setProperty("java.security.auth.login.config",
"D:/myConf/kafka_client_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
配置好后,客户端连接时就会走SASL/PLAIN认证流程。下面我们进入源码拆解。
三、JAAS架构——PlainLoginModule的"本职工作"
JAAS(Java Authentication Authorization Service)是Java安全框架的核心,它在应用代码和底层安全机制之间加了一层抽象。核心概念:面向LoginContext编程,底层由LoginModule完成实际认证。
【JAAS分层架构】
┌──────────────────────────────────────────┐
│ Application Code │
│ (KafkaProducer / KafkaConsumer) │
│ │
│ 使用 Login / LoginManager │
└──────────────────┬───────────────────────┘
│
┌──────────────────▼───────────────────────┐
│ LoginContext │
│ (JAAS 的核心调度入口) │
│ - 读取 .conf 配置文件 │
│ - 加载指定的 LoginModule │
│ - 依次调用 login() → commit() │
└──────────────────┬───────────────────────┘
│
┌──────────────────▼───────────────────────┐
│ PlainLoginModule │
│ (Kafka 自定义的 LoginModule) │
│ - initialize(): 读取 username/password │
│ - 把凭证写入 Subject 对象 │
└──────────────────────────────────────────┘
PlainLoginModule的源码非常简单,它只做了一件事:从JAAS配置文件读取用户名密码,塞进Subject对象里:
// PlainLoginModule.java
public class PlainLoginModule implements LoginModule {
private static final String USERNAME_CONFIG = "username";
private static final String PASSWORD_CONFIG = "password";
// 注册Plain SASL服务提供者(服务端用)
static {
PlainSaslServerProvider.initialize();
}
@Override
public void initialize(Subject subject, CallbackHandler callbackHandler,
Map<String, ?> sharedState, Map<String, ?> options) {
// ① 读取用户名 → 放入公钥凭证
String username = (String) options.get(USERNAME_CONFIG);
if (username != null)
subject.getPublicCredentials().add(username);
// ② 读取密码 → 放入私钥凭证
String password = (String) options.get(PASSWORD_CONFIG);
if (password != null)
subject.getPrivateCredentials().add(password);
}
// login() / logout() / commit() 都是空返回
// abort() 始终返回 false
}
关键发现:PlainLoginModule的login()和commit()方法都是空实现!它并没有真正做认证。真正认证是谁干的?答案是后续的Authenticator对象。
Subject对象的三个字段
Subject代表一个"访问主体",有三个核心属性:
┌──────────────────────────────────────────┐
│ Subject │
│ │
│ principals: 用户身份(如用户名) │
│ pubCredentials: 公钥凭证(用户名) │
│ privCredentials: 私钥凭证(密码) │
└──────────────────────────────────────────┘
PlainLoginModule在initialize中把username放进了pubCredentials,password放进了privCredentials。后续认证时从这个Subject里取出来用。
四、Login接口——对LoginContext的二次封装
Kafka没有直接使用JAAS的LoginContext,而是通过Login接口做了封装。类层次关系:
┌──────────────────────────────────────────────────────┐
│ 《interface》 │
│ Login │
│ + configure() // 配置Login对象 │
│ + login() // 执行认证,返回LoginContext │
│ + subject() // 返回Subject │
│ + serviceName() // 服务名,默认"kafka" │
└──────────────────────┬───────────────────────────────┘
│
┌───────────┴───────────┐
│ │
┌──────────▼──────────┐ ┌────────▼──────────┐
│ DefaultLogin │ │ (其他Login... │
│ (SASL/PLAIN使用) │ │ 实现) │
└─────────────────────┘ └───────────────────┘
DefaultLogin的核心login()方法:
// AbstractLogin.java (DefaultLogin的父类)
public LoginContext login() throws LoginException {
// ① 检查java.security.auth.login.config是否配置
String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
if (jaasConfigFile == null) {
log.debug("System property '{}' is not set", JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}
// ② 根据loginContextName("KafkaClient"或"KafkaServer")查找配置
AppConfigurationEntry[] configEntries = Configuration.getConfiguration()
.getAppConfigurationEntry(loginContextName);
if (configEntries == null) {
throw new IllegalArgumentException(
"找不到 " + loginContextName + " 的JAAS配置!");
}
// ③ 创建LoginContext → 调用所有LoginModule的login()和commit()
loginContext = new LoginContext(loginContextName, new LoginCallbackHandler());
loginContext.login();
return loginContext;
}
loginContextName的匹配:
- 客户端代码中这个值是
"KafkaClient",对应kafka_client_jaas.conf第一行的KafkaClient - 服务端代码中这个值是
"KafkaServer",对应kafka_server_jaas.conf第一行的KafkaServer
五、ChannelBuilder体系——不同协议不同通道
Kafka支持多种安全协议(PLAINTEXT / SSL / SASL_PLAINTEXT / SASL_SSL),不同协议需要不同的通道。ChannelBuilder体系负责根据配置创建正确的KafkaChannel:
┌──────────────────────────────────────────────────────┐
│ ChannelBuilder (接口) │
│ + buildChannel() → KafkaChannel │
│ + configure() │
└──────────┬───────────────────────────┬───────────────┘
│ │
┌───────▼────────┐ ┌───────▼────────────┐
│PlaintextChannel│ │ SaslChannelBuilder │
│ Builder │ │ │
│ (明文通道) │ │ fields: │
└────────────────┘ │ • loginType │
│ • loginManager │
│ • clientSaslMechanism│
│ • securityProtocol │
└──────────────────────┘
创建ChannelBuilder的入口在ClientUtils.createChannelBuilder():
// ClientUtils.java
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
// ① 读取security.protocol配置
SecurityProtocol securityProtocol = SecurityProtocol.forName(
(String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
// ② 读取sasl.mechanism配置
String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
// ③ 根据协议类型创建对应的ChannelBuilder
return ChannelBuilders.create(securityProtocol, Mode.CLIENT,
LoginType.CLIENT, configs, clientSaslMechanism, true);
}
// ChannelBuilders.java
public static ChannelBuilder create(SecurityProtocol securityProtocol, ...) {
ChannelBuilder channelBuilder;
switch (securityProtocol) {
case SSL:
channelBuilder = new SslChannelBuilder(mode);
break;
case SASL_SSL:
case SASL_PLAINTEXT: // ← 我们配置的是这个
channelBuilder = new SaslChannelBuilder(mode, loginType,
securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
break;
case PLAINTEXT:
default:
channelBuilder = new PlaintextChannelBuilder();
break;
}
channelBuilder.configure(configs);
return channelBuilder;
}
SaslChannelBuilder.configure()的核心操作是创建LoginManager → 创建Login对象 → 调用login()方法,这一步会完成JAAS配置文件的读取和Subject的初始化。
六、SaslClientAuthenticator——客户端认证的核心引擎
SaslChannelBuilder.buildChannel()创建KafkaChannel时,会根据是客户端还是服务端来创建不同的Authenticator:
// SaslChannelBuilder.java
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 底层传输层,包装了SocketChannel
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
if (mode == Mode.SERVER)
authenticator = new SaslServerAuthenticator(...); // 服务端
else
authenticator = new SaslClientAuthenticator(...); // 客户端
authenticator.configure(transportLayer, null, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
}
SaslClientAuthenticator的configure()方法完成最后的初始化:
public void configure(TransportLayer transportLayer, ...) {
this.transportLayer = transportLayer;
// 初始状态:先发握手指令
setSaslState(handshakeRequestEnable ?
SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
// 从Subject的principals中提取身份信息
if (!subject.getPrincipals().isEmpty()) {
Principal clientPrincipal = subject.getPrincipals().iterator().next();
this.clientPrincipalName = clientPrincipal.getName();
}
// 创建用于收集认证信息的回调处理器
callbackHandler = new SaslClientCallbackHandler();
callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
// 创建SaslClient对象(PLAIN认证对应PlainClient)
saslClient = createSaslClient();
}
七、Challenge-Response协议——认证的"问答题"
SASL是一种**Challenge-Response(挑战-应答)**协议——服务端发一个问题(Challenge),客户端根据问题给出答案(Response),反复问答直到认证通过。
【Challenge-Response 协议流程】
客户端 (SaslClient) 服务端 (SaslServer)
│ │
│ ① NameCallback: "username=?" │
│◄────────────────────────────────────│
│ │
│ ② Response: "xiaoming" │
│─────────────────────────────────────►│
│ │
│ ③ PasswordCallback: "password=?" │
│◄────────────────────────────────────│
│ │
│ ④ Response: "xiaoming" │
│─────────────────────────────────────►│
│ │
│ ⑤ 认证通过 │
│◄────────────────────────────────────│
▼ ▼
Java标准库的SaslClient API:
// SaslClient伪代码 - Challenge-Response流程
SaslClient sc = Sasl.createSaslClient(mechanisms, ..., callbackHandler);
String mechanism = sc.getName();
// 初始Response(如果有的话)
byte[] response = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
send(mechanism, response);
// 循环处理Challenge
msg = receive();
while (!sc.isComplete()) { // 直到认证完成
response = sc.evaluateChallenge(msg.contents); // 生成Response
if (msg.status == SUCCESS) {
break; // 认证成功
} else {
send(mechanism, response); // 发送Response
msg = receive(); // 等待下一个Challenge
}
}
回调处理中从Subject提取凭证:
// SaslClientCallbackHandler.java
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
// 从Subject的pubCredentials中取出username
NameCallback nc = (NameCallback) callback;
if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
nc.setName(subject.getPublicCredentials(String.class).iterator().next());
} else {
nc.setName(nc.getDefaultName());
}
} else if (callback instanceof PasswordCallback) {
// 从Subject的privCredentials中取出password
if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
char[] password = subject.getPrivateCredentials(String.class)
.iterator().next().toCharArray();
((PasswordCallback) callback).setPassword(password);
}
} else if (callback instanceof AuthorizeCallback) {
AuthorizeCallback ac = (AuthorizeCallback) callback;
ac.setAuthorized(ac.getAuthenticationID().equals(ac.getAuthorizationID()));
if (ac.isAuthorized())
ac.setAuthorizedID(ac.getAuthorizationID());
}
}
}
八、SaslClientAuthenticator状态机——五步完成认证
SaslClientAuthenticator内部有一个状态机驱动整个认证流程:
【SaslClientAuthenticator状态转换图】
┌──────────────────────┐
│ SEND_HANDSHAKE_REQUEST│ ← 初始状态
│ 发送SASL握手指令 │
└──────────┬───────────┘
│ 发送完成
▼
┌──────────────────────┐
│RECEIVE_HANDSHAKE_ │
│RESPONSE │
│ 接收服务端握手响应 │
└──────────┬───────────┘
│ 握手成功
▼
┌──────────────────────┐
│ INITIAL │
│ 发送空的初始Response │
└──────────┬───────────┘
│ Response发送完成
▼
┌──────────────────────┐
│ INTERMEDIATE │ ← 循环
│ 处理Challenge/Response│
└──────────┬───────────┘
│ 认证通过
▼
┌──────────────────────┐
│ COMPLETE │
│ 认证完成 │
└──────────────────────┘
Step 1: SEND_HANDSHAKE_REQUEST → RECEIVE_HANDSHAKE_RESPONSE
客户端构建SaslHandshakeRequest,告诉服务端"我要用PLAIN方式认证":
case SEND_HANDSHAKE_REQUEST:
// 创建握手指令,包含当前使用的SASL机制名("PLAIN")
SaslHandshakeRequest handshakeRequest =
new SaslHandshakeRequest.Builder(clientSaslMechanism).build();
// 发送 → 状态切换为RECEIVE_HANDSHAKE_RESPONSE
netOutBuffer = handshakeRequest.toSend(...);
setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
Step 2: RECEIVE_HANDSHAKE_RESPONSE → INITIAL / FAILED
接收服务端的握手响应,检查服务端是否支持客户端声明的SASL机制:
case RECEIVE_HANDSHAKE_RESPONSE:
SaslHandshakeResponse response = SaslHandshakeResponse.parse(netInBuffer);
if (response.errorCode() != Errors.NONE) {
// 服务端不支持客户端声明的SASL机制 → 失败
setSaslState(SaslState.FAILED);
throw new SaslAuthenticationException(...);
}
// 握手成功 → 进入认证阶段
setSaslState(SaslState.INITIAL);
Step 3: INITIAL → INTERMEDIATE
发送一个空的初始Response,启动Challenge-Response循环:
case INITIAL:
if (saslClient.hasInitialResponse()) {
byte[] initialResponse = saslClient.evaluateChallenge(new byte[0]);
// 在PLAIN模式下:一次性发送"username\0username\0password"格式
sendSaslClientMessage(initialResponse);
} else {
sendSaslClientMessage(null); // 发送空消息
}
setSaslState(SaslState.INTERMEDIATE);
Step 4: INTERMEDIATE → COMPLETE
循环处理Challenge-Response,直到saslClient.isComplete()返回true:
case INTERMEDIATE:
// 读取服务端发来的Challenge
byte[] serverToken = netInBuffer.array();
// 调用saslClient.evaluateChallenge()生成Response
byte[] clientResponse = saslClient.evaluateChallenge(serverToken);
if (saslClient.isComplete()) {
// 认证通过!不再关注写事件
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
setSaslState(SaslState.COMPLETE);
} else {
// 继续发送Response
sendSaslClientMessage(clientResponse);
// 保持INTERMEDIATE状态
}
Step 5: COMPLETE
认证完成,后续所有网络通信恢复正常——不再走认证逻辑。
九、SaslServerAuthenticator——服务端如何验证
服务端的SaslServerAuthenticator比客户端更复杂,它不仅要验证客户端凭证,还要从JAAS配置中查找该用户是否存在。
服务端PlainLoginModule初始化时会读取所有配置用户:
// PlainLoginModule中读取的用户列表
// kafka_server_jaas.conf内容:
// user_admin="admin"
// user_xiaoming="xiaoming"
// 这些user_xxx键值对都会被加载为一个"用户密码字典"
@Override
public void initialize(Subject subject, CallbackHandler callbackHandler,
Map<String, ?> sharedState, Map<String, ?> options) {
// 遍历所有 user_xxx 配置项
for (Map.Entry<String, ?> entry : options.entrySet()) {
String key = entry.getKey();
if (key.startsWith("user_")) {
String username = key.substring(5); // "admin"
String password = (String) entry.getValue(); // "admin"
userDatabase.put(username, password);
}
}
}
服务端认证时从userDatabase中查找客户端发来的用户名密码,比对成功则通过。
认证通过后,客户端身份被确定。下一步就是权限控制(ACL)来检查"这个用户能做什么"——这是我们下一篇文章的内容。
本篇小结
这篇文章从配置到源码,完整拆解了Kafka SASL/PLAIN身份认证的链路:
- JAAS架构:Kafka通过PlainLoginModule从配置文件读取用户名密码,但
login()方法为空实现——真正认证不在这里 - Login封装:Login接口封装了LoginContext,loginContextName(“KafkaClient”/“KafkaServer”)必须与JAAS配置文件匹配
- ChannelBuilder体系:根据
security.protocol配置创建对应的ChannelBuilder,SASL_PLAINTEXT对应SaslChannelBuilder - Authenticator核心:SaslClientAuthenticator使用五状态状态机(HANDSHAKE→INITIAL→INTERMEDIATE→COMPLETE)驱动整个认证流程
- Challenge-Response协议:客户端通过
saslClient.evaluateChallenge(challenge)循环处理服务端发来的挑战,直到认证完成 - 凭证流转:JAAS配置 → PlainLoginModule.initialize() → Subject.pubCredentials/privCredentials → SaslClientCallbackHandler → SaslClient对象
下一篇我们继续Kafka安全模块——ACL权限控制的源码解析,看看认证通过之后,Kafka怎么决定"你能读哪个Topic,能写哪个Topic"。
上一篇【第56篇】GroupCoordinator源码解析(二)——GroupMetadataManager持久化
下一篇【第58篇】Kafka权限控制ACL源码解析——谁能读,谁能写,谁说了算
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)