上一篇【第56篇】GroupCoordinator源码解析(二)——GroupMetadataManager持久化
下一篇【第58篇】Kafka权限控制ACL源码解析——谁能读,谁能写,谁说了算


摘要

Kafka从0.9版本开始支持身份认证和权限控制,SASL/PLAIN是最简单(也是最危险——明文传输)的认证方式。它的源码实现涉及JAAS架构、ChannelBuilder体系、Authenticator状态机、Challenge-Response协议等多个组件。

这篇文章从配置入手,一路追到认证完成的最后一行代码:PlainLoginModule怎么从JAAS配置文件读取用户名密码、ChannelBuilder怎么根据security.protocol创建正确的通道、SaslClientAuthenticator的状态机怎么驱动握手和挑战应答流程,以及SaslServerAuthenticator在服务端如何验证客户端身份。读完这篇,你就能理解Kafka身份认证的完整底层逻辑。


一、身份认证 vs 权限控制——两个容易被搞混的概念

在开始源码之前,先把两个概念搞清楚:

概念 通俗解释 在Kafka中的体现
身份认证 (Authentication) “你是谁?”——验证你的身份 客户端和服务端之间建立连接时,验证用户名和密码
权限控制 (Authorization) “你能干嘛?”——决定你的权限 认证通过后,检查用户有没有读写某个Topic的权限
【认证 + 授权 = 完整安全链条】

  客户端                     服务端
    │                         │
    │ ① "我是xiaoming,密码123" │ (身份认证)
    │ ───────────────────────► │
    │                         ├─ 去JAAS配置文件查
    │                         │  "xiaoming:/123" 存在吗?
    │                         ├─ 存在!认证通过
    │                         │
    │ ② "我要读topic test"    │ (权限控制)
    │ ───────────────────────► │
    │                         ├─ 查ACL规则
    │                         │  xiaoming有Read权限?
    │                         ├─ 有!允许操作
    │                         │
    │◄─────────────────────── │ 返回数据
    ▼                         ▼

简单说:认证是"进门",授权是"拿东西"。先进门,再拿东西。


二、SASL/PLAIN配置——先跑通再用源码拆

在看源码之前,先把环境搭起来。SASL/PLAIN需要配置两端:

2.1 服务端配置

修改config/server.properties

# 启用SASL/PLAIN认证
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# 可选:启用ACL权限控制
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

创建kafka_server_jaas.conf——服务端的"用户密码本":

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_xiaoming="xiaoming";
};

启动时加载JAAS配置:

export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties

2.2 客户端配置

创建kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xiaoming"
    password="xiaoming";
};

Java代码中加载:

// 方式1:JVM参数
System.setProperty("java.security.auth.login.config",
    "D:/myConf/kafka_client_jaas.conf");

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

配置好后,客户端连接时就会走SASL/PLAIN认证流程。下面我们进入源码拆解。


三、JAAS架构——PlainLoginModule的"本职工作"

JAAS(Java Authentication Authorization Service)是Java安全框架的核心,它在应用代码和底层安全机制之间加了一层抽象。核心概念:面向LoginContext编程,底层由LoginModule完成实际认证

【JAAS分层架构】

  ┌──────────────────────────────────────────┐
  │          Application Code                 │
  │   (KafkaProducer / KafkaConsumer)        │
  │                                           │
  │         使用 Login / LoginManager         │
  └──────────────────┬───────────────────────┘
                     │
  ┌──────────────────▼───────────────────────┐
  │           LoginContext                    │
  │   (JAAS 的核心调度入口)                    │
  │   - 读取 .conf 配置文件                    │
  │   - 加载指定的 LoginModule                │
  │   - 依次调用 login() → commit()           │
  └──────────────────┬───────────────────────┘
                     │
  ┌──────────────────▼───────────────────────┐
  │        PlainLoginModule                   │
  │   (Kafka 自定义的 LoginModule)            │
  │   - initialize(): 读取 username/password  │
  │   - 把凭证写入 Subject 对象               │
  └──────────────────────────────────────────┘

