上一篇【第57篇】Kafka身份认证源码解析——SASL/PLAIN认证是怎么实现的
下一篇【第59篇】Kafka监控体系解析——用JMX+Metrics打造生产级监控


摘要

上一篇我们拆解了身份认证——服务端确认"你是谁"。认证通过后,下一个问题是"你能做什么"。这就是ACL权限控制要回答的。

Kafka的权限控制设计成插件形式,内置了SimpleAclAuthorizer实现,将ACL规则存储在ZooKeeper的/kafka-acl路径下,同时在内存中维护一份缓存(aclCache)加速查询。每个请求到达Handler线程处理时,都会先走authorize()方法检查权限。本文从权限模型(5要素)、ZooKeeper存储结构、内存缓存同步机制、匹配逻辑(Deny优先)到kafka-acls脚本的底层原理,完整拆解Kafka ACL的源码实现。


一、权限模型——ACL的五要素

在Kafka中,一条ACL规则由五个要素组成。一句话概括:某个主体(Principal)对某个资源(Resource)的某个操作(Operation)是被允许还是被拒绝(PermissionType)

【ACL五要素】

  ┌──────────────────────────────────────────────────────────┐
  │                   ACL 规则                               │
  │                                                          │
  │  Principal      PermissionType    Operation     Host     │
  │  (用户:xiaoming)  Allow           Read          *        │
  │                                                          │
  │  Resource: Topic:test                                    │
  │                                                          │
  │  含义:"用户xiaoming可以从任何主机读取Topic test的数据"     │
  └──────────────────────────────────────────────────────────┘

五个要素的详细说明:

要素 类型 可选值 说明
Principal KafkaPrincipal User:xxx 主体(即用户身份),从认证通过的Session中获取
PermissionType PermissionType Allow / Deny 允许还是拒绝
Operation Operation Read/Write/Create/Delete/Alter/Describe/ClusterAction/All 操作类型,共8种
Host String IP地址或*(通配) 限制客户端来源IP
Resource Resource Topic / Group / Cluster + 资源名 资源类型+资源名称

资源类型(ResourceType)有三种

┌────────────────────────────────────────────────┐
│              ResourceType                      │
│                                                │
│  Cluster       Topic          Group           │
│  (集群级别)    (Topic级别)    (消费者组级别)   │
│  例:Cluster   例:test      例:order-consumer│
└────────────────────────────────────────────────┘

Operation八种操作类型

操作 含义 适用资源
Read 读取数据 Topic
Write 写入数据 Topic
Create 创建资源 Cluster, Topic
Delete 删除资源 Topic, Group
Alter 修改配置 Topic, Cluster
Describe 查看描述信息 Topic, Group, Cluster
ClusterAction 执行集群操作 Cluster
All 所有操作的集合 通配符

二、ZooKeeper中的ACL存储——/kafka-acl路径下的秘密

SimpleAclAuthorizer把所有ACL规则存储在ZooKeeper中,路径结构如下:

【ZooKeeper ACL存储结构】

  /kafka-acl                              ← 根节点(由SimpleAclAuthorizer创建)
  ├── /Topic                             ← 资源类型:Topic
  │   ├── /test                         ← 资源名
  │   │   └── JSON:                     ← 该资源的ACL规则
  │   │       {
  │   │         "version": 1,
  │   │         "acls": [
  │   │           {"principal":"User:xiaoming","permissionType":"Allow",
  │   │            "operation":"Read","host":"*"},
  │   │           {"principal":"User:xiaoming","permissionType":"Allow",
  │   │            "operation":"Write","host":"*"}
  │   │         ]
  │   │       }
  │   └── /order-events
  │       └── JSON: {...}
  │
  ├── /Group                             ← 资源类型:Consumer Group
  │   └── /order-consumer
  │       └── JSON: {...}
  │
  └── /Cluster                           ← 资源类型:集群级别
      └── /kafka-cluster
          └── JSON: {...}

  /kafka-acl-changes                     ← ACL变更通知节点
  ├── /acl_changes_00000000001           ← 持久顺序节点
  └── /acl_changes_00000000002

ACL规则在内存中对应的数据结构:

// VersionedAcls —— 包装ACL集合和ZK版本号
private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)

// Acl —— 单条权限规则
case class Acl(
  principal: KafkaPrincipal,      // 主体(User:xiaoming)
  permissionType: PermissionType,  // Allow 或 Deny
  host: String,                    // 来源IP,*表示通配
  operation: Operation             // Read/Write/.../All
)

三、SimpleAclAuthorizer的初始化——从ZooKeeper加载ACL规则

Kafka启动时,如果配置了authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer,就会用反射创建SimpleAclAuthorizer并初始化:

// KafkaServer.scala - startup()
authorizer = Option(config.authorizerClassName)
    .filter(_.nonEmpty).map { authorizerClassName =>
  val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)  // 反射创建
  authZ.configure(config.originals())  // 配置初始化
  authZ
}

SimpleAclAuthorizer.configure()做了四件事:

// SimpleAclAuthorizer.scala - configure()
override def configure(javaConfigs: util.Map[String, _]) {
  val configs = javaConfigs.asScala
  
  // ① 解析超级用户列表
  superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
    case str: String if str.nonEmpty => 
      str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
  }.getOrElse(Set.empty[KafkaPrincipal])
  
  // ② 读取"无规则时的默认行为"配置
  shouldAllowEveryoneIfNoAclIsFound = configs.get(
    SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp)
    .exists(_.toString.toBoolean)
  
  // ③ 确保ZK路径存在
  zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
  
  // ④ 从ZooKeeper加载ACL → 内存缓存
  loadCache()
  
  // ⑤ 注册ZK监听器,监听ACL变更
  zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
  aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils,
    SimpleAclAuthorizer.AclChangedZkPath, 
    SimpleAclAuthorizer.AclChangedPrefix,
    AclChangedNotificationHandler)
  aclChangeListener.init()
}

loadCache()方法——启动时全量加载ACL规则到内存:

private def loadCache() {
  inWriteLock(lock) {
    // 遍历 /kafka-acl 下的所有资源类型(Topic/Group/Cluster)
    val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
    for (rType <- resourceTypes) {
      val resourceType = ResourceType.fromString(rType)
      val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
      
      // 遍历该资源类型下的所有具体资源
      val resourceNames = zkUtils.getChildren(resourceTypePath)
      for (resourceName <- resourceNames) {
        // 读取JSON数据 → 解析为VersionedAcls
        val versionedAcls = getAclsFromZk(
          Resource(resourceType, resourceName.toString))
        // 存入aclCache
        updateCache(new Resource(resourceType, resourceName), versionedAcls)
      }
    }
  }
}

四、ACL变更通知机制——管理员修改规则后如何实时生效

当你用kafka-acls脚本增删权限时,不仅会修改/kafka-acl下的JSON数据,还会在/kafka-acl-changes下创建一个持久顺序节点作为"变更通知"。SimpleAclAuthorizer通过ZkNodeChangeNotificationListener监听这些通知,实时更新内存缓存。

【ACL变更通知流程】

  kafka-acls脚本                  ZooKeeper                   SimpleAclAuthorizer
       │                              │                              │
       │ ① 修改 /kafka-acl/Topic/test│                              │
       │ ────────────────────────────►│                              │
       │    (写入新ACL JSON数据)       │                              │
       │                              │                              │
       │ ② 创建通知节点                │                              │
       │ /kafka-acl-changes/         │                              │
       │   acl_changes_0000000003    │                              │
       │ ────────────────────────────►│                              │
       │                              │ ③ NodeChangeListener被触发     │
       │                              │ ────────────────────────────►│
       │                              │                              │
       │                              │                  ④ processNotifications()
       │                              │                     读取通知 → 重新加载该资源ACL
       │                              │                     更新 aclCache
       ▼                              ▼                              ▼

监听器注册代码:

// ZkNodeChangeNotificationListener.init()
def init() {
  // 确保通知节点存在
  zkUtils.makeSurePersistentPathExists(seqNodeRoot)
  // 注册子节点变更监听
  zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
  // 注册ZK连接状态监听(重连后重新注册)
  zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
  // 处理已存在的未处理通知
  processAllNotifications()
}

处理通知的核心逻辑:

