KafkaController是Kafka集群的控制管理模块,负责集群中Topic的创建、分区的重新分配以及分区副本Leader的重新选举等管理集群的功能。虽然每个Broker Server都有一个KafkaController模块,但是有且有一个处于leader状态的KafkaController模块对外提供管理服务。下面介绍一下KafkaController的选举策略。
在kafka内部,所有的Broker Server都会启动一个KafkaController模块,但是只会有一个KafkaController成为Leader,其余的都是Follower。KafkaController的选举是通过zookeeper来实现的,当KafkaController模块启动时,会在zookeeper的相同路径上注册一个瞬时节点,由于只有一个节点会注册成功,注册成功的节点会成为leader,其余的都是Follower。由于是瞬时节点,当leader状态的KafkaController掉线时,这个节点会消失,其它处于Follower状态的KafkaController观察到这个节点的变化,会重新尝试创建节点。总之,多个zookeeper客户端在zookeeper集群中相同路径上创建瞬时节点的原子性是由zookeeper保证的,其大致流程如下:
KafkaController的选举流程
KafkaController模块中负责leader选举的类是ZookeeperLeaderElector,其构造函数如下:
class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, onResigningAsLeader: () => Unit, brokerId: Int) extends LeaderElector with Logging { ......}
其中ControllerContext为Controller的上下文,里面包含了当前Topic的元数据信息以及集群的元数据信息,electionPath为多个controller竞争写入的路径,其值为/controller,onBecomingLeader表示成为Leader状态的回调函数,onResigningAsLeader表示成为follower状态的回调函数,brokerId表示当前Broker Server的id。ZookeeperLeaderElector的启动函数如下:
def startup { inLock(controllerContext.controllerLock) { //负责观察数据节点状态,当数据节点消失时可以触发再次选举 controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) //首次选举,选举成功之后成为Leader或者Follower elect } }
从上面的代码可以看出,选举流程主要时在elect中实现的
def elect: Boolean = { val timestamp = SystemTime.milliseconds.toString //组装准备写在/controller上的数据 val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) //从/controller节点获取数据 leaderId = getControllerID //如果获取到数据,则说明已经有leader if(leaderId != -1) { debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId)) return amILeader } try { //尝试在/controller写入数据 createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], controllerContext.zkSessionTimeout) //写入数据成功 info(brokerId + " successfully elected as leader") leaderId = brokerId onBecomingLeader() } catch { //竞争失败,已经被其它节点写入数据 case e: ZkNodeExistsException => // If someone else has written the path, then leaderId = getControllerID if (leaderId != -1) debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) else warn("A leader has been elected but just resigned, this will result in another round of election") case e2: Throwable => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) resign() } amILeader }
当zookeeper的客户端超时的时候,zookeeper会释放该节点之前创建的瞬时节点,如果此时zookeeper被挂起,此时的瞬时节点的删除会被延迟,如果此时客户端恢复连接,会认为之前的节点已经被删除,从而会重新建立连接,这个时候就会报NodeExists异常。如果Zookeeper从挂起状态恢复过来,就会去删除之前的瞬时节点,此时重新连接的zookeeper客户端就会监测到瞬时节点丢失,但是此时zookeeper客户端并没有断开和zookeeper的连接。
因此,KafkaController创建瞬时节点时,需要规避Zookeeper集群在释放瞬时节点存在的问题,其过程如下:
def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerdata: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = { while (true) { try { createEphemeralPathExpectConflict(zkClient, path, data) return } catch { case e: ZkNodeExistsException => { ZkUtils.readDataMaybeNull(zkClient, path)._1 match { case Some(writtenData) => { if (checker(writtenData, expectedCallerData)) { Thread.sleep(backoffTime) } else { throw e } } case None => // the node disappeared; retry creating the ephemeral node immediately } } case e2: Throwable => throw e2 } } }
可见,KafkaController在写入瞬时节点的数据时,如果发生ZkNodeExistsException 异常,就判断当前存在的瞬时节点是否就是自己,如果是就等待一段时间,等zookeeper将之前的瞬时节点删除,自己再重新写入瞬时节点数据。