Kafka source code analysis: consuming messages

Analysis of the Kafka message-consumption source code

Example code for consuming messages

package com.example.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Minimal Kafka consumer demo: subscribes to {@code CommonHelper.TOPIC} and prints the
 * coordinates, key and value of every record it receives until {@link #IS_RUNNING} becomes
 * {@code false}.
 */
public class ConsumerAnalysis {

    /** Loop flag checked before every poll; set it to {@code false} to let {@code main} exit and close the consumer. */
    public static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);

    /** Maximum time a single {@code poll} call blocks while waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000);

    /**
     * Builds the consumer configuration used by the demo.
     *
     * <p>{@code enable.auto.commit} is not set, so the client default applies and offsets are
     * committed automatically (assumes the client default of {@code true} — confirm for your version).
     *
     * @return properties with bootstrap servers, String key/value deserializers, group id and client id
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CommonHelper.BROKER_LIST);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerAnalysisGroup-1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-consumer-client-1");
        return properties;
    }

    /**
     * Creates the consumer, subscribes to the demo topic and prints records until stopped.
     * The consumer is always closed on exit, including when subscribing fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        try {
            // Subscribe inside the try so the finally block still closes the consumer if this throws.
            consumer.subscribe(Collections.singletonList(CommonHelper.TOPIC));
            while (IS_RUNNING.get()) {
                // poll(Duration) replaces the deprecated poll(long); same 10 s upper bound per call.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                System.out.println("records count is " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic=" + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset=" + record.offset());

                    // Print the record's key (previously the offset was printed here by mistake).
                    System.out.println("key=" + record.key()
                            + ", value= " + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }

    }
}

Process steps

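  1. Configure the consumer client parameters and create a consumer instance
  2. Subscribe to topics
  3. Poll for messages and consume them
  4. Commit the consumed offsets (the example above does not commit explicitly; it relies on auto-commit, since enable.auto.commit defaults to true)
  5. Close the consumer instance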

Parameter description

  • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: corresponds to the bootstrap.servers parameter, a list of host/port pairs used to establish the initial connection to the Kafka cluster. The client will use all servers in the cluster, regardless of which servers are listed here for bootstrapping; this list only affects the initial hosts used to discover the full set of servers. The list should have the form host1:port1,host2:port2. These servers are only used for the initial connection to discover the full cluster membership, which may change dynamically. The list therefore does not need to contain every server, though you may want to specify more than one in case a server is down.

  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG: corresponds to the key.deserializer parameter, the deserializer class for record keys. It must implement the org.apache.kafka.common.serialization.Deserializer interface.

  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: corresponds to the value.deserializer parameter, the deserializer class for record values. It must implement the org.apache.kafka.common.serialization.Deserializer interface.

  • ConsumerConfig.GROUP_ID_CONFIG: corresponds to the group.id parameter, a unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses group management via subscribe(topic) or the Kafka-based offset management strategy.

  • ConsumerConfig.CLIENT_ID_CONFIG: corresponds to the client.id parameter, an id string passed to the server with each request. Its purpose is to track the source of requests beyond just ip/port, by allowing a logical application name to be included in server-side request logging.

    Note: the ConsumerConfig class is used when creating consumers. Its static {} block defines the consumer configuration options and their default values; the other consumer configurations can be found in that class.

Main process of creating a consumer instance

Supplementary flow chart

/**
 * Builds every internal component of the consumer from {@code config}: client id, log context,
 * metrics, interceptors, key/value deserializers, cluster metadata, network client, subscription
 * state, partition assignors, group coordinator and fetcher.
 *
 * <p>If any step throws, the partially constructed consumer is closed (to avoid leaking resources
 * that were already created, see KAFKA-2121). The failure is then rethrown wrapped in a
 * {@link KafkaException}.
 *
 * @param config            parsed consumer configuration
 * @param keyDeserializer   key deserializer instance, or {@code null} to instantiate the class named
 *                          by {@code key.deserializer}
 * @param valueDeserializer value deserializer instance, or {@code null} to instantiate the class named
 *                          by {@code value.deserializer}
 * @throws KafkaException wrapping any failure during construction, including the ConfigException
 *                        raised when request.timeout.ms is not larger than session.timeout.ms and
 *                        fetch.max.wait.ms
 */
private KafkaConsumer(ConsumerConfig config,
                      Deserializer<K> keyDeserializer,
                      Deserializer<V> valueDeserializer) {
    try {
        // Use client.id if configured; otherwise generate "consumer-<n>" from a shared sequence counter
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.isEmpty())
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        // Read group.id; it is used in the log prefix below and passed to the group coordinator
        String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);

        LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
        this.log = logContext.logger(getClass());

        log.debug("Initializing the Kafka consumer");
        // request.timeout.ms: the maximum time the client waits for the response to a request
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // session.timeout.ms: if no heartbeat reaches the group within this time, the consumer
        // is removed from the group and a rebalance is triggered
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        // fetch.max.wait.ms: the maximum time the broker blocks a fetch request while there is not
        // yet enough data to satisfy fetch.min.bytes
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        // request.timeout.ms must be strictly greater than both session.timeout.ms and fetch.max.wait.ms
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = Time.SYSTEM;

        // Client metrics: tagged with the client id and reported through the configured
        // metric.reporters plus a JmxReporter
        Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        // retry.backoff.ms: time to wait before retrying a failed request
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // load interceptors and make sure they get clientId
        // (interceptor.classes is instantiated from the user-supplied configs with client.id
        // overwritten by the resolved clientId above)
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class);
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        // Deserializers: when no instance was passed to the constructor, instantiate the class named
        // by key.deserializer / value.deserializer and configure it (second argument is true for the
        // key deserializer, false for the value deserializer). When an instance was passed in, use it
        // and mark the corresponding config key as ignored.
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        // Collect the cluster resource listeners (deserializers, metric reporters, interceptors).
        // NOTE(review): this passes the constructor parameters, not this.keyDeserializer /
        // this.valueDeserializer. The parameters are null when the deserializers were created from
        // config above, so those deserializers are presumably not registered here — confirm
        // against configureClusterResourceListeners.
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
        // Cluster metadata cache. Besides updates triggered by errors, it is refreshed periodically
        // according to metadata.max.age.ms.
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                true, false, clusterResourceListeners);
        // Parse and validate bootstrap.servers, then seed the metadata with a bootstrap cluster
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
        String metricGrpPrefix = "consumer";
        ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);

        // isolation.level, parsed (upper-cased) into IsolationLevel and passed to the Fetcher below
        IsolationLevel isolationLevel = IsolationLevel.valueOf(
                config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);

        // heartbeat.interval.ms: expected time between heartbeats to the group coordinator. It must be
        // lower than session.timeout.ms and is usually set no higher than 1/3 of that value; it can be
        // lowered to control the expected time for normal rebalances.
        int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

        // Low-level network client built on a Selector (connections.max.idle.ms, reconnect backoff,
        // socket buffer sizes, request timeout)
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice for max in-flight requests
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                time,
                true,
                new ApiVersions(),
                throttleTimeSensor,
                logContext);
        // Wrap the NetworkClient in a ConsumerNetworkClient, which the coordinator and fetcher below use
        this.client = new ConsumerNetworkClient(
                logContext,
                netClient,
                metadata,
                time,
                retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
        // auto.offset.reset, parsed (upper-cased) into OffsetResetStrategy
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        // Subscription state: holds the topic subscription / partition assignment and uses the
        // offset reset strategy above
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        // Partition assignors instantiated from partition.assignment.strategy
        this.assignors = config.getConfiguredInstances(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                PartitionAssignor.class);
        // Consumer group coordinator. It receives the group settings (max.poll.interval.ms,
        // session.timeout.ms, heartbeat interval, assignors), the auto-commit settings
        // (enable.auto.commit, auto.commit.interval.ms), exclude.internal.topics and
        // leave.group.on.close.
        this.coordinator = new ConsumerCoordinator(logContext,
                this.client,
                groupId,
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                heartbeatIntervalMs,
                assignors,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
                config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
        // Fetcher: fetches records from the brokers through this.client. It is configured with the
        // fetch.* sizes/wait, max.poll.records, check.crcs, the deserializers above and the
        // isolation level.
        this.fetcher = new Fetcher<>(
                logContext,
                this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricsRegistry.fetcherMetrics,
                this.time,
                this.retryBackoffMs,
                this.requestTimeoutMs,
                isolationLevel);

        // Log supplied config entries that were never read, then register app info under JMX_PREFIX
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);

        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}

Subscribe to topics

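Subscriptions are not incremental: each call to subscribe replaces the previous subscription, so only the most recent set of topics is kept. A subscription can be a single topic, a collection of topics, or a regular-expression pattern (whose set of matching topics can change dynamically).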

/**
 * Subscribes to the given topics, replacing any previous subscription.
 * Passing an empty collection has the same effect as {@link #unsubscribe()}.
 *
 * @param topics   topics to subscribe to; must not be null and must not contain null/blank names
 * @param listener rebalance callback handed to the subscription state
 * @throws IllegalArgumentException if {@code topics} is null or contains a null/blank topic
 */
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // Acquire the consumer's lock and make sure it has not been closed.
    acquireAndEnsureOpen();
    try {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            // An empty topic list is handled exactly like an unsubscribe.
            this.unsubscribe();
            return;
        }

        // Reject any null or whitespace-only topic name before touching state.
        for (String candidate : topics) {
            boolean blankName = candidate == null || candidate.trim().isEmpty();
            if (blankName) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
        }
        // A group subscription needs at least one partition assignor.
        throwIfNoAssignorsConfigured();

        log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
        // Record the new topic set, then ask metadata to track every topic the group cares about.
        this.subscriptions.subscribe(new HashSet<>(topics), listener);
        metadata.setTopics(subscriptions.groupSubscription());
    } finally {
        release();
    }
}
/**
 * Switches this subscription state to topic-name (AUTO_TOPICS) mode for the given topics.
 *
 * @param topics   topics to subscribe to
 * @param listener rebalance callback to remember; must not be null
 * @throws IllegalArgumentException if {@code listener} is null
 */
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null) {
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    }
    // Mark the subscription type before storing anything else.
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    rebalanceListener = listener;
    // Apply the new topic set (presumably replacing the previous one — see changeSubscription).
    changeSubscription(topics);
}

Consuming data

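The poll method fetches a batch of messages. The timeout parameter is the maximum time, in milliseconds, that poll may block waiting for records. Poll returns as soon as a non-empty batch is available, or returns an empty batch once the timeout expires.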

/**
 * Fetches records for the subscribed topics or assigned partitions, waiting up to
 * {@code timeout} milliseconds if none are available yet.
 *
 * @param timeout maximum time to spend polling, in milliseconds; 0 still makes one attempt
 * @return the fetched records after interceptor processing, or an empty batch once the time is up
 * @throws IllegalArgumentException if {@code timeout} is negative
 * @throws IllegalStateException    if there is neither a subscription nor a manual partition assignment
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquireAndEnsureOpen();
    try {
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        final long startMs = time.milliseconds();
        long remainingMs = timeout;
        while (true) {
            // A single fetch round, bounded by whatever time is left.
            Map<TopicPartition, List<ConsumerRecord<K, V>>> batch = pollOnce(remainingMs);
            if (!batch.isEmpty()) {
                // Pipelining: kick off the next round of fetches now so they progress while the
                // caller processes this batch. Positions were already advanced, so use the
                // no-wakeup variant; nothing may interrupt us before the records are returned.
                boolean networkWorkPending = fetcher.sendFetches() > 0 || client.hasPendingRequests();
                if (networkWorkPending) {
                    client.pollNoWakeup();
                }
                // Let the configured consumer interceptors see (and possibly alter) the batch.
                return this.interceptors.onConsume(new ConsumerRecords<>(batch));
            }

            remainingMs = timeout - (time.milliseconds() - startMs);
            if (remainingMs <= 0) {
                return ConsumerRecords.empty();
            }
        }
    } finally {
        release();
    }
}

Keywords: Java Big Data kafka

Added by gauravupadhyaya on Mon, 17 Jan 2022 20:19:48 +0200