Kafka管理【主题、分区、消费者组】
主题操作
使⽤ kafka-topics.sh ⼯具可以执⾏主题的⼤部分操作。可以⽤它创建、修改、删除和查看集⾥的主题。要使⽤该⼯具的全部功能,需要通过 --zookeeper参数提供 Zookeeper的连接字符串。
kafka 的⼤部分命令⾏⼯具直接操作 Zookeeper 上的元数据,并不会连接到 broker上。因此,要确保所使⽤⼯具版本与集⾥的broker版本相匹配。直接使⽤集 broker⾃带的⼯具是最保险的。
创建主题
在集中创建⼀个主题需要3个参数:主题名字(可以包含字母、数字、下划线以及英⽂状态下的破折号和句号),复制系数(主题的副本数量),分区(主题的分区数量)
主题命名的开头部分包含两个下划线是合法的,但不建议这么做。具有这种格式的主题⼀般是集的内部主题(⽐如
__consumer_offsets 主题⽤于保存消费者组的偏移量)。也不建议在单个集⾥使⽤英⽂状态下的句号和下划线来命名,因为主题的名字会被在度量指标上,句号会被替换成下划线(⽐如“topic.1会变成topic_1”)
如下命令会创建⼀个主题,主题的名字为指定的值,并包含了指定数量的分区。集会为这个分区创建指定数量的副本。如果为集指定了基于机架信息的分配策略,可以指定参数 --disable-rack-aware
kafka-topic.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>
⽰例:使⽤以下命令创建⼀个叫做 my-topic 主题,主要包括⼋个分区,每个分区拥有两个副本。
kafka-topic.sh --zookeeper hadoop1:2181 --create --topic my-topic --replication-factor 2 --partitions 8
忽略重复创建主题的错误:在⾃动化系统⾥调⽤这个脚本时,可以使⽤ --if-not-exists参数,这样即使主题已经存在,也不会抛出重复创建主题的错误。
增加分区
有时我们需要为主题增加分区数量,主题基于分区进⾏伸缩和复制,增加分区主要是为了扩展主题容量和降低单个分区的吞吐量。如果要在单个消费者组内运⾏更多的消费者,那么分区数量也需要响应增加,因为⼀个分区只能由组⾥的⼀个消费者读取。
调整基于键的主题:从消费者⾓度来看,为基于键的主题添加分区是很困难。因为如果改变了分区的数
量,键到分区之间的映射也会发⽣改变。所以,基于键的主题来说,建议⼀开始就设置好分区数量,避免以后对其进⾏调整。
⽰例:将 my-topic 主题的分区增加到16个。
kafka-topic.sh --zookeeper hadoop1:2181 --alter --topic my-topic --partitions 16
减少分区数量:我们⽆法减少主题的分区数量。因为如果删除了分区,分区⾥的数据也⼀并被删除,导致数据不⼀致。我们也⽆法将这些数据分配给其他分区,因为这样做很难,⽽且会出现消息乱序。所以,如果⼀定要减少分区数量,只能删除整个topic,然后重新创建它。
删除主题
如果⼀个主题不再被使⽤,只要它还在集中,就会占⽤⼀定数量的磁盘空间和⽂件句柄。把它删除就可以释放占⽤的资源。为了能够删除主题,broker 的 able 参数必须设置为 true。如果为 false,删除主题的请求会被忽略。删除主题会丢弃主题⾥的所有数据。这是⼀个不可逆的操作,执⾏时需要⼗分消息。
⽰例:删除 my-topic 主题。
kafka-topic.sh --zookeeper hadoop1:2181 --delete --topic my-topic
列出集⾥的所有主题
可以使⽤主题⼯具列出集⾥的所有主题。每个主题占⽤⼀⾏输出,主题之间没有特定的顺序。
⽰例:列出集⾥的所有主题
kafka-topic.sh --zookeeper hadoop1:2181 --list
主题⼯具还能⽤来获取主题的详细信息。信息⾥包含了分区数量、主题的覆盖配置以及每个分区的副本清单。如果通过 --topic参数指定特定的主题,就可以只列出指定主题的详细信息。⽰例:列出集⾥所有主题的详细信息
kafka-topic.sh --zookeeper hadoop1:2181 --describe
describe 命令还提供了⼀些参数,⽤于过滤输出结果,这个在诊断集问题时会很有⽤。不要为这些参数指定 --topic参数。这些参数也⽆法与 list命令⼀起使⽤。使⽤ --topics-with-overrides 参数可以出所有包含覆盖配置的主题,它只会列出包含了与集不⼀样配置的主题。
有两个参数可⽤于出问题的分区。使⽤ --under-replicated-partitions 参数可以列出所有包含不同步副本的分区。使⽤ --unavaliable-partitions 参数可以列出所有没有⾸领的分区,这些分区已经处理离线状态,对于⽣产者和消费者来说是不可⽤的。
⽰例:列出包含不同步副本的分区
kafka-topic.sh --zookeeper hadoop1:2181 --describe --under-replicated-partitions
消费者组
kafka中有两个地⽅保存着消费者组的消息,旧版本的消费者来说,它的信息保存在 zk上。对于新版本的消费者来说,它的信息保存在broker上。kafka-consumer-group.sh ⼯具可以⽤于列出上述两种消费者组。也可以⽤于删除消费者组和偏移量信息,不过这个功能仅限于旧版本的消费者组(信息保存在zk上)。在对旧版本的消费者组进⾏操作时,需要通过 --zookeeper 参数指定 Zookeeper的地址;在对新版本的消费者组进⾏操作时,则需要使⽤ --bootstrap-server参数指定 broker的主机名和端⼝。
列出并描述组
旧版本的消费者组,使⽤ --zookeeper 和 --list参数列出消费者组;⽰例:列出旧版本的消费者组
kafka-consumer-group.sh --zookeeper hadoop1:2181 --list
新版本的消费者组,使⽤ --bootstrap-server、--list 和 --new-consumer 参数。
kafka-consumer-group.sh --new-consumer --bootstrap-server hadoop1:9092 --list
对于任意组来说,使⽤ --describe 代替 --list,并通过 --group 指定特定的组,就可以获取该组的详细信息。它会列出组⾥所有主题的信息和分区的偏移量。⽰例:
kafka-consumer-group.sh --new-consumer --bootstrap-server hadoop1:9092 --describe --group myGroup
输出字段解释:
字段描述
GROUP消费者组的名字
TOPIC正在读取的主题的名字
PARTITION正在读取的分区 ID
CURRENT-OFFSET消费者组最近提交的偏移量,也就是消费者在分区⾥读取的当前位置
LOG-END-OFFSET当前⾼⽔位偏移量,也就是最近⼀个被读取消息的偏移量,同时也是最近⼀个被提
交到集的偏移量
LAG消费者的 CURRENT-OFFSET 和 broker 的 LOG-END-OFFSET之间的差距
OWNER消费者组⾥正在读取该分区的消费者。这是⼀个消费者的ID,不⼀定包含消费者的主机名
删除组
只有旧版本⽀持删除组操作。删除组将在 zk上移除整个组,包括所有已保存的偏移量。在执⾏该操作之前,必须关闭所有的消费者。如果不关闭,可能会导致消费者出现不可预期的⾏为,因为组的元数据已经从 Zookeeper 上移除了。⽰例:删除消费者组 testgroup
kafka-consumer-group.sh --zookeeper hadoop1:2181 --delete --group testgroup
该命令也可以⽤于删除单个主题的偏移量。再次强调,在进⾏删除操作之前,需要关闭消费者,或者不要让他们读取即将被删除的主题。⽰例:从消费者组 testgroup ⾥删除 my-topic 主题的偏移量。
kafka-consumer-group.sh --zookeeper hadoop1:2181 --delete --group testgroup --topic my-topic
偏移量管理
可以获取偏移量,并保存批次的最新偏移量,从⽽实现偏移量的重置。在需要重新读取消息或者因消费者⽆法正常处理消息(⽐如包含了⾮法格式的消息)需要跳过偏移量时,需要进⾏偏移量重置。
管理已提交到 kafka 的偏移量:⽬前,还没有⼯具可以⽤于管理由消费者客户端提交到 Kafka 的偏移量,管理功能只对提交到 zk的偏移量可⽤。另外,为了能够管理提交到 Kafka的消费者组偏移量,需要在客户端使⽤响应的 API来提交组的偏移量。
导出偏移量:kafka 没有为导出偏移量提供线程的脚本,不过可以使⽤ kafka-run-class.sh 脚本调⽤底层的 Java类来实现导出。在导出偏移量时,会⽣成⼀个⽂件,⽂件⾥包含了分区和偏移量的信息。偏移量信息以⼀种导⼊⼯具能够识别的格式保存在⽂件⾥。每个分区在⽂件⾥占⽤⼀⾏,格式为:/consumer/GROUPNAME/offsets/topic/TOPICNAME/PARTITIONID-0:OFFSET。⽰例:将组 testgroup 的偏移量导出到 offsets ⽂件⾥。
1 # kafka-run-class.ls.ExportZkOffsets --zkconnect hadoop1:2181 --group testgroup --output-file offsets
2 # cat offsets
3 /consumers/testgroup/offsets/my-topic/0:8904
导⼊偏移量:使⽤导出的⽂件重置消费者组的偏移量。⼀般情况下,我们会导出消费者组的当前偏移量,并将导出的⽂件复制⼀份(备
份),然后修改复制⽂件⾥的偏移量。这⾥要注意,在使⽤导⼊命令时,不需要使⽤ --group 参数,因为⽂件⾥已经包含了消费者组的名字。需要注意,要先关闭消费者,如果消费者组处于活跃状态,它们不会读取新的偏移量,反⽽有可能将导⼊的偏移量覆盖掉。从 offsets ⽂件⾥将偏移量导⼊到消费者组 testgroup
kafka-run-class.ls.ImportZkOffsets --zkconnect hadoop1:2181 --input-file offsets
动态配置变更
我们可以在集处于运⾏状态时覆盖主题配置和客户端的配置参数。为了会增加更多的配置参数,这也是为什么这些参数被单独放进 kafka-configs.sh。这样就可以为特定的主题和客户端指定配置参数。⼀旦设置完毕,它就成为集的永久配置,被保存在 Zookeeper 上,broker 在启动时会读取它们。不管是在⼯具⾥还是⽂档⾥,他们所说的动态配合参数都是基于“主题”实例或者“客户端”实例的,都是可以被“覆
盖”的。这⾥也需要提供 --zookeeper 参数提供 Zookeeper集的连接字符串。
覆盖主题的默认配置
为了满⾜不同的使⽤场景,主题的很多参数都可以进⾏单独的设置。它们⼤部分都有 broker 级别的默认值,在没有被覆盖的情况下使⽤默认值。更改主题配置的命令格式如下:kafka-config.sh
kafka-configs.sh --zookeeper hadoop1:2181 --alter --entity-type topics --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>...]
⽰例:将主题 my-topic 的消息保留时间设为1个⼩时
kafka-configs.sh --zookeeper hadoop1:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000
覆盖客户端的默认配置
对于 kafka客户端⽽⾔,只能覆盖⽣产者配额和消费者配额参数。这两个配额都以字节每秒为单位,表⽰客户端在每个 broker上的⽣产速率或消费速率。也就是说,如果集⾥有 5个broker,⽣产者的配额是 10MB/s 的速率在单个 broker上⽣产数据,总共的速率可以达到
50MB/s。
kafka-configs.sh --zookeeper hadoop1:2181 -alter --entity-type clients --entity-name <client ID> --add-config <key> = <value>[,<key> = <value>...]
可⽤的客户端配置参数(键)如下所⽰:
配置项描述
producer_bytes_rate单个⽣产者每秒钟可以往单个 broker上⽣产的消息字节数
consumer_bytes_rate单个消费者每秒钟可以从单个 broker 读取的消息字节数
列出被覆盖的配置
使⽤命令⼯具可以列出所有被覆盖的配置,从⽽⽤于检查主题或客户端的配置。与其他⼯具类似,这个功能通过 --describe 命令来实现。⽰例:列出主题 my-topic 所有被覆盖的配置
kafka-configs.sh --zookeeper hadoop1:2181 --describe --entity-type topics --entity-name my-topic
移除被覆盖的配置
动态的配置完成可以被移除,从⽽恢复到集的默认配置。可以使⽤ --alter 命令和 --delete-config 参数来删除被覆盖的配置。⽰例:删除主题 my-topic 的 retention.ms 覆盖配置
kafka-configs.sh --zookeeper hadoop1:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
分区管理
kafka提供了两个脚本管理⽤于管理分区,⼀个⽤于重新选举⾸领,另⼀个⽤于将分区分配给 broker。从⽽实现集流量的负载均衡。
⾸选的⾸领选举
使⽤多个分区副本可以提升可靠性。不过,只有其中⼀个副本可以成为分区⾸领,⽽且只有⾸领所在的 broker 可以进⾏⽣产和消费活动。kafka 将副本清单⾥⾯的第⼀个同步副本选为⾸领,但是在关闭重启 broker之后,并不会⾃动恢复原先⾸领的⾝份。
触发⾸选的副本选举,可以让 broker重新获取⾸领。当该事件被触发时,集控制器会为分区重新选择理想的⾸领。选举过程不会造成负⾯影响,因为客户端可以⾃动跟踪⾸领的变化。也可以通过⼯具⼿动触发选举。
kafka-preferred-replica-election.sh --zookeeper hadoop1:2181
修改分区副本
有些时候,可能需要修改分区的副本,⼀下是需要修改分区副本的场景:
【1】主题分区在整个集⾥的不均衡分区造成了集负载的不均衡;
【2】broker 离线造成分区不同步;
【3】新加⼊的 broker需要从集⾥获得负载;
可以使⽤ kafka-reassign-partitions.sh⼯具来修改分区。使⽤该⼯具需要经过如下步骤:第⼀步,根据 broker清单和主题清单⽣成⼀组迁移步骤;第⼆步,执⾏这些迁移步骤。第三步可选,可以使⽤⽣成的迁移步骤验证分区重分配的进度和完成情况。为了⽣成迁移步骤,需要先创建⼀个包含了主题清单的 JSON⽂件,⽂件格式如下:
1 {
2 "topics": [
3 {
4 "topic": "foo"
5 },
6 {
7 "topic": "foo1"
8 }
9 ],
10 "version": 1
11 }
⽰例:为 topic.json ⽂件⾥的主题⽣成迁移步骤,以便将这些主题迁移到 broker0 和 broker1上。broker 的ID以逗号分隔,并作为参数提供给命令⼯具。这个⼯具会在标准控制台上输出两个 JSON对象,分别描述了当前的分区分配情况以及建议的分区分配⽅案。这些 JSON对象的格式如下:{"partitions": [{"topic": "my-topic","partition": 0,"replicas": [1,2]}], "version":1}可以把第⼀个 JSON 对象保存起来,以便在必要的时候进⾏回滚。第⼆个 JSON对象应该被保存到另⼀个⽂件⾥,作为 kafka-reassign-partitions.sh ⼯具的输⼊来执⾏第⼆个步骤。
kafka-reassign-partitions.sh --zookeeper hadoop1:2181 --generate --topics-to-move-json-file topic.json --broker-list 0,1
⽰例:使⽤ reassign.json 来执⾏建议的分区分配⽅案。该命令会将指定分区的副本重新分配到新的 broker上。集控制器通过为每个分区添加新副本实现重新分配(增加复制系数)。新的副本将从⾸领哪⾥复制所有数据。根据分区⼤⼩不同,复制过程可能需要花⼀些时间,因为数据是通过⽹络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单⾥⾯移除。
kafka-reassign-partitions.sh --zookeeper hadoop1:2181 --execute --reassignment-json-file reassign.json
如果要从单个broker上移除多个分区,⽐如将 broker移除集,那么在重新分配副本之前最好先关闭或者重启 broker。这样,这个 broker就不再是任何⼀个分区的⾸领,它的分区就可以被分配到集⾥的其他 broker上。
重分配完成之后,可以通过 kafka-reassign-partitions.sh ⼯具验证重分配的状态。它可以显⽰重分配的进度、已完成重分配的分区以及错误信息,为了实现这⼀点,需要在执⾏过程中使⽤ JSON对象⽂件。⽰例:验证 reassign.json ⽂件⾥指定的分区重分配情况。
kafka-reassign-partitions.sh --zookeeper hadoop1:2181 --verify --reassignment-json-file reassign.json
分区重分配对集的性能影响很⼤,因为它会引起内存页缓存发⽣变化,并占⽤额外的⽹络和磁盘资源。将重分配过程拆分成多个⼩步骤可以将这种影响降到最低。
修改复制系数
分区重新分配⼯具提供了改变分区复制系数的特性。如果在创建分区时指定了错误的复制系数(⽐如创建主题时没有⾜够多的broker)那么就有必要修改它们。可以通过创建 JSON对象来完成,该对象使⽤
分区重分配的执⾏步骤中使⽤的格式,显⽰指定分区需要的副本数量。集将完成重分配过程,并使⽤新的复制系数。例如,假设主题 my-topic 有⼀个分区,该分区的复制系数为1。
1 {
2 "partitions": [
3 {
4 "topics": "my-topic",
5 "partition": 0,
6 "replicas": [1]
7 }
8 ],
9 "version": 1
10 }
kafka命令在分区重新分配的执⾏步骤中使⽤以下 JSON可以将复制系数改为2。也可以通过类似的⽅法减少分区的复制系数。
1 {
2 "partitions": [
3 {
4 "partition": 0,
5 "replicas": [1,2],
6 "topic": "my-topic"
7 }
8 ],
9 "version": 1
10 }
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论