一、从 LeaderAndIsr 请求谈起
这个请求的作用是:通知集群中的节点,某个 Broker 成为某个分区的 Leader 或 Follower,以及该分区最新的副本状态信息。
1. 发送时机
controller 通常会在以下场景发送 LeaderAndIsrRequest:
- 分区 Leader 选举或切换时:比如原 leader 宕机,controller 会从 ISR(In-Sync Replica)中选出新的 leader,并通知相关副本(所在的 broker)
- 新 Topic 创建后:controller 会为各个分区选择 leader,并将该信息下发到分区副本所在的 broker
- Replica 状态变化时:例如重新加入 ISR
- 分区重新分配(Reassignment)时
2. 请求参数
LeaderAndIsrRequest 中携带的重要参数有:
| 字段名 | 用途 |
|---|---|
| controllerId | 当前 controller id |
| controllerEpoch | 用于保证请求的时效性 |
| brokerEpoch | 接收方 broker 的 epoch,同样用于保证请求的时效性 |
| partitionStates | Map<TopicPartition, LeaderAndIsrPartitionState>,每个分区的状态信息,见下表 |
| liveLeaders | 相关分区的 leader 副本所在 broker 的 metadata 信息(包含 host、port 等),用于建立连接 |
而 LeaderAndIsrPartitionState 包含的字段有:
| 字段名 | 用途 |
|---|---|
| leader | 新 leader 的 brokerId |
| leaderEpoch | 分区当前 leader 的 epoch,用于版本控制,保证请求的时效性 |
| isr | ISR 副本列表 |
| replicas | 所有副本的 brokerId 列表 |
| partitionEpoch | 分区的版本号,用于幂等控制 |
| topicName, partitionIndex | 分区信息 |
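为了直观理解这些字段的组织方式,下面用 Python 的 dataclass 画一个极简的结构草图。注意这只是示意:字段名与上表对应,类型做了简化,示例中的 topic 名和 broker 信息均为虚构,并非 Kafka 的真实定义。

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class LeaderAndIsrPartitionState:
    topic_name: str
    partition_index: int
    leader: int            # 新 leader 的 brokerId
    leader_epoch: int      # 版本控制,保证请求时效性
    isr: List[int]         # ISR 副本列表
    replicas: List[int]    # 所有副本的 brokerId 列表
    partition_epoch: int   # 分区版本号,幂等控制

@dataclass
class LeaderAndIsrRequest:
    controller_id: int
    controller_epoch: int
    broker_epoch: int
    # 对应 Map<TopicPartition, LeaderAndIsrPartitionState>
    partition_states: Dict[Tuple[str, int], LeaderAndIsrPartitionState]
    # leader 所在 broker 的 metadata
    live_leaders: List[dict]

# 某个分区发生 leader 切换时,controller 可能下发这样的请求(数据为虚构):
state = LeaderAndIsrPartitionState("orders", 0, leader=1, leader_epoch=7,
                                   isr=[1, 2], replicas=[1, 2, 3], partition_epoch=42)
request = LeaderAndIsrRequest(controller_id=0, controller_epoch=3, broker_epoch=100,
                              partition_states={("orders", 0): state},
                              live_leaders=[{"id": 1, "host": "broker-1", "port": 9092}])
```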
3. 接收并处理请求
当 broker 收到请求之后,首先会验证 brokerEpoch 有效性:
private def isBrokerEpochStale(brokerEpochInRequest: Long): Boolean = {
// Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
//
// 首先,每个 broker 上都有一个 controller 实例,
// 但是只有选举成功的那个 broker 上的 controller 实例才会履行职责,管理整个集群。
//
// 我理解,这里的 controller.brokerEpoch 是指真正的 controller 分配给当前 broker 的 epoch,
//
// 所以,存在下面的情况:
// 真正的 controller 中某个线程,为当前 broker 确定了更大的 brokerEpoch,
// 但是还没来得及通知当前 broker(或者说当前 broker 还没有把数据更新到本地的 controller.brokerEpoch 中),
// 此时,leaderAndIsr 请求中的 brokerEpochInRequest 就会大于 controller.brokerEpoch
//
// 这种情况也认为是合法的请求。
brokerEpochInRequest < controller.brokerEpoch
}
}
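上面的判断规则,可以用一个等价的 Python 小函数来验证(哨兵值取 -1,对应 AbstractControlRequest.UNKNOWN_BROKER_EPOCH;仅为示意,非 Kafka 实现):

```python
UNKNOWN_BROKER_EPOCH = -1

def is_broker_epoch_stale(broker_epoch_in_request: int, local_broker_epoch: int) -> bool:
    # 未升级到 KIP-380 的 controller 发来 UNKNOWN,直接放行
    if broker_epoch_in_request == UNKNOWN_BROKER_EPOCH:
        return False
    # 请求中的 epoch 不小于本地已知 epoch 即合法
    # (大于的情况对应注释中 controller 先感知到新 epoch 的罕见场景)
    return broker_epoch_in_request < local_broker_epoch

assert not is_broker_epoch_stale(UNKNOWN_BROKER_EPOCH, 10)  # KIP-380 之前,放行
assert is_broker_epoch_stale(9, 10)       # 旧 epoch,过期
assert not is_broker_epoch_stale(10, 10)  # 相同,合法
assert not is_broker_epoch_stale(11, 10)  # 更大也合法(罕见场景)
```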
当 brokerEpochInRequest 合法时,会调用下面的 becomeLeaderOrFollower 方法,尝试更新集群状态:
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
val startMs = time.milliseconds()
replicaStateChangeLock synchronized {
val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
······
val response = {
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
// 忽略来自旧 controller 的请求
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
val responseMap = new mutable.HashMap[TopicPartition, Errors]
controllerEpoch = leaderAndIsrRequest.controllerEpoch
val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
// 如果 partition 不存在,首先创建它
requestPartitionStates.foreach { partitionState =>
// 从本地缓存中找出要处理的 partition,如果没找到则新建一个
val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
val partitionOpt = getPartition(topicPartition) match {
case HostedPartition.Offline =>
// 分区离线,返回 None
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
"partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
None
case HostedPartition.Online(partition) =>
// 分区存在,返回分区
Some(partition)
case HostedPartition.None =>
// 分区不存在,创建新分区并返回
val partition = Partition(topicPartition, time, this)
allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
Some(partition)
}
// Next check partition's leader epoch
// 校验 leader epoch
partitionOpt.foreach { partition =>
// 本地缓存中的 leaderEpoch
val currentLeaderEpoch = partition.getLeaderEpoch
// 请求中的 leaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
if (requestLeaderEpoch > currentLeaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
// 记录到合法的 partitionStates 中,以便后续处理(makeLeader、makeFollower)
if (partitionState.replicas.contains(localBrokerId))
partitionStates.put(partition, partitionState)
else {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
} else if (requestLeaderEpoch < currentLeaderEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch is smaller than the current " +
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
} else {
stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch matches the current leader epoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
}
// 根据请求中的 leader 信息,将 partition 分为两组单独处理
val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
partitionState.leader == localBrokerId
}
val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty) {
// 变为 Leader
makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
highWatermarkCheckpoints)
} else
Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) {
// 变为 follower
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
highWatermarkCheckpoints)
} else
Set.empty[Partition]
······
// 关闭空闲的 data fetcher
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
/*
* 如果发生变更的 partition 属于消费者组协调器 __consumer_offsets 或事务协调器 __transaction_state,
* 则执行下面的方法,来更新元数据及状态:
* a. groupCoordinator.onElection
* 1. 从磁盘(Log)中加载这个分区对应的 group 元数据和 offsets
* 2. 将数据保存在内存里(如 groupMetadataCache)
* 3. 然后开始处理客户端的 group join、heartbeat、commit offset 等请求
* b. groupCoordinator.onResignation
* 1. 清理内存缓存(groupMetadataCache)
* 2. 停止处理来自客户端的 group 请求
* 3. 释放相关锁与资源
* c. txnCoordinator.onElection
* 1. 从 __transaction_state 的日志文件读取历史事务状态(类似 group metadata 的加载)
* 2. 将 producerId -> 状态 等信息恢复到内存
* 3. 初始化协调器状态机,准备处理以下请求:InitProducerId\AddPartitionsToTxn\EndTxn
* 4. 确保事务语义(如 exactly-once, 幂等)继续保持一致性
* d. txnCoordinator.onResignation
* 1. 停止处理事务相关请求(防止双写或冲突)
* 2. 清理内存中加载的事务状态
* 3. 保证新 leader 可以独占性地接管协调事务工作
*/
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
······
}
}
······
stateChangeLogger.info(s"Finished LeaderAndIsr request in ${elapsedMs}ms correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
response
}
}
这一节我们重点关注 follower 是如何拉取数据的,所以接下来看一看 Kafka 是如何 makeFollowers 的。
4. makeFollowers:成为 follower
makeFollowers 方法通过以下步骤,使当前 Broker 成为指定分区的 follower:
- 从 leader 分区集合中移除这些分区。
- 将这些副本标记为 follower,以阻止生产者客户端继续写入数据。
- 停止这些分区的 fetcher(拉取线程),以防副本拉取线程继续添加数据。
- 截断这些分区的日志,并检查点(checkpoint)其偏移量。
- 清除 purgatory(暂存区)中的生产和拉取请求。
- 如果 broker 没有在关闭中状态,则添加 fetcher 向新 leader 拉取数据。
private def makeFollowers(controllerId: Int,
controllerEpoch: Int,
partitionStates: Map[Partition, LeaderAndIsrPartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors],
highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
partitionStates.forKeyValue { (partition, partitionState) =>
if (traceLoggingEnabled)
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionState.leader}")
responseMap.put(partition.topicPartition, Errors.NONE)
}
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
// TODO: Delete leaders from LeaderAndIsrRequest
partitionStates.forKeyValue { (partition, partitionState) =>
val newLeaderBrokerId = partitionState.leader
try {
// 从 controller 发过来的 metadata 缓存中拿到所有存活的 broker
// 根据 brokerId 匹配到新的 leader
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(_) =>
// 更新 leaderBrokerId、清空 ISR、查询zk上的配置并创建日志目录
if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " +
s"for partition ${partition.topicPartition} (last update " +
s"controller epoch ${partitionState.controllerEpoch}) " +
s"since the new leader $newLeaderBrokerId is the same as the old leader")
case None =>
// Controller 发送的 leaderAndIsr 告知新 leader 是 newLeaderBrokerId
// Controller 发送的 metadataCache 中找不到 newLeaderBrokerId
// 处理不了啦
//
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionState.controllerEpoch}) " +
s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
highWatermarkCheckpoints)
}
} catch {
case e: KafkaStorageException =>
······
responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
// 移除失效的 fetcherThread。
// 比如说原先是通过 fetcherA 向 brokerA 拉取数据,后来 brokerB 上的 replica 选举成功成为 leader了,
// 就要向 brokerB 拉取数据,此时需要删掉 fetcherA
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
// 尝试完成延时请求:
// 当一个分区从 leader 变为 follower 后,purgatory 中挂起的与之相关的
// produce/fetch 延时请求不会再被正常满足,只能一直等到超时,
// 所以这里对刚变为 follower 的分区,强制触发相关 delayed operations 的完成逻辑
partitionsToMakeFollower.foreach { partition =>
completeDelayedFetchOrProduceRequests(partition.topicPartition)
}
if (isShuttingDown.get()) {
if (traceLoggingEnabled) {
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
"since it is shutting down")
}
}
} else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
// 查找 leader 的基本信息,以便同步数据
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
.brokerEndPoint(config.interBrokerListenerName)
val fetchOffset = partition.localLogOrException.highWatermark
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
}.toMap
// 创建 log fetcher,同步数据
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
}
} catch {
case e: Throwable =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
s"received from controller $controllerId epoch $controllerEpoch", e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
if (traceLoggingEnabled)
partitionStates.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).leader}")
}
partitionsToMakeFollower
}
5. 创建 fetcher 以备拉取数据
在上面的代码中,我们看到了关键的一行:broker 在变为 follower 的时候会创建 data fetcher,这就是 follower 能够从 leader 拉取数据的原因。
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
下面是 addFetcherForPartitions 方法的实现,我们可以看出:
- 一个 Fetcher Thread 是根据 (brokerId, fetcherId) 来标识的。多个 Fetcher 线程可以连接到同一个 broker
- fetcherId 的目的是将 partition 分散到多个 fetcher 线程中,以实现负载均衡
- 同一个 fetcher 线程会拉取多个分区的数据,只要它们属于相同的 broker 且 fetcherId 相同。
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {
lock synchronized {
// 按 [leader_endpoint + fetcherId] 分组
// fetcherId = Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition)
// % numFetchersPerBroker
val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>
BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))
}
def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,
brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {
val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)
fetcherThread.start()
fetcherThread
}
for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
// 根据 [brokerId + fetcherId] 找到 fetcher thread
val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
// 如果 map 中找到一个 fetcher thread(目标 broker id & fetcherId 相同),目标 endpoint 也和本次请求相同,则复用这个 fetcher thread
currentFetcherThread
case Some(f) =>
// 如果 map 中找到一个 fetcher thread(目标 broker id & fetcherId 相同),但是目标 endpoint 和本次请求不同
// 则可能是原来的 broker 挂了,因此需要重新创建 fetcher thread
f.shutdown()
addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
case None =>
addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
}
// 初始化 fetch offset(设置为分区的 HW) 和 leaderEpoch
val initialOffsetAndEpochs = initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
tp -> OffsetAndEpoch(brokerAndInitOffset.initOffset, brokerAndInitOffset.currentLeaderEpoch)
}
// 向 fetcherThread 中添加 partition,以备从指定的 offset 拉取数据
addPartitionsToFetcherThread(fetcherThread, initialOffsetAndEpochs)
}
}
}
protected def addPartitionsToFetcherThread(fetcherThread: T,
initialOffsetAndEpochs: collection.Map[TopicPartition, OffsetAndEpoch]): Unit = {
fetcherThread.addPartitions(initialOffsetAndEpochs)
info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
}
那么,向 fetcher 中增加 partition 时,又是怎么处理的呢?
def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
partitionMapLock.lockInterruptibly()
try {
// 从 failedPartitions 中移除这部分 TopicPartition
failedPartitions.removeAll(initialFetchStates.keySet)
initialFetchStates.forKeyValue { (tp, initialFetchState) =>
// We can skip the truncation step iff the leader epoch matches the existing epoch
val currentState = partitionStates.stateValue(tp)
// initialFetchState.offset 取自 Log 对象 的 highWatermarkMetadata.messageOffset
val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
// 如果当前状态存在(之前发生过 fetch 或 truncate),
// 且 leader epoch 一致(说明未发生 leader 变更),则什么都不需要做
currentState
} else if (initialFetchState.offset < 0) {
// 如果 offset 非法,需从 leader 获取最新 offset 并执行日志截断;
// (可能由于某种初始状态或者异常状态,导致被设置为负值;比如创建一个全新的 log 目录的时候,offset 就设置为 -1
// 这个 case 可以参考 LogOffsetMetadata.UnknownOffsetMetadata 的代码)
fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch)
} else {
// 正常情况,构造一个状态为 Truncating 的 PartitionFetchState.
// (当前场景下,truncate 的位置(即 initialFetchState.offset)就是 Log 对象的 HW)
PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch, state = Truncating)
}
// 更新状态
partitionStates.updateAndMoveToEnd(tp, updatedState)
}
// 唤醒所有等待线程
//(例如:Fetcher 中没有任何状态 ok 的 partition 时,此时 fetcher 的主任务循环中,
// 会等待 replica.fetch.backoff.ms=1000 时间,等待 partition 被添加或者 partition 分区状态恢复正常,
// 以防循环跑飞了 )
partitionMapCond.signalAll()
initialFetchStates.keySet
} finally partitionMapLock.unlock()
}
这里其实存在 3 个分支:
- state 已存在且 leaderEpoch 和 LeaderAndIsr 请求中的 leaderEpoch 一致:则保持 state 不变
- offset 非法:重新向 leader 查询合适的 offset,并据此截断日志
- 其他情况:构造一个 truncating 状态的 state,设置 state 中的 leaderEpoch 为 LeaderAndIsr 请求中携带的最新值
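上面三个分支的选择逻辑,可以抽成一个纯函数来理解(decide_action 是为说明而虚构的名字,返回动作名而非真实的 PartitionFetchState):

```python
def decide_action(current_epoch, request_epoch, init_offset):
    """current_epoch: 本地已有状态的 leaderEpoch(None 表示无状态);
    request_epoch: LeaderAndIsr 请求携带的 leaderEpoch;
    init_offset: 初始 fetch offset(取自本地 HW)。"""
    if current_epoch is not None and current_epoch == request_epoch:
        return "keep-current-state"          # 未发生 leader 变更,什么都不做
    if init_offset < 0:
        return "fetch-offset-and-truncate"   # offset 非法,向 leader 查询并截断
    return "truncating"                      # 构造 Truncating 状态,等待截断

assert decide_action(5, 5, 100) == "keep-current-state"
assert decide_action(None, 5, -1) == "fetch-offset-and-truncate"
assert decide_action(4, 5, 100) == "truncating"
```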
truncating 状态的日志,后续会在 fetcherThread 线程中进行截断。这里我们看下 offset 非法的情况:
/**
* Handle a partition whose offset is out of range and return a new fetch offset.
*
* 处理拉取数据时,fetch offset 超出 leader 所保存的日志范围之外的情况。
*
* 查询 leader 的 log end offset,结合本地的 log end offset,
* 找到一个合理的 truncate offset
*
*/
protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = {
// 查询本地 ReplicaManager 中缓存的 topicPartition 的 log end offset
val replicaEndOffset = logEndOffset(topicPartition)
/**
* Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
* and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
* elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
* and it may discover that the current leader's end offset is behind its own end offset.
*
* Unclean leader election:
* 某个 follower 挂了,同时,leader 一直在写入消息。
* 然后 follower 又恢复了,但还没有追上 leader 的 log,此时如果 isr 中的副本都挂了(包括 leader)
* 那么这个 follower 会通过 unclean elect 成为 leader,然后开始接受 client 的数据。
* 此时,如果原 leader 又恢复了,成为了一个 follower,他就会发现新 leader 的 log end offset 比自己的小
*
* In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
*
* 这种情况下,将截断当前 follower(旧 leader) 的 log 到新 leader 的 log end offset,然后继续 fetching。
*
* There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
*
* 这种情况暂时不修
*
* 注意:unclean election 可以通过配置文件控制,默认是关闭的
*/
// 向 leader 发送请求(ApiKeys.LIST_OFFSETS),查询 log end offset
val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition, currentLeaderEpoch)
if (leaderEndOffset < replicaEndOffset) {
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's latest offset $leaderEndOffset")
// 截取日志
truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true))
fetcherLagStats.getAndMaybePut(topicPartition).lag = 0
// 截取完毕,构造一个 fetching 状态的 state,以备拉取数据
PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch, state = Fetching)
} else {
/**
* If the leader's log end offset is greater than the follower's log end offset, there are two possibilities:
* 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
* start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
* 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than
* the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts
* to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are
* produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
* the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
*
* 如果 leader log end offset >= follower log end offset,有 2 种情况:
* 1. follower 挂了。当挂了很长时间,然后恢复的时候,还会出现 follower 的 log end offset 会比 leader 的 log start offset
* 还小的情况。
* 2. unclean elect 时,旧 leader 的 hw > 新 leader 的 log end offset,此时旧 leader 会从 hw 截断,
* 当旧 leader 再次从 leader 拉取数据时,会报 OffsetOutOfRangeException。此时新 leader 则继续接受数据,
* 而旧 leader 会尝试处理这个异常,处理异常的过程中,会查询 leader 的 log end offset,最终就会触发这个场景。
*
* In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
* follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
* start offset.
*
* 第一种情况,follower 的 log end offset < leader 的 log start offset,follower 应该截掉所有的内容,
* 创建一个新的 segment,然后从 leader 的 log start offset 最开头拉取数据。
*
* In the second case, the follower should just keep the current log segments and retry the fetch. In the second
* case, there will be some inconsistency of data between old and new leader. We are not solving it here.
* If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
* brokers and producers.
*
* 第二种情况,follower 只需要继续 fetch 就可以了。但第二种情况下,可能会有数据不一致的问题,暂时不处理。
* 如果用户需要强一致性保证,需要同时在 broker 和 producer 进行设置。
*
* Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
* and the current leader's log start offset.
*
* 综上所述,follower 需要从 local log end offset 和 leader log start offset 中选择一个较大的值进行 fetch。
*/
val leaderStartOffset = fetchEarliestOffsetFromLeader(topicPartition, currentLeaderEpoch)
warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
s"leader's start offset $leaderStartOffset")
/**
* 如果 leaderStartOffset > replicaEndOffset: 说明 follower 数据太旧了,需要全部删除,创建一个新的 segment,再拉取数据
* 如果 leaderStartOffset < replicaEndOffset: 则直接从 replicaEndOffset 拉取数据就可以了
*/
val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
// follower 的数据太旧了,全部删掉,创建一个从 leaderStartOffset 开始的新 segment
if (leaderStartOffset > replicaEndOffset)
truncateFullyAndStartAt(topicPartition, leaderStartOffset)
val initialLag = leaderEndOffset - offsetToFetch
fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
}
}
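fetchOffsetAndTruncate 对 offset 的取舍,可以用一个纯函数概括(reconcile_fetch_offset 为示意而虚构,省略了真正的截断与网络请求):

```python
def reconcile_fetch_offset(replica_end, leader_end, leader_start):
    """根据本地 LEO、leader 的 LEO 和 log start offset,
    返回 (fetch_offset, 截断动作)。"""
    if leader_end < replica_end:
        # unclean leader election 场景:旧 leader 截断到新 leader 的 LEO
        return leader_end, "truncate-to-leader-end"
    offset = max(leader_start, replica_end)
    if leader_start > replica_end:
        # follower 数据太旧,全删并从 leaderStartOffset 重建 segment
        return offset, "truncate-fully-and-restart"
    # 直接从本地 LEO 继续拉取
    return offset, "no-truncation"

assert reconcile_fetch_offset(120, 100, 0) == (100, "truncate-to-leader-end")
assert reconcile_fetch_offset(50, 200, 80) == (80, "truncate-fully-and-restart")
assert reconcile_fetch_offset(150, 200, 80) == (150, "no-truncation")
```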
至此,我们就把待处理的 partition 添加到 fetcherThread 中了。接下来我们看一看 fetcherThread 是如何拉取数据的。
二、FetcherThread
1. PartitionFetchState
上文我们一直提到一个 state,它枚举出了两种状态:
sealed trait ReplicaState
// 当副本需要执行截断操作时,状态置为 Truncating
case object Truncating extends ReplicaState
// 当副本需要拉取数据时,状态置为 Fetching
case object Fetching extends ReplicaState
而分区的拉取状态 PartitionFetchState 的结构如下:
/**
* case class to keep partition offset and its state(truncatingLog, delayed)
* This represents a partition as being either:
* (1) Truncating its log, for example having recently become a follower
* (2) Delayed, for example due to an error, where we subsequently back off a bit
* (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
*
* 保存 partition offset 和 state(truncating 或者 delayed),共有三种状态:
* 1. Truncating 表示副本正在执行截断日志操作;比如最近刚成为 follower 的时候
* 2. Delayed 延迟状态;比如由于某种异常,导致了日志拉取延迟,落后于 leader
* 3. ReadyForFetch,表示可以拉取数据
*/
case class PartitionFetchState(fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
delay: Option[DelayedItem],
state: ReplicaState) {
// 副本处于 Fetching 状态且未被 delay 执行
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
// 副本处于 Truncating 状态且未被 delay 执行
def isTruncating: Boolean = state == Truncating && !isDelayed
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
/////////////////////////////////////////////////////////////////////////
// 副本在 ISR 中,没有 lag
def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
......
}
其核心还是 ReplicaState,又通过 lag、delay 等参数更加详细地标识了当前的拉取状态。
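用 Python 模拟一下这几个判断(delay_until 用绝对时间近似 DelayedItem,类名与字段仅为示意):

```python
import time

class PartitionFetchState:
    def __init__(self, state, lag=None, delay_until=None):
        self.state = state          # "Fetching" 或 "Truncating"
        self.lag = lag              # 落后 leader 的消息数
        self.delay_until = delay_until  # 延迟到某个时间点之后再处理

    def is_delayed(self):
        return self.delay_until is not None and self.delay_until > time.time()

    def is_ready_for_fetch(self):
        # Fetching 状态且未被 delay,才可以拉取数据
        return self.state == "Fetching" and not self.is_delayed()

    def is_truncating(self):
        return self.state == "Truncating" and not self.is_delayed()

    def is_replica_in_sync(self):
        # 在 ISR 中:lag 已知且不大于 0
        return self.lag is not None and self.lag <= 0

assert PartitionFetchState("Fetching", lag=0).is_ready_for_fetch()
assert not PartitionFetchState("Fetching", delay_until=time.time() + 60).is_ready_for_fetch()
assert PartitionFetchState("Truncating").is_truncating()
assert not PartitionFetchState("Fetching", lag=5).is_replica_in_sync()
```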
2. AbstractFetcherThread
上文 replicaFetcherManager 所创建的 ReplicaFetcherThread 继承自 AbstractFetcherThread;同样继承自这个类的还有 ReplicaAlterLogDirsThread,它用来处理日志目录迁移。
现在我们来看看 AbstractFetcherThread 的庐山真面目:
abstract class AbstractFetcherThread(name: String,
clientId: String, // 用于记录日志
val sourceBroker: BrokerEndPoint, // 数据源Broker地址
failedPartitions: FailedPartitions, // 处理过程中出现失败的分区
fetchBackOffMs: Int = 0, // 重试的间隔
isInterruptible: Boolean = true, // 线程是否允许被中断
val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManager
extends ShutdownableThread(name, isInterruptible) {
// 保存了 hw、LSO、log start offset 以及拉取的消息集合等信息
type FetchData = FetchResponse.PartitionData[Records]
// 保存了 leaderEpoch 信息
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
// PartitionStates 是 Kafka 自定义的一个集合类。用来保存了每个分区的 PartitionFetchState
// PartitionFetchState 对应下面三种:
// 1. isTruncatingLog
// 2. isDelayed
// 3. isReadyForFetch
// 其中还保存着 isReplicaInSync、是否lag、LeaderEpoch、副本的状态(Fetching,Truncating)等信息
private val partitionStates = new PartitionStates[PartitionFetchState]
......
// 线程启动之后,就会在死循环中执行这个方法
override def doWork(): Unit = {
maybeTruncate()
maybeFetch()
}
// 截取日志
private def maybeTruncate(): Unit = {
val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
if (partitionsWithEpochs.nonEmpty) {
truncateToEpochEndOffsets(partitionsWithEpochs)
}
if (partitionsWithoutEpochs.nonEmpty) {
truncateToHighWatermark(partitionsWithoutEpochs)
}
}
// 拉取日志
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
if (fetchRequestOpt.isEmpty) {
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequestOpt
}
fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
processFetchRequest(sessionPartitions, fetchRequest)
}
}
}
AbstractFetcherThread 继承自 ShutdownableThread,这个类我们之前也遇到过:它的 run 方法在循环中不停地调用 doWork()。也就是说,fetcher thread 运行时,就是在死循环中反复执行上面代码中的 doWork() 方法:
override def run(): Unit = {
isStarted = true
info("Starting")
try {
while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
shutdownInitiated.countDown()
shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
if (isRunning)
error("Error due to", e)
} finally {
shutdownComplete.countDown()
}
info("Stopped")
}
3. maybeTruncate()
上文中,我们向 fetcher 中新增 partition 时,会构造出一个 Truncating 状态的 state 加入到缓存中:
而在 maybeTruncate() 中,就会处理(截断)这些 truncating 状态的日志:
private def maybeTruncate(): Unit = {
// 从 partitionStates 取出 TopicPartition,据此找到 Log 类实例,
// 然后取出 leaderEpochCache 中最后一项(即最新的 leaderEpoch 对应的日志 index)
// 根据是否找到,将 TopicPartition 分为两组
val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
if (partitionsWithEpochs.nonEmpty) {
// 如果有 leaderEpoch 及对应的 offset,可以少截取一点
truncateToEpochEndOffsets(partitionsWithEpochs)
}
if (partitionsWithoutEpochs.nonEmpty) {
// 如果没有,则出于安全考虑,只能从所有 follower 都确认过的 hw 截取了
truncateToHighWatermark(partitionsWithoutEpochs)
}
}
分组的代码很简单:
private def fetchTruncatingPartitions(): (Map[TopicPartition, EpochData], Set[TopicPartition]) = inLock(partitionMapLock) {
val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
partitionStates.partitionStateMap.forEach { (tp, state) =>
if (state.isTruncating) {
latestEpoch(tp) match {
case Some(epoch) if isOffsetForLeaderEpochSupported =>
partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch)
case _ =>
partitionsWithoutEpochs += tp
}
}
}
(partitionsWithEpochs, partitionsWithoutEpochs)
}
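这段分组逻辑可以用 Python 模拟如下(group_truncating 为示意而虚构的函数,epoch 为 None 表示 leaderEpochCache 中查不到):

```python
def group_truncating(partitions, latest_epoch, epoch_request_supported=True):
    """partitions: {tp: state},state 为 "Truncating" 或 "Fetching";
    latest_epoch: {tp: epoch 或 None},模拟 leaderEpochCache 查询。
    返回 (有 epoch 的分区 -> epoch, 无 epoch 的分区集合)。"""
    with_epochs, without_epochs = {}, set()
    for tp, state in partitions.items():
        if state != "Truncating":
            continue  # 只处理待截断的分区
        epoch = latest_epoch.get(tp)
        if epoch is not None and epoch_request_supported:
            with_epochs[tp] = epoch      # 可走精确的 epoch 截断
        else:
            without_epochs.add(tp)       # 只能保守地截到 HW
    return with_epochs, without_epochs

parts = {"t-0": "Truncating", "t-1": "Truncating", "t-2": "Fetching"}
epochs = {"t-0": 7, "t-1": None}
with_e, without_e = group_truncating(parts, epochs)
assert with_e == {"t-0": 7}
assert without_e == {"t-1"}
```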
a. truncateToEpochEndOffsets()
/**
* - Build a leader epoch fetch based on partitions that are in the Truncating phase
* - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
* leader epoch. This is the offset the follower should truncate to ensure
* accurate log replication.
* - Finally truncate the logs for partitions in the truncating phase and mark the
* truncation complete. Do this within a lock to ensure no leadership changes can
* occur during truncation.
*
* - 针对 Truncating 状态的 partition,构建一个 api 请求用于查询 leader epoch
* - 发送 OffsetsForLeaderEpochRequest,从 leader 取回最新的 leaderEpoch 及对应的 log offset
* 这个 log offset 就是应该截取的位置
* - 最后,执行 truncate 操作,并标记为 truncation complete 状态。这部分操作需在 partitionMapLock 中执行,
* 以确保在此期间没有 leadership 变更。
*
*/
private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
// 从 fetch target(即 leader 副本)查询最新 leaderEpoch 对应的 log offset
val endOffsets = fetchEpochEndOffsets(latestEpochsForPartitions)
//Ensure we hold a lock during truncation.
inLock(partitionMapLock) {
//Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
//
// 外层从 partitionStates 中取的 topicPartition 来获取 latestEpoch,
// 没有加锁,到这里有可能已经不存在了/被更新了,所以需要再做一次过滤,确保
// 1. 存在
// 2. leaderEpoch 相同(领导权未发生过变更)
val epochEndOffsets = endOffsets.filter { case (tp, _) =>
val curPartitionState = partitionStates.stateValue(tp)
val partitionEpochRequest = latestEpochsForPartitions.getOrElse(tp, {
throw new IllegalStateException(
s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
})
val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
}
// 执行日志截断操作
val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
// 处理异常,通常是由于 leaderShip 变更导致的:
// 更新 partitionStates 中的状态,标记为 replica.fetch.backoff.ms 延时后再 fetch
handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
// 将成功 truncate 的 tp,状态设置为 Fetching,等待拉取数据
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
}
maybeTruncateToEpochEndOffsets 的源码如下:
private def maybeTruncateToEpochEndOffsets(fetchedEpochs: Map[TopicPartition, EpochEndOffset],
latestEpochsForPartitions: Map[TopicPartition, EpochData]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.HashSet.empty[TopicPartition]
fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
leaderEpochOffset.error match {
case Errors.NONE =>
// Returns truncation offset and whether this is the final offset to truncate to
//
// 处理每个 topic-partition 的日志截断。
// 返回要截断的 offset 以及是否为 最终 offset。
//
// For each topic partition, the offset to truncate to is calculated based on leader's returned
// epoch and offset:
//
// 对于每个 topic-partition,截断 offset 的计算依靠 leader 返回的 epoch 和 offset:
//
// -- If the leader replied with undefined epoch offset, we must use the high watermark. This can
// happen if 1) the leader is still using message format older than KAFKA_0_11_0; 2) the follower
// requested leader epoch < the first leader epoch known to the leader.
//
// -- 如果 leader 返回的 offset 是未定义(UNDEFINED_EPOCH_OFFSET),则必须使用 hw
// 这种情况可能出现在:
// 1)leader 仍在使用旧版本消息格式(早于 KAFKA_0_11_0);
// 2)follower 请求的 leader epoch 小于 leader 所知道的最早 epoch
//
// -- If the leader replied with the valid offset but undefined leader epoch, we truncate to
// leader's offset if it is lower than follower's Log End Offset. This may happen if the
// leader is on the inter-broker protocol version < KAFKA_2_0_IV0
//
// -- 如果 leader 返回了有效 offset 但未定义的 leaderEpoch(UNDEFINED_EPOCH),
// 如果该 offset 小于 follower 的日志末尾(Log End Offset),就截断到 leader 的 offset。
// 这种情况可能发生在 leader 使用低于 KAFKA_2_0_IV0 的协议版本
//
// -- If the leader replied with leader epoch not known to the follower, we truncate to the
// end offset of the largest epoch that is smaller than the epoch the leader replied with, and
// send OffsetsForLeaderEpochRequest with that leader epoch. In a more rare case, where the
// follower was not tracking epochs smaller than the epoch the leader replied with, we
// truncate the leader's offset (and do not send any more leader epoch requests).
//
// -- 如果 leader 返回的 epoch 对 follower 是未知的,
// 就截断到比该 epoch 小的最大已知 epoch 的日志末尾 offset,并重新发起 OffsetsForLeaderEpoch 请求。
// 如果 follower 甚至没有记录这些较小 epoch 的信息(很少见),
// 就直接截断到 leader 返回的 offset,并不再发起后续 epoch 请求。
//
// -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that
// leader replied with, follower's Log End Offset).
//
// -- 否则,就截断到以下三者中的最小值:
// leader 返回的 offset、follower 在该 epoch 上的末尾 offset、follower 的日志末尾 offset。
//
val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset)
// 截断日志
if (doTruncate(tp, offsetTruncationState))
// 截断成功,记录截取的位置
fetchOffsets.put(tp, offsetTruncationState)
......
case error =>
info(s"Retrying leaderEpoch request for partition $tp as the leader reported an error: $error")
partitionsWithError += tp
}
}
ResultWithPartitions(fetchOffsets, partitionsWithError)
}
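上面注释中的四条截断规则,可以用一段示意代码来梳理(truncationOffset、followerEndOffsetFor 等名字均为本文自拟,并非 getOffsetTruncationState 的真实实现;其中规则 3 被简化为直接截断到 leader offset,省略了重发 OffsetsForLeaderEpoch 请求的分支):

```scala
// 示意:根据 leader 返回的 (epoch, offset) 计算截断位置
// 两个常量与 Kafka 源码中 UNDEFINED_EPOCH / UNDEFINED_EPOCH_OFFSET 含义一致,但这里是自行定义的
val UNDEFINED_EPOCH = -1
val UNDEFINED_EPOCH_OFFSET = -1L

// followerEndOffsetFor: follower 本地对某个 epoch 记录的 end offset(没有记录时返回 None)
def truncationOffset(leaderEpoch: Int,
                     leaderOffset: Long,
                     highWatermark: Long,
                     logEndOffset: Long,
                     followerEndOffsetFor: Int => Option[Long]): Long = {
  if (leaderOffset == UNDEFINED_EPOCH_OFFSET)
    highWatermark                          // 规则 1:leader 未返回 offset,退回到 hw
  else if (leaderEpoch == UNDEFINED_EPOCH)
    math.min(leaderOffset, logEndOffset)   // 规则 2:offset 有效但 epoch 未定义
  else followerEndOffsetFor(leaderEpoch) match {
    case Some(endOffset) =>                // 规则 4:三者取最小
      math.min(math.min(leaderOffset, endOffset), logEndOffset)
    case None =>                           // 规则 3 的简化形式:follower 不认识该 epoch
      math.min(leaderOffset, logEndOffset)
  }
}
```

例如 leader 返回了未定义的 epoch offset 时,无论其它参数是什么,都会落到 hw 上。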
b. truncateToHighWatermark()
private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
// 外层默认赋值是 hw,所以这里是从 hw 开始截断
for (tp <- partitions) {
val partitionState = partitionStates.stateValue(tp)
if (partitionState != null) {
val highWatermark = partitionState.fetchOffset
val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
info(s"Truncating partition $tp to local high watermark $highWatermark")
// 执行日志截断
if (doTruncate(tp, truncationState))
fetchOffsets.put(tp, truncationState)
}
}
// 将成功 truncate 的 tp,状态设置为 Fetching,等待拉取数据
updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
c. 截断日志 *
截断日志,是 Log 模块的代码,我们曾经遇到过,但并没有细看。这里我们温习一下 Log 相关的内容,看看具体是怎么实现的:
def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean): Unit = {
val affectedLogs = ArrayBuffer.empty[Log]
for ((topicPartition, truncateOffset) <- partitionOffsets) {
val log = {
if (isFuture)
futureLogs.get(topicPartition)
else
currentLogs.get(topicPartition)
}
// If the log does not exist, skip it
if (log != null) {
// May need to abort and pause the cleaning of the log, and resume after truncation is done.
// Kafka 的 Log Cleaner 线程,用于清理压缩日志。
//
// 如果截断 offset 小于当前活跃 segment 的起始 offset,
// 说明 Cleaner 有可能在处理将被截断的 segment,
// 需要通知 Log Cleaner 先暂停处理这个 log,避免出现一致性问题。
val needToStopCleaner = truncateOffset < log.activeSegment.baseOffset
if (needToStopCleaner && !isFuture)
abortAndPauseCleaning(topicPartition)
try {
if (log.truncateTo(truncateOffset))
affectedLogs += log
// 如果截断位置在 active segment 前,意味着之前的 checkpoint 也可能过时,必须一并更新。
if (needToStopCleaner && !isFuture)
maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)
} finally {
if (needToStopCleaner && !isFuture) {
// 恢复 Log Cleaner
resumeCleaning(topicPartition)
}
}
}
}
// 更新 log 的 recovery point
// 清理日志压缩的 snapshot
for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
checkpointRecoveryOffsetsAndCleanSnapshotsInDir(dir, logs)
}
}
下面是 Log 类中真正的截取方法:
private[kafka] def truncateTo(targetOffset: Long): Boolean = {
maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") {
if (targetOffset < 0)
throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset))
if (targetOffset >= logEndOffset) {
info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}")
// Always truncate epoch cache since we may have a conflicting epoch entry at the
// end of the log from the leader. This could happen if this broker was a leader
// and inserted the first start offset entry, but then failed to append any entries
// before another leader was elected.
lock synchronized {
leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
}
false
} else {
info(s"Truncating to offset $targetOffset")
lock synchronized {
checkIfMemoryMappedBufferClosed()
if (segments.firstEntry.getValue.baseOffset > targetOffset) {
// 截掉所有的 segments,从 targetOffset 重新开始:
// 1. 从 segment list 中移除所有的 segment
// 为日志和索引文件增加 .deleted 后缀,等待 file.delete.delay.ms 时间后物理删除
// 2. 创建新的 segment
// 更新 leaderEpochCache
// 清空 producerStateManager
// 3. 更新 logStartOffset、nextOffsetMetadata、recoveryPoint、HW
// 更新 producerState
truncateFullyAndStartAt(targetOffset)
} else {
// 在现有的 segments 范围内截取
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
// 从 segment list 中移除这部分 segment
// 为日志和索引文件增加 .deleted 后缀,等待 file.delete.delay.ms 时间后物理删除
removeAndDeleteSegments(deletable, asyncDelete = true, LogTruncation)
// 截断索引,截断日志
activeSegment.truncateTo(targetOffset)
// 更新 leaderEpochCache
leaderEpochCache.foreach(_.truncateFromEnd(targetOffset))
// 更新 logStartOffset、nextOffsetMetadata、recoveryPoint、HW、producerState
completeTruncation(
startOffset = math.min(targetOffset, logStartOffset),
endOffset = targetOffset
)
}
true
}
}
}
}
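两个分支的选择逻辑可以用一个小例子来理解:给定各 segment 的 baseOffset,若 targetOffset 比最小的 baseOffset 还小,就整体删除重建;否则删掉所有 baseOffset 大于 targetOffset 的 segment,再对 active segment 内部截断。(示意代码,planTruncate 及两个 case class 均为本文自拟,并非 Log 的真实数据结构):

```scala
// 示意:根据 targetOffset 决定走哪个 truncate 分支
sealed trait TruncateAction
case class TruncateFullyAndStartAt(offset: Long) extends TruncateAction          // 对应 truncateFullyAndStartAt
case class RemoveSegmentsThenTruncate(deletedBaseOffsets: Seq[Long]) extends TruncateAction

// segmentBaseOffsets 按升序给出且非空,等价于 Log 中 segments 的 key 集合
def planTruncate(segmentBaseOffsets: Seq[Long], targetOffset: Long): TruncateAction =
  if (segmentBaseOffsets.head > targetOffset)
    TruncateFullyAndStartAt(targetOffset)
  else
    RemoveSegmentsThenTruncate(segmentBaseOffsets.filter(_ > targetOffset))
```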
d. 更新 state
对于截断成功的 TopicPartition,需要将状态从 truncating 更新到 fetching,以备后续拉取数据
/**
* Loop through all partitions, updating their fetch offset and maybe marking them as
* truncation completed if their offsetTruncationState indicates truncation completed
*
* 遍历所有 partitionStates,如果是刚刚处理过的(在 fetchOffsets 中)
* 则检查它的 offsetTruncationState.truncationCompleted 字段,
* 如果 True,表示截断完成,则更新状态为 Fetching
* 如果 False,则更新状态为 Truncating
*
* @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
*/
private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
.map { case (topicPartition, currentFetchState) =>
val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
// 存在于 fetchOffsets 中(即刚刚执行过 truncate 流程的)
case Some(offsetTruncationState) =>
// 将 truncate 成功的 tp 的状态更新为 Fetching,表示可以正常拉取数据了
val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag,
currentFetchState.currentLeaderEpoch, currentFetchState.delay, state)
// 如果不是刚刚处理过的,则维持状态不变
case None => currentFetchState
}
(topicPartition, maybeTruncationComplete)
}
partitionStates.set(newStates.asJava)
}
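这个状态迁移本身很简单,可以抽成一个示意函数(nextState 为本文自拟的名字,仅用于说明迁移规则):

```scala
// 示意:updateFetchOffsetAndMaybeMarkTruncationComplete 中的状态迁移规则
sealed trait ReplicaState
case object Truncating extends ReplicaState
case object Fetching extends ReplicaState

// truncationCompleted: 本轮处理过的 tp 带 Some(是否截断完成);未处理的为 None
def nextState(truncationCompleted: Option[Boolean], current: ReplicaState): ReplicaState =
  truncationCompleted match {
    case Some(true)  => Fetching    // 截断完成,可以拉取数据
    case Some(false) => Truncating  // 还需继续截断
    case None        => current     // 本轮未处理,维持原状态
  }
```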
4. maybeFetch()
经过 maybeTruncate(),日志已被截断到正确的位置,这时候就可以从 leader 拉取日志了:
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
// 检查所有的 partition state,
// 筛掉不是 fetching 状态的 或者 被限流的条目,
// 构造请求,并返回 offline 状态的分区(partitionsWithError)
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
// 为 state 中增加一个 delay 值(来自配置 replica.fetch.backoff.ms 默认值 100),
// 表示延迟 delay ms 后再重新尝试
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
if (fetchRequestOpt.isEmpty) {
// 没有任何 fetch 任务时,等待一段时间(这个时间和上面的 delay 值取自同一个参数)
trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequestOpt
}
fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
// 发送并处理 fetch 请求
processFetchRequest(sessionPartitions, fetchRequest)
}
}
从 PartitionFetchState 的源码中可以看到,延迟重试其实就是给 state 设置一个未来的时间点 DelayedItem;而判断延迟任务是否到期,就是看这个时间点是否已经早于当前时间:
case class PartitionFetchState(fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
delay: Option[DelayedItem],
state: ReplicaState) {
......
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
......
}
class DelayedItem(val delayMs: Long) extends Delayed with Logging {
private val dueMs = Time.SYSTEM.milliseconds + delayMs
def getDelay(unit: TimeUnit): Long = {
unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
}
}
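这种"延迟即未来时间点"的判断方式,可以用一个可注入时钟的最小实现来验证(SimpleDelayedItem 为本文自拟,与 DelayedItem 逻辑等价,只是把 Time.SYSTEM 换成了传入的时钟以便测试):

```scala
// 示意:与 DelayedItem 等价的最小实现
class SimpleDelayedItem(delayMs: Long, nowMs: () => Long) {
  private val dueMs = nowMs() + delayMs                    // 到期的绝对时间点
  def getDelayMs: Long = math.max(dueMs - nowMs(), 0L)     // 剩余延迟,最小为 0
  def isDelayed: Boolean = getDelayMs > 0                  // 未到期即视为延迟中
}
```

用法示例:

```scala
var clock = 1000L
val item = new SimpleDelayedItem(100L, () => clock)
// 此时 isDelayed == true;把 clock 推进 100ms 后,isDelayed 变为 false
```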
下面是处理 leader response 的代码:
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
var responseData: Map[TopicPartition, FetchData] = Map.empty
try {
// 以 ApiKeys.FETCH 发送请求,拉取数据
trace(s"Sending fetch request $fetchRequest")
responseData = fetchFromLeader(fetchRequest)
} catch {
......
}
......
if (responseData.nonEmpty) {
inLock(partitionMapLock) {
responseData.forKeyValue { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
//
// 只处理 partition fetch state == Fetching 且 offset 与预期一致的 TopicPartition
// 找到 request param
val fetchPartitionData = sessionPartitions.get(topicPartition)
// 1. 是本次请求的 partition
// 2. 请求中的 offset 和 partition state 中的 fetch offset 一致(即,在请求期间,partition state 没有更新过)
// 3. state == Fetching && delayTime == 0
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
// request leader epoch
val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
partitionData.error match {
case Errors.NONE =>
try {
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
// 一旦把数据传递给 process 流程,本线程就不能再修改这些数据了
// 将数据 append 到 log 文件的最后,其中就调用了日志模块的 Log.append() 方法。
val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
partitionData)
logAppendInfoOpt.foreach { logAppendInfo =>
// 更新 follower 的落后值 lag
val validBytes = logAppendInfo.validBytes
val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
......
}
} catch {
case ime@( _: CorruptRecordException | _: InvalidRecordException) =>
// 消息 CRC 校验失败 或 消息格式非法
// we log the error and continue. This ensures two things
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
// down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
// can cause this), we simply continue and should get fixed in the subsequent fetches
error(s"Found invalid messages during fetch for partition $topicPartition " +
s"offset ${currentFetchState.fetchOffset}", ime)
partitionsWithError += topicPartition
case e: KafkaStorageException =>
// 副本离线、IO 异常等情况
error(s"Error while processing data for partition $topicPartition " +
s"at offset ${currentFetchState.fetchOffset}", e)
markPartitionFailed(topicPartition)
case t: Throwable =>
// stop monitoring this partition and add it to the set of failed partitions
error(s"Unexpected error occurred while processing data for partition $topicPartition " +
s"at offset ${currentFetchState.fetchOffset}", t)
markPartitionFailed(topicPartition)
}
case Errors.OFFSET_OUT_OF_RANGE =>
// follower 请求的 offset 不在 leader 的日志范围内
if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
partitionsWithError += topicPartition
case Errors.UNKNOWN_LEADER_EPOCH =>
// follower 请求中携带的 leader epoch 大于 leader 端当前的 epoch
debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
partitionsWithError += topicPartition
case Errors.FENCED_LEADER_EPOCH =>
// follower 请求中携带的 leader epoch 小于 leader 端当前的 epoch
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
case Errors.NOT_LEADER_OR_FOLLOWER =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
"that the partition is being moved")
partitionsWithError += topicPartition
case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
warn(s"Received ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " +
"This error may be returned transiently when the partition is being created or deleted, but it is not " +
"expected to persist.")
partitionsWithError += topicPartition
case _ =>
error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
partitionData.error.exception)
partitionsWithError += topicPartition
}
}
}
}
}
}
// 处理失败的分区(为 state 中增加一个 delay值,延迟一段时间后重试)
if (partitionsWithError.nonEmpty) {
handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
}
}
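上面更新 lag 的那几行逻辑很典型:下一条要拉取的 offset 与 leader hw 之间的差值就是落后量(followerLag 为本文自拟的名字,逻辑摘自 processFetchRequest):

```scala
// 示意:follower lag 的计算
// lastAppendedOffset: 本次 append 成功写入的最后一条消息的 offset(validBytes == 0 时为 None)
def followerLag(highWatermark: Long,
                lastAppendedOffset: Option[Long],
                currentFetchOffset: Long): Long = {
  val nextOffset = lastAppendedOffset.map(_ + 1).getOrElse(currentFetchOffset)
  math.max(0L, highWatermark - nextOffset)  // 已追平或超过 hw 时 lag 记为 0
}
```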
对于正常情况,kafka 会将数据 append 到 log 文件中(append 的具体实现我们在 Kafka源码·二 - Log 这一节已经读过啦):
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
val logTrace = isTraceEnabled
val partition = replicaMgr.nonOfflinePartition(topicPartition).get
val log = partition.localLogOrException
val records = toMemoryRecords(partitionData.records)
maybeWarnIfOversizedRecords(records, topicPartition)
// 确保已经 truncate 到正确的位置
if (fetchOffset != log.logEndOffset)
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, log.logEndOffset))
if (logTrace)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
// 将 leader 的数据 append 到本地 log 文件中
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
if (logTrace)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(log.logEndOffset, records.sizeInBytes, topicPartition))
val leaderLogStartOffset = partitionData.logStartOffset
// leader 会在 follower 拉取数据的同时发送 hw 值,以便 follower 更新本地 hw
val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
// leader 会在 follower 拉取数据的同时发送 log start offset 值,以便 follower 更新本地 log start offset
log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
if (logTrace)
trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
// For the follower replica, we do not need to keep its segment base offset and physical position.
// These values will be computed upon becoming leader or handling a preferred read replica fetch.
//
// 对于 follower replica,我们不需要保留其 segment base offset 和物理位置。
// 这些值将在成为 leader 时或处理 preferred read replica 请求时计算。
......
// 更新 metric
if (partition.isReassigning && partition.isAddingLocalReplica)
brokerTopicStats.updateReassignmentBytesIn(records.sizeInBytes)
brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
logAppendInfo
}
至此,一次拉取数据任务就完成了。此后,FetcherThread 会不断在循环中重复下面的任务,以保证主从数据的一致性:
override def doWork(): Unit = {
maybeTruncate()
maybeFetch()
}