KafkaACL使⽤实战
⾃0.9.0.0.版本引⼊Security之后,Kafka⼀直在完善security的功能。当前Kafka security主要包含3⼤功能:认证(authentication)、信道加密(encryption)和授权(authorization)。信道加密就是为client到broker、broker到broker以及⼯具脚本与broker之间的数据传输配置SSL;认证机制主要是指配置SASL,⽽授权是通过ACL接⼝命令来完成的。
⽣产环境中,⽤户若要使⽤SASL则必须配置Kerberos,但对于⼀些⼩公司⽽⾔,他们的⽤户系统并不复杂(特别是专门为Kafka集服务的⽤户可能不是很多),显然使⽤Kerberos有些⼤材⼩⽤,⽽且由于运⾏在内⽹环境,SSL加密也不是很必要。因此⼀个SASL+PLAINTEXT的集环境⾜以应付⼀般的使⽤场景。本⽂给出⼀个可运⾏的实例来演⽰⼀下如何在不使⽤Kerberos的情况下配置SASL + ACL来构建secured Kafka集。
在开始之前,我们简单学习下Kafka ACL的格式。根据官⽹的介绍,Kafka中⼀条ACL的格式如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义描述如下:
principal:表⽰⼀个Kafka user
operation:表⽰⼀个具体的操作类型,如WRITE, READ, DESCRIBE等。完整的操作列表详见:
fluent.io/current/kafka/authorization.html#overview
Host:表⽰连向Kafka集的client的IP地址,如果是‘*’则表⽰所有IP。注意:当前Kafka不⽀持主机名,只能指定IP地址
Resource:表⽰⼀种Kafka资源类型。当前共有4种类型:TOPIC、CLUSTER、GROUP、TRANSACTIONID
下⾯我使⽤Kafka 0.11.0.0版本来演⽰下如何构建⽀持SASL + PLAINTEXT + ACL的Kafka集环境。
1. Broker端配置
要配置SASL和ACL,我们需要在broker端进⾏两个⽅⾯的设置。⾸先是创建包含所有认证⽤户信息的JAAS⽂件。本例中,我们假设有3个⽤户:admin, reader和writer,其中admin是管理员,reader⽤户读取Kafka集中topic数据,⽽writer⽤户则负责向Kafka集写⼊消息。我们假设这3个⽤户的密码分别与⽤户名相同(在实际场景中,管理员需要单独把密码发给各⾃的⽤户),因此我们可以这样编写JAAS⽂件:
KafkaServer {
  org.apache.kafkamon.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_reader="reader"
  user_writer="writer";
};
保存该⽂件为kafka_f(本例中的完整路径是/Users/huxi/SourceCode/newenv/kafka_f),之后我们需要把该⽂件的完整路径作为⼀个JVM参数传递给Kafka的启动脚本。不过由于bin/kafka-server-start.sh只接收server.properties的位置,不再接收其他任何参数,故我们需要修改该启动脚本。具体做法如下:
$ pwd
/Users/huxi/SourceCode/newenv/kafka_0.11
$ cp bin/kafka-server-start.sh bin/secured-kafka-server-start.sh
$ vi bin/secured-kafka-server-start.sh
把该⽂件中的这⾏:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改为下⾯这⾏,然后保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -
Djava.security.fig=/Users/huxi/SourceCode/newenv/kafka_f kafka.Kafka "$@"
做完上⾯的步骤后,我们就在bin/⽬录下新建了⼀个secured-kafka-server-start.sh启动脚本。
配置好JAAS⽂件后,我们开始修改broker启动所需的server.properties⽂件,你⾄少需要配置(或修改)以下这些参数:
# 配置ACL⼊⼝类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使⽤SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol= SASL_PLAINTEXT
# 设置本例中admin为超级⽤户
super.users=User:admin
Okay,现在我们可以启动broker了(当前肯定要先启动Zookeeper):
bin/secured-kafka-server-start.sh ../config_files/server.properties
.......
[2017-08-17 16:07:30,417] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
可见,Kafka broker已经成功启动了。不过当前该broker只会接收已认证client发来的请求。下⾯我们继续clients端的配置。
2. Client端配置
我们先来创建⼀个测试topic,名为test,单分区,副本因⼦是1。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".
咦?稍等下,我们不是启⽤ACL了吗?这⾥为什么会创建成功呢?这⾥我简要说明⼀下:⾸先我们启⽤ACL了吗?当然!由于我们没有显式地设置allow.acl.found,故这个参数默认是false,也就是说对于⽬前的这个Kafka集⽽⾔,除了超级⽤户之外的其他任何⽤户都⽆法执⾏任何操作。其次,那为什么创建topic成功了呢?这是因为当前kafka-topics.sh脚本直接连接Zookeeper,故不受ACL的限制。所以⽆论是否配置了security,⽤户总是可以使⽤kafka-topics来管理topic。
⾔归正传,本例中我们的⽬的是要测试:
⽤户writer向test topic写⼊消息
⽤户reader从test topic读取消息
下⾯我们先来启动⼀个console-consumer和⼀个console-producer来看下当前是个什么状况:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello, kafka
[2017-08-17 16:16:09,626] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,685] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,740] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,799] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
......
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[2017-08-17 16:16:47,936] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:47,992] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,052] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,163] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
......
看到了吧,当前的console producer/consumer都⽆法⼯作,报的错误就是⽆法连接broker(localhost:9092)。这就是因为我们设置了security的缘故。下⾯我们为writer⽤户配置安全,⾸先需要创建⼀个属于writer⽤户的JAAS⽂件,该⽂件中指定了writer⽤户的credentials,如下所⽰:
KafkaClient {
org.apache.kafkamon.security.plain.PlainLoginModule required
username="writer"
password="writer";
};
把上述内容保存到f⽂件(/Users/huxi/SourceCode/newenv/f)中。和broker类似,我们需要拷贝⼀份新的bin/kafka-console-producer.sh将该JAAS⽂件作为⼀个JVM参数传给console producer:
$ cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
$ vi bin/writer-kafka-console-producer.sh
把该⽂件中的这⾏:
exec $(dirname $0)/kafka-run-class.ls.ConsoleProducer "$@"
修改为下⾯这⾏,然后保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.fig=/Users/huxi/SourceCode/newenv/f
然后,我们创建⼀个fig为该console producer指定以下两个属性:
security.protocol=SASL_PLAINTEXT
现在我们使⽤新的脚本来启动console producer:
$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --fig
>hello, kafka
[2017-08-17 16:28:13,930] WARN Error while fetching metadata with correlation id 1 : {test=UNKNO
WN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,036] WARN Error while fetching metadata with correlation id 3 : {test=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,143] WARN Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,246] WARN Error while fetching metadata with correlation id 5 : {test=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
依然有错误,不过错误变成“⽆法获取元数据”了。这说明我们运⾏的console producer通过了认证,但是没有通过授权,因此我们需要配置ACL来让writer⽤户有权限写⼊topic:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --t=localhost:2181 --add --allow-principal User:writer --operation Write --topic test
再试producer:
$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --fig
为什么使用bootstrap?
>hello
>hello, kafka
>message abc
......
producer⽣产消息成功了。等等!⼦⽈:没有消费之前永远不要断⾔⽣产成功了!下⾯我们就来配置下consumer,即⽤户reader。
和writer⽤户类似,我们⾸先创建reader⽤户的JAAS⽂件,保存在/Users/huxi/SourceCode/newenv/kafka_0.11/f中:KafkaClient {
org.apache.kafkamon.security.plain.PlainLoginModule required
username="reader"
password="reader";
};
然后拷贝⼀份新的console consumer来指定上⾯的f:
$ cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
$ vi bin/reader-kafka-console-consumer.sh
把该⽂件中的这⾏:
exec $(dirname $0)/kafka-run-class.ls.ConsoleConsumer "$@"
修改为下⾯这⾏,然后保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.fig=/Users/huxi/SourceCode/newenv/f
然后,我们创建⼀个fig为该console producer指定以下3个属性:
security.protocol=SASL_PLAINTEXT
group.id=test-group
现在运⾏console consumer:
$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --fig
[2017-08-17 16:37:54,124] WARN Error while fetching metadata with correlation id 2 : {test=UNKNOWN_TOPIC_OR_PARTITION}
(org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:37:54,127] ERROR Unknown error when running consumer: (ls.ConsoleConsumer$)
org.s.GroupAuthorizationException: Not authorized to access group: test-group
可以看到这次出现了两个错误:第⼀个问题依然是⽆法获取元数据——这表明reader⽤户通过了认证但没有通过授权;第⼆个问题表明reader⽤户⽆权访问consumer group——这同样是授权的问题。我们需要设置ACL来解决它们。⾸先,我们为reader⽤户设置test topic的读权限:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --t=localhost:2181 --add --allow-principal User:reader --operation Read --topic test
然后设置访问group的权限:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --t=localhost:2181 --add --
allow-principal User:reader --operation Read --group test-group
做完这些之后,我们重新运⾏console consumer程序:
$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --fig
hello
hello, kafka
message abc
.......
可以看到,这次reader⽤户成功地读取了writer⽤户⽣产的消息。这证明了writer和reader⽤户都可以正常地⼯作了。
最后总结⼀下,这种⽅案适⽤于⽤户数很少但⼜必须启⽤安全的Kafka集,不过此⽅案⽐较⿇烦的地⽅在于需要为每个脚本都重新定制,加上-Djava.security.fig参数以识别JAAS⽂件,不如Kerberos来得简单。另外⽤户完全可以根据官⽹的教程配置SSL,然后很轻易地把本⽂中的例⼦改成SASL_SSL。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。