Using Kafka message queue in springboot project

1. Kafka message producer

1) Create the springboot Kafka producer demo project and import the jar package.


2) Add Kafka producer configuration item in

spring.kafka.listener.concurrency= 3

# Number of retries when write failed. When the leader node fails, a repli node will be replaced by a leader node. At this time, write failure may occur,
# When retris is 0, produce does not repeat. retirs resend. At this time, the repli node becomes the leader node completely, and no message is lost.
# The number of messages sent in batches each time, the product accumulates to a certain amount of data, and sends them once
# Once the product accumulates data and sends it, the data will be sent when the cache size reaches buffer.memory
#request.required.acks has three values 0 1 - 1
#0: the producer will not wait for the broker's ack. The delay is the lowest but the storage guarantee is the weakest. When the server hangs up, the data will be lost
#1: The server will wait for the ack value leader copy to confirm receiving the message and then send the ack. However, if the leader fails to ensure that the new leader is copied, the data will be lost
#-1: Also, on the basis of 1, the server will wait for all the follower s' copies to receive the data before receiving the ack issued by the leader, so that the data will not be lost

3) Add @ EnableKafka annotation to Kafka message producer startup class

4) Write Kafka message sending class - Kafka message producer

public class KafkaSender {

    private Globals globals;
    private KafkaTemplate<String,String> kafkaTemplate;

     * Send message method
     * @param msg
    public ResponseEntity send(String msg) {"send message,Message content : {}", msg);
        try {
            String uuid = UUID.randomUUID().toString();
            String topic = globals.getTopic();
            Message message = new Message();
            message.setSendTime(new Date());

            ListenableFuture listenableFuture = kafkaTemplate.send(topic, uuid, JSON.toJSONString(message));

            //Callback after sending successfully
            SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {
                public void onSuccess(SendResult<String,String> result) {
          "Message sent successfully");
            //Send failed callback
            FailureCallback failureCallback = new FailureCallback() {
                public void onFailure(Throwable ex) {
                    log.error("Failed to send message", ex);

        }catch (Exception e){
            log.error("Sending message exception", e);
        return new ResponseEntity("", HttpStatus.OK);

5) Send message test class

public class MsgSenderTest extends KafkaProducerApplicationTest {
    MsgSender sender;

    public void send() {
        sender.send("This is Kafka Message content sent" + System.currentTimeMillis());

2. Kafka message consumer

1) Create the springboot Kafka consumer demo project and import the jar package.


2) Add Kafka consumer configuration item in

#============== kafka ===================
# Specify kafka proxy address, multiple

#=============== consumer  =======================
# Specify the default consumer group ID -- > because in kafka, the consumers in the same group will not read the same message, relying on the group ID to set the group name
# Smallest and largest are only valid. If smallest starts to read again from 0, then largest will read from the offset of logfile. In general, we set up smalles
# If '' is true, the consumer offset is automatically submitted to Kafka in milliseconds, with a default value of 5000.
# Specifies the encoding and decoding method of message key and message body

3) Add @ EnableKafka annotation to Kafka message consumer startup class

4) Create Kafka message consumer class with @ KafkaListener

public class KafkaBatchConsumer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"},
            containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord record : records) {
      "receive messages: offset = {}, key = {}, value = {} ",
                        record.offset(), record.key(), record.value());
        } catch (Exception e) {
            logger.error("kafka Receive message exception",e);
        } finally {
            //Manually submit offset

3. Test Kafka

Start the Kafka message consumer springboot Kafka consumer demo project first

1) Execute the send() method in MsgSenderTest class of Kafka message producer, as shown below

2) Switch to Kafka message consumer view consumption results


Source code example:

Keywords: Programming kafka Spring SpringBoot Apache

Added by Patrick3002 on Tue, 14 Apr 2020 18:03:10 +0300