Kafka消费者:从Kafka中读取数据本系列⽂章为对《Kafka:The Definitive Guide》的学习整理,希望能够帮助到⼤家
应⽤从Kafka中读取数据需要使⽤KafkaConsumer订阅主题,然后接收这些主题的消息。在我们深⼊这些API之前,先来看下⼏个⽐较重要的概念。
Kafka消费者相关的概念
消费者与消费组
假设这么个场景:我们从Kafka中读取消息,并且进⾏检查,最后产⽣结果数据。我们可以创建⼀个消费者实例去做这件事情,但如果⽣产者写⼊消息的速度⽐消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进⾏⽔平扩展。
Kafka消费者是消费组的⼀部分,当多个消费者形成⼀个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有⼀个T1主题,该主题有4个分区;同时我们有⼀个消费组G1,这个消费组只有⼀个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所⽰:
如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所⽰:
如果增加到4个消费者,那么每个消费者将会分别收到⼀个分区的消息,如下所⽰:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总⽽⾔之,我们可以通过增加消费组的消费者来进⾏⽔平扩展提升消费能⼒。这也是为什么建议创建主题时使⽤⽐较多的分区数,这样可以在消费负载⾼的情况下增加消费者来提升性能。另外,消费者的数量不应该⽐分区数多,因为多出来的消费者是空闲的,没有任何帮助。
Kafka⼀个很重要的特性就是,只需写⼊⼀次消息,可以⽀持任意多的应⽤读取这个消息。换句话说,每个应⽤都可以读到全量的消息。为了使得每个应⽤都能读到全量消息,应⽤需要有不同的消费组。对于上⾯的例⼦,假如我们新增了⼀个新的消费组G2,⽽这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应⽤。
最后,总结起来就是:如果应⽤需要读取全量消息,那么请为该应⽤设置⼀个消费组;如果该应⽤消费能⼒不⾜,那么可以考虑在这个消费组⾥增加消费者。
消费组与分区重平衡
可以看到,当新的消费者加⼊消费组,它会消费⼀个或多个分区,⽽这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(⽐如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka⼀个很重要的性质,这个性质保证了⾼可⽤和⽔平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可⽤。⽽且,将分区进⾏重平衡也会导致原来的消费者状态过期,从⽽导致消费者需要重新更新状态,这段期间也会降低消费性能。后⾯我们会讨论如何安全的进⾏重平衡以及如何尽可能避免。
消费者通过定期发送⼼跳(hearbeat)到⼀个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送⼼跳。
如果消费者超过⼀定时间没有发送⼼跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有⼀定时间的,这段时间内该消费者的分区都不能进⾏消息消费;通常情况下,我们可以进⾏优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以⽴即进⾏重平衡⽽不需要等待会话过期。
在0.10.1版本,Kafka对⼼跳机制进⾏了修改,将发送⼼跳与拉取消息进⾏分离,这样使得发送⼼跳的频率不受拉取的频率影响。另外更⾼版本的Kafka⽀持配置⼀个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应⽤没有故障但是由于某些原因不能进⼀步消费。
创建Kafka消费者
读取Kafka消息只需要创建⼀个kafkaConsumer,创建过程与KafkaProducer⾮常相像。我们需要使⽤四个基本属
性,bootstrap.servers、key.deserializer、value.deserializer和group.id。其中,bootstrap.servers与创建KafkaProducer的含义⼀样;key.deserializer和value.deserializer是⽤来做反序列化的,也就是将字节数组转换成对象;group.id不是严格必须的,但通常都会指定,这个参数是消费者的消费组。
下⾯是⼀个代码样例:
1Properties props = new Properties();
2props.put("bootstrap.servers", "broker1:9092,broker2:9092");
3props.put("group.id", "CountryCounter");
4props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
5props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
6KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
订阅主题
创建完消费者后我们便可以订阅主题了,只需要通过调⽤subscribe()⽅法即可,这个⽅法接收⼀个主题列表,⾮常简单:
nsumer.subscribe(Collections.singletonList("customerCountries"));
这个例⼦中只订阅了⼀个customerCountries主题。另外,我们也可以使⽤正则表达式来匹配多个主题,⽽且订阅之后如果⼜有匹配的新主题,那么这个消费组会⽴即对其进⾏消费。正则表达式在连接K
afka与其他系统时⾮常有⽤。⽐如订阅所有的测试主题:
consumer.subscribe("test.*");
拉取循环
消费数据的API和处理⽅式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了⼀个⾮常简洁的poll⽅法,其内部实现了协作、分区重平衡、⼼跳、数据拉取等功能,但使⽤时这些细节都被隐藏了,我们也不需要关注这些。下⾯是⼀个代码样例:
1try {
2 while (true) { //1)
3 ConsumerRecords<String, String> records = consumer.poll(100); //2)
4 for (ConsumerRecord<String, String> record : records) //3)
5 {
6 log.debug("topic = %s, partition = %s, offset = %d,
7 customer = %s, country = %s\n",
8 pic(), record.partition(), record.offset(),
9 record.key(), record.value());
10 int updatedCount = 1;
11 if (untainsValue(record.value())) {
12 updatedCount = (record.value()) + 1;
13 }
14 custCountryMap.put(record.value(), updatedCount)
15 JSONObject json = new JSONObject(custCountryMap);
16 System.out.String(4))
17 }
18 }
19} finally {
20 consumer.close(); //4
21}
其中,代码中标注了⼏点,说明如下:
1)这个例⼦使⽤⽆限循环消费并处理数据,这也是使⽤Kafka最多的⼀个场景,后⾯我们会讨论如何更好的退出循环并关闭。
2)这是上⾯代码中最核⼼的⼀⾏代码。我们不断调⽤poll拉取数据,如果停⽌拉取,那么Kafka会认为此消费者已经死亡并进⾏重平衡。参数值是⼀个超时时间,指明线程如果没有数据时等待多长时间,0表⽰不等待⽴即返回。
3)poll()⽅法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
4)主动关闭可以使得Kafka⽴即进⾏重平衡⽽不需要等待会话过期。
另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使⽤⼀个消费者对象;⽽且也不能够⼀个线程有多个消费者对象。简⽽⾔之,⼀个线程⼀个消费者,如果需要多个消费者那么请使⽤多线程来进⾏⼀⼀对应。
消费者配置
上⾯的例⼦中只设置了⼏个最基本的消费者参数,bootstrap.servers,group.id,key.deserializer和value.deserializer,其他的参数可以看。虽然我们很多情况下只是使⽤默认设置就⾏,但了解⼀些⽐较重要的参数还是很有帮助的。
fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最⼩的数据量。当消费者从broker读取消息时,如果数据量⼩于这个阈值,broker会等待直到有⾜够的数据,然后才返回给消费者。对于写⼊量不⾼的主题来说,这个参数可以减少broker和消费者的压⼒,因为减少了往返的时间。⽽对于有⼤量消费者的主题来说,则可以明显减轻broker压⼒。
fetch.max.wait.ms
上⾯的fetch.min.bytes参数指定了消费者读取的最⼩数据量,⽽这个参数则指定了消费者读取时最长
等待时间,从⽽避免长时间阻塞。这个参数默认为500ms。
max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M。也就是说,KafkaConsumer.poll()返回记录列表时,每个分区的记录字节数最多为1M。如果⼀个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
需要注意的是,max.partition.fetch.bytes必须要⽐broker能够接收的最⼤的消息(由ssage.size设置)⼤,否则会导致消费者消费不了消息。另外,在上⾯的样例可以看到,我们通常循环调⽤poll⽅法来读取消息,如果max.partition.fetch.bytes设置过⼤,那么消费者需要更长的时间来处理,可能会导致没有及时poll⽽会话过期。对于这种情况,要么减⼩max.partition.fetch.bytes,要么加长会话时间。
session.timeout.ms
这个参数设置消费者会话过期时间,默认为3秒。也就是说,如果消费者在这段时间内没有发送⼼跳,那么broker将会认为会话过期⽽进⾏分区重平衡。这个参数与heartbeat.interval.ms有关,heartbeat.in
terval.ms控制KafkaConsumer的poll()⽅法多长时间发送⼀次⼼跳,这个值需要⽐session.timeout.ms⼩,⼀般为1/3,也就是1秒。更⼩的session.timeout.ms可以让Kafka快速发现故障进⾏重平衡,但也加⼤了误判的概率(⽐如消费者可能只是处理消息慢了⽽不是宕机)。
set
这个参数指定了当消费者第⼀次读取分区或者上⼀次的位置太⽼(⽐如消费者下线时间太久)时的⾏为,可以取值为latest(从最新的消息开始消费)或者earliest(从最⽼的消息开始消费)。
enable.automit
这个参数指定了消费者是否⾃动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false。如果为true,你可能需要关注⾃动提交的时间间隔,该间隔由automit.interval.ms设置。
partition.assignment.strategy
我们已经知道当消费组存在多个消费者时,主题的分区需要按照⼀定策略分配给消费者。这个策略由PartitionAssignor类决定,默认有两种策略:
范围(Range):对于每个主题,每个消费者负责⼀定的连续范围分区。假如消费者C1和消费者C2订阅了两个主题,这两个主题都有
3个分区,那么使⽤这个策略会导致消费者C1负责每个主题的分区0和分区1(下标基于0开始),消费者C2负责分区2。可以看到,如果消费者数量不能整除分区数,那么第⼀个消费者会多出⼏个分区(由主题数决定)。
轮询(RoundRobin):对于所有订阅的主题分区,按顺序⼀⼀的分配给消费者。⽤上⾯的例⼦来说,消费者C1负责第⼀个主题的分区0、分区2,以及第⼆个主题的分区1;其他分区则由消费者C2负责。可以看到,这种策略更加均衡,所有消费者之间的分区数的差值最多为1。
partition.assignment.strategy设置了分配策略,默认为org.apache.sumer.RangeAssignor(使⽤范围策略),你可以设置为org.apache.sumer.RoundRobinAssignor(使⽤轮询策略),或者⾃⼰实现⼀个分配策略然后将
session如何设置和读取partition.assignment.strategy指向该实现类。
client.id
这个参数可以为任意值,⽤来指明消息从哪个客户端发出,⼀般会在打印⽇志、衡量指标、分配配额时使⽤。
ds
这个参数控制⼀个poll()调⽤返回的记录数,这个可以⽤来控制应⽤在拉取循环中的处理数据量。
receive.buffer.bytes、send.buffer.bytes
这两个参数控制读写数据时的TCP缓冲区,设置为-1则使⽤系统的默认值。如果消费者与broker在不同的数据中⼼,可以⼀定程度加⼤缓冲区,因为数据中⼼间⼀般的延迟都⽐较⼤。
提交(commit)与位移(offset)
当我们调⽤poll()时,该⽅法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者⾃⾝来管理消费的位移,并向消费者提供更新位移的接⼝,这种更新位移⽅式称为提交(commit)。
在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进⾏记录。当消费者宕机或者新消费者加⼊时,Kafka会进⾏重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下⾯来看下两种有意思的情况。
假如⼀个消费者在重平衡前后都负责某个分区,如果提交位移⽐之前实际处理的消息位移要⼩,那么会导致消息重复消费,如下所⽰:
假如在重平衡前某个消费者拉取分区消息,在进⾏消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进⾏重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所⽰:
因此,提交位移的⽅式会对应⽤有⽐较⼤的影响,下⾯来看下不同的提交⽅式。
⾃动提交
这种⽅式让消费者来管理位移,应⽤本⾝不需要显式操作。当我们将enable.automit设置为true,那么消费者会在poll⽅法调⽤后每隔5秒(由automit.interval.ms指定)提交⼀次位移。和很多其他操作⼀样,⾃动提交也是由poll()⽅法来驱动的;在调⽤poll()时,消费者判断是否到达提交时间,如果是则提交上⼀次poll返回的最⼤位移。
需要注意到,这种⽅式可能会导致消息重复消费。假如,某个消费者poll消息后,应⽤正在处理消息,在3秒后Kafka进⾏了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
提交当前位移
为了减少消息重复消费或者避免消息丢失,很多应⽤选择⾃⼰主动提交位移。设置automit.offset为false,那么应⽤需要⾃⼰通过调⽤commitSync()来主动提交位移,该⽅法会提交poll返回的最后位移。
为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。⽽如果在处理消息时发⽣了重平衡,那么只有当前poll的消息会重复消费。下⾯是⼀个⾃动提交的代码样例:
1while (true) {
2 ConsumerRecords<String, String> records = consumer.poll(100);
3 for (ConsumerRecord<String, String> record : records)
4 {
5 System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", pic(), record.partition(), record.offset(), record.
6 }
7
8 try {
9 consumermitSync();
10 } catch (CommitFailedException e) {
11 ("commit failed", e)
12 }
13}
上⾯代码poll消息,并进⾏简单的打印(在实际中有更多的处理),最后完成处理后进⾏了位移提交。
异步提交
⼿动提交有⼀个缺点,那就是当发起提交调⽤时应⽤会阻塞。当然我们可以减少⼿动提交的频率,但这个会增加消息重复的概率(和⾃动提
交⼀样)。另外⼀个解决办法是,使⽤异步提交的API。以下为使⽤异步提交的⽅式,应⽤发了⼀个提交请求然后⽴即返回:
1while (true) {
2 ConsumerRecords<String, String> records = consumer.poll(100);
3 for (ConsumerRecord<String, String> record : records)
4 {
5 System.out.printf("topic = %s, partition = %s,
6 offset = %d, customer = %s, country = %s\n",
7 pic(), record.partition(), record.offset(),
8 record.key(), record.value());
9 }
10
11 consumermitAsync();
12}
但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进⾏重试。相⽐较起来,同步提交会进⾏重试直到成功或者最后
抛出异常给应⽤。异步提交没有实现重试是因为,如果同时存在多个异步提交,进⾏重试可能会导致位移覆盖。举个例⼦,假如我们发起了
⼀个异步提交commitA,此时的提交位移为2000,随后⼜发起了⼀个异步提交commitB且位移为3000;commitA提交失败但commitB
提交成功,此时commitA进⾏重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
因此,基于这种性质,⼀般情况下对于异步提交,我们可能会通过回调的⽅式记录提交结果:
1while (true) {
2 ConsumerRecords<String, String> records = consumer.poll(100);
3 for (ConsumerRecord<String, String> record : records) {
4 System.out.printf("topic = %s, partition = %s,
5 offset = %d, customer = %s, country = %s\n",
6 pic(), record.partition(), record.offset(),
7 record.key(), record.value());
8 }
9 consumermitAsync(new OffsetCommitCallback() {
10 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
11 if (e != null)
12 ("Commit failed for offsets {}", offsets, e);
13 }
14 });
15}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论