Using the Kafka Java client

1. Common configuration

Producer

Each entry gives the setting with its type and default, followed by its description.

bootstrap.servers (type: list)
    Host/port pairs used to establish the initial connection to the Kafka cluster. The list only affects the hosts used for that initial connection, from which the client discovers all servers.
    Format: host1:port1,host2:port2. List more than one host in case one of them is down.

acks (type: string, default: 1)
    The number of acknowledgments the producer requires the leader to have received before it considers a request complete.
    acks=0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and considered sent. There is no guarantee that the server received the record, and the retries setting has no effect because the client generally does not learn of failures. The offset returned for each record is always -1.
    acks=1: the leader writes the record to its local log and responds without waiting for acknowledgment from the followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost.
    acks=all (equivalent to acks=-1): the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica remains alive. This is the strongest guarantee.

request.timeout.ms (type: int, default: 30000)
    The maximum time, in milliseconds, the client waits for a response to a request. If no response arrives before the timeout, the client resends the request if necessary and fails the request once the retries are exhausted.

retries (type: int, default: 2147483647)
    The number of times a failed request is retried.

batch.size (type: int, default: 16384)
    Batch size in bytes. A batch is a group of records bound for the same topic and partition; sending records in batches reduces network overhead.

linger.ms (type: long, default: 0)
    Send delay in milliseconds. The producer waits up to this long before sending so that more records can be collected into a batch.

buffer.memory (type: long, default: 33554432)
    The total number of bytes the producer can use to buffer records waiting to be sent to the server. If records are produced faster than they can be delivered to the server, the producer blocks for up to max.block.ms and then throws an exception.

key.serializer (type: class)
    Serializer class for record keys.

value.serializer (type: class)
    Serializer class for record values.
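
As a minimal sketch of how the producer settings above fit together, the following builds a java.util.Properties object for a producer that favors durability. The class ProducerConfigSketch, its method names, the retry count of 3, the 5 ms linger and the broker addresses are placeholders chosen for illustration and are not part of the Kafka API; only the keys are the configuration names listed above. Values are stored as strings, which the client parses when the producer is constructed.

```java
import java.util.Properties;

/** Hypothetical helper (not part of kafka-clients) that collects producer settings. */
class ProducerConfigSketch {

    /** Builds producer settings that trade a little latency for durability. */
    static Properties durableProducerConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");             // wait for all in-sync replicas
        props.setProperty("retries", "3");            // illustrative value, not a recommendation
        props.setProperty("batch.size", "16384");     // bytes per batch
        props.setProperty("linger.ms", "5");          // wait up to 5 ms to fill a batch
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /** Number of full batches that fit into the producer buffer for these settings. */
    static long batchesThatFit(Properties props) {
        long bufferBytes = Long.parseLong(props.getProperty("buffer.memory"));
        long batchBytes = Long.parseLong(props.getProperty("batch.size"));
        return bufferBytes / batchBytes;
    }

    public static void main(String[] args) {
        Properties props = durableProducerConfig("host1:9092,host2:9092");
        System.out.println("acks = " + props.getProperty("acks"));
        System.out.println("full batches in buffer = " + batchesThatFit(props));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in the same way as the hand-built ones in section 2.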

Consumer

Each entry gives the setting with its type and default, followed by its description.

bootstrap.servers (type: list)
    Host/port pairs used to establish the initial connection to the Kafka cluster. The list only affects the hosts used for that initial connection, from which the client discovers all servers.
    Format: host1:port1,host2:port2. List more than one host in case one of them is down.

group.id (type: string, default: null)
    Unique identifier of the consumer group the consumer belongs to.

enable.auto.commit (type: boolean, default: true)
    If true, the consumer's offsets are committed periodically in the background.

auto.commit.interval.ms (type: int, default: 5000)
    How often, in milliseconds, consumer offsets are committed to Kafka automatically when enable.auto.commit is true.

session.timeout.ms (type: int, default: 10000)
    Timeout, in milliseconds, used to detect whether the consumer has failed. The value must lie within the range allowed by the broker settings group.min.session.timeout.ms and group.max.session.timeout.ms.

group.min.session.timeout.ms (type: int, default: 6000)
    Broker setting: the minimum session timeout a consumer is allowed to request.

group.max.session.timeout.ms (type: int, default: 1800000)
    Broker setting: the maximum session timeout a consumer is allowed to request.

key.deserializer (type: class)
    Deserializer class for record keys.

value.deserializer (type: class)
    Deserializer class for record values.
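
The range rule for session.timeout.ms can be checked before a consumer is created. The sketch below is one way to do that, assuming the broker uses the default limits quoted above; ConsumerConfigSketch and its methods are hypothetical helpers for illustration, not part of kafka-clients, and a real cluster may be configured with different limits.

```java
import java.util.Properties;

/** Hypothetical helper (not part of kafka-clients) that collects consumer settings. */
class ConsumerConfigSketch {

    // Broker-side defaults quoted in the list above; a real cluster may override them.
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 1800000;

    /** True if the timeout lies inside the range the broker is assumed to accept. */
    static boolean isSessionTimeoutAllowed(int sessionTimeoutMs) {
        return sessionTimeoutMs >= GROUP_MIN_SESSION_TIMEOUT_MS
                && sessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    /** Builds consumer settings with manual offset commits. */
    static Properties consumerConfig(String bootstrapServers, String groupId, int sessionTimeoutMs) {
        if (!isSessionTimeoutAllowed(sessionTimeoutMs)) {
            throw new IllegalArgumentException(
                    "session.timeout.ms out of range: " + sessionTimeoutMs);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerConfig("host1:9092,host2:9092", "test", 30000);
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
        System.out.println("5000 ms allowed? " + isSessionTimeoutAllowed(5000));
    }
}
```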

2. kafka-clients

Add dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

2.1 Producer

Send messages synchronously

KafkaProducer is thread-safe, so a single producer instance can be shared among multiple threads.

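import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@Test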
public void syncProducer() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
    properties.put("acks", "all");
    // retries = 0 disables automatic retries, so every send failure surfaces to the caller
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

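    // Send 10 messages without waiting: send() returns a Future immediately and the
    // result is ignored here, so a failure of one of these sends goes unnoticed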
    for (int i = 1; i <= 10; i++) {
        kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test " + i));
    }

    try {
        // Synchronous send: get() blocks until the server has acknowledged the record
        // and returns the metadata of that record
        Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test"));
        RecordMetadata recordMetadata = future.get();
        System.out.printf("RecordMetadata : topic = %s, partition = %d, offset = %d, toString() = %s\n",
                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.toString());
    } catch (Exception e) {
        // Retriable errors (connection errors, no leader) are retried only while retries > 0.
        // Non-retriable errors, such as a message that is too large, are thrown without a retry.
        e.printStackTrace();
    } finally {
        // Close the producer even if the send failed
        kafkaProducer.close();
    }
}
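
The synchronous pattern rests on Future.get(), which blocks until the result is available and reports a failure wrapped in an ExecutionException. The sketch below shows that behaviour with the JDK's CompletableFuture standing in for the Future<RecordMetadata> returned by send(). The class FutureGetSketch, the string results and the error message are assumptions made for illustration, and no Kafka classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Stand-in demo: a String result plays the role of RecordMetadata. */
class FutureGetSketch {

    /** Blocks on the future and reports either the result or the underlying cause. */
    static String awaitResult(Future<String> future) {
        try {
            return "ok: " + future.get();
        } catch (ExecutionException e) {
            // The original failure is the cause, not the ExecutionException itself
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> success =
                CompletableFuture.completedFuture("partition 0, offset 42");

        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("record too large"));

        System.out.println(awaitResult(success));
        System.out.println(awaitResult(failure));
    }
}
```

With a real producer the same unwrapping applies: the exception caught around future.get() is usually an ExecutionException, and getCause() carries the actual send error.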

Send message asynchronously

Pass a Callback object to send(); the producer invokes it once the send has completed or failed.

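import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@Test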
public void producer() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
    properties.put("acks", "all");
    properties.put("retries", 0);
    properties.put("batch.size", 16384);
    properties.put("buffer.memory", 33554432);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

    // Send 10 messages asynchronously
    for (int i = 1; i <= 10; i++) {
        // When a message is sent, a callback object is passed
        kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test " + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // If the send failed, e is non-null and recordMetadata may be null,
                // so check the exception before reading the metadata
                if (e != null) {
                    // Handle the exception. Here it is simply printed
                    e.printStackTrace();
                    return;
                }

                System.out.printf("ProducerCallback RecordMetadata : topic = %s, partition = %d, offset = %d, toString() = %s\n",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.toString());
            }
        });
    }

    kafkaProducer.close();
}
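
The order of checks inside a completion callback matters: the exception should be inspected before the metadata is touched. The sketch below isolates that rule without a broker. SendCallback is a hypothetical stand-in with the same two-argument shape as the callback used above; it is not the Kafka interface, and the string metadata is a placeholder for RecordMetadata.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackOrderSketch {

    /** Stand-in for a two-argument completion callback; NOT the Kafka interface. */
    interface SendCallback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Records each outcome and inspects the exception before the metadata. */
    static class RecordingCallback implements SendCallback {
        final List<String> log = new ArrayList<>();

        @Override
        public void onCompletion(String metadata, Exception exception) {
            if (exception != null) {
                // Failure path: metadata is not used at all
                log.add("error: " + exception.getMessage());
                return;
            }
            // Success path: metadata is safe to read
            log.add("sent: " + metadata);
        }
    }

    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        callback.onCompletion("java-client-test-0@17", null);            // simulated success
        callback.onCompletion(null, new RuntimeException("timed out"));  // simulated failure
        callback.log.forEach(System.out::println);
    }
}
```

Kafka runs producer callbacks on the producer's I/O thread, so the work done inside one should stay short.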

2.2 Consumer

KafkaConsumer is not thread-safe; each consumer instance should be used from a single thread.

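import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

@Test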
public void consumer() {
	Properties properties = new Properties();
	properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
	properties.put("group.id", "test");
	properties.put("enable.auto.commit", "true");
	properties.put("auto.commit.interval.ms", "1000");
	properties.put("session.timeout.ms", "30000");
	properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
	properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

	KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
	
	consumer.subscribe(Arrays.asList("java-client-test"));

	try{
		while (true) {
			// poll() waits up to 100 ms for records and returns an empty batch if none arrive
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
			for (ConsumerRecord<String, String> record : records){
				System.out.printf("consumption : topic = %s, partition = %d, offset = %d, value = %s\n",
						record.topic(),record.partition(),record.offset(), record.value());
			}
		}
	}finally {
		consumer.close();
	}
}
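
The loop above never exits on its own, so the finally block is reached only when poll() throws. A common shape for a loop that can be stopped from another thread is sketched below. A JDK BlockingQueue stands in for the consumer, because its poll(timeout) also waits up to a limit and returns nothing when no record arrives; PollLoopSketch and its methods are hypothetical names for illustration. With a real KafkaConsumer, wakeup() is the method intended for interrupting a blocked poll() from another thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in demo: a BlockingQueue plays the role of the consumer. */
class PollLoopSketch implements Runnable {

    private final BlockingQueue<String> source;
    private final AtomicBoolean running = new AtomicBoolean(true);
    final List<String> consumed = new ArrayList<>(); // written by the loop thread only

    PollLoopSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Asks the loop to finish after its current poll. */
    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                // Waits up to 100 ms; returns null if no record arrived in time
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    consumed.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // A real consumer would be closed at this point
    }

    /** Runs the loop on its own thread until every record has been taken, then stops it. */
    static List<String> runUntilDrained(List<String> records) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(records);
        PollLoopSketch loop = new PollLoopSketch(queue);
        Thread worker = new Thread(loop, "poll-loop-sketch");
        worker.start();
        try {
            while (!queue.isEmpty()) {
                Thread.sleep(10);
            }
            loop.stop();
            worker.join(); // join makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            loop.stop();
            Thread.currentThread().interrupt();
        }
        return loop.consumed;
    }

    public static void main(String[] args) {
        System.out.println(runUntilDrained(Arrays.asList("a", "b", "c")));
    }
}
```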

2.3 Multithreading

Producer
Because KafkaProducer is thread-safe, wrap a single shared instance in a utility class.

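import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BaseProducer {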

    private static KafkaProducer<String, String> kafkaProducer;

    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        kafkaProducer = new KafkaProducer<>(properties);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                BaseProducer.kafkaProducer.close();
            }
        });
    }

    public static void send(String content, String... topics){
        if (topics != null && topics.length > 0) {
            for (int i = 0;i < topics.length; i++) {
                System.out.println("send " + topics[i] + " message " + content);
                kafkaProducer.send(new ProducerRecord<>(topics[i], content));
            }
        }
    }
}

Multithreaded consumer
Define an abstract consumer that implements Runnable, so that each thread owns its own KafkaConsumer.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class AbstractConsumer implements Runnable {

    protected KafkaConsumer<String, String> consumer;

    protected String topic;

    public AbstractConsumer(String topic) {
        this.topic = topic;
    }

    /**
     * Create the consumer and subscribe to the topic passed to the constructor
     */
    private void connect() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
        properties.put("group.id", "test-user");
        properties.put("enable.auto.commit", "false");
        // Has no effect while enable.auto.commit is false
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(properties);
        // Subscribe to the configured topic rather than a hard-coded name
        consumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        this.connect();

        while (true) {
            try {
                ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(500));
                // Typed for-each loop instead of a raw Iterator with a cast
                for (ConsumerRecord<String, String> item : consumerRecords) {
                    this.handler(item.value());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Default handler: commits offsets without processing the message.
     * Subclasses override this method to consume the message.
     */
    public void handler(String message) {
        this.consumer.commitAsync();
    }

}

Extend AbstractConsumer and override the handler() method to consume the message.

public class UserConsumer extends AbstractConsumer {

    public UserConsumer(String topic) {
        super(topic);
    }

    @Override
    public void handler(String message) {
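        try {
            // Parse the message as a JSON array of User objects.
            // JSON and TypeReference are presumably from Alibaba fastjson; the article does not show the imports.
            List<User> userList = JSON.parseObject(message, new TypeReference<List<User>>(){});

            if (!userList.isEmpty()) {
                for (User user : userList) {
                    System.out.println(user);
                }
            }

            // commitAsync() without arguments commits the offsets returned by the last poll(),
            // that is, the whole polled batch and not only this message
            this.consumer.commitAsync();
        } catch (Exception e) {
            // Offsets are committed on failure as well, so a message that cannot be parsed is skipped
            this.consumer.commitAsync();
            e.printStackTrace();
        }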

    }
}

Consumer group: run the consumers on a thread pool

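import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UserConsumerGroup {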

    private List<Runnable> consumerThreadList = new ArrayList<>();

    public UserConsumerGroup() {
        consumerThreadList.add(new UserConsumer("topic-user"));
    }

    public void start() {
        ThreadFactory factory = r -> new Thread(r, "ThreadPool thread: UserConsumerGroup");
        // Core thread pool size; maximum thread pool size; maximum thread idle time; time unit; thread waiting queue; thread creation factory
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 200, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(3), factory);
        for (Runnable item : consumerThreadList) {
            executor.execute(item);
        }
        // shutdown() stops the pool from accepting new tasks; consumers already submitted keep running
        executor.shutdown();
    }
}
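
The pool parameters matter once more than a few consumers are submitted. A ThreadPoolExecutor built with these arguments first starts up to 3 core threads, then queues up to 3 further tasks, and only grows towards 6 threads when the queue is full. Since a consumer Runnable loops forever, a queued consumer would not start while the running ones keep going. The sketch below makes this visible with tasks that block on a latch as a stand-in for consumers; PoolCapacitySketch is a hypothetical name and no Kafka classes are involved.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolCapacitySketch {

    /**
     * Submits taskCount blocking tasks to a pool configured like the one above and
     * returns {threads started, tasks waiting in the queue, tasks rejected}.
     */
    static int[] submitBlockingTasks(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 200, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(3));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        for (int i = 0; i < taskCount; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // stands in for a consumer's endless poll loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // Default policy: reject when all threads are busy and the queue is full
                rejected++;
            }
        }

        int[] result = {executor.getPoolSize(), executor.getQueue().size(), rejected};

        release.countDown();  // let the stand-in tasks finish so the JVM can exit
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(submitBlockingTasks(4)));
        System.out.println(Arrays.toString(submitBlockingTasks(10)));
    }
}
```

With four blocking tasks the fourth one sits in the queue, and with ten the last one is rejected. For long-running consumers it is therefore safer to size the core pool to the number of consumers; the group above submits a single consumer, so it stays within the core size.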

Added by Gmans on Sat, 13 Jun 2020 07:34:03 +0300