Spring Boot 2.x integration with Kafka (Windows environment setup + worked example)

If you want to learn Kafka, this article will get you started.

1. Set up the Windows environment

1.1 Install Java 8

I won't explain this in detail. If you are reading this article, I assume you already have Java installed. If you don't, search online for an installation guide.

1.2 Install ZooKeeper

Kafka depends on ZooKeeper at runtime, so you need to install and start ZooKeeper before running Kafka.

1.2.1 Download ZooKeeper

ZooKeeper download location: http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.7/

1.2.2 Install ZooKeeper and edit the zoo.cfg configuration file

1. Unzip the archive into a directory (mine is D:\webserver\apache-zookeeper-3.5.7-bin). I usually rename the directory to apache-zookeeper-3.5.7.

2. Open D:\webserver\apache-zookeeper-3.5.7\conf, copy zoo_sample.cfg and rename the copy to zoo.cfg.

3. Edit zoo.cfg and change dataDir to dataDir=D:\webserver\apache-zookeeper-3.5.7\data, then create the data directory under D:\webserver\apache-zookeeper-3.5.7.

4. The AdminServer embedded in ZooKeeper 3.5 listens on port 8080 by default, which conflicts with Tomcat's default port, so it is best to change it. Add a line at the end of the file: admin.serverPort=8888
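
One detail about the dataDir line is worth checking if the data directory does not end up where you expect. As far as I know, ZooKeeper loads zoo.cfg with java.util.Properties, and in that format a single backslash is an escape character. The sketch below only demonstrates how java.util.Properties treats the three common ways of writing a Windows path; the class name ZooCfgPathDemo and the method parseDataDir are made up for this illustration and are not part of ZooKeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class ZooCfgPathDemo {

    // Parses one "key=value" line the way java.util.Properties does
    // and returns the resulting dataDir value.
    static String parseDataDir(String line) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new IllegalStateException(e); // reading from a String should not fail
        }
        return props.getProperty("dataDir");
    }

    public static void main(String[] args) {
        // In Java source, "\\" is one backslash in the file text.
        String single = "dataDir=D:\\webserver\\apache-zookeeper-3.5.7\\data";
        String doubled = "dataDir=D:\\\\webserver\\\\apache-zookeeper-3.5.7\\\\data";
        String forward = "dataDir=D:/webserver/apache-zookeeper-3.5.7/data";

        // Single backslashes are swallowed: D:webserverapache-zookeeper-3.5.7data
        System.out.println(parseDataDir(single));
        // Doubled backslashes survive: D:\webserver\apache-zookeeper-3.5.7\data
        System.out.println(parseDataDir(doubled));
        // Forward slashes are left alone: D:/webserver/apache-zookeeper-3.5.7/data
        System.out.println(parseDataDir(forward));
    }
}
```

If you hit path problems, writing the value with forward slashes or doubled backslashes is the safer choice.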

1.2.3 Set the ZooKeeper environment variables

 ZOOKEEPER_HOME    D:\webserver\apache-zookeeper-3.5.7
 Path              append %ZOOKEEPER_HOME%\bin to the existing value

1.2.4 Verify the installation

Open CMD and run zkServer.cmd. If the server starts and keeps running without reporting errors, the installation succeeded.

1.3 Install Kafka

1.3.1 Download Kafka

Kafka download page: http://kafka.apache.org/downloads

1.3.2 Unpack the archive

Unpack kafka_2.13-2.4.0.tgz (my directory is D:\webserver\kafka_2.13-2.4.0). Do not use a directory whose path contains spaces, such as Program Files, or the path will not be recognized later.

1.3.3 Edit the server.properties configuration file

Open server.properties in D:\webserver\kafka_2.13-2.4.0\config and change log.dirs to: log.dirs=D:\webserver\kafka_2.13-2.4.0\kafka-logs

1.3.4 Start the Kafka service

Open a cmd window and change to the directory D:\webserver\kafka_2.13-2.4.0

Run the following command to start the Kafka broker:

.\bin\windows\kafka-server-start.bat .\config\server.properties

When Kafka starts successfully, the console output includes the version, the startup time and other information.
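
If you would rather confirm from code that both services are up, a plain TCP connection attempt is enough for a rough check. The following is a minimal sketch, assuming ZooKeeper listens on its default client port 2181 and the Kafka broker on 9092 (the ports used elsewhere in this article); the class name PortCheck is made up for this example. An open port only shows that something is listening, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if something accepts a TCP connection on host:port within timeoutMs.
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Assumed default ports: 2181 for ZooKeeper, 9092 for the Kafka broker.
        System.out.println("ZooKeeper (2181): " + isPortOpen("localhost", 2181, 1000));
        System.out.println("Kafka (9092):     " + isPortOpen("localhost", 9092, 1000));
    }
}
```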

1.3.5 Test

1.3.5.1 Create a topic

Change to the directory D:\webserver\kafka_2.13-2.4.0\bin\windows and create a Kafka topic:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testDemo

If the command completes without errors, the topic has been created.

1.3.5.2 Start a producer and a consumer

Open two cmd windows, change to D:\webserver\kafka_2.13-2.4.0\bin\windows in each, and start a console producer and a console consumer.

cmd window 1, producer:

kafka-console-producer.bat --broker-list localhost:9092 --topic testDemo

cmd window 2, consumer:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testDemo

Note: the older form kafka-console-consumer.bat --zookeeper localhost:2181 --topic testDemo is obsolete and can no longer be used.

1.3.5.3 Send a test message

You can now send messages from the producer and receive them in the consumer.
In the producer window, type: hello binge
Then check whether the message appears in the consumer window.

If it does, the test is complete and Kafka is installed correctly.

2. Integrate Kafka with Spring Boot 2.x

2.1 Add the Maven dependencies

Add the Kafka dependencies to the pom file as follows:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

Spring Boot's dependency management already sets the version numbers, so you do not need to specify them here. If you specify a version that does not match, you will run into compatibility problems; I have fallen into that pit myself.

2.2 Configure Kafka in the yml file

Add the following Kafka settings to the configuration file:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000

kafka:
  topic:
    group-id: topicGroupId
    topic-name: testDemo

The first part is the Spring Kafka configuration.
The second part is application-specific custom configuration, where we define the Kafka topic name and the group ID.
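
The producer sizes in the first part are plain byte counts, which are easier to reason about once converted. As I understand the Kafka producer settings, batch-size is the upper bound for one per-partition batch and buffer-memory is the total memory the producer may use for records waiting to be sent. The sketch below only does the arithmetic for the values shown above; the class name ProducerBufferMath is made up for this example.

```java
class ProducerBufferMath {

    // Values copied from the yml configuration above, in bytes.
    static final long BATCH_SIZE_BYTES = 16_384L;
    static final long BUFFER_MEMORY_BYTES = 33_554_432L;

    static long batchSizeKiB() {
        return BATCH_SIZE_BYTES / 1024;
    }

    static long bufferMemoryMiB() {
        return BUFFER_MEMORY_BYTES / (1024 * 1024);
    }

    // Rough upper bound on how many completely full batches fit into the buffer.
    static long fullBatchesInBuffer() {
        return BUFFER_MEMORY_BYTES / BATCH_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("batch-size    = " + batchSizeKiB() + " KiB");      // 16 KiB
        System.out.println("buffer-memory = " + bufferMemoryMiB() + " MiB");   // 32 MiB
        System.out.println("full batches  = " + fullBatchesInBuffer());        // 2048
    }
}
```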

2.3 Write the Kafka configuration classes

2.3.1 First, write the property class KafkaTopicProperties

package com.ieslab.powergrid.demosvr.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import java.io.Serializable;

@ConfigurationProperties("kafka.topic")
public class KafkaTopicProperties implements Serializable {

    private String groupId;
    private String topicName;

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }
}

This class reads the second part of the configuration file, that is, our application-specific custom configuration.
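
Note that the yml keys are written in kebab-case (group-id, topic-name) while the Java fields are camelCase (groupId, topicName). Spring Boot's relaxed binding matches the two forms. The sketch below is only a simplified illustration of that name mapping, not Spring's actual implementation; the class name RelaxedNameDemo and the method kebabToCamel are made up for this example.

```java
class RelaxedNameDemo {

    // Converts a kebab-case key such as "group-id" into camelCase ("groupId").
    static String kebabToCamel(String key) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : key.toCharArray()) {
            if (c == '-') {
                upperNext = true; // drop the dash and capitalize what follows
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(kebabToCamel("group-id"));   // groupId
        System.out.println(kebabToCamel("topic-name")); // topicName
    }
}
```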

2.3.2 Write the Kafka configuration class KafkaTopicConfiguration

This configuration class enables KafkaTopicProperties and exposes the values read from the configuration file as beans.

package com.ieslab.powergrid.demosvr.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(KafkaTopicProperties.class)
public class KafkaTopicConfiguration {
    private final KafkaTopicProperties properties;

    public KafkaTopicConfiguration(KafkaTopicProperties properties) {
        this.properties = properties;
    }

    @Bean
    public String kafkaTopicName() {
        return properties.getTopicName();
    }

    @Bean
    public String topicGroupId() {
        return properties.getGroupId();
    }
}

2.4 Write the producer and the consumer

For convenience, we put the producer and the consumer in the same class. Create KafkaTestService:

package com.ieslab.powergrid.demosvr.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
@Slf4j
public class KafkaTestService {
    // Key and value types match the String serializers configured in the yml file.
    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Inject KafkaTemplate
     * @param kafkaTemplate Kafka template class
     */
    @Autowired
    public KafkaTestService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Consumer: called for every new record on the configured topic.
    @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
    public void processMessage(ConsumerRecord<String, String> record) {
        log.info("kafka processMessage start");
        log.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());
        // do something ...
        log.info("kafka processMessage end");
    }

    // Producer: sends asynchronously; the callback reports the outcome.
    public void sendMessage(String topic, String data) {
        log.info("kafka sendMessage start");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                // Pass the exception last so SLF4J logs the stack trace.
                log.error("kafka sendMessage error, topic = {}, data = {}", topic, data, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka sendMessage success topic = {}, data = {}", topic, data);
            }
        });
        log.info("kafka sendMessage end");
    }
}

This is a service class. sendMessage(String topic, String data) sends messages. processMessage carries the @KafkaListener annotation, which marks it as a listener method; the annotation specifies the topic name. When a new message arrives on that topic, processMessage is called with the new message as its parameter. This process is asynchronous: sendMessage returns right after handing the record to KafkaTemplate, and the callback is invoked later with the result.
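
To make the asynchronous behaviour of sendMessage easier to see without a running broker, here is a small stand-alone sketch of the same callback pattern. It uses the JDK's CompletableFuture as a stand-in for Spring's ListenableFuture, and fakeSend is a hypothetical replacement for kafkaTemplate.send that simply completes on another thread after a short delay. None of these names come from Spring Kafka.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AsyncSendDemo {

    // Hypothetical stand-in for kafkaTemplate.send(topic, data):
    // the result becomes available later, on a different thread.
    static CompletableFuture<String> fakeSend(String topic, String data) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200); // simulated network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return topic + ":" + data;
        });
    }

    // Mirrors the shape of sendMessage: register a callback, then return immediately.
    static CompletableFuture<String> sendMessage(String topic, String data, List<String> events) {
        events.add("sendMessage start");
        CompletableFuture<String> done = fakeSend(topic, data).whenComplete((result, ex) -> {
            if (ex == null) {
                events.add("success " + result);
            } else {
                events.add("failure " + ex);
            }
        });
        events.add("sendMessage end"); // reached without waiting for the send to finish
        return done;
    }

    public static void main(String[] args) {
        List<String> events = new CopyOnWriteArrayList<>();
        sendMessage("testDemo", "hello binge", events).join(); // wait only for the demo output
        events.forEach(System.out::println);
    }
}
```

In a normal run the "sendMessage end" entry is recorded before the success entry, which is the same ordering you should see in the service's log output.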

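2.5 Write the controller class

TestController class:

import com.ieslab.powergrid.demosvr.service.KafkaTestService;
import io.swagger.annotations.Api; // assumes the Swagger annotations are on the classpath
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("/api")
@Api(tags="RequestParam Usage test interface class")
public class TestController {
    @Autowired
    KafkaTestService kafkaTestService;

    /**
     * Sends the given name to the testDemo topic.
     * @param name message to send
     * @return the name that was sent
     */
    @GetMapping("/testKafka")
    @ResponseBody
    public String testKafka(String name) {
        kafkaTestService.sendMessage("testDemo", name);
        return name;
    }
}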

2.6 Test from the browser

Visit: http://localhost:8080/api/testKafka?name=houpeibin333
Then check the application's console log for the sendMessage and processMessage entries.

Meanwhile, the consumer in cmd window 2 (kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testDemo) should also receive the message.
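
The same request can also be sent from code instead of the browser. The following is a minimal sketch using only the JDK, assuming the application runs on http://localhost:8080 as in the URL above; the class name TestKafkaClient and its methods are made up for this example. It URL-encodes the name so that messages containing spaces, such as hello binge, are sent correctly.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class TestKafkaClient {

    // Builds the request URL for the /api/testKafka endpoint with an encoded name parameter.
    static String buildUrl(String baseUrl, String name) {
        try {
            return baseUrl + "/api/testKafka?name=" + URLEncoder.encode(name, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Sends a GET request and returns the first line of the response body.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(5000);
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line = in.readLine();
            return line == null ? "" : line;
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = buildUrl("http://localhost:8080", "hello binge");
        System.out.println("GET " + url);
        try {
            System.out.println("Response: " + get(url));
        } catch (IOException e) {
            System.out.println("Request failed (is the application running?): " + e.getMessage());
        }
    }
}
```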

3. Question: is ZooKeeper not used?

At this point a question comes up: our code never used ZooKeeper, so why install it?
In fact, the Kafka broker uses it from the moment it starts. The kafka/config/server.properties configuration file contains the line zookeeper.connect=localhost:2181, which sets the address of the ZooKeeper instance to connect to. If ZooKeeper runs on another server, this line must be changed.

Brokers register themselves in ZooKeeper, which Kafka uses to coordinate the cluster. Producers and consumers, even when they run on different machines, only need the broker address in bootstrap-servers, so a consumer subscribing to a topic never has to hard-code the producer's IP address.

4. Summary

This article covered installing Kafka on Windows and showed, with an example, how to integrate it with Spring Boot. It is only an introduction; a later article will cover the practical use of Kafka in a project. Please follow this blog, thank you!

Keywords: kafka Zookeeper Apache Windows

Added by jbrave on Sun, 23 Feb 2020 11:54:17 +0200