Kafka消费者-从Kafka读取数据
(1)Customer和Customer Group
(1)两种常⽤的消息模型
队列模型(queuing)和发布-订阅模型(publish-subscribe)。
队列的处理⽅式是⼀组消费者从服务器读取消息,⼀条消息只由其中的⼀个消费者来处理。
发布-订阅模型中,消息被⼴播给所有的消费者,接收到消息的消费者都可以处理此消息。
(2)Kafka的消费者和消费者组
Kafka为这两种模型提供了单⼀的消费者抽象模型:消费者组(consumer group)。消费者⽤⼀个消费者组名标记⾃⼰。⼀个发布在Topic 上消息被分发给此消费者组中的⼀个消费者。假如所有的消费者都在⼀个组中,那么这就变成了队列模型。假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。⼀个消费者组中消费者订阅同⼀个Topic,每个消费者接受Topic的⼀部分分区的消息,从⽽实现对消费者的横向扩展,对消息进⾏分流。
注意:当单个消费者⽆法跟上数据⽣成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从⽽实现单个应⽤程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应⽤程序从同⼀个Topic读取所有的消息,此时只要保证每个应⽤程序有⾃⼰的消费者组即可。
消费者组的概念就是:当有多个应⽤程序都需要从Kafka获取消息时,让每个app对应⼀个消费者组,从⽽使每个应⽤程序都能获取⼀个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能⼒和处理能⼒,消费者组中的每个消费者只处理每个Topic 的⼀部分的消息,每个消费者对应⼀个线程。
(3)线程安全
(2)Partition Rebalance分区再均衡
(1)消费者组中新添加消费者读取到原本是其他消费者读取的消息
(2)消费者关闭或崩溃之后离开组,原本由他读取的partition将由组⾥其他消费者读取
(3)当向⼀个Topic添加新的partition,会发⽣partition在消费者中的重新分配
以上三种现象会使partition的所有权在消费者之间转移,这样的⾏为叫作再均衡。
再均衡的优点:
给消费者组带来了⾼可⽤性和伸缩性
再均衡的缺点:
(1)再均衡期间消费者⽆法读取消息,整个组有⼀⼩段时间不可⽤
(2)partition被重新分配给⼀个消费者时,消费者当前的读取状态会丢失,有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应⽤程序。
因此需要进⾏安全的再均衡和避免不必要的再均衡。
(3)创建Kafka消费者、订阅主题、轮询
Properties props = new Properties();
props.put("bootstrap", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
//1.创建消费者
KafkaConsuner<String, String> consumer = new KafkaConsumer<String, String>(props);
//2.订阅Topic
//创建⼀个只包含单个元素的列表,Topic的名字叫作customerCountries
consumer.subscribe(Collections.singletonList("customerCountries"));
//⽀持正则表达式,订阅所有与test相关的Topic
//consumer.subscribe("test.*");
//3.轮询
//消息轮询是消费者的核⼼API,通过⼀个简单的轮询向服务器请求数据,⼀旦消费者订阅了Topic,轮询就会处理所欲的细节,包括组协调、partition再均衡、发送⼼跳 //以及获取数据,开发者只要处理从partition返回的数据即可。
try {
while (true) {//消费者是⼀个长期运⾏的程序,通过持续轮询向Kafka请求数据。在其他线程中调⽤consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可⽤的数据都要返回
ConsumerRecord<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
log.pic() + record.partition() + record.offset() + record.key() + record.value());
//统计各个地区的客户数量,即模拟对消息的处理
int updatedCount = 1;
updatedCount += OrDefault(record.value(), 0) + 1;
custCountryMap.put(record.value(), updatedCount);
//真实场景中,结果⼀般会被保存到数据存储系统中
JSONObject json = new JSONObject(custCountryMap);
System.out.String(4));
}
}
} finally {
//退出应⽤程序前使⽤close⽅法关闭消费者,⽹络连接和socket也会随之关闭,并⽴即触发⼀次再均衡
consumer.close();
}
(4)消费者的配置
1:fetch.min.bytes,指定消费者从broker获取消息的最⼩字节数,即等到有⾜够的数据时才把它返回给消费者
2:fetch.max.wait.ms,等待broker返回数据的最⼤时间,默认是500ms。fetch.min.bytes和fetch.max.wait.ms哪个条件先得到满⾜,就按照哪种⽅式返回数据
3:max.partition.fetch.bytes,指定broker从每个partition中返回给消费者的最⼤字节数,默认1MB
4:session.timeout.ms,指定消费者被认定死亡之前可以与服务器断开连接的时间,默认是3s
5:set,消费者在读取⼀个没有偏移量或者偏移量⽆效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认是latest(消费者从最新的记录开始读取数据)。另⼀个值是 earliest(消费者从起始位置读取partition的记录)
6:enable.automit,指定消费者是否⾃动提交偏移量,默认为true
7:partition.assignment.strategy,指定partition如何分配给消费者,默认是Range。Range:把Topic的若⼲个连续的partition分配给消费者。RoundRobin:把Topic的所有partition逐个分配给消费者
8:ds,单次调⽤poll⽅法能够返回的消息数量
(5)提交和偏移量
1、消费者为什么要提交偏移量
当消费者崩溃或者有新的消费者加⼊,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,⽽不是之前处理那个,为了能够继续之前的⼯作,消费者需要读取每个partition最后⼀次提交的偏移量,然后从偏移量指定的地⽅继续处理。
2、提交偏移量可能带来的问题
case1:如果提交的偏移量⼩于客户端处理的最后⼀个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
case2:如果提交的偏移量⼤于客户端处理的最后⼀个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
3、提交偏移量的⽅式
(1)⾃动提交 Automatic Commit
enable.automit设置成true(默认为true),那么每过5s,消费者⾃动把从poll()⽅法接收到的最⼤的偏移量提交。提交的时间间隔由automit.interval.ms控制,默认是5s
⾃动提交的优点是⽅便,但是可能会重复处理消息
(2)提交当前偏移量 Commit Current Offset
将enable.automit设置成false,让应⽤程序决定何时提交偏移量。commitSync()提交由poll()⽅法返回的最新偏移量,所以在处理完所有消息后要确保调⽤commitSync,否则会有消息丢失的风险。commitSync在提交成功或碰到⽆法恢复的错误之前,会⼀直重试。如果发⽣了再均衡,从最近⼀批消息到发⽣再均衡之间的所有消息都会被重复处理。
不⾜:broker在对提交请求作出回应之前,应⽤程序会⼀直阻塞,会限制应⽤程序的吞吐量
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", pic(),
record.partition(), record.offset(), record.key(),
record.value());
}
try {
consumermitSync();//处理完当前批次的消息,在轮询更多的消息之前,调⽤commitSync⽅法提交当前批次最新的消息
} catch (CommitFailedException e) {
<("commit failed", e);//只要没有发⽣不可恢复的错误,commitSync⽅法会⼀直尝试直⾄提交成功。如果提交失败,我们也只能把异常记录到错误⽇志⾥
}
}
(3)异步提交
异步提交的commitAsync,只管发送提交请求,⽆需等待broker响应。commitAsync提交之后不进⾏重试,假设要提交偏移量2000,这时候发⽣短暂的通信问题,服务器接收不到提交请求,因此也就不会作出响应。与此同时,我们处理了另外⼀批消息,并成功提交了偏移量3000,。如果commitAsync重新尝试提交2000,那么它有可能在3000之后提交成功,这个时候如果发⽣再均衡,就会出现重复消息。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", pic(),
record.partition(), record.offset(), record.key(),
record.value());
}
consumermitAsync(new OffsetCommitCallback() {//在broker作出响应后执⾏回调函数,回调经常被⽤于记录提交错误或⽣成度量指标
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
("Commit Failed for offsets {}", offsets, e);
}
}});
}
(4)同步和异步组合提交
⼀般情况下,针对偶尔出现的提交失败,不进⾏重试不会有太⼤的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后⼀次提交,
就要确保提交成功。
因此,在消费者关闭之前⼀般会组合使⽤commitAsync和commitSync提交偏移量。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", pic(),
record.partition(), record.offset(), record.key(),
record.value());
}
consumermitAsync();//如果⼀切正常,我们使⽤commitAsync来提交,这样速度更快,⽽且即使这次提交失败,下次提交很可能会成功
} catch (CommitFailedException e) {
<("commit failed", e);
} finally {
try {
consumermitSync();//关闭消费者前,使⽤commitSync,直到提交成成功或者发⽣⽆法恢复的错误
} finally {
consumer.close();
}
}
(5)提交特定的偏移量
消费者API允许调⽤commitSync()和commitAsync()⽅法时传⼊希望提交的partition和offset的map,即提交特定的偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//⽤于跟踪偏移量的map
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", pic(),
record.partition(), record.offset(), record.key(),
record.value());//模拟对消息的处理
//在读取每条消息后,使⽤期望处理的下⼀个消息的偏移量更新map⾥的偏移量。下⼀次就从这⾥开始读取消息
currentOffsets.put(new pic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, “no matadata”));
if (count++ % 1000 == 0) {//每处理1000条消息就提交⼀次偏移量,在实际应⽤中,可以根据时间或者消息的内容进⾏提交
consumermitAsync(currentOffsets, null);
}
}
}
(6)再均衡
在为消费者分配新的partition或者移除旧的partition时,可以通过消费者API执⾏⼀些应⽤程序代码,在使⽤subscribe()⽅法时传⼊⼀
个ConsumerRebalanceListener实例。
ConsumerRebalanceListener需要实现的两个⽅法
1:public void onPartitionRevoked(Collection<TopicPartition> partitions)⽅法会在再均衡开始之前和消费者停⽌读取消息之后被调⽤。如果在这⾥提交偏移量,下⼀个接管partition的消费者就知道该从哪⾥开始读取了。
2:public void onPartitionAssigned(Collection<TopicPartition> partitions)⽅法会在重新分配partition之后和消费者开始读取消息之前被调⽤。
下⾯的例⼦演⽰如何在失去partition的所有权之前通过onPartitionRevoked()⽅法来提交偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//⽤于跟踪偏移量的map
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//如果发⽣再均衡,要在即将失去partition所有权时提交偏移量。
//注意:(1)提交的是最近处理过的偏移量,⽽不是批次中还在处理的最后⼀个偏移量。因为partition有可能在我们还在处理消息时被撤回。
//(2)我们要提交所有分区的偏移量,⽽不只是即将市区所有权的分区的偏移量。因为提交的偏移量是已经处理过的,所以不会有什么问题。
//(3)调⽤commitSync⽅法,确保在再均衡发⽣之前提交偏移量
consumermitSync(currentOffsets);
}
}
try{
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n", pic(),
record.partition(), record.offset(), record.key(),
record.value());//模拟对消息的处理
/
/在读取每条消息后,使⽤期望处理的下⼀个消息的偏移量更新map⾥的偏移量。下⼀次就从这⾥开始读取消息
currentOffsets.put(new pic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, “no matadata”));
session如何设置和读取}
consumermitAsync(currentOffsets, null);
} catch(WakeupException e) {
//忽略异常,正在关闭消费者
} catch (Exception e) {
<("unexpected error", e);
} finally {
try{
consumermitSync(currentOffsets);
} finally {
consumer.close();
}
}
参考:《Kafka权威指南》
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论