Kafka Source Code · Part 8: How the Follower Fetches Messages

May 26, 2025

I. Starting from the LeaderAndIsr Request

This request notifies nodes in the cluster that a given broker has become the leader or follower of a certain partition, along with the partition's latest replica state.

1. When It Is Sent

The controller typically sends a LeaderAndIsrRequest in the following scenarios:

  1. When a partition leader is elected or switched: for example, if the original leader goes down, the controller elects a new leader from the ISR (In-Sync Replicas) and notifies the brokers hosting the affected replicas
  2. After a new topic is created: the controller picks a leader for each partition and pushes that information to the brokers hosting the partition replicas
  3. When a replica's state changes: for example, when it rejoins the ISR
  4. During a partition reassignment

2. 请求参数

The important fields carried in a LeaderAndIsrRequest are:

Field            Purpose
controllerId     id of the current controller
controllerEpoch  guards the freshness (non-staleness) of the request
brokerEpoch      epoch of the receiving broker; likewise guards the freshness of the request
partitionStates  Map<TopicPartition, LeaderAndIsrPartitionState>, per-partition state; see the table below
liveLeaders      broker metadata (host, port, etc.) of all relevant replicas, used to establish connections

And LeaderAndIsrPartitionState contains the following fields:

Field                      Purpose
leader                     brokerId of the new leader
leaderEpoch                the partition's current leader epoch, used for versioning and guarding request freshness
isr                        list of ISR replicas
replicas                   list of brokerIds of all replicas
partitionEpoch             version number of the partition, used for idempotence control
topicName, partitionIndex  identifies the partition
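To make the two tables concrete, here is a minimal sketch modeling the request as plain Scala case classes. The field names follow the tables above, but all types are simplified assumptions (e.g. liveLeaders really carries full broker endpoint metadata, not bare ids):

```scala
// Simplified model of LeaderAndIsrRequest; these are NOT the real Kafka classes.
case class LeaderAndIsrPartitionState(
  topicName: String,
  partitionIndex: Int,
  leader: Int,           // brokerId of the new leader
  leaderEpoch: Int,      // versioning: stale requests are rejected
  isr: List[Int],        // brokerIds of in-sync replicas
  replicas: List[Int],   // brokerIds of all replicas
  partitionEpoch: Int)   // partition version, for idempotence

case class LeaderAndIsrRequest(
  controllerId: Int,
  controllerEpoch: Int,
  brokerEpoch: Long,
  partitionStates: Map[(String, Int), LeaderAndIsrPartitionState],
  liveLeaders: List[Int]) // simplified: the real request carries host/port metadata
```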

3. Receiving and Handling the Request

When a broker receives the request, it first validates the brokerEpoch:

  private def isBrokerEpochStale(brokerEpochInRequest: Long): Boolean = {
    // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
    // if the controller hasn't been upgraded to use KIP-380
    if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
    else {
      // brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
      // about the new broker epoch and sends a control request with this epoch before the broker learns about it
      //
      // First, every broker hosts a controller instance,
      // but only the instance on the broker that won the election actually
      // performs the duties of managing the whole cluster.
      //
      // As I understand it, controller.brokerEpoch here is the epoch that the
      // active controller assigned to the current broker.
      //
      // So the following situation can occur:
      // a thread in the active controller determines a larger brokerEpoch for
      // this broker, but has not yet notified it (or the broker has not yet
      // written the value into its local controller.brokerEpoch), so the
      // brokerEpochInRequest in a LeaderAndIsr request can be greater than
      // controller.brokerEpoch.
      //
      // That case is still treated as a valid request.
      brokerEpochInRequest < controller.brokerEpoch
    }
  }
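Stripped of the surrounding plumbing, the staleness rule boils down to the small standalone sketch below (assuming UNKNOWN_BROKER_EPOCH is -1, as defined in AbstractControlRequest):

```scala
// Condensed, hypothetical restatement of isBrokerEpochStale above.
object BrokerEpochCheck {
  // Assumption: mirrors AbstractControlRequest.UNKNOWN_BROKER_EPOCH
  val UnknownBrokerEpoch: Long = -1L

  def isBrokerEpochStale(epochInRequest: Long, localBrokerEpoch: Long): Boolean =
    // An unknown epoch (controller not yet upgraded to KIP-380) is never stale,
    // and an epoch *greater* than the local one is accepted too, because the
    // controller may learn about the new epoch before this broker does.
    epochInRequest != UnknownBrokerEpoch && epochInRequest < localBrokerEpoch
}
```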

When brokerEpochInRequest is valid, the broker calls the becomeLeaderOrFollower method below to try to update the cluster state:


  def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    val startMs = time.milliseconds()
    replicaStateChangeLock synchronized {
      val controllerId = leaderAndIsrRequest.controllerId
      val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
      ······

      val response = {

        if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
          // Ignore requests from stale controllers
          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
            s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
            s"Latest known controller epoch is $controllerEpoch")
          leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)

        } else {

          val responseMap = new mutable.HashMap[TopicPartition, Errors]
          controllerEpoch = leaderAndIsrRequest.controllerEpoch

          val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()

          // If the partition does not exist, create it first
          requestPartitionStates.foreach { partitionState =>

            // Look up the partition to process in the local cache; create a new one if not found
            val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
            val partitionOpt = getPartition(topicPartition) match {

              case HostedPartition.Offline =>
                // the partition is offline; return None
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                  "partition is in an offline log directory")
                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
                None

              case HostedPartition.Online(partition) =>
                // the partition exists; return it
                Some(partition)

              case HostedPartition.None =>
                // the partition does not exist; create a new one and return it
                val partition = Partition(topicPartition, time, this)
                allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
                Some(partition)
            }

            // Next check partition's leader epoch
            // Validate the leader epoch
            partitionOpt.foreach { partition =>

              // the leaderEpoch in the local cache
              val currentLeaderEpoch = partition.getLeaderEpoch
              // the leaderEpoch carried in the request
              val requestLeaderEpoch = partitionState.leaderEpoch

              if (requestLeaderEpoch > currentLeaderEpoch) {
                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                // Record it in the set of valid partitionStates for later processing (makeLeader / makeFollower)
                if (partitionState.replicas.contains(localBrokerId))
                  partitionStates.put(partition, partitionState)
                else {
                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
                }
              } else if (requestLeaderEpoch < currentLeaderEpoch) {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                  s"leader epoch $requestLeaderEpoch is smaller than the current " +
                  s"leader epoch $currentLeaderEpoch")
                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
              } else {
                stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                  s"leader epoch $requestLeaderEpoch matches the current leader epoch")
                responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
              }
            }
          }

          // Based on the leader info in the request, split the partitions into two groups and handle each separately
          val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
            partitionState.leader == localBrokerId
          }
          val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }

          val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
          val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty) {

            // become the leader
            makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
              highWatermarkCheckpoints)
          } else
            Set.empty[Partition]
          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) {

            // become a follower
            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
              highWatermarkCheckpoints)
          } else
            Set.empty[Partition]

          ······

          // Shut down idle data fetchers
          replicaFetcherManager.shutdownIdleFetcherThreads()
          replicaAlterLogDirsManager.shutdownIdleFetcherThreads()

          /*
           * If any of the changed partitions belong to the group coordinator topic __consumer_offsets
           * or the transaction coordinator topic __transaction_state, the following callbacks run to
           * update metadata and state:
           * a. groupCoordinator.onElection
           *    1. Load this partition's group metadata and offsets from disk (the Log)
           *    2. Keep the data in memory (e.g. groupMetadataCache)
           *    3. Start handling client requests: group join, heartbeat, commit offset, etc.
           * b. groupCoordinator.onResignation
           *    1. Clear the in-memory cache (groupMetadataCache)
           *    2. Stop handling group requests from clients
           *    3. Release the related locks and resources
           * c. txnCoordinator.onElection
           *    1. Read historical transaction state from the __transaction_state log
           *       (similar to loading group metadata)
           *    2. Restore producerId -> state mappings into memory
           *    3. Initialize the coordinator state machine, ready to handle these requests:
           *       InitProducerId / AddPartitionsToTxn / EndTxn
           *    4. Keep transactional semantics (e.g. exactly-once, idempotence) consistent
           * d. txnCoordinator.onResignation
           *    1. Stop handling transaction-related requests (to prevent double writes or conflicts)
           *    2. Clear the transaction state loaded into memory
           *    3. Guarantee the new leader can take over transaction coordination exclusively
           */
          onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
          ······
        }
      }
      ······
      stateChangeLogger.info(s"Finished LeaderAndIsr request in ${elapsedMs}ms correlationId $correlationId from controller " +
        s"$controllerId for ${requestPartitionStates.size} partitions")
      response
    }
  }

In this installment our focus is on how the follower pulls data, so next let's look at how Kafka performs makeFollowers.

4. makeFollowers: Becoming a Follower

The makeFollowers method turns the current broker into a follower for the given partitions through the following steps:

  1. Remove these partitions from the set of leader partitions.
  2. Mark the replicas as followers so that producer clients can no longer write to them.
  3. Stop the fetchers for these partitions so that the replica fetcher threads can no longer append data.
  4. Truncate the logs of these partitions and checkpoint their offsets.
  5. Clear pending produce and fetch requests from the purgatory.
  6. If the broker is not shutting down, add fetchers that pull data from the new leaders.

  private def makeFollowers(controllerId: Int,
                            controllerEpoch: Int,
                            partitionStates: Map[Partition, LeaderAndIsrPartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Errors],
                            highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
    partitionStates.forKeyValue { (partition, partitionState) =>
      if (traceLoggingEnabled)
        stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
          s"${partitionState.leader}")
      responseMap.put(partition.topicPartition, Errors.NONE)
    }

    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
    try {
      // TODO: Delete leaders from LeaderAndIsrRequest
      partitionStates.forKeyValue { (partition, partitionState) =>
        val newLeaderBrokerId = partitionState.leader
        try {
          // From the metadata cache populated by the controller, get all live brokers
          // and match the new leader by brokerId
          metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
            // Only change partition state when the leader is available
            case Some(_) =>
              // Update leaderBrokerId, clear the ISR, read the config from ZooKeeper, and create the log directory
              if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
                partitionsToMakeFollower += partition
              else
                stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
                  s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
                  s"for partition ${partition.topicPartition} (last update " +
                  s"controller epoch ${partitionState.controllerEpoch}) " +
                  s"since the new leader $newLeaderBrokerId is the same as the old leader")
            case None =>
              // The LeaderAndIsr request from the controller says the new leader is newLeaderBrokerId,
              // but newLeaderBrokerId cannot be found in the metadataCache the controller sent,
              // so there is nothing we can do.
              //
              // The leader broker should always be present in the metadata cache.
              // If not, we should record the error message and abort the transition process for this partition
              stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
                s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
                s"(last update controller epoch ${partitionState.controllerEpoch}) " +
                s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
              // Create the local replica even if the leader is unavailable. This is required to ensure that we include
              // the partition's high watermark in the checkpoint file (see KAFKA-1647)
              partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
                highWatermarkCheckpoints)
          }
        } catch {
          case e: KafkaStorageException =>
            ······
            responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
        }
      }

      // Remove stale fetcher threads.
      // For example, if data was previously pulled from brokerA via fetcherA, and a replica
      // on brokerB has since been elected leader, we must now pull from brokerB instead,
      // so fetcherA has to be removed
      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
      stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")

      // Try to complete delayed requests:
      // during a partition reassignment, a broker may no longer host a replica of a given
      // partition, so it will not receive further LeaderAndIsr requests for it, and the
      // related delayed requests would otherwise just sit there until they time out.
      // Hence, when a StopReplica request is received, the completion logic of the delayed
      // operations for the affected partitions is triggered explicitly
      partitionsToMakeFollower.foreach { partition =>
        completeDelayedFetchOrProduceRequests(partition.topicPartition)
      }

      if (isShuttingDown.get()) {
        if (traceLoggingEnabled) {
          partitionsToMakeFollower.foreach { partition =>
            stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
              "since it is shutting down")
          }
        }
      } else {
        // we do not need to check if the leader exists again since this has been done at the beginning of this process
        // Look up the leader's basic info so data can be synced from it
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
          val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
            .brokerEndPoint(config.interBrokerListenerName)
          val fetchOffset = partition.localLogOrException.highWatermark
          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
       }.toMap

        // Create the log fetcher and sync data
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      }
    } catch {
      case e: Throwable =>
        stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
          s"received from controller $controllerId epoch $controllerEpoch", e)
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    if (traceLoggingEnabled)
      partitionStates.keys.foreach { partition =>
        stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
          s"${partitionStates(partition).leader}")
      }

    partitionsToMakeFollower
  }

5. Creating Fetchers Ready to Pull Data

In the code above we saw the key line: a broker creates a data fetcher at the moment it becomes a follower. This is why a follower can pull data from the leader.

replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

Below is the implementation of the addFetcherForPartitions method, from which we can see:

  1. A fetcher thread is identified by (brokerId, fetcherId); multiple fetcher threads can connect to the same broker.
  2. The purpose of fetcherId is to distribute partitions across multiple fetcher threads for load balancing.
  3. The same fetcher thread pulls data for multiple partitions, as long as they map to the same broker and the same fetcherId.

  def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
    lock synchronized {
      // Group by [leader_endpoint + fetcherId]
      // fetcherId = Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition)
      //             % numFetchersPerBroker
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
        BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
      }

      def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
                                   brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
        val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
        fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
        fetcherThread.start()
        fetcherThread
      }

      for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {

        // Find the fetcher thread by [brokerId + fetcherId]
        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
        val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {

          case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
            // A fetcher thread with the same target broker id & fetcherId exists in the map,
            // and its target endpoint also matches this request, so reuse it
            currentFetcherThread

          case Some(f) =>
            // A fetcher thread with the same target broker id & fetcherId exists in the map,
            // but its target endpoint differs from this request; the original broker has
            // probably gone down, so the fetcher thread must be recreated
            f.shutdown()
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)

          case None =>
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        }

        // Initialize the fetch offset (set to the partition's HW) and the leaderEpoch
        val initialOffsetAndEpochs = initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
          tp -> OffsetAndEpoch(brokerAndInitOffset.initOffset, brokerAndInitOffset.currentLeaderEpoch)
        }

        // Add the partitions to the fetcherThread, ready to pull data from the given offsets
        addPartitionsToFetcherThread(fetcherThread, initialOffsetAndEpochs)
      }
    }
  }

  protected def addPartitionsToFetcherThread(fetcherThread: T,
                                             initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
    fetcherThread.addPartitions(initialOffsetAndEpochs)
    info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
  }
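As a standalone illustration of the grouping, the fetcherId formula quoted in the comment above can be sketched as follows (Utils.abs is reproduced inline under the assumption that it is an Int.MinValue-safe absolute value; the topic name is made up):

```scala
// Hypothetical standalone sketch of the fetcher-id assignment described above.
object FetcherAssignment {
  // Assumption: an Int.MinValue-safe absolute value, mirroring Kafka's Utils.abs
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // fetcherId = Utils.abs(31 * topic.hashCode + partition) % numFetchersPerBroker
  def fetcherId(topic: String, partition: Int, numFetchersPerBroker: Int): Int =
    abs(31 * topic.hashCode + partition) % numFetchersPerBroker
}
```

With a single fetcher per broker (the default for the num.replica.fetchers config) every partition of a given leader broker lands on the same fetcher thread; raising the count spreads partitions across threads keyed by (broker, fetcherId).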

So how are partitions handled when they are added to a fetcher?

  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
    partitionMapLock.lockInterruptibly()
    try {
      // Remove these TopicPartitions from failedPartitions
      failedPartitions.removeAll(initialFetchStates.keySet)

      initialFetchStates.forKeyValue { (tp, initialFetchState) =>
        // We can skip the truncation step iff the leader epoch matches the existing epoch
        val currentState = partitionStates.stateValue(tp)

        // initialFetchState.offset comes from the Log object's highWatermarkMetadata.messageOffset

        val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {

          // The current state exists (a fetch or truncate has happened before)
          // and the leader epoch matches (i.e. no leader change), so nothing needs to be done
          currentState

        } else if (initialFetchState.offset < 0) {
          // The offset is invalid: fetch the latest offset from the leader and truncate the log;
          // (some initial or abnormal state may have set it negative; for instance, when a
          //   brand-new log directory is created the offset is set to -1 — see the code of
          //   LogOffsetMetadata.UnknownOffsetMetadata)
          fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)

        } else {
          // Normal case: build a PartitionFetchState in Truncating status.
          // (In this scenario the truncation point, i.e. initialFetchState.offset, is the Log object's HW)
          PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch, state = Truncating)
        }

        // Update the state
        partitionStates.updateAndMoveToEnd(tp, updatedState)
      }

      // Wake up all waiting threads
      // (for example, when the fetcher has no partition in a healthy state, its main loop
      //  waits up to replica.fetch.backoff.ms=1000 for a partition to be added or for a
      //  partition's state to recover, to keep the loop from spinning)
      partitionMapCond.signalAll()

      initialFetchStates.keySet
    } finally partitionMapLock.unlock()
  }

There are actually three branches here:

  1. The state already exists and its leaderEpoch matches the one in the LeaderAndIsr request: keep the state unchanged
  2. The offset is invalid: go back to the leader and query its log end offset
  3. Otherwise: build a state in Truncating status, setting its leaderEpoch to the latest value carried in the LeaderAndIsr request

Logs in truncating status will be truncated later in the fetcherThread. For now, let's look at the invalid-offset case:

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   *
   * Queries the leader's log end offset and, combined with the local log end offset,
   * finds a reasonable truncation offset.
   *
   */
  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = {

    // Look up the log end offset for topicPartition cached in the local ReplicaManager
    val replicaEndOffset = logEndOffset(topicPartition)

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
     * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
     * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
     * and it may discover that the current leader's end offset is behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
     *
     * Note: unclean leader election can be controlled via configuration and is disabled by default.
     */

    // Send a request (ApiKeys.LIST_OFFSETS) to the leader to query its log end offset
    val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition, currentLeaderEpoch)
    if (leaderEndOffset < replicaEndOffset) {
      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
        s"leader's latest offset $leaderEndOffset")

      // Truncate the log
      truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))

      fetcherLagStats.getAndMaybePut(topicPartition).lag = 0

      // Truncation done; build a state in Fetching status, ready to pull data
      PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch, state = Fetching)
    } else {
      /**
       * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
       * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
       * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
       * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
       * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
       * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are
       * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
       * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
       *
       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
       * start offset.
       *
       * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
       * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
       * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
       * brokers and producers.
       *
       * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
       * and the current leader's log start offset.
       */
      val leaderStartOffset = fetchEarliestOffsetFromLeader(topicPartition, currentLeaderEpoch)
      warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
        s"leader's start offset $leaderStartOffset")

      /**
       * If leaderStartOffset > replicaEndOffset: the follower's data is too old; delete all of it,
       *   create a new segment, and then pull data
       * If leaderStartOffset < replicaEndOffset: simply pull data starting from replicaEndOffset
       */

      val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)

      // Only truncate log when current leader's log start offset is greater than follower's log end offset.
      // The follower's data is too old: delete all of it and create a new segment starting at leaderStartOffset
      if (leaderStartOffset > replicaEndOffset)
        truncateFullyAndStartAt(topicPartition, leaderStartOffset)

      val initialLag = leaderEndOffset - offsetToFetch
      fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
    }
  }
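Putting the branches together, the decision logic of fetchOffsetAndTruncate can be condensed into this small standalone sketch (the Action names are illustrative, not Kafka API):

```scala
// Condensed decision table for fetchOffsetAndTruncate (illustrative names).
object OffsetResolution {
  sealed trait Action
  case class TruncateTo(offset: Long) extends Action              // follower is ahead of the leader
  case class TruncateFullyAndStartAt(offset: Long) extends Action // follower is far behind the leader
  case class FetchFrom(offset: Long) extends Action               // normal catch-up

  def resolve(replicaEndOffset: Long, leaderStartOffset: Long, leaderEndOffset: Long): Action =
    if (leaderEndOffset < replicaEndOffset)
      TruncateTo(leaderEndOffset)                 // e.g. after an unclean leader election
    else if (leaderStartOffset > replicaEndOffset)
      TruncateFullyAndStartAt(leaderStartOffset)  // leader already deleted the old segments
    else
      FetchFrom(math.max(leaderStartOffset, replicaEndOffset))
}
```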

At this point the partitions to be processed have been added to the fetcherThread. Next, let's look at how the fetcherThread pulls data.

II. FetcherThread

1. PartitionFetchState

We have repeatedly mentioned a state above; it enumerates two values:

sealed trait ReplicaState

// When the replica needs to truncate its log, the state is set to Truncating
case object Truncating extends ReplicaState

// When the replica needs to fetch data, the state is set to Fetching
case object Fetching extends ReplicaState

The structure of the partition fetch state, PartitionFetchState, is as follows:

/**
 * Case class to keep the partition offset and its state (truncating, delayed).
 * This represents a partition as being either:
 * (1) Truncating its log, for example having recently become a follower
 * (2) Delayed, for example due to an error, where we subsequently back off a bit
 * (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
 */
case class PartitionFetchState(fetchOffset: Long,
                               lag: Option[Long],
                               currentLeaderEpoch: Int,
                               delay: Option[DelayedItem],
                               state: ReplicaState) {

  // The replica is in Fetching state and not delayed
  def isReadyForFetch: Boolean = state == Fetching && !isDelayed

  // The replica is in Truncating state and not delayed
  def isTruncating: Boolean = state == Truncating && !isDelayed

  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)

  /////////////////////////////////////////////////////////////////////////

  // The replica is in the ISR, with no lag
  def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0

  ......
}

At its core it is still ReplicaState, further qualified by lag, delay, and other fields that describe the current fetch state in more detail.

2. AbstractFetcherThread

The ReplicaFetcherThread mentioned above extends AbstractFetcherThread. Another subclass of the same class is ReplicaAlterLogDirsThread, a thread used to handle log-directory migration.

Now let's take a proper look at AbstractFetcherThread:

abstract class AbstractFetcherThread(name: String,
                                     clientId: String,  // used for logging
                                     val sourceBroker: BrokerEndPoint,  // address of the source broker
                                     failedPartitions: FailedPartitions,  // partitions that failed during processing
                                     fetchBackOffMs: Int = 0, // retry interval
                                     isInterruptible: Boolean = true, // whether the thread may be interrupted
                                     val brokerTopicStats: BrokerTopicStats) // BrokerTopicStats's lifecycle managed by ReplicaManager
  extends ShutdownableThread(name, isInterruptible) {

  // Holds the HW, LSO, log start offset, the fetched record set, and related info
  type FetchData = FetchResponse.PartitionData[Records]
  // Holds leaderEpoch information
  type EpochData = OffsetsForLeaderEpochRequest.PartitionData

  // PartitionStates is a collection class defined by Kafka. It stores a PartitionFetchState
  // for each partition. A PartitionFetchState corresponds to one of the following three:
  // 1. isTruncatingLog
  // 2. isDelayed
  // 3. isReadyForFetch
  // It also tracks isReplicaInSync, lag, leaderEpoch, and the replica state (Fetching, Truncating)
  private val partitionStates = new PartitionStates[PartitionFetchState]

  ......

  // Once the thread starts, this method is executed in an endless loop
  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }

  // truncate logs
  private def maybeTruncate(): Unit = {
    val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
    if (partitionsWithEpochs.nonEmpty) {
      truncateToEpochEndOffsets(partitionsWithEpochs)
    }
    if (partitionsWithoutEpochs.nonEmpty) {
      truncateToHighWatermark(partitionsWithoutEpochs)
    }
  }

  // fetch logs
  private def maybeFetch(): Unit = {
    val fetchRequestOpt = inLock(partitionMapLock) {
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)

      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

      if (fetchRequestOpt.isEmpty) {
        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      fetchRequestOpt
    }

    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
      processFetchRequest(sessionPartitions, fetchRequest)
    }
  }
}

AbstractFetcherThread 继承自 ShutdownableThread,这个类我们之前也遇到过,它的 run 方法会在循环中不停地调用 doWork()。也就是说,fetcher thread 运行时,就是在死循环里反复执行上面代码中的 doWork() 方法:

  override def run(): Unit = {
    isStarted = true
    info("Starting")
    try {
      while (isRunning)
        doWork()
    } catch {
      case e: FatalExitError =>
        shutdownInitiated.countDown()
        shutdownComplete.countDown()
        info("Stopped")
        Exit.exit(e.statusCode())
      case e: Throwable =>
        if (isRunning)
          error("Error due to", e)
    } finally {
       shutdownComplete.countDown()
    }
    info("Stopped")
  }
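ShutdownableThread 这种"死循环线程 + 优雅关闭"的模型,可以用一段 Python 粗略示意(仅为示意代码,类名与字段均为假设,并非 Kafka 的实现):

```python
import threading

class ShutdownableWorker:
    """在死循环中反复执行 do_work(),直到被要求关闭(示意实现)"""

    def __init__(self):
        self._running = threading.Event()            # 对应 isRunning
        self._running.set()
        self._shutdown_complete = threading.Event()  # 对应 shutdownComplete
        self._thread = threading.Thread(target=self._run, daemon=True)

    def do_work(self):
        # 子类实现具体工作,对应 AbstractFetcherThread.doWork()
        raise NotImplementedError

    def _run(self):
        try:
            while self._running.is_set():
                self.do_work()
        finally:
            # 无论是正常退出还是抛出异常,都要通知等待者:线程已结束
            self._shutdown_complete.set()

    def start(self):
        self._thread.start()

    def shutdown(self, timeout=5.0):
        self._running.clear()                     # 让循环退出
        self._shutdown_complete.wait(timeout)     # 等待线程真正退出
```

关闭流程和 Kafka 中一样分两步:先置位退出标记,再等待线程在 finally 中确认退出。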

3. maybeTruncate()

上文中,我们向 fetcher 中新增 partition 时,会构造出一个 Truncating 状态的 state 加入到缓存中。

而在 maybeTruncate() 中,就会处理(截断)这些 truncating 状态的日志:

  private def maybeTruncate(): Unit = {

    // 从 partitionStates 取出 TopicPartition,据此找到 Log 类实例,
    // 然后取出 leaderEpochCache 中最后一项(即最新的 leaderEpoch 对应的日志 index)
    // 根据是否找到,将 TopicPartition 分为两组
    val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()

    if (partitionsWithEpochs.nonEmpty) {
      // 如果有 leaderEpoch 及对应的 offset,可以少截取一点
      truncateToEpochEndOffsets(partitionsWithEpochs)
    }
    if (partitionsWithoutEpochs.nonEmpty) {
      // 如果没有,则出于安全考虑,只能从所有 follower 都确认过的 hw 截取了
      truncateToHighWatermark(partitionsWithoutEpochs)
    }
  }

分组的代码很简单:


  private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition]) = inLock(partitionMapLock) {
    val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
    val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]

    partitionStates.partitionStateMap.forEach { (tp, state) =>
      if (state.isTruncating) {
        latestEpoch(tp) match {
          case Some(epoch) if isOffsetForLeaderEpochSupported =>
            partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
          case _ =>
            partitionsWithoutEpochs += tp
        }
      }
    }

    (partitionsWithEpochs, partitionsWithoutEpochs)
  }

a. truncateToEpochEndOffsets()

  /**
    * - Build a leader epoch fetch based on partitions that are in the Truncating phase
    * - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
    *   leader epoch. This is the offset the follower should truncate to ensure
    *   accurate log replication.
    * - Finally truncate the logs for partitions in the truncating phase and mark the
    *   truncation complete. Do this within a lock to ensure no leadership changes can
    *   occur during truncation.
    *
    * - 针对 Truncating 状态的 partition,构建一个 api 请求用于查询 leader epoch
    * - 发送 OffsetsForLeaderEpochRequest,从 leader 取回最新的 leaderEpoch 及对应的 log offset
    *   这个 log offset 就是应该截取的位置
    * - 最后,执行 truncate 操作,并标记为 truncation complete 状态。这部分操作需在 partitionMapLock 中执行,
    *   以确保在此期间没有 leadership 变更。
    *
    */
  private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {

    // 从 fetch target(即 leader 副本)查询最新 leaderEpoch 对应的 log offset
    val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions)

    //Ensure we hold a lock during truncation.
    inLock(partitionMapLock) {
      //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
      //
      // 外层从 partitionStates 中取的 topicPartition 来获取 latestEpoch,
      // 没有加锁,到这里有可能已经不存在了/被更新了,所以需要再做一次过滤,确保
      // 1. 存在
      // 2. leaderEpoch 相同(领导权未发生过变更)
      val epochEndOffsets = endOffsets.filter { case (tp, _) =>
        val curPartitionState = partitionStates.stateValue(tp)
        val partitionEpochRequest = latestEpochsForPartitions.getOrElse(tp, {
          throw new IllegalStateException(
            s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
        })
        val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
        curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
      }

      // 执行日志截断操作
      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)

      // 处理异常,通常是由于 leaderShip 变更导致的:
      // 更新 partitionStates 中的状态,标记为 replica.fetch.backoff.ms 延时后再 fetch
      handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")

      // 将成功 truncate 的 tp,状态设置为 Fetching,等待拉取数据
      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
    }
  }

maybeTruncateToEpochEndOffsets 的源码如下:

  private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
                                             latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
    val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
    val partitionsWithError = mutable.HashSet.empty[TopicPartition]

    fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
      leaderEpochOffset.error match {
        case Errors.NONE =>

          // Returns truncation offset and whether this is the final offset to truncate to
          //
          // 处理每个 topic-partition 的日志截断。
          // 返回要截断的 offset 以及是否为 最终 offset。
          //
          // For each topic partition, the offset to truncate to is calculated based on leader's returned
          // epoch and offset:
          //
          // 对于每个 topic-partition,截断 offset 的计算依靠 leader 返回的 epoch 和 offset:
          //
          // -- If the leader replied with undefined epoch offset, we must use the high watermark. This can
          // happen if 1) the leader is still using message format older than KAFKA_0_11_0; 2) the follower
          // requested leader epoch < the first leader epoch known to the leader.
          //
          // -- 如果 leader 返回的 offset 是未定义(UNDEFINED_EPOCH_OFFSET),则必须使用 hw
          // 这种情况可能出现在:
          // 1)leader 仍在使用旧版本消息格式(早于 KAFKA_0_11_0);
          // 2)follower 请求的 leader epoch 小于 leader 所知道的最早 epoch
          //
          // -- If the leader replied with the valid offset but undefined leader epoch, we truncate to
          // leader's offset if it is lower than follower's Log End Offset. This may happen if the
          // leader is on the inter-broker protocol version < KAFKA_2_0_IV0
          //
          // -- 如果 leader 返回了有效 offset 但未定义的 leaderEpoch(UNDEFINED_EPOCH),
          // 如果该 offset 小于 follower 的日志末尾(Log End Offset),就截断到 leader 的 offset。
          // 这种情况可能发生在 leader 使用低于 KAFKA_2_0_IV0 的协议版本
          // 
          //  -- If the leader replied with leader epoch not known to the follower, we truncate to the
          //  end offset of the largest epoch that is smaller than the epoch the leader replied with, and
          //  send OffsetsForLeaderEpochRequest with that leader epoch. In a more rare case, where the
          //  follower was not tracking epochs smaller than the epoch the leader replied with, we
          //  truncate the leader's offset (and do not send any more leader epoch requests).
          // 
          // -- 如果 leader 返回的 epoch 对 follower 是未知的,
          //    就截断到比该 epoch 小的最大已知 epoch 的日志末尾 offset,并重新发起 OffsetsForLeaderEpoch 请求。
          //    如果 follower 甚至没有记录这些较小 epoch 的信息(很少见),
          //    就直接截断到 leader 返回的 offset,并不再发起后续 epoch 请求。
          // 
          //  -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that
          //  leader replied with, follower's Log End Offset).
          // 
          // -- 否则,就截断到以下三者中的最小值:
          // leader 返回的 offset、follower 在该 epoch 上的末尾 offset、follower 的日志末尾 offset。
          //
          val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset)

          // 截断日志
          if(doTruncate(tp, offsetTruncationState))
            // 截断成功,记录截取的位置
            fetchOffsets.put(tp, offsetTruncationState)

        ......
        case error =>
          info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
          partitionsWithError += tp
      }
    }

    ResultWithPartitions(fetchOffsets, partitionsWithError)
  }
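上面注释里的几条截断位置选择规则,可以用一段 Python 伪实现概括一下(仅为示意:省略了"epoch 未知时重发 OffsetsForLeaderEpoch 请求"的循环,函数名与参数均为假设,并非 getOffsetTruncationState 的真实签名;UNDEFINED_EPOCH / UNDEFINED_EPOCH_OFFSET 对应 Kafka 中的同名常量):

```python
UNDEFINED_EPOCH = -1
UNDEFINED_EPOCH_OFFSET = -1

def choose_truncation_offset(leader_epoch, leader_offset,
                             follower_hw, follower_leo,
                             end_offset_for_epoch):
    """根据 leader 返回的 (epoch, offset) 决定 follower 的截断位置(示意)

    end_offset_for_epoch(epoch) 模拟 follower 本地 leader-epoch cache 的查询:
    返回 (不大于 epoch 的最大已知 epoch, 该 epoch 的 end offset);找不到则返回 None
    """
    # 规则 1:leader 返回未定义的 epoch offset,只能保守地退回到 hw
    if leader_offset == UNDEFINED_EPOCH_OFFSET:
        return follower_hw
    # 规则 2:offset 有效但 epoch 未定义(leader 协议版本较旧),
    # 截断到 min(leader offset, follower LEO)
    if leader_epoch == UNDEFINED_EPOCH:
        return min(leader_offset, follower_leo)
    # 规则 3/4:在本地 epoch cache 中查 <= leader_epoch 的最大 epoch
    local = end_offset_for_epoch(leader_epoch)
    if local is None:
        # follower 没有记录任何更小的 epoch(罕见),直接用 leader 的 offset
        return min(leader_offset, follower_leo)
    _local_epoch, local_end_offset = local
    # 否则截断到三者中的最小值
    return min(leader_offset, local_end_offset, follower_leo)
```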

b. truncateToHighWatermark()

  private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
    val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]

    // 外层默认赋值是 hw,所以这里是从 hw 开始截断
    for (tp <- partitions) {
      val partitionState = partitionStates.stateValue(tp)
      if (partitionState != null) {
        val highWatermark = partitionState.fetchOffset
        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)

        info(s"Truncating partition $tp to local high watermark $highWatermark")
        // 执行日志截断
        if (doTruncate(tp, truncationState))
          fetchOffsets.put(tp, truncationState)
      }
    }

    // 将成功 truncate 的 tp,状态设置为 Fetching,等待拉取数据
    updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
  }

c. 截断日志 *

截断日志,是 Log 模块的代码,我们曾经遇到过,但并没有细看。这里我们温习一下 Log 相关的内容,看看具体是怎么实现的:

  def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean): Unit = {
    val affectedLogs = ArrayBuffer.empty[Log]
    for ((topicPartition, truncateOffset) <- partitionOffsets) {
      val log = {
        if (isFuture)
          futureLogs.get(topicPartition)
        else
          currentLogs.get(topicPartition)
      }
      // If the log does not exist, skip it
      if (log != null) {
        // May need to abort and pause the cleaning of the log, and resume after truncation is done.
        // Kafka 的 Log Cleaner 线程,用于清理压缩日志。
        //
        // 如果截断 offset 小于当前活跃 segment 的起始 offset,
        // 说明 Cleaner 有可能在处理将被截断的 segment,
        // 需要通知 Log Cleaner 先暂停处理这个 log,避免出现一致性问题。
        val needToStopCleaner = truncateOffset < log.activeSegment.baseOffset
        if (needToStopCleaner && !isFuture)
          abortAndPauseCleaning(topicPartition)

        try {
          if (log.truncateTo(truncateOffset))
            affectedLogs += log

          // 如果截断位置在 active segment 前,意味着之前的 checkpoint 也可能过时,必须一并更新。
          if (needToStopCleaner && !isFuture)
            maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)

        } finally {
          if (needToStopCleaner && !isFuture) {
            // 恢复 Log Cleaner
            resumeCleaning(topicPartition)
          }
        }
      }
    }

    // 更新 log 的 recovery point
    // 清理日志压缩的 snapshot
    for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
      checkpointRecoveryOffsetsAndCleanSnapshotsInDir(dir, logs)
    }
  }

下面是 Log 类中真正的截取方法:

  private[kafka] def truncateTo(targetOffset: Long): Boolean = {
    maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") {
      if (targetOffset < 0)
        throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset))
      if (targetOffset >= logEndOffset) {
        info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}")

        // Always truncate epoch cache since we may have a conflicting epoch entry at the
        // end of the log from the leader. This could happen if this broker was a leader
        // and inserted the first start offset entry, but then failed to append any entries
        // before another leader was elected.
        lock synchronized {
          leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
        }

        false
      } else {
        info(s"Truncating to offset $targetOffset")
        lock synchronized {
          checkIfMemoryMappedBufferClosed()

          if (segments.firstEntry.getValue.baseOffset > targetOffset) {

            // 截掉所有的 segments,从 targetOffset 重新开始:
            // 1. 从 segment list 中移除所有的 segment
            //    为日志和索引文件增加 .deleted 后缀,等待 file.delete.delay.ms 时间后物理删除
            // 2. 创建新的 segment
            //    更新 leaderEpochCache
            //    清空 producerStateManager
            // 3. 更新 logStartOffset、nextOffsetMetadata、recoveryPoint、HW
            //    更新 producerState
            truncateFullyAndStartAt(targetOffset)

          } else {

            // 在现有的 segments 范围内截取
            val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)

            // 从 segment list 中移除这部分 segment
            // 为日志和索引文件增加 .deleted 后缀,等待 file.delete.delay.ms 时间后物理删除
            removeAndDeleteSegments(deletable, asyncDelete = true, LogTruncation)

            // 截断索引,截断日志
            activeSegment.truncateTo(targetOffset)

            // 更新 leaderEpochCache
            leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))

            // 更新 logStartOffset、nextOffsetMetadata、recoveryPoint、HW、producerState
            completeTruncation(
              startOffset = math.min(targetOffset, logStartOffset),
              endOffset = targetOffset
            )
          }
          true
        }
      }
    }
  }
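truncateTo 中"整体重建"与"部分截断"两个分支的取舍条件,可以用 Python 简单示意(假设每个 segment 只用 base offset 表示,示意代码):

```python
def plan_truncation(segment_base_offsets, target_offset):
    """返回 (是否整体重建, 需要整个删除的 segment 列表)

    segment_base_offsets: 升序排列的各 segment 起始 offset
    """
    first_base = segment_base_offsets[0]
    if first_base > target_offset:
        # 对应 truncateFullyAndStartAt:target 落在所有 segment 之前,
        # 全部删除,从 target_offset 重新开始
        return True, list(segment_base_offsets)
    # 对应部分截断:base offset 大于 target 的 segment 整个删掉
    # (即 removeAndDeleteSegments 的 filter 条件),
    # target 所在的 segment 内部再截断到 target_offset
    deletable = [b for b in segment_base_offsets if b > target_offset]
    return False, deletable
```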

d. 更新 state

对于截断成功的 TopicPartition,需要将状态从 truncating 更新到 fetching,以备后续拉取数据

  /**
    * Loop through all partitions, updating their fetch offset and maybe marking them as
    * truncation completed if their offsetTruncationState indicates truncation completed
    *
    * 遍历所有 partitionStates,如果是刚刚处理过的(在 fetchOffsets 中)
    * 则检查它的 offsetTruncationState.truncationCompleted 字段,
    * 如果 True,表示截断完成,则更新状态为 Fetching
    * 如果 False,则更新状态为 Truncating
    *
    * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
    */
  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
    val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
      .map { case (topicPartition, currentFetchState) =>

        val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {

          // 存在于 fetchOffsets 中(即刚刚执行过 truncate 流程的)
          case Some(offsetTruncationState) =>

            // 将 truncate 成功的 tp 的状态更新为 Fetching,表示可以正常拉取数据了
            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
            PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
              currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)

          // 如果不是刚刚处理过的,则维持状态不变
          case None => currentFetchState
        }
        (topicPartition, maybeTruncationComplete)
      }
    partitionStates.set(newStates.asJava)
  }
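这个状态流转可以抽象成一个很小的纯函数(示意代码,字段名为假设的简化版):

```python
def next_state(current_state, truncation_result):
    """truncation_result 为 None 表示该分区本轮未处理过,
    否则为 (offset, truncation_completed)"""
    if truncation_result is None:
        return current_state  # 不是刚处理过的,维持原状态
    offset, completed = truncation_result
    # 截断完成 -> Fetching,可以拉取数据;否则继续 Truncating
    state = "Fetching" if completed else "Truncating"
    return {**current_state, "fetch_offset": offset, "state": state}
```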

4. maybeFetch()

经过 maybeTruncate(),日志已经被截断到正确的位置,这时候就可以从 leader 拉取日志了:

  private def maybeFetch(): Unit = {
    val fetchRequestOpt = inLock(partitionMapLock) {

      // 检查所有的 partition state,
      // 筛掉不是 fetching 状态的 或者 被限流的条目,
      // 构造请求,并返回 offline 状态的分区(partitionsWithError)
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)

      // 为 state 中增加一个 delay 值(来自配置 replica.fetch.backoff.ms 默认值 100),
      // 表示延迟 delay ms 后再重新尝试
      handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

      if (fetchRequestOpt.isEmpty) {
        // 没有任何 fetch 任务时,等待一段时间(这个时间和上面的 delay 值取自同一个参数)
        trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      fetchRequestOpt
    }

    fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
      // 发送并处理 fetch 请求
      processFetchRequest(sessionPartitions, fetchRequest)
    }
  }

从 PartitionFetchState 的源码中可以看到,延迟重试其实就是在 state 中设置一个未来的时间点(DelayedItem);而判断是否仍处于延迟中,就是看这个时间点是否还没到(即剩余延迟是否大于 0):

case class PartitionFetchState(fetchOffset: Long,
                               lag: Option[Long],
                               currentLeaderEpoch: Int,
                               delay: Option[DelayedItem],
                               state: ReplicaState) {
  ......
  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
  ......
}

class DelayedItem(val delayMs: Long) extends Delayed with Logging {
  private val dueMs = Time.SYSTEM.milliseconds + delayMs
  def getDelay(unit: TimeUnit): Long = {
    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
  }
}

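DelayedItem 的语义很容易用 Python 复现:构造时记下一个未来的到期时刻,isDelayed 就是判断"还没到期"(示意代码,并非 Kafka 实现):

```python
import time

class DelayedItem:
    """记录一个未来的到期时刻,用于延迟重试(示意实现)"""

    def __init__(self, delay_ms):
        # 对应 dueMs = Time.SYSTEM.milliseconds + delayMs
        self.due_ms = time.monotonic() * 1000 + delay_ms

    def get_delay_ms(self):
        # 剩余等待时间,到期之后恒为 0
        return max(self.due_ms - time.monotonic() * 1000, 0)

def is_delayed(delay):
    # 对应 PartitionFetchState.isDelayed:存在 delay 且尚未到期
    return delay is not None and delay.get_delay_ms() > 0
```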
下面是发送 fetch 请求并处理 leader response 的代码:

  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                  fetchRequest: FetchRequest.Builder): Unit = {
    val partitionsWithError = mutable.Set[TopicPartition]()
    var responseData: Map[TopicPartition, FetchData] = Map.empty

    try {
      // 以 ApiKeys.FETCH 发送请求,拉取数据
      trace(s"Sending fetch request $fetchRequest")
      responseData = fetchFromLeader(fetchRequest)
    } catch {
      ......
    }

    ......

    if (responseData.nonEmpty) {

      inLock(partitionMapLock) {
        responseData.forKeyValue { (topicPartition, partitionData) =>
          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
            // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
            // In this case, we only want to process the fetch response if the partition state is ready for fetch and
            // the current offset is the same as the offset requested.
            //
            // 只处理 partition fetch state == Fetching 且 offset 与预期一致的 TopicPartition

            // 找到 request param
            val fetchPartitionData = sessionPartitions.get(topicPartition)

            // 1. 是本次请求的 partition
            // 2. 请求中的 offset 和 partition state 中的 fetch offset 一致(即,在请求期间,partition state 没有更新过)
            // 3. state == Fetching && delayTime == 0
            if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {

              // request leader epoch
              val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None

              partitionData.error match {
                case Errors.NONE =>
                  try {
                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                    // 一旦把数据传递给 process 流程,本线程就不能再修改这些数据了

                    // 将数据 append 到 log 文件的最后,其中就调用了日志模块的 Log.append() 方法。
                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                      partitionData)

                    logAppendInfoOpt.foreach { logAppendInfo =>

                      // 更新 follower 的落后值 lag
                      val validBytes = logAppendInfo.validBytes
                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
                      ......
                    }
                  } catch {
                    case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
                      // 消息 CRC 校验失败 或 消息格式非法
                      // we log the error and continue. This ensures two things
                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
                      //    down and cause other topic partition to also lag
                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
                      //    can cause this), we simply continue and should get fixed in the subsequent fetches
                      error(s"Found invalid messages during fetch for partition $topicPartition " +
                        s"offset ${currentFetchState.fetchOffset}", ime)
                      partitionsWithError += topicPartition
                    case e: KafkaStorageException =>
                      // 副本离线、IO 异常等情况
                      error(s"Error while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", e)
                      markPartitionFailed(topicPartition)
                    case t: Throwable =>
                      // stop monitoring this partition and add it to the set of failed partitions
                      error(s"Unexpected error occurred while processing data for partition $topicPartition " +
                        s"at offset ${currentFetchState.fetchOffset}", t)
                      markPartitionFailed(topicPartition)
                  }
                case Errors.OFFSET_OUT_OF_RANGE =>
                  // follower 请求的 offset 不在 leader 的日志范围内
                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                    partitionsWithError += topicPartition

                case Errors.UNKNOWN_LEADER_EPOCH =>
                  // follower 请求中携带的 leader epoch 大于 leader 端当前的 epoch
                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                    s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
                  partitionsWithError += topicPartition

                case Errors.FENCED_LEADER_EPOCH =>
                  // follower 请求中携带的 leader epoch 小于 leader 端当前的 epoch
                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition

                case Errors.NOT_LEADER_OR_FOLLOWER =>
                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                    "that the partition is being moved")
                  partitionsWithError += topicPartition

                case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
                  warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " +
                       "This error may be returned transiently when the partition is being created or deleted, but it is not " +
                       "expected to persist.")
                  partitionsWithError += topicPartition

                case _ =>
                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
                    partitionData.error.exception)
                  partitionsWithError += topicPartition
              }
            }
          }
        }
      }
    }

    // 处理失败的分区(为 state 中增加一个 delay值,延迟一段时间后重试)
    if (partitionsWithError.nonEmpty) {
      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
    }
  }

对于正常情况,kafka 会将数据 append 到 log 文件中(append 的具体实现我们在 Kafka源码·二 - Log 这一节已经读过啦):

  override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: FetchData): Option[LogAppendInfo] = {
    val logTrace = isTraceEnabled
    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
    val log = partition.localLogOrException
    val records = toMemoryRecords(partitionData.records)

    maybeWarnIfOversizedRecords(records, topicPartition)

    // 确保已经 truncate 到正确的位置
    if (fetchOffset != log.logEndOffset)
      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
        topicPartition, fetchOffset, log.logEndOffset))

    if (logTrace)
      trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
        .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

    // 将 leader 的数据  append 到本地 log 文件中
    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

    if (logTrace)
      trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
        .format(log.logEndOffset, records.sizeInBytes, topicPartition))
    val leaderLogStartOffset = partitionData.logStartOffset

    // leader 会在 follower 拉取数据的同时发送 hw 值,以便 follower 更新本地 hw
    val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)

    // leader 会在 follower 拉取数据的同时发送 log start offset 值,以便 follower 更新本地 log start offset
    log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
    if (logTrace)
      trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")

    // For the follower replica, we do not need to keep its segment base offset and physical position.
    // These values will be computed upon becoming leader or handling a preferred read replica fetch.
    //
    // 对于 follower replica,我们不需要保留其 segment base offset 和物理位置。
    // 这些值将在成为 leader 时或处理 preferred read replica 请求时计算。

    ......

    // 更新 metric
    if (partition.isReassigning && partition.isAddingLocalReplica)
      brokerTopicStats.updateReassignmentBytesIn(records.sizeInBytes)
    brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)

    logAppendInfo
  }
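processPartitionData 中 follower 侧的处理顺序(先校验 offset、append 日志,再用 leader 捎带回来的 hw 和 log start offset 更新本地状态,并据此计算 lag)可以用 Python 示意一下(FollowerLog 及各字段均为假设的简化模型):

```python
from dataclasses import dataclass

@dataclass
class FollowerLog:
    leo: int = 0               # log end offset
    hw: int = 0                # high watermark
    log_start_offset: int = 0

def process_partition_data(log, fetch_offset, records,
                           leader_hw, leader_log_start_offset):
    """follower 处理一批 fetch 回来的数据,返回 lag(示意代码)"""
    # 1. 校验:请求的 offset 必须等于本地 LEO,否则说明截断尚未完成
    if fetch_offset != log.leo:
        raise RuntimeError(f"offset mismatch: fetched={fetch_offset}, leo={log.leo}")
    # 2. 将消息 append 到本地日志(这里简化为推进 LEO)
    log.leo += len(records)
    # 3. 用 leader 返回的 hw 更新本地 hw(不能超过本地 LEO)
    log.hw = min(leader_hw, log.leo)
    # 4. 用 leader 返回的 log start offset 推进本地值(只增不减)
    log.log_start_offset = max(log.log_start_offset, leader_log_start_offset)
    # 5. 计算 follower 落后 leader 的 lag
    next_offset = log.leo
    return max(0, leader_hw - next_offset)
```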

至此,一次拉取数据的任务就完成了。此后,FetcherThread 会不断在循环中重复下面的工作,以保证主从数据的一致性:

  override def doWork(): Unit = {
    maybeTruncate()
    maybeFetch()
  }
标签: kafka
最后更新:2026年2月26日

zt52875287
