
Kafka Source Code (2): Log

October 28, 2024

In the previous article we went through LogSegment. Today let's look at how Kafka's Log object is put together. Before diving in, there are a few things worth understanding first:

I. The High Watermark

1. What the high watermark means

In Kafka, the HW (high watermark) serves two main purposes:

  1. It defines message visibility: it marks which messages in a partition can be consumed by consumers
  2. It helps drive replica synchronization

Messages below the partition's high watermark are considered committed; everything at or above it is uncommitted. Consumers can only consume committed messages, that is, messages below the high watermark. For example, if the current high watermark is 8, messages with offsets 0 through 7 can be consumed, while messages at offset 8 and beyond cannot.

Note that this is the simplified model that ignores transactions. Transactions affect the range of messages a consumer can see: for transactional consumers, log visibility is determined by the LSO (Log Stable Offset). That is not today's topic, so we'll set it aside.

There is one more key value, the LEO (Log End Offset), which is the offset of the next message to be written to the partition. From this we can conclude that messages between the high watermark and the LEO are uncommitted, which also means that for a partition's Log object, the high watermark is always ≤ the LEO.

2. How the high watermark is updated

In Kafka, every replica of a partition, leader or follower, keeps its own HW and LEO. The partition's high watermark is simply the leader replica's high watermark, and in addition the broker hosting the leader replica keeps a copy of every follower's LEO (the remote replicas).

Here is when each value gets updated (say the leader lives on Broker 0 and a follower on Broker 1):

Broker 1, follower LEO: updated after the follower fetches messages from the leader and writes them to its local disk
Broker 1, follower HW: after updating its LEO, the follower compares that LEO with the leader HW carried in the leader's response and takes the smaller of the two as its own HW
Broker 0, leader LEO: updated after the leader receives messages from a producer and writes them to its local disk
Broker 0, follower (remote replica) LEO: when a follower fetches, it tells the leader which offset to fetch from, and the leader uses that offset to update its copy of the remote replica's LEO
Broker 0, leader HW: updated at two points, after the leader updates its own LEO and after it updates a remote replica's LEO; the new value is the minimum of the LEOs of the leader and of all remote replicas that are in sync with it
Broker 0, follower HW: never updated on the leader's side

Now let's walk through the HW and LEO update flow from the leader replica's point of view and from the follower replica's point of view.

On the leader replica

The logic for handling a produce request is:

  1. Write the messages to the local log
  2. Update the leader replica's LEO
  3. Update the partition's high watermark:
    1. Collect the LEOs of all remote replicas kept on the leader's broker (LEO-1, LEO-2 ... LEO-n)
    2. Read the leader replica's current high watermark, currentHW
    3. Set currentHW = max(currentHW, min(LEO-1, LEO-2 ... LEO-n))

The logic for handling a follower fetch request is:

  1. Use the fetch offset carried in the follower's request to update that remote replica's LEO
  2. Update the partition's high watermark (same steps as for a produce request; a small sketch of the rule follows this list):
    1. Collect the LEOs of all remote replicas kept on the leader's broker (LEO-1, LEO-2 ... LEO-n)
    2. Read the leader replica's current high watermark, currentHW
    3. Set currentHW = max(currentHW, min(LEO-1, LEO-2 ... LEO-n))
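This max/min rule is used in both flows above. A minimal, self-contained Scala sketch of it (the names updateLeaderHW, leaderLEO and remoteLEOs are hypothetical; this is not Kafka source):

  object LeaderHighWatermark {
    // currentHW = max(currentHW, min(leader LEO, LEOs of the in-sync remote replicas))
    def updateLeaderHW(currentHW: Long, leaderLEO: Long, remoteLEOs: Seq[Long]): Long =
      math.max(currentHW, (leaderLEO +: remoteLEOs).min)

    def main(args: Array[String]): Unit = {
      // The leader has written up to offset 15; its two followers have fetched up to 12 and 10.
      println(updateLeaderHW(currentHW = 10L, leaderLEO = 15L, remoteLEOs = Seq(12L, 10L))) // 10: the slow follower holds the HW back
      // Once the slower follower also reaches 12, the high watermark can advance to 12.
      println(updateLeaderHW(currentHW = 10L, leaderLEO = 15L, remoteLEOs = Seq(12L, 12L))) // 12
    }
  }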

There is a subtle problem here: the offset in a follower's fetch request (fetchOffset) means "start fetching from fetchOffset", so the leader only learns the LEO that results from this fetch on the follower's next fetch request. There is a one-round lag, and this lag is the root cause of many "data loss" and "data inconsistency" problems. It is solved by something called the Leader Epoch, which the next section covers in detail.

On the follower replica

The logic for fetching messages from the leader is:

  1. Write the fetched messages to the local log
  2. Update the LEO
  3. Update the high watermark (sketched below):
    1. Take the high watermark sent by the leader: currentHW
    2. Take the LEO updated in step 2: currentLEO
    3. Set the high watermark to min(currentHW, currentLEO)
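The follower side is even simpler. A minimal sketch of step 3 (again with hypothetical names, not Kafka source):

  object FollowerHighWatermark {
    // After appending the fetched batch, the follower takes min(HW sent by the leader, its own LEO)
    def updateFollowerHW(leaderHW: Long, currentLEO: Long): Long = math.min(leaderHW, currentLEO)

    def main(args: Array[String]): Unit = {
      println(updateFollowerHW(leaderHW = 10L, currentLEO = 8L))  // 8: still behind, so the follower's own LEO caps its HW
      println(updateFollowerHW(leaderHW = 10L, currentLEO = 12L)) // 10: once caught up, the leader's HW wins
    }
  }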

II. Leader Epoch

As we saw above, a follower replica's high watermark needs an extra round of fetch requests before it catches up. Extend that example to multiple follower replicas and things can get worse, possibly taking several rounds. In other words, the leader replica's high watermark update and the follower replicas' updates are misaligned in time, and this misalignment is at the root of many "data loss" and "data inconsistency" problems. To address the inconsistencies it causes, the community formally introduced the Leader Epoch in version 0.11.

A Leader Epoch can roughly be thought of as a version of the leader. It consists of two pieces of data:

  1. Epoch: a monotonically increasing version number. The epoch is bumped every time partition leadership changes. A leader with a smaller epoch is considered stale and may no longer exercise leader powers.
  2. Start offset: the offset of the first message written by the leader within that epoch

An example: suppose there are two leader epochs, <0, 0> and <1, 120>. The first says that the leader with epoch 0 started saving messages at offset 0 and saved 120 messages in total. Then the leadership changed, the epoch was bumped to 1, and the new leader's start offset is 120.

Every Kafka broker caches the Leader Epoch data of each partition in memory and also periodically persists it to a checkpoint file. When the leader replica writes messages to disk, the broker tries to update this cache. On every leadership change, the new leader replica consults the cache and reads out the start offset of the relevant epoch, which is what avoids data loss and inconsistency.
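To get a feel for how the cached mapping is consulted, here is a hedged sketch of the lookup behind the OffsetsForLeaderEpoch request: the end offset of epoch e is the start offset of the first epoch greater than e, or the log end offset if e is still the current epoch. The names below are made up for illustration; this is not the Kafka implementation:

  import scala.collection.immutable.TreeMap

  object EpochEndOffsetLookup {
    // Stand-in for the broker's in-memory cache: epoch -> start offset of that epoch
    val epochCache: TreeMap[Int, Long] = TreeMap(0 -> 0L, 1 -> 120L)

    def endOffsetFor(epoch: Int, logEndOffset: Long): Long =
      epochCache.minAfter(epoch + 1).map(_._2).getOrElse(logEndOffset)

    def main(args: Array[String]): Unit = {
      println(endOffsetFor(0, logEndOffset = 200L)) // 120: epoch 0 covers offsets [0, 120)
      println(endOffsetFor(1, logEndOffset = 200L)) // 200: epoch 1 is still the current epoch
    }
  }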

In the source of the Formatter object inside LeaderEpochCheckpointFile we can see the toLine and fromLine methods used for writing and reading entries:

  object Formatter extends CheckpointFileFormatter[EpochEntry] {

    // Used when writing an entry out to the checkpoint file
    override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"

    // Used to rebuild an EpochEntry from a line read back from the file
    override def fromLine(line: String): Option[EpochEntry] = {
      WhiteSpacesPattern.split(line) match {
        case Array(epoch, offset) =>
          Some(EpochEntry(epoch.toInt, offset.toLong))
        case _ => None
      }
    }
  }
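Putting toLine together with the two epochs from the earlier example (<0, 0> and <1, 120>), and assuming the usual checkpoint layout of a version line followed by an entry-count line (the same shape as the log-start-offset-checkpoint file shown further down), the leader-epoch-checkpoint file would look roughly like this:

  0
  2
  0 0
  1 120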

III. The Log class source code

Let's start with the declaration of the Log class:

/**
 * An append-only log for storing messages.
 *
 * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
 *
 * New log segments are created according to a configurable policy that controls the size in bytes or time interval
 * for a given segment.
 *
 * @param _dir The directory in which log segments are created.
 * @param config The log configuration settings
 * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
 *                       The logStartOffset can be updated by :
 *                       - user's DeleteRecordsRequest
 *                       - broker's log retention
 *                       - broker's log truncation
 *                       The logStartOffset is used to decide the following:
 *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
 *                         It may trigger log rolling if the active segment is deleted.
 *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
 *                         we make sure that logStartOffset <= log's highWatermark
 *                       Other activities such as log cleaning are not affected by logStartOffset.
 *                       
 *                       The offset of the first message in the log. It is a volatile field and may be updated from multiple threads. It can change because of:
 *                       1. a user deleting records
 *                       2. the broker's log retention (Kafka supports retention policies based on time, log size and offset)
 *                       3. the broker's log truncation
 *
 *                       logStartOffset is used for:
 *                       - Log deletion: any segment whose nextOffset <= Log.logStartOffset can be deleted
 *                       - The earliest offset returned for a ListOffsetRequest. To avoid an OffsetOutOfRange exception when a user seeks back to the earliest offset,
 *                         Kafka makes sure that logStartOffset <= highWatermark
 *
 *                       
 * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
 * @param scheduler The thread pool scheduler used for background actions
 * @param brokerTopicStats Container for Broker Topic Yammer Metrics
 * @param time The time instance used for checking the clock
 * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
 * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
 */
@threadsafe
class Log(@volatile private var _dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,               // the partition this log belongs to
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {

  /* A lock that guards all modifications to the log */
  /* Every modification to the log is guarded by lock.synchronized */
  private val lock = new Object

  // Offset metadata of the next message to be appended, including:
  // 1. its offset
  // 2. the base offset of the segment it belongs to
  // 3. the physical position within that segment's log file
  @volatile private var nextOffsetMetadata: LogOffsetMetadata = _

    /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
   * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
   * equals the log end offset (which may never happen for a partition under consistent load). This is needed to
   * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
   */
   // Holds the current HW. Kafka makes sure that data in segments with offsets ≥ the HW is never deleted
   // (i.e. it never deletes a segment that still contains uncommitted messages).
   // This means an active segment can only be deleted once its HW equals the LEO
   // (with data continuously flowing into the active segment, the HW is always chasing the LEO, so this condition is effectively never met).
   // The mechanism is necessary to keep the log start offset from getting ahead of the HW, which would make consumers skip part of the data
  @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

  /* the actual segments of the log */
  // Every segment of this Log lives in this concurrent map
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

  // Holds the leader epoch entries, i.e. which leader took over starting from which offset
  @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
}

Initializing the Log class

With these core fields in mind, let's look at the initialization logic of the Log class:

locally {

    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)

    // Initialize the leaderEpochCache field above. This involves:
    // 1. reading the EpochEntry(epoch, startOffset) records from the leader-epoch-checkpoint file
    //    into a TreeMap whose key is the epoch and whose value is the EpochEntry
    // 2. the topic/partition information
    initializeLeaderEpochCache()

    // Load the segment files from disk into LogSegment objects,
    // put them into the segments map (ConcurrentNavigableMap) above, and return nextOffset
    val nextOffset = loadSegments()

    /* Calculate the offset of the next message */
    // Build the offset metadata of the next message to be appended
    nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

    // Drop any leader epoch entries whose offset is >= nextOffset, to keep the cache consistent
    leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

    /**
     * Where logStartOffset comes from: the log-start-offset-checkpoint file in the log directory.
     * When Kafka loads the log at startup, it turns the file's content into a <partition, Long> map.
     * The file is structured as follows:
     * -----checkpoint file begin------
     * 0                <- OffsetCheckpointFile.currentVersion
     * 2                <- number of entries that follow
     * tp1  par1  1     <- the format is: TOPIC  PARTITION  OFFSET
     * tp1  par2  2
     * -----checkpoint file end----------
     */
    // Sanity check: take the larger of the checkpointed value and the base offset of the first actual segment
    updateLogStartOffset(math.max(logStartOffset, segments.firstEntry.getValue.baseOffset))

    // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
    // Sanity check: remove stale leaderEpoch entries (those with offset < logStartOffset),
    // then clamp the earliest remaining entry's start offset to logStartOffset
    leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))

    // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
    // from scratch.
    // Segment loading and recovery must not use producerStateManager; the full producer state is rebuilt from scratch below
    if (!producerStateManager.isEmpty)
      throw new IllegalStateException("Producer state must be empty during log initialization")

    // hasCleanShutdownFile: if this file exists, the broker was shut down cleanly.
    // ProducerStateManager maps each producer id to the metadata of its most recent appends (epoch, sequence number, last offset, ...).
    // As long as a producer id stays in this map, that producer can keep writing.
    // The id only expires if the producer stops writing for a long time, or all of its messages are deleted (retention policy = delete).
    // It is off the main path for now, so we won't dig into it here.
    loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
  }
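The log-start-offset-checkpoint layout described in the comment above is simple enough to parse by hand. A throwaway sketch (hypothetical names, not the Kafka reader) that turns the entry lines into a <topic, partition> -> offset map:

  object StartOffsetCheckpointExample {
    def main(args: Array[String]): Unit = {
      // The lines that follow the version line and the entry-count line, as in the comment above
      val entries = Seq("tp1  par1  1", "tp1  par2  2")

      val startOffsets: Map[(String, String), Long] = entries.map { line =>
        val parts = line.split("\\s+")               // TOPIC  PARTITION  OFFSET
        (parts(0), parts(1)) -> parts(2).toLong
      }.toMap

      println(startOffsets) // Map((tp1,par1) -> 1, (tp1,par2) -> 2)
    }
  }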

Next, let's look at a few of the key steps in it.

Initializing the LeaderEpochCache

  private def initializeLeaderEpochCache(): Unit = lock synchronized {

    // Create the handle for the leader-epoch-checkpoint file
    val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

    // Read the EpochEntry(epoch, startOffset) records from the file into a TreeMap keyed by epoch, with the EpochEntry as value
    def newLeaderEpochFileCache(): LeaderEpochFileCache = {
      // LogDirFailureChannel is how Kafka tracks and handles failed log directories. When the broker hits an
      // I/O error (e.g. an IOException) while doing disk I/O, the offending log directory is added to the
      // LogDirFailureChannel and marked offline; once a directory is offline, the relevant threads are notified to handle the failure.
      // An offline log directory stays offline until the broker is restarted
      val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
      new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
    }

    // record version, magic and message format version all refer to the same thing: the version of Kafka's record format
    if (recordVersion.precedes(RecordVersion.V2)) {

      // Older message format: the leader-epoch-checkpoint file is simply deleted
      val currentCache = if (leaderEpochFile.exists())
        Some(newLeaderEpochFileCache())
      else
        None

      if (currentCache.exists(_.nonEmpty))
        warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")

      Files.deleteIfExists(leaderEpochFile.toPath)
      leaderEpochCache = None
    } else {
      leaderEpochCache = Some(newLeaderEpochFileCache())
    }
  }

Load segments

  private def loadSegments(): Long = {
    // first do a pass through the files in the log directory and remove any temporary files
    // and find any interrupted swap operations
    // Clean up .delete files: log and index files that were marked for deletion
    // Clean up .cleaned files: a crash during log compaction leaves their contents in an unknown state, so they are removed
    // Clean up all unusable .swap files: KAFKA-6264
    // Return the usable .swap files: those left over from a crash while the original files were being replaced at the end of log compaction; all that remains is to redo the swap
    val swapFiles = removeTempFilesAndCollectSwapFiles()

    // Now do a second pass and load all the log and index files.
    // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
    // this happens, restart loading segment files from scratch.
    // Load the log and index files.
    // We might run into legacy segments with offset overflow (KAFKA-6264); such segments must be split, and when that happens segment loading restarts from scratch.
    retryOnOffsetOverflow {
      // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
      // loading of segments. In that case, we also need to close all segments that could have been left open in previous
      // call to loadSegmentFiles().
      // If a segment with offset overflow is found, a LogSegmentOffsetOverflowException is thrown and the retry logic splits the segment, after which loading is retried.
      // In that case we also need to close all segments that may have been left open by the previous call to loadSegmentFiles().
      logSegments.foreach(_.close())
      segments.clear()
      loadSegmentFiles()
    }

    // Finally, complete any interrupted swap operations. To be crash-safe,
    // log files that are replaced by the swap segment should be renamed to .deleted
    // before the swap file is restored as the new segment file.
    // Finally, complete any interrupted swap operations.
    // To be crash-safe, the log files being replaced by a swap segment must be renamed to .deleted before the swap file is promoted to the new segment file.
    completeSwapOperations(swapFiles)

    if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {

      // If this log directory is not marked for deletion, run recoverLog
      val nextOffset = retryOnOffsetOverflow {
        // Rebuild all index files from the log files, and chop off any invalid bytes at the end of the log and index files
        recoverLog()
      }

      // reset the index size of the currently active log segment to allow more entries
      activeSegment.resizeIndexes(config.maxIndexSize)
      nextOffset
    } else {
       // Otherwise (the directory is marked for deletion): if there are no segments at all, create an empty one
       if (logSegments.isEmpty) {
          addSegment(LogSegment.open(dir = dir,
            baseOffset = 0,
            config,
            time = time,
            fileAlreadyExists = false,
            initFileSize = this.initFileSize,
            preallocate = false))
       }
      0
    }
  }

Now let's look at the core loadSegmentFiles() method:

  private def loadSegmentFiles(): Unit = {
    // load segments in ascending order because transactional data from one segment may depend on the
    // segments that come before it
    // Transactional data in one segment may depend on the segments before it, so segments are loaded in ascending order of base offset
    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
      if (isIndexFile(file)) {
        // if it is an index file, make sure it has a corresponding .log file

        // Make sure the index file has a corresponding log file (same name, i.e. same base offset)
        val offset = offsetFromFile(file)  // parsed from the file name: the base offset of this segment
        val logFile = Log.logFile(dir, offset)
        if (!logFile.exists) {
          warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
          Files.deleteIfExists(file.toPath)
        }

      } else if (isLogFile(file)) {
        // if it's a log file, load the corresponding log segment

        // Parse the base offset from the file name
        val baseOffset = offsetFromFile(file)
        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()

        // Create the LogSegment object
        val segment = LogSegment.open(dir = dir,
          baseOffset = baseOffset,
          config,
          time = time,
          fileAlreadyExists = true)

        try segment.sanityCheck(timeIndexFileNewlyCreated)
        catch {
          case _: NoSuchFileException =>
            error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
              "recovering segment and rebuilding index files...")
            recoverSegment(segment)
          case e: CorruptIndexException =>
            warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
              s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
            recoverSegment(segment)
        }

        // Add the LogSegment to the segments map
        addSegment(segment)
      }
    }
  }
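A quick side note on the file-name parsing used above: segment files are named after their base offset, zero-padded to 20 digits (e.g. 00000000000000000120.log plus the matching .index and .timeindex files). The helpers below are a hypothetical sketch, equivalent in spirit to offsetFromFile rather than the Kafka code itself:

  object SegmentFileNames {
    // Kafka names segment files after the zero-padded, 20-digit base offset
    def filenamePrefixFromOffset(offset: Long): String = f"$offset%020d"

    // Recover the base offset from a name such as 00000000000000000120.log
    def offsetFromFileName(fileName: String): Long =
      fileName.substring(0, fileName.indexOf('.')).toLong

    def main(args: Array[String]): Unit = {
      println(filenamePrefixFromOffset(120L) + ".log")          // 00000000000000000120.log
      println(offsetFromFileName("00000000000000000120.index")) // 120
    }
  }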

Then the completeSwapOperations() method:

  private def completeSwapOperations(swapFiles: Set[File]): Unit = {
    for (swapFile <- swapFiles) {
      val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
      val baseOffset = offsetFromFile(logFile)

      // Load the swap segment from disk
      val swapSegment = LogSegment.open(swapFile.getParentFile,
        baseOffset = baseOffset,
        config,
        time = time,
        fileSuffix = SwapFileSuffix)
      info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")

      // Walk the segment file one FileChannelRecordBatch at a time, rebuild all of its index files, and chop off any invalid bytes at the end of the log and index files
      recoverSegment(swapSegment)

      // We create swap files for two cases:
      // (1) Log cleaning where multiple segments are merged into one, and
      // (2) Log splitting where one segment is split into multiple.
      //
      // Swap files are created in two situations:
      // 1. during log cleaning (log compaction), when several segments are merged into one
      // 2. during log splitting, when a segment has to be split into several because of offset overflow (KAFKA-6264)
      //
      // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
      // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
      // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
      // do a replace with an existing segment.
      //
      // If the original log files can no longer be found, there is nothing to replace: just rename the .swap file to .log
      val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
        segment.readNextOffset > swapSegment.baseOffset
      }
      replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
    }
  }

At this point, all of the segment files have been loaded into Kafka.

Deleting segments

Segment deletion has a single entry point in the source code. From the implementation below it is easy to see in which situations deletion is triggered:

  /**
   * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
   * or because the log size is > retentionSize.
   *
   * If the cleanup policy is delete, remove segments that have expired under time-based retention, as well as segments that push the log size over retentionSize
   *
   * Whether or not deletion is enabled, delete any log segments that are before the log start offset
   *
   * Whatever the cleanup policy is, any segments that lie entirely below the log start offset are always removed
   */
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

The three methods themselves are very straightforward:

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds

    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      startMs - segment.largestTimestamp > config.retentionMs
    }

    deleteOldSegments(shouldDelete, RetentionMsBreach)
  }

  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, RetentionSizeBreach)
  }

  private def deleteLogStartOffsetBreachedSegments(): Int = {
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
    }

    deleteOldSegments(shouldDelete, StartOffsetBreach)
  }
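To make the size-based walk concrete, here is a small self-contained sketch (the segment sizes and retentionSize are made-up numbers) of the same greedy loop that deleteRetentionSizeBreachedSegments runs over the segments, oldest first:

  object RetentionSizeExample {
    def main(args: Array[String]): Unit = {
      val segmentSizes = Seq(300L, 300L, 300L, 100L) // oldest to newest, 1000 bytes in total
      val retentionSize = 600L
      var diff = segmentSizes.sum - retentionSize    // 400 bytes over the limit

      // A segment is deletable only while removing it still leaves at least retentionSize bytes behind
      val deletable = segmentSizes.takeWhile { size =>
        if (diff - size >= 0) { diff -= size; true } else false
      }
      println(deletable) // List(300): only the oldest segment goes; deleting the next one would drop the log below retentionSize
    }
  }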

All three of them call deleteOldSegments: walk the segments from oldest to newest, use the supplied predicate (the shouldDelete above) to find the segments that can be deleted, and then delete them:

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                reason: SegmentDeletionReason): Int = {
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        deleteSegments(deletable, reason)
      else
        0
    }
  }

Here is the deletableSegments method that walks over the segments:

  /**
   * Find segments starting from the oldest until the user-supplied predicate is false or the segment
   * containing the current high watermark is reached. We do not delete segments with offsets at or beyond
   * the high watermark to ensure that the log start offset can never exceed it. If the high watermark
   * has not yet been initialized, no segments are eligible for deletion.
   *
   * A final segment that is empty will never be returned (since we would just end up re-creating it).
   *
   * @param predicate A function that takes in a candidate log segment and the next higher segment
   *                  (if there is one) and returns true iff it is deletable
   * @return the segments ready to be deleted
   */
  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)

        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

And then the actual deletion:

  private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          checkIfMemoryMappedBufferClosed()

          // Delete the segments:
          // first remove each segment from the segments map via segments.remove(),
          // then rename the segment's log / offsetIndex / timeIndex / txnIndex files with a .deleted suffix,
          // and finally delete those files asynchronously via the scheduler:
          // scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
          removeAndDeleteSegments(deletable, asyncDelete = true, reason)

          // If segments were deleted above, the LogStartOffset may change;
          // it is only updated when newLogStartOffset > oldLogStartOffset.
          // When that happens:
          // if newLogStartOffset > highWatermark, the hw is bumped as well,
          // and because logStartOffset changed, the leaderEpochCache must be updated too
          // (see the truncateFromStart call earlier)
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion)
        }
      }
      numToDelete
    }
  }

Appending messages to the log

  /**
   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
   *
   * This method will generally be responsible for assigning offsets to the messages,
   * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
   *
   * @param records The log records to append
   * @param origin Declares the origin of the append which affects required validations
   * @param interBrokerProtocolVersion Inter-broker message protocol version
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
   * @param ignoreRecordSize true to skip validation of record size.
   * @throws KafkaStorageException If the append fails due to an I/O error.
   * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
   * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
   * @return Information about the appended messages including the first and last offset.
   */
  private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: ApiVersion,
                     assignOffsets: Boolean,
                     leaderEpoch: Int,
                     ignoreRecordSize: Boolean): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {

      // Validate the CRC and check that no message exceeds the size limit; build a LogAppendInfo holding
      // this batch's firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, codec
      // and other metadata
      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // No valid messages in records: return right away
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // If any batch is malformed, the validation in the first step already failed with an error,
      // so invalid bytes can only appear after the last batch.
      // Comparing records.sizeInBytes with appendInfo.validBytes here
      // trims any trailing invalid bytes (if there are any)
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        checkIfMemoryMappedBufferClosed()

        // When the leader replica appends, assignOffsets = true and the log assigns an offset to each message
        // When a follower replica appends, assignOffsets = false and the offsets already carried in the messages are used
        if (assignOffsets) {

          // Assign the starting offset, which is simply the current LogEndOffset
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)

          // Assign offsets and run further validation, including:
          // 1. messages of a compaction-enabled topic must have a key
          // 2. for magic >= 1, if the inner messages are compressed, their offsets must increase from 0
          // 3. for magic >= 1, validate or overwrite the timestamps
          // 4. the record count declared in a DefaultRecordBatch must match the actual count
          // If the message version differs from the topic's configured message format, this method also converts the messages to the configured version.
          // For compatibility with older formats it contains a great deal of validation and conversion code that we won't go through here
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              origin,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }

          // Validation done; fill in the rest of appendInfo from the result
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // The previous step may have re-compressed or format-converted the messages, so re-check the message sizes
          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
        } else {
          // we are taking the offsets we are given
          // Trust the offsets carried in the messages (typically a follower appending data it fetched from the leader)

          // Reject sequences whose offsets are not monotonically increasing
          if (!appendInfo.offsetsMonotonic)
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            //
            // Abnormal case: the offset of the batch being appended is smaller than the Log End Offset.
            //
            // One known scenario: part of the log was deleted via AdminClient#deleteRecords(),
            // so the log start offset fetched from the leader may not be aligned to a batch boundary.
            //
            // Usually this cannot be recovered from, but in the following case it can:
            // logEndOffset == log.logStartOffset && firstOffset < logEndOffset && appendInfo.lastOffset >= logEndOffset
            // i.e. the log is empty and the LogStartOffset falls in the middle of the batch being appended.
            //
            // The outermost caller of append catches this exception, runs truncateFullyAndStartAt() from firstOffset,
            // and then retries the append.
            //
            // It has to be handled at the outermost level because the current segment must be deleted first, and a new segment created with firstOffset as its baseOffset
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // Update the leaderEpochCache (if needed)
        validRecords.batches.forEach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            // Compare the incoming entry with the items already in the epoch cache; on conflict, depending on the situation,
            // a new item may simply be appended, some items may be truncated before appending,
            // or nothing may be done at all
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            // During a partial (rolling) upgrade the message format may temporarily regress. To keep leader election safe,
            // the epoch cache is cleared so that after the next election we fall back to truncation by high watermark.

            // This is because format V2 and above carries an explicit partitionLeaderEpoch in every batch,
            // whereas older formats do not and rely on the hw as the main truncation reference.
            // Without clearing the cache, the replica might fail to roll back to the right offset after a leader election.
            // After clearing it, the system falls back to truncation by high watermark,
            // which keeps things safe
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // Check whether the total size of the message set exceeds config.segmentSize
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // If the current segment is full, roll a new one
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // Validate the producers' idempotent/transactional state
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }

        // The actual write; see the previous article for the implementation
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // Update the logEndOffset. The write to the transaction index below may fail, and we want the offsets of
        // future appends to keep growing monotonically, so the LEO is updated immediately after the append. Any resulting
        // transaction index inconsistency is cleaned up when the log directory is recovered. Note that if the write to the
        // transaction index fails, the ProducerStateManager's end offset is not updated and the last stable offset does not advance.
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state (idempotence/transaction bookkeeping)
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        // (I don't fully understand this part yet; something to revisit later)
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")

        if (unflushedMessages >= config.flushInterval)
          flush()

        appendInfo
      }
    }
  }

Reading messages from the log


  /**
   * Read messages from the log.
   *
   * @param startOffset The offset to begin reading at
   * @param maxLength The maximum number of bytes to read
   * @param isolation The fetch isolation, which controls the maximum offset we are allowed to read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
   * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
   * @return The fetch data information including fetch starting offset metadata and messages read.
   */
  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
        s"total length $size bytes")

      val includeAbortedTxns = isolation == FetchTxnCommitted

      // Because we don't use the lock for reading, the synchronization is a little bit tricky.
      // We create the local variables to avoid race conditions with updates to the log.
      // Reads never take the lock, and we don't try to be clever and grab it here either; instead a local
      // variable snapshots the end-offset metadata so that concurrent updates to the log can't give us a dirty read.
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset

      // private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
      // In the segments map, find the segment that startOffset falls into, and start the search from there
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      val maxOffsetMetadata = isolation match {
        // read everything up to the log end offset
        case FetchLogEnd => endOffsetMetadata
        // read only below the hw, i.e. the messages visible to consumers
        case FetchHighWatermark => fetchHighWatermarkMetadata
        // read only below the LSO, i.e. messages of committed transactions
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }

      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }

      // Do the read on the segment with a base offset less than the target offset
      // but if that segment doesn't contain any messages with an offset greater than that
      // continue to read from successive segments until we get some messages or we reach the end of the log
      //
      // Starting from the segment whose baseOffset <= startOffset, walk the following segments looking for messages
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue

        val maxPosition = {
          // Use the max offset position if it is on this segment; otherwise, the segment size is the limit.
          // 1. If maxOffsetMetadata is in the same segment as the offset we are looking for, its position is the read limit
          // 2. Otherwise maxOffsetMetadata lives in a later segment,
          //    so the limit is simply the size of the current segment
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }

        // We went through segment.read() in the previous article, so we won't repeat it here
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchInfo == null) {
          // Nothing found: move on to the next segment
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          // Found it: return
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
      // this can happen when all messages with offset larger than start offsets have been deleted.
      // In this case, we will return the empty set with log end offset metadata
      //
      // Getting here means that although the start offset was within range, we walked every segment without finding any data.
      // That can happen when all messages with offsets greater than the start offset have been deleted.
      // In that case, return the metadata of the next offset to be appended, with no records
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

IV. High Watermark source code

We have already seen the definition of the HW above:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

But what is a LogOffsetMetadata? Let's look at its definition:

/*
 * A log offset structure, including:
 *  1. the message offset
 *  2. the base message offset of the located segment
 *  3. the physical position on the located segment
 */

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
  ......

  // Comparing segmentBaseOffset tells whether two LogOffsetMetadata instances are in the same segment
  def onSameSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
    this.segmentBaseOffset == that.segmentBaseOffset
  }

  // Compute the offset difference between two LogOffsetMetadata instances
  def offsetDiff(that: LogOffsetMetadata): Long = {
    this.messageOffset - that.messageOffset
  }

  // compute the number of bytes between this offset to the given offset
  // if they are on the same segment and this offset precedes the given offset
  // If the two are on the same segment, compute how many bytes apart the corresponding messages are
  def positionDiff(that: LogOffsetMetadata): Int = {
    if(!onSameSegment(that))
      throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment")
    if(messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info")

    this.relativePositionInSegment - that.relativePositionInSegment
  }

  ......

  override def toString = s"(offset=$messageOffset segment=[$segmentBaseOffset:$relativePositionInSegment])"

}
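As a quick illustration of the two comparison helpers (the numbers below are made up, and the snippet is meant to be read against the case class above rather than compiled on its own):

  val a = LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 50L, relativePositionInSegment = 4096)
  val b = LogOffsetMetadata(messageOffset = 130L, segmentBaseOffset = 50L, relativePositionInSegment = 8192)

  a.onSameSegment(b)  // true: both offsets live in the segment whose base offset is 50
  b.offsetDiff(a)     // 30: they are 30 messages apart
  b.positionDiff(a)   // 4096: b sits 4096 bytes after a within the segment's log file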

OK, now that we understand the basic structure of the highWatermarkMetadata object, let's see how the HW is read and updated.

Reading the HW

  // A LogOffsetMetadata may carry only an absolute offset, without segmentBaseOffset and relativePositionInSegment.
  // In that case the Log is searched for the segmentBaseOffset and relativePositionInSegment of that offset,
  // and a fully populated LogOffsetMetadata is returned
  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata

    // Only an absolute offset: fill in the missing fields
    if (offsetMetadata.messageOffsetOnly) {
      lock.synchronized {

        // Walk the segment files, find the message at this offset, and fill in segmentBaseOffset and relativePositionInSegment
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)

        // Store the completed LogOffsetMetadata back into the hw
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

The convertToOffsetMetadataOrThrow it calls is really just the Log's read method, reading the metadata of the message at the given offset:

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

Updating the HW

The first method for updating the high watermark is used mainly when a follower replica has fetched messages from the leader; as soon as new messages arrive, the high watermark must be updated:

  /**
   * Update high watermark with offset metadata. The new high watermark will be lower
   * bounded by the log start offset and upper bounded by the log end offset.
   *
   * @param highWatermarkMetadata the suggested high watermark with offset metadata
   * @return the updated high watermark offset
   */
  def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
    val endOffsetMetadata = logEndOffsetMetadata
    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
      LogOffsetMetadata(logStartOffset)
    } else if (highWatermarkMetadata.messageOffset >= endOffsetMetadata.messageOffset) {
      endOffsetMetadata
    } else {
      highWatermarkMetadata
    }

    updateHighWatermarkMetadata(newHighWatermarkMetadata)
    newHighWatermarkMetadata.messageOffset
  }

The second method is mainly used to update the leader replica's high watermark. Note that on the leader the update is conditional: in some cases the high watermark advances, in others it may not (for example, while the leader is still waiting for other followers to catch up):

  /**
   * Update the high watermark to a new value if and only if it is larger than the old value. It is
   * an error to update to a value which is larger than the log end offset.
   *
   * This method is intended to be used by the leader to update the high watermark after follower
   * fetch offsets have been updated.
   *
   * @return the old high watermark, if updated by the new value
   */
  def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
    if (newHighWatermark.messageOffset > logEndOffset)
      throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
        s"log end offset $logEndOffsetMetadata")

    lock.synchronized {
      // The LogOffsetMetadata may carry only an absolute offset, without segmentBaseOffset and relativePositionInSegment;
      // fetchHighWatermarkMetadata looks up the missing segmentBaseOffset and relativePositionInSegment in the Log
      // and returns a fully populated LogOffsetMetadata
      val oldHighWatermark = fetchHighWatermarkMetadata

      // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
      // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
      // Make sure the hw only ever moves forward.
      // The hw metadata is also updated when the new offset metadata is on a newer segment, which happens whenever the log rolls
      if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
        (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
        updateHighWatermarkMetadata(newHighWatermark)
        Some(oldHighWatermark)
      } else {
        None
      }
    }
  }

