Analysis of the Kafka message consumption source code
Example code for consuming messages
package com.example.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerAnalysis {

    public static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CommonHelper.BROKER_LIST);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerAnalysisGroup-1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-consumer-client-1");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(CommonHelper.TOPIC));
        try {
            while (IS_RUNNING.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                System.out.println("records count is " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic=" + record.topic()
                            + ", partition=" + record.partition()
                            + ", offset=" + record.offset());
                    System.out.println("key=" + record.key() + ", value=" + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Process steps
- Configure the consumer client parameters and create a consumer instance
- Subscribe to topics
- Poll for messages and consume them
- Commit the consumed offset
- Close the consumer instance
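Before diving into the source, here is a minimal sketch of the five steps above. The broker address, group id, and topic name are placeholders; auto-commit is disabled so that step 4 appears as an explicit commitSync() call rather than happening in the background.

package com.example.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class FiveStepsSketch {
    public static void main(String[] args) {
        // Step 1: configure client parameters and create the consumer instance
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // make step 4 explicit
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Step 2: subscribe to the topic
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // Step 3: poll a batch of messages and consume them
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.value());
            // Step 4: commit the consumed offsets synchronously
            consumer.commitSync();
        } finally {
            // Step 5: close the consumer instance
            consumer.close();
        }
    }
}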
Parameter description
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: the corresponding parameter is bootstrap.servers, a list of host/port pairs used to establish the initial connection to the Kafka cluster. The client will make use of all servers in the cluster, regardless of which servers are listed here for bootstrapping; this list only affects the initial hosts used to discover the full set of servers. The format is host1:port1,host2:port2. Since these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), the list does not need to contain the complete set of servers (you may, however, want more than one in case a server is down).
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG: the corresponding parameter is key.deserializer, the deserializer class for keys, which must implement the org.apache.kafka.common.serialization.Deserializer interface.
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: the corresponding parameter is value.deserializer, the deserializer class for values, which must implement the org.apache.kafka.common.serialization.Deserializer interface.
- ConsumerConfig.GROUP_ID_CONFIG: the corresponding parameter is group.id, a unique string identifying the consumer group this consumer belongs to. This property is required if the consumer uses the group management functionality via subscribe(topic), or the Kafka-based offset management strategy.
- ConsumerConfig.CLIENT_ID_CONFIG: the corresponding parameter is client.id, an id string passed to the server when making requests. Its purpose is to allow tracking the source of requests beyond just ip/port, by letting a logical application name be included in server-side request logging.
Note that the ConsumerConfig class is used when creating consumers. In this class, a static {} block initializes the default configuration. The other available consumer configurations can also be observed in the ConsumerConfig class.
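For instance, the full set of registered configuration keys can be listed with the static configNames() method (a minimal sketch, assuming that method is available in the client version in use):

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ListConsumerConfigs {
    public static void main(String[] args) {
        // configNames() exposes the config keys registered by the static {} initializer
        ConsumerConfig.configNames().forEach(System.out::println);
    }
}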
Main flow of creating a consumer instance
Supplementary flow chart
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // If the client.id parameter is not configured, a default one is generated
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.isEmpty())
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        // Get the consumer group id
        String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);

        LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
        this.log = logContext.logger(getClass());

        log.debug("Initializing the Kafka consumer");
        // Request timeout: if there is still no response after this timeout and the configured retries, the request fails
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        // Session timeout for group heartbeats: if exceeded, the consumer leaves the group and a rebalance is triggered
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        // Maximum time the server will block a fetch response while fetch.min.bytes has not been reached
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        // The request timeout must be greater than the session timeout and the maximum fetch wait
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than "
                    + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = Time.SYSTEM;

        // Metrics collection for the consumer client
        Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        // Wait time between retries after a failed request
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // load interceptors and make sure they get clientId
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false))
                .getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        // Read the key and value deserializers; if deserializer instances were supplied programmatically,
        // the corresponding config entries are ignored
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        // Initialize the cluster resource listeners
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
        // Configure metadata; besides updates triggered by errors, metadata is also refreshed periodically
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                true, false, clusterResourceListeners);
        // Parse and validate the bootstrap server addresses
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
        String metricGrpPrefix = "consumer";
        ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
        // Configure the transaction isolation level
        IsolationLevel isolationLevel = IsolationLevel.valueOf(
                config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
        // Heartbeat interval with the group coordinator; must be lower than session.timeout.ms,
        // and typically should be no higher than 1/3 of that value. It can be lowered to control
        // the expected time of a normal rebalance.
        int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

        // Initialize the network client
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice for max in-flight requests
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                time,
                true,
                new ApiVersions(),
                throttleTimeSensor,
                logContext);
        // Wrap the network client in a ConsumerNetworkClient
        this.client = new ConsumerNetworkClient(
                logContext,
                netClient,
                metadata,
                time,
                retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                heartbeatIntervalMs); // Will avoid blocking an extended period of time to prevent heartbeat thread starvation
        // Configure the offset reset strategy
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        // Tracks the subscription state: topics, partitions, and offsets
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        // Partition assignors
        this.assignors = config.getConfiguredInstances(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                PartitionAssignor.class);

        // Create the consumer group coordinator, which also monitors metadata changes
        this.coordinator = new ConsumerCoordinator(logContext,
                this.client,
                groupId,
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                heartbeatIntervalMs,
                assignors,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
                config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
        // The fetcher, which pulls records from the brokers
        this.fetcher = new Fetcher<>(
                logContext,
                this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricsRegistry.fetcherMetrics,
                this.time,
                this.retryBackoffMs,
                this.requestTimeoutMs,
                isolationLevel);

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);

        log.debug("Kafka consumer initialized");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
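One detail worth noting above is the key/value deserializer branches: if deserializer instances are passed to the public constructor, the key.deserializer and value.deserializer config entries are ignored. A minimal sketch (broker address and group id are placeholders; assumes the imports from the example at the top):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group

// Deserializers passed programmatically take the "else" branch above, so any
// key.deserializer / value.deserializer entries in the config would be ignored
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());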
Subscribe to topics
Subscriptions are not incremental: each call to subscribe replaces the previous subscription, so only the most recent one takes effect. A subscription can be a single topic, a collection of topics, or a regular-expression pattern (whose set of matched topics changes dynamically).
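A quick sketch of these semantics, assuming a consumer instance like the one created earlier (topic names and the pattern are illustrative; requires java.util.regex.Pattern, ConsumerRebalanceListener, TopicPartition, and Collection imports):

// Subscriptions replace rather than merge: after the second call, the consumer
// is subscribed to topic-b only
consumer.subscribe(Collections.singletonList("topic-a"));
consumer.subscribe(Collections.singletonList("topic-b"));

// Pattern subscription: the matched topic set is re-evaluated dynamically
// as topics are created and deleted
consumer.subscribe(Pattern.compile("demo-.*"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});

The KafkaConsumer.subscribe(Collection, ConsumerRebalanceListener) implementation: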
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    // Acquire the lock and make sure the consumer has not been closed
    acquireAndEnsureOpen();
    try {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        } else if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
        } else {
            for (String topic : topics) {
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            // Check that at least one partition assignor is configured
            throwIfNoAssignorsConfigured();
            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            // Update the subscribed topics
            this.subscriptions.subscribe(new HashSet<>(topics), listener);
            // If the metadata does not yet contain the topics of the group subscription, request a metadata update
            metadata.setTopics(subscriptions.groupSubscription());
        }
    } finally {
        release();
    }
}
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    // Mark the subscription type as AUTO_TOPICS (explicitly named topics)
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    // Record the consumer rebalance listener
    this.rebalanceListener = listener;
    // Replace the consumer's subscription with the new topic set
    changeSubscription(topics);
}
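Since subscribe wires in a ConsumerRebalanceListener, a common use is to commit offsets before partitions are revoked, so the new owner does not re-process records. A minimal sketch (the topic name is a placeholder; assumes an existing consumer with auto-commit disabled):

consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Invoked before the rebalance takes these partitions away:
        // commit what has been consumed so far
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Invoked after the rebalance hands out the (possibly new) assignment
        System.out.println("assigned: " + partitions);
    }
});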
Consuming data
A batch of messages is obtained through the poll method; the timeout parameter is the maximum time a single poll may block waiting for messages.
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquireAndEnsureOpen();
    try {
        // A timeout of 0 returns immediately with whatever records are available; negative values are invalid
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        // The consumer must have subscribed to topics or been assigned partitions
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            // One round of polling for data
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup();

                // Interceptor processing; no interceptors are configured by default, applications supply their own
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
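The IS_RUNNING flag from the example at the top only ends the loop between polls; to break out of a poll() that is currently blocked, the thread-safe wakeup() method can be called from another thread, which makes poll() throw a WakeupException. A hedged sketch of a graceful shutdown, assuming the consumer and flag from the first example and an import of org.apache.kafka.common.errors.WakeupException:

// In another thread (here a JVM shutdown hook): flip the flag and wake the consumer
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    IS_RUNNING.set(false);
    consumer.wakeup(); // the only thread-safe KafkaConsumer method; aborts a blocked poll()
}));

try {
    while (IS_RUNNING.get()) {
        ConsumerRecords<String, String> records = consumer.poll(10000);
        // process records ...
    }
} catch (WakeupException e) {
    // expected when wakeup() interrupts poll() during shutdown; safe to ignore here
} finally {
    consumer.close();
}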