Kafka Authentication and Authorization Configuration (Adding Users Dynamically)
  I previously wrote a post showing how to configure SASL/PLAIN + ACL to provide authentication and authorization for a Kafka cluster, but one question keeps coming up: does that approach support adding and removing authenticated users dynamically? The clear answer is: no! All the user credentials live in a static JAAS file, so users cannot be added or removed without restarting the brokers. This time, using Kafka 2.1.0 as the example, I will show how to configure SASL/SCRAM + ACL so that users can be added and removed dynamically, and also fix a few rough edges from the previous post (for instance, no more editing the stock .sh scripts; an environment variable is used to apply the settings instead).
1. Environment
Kafka server: one cloud host, 4 cores, 8GB RAM, 1Gbps bandwidth
Kafka client: another cloud host
The client and the server communicate over the internal network.
2. Cluster Topology
Start two Kafka brokers. Since I am demonstrating on a single cloud host, both broker instances run on that one machine. On the client side, the console-producer and console-consumer scripts stand in for real client applications.
3. Creating Users
  We use kafka-configs.sh to create the users; Kafka's SCRAM implementation stores the credentials in ZooKeeper. Suppose I want to create three users, admin, writer and reader, used respectively for inter-broker communication, producing messages and consuming messages. Now for the actual configuration: first start ZooKeeper, but do not start the Kafka brokers yet. Once ZooKeeper is up, run the following commands to create the three users.
Create the writer user with password writer-pwd:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.
Create the reader user with password reader-pwd:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader
Completed Updating config for entity: user-principal 'reader'.
Create the admin user with password admin:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.
All three test users have been created. Now let's use kafka-configs.sh to inspect the writer user:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --describe --entity-type users --entity-name writer
Configs for user-principal 'writer' are SCRAM-SHA-512=salt=dTlvNzl4Y3BvZ3BuMmx5ODY0aWlzN2RsZg==,stored_key=Yc02SwxDkAKDQH01W98bkJLJcVO24q9vR5tS0nWaq5Jg2Z7DtzwrOt6J2Cr8Oib+dHq7TUIeG+NLiCAMnRlfVg==,server_key=…,SCRAM-SHA-256=salt=Y2dpcnB4aTU5NWNwMDZjNmFvbHluMWJpOQ==,stored_key=GGMhtO1PhxZFpEHOaDiqA4AM16Ma19nky1UV/gFoC1s=,server_key=L0R1xkcULaWcGMu6TdtWi5mf5lu1VTS8imW…
The output lists, for both SCRAM-SHA-256 and SCRAM-SHA-512, the salt, StoredKey and ServerKey derived for the writer user, all of them standard terms from the SCRAM mechanism.
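Since the SCRAM credentials are kept in ZooKeeper, you can also peek at the underlying znode directly; for example, an illustrative check with the bundled ZooKeeper CLI (user configs live under /config/users/<user>):
$ bin/zookeeper-shell.sh 172.21.0.9:2181 get /config/users/writer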
4. Broker Configuration
  As with SASL/PLAIN, we still need a JAAS file for each broker. Note: since both broker instances in this example run on the same cloud host, a single JAAS file is enough. In a real deployment you would create one JAAS file per physical broker machine.
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
Save the above content to a JAAS file, say kafka-broker-jaas.conf. Mind the two semicolons at the end, and do not add any extra whitespace. The admin user configured here is used for inter-broker communication. Next, configure the broker-side server.properties with the following settings:
# Enable the ACL authorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Make admin a super user for this example
super.users=User:admin
# Enable the SCRAM mechanism, using the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512
# Use SCRAM-SHA-512 for inter-broker communication as well
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Inter-broker traffic goes over SASL_PLAINTEXT; SSL is not covered in this example
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure listeners to use SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://172.21.0.9:9092
# Configure advertised.listeners
advertised.listeners=SASL_PLAINTEXT://172.21.0.9:9092
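As a side note, for SASL/SCRAM the broker's login module can also be embedded directly in server.properties instead of a separate JAAS file, using a listener-prefixed property. A minimal sketch, assuming the SASL_PLAINTEXT listener and SCRAM-SHA-512 mechanism configured above:
# Alternative (sketch): per-listener SASL config in server.properties instead of an external JAAS file
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" \
    password="admin";
This post sticks with the external JAAS file approach below.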
  The other broker's configuration is largely identical, except that it uses a different port (for example 9093), broker.id and log.dirs. Now start the two broker instances. If everything is configured correctly, both should start normally. Note how the JAAS file is supplied: -Djava.security.auth.login.config is set via the KAFKA_OPTS environment variable.
$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/config/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server1.properties
......
[2019-02-05 17:12:08,365] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:12:08,365] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:12:08,367] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/config/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server2.properties
......
[2019-02-05 17:22:12,970] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,970] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,971] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
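If you want to double-check that both brokers have registered with ZooKeeper, listing the broker ids is a quick, illustrative way; with the two instances above it should print something like [0, 1]:
$ bin/zookeeper-shell.sh 172.21.0.9:2181 ls /brokers/ids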
Now create a test topic; for this example a single topic with one partition and one replica is enough:
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.9:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".
5. Client Configuration
  Okay, everything is ready. Let's first try to send messages with the console-producer script:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test
>hello, world
[2019-02-05 18:17:19,005] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
The send failed because no valid authenticated user was supplied. Let's switch to the writer user instead. To do that I create a configuration file, say producer.conf, for the console producer (note that sasl.mechanism has to match the mechanism enabled on the brokers), with the following content:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";
Then run the console-producer script:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/config/producer.conf
>hello
[2019-02-05 18:25:40,272] WARN [Producer clientId=console-producer] Bootstrap broker 172.21.0.9:9092 (id: -1 rack: null) disconnected
(org.apache.kafka.clients.NetworkClient)
The error has changed: it now reports that the connection cannot be established. That is because the writer user does not yet have Write permission on the test topic, so we grant it:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer --operation Write --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
Run the console-producer script again:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/config/producer.conf
>hello
>Kafka
The messages go through successfully!
Next comes the consumer. As with the producer, create a configuration file, say consumer.conf, for the reader user, and grant Read permission on the topic:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:reader has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
User:reader has Allow permission for operations: Read from hosts: *
Run the console-consumer script:
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka_2.12-2.1.0/config/consumer.conf --group test-group
[2019-02-05 18:55:57,272] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
The error says the reader user is not authorized to access the consumer group, so grant that as well:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *
Run the console-consumer script again:
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka_2.12-2.1.0/config/consumer.conf --group test-group
hello
Kafka
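At this point it can be handy to review every ACL created so far; the same authorizer options also support a --list switch, for example:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --list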
6. Adding and Removing Users Dynamically
Now, without restarting the brokers, we add two new users, writer1 and reader1, grant them Write and Read permission respectively on the test topic, and give reader1 Read permission on the consumer group test-group1:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer1-pwd],SCRAM-SHA-512=[password=writer1-pwd]' --entity-type users --entity-name writer1
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader1-pwd],SCRAM-SHA-512=[password=reader1-pwd]' --entity-type users --entity-name reader1
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer1 --operation Write --topic test
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --topic test
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --group test-group1
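To confirm that the new credentials were registered without any broker restart, you can describe them the same way as before, for example:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --describe --entity-type users --entity-name writer1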
At the same time, delete the original writer user:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
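Note that deleting the SCRAM credentials does not remove writer's ACLs; if you want to clean those up as well, a removal along these lines should work (same authorizer options as above):
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --remove --allow-principal User:writer --operation Write --topic test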
Now verify that the writer user can no longer produce messages:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/config/producer.conf
>hello by writer
[2019-02-06 09:30:54,492] ERROR [Producer clientId=console-producer] Connection to node -2 (172.21.0.9/172.21.0.9:9093) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2019-02-06 09:30:54,492] ERROR Error when sending message to topic test with key: null, value: 15 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2019-02-06 09:30:54,493] ERROR [Producer clientId=console-producer] Connection to node -1 (172.21.0.9/172.21.0.9:9092) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
Finally, change the credentials in producer.conf from writer to writer1 (with password writer1-pwd) and verify that writer1 is allowed to produce messages:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/config/producer.conf
>hello by writer1
>successful
>
With that, a Kafka security setup that supports adding and removing users dynamically is complete.
