Kafka 使用 JAAS(Java Authentication and Authorization Service)来实现 SASL 认证授权。我们这里通过使用 Kafka 提供的 SCRAM 方式实现动态认证授权的配置,动态认证授权的好处非常直接,不用根据实际需求修改各种配置文件添加账号及授权,可以像操作数据库服务器一样,通过命令行就能实现认证授权的管理。
准备工作
操作系统:
我们使用 CentOS 系统,系统版本是 CentOS-7.9。
我们使用默认的单机安装方式,ZooKeeper 是内置在 Kakfa 安装包里面的,所以不需要单独下载。如果希望单独安装,可以自行准备。
为了方便操作,保证使用 ZooKeeper 的可访问性,这里直接把防火墙关闭掉:
systemctl stop firewalld
下载软件:
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
解压缩包:
tar xzvf jdk-17_linux-x64_bin.tar.gz tar kafka_2.13-3.3.1.tgz cd kafka_2.13-3.3.1
JDK 配置这里就不累述了。
SCRAM 动态认证授权配置
基于 SCRAM 的 Kafka 认证授权配置还是比较复杂一些,我们可以把这个配置过程分为两个阶段,一个是认证配置,一个是授权配置,区分这样两个阶段有助于我们实践过程中更加清晰地知道在做哪个环节的事情。
下面是 SCRAM 整个认证配置的过程:
1 创建 SCRAM 凭证
在没有修改任何 Kafka 配置的情况下,直接启动 ZooKeeper 和 Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh config/server.properties &
我们可以单独开两个命令行终端,方便配置过程中,观察服务输出的日志信息,方便排查问题。
首先创建 3 个常用的账号,分别用于 Kafka Broker 之间进行通讯认证、生产者与 Kafka 服务之间进行通讯认证、消费者与 Kafka 服务之间进行通讯认证。
(1)创建broker建通信账号:admin
bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec]' --entity-type users --entity-name admin
(2)创建生产用户:bjprod
bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=bjprod-sec]' --entity-type users --entity-name bjprod
(3)创建消费用户:bjcons
bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=bjcons-sec]' --entity-type users --entity-name bjcons
如果上述操作没有出现问题,可以通过如下命令查看账号信息:
bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --describe --entity-type users --entity-name admin bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --describe --entity-type users --entity-name bjprod bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --describe --entity-type users --entity-name bjcons
2 服务端配置
修改 Kafka 配置文件 config/server.properties,添加内容如下:
listeners=SASL_PLAINTEXT://172.17.42.34:9092 security.protocol=SASL_PLAINTEXT security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256 advertised.listeners=SASL_PLAINTEXT://172.17.42.34:9092 allow.everyone.if.no.acl.found=false super.users=User:admin authorizer.class.name=kafka.security.authorizer.AclAuthorizer
上面配置中分为两块,上面一块是和认证相关的配置,如认证协议、服务器主机、端口等;下面一块是和授权相关的配置,包括授权开关、超级用户列表、授权机制框架。
服务器端还包含一个特殊 SCRAM 凭证,就是管理员操作凭证的 JAAS 配置,也就是与 Kafka Broker 交互的认证配置。创建 config/kafka_server_jaas.conf 配置文件,内容如下:
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec"; };
该服务器端配置文件对应的绝对路径为:/home/jensen/kafka_2.13-3.3.1/config/kafka_server_jaas.conf。
将配置文件的位置作为 JVM 参数传递给每个 Kafka Broker,需要修改脚本文件 bin/kafka-server-start.sh,将最后一行命令中传递我们上面创建的 JAAS 配置文件 kafka_server_jaas.conf,内容如下:
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
最后,需要重新启动 Kafka 服务才能生效。
后面我们会使用到的一些管理命令都会与 Kafka Broker 交互进行认证,这些命令包括:
bin/kafka-topics.sh bin/kafka-acls.sh bin/kafka-configs.sh
3 客户端配置
创建客户端认证 JAAS 配置文件 config/kafka_client_admin_jaas.conf,内容如下:
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-sec"; };
该客户端配置文件对应的绝对路径为:/home/jensen/kafka_2.13-3.3.1/config/kafka_client_admin_jaas.conf。
这个客户端 JAAS 配置文件非常重要,所有需要以管理员身份与 Kafka Broker 交互的,都需要在执行的脚本/命令中传递这个 JAAS 配置来完成认证,比如 Topic 管理、ACL 管理、配置管理等等。
另外,因为我们在 Kafka Server 的配置文件 config/server.properties 中配置了一些安全相关的协议,所以想与 Kafka Broker 交互还需要通过命令行传递这些协议相关的信息,我们把这些信息保存在一个固定的配置文件 config/adminclient-config.properties 中,该文件内容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256
这样就能够方便执行命令时,将认证的协议信息直接传递进去。
- Topic 管理配置
下面,以管理 Topic 的脚本 bin/kafka-topics.sh 进行配置操作说明,后面我们也会用它来创建 Topic。
修改 bin/kafka-topics.sh 脚本,将最后一行新增我们创建的客户端 JAAS 配置文件 kafka_client_admin_jaas.conf,如下所示:
#exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/kafka_client_admin_jaas.conf kafka.admin.TopicCommand "$@"
这样,我们就可以通过如下方式来查询和创建 Topic:
bin/kafka-topics.sh --list --bootstrap-server 172.17.42.34:9092 --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties bin/kafka-topics.sh --create --bootstrap-server 172.17.42.34:9092 --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
- ACL 管理配置
我们使用 Kafka 的时候,需要对使用的主体进行授权配置,当然主要是生产者(Producer)和消费者(Consumer),那么就需要对 bin/kafka-acls.sh 进行修改,将最后一行新增我们创建的客户端 JAAS 配置文件 kafka_client_admin_jaas.conf,如下所示:
#exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@" exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/kafka_client_admin_jaas.conf kafka.admin.AclCommand "$@"
可以通过下面的名命令行来验证,修改的脚本配置的正确性:
bin/kafka-acls.sh --bootstrap-server 172.17.42.34:9092 --list --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
为了方便验证,后面我们使用 Kafka 自带的生产者和消费者控制台脚本工具:bin/kafka-console-producer.sh、bin/kafka-console-consumer.sh。
4 生产者配置
创建生产者(Producer)的 JAAS 配置文件 config/kafka_client_producer_jaas.conf,内容如下:
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="bjprod" password="bjprod-sec"; };
这里配置的 username 的值要与前面我们使用 bin/kafka-configs.sh 创建的生产者账号的用户名 bjprod 相一致。
然后,需要修改 bin/kafka-console-producer.sh 脚本内容,将生产者的 JAAS 配置文件传递进去,内容如下所示:
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@" exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/kafka_client_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
最后,需要修改 config/producer.properties 配置文件,内容如下:
bootstrap.servers=172.17.42.34:9092 security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256
到现在为止,生产者(Producer)的认证配置都已经完成,下面需要对生产者 bjprod 进行操作授权,执行如下授权命令:
bin/kafka-acls.sh --bootstrap-server 172.17.42.34:9092 --add --allow-principal User:bjprod --allow-host 172.17.42.34 --operation Write --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
可以看到控制台显示授权成功的消息:
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=mytestusers, patternType=LITERAL)`: (principal=User:bjprod, host=172.17.42.34, operation=WRITE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=mytestusers, patternType=LITERAL)`: (principal=User:bjcons, host=172.17.42.34, operation=READ, permissionType=ALLOW) (principal=User:admin, host=172.17.42.34, operation=CREATE, permissionType=ALLOW) (principal=User:bjprod, host=172.17.42.34, operation=WRITE, permissionType=ALLOW)
现在,我们就可以运行 bin/kafka-console-producer.sh 来向 mytestusers 这个 Topic 写消息:
bin/kafka-console-producer.sh --bootstrap-server 172.17.42.34:9092 --topic mytestusers --producer.config config/producer.properties
5 消费者配置
创建消费者(Consumer)的 JAAS 配置文件 config/kafka_client_consumer_jaas.conf,内容如下:
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="bjcons" password="bjcons-sec"; };
这里配置的 username 的值要与前面我们使用 bin/kafka-configs.sh 创建的消费者账号的用户名 bjcons 相一致。
然后,需要修改 bin/kafka-console-consumer.sh 脚本内容,将生产者的 JAAS 配置文件传递进去,内容如下所示:
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@" exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/config/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
最后,需要修改 config/consumer.properties 配置文件,内容如下:
bootstrap.servers=172.17.42.34:9092 security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256
到现在为止,消费者(Consumer)的认证配置都已经完成,下面需要对消费者 bjcons 进行操作授权,执行如下授权命令:
bin/kafka-acls.sh --bootstrap-server 172.17.42.34:9092 --add --allow-principal User:bjcons --allow-host 172.17.42.34 --operation Read --group bjcons-group --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
可以看到授权成功的消息:
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=mytestusers, patternType=LITERAL)`: (principal=User:bjcons, host=172.17.42.34, operation=READ, permissionType=ALLOW) Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=bjcons-group, patternType=LITERAL)`: (principal=User:bjcons, host=172.17.42.34, operation=READ, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=mytestusers, patternType=LITERAL)`: (principal=User:bjcons, host=172.17.42.34, operation=READ, permissionType=ALLOW) (principal=User:admin, host=172.17.42.34, operation=CREATE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bjcons-group, patternType=LITERAL)`: (principal=User:bjcons, host=172.17.42.34, operation=READ, permissionType=ALLOW)
给消费者授权,这里指定了 Read 的读操作权限,还有 Consumer Group 的 ID,只有 group.id 为 bjcons-group 的消费者才能消费 mytestusers 这个 Topic 的消息,否则都没有权限去消费。
另外,也可以在 config/consumer.properties 中配置 group.id 指定消费组,不过这样没有直接通过命令操作进行授权灵活一些。通过上面授权脚本进行 Consumer Group 的授权,会覆盖掉 config/consumer.properties 中配置指定的 group.id。
现在,我们就可以运行 bin/kafka-console-consumer.sh 来消费 mytestusers 这个 Topic 的消息内容,执行命令:
bin/kafka-console-consumer.sh --bootstrap-server 172.17.42.34:9092 --topic mytestusers --from-beginning --consumer.config config/consumer.properties
6 动态认证授权管理
前面我们只是创建了生产者 bjprod 和消费者 bjcons,在实际应用中,我们还会按需创建更多的账号并授权,就像数据库管理一样。Kafka 的动态认证授权机制 SCRAM 也能实现类似的灵活认证授权的管理。
现在,我们还需要对 bin/kafka-configs.sh 进行修改,才能灵活创建 SCRAM 认证凭据。和修改管理员操作的脚本 bin/kafka-topics.sh、bin/kafka-acls.sh 类似,也需要修改 bin/kafka-configs.sh 脚本:
#exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand "$@" exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/home/jensen/kafka_2.13-3.3.1/config/kafka_client_admin_jaas.conf kafka.admin.ConfigCommand "$@"
这样就能和前面一样创建生产者、消费者的访问账号,示例如下:
bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=szprod-sec]' --entity-type users --entity-name szprod --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties bin/kafka-configs.sh --bootstrap-server 172.17.42.34:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cdcons-sec]' --entity-type users --entity-name cdcons --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
然后使用 bin/kafka-acls.sh 进行授权,写、读 Topic 授权示例如下:
bin/kafka-acls.sh --bootstrap-server 172.17.42.34:9092 --add --allow-principal User:szprod --allow-host 172.17.42.34 --operation Write --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties bin/kafka-acls.sh --bootstrap-server 172.17.42.34:9092 --add --allow-principal User:cdcons --allow-host 172.17.42.34 --operation Read --group cdcons-group --topic mytestusers --command-config /home/jensen/kafka_2.13-3.3.1/config/adminclient-config.properties
至此为止,SCRAM 动态认证和授权的配置已经完成。
如果有任何问题可以随时与我沟通交流。如果对配置内容有更多方面的需求,可以查看官网文档说明。
参考资源
- https://kafka.apache.org/33/documentation.html#security
- https://www.cnblogs.com/rexcheny/articles/12884990.html
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
赞一个,前几年搞过kafka2.x版本的鉴权,几年有这方面需求,发现很多api变了,2.x版本的鉴权配置在3.x版本上不适用了, 网上很多都是旧版本的资料,只有这个很详细,好使