Kafka Foundations: Kafka API (Producer API / Consumer API)

This article is a partial supplement to the Kafka material in "Learning Guide for Big Data Specialists from Zero (Full Upgrade)".

1 Producer API

1.1 Message Sending Process

Kafka's producer sends messages asynchronously. Two threads are involved, the main thread and the Sender thread, along with a variable shared between them, the RecordAccumulator.

The main thread writes messages into the RecordAccumulator, and the Sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.

Related parameters:

batch.size: the Sender thread does not send data until the accumulated data reaches batch.size.

linger.ms: if the accumulated data does not reach batch.size in time, the Sender thread waits at most linger.ms before sending what it has.
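As a quick illustration (a minimal sketch, not tuning advice), these two parameters are ordinary producer configuration entries; the ProducerConfig constants used here are the client's named equivalents of the plain string keys that appear in the examples later in this article.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigSketch {
    public static Properties batchingProps() {
        Properties props = new Properties();
        // Send a batch once it has accumulated batch.size bytes...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // ...or once linger.ms has elapsed, whichever comes first.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        return props;
    }
}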

1.2 Asynchronous Send API

1) Import Dependency

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.0</version>
        </dependency>
</dependencies>

2) Add a log4j2 configuration file (log4j2.xml)

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- Appender of type Console; the name attribute is required -->
        <Appender type="Console" name="STDOUT">
            <!-- A PatternLayout produces output such as:
            [INFO] [2018-01-22 17:34:01][org.test.Console] I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- additivity="false": events matched by this logger are not also passed to the root logger -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- Root logger configuration -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

3) Write code

Classes to be used:

KafkaProducer: the producer object used to send data

ProducerConfig: provides the full set of producer configuration parameters

ProducerRecord: each piece of data must be wrapped in a ProducerRecord object

1. API without callback function

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        //kafka cluster, broker-list
        props.put("bootstrap.servers", "hadoop102:9092");

        props.put("acks", "all");

        //retry count
        props.put("retries", 1); 

        //Batch size
        props.put("batch.size", 16384); 

        //waiting time
        props.put("linger.ms", 1); 

        //RecordAccumulator Buffer Size
        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }
}

2. API with callback function

The callback function is invoked asynchronously when the producer receives an ack. It has two parameters, RecordMetadata and Exception. If Exception is null, the message was sent successfully; if Exception is not null, sending failed.

Note: failed sends are retried automatically by the producer, so there is no need to retry manually in the callback function.

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();

        props.put("bootstrap.servers", "hadoop102:9092");//kafka cluster, broker-list

        props.put("acks", "all");

        props.put("retries", 1);//retry count

        props.put("batch.size", 16384);//Batch size

        props.put("linger.ms", 1);//waiting time

        props.put("buffer.memory", 33554432);//RecordAccumulator Buffer Size

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                //Callback function, which is called asynchronously when Producer receives an ack
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

1.3 Partitioner

1) DefaultPartitioner

If the ProducerRecord specifies a partition, that partition is used as-is; if no partition is given but the record has a key, the partition is chosen by hashing the serialized key modulo the number of partitions; if neither is given, records are assigned by the sticky partitioner (round-robin in clients before 2.4), as sketched below.
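A small sketch of those three cases, written against the ProducerRecord constructors (the topic name "first" is just the one used elsewhere in this article):

import org.apache.kafka.clients.producer.ProducerRecord;

public class DefaultPartitionerSketch {
    public static void main(String[] args) {
        // 1) Partition given explicitly: that partition is used as-is.
        ProducerRecord<String, String> r1 = new ProducerRecord<>("first", 0, "key", "value");
        // 2) No partition, but a key: partition = hash of the serialized key modulo the partition count.
        ProducerRecord<String, String> r2 = new ProducerRecord<>("first", "key", "value");
        // 3) Neither partition nor key: the sticky partitioner fills one partition per batch (kafka-clients 2.4+).
        ProducerRecord<String, String> r3 = new ProducerRecord<>("first", "value");
        System.out.println(r1.partition() + " / " + r2.key() + " / " + r3.value());
    }
}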

2) Custom Partitioner

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    /**
     * Decides which partition a message is sent to
     * @param topic      the topic
     * @param key        the message key
     * @param keyBytes   the serialized bytes of the message key
     * @param value      the message value
     * @param valueBytes the serialized bytes of the message value
     * @param cluster    cluster metadata
     * @return the target partition
     *
     * Requirement: take a topic with two partitions as an example.
     *      Messages whose value contains "atguigu" go to partition 0;
     *      all other messages go to partition 1.
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String msgValue = value.toString();
        int partition ;
        if(msgValue.contains("atguigu")){
            partition = 0;
        }else{
            partition = 1;
        }
        return partition;
    }

    /**
     * Clean-up when the partitioner is closed
     */
    @Override
    public void close() {

    }

    /**
     * Reads the configuration
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
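For MyPartitioner to take effect, it must be registered in the producer configuration under the partitioner.class key (ProducerConfig.PARTITIONER_CLASS_CONFIG); a minimal sketch, added alongside the other producer properties shown earlier:

// Register the custom partitioner (the rest of the producer configuration is as in the earlier examples).
props.put("partitioner.class", MyPartitioner.class.getName());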

1.4 Synchronous Send API

Synchronous sending means that after a message is sent, the current thread blocks until the ack is returned.

Because the send method returns a Future object, we can get the effect of synchronous sending from the Future's behaviour: simply call get() on the Future returned by send.

package com.atguigu.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();

        props.put("bootstrap.servers", "hadoop102:9092");//kafka cluster, broker-list

        props.put("acks", "all");

        props.put("retries", 1);//retry count

        props.put("batch.size", 16384);//Batch size

        props.put("linger.ms", 1);//waiting time

        props.put("buffer.memory", 33554432);//RecordAccumulator Buffer Size

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}
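Since get() returns the RecordMetadata of the acknowledged record, the synchronous path is also a convenient place to check where a message landed. A small sketch of the loop body above (RecordMetadata comes from org.apache.kafka.clients.producer):

// The Future returned by send() resolves to the record's metadata once the ack arrives.
RecordMetadata metadata = producer.send(
        new ProducerRecord<>("first", Integer.toString(i), Integer.toString(i))).get();
System.out.println("partition = " + metadata.partition() + ", offset = " + metadata.offset());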

2 Consumer API

Reliability is easy to guarantee when consumers read data, because the data is persisted in Kafka, so there is no need to worry about data loss.

However, a consumer may fail during consumption (a power outage, for example) and must resume from where it left off once it recovers. The consumer therefore needs to record, in real time, the offset it has consumed up to, so that consumption can continue from that point after recovery.

Maintaining the offset is therefore an essential part of consuming data.

2.1 Automatic Offset Commit

1) Import Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2) Write code

Classes to be used:

KafkaConsumer: the consumer object used to consume data

ConsumerConfig: provides the full set of consumer configuration parameters

ConsumerRecord: each piece of data is delivered as a ConsumerRecord object

So that we can focus on business logic, Kafka provides the ability to commit offsets automatically. The relevant parameters are:

enable.auto.commit: whether to enable automatic offset commits

auto.commit.interval.ms: the interval between automatic offset commits

The following is the code for committing offsets automatically:

package com.atguigu.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "hadoop102:9092");

        props.put("group.id", "test");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("first"));

        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
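One note on the poll(100) call above: in newer clients (2.0 and later, such as the 2.4.1 used in the producer examples) the poll(long) overload is deprecated in favour of poll(Duration), so the same line can be written as follows (requires java.time.Duration):

// Equivalent call using the non-deprecated Duration overload.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));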

2.2 Reset Offset

auto.offset.reset = earliest | latest | none

earliest: when the group has no committed offset (or the committed offset no longer exists), start from the earliest available offset. latest (the default): start from the latest offset. none: throw an exception to the consumer.
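For example, to have a consumer group with no committed offset read a topic from the beginning, set the property alongside the other consumer settings (a minimal sketch):

// Start from the earliest available offset when this group has no committed offset yet.
props.put("auto.offset.reset", "earliest");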

2.3 Manual Offset Commit

Although automatic offset commit is convenient, it is purely time-based, which gives developers little control over when offsets are actually committed. Kafka therefore also provides APIs for committing offsets manually.

There are two ways to commit offsets manually: commitSync (synchronous commit) and commitAsync (asynchronous commit). Both commit the highest offset of the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (commits can still fail for reasons outside our control), whereas commitAsync has no retry mechanism and may therefore fail to commit.

1) Commit offsets synchronously

Synchronous commit is more reliable because it retries on failure. Below is an example of committing offsets synchronously.

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();

        //Kafka cluster
        props.put("bootstrap.servers", "hadoop102:9092");

        //Consumer group: consumers with the same group.id belong to the same consumer group
        props.put("group.id", "test");

        //Turn off automatic offset commit
        props.put("enable.auto.commit", "false");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("first"));//the consumer subscribes to the topic

        while (true) {

            //The consumer pulls data
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {

                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            }

            //Synchronous commit; the current thread blocks until the offset commit succeeds
            consumer.commitSync();
        }
    }
}
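commitSync also has an overload that takes an explicit map of offsets, which is useful when you want to commit a precise position rather than everything returned by the current poll. A hedged sketch, committing the position just past one processed record inside the loop above (needs java.util.Collections, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata):

// The committed offset is the position of the next record to read, hence record.offset() + 1.
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));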

2) Commit offsets asynchronously

Although synchronous commits are more reliable, they block the current thread until the commit completes, so throughput suffers. In most cases, asynchronous commits are therefore preferred.

The following is an example of committing offsets asynchronously:

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;


public class CustomConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();

        //Kafka Cluster
        props.put("bootstrap.servers", "hadoop102:9092"); 

        //Consumer group: consumers with the same group.id belong to the same consumer group
        props.put("group.id", "test");

        //Turn off automatic offset commit
        props.put("enable.auto.commit", "false");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));//the consumer subscribes to the topic

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//Consumer pull data
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }

            //Asynchronous commit
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                    }
                }
            }); 
        }
    }
}

3) Missed Consumption and Repeated Consumption

Whether offsets are committed synchronously or asynchronously, data may be missed or consumed more than once: committing the offset before processing the data can lead to missed consumption, while committing the offset after processing the data can lead to repeated consumption.

 

2.4 Custom Offset Storage

Before Kafka 0.9, the consumer stored offsets in Zookeeper by default; from version 0.9 onward, offsets are stored by default in a built-in Kafka topic (__consumer_offsets). In addition, Kafka lets you store offsets in a system of your own choosing.

Maintaining offsets yourself is cumbersome, because consumer rebalances have to be taken into account.

A rebalance is triggered when a new consumer joins the consumer group, an existing consumer leaves the group, or the partitions of a subscribed topic change; the topic's partitions are then redistributed among the consumers.

After a rebalance, the partitions each consumer is responsible for change. Every consumer therefore has to find out which partitions it has been reassigned and locate the most recently committed offset of each of those partitions, so that it can continue consuming from there.

To implement custom offset storage you need a ConsumerRebalanceListener; the methods that commit and retrieve offsets must be implemented against whatever offset storage system you choose.
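The collapsed code block from the original post is not reproduced here, so the following is only a minimal sketch of that pattern; getOffset and commitOffset are placeholders to be implemented against your chosen store (a database table keyed by group/topic/partition, for example), and the class and package names simply follow the other examples.

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class CustomOffsetConsumer {

    //Offsets of the records consumed so far, kept in memory between commits
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");//offsets are managed entirely by us
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //Subscribe with a rebalance listener so offsets survive partition reassignment
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

            //Called before the rebalance: save the offsets of everything consumed so far
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //Called after the rebalance: seek each newly assigned partition to its stored offset
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            }
            commitOffset(currentOffset);//commit to the custom store after processing
        }
    }

    //Placeholder: read the last committed offset for this partition from your own store
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //Placeholder: atomically write these offsets to your own store
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}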

 