PlainLoginModule的源码非常简单,它只做了一件事:从JAAS配置文件读取用户名密码,塞进Subject对象里:

// PlainLoginModule.java
public class PlainLoginModule implements LoginModule {
    private static final String USERNAME_CONFIG = "username";
    private static final String PASSWORD_CONFIG = "password";

    // 注册Plain SASL服务提供者(服务端用)
    static {
        PlainSaslServerProvider.initialize();
    }

    @Override
    public void initialize(Subject subject, CallbackHandler callbackHandler,
                          Map<String, ?> sharedState, Map<String, ?> options) {
        // ① 读取用户名 → 放入公钥凭证
        String username = (String) options.get(USERNAME_CONFIG);
        if (username != null)
            subject.getPublicCredentials().add(username);
        
        // ② 读取密码 → 放入私钥凭证
        String password = (String) options.get(PASSWORD_CONFIG);
        if (password != null)
            subject.getPrivateCredentials().add(password);
    }

    // login() / logout() / commit() 都是空返回
    // abort() 始终返回 false
}

关键发现:PlainLoginModule的login()commit()方法都是空实现!它并没有真正做认证。真正认证是谁干的?答案是后续的Authenticator对象。

Subject对象的三个字段

Subject代表一个"访问主体",有三个核心属性:

┌──────────────────────────────────────────┐
│              Subject                     │
│                                          │
│  principals:       用户身份(如用户名)    │
│  pubCredentials:   公钥凭证(用户名)      │
│  privCredentials:  私钥凭证(密码)        │
└──────────────────────────────────────────┘

PlainLoginModule在initialize中把username放进了pubCredentials,password放进了privCredentials。后续认证时从这个Subject里取出来用。


四、Login接口——对LoginContext的二次封装

Kafka没有直接使用JAAS的LoginContext,而是通过Login接口做了封装。类层次关系:

┌──────────────────────────────────────────────────────┐
│                    《interface》                      │
│                      Login                          │
│  + configure()    // 配置Login对象                   │
│  + login()        // 执行认证,返回LoginContext      │
│  + subject()      // 返回Subject                     │
│  + serviceName()  // 服务名,默认"kafka"              │
└──────────────────────┬───────────────────────────────┘
                       │
           ┌───────────┴───────────┐
           │                       │
┌──────────▼──────────┐  ┌────────▼──────────┐
│   DefaultLogin     │  │  (其他Login...  │
│  (SASL/PLAIN使用)   │  │   实现)          │
└─────────────────────┘  └───────────────────┘

DefaultLogin的核心login()方法:

// AbstractLogin.java (DefaultLogin的父类)
public LoginContext login() throws LoginException {
    // ① 检查java.security.auth.login.config是否配置
    String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    if (jaasConfigFile == null) {
        log.debug("System property '{}' is not set", JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    }
    
    // ② 根据loginContextName("KafkaClient"或"KafkaServer")查找配置
    AppConfigurationEntry[] configEntries = Configuration.getConfiguration()
        .getAppConfigurationEntry(loginContextName);
    
    if (configEntries == null) {
        throw new IllegalArgumentException(
            "找不到 " + loginContextName + " 的JAAS配置!");
    }
    
    // ③ 创建LoginContext → 调用所有LoginModule的login()和commit()
    loginContext = new LoginContext(loginContextName, new LoginCallbackHandler());
    loginContext.login();
    
    return loginContext;
}

loginContextName的匹配

  • 客户端代码中这个值是"KafkaClient",对应kafka_client_jaas.conf第一行的KafkaClient
  • 服务端代码中这个值是"KafkaServer",对应kafka_server_jaas.conf第一行的KafkaServer

五、ChannelBuilder体系——不同协议不同通道

Kafka支持多种安全协议(PLAINTEXT / SSL / SASL_PLAINTEXT / SASL_SSL),不同协议需要不同的通道。ChannelBuilder体系负责根据配置创建正确的KafkaChannel:

┌──────────────────────────────────────────────────────┐
│               ChannelBuilder (接口)                   │
│  + buildChannel() → KafkaChannel                     │
│  + configure()                                       │
└──────────┬───────────────────────────┬───────────────┘
           │                           │
   ┌───────▼────────┐         ┌───────▼────────────┐
   │PlaintextChannel│         │  SaslChannelBuilder  │
   │   Builder     │         │                      │
   │  (明文通道)     │         │  fields:            │
   └────────────────┘         │  • loginType        │
                              │  • loginManager     │
                              │  • clientSaslMechanism│
                              │  • securityProtocol │
                              └──────────────────────┘

创建ChannelBuilder的入口在ClientUtils.createChannelBuilder()

// ClientUtils.java
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
    // ① 读取security.protocol配置
    SecurityProtocol securityProtocol = SecurityProtocol.forName(
        (String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
    
    // ② 读取sasl.mechanism配置
    String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
    
    // ③ 根据协议类型创建对应的ChannelBuilder
    return ChannelBuilders.create(securityProtocol, Mode.CLIENT,
        LoginType.CLIENT, configs, clientSaslMechanism, true);
}

// ChannelBuilders.java
public static ChannelBuilder create(SecurityProtocol securityProtocol, ...) {
    ChannelBuilder channelBuilder;
    switch (securityProtocol) {
        case SSL:
            channelBuilder = new SslChannelBuilder(mode);
            break;
        case SASL_SSL:
        case SASL_PLAINTEXT:  // ← 我们配置的是这个
            channelBuilder = new SaslChannelBuilder(mode, loginType,
                securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
            break;
        case PLAINTEXT:
        default:
            channelBuilder = new PlaintextChannelBuilder();
            break;
    }
    channelBuilder.configure(configs);
    return channelBuilder;
}

SaslChannelBuilder.configure()的核心操作是创建LoginManager → 创建Login对象 → 调用login()方法,这一步会完成JAAS配置文件的读取和Subject的初始化。


六、SaslClientAuthenticator——客户端认证的核心引擎

SaslChannelBuilder.buildChannel()创建KafkaChannel时,会根据是客户端还是服务端来创建不同的Authenticator:

// SaslChannelBuilder.java
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    // 底层传输层,包装了SocketChannel
    TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
    
    Authenticator authenticator;
    if (mode == Mode.SERVER)
        authenticator = new SaslServerAuthenticator(...);   // 服务端
    else
        authenticator = new SaslClientAuthenticator(...);   // 客户端
    
    authenticator.configure(transportLayer, null, this.configs);
    return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
}

SaslClientAuthenticator的configure()方法完成最后的初始化:

public void configure(TransportLayer transportLayer, ...) {
    this.transportLayer = transportLayer;
    
    // 初始状态:先发握手指令
    setSaslState(handshakeRequestEnable ? 
        SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
    
    // 从Subject的principals中提取身份信息
    if (!subject.getPrincipals().isEmpty()) {
        Principal clientPrincipal = subject.getPrincipals().iterator().next();
        this.clientPrincipalName = clientPrincipal.getName();
    }
    
    // 创建用于收集认证信息的回调处理器
    callbackHandler = new SaslClientCallbackHandler();
    callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
    
    // 创建SaslClient对象(PLAIN认证对应PlainClient)
    saslClient = createSaslClient();
}

七、Challenge-Response协议——认证的"问答题"

SASL是一种**Challenge-Response(挑战-应答)**协议——服务端发一个问题(Challenge),客户端根据问题给出答案(Response),反复问答直到认证通过。

【Challenge-Response 协议流程】

  客户端 (SaslClient)                      服务端 (SaslServer)
       │                                       │
       │ ① NameCallback: "username=?"        │
       │◄────────────────────────────────────│
       │                                      │
       │ ② Response: "xiaoming"              │
       │─────────────────────────────────────►│
       │                                      │
       │ ③ PasswordCallback: "password=?"    │
       │◄────────────────────────────────────│
       │                                      │
       │ ④ Response: "xiaoming"              │
       │─────────────────────────────────────►│
       │                                      │
       │ ⑤ 认证通过                            │
       │◄────────────────────────────────────│
       ▼                                      ▼

Java标准库的SaslClient API:

// SaslClient伪代码 - Challenge-Response流程
SaslClient sc = Sasl.createSaslClient(mechanisms, ..., callbackHandler);
String mechanism = sc.getName();

// 初始Response(如果有的话)
byte[] response = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
send(mechanism, response);

// 循环处理Challenge
msg = receive();
while (!sc.isComplete()) {  // 直到认证完成
    response = sc.evaluateChallenge(msg.contents);  // 生成Response
    if (msg.status == SUCCESS) {
        break;  // 认证成功
    } else {
        send(mechanism, response);  // 发送Response
        msg = receive();            // 等待下一个Challenge
    }
}

回调处理中从Subject提取凭证:

// SaslClientCallbackHandler.java
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
    for (Callback callback : callbacks) {
        if (callback instanceof NameCallback) {
            // 从Subject的pubCredentials中取出username
            NameCallback nc = (NameCallback) callback;
            if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
                nc.setName(subject.getPublicCredentials(String.class).iterator().next());
            } else {
                nc.setName(nc.getDefaultName());
            }
        } else if (callback instanceof PasswordCallback) {
            // 从Subject的privCredentials中取出password
            if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
                char[] password = subject.getPrivateCredentials(String.class)
                    .iterator().next().toCharArray();
                ((PasswordCallback) callback).setPassword(password);
            }
        } else if (callback instanceof AuthorizeCallback) {
            AuthorizeCallback ac = (AuthorizeCallback) callback;
            ac.setAuthorized(ac.getAuthenticationID().equals(ac.getAuthorizationID()));
            if (ac.isAuthorized())
                ac.setAuthorizedID(ac.getAuthorizationID());
        }
    }
}

八、SaslClientAuthenticator状态机——五步完成认证

SaslClientAuthenticator内部有一个状态机驱动整个认证流程:

【SaslClientAuthenticator状态转换图】

  ┌──────────────────────┐
  │ SEND_HANDSHAKE_REQUEST│  ← 初始状态
  │ 发送SASL握手指令       │
  └──────────┬───────────┘
             │ 发送完成
             ▼
  ┌──────────────────────┐
  │RECEIVE_HANDSHAKE_    │
  │RESPONSE              │
  │ 接收服务端握手响应     │
  └──────────┬───────────┘
             │ 握手成功
             ▼
  ┌──────────────────────┐
  │      INITIAL         │
  │ 发送空的初始Response  │
  └──────────┬───────────┘
             │ Response发送完成
             ▼
  ┌──────────────────────┐
  │    INTERMEDIATE      │  ← 循环
  │ 处理Challenge/Response│
  └──────────┬───────────┘
             │ 认证通过
             ▼
  ┌──────────────────────┐
  │      COMPLETE        │
  │ 认证完成               │
  └──────────────────────┘

Step 1: SEND_HANDSHAKE_REQUEST → RECEIVE_HANDSHAKE_RESPONSE

客户端构建SaslHandshakeRequest,告诉服务端"我要用PLAIN方式认证":

case SEND_HANDSHAKE_REQUEST:
    // 创建握手指令,包含当前使用的SASL机制名("PLAIN")
    SaslHandshakeRequest handshakeRequest = 
        new SaslHandshakeRequest.Builder(clientSaslMechanism).build();
    // 发送 → 状态切换为RECEIVE_HANDSHAKE_RESPONSE
    netOutBuffer = handshakeRequest.toSend(...);
    setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);

Step 2: RECEIVE_HANDSHAKE_RESPONSE → INITIAL / FAILED

接收服务端的握手响应,检查服务端是否支持客户端声明的SASL机制:

case RECEIVE_HANDSHAKE_RESPONSE:
    SaslHandshakeResponse response = SaslHandshakeResponse.parse(netInBuffer);
    if (response.errorCode() != Errors.NONE) {
        // 服务端不支持客户端声明的SASL机制 → 失败
        setSaslState(SaslState.FAILED);
        throw new SaslAuthenticationException(...);
    }
    // 握手成功 → 进入认证阶段
    setSaslState(SaslState.INITIAL);

