【Kafka源码解读和使用指南】第58篇:Kafka权限控制ACL源码解析——谁能读,谁能写,谁说了算
上一篇【第57篇】Kafka身份认证源码解析——SASL/PLAIN认证是怎么实现的
下一篇【第59篇】Kafka监控体系解析——用JMX+Metrics打造生产级监控
摘要
上一篇我们拆解了身份认证——服务端确认"你是谁"。认证通过后,下一个问题是"你能做什么"。这就是ACL权限控制要回答的。
Kafka的权限控制设计成插件形式,内置了SimpleAclAuthorizer实现,将ACL规则存储在ZooKeeper的/kafka-acl路径下,同时在内存中维护一份缓存(aclCache)加速查询。每个请求到达Handler线程处理时,都会先走authorize()方法检查权限。本文从权限模型(5要素)、ZooKeeper存储结构、内存缓存同步机制、匹配逻辑(Deny优先)到kafka-acls脚本的底层原理,完整拆解Kafka ACL的源码实现。
一、权限模型——ACL的五要素
在Kafka中,一条ACL规则由五个要素组成。一句话概括:某个主体(Principal)对某个资源(Resource)的某个操作(Operation)是被允许还是被拒绝(PermissionType)。
【ACL五要素】
┌──────────────────────────────────────────────────────────┐
│ ACL 规则 │
│ │
│ Principal PermissionType Operation Host │
│ (用户:xiaoming) Allow Read * │
│ │
│ Resource: Topic:test │
│ │
│ 含义:"用户xiaoming可以从任何主机读取Topic test的数据" │
└──────────────────────────────────────────────────────────┘
五个要素的详细说明:
| 要素 | 类型 | 可选值 | 说明 |
|---|---|---|---|
| Principal | KafkaPrincipal | User:xxx | 主体(即用户身份),从认证通过的Session中获取 |
| PermissionType | PermissionType | Allow / Deny | 允许还是拒绝 |
| Operation | Operation | Read/Write/Create/Delete/Alter/Describe/ClusterAction/All | 操作类型,共8种 |
| Host | String | IP地址或*(通配) | 限制客户端来源IP |
| Resource | Resource | Topic / Group / Cluster + 资源名 | 资源类型+资源名称 |
资源类型(ResourceType)有三种:
┌────────────────────────────────────────────────┐
│ ResourceType │
│ │
│ Cluster Topic Group │
│ (集群级别) (Topic级别) (消费者组级别) │
│ 例:Cluster 例:test 例:order-consumer│
└────────────────────────────────────────────────┘
Operation八种操作类型:
| 操作 | 含义 | 适用资源 |
|---|---|---|
| Read | 读取数据 | Topic |
| Write | 写入数据 | Topic |
| Create | 创建资源 | Cluster, Topic |
| Delete | 删除资源 | Topic, Group |
| Alter | 修改配置 | Topic, Cluster |
| Describe | 查看描述信息 | Topic, Group, Cluster |
| ClusterAction | 执行集群操作 | Cluster |
| All | 所有操作的集合 | 通配符 |
二、ZooKeeper中的ACL存储——/kafka-acl路径下的秘密
SimpleAclAuthorizer把所有ACL规则存储在ZooKeeper中,路径结构如下:
【ZooKeeper ACL存储结构】
/kafka-acl ← 根节点(由SimpleAclAuthorizer创建)
├── /Topic ← 资源类型:Topic
│ ├── /test ← 资源名
│ │ └── JSON: ← 该资源的ACL规则
│ │ {
│ │ "version": 1,
│ │ "acls": [
│ │ {"principal":"User:xiaoming","permissionType":"Allow",
│ │ "operation":"Read","host":"*"},
│ │ {"principal":"User:xiaoming","permissionType":"Allow",
│ │ "operation":"Write","host":"*"}
│ │ ]
│ │ }
│ └── /order-events
│ └── JSON: {...}
│
├── /Group ← 资源类型:Consumer Group
│ └── /order-consumer
│ └── JSON: {...}
│
└── /Cluster ← 资源类型:集群级别
└── /kafka-cluster
└── JSON: {...}
/kafka-acl-changes ← ACL变更通知节点
├── /acl_changes_00000000001 ← 持久顺序节点
└── /acl_changes_00000000002
ACL规则在内存中对应的数据结构:
// VersionedAcls —— 包装ACL集合和ZK版本号
private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
// Acl —— 单条权限规则
case class Acl(
principal: KafkaPrincipal, // 主体(User:xiaoming)
permissionType: PermissionType, // Allow 或 Deny
host: String, // 来源IP,*表示通配
operation: Operation // Read/Write/.../All
)
三、SimpleAclAuthorizer的初始化——从ZooKeeper加载ACL规则
Kafka启动时,如果配置了authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer,就会用反射创建SimpleAclAuthorizer并初始化:
// KafkaServer.scala - startup()
authorizer = Option(config.authorizerClassName)
.filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName) // 反射创建
authZ.configure(config.originals()) // 配置初始化
authZ
}
SimpleAclAuthorizer.configure()做了四件事:
// SimpleAclAuthorizer.scala - configure()
override def configure(javaConfigs: util.Map[String, _]) {
val configs = javaConfigs.asScala
// ① 解析超级用户列表
superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty =>
str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
}.getOrElse(Set.empty[KafkaPrincipal])
// ② 读取"无规则时的默认行为"配置
shouldAllowEveryoneIfNoAclIsFound = configs.get(
SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp)
.exists(_.toString.toBoolean)
// ③ 确保ZK路径存在
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
// ④ 从ZooKeeper加载ACL → 内存缓存
loadCache()
// ⑤ 注册ZK监听器,监听ACL变更
zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils,
SimpleAclAuthorizer.AclChangedZkPath,
SimpleAclAuthorizer.AclChangedPrefix,
AclChangedNotificationHandler)
aclChangeListener.init()
}
loadCache()方法——启动时全量加载ACL规则到内存:
private def loadCache() {
inWriteLock(lock) {
// 遍历 /kafka-acl 下的所有资源类型(Topic/Group/Cluster)
val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
// 遍历该资源类型下的所有具体资源
val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
// 读取JSON数据 → 解析为VersionedAcls
val versionedAcls = getAclsFromZk(
Resource(resourceType, resourceName.toString))
// 存入aclCache
updateCache(new Resource(resourceType, resourceName), versionedAcls)
}
}
}
}
四、ACL变更通知机制——管理员修改规则后如何实时生效
当你用kafka-acls脚本增删权限时,不仅会修改/kafka-acl下的JSON数据,还会在/kafka-acl-changes下创建一个持久顺序节点作为"变更通知"。SimpleAclAuthorizer通过ZkNodeChangeNotificationListener监听这些通知,实时更新内存缓存。
【ACL变更通知流程】
kafka-acls脚本 ZooKeeper SimpleAclAuthorizer
│ │ │
│ ① 修改 /kafka-acl/Topic/test│ │
│ ────────────────────────────►│ │
│ (写入新ACL JSON数据) │ │
│ │ │
│ ② 创建通知节点 │ │
│ /kafka-acl-changes/ │ │
│ acl_changes_0000000003 │ │
│ ────────────────────────────►│ │
│ │ ③ NodeChangeListener被触发 │
│ │ ────────────────────────────►│
│ │ │
│ │ ④ processNotifications()
│ │ 读取通知 → 重新加载该资源ACL
│ │ 更新 aclCache
▼ ▼ ▼
监听器注册代码:
// ZkNodeChangeNotificationListener.init()
def init() {
// 确保通知节点存在
zkUtils.makeSurePersistentPathExists(seqNodeRoot)
// 注册子节点变更监听
zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
// 注册ZK连接状态监听(重连后重新注册)
zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
// 处理已存在的未处理通知
processAllNotifications()
}
处理通知的核心逻辑:
private def processNotifications(notifications: Seq[String]) {
if (notifications.nonEmpty) {
val now = time.milliseconds
for (notification <- notifications) {
val changeId = changeNumber(notification) // 提取顺序编号
if (changeId > lastExecutedChange) { // 防止重复处理
val changeZnode = seqNodeRoot + "/" + notification
val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
// 调用NotificationHandler更新缓存
data.map(notificationHandler.processNotification(_))
}
lastExecutedChange = changeId
}
purgeObsoleteNotifications(now, notifications) // 删除过期节点(15分钟后)
}
}
AclChangedNotificationHandler重新加载对应资源的ACL:
object AclChangedNotificationHandler extends NotificationHandler {
override def processNotification(notificationMessage: String) {
val resource = Resource.fromString(notificationMessage)
inWriteLock(lock) {
val versionedAcls = getAclsFromZk(resource) // 重新读取ZK数据
updateCache(resource, versionedAcls) // 更新缓存
}
}
}
五、Authorize()——权限判定的"终审法官"
每次Handler线程处理客户端请求时,都会调用KafkaApis.authorize()检查权限:
// KafkaApis.scala
private def authorize(session: Session, operation: Operation,
resource: Resource): Boolean =
authorizer.map(_.authorize(session, operation, resource)).getOrElse(true)
SimpleAclAuthorizer.authorize()是权限判定的核心:
【authorize() 判定流程】
传入参数:
- session (包含用户身份principal + 来源IP)
- operation (Read/Write/Create/...)
- resource (Topic:test / Group:order-consumer / Cluster:kafka-cluster)
① 是超级用户(superUsers)吗? ──是──► 允许 ✓
│
否
▼
② 该资源无ACL规则吗? ──是──► shouldAllowEveryoneIfNoAclIsFound?
│ │
否 ├─ true → 允许 ✓
▼ └─ false → 拒绝 ✗
③ 存在 Deny 规则 且 匹配吗? ──是──► 拒绝 ✗
│ (Deny优先级高于Allow)
否
▼
④ 存在 Allow 规则 且 匹配吗? ──是──► 允许 ✓
│
否
▼
⑤ 无匹配规则 ──► 拒绝 ✗
源码实现(精简版):
// SimpleAclAuthorizer.scala - authorize()
override def authorize(session: Session, operation: Operation,
resource: Resource): Boolean = {
val principal = session.principal
val host = session.clientAddress.getHostAddress
// 获取该资源的ACL + 通配符资源的ACL
val acls = getAcls(resource) ++ getAcls(
new Resource(resource.resourceType, Resource.WildCardResource))
// 先检查Deny规则(Deny优先原则)
val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
// Describe权限的特殊处理:有Read或Write就自动给Describe
val ops = if (Describe == operation)
Set[Operation](operation, Read, Write)
else
Set[Operation](operation)
val allowMatch = ops.exists(operation =>
aclMatch(session, operation, resource, principal, host, Allow, acls))
// 三级判定:超级用户 || 无ACL策略 || (无Deny匹配 && 有Allow匹配)
val authorized = isSuperUser(operation, resource, principal, host) ||
isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
(!denyMatch && allowMatch)
authorized
}
aclMatch()匹配方法——支持通配符:
private def aclMatch(session: Session, operations: Operation, resource: Resource,
principal: KafkaPrincipal, host: String,
permissionType: PermissionType, acls: Set[Acl]): Boolean = {
acls.find { acl =>
acl.permissionType == permissionType // ① 匹配 Allow/Deny
&& (acl.principal == principal
|| acl.principal == Acl.WildCardPrincipal) // ② 匹配用户(支持*)
&& (operations == acl.operation
|| acl.operation == All) // ③ 匹配操作(支持All)
&& (acl.host == host
|| acl.host == Acl.WildCardHost) // ④ 匹配IP(支持*)
}.map(_ => true).getOrElse(false)
}
关键判定原则:
- Deny优先:先检查Deny规则,一旦匹配立即拒绝,不管有没有Allow规则
- 超级用户特权:
super.users配置的用户跳过所有ACL检查 - Describe自动继承:有Read或Write权限自动拥有Describe权限
- 通配符支持:Principal、Host、Operation都支持
*或All
六、kafka-acls脚本——管理员的操作入口
kafka-acls脚本的本质是调用AclCommand类,通过KafkaAdminClient向服务端发送ACL增删请求:
# 允许xiaoming读写Topic test
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:xiaoming \
--operation Read --operation Write --topic test
# 允许wanglei作为消费者使用Topic test
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:wanglei --consumer --topic test
# 查看Topic test的所有ACL规则
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--list --topic test
# 删除某条规则
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--remove --allow-principal User:xiaoming --operation Read --topic test
常用参数一览:
| 参数 | 说明 |
|---|---|
--add |
添加ACL规则 |
--remove |
删除ACL规则 |
--list |
查询ACL规则 |
--allow-principal |
允许的主体(User:xxx) |
--deny-principal |
拒绝的主体 |
--allow-host |
允许的IP |
--deny-host |
拒绝的IP |
--operation |
操作类型 |
--producer |
快捷参数 = Write+Describe+Create |
--consumer |
快捷参数 = Read+Describe |
七、完整权限检查链路——从客户端请求到authorize()
最后来看完整的权限检查调用链路:
【权限检查完整链路】
KafkaProducer.send() KafkaConsumer.poll()
│ │
▼ ▼
ProduceRequest FetchRequest / OffsetCommitRequest
│ │
▼ ▼
┌─────────────────────────────────────────────────────────┐
│ 网络层 → SocketServer │
│ Processor → RequestChannel │
│ │
│ processCompletedReceives(): │
│ 从KafkaChannel获取身份信息 │
│ → 封装成 Session 对象 │
│ → 放入 RequestChannel │
└────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Handler线程 → KafkaApis │
│ │
│ handleProduceRequest(session, request): │
│ ① authorize(session, Write, Resource(Topic, xxx)) │
│ ├─ SimpleAclAuthorizer.authorize() │
│ │ ├─ isSuperUser? → 直接通过 │
│ │ ├─ isEmptyAcl? → 默认策略 │
│ │ ├─ denyMatch? → 拒绝 ✗ │
│ │ └─ allowMatch? → 允许 ✓ │
│ │ │
│ ② 如果authorize返回false │
│ → TopicAuthorizationException │
│ → 返回错误给客户端 │
│ │
│ ③ 如果authorize返回true │
│ → 继续处理请求 │
└─────────────────────────────────────────────────────────┘
本篇小结
这篇文章完整拆解了Kafka ACL权限控制的源码实现:
- ACL五要素模型:Principal(用户)+ PermissionType(Allow/Deny)+ Operation(8种操作)+ Host(来源IP)+ Resource(Topic/Group/Cluster),每条规则都在这五个维度上做匹配
- ZooKeeper存储:
/kafka-acl按资源类型分目录,每个资源的ACL规则以JSON格式存储,SimpleAclAuthorizer启动时通过loadCache()全量加载到aclCache内存缓存 - 变更通知:kafka-acls脚本修改ACL时,同时在
/kafka-acl-changes下创建持久顺序节点,ZkNodeChangeNotificationListener监听该路径变化,实时调用AclChangedNotificationHandler更新缓存 - authorize()判定:Deny优先原则(先检查Deny),超级用户跳过所有检查,Describe权限自动继承Read/Write,支持通配符匹配
- 链路整合:网络层从KafkaChannel获取身份→封装Session→Handler线程调用KafkaApis.authorize()→SimpleAclAuthorizer.authorize()做最终裁定
下一篇我们将进入Kafka监控体系——JMX指标、MetricsReporter、关键监控项,以及Prometheus+Grafana搭建生产级监控的完整方案。
上一篇【第57篇】Kafka身份认证源码解析——SASL/PLAIN认证是怎么实现的
下一篇【第59篇】Kafka监控体系解析——用JMX+Metrics打造生产级监控
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)