首页 文章详情

Kafka消费者分区分配策略及自定义分配策略

程序源代码 | 428 2020-07-30 18:55 0 0 0
UniSMS (合一短信)

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!

暴走大数据
点击右侧关注,暴走大数据!

kafka消费者如何分配分区以及分配分区策略和源码解释

我们知道kafka的主题中数据数据是按照分区的概念来的,一个主题可能分配了多个分区,每个分区配置了复制系数,为了可用性,在多个broker中进行复制,一个分区在多个broker中选举出一个副本首领,消费者只访问这个分区副本首领,这些在本章节不重要,本章节阐述一个消费者如何选定一个主题中多个分区中的一个分区,和kafka的分区分配策略核心源码解析。

kafka中分区策略核心实现有两种 一种是range范围策略,一种是roudRobin轮询策略,在构建KafkaConsumer类的时候配置,看一下策略的关系就能自行配置, 配置key为partition.assignment.strategy的具体实现,看下图:

首先我们需要有多种假设来举例

假设我们创建了一个主题,并且8个分区p0-p8,我们有3个消费者c0-c2

先来说说第一种策略, range策略

上面已经做好了一些假设

根据range策略,分区按照顺序平铺,消费者按照顺序平铺

分区数量除以消费者数量,这里是分区数量8除以消费者数量3 等于 2 (N),再分区数量8对消费数量3取余得到2 ( M ),kafka的range算法是前 M个消费能得到N+1个分区,剩余的消费者分配到N个分区

具体算法:假设区分数量为pCout,消费者数量为cCount

n = pCout / cCount   8  / 3 = 2

m = pCount % cCount  8 % 3 = 2

前m(2)个消费者得到n+1(2+1)个分区,剩余的消费者分配到N(2)个分区,最终结果如下图

range策略是kafka默认的一个分区分配的策略可以看看ConsumerConfig类的static块,默认配置的RangeAssignor 

想看一下分配分区的策略的入口可以参考KafkaConsumer类中的pollOnce方法进去,里面调用的ensurePartitionAssignment方法,不过这里debug进去看还是挺复杂的,有兴趣的可以参考,篇幅讲的不是这些重点,具体入口可以看下图

下面看一看range策略中核心源码的实现,具体查看RangeAssignor类

@Override    public Map<String, List> assign(Map<String, Integer> partitionsPerTopic,                                                    Map<String, List<String>> subscriptions) {        //获取每个主题消费者们    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);        Map<String, List> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList());         for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {          //主题            String topic = topicEntry.getKey();            //这个主题的消费们            List<String> consumersForTopic = topicEntry.getValue();            //主题的分区数量            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);            if (numPartitionsForTopic == null)                continue;            //对主题的消费者进行排序            Collections.sort(consumersForTopic);            //主题数量除以主题消费者数量            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();            //主题数量对消费者数量进行取余            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();            //封装主题和分区信息            List partitions = partitions(topic, numPartitionsForTopic);            //下面就开始为每一个消费者分配分区,看到这里是不是会发现 消费者分区再均衡,每次添加消费者或者添加分区都会发生再均衡            //事件,不过这里不是重点            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {              //消费者分区起始位置                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);                //分配的分区数量, 从我们上面的假设的分区数量和消费者数量可以得出这里的值                // int length = 2 + (i + 1 > 2 ? 0 : 1);                //因为有的无法整除和取余的,所以前面的2个消费者这里会获得3 的结果, 最后一个消费者这里只能得到2                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);                //为每个消费者分配分区信息                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));            }        }        return assignment;    }

下面讲一讲kafka自带的第二种消费者分配分区的策略

轮询策略

还是按照上面的假设8个分区3个消费者

8个分区按照顺序平铺

构造消费者环 c0,c1,c2,c0,c1,c2.......

轮询分配过程是  p0 分配给了 c0, p1 分配给了 c1, p2分配给了 c2, p3分配给了c0, p4分配给了 c1, p5分配给了c2, 一次类推,所有分区轮询分配给一个消费者环,大概草图如下

 上面草图 多多理解 , 核心源码如下

@Override    public Map> assign(Map partitionsPerTopic,                                                    Map> subscriptions) {        Map> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList());        //讲消费者集合进行排序,构建一个消费者环, 内部通过索引位置+1对总数取余的方式实现的环        CircularIterator assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));        //对所有主题和分区进行排序, 假设集合中有多个主题/分区-分区,最终排序结果为        // t1/p0-p1-p2,t2/p0-p1,t3/p0-p1-p2        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {          //当前主题            final String topic = partition.topic();            //这里循环遍历看看消费者有没有订阅改topic,否则一直next到下一个消费者,主要的作用是跳过            //没有订阅该主题的消费者            while (!subscriptions.get(assigner.peek()).contains(topic))                assigner.next();            //未当前消费者添加分区信息            assignment.get(assigner.next()).add(partition);        }        return assignment;    }

通过上面的的案例我们是不是可以通过继承AbstractPartitionAssignor抽象类,实现它的assign方法,来自定义消费者分区分配策略,因为这里我们得到了一个所有相关主题和主题分区数量,所有主题对应的消费者,那么就可以在这里根据自己实际场景自定义一些分配策略。

欢迎点赞+收藏+转发朋友圈素质三连



文章不错?点个【在看】吧! ?

good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter