Kafka Source Code (6) - Controller (2): The Partition State Machine and the Replica State Machine

March 4, 2025

In the previous article we covered three things:

  1. the communication mechanism between the controller and the brokers
  2. the controller's internal event-handling mechanism
  3. the controller election process

In this article we use topic deletion as the entry point to study Kafka's Partition State Machine and Replica State Machine.

一、Deleting a topic

1、The kafka-topics.sh script

The Kafka installation directory ships with a number of scripts for managing Kafka, among them the topic management tool kafka-topics.sh. We can delete a topic with the following command:

kafka-topics.sh --bootstrap-server <kafka-server>:<port> --delete --topic <topic-name>

If we open the kafka-topics.sh script, we find that it contains only a single line:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

This line resolves the directory of the current script (kafka-topics.sh), locates kafka-run-class.sh next to it, and execs it. That script starts a JVM process and invokes the main method of kafka.admin.TopicCommand to perform the delete operation, passing all of the original arguments through via $@.
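
Equivalently, since the script does nothing more than hand the arguments to that main method, the same delete can be triggered straight from a JVM. A minimal sketch, assuming the Kafka core jar and its dependencies are on the classpath; the broker address and topic name are placeholders:

object DeleteTopicViaTopicCommand {
  def main(args: Array[String]): Unit = {
    // Calls the same entry point that kafka-topics.sh execs into.
    // Note: TopicCommand calls Exit.exit() when it finishes, so this JVM will terminate.
    kafka.admin.TopicCommand.main(Array(
      "--bootstrap-server", "localhost:9092",   // placeholder broker address
      "--delete",
      "--topic", "my-topic"                     // placeholder topic name
    ))
  }
}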

Next, let's look at TopicCommand's main method:

object TopicCommand extends Logging {

  // 原始命令中的参数都会作为 main 方法的入参
  def main(args: Array[String]): Unit = {

    // 解析命令行中传入的参数
    val opts = new TopicCommandOptions(args)

    // 校验参数合法性。如:
    // 入参不能为空,必须是 createOpt, listOpt, alterOpt, describeOpt, deleteOpt 中的一种、
    // 必须指明 --bootstrap-server 或者 --zookeeper 等
    opts.checkArgs()

    // 早期 topic 的管理请求是去更新 zk 上的数据,
    // 后期 topic 的管理请求可以用 AdminClient 直接向 Controller 发请求
    //
    // 我理解,后期 kafka 在逐渐减少对 zk 的依赖,所以这里把管理请求交给
    // controller 去处理,更符合整体架构的演进方向。
    val topicService = if (opts.zkConnect.isDefined)
      // 如果指明 --zookeeper ,则向 zk 发请求
      ZookeeperTopicService(opts.zkConnect)
    else {
      // 如果指明的是 --bootstrap-server 则使用 AdminClient 向 Controller 发请求
      AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)
    }

    // 只支持下面五种操作
    var exitCode = 0
    try {
      if (opts.hasCreateOption)
        topicService.createTopic(opts)
      else if (opts.hasAlterOption)
        topicService.alterTopic(opts)
      else if (opts.hasListOption)
        topicService.listTopics(opts)
      else if (opts.hasDescribeOption)
        topicService.describeTopic(opts)
      else if (opts.hasDeleteOption)
        topicService.deleteTopic(opts)
    } catch {
      ......
    } finally {
      topicService.close()
      Exit.exit(exitCode)
    }
  ......
}

The call to topicService.deleteTopic(opts) above ultimately sends a request with the ApiKeys.DELETE_TOPICS api key to the cluster's controller node. In Kafka Source Code (4) - KafkaRequestHandler we can find this api key and its corresponding handler, handleDeleteTopicsRequest(request):
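
For comparison, this is roughly what the client side does if you bypass the shell script and use the Java AdminClient directly. A sketch only; the broker address and topic name are placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object DeleteTopicViaAdminClient {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address

    val admin = AdminClient.create(props)
    try {
      // deleteTopics sends a DELETE_TOPICS request; the returned future completes
      // once the controller has answered the request.
      admin.deleteTopics(Collections.singleton("my-topic")).all().get()
    } finally {
      admin.close()
    }
  }
}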

  def handleDeleteTopicsRequest(request: RequestChannel.Request): Unit = {
    ......
    val deleteTopicRequest = request.body[DeleteTopicsRequest]
    val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
    val toDelete = mutable.Set[String]()

    if (!controller.isActive) {
      // 只有 controller 才能履行删除职责,否则会返回 NOT_CONTROLLER 错误
      deleteTopicRequest.data.topicNames.forEach { topic =>
        results.add(new DeletableTopicResult()
          .setName(topic)
          .setErrorCode(Errors.NOT_CONTROLLER.code))
      }
      sendResponseCallback(results)

    } else if (!config.deleteTopicEnable) {
      // 配置文件中,如果配置了不允许删除 topic,则返回错误
      val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
      deleteTopicRequest.data.topicNames.forEach { topic =>
        results.add(new DeletableTopicResult()
          .setName(topic)
          .setErrorCode(error.code))
      }
      sendResponseCallback(results)

    } else {

      // 可以删除,开始删除

      deleteTopicRequest.data.topicNames.forEach { topic =>
        results.add(new DeletableTopicResult()
          .setName(topic))
      }

      // 判断请求方是否有删除 topic 的权限
      val authorizedTopics = filterByAuthorized(request.context, DELETE, TOPIC,
        results.asScala)(_.name)
      results.forEach { topic =>
         if (!authorizedTopics.contains(topic.name))
           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
         else if (!metadataCache.contains(topic.name))
           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
         else
           toDelete += topic.name
      }

      // 无删除权限,直接退出
      if (toDelete.isEmpty)
        sendResponseCallback(results)
      else {

        // 定义回调函数,用来响应 response
        def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
          errors.foreach {
            case (topicName, error) =>
              results.find(topicName)
                .setErrorCode(error.code)
          }
          sendResponseCallback(results)
        }

        // 删除操作:
        // 这里实际上是去给 zk 的 /admin/delete_topics/ 节点下新增一个节点,
        // 节点名就是待删除的 topic name
        // 后续 zk 监听器会监听到这个变动,然后再触发后续的删除操作,删除结束后,通过
        // handleDeleteTopicsResults 来响应 response 给请求方
        adminManager.deleteTopics(
          deleteTopicRequest.data.timeoutMs,
          toDelete,
          controllerMutationQuota,
          handleDeleteTopicsResults
        )
      }
    }
  }

In Kafka Source Code (5) - Controller (1) we saw that when the controller election succeeds, the new controller registers a number of ZooKeeper listeners, including the TopicDeletionHandler, which watches for topics queued for deletion:


class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {

  // 监听 /admin/delete_topics/
  override val path: String = TopicsZNode.path

  // 监听到子节点变化之后,向队列中放入一个 TopicDeletion 事件
  override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
}

That same article also shows how the controller handles this TopicDeletion event:

  private def processTopicDeletion(): Unit = {
    if (!isActive) return

    // 查询 /admin/delete_topics/ 下的子节点 (节点名称就是待删除的 topic name)
    var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
    debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")

    // 如果 topic 不存在,则直接删除 /admin/delete_topics/ 下对应的节点即可
    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    if (nonExistentTopics.nonEmpty) {
      warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
      zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
    }

    // 排除掉实际不存在的 topic
    topicsToBeDeleted --= nonExistentTopics

    if (config.deleteTopicEnable) {

      if (topicsToBeDeleted.nonEmpty) {
        info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")

        // mark topic ineligible for deletion if other state changes are in progress
        // 如果此时 topic 有分区迁移的任务尚未完成,则标记为不满足删除条件(ineligible for deletion)
        // 并记录到 ctx 中,后续删除的时候会跳过
        topicsToBeDeleted.foreach { topic =>
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
          if (partitionReassignmentInProgress)
            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
              reason = "topic reassignment in progress")
        }

        // 先添加到 controller ctx 的 topicsToBeDeleted map 中,
        // 然后调用 TopicDeletionManager.resumeDeletions() 激活删除任务
        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    } else {
      // 如果没有启用 “删除 topic” 功能,则清空 /admin/delete_topics 下的内容
      info(s"Removing $topicsToBeDeleted since delete topic is disabled")
      zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
    }
  }

2、Definition and initialization of TopicDeletionManager

TopicDeletionManager, as the name suggests, is responsible for deleting topics. Here is its definition:

class TopicDeletionManager(config: KafkaConfig,
                           controllerContext: ControllerContext,

                           // 副本状态机;要删除 topic 就得先删除 partition,要删除 partition,就得先删除 replica
                           // 同时就要更新这个状态机。
                           replicaStateMachine: ReplicaStateMachine,

                           // 分区状态机
                           partitionStateMachine: PartitionStateMachine,

                           // 与 zk 交互的客户端,用于删除 zk 上指定的资源
                           client: DeletionClient) extends Logging {

  // 是否允许删除 topic
  val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable

  // 每个 broker 都会创建 controller 对象,但只有选举成功成为新 controller 的时候,才会履行职责。
  // TopicDeletionManager 在 controller 创建的时候被创建并注入;
  //
  // 而在当前 broker 成为新 controller 的时候,才会调用这个 init 方法,
  // 查找是否有因为选举而被挂起的删除任务,并将其重新放入任务队列
  //
  // initialTopicsToBeDeleted:
  //     选举成功之后,controller 查询 /admin/delete_topics 下的 topic_name 列表,获取所有待删除的 topic
  // initialTopicsIneligibleForDeletion:
  //     上述 topic 列表中,有 topic 正在执行 partition reassignment 或者
  //     持有 topic 某个 replica 的 broker 宕机了
  //
  def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
    info(s"Initializing manager with initial deletions: $initialTopicsToBeDeleted, " +
      s"initial ineligible deletions: $initialTopicsIneligibleForDeletion")

    if (isDeleteTopicEnabled) {

      // append 到 ctx 的 topicsToBeDeleted 列表中
      controllerContext.queueTopicDeletion(initialTopicsToBeDeleted)

      // 确保 append 到 topicsIneligibleForDeletion 中的 topic 是
      // 应该删除 但 不能立刻删除 的
      controllerContext.topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & controllerContext.topicsToBeDeleted

    } else {
      // if delete topic is disabled clean the topic entries under /admin/delete_topics
      // 如果没有开启删除 topic 功能,则删除 /admin/delete_topics/{topic} 下的内容
      client.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
    }
  }

  // 在上一篇文章的 onControllerFailover() 方法中,新 controller 选举成功时,
  // 会先调用上面的 init 方法,将待删除 topic 放入 topicsToBeDeleted 队列中
  // 然后再调用这个方法激活删除操作
  def tryTopicDeletion(): Unit = {
    if (isDeleteTopicEnabled) {
      resumeDeletions()
    }
  }
  ......
}

Let's continue and see how resumeDeletions() is implemented:

  private def resumeDeletions(): Unit = {

    // 拷贝一份待删除 topic 列表
    val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted

    val topicsEligibleForRetry = mutable.Set.empty[String]
    val topicsEligibleForDeletion = mutable.Set.empty[String]

    if (topicsQueuedForDeletion.nonEmpty)
      info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")

    // 逐个检查 topic 及其 replica 所处的状态,
    // 判断 是否可以删除,是否需要重试
    topicsQueuedForDeletion.foreach { topic =>

      // 如果所有的 replica 都被标记为 删除成功,就认为 topic 删除完成
      if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {

        // 所有的 replica 都已经流转到 ReplicaDeletionSuccessful 状态了,
        // 这时候再将 replica 全部设置为 NonExistentReplica
        // 然后就可以删除 zk 上该 topic 相关的所有节点了,以及 ctx 中 topic 相关的所有数据
        completeDeleteTopic(topic)
        info(s"Deletion of topic $topic successfully completed")

      } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
        // 相当于 controllerContext.noReplicaInState(topic, ReplicaDeletionStarted)

        // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
        // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
        // or there is at least one failed replica (which means topic deletion should be retried).
        //
        // 到这里,
        // 所有的 replica 都不在 ReplicaDeletionStarted 状态,
        // 并且存在 replica 不在 ReplicaDeletionSuccessful 状态,
        // 有两种可能,
        // 一是这个 topic 可能还没有初始化删除任务,
        // 二是有部分 replica 删除失败了,处于 Ineligible to delete 状态,
        //   这时候就需要重试删除任务
        if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
          topicsEligibleForRetry += topic
        }
      }

      // Add topic to the eligible set if it is eligible for deletion.
      //
      // 下列条件都满足时,可以删除 topic
      // 1. topicsToBeDeleted 中包含 topic
      // 2. replica 都不处于 ReplicaDeletionStarted 状态(started 表示已经处于删除进程中了)
      // 3. topicsIneligibleForDeletion 中不包含 topic
      if (isTopicEligibleForDeletion(topic)) {
        info(s"Deletion of topic $topic (re)started")
        topicsEligibleForDeletion += topic
      }
    }

    // 将 ReplicaDeletionIneligible 状态的 replica,设置为 OfflineReplica
    // 以备后续删除
    if (topicsEligibleForRetry.nonEmpty) {
      retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
    }

    // 执行 topic deletion
    if (topicsEligibleForDeletion.nonEmpty) {
      onTopicDeletion(topicsEligibleForDeletion)
    }
  }

Most of this is still state checking; the onTopicDeletion(topicsEligibleForDeletion) call in the last block is where the Partition State Machine is actually updated.
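
To make the classification in resumeDeletions concrete, here is a self-contained toy sketch of the same bucketing. It is not Kafka code: the replica states are reduced to simple case objects and the controller context is replaced by a plain map.

object ResumeDeletionsSketch {
  sealed trait ReplicaState
  case object ReplicaDeletionStarted    extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object OnlineReplica             extends ReplicaState

  // toy stand-in for the controller context: topic -> states of its replicas
  val replicaStates: Map[String, Seq[ReplicaState]] = Map(
    "t-finished" -> Seq(ReplicaDeletionSuccessful, ReplicaDeletionSuccessful),
    "t-retry"    -> Seq(ReplicaDeletionIneligible, ReplicaDeletionSuccessful),
    "t-pending"  -> Seq(OnlineReplica, OnlineReplica)
  )

  def main(args: Array[String]): Unit = {
    replicaStates.foreach { case (topic, states) =>
      if (states.forall(_ == ReplicaDeletionSuccessful))
        println(s"$topic -> all replicas deleted, completeDeleteTopic")
      else if (!states.contains(ReplicaDeletionStarted) && states.contains(ReplicaDeletionIneligible))
        println(s"$topic -> has ineligible replicas, retry deletion")
      else
        println(s"$topic -> (re)start deletion if the topic is eligible")
    }
  }
}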

二、The partition state machine

In the previous article we saw that once the controller election succeeds, the controller calls the state machines' startup methods to start them:

  private def onControllerFailover(): Unit = {
    ......
    replicaStateMachine.startup()
    partitionStateMachine.startup()
    ......
  }

1、PartitionStateMachine.startup()

Here is the definition of the PartitionStateMachine class and its startup method:

abstract class PartitionStateMachine(controllerContext: ControllerContext) extends Logging {
  /**
   * Invoked on successful controller election.
   */
  def startup(): Unit = {

    // 为 ctx 中所有的 partition 都指定初始状态
    info("Initializing partition state")
    initializePartitionState()

    // 让处于 NewPartition 和 OfflinePartition 状态的分区流转起来
    info("Triggering online partition state changes")
    triggerOnlinePartitionStateChange()
    debug(s"Started partition state machine with initial state -> ${controllerContext.partitionStates}")
  }

  /**
   * 启动 partition state machine 的时候,为 zk 上所有的分区设置初始状态
   */
  private def initializePartitionState(): Unit = {

    // 为 ctx 中所有的 partition 都指定初始状态
    for (topicPartition <- controllerContext.allPartitions) {

      // check if leader and isr path exists for partition. If not, then it is in NEW state
      //
      // Controller 为 partition 选出了 leader 之后,会先将 LeaderAndIsr 信息保存在
      // zk 的 /brokers/topics/[topic]/partitions/[partition]/state 路径上
      // 再将 LeaderAndIsr 信息保存到 ctx 的 partitionLeadershipInfo 中
      //
      // 这里去 ctx 的 partitionLeadershipInfo 中查找 partition 的 leader
      // 如果没找到,说明这个 partition 处于 NewPartition 状态
      controllerContext.partitionLeadershipInfo(topicPartition) match {

        case Some(currentLeaderIsrAndEpoch) =>

          // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
          // 如果 broker 存活,则将 partition 置为 OnlinePartition 状态,否则置为 OfflinePartition
          if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
            controllerContext.putPartitionState(topicPartition, OnlinePartition)
          else
            controllerContext.putPartitionState(topicPartition, OfflinePartition)
        case None =>
          controllerContext.putPartitionState(topicPartition, NewPartition)
      }
    }
  }

  def triggerOnlinePartitionStateChange(): Unit = {
    val partitions = controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
    triggerOnlineStateChangeForPartitions(partitions)
  }

  private def triggerOnlineStateChangeForPartitions(partitions: collection.Set[TopicPartition]): Unit = {
    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
    // that belong to topics to be deleted
    //
    // 除了待删除 topic 的 partition,将其他所有处于 NewPartition 或 OfflinePartition 
    // 状态的 partition 置为 OnlinePartition 状态
    val partitionsToTrigger = partitions.filter { partition =>
      !controllerContext.isTopicQueuedUpForDeletion(partition.topic)
    }.toSeq

    handleStateChanges(partitionsToTrigger, OnlinePartition, Some(OfflinePartitionLeaderElectionStrategy(false)))
  }

Note the OfflinePartitionLeaderElectionStrategy in the final call above. It is one of several partition leader election strategies; different scenarios use different strategies. For now just take a quick look at them; we will come back to them in detail later:

sealed trait PartitionLeaderElectionStrategy

// 因为 Leader 副本下线而引发的分区 Leader 选举
final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean) extends PartitionLeaderElectionStrategy

// 因为执行分区副本重分配操作而引发的分区 Leader 选举
final case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

// 因为执行 Preferred 副本 Leader选举而引发的分区 Leader 选举
final case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

// 因为正常关闭 Broker 而引发的分区 Leader 选举
final case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

2、State transitions

The signature of PartitionStateMachine shows that it is an abstract class with two subclasses. One is MockPartitionStateMachine, which, as the name suggests, exists for testing; the other, ZkPartitionStateMachine, does the real work. Let's go straight to its implementation of handleStateChanges():

class ZkPartitionStateMachine(config: KafkaConfig,
                              stateChangeLogger: StateChangeLogger,
                              controllerContext: ControllerContext,
                              zkClient: KafkaZkClient,
                              controllerBrokerRequestBatch: ControllerBrokerRequestBatch)
  extends PartitionStateMachine(controllerContext) {

  private val controllerId = config.brokerId
  this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "

  /**
   * 尝试将分区的状态设置为 targetState, 如果过程中需要重新为分区选举 leader,则
   * 使用传入的 partitionLeaderElectionStrategyOpt 策略
   */
  override def handleStateChanges(
    partitions: Seq[TopicPartition],
    targetState: PartitionState,
    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    if (partitions.nonEmpty) {
      try {

        // 确保缓存中没有未发送的请求,如果有的话会抛出异常
        controllerBrokerRequestBatch.newBatch()

        // 执行状态转换
        // 状态转换过程中,如果有以下三类请求需要发送,
        // 1. LeaderAndIsr
        // 2. UpdateMetadata
        // 3. StopReplica
        // 会先暂存起来,然后通过下文的 sendRequestsToBrokers 方法发送给其他 broker
        val result = doHandleStateChanges(
          partitions,
          targetState,
          partitionLeaderElectionStrategyOpt
        )

        // 将上文暂存的请求发送给 broker
        controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)

        result
      } catch {
        case e: ControllerMovedException =>
          // 不再是 controller 了,无权更新状态机
          error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
          throw e
        case e: Throwable =>
          ......
      }
    } else {
      Map.empty
    }
  }

The source defines four PartitionState values and restricts which previous states each transition may come from:

sealed trait PartitionState {
  ......
  def validPreviousStates: Set[PartitionState]
}

case object NewPartition extends PartitionState {
  ......
  val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
}

case object OnlinePartition extends PartitionState {
  ......
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

case object OfflinePartition extends PartitionState {
  ......
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

case object NonExistentPartition extends PartitionState {
  ......
  val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
}
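
To see what that restriction amounts to, here is a small self-contained toy version of the four states together with the check that checkValidPartitionStateChange boils down to (a sketch, not the actual controller-context code):

object PartitionStateTransitionSketch {
  sealed trait PartitionState { def validPreviousStates: Set[PartitionState] }

  case object NewPartition extends PartitionState {
    val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
  }
  case object OnlinePartition extends PartitionState {
    val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object OfflinePartition extends PartitionState {
    val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
  }
  case object NonExistentPartition extends PartitionState {
    val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
  }

  // a transition is legal only if the current state is a valid previous state of the target
  def isValidTransition(current: PartitionState, target: PartitionState): Boolean =
    target.validPreviousStates.contains(current)

  def main(args: Array[String]): Unit = {
    println(isValidTransition(NewPartition, OnlinePartition))         // true
    println(isValidTransition(OnlinePartition, NonExistentPartition)) // false: must pass through OfflinePartition first
  }
}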

The transitions between them are shown in the figure below:

With that figure in mind, let's look at the concrete implementation of doHandleStateChanges():

  private def doHandleStateChanges(
    partitions: Seq[TopicPartition],
    targetState: PartitionState,
    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
    val traceEnabled = stateChangeLog.isTraceEnabled

    // 新分区的状态初始化为 NonExistentPartition
    partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))

    // 状态机之间有严格的流转关系,这里过滤掉非法的状态流转
    val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
    invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))

    targetState match {
      case NewPartition =>
        validPartitions.foreach { partition =>
          stateChangeLog.info(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
            s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
          // 直接设置为 NewPartition
          controllerContext.putPartitionState(partition, NewPartition)
        }
        Map.empty
      case OnlinePartition =>

        // 从 ctx 中找出状态是 NewPartition 的分区
        val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)

        // 从 ctx 中找出状态是 OnlinePartition 或 OfflinePartition 的分区
        val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)

        if (uninitializedPartitions.nonEmpty) {
          // 新创建的分区,在 zk 的 /brokers/topics/<topic>/partitions/<partition>
          // 目录下写入 leader 和 isr 信息
          val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
          successfulInitializations.foreach { partition =>
            stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
              s"${controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr}")
            // 写入成功,更新状态为 OnlinePartition
            controllerContext.putPartitionState(partition, OnlinePartition)
          }
        }

        // 从上文 partitionsToElectLeader 的来源可知,这一步有两种可能:
        // 1. 从 OfflinePartition 到 OnlinePartition
        // 2. 从 OnlinePartition  到 OnlinePartition
        if (partitionsToElectLeader.nonEmpty) {
          val electionResults = electLeaderForPartitions(
            partitionsToElectLeader,
            partitionLeaderElectionStrategyOpt.getOrElse(
              throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
            )
          )

          // 选举成功,更新 ctx 信息,记录日志
          electionResults.foreach {
            case (partition, Right(leaderAndIsr)) =>
              stateChangeLog.info(
                s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
              )
              controllerContext.putPartitionState(partition, OnlinePartition)
            case (_, Left(_)) => // Ignore; no need to update partition state on election error
          }

          electionResults
        } else {
          Map.empty
        }
      case OfflinePartition =>
        // 直接更新 ctx
        validPartitions.foreach { partition =>
          if (traceEnabled)
            stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
          controllerContext.putPartitionState(partition, OfflinePartition)
        }
        Map.empty
      case NonExistentPartition =>
        // 直接更新 ctx
        validPartitions.foreach { partition =>
          if (traceEnabled)
            stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
          controllerContext.putPartitionState(partition, NonExistentPartition)
        }
        Map.empty
    }
  }

3、Creating new partitions

  private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
    val successfulInitializations = mutable.Buffer.empty[TopicPartition]

    // 找到 replica
    val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))

    // 找到存活的 replica
    val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
        val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
        partition -> liveReplicasForPartition
    }
    val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }

    // 对于 replica 全挂的分区,记录日志
    partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>
      val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
        s"partition $partition from New to Online, assigned replicas are " +
        s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
        "replica is alive."
      logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
    }

    // 对于有 replica 存活的分区
    val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>

      // 将 liveReplicas 中第一个设置为 leader,其他的设置为 isr
      val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
      partition -> leaderIsrAndControllerEpoch
    }.toMap

    val createResponses = try {
      // 在 zk 上创建目录 /brokers/topics/<topic>/partitions
      // 在 zk 上创建节点 /brokers/topics/<topic>/partitions/<partition>
      zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion)
    } catch {
      case e: ControllerMovedException =>
        error("Controller moved to another broker when trying to create the topic partition state znode", e)
        throw e
      case e: Exception =>
        partitionsWithLiveReplicas.foreach { case (partition, _) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
        Seq.empty
    }

    createResponses.foreach { createResponse =>
      val code = createResponse.resultCode
      val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
      val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
      if (code == Code.OK) {
        // 在 zk 上创建成功,更新 ctx 中的 leaderAndIsr 信息
        controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
        // 向 isr 中的成员发送 leaderAndIsr 请求,follower 会开始从 leader 拉取数据
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
          partition, leaderIsrAndControllerEpoch, controllerContext.partitionFullReplicaAssignment(partition), isNew = true)
        // 记录成功完成初始化的分区
        successfulInitializations += partition
      } else {
        logFailedStateChange(partition, NewPartition, OnlinePartition, code)
      }
    }

    // 返回成功完成初始化的分区
    successfulInitializations
  }
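
A tiny worked illustration of the initialization rule above (leader = first live replica in assignment order, initial ISR = all live replicas); the broker ids are made up:

object NewPartitionLeaderAndIsrSketch {
  def main(args: Array[String]): Unit = {
    val assignedReplicas = Seq(2, 0, 1)   // made-up assignment order
    val liveBrokers      = Set(0, 2)      // pretend broker 1 is down
    val liveReplicas     = assignedReplicas.filter(liveBrokers.contains)

    // mirrors LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
    val leader = liveReplicas.head        // 2
    val isr    = liveReplicas.toList      // List(2, 0)
    println(s"leader=$leader isr=$isr")
  }
}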

4、Partition leader election

When the target state is OnlinePartition and the current state is either OnlinePartition (for example, the LeaderAndIsr was updated) or OfflinePartition (for example, a failed partition coming back online), a new leader may need to be elected for the partition:

  /**
   * Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
   *
   * 不断尝试为传入的多个分区选举,直到所有分区都成功选出 leader
   *
   * @param partitions The partitions that we're trying to elect leaders for.
   * @param partitionLeaderElectionStrategy The election strategy to use.
   * @return A map of failed and successful elections. The keys are the topic partitions and the corresponding values are
   *         either the exception that was thrown or new leader & ISR.
   */
  private def electLeaderForPartitions(
    partitions: Seq[TopicPartition],
    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    var remaining = partitions
    val finishedElections = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]

    while (remaining.nonEmpty) {
      val (finished, updatesToRetry) = doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)

      // updatesToRetry 指成功选出了 leader,但是更新 zk 失败的分区
      remaining = updatesToRetry

      // finished 中包含两部分,
      // 一是成功选举且更新 zk 成功的分区,
      // 二是找不到可用 replica 而导致选举失败的(这又分为两种情况,要么是 replcia 都挂了, 要么是 isr 中的 replica 都挂了且不支持 unclean 选举)
      //   这种情况下就没有重试的必要了,等到有 replica 可用的时候,会重新触发选举的。
      finished.foreach {
        case (partition, Left(e)) =>
          logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
        case (_, Right(_)) => // Ignore; success so no need to log failed state change
      }

      finishedElections ++= finished

      if (remaining.nonEmpty)
        logger.info(s"Retrying leader election with strategy $partitionLeaderElectionStrategy for partitions $remaining")
    }

    finishedElections.toMap
  }

The actual election is performed as follows:


  private def doElectLeaderForPartitions(
    partitions: Seq[TopicPartition],
    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {

    // 从 zk 的 /brokers/topics/<topic>/partitions/<partition>/state
    // 节点查询分区信息
    val getDataResponses = try {
      zkClient.getTopicPartitionStatesRaw(partitions)
    } catch {
      case e: Exception =>
        return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
    }
    val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]

    getDataResponses.foreach { getDataResponse =>
      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]

      // 查询 ctx 中当前的分区状态
      val currState = partitionState(partition)

      if (getDataResponse.resultCode == Code.OK) {
        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
          case Some(leaderIsrAndControllerEpoch) =>
            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
              // 说明当前的 controller 已经被夺权了
              val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
                s"already written by another controller. This probably means that the current controller $controllerId went through " +
                s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
              failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
            } else {
              // 将 zk 上真实有效的 leaderAndIsr 信息存起来,以备后续使用
              validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
            }

          case None =>
            val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
            failedElections.put(partition, Left(exception))
        }

      } else if (getDataResponse.resultCode == Code.NONODE) {
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        failedElections.put(partition, Left(exception))
      } else {
        failedElections.put(partition, Left(getDataResponse.resultException.get))
      }
    }

    // zk 上找不到对应的节点,直接退出
    if (validLeaderAndIsrs.isEmpty) {
      return (failedElections.toMap, Seq.empty)
    }

    // 选举
    val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {

      case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
        // 因为 Leader 副本下线而引发的分区 Leader 选举

        // 找出待选举的分区信息,以及他们是否支持 unclean leader election
        // 1. 检查副本状态:根据每个分区的ISR检查是否有在线副本。
        // 2. 判断是否允许不健康选举:
        //    如果配置允许不健康选举(allowUnclean为true),直接返回允许选举。
        //    如果配置不允许,查询 zk 获取该主题的独立配置,判断是否允许不健康选举。
        // 3. 返回结果:最终返回每个分区的 isr 以及是否允许 unclean 选举
        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
          validLeaderAndIsrs,
          allowUnclean
        )

        // 选出新 leader,返回新的 leaderAndIsr 信息
        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)

      case ReassignPartitionLeaderElectionStrategy =>

        // 因为执行分区副本重分配操作而引发的分区 Leader 选举
        leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)

      case PreferredReplicaPartitionLeaderElectionStrategy =>

        // 因为执行 Preferred 副本 Leader选举而引发的分区 Leader 选举
        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)

      case ControlledShutdownPartitionLeaderElectionStrategy =>

        // 因为正常关闭 Broker 而引发的分区 Leader 选举
        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)

    }

    // 记录没有选举成功的 partition
    partitionsWithoutLeaders.foreach { electionResult =>
      val partition = electionResult.topicPartition
      val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
    }

    val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
    val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap

    // 更新 zk 上的 LeaderAndIsr 信息
    val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)

    // 若更新 zk 成功,则更新本地 ctx 中的信息,并通知分区 replica 所在的 broker
    finishedUpdates.forKeyValue { (partition, result) =>
      result.foreach { leaderAndIsr =>
        val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
        // 则更新本地 ctx 中的信息
        controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
        // 通知分区 replica 所在的 broker, 告诉他们去向新的 leader 拉取数据
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
          leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
      }
    }

    if (isDebugEnabled) {
      updatesToRetry.foreach { partition =>
        debug(s"Controller failed to elect leader for partition $partition. " +
          s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
      }
    }

    (finishedUpdates ++ failedElections, updatesToRetry)
  }

a. collectUncleanLeaderElectionState

  /* For the provided set of topic partition and partition sync state it attempts to determine if unclean
   * leader election should be performed. Unclean election should be performed if there are no live
   * replica which are in sync and unclean leader election is allowed (allowUnclean parameter is true or
   * the topic has been configured to allow unclean election).
   *
   * 根据传入的 partition 及其 leaderAndIsr,判断是否允许 unclean leader election。
   *
   * unclean leader election 是指在 isr 列表为空的情况下, Kafka 选择一个非 isr 副本作为新的 Leader,
   * 此时存在丢失数据的风险,需要配置文件中的 unclean.leader.election.enable 为 true
   * 或者 topic 被配置为允许 unclean election 时才会执行。
   *
   * @param leaderIsrAndControllerEpochs set of partition to determine if unclean leader election should be
   *                                     allowed
   * @param allowUnclean whether to allow unclean election without having to read the topic configuration
   * @return a sequence of three element tuple:
   *         1. topic partition
   *         2. leader, isr and controller epoc. Some means election should be performed
   *         3. allow unclean
   */
  private def collectUncleanLeaderElectionState(
    leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)],
    allowUnclean: Boolean
  ): Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)] = {

    // 根据 isr 中是否有 online replica 分为两组
    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderAndIsrs.partition {
      case (partition, leaderAndIsr) =>
        val liveInSyncReplicas = leaderAndIsr.isr.filter(controllerContext.isReplicaOnline(_, partition))
        liveInSyncReplicas.isEmpty
    }

    // 如果配置文件中
    // 支持  unclean 选举,则直接返回 (partition, isr, true)
    // 不支持 unclean 选举,则去 zk 查询每个 topic 的独立配置,看是否支持 unclean 选举
    val electionForPartitionWithoutLiveReplicas = if (allowUnclean) {
      // 如果允许 unclean 选举,则直接返回这部分数据
      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
        (partition, Option(leaderAndIsr), true)
      }
    } else {

      // 查询 zk 上 /config/topics/{topic} 配置信息
      val (logConfigs, failed) = zkClient.getLogConfigs(
        partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
        config.originals()
      )

      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
        if (failed.contains(partition.topic)) {
          // 如果没查到配置信息,则不允许 unclean 选举
          logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
          (partition, None, false)
        } else {
          // 如果查到了配置信息,则以配置信息为准
          (
            partition,
            Option(leaderAndIsr),
            logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue()
          )
        }
      }
    }

    // 将支持 unclean 选举的分区信息,与符合正常选举条件的分区信息合并起来
    electionForPartitionWithoutLiveReplicas ++
    partitionsWithLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
      (partition, Option(leaderAndIsr), false)
    }
  }

b. Core partition leader election logic

  private def leaderForOffline(partition: TopicPartition,
                               leaderAndIsrOpt: Option[LeaderAndIsr],
                               uncleanLeaderElectionEnabled: Boolean,
                               controllerContext: ControllerContext): ElectionResult = {

    // 从 ctx 中找到分区的 replica 信息
    val assignment = controllerContext.partitionReplicaAssignment(partition)

    // 再筛出处于 OnlineReplica 状态的 replica
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))

    leaderAndIsrOpt match {
      case Some(leaderAndIsr) =>
        val isr = leaderAndIsr.isr

        // 选举
        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(
          assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)

        val newLeaderAndIsrOpt = leaderOpt.map { leader =>
          // 如果新 leader 就在 isr 中,则继续使用原来的 isr,保留 OnlineReplica 的即可
          // 如果新 leader 不在 isr 中,则说明选出了 isr 之外的新 leader,原来的 isr 不可用了,
          //    新的 isr 中只有一个 leader,需要其他的 replica 的数据同步之后才能加入 isr
          val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
          else List(leader)

          // 更新 leaderAndIsr 信息
          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
        }
        ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)

      case None =>
        ElectionResult(partition, None, liveReplicas)
    }
  }

This code uses a PartitionLeaderElectionAlgorithms object; let's see what it really is:

object PartitionLeaderElectionAlgorithms {

  /**
   * 因为 Leader 副本下线而引发的分区 Leader 选举
   *
   * @param assignment ctx 中保存的 replica assignment 信息
   * @param isr zk 上保存的 isr 信息
   * @param liveReplicas ctx 中处于 OnlineReplica 状态的 replica id
   * @param uncleanLeaderElectionEnabled 是否允许 unclean 选举
   * @param controllerContext ctx
   * @return
   */
  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
    // 找出
    // 既处于 zk 的 isr 中,又处于 OnlineReplica 状态的 replica
    // 或者
    // 如果允许 unclean 选举,则忽略 isr,直接从 OnlineReplica 状态的 replica 中找
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
      if (uncleanLeaderElectionEnabled) {
        val leaderOpt = assignment.find(liveReplicas.contains)
        if (leaderOpt.isDefined)
          controllerContext.stats.uncleanLeaderElectionRate.mark()
        leaderOpt
      } else {
        None
      }
    }
  }

  // 因为执行分区副本重分配操作而引发的分区 Leader 选举
  def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
    // 既处于 zk 的 isr 中,又处于 OnlineReplica 状态的 replica
    reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
  }

  // 因为执行 Preferred 副本 Leader选举而引发的分区 Leader 选举
  def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
    // 只看 assignment 中的第一个元素,
    // 如果既处于 zk 的 isr 中,又处于 OnlineReplica 状态的 replica,则返回
    assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
  }

  // 因为正常关闭 Broker 而引发的分区 Leader 选举
  def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
  }
}

OK, so this is where the leader is actually chosen.
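
To see the preference order in action (an in-sync live replica wins; only if unclean election is enabled does any live replica qualify), here is a self-contained re-run of the offline election logic with made-up broker ids:

object OfflineElectionSketch {
  // same selection logic as offlinePartitionLeaderElection above, minus the metrics call
  def electOffline(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
                   uncleanEnabled: Boolean): Option[Int] =
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
      if (uncleanEnabled) assignment.find(liveReplicas.contains) else None
    }

  def main(args: Array[String]): Unit = {
    val assignment = Seq(1, 2, 3)   // made-up replica assignment
    val isr        = Seq(1)         // only broker 1 is still in the ISR

    println(electOffline(assignment, isr, liveReplicas = Set(2, 3), uncleanEnabled = false)) // None
    println(electOffline(assignment, isr, liveReplicas = Set(2, 3), uncleanEnabled = true))  // Some(2), may lose data
    println(electOffline(assignment, isr, liveReplicas = Set(1, 3), uncleanEnabled = false)) // Some(1)
  }
}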

三、onTopicDeletion()

With this basic understanding of PartitionStateMachine, let's come back to the onTopicDeletion method:

  private def onTopicDeletion(topics: Set[String]): Unit = {

    // 过滤掉已经进入删除流程的 topic,留下将要删除的 topic
    val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted)

    // 首次执行删除,将 topic 添加到 topicsWithDeletionStarted 列表中
    // 表示开始删除
    if (unseenTopicsForDeletion.nonEmpty) {
      val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)

      // 由于状态机严格限制 partition 状态之间的流转关系,
      // 所以这里先将 partition 标记为 offline 状态,再标记为 NonExistent 状态
      partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
      partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, NonExistentPartition)

      // adding of unseenTopicsForDeletion to topics with deletion started must be done after the partition
      // state changes to make sure the offlinePartitionCount metric is properly updated
      //
      // 一定要先更新 partition 的状态
      // 再去把还没开始删除的 topic 列表加入到 topicsWithDeletionStarted 中;
      // 否则会影响到 offlinePartitionCount 这个 metric 的值
      //
      // 将 topic 加入到 topicsWithDeletionStarted 列表中
      controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
    }

    // send update metadata so that brokers stop serving data for topics to be deleted
    // 发送更新元数据的请求
    client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))

    // 删除 partition
    onPartitionDeletion(topics)
  }

To delete a topic we have to delete its partitions, and to delete a partition we have to delete its replicas. The onPartitionDeletion() method here goes on to update the Replica State Machine.
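
Before diving into the replica state machine, it helps to keep the happy-path sequence of replica states during a topic deletion in mind. The listing below is pieced together from the code quoted in this article; the step that marks a replica ReplicaDeletionSuccessful lives in TopicDeletionManager's handling of the StopReplica responses, which is not quoted here.

object TopicDeletionReplicaPathSketch {
  // happy-path order in which a replica of a deleted topic moves through the state machine
  val deletionPath: Seq[String] = Seq(
    "OnlineReplica",              // normal operation
    "OfflineReplica",             // StopReplica(deletePartition = false), replica removed from the ISR
    "ReplicaDeletionStarted",     // StopReplica(deletePartition = true), broker deletes the log directory
    "ReplicaDeletionSuccessful",  // broker acknowledged the deletion (handled by TopicDeletionManager)
    "NonExistentReplica"          // replica removed from the controller context
  )

  def main(args: Array[String]): Unit =
    println(deletionPath.mkString(" -> "))
}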

四、The replica state machine

Just like the partition state machine PartitionStateMachine, once the controller election succeeds, the controller calls the startup method to start the replica state machine, ReplicaStateMachine.

1、ReplicaStateMachine.startup()

Here is the definition of ReplicaStateMachine and its startup method:

abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
  /**
   * Invoked on successful controller election.
   */
  def startup(): Unit = {

    // 对于每一个 replica,
    // 如果所在的 broker 可以正常工作,则标记为 OnlineReplica
    // 如果所在的 broker 挂了,则标记为暂不允许删除 ReplicaDeletionIneligible
    info("Initializing replica state")
    initializeReplicaState()

    // 读取 ctx 中的 partitionAssignments 信息,将所有的 replica 分为两组,
    // 所在 broker 正常工作的放入 onlineReplicas
    // 所在 broker 无法正常工作的放入 offlineReplicas
    val (onlineReplicas, offlineReplicas) = controllerContext.onlineAndOfflineReplicas

    // 流转状态机
    info("Triggering online replica state changes")
    handleStateChanges(onlineReplicas.toSeq, OnlineReplica)

    // 流转状态机
    info("Triggering offline replica state changes")
    handleStateChanges(offlineReplicas.toSeq, OfflineReplica)

    debug(s"Started replica state machine with initial state -> ${controllerContext.replicaStates}")
  }

  ......

  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit
}

2、State transitions

Like PartitionStateMachine, ReplicaStateMachine is also an abstract class with two subclasses: MockReplicaStateMachine, obviously for testing, and ZkReplicaStateMachine, which does the real work. Let's go straight to its implementation of handleStateChanges():

class ZkReplicaStateMachine(config: KafkaConfig,
                            stateChangeLogger: StateChangeLogger,
                            controllerContext: ControllerContext,
                            zkClient: KafkaZkClient,
                            controllerBrokerRequestBatch: ControllerBrokerRequestBatch)
  extends ReplicaStateMachine(controllerContext) with Logging {

  private val controllerId = config.brokerId
  this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "

  override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
    if (replicas.nonEmpty) {
      try {
        // 确保缓存中没有未发送的请求,如果有的话会抛出异常
        controllerBrokerRequestBatch.newBatch()

        // 进行状态转换。
        // 注意这里是根据 replicaId 分组执行的,
        // 即一次将一个 broker 的请求都处理完,避免多次发送请求。
        //
        // 状态转换过程后,如果有以下三类请求需要发送,
        // 1. LeaderAndIsr
        // 2. UpdateMetadata
        // 3. StopReplica
        // 会先暂存起来,然后通过下文的 sendRequestsToBrokers 方法发送给其他 broker
        replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
          doHandleStateChanges(replicaId, replicas, targetState)
        }

        // 将上文暂存的请求发送给 broker
        controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)

      } catch {
        case e: ControllerMovedException =>
          error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
          throw e
        case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
      }
    }
  }
  ......
}

As the code above shows, to keep communication efficient, Kafka first processes all the replicas, collecting the requests that need to be sent along the way, and then sends them to the brokers in a single batch at the end.
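
The grouping itself is plain Scala. Here is a toy version of the pattern (group the work per target broker, handle each group once, then flush everything as one batch); the types and data are made up:

object GroupByBrokerSketch {
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  def main(args: Array[String]): Unit = {
    val replicas = Seq(
      PartitionAndReplica("t", 0, 1),
      PartitionAndReplica("t", 1, 1),
      PartitionAndReplica("t", 0, 2)
    )

    // stand-in for the controllerBrokerRequestBatch buffer
    val pending = scala.collection.mutable.Buffer.empty[String]

    // one pass per broker id, mirroring replicas.groupBy(_.replica) above
    replicas.groupBy(_.replica).foreach { case (brokerId, group) =>
      pending += s"requests for broker $brokerId covering ${group.size} replicas"
    }

    // stand-in for sendRequestsToBrokers(...)
    pending.foreach(println)
  }
}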

The doHandleStateChanges() method is where the state transitions are actually carried out. The source defines seven ReplicaState values:

NewReplica: the state a replica is in right after it has been created
OnlineReplica: the replica is working normally
OfflineReplica: the replica is offline
ReplicaDeletionStarted: deletion of the replica has been started
ReplicaDeletionSuccessful: the replica has been deleted successfully
ReplicaDeletionIneligible: the replica cannot be deleted for the moment (its broker is down, or it failed to respond correctly to a LeaderAndIsr request)
NonExistentReplica: the state a replica is in right before it is removed from the replica state machine
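
Like PartitionState, each ReplicaState also restricts which states it may be entered from. The sets below are my reading of the 2.x source and are only an approximation; the version you are looking at may differ in detail:

// approximate validPreviousStates per ReplicaState (an approximation, not quoted from the source)
val replicaValidPreviousStates: Map[String, Set[String]] = Map(
  "NewReplica"                -> Set("NonExistentReplica"),
  "OnlineReplica"             -> Set("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
  "OfflineReplica"            -> Set("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
  "ReplicaDeletionStarted"    -> Set("OfflineReplica"),
  "ReplicaDeletionSuccessful" -> Set("ReplicaDeletionStarted"),
  "ReplicaDeletionIneligible" -> Set("OfflineReplica", "ReplicaDeletionStarted"),
  "NonExistentReplica"        -> Set("ReplicaDeletionSuccessful")
)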

The transitions between them are shown in the figure below:

With that figure in mind, let's look at the concrete implementation of doHandleStateChanges():

  private def doHandleStateChanges(replicaId: Int, replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
    val stateLogger = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
    val traceEnabled = stateLogger.isTraceEnabled

    // 新加入的 replica,其状态都会被设置为 NonExistentReplica,然后再开始流转
    replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))

    // 状态机之间有严格的流转关系,这里过滤掉非法的状态流转
    val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState)
    invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))

    // 根据目标状态,流转状态,执行具体操作
    targetState match {
      case NewReplica =>
        validReplicas.foreach { replica =>
          val partition = replica.topicPartition
          val currentState = controllerContext.replicaState(replica)

          // 尝试从元数据缓存中获取分区信息,包括 Leader 信息、ISR 都有哪些副本等
          controllerContext.partitionLeadershipInfo(partition) match {

            // 如果成功拿到分区数据信息
            case Some(leaderIsrAndControllerEpoch) =>

              // 如果该副本是 Leader
              if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
                // 记录错误日志。Leader副本不能被设置成 NewReplica 状态
                val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
                logFailedStateChange(replica, currentState, OfflineReplica, exception)
              } else {
                // 如果该副本不是 Leader,
                // 给该副本所在的 Broker 发送 LeaderAndIsrRequest
                // 然后给集群当前所有 Broker 发送 UpdateMetadataRequest 通知它们该分区数据发生变更
                controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                  replica.topicPartition,
                  leaderIsrAndControllerEpoch,
                  controllerContext.partitionFullReplicaAssignment(replica.topicPartition),
                  isNew = true)
                if (traceEnabled)
                  logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)

                // 更新本地缓存
                controllerContext.putReplicaState(replica, NewReplica)
              }
            case None =>
              if (traceEnabled)
                logSuccessfulTransition(stateLogger, replicaId, partition, currentState, NewReplica)
              // 如果找不到 leader 信息,则仅更新元数据缓存的状态为 NewReplica
              controllerContext.putReplicaState(replica, NewReplica)
          }
        }
      case OnlineReplica =>
        validReplicas.foreach { replica =>
          val partition = replica.topicPartition
          val currentState = controllerContext.replicaState(replica)

          currentState match {
            case NewReplica =>
              val assignment = controllerContext.partitionFullReplicaAssignment(partition)

              // 如果 replica 不在 assignment cache 中,则添加到 assignment cache 中
              if (!assignment.replicas.contains(replicaId)) {
                error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
                val newAssignment = assignment.copy(replicas = assignment.replicas :+ replicaId)
                controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
              }

              // 在变为 NewReplica 状态的时候,已经通知了其他 broker,这里不需要再通知了

            case _ =>
              controllerContext.partitionLeadershipInfo(partition) match {
                case Some(leaderIsrAndControllerEpoch) =>
                  // 发送 LeaderAndIsr 请求给对应的 broker 来更新 replica 信息
                  // 发送 UpdateMetadata 请求给所有 broker
                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                    replica.topicPartition,
                    leaderIsrAndControllerEpoch,
                    controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
                case None =>
              }
          }
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OnlineReplica)
          controllerContext.putReplicaState(replica, OnlineReplica)
        }
      case OfflineReplica =>

        // 这里的 validReplicas 类型是 PartitionAndReplica

        // 准备发送 StopReplica 请求,注意这里不是直接发送,只是将请求先暂存起来
        // 等状态机处理完后,再统一发送。
        // (broker 在收到 StopReplica 请求后,会从 ctx 中移除 replica 的信息,并停止拉取数据)
        validReplicas.foreach { replica =>
          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
        }

        // 查询 ctx 中的分区 leader 信息
        val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
          controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
        }

        // 对于 ctx 中有 leader 信息的分区,将 replica 从 ISR 中移除
        // 更新 ctx,同时更新到 zk 的节点:
        // /brokers/topics/[topic_name]/partitions/[partition_id]/state
        // 这个节点里的数据长这样:
        // {
        //    "controller_epoch": 9,
        //    "leader": 0,
        //    "version": 1,
        //    "leader_epoch": 1,
        //    "isr": [0, 1]
        // }
        // 下文会详细阅读这个方法的代码。
        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))

        updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
          stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")

          // 如果不是待删除的 topic,需要发送 LeaderAndIsr 请求给其余 broker
          if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
            val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
            controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
              partition,
              leaderIsrAndControllerEpoch,
              controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
          }

          // 记录日志
          val replica = PartitionAndReplica(partition, replicaId)
          val currentState = controllerContext.replicaState(replica)
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, partition, currentState, OfflineReplica)

          // 最终设置为 offline
          controllerContext.putReplicaState(replica, OfflineReplica)
        }

        // 对于 ctx 中找不到 leader 信息的分区,记录日志并设置为 offline
        replicasWithoutLeadershipInfo.foreach { replica =>
          val currentState = controllerContext.replicaState(replica)
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, OfflineReplica)

          // 由于找不到 leader 信息,所以这个方法里仅记录日志,什么都不做
          // "Leader not yet assigned for partition $partition
          //  Skip sending UpdateMetadataRequest."
          controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(replica.topicPartition))

          // 最终设置为 offline
          controllerContext.putReplicaState(replica, OfflineReplica)
        }

      case ReplicaDeletionStarted =>
        validReplicas.foreach { replica =>
          val currentState = controllerContext.replicaState(replica)
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted)

          // 设置为 ReplicaDeletionStarted
          controllerContext.putReplicaState(replica, ReplicaDeletionStarted)

          // enqueue stopReplica 请求
          // 注意这里 deletePartition = true,所以
          // broker 在收到 StopReplica 请求后,会停止拉取数据,
          // 还会将对应的分区(副本)目录加上 -delete 后缀,最终会删除掉整个 log 目录
          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = true)

          // 这里有一个疑问,发送了StopReplica,为什么不发送 UpdateMetadata 请求更新元数据呢?
          // 我理解是,ReplicaDeletionStarted 状态,是由 OfflineReplica 状态流转过来的,
          // 在 OfflineReplica 状态下,已经发送了 UpdateMetadata 更新过元数据了,
          // 上面这段代码只是做状态的流转,并没有其他变更,所以不需要再发送 UpdateMetadata 请求
        }
      case ReplicaDeletionIneligible =>
        // 单纯做标记
        validReplicas.foreach { replica =>
          val currentState = controllerContext.replicaState(replica)
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionIneligible)
          controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
        }
      case ReplicaDeletionSuccessful =>
        // 单纯做标记
        validReplicas.foreach { replica =>
          val currentState = controllerContext.replicaState(replica)
          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, ReplicaDeletionSuccessful)
          controllerContext.putReplicaState(replica, ReplicaDeletionSuccessful)
        }
      case NonExistentReplica =>

        // 这里的 replica 类型是 PartitionAndReplica
        validReplicas.foreach { replica =>

          // 查出 PartitionAndReplica 当前 state
          val currentState = controllerContext.replicaState(replica)

          // 从 ctx 中删除当前 replica
          val newAssignedReplicas = controllerContext
            // 根据 topic 和 partition,查出所有的 ReplicaAssignment 列表
            .partitionFullReplicaAssignment(replica.topicPartition)
            // 删除当前 replicaId
            .removeReplica(replica.replica)
          controllerContext.updatePartitionFullReplicaAssignment(replica.topicPartition, newAssignedReplicas)

          if (traceEnabled)
            logSuccessfulTransition(stateLogger, replicaId, replica.topicPartition, currentState, NonExistentReplica)

          // 从状态机列表中删除当前 replica 的状态机,它的生命周期就结束了
          controllerContext.removeReplicaState(replica)
        }
    }
  }

a. Removing a replica from the ISR

Now let's see how Kafka removes a replica from the ISR:

  private def removeReplicasFromIsr(
    replicaId: Int,
    partitions: Seq[TopicPartition]
  ): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
    var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
    var remaining = partitions
    while (remaining.nonEmpty) {
      val (finishedRemoval, removalsToRetry) = doRemoveReplicasFromIsr(replicaId, remaining)
      remaining = removalsToRetry

      finishedRemoval.foreach {
        // 移出 isr 过程中出现异常,则记录日志
        case (partition, Left(e)) =>
            val replica = PartitionAndReplica(partition, replicaId)
            val currentState = controllerContext.replicaState(replica)
            logFailedStateChange(replica, currentState, OfflineReplica, e)

        // 成功移出 isr,记录结果
        case (partition, Right(leaderIsrAndEpoch)) =>
          results += partition -> leaderIsrAndEpoch
      }
    }
    results
  }
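
removeReplicasFromIsr 的外层 while 循环,本质上是在重试 ZooKeeper 的『带版本号的条件更新』:controller 先读出分区 state 节点的数据和版本号,修改后带版本号写回;如果期间分区 leader 也改过这个节点,写回就会因为 BADVERSION 失败,需要重新读取再试。下面是一个脱离 Kafka 的极简示意(直接使用原生 ZooKeeper 客户端,updateWithRetry、path 等名字都是假设的;真实源码中这层逻辑封装在 zkClient 的 updateLeaderAndIsr 里):

import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

// 示意代码:基于版本号的条件更新 + BADVERSION 重试(非 Kafka 源码)
object ZkConditionalUpdateDemo {

  // 不断尝试:读出当前值和版本号 -> 修改 -> 带版本号写回;
  // 如果期间有其他客户端改过该节点,版本号不匹配会抛 BadVersionException,则重新读取后再试
  def updateWithRetry(zk: ZooKeeper, path: String, modify: Array[Byte] => Array[Byte]): Unit = {
    var done = false
    while (!done) {
      val stat = new Stat()
      val current = zk.getData(path, false, stat)             // 读取数据和当前版本号
      try {
        zk.setData(path, modify(current), stat.getVersion)    // 只有版本号没变才会写成功
        done = true
      } catch {
        case _: KeeperException.BadVersionException =>
          // 版本冲突:说明其他客户端(例如分区 leader)并发修改了该节点,循环重试
      }
    }
  }
}

理解了这个重试语义,接下来再看真正执行移除动作的 doRemoveReplicasFromIsr 方法: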

  /**
   * Try to remove a replica from the isr of multiple partitions.
   * Removing a replica from isr updates partition state in zookeeper.
   *
   * 尝试从多个分区的 ISR(同步副本集合)中移除一个副本。
   * 从 ISR 中移除副本会更新 Zookeeper 中的分区状态。
   *
   * @param replicaId The replica being removed from isr of multiple partitions
   * 要从多个分区的 ISR 中移除的副本 ID
   *
   * @param partitions The partitions from which we're trying to remove the replica from isr
   * 需要从这些分区的 ISR 中移除副本
   *
   * @return A tuple of two elements:
   *         1. The updated Right[LeaderIsrAndControllerEpochs] of all partitions for which we successfully
   *         removed the replica from isr. Or Left[Exception] corresponding to failed removals that should
   *         not be retried
   *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
   *         the partition leader updated partition state while the controller attempted to update partition state.
   *
   * 一个包含两个元素的元组:
   *         1. 一个映射,key 是分区,value 是移除结果:
   *            - 成功从 ISR 中移除副本的分区,value 为 Right[LeaderIsrAndControllerEpoch];
   *            - 移除失败且不应重试的分区,value 为 Left[Exception]。
   *         2. 由于 Zookeeper 的 BADVERSION 冲突而需要重试的分区列表。
   *            版本冲突可能发生在 controller 尝试更新分区状态的同时,分区的 Leader 也更新了分区状态。
   */
  private def doRemoveReplicasFromIsr(
    replicaId: Int,
    partitions: Seq[TopicPartition]
  ): (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {

    // 查询 /brokers/topics/[topic_name]/partitions/[partition_id]/state 下的数据
    // 数据长这样:
    // {
    //    "controller_epoch": 9,
    //    "leader": 0,
    //    "version": 1,
    //    "leader_epoch": 1,
    //    "isr": [0, 1]
    // }
    //
    // 如果在 zk 上找不到 node,就把 partition 放入 partitionsWithNoLeaderAndIsrInZk 中
    val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk) = getTopicPartitionStatesFromZk(partitions)

    // 找出待移除的条目 (即 isr 中包含 replicaId 的条目)
    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (_, result) =>
      result.map { leaderAndIsr =>
        leaderAndIsr.isr.contains(replicaId)
      }.getOrElse(false)
    }

    // 调整 leader 和 ISR
    val adjustedLeaderAndIsrs: Map[TopicPartition, LeaderAndIsr] = leaderAndIsrsWithReplica.flatMap {
      case (partition, result) =>
        result.toOption.map { leaderAndIsr =>
          // 确定 leader。如果待移除的 replicaId 是 leader,则将 leader_id 设置为 -1
          val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
          // 确定 isr。如果 isr 中只剩一个元素(经过上面的筛选,这个元素必然就是 replicaId),则保持不变,避免把 ISR 清空;否则从 isr 中移除 replicaId
          val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
          partition -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
        }
    }

    // 更新 zk 上的数据
    // /brokers/topics/[topic_name]/partitions/[partition_id]/state
    val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = zkClient.updateLeaderAndIsr(
      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)

    // zk 上找不到 node 并且 partition 不在删除队列中,找出这部分数据
    val exceptionsForPartitionsWithNoLeaderAndIsrInZk: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] =
      partitionsWithNoLeaderAndIsrInZk.iterator.flatMap { partition =>
        if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
          val exception = new StateChangeFailedException(
            s"Failed to change state of replica $replicaId for partition $partition since the leader and isr " +
            "path in zookeeper is empty"
          )
          Option(partition -> Left(exception))
        } else None
      }.toMap

    // 收集: 不需要做删除操作的 ++ 执行完删除操作的
    val leaderIsrAndControllerEpochs: Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]] = {

      // 更新 ctx 中的 LeaderAndIsr 信息
      (leaderAndIsrsWithoutReplica ++ finishedPartitions).map { case (partition, result) =>
        (partition, result.map { leaderAndIsr =>
          val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
          controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
          leaderIsrAndControllerEpoch
        })
      }
    }

    if (isDebugEnabled) {
      updatesToRetry.foreach { partition =>
        debug(s"Controller failed to remove replica $replicaId from ISR of partition $partition. " +
          s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
      }
    }

    // 返回结果
    // Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]]
    // 对于执行成功的, value 是 LeaderIsrAndControllerEpoch
    // 对于没成功的(不属于删除队列,且在 zk 上不存在), value 是 Exception
    (leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
  }
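
为了更直观地理解上面『调整 leader 和 ISR』这一步,可以用一个脱离 Kafka 的最小示例(类名和数据均为假设)演算一遍:假设要移除 replicaId = 1,某分区当前 leader = 1、isr = [0, 1],那么新 leader 会被置为 -1(NoLeader),isr 收缩为 [0];如果 isr 中只剩 [1] 一个元素,则保持原样,避免把 ISR 清空。

// 示意代码:从 leader/ISR 中移除某个副本的计算规则(非 Kafka 源码,仅演示 doRemoveReplicasFromIsr 中的调整逻辑)
object AdjustLeaderAndIsrDemo {

  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  val NoLeader = -1

  def adjust(replicaId: Int, state: LeaderAndIsr): LeaderAndIsr = {
    // 如果被移除的副本正好是 leader,则把 leader 置为 -1,等待后续重新选举
    val newLeader = if (state.leader == replicaId) NoLeader else state.leader
    // ISR 中只剩一个副本时保持不变,否则把该副本从 ISR 中过滤掉
    val newIsr = if (state.isr.size == 1) state.isr else state.isr.filter(_ != replicaId)
    LeaderAndIsr(newLeader, newIsr)
  }

  def main(args: Array[String]): Unit = {
    println(adjust(1, LeaderAndIsr(leader = 1, isr = List(0, 1))))  // LeaderAndIsr(-1, List(0))
    println(adjust(1, LeaderAndIsr(leader = 1, isr = List(1))))     // LeaderAndIsr(-1, List(1)):ISR 不会被清空
  }
}

注意第二个例子:即便 ISR 里只剩下待删除的副本,Kafka 也不会把 ISR 写成空列表。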

五、onPartitionDeletion()

了解了 Replica State Machine 的基本工作流程,我们再来看 onPartitionDeletion 方法:

  /**
   * Invoked by onTopicDeletion with the list of partitions for topics to be deleted
   * It does the following -
   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
   *    for deletion if some replicas are dead since it won't complete successfully anyway
   * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
   *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
   *    it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
   * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
   *    will delete all persistent data from all replicas of the respective partitions
   *
   * 1. 把宕机的 replica 直接设置为 ReplicaDeletionIneligible 状态。
   *    同时,只要某个 topic 还存在宕机副本,就把该 topic 标记为暂不可删除,因为此时删除操作无论如何都无法成功完成。
   * 2. 将分区的所有 replica 设置为 OfflineReplica。
   *    这一步会向这些 replica 发送 StopReplicaRequest,
   *    并向 leader 副本发送 LeaderAndIsrRequest,参数中携带收缩后的 ISR。
   *    当 leader 副本本身被设置为 OfflineReplica 时,会跳过发送 LeaderAndIsrRequest,
   *    因为 leader 已经被更新为 -1(不存在)。
   * 3. 将所有的 replica 设置为 ReplicaDeletionStarted 状态。
   *    这一步会发送 StopReplicaRequest(deletePartition=true),
   *    随后各副本上该分区的持久化数据会被删除。
   *
   */
  private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = {
    val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica]
    val allReplicasForDeletionRetry = mutable.ListBuffer.empty[PartitionAndReplica]
    val allTopicsIneligibleForDeletion = mutable.Set.empty[String]

    topicsToBeDeleted.foreach { topic =>

      // 此处的 deadReplicas 指两类副本:
      // 所在 broker 已不在存活列表中的副本(broker 宕机,或收到关机请求 ApiKeys.CONTROLLED_SHUTDOWN 正在关机),
      // 以及所在日志目录已经离线(broker 在 LeaderAndIsr 响应中报告了存储错误)的副本
      val (aliveReplicas, deadReplicas) = controllerContext.replicasForTopic(topic).partition { r =>
        controllerContext.isReplicaOnline(r.replica, r.topicPartition)
      }

      val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
      val replicasForDeletionRetry = aliveReplicas.diff(successfullyDeletedReplicas)

      // 统计宕机/关机 broker 上的副本,稍后统一设置为 ReplicaDeletionIneligible
      allDeadReplicas ++= deadReplicas

      // 统计尚未删除的 replica,执行删除任务
      allReplicasForDeletionRetry ++= replicasForDeletionRetry

      // 大原则,关机/挂了的 broker 不能删,因为它们处理不了请求
      if (deadReplicas.nonEmpty) {
        debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic $topic")
        allTopicsIneligibleForDeletion += topic
      }
    }

    // 把宕机/关机 broker 上的副本统一设置为 ReplicaDeletionIneligible
    replicaStateMachine.handleStateChanges(allDeadReplicas, ReplicaDeletionIneligible)

    // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
    //
    // 发送 stopReplica(deletePartition = false),发送更新 ISR 请求,发送 updateMetadata 请求
    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, OfflineReplica)
    // 发送 stopReplica(deletePartition = true),broker 在收到请求后,会将整个分区日志目录加上 -delete 后缀
    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, ReplicaDeletionStarted)

    if (allTopicsIneligibleForDeletion.nonEmpty) {
      markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = "offline replicas")
    }
  }

至此,这个 topic 在 controller 的元数据层面就已经被移除了。

但其实还留有最后一个尾巴:kafka 并没有立刻删除磁盘上的 log 文件,而是先把 topic 对应的分区日志目录重命名为带 -delete 后缀的目录(目录名形如 <topic>-<partition>.<随机id>-delete);Kafka 的 LogManager 组件会通过后台的 kafka-delete-logs 任务定期清理这些带 -delete 后缀的目录(延迟由 file.delete.delay.ms 控制,默认 1 分钟),到这一步,topic 的数据才算真正从磁盘上消失。
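
如果想在 broker 上直观地确认这一点,可以在数据目录(log.dirs)下查找带 -delete 后缀的目录。下面是一个简单的示意(/tmp/kafka-logs 为假设路径,ListPendingDeleteDirsDemo 只是演示用的名字,它只负责列出目录,真正的物理删除由 LogManager 完成):

import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

// 示意代码:列出数据目录下等待 LogManager 物理删除的分区目录(路径为假设值)
object ListPendingDeleteDirsDemo {
  def main(args: Array[String]): Unit = {
    val logDir = Paths.get("/tmp/kafka-logs")   // 假设的 log.dirs 配置
    val stream = Files.list(logDir)
    try {
      stream.iterator().asScala
        .filter(p => Files.isDirectory(p) && p.getFileName.toString.endsWith("-delete"))
        .foreach(p => println(s"待物理删除的目录: $p"))
    } finally stream.close()
  }
}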

参考

Kafka 核心技术与实战 - 极客时间
