kafkatopic消息分配partition规则(Java源码)
我们知道Kafka 的消息通过topic进⾏分类。topic可以被分为若⼲个partition来存储消息。消息以追加的⽅式写⼊partition,然后以先⼊先出的顺序读取。
下⾯是topic和partition的关系图:
我们⼀般会在f中通过num.partitions参数指定创建topic时包含多少个partition。默认是num.partitions=1。kafka为什么那么快
既然⼀个topic有多个partition,那么消息是怎么样分配到partition的呢?
⽣产者⽣产⼀个消息send到topic分区器,分区器会根据消息⾥⾯的分区参数key值把消息分到对应的partition。这⾥就像我们快递代发⽹点⼀样,快递代发⽹点可以代理很多种快递公司,如果要寄快递者P
(⽣产者)指定⽤什么快递公司,代发⽹点⼈员C(分区器)就会把该物品M(消息)归类到指定的快递公司区域存放。如果P不要求具体的快递公司寄件,那么就由C随意分配快递公司(哈哈,那就要看这个家伙的⼼情了,⼼情好点给你⼀个顺丰⽐较快到达,⼼情不好时就GG吧)。
下⾯是Kafka对消息分配分区 DefaultPartitioner.java 类的核⼼代码:
1public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
2        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
3int numPartitions = partitions.size();
4if (keyBytes == null) {
5int nextValue = AndIncrement();
6            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
7if (availablePartitions.size() > 0) {
8int part = Positive(nextValue) % availablePartitions.size();
(part).partition();
10            } else {
11// no partitions are available, give a non-available partition
Positive(nextValue) % numPartitions;
13            }
14        } else {
15// hash the keyBytes to choose a partition
Positive(Utils.murmur2(keyBytes)) % numPartitions;
17        }
18    }
第4、7⾏:如果没有指定key值并且可⽤分区个数⼤于0时,在就可⽤分区中做轮询决定改消息分配到哪个partition。
第4、10⾏:如果没有指定key值并且没有可⽤分区时,在所有分区中轮询决定改消息分配到哪个partition。
第14⾏:如果指定key值,对key做hash分配到指定的partition。
所以当同⼀个key的消息会被分配到同⼀个partition中。消息在同⼀个partition处理的顺序是FIFO,这就保证了消息的顺序性。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。