Step 3: INITIAL → INTERMEDIATE

发送一个空的初始Response,启动Challenge-Response循环:

case INITIAL:
    if (saslClient.hasInitialResponse()) {
        byte[] initialResponse = saslClient.evaluateChallenge(new byte[0]);
        // 在PLAIN模式下:一次性发送"username\0username\0password"格式
        sendSaslClientMessage(initialResponse);
    } else {
        sendSaslClientMessage(null);  // 发送空消息
    }
    setSaslState(SaslState.INTERMEDIATE);

Step 4: INTERMEDIATE → COMPLETE

循环处理Challenge-Response,直到saslClient.isComplete()返回true:

case INTERMEDIATE:
    // 读取服务端发来的Challenge
    byte[] serverToken = netInBuffer.array();
    
    // 调用saslClient.evaluateChallenge()生成Response
    byte[] clientResponse = saslClient.evaluateChallenge(serverToken);
    
    if (saslClient.isComplete()) {
        // 认证通过!不再关注写事件
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        setSaslState(SaslState.COMPLETE);
    } else {
        // 继续发送Response
        sendSaslClientMessage(clientResponse);
        // 保持INTERMEDIATE状态
    }

Step 5: COMPLETE

认证完成,后续所有网络通信恢复正常——不再走认证逻辑。


九、SaslServerAuthenticator——服务端如何验证

服务端的SaslServerAuthenticator比客户端更复杂,它不仅要验证客户端凭证,还要从JAAS配置中查找该用户是否存在。

服务端PlainLoginModule初始化时会读取所有配置用户:

// PlainLoginModule中读取的用户列表
// kafka_server_jaas.conf内容:
//   user_admin="admin"
//   user_xiaoming="xiaoming"
// 这些user_xxx键值对都会被加载为一个"用户密码字典"

@Override
public void initialize(Subject subject, CallbackHandler callbackHandler,
                      Map<String, ?> sharedState, Map<String, ?> options) {
    // 遍历所有 user_xxx 配置项
    for (Map.Entry<String, ?> entry : options.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith("user_")) {
            String username = key.substring(5);  // "admin"
            String password = (String) entry.getValue();  // "admin"
            userDatabase.put(username, password);
        }
    }
}

服务端认证时从userDatabase中查找客户端发来的用户名密码,比对成功则通过。

认证通过后,客户端身份被确定。下一步就是权限控制(ACL)来检查"这个用户能做什么"——这是我们下一篇文章的内容。


本篇小结

这篇文章从配置到源码,完整拆解了Kafka SASL/PLAIN身份认证的链路:

  • JAAS架构:Kafka通过PlainLoginModule从配置文件读取用户名密码,但login()方法为空实现——真正认证不在这里
  • Login封装:Login接口封装了LoginContext,loginContextName(“KafkaClient”/“KafkaServer”)必须与JAAS配置文件匹配
  • ChannelBuilder体系:根据security.protocol配置创建对应的ChannelBuilder,SASL_PLAINTEXT对应SaslChannelBuilder
  • Authenticator核心:SaslClientAuthenticator使用五状态状态机(HANDSHAKE→INITIAL→INTERMEDIATE→COMPLETE)驱动整个认证流程
  • Challenge-Response协议:客户端通过saslClient.evaluateChallenge(challenge)循环处理服务端发来的挑战,直到认证完成
  • 凭证流转:JAAS配置 → PlainLoginModule.initialize() → Subject.pubCredentials/privCredentials → SaslClientCallbackHandler → SaslClient对象

下一篇我们继续Kafka安全模块——ACL权限控制的源码解析,看看认证通过之后,Kafka怎么决定"你能读哪个Topic,能写哪个Topic"。


上一篇【第56篇】GroupCoordinator源码解析(二)——GroupMetadataManager持久化
下一篇【第58篇】Kafka权限控制ACL源码解析——谁能读,谁能写,谁说了算


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