你绝对能看懂的Kafka源代码分析-NetworkClient类代码分析⽬录:
-------------------------------------------------------------------
通过前⽂的学习,我们知道Sender最终把消息发送出去,依靠的是NetWorkClient。它是Kafka的⼀个重要组件,负责⽹络IO,包括连接的建⽴,读数据、写数据等等。Kafka⽹络IO的实现是通过java的NIO,Kafka对NIO进⾏了封装。在学习Kafka⽹络IO相关之前,⼤家先参考⽹上⽂章对NIO有简单的了解(后续我可能也会写⼀篇⼊门教程),再继续阅读本篇⽂章。
Kafka IO部分设计
Kafka IO部分涉及的主要类和依赖关系见下图:
上半部分是Kafka的类,下半部是java nio的类。Kafka的类讲解如下:
1、NetWorkClient,顾名思义,这是Kafka IO对外暴露的客户端。IO操作都是通过它来对外暴露⽅法调⽤。实际上它是通过Kafka的KSelector来实现。
2、KSelector,其实此类名称也是Selector,为了区分nio的selector,故称之为KSelector。他拥有nio selector的引⽤。此外他维护了所有的KafkaChannel。
3、KafkaChannel,他对应nio中的Channel概念,它通过TransportLayer间接持有SocketChannel和SelectionKey这两个nio中的核⼼对象。另外他还维护了发送和接收的数据对象:Send实现及NetWorkReceive。另外请注意唯⼀⼀个从下往上的箭头,KafkaChannel还会把⾃⼰attach到⾃⼰对应的SelectionKey中。这样可以通过SelectionKey⽅便取到对应KafkaChannel。
4、TransportLayer,从名称可以看出这个类实现传输层功能,⽽传输是通过nio实现,所以他持有SocketChannel和Selector这两个nio 的核⼼对象。他所做的事情就是通过这两个对象实现⽹络IO。
5、Send,这是⼀个接⼝,有多个实现,⽬的就是封装要发送的数据,底层是nio的ByteBuffer。
6、NetWorkReceive,接收数据的对象,底层是nio的ByteBuffer。
流程分析
NetWorkClient实现通道的建⽴,读取消息、发送消息等功能。这些功能上原理是相同的,我们继续从KafkaProducer发送消息为⼊⼝点,继续分析发送消息的流程。
前⽂讲到,Sender最终通过NetWorkClient的两个⽅法完成消息发送,如下:
client.send(clientRequest, now);
client.poll(pollTimeout, now);
那么我们就从这两个⽅法开始分析。
send()⽅法
我们回忆⼀下sender发送消息流程,sender把batch按照要发往的node分好类,分装为ClientRequest,然后调⽤NetWorkClient的send⽅法。在这个⽅法⾥并没有真正⽹络IO,⽽只是准备好了要发送的请求对象。
Sender的send⽅法中实际调⽤的是doSend(ClientRequest clientRequest, boolean isInternalRequest, long now)⽅法。
代码如下:
ensureActive();
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder<?> builder = questBuilder();
try {
NodeApiVersions versionInfo = (nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
"Assuming version {}.", clientRequest.apiKey(), lationId(), nodeId, version);
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) {
// If the version is not supported, skip sending the request over the wire.
/
/ Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
abortedSends.add(clientResponse);
}
}
此⽅法逻辑如下:
1、检查NetWorkClient状态为激活。
2、取得请求发送⽬的地的nodeId。
3、如果是⾮内部请求,检查connectionState是否ready、Channel是否ready、是否达到发送中上限
4、通过ClientRequest携带的AbstractRequest.Builder对象获取的version以及⽬的地node的api version,来取得最终的version
5、通过builder.build(version)⽅法,来初始化request,这⾥实际⽣成的是ProduceRequest。
6、最后调⽤doSend(clientRequest, isInternalRequest, now, builder.build(version));
我们继续看doSend(clientRequest, isInternalRequest, now, builder.build(version))⽅法。
doSend()⽅法
此⽅法核⼼代码如下:
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
Send send = Send(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
逻辑如下:
1、获取destination,实际上就是要发往的node的id。
2、⽣成Requestheader对象,包括apiKey 、version、clientId、 correlation这些属性。
3、⽣成待发送的send对象,这个send对象封装了⽬的地和header⽣成的ByteBuffer对象
4、⽣成InFlightRequest读喜庆。它持有ClientRequest,request,send等对象。
5、把InFlightRequest添加到inFlightRequests中,InFlightRequests中按照node的id存储InFlightRequest的队列。
6、最后调⽤通过selector的send(send)⽅法做IO前的最后准备⼯作。
Selector的send()⽅法
代码如下:
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (ainsKey(connectionId)) {
// ensure notification via `disconnected`, leave channel in the state in which closing was triggered
this.failedSends.add(connectionId);
} else {
try {
nodeselectorchannel.setSend(send);
} catch (Exception e) {
// update the state for consistency, the channel will be discarded after `close`
channel.state(ChannelState.FAILED_SEND);
// ensure notification via `disconnected` when `failedSends` are processed in the next poll
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
<("Unexpected exception during send, closing connection {} and rethrowing exception {}",
connectionId, e);
throw e;
}
}
}
}
这个⽅法逻辑如下:
1、我们知道Selector维护着所有的channel,这个⽅法先通过⽬标node的id,获取KafkaChannel或者新建Channel。
2、我们还知道KakkaChannel中维护着要发送的send对象,所以获取channel后,需要设置send对象。
⾄此,整个sender发送流程中的第⼀步才完整⾛完,也就是sender的run ⽅法中第⼀步sendProducerData(now):此时还没有进⾏⽹络IO,只是做好了IO的准备。channel、要发送的数据,已经就绪,只待被发送出去。
整个准备IO请求从run⽅法进⼊,完整调⽤链如下:
整个流程最终的⽬的,就是把累积的消息按照node分组,⽣成请求对象,把数据设置到相应的channel中,并关注写事件。接下来就是真正调⽤Client.poll⽅法进⾏⽹络IO的流程分析。
poll()⽅法
此⽅法主要代码如下:
........
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
<("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
..........
此⽅法中通过selector的poll做⽹络IO,然后处理响应
主要逻辑如下:
1、调⽤selector的poll⽅法做IO
2、处理完成的发送响应
3、处理完成的接收响应
4、处理关闭的响应
5、处理连接状态变化
6、处理初始化ApiVersion请求
7、处理所有Response的callback
selector的poll⽅法
从这个⽅法开始,终于进⼊了真正的⽹络IO。我们逐层进⾏分析。
poll⽅法的主要逻辑如下:
1、清理各种状态及内部缓存
2、通过nio selector的selectedKeys⽅法获取已经准备好的SelectionKey的集合readyKeys。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论