Message Queuing with Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

1 Overview

1.1 Basic concepts

1.1.1 Topic

Kafka classifies messages by topic. Each topic can be thought of as a queue.

1.1.2 Partition

Each topic can have multiple partitions, which exist mainly to improve concurrency. Within a topic, different partitions can receive messages concurrently, and consumers can pull from them concurrently. The partition count therefore sets the upper bound on parallelism: within one consumer group, each partition is consumed by at most one consumer at a time.
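
As a rough illustration of how the partition count bounds parallelism, the sketch below spreads keys over partitions and partitions over the consumers of one group. The class and method names are hypothetical, `String.hashCode` stands in for Kafka's murmur2-based key hashing, and the round-robin assignment is a simplification of Kafka's real assignors.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionConcurrencySketch {

    // Simplified stand-in for Kafka's default partitioner (which hashes the key bytes with murmur2).
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Round-robin assignment of partitions to the consumers of one group.
    // result.get(c) lists the partitions owned by consumer c.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition.
        System.out.println("user-42 -> partition " + partitionFor("user-42", 5));
        // Two consumers share five partitions.
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
        // Seven consumers, five partitions: two consumers stay idle.
        System.out.println(assign(5, 7)); // [[0], [1], [2], [3], [4], [], []]
    }
}
```

The last call shows why adding consumers beyond the partition count brings no extra throughput: the surplus consumers receive no partitions.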

On the Kafka server, each partition is stored as a directory. Within that directory, Kafka splits the partition into multiple log segments, rolling to a new segment when the configured size or time limit is reached. Each segment consists of three files:

- Log file: *.log
- Offset index file: *.index
- Time index file: *.timeindex

The *.log file stores the message data itself. The *.index file maps a message's logical offset to its physical position in the log file. The *.timeindex file maps message timestamps to the corresponding logical offsets.

Splitting a partition into segments keeps each storage file to a bounded size. Bounded files are easy to map into memory through the operating system's mmap mechanism, which improves read and write efficiency. Another advantage is that expired data can be cleared by simply deleting whole segment files.
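
A minimal sketch of the two ideas above, assuming an in-memory model of one partition directory: locating the segment that holds an offset, and clearing expired data by dropping whole segments. The class, its methods, and the timestamps are hypothetical; the only Kafka-specific detail used is that segment files are named after their base offset, zero-padded to 20 digits.

```java
import java.util.Locale;
import java.util.TreeMap;

public class SegmentSketch {

    // Base offset of each segment -> last-modified time in ms (illustrative values).
    private final TreeMap<Long, Long> segments = new TreeMap<>();

    public void addSegment(long baseOffset, long lastModifiedMs) {
        segments.put(baseOffset, lastModifiedMs);
    }

    // Segment files are named after their base offset, zero-padded to 20 digits.
    public static String logFileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    // The segment holding an offset is the one with the greatest base offset <= that offset.
    public String segmentFor(long offset) {
        Long base = segments.floorKey(offset);
        if (base == null) {
            throw new IllegalArgumentException("offset below first segment: " + offset);
        }
        return logFileName(base);
    }

    // Retention: drop whole segments older than retentionMs, but keep the newest (active) one.
    public int deleteExpired(long nowMs, long retentionMs) {
        int deleted = 0;
        while (segments.size() > 1 && nowMs - segments.firstEntry().getValue() > retentionMs) {
            segments.pollFirstEntry();
            deleted++;
        }
        return deleted;
    }

    public static void main(String[] args) {
        SegmentSketch log = new SegmentSketch();
        log.addSegment(0L, 1_000L);
        log.addSegment(500L, 2_000L);
        log.addSegment(1200L, 3_000L);
        // Offset 730 lives in the segment whose base offset is 500.
        System.out.println(log.segmentFor(730L));
        // Only the first segment is older than the retention window.
        System.out.println(log.deleteExpired(10_000L, 8_500L)); // 1
    }
}
```

Because expiry works on whole files, no individual message ever has to be rewritten or compacted out of a segment in this model.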

If the index stored the position of every message, the index file itself could easily grow very large. Kafka therefore uses a sparse index: only some messages get an index entry, and a lookup finds the nearest preceding entry and scans forward from there.
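
The lookup can be sketched as follows. This is an illustration under stated assumptions, not Kafka's implementation: the class is hypothetical, the log is modeled as two in-memory arrays, and an entry is added every `interval` records, whereas Kafka adds one roughly every `index.interval.bytes` bytes.

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseIndexSketch {

    private final long[] offsets;   // logical offset of each record, ascending
    private final int[] positions;  // byte position of each record in the .log file
    // Sparse index: offset -> slot in the arrays, for every interval-th record only.
    private final TreeMap<Long, Integer> index = new TreeMap<>();

    public SparseIndexSketch(long[] offsets, int[] positions, int interval) {
        this.offsets = offsets;
        this.positions = positions;
        for (int i = 0; i < offsets.length; i += interval) {
            index.put(offsets[i], i);
        }
    }

    public int indexEntries() {
        return index.size();
    }

    // Returns the byte position of the record with the given offset, or -1 if absent.
    public int positionOf(long target) {
        Map.Entry<Long, Integer> floor = index.floorEntry(target);
        if (floor == null) {
            return -1;
        }
        // Scan forward from the nearest indexed record; at most interval - 1 extra records are read.
        for (int i = floor.getValue(); i < offsets.length && offsets[i] <= target; i++) {
            if (offsets[i] == target) {
                return positions[i];
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] offsets = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109};
        int[] positions = {0, 120, 250, 310, 480, 590, 700, 845, 910, 1024};
        SparseIndexSketch idx = new SparseIndexSketch(offsets, positions, 4);
        System.out.println(idx.indexEntries());  // 3 (entries for offsets 100, 104, 108)
        System.out.println(idx.positionOf(106)); // 700 (found by scanning forward from 104)
    }
}
```

The trade-off is a short sequential scan per lookup in exchange for an index that is a fraction of the size of a dense one.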

1.1.3 Broker

A Kafka cluster stores the messages. Each server in the cluster is a broker.

1.1.4 Replication

The replication factor is the number of redundant copies kept of each partition. It cannot exceed the number of brokers in the cluster.
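
The broker-count limit follows from placing each copy of a partition on a different broker. A minimal sketch, assuming a plain round-robin placement (Kafka's actual assignment also randomizes the starting broker and can take racks into account); the class and method names are hypothetical:

```java
import java.util.Arrays;

public class ReplicaAssignmentSketch {

    // Returns, for each partition, the broker ids that hold its replicas.
    public static int[][] assign(int numPartitions, int replicationFactor, int[] brokerIds) {
        if (replicationFactor > brokerIds.length) {
            // Two copies on the same broker would add no redundancy.
            throw new IllegalArgumentException(
                "replication factor " + replicationFactor
                    + " exceeds available brokers " + brokerIds.length);
        }
        int[][] replicas = new int[numPartitions][replicationFactor];
        for (int p = 0; p < numPartitions; p++) {
            for (int r = 0; r < replicationFactor; r++) {
                // Shift the starting broker per partition so load is spread evenly.
                replicas[p][r] = brokerIds[(p + r) % brokerIds.length];
            }
        }
        return replicas;
    }

    public static void main(String[] args) {
        int[] brokers = {1, 2, 3};
        System.out.println(Arrays.deepToString(assign(5, 3, brokers)));
        // [[1, 2, 3], [2, 3, 1], [3, 1, 2], [1, 2, 3], [2, 3, 1]]
        try {
            assign(5, 4, brokers);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```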

1.2 Basic operations

1.2.1 Topic operations

# Create a topic
# --topic: topic name; avoid mixing underscores (_) and periods (.) in names
# --replication-factor: number of replicas (cannot exceed the number of broker nodes)
# --partitions: number of partitions (degree of concurrency)
./bin/kafka-topics.sh --create \
--topic UserDataQueue \
--replication-factor 3 \
--partitions 5 \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# List topics
./bin/kafka-topics.sh --list \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

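# Modify a topic (illustrative: increase the partition count to 10; it can only grow)
./bin/kafka-topics.sh --alter \
--topic UserDataQueue \
--partitions 10 \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# Delete a topic (irreversible; shown for reference only)
./bin/kafka-topics.sh --delete \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094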

1.2.2 Message operations

# Send messages
# --topic: the target topic
./bin/kafka-console-producer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094

# Consume messages
# --from-beginning: read all existing messages from the start of the topic
./bin/kafka-console-consumer.sh \
--topic UserDataQueue \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--from-beginning

2 Cluster configuration

The Kafka cluster configured here depends on Zookeeper.

2.1 Zookeeper configuration and startup

# Parameters to be modified

# the directory where the snapshot is stored.
dataDir=/kafka/zkdata
# the port at which the clients will connect
clientPort=2182

# Start Zookeeper
./bin/zookeeper-server-start.sh -daemon /kafka/zookeeper.properties

2.2 Kafka configuration and startup

# Parameters to be modified

# The id of the broker. This must be set to a unique integer for each broker.
# The ID must be unique within the cluster.
broker.id=1

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# Brokers running on the same host must use different ports.
listeners=PLAINTEXT://localhost:9092

# A comma separated list of directories under which to store log files
# Each broker needs its own log directory.
log.dirs=/kafka/data01

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Must match the Zookeeper clientPort configured in section 2.1.
zookeeper.connect=localhost:2182

# Start Kafka

# broker-1
./bin/kafka-server-start.sh -daemon /kafka/server01.properties

# broker-2
./bin/kafka-server-start.sh -daemon /kafka/server02.properties

# broker-3
./bin/kafka-server-start.sh -daemon /kafka/server03.properties
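
Since the three properties files must differ in `broker.id`, `listeners`, and `log.dirs`, a quick sanity check before startup can help. The sketch below is a hypothetical helper, not part of Kafka; the values for brokers 2 and 3 (ports 9093 and 9094, directories data02 and data03) are assumptions that follow the pattern of the first broker. It also shows why comments in a properties file belong on their own line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class BrokerConfigCheck {

    // Settings that must differ between brokers sharing one host.
    static final String[] UNIQUE_KEYS = {"broker.id", "listeners", "log.dirs"};

    // Parses properties text the same way a .properties file is loaded.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the first key whose value is shared by two brokers, or null if all are distinct.
    public static String firstConflict(List<Properties> brokers) {
        for (String key : UNIQUE_KEYS) {
            Set<String> seen = new HashSet<>();
            for (Properties broker : brokers) {
                if (!seen.add(broker.getProperty(key))) {
                    return key;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Properties b1 = parse("broker.id=1\nlisteners=PLAINTEXT://localhost:9092\nlog.dirs=/kafka/data01\n");
        Properties b2 = parse("broker.id=2\nlisteners=PLAINTEXT://localhost:9093\nlog.dirs=/kafka/data02\n");
        Properties b3 = parse("broker.id=3\nlisteners=PLAINTEXT://localhost:9094\nlog.dirs=/kafka/data03\n");
        System.out.println(firstConflict(List.of(b1, b2, b3))); // null

        // A copy-paste mistake: broker 3 reuses broker 2's port.
        Properties bad = parse("broker.id=3\nlisteners=PLAINTEXT://localhost:9093\nlog.dirs=/kafka/data03\n");
        System.out.println(firstConflict(List.of(b1, b2, bad))); // listeners

        // A trailing comment becomes part of the value instead of being ignored.
        System.out.println(parse("broker.id=1  # unique\n").getProperty("broker.id")); // 1  # unique
    }
}
```

In practice the three files would be read from disk (for example with `Files.newBufferedReader`) rather than from inline strings.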

2.3 Zookeeper visualization

PrettyZoo is a graphical Zookeeper management client built on Apache Curator and JavaFX.

As can be seen from the figure below, the three brokers in the cluster are started normally.

2.4 Kafka visualization and monitoring

2.4.1 AKHQ

AKHQ is a Kafka GUI for managing topics, topic messages, consumer groups, and more. Documentation: https://akhq.io/

2.4.2 Kafka Eagle

Kafka Eagle is a simple and efficient monitoring system. Documentation: http://www.kafka-eagle.org/index.html

Kafka Eagle also provides its own monitoring dashboard.

3 Integration with Spring Boot

Spring Boot version: 2.4.4.

3.1 Spring Boot

3.1.1 Add dependency

implementation 'org.springframework.kafka:spring-kafka'

3.1.2 Configuration file

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    producer:
      client-id: kfk-demo
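      retries: 3
      # Spring Boot's default value serializer is StringSerializer. Sending objects such as
      # UserData needs a JSON-capable serializer, for example
      # org.springframework.kafka.support.serializer.JsonSerializer (set via value-serializer).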
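
3.1.3 Message sending

import com.google.gson.Gson;
import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// UserData is the application's own POJO (username, age); its definition is not shown here.
@RestController
public class IndexController {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;

    // Sends a UserData object; the configured value serializer must be able to handle it.
    @GetMapping
    public String index() {
        int rdm = new Random().nextInt(1000);
        kafkaTemplate.send("UserDataQueue", new UserData("", rdm));
        return "hello world";
    }

    // Sends the payload as a JSON string.
    @GetMapping("index2")
    public String index2() {
        kafkaTemplate.send("UserDataTopic", new Gson().toJson(new UserData("apple", 23)));
        return "ok";
    }
}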
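
3.1.4 Message reception

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(
        id = "kfk-demo-userdata",
        topics = {"UserDataQueue"},
        groupId = "kfk-demo-group",
        clientIdPrefix = "kfk-demo-client")
public class KfkListener {

    // Invoked when the converted payload is a UserData instance.
    @KafkaHandler
    public void process(@Payload UserData ud,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.printf("topic: %s, partition: %d, userData: %s%n", topic, partition, ud);
    }

    // Fallback for payloads of any other type.
    @KafkaHandler(isDefault = true)
    public void process(Object obj) {
        System.out.println(obj);
    }
}

// Receive string mode (a separate source file)
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@KafkaListener(id = "kfk-demo2", topics = {"UserDataTopic"})
public class KfkUserDataTopicListener {

    // Parses the JSON string back into a UserData object.
    @KafkaHandler
    public void process(String userDataStr) {
        UserData userData = new Gson().fromJson(userDataStr, UserData.class);
        log.info("username: {}, age: {}", userData.getUsername(), userData.getAge());
    }
}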
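
3.1.5 Automatic topic creation

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    // Declares UserDataTopic with 3 partitions and a replication factor of 1.
    // Spring Boot's auto-configured KafkaAdmin creates it at startup if it does not exist.
    @Bean
    public NewTopic userDataTopic() {
        return new NewTopic("UserDataTopic", 3, (short) 1);
    }
}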

Keywords: Java Spring Boot Zookeeper bash bootstrap

Added by kidestranged on Tue, 08 Mar 2022 16:16:47 +0200