Consumer client development

        After understanding the concepts of consumer and consumer group, we can start developing the consumer client. Over the course of Kafka's history, the consumer client, like the producer client, has gone through two major versions: the first, written in Scala when Kafka was first open-sourced, can be called the Old Consumer or Scala consumer client; the second, written in Java and introduced in Kafka 0.9.x, can be called the New Consumer or Java consumer client, and it remedies many design defects of the old client.

        This section focuses on the now-prevalent new consumer client (written in Java); the old consumer client has been deprecated, so it will not be covered here.

A normal consumption logic requires the following steps:

  1. Configure consumer client parameters and create corresponding consumer instances.
  2. Subscribe to topics.
  3. Pull messages and consume them.
  4. Commit the consumption offsets.
  5. Close the consumer instance.

      Code listing 2-2 already gave a simple demonstration of the consumer client; this section modifies it slightly, as shown in code listing 8-1.

//Code listing 8-1 consumer client example
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerAnalysis {
    //logger used in the catch block of main()
    private static final Logger log =
            LoggerFactory.getLogger(KafkaConsumerAnalysis.class);
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() 
                            + ", partition = "+ record.partition() 
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}
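
      Note that listing 8-1 leaves step 4, committing the consumption offsets, to the client's automatic offset committing (the enable.auto.commit parameter defaults to true). As a minimal sketch, assuming automatic committing has been disabled in initConfig() with props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false), the commit can be made explicit like this (offset committing is discussed in detail later):

try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            //do something to process record.
        }
        //step 4: synchronously commit the offsets of the records just polled
        consumer.commitSync();
    }
} finally {
    consumer.close();
}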

      Compared with code listing 2-2, the modified code is somewhat longer. We will analyze it by following the steps of the consumption logic.

Necessary parameter configuration

      Before creating an actual consumer instance, you need to configure the corresponding parameters, such as the name of the consumer group the consumer belongs to and the connection addresses mentioned in the previous section. Referring to the initConfig() method in code listing 8-1, four parameters are mandatory in the Kafka consumer client KafkaConsumer.

  • bootstrap.servers: this parameter has the same meaning as in the producer client KafkaProducer. It specifies the list of broker addresses used to connect to the Kafka cluster, in the form host1:port1,host2:port2; one or more addresses can be given, separated by commas. The default value of this parameter is "". Note that it is not necessary to list all broker addresses in the cluster, because the consumer discovers the remaining cluster members from the configured ones. It is nonetheless advisable to set at least two broker addresses, so that the consumer can still connect to the Kafka cluster when any one of them goes down.
  • group.id: the name of the consumer group the consumer belongs to. The default value is "". If it is left empty, an exception will be thrown: Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid. Generally speaking, this parameter should be set to a name with some business meaning.
  • key.deserializer and value.deserializer: the counterparts of the key.serializer and value.serializer parameters in the producer client KafkaProducer. Messages fetched by the consumer from the broker are in byte array (byte[]) form, so the corresponding deserialization is required to restore them to their original object form. These two parameters specify the deserializers applied to the key and the value of a message, respectively, and they have no default values. Note that the fully qualified name of the deserializer class must be given here, such as org.apache.kafka.common.serialization.StringDeserializer in the example; specifying only StringDeserializer is wrong. For more on deserialization, refer to the next section.

        Note that the initConfig() method in listing 8-1 also sets the client.id parameter, which specifies the client ID of the KafkaConsumer; its default value is "". If the client does not set it, KafkaConsumer automatically generates a non-empty string of the form "consumer-1", "consumer-2", and so on, that is, the string "consumer-" concatenated with a number.

        There are many more parameters in KafkaConsumer, far more than the five set in the example initConfig() method. Developers can modify the default values of these parameters according to the actual needs of their business applications to achieve flexible deployment. In general, developers cannot remember all the parameter names and only have a rough impression of them. In actual use, strings such as "key.deserializer" and "auto.offset.reset" are easy to mistype. Therefore, we can use the org.apache.kafka.clients.consumer.ConsumerConfig class in the client to guard against this to some extent: every parameter has a corresponding constant in the ConsumerConfig class. Taking the initConfig() method in code listing 8-1 as an example, the result after introducing ConsumerConfig is as follows:

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client.id.demo");
        return props;
    }

      Note that the fully qualified class names for the key.deserializer and value.deserializer parameters in the above code are rather long and easy to mistype. This can be further improved with a small Java technique, the class literal; the relevant code is as follows:

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        In this way, the code is much simpler and less prone to human error. After configuring the parameters, we can use them to create a consumer instance:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

      The KafkaConsumer configuration described in this section is largely the same as the KafkaProducer configuration: apart from the corresponding deserializers, the only additional mandatory parameter is group.id.

Subscribe to topics and partitions

          After creating a consumer, we need to subscribe it to the relevant topics. A consumer can subscribe to one or more topics. In code listing 8-1 we subscribe to a single topic using the subscribe() method. With this method, we can subscribe to multiple topics either as a collection or as a regular expression. The overloaded subscribe() methods are as follows:

public void subscribe(Collection<String> topics, 
    ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

          Subscribing to topics as a collection with subscribe(Collection) is easy to understand: a consumer consumes messages from whatever topics it has subscribed to. If subscribe() is called twice with different topics, the later call prevails.

consumer.subscribe(Arrays.asList(topic1));
consumer.subscribe(Arrays.asList(topic2));

      In the above example, the consumer ends up subscribed to topic2 only: not topic1, and not the union of topic1 and topic2.
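
      To consume both topics, pass them to a single subscribe() call instead, for example:

consumer.subscribe(Arrays.asList(topic1, topic2));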

        If the consumer subscribes with a regular expression (subscribe(Pattern)), then later on, if someone creates a new topic whose name matches the regular expression, the consumer can consume messages from the newly created topic as well. This kind of subscription is useful when the application needs to consume multiple topics and can handle different types of messages. The regular expression approach is common when copying data between Kafka and other systems. An example of a regular expression subscription is as follows:

consumer.subscribe(Pattern.compile("topic-.*"));

        Careful readers may have noticed that one of the subscribe() overloads takes a parameter of type ConsumerRebalanceListener, which sets the corresponding rebalance listener. For details, please refer to section 13.
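
        As a small preview, a minimal sketch of subscribing with a rebalance listener that merely prints the partitions being revoked and assigned might look as follows (onPartitionsRevoked() and onPartitionsAssigned() are the two callbacks of the ConsumerRebalanceListener interface):

consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //invoked before a rebalance takes these partitions away from this consumer
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //invoked after a rebalance hands these partitions to this consumer
        System.out.println("assigned: " + partitions);
    }
});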

        A consumer can not only subscribe to topics through the KafkaConsumer.subscribe() method but also subscribe directly to specific partitions of certain topics. For this, KafkaConsumer provides the assign() method, defined as follows:

public void assign(Collection<TopicPartition> partitions)

        This method accepts a single parameter, partitions, which specifies the collection of partitions to subscribe to. A few words about the TopicPartition class: in the Kafka client it represents a partition, and part of the class looks as follows.

public final class TopicPartition implements Serializable {

    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }
    //Omit the hashCode(), equals() and toString() methods
}

        The TopicPartition class has only two attributes, topic and partition, representing the topic the partition belongs to and the partition number itself. The class maps directly onto the concept of a topic partition.

        We now change the subscribe() method in code listing 8-1 to the assign() method, subscribing only to the partition numbered 0 of the topic-demo topic. The relevant code is as follows:

consumer.assign(Arrays.asList(new TopicPartition("topic-demo", 0)));

        Some readers may ask: what if we don't know in advance how many partitions a topic has? The partitionsFor() method in KafkaConsumer can be used to query the metadata of a specified topic. It is defined as follows:

public List<PartitionInfo> partitionsFor(String topic)

        The PartitionInfo type represents the partition metadata of a topic. The main structure of this class is as follows:

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;
    //Constructor, attribute extraction, toString and other methods are omitted here
}

        In the PartitionInfo class, the topic attribute is the topic name, partition is the partition number, leader is the node where the partition's leader replica resides, replicas is the partition's AR set, inSyncReplicas is the partition's ISR set, and offlineReplicas is the partition's OSR set. With the help of the partitionsFor() method, we can subscribe to a topic (all of its partitions) via the assign() method, as in the following example:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos != null) {
    for (PartitionInfo tpInfo : partitionInfos) {
        partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
    }
}
consumer.assign(partitions);

      Where there is subscription there is also unsubscription. You can cancel a topic subscription with the unsubscribe() method in KafkaConsumer. This method cancels subscriptions made through any of subscribe(Collection), subscribe(Pattern) and assign(Collection). Example code is as follows:

consumer.unsubscribe();

        If the collection parameter of subscribe(Collection) or assign(Collection) is an empty collection, the effect is the same as calling unsubscribe(). The three lines in the following example are equivalent:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());

      If a consumer is not subscribed to any topic or partition, an IllegalStateException is thrown when the consumer program continues to run:

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

        The collection subscription method subscribe(Collection), the regular expression subscription method subscribe(Pattern) and the direct partition subscription method assign(Collection) respectively represent three different subscription states: AUTO_TOPICS, AUTO_PATTERN and USER_ASSIGNED (with no subscription, the state is NONE). These states are mutually exclusive: only one of them can be in effect in a consumer, otherwise an IllegalStateException is thrown:

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive.
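
        For example, mixing the two subscription styles on the same consumer instance, as sketched below, triggers this exception:

consumer.subscribe(Arrays.asList(topic));
//the consumer is now in the AUTO_TOPICS state, so the following
//assign() call throws an IllegalStateException
consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));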

        Subscribing to topics via the subscribe() method gives the consumer automatic rebalancing: with multiple consumers, the partition-to-consumer assignment is worked out automatically according to the partition assignment strategy, and when consumers join or leave the consumer group, the assignment is adjusted automatically, providing consumption load balancing and automatic failover. Subscribing to partitions via the assign() method provides no automatic rebalancing, which can in fact be seen from the method parameters: subscribe() has overloads that take a ConsumerRebalanceListener parameter, while assign() does not.