private def processNotifications(notifications: Seq[String]) {
  if (notifications.nonEmpty) {
    val now = time.milliseconds
    for (notification <- notifications) {
      val changeId = changeNumber(notification)  // 提取顺序编号
      if (changeId > lastExecutedChange) {       // 防止重复处理
        val changeZnode = seqNodeRoot + "/" + notification
        val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
        // 调用NotificationHandler更新缓存
        data.map(notificationHandler.processNotification(_))
      }
      lastExecutedChange = changeId
    }
    purgeObsoleteNotifications(now, notifications)  // 删除过期节点(15分钟后)
  }
}

AclChangedNotificationHandler重新加载对应资源的ACL:

object AclChangedNotificationHandler extends NotificationHandler {
  override def processNotification(notificationMessage: String) {
    val resource = Resource.fromString(notificationMessage)
    inWriteLock(lock) {
      val versionedAcls = getAclsFromZk(resource)  // 重新读取ZK数据
      updateCache(resource, versionedAcls)         // 更新缓存
    }
  }
}

五、Authorize()——权限判定的"终审法官"

每次Handler线程处理客户端请求时,都会调用KafkaApis.authorize()检查权限:

// KafkaApis.scala
private def authorize(session: Session, operation: Operation,
                      resource: Resource): Boolean =
    authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)

SimpleAclAuthorizer.authorize()是权限判定的核心:

【authorize() 判定流程】

  传入参数:
  - session (包含用户身份principal + 来源IP)
  - operation (Read/Write/Create/...)
  - resource (Topic:test / Group:order-consumer / Cluster:kafka-cluster)

  ① 是超级用户(superUsers)吗?          ──是──► 允许 ✓
      │
      否
      ▼
  ② 该资源无ACL规则吗?                    ──是──► shouldAllowEveryoneIfNoAclIsFound?
      │                                         │
      否                                        ├─ true  → 允许 ✓
      ▼                                        └─ false → 拒绝 ✗
  ③ 存在 Deny 规则 且 匹配吗?             ──是──► 拒绝 ✗
      │    (Deny优先级高于Allow)
      否
      ▼
  ④ 存在 Allow 规则 且 匹配吗?            ──是──► 允许 ✓
      │
      否
      ▼
  ⑤ 无匹配规则                              ──► 拒绝 ✗

源码实现(精简版):

// SimpleAclAuthorizer.scala - authorize()
override def authorize(session: Session, operation: Operation, 
                       resource: Resource): Boolean = {
  val principal = session.principal
  val host = session.clientAddress.getHostAddress
  
  // 获取该资源的ACL + 通配符资源的ACL
  val acls = getAcls(resource) ++ getAcls(
    new Resource(resource.resourceType, Resource.WildCardResource))
  
  // 先检查Deny规则(Deny优先原则)
  val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
  
  // Describe权限的特殊处理:有Read或Write就自动给Describe
  val ops = if (Describe == operation)
    Set[Operation](operation, Read, Write)
  else
    Set[Operation](operation)
  
  val allowMatch = ops.exists(operation =>
    aclMatch(session, operation, resource, principal, host, Allow, acls))
  
  // 三级判定:超级用户 || 无ACL策略 || (无Deny匹配 && 有Allow匹配)
  val authorized = isSuperUser(operation, resource, principal, host) ||
    isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
    (!denyMatch && allowMatch)
  
  authorized
}

aclMatch()匹配方法——支持通配符:

private def aclMatch(session: Session, operations: Operation, resource: Resource,
    principal: KafkaPrincipal, host: String, 
    permissionType: PermissionType, acls: Set[Acl]): Boolean = {
  
  acls.find { acl =>
    acl.permissionType == permissionType  // ① 匹配 Allow/Deny
      && (acl.principal == principal 
          || acl.principal == Acl.WildCardPrincipal)  // ② 匹配用户(支持*)
      && (operations == acl.operation 
          || acl.operation == All)                    // ③ 匹配操作(支持All)
      && (acl.host == host 
          || acl.host == Acl.WildCardHost)            // ④ 匹配IP(支持*)
  }.map(_ => true).getOrElse(false)
}

关键判定原则

  • Deny优先:先检查Deny规则,一旦匹配立即拒绝,不管有没有Allow规则
  • 超级用户特权super.users配置的用户跳过所有ACL检查
  • Describe自动继承:有Read或Write权限自动拥有Describe权限
  • 通配符支持:Principal、Host、Operation都支持*All

