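Kafka producer: batch.size and linger.ms
1. Problem
batch.size and linger.ms are two parameters with a large impact on Kafka producer performance. batch.size is the basic unit in which the producer sends messages in batches; the default is 16384 bytes, i.e. 16 KB. linger.ms is the parameter the sender thread uses, when checking whether a batch is ready, to decide whether the batch has waited long enough; the default is 0 ms.
So does the producer send messages in batches according to batch.size, or according to the linger.ms interval? The conclusion first: as soon as either batch.size or linger.ms is satisfied, the producer starts sending.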
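As a minimal sketch of how these two settings are supplied, the snippet below builds a producer configuration with plain java.util.Properties. The keys batch.size and linger.ms are the real configuration names; the class name ProducerBatchConfig, the helper method, the broker address localhost:9092 and the value of 5 ms are illustrative assumptions only.

```java
import java.util.Properties;

// Hypothetical helper; only the configuration keys come from Kafka itself.
final class ProducerBatchConfig {

    static Properties build(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        // Assumed broker address, for illustration only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Upper bound, in bytes, of one batch per partition.
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        // How long a non-full batch may wait before it counts as ready.
        props.setProperty("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(16384, 5);
        System.out.println("batch.size=" + props.getProperty("batch.size")
                + ", linger.ms=" + props.getProperty("linger.ms"));
    }
}
```

To actually create a producer, these properties would also need key.serializer and value.serializer entries before being passed to the KafkaProducer constructor.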
2. Source code analysis
First, the main code of the sender thread is shown below. We mainly care about where the sender thread blocks:
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // result.nextReadyCheckDelayMs is the delay until the next ready check,
        // which is also how long the selector may block
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // if the leader of some partition is unknown, force a metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();
        // remove any nodes we are not yet ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // mute all drained partitions so that ordering is preserved
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // for now we only care about result.nextReadyCheckDelayMs
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);
        // poll eventually calls the selector; pollTimeout is how long the selector blocks
        this.client.poll(pollTimeout, now);
    }
Selector#select(..)
    private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }
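The timeout logic at the end of Sender#run can be isolated into a small standalone sketch. The class and method names here (PollTimeoutSketch, pollTimeout) are made up for illustration; only the rule itself, taking the minimum of the two delays and forcing 0 when some node is ready, is taken from the code above.

```java
// Simplified model of how Sender#run picks the selector timeout.
// Names are hypothetical; the rule mirrors the source shown above.
final class PollTimeoutSketch {

    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs, int readyNodeCount) {
        // Data is ready for at least one node: do not block at all.
        if (readyNodeCount > 0) {
            return 0L;
        }
        // Otherwise wait until the earliest moment something could become sendable.
        return Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
    }

    public static void main(String[] args) {
        // No batches at all: both delays stay at their maximum.
        System.out.println(pollTimeout(Long.MAX_VALUE, Long.MAX_VALUE, 0));
        // One batch still lingering for another 5 ms.
        System.out.println(pollTimeout(5L, Long.MAX_VALUE, 0));
        // A node is ready: the selector is polled without blocking.
        System.out.println(pollTimeout(5L, 100L, 1));
    }
}
```

A timeout of 0 is routed to selectNow() in the select method above, so the sender loops again immediately instead of blocking.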
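We can start the analysis from a freshly instantiated KafkaProducer, before send has been called. When the sender thread calls accumulator#ready(..), it gets back a result that contains how long the selector may block. Since send has not been called yet, there is no Deque<RecordBatch> with data, so the nextReadyCheckDelayMs in the result keeps its initial maximum value. As far as the sender is concerned, the selector then blocks until it is woken up (the network client may still cap the wait with its own timeouts, such as the metadata refresh).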
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<Node>();
        // initialized to the maximum value
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        // this is where linger.ms comes in
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
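The part of the ready check that matters here can be reduced to two small functions. This is a deliberately simplified sketch: it ignores retry backoff, buffer exhaustion, close and flush, and the names ReadyCheckSketch, sendable and timeLeftMs are hypothetical. It only illustrates that a batch is sendable when it is full or when it has waited at least linger.ms.

```java
// Simplified model of the per-batch decision in RecordAccumulator#ready.
// Retry backoff, exhausted memory, close and flush are intentionally left out.
final class ReadyCheckSketch {

    // A batch may be sent when it is full or has lingered long enough.
    static boolean sendable(boolean full, long waitedTimeMs, long lingerMs) {
        boolean expired = waitedTimeMs >= lingerMs;
        return full || expired;
    }

    // Remaining linger time; this feeds nextReadyCheckDelayMs when not sendable.
    static long timeLeftMs(long waitedTimeMs, long lingerMs) {
        return Math.max(lingerMs - waitedTimeMs, 0);
    }

    public static void main(String[] args) {
        // Default linger.ms = 0: even a fresh, non-full batch is sendable.
        System.out.println(sendable(false, 0, 0));
        // linger.ms = 10, waited 3 ms, not full: must wait another 7 ms.
        System.out.println(sendable(false, 3, 10) + " " + timeLeftMs(3, 10));
        // Same timing, but the batch is full: sendable right away.
        System.out.println(sendable(true, 3, 10));
    }
}
```

Note the consequence for the default configuration: with linger.ms = 0 the expired condition holds as soon as a batch exists, so batching then only happens when records arrive faster than the sender drains them.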
Next we call send, which puts one record into memory. Because this creates a new RecordBatch, the sender thread is woken up:
KafkaProducer#doSend(...)
    if (result.batchIsFull || result.newBatchCreated) {
        this.sender.wakeup();
    }
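This wakes up the sender thread that is blocked in selector#select(..). The sender thread runs through accumulator#ready(..) again. Since the Deque<RecordBatch> now holds a batch, the nextReadyCheckDelayMs in the returned result is no longer the maximum value but a value derived from linger.ms. In other words, the selector now blocks for at most linger.ms before returning, and then the loop polls again.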
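The wakeup mechanism itself comes from java.nio: a thread blocked in Selector#select(timeout) returns early when another thread calls Selector#wakeup(). The following self-contained demo uses only the JDK, not Kafka; the class name SelectorWakeupDemo and the chosen timings are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// JDK-only demo: select(timeout) returns early once wakeup() is called.
final class SelectorWakeupDemo {

    // Blocks in select(timeoutMs) and returns how long the call really took.
    static long blockedMillis(long timeoutMs, long wakeupAfterMs) {
        try (Selector selector = Selector.open()) {
            // Plays the role of the thread calling KafkaProducer#send.
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(wakeupAfterMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup();
            });
            long start = System.nanoTime();
            waker.start();
            // Plays the role of the sender thread blocked in the selector.
            selector.select(timeoutMs);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return elapsedMs;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Asked to block for up to 5 s, but woken after roughly 100 ms.
        System.out.println("blocked for ~" + blockedMillis(5_000, 100) + " ms");
    }
}
```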
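To summarize: when the Deque<RecordBatch> is not empty, the sender thread blocks for at most linger.ms; when it is empty, the sender thread blocks with a timeout of Long.MAX_VALUE; and once KafkaProducer#send(..) puts a message into memory and a new RecordBatch is created, the sender thread is woken up.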
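In addition, the code above in KafkaProducer#doSend(...) shows that Sender#wakeup(..) is also called when a RecordBatch becomes full. Putting it all together: as soon as either linger.ms has elapsed or a batch has reached batch.size, the sender thread is activated to send the messages.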
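The conclusion can be written as a one-line rule: a batch becomes sendable at whichever comes first, the linger deadline or the moment it fills up. The sketch below models only that rule; the class BatchSendTimeSketch and its parameters are hypothetical, and Long.MAX_VALUE is used here to mean that the batch never fills.

```java
// Hypothetical model of the article's conclusion, not Kafka code.
final class BatchSendTimeSketch {

    // Earliest time (ms) at which a batch becomes sendable.
    // Pass Long.MAX_VALUE as batchFullAtMs if the batch never fills up.
    static long sendableAtMs(long batchCreatedMs, long lingerMs, long batchFullAtMs) {
        long lingerDeadlineMs = batchCreatedMs + lingerMs;
        return Math.min(lingerDeadlineMs, batchFullAtMs);
    }

    public static void main(String[] args) {
        // Slow traffic: the batch never fills, linger.ms = 5 decides.
        System.out.println(sendableAtMs(1000, 5, Long.MAX_VALUE));
        // Fast traffic: the batch fills after 20 ms, before linger.ms = 50 expires.
        System.out.println(sendableAtMs(1000, 50, 1020));
    }
}
```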