rebalance是什么意思_relief品牌

rebalance是什么意思_relief品牌只有消费组才会涉及Rebalance

只有消费组才会涉及Rebalance

在Rebalance发生期间,消费组内消费者无法读取消息。也就是说Rebalance期间的这一小段时间内,消费者组将变得不可用。
另外,当一个分区重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交位移消费者就发生了Rebalance,之后这个分区又被分配给了消费者组内的另一个消费者,原来被消费完的那部分消息又会被重新消费一次,发生重复消费的情况。

消费组的rebalance本质上是一组协议,它规定了一个消费组是如何达成一致来分配订阅topic的所有分区的。
和旧版本consumer依托于zookeeper进行Rebalance不同,新版本consumer使用了kafka内置的一个全新的组协调协议(GroupCoordinator Protocol)。

新版消费者客户端将全部的消费组分成多个子集,每个消费组的子集在服务端对应一个GroupCoordinator对其进行管理。GroupCoordinator角色由kafka的某个broker承担

GroupCoordinator是kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。
GroupCoordinator和ConsumerCoordinator之间最重要的职责就是负责执行消费者Rebalance的操作,包括分区分配的工作也是在Rebalance期间完成的。


Rebalance触发条件

组Rebalance触发的条件有以下几个:

  1. 有新的消费者加入消费组;
  2. 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况,GroupCoordinator会认为消费者已经下线;
  3. 有消费者主动退出消费组(发送LeaveGroupRequest请求)。比如从客户端调用了unsubscrible()方法取消对某些topic的订阅;
  4. 消费组所对应的GroupCoordinator节点发生变化;
  5. 消费组内所订阅的任一topic或者topic的分区数量发生变化。

引发Rebalance最常见的就是违背了前3个条件,特别是第二个条件中consumer崩溃的情况。这里的崩溃不一定就是指consumer进程“挂掉”或consumer进程所在的机器宕机。当consumer无法再指定的时间内完成消息的处理,那么coordinator就认为该consumer已经崩溃,从而引发新一轮Rebalance。


