kafka的leader切换速度受哪些因素影响?
3 个回答
谢邀
关于这个问题说说我的浅见~ leader切换的确和很多因素有关,事实上很难定量地分析具体的耗时,不过我们依然可以试一试~
首先说一下,Kafka如何判断leader挂了? 简单来说是依靠Zookeeper的临时节点机制:一旦发现某个broker不可用,其/brokers/ids/<id>节点会自动消失,Zookeeper的监听器至少保证会在zookeeper.session.timeout.ms时间内(默认6s)发现此节点变更,之后broker会将此消息包装成一个BrokerChange事件插入到controller的阻塞队列。那么与该队列有关的有两个JMX指标可以监控一下:一个是EventQueueSize;另一个是EventQueueTimeMs。前者是controller的阻塞队列当前大小,如果很多的话说明controller端有很多积压任务,无法及时响应leader选举;另一个表示事件在队列中的等待时间,也可以用于表明controller端的繁忙程度。
之后,当BrokerChange事件从队列中被获取之后,对应的线程会进行处理之。可以监控JMX指标:kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs,查看总体的处理时间。当然这是总体的处理时间,具体到分区leader选举的部分, 可以再重点关注一下StopRelica/LeaderAndIsr以及UpdateMetadata处理时间方面的表现。
最后,对leader选举速度影响最大的一个可能的原因就是controller本身承载了大量的客户端请求处理任务,使得无法立即响应这些请求。目前社区有个KIP正在讨论是否需要将这些需求分成不同的优先级进行处理,有兴趣的可以看看: KIP-291: Have separate queues for control requests and data requests
目录
思考几个问题
- 什么是分区状态机?
- 创建Topic的时候如何选举Leader?
- 分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本,它会当选为Leader吗?
- 当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?
- Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?
- 选举成功的那一刻, 生产者和消费着都做了哪些事情?
- Leader选举期间对分区的影响
分区Leader选举流程分析
在开始源码分析之前, 大家先看下面这张图, 好让自己对Leader选举有一个非常清晰的认知,然后再去看后面的源码分析文章,会更容易理解。
整个流程分为三大块
- 触发选举场景 图左
- 执行选举流程 图中
- Leader选举策略 图右
分区状态机
❝首先大家得了解两个状态机
1. 分区状态机 控制分区状态流转
2. 副本状态机 控制副本状态流转
这里我们主要讲解分区状态机,这张图表示的是分区状态机
- NonExistentPartition :分区在将要被创建之前的初始状态是这个,表示不存在
- NewPartition: 表示正在创建新的分区, 是一个中间状态, 这个时候只是在Controller的内存中存了状态信息
- OnlinePartition: 在线状态, 正常的分区就应该是这种状态,只有在线的分区才能够提供服务
- OfflinePartition: 下线状态, 分区可能因为Broker宕机或者删除Topic等原因流转到这个状态, 下线了就不能提供服务了
- NonExistentPartition: 分区不存在的状态, 当Topic删除完成成功之后, 就会流转到这个状态, 当还处在删除中的时候,还是停留在下线状态。
我们今天要讲的Leader选举
就是在之前状态=>OnlinePartition状态的时候发生的。
Leader选举流程分析
源码入口:
PartitionStateMachine#electLeaderForPartitions
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
可以看到 我们最终是调用了doElectLeaderForPartitions 执行分区Leader选举。
PartitionStateMachine#doElectLeaderForPartitions
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
总结一下上面的源码
- 去zookeeper节点
/broker/topics/{topic名称}/partitions/{分区号}/state
节点读取基本信息。 - 遍历从zk中获取的leaderIsrAndControllerEpoch信息,做一些简单的校验:zk中获取的数据的controllerEpoch必须<=当前的Controller的controller_epoch。最终得到 validLeaderAndIsrs, controller_epoch 就是用来防止脑裂的, 当有两个Controller当选的时候,他们的epoch肯定不一样, 那么最新的epoch才是真的Controller
- 如果没有获取到有效的validLeaderAndIsrs 信息 则直接返回
- 根据入参partitionLeaderElectionStrategy 来匹配不同的Leader选举策略。来选出合适的Leader和ISR信息
- 根据上面的选举策略选出的 LeaderAndIsr 信息进行遍历, 将它们一个个写入到zookeeper节点
/broker/topics/{topic名称}/partitions/{分区号}/state
中。 (当然如果上面没有选择出合适的leader,那么久不会有这个过程了) - 遍历上面写入zk成功的分区, 然后更新Controller里面的分区leader和isr的内存信息 并发送LeaderAndISR请求,通知对应的Broker Leader更新了。
看上面的Leader选举策略是不是很简单, 但是中间究竟是如何选择Leader的? 这个是根据传入的策略类型, 来做不同的选择
❝那么有哪些策略呢?以及什么时候触发这些选举呢?
分区的几种策略以及对应的触发场景
1. OfflinePartitionLeaderElectionStrategy
❝遍历分区的AR, 找到第一个满足以下条件的副本:
- 副本在线
- 在ISR中。
如果找不到满足条件的副本,那么再根据 传入的参数allowUnclean判断
- allowUnclean=true:AR顺序中所有在线副本中的第一个副本。
- allowUnclean=false: 需要去查询配置
unclean.leader.election.enable
的值。
若=true ,则跟上面 1一样 。
若=false,直接返回None,没有找到合适的Leader。
源码位置:
Election#leaderForOffline
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
- 先组装所有给定的 validLeaderAndIsrs 的信息 其实主要还是要去获取每个Topic的对应的
unclean.leader.election.enable
属性值。
默认情况下,我们调用到这里的时候 这个入参allowUnclean=false
.
如果是false 那我们需要去查询一下指定的topic它的属性unclean.leader.election.enable
是什么
如果是true 则表示直接覆盖了unclean.leader.election.enable
的配置为true。
- 找到 第一个满足条件:副本在线 && 在 ISR中的副本。
- 如果没有满足条件的 则判断入uncleanLeaderElectionEnabled的配置 如果是true,则从不在isr中的存活副本中获取副本作为leader。 当然这个uncleanLeaderElectionEnabled 参数是上 步骤1中决定的。
触发场景:Controller 重新加载
❝Controller 当选的时候会启动 分区状态机 partitionStateMachine
, 启动的时候会重新加载所有分区的状态到内存中, 并触发 对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为 OnlinePartition 状态的状态。把新创建的分区和离线的分区触发一下选举流程啊
触发源码入口:
KafkaController#onControllerFailover
partitionStateMachine.startup()
partitionStateMachine.triggerOnlinePartitionStateChange()
触发场景:脚本执行脏选举
❝当执行kafka-leader-election.sh
的时候并且模式选择的是UNCLEAN
. 则会触发这个模式。
这里注意一下,入参allowUnclean
= (electionTrigger == AdminClientTriggered)
意思是: 当触发的场景是AdminClientTriggered的时候, 则allowUnclean=true
,表示 不关心配置参数unclean.leader.election.enable
是什么
如果没有找到符合条件的Leader, 则就去非ISR 列表找Leader。 刚好我们执行脚本的时候触发器就是AdminClientTriggered其他触发器有:
AutoTriggered : 定时自动触发。
ZkTriggered:Controller切换的时候触发的(zk节点/controller 的变更便是Controller角色的切换) AdminClientTriggered:客户端主动触发。
触发场景:Controller 监听到有Broker启动了
❝同上。
触发源码入口:
KafkaController#processBrokerChange#onBrokerStartup
partitionStateMachine.triggerOnlinePartitionStateChange()
触发场景:Controller 监听 LeaderAndIsrResponseReceived请求
❝同上。
当Controller向对应的Broker发起 LeaderAndIsrRequest 请求的时候.
有一个回调函数callback, 这个回调函数会向Controller发起一个事件为 LeaderAndIsrResponseReceived 请求。
具体源码在:
ControllerChannelManager#sendLeaderAndIsrRequest
Controller收到这个事件的请求之后,根据返回的 leaderAndIsrResponse 数据
会判断一下有没有新增加的离线副本(一般都是由于磁盘访问有问题)
如果有新的离线副本,则需要将这个离线副本标记为Offline状态
源码入口:
KafkaController#onReplicasBecomeOffline
partitionStateMachine.triggerOnlinePartitionStateChange()
触发场景:Controller 监听 UncleanLeaderElectionEnable请求
❝当我们在修改动态配置的时候, 将动态配置:unclean.leader.election.enable
设置为 true 的时候
会触发向Controller发起UncleanLeaderElectionEnable的请求,这个时候则需要触发一下。触发请求同上。
触发源码入口:
KafkaController#processTopicUncleanLeaderElectionEnable
partitionStateMachine.triggerOnlinePartitionStateChange(topic)
上面的触发调用的代码就是下面的接口
对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为OnlinePartition 的状态。 状态的流程触发了Leader选举。
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
- 获取所有 OfflinePartition 、NewPartition 的分区状态
- 尝试将 所有 NewPartition or OfflinePartition 状态的分区全部转别成 OnlinePartition状态, 但是如果对应的Topic正在删除中,则会被排除掉
- 分区状态机进行状态流转 使用 OfflinePartitionLeaderElectionStrategy 选举策略(
allowUnclean=true
表示如果从isr中没有选出leader,则允许从非isr列表中选举leader ,allowUnclean=false
表示如果从isr中没有选出leader, 则需要去读取配置文件的配置unclean.leader.election.enable
来决定是否允许从非ISR列表中选举Leader。 )
2. ReassignPartitionLeaderElectionStrategy
❝分区副本重分配选举策略: 当执行分区副本重分配的时候, 原来的Leader可能有变更, 则需要触发一下 Leader选举。
- 只有当之前的Leader副本在经过重分配之后不存在了。
例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。 - 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
分区副本重分配发生的Leader选举.
Election#leaderForReassign
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
总结:
❝从当前的副本分配列表中,获取副本在线&&副本在ISR中的 第一个副本,遍历的顺序是当前副本的分配方式(AR),跟ISR的顺序没有什么关系。
触发场景:分区副本重分配
❝并不是每次执行分区副本重分配都会触发这个Leader选举策略, 下面两种情况才会触发
- 只有当之前的Leader副本在经过重分配之后不存在了。例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。
- 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
对应的判断条件代码如下:
KafkaController#moveReassignedPartitionLeaderIfRequired
private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
newAssignment: ReplicaAssignment): Unit = {
// 重分配之后的所有副本
val reassignedReplicas = newAssignment.replicas
//当前的分区Leader是哪个
val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
// 如果分配后的副本不包含当前Leader副本,则需要重新选举
if (!reassignedReplicas.contains(currentLeader)) {
//触发Leader重选举,策略是ReassignPartitionLeaderElectionStrategy
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
} else if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
// 上面2种情况都不符合, 那么就没有必要leader重选举了, 更新一下leaderEpoch就行 了
updateLeaderEpochAndSendRequest(topicPartition, newAssignment)
} else {
//触发Leader重选举,策略是ReassignPartitionLeaderElectionStrategy
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
}
}
点击查看分区重分配的源码解析
3. PreferredReplicaPartitionLeaderElectionStrategy
❝优先副本选举策略, 必须满足三个条件:
是第一个副本&&副本在线&&副本在ISR列表中。
满足上面三个条件才会当选leader,不满足则不会做变更。
def leaderForPreferredReplica(controllerContext: ControllerContext,
leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
leaderAndIsrs.map { case (partition, leaderAndIsr) =>
leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)
}
}
private def leaderForPreferredReplica(partition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext): ElectionResult = {
// AR列表
val assignment = controllerContext.partitionReplicaAssignment(partition)
// 在线副本
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
val isr = leaderAndIsr.isr
// 找出第一个副本 是否在线 并且在ISR中。
val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
// 组装leaderandisr返回 ,注意这里是没有修改ISR信息的
val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
ElectionResult(partition, newLeaderAndIsrOpt, assignment)
}
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
}
- 从内存中获取TopicPartition的分配方式
- 过滤不在线的副本
- 找到第一个副本判断一下是否在线&&在ISR列表中。如果满足,则选他为leader,如果不满足,也不会再找其他副本了。
- 返回leaderAndIsr信息, 这里的ISR是没有做修改的。
触发场景:自动定时执行优先副本选举任务
❝Controller 启动的时候,会启动一个定时任务 。每隔一段时间就去执行 优先副本选举任务。
与之相关配置:
## 如果为true表示会创建定时任务去执行 优先副本选举,为false则不会创建
auto.leader.rebalance.enable=true
## 每隔多久执行一次 ; 默认300秒;
leader.imbalance.check.interval.seconds partition = 300
##标识每个 Broker 失去平衡的比率,如果超过该比率,则执行重新选举 Broker 的 leader;默认比例是10%;
##这个比率的算法是 :broker不平衡率=非优先副本的leader个数/总分区数,
##假如一个topic有3个分区[0,1,2],并且有3个副本 ,正常情况下,[0,1,2]分别都为一个leader副本; 这个时候 0/3=0%;
leader.imbalance.per.broker.percentage = 10
触发场景: Controller 重新加载的时候
❝在这个触发之前还有执行partitionStateMachine.startup()
相当于是先把 OfflinePartition、NewPartition状态的分区执行了OfflinePartitionLeaderElectionStrategy 策略。
然后又执行了
PreferredReplicaPartitionLeaderElectionStrategy策略 这里是从zk节点/admin/preferred_replica_election
读取数据, 来进行判断是否有需要执行Leader选举的分区
它是在执行kafka-preferred-replica-election
命令的时候会创建这个zk节点
但是这个已经被标记为废弃了,并且在3.0的时候直接移除了。
源码位置:
KafkaController#onControllerFailover
// 从zk节点/admin/preferred_replica_election找到哪些符合条件需要执行优先副本选举的分区
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
// 这里的触发类型 是 ZkTriggered
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
// 去zk读取节点 /admin/preferred_replica_election
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// 如果指定分区的 leader 已经是AR的第一个副本 或者 topic被删除了,则 过滤掉这个分区(没有必要执行leader选举了)
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val topicDeleted = replicas.isEmpty
val successful =
if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false
successful || topicDeleted
}
// 将zk获取到的分区数据 - 刚刚需要忽略的数据 = 还需要执行选举的数据
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
// 找到哪些分区正在删除
val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
// 待删除的分区也过滤掉
val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
// 返回最终需要执行优先副本选举的数据。
pendingPreferredReplicaElections
}
触发场景:执行优先副本选举脚本的时候
❝执行脚本kafka-leader-election.sh
并且选择的模式是PREFERRED
(优先副本选举) 则会选择 PreferredReplicaPartitionLeaderElectionStrategy 策略选举
4. ControlledShutdownPartitionLeaderElectionStrategy
❝受控关机选举策略 :
当Broker关机的过程中,会向Controller发起一个请求, 让它重新发起一次选举, 把在所有正在关机(也就是发起请求的那个Broker,或其它同时正在关机的Broker) 的Broker里面的副本给剔除掉。
根据算法算出leader:找到第一个满足条件的副本:
副本在线 && 副本在ISR中 && 副本所在的Broker不在正在关闭的Broker集合中 。
构造新的ISR列表: 在之前的isr列表中将 正在被关闭的Broker里面的副本 给剔除掉
Election#leaderForControlledShutdown
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
触发场景:Broker关机的时候
❝当Broker关闭的时候, 会向Controller发一起一个ControlledShutdownRequest
请求, Controller收到这个请求会针对性的做一些善后事件。比如说 执行Leader重选举 等等之类的。
源码位置:KafkaServer#controlledShutdown
Controller收到请求的源码位置:KafkaController#doControlledShutdown
与之相关的配置有:
controlled.shutdown.enable : 是否启用受控关闭操作
controlled.shutdown.max.retries 受控关机操作 最大重试的次数
controlled.shutdown.retry.backoff.ms 失败后等等多久再次重试
其他场景
新创建的Topic Leader选举策略
❝创建新的Topic的时候,并没有发生Leader选举的操作, 而是默认从分区对应的所有在线副本中选择第一个为leader, 然后isr就为 所有在线副本,再组装一下当前的controller_epoch信息,写入到zk节点/brokers/topics/{Topic名称}/partitions/{分区号}/state
中。
最后发起 LeaderAndIsrRequest 请求,通知 leader 的变更。
详细看看源码:
PartitionStateMachine#doHandleStateChanges
分区状态从 NewPartition
流转到OnlinePartition
因篇幅原因源码省略
想获得更好的阅读体验和【查看源码】 请点击【阅读原文】
- 从当前的Controller 内存中获取所有入参的分区对应的副本信息
- 过滤那些已经下线的副本( Broker宕机、网络异常、磁盘脱机、等等都有可能造成副本下线) 。
- 每个分区对应的所有在线副本信息 为 ISR 信息,然后取ISR的第一个副本为leader分区。当然特别注意一下, 这个时候获取的isr信息的顺序就是 分区创建时候分配好的AR顺序, 获取第一个在线的。(因为在其他情况下 ISR的顺序跟AR的顺序并不一致)
- 组装 上面的
isr
、leader
、controller_epoch
等信息 写入到zk节点 /brokers/topics/{Topic名称}/partitions/{分区号}/state例如下面所示{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]} - 然后向其他相关Broker 发起 LeaderAndIsrRequest 请求,通知他们Leader和Isr信息已经变更了,去做一下想要的处理。比如去新的leader发起Fetcher请求同步数据。
可以看看之前我们分析过的 Topic创建的源码解析 的原理图 如下
重点看:
回答上面的问题
现在,看完全文之后,我想你应该对下面的问题很清楚了吧!
什么是分区状态机
❝所有的分区状态的流转都是通过分区状态机来进行的, 统一管理! 每个分区状态的流转 都是有严格限制并且固定的,流转到不同状态需要执行的操作不一样, 例如 当分区状态流转到 OnlinePartition 的时候, 就需要判断是否需要执行 Leader选举 ,
创建Topic的时候如何选举Leader?
❝创建Topic的时候并没有发生 Leader选举, 而是默认将 在线的第一个副本设置为Leader,所有在线的副本列表 为 ISR 列表。 写入到了zookeeper中。
分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本的Broker,它会当选为Leader吗?
❝视情况而定。 首先, 启动一台Broker, 会用什么策略选举?
看上面的图,我们可以知道是
OfflinePartitionLeaderElectionStrategy
然后看下这个策略是如何选举的?
那么最终结果就是:
所有副本不在线,那么一个Leader的候选者都当选不了
那么这个时候就会判断 unclean.leader.election.enable
配置是否为true.
如果是true, 则当前在线的副本就是只有自己这个刚启动的在线副本,自然而然就会当选Leader了。
如果是fase, 则没有副本能够当前Leader, 次数处于一个无Leader的状态。
当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?
❝不是, 跟上一个问题同理
根据unclean.leader.election.enable
配置决定。
如果是true, 则谁先启动,谁就当选(会丢失部分数据)
如果是false,则第一个在ISR列表中的副本当选。
顺便再提一句, 虽然在这里可能不是AR中的第一个副本当选Leader。
但是最终还是会自动执行Leader均衡的,自动均衡使用的策略是
PreferredReplicaPartitionLeaderElectionStrategy
(前提是开启了自动均衡:auto.leader.rebalance.enable=true
)
Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?
❝根据配置auto.leader.rebalance.enable=true
决定。true: 会自动执行Leader均衡, 自动均衡策略是 PreferredReplicaPartitionLeaderElectionStrategy 策略
false: 不执行自动均衡。 那么久不会还回去。 关于更详细的 Leader均衡机制请看 Leader 均衡机制
Leader选举期间对分区的影响
❝Leader的选举基本上不会造成什么影响, Leader的切换非常快, 每个分区不可用的时间在几毫秒内。
希望能对你有所帮助!
大家好,我是石臻臻,这是 <a href="https:// http://mp.weixin.qq.com/mp/appmsgalbum?__biz=Mzg4ODY1NTcxNg==&action=getalbum&album_id=1966026980307304450"> 「kafka专栏」 连载中的第「N」篇文章...
前几天有个群友问我: kafka如何修改优先副本? 他们有个需求是, 想指定某个分区中的其中一个副本为Leader
需求分析
对于这么一个问题,在我们生产环境还是挺常见的,经常有需要修改某个Topic中某分区的Leader
比如 topic1-0
这个分区有3个副本[0,1,2]
, 按照「优先副本」的规则,那么 0
号副本肯定就是Leader
了 我们都知道分区中的只有Leader
副本才会提供读写副本,其他副本作为备份 假如在某些情况下,「0」
号副本性能资源不够,或者网络不太好,或者IO压力比较大,那么肯定对Topic的整体读写性能有很大影响, 这个时候切换一台压力较小副本作为Leader
就显得很重要;
优先副本: 分区中的AR
(所有副本)信息, 优先选择排在第一位的副本作为Leader Leader机制: 分区中只有一个Leader来承担读写,其他副本只是作为备份
那么如何实现这样一个需求呢?
解决方案
知道了原理之后,我们就能想到对应的解决方案了 只要将 分区的 AR
中的第一个位置,替换成你指定副本就行了; AR = { 0,1,2 } ==> AR = {2,1,0}
一般能够达到这个目的有两种方案,下面我们来分析一下
方案一: 分区副本重分配
之前关于 分区副本重分配 我已经写过很多文章了,如果想详细了解 分区副本重分配、数据迁移、副本扩缩容 可以看看链接的文章, 这里我就简单说一下;
一般分区副本重分配主要有三个流程
- 生成推荐的迁移Json文件
- 执行迁移Json文件
- 验证迁移流程是否完成
这里我们主要看第2步骤, 来看看迁移文件一般是什么样子的
{
"version": 1,
"partitions": [{
"topic": "topic1",
"partition": 0,
"replicas": [0,1,2]
}]
}
这个迁移Json意思是, 把topic1
的「0」号分区的副本分配成[0,1,2]
,也就是说 topic1-0
号分区最终有3个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ; 如果你有看过我之前写的 分区副本重分配原理源码分析 ,那么肯定就知道,不管你之前的分配方式是什么样子的, 最终副本分配都是 [0,1,2] , 之前副本多的,会被删掉,少的会被新增;
那么我们想要实现 我们的需求 是不是把这个Json文件 中的 "replicas": [0,1,2] 改一下就行了,比如改成 "replicas": [2,1,0] 改完Json后执行,执行execute
, 正式开始重分配流程! 迁移完成之后, 就会发现,Leader已经变成上面的第一个位置的副本「2」 了
优缺点
优点: 实现了需求, 并且主动切换了Leader
缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改Json文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务 ! 假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了, 「副本重分配」是一个比较「重」 了的操作,出错对集群的影响比较大
方案二: 手动修改AR顺序
首先,我们知道分区副本的分配数据是保存在zookeeper中的节点brokers/topics/{topicName}
中; 我们看个Topic1
的节点数据例子;
{
"version": 2,
"partitions": {
"2": [3, 2, 1],
"1": [2, 1, 3],
"4": [2, 3, 1],
"0": [1, 3, 2],
"3": [1, 2, 3]
},
"adding_replicas": {},
"removing_replicas": {}
}
数据解释: version:
版本信息, 现在有 「1」、「2」 两个版本
removing_replicas:
需要删除的副本数据, 在进行分区副本重分配过程中, 多余的副本会在数据迁移快完成的时候被删除掉,删除成功这里的数据会被清除
adding_replicas:
需要新增的副本数据,在进行分区副本重分配过程中, 新增加的副本将会被新增,新增完成这里的数据会清除;
partitions:
Topic的所有分区副本分配方式; 上面表示总共有5个分区,以及对应的副本位置;
知道了这些之后,想要修改优先副本,是不是可以通过直接修改zookeeper中的节点数据就行了; 比如 我们把 「1」号分区的副本位置改成 [2,1,3]
改成这样之后, 还需要 执行 重新进行优先副本选举操作 ,例如通过kafka的命令执行
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1--election-type PREFERRED --partition 1
--election-type
: PREFERRED
这个表示的以优先副本的方式进行重新选举
那么做完这两步之后, 我们的修改优先副本的目的就达成了.........吗 ?
实则并没有, 因为这里仅仅只是修改了 zookeeper
节点的数据, 而bin/kafka-leader-election.sh
重选举的操作是Controller
来进行的; 如果你对Controller
的作用和源码足够了解, 肯定知道Controller里面保存了每个Topic的分区副本信息, 是保存在JVM内存中的, 然后我们手动修改Zookeeper中的节点,并没有触发 Controller
更新自身的内存 也就是说 就算我们执行了kafka-leader-election.sh
, 它也不会有任何变化,因为优先副本没有被感知到修改了;
解决这个问题也很简单,让Controller
感知到数据的变更就行了 最简单的方法, 让Controller
发生重新选举, 数据重新加载!
总结
- 手动修改zookeeper中的「AR」顺序
- Controller 重新选举
- 执行 分区副本重选举操作(优先副本策略)
简单代码 当然上面功能,肯定是要集成到LogiKM
中的咯; 简单代码如下
// 这里转换成HashMap类型,切勿自定义类型,以防kafka节点数据后续新增数据节点,导致数据丢失
HashMap partitionMap = zkConfig.get(ZkPathUtil.getBrokerTopicRoot(topicName), HashMap.class);
JSONObject partitionJson = (JSONObject)partitionMap.get("partitions");
JSONArray partitions = (JSONArray)partitionJson.get(partition);
//部分代码省略
//调换序列 优先副本
Integer first = partitions.getInteger(0);
partitions.set(0,targetBroker);
partitions.set(index,first);
zkUtils = ZookeeperUtils.getKafkaZkUtils(clusterDO.getZookeeper());
String json = JSON.toJSONString(partitionMap);
zkUtils.updatePersistentPath(ZkPathUtil.getBrokerTopicRoot(topicName), json,null);
//写入成功之后触发一下 异步去优先副本选举
new Thread(()->{
try {
// 1. 先让Controller重新选举 (不然上面修改的还没有生效) (TODO.. 待优化 -> 频繁的Controller重选举对集群性能会有影响)
zkConfig.deletePath(ZkPathUtil.CONTROLLER_ROOT_NODE);
// 等待 Controller 选举一下
Thread.sleep(1000);
//2. 然后再发起副本重新选举
preferredReplicalElectCommand.preferredReplicaElection(clusterId,topicName,partition,"");
} catch (ConfigException | InterruptedException e) {
LOGGER.error("重新选举异常.e:{}",e);
e.printStackTrace();
}
}).start();
优缺点
优点: 实现了目标需求, 简单, 操作方便
缺点: 频繁的Controller
重选举对生产环境来说会有一些影响;
优化与改进
第二种方案中,需要Controller
重选举, 频繁的选举肯定是对生产环境有影响的; Controller
承担了非常多的责任,比如分区副本重分配
、删除topic
、Leader选举
等等还有很多都是它在干
那么如何不进行Controller的重选举,也能达到我们的需求呢?
我们的需求是,当我们 修改了zookeeper中的节点数据的时候,能够迅速的让Controller感知到,并更新自己的内存数据就行了;
对于这个问题,我会在下一期文章中介绍
问题
看完这篇文章,提几个相关的问题给大家思考一下;
- 如果我在修改zk中的「AR」信息时候不仅仅是调换顺序,而是有新增或者删除副本会发生什么情况呢?
- 如果手动修改
brokers/topics/{topicName}/partitions/{分区号}/state
节点里面的leader信息,能不能直接更新Leader? - 副本选举的整个流程是什么样子的?
大家可以思考一下, 问题答案我会在后面的文章中一一讲解!
点个关注, 推送更多 干货 内容, 一起进 【滴滴技术答疑群 】 跟众多技术专家交流技术吧!