Allocation of partition replicas

      In the previous sections, the distribution of partition replicas has been mentioned many times, and readers may be a little confused, because the notion of partition assignment also exists for producers and consumers. For a producer, partition assignment means choosing the partition to which each message is sent; for a consumer, it means deciding which partitions a consumer instance may consume from. The partition assignment discussed here refers to working out a partition replica allocation scheme for the cluster when a topic is created, that is, deciding which brokers host the replicas of which partitions.

      When creating a topic, if the replica-assignment parameter is used, the partition replicas are created according to the specified scheme; if the replica-assignment parameter is not used, the allocation scheme has to be calculated by Kafka's internal logic. The internal allocation logic used when creating a topic with the kafka-topics.sh script follows one of two strategies, depending on rack information: rack-unaware and rack-aware. If none of the broker nodes in the cluster has the broker.rack parameter configured, or if the disable-rack-aware parameter is used when creating the topic, the rack-unaware allocation strategy is adopted; otherwise, the rack-aware allocation strategy is adopted.

      Let's first look at the rack-unaware allocation strategy. The concrete implementation involves some code-level details, but this strategy is relatively easy to understand, so we analyze it step by step through the source code. The corresponding implementation is the assignReplicasToBrokersRackUnaware() method in kafka.admin.AdminUtils.scala, whose content is as follows:

private def assignReplicasToBrokersRackUnaware(
    nPartitions: Int,        //Number of partitions
    replicationFactor: Int,  //Replication factor
    brokerList: Seq[Int],    //List of broker IDs in the cluster
    fixedStartIndex: Int,    //Starting index, i.e. the position of the first replica; defaults to -1
    startPartitionId: Int):  //Starting partition number; defaults to -1
Map[Int, Seq[Int]] = { 
  val ret = mutable.Map[Int, Seq[Int]]() //Collection that holds the allocation result
  val brokerArray = brokerList.toArray   //Array of broker IDs
  //If the starting index fixedStartIndex is less than 0, generate a random one based on the
  //length of the broker list to ensure that it is a valid brokerId
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex
    else rand.nextInt(brokerArray.length)
  //Ensure that the starting partition number is not less than 0
  var currentPartitionId = math.max(0, startPartitionId)
  //Shift between replicas, used to spread replicas more evenly across different brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex
    else rand.nextInt(brokerArray.length)
  //Iterate over all partitions and assign the replicas of each partition to different brokers
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    //Buffer that collects the brokers assigned to all replicas of this partition
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(
        replicaIndex(firstReplicaIndex, nextReplicaShift, 
          j, brokerArray.length)) //Assign a broker to each of the remaining replicas
    //Save the allocation of all replicas of this partition
    ret.put(currentPartitionId, replicaBuffer)
    //Move on to the next partition
    currentPartitionId += 1
  }
  ret
}

      The fixedStartIndex and startPartitionId values in the parameter list are passed down from the upstream method; both are -1, representing respectively the position of the first replica allocation and the starting partition number. The core of the assignReplicasToBrokersRackUnaware() method is to iterate over every partition and, for each one, pick replicationFactor broker IDs from brokerArray (the list of broker IDs) to assign to that partition.

      The method first creates a mutable Map to store the result to be returned, that is, the mapping between each partition and its assigned replicas. Because fixedStartIndex is -1, startIndex is a random number used to compute the broker ID of the initial allocation. Likewise, because startPartitionId is -1, currentPartitionId starts at 0. It follows that, by default, a topic is always allocated in a round-robin manner starting from partition 0.

nextReplicaShift indicates the shift of the next replica allocation relative to the previous one, which is a little hard to grasp from the name alone. For example, suppose the cluster has 3 broker nodes, corresponding to brokerArray in the code, and a topic is created with 3 replicas and 6 partitions. Allocation starts from the partition whose partitionId (partition number) is 0. Suppose the nextReplicaShift value produced by the first calculation (randomly generated by rand.nextInt(brokerArray.length)) is 2, and the randomly generated startIndex is also 2. Then the position of the first replica of partition 0 (here, position means the array index into brokerArray) is firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0 + 2) % 3 = 2, and the position of the second replica is replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, 2, 0, 3) = ?. A new method, replicaIndex(), appears here, but it is very simple, as follows:

private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, 
                         replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

Continuing the calculation, replicaIndex(2, 2, 0, 3) = (2 + (1 + (2 + 0) % (3 - 1))) % 3 = 0. The position of the next replica is replicaIndex(2, 2, 1, 3) = (2 + (1 + (2 + 1) % (3 - 1))) % 3 = 1. Therefore the list of replica positions for partition 0 is [2, 0, 1]. If brokerArray happens to be numbered from 0 with no gaps, i.e. brokerArray is [0, 1, 2], then the replica allocation for partition 0 is [2, 0, 1]. If brokerArray does not start from 0 or is not consecutive (some brokers in the cluster may have gone offline), for example brokerArray is [2, 5, 8], then the replica allocation for partition 0 is [8, 2, 5]. For ease of illustration, we simply assume that brokerArray is [0, 1, 2].
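
The two values above can be checked directly. Here is a minimal Scala sketch (not Kafka code, just the same formula) that evaluates the two calls with the values assumed in the walkthrough:

object ReplicaIndexCheck extends App {
  //Same formula as the replicaIndex() method shown above
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int,
                   replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  //Partition 0 in the walkthrough: firstReplicaIndex = 2, nextReplicaShift = 2, 3 brokers
  println(replicaIndex(2, 2, 0, 3)) //0 -> second replica of partition 0
  println(replicaIndex(2, 2, 1, 3)) //1 -> third replica of partition 0
}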

      Similarly, we can calculate the replica allocation for the next partition, the one with partitionId 1. At this point nextReplicaShift is still 2, because the increment condition is not met (1 % 3 != 0). For this partition firstReplicaIndex = (1 + 2) % 3 = 0, the position of the second replica is replicaIndex(0, 2, 0, 3) = (0 + (1 + (2 + 0) % (3 - 1))) % 3 = 1, and the position of the third replica is replicaIndex(0, 2, 1, 3) = (0 + (1 + (2 + 1) % (3 - 1))) % 3 = 2. So the final allocation for partition 1 is [0, 1, 2], and so on for the remaining partitions. For more allocation details, refer to the following example; the partition allocation of topic-test2 is consistent with what was derived above:

[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-test2 --replication-factor 3 --partitions 6
Created topic "topic-test2".

[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-test2
Topic:topic-test2   PartitionCount:6    ReplicationFactor:3 Configs:
    Topic: topic-test2  Partition: 0    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: topic-test2  Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: topic-test2  Partition: 2    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    Topic: topic-test2  Partition: 3    Leader: 2   Replicas: 2,1,0 Isr: 2,1,0
    Topic: topic-test2  Partition: 4    Leader: 0   Replicas: 0,2,1 Isr: 0,2,1
    Topic: topic-test2  Partition: 5    Leader: 1   Replicas: 1,0,2 Isr: 1,0,2
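
As a cross-check of the whole walkthrough, the following self-contained Scala sketch reimplements the same round-robin logic with the randomness removed: startIndex and the initial nextReplicaShift are both pinned to 2, the values assumed above. This is only an illustrative sketch, not the actual AdminUtils code, but under those assumptions it reproduces the Replicas column of topic-test2 exactly:

object RackUnawareAssignmentDemo extends App {
  //Same arithmetic as AdminUtils.replicaIndex()
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int,
                   replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  //Rack-unaware assignment with the two random choices pinned for reproducibility
  def assign(nPartitions: Int, replicationFactor: Int, brokers: Seq[Int],
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    val brokerArray = brokers.toArray
    var nextReplicaShift = initialShift
    (0 until nPartitions).map { partitionId =>
      if (partitionId > 0 && partitionId % brokerArray.length == 0)
        nextReplicaShift += 1
      val firstReplicaIndex = (partitionId + startIndex) % brokerArray.length
      val replicas = scala.collection.mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicas += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
      partitionId -> replicas.toSeq
    }.toMap
  }

  //3 brokers [0, 1, 2], 6 partitions, replication factor 3, startIndex = 2, initial shift = 2
  val result = assign(nPartitions = 6, replicationFactor = 3, brokers = Seq(0, 1, 2),
                      startIndex = 2, initialShift = 2)
  (0 until 6).foreach(p => println(s"Partition: $p  Replicas: ${result(p).mkString(",")}"))
  //Partition: 0  Replicas: 2,0,1
  //Partition: 1  Replicas: 0,1,2
  //Partition: 2  Replicas: 1,2,0
  //Partition: 3  Replicas: 2,1,0
  //Partition: 4  Replicas: 0,2,1
  //Partition: 5  Replicas: 1,0,2
}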

        We cannot know the values of startIndex and nextReplicaShift in advance, because they are generated randomly, but they can be inferred from the final allocation scheme. For example, in topic-test2 above, the first replica of the first partition (the one with partitionId = 0) is on broker 2, so from 2 = (0 + startIndex) % 3 we can infer that startIndex is 2. The reason startIndex is chosen at random is to spread partition replicas as evenly as possible across brokers when there are many topics. If it were fixed to a particular value, the first replica of every topic would land on the same broker, a few brokers would end up with too many partition replicas while the others had too few, and the load would eventually become unbalanced. This matters especially when some topics have very small replica and partition counts, even 1, in which case all replicas would fall on that one fixed broker. Likewise, the shift nextReplicaShift applied during allocation helps distribute the partition replicas more evenly.

      By comparison, the rack-aware allocation strategy is slightly more complex than the rack-unaware one, but the main idea is much the same; rack information is simply taken into account as an additional constraint. Suppose there are three racks, rack1, rack2 and rack3, and the nine broker nodes of a Kafka cluster are deployed on these three racks. The mapping between racks and broker nodes is as follows:

rack1: 0, 1, 2
rack2: 3, 4, 5
rack3: 6, 7, 8

      If rack information were ignored, the brokerArray variable in assignReplicasToBrokersRackUnaware() would be [0, 1, 2, 3, 4, 5, 6, 7, 8]. In the rack-aware assignReplicasToBrokersRackAware() method, the broker list is first converted to [0, 3, 6, 1, 4, 7, 2, 5, 8]. This is obviously the result of polling each rack in turn, so the new broker list (called arrangedBrokerList internally) already carries simple rack information; a sketch of this interleaving follows the list below. The subsequent steps are similar to assignReplicasToBrokersRackUnaware(): the same notions of startIndex, currentPartitionId and nextReplicaShift apply, and replicas are allocated partition by partition in a loop. When allocating replicas, apart from the first replica of each partition, the remaining replicas also obtain a broker via the replicaIndex() method. Unlike in assignReplicasToBrokersRackUnaware(), however, this broker is not simply appended to the replica list of the current partition; it first has to pass a filter. A broker that meets either of the following conditions is not added to the replica list of the current partition:

  • There is already a broker on the same rack as this broker that holds a replica of the partition, while some other rack still has no broker holding a replica of this partition.
  • This broker already holds a replica of the partition, while there are still other brokers that do not hold a replica of this partition.
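
The following is a rough Scala sketch of how such a rack-alternating list can be produced from the rack mapping above. It is not the actual implementation inside AdminUtils, only an illustration of the interleaving idea for this particular example:

object RackAlternatedListDemo extends App {
  //Rack -> brokers mapping from the example above
  val brokersByRack = Map(
    "rack1" -> Seq(0, 1, 2),
    "rack2" -> Seq(3, 4, 5),
    "rack3" -> Seq(6, 7, 8)
  )

  //Take the first broker of each rack, then the second of each rack, and so on,
  //which yields the rack-alternated broker list described in the text
  val maxPerRack = brokersByRack.values.map(_.size).max
  val arranged = (0 until maxPerRack).flatMap { i =>
    brokersByRack.toSeq.sortBy(_._1).flatMap { case (_, brokers) => brokers.lift(i) }
  }

  println(arranged.mkString(", ")) //0, 3, 6, 1, 4, 7, 2, 5, 8
}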

        When creating a topic, whether through the kafka-topics.sh script or by other means (such as the KafkaAdminClient described in section 30), the essential action is to create a child node for the topic under the /brokers/topics node in ZooKeeper and write the partition replica allocation scheme into it, and to create a child node for the topic under the /config/topics/ node and write the topic-level configuration into it (this second step can be omitted). The real work of creating the topic is then carried out asynchronously by the controller.

        Once we know what the kafka-topics.sh script essentially does, we can use a ZooKeeper client directly to create the corresponding topic node under /brokers/topics and write a preset allocation scheme into it, thereby creating a new topic. Creating topics this way can also bypass some of the restrictions that apply when using the kafka-topics.sh script, for example the requirement that partition numbers increase consecutively from 0. First, let's use the ZooKeeper client to create a topic named topic-create-zk that is identical to topic-create except for its name. An example is as follows:

[zk: localhost:2181/kafka(CONNECTED) 29] create /brokers/topics/topic-create-zk {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
Created /brokers/topics/topic-create-zk
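
The same znode can also be created programmatically. Below is a minimal sketch, assuming the plain Apache ZooKeeper Java client is on the classpath and the same /kafka chroot as above; error handling is omitted:

import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.KeeperState

object CreateTopicViaZk extends App {
  val connected = new CountDownLatch(1)
  //Connection string and chroot (/kafka) match the examples above; adjust as needed
  val zk = new ZooKeeper("localhost:2181/kafka", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit =
      if (event.getState == KeeperState.SyncConnected) connected.countDown()
  })
  connected.await() //wait until the session is established

  //The same assignment JSON written with the ZooKeeper shell above
  val assignment =
    """{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}"""

  //Writing this znode is the essential step; the controller notices the new child
  //of /brokers/topics and creates the partition replicas asynchronously
  zk.create("/brokers/topics/topic-create-zk", assignment.getBytes("UTF-8"),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

  zk.close()
}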

      By describing topic-create-zk, we can see that its allocation is no different from that of topic-create.

[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create-zk
Topic:topic-create-zk	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: topic-create-zk	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: topic-create-zk	Partition: 1	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: topic-create-zk	Partition: 2	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: topic-create-zk	Partition: 3	Leader: 2	Replicas: 2,1	Isr: 2,1

      Now let's create a more unusual topic, topic-create-special, whose allocation is the same as that of topic-create except that its partition numbers are unconventional. An example is as follows:

[zk: localhost:2181/kafka(CONNECTED) 31] create /brokers/topics/topic-create-special {"version":1,"partitions":{"10":[1,2],"21":[0,1],"33":[2,1],"40":[2,0]}}
Created /brokers/topics/topic-create-special

[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-create-special
Topic:topic-create-special	PartitionCount:4	ReplicationFactor:2	Configs:
	Topic: topic-create-special	Partition: 10	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: topic-create-special	Partition: 21	Leader: 0	Replicas: 0,1	Isr: 0,1
	Topic: topic-create-special	Partition: 33	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: topic-create-special	Partition: 40	Leader: 2	Replicas: 2,0	Isr: 2,0

      As you can see, the partition numbers are 10, 21, 33 and 40, which cannot be achieved with the kafka-topics.sh script alone. That said, this approach is no more than a practical trick; the author still recommends managing topics with the more orthodox kafka-topics.sh script or KafkaAdminClient.
