KafkaController模块的初始化是由ZookeeperLeaderElector决定的,当KafkaController被选为leader时或者Follower会执行不同的回调函数,详细代码如下:
object KafkaController extends Logging {......//KafkaController启动函数private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId)def startup() = { inLock(controllerContext.controllerLock) { //注册session状态改变监听操作 registerSessionExpirationListener() isRunning = true //启动ZookeeperLeaderElector controllerElector.startup } }......}
在ZookeeperLeaderElector的启动函数startup执行KafkaController的选举函数
class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, onResigningAsLeader: () => Unit, brokerId: Int) extends LeaderElector with Logging { ...... def startup { inLock(controllerContext.controllerLock) { controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) elect } } ...... }def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) leaderId = getControllerID ...... try { createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], controllerContext.zkSessionTimeout) leaderId = brokerId onBecomingLeader() } ...... }
二、KafkaController成为Leader后的流程当KafkaController被选举为leader时会触发调用onBecomingLeader函数,而onBecomingLeader函数的实现就是onControllerFailover。其具体实现如下:
def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) //初始化集群内部时钟 readControllerEpochFromZookeeper() incrementControllerEpoch(zkClient) //注册各种监听函数 registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() //初始化Controller上下文,即集群内部元数据信息 initializeControllerContext() //初始化replica状态,启动副本状态转化 replicaStateMachine.startup() //初始化partition状态,启动分区状态转化 partitionStateMachine.startup() controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) //切换状态为RunningAsController brokerState.newState(RunningAsController) //处理集群初始化之前用户下发的PartitionReassignment请求和PreferredReplica请求 maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() sendUpdatemetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) if (config.autoLeaderRebalanceEnable) { //启动负载均衡线程 autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) } //启动Topic删除线程 deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
其大致步骤如下:
1)初始化时钟信息,从zookeeper中读取时钟信息,然后将时钟信息加1,再将这个更新的时钟和版本写入zookeeper,如果成功说明时效性正常,Broker Server利用此值区分请求的时效性。
2)注册各种监听函数,由于Kafka把元数据持久化在Zookeeper中,因此KafkaController针对Zookeeper的不同目录注册不同的监听函数,监听函数主要有以下几种:
在/admin/reassign_partitions目录上注册PartitionReassignedListener,响应用户下发的重分配Topic分区的请求。
在/admin/preferred_replica_election目录上注册PreferredReplicaElectionListener,响应用户下发的重新选举Topic分区副本的请求。
在/admin/delete_topics目录上注册DeleteTopicsListener,响应用户下发的删除Topic的请求。
在/brokers/topics目录上注册TopicChangeListener,响应用户下发的创建Topic的请求。
在/brokers/topics/Topic-0(具体的某个Topic)目录上注册AddPartitionsListener,响应用户下发的增加Topic分区的请求。
3)通过initializeControllerContext()、replicaStateMachine.startup()、partitionStateMachine.startup()初始化Kafka集群内部的元数据信息,比如liveBrokers(在线Broker列表)、allTopics(Topic列表)等,以及建立和集群内其他状态为Follower的KafkaController的通信链路,同时通过maybeTriggerPartition()和maybeTriggerPreferredReplicaElection()处理kafka集群启动前没有及时处理的用户请求,此时可能会变更上述Kafka集群内部的元数据信息,最后通过sendUpdatemetadataRequest()将Kafka集群内部的元数据信息同步给其它状态为Follower的KafkaController。
4)根据auto.leader.rebalance.enable配置项按需启动Kafka集群内部的负载均衡线程。
5)根据delete.topic.enable配置项按需启动Kafka集群内部的Topic删除线程。
三、KafkaController成为Follower后的流程当KafkaController被选举为Follower时出触发调用onResigningAsLeader回调函数,onResigningAsLeader的实现就是onControllerResignation,其具体实现如下:
def onControllerResignation() { //取消针对/admin/reassign_partitions目录的监听 deregisterReassignedPartitionsListener() //取消针对/admin/preferred_replica_election目录的监听 deregisterPreferredReplicaElectionListener() // 关闭Topic删除线程 if (deleteTopicManager != null) deleteTopicManager.shutdown() // 关闭负载均衡线程 if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown() inLock(controllerContext.controllerLock) { //取消针对类似/brokers/topics/Topic-0/0/state 目录的监控 deregisterReassignedPartitionsIsrChangeListeners() // 关闭分区状态转换机,内部会注销监听函数,清除分区状态 partitionStateMachine.shutdown() // 关闭副本状态转换机,内部会注销监听函数,清除副本状态 replicaStateMachine.shutdown() //关闭和其他KafkaController的通信链路 if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } // 重置集群内部时钟 controllerContext.epoch=0 controllerContext.epochZkVersion=0 //切换状态为RunningAsBroker brokerState.newState(RunningAsBroker) } }
onControllerResignation的处理逻辑正好和onControllerFailover的处理逻辑相反,其大致可以分为以下几步:
1)取消各种Zookeeper路径上的监听函数,由于当前KafkaController被选举为Follower,所以有关集群的所有元数据信息必须来自状态为Leader的KafkaController。
2)根据delete.topic.enable配置项关闭Kafka集群内部的Topic删除线程。
3)根据auto.leader.rebalance.enable配置项按需关闭Kafka集群内部的负载均衡线程。
4)断开和集群中其它状态为Follower的KafkaController的通信链路。
5)重置集群内部时钟。