首页 文章详情

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令

老周聊架构 | 281 2022-08-25 14:28 0 0 0
UniSMS (合一短信)

我们前面说过:

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator
聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

这一篇我们继续说下:
《聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 GROUP、OFFSET、HEARTBEAT 相关命令》

五、GROUP 相关命令

  • ApiKeys.JOIN_GROUP

  • ApiKeys.LEAVE_GROUP

  • ApiKeys.SYNC_GROUP

  • ApiKeys.DESCRIBE_GROUPS

  • ApiKeys.LIST_GROUPS

  • ApiKeys.DELETE_GROUPS

JOIN_GROUP 和 SYNC_GROUP请求处理在
聊聊 Kafka:Consumer 源码解析之 Consumer 如何加入 Consumer Group 已经说过了。

接下来主要说说另外两个 GROUP 的命令:

5.1 DESCRIBE_GROUPS

主要是返回 group 中各个 member 的详细信息,比如 memberId、groupInstanceId、clientId、clientHost、memberMetadata、memberAssignment。

5.2 LEAVE_GROUP



不难看出,LEAVE_GROUP 请求最重要的逻辑是在 removeMemberAndUpdateGroup 方法,从 group 移除失败的 member,并且将进行相应的状态转换。

  • 如果 group 原来是在 Dead 或 Empty 时,那么由于 group 本来就没有 member,就不再进行任何操作。

  • 如果 group 原来是在 Stable 或 CompletingRebalance 时,那么将会执行 maybePrepareRebalance() 方法,进行 rebalance 操作。

  • 如果 group 已经在 PreparingRebalance 状态了,那么这里将检查一下 join-group 的延迟操作是否完成了,如果操作完成了,那么 GroupCoordinator 就会向 group 的 member 发送 join-group response,然后将状态更新为 CompletingRebalance。

六、OFFSET 相关命令

  • ApiKeys.OFFSET_COMMIT

  • ApiKeys.OFFSET_FETCH

  • ApiKeys.OFFSET_FOR_LEADER_EPOCH

  • ApiKeys.OFFSET_DELETE

这里我们来说下 OFFSET_FETCH 与 OFFSET_COMMIT 两个重要的请求命令,

6.1 OFFSET_FETCH

关于 OFFSET_FETCH 的请求,Server 端的处理如下,新版本>= 1从Kafka读取偏移量,我们这里直接看新版,fetch commit 分两种情况:

  • 获取 group 所消费的所有 topic-partition 的 offset

  • 获取指定 topic-partition 的 offset


在 groupCoordinator.handleFetchOffsets() 的实现中,主要是调用了 groupManager.getOffsets() 获取相应的 offset 信息,在查询时加锁的原因应该是为了避免在查询的过程中 offset 不断更新。

6.2 OFFSET_COMMIT

同样,我们也直接看高版本的移交 offset 的方式:




这里主要介绍下 groupManager.storeOffsets() 方法,主要逻辑如下:

  • 首先过滤掉 offset 信息超过范围的 metadata

  • 将 offset 信息追加到 replicated log 中

  • 调用 prepareOffsetCommit() 方法,先将 offset 信息更新到 group 的 pendingOffsetCommits 中(这时还没有真正提交,后面如果失败的话,是可以撤回的)

  • 在 putCacheCallback 回调函数中,如果 offset 信息追加到 replicated log 成功,那么就更新缓存(将 group 的 pendingOffsetCommits 中的信息更新到 offset 变量中)

七、HEARTBEAT 相关命令

  • ApiKeys.HEARTBEAT

心跳请求是很重要的请求之一,为啥这么说呢?

  • 对于 Client 端而言,心跳请求是 client 感应 group 状态变化的一个重要中介,比如:此时有一个新的 consumer 加入到 consumer group 中了,这时候会进行 rebalance 操作,group 端的状态会发送变化,当 group 其他 member 发送心跳请求,GroupCoordinator 就会通知 client 此时这个 group 正处于 rebalance 阶段,让它们 rejoin group。

  • 对于 Server 端来说,它是 GroupCoordinator 判断一个 consumer member 是否存活的重要条件,如果其中一个 consumer 在给定的时间没有发送心跳请求,那么就会将这个 consumer 从这个 group 中移除,并执行 rebalance 操作。


八、Group 的状态机


通过上面的分析发现,GroupCoordinator 针对 GROUP、OFFSET、HEARTBEAT 相关命令的请求,Group 的状态机的维护是非常重要的,状态机中里的 rebalance 操作是重中之重,我们来回顾下 rebalance 操作的流程:

  • 当消费者小跳丢失时将 group 从 Stable 状态变为 PreparingRebalance;

  • 然后就是等待 group 中的所有 consumer member 发送 join-group 请求加入 group,如果都已经发送 join-group 请求,此时 GroupCoordinator 会向所有 member 发送 join-group response,那么 group 的状态变为 CompletingRebalance;

  • leader consumer 会收到各个 member 订阅的 topic 详细信息,等待其分配好 partition 后,通过 sync-group 请求将结果发给 GroupCoordinator(非 leader consumer 发送的 sync-group 请求的 data 是为空的);

  • 如果 GroupCoordinator 收到了 leader consumer 发送的 response,获取到了这个 group 各个 member 所分配的 topic-partition 列表,group 的状态就会变成 Stable。



Java


good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter