The startup process and code modules of the Kafka broker

A Kafka cluster is composed of several brokers. Starting a Kafka cluster means starting each broker in the cluster and getting it running normally. Brokers interact with one another, with producers, and with consumers. The following is a brief introduction to the broker startup process.

Script to start the broker:

nohup ./bin/kafka-server-start.sh config/server.properties &

The kafka-server-start.sh script finally runs the following command:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

In other words, it executes the class kafka.Kafka, whose code is as follows:

object Kafka extends Logging {

  def main(args: Array[String]): Unit = {
    ......
  
    try {
      val props = Utils.loadProps(args(0))
      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)
      val kafkaServerStartable = new KafkaServerStartable(serverConfig)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown
        }
      })

      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable => fatal(e)
    }
    System.exit(0)
  }
}
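Kafka.main hands args(0), the path to server.properties, to Utils.loadProps and then builds a KafkaConfig from the result. As a minimal sketch of that first step, the following reads a properties file with java.util.Properties and pulls out typed settings. The object BrokerProps and its methods are hypothetical helpers for illustration, not Kafka's Utils or KafkaConfig API.

```scala
import java.io.{FileInputStream, InputStream}
import java.util.Properties

// Hypothetical helper, loosely in the spirit of Utils.loadProps + KafkaConfig:
// read key=value pairs and extract a few typed settings.
object BrokerProps {
  // Load properties from any input stream (a file in the real startup path).
  def load(in: InputStream): Properties = {
    val props = new Properties()
    try props.load(in) finally in.close()
    props
  }

  // Convenience wrapper for the command-line case: args(0) is a file path.
  def loadFile(path: String): Properties = load(new FileInputStream(path))

  // Read a mandatory integer setting, failing fast if it is missing.
  def requireInt(props: Properties, key: String): Int =
    Option(props.getProperty(key)) match {
      case Some(v) => v.trim.toInt
      case None    => throw new IllegalArgumentException(s"Missing required property: $key")
    }

  // Read an optional integer setting, falling back to a caller-supplied default.
  def intOrElse(props: Properties, key: String, default: Int): Int =
    Option(props.getProperty(key)).map(_.trim.toInt).getOrElse(default)
}
```

Failing fast on a missing required key means a misconfigured broker stops at startup instead of failing later while serving requests.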

As the code above shows, the broker is started by calling kafkaServerStartable.startup. The startup code of KafkaServerStartable is as follows:

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  def startup() {
    try {
      server.startup()
      AppInfo.registerInfo()
    }
    catch {
      ......
    }
  }
......
}

As the code above shows, the startup function of KafkaServerStartable mainly wraps KafkaServer's startup(). Let's take a look at the code structure of KafkaServer.

class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  this.logIdent = "[Kafka Server " + config.brokerId + "], "
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
  private var startupComplete = new AtomicBoolean(false)
  val brokerState: BrokerState = new BrokerState
  val correlationId: AtomicInteger = new AtomicInteger(0)
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var offsetManager: OffsetManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null
  ......
}
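The fields isShuttingDown, startupComplete and shutdownLatch coordinate the life cycle that Kafka.main drives: startup runs on the main thread, awaitShutdown blocks until the end, and shutdown may arrive from the shutdown-hook thread. Below is a minimal sketch of that coordination, assuming a hypothetical MiniServer whose onStart/onStop callbacks stand in for starting and stopping the real modules.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical server skeleton: two AtomicBooleans and a CountDownLatch make
// startup/shutdown safe to call from different threads.
class MiniServer(onStart: () => Unit, onStop: () => Unit) {
  private val isShuttingDown  = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)
  private val shutdownLatch   = new CountDownLatch(1)

  def startup(): Unit = {
    onStart()                    // start sub-modules (sockets, logs, replicas, ...)
    startupComplete.set(true)
  }

  // Idempotent: only the first caller actually performs the shutdown.
  def shutdown(): Unit = {
    if (isShuttingDown.compareAndSet(false, true)) {
      onStop()                   // stop sub-modules
      startupComplete.set(false)
      shutdownLatch.countDown()  // release any thread blocked in awaitShutdown
    }
  }

  // Blocks the calling thread (main, in Kafka.main) until shutdown() has finished.
  def awaitShutdown(): Unit = shutdownLatch.await()

  def isStarted: Boolean = startupComplete.get
}
```

compareAndSet makes shutdown idempotent, so a second call (for example from both the shutdown hook and an error path) does nothing, and the latch lets awaitShutdown return only after shutdown has completed.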

The KafkaServer class contains the various modules the broker needs to run. The main modules are described below.

1. SocketServer: the broker's communication module, used to exchange messages with other brokers, producers and consumers. An Acceptor thread listens for socket connections on the configured port (9092 by default). When a new connection arrives, the Acceptor creates a SocketChannel for it and hands it to one of the Processor threads in round-robin fashion. That Processor reads the subsequent requests from the SocketChannel and places them in the request queue of the RequestChannel. When a response to one of its requests is ready, the Processor takes it from its own response queue in the RequestChannel and sends it back to the client.

2. KafkaRequestHandlerPool: the thread pool that handles socket requests. Its threads take requests from the RequestChannel's request queue, call KafkaApis to carry out the business logic, and put the results into the RequestChannel's response queue.

3. LogManager: Kafka's log management module. It deletes expired data and data beyond the configured size limit, flushes dirty data to disk, and performs checkpointing and log compaction on the log files.

4. ReplicaManager: Kafka's replica management module. It manages the partition replica data of topics, including changes of the replica leader and of the ISR (in-sync replica set), deletion of replicas, and so on.

5. OffsetManager: Kafka's offset management module, mainly responsible for saving and reading consumer offsets.

6. KafkaScheduler: Kafka's background task scheduler, a shared thread pool that provides scheduling for LogManager, ReplicaManager, OffsetManager and other modules.

7. KafkaApis: Kafka's business logic layer, which dispatches each request to a handler according to its type. The request types it handles include ProduceKey, FetchKey, OffsetsKey, MetadataKey, LeaderAndIsrKey, StopReplicaKey, UpdateMetadataKey, ControlledShutdownKey, OffsetCommitKey, OffsetFetchKey and ConsumerMetadataKey.

8. KafkaHealthcheck: the broker's health check module. The broker registers itself under the /brokers/ids directory in ZooKeeper as an ephemeral node: while the broker is online, the node with its id exists; otherwise it does not.

9. TopicConfigManager: the topic configuration management module. It watches the /config/changes directory in ZooKeeper, so when that data changes it learns about topic configuration changes through a callback function.

10. KafkaController: Kafka's cluster control and management module. ZooKeeper stores the cluster's metadata, and KafkaController watches for changes to different parts of that metadata by registering different callback functions. When the metadata changes, KafkaController is notified and responds accordingly, which is how it manages the cluster.
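Items 1 and 2 describe a queue-based pipeline: the Acceptor assigns new connections to Processors round-robin, all Processors share one request queue, and each Processor owns a response queue that the handler threads write to. The single-process sketch below models only that queueing, with no real sockets. MiniRequestChannel, MiniHandlerPool, Request, Response and the handler function are hypothetical names, and the handler function stands in for KafkaApis.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// A request remembers which processor it came from so the response can be routed back.
final case class Request(processor: Int, connectionId: Int, payload: String)
final case class Response(connectionId: Int, payload: String)

class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  // One bounded request queue shared by all processors.
  val requestQueue = new ArrayBlockingQueue[Request](queueSize)
  // One response queue per processor.
  val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[Response]())

  private val next = new AtomicInteger(0)

  // What the Acceptor does with a new connection: pick a processor round-robin.
  def assignProcessor(): Int = Math.floorMod(next.getAndIncrement(), numProcessors)

  def sendRequest(r: Request): Unit = requestQueue.put(r)
  def sendResponse(processor: Int, r: Response): Unit = responseQueues(processor).put(r)
}

// Fixed pool of handler threads: take a request, run the business logic, queue the result.
class MiniHandlerPool(channel: MiniRequestChannel, numThreads: Int, handle: Request => String) {
  @volatile private var running = true
  private val threads = (0 until numThreads).map { i =>
    val t = new Thread(new Runnable {
      def run(): Unit = while (running) {
        // Poll with a timeout so the thread can notice shutdown.
        val req = channel.requestQueue.poll(100, TimeUnit.MILLISECONDS)
        if (req != null)
          channel.sendResponse(req.processor, Response(req.connectionId, handle(req)))
      }
    }, s"request-handler-$i")
    t.setDaemon(true)
    t.start()
    t
  }

  def shutdown(): Unit = { running = false; threads.foreach(_.join()) }
}
```

Routing each response to the queue of the Processor that read the request keeps every connection owned by a single network thread, while the business logic runs on a separate, independently sized pool.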
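The deletion of expired and oversized data in item 3 corresponds to time-based and size-based retention (the log.retention.hours and log.retention.bytes settings in server.properties). The sketch below decides which segments to delete under both rules. Segment and Retention.segmentsToDelete are hypothetical, times are in milliseconds for simplicity, and treating the newest segment as the never-deleted active segment is a simplifying assumption.

```scala
// Hypothetical view of a log segment: base offset, size on disk, last-modified time.
final case class Segment(baseOffset: Long, sizeBytes: Long, lastModifiedMs: Long)

object Retention {
  /** Segments must be ordered oldest first. The last one is treated as the
    * active segment and is never deleted. A negative limit disables that rule. */
  def segmentsToDelete(segments: Seq[Segment], nowMs: Long,
                       retentionMs: Long, retentionBytes: Long): Seq[Segment] = {
    val candidates = segments.dropRight(1)

    // Time rule: the segment has not been modified within the retention window.
    val expired =
      candidates.filter(s => retentionMs >= 0 && nowMs - s.lastModifiedMs > retentionMs)

    // Size rule: delete the oldest segments while what remains is still >= the limit.
    val tooBig =
      if (retentionBytes < 0) Seq.empty[Segment]
      else {
        val total = segments.map(_.sizeBytes).sum
        // cumulative size of the oldest k candidates, k = 1..n
        val removed = candidates.scanLeft(0L)(_ + _.sizeBytes).tail
        candidates.zip(removed).collect { case (s, r) if total - r >= retentionBytes => s }
      }

    (expired ++ tooBig).distinct.sortBy(_.baseOffset)
  }
}
```

Because the cumulative size only grows, the size rule always deletes a prefix of the oldest segments, which matches the intuition that retention trims a log from its head.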
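For the ISR changes in item 4, a follower stays in the ISR only while it keeps up with the leader. The sketch below applies a purely time-based rule in the spirit of replica.lag.time.max.ms. The 0.8.x broker also had a message-count rule (replica.lag.max.messages), which is left out here, and ReplicaState and Isr.shrinkIsr are hypothetical names.

```scala
// Hypothetical per-follower state tracked by the partition leader.
final case class ReplicaState(brokerId: Int, lastCaughtUpTimeMs: Long)

object Isr {
  /** New ISR: the leader plus every current ISR follower that has caught up
    * with the leader within the last maxLagMs milliseconds. */
  def shrinkIsr(leaderId: Int, isr: Set[Int], followers: Seq[ReplicaState],
                nowMs: Long, maxLagMs: Long): Set[Int] = {
    val outOfSync = followers.collect {
      case r if isr.contains(r.brokerId) && nowMs - r.lastCaughtUpTimeMs > maxLagMs => r.brokerId
    }.toSet
    (isr -- outOfSync) + leaderId // the leader is always in its own ISR
  }
}
```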
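Saving and reading offsets (item 5) boils down to a lookup keyed by consumer group, topic and partition. In the broker, committed offsets are also persisted to an internal offsets topic (__consumer_offsets); the sketch below models only an in-memory cache, and OffsetKey and MiniOffsetCache are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical key: which group has consumed which partition of which topic.
final case class OffsetKey(group: String, topic: String, partition: Int)

// Hypothetical thread-safe offset cache with commit/fetch operations.
class MiniOffsetCache {
  private val cache = new ConcurrentHashMap[OffsetKey, java.lang.Long]()

  // Overwrites any earlier commit for the same key.
  def commit(key: OffsetKey, offset: Long): Unit =
    cache.put(key, offset)

  // -1 means "no offset committed yet" in this sketch.
  def fetch(key: OffsetKey): Long =
    Option(cache.get(key)).map(_.longValue).getOrElse(-1L)
}
```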
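KafkaScheduler (item 6) gives the other modules one shared pool for periodic background jobs. The sketch below builds such a scheduler on java.util.concurrent.ScheduledThreadPoolExecutor. The class name MiniScheduler and the shape of its schedule method are assumptions made for illustration, not a copy of Kafka's API.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical background scheduler shared by several broker modules.
class MiniScheduler(threads: Int, prefix: String = "kafka-scheduler-") {
  private val counter = new AtomicInteger(0)
  private val executor = new ScheduledThreadPoolExecutor(threads, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, prefix + counter.getAndIncrement())
      t.setDaemon(true) // background jobs should not keep the JVM alive
      t
    }
  })

  /** Runs `task` once after `delay` if period <= 0, otherwise every `period`. */
  def schedule(name: String, task: () => Unit, delay: Long, period: Long,
               unit: TimeUnit): ScheduledFuture[_] = {
    val runnable = new Runnable {
      def run(): Unit =
        try task()
        catch {
          // An uncaught exception would cancel later runs of a periodic task.
          case e: Exception => System.err.println(s"Task $name failed: $e")
        }
    }
    if (period > 0) executor.scheduleAtFixedRate(runnable, delay, period, unit)
    else executor.schedule(runnable, delay, unit)
  }

  def shutdown(): Unit = {
    executor.shutdown() // periodic tasks are cancelled by default on shutdown
    executor.awaitTermination(1, TimeUnit.SECONDS)
  }
}
```

A module would then register, say, a retention check with something like `scheduler.schedule("log-retention", () => cleanupLogs(), 30, 60, TimeUnit.SECONDS)`, where cleanupLogs is a hypothetical function.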
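The per-type dispatch in item 7 can be pictured as a pattern match on the request's key. The sketch below shows that shape with a small subset of the key names listed there and placeholder handlers. The numeric values, ApiRequest and MiniApis are illustrative assumptions, not Kafka's definitions.

```scala
// Hypothetical request keys mirroring a few names from item 7.
// The numeric values are illustrative for this sketch.
object Keys {
  val ProduceKey: Short  = 0
  val FetchKey: Short    = 1
  val MetadataKey: Short = 3
}

final case class ApiRequest(requestId: Short, body: String)

// Hypothetical KafkaApis-style dispatcher: one handler per request key.
class MiniApis {
  def handle(req: ApiRequest): String = req.requestId match {
    case Keys.ProduceKey  => handleProduce(req)
    case Keys.FetchKey    => handleFetch(req)
    case Keys.MetadataKey => handleMetadata(req)
    case other            => throw new IllegalArgumentException(s"Unknown request key: $other")
  }

  // Placeholder handlers; the real ones read or write logs, metadata, etc.
  private def handleProduce(req: ApiRequest): String  = s"appended: ${req.body}"
  private def handleFetch(req: ApiRequest): String    = s"fetched: ${req.body}"
  private def handleMetadata(req: ApiRequest): String = s"metadata for: ${req.body}"
}
```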
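Items 8 to 10 rely on the same two ZooKeeper ideas: an ephemeral node under /brokers/ids disappears when the broker's session ends, and components register callbacks on paths such as /brokers/ids or /config/changes to be told about changes (the real broker does this with ZkClient listeners such as subscribeChildChanges). The in-memory sketch below imitates only those two ideas so they can be observed without a ZooKeeper server. FakeZk, its methods and its session model are hypothetical and are not the ZkClient API.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper: nodes, ephemeral owners,
// and child-change listeners. Not thread-safe; for illustration only.
class FakeZk {
  private val data      = mutable.Map.empty[String, String]
  private val ephemeral = mutable.Map.empty[String, Long] // path -> owning session
  private val listeners = mutable.Map.empty[String, List[List[String] => Unit]]

  private def parentOf(path: String): String = path.substring(0, path.lastIndexOf('/'))

  def children(parent: String): List[String] =
    data.keys.filter(p => parentOf(p) == parent).map(_.stripPrefix(parent + "/")).toList.sorted

  // Register a callback that receives the current child list after every change.
  def watchChildren(parent: String, cb: List[String] => Unit): Unit =
    listeners(parent) = cb :: listeners.getOrElse(parent, Nil)

  private def fire(parent: String): Unit =
    listeners.getOrElse(parent, Nil).foreach(cb => cb(children(parent)))

  // What a broker does at startup: register an ephemeral node such as /brokers/ids/1.
  def createEphemeral(path: String, value: String, session: Long): Unit = {
    data(path) = value
    ephemeral(path) = session
    fire(parentOf(path))
  }

  // Closing a session (e.g. a crashed broker) removes all of its ephemeral nodes.
  def closeSession(session: Long): Unit = {
    val owned = ephemeral.filter(_._2 == session).keys.toList
    owned.foreach { p => data.remove(p); ephemeral.remove(p); fire(parentOf(p)) }
  }
}
```

A controller-like component that watches /brokers/ids is thus notified both when a broker registers and when its session disappears, without polling.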



Added by toyfruit on Sat, 08 Jan 2022 05:06:50 +0200