Using the Kafka message queue in a Spring Boot project

1. Kafka message producer

1) Create the Spring Boot Kafka producer demo project and add the spring-kafka dependency.

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.4.5.RELEASE</version>
</dependency>

2) Add the Kafka producer configuration to application.properties

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency=3

spring.kafka.producer.client-id=${spring.application.name}
# Number of retries when a write fails. When the leader node fails, a replica is elected as the new leader, and writes may fail during the switch.
# With retries=0 the producer does not resend. With retries > 0 it resends the failed write; by then the replica has fully taken over as leader, so no message is lost.
spring.kafka.producer.retries=0
# Batch size in bytes (not a message count): the producer accumulates records for a partition up to this size and sends them in one request.
spring.kafka.producer.batch-size=16384
# Total memory in bytes that the producer may use to buffer records waiting to be sent. When the buffer is full, further send() calls block.
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# acks takes three values: 0, 1 and -1 (also written as all). request.required.acks is the name this setting had in the old producer.
# 0: the producer does not wait for any ack from the broker. Latency is lowest but the durability guarantee is weakest: data is lost if the broker goes down.
# 1: the leader sends the ack as soon as it has written the message to its own log, without waiting for the followers. If the leader fails before the followers have copied the message, the data is lost.
# -1: the leader sends the ack only after all in-sync follower replicas have received the message, so the data survives a leader failure.
spring.kafka.producer.acks=-1
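
For readers who prefer to build the producer configuration in code, the properties above correspond to the Kafka client keys sketched below. This is a minimal illustration using only java.util; the class name ProducerSettings is hypothetical and not part of the demo project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not from the demo project): mirrors the
// spring.kafka.producer.* properties above as plain Kafka client keys.
final class ProducerSettings {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("retries", 0);                 // no resend on failure
        props.put("batch.size", 16384);          // bytes per partition batch
        props.put("buffer.memory", 33554432L);   // total buffer size in bytes
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                // same meaning as -1
        return props;
    }
}
```

A map of this shape is what spring-kafka's DefaultKafkaProducerFactory accepts in its constructor, should the auto-configured factory need to be replaced.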

3) Add the @EnableKafka annotation to the Kafka message producer startup class

4) Write the Kafka message sending class (the Kafka message producer)

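import java.util.Date;
import java.util.UUID;
import javax.annotation.Resource;

import com.alibaba.fastjson.JSON; // assumption: JSON.toJSONString here is fastjson's
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

// Globals and Message are classes of the demo project (not shown here).
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private Globals globals;
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message to the configured topic.
     * @param msg the message text to send
     * @return an empty 200 response; the actual send result is reported by the callbacks
     */
    public ResponseEntity<String> send(String msg) {
        log.info("send message, message content: {}", msg);
        try {
            String uuid = UUID.randomUUID().toString();
            String topic = globals.getTopic();
            Message message = new Message();
            message.setId(uuid);
            message.setMsg(msg);
            message.setSendTime(new Date());

            // send() is asynchronous: it returns a future that completes when the broker responds
            ListenableFuture<SendResult<String, String>> listenableFuture =
                    kafkaTemplate.send(topic, uuid, JSON.toJSONString(message));

            // Callback invoked after the message was sent successfully
            SuccessCallback<SendResult<String, String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("Message sent successfully");
                }
            };
            // Callback invoked when sending failed
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("Failed to send message", ex);
                }
            };

            listenableFuture.addCallback(successCallback, failureCallback);
        } catch (Exception e) {
            log.error("Sending message exception", e);
        }
        return new ResponseEntity<>("", HttpStatus.OK);
    }
}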
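
The sender above fills a Message object that the article does not show. A minimal sketch of what such a class might look like follows; it is a hypothetical reconstruction inferred only from the setters called in KafkaSender (setId, setMsg, setSendTime), not the demo project's actual source.

```java
import java.util.Date;

// Hypothetical reconstruction of the demo's Message class, inferred from
// the setters that KafkaSender calls. The real class may differ.
class Message {
    private String id;       // unique message ID (a UUID in KafkaSender)
    private String msg;      // message text
    private Date sendTime;   // time at which the message was created

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
}
```

The getters are included because JSON libraries commonly serialize a plain object through its getter methods.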

5) Write the message sending test class

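// KafkaProducerApplicationTest is the project's base test class (not shown here).
public class MsgSenderTest extends KafkaProducerApplicationTest {
    @Autowired
    KafkaSender sender; // the producer component written in step 4

    @Test
    public void send() {
        // The send is asynchronous; its outcome is logged by the callbacks in KafkaSender
        sender.send("This is Kafka Message content sent" + System.currentTimeMillis());
    }
}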

2. Kafka message consumer

1) Create the Spring Boot Kafka consumer demo project and add the spring-kafka dependency.

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.4.5.RELEASE</version>
</dependency>

2) Add the Kafka consumer configuration to application.properties

#============== kafka ===================
# Kafka broker addresses; separate multiple addresses with commas
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency=3
spring.cloud.bus.trace.enabled=true

#=============== consumer  =======================
spring.kafka.consumer.client-id=${spring.application.name}
# Default consumer group ID. Within one group each message is read by only one consumer, so consumers that share a group ID split the topic's messages between them.
spring.kafka.consumer.group-id=test
# Where to start when the group has no committed offset: earliest reads from the beginning of the partition, latest reads only messages written after the consumer starts. (smallest and largest are the names used by the old consumer.) In general we use earliest.
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
# If 'enable.auto.commit' is true, the consumer offset is committed to Kafka automatically at this interval in milliseconds (default 5000). It has no effect while auto-commit is disabled, as it is here.
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=10
# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
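
To make the effect of auto-offset-reset concrete, here is a toy model of how a consumer picks its starting offset. It is an illustration written for this article using only java.util, not Kafka's implementation; the class and method names are hypothetical, and the case of an out-of-range committed offset is not modelled.

```java
import java.util.OptionalLong;

// Toy model (not Kafka code): decides where a consumer starts reading.
final class OffsetResetModel {

    static long startOffset(OptionalLong committed, String policy,
                            long logStart, long logEnd) {
        // A committed offset always wins; the policy applies only without one.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart;   // read the partition from the beginning
            case "latest":
                return logEnd;     // read only messages written from now on
            default:
                // "none" (or an unknown value): the consumer reports an error
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + policy);
        }
    }
}
```

With the configuration above (earliest), a new group ID therefore starts from the oldest message still held in the partition, while an existing group resumes from its committed offset.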

3) Add the @EnableKafka annotation to the Kafka message consumer startup class

4) Create the Kafka message consumer class with @KafkaListener

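import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaBatchConsumer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Receiving a List together with an Acknowledgment requires a batch listener container
    // in manual ack mode (spring.kafka.listener.type=batch, spring.kafka.listener.ack-mode=manual).
    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"},
            containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                logger.info("receive messages: offset = {}, key = {}, value = {}",
                        record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            logger.error("Kafka receive message exception", e);
        } finally {
            // Manually commit the offset. Because this runs in finally, the batch is
            // acknowledged even if processing failed, so failed records are not redelivered.
            ack.acknowledge();
        }
    }
}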
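
The listener above names a container factory, kafkaListenerContainerFactory, and expects a whole batch plus an Acknowledgment, but the article does not show how that factory is set up. One way to provide it is sketched below, assuming Spring Boot's Kafka auto-configuration is on the classpath. The class name KafkaConsumerConfig is hypothetical, and the demo project may configure this differently, for example through the spring.kafka.listener.type and spring.kafka.listener.ack-mode properties.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class; not taken from the demo project.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the spring.kafka.listener.* and consumer settings from application.properties
        configurer.configure(factory, consumerFactory);
        // Deliver each poll result to the listener as one List of records
        factory.setBatchListener(true);
        // Commit offsets only when the listener calls ack.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```

Because the bean carries the name kafkaListenerContainerFactory, it takes the place of the factory that Spring Boot would otherwise auto-configure.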

3. Test Kafka

Start the Spring Boot Kafka consumer demo project first.

1) Run the send() method of the MsgSenderTest class in the Kafka message producer project.

2) Switch to the Kafka message consumer and check the consumption results in its log.

Source code example: https://gitee.com/lion123/springboot-kafka-demo


Added by Patrick3002 on Tue, 14 Apr 2020 18:03:10 +0300