当有消费者加入消费组时,消费者、消费组以及组协调器之间会经历如下几个阶段(基于2.0.0版本):

  • 第一阶段(find_coordinator

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker,并创建与该broker相互通信的网络连接。向集群中负载最小的节点(leastLoadedNode)发送FindCoordinatorRequest请求。
在0.10.2.2源码中涉及的方法:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendGroupCoordinatorRequest

rebalance是什么意思_relief品牌

请求体中主要的参数:
(1) coordinator_key:消费组名称,即groupId
(2) coordinator_type:固定值为0
0.10.2.2版本,只用groupId参数

kafka在收到FindCoordinatorRequest请求之后,会根据coordinator_key查找对应的GroupCoordinator节点,如果找到对应的GroupCoordinator则会返回相对应的node_id,host和port信息。

具体查找GroupCoordinator的方式是:
先根据消费组groupId的哈希值 % __consumer_offsets分区数量(默认值50)计算__consumer_offsets中的分区编号,知道分区编号后,再寻找此分区leader副本所在的broker节点,该broker节点即为这个groupId所对应的GroupCoordinator节点。

消费者最终的分区分配方案及组内消费者所提交的位移信息都会发送给此分区leader副本所在的broker节点,让此broker节点既扮演GroupCoordinator的角色,又扮演保存分区分配方案和组内消费者位移的角色。

  • 第二阶段(join_group

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求(org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest)。

rebalance是什么意思_relief品牌

请求体中主要的参数:
(1) group_id:groupId
(2) session_timeout:对应消费者端参数session.timeout.ms,默认值为10s。GroupCoordinator超过session.timeout.ms指定的时间内没有收到心跳则认为消费者已经下线
(3) rebalance_timeout:对应消费者端参数max.poll.interval.ms,默认值为5分钟。表示当消费者组Rebalance时,GroupCoordinator等待各个消费者重新加入的最长等待时间
(4) member_id:GroupCoordinator分配给消费者的Id标识。消费者第一次发送请求的时候此字段设置为null(0.10.2.2版本为空字符串)
如果是消费者第一次加入消费组,GroupCoordinator负责给消费者生成member_id。
生成算法:String memberId = clientId + “-” + UUID.randomUUID().toString()
clientld是消费者客户端的clientId,对应请求头中的client_id,由consumer参数client.id指定,默认值为””,如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,格式为“consumer-数字”。数字是单个KafkaConsumer实例中原子递增分配
(5) protocal_type:固定值consumer,表示消费组实现的协议
(6) group_protocals:数组类型,用于存放消费者指定的分区分配策略(可以囊括多个),这个主要取决于消费者客户段参数partition.assignment.strategy的配置。如果配置了多种策略,那么JoinGroupRequest请求中将包含多个protocol_name和protocol_metadata

如果是原有的消费者重新加入消费组,那么在真正的发送JoinGroupRequest请求之前还要执行一些准备工作(0.10.2.2版本中新消费者加入消费组也会进行如下步骤):
(1) 如果消费端参数enable.auto.commit设置为true,默认值为true,那么在请求加入消费组之前需要向GroupCoordinator提交位移,这个过程是阻塞的,要么成功要么失败;
(2) 如果消费者添加了自定义的Rebalance监听器,那么在此时会调用onPartitionsRevoked();
(3) 因为是重新加入消费组,之前与GroupCoordinator节点之间的心跳检测也就不需要了,所以在成功地重新加入消费组之前需要禁止心跳检测的动作。

消费者在发送JoinGroupRequest请求之后会阻塞等待kafka服务端的响应。

选举消费者的leader
GroupCoordinator需要为消费组内的消费者选举出一个leader。
分两种情况:
(1) 如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader;
(2) 如果某一时刻leader消费者因为某些原因退出了消费组,那么会重新选举一个新的leader,这个选举过程就更加随意了,即为取HashMap的第一个键值对的key。在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。

选举分区分配策略
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从个各个消费者呈报上来的各个分配策略中选取一个彼此都“信服”的策略来进行整体上的分配。分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的,这里的投票不是指GroupCoordinator还要在与各个消费者进一步交互,而是根据各个消费者呈报的分配策略来实施。具体的选举过程是:
(1) 收集各个消费者支持的所有分配策略,组成候选集candidates;
(2) 根据每个消费者的指定的分区分配策略,从候选集中找出第一个自身支持的策略,为这个策略投1票;
(3) 计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
如果消费者并不支持选出的分配策略,那么就会报出IllagalArgumentException:Member does not support protocol

然后broker发送JoinGroupResponse响应给各个消费者。leader消费者和其他消费者收到的响应内容并不相同,只有leader消费者收到的响应中包含其他消费者的分区策略分配。

rebalance是什么意思_relief品牌

响应体中主要包含:
(1) generation_id:表示当前消费组的年代信息,避免受到过期请求的影响。
(2) leader_id:表示消费组leader消费者的member_id。
(3) member_id:
(4) members:数组类型。其中包含各个成员信息。发给leader消费者响应中的members内容包含数据,发给其余消费者响应中的members内容为空。

由选举分区分配的策略得知:在GroupCoordinator收到JoinGroupRequest请求之后,GroupCoordinator就已经明确了唯一的分配策略由此可见kafka把分区分配的具体分配还是交给了客户端,自身并不参与具体的分配细节。这样的好处就是:即使以后分配的策略发生了变更,也只需要重启消费者客户端,而不是重启服务器。

rebalance是什么意思_relief品牌

  • 第三阶段(sync_group

leader消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此之后需要将分配的方案同步给各个消费者。此时leader消费者并不是直接和其余的普通消费者同步分配方案,而是通过GroupCoordinator来负责转发同步方案。各个消费者会向GroupCoordinator发送SyncGroupRequest请求来同步分配方案。leader消费者和其他的消费者不同的是只有leader消费者发送的SyncGroupRequest请求中包含具体的分区分配方案。(org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendSyncGroupRequest)

rebalance是什么意思_relief品牌

请求体中包含的主要参数:
(1) group_id:
(2) generation_id:表示当前消费组的年代信息,避免受到过期请求的影响
(3) member_id:
(4) group_assignment:数组类型,其中包含了各个消费者对应得具体分配方案。其他消费者该请求字段为空。
key=member_id,value=member_assignment(与消费者对应的分配方案)

服务端在收到消费者发送的请求之后,将会交由GroupCoordinator来负责具体的逻辑处理。将从leader消费者发过来的分配方案提取出来,连同整个消费组的元数据信息一起存入__consumer_offsets中。最后发送SyncGroupResponse响应给各个消费者以提供各个消费者各自所属的方案。

rebalance是什么意思_relief品牌

当消费者收到所属的分配方案之后会调用PartitionAssignor中的onAssignment()方法,随后再调用ConsumerRebalanceListener中的OnPartitionAssignment()方法。之后开启心跳任务,消费者定期向服务端的GroupCoordinator发送HeartbeatRequest来确定彼此在线。

消费组元数据信息
除客户端提交的消费位移会保存在__consumer_offsets中,消费组的元数据也会保存在__consumer_offsets中。具体来说,每个消费组的元数据都是一条信息,不过这类信息不依赖于具体版本的消息格式,因为它只定义了消息中的key和value字段的具体内容。

rebalance是什么意思_relief品牌

其中key包含version(当前固定值为2)和group(group_id)字段;value中包含很多字段,可以参照和JoinGroupRequest或SyncGroupRequest请求中的内容来理解:
version:固定值为1
protocol_type:固定值consumer
generation:表示当前消费组的年代信息,避免受到过期请求的影响
protocol:消费组选取的分区分配策略
leader:消费者的leader消费者名称
member:数组类型,其中包含了消费组的各个消费者成员信息。

rebalance是什么意思_relief品牌

  • 第四阶段(heartbeat

进入第四阶段之后,消费组中的各个消费者就会处于正常工作状态。在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前的消费位移已经正常保存在了__consumer_offsets中,此时消费者可以通过OffsetFetchRequest请求获取上次提交的消费位移并从此处继续消费。

消费者通过向GroupCoordinator发送心跳来维持它们与消费组的从属关系,以及它们对分区的所有权关系

心跳线程是一个独立的线程,可以在轮询消息的空挡发送心跳。如果过消费者停止发送心跳的时间足够长,则整个会话就被判断为过期,GroupCoordinator也会认为这个消费者已经死亡,就会触发一次Rebalance行为。

消费者心跳间隔时间由参数heartbeat.interval.ms指定,默认值为3秒,这个参数必须比session.timeout.ms参数设定的值要小,一般情况下heartbeat.interval.ms配置的值不能超过session.timeout.ms配置值的1/3。

如果一个消费者发生崩溃,并停止读取消息,那么GroupCoordinator会等待一小段时间,确认这个消费者死亡之后才会触发Rebalance。在这一小段时间内,死掉的消费者并不会读取分区里的消息。这个一小段时间有session.timeout.ms参数控制,该参数的配置值必须控制在broker端参数group.min.session.timeout.ms(默认值为6000,即6秒)和group.max.session.timeout.ms(默认值为30000,即5分钟允许的范围内)。

还有一个参数max.poll.interval.ms,它用来指定使用消费者组管理时poll()方法调用之间的最大延迟,也就是消费者在获取更多消息之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有再次调用,则消费者被视为失败,并且消费者组将Rebalance,以便将分区重新分配给别的成员。

  • 第五阶段(leave_group

除了被动退出消费组,还可以使用LeaveGroupRequest请求主动退出消费组,比如客户端调用了unsubscribe()方法取消对某些topic的订阅。

消费组状态机

kafka为消费组定义了5个状态:

  • Empty:该状态表明group下没有任何active consumer,但可能包含位移信息。每个消费组创建时就处于Empty状态。若消费组一段时间后所有的consumer都离开组,那么消费组也会处于该状态。
    由于可能包含位移信息,处于次状态下的消费组依然可以响应OffsetFetchRequest请求,即返回clients端对应的位移信息
  • PreparingRebalance:该状态表明消费组正在准备进行Rebalance。此时,消费组已经“受理”了部分成员发送过来的JoinGroup请求,同时等待其他成员发送JoinGroupRequest请求,直到所有的成员都成功加入消费组或超时。
    由于该状态下的消费组依然可能保存有位移信息,因此clients依然可以发起OffsetFetchRequest请求,甚至还可以发起OffsetCommitRequest去提交位移。
  • AwaitingSync:该状态表明所有成员都已经加入组等待consume leader发送分区分配方案。同样的,此时依然可以提交位移,当若提交位移,GroupCoordinator将会抛出REBALANCE_IN_PROGRESS异常来表明该消费组正在进行Rebalance。
  • Stable:该状态表明消费组开始正常消费。此时group必须响应clients发送过来的任何请求。比如位移提交请求,位移获取请求,心跳请求等
  • Dead:该状态表明消费组已经彻底废弃,消费组内没有任何active成员并且下消费组的所有数据都已经被删除。处于此状态的消费组不会响应任何请求。严格来说,GroupCoordinator会返回UNKNOWN_MEMBER_ID异常。

参考自《深入理解kafka:核心设计与实践原理》

今天的文章rebalance是什么意思_relief品牌分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/78889.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注