Kafka Source Code, Part 4: KafkaRequestHandler

November 21, 2024

As in previous parts, we open with Kafka's network communication architecture diagram.

In Part 3 we read through the SocketServer source and saw how requests are received and responses returned. In this part we read the KafkaRequestHandler source to see how the IO threads do their work.

The IO Thread Pool: KafkaRequestHandlerPool

KafkaRequestHandler is the IO thread, so naturally there should be a thread pool that manages and schedules these threads: that is KafkaRequestHandlerPool.

In KafkaServer's startup method we can see the code that creates this pool:

  // KafkaApis is where Kafka actually processes request logic
  dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
    kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
    fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)

  // Create the RequestHandlerPool thread pool
  dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
    config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

  // If the control plane feature is enabled,
  // a second RequestProcessor and RequestHandlerPool pair is created here as well
  socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
    controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
      kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
      fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)

    controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
      1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
  }

Below is the KafkaRequestHandlerPool source. We can see clearly that its constructor creates numThreads handler threads via the createHandler method:

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

  // Pool size, controlled by num.io.threads in the configuration file.
  // The setting can be adjusted dynamically, hence an AtomicInteger
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "

  // Holds the child threads, i.e. the KafkaRequestHandler threads
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)

  // Create the handler threads
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }

  def resizeThreadPool(newSize: Int): Unit = synchronized {
    val currentSize = threadPoolSize.get
    info(s"Resizing request handler thread pool size from $currentSize to $newSize")
    if (newSize > currentSize) {
      for (i <- currentSize until newSize) {
        createHandler(i)
      }
    } else if (newSize < currentSize) {
      for (i <- 1 to (currentSize - newSize)) {
        runnables.remove(currentSize - i).stop()
      }
    }
    threadPoolSize.set(newSize)
  }

  def shutdown(): Unit = synchronized {
    info("shutting down")
    for (handler <- runnables) {
      // Puts a ShutdownRequest into the RequestChannel's requestQueue
      handler.initiateShutdown()
    }
    for (handler <- runnables) {
      // Blocks until the KafkaRequestHandler reads the ShutdownRequest
      // and shuts itself down
      handler.awaitShutdown()
    }
    info("shut down completely")
  }
}
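The resize logic above grows the pool by appending new handlers and shrinks it by stopping handlers from the tail. A minimal sketch of that behavior, where FakeHandler and FakePool are hypothetical stand-ins rather than Kafka classes:

```scala
import scala.collection.mutable

// Hypothetical stand-in for KafkaRequestHandler: only records its id and stop() calls.
class FakeHandler(val id: Int) {
  var stopped = false
  def stop(): Unit = stopped = true
}

// Same shape as KafkaRequestHandlerPool.resizeThreadPool: grow by appending
// new handlers, shrink by stopping and removing handlers from the tail.
class FakePool(initialSize: Int) {
  val runnables = new mutable.ArrayBuffer[FakeHandler]()
  for (i <- 0 until initialSize) runnables += new FakeHandler(i)

  def resize(newSize: Int): Unit = {
    val currentSize = runnables.size
    if (newSize > currentSize) {
      for (i <- currentSize until newSize) runnables += new FakeHandler(i)
    } else if (newSize < currentSize) {
      // Mirrors runnables.remove(currentSize - i).stop(): the highest-indexed
      // handlers are the ones removed.
      for (i <- 1 to (currentSize - newSize)) runnables.remove(currentSize - i).stop()
    }
  }
}
```

Shrinking a pool of four handlers to two leaves handlers 0 and 1 in place; growing it again reuses the next free index.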

The IO Thread: KafkaRequestHandler

For the handler thread that does the actual work, we naturally want to look at its run method:

  def run(): Unit = {
    while (!stopped) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      //
      // In short: the meter measures the whole pool's aggregate idle ratio, so
      // each thread's recorded idle time is divided by the thread count,
      // amortizing it across the pool instead of reporting a single thread's state.
      val startSelectTime = time.nanoseconds

      // Simply pulls one request from the RequestChannel's requestQueue (waiting up to 300 ms)
      val req = requestChannel.receiveRequest(300)

      val endTime = time.nanoseconds
      val idleTime = endTime - startSelectTime

      // Amortize this thread's idle time across the number of handler threads
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

      req match {

        // On a shutdown request, exit the loop
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          shutdownComplete.countDown()
          return

        // A normal request
        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")

            // Delegate to the KafkaApis class for the actual business logic
            apis.handle(request)

          } catch {
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }

This connects back to Part 3: the Processor threads receive requests, wrap each one as a Request, and put it into the RequestChannel's requestQueue; the KafkaRequestHandler threads loop, pulling requests from that queue and handing them to KafkaApis for actual processing.
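The pipeline just described can be sketched with a blocking queue standing in for the RequestChannel. MiniChannel, DataReq, ShutdownReq, and handlerLoop are all invented stand-ins, not Kafka classes; only the shape of the loop mirrors KafkaRequestHandler.run:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-ins for RequestChannel.Request / RequestChannel.ShutdownRequest.
sealed trait Req
case class DataReq(payload: String) extends Req
case object ShutdownReq extends Req

// A stripped-down RequestChannel: Processor threads put requests in,
// handler threads poll them out with a timeout (like receiveRequest(300)).
class MiniChannel(capacity: Int) {
  private val queue = new ArrayBlockingQueue[Req](capacity)
  def sendRequest(r: Req): Unit = queue.put(r)
  def receiveRequest(timeoutMs: Long): Req = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
}

// Same shape as KafkaRequestHandler.run: poll, dispatch, stop on shutdown.
def handlerLoop(channel: MiniChannel, handled: StringBuilder): Unit = {
  var stopped = false
  while (!stopped) {
    channel.receiveRequest(300) match {
      case ShutdownReq => stopped = true     // exit on the shutdown request
      case DataReq(p)  => handled.append(p)  // stands in for apis.handle(request)
      case null        => ()                 // poll timed out: loop again
    }
  }
}
```

The `case null` branch corresponds to the real method's timeout path: an empty queue just spins the loop again rather than blocking forever.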

KafkaApis: Processing Requests

KafkaApis is the entry point for every Broker-side feature; its handle method wraps the concrete processing logic for every RPC request.

As the code below shows, to add a custom request type we only need to define a new ApiKeys enum value and add a corresponding handleXXXRequest method here.

  override def handle(request: RequestChannel.Request): Unit = {
    try {
      trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
        s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
        case ApiKeys.END_TXN => handleEndTxnRequest(request)
        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
        case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
        case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentialsRequest(request)
        case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
        // Until we are ready to integrate the Raft layer, these APIs are treated as
        // unexpected and we just close the connection.
        case ApiKeys.VOTE => closeConnection(request, util.Collections.emptyMap())
        case ApiKeys.BEGIN_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
        case ApiKeys.END_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
        case ApiKeys.DESCRIBE_QUORUM => closeConnection(request, util.Collections.emptyMap())
      }
    } catch {
      case e: FatalExitError => throw e
      case e: Throwable => handleError(request, e)
    } finally {
      // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
      // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
      // expiration thread for certain delayed operations (e.g. DelayedJoin)
      replicaManager.tryCompleteActions()
      // The local completion time may be set while processing the request. Only record it if it's unset.
      if (request.apiLocalCompleteTimeNanos < 0)
        request.apiLocalCompleteTimeNanos = time.nanoseconds
    }
  }
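The match above is the whole dispatch mechanism, which makes the extension path described earlier mechanical: add an enum value, add a case. A toy sketch of the pattern, where MiniApiKeys, MiniApis, and MyCustomApi are invented names, not Kafka's:

```scala
// Hypothetical mini version of the ApiKeys enum; MyCustomApi is the new entry.
object MiniApiKeys extends Enumeration {
  val Produce, Fetch, MyCustomApi = Value
}

// Hypothetical mini KafkaApis: one case per API key, one handler per case.
class MiniApis {
  def handle(key: MiniApiKeys.Value): String = key match {
    case MiniApiKeys.Produce     => "handled produce"
    case MiniApiKeys.Fetch       => "handled fetch"
    case MiniApiKeys.MyCustomApi => handleMyCustomRequest() // the newly added handler
  }
  private def handleMyCustomRequest(): String = "handled custom"
}
```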

Access Control: authorizer

I will study two of these methods in detail, but first we need to understand the authorize method. If authorizer.class.name is not set in the configuration file, authentication is disabled and every operation is open. If it is set, then before KafkaApis processes any business logic, the authorize method checks whether the Request has the required permission. For example, for a create-topic request it verifies that the client is allowed to create topics; only on success does processing continue, otherwise an error is returned to the client.
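The gate just described can be sketched as an Option-based check. MiniAuthorizer and handleCreateTopic are hypothetical; Kafka's real Authorizer interface is considerably richer:

```scala
// Hypothetical minimal authorizer interface.
trait MiniAuthorizer {
  def authorize(principal: String, operation: String): Boolean
}

// No authorizer configured (None): everything is allowed.
// Authorizer configured (Some): the operation must pass before the handler runs.
def handleCreateTopic(authorizer: Option[MiniAuthorizer], principal: String): String =
  authorizer match {
    case Some(a) if !a.authorize(principal, "CREATE") => "TOPIC_AUTHORIZATION_FAILED"
    case _                                            => "topic created"
  }
```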

We can likewise find the creation and configuration of the authorizer in KafkaServer's startup() method:

  // Read the authorizer class named by the authorizer.class.name setting, instantiate it, and return it
  authorizer = config.authorizer

  // Call the authorizer's configure method to apply the concrete configuration
  authorizer.foreach(_.configure(config.originals))

  // Start loading metadata and return CompletableFutures that signal when the
  // authorizer's asynchronous initialization has finished
  //
  // Purpose: a user-defined authorizer class may need asynchronous work during
  //          initialization; these futures guarantee it has all completed by
  //          the time the Acceptor and Processors start accepting requests
  //
  // In practice: the authorizer classes Kafka ships with do nothing asynchronous,
  //              so the futures returned here are already completed; the hook
  //              exists to leave room for user-defined authorizer classes
  val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {

    case Some(authZ) =>
      authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
        ep -> cs.toCompletableFuture
      }

    case None =>
      // No authorizer class configured: do nothing and return completed Futures
      brokerInfo.broker.endPoints.map { ep =>
        ep.toJava -> CompletableFuture.completedFuture[Void](null)
      }.toMap
  }

  ......

  // At the end of KafkaServer.startup(), before the Acceptor and Processors start,
  // CompletableFuture.join() is called to wait for the futures above;
  // i.e. Kafka only starts handling client requests once the authorizer is fully initialized
  socketServer.startProcessingRequests(authorizerFutures)

We will not dig into the authorize process itself here.
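The startup ordering above, joining every per-endpoint future before serving, can be sketched as follows. waitForAuthorizer is an invented helper, not a Kafka method, and the endpoint keys are plain strings for illustration:

```scala
import java.util.concurrent.CompletableFuture

// Sketch of the ordering in KafkaServer.startup(): one CompletableFuture per
// endpoint (already completed when the authorizer has no async init), all
// joined before the server would start taking requests.
def waitForAuthorizer(futures: Map[String, CompletableFuture[Void]]): Boolean = {
  futures.values.foreach(_.join()) // blocks until every endpoint's init is done
  true                             // only now would startProcessingRequests run
}
```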

Sending Responses: sendResponse

We will also skip the details of business processing for now and look instead at how a response is sent once processing finishes. Below is the sendResponse() method in KafkaApis:

  private def sendResponse(request: RequestChannel.Request,
                           responseOpt: Option[AbstractResponse],
                           onComplete: Option[Send => Unit]): Unit = {
    // Update error metrics for each error code in the response including Errors.NONE
    responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

    val response = responseOpt match {
      case Some(response) =>
        val responseSend = request.context.buildResponse(response)
        val responseString =
          if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
          else None
        new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
      case None =>
        new RequestChannel.NoOpResponse(request)
    }

    // This simply puts the response into the RequestChannel's responseQueue,
    // where the Processor thread's processNewResponses() method (covered in the previous part) will pick it up
    requestChannel.sendResponse(response)
  }
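The Option branching in the method above can be sketched on its own: a Some becomes a real response to send, a None becomes a no-op that puts nothing on the wire, and either way the result is queued for the Processor thread. MiniResponse and buildResponse are hypothetical stand-ins:

```scala
// Hypothetical stand-ins for RequestChannel.SendResponse / NoOpResponse.
sealed trait MiniResponse
case class SendResponse(body: String) extends MiniResponse
case object NoOpResponse extends MiniResponse

// Same branching as sendResponse: Some(response) -> SendResponse, None -> NoOpResponse.
def buildResponse(responseOpt: Option[String]): MiniResponse = responseOpt match {
  case Some(body) => SendResponse(body)
  case None       => NoOpResponse
}
```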

References

Kafka 核心技术与实战 (Kafka Core Technology and Practice), Geektime

Tags: kafka
Last updated: February 26, 2026


Copyright © by zt52875287@gmail.com All Rights Reserved.
