我们前面说过:
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR
这一篇我们继续说下:
《聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令》
五、GROUP 相关命令
ApiKeys.JOIN_GROUP
ApiKeys.LEAVE_GROUP
ApiKeys.SYNC_GROUP
ApiKeys.DESCRIBE_GROUPS
ApiKeys.LIST_GROUPS
ApiKeys.DELETE_GROUPS
JOIN_GROUP 和 SYNC_GROUP请求处理在
聊聊 Kafka:Consumer 源码解析之 Consumer 如何加入 Consumer Group 已经说过了。
接下来主要说说另外两个 GROUP 的命令:
5.1 DESCRIBE_GROUPS
主要是返回 group 中各个 member 的详细信息,比如 memberId、groupInstanceId、clientId、clientHost、memberMetadata、memberAssignment。
5.2 LEAVE_GROUP
不难看出,LEAVE_GROUP 请求最重要的逻辑是在 removeMemberAndUpdateGroup 方法,从 group 移除失败的 member,并且将进行相应的状态转换。
如果 group 原来是在 Dead 或 Empty 时,那么由于 group 本来就没有 member,就不再进行任何操作。
如果 group 原来是在 Stable 或 CompletingRebalance 时,那么将会执行 maybePrepareRebalance() 方法,进行 rebalance 操作。
如果 group 已经在 PreparingRebalance 状态了,那么这里将检查一下 join-group 的延迟操作是否完成了,如果操作完成了,那么 GroupCoordinator 就会向 group 的 member 发送 join-group response,然后将状态更新为 CompletingRebalance。
六、OFFSET 相关命令
ApiKeys.OFFSET_COMMIT
ApiKeys.OFFSET_FETCH
ApiKeys.OFFSET_FOR_LEADER_EPOCH
ApiKeys.OFFSET_DELETE
这里我们来说下 OFFSET_FETCH 与 OFFSET_COMMIT 两个重要的请求命令,
6.1 OFFSET_FETCH
关于 OFFSET_FETCH 的请求,Server 端的处理如下,新版本>= 1从Kafka读取偏移量,我们这里直接看新版,fetch commit 分两种情况:
获取 group 所消费的所有 topic-partition 的 offset
获取指定 topic-partition 的 offset
在 groupCoordinator.handleFetchOffsets() 的实现中,主要是调用了 groupManager.getOffsets() 获取相应的 offset 信息,在查询时加锁的原因应该是为了避免在查询的过程中 offset 不断更新。
6.2 OFFSET_COMMIT
同样,我们也直接看高版本的移交 offset 的方式:
这里主要介绍下 groupManager.storeOffsets() 方法,主要逻辑如下:
首先过滤掉 offset 信息超过范围的 metadata
将 offset 信息追加到 replicated log 中
调用 prepareOffsetCommit() 方法,先将 offset 信息更新到 group 的 pendingOffsetCommits 中(这时还没有真正提交,后面如果失败的话,是可以撤回的)
在 putCacheCallback 回调函数中,如果 offset 信息追加到 replicated log 成功,那么就更新缓存(将 group 的 pendingOffsetCommits 中的信息更新到 offset 变量中)
七、HEARTBEAT 相关命令
ApiKeys.HEARTBEAT
心跳请求是很重要的请求之一,为啥这么说呢?
对于 Client 端而言,心跳请求是 client 感应 group 状态变化的一个重要中介,比如:此时有一个新的 consumer 加入到 consumer group 中了,这时候会进行 rebalance 操作,group 端的状态会发送变化,当 group 其他 member 发送心跳请求,GroupCoordinator 就会通知 client 此时这个 group 正处于 rebalance 阶段,让它们 rejoin group。
对于 Server 端来说,它是 GroupCoordinator 判断一个 consumer member 是否存活的重要条件,如果其中一个 consumer 在给定的时间没有发送心跳请求,那么就会将这个 consumer 从这个 group 中移除,并执行 rebalance 操作。
八、Group 的状态机
通过上面的分析发现,GroupCoordinator 针对 GROUP、OFFSET、HEARTBEAT 相关命令的请求,Group 的状态机的维护是非常重要的,状态机中里的 rebalance 操作是重中之重,我们来回顾下 rebalance 操作的流程:
当消费者小跳丢失时将 group 从 Stable 状态变为 PreparingRebalance;
然后就是等待 group 中的所有 consumer member 发送 join-group 请求加入 group,如果都已经发送 join-group 请求,此时 GroupCoordinator 会向所有 member 发送 join-group response,那么 group 的状态变为 CompletingRebalance;
leader consumer 会收到各个 member 订阅的 topic 详细信息,等待其分配好 partition 后,通过 sync-group 请求将结果发给 GroupCoordinator(非 leader consumer 发送的 sync-group 请求的 data 是为空的);
如果 GroupCoordinator 收到了 leader consumer 发送的 response,获取到了这个 group 各个 member 所分配的 topic-partition 列表,group 的状态就会变成 Stable。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。