Kafka参数详解及调优--消费者
引⾔
在实际的kafka开发中,我们会发现,⽆论是⽣产者还是消费者,都需要构建⼀个Properties对象,⾥⾯设置了很多参数。对于很多初学者来说,会看不懂这些参数分别代表什么含义。
在本篇⽂章我们就来详细地了解⼀下这些参数的作⽤,并探讨下如何使⽤合理的配置去优化提⾼⽣产/消费效率。
正⽂
1.kafka消费者参数
我们先来看⼀段消费者的构建代码。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.automit", "true");
props.put("automit.interval.ms", "1000");
props.put("set", "earliest");
props.put("session.timeout.ms", "30000");
props.put("fetch.min.bytes", "1048576");
props.put("fetch.max.wait.ms", "2000");
props.put("max.partition.fetch.bytes", "2097152");
props.put("ds", "10000");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
在这段代码中有很多常⽤的参数配置,在线上使⽤时,我们要根据实际的数据量和数据⼤⼩来决定这些配置的具体值。下⾯来挑出其中⽐较重要的⼏个参数来详细解析⼀下。
1.1 enable.automit
指定了消费者是否⾃动提交偏移量,默认值是true,为了尽量避免重复数据和数据丢失,可以把它设置为false,有⾃⼰控制合适提交偏移量,如果设置为true, 可以通过设置 automit.interval.ms属性来控制提交的频率。
详细地来说:
当⼀个consumer因某种原因退出Group时,进⾏重新分配partition后,同⼀group中的另⼀个consumer在读取该partition时,怎么能够知道上⼀个consumer该从哪个offset的message读取呢?也是是如何保证同⼀个group内的consumer不重复消费消息呢?上⾯说了⼀次⾛⽹络的fetch请求会拉取到⼀定量的数据,但是这些数据还没有被消息完毕,Consumer就挂掉了,下⼀次进⾏数据fetch时,是否会从上次读到的数据开始读取,⽽导致Consumer消费的数据丢失吗?
为了做到这⼀点,当使⽤完poll从本地缓存拉取到数据之后,需要client调⽤commitSync⽅法(或者commitAsync⽅法)去commit 下⼀次该去读取 哪⼀个offset的message。
⽽这个commit⽅法会通过⾛⽹络的commit请求将offset在coordinator中保留,这样就能够保证下⼀次读取(不论进⾏了rebalance)时,既不会重复消费消息,也不会遗漏消息。
对于offset的commit,Kafka Consumer Java Client⽀持两种模式:由KafkaConsumer⾃动提交,或者是⽤户通过调⽤commitSync、commitAsync⽅法的⽅式完成offset的提交。
⾃动提交的例⼦:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.automit", "true");
props.put("automit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
⼿动提交的栗⼦:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.automit", "false");
props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumermitSync();
buffer.clear();
}
}
在⼿动提交单个partition的offset时,需要注意的⼀点是:要提交的是下⼀次要读取的offset,例如:
try {
while(running) {
// 取得消息
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
// 根据分区来遍历数据:
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = ds(partition);
// 数据处理
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
// 取得当前读取到的最后⼀条记录的offset
long lastOffset = (partitionRecords.size() - 1).offset();
// 提交offset,记得要 + 1
consumermitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
1.2 set
该属性指定了消费者在读取⼀个没有偏移量后者偏移量⽆效(消费者长时间失效当前的偏移量已经过时并且被删除了)的分区的情况下,应该作何处理,默认值是latest,也就是从最新记录读取数据(消费者启动之后⽣成的记录),另⼀个值是earliest,意思是在偏移量⽆效的情况下,消费者从起始位置开始读取数据。
1.3
该属性指定了当消费者被认为已经挂掉之前可以与服务器断开连接的时间。默认是3s,消费者在3s之内没有再次向服务器发送⼼跳,那么将会被认为已经死亡。此时,协调器将会出发再均衡,把它的分区分配给其他的消费者,该属性与heartbeat.interval.ms紧密相关,该参数定义了消费者发送⼼跳的时间间隔,也就是⼼跳频率,⼀般要同时修改这两个参数,,⼀般是session.timeout.ms的三分之⼀,⽐
如,session.timeout.ms设置成3min,那么heartbeat.interval.ms⼀般设置成1min,这样,可以更快的检测以及恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致⾮预期的再均衡(有⼀种情况就是⽹络延迟,本⾝消费者是没有挂掉的,但是⽹络延迟造成了⼼跳超时,这样本不该发⽣再均衡,但是因为⽹络原因造成了⾮预期的再均衡),把该属性的值设置得⼤⼀些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。
1.4 max.partition.fetch.bytes
该属性指定了服务器从每个分区⾥返回给消费者的最⼤字节数。它的默认值是lMB , 也
就是说,kafkaConsumer.poll() ⽅法从每个分区⾥返回的记录最多不超max.partitions.fetch.bytes 指定的字节。如果⼀个主题有20 个分区和5 个消费者,那么每个消费者需要⾄少4MB 的可⽤内存来接收记录。在为消费者分配内存时,可以给它们多分配⼀些,因为如果组⾥有消费者发⽣奔溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须⽐broker 能够接收的最⼤消息的字节数
(通过ssage.size 属性配置)⼤, 否则消费者可能⽆法读取这些消息,导致消费者⼀直挂起重试,例如,ssage.size设置为2MB,⽽该属性设置为1MB,那么当⼀个⽣产者可能就会⽣产⼀条⼤⼩为2MB的消息,那么就会出现问题,消费者能从分区取回的最⼤消息⼤⼩就只有1MB,但是数据量是2MB,所以就会导致消费者⼀直挂起重试。
在设置该属性时,另⼀个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调⽤poll()⽅法
来避免会话过期和发⽣分区再均衡,如果单次调⽤poll()返回的数据太多,消费者需要更多的时间来处理,可能⽆怯及时进⾏下⼀个轮询来避免会话过期。如果出现这种情况, 可以把max.partitioin.fetch.bytes 值改⼩,或者延长会话过期时间。
1.5 fetch.min.bytes
消费者从服务器获取记录的最⼩字节数,broker收到消费者拉取数据的请求的时候,如果可⽤数据量⼩于设置的值,那么broker将会等待有⾜够可⽤的数据的时候才返回给消费者,这样可以降低消费者和broker的⼯作负载。
因为当主题不是很活跃的情况下,就不需要来来回回的处理消息,如果没有很多可⽤数据,但消费者的CPU 使⽤率却很⾼,那么就需要把该属性的值设得⽐默认值⼤。如果消费者的数量⽐较多,把该属性的值设置得⼤⼀点可以降低broker 的⼯作负载。
###1.6
fetch.min.bytes设置了broker返回给消费者最⼩的数据量,⽽fetch.max.wait.ms设置的则是broker的等待时间,两个属性只要满⾜了任何⼀条,broker都会将数据返回给消费者,也就是说举个例⼦,fetch.min.bytes设置成1MB,fetch.max.wait.ms设置成1000ms,那么如果在1000ms时间内,如果数据量达到了1MB,broker将会把数据返回给消费者;如果已经过了1000ms,但是数据量还没有达到
1MB,那么broker仍然会把当前积累的所有数据返回给消费者。session如何设置和读取
1.7 ds
控制单次调⽤call⽅法能够返回的记录数量,帮助控制在轮询⾥需要处理的数据量。
1.8 receive.buffer.bytes + send.buffer.bytes
socket 在读写数据时⽤到的TCP 缓冲区也可以设置⼤⼩。如果它们被设为-1 ,就使⽤操作系统的默认值。如果⽣产者或消费者与broker 处于不同的数据中⼼内,可以适当增⼤这些值,因为跨数据中⼼的⽹络⼀般都有⽐较⾼的延迟和⽐较低的带宽。
1.9 partition.assignment.strategy
分区分配策略,kafka有两个默认策略:
Range:该策略会把主题的若⼲个连续的分区分配给消费者
Robin:该策略把主题的所有分区逐个分配给消费者
分区策略默认是:org.apache.sumer.RangeAssignor=>Range策略
org.apache.sumer.RoundRobinAssignor=>Robin策略
1.10
Consumer进程的标识。如果设置⼀个⼈为可读的值,跟踪问题会⽐较⽅便。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。