Rebalancing
Rebalancing is the transfer of ownership of a partition from one consumer to another. It underpins the high availability and scalability of a consumer group, allowing consumers to be added to or removed from the group easily and safely. However, consumers in the group cannot read messages while a rebalance is in progress. In other words, the consumer group is unavailable for the short duration of the rebalance.
In addition, when a partition is reassigned to another consumer, the previous consumer's current state is lost. For example, suppose a consumer has consumed some messages from a partition and a rebalance occurs before it commits the corresponding offset. The partition is then assigned to another consumer in the group, which consumes those messages again; that is, duplicate consumption occurs. In general, unnecessary rebalances should be avoided as far as possible.
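The duplicate consumption described above can be illustrated with a small model. This is only a sketch under simplifying assumptions: it uses no Kafka classes, a partition is a plain list, the committed offset is the index of the next message to read, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model only: no Kafka classes are used. A "partition" is a list of
// messages and the "committed offset" is the index of the next message to read.
class RebalanceDuplicateDemo {

    /** Returns the messages in [from, to) in processing order. */
    static List<String> consume(List<String> partition, int from, int to) {
        return new ArrayList<>(partition.subList(from, to));
    }

    /**
     * Returns every message handled by consumer A, followed by every message
     * handled by consumer B, which takes over after the rebalance and resumes
     * from the last committed offset.
     */
    static List<String> run(List<String> partition, int processedByA, int committedByA) {
        List<String> processed = consume(partition, 0, processedByA);
        // Rebalance: B only knows the committed offset, not A's real progress.
        processed.addAll(consume(partition, committedByA, partition.size()));
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");
        // A processed m0..m4 but had only committed offset 3 when it lost the partition.
        System.out.println(run(partition, 5, 3));
        // prints [m0, m1, m2, m3, m4, m3, m4, m5]
    }
}
```

In this model, m3 and m4 are processed twice because consumer A's progress beyond offset 3 was never committed.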
Section 8 mentioned the rebalance listener when describing the subscribe() method. It appears as a parameter of the subscribe(Collection<String> topics, ConsumerRebalanceListener listener) and subscribe(Pattern pattern, ConsumerRebalanceListener listener) methods. The rebalance listener lets you run preparatory or cleanup actions before and after a rebalance. ConsumerRebalanceListener is an interface that contains two methods, defined as follows:
- void onPartitionsRevoked(Collection<TopicPartition> partitions) is called before the rebalance starts and after the consumer stops reading messages. This callback can be used to commit offsets and so avoid some unnecessary duplicate consumption. The partitions parameter holds the partitions assigned before the rebalance.
- void onPartitionsAssigned(Collection<TopicPartition> partitions) is called after partitions have been reassigned and before the consumer starts reading messages. The partitions parameter holds the partitions assigned after the rebalance.
The example in Listing 13-1 demonstrates how to use ConsumerRebalanceListener.
// Listing 13-1: Using the rebalance listener
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do nothing.
    }
});

try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // process the record.
            currentOffsets.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} finally {
    consumer.close();
}
In Listing 13-1, the consumed offsets are kept temporarily in the local variable currentOffsets. During normal consumption, the offsets are committed asynchronously with commitAsync(). Before a rebalance takes place, the onPartitionsRevoked() callback of the rebalance listener commits them synchronously with commitSync(), to avoid some unnecessary duplicate consumption.
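The bookkeeping behind this pattern can be sketched without a broker. The model below is an assumption-laden illustration, not Kafka's API: plain maps stand in for the locally tracked offsets and for the committed offsets, partitions are plain integers, and all names are hypothetical. It shows why the stored value is the processed offset plus one (the position of the next message to read) and what flushing in the revoke callback achieves.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model only: plain maps stand in for Kafka's offset storage.
class RevokeCommitSketch {
    // Next offset to read per partition, tracked locally while processing.
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Hypothetical stand-in for the offsets committed to the broker.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Records that the message at the given offset has been processed. */
    void process(int partition, long offset) {
        // The position to resume from is the offset of the NEXT message.
        currentOffsets.put(partition, offset + 1);
    }

    /** Mirrors the revoke callback: flush local progress, then reset it. */
    void onPartitionsRevoked() {
        committed.putAll(currentOffsets);
        currentOffsets.clear();
    }

    /** Offset a new owner of the partition would resume from (0 if none). */
    long committedOffset(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        RevokeCommitSketch consumer = new RevokeCommitSketch();
        for (long offset = 0; offset < 5; offset++) {
            consumer.process(0, offset);
        }
        System.out.println(consumer.committedOffset(0)); // prints 0: nothing committed yet
        consumer.onPartitionsRevoked();
        System.out.println(consumer.committedOffset(0)); // prints 5: resume after offsets 0..4
    }
}
```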
The rebalance listener can also be combined with external storage. In Listing 12-4, the consumed offsets are saved in a database. With a rebalance listener, we can look up the stored offset of each newly assigned partition and, together with the seek() method, further optimize that logic. The first line of code in Listing 12-4 is changed as follows:
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // store offset in DB (storeOffsetToDB)
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // read the consumed offset from the DB
            consumer.seek(tp, getOffsetFromDB(tp));
        }
    }
});
This section only briefly demonstrates how to use the rebalance listener. The interaction between the consumer client and the Kafka server during a rebalance, and the principles behind it, are not simple. For more details, refer to the relevant material later on.
Consumer interceptor
Section 4 described producer interceptors; consumers have a corresponding interceptor concept. A consumer interceptor performs custom operations when messages are consumed or when offsets are committed.
Like the producer interceptor, a custom consumer interceptor must implement the org.apache.kafka.clients.consumer.ConsumerInterceptor interface, which contains three methods:
- public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
- public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
- public void close().
KafkaConsumer calls the interceptor's onConsume() method before poll() returns, so that the interceptor can customize the messages, for example by modifying the content of the returned messages or by filtering them according to some rule (which may reduce the number of messages returned by poll()). If onConsume() throws an exception, the exception is caught and logged but is not propagated further.
KafkaConsumer calls the interceptor's onCommit() method after offsets have been committed, so this method can be used to record the committed offsets. For example, when a consumer uses the no-argument commitSync() method, the caller does not know exactly which offsets were committed; the interceptor's onCommit() method makes that information available.
The close() method, and the configure() method inherited from the parent interface of ConsumerInterceptor, serve the same purpose as in the producer's ProducerInterceptor interface and are not described again here.
In some business scenarios a message carries a validity period: if it does not arrive within a given time window, it is considered invalid and no longer needs to be processed. The following uses a consumer interceptor to implement a simple message TTL (Time to Live, that is, an expiration time). In Listing 13-1, the custom consumer interceptor ConsumerInterceptorTTL uses the timestamp field of each message to decide whether it has expired. If the message timestamp is 10 seconds or more behind the current time, the message is treated as expired, filtered out, and not delivered to the consumer.
// Listing 13-1: A custom consumer interceptor
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public ConsumerRecords<String, String> onConsume(
            ConsumerRecords<String, String> records) {
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords =
                new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : tpRecords) {
                // keep only messages that have not expired
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecords.add(record);
                }
            }
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) ->
                System.out.println(tp + ":" + offset.offset()));
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
After implementing the custom ConsumerInterceptorTTL, you need to configure the interceptor in KafkaConsumer. As with KafkaProducer, this is done through the interceptor.classes parameter, whose default value is "". For example:
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
When sending messages, we set the timestamp in ProducerRecord so that some messages are already expired. For example:
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, 0,
        System.currentTimeMillis() - EXPIRE_INTERVAL, null, "first-expire-data");
producer.send(record1).get();

ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0,
        System.currentTimeMillis(), null, "normal-data");
producer.send(record2).get();

ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, 0,
        System.currentTimeMillis() - EXPIRE_INTERVAL, null, "last-expire-data");
producer.send(record3).get();
The sample code sends three messages: "first-expire-data", "normal-data", and "last-expire-data". The timestamps of the first and third messages are set so that they are expired. The consumer therefore receives only "normal-data" from poll(); the other two are filtered out.
However, when using this feature, note that committing offsets with the parameterized commit methods, as in Listing 11-2, may commit the wrong offset. The message with the largest offset in a fetched batch may be one of those filtered out by the consumer interceptor, so an offset computed from the records that poll() returns can be smaller than the real position.
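The offset pitfall can be made concrete with a small model. This is a sketch under stated assumptions rather than Kafka code: Msg is a hypothetical stand-in for ConsumerRecord, dropExpired applies the same rule as the TTL interceptor, and offsetToCommit assumes the application derives the offset to commit from the last message it received.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model only: Msg is a hypothetical stand-in for ConsumerRecord.
class FilteredOffsetSketch {
    static final long EXPIRE_INTERVAL = 10 * 1000;

    static final class Msg {
        final long offset;
        final long timestamp;

        Msg(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Same rule as the TTL interceptor: keep only unexpired messages. */
    static List<Msg> dropExpired(List<Msg> batch, long now) {
        List<Msg> kept = new ArrayList<>();
        for (Msg m : batch) {
            if (now - m.timestamp < EXPIRE_INTERVAL) {
                kept.add(m);
            }
        }
        return kept;
    }

    /** Offset to commit when derived from the last message of a batch, or -1 if empty. */
    static long offsetToCommit(List<Msg> batch) {
        return batch.isEmpty() ? -1 : batch.get(batch.size() - 1).offset + 1;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Msg> fetched = Arrays.asList(
                new Msg(10, now - EXPIRE_INTERVAL),  // expired
                new Msg(11, now),                    // valid
                new Msg(12, now - EXPIRE_INTERVAL)); // expired, largest offset
        System.out.println(offsetToCommit(fetched));                   // prints 13
        System.out.println(offsetToCommit(dropExpired(fetched, now))); // prints 12
    }
}
```

In this model the application only sees the filtered batch, so it would commit 12 even though the batch actually ended at offset 12 and the correct position is 13.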
Consumers also have the concept of an interceptor chain. As in the producer's interceptor chain, the interceptors run one after another in the order in which they are listed in the interceptor.classes parameter (separated by commas). Beware of "side effects" here too: if one interceptor in the chain fails, the next interceptor continues from the output of the last interceptor that ran successfully.
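The chain behavior described above can be modeled in a few lines. This is only a sketch: it does not use the Kafka interceptor classes, each interceptor is represented as a UnaryOperator over a list of strings, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Simplified model only: each interceptor is a function over a list of strings.
class InterceptorChainSketch {

    /** Runs the interceptors in order; a failing one is logged and skipped. */
    static List<String> runChain(List<UnaryOperator<List<String>>> chain,
                                 List<String> records) {
        List<String> current = records;
        for (UnaryOperator<List<String>> interceptor : chain) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // 'current' still holds the output of the last successful interceptor.
                System.err.println("interceptor failed: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<List<String>>> chain = new ArrayList<>();
        chain.add(rs -> rs.stream().map(String::toUpperCase).collect(Collectors.toList()));
        chain.add(rs -> { throw new IllegalStateException("boom"); });
        chain.add(rs -> rs.stream().map(r -> r + "!").collect(Collectors.toList()));

        System.out.println(runChain(chain, Arrays.asList("a", "b"))); // prints [A!, B!]
    }
}
```

The third interceptor receives the uppercase output of the first one, because the failed second interceptor contributes nothing to the result.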