1. Common configuration
producer
Configuration | Description | Type | Default |
---|---|---|---|
bootstrap.servers | Hosts used to establish the initial connection to the Kafka cluster. The list is only used to discover the full set of brokers. Format: host1:port1,host2:port2. List several hosts in case one of them is down. | list | |
acks | The number of acknowledgments the producer requires the leader to have received before a request is considered complete. acks=0: the producer does not wait for any acknowledgment from the server. The record is added to the socket buffer immediately and considered sent. There is no guarantee that the server received the record, the retries setting has no effect (the client generally does not learn of any failure), and the offset returned for each record is always -1. acks=1: the leader writes the record to its local log and responds without waiting for acknowledgment from the followers. If the leader fails right after acknowledging the record but before the followers have replicated it, the record is lost. acks=all (equivalent to acks=-1): the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica remains alive. This is the strongest guarantee. | string | 1 |
request.timeout.ms | The maximum time, in milliseconds, that the client waits for the response to a request. If no response arrives before the timeout, the client resends the request if necessary, and the request fails once the retries are exhausted. | int | 30000 |
retries | Number of times a failed request is retried. | int | 2147483647 |
batch.size | Maximum batch size in bytes. A batch is a group of messages that belong to the same topic and partition. Sending messages in batches reduces network overhead. | int | 16384 |
linger.ms | Send delay in milliseconds. The producer waits for this long before sending a batch so that more messages can be aggregated into it. | long | 0 |
buffer.memory | Total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are produced faster than they can be delivered to the server, the producer blocks for up to max.block.ms and then throws an exception. | long | 33554432 |
key.serializer | Serializer class for the message key | class | |
value.serializer | Serializer class for the message value | class | |
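
As an illustration of how the options in the table fit together, here is a minimal sketch that collects them into a `java.util.Properties` object. The class name `ProducerSettings`, the host names, and the chosen values (`retries` = 3, `linger.ms` = 5) are assumptions made for this example, not recommendations from the Kafka documentation.

```java
import java.util.Properties;

// Hypothetical helper: gathers the producer options from the table in one place.
final class ProducerSettings {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas: the strongest delivery guarantee
        props.setProperty("acks", "all");
        // Illustrative values, chosen only for this sketch
        props.setProperty("retries", "3");
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "5");
        props.setProperty("buffer.memory", "33554432");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration
        build("host1:9092,host2:9092")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting object could then be passed to the `KafkaProducer` constructor in the same way as the `Properties` built by hand in the examples later in this article.
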
consumer
Configuration | Description | Type | Default |
---|---|---|---|
bootstrap.servers | Hosts used to establish the initial connection to the Kafka cluster. The list is only used to discover the full set of brokers. Format: host1:port1,host2:port2. List several hosts in case one of them is down. | list | |
group.id | Unique identifier of the consumer group to which the consumer belongs. | string | null |
enable.auto.commit | If true, the consumer's offsets are committed periodically in the background. | boolean | true |
auto.commit.interval.ms | If enable.auto.commit is set to true, the interval, in milliseconds, at which consumer offsets are automatically committed to Kafka. | int | 5000 |
session.timeout.ms | Timeout, in milliseconds, used to detect whether the consumer has failed. The value must lie within the range allowed by group.min.session.timeout.ms and group.max.session.timeout.ms. | int | 10000 |
group.min.session.timeout.ms | Broker-side setting: the minimum session timeout a consumer is allowed to register. | int | 6000 |
group.max.session.timeout.ms | Broker-side setting: the maximum session timeout a consumer is allowed to register. | int | 1800000 |
key.deserializer | Deserializer class for the message key | class | |
value.deserializer | Deserializer class for the message value | class | |
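
The range constraint on session.timeout.ms can be sketched as a small check. This is only an illustration: the class and method names are hypothetical, the bounds are the defaults listed in the table, and the sketch assumes that both bounds are inclusive.

```java
// Hypothetical pre-flight check for session.timeout.ms.
final class SessionTimeoutCheck {

    // Defaults from the table above (broker-side settings)
    static final int DEFAULT_MIN_MS = 6000;
    static final int DEFAULT_MAX_MS = 1800000;

    // Returns true if the consumer's session timeout lies within the allowed range.
    // Assumption: both bounds are inclusive.
    static boolean isAllowed(int sessionTimeoutMs, int minMs, int maxMs) {
        return sessionTimeoutMs >= minMs && sessionTimeoutMs <= maxMs;
    }

    public static void main(String[] args) {
        // 30000 is the value used by the consumer examples in this article
        System.out.println(isAllowed(30000, DEFAULT_MIN_MS, DEFAULT_MAX_MS));
        // 5000 is below the default minimum
        System.out.println(isAllowed(5000, DEFAULT_MIN_MS, DEFAULT_MAX_MS));
    }
}
```
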
2. kafka-clients
Add dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

2.1 producers
Send messages synchronously
KafkaProducer is thread-safe, so a single producer instance can be shared among multiple threads.

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test; // JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

    @Test
    public void syncProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // Send 10 messages without waiting for the result (fire-and-forget)
        for (int i = 1; i <= 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test " + i));
        }

        try {
            // Send one message synchronously: get() blocks until the server has
            // acknowledged the record and returns its RecordMetadata
            Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test"));
            RecordMetadata recordMetadata = future.get();
            System.out.printf("RecordMetadata : topic = %s, partition = %d, offset = %d, toString() = %s\n",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.toString());
        } catch (Exception e) {
            // Connection errors and "no leader" errors can be resolved by retrying
            // (note that retries is set to 0 in this example).
            // If the message is too large, KafkaProducer does not retry and throws the exception directly.
            e.printStackTrace();
        }
        kafkaProducer.close();
    }

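
Calling `get()` without arguments blocks until the result arrives. Because `send()` returns a standard `java.util.concurrent.Future`, the wait can also be bounded with `get(timeout, unit)`. The sketch below shows that pattern with the JDK only; `CompletableFuture` is used as a stand-in for the future returned by `send()`, and the class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a Future for a limited time only.
final class BoundedWait {

    // Returns the result, or an empty Optional if the wait timed out,
    // the task failed, or the current thread was interrupted.
    static <T> Optional<T> await(Future<T> future, long timeoutMs) {
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can react to it
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that has already been acknowledged
        Future<String> done = CompletableFuture.completedFuture("acknowledged");
        System.out.println(await(done, 100));

        // Stand-in for a send that never completes
        Future<String> pending = new CompletableFuture<>();
        System.out.println(await(pending, 100));
    }
}
```

In the synchronous example, the same idea would amount to replacing `future.get()` with a `get` call that carries a timeout, so that an unreachable cluster cannot block the calling thread indefinitely.
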
Send message asynchronously
Pass a callback object when sending a message. It is invoked once the send has completed or failed.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test; // JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

    @Test
    public void producer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // Send 10 messages asynchronously
        for (int i = 1; i <= 10; i++) {
            // Pass a callback object along with each message
            kafkaProducer.send(new ProducerRecord<>("java-client-test", "test,test,test " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // If Kafka returns an error, onCompletion is called with a non-null exception.
                    // Check it first, because the metadata is not meaningful in that case.
                    if (e != null) {
                        // Deal with the exception. Here it is simply printed out
                        e.printStackTrace();
                        return;
                    }
                    System.out.printf("ProducerCallback RecordMetadata : topic = %s, partition = %d, offset = %d, toString() = %s\n",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.toString());
                }
            });
        }
        // close() waits for the pending sends to complete
        kafkaProducer.close();
    }

2.2 consumers
KafkaConsumer is not thread-safe. Each consumer instance should be used by a single thread.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.junit.Test; // JUnit 4; with JUnit 5 use org.junit.jupiter.api.Test

    @Test
    public void consumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("java-client-test"));
        try {
            while (true) {
                // poll() waits up to 100 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("consumption : topic = %s, partition = %d, offset = %d, value = %s\n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

2.3 multithreading
producer
Define the producer as a utility class, so that all threads share one KafkaProducer instance.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BaseProducer {

        private static KafkaProducer<String, String> kafkaProducer;

        static {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProducer = new KafkaProducer<>(properties);
            // Close the shared producer when the JVM shuts down
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    BaseProducer.kafkaProducer.close();
                }
            });
        }

        /** Send the same content to every given topic. */
        public static void send(String content, String... topics) {
            if (topics != null && topics.length > 0) {
                for (int i = 0; i < topics.length; i++) {
                    System.out.println("send " + topics[i] + " message " + content);
                    kafkaProducer.send(new ProducerRecord<>(topics[i], content));
                }
            }
        }
    }

Multithreaded consumer
Define an abstract consumer that implements Runnable, so that each thread has its own KafkaConsumer.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public abstract class AbstractConsumer implements Runnable {

        protected KafkaConsumer<String, String> consumer;
        protected String topic;

        public AbstractConsumer(String topic) {
            this.topic = topic;
        }

        /**
         * Create consumer, subscribe to topic
         */
        private void connect() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "192.168.110.40:9092,192.168.110.40:9093,192.168.110.40:9094");
            properties.put("group.id", "test-user");
            // Auto commit is disabled: offsets are committed manually in handler()
            properties.put("enable.auto.commit", "false");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(properties);
            // Subscribe to the topic passed to the constructor rather than a hard-coded name
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            this.connect();
            while (true) {
                try {
                    ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> item : consumerRecords) {
                        this.handler(item.value());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Default handler: only commits. Note that commitAsync() without arguments
         * commits the offsets returned by the last poll(), not just this message.
         */
        public void handler(String message) {
            this.consumer.commitAsync();
        }
    }

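Extend AbstractConsumer and override the handler() method to consume the message.

    import java.util.List;

    // Assumption: JSON and TypeReference come from the fastjson library
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;

    // User is an application class that is not shown in this article
    public class UserConsumer extends AbstractConsumer {

        public UserConsumer(String topic) {
            super(topic);
        }

        @Override
        public void handler(String message) {
            try {
                // Consume the data
                List<User> userList = JSON.parseObject(message, new TypeReference<List<User>>(){});
                if (!userList.isEmpty()) {
                    for (User user : userList) {
                        System.out.println(user);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Commit whether or not processing succeeded, as in the original logic
                this.consumer.commitAsync();
            }
        }
    }
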
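
`commitAsync()` without arguments commits the offsets returned by the last poll. When offsets are committed explicitly per partition instead, the value to commit is the offset of the next message to read, that is, the last processed offset plus one. The following JDK-only sketch shows just that arithmetic. The class name and the representation of records as `{partition, offset}` pairs are assumptions made for the illustration; no Kafka classes are involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: computes, per partition, the offset that would be committed.
final class CommitOffsets {

    // processed: {partition, offset} pairs of the records handled so far
    static Map<Integer, Long> nextOffsets(long[][] processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (long[] record : processed) {
            int partition = (int) record[0];
            long nextToRead = record[1] + 1; // committed offset = last processed + 1
            // Keep the highest value seen for each partition
            next.merge(partition, nextToRead, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        long[][] processed = {{0, 5}, {1, 7}, {0, 6}};
        System.out.println(nextOffsets(processed));
    }
}
```
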
Define a consumer group class that runs the consumers in a thread pool.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class UserConsumerGroup {

        private final List<Runnable> consumerThreadList = new ArrayList<>();

        public UserConsumerGroup() {
            consumerThreadList.add(new UserConsumer("topic-user"));
        }

        public void start() {
            ThreadFactory factory = r -> new Thread(r, "ThreadPool thread: UserConsumerGroup");
            // Arguments: core pool size, maximum pool size, keep-alive time for idle threads,
            // time unit, work queue, thread factory
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 200, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(3), factory);
            for (Runnable item : consumerThreadList) {
                executor.execute(item);
            }
            // shutdown() stops accepting new tasks; consumers already submitted keep running
            executor.shutdown();
        }
    }

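
The pool parameters matter because each consumer task runs an endless loop and never frees its thread. With a core size of 3, a maximum size of 6, and a queue capacity of 3, `ThreadPoolExecutor` starts the first three tasks, queues the next three, and only creates extra threads once the queue is full. The JDK-only sketch below makes this visible. The class name is hypothetical, and tasks blocked on a latch stand in for the never-ending consumers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of how a (3, 6, queue 3) pool treats long-running tasks.
final class PoolCapacityDemo {

    // Submits `tasks` blocking jobs and returns {threads started, tasks queued, tasks rejected}.
    static int[] submitBlockingTasks(int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                3, 6, 200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(3));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // stand-in for the endless poll loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // queue full and maximum pool size reached
            }
        }
        int started = executor.getPoolSize();
        int queued = executor.getQueue().size();

        // Let the blocked tasks finish and clean up
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {started, queued, rejected};
    }

    public static void main(String[] args) {
        int[] result = submitBlockingTasks(6);
        System.out.println("started=" + result[0] + ", queued=" + result[1] + ", rejected=" + result[2]);
    }
}
```

With six tasks, only three run and three wait in the queue. For consumers that loop forever, queued consumers would therefore never start, so with these parameters it seems safer to keep the number of consumers at or below the core pool size.
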