六、kafka-acls脚本——管理员的操作入口

kafka-acls脚本的本质是调用AclCommand类,通过KafkaAdminClient向服务端发送ACL增删请求:

# 允许xiaoming读写Topic test
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:xiaoming \
  --operation Read --operation Write --topic test

# 允许wanglei作为消费者使用Topic test
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:wanglei --consumer --topic test

# 查看Topic test的所有ACL规则
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --list --topic test

# 删除某条规则
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --remove --allow-principal User:xiaoming --operation Read --topic test

常用参数一览:

参数 说明
--add 添加ACL规则
--remove 删除ACL规则
--list 查询ACL规则
--allow-principal 允许的主体(User:xxx)
--deny-principal 拒绝的主体
--allow-host 允许的IP
--deny-host 拒绝的IP
--operation 操作类型
--producer 快捷参数 = Write+Describe+Create
--consumer 快捷参数 = Read+Describe

七、完整权限检查链路——从客户端请求到authorize()

最后来看完整的权限检查调用链路:

【权限检查完整链路】

  KafkaProducer.send()                        KafkaConsumer.poll()
       │                                            │
       ▼                                            ▼
  ProduceRequest                             FetchRequest / OffsetCommitRequest
       │                                            │
       ▼                                            ▼
  ┌─────────────────────────────────────────────────────────┐
  │             网络层 → SocketServer                       │
  │             Processor → RequestChannel                  │
  │                                                        │
  │             processCompletedReceives():                 │
  │               从KafkaChannel获取身份信息                 │
  │               → 封装成 Session 对象                      │
  │               → 放入 RequestChannel                     │
  └────────────────────┬────────────────────────────────────┘
                       │
                       ▼
  ┌─────────────────────────────────────────────────────────┐
  │           Handler线程 → KafkaApis                       │
  │                                                        │
  │   handleProduceRequest(session, request):              │
  │     ① authorize(session, Write, Resource(Topic, xxx))  │
  │        ├─ SimpleAclAuthorizer.authorize()              │
  │        │   ├─ isSuperUser? → 直接通过                  │
  │        │   ├─ isEmptyAcl? → 默认策略                   │
  │        │   ├─ denyMatch? → 拒绝 ✗                     │
  │        │   └─ allowMatch? → 允许 ✓                    │
  │        │                                               │
  │     ② 如果authorize返回false                           │
  │         → TopicAuthorizationException                 │
  │         → 返回错误给客户端                              │
  │                                                        │
  │     ③ 如果authorize返回true                            │
  │         → 继续处理请求                                  │
  └─────────────────────────────────────────────────────────┘

本篇小结

这篇文章完整拆解了Kafka ACL权限控制的源码实现:

  • ACL五要素模型:Principal(用户)+ PermissionType(Allow/Deny)+ Operation(8种操作)+ Host(来源IP)+ Resource(Topic/Group/Cluster),每条规则都在这五个维度上做匹配
  • ZooKeeper存储/kafka-acl按资源类型分目录,每个资源的ACL规则以JSON格式存储,SimpleAclAuthorizer启动时通过loadCache()全量加载到aclCache内存缓存
  • 变更通知:kafka-acls脚本修改ACL时,同时在/kafka-acl-changes下创建持久顺序节点,ZkNodeChangeNotificationListener监听该路径变化,实时调用AclChangedNotificationHandler更新缓存
  • authorize()判定:Deny优先原则(先检查Deny),超级用户跳过所有检查,Describe权限自动继承Read/Write,支持通配符匹配
  • 链路整合:网络层从KafkaChannel获取身份→封装Session→Handler线程调用KafkaApis.authorize()→SimpleAclAuthorizer.authorize()做最终裁定

下一篇我们将进入Kafka监控体系——JMX指标、MetricsReporter、关键监控项,以及Prometheus+Grafana搭建生产级监控的完整方案。


上一篇【第57篇】Kafka身份认证源码解析——SASL/PLAIN认证是怎么实现的
下一篇【第59篇】Kafka监控体系解析——用JMX+Metrics打造生产级监控


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