[Pulsar源码]Pulsarclient原理解析
Pulsar client原理解析
⽂章⽬录
导语
Pulsar 作为⼀个消息传输的解决⽅案,最基本的功能是提供了pub/sub模型的消息服务,即作为⼀个消息中间件的能⼒,本⽂主要以Java Client为例讲述⽣产者、消费者和Broker之间的交互过程。
1. Client 与 broker 交互流程
和常见的MQ⼀样,Topic(分区)由 Broker 持有,因此⽣产者和消费者⾸先需要创建连接到 Broker,然后查询对应发布或者订阅的Topic 所在的 Broker 信息。
这个过程起始分为以下⼏个步骤:
创建到 Broker 的连接
连接创建完毕之后,向 Broker 查询 Topic 的分区数信息
根据分区数,查询对应分区当前被哪个 Broker 持有
创建到分区所在 Broker 的连接
当 producer/consumer 已经连接到具体分区所在的 Broker 之后,
⽣产者可以进⾏⽣产:
向 Broker 发送请求注册 Producer
发送数据
pending消费者可以进⾏消费:
向 Broker 发送请求注册 Subscription
请求数据
发送 Ack 请求到 Broker
当 Broker 接收到上述请求之后,会⼀⼀处理并且回应:
对于创建链接请求,响应创建成功
对于分区查询请求,响应分区数
对于分区所有 Broker 查询请求,响应 Broker URL
对于⽣产者/消费者注册请求,响应⽣产者、消费者注册成功
对于⽣产者注册请求,响应⽣产注册成功
对于⽣产数据请求,响应⽣产已经接受
对于消费数据请求,响应数据
总体上的⽣产者、消费者和Broker的交互逻辑如上所述,下⾯结合代码详细描述整个过程。
2. PulsarClient 初始化过程
初始化Pulsar Producer和Consumer都需要先初始化 Pulsar client。创建⼀个 pulsar client 的代码如下:
PulsarClient pulsarClient =
PulsarClient.builder()
.serviceUrl(url)
.statsInterval(intervalInSecs, TimeUnit.SECONDS)
.build();
这⾥会涉及到两个参数
serviceUrl: broker 地址列表,也可以通过ServiceUrlProvider提供
statsInterval: client状态采集的周期,默认是60s,⽐如⽣产者会采集⽣产速率,消费者会采集消费速率,当配置⼤于0时⽣效,如果⼩于等于0则不采集状态
初始化 PulsarClientImpl
当所有的参数设置完成之后,最后build()会初始化⼀个PulsarClientImpl对象。在这个过程中,⾸先会初始化 EventLoopGroup 和ConnectionPool ,然后执⾏ PulsarClientImpl的初始化。
初始化EventLoopGroup
根据平台是否⽀持epoll,决定初始化⼀个 EpollEventLoopGroup 或者NioEventLoopGroup(netty可以使⽤native transport来提升性能),并且设置EventLoopGroup的IO线程数。
⼀个EventLoopGroup包含多个EventLoop, Netty使⽤EventLoop来处理连接上的读写事件,⽽⼀个连接上的所有请求都保证在⼀个EventLoop中被处理,⼀个EventLoop中只有⼀个Thread,所以也就实现了⼀个连接上的所有事件只会在⼀个线程中被执⾏。
初始化ConnectionPool
主要是初始化⼀下内容
eventLoopGroup
连接池缓存pool,pool⽤来保存 链接地址和ClientCnx的映射关系
初始化 Netty Bootstrap,设置⼀些参数以及handler,PulsarChannelInitializer, 在链接建⽴之后,会初始化 Channel 的 handler pipeline,主要的handler是 ClientCnx
初始化 Netty dnsResolver
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier<ClientCnx> clientCnxSupplier)throws PulsarClientException {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = ConnectionsPerBroker();
pool =new ConcurrentHashMap<>();
bootstrap =new Bootstrap();
bootstrap.ClientSocketChannelClass(eventLoopGroup));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ConnectionTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
try{
bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
}catch(Exception e){
<("Failed to create channel initializer");
throw new PulsarClientException(e);
}
this.dnsResolver =new ()).traceEnabled(true)
.DatagramChannelClass(eventLoopGroup)).build();
}
EventLoopGroup 和 ConnectionPool初始化完毕之后,继续进⾏PulsarClientImpl的初始化⼯作
初始化lookup服务 :Lookup服务⽤来查topic 的元数据信息,⽐如分区数、分区所在broker地址、Schema信息等,有两种类型,HttpLookupService 和 BinaryProtoLookupService
初始化producer/consumer缓存(set)
初始化MemoryLimitController:MemoryLimitController⽤来做⽣产消息的内存占⽤限制,内部会维护内存占⽤的计数器,当⽣产消息时,会增加计数;⽣产完毕时减少计数;如果计数达到上限时,根据blockIfQueueFull来决定阻塞请求或者抛出异常
完成初始化⼯作之后,将Client的状态置为Open。
3. Publish⼯作原理
向Pulsar Publish数据,需要⾸先初始化⼀个producer,初始化的过程中可以为producer指定⼀些属性
.enableBatching(true)// 开启batch
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)// batch的最⼤等待时间
.batchingMaxBytes(12800)// batch的最⼤⼤⼩
.batchingMaxMessages(10)// batch中最多保留的消息条数
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)// Message Routing Mode
.roundRobinRouterBatchingPartitionSwitchFrequency(10)// batch开启时,分区切换频率
.maxPendingMessages(50000)// 最⼤缓存的⽣成请求数
.maxPendingMessagesAcrossPartitions(100000)// 分区topic配置,即所有分区的最⼤缓存数量
.blockIfQueueFull(true)// 当达到最⼤缓存数时是否block客户端
.sendTimeout(5, TimeUnit.SECONDS)// 发送超时时间
.topic("test")
pressionType(CompressionType.SNAPPY)// 压缩类型
.producerName("producer-name")
.enableChunking(true)// 是否开启Chunking特性
.accessMode(ProducerAccessMode.Shared)// producer mode
.intercept(new ProducerInterceptor(){// 设置interceptor,实现⾃定义的功能
@Override
public void close(){}
@Override
public boolean eligible(Message message){return false;}
@Override
public Message beforeSend(Producer producer, Message message){return null;}
@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception){}
})
.create();
Producer配置
Batch 配置
Producer可以配置Batch发送消息和单条发送消息,开启了batch之后,可以通过:
消息条数
消息⼤⼩
batch等待时间
以上三个条件来产⽣batch,上述配置只要满⾜⼀个就会触发⼀个新的batch
Routing 配置
这个配置是针对分区topic的,可以为message设置路由策略来决定message会被发到哪个分区,包含三种:SinglePartition:随机选择⼀个分区,将所有消息写到这个分区
RoundRobinPartition:轮询策略,会将消息轮询的发送到每个分区,如果producer没有开启batch,则每发送⼀条message,就会切换⼀个分区;如果开启了batch,则按照batch的最⼤等待时间batchingMaxPublishDelayMicros的倍数来决定,具体的倍数由参数 batchingPartitionSwitchFrequenc
yByPublishDelay 指定,即每经过 batchingMaxPublishDelayMicros *
batchingPartitionSwitchFrequencyByPublishDelay 的时间,分区切换⼀次,在切换的时间内message被发送到同⼀个分区CustomPartition: ⾃定义分区策略,分区⽅式有MessageRouter决定,即需要⾃⼰实现MessageRouter
对于 SinglePartition 和 RoundRobinPartition,如果message设置了key,都会按照hash的⽅式决定message写到哪个分区。
默认配置RoundRobinPartition;如果存在customMessageRouter,则RoutingMode只能是CustomPartition。
Pending配置
Producer可以最多hold的message数量,如果超过这个配置,则按照blockIfQueueFull来决定是阻塞⽣产者或者抛出异常信息。
如果是分区topic,则每个分区可以hold的最⼤消息数量为:
Math.MaxPendingMessages(), MaxPendingMessagesAcrossPartitions()/ numPartitions);
Chunk 配置
当⼀条Message的⼤⼩超出了broker配置的最⼤消息⼤⼩时,开启Chunk配置,可以让producer⾃动将消息split成为多个⼩的消息,并且按照顺序发送到Broker。如果消费者希望按照获取切分之前的Message,需要对producer和consumer进⾏以下配置:
⽬前只⽀持 non-shared subscription 和 persistent topic
disable batch
Pulsar-client 在接收到 ack之后才能继续发送消息到Broker(防⽌chunk乱序),所以可以调⼩maxPendingMessages来避免太⾼的内存占⽤
配置message 的 ttl/retention来清理不完整的chunk message, 有些场景⽐如producer或者broker重启,会导致broker接收到了不完整的chunk message,这⼀部分消息consumer⽆法ack,需要按照ttl或者retention被清理掉;或者通过配置
ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, TimeUnit)来执⾏message expire逻辑
消费者最好配置 receiverQueueSize 和 maxPendingChuckedMessage
如果⼀个消息⼤⼩超过了最⼤值,那么会默认做切分,这是后会根据producerName-sequenceId拼接出⼀个UUid,所有的chunk都有⼀样的sequenceId和uuid。
Producer Mode 配置
⽣产者的模式,⽬前包括三种:
Shared :默认多个producer可以同时向⼀个topic publish消息
Exclusive:请求独占式⽣产,如果已经存在其他producer,则⽴即失败
WaitForExclusive:producer的创建会pending,直⾄获取到独占访问权限
Producer 与 Broker 交互

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。