Kafka in an Internet of Things project: theory, practice, Zookeeper, internals, and common errors

Kafka is used as the message middleware in the company's Internet of Things project, where it is a newly adopted technology. At present it runs as a single node, without a cluster.

Kafka configuration for the Internet of Things project

Zookeeper: registers and discovers Kafka brokers; version apache-zookeeper-3.7.0-bin (tar package)

Kafka: version kafka_2.13-2.8.0

Eagle: views and manages Zookeeper, Kafka, topics, etc.; version kafka-eagle-bin-1.3.7

MySQL: stores the Kafka monitoring data used by Eagle; version 5.5

#237 server environment variables
#JDK_HOME
export JAVA_HOME=/etc/alternatives/java_sdk_1.8.0
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/project/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#EAGLE_HOME
export KE_HOME=/project/module/eagle
export PATH=$PATH:$KE_HOME/bin
-----------------------------------------------------------------------------------
# zookeeper configuration
# myid file in the zkData directory: holds the server id
myid=0
# zoo.cfg configuration file
dataDir=/project/module/zookeeper/zkData
-----------------------------------------------------------------------------------
# kafka configuration
# kafka startup script
# Cluster configuration requires modifying kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
#  Change to
# Note: -XX:PermSize is ignored on JDK 8 and later, where the permanent generation no longer exists
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"
 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# kafka config: change the port to 9849
1. server.properties                 port=9849    (defaults to 9092 if not specified)
2. connect-distributed.properties    bootstrap.servers=localhost:9849
3. producer.properties               bootstrap.servers=localhost:9849
4. connect-standalone.properties     bootstrap.servers=localhost:9849
5. consumer.properties               bootstrap.servers=localhost:9849
-----------------------------------------------------------------------------------
# eagle configuration
# Eagle is configured to start on port 9850
# zookeeper connection: localhost:2181
# mysql connection: 222.85.156.248:9811, using version 5.5
# access URL after startup: http://222.85.156.248:9850/ke
-----------------------------------------------------------------------------------
# mysql configuration
# mysql container startup command:
# Exposes port 9811; the container volume is mounted at /project/module/kafka/ke-mysql
docker run --name mysql \
    --restart=always \
    -p 9811:3306 \
    -v /project/module/kafka/ke-mysql:/var/lib/mysql \
    -e MYSQL_ROOT_PASSWORD=123456 \
    -e TZ=Asia/Shanghai \
    -d mysql:5.5

Kafka errors

  • Both Zookeeper and Kafka run on the JDK, so the JDK must be installed in advance.

  • If Eagle connects to MySQL, version kafka-eagle-bin-1.3.7 requires a MySQL version below 5.7 (only 5.5 has been tested).

  • com.mysql.jdbc.Driver belongs to mysql-connector-java 5, and com.mysql.cj.jdbc.Driver belongs to mysql-connector-java 6. Use version 5 for the database driver.

  • Either turn off the firewall or open the required ports, such as 8048 for Eagle, 9092 for Kafka and 2181 for Zookeeper.

  • If the Zookeeper connection times out, try deleting the version-2 folder and the zookeeper_server file (presumably zookeeper_server.pid) in Zookeeper's dataDir directory, then start again.

  • Error while fetching metadata with correlation id: enable the listeners setting and set it to the configured listening port (port 9849 is configured for the Internet of Things project).

  • If Kafka cannot be shut down, use jps to find the Kafka process ID and run kill -9 on that process ID to kill the process.

    [root@iZ88rapw1kcZ ~]# jps
    9511 Kafka
    30377 Jps
    7130 QuorumPeerMain
    17901 Bootstrap
    
    kill -9 9511
    
  • The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong: try deleting the meta.properties file under /project/module/kafka/logs (log.dirs).
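
The firewall item in the list above can be checked from a client machine before digging into Kafka itself. The following is a minimal sketch that only tests whether a TCP connection can be opened. The helper name `port_open` and the host `127.0.0.1` are assumptions made for illustration, and the port list simply repeats the ports configured earlier in this article.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Host is an assumption; replace it with the address of your server.
    for name, port in [("zookeeper", 2181), ("kafka", 9849), ("eagle", 9850)]:
        state = "open" if port_open("127.0.0.1", port) else "closed or filtered"
        print(f"{name:10s} {port:5d} {state}")
```

A closed port here points at the firewall or at a service that is not running, which narrows the problem down before the Kafka logs are inspected.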

Kafka theory

Definition

Kafka is a distributed message queue based on the publish / subscribe model, mainly used for real-time processing of big data.

Message queue

Benefits of using message queuing

  • Decoupling

    It allows you to extend or modify the processing on either side independently, as long as both sides comply with the same interface constraints.

  • Recoverability

    The failure of some components does not bring down the whole system. A message queue reduces the coupling between processes, so even if a process that handles messages crashes, the messages already in the queue can still be processed after it recovers.

  • Buffering

    It helps control and optimize the speed of data flowing through the system, and resolves the mismatch between the speed at which messages are produced and the speed at which they are consumed.

  • Flexibility & peak processing capacity

    Applications must keep working when traffic surges. A message queue lets key components withstand sudden load.

  • Asynchronous communication

    Some messages do not need to be processed immediately. You can put them in the queue and process them later, when needed.

Two modes of message queuing

Point-to-point mode (one-to-one; consumers actively pull data, and a message is removed once it has been received)

The message producer sends messages to a Queue, and a message consumer then takes messages from the Queue and consumes them. Once consumed, a message leaves the Queue and cannot be consumed again. The Queue supports multiple consumers, but each message can be consumed by only one of them.

Publish / subscribe mode (one-to-many; a message is not removed after a consumer has consumed it)

A message producer publishes messages to a Topic, and multiple consumers subscribe to it. Every message published to the Topic is consumed by all subscribers.
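
The difference between the two modes can be sketched with a toy in-memory model. This is not how Kafka is implemented; the class names `PointToPointQueue` and `Topic` are hypothetical and only illustrate "removed after receipt" versus "retained, with a read position per subscriber".

```python
from collections import deque


class PointToPointQueue:
    """Each message is removed as soon as one consumer receives it."""

    def __init__(self):
        self._messages = deque()

    def send(self, msg):
        self._messages.append(msg)

    def receive(self):
        # Returns None when the queue is empty.
        return self._messages.popleft() if self._messages else None


class Topic:
    """Messages are retained; every subscriber tracks its own read position."""

    def __init__(self):
        self._log = []
        self._positions = {}

    def subscribe(self, name):
        self._positions[name] = 0

    def publish(self, msg):
        self._log.append(msg)

    def poll(self, name):
        # Return everything this subscriber has not seen yet.
        start = self._positions[name]
        self._positions[name] = len(self._log)
        return self._log[start:]


q = PointToPointQueue()
q.send("m1")
first = q.receive()   # consumer A receives the message
second = q.receive()  # consumer B finds the queue empty

t = Topic()
t.subscribe("a")
t.subscribe("b")
t.publish("m1")
got_a = t.poll("a")   # both subscribers receive the same message
got_b = t.poll("b")
```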

Kafka infrastructure

  • Producer: the message producer, i.e. the client that sends messages to a Kafka Broker.
  • Consumer: the message consumer, i.e. the client that fetches messages from a Kafka Broker.
  • Consumer Group (CG): a group of multiple consumers. Each consumer in the group is responsible for consuming data from different partitions, and a Partition can be consumed by only one consumer within a group. Consumer groups do not affect each other.
  • Broker: one Kafka server is one broker. A cluster consists of multiple brokers, and a broker can host multiple topics.
  • Topic: can be understood as a queue. Producers and consumers both work against a topic, and a topic can be distributed across multiple brokers.
  • Partition: a Topic can be divided into multiple partitions, and each partition is an ordered queue.
  • Replica: a copy of a Partition, which ensures that the Partition data on a node (Broker) is not lost when that node fails.
    • Leader: the "master" among the replicas of each partition. Producers send data to the leader, and consumers consume data from the leader.
    • Follower: a "slave" among the replicas of each partition, which synchronizes data from the leader in real time. When the leader fails, a follower becomes the new leader.

Kafka in practice

Installation deployment

Cluster planning

Machine 0 (192.168.124.135)    Machine 1 (192.168.124.136)    Machine 2 (192.168.124.137)
zookeeper                      zookeeper                      zookeeper
kafka                          kafka                          kafka

Zookeeper: used to register and discover kafka. Version zookeeper-3.4.10 is used

kafka: use version kafka_2.11-0.11.0.0

Eagle: view and manage zk, Kafka, topic, etc. using version kafka-eagle-bin-1.3.7

Cluster deployment

Zookeeper basic introduction + deployment

## Zookeeper features
# 1) Zookeeper: a cluster composed of one Leader and multiple Followers.
# 2) As long as more than half of the nodes in the cluster survive, the Zookeeper cluster can serve normally.
# 3) Global data consistency: each Server saves a copy of the same data. No matter which Server the Client connects to, the data is consistent.
# 4) Update requests are processed in order: update requests from the same Client are executed in the order in which they were sent.
# 5) Data updates are atomic: a data update either succeeds or fails.
# 6) Real time: within a certain time range, the Client can read the latest data.
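
Feature 2 above (the cluster serves as long as more than half of the nodes survive) comes down to a small piece of arithmetic. The sketch below uses two hypothetical helper names, `has_quorum` and `tolerated_failures`, purely for illustration.

```python
def has_quorum(total_nodes: int, alive_nodes: int) -> bool:
    """True while a strict majority of the ensemble is alive."""
    return alive_nodes > total_nodes // 2


def tolerated_failures(total_nodes: int) -> int:
    """Largest number of nodes that may fail while a majority survives."""
    return (total_nodes - 1) // 2


# The three-machine plan in this article survives the loss of one node.
three_node_ok = has_quorum(3, 2)
three_node_down = has_quorum(3, 1)
```

A four-node ensemble tolerates the same single failure as a three-node one, which is why odd ensemble sizes are the usual choice.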


## Zookeeper cluster installation
#(1) Unzip the Zookeeper installation package to the /opt/module/ directory
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

#(2) Synchronize the contents of the /opt/module/zookeeper-3.4.10 directory to the other servers

#(3) Create zkData in /opt/module/zookeeper-3.4.10/
mkdir -p zkData

#(4) Create a myid file in the /opt/module/zookeeper-3.4.10/zkData directory. Be sure to create it on Linux; a file created in Notepad++ may be garbled
touch myid

#(5) Edit the myid file and write the server id into it
vi myid
0

#(6) Copy the configured zookeeper to the other machines, and change the contents of the myid file to 1 and 2 on the other two servers respectively

#(7) In the directory /opt/module/zookeeper-3.4.10/conf, rename zoo_sample.cfg to zoo.cfg
mv zoo_sample.cfg zoo.cfg

#(8) Open the zoo.cfg file
vi zoo.cfg
---------------------------------------------------------------------------------------
    #Modify the data storage path configuration
    dataDir=/opt/module/zookeeper-3.4.10/zkData
    #Add the following configuration, replacing the IPs with those of the corresponding servers
    #The number after "server." must match the myid of that server (0, 1 and 2 here)
    #######################cluster##########################
    server.0=192.168.124.135:2888:3888
    server.1=192.168.124.136:2888:3888
    server.2=192.168.124.137:2888:3888
---------------------------------------------------------------------------------------

#(9) Sync the zoo.cfg configuration file to the other servers

#(10) Interpretation of the cluster configuration parameters
# A is a number that identifies the server. In cluster mode, a file named myid is placed in the dataDir directory, and it contains the value of A. Zookeeper reads this file at startup and compares the value with the configuration in zoo.cfg to determine which server it is.
# B is the address of the server;
# C is the port through which the Follower exchanges information with the Leader in the cluster;
# D is the port the servers use to communicate with each other during an election, which is needed to elect a new Leader if the current Leader fails.
server.A=B:C:D

#(11) Start Zookeeper on each server
bin/zkServer.sh start

#(12) View status
bin/zkServer.sh status

#(13) Start client
bin/zkCli.sh

Kafka cluster deployment

# 1) Extract the Kafka installation package
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

# 2) Rename the extracted directory
mv kafka_2.11-0.11.0.0/ kafka

# 3) Create the logs folder in the /opt/module/kafka directory
mkdir logs

# 4) Modify profile
vi config/server.properties
--------------------------------------------------------
#Globally unique id of the broker; it must not be repeated, e.g. broker.id=1 and broker.id=2 on the other brokers
broker.id=0
#Enable the delete-topic function
delete.topic.enable=true
#Number of threads processing network requests
num.network.threads=3
#Number of threads used to process disk I/O
num.io.threads=8
#Buffer size of the send socket
socket.send.buffer.bytes=102400
#Buffer size of the receive socket
socket.receive.buffer.bytes=102400
#Maximum size of a request that the socket server will accept
socket.request.max.bytes=104857600
#Storage path of the Kafka data logs (the logs folder created in step 3)
log.dirs=/opt/module/kafka/logs
#Default number of partitions per topic
num.partitions=1
#Number of threads per data directory used to recover and clean up data
num.recovery.threads.per.data.dir=1
#Maximum time that segment files are retained; segments older than this are deleted
log.retention.hours=168
#Configure Zookeeper cluster address
zookeeper.connect=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
--------------------------------------------------------

# 5) Configure environment variables
--------------------------------------------------------
vi /etc/profile

    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
--------------------------------------------------------

# 6) Repeat steps 1-5 on the other servers; broker.id must not be repeated

# 7) Start kafka on the three server nodes in turn
bin/kafka-server-start.sh -daemon config/server.properties   # -daemon: start in the background
bin/kafka-server-start.sh config/server.properties           # start in the foreground
bin/kafka-server-stop.sh                                     # stop the broker

Kafka Eagle service monitoring deployment

# (1) Modify the kafka-server-start.sh startup script
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------
#  Change to
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
 export JMX_PORT="9999"
 #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------

# (2) Apply the same modification on the other servers before starting Kafka

# (3) Upload the compressed package kafka-eagle-bin-1.3.7.tar.gz to the /opt/module directory of the cluster and extract it in place
tar -zxvf kafka-eagle-bin-1.3.7.tar.gz

# Then enter the directory you just extracted and extract kafka-eagle-web-1.3.7-bin.tar.gz to /opt/module
tar -zxvf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module/

# (4) Rename the directory (run in /opt/module)
mv kafka-eagle-web-1.3.7/ eagle

# (5) Give the startup file execute permission (run in eagle/bin)
chmod 777 ke.sh

# (6) Modify the configuration file (conf/system-config.properties)
--------------------------------------------------------
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
# Note the driver version: MySQL 5.5 is used here, so use the version 5 driver
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
--------------------------------------------------------

# (7) Add environment variable
--------------------------------------------------------
vi /etc/profile
    export KE_HOME=/opt/module/eagle
    export PATH=$PATH:$KE_HOME/bin
source /etc/profile
--------------------------------------------------------

# (8) Start Eagle. Note: Zookeeper and Kafka must be started first
bin/ke.sh start

Kafka command line operation

# 1) View all topics on the current server
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --list

# 2) Create a topic. Options: --topic sets the topic name, --replication-factor sets the number of replicas, --partitions sets the number of partitions
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --create --replication-factor 3 --partitions 1 --topic first

# 3) Delete topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --delete --topic first

# 4) Send message
bin/kafka-console-producer.sh --broker-list 192.168.124.135:9092 --topic first

# 5) Consume messages. Options: --from-beginning reads all existing data in the topic; --bootstrap-server is the new-style option, while --zookeeper is the old style (the console consumer no longer accepts it from Kafka 2.0 on)
bin/kafka-console-consumer.sh --zookeeper 192.168.124.135:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.135:9092 --from-beginning --topic first

# 6) View the details of a Topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --describe --topic first

# 7) Modify the number of partitions
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --alter --topic first --partitions 6

Kafka architecture in depth

Topic: a logical concept. Messages are classified by topic, and both the messages a producer produces and the messages a consumer consumes are topic oriented.

Partition: a physical concept. Each partition corresponds to a log file that stores the data produced by the Producer. New data is appended to the end of the log, and each record has its own offset.

Consumer Group (CG): each consumer in a Consumer Group records in real time the offset it has consumed up to, so that after a failure it can resume from the last position.

Segment:

  • As data is continuously appended to a Partition and reaches a certain size, the log is split into segments automatically. Kafka combines segmentation with an index so that data can be located faster.
  • Each segment corresponds to two files, a ".index" file and a ".log" file, both named after the offset of the first message in the segment.
  • These files are located in a folder named topic name + partition number. For example, if the topic first has three partitions, the corresponding folders are first-0, first-1 and first-2.

  • index file: each entry holds, on the left, the sequence number of a message within this segment file and, on the right, its actual physical address in the data file. The address offset depends on the length of the messages.
  • log file: the message data file. Each entry holds, on the left, the actual physical address offset of the message within this segment file and, on the right, the global sequence number of the message in the whole partition.
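
The lookup that the segment and index files make possible can be sketched as two binary searches: one over the segment base offsets to pick the file, and one inside the index to find where to start reading. This is a simplified model, not Kafka's code. The function names are hypothetical, the sample offsets and positions are made up, and the sketch assumes that the index is sparse (not every message has an entry) and that file names are the base offset zero-padded to 20 digits.

```python
from bisect import bisect_right
from typing import List, Tuple


def segment_file_name(base_offset: int) -> str:
    """File name stem of a segment: its first offset, zero-padded to 20 digits."""
    return f"{base_offset:020d}"


def find_segment(base_offsets: List[int], target: int) -> int:
    """Return the base offset of the segment that holds `target`.

    `base_offsets` must be sorted in ascending order.
    """
    i = bisect_right(base_offsets, target) - 1
    if i < 0:
        raise ValueError("offset is below the first segment")
    return base_offsets[i]


def find_position(index: List[Tuple[int, int]], relative_offset: int) -> int:
    """Return the byte position of the greatest index entry at or below
    `relative_offset`; the reader scans the .log file forward from there."""
    keys = [rel for rel, _ in index]
    i = bisect_right(keys, relative_offset) - 1
    return index[i][1] if i >= 0 else 0


# Hypothetical partition with three segments and a small index.
segments = [0, 170410, 239430]
base = find_segment(segments, 170417)
index_entries = [(1, 0), (3, 497), (6, 1407), (8, 1686)]
start_position = find_position(index_entries, 170417 - base)
```

In this example the target message has relative offset 7, so the scan starts at the position stored for entry 6 and moves forward through the .log file.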

Kafka producer

Partition policy

Reasons for partitioning

  • (1) It makes scaling in the cluster easy. Each Partition can be sized to fit the machine it lives on, and a topic can consist of multiple Partitions, so the cluster as a whole can handle data of any size;
  • (2) It improves concurrency, because reads and writes are done per Partition.

Partitioning rules

The data sent by the producer must be wrapped in a ProducerRecord object.

  • (1) When a partition is specified, that value is used directly as the partition;
  • (2) When no partition is specified but a key is present, the partition is the hash of the key modulo the number of partitions of the topic;
  • (3) When neither a partition nor a key is present, a random integer is generated on the first call and incremented on each subsequent call, and the partition is this value modulo the number of available partitions of the topic. This is the round-robin algorithm.
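
The three rules above can be sketched as follows. The class `SketchPartitioner` is hypothetical, and `zlib.crc32` is only a stand-in hash chosen so that the example is self-contained; Kafka's own partitioner uses a different hash function, so the partition numbers produced here will not match a real cluster.

```python
import itertools
import random
import zlib
from typing import Optional


class SketchPartitioner:
    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        # Rule 3: start from a random integer and increment on every call.
        self._counter = itertools.count(random.randrange(1 << 31))

    def partition(self, key: Optional[bytes] = None,
                  partition: Optional[int] = None) -> int:
        if partition is not None:
            # Rule 1: an explicit partition is used as is.
            return partition
        if key is not None:
            # Rule 2: hash of the key modulo the number of partitions.
            return zlib.crc32(key) % self.num_partitions
        # Rule 3: round robin over the partitions.
        return next(self._counter) % self.num_partitions


p = SketchPartitioner(num_partitions=3)
explicit = p.partition(partition=2)
same_key_a = p.partition(key=b"device-42")
same_key_b = p.partition(key=b"device-42")
round_robin = [p.partition() for _ in range(6)]
```

Records with the same key always land in the same partition, which is what preserves per-key ordering.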

Data reliability assurance

To ensure that the data sent by the producer reliably reaches the specified topic, each partition of the topic sends an ack (acknowledgement) to the producer after receiving the data. If the producer receives the ack, it sends the next round of data; otherwise it resends the data.

Replica data synchronization policy

Scheme | Advantage | Disadvantage
Send the ack once more than half of the replicas are synchronized | Low latency | Tolerating the failure of n nodes when electing a new leader requires 2n+1 replicas
Send the ack once all replicas are synchronized | Tolerating the failure of n nodes when electing a new leader requires only n+1 replicas | High latency

Kafka chose the second option for the following reasons:

  • To tolerate the failure of n nodes, the first scheme needs 2n+1 replicas while the second needs only n+1. Each Kafka partition holds a large amount of data, so the first scheme would cause a great deal of data redundancy.
  • Although the second scheme has higher network latency, network latency has little impact on Kafka.

ISR

To solve the problem of followers that synchronize slowly, the leader maintains a dynamic in-sync replica set (ISR), i.e. the set of followers that are in sync with the leader. Once the followers in the ISR have finished synchronizing the data, the leader sends the ack to the producer. If a follower fails to synchronize with the leader for too long, it is removed from the ISR; the time threshold is set by the replica.lag.time.max.ms parameter. If the leader fails, a new leader is elected from the ISR.

ACK response mechanism

acks parameter configuration:

  • 0 (At Most Once): the producer does not wait for the broker's ack. This gives the lowest latency, but data may be lost if the broker fails.
  • 1: the producer waits for the broker's ack, which is returned once the partition leader has written the data, before the followers have synchronized it. If the leader fails before the followers have synchronized, data is lost.
  • -1 (At Least Once): the producer waits for the broker's ack, which is returned once both the leader and the followers have written the data. If the leader fails after the followers have synchronized but before the ack is sent, the producer resends the data, which causes duplicates.

Fault handling details

LEO (log end offset): the last offset of each replica.

HW (High Watermark): the largest offset that consumers can see, which is the smallest LEO among the replicas in the ISR.

  • Follower failure: a failed follower is temporarily kicked out of the ISR. After it recovers, it reads the last HW recorded on its local disk, truncates the part of its log file above that HW, and synchronizes from the leader starting at the HW. Once the follower's LEO is greater than or equal to the partition's HW, i.e. it has caught up with the leader, it can rejoin the ISR.
  • Leader failure: after the leader fails, a new leader is selected from the ISR. To keep the replicas consistent, the remaining followers first truncate the part of their log files above the HW and then synchronize data from the new leader.
  • Note: this only guarantees data consistency between replicas. It does not guarantee that data is neither lost nor duplicated.
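
The LEO and HW bookkeeping described above can be sketched with plain lists standing in for replica logs. The function names are hypothetical, and the sketch follows this article's convention that LEO is the offset of the last record in a replica and that records with an offset at or below HW are kept on truncation.

```python
from typing import Dict, List


def leo(log: List[int]) -> int:
    """Offset of the last record in a replica's log."""
    return len(log) - 1


def high_watermark(isr_logs: Dict[str, List[int]]) -> int:
    """HW is the smallest LEO among the replicas in the ISR."""
    return min(leo(log) for log in isr_logs.values())


def truncate_to_hw(log: List[int], hw: int) -> List[int]:
    """Drop the records above HW before re-synchronizing from the leader."""
    return log[:hw + 1]


def can_rejoin_isr(follower_leo: int, hw: int) -> bool:
    """A recovered follower may rejoin once it has caught up to HW."""
    return follower_leo >= hw


# Hypothetical partition: the leader is ahead of both followers.
isr = {
    "leader": list(range(20)),     # offsets 0..19
    "follower1": list(range(13)),  # offsets 0..12
    "follower2": list(range(16)),  # offsets 0..15
}
hw = high_watermark(isr)
# If the leader fails, follower2 cuts off offsets 13..15 before syncing
# from the new leader, so every replica agrees up to HW.
follower2_after = truncate_to_hw(isr["follower2"], hw)
```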

Exactly Once semantics

  • When acks is 0, the semantics are At Most Once: data is never duplicated, but it may be lost.
  • When acks is -1, the semantics are At Least Once: data is never lost, but it may be duplicated.
  • Exactly Once requires that data is neither lost nor duplicated. Since version 0.11, Exactly Once is achieved by combining idempotence with At Least Once.
  • To enable idempotence, set enable.idempotence to true in the producer parameters. Previously, deduplication had to be done downstream, which hurt performance; since 0.11 it is done upstream.
  • A producer is assigned a PID during initialization, and each message it sends to a given Partition carries a Sequence Number. The Broker caches <PID, Partition, SeqNumber>, and when messages with the same primary key are submitted, it persists only one of them.
  • The PID changes after a restart, and different Partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions or across sessions.
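
The broker-side deduplication described above can be sketched with a dictionary keyed by (PID, Partition). The class `DedupBroker` is hypothetical and deliberately simplified: it only rejects sequence numbers that were already persisted, whereas a real broker applies stricter ordering checks.

```python
class DedupBroker:
    def __init__(self):
        self.log = []
        # (pid, partition) -> highest sequence number persisted so far
        self._last_seq = {}

    def append(self, pid: int, partition: int, seq: int, value: str) -> bool:
        """Persist the message unless this sequence number was already seen."""
        key = (pid, partition)
        if seq <= self._last_seq.get(key, -1):
            return False  # duplicate of a message that is already persisted
        self._last_seq[key] = seq
        self.log.append((partition, value))
        return True


broker = DedupBroker()
first_send = broker.append(pid=1, partition=0, seq=0, value="reading-1")
retry_send = broker.append(pid=1, partition=0, seq=0, value="reading-1")
# After a restart the producer gets a new PID, so the same payload is
# accepted again: idempotence alone does not span sessions.
after_restart = broker.append(pid=2, partition=0, seq=0, value="reading-1")
```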

Kafka consumer

Consumption mode

  • Consumers read data from brokers in pull mode. Push mode has difficulty suiting all consumers, because the broker decides the sending rate.
  • The disadvantage of pull mode is that if Kafka has no data, the consumer may spin in a loop that keeps returning empty data. To address this, a Kafka consumer passes a duration parameter, timeout, when it consumes data: if no data is currently available, the consumer waits for up to that duration before returning.
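
The effect of the timeout parameter can be sketched with a standard-library queue standing in for the broker. The function `poll` below is a hypothetical illustration, not the Kafka client API: it blocks for up to the timeout when nothing is available instead of returning immediately.

```python
import queue


def poll(buffer: queue.Queue, timeout_s: float) -> list:
    """Return the available records, waiting up to timeout_s for the first one."""
    try:
        first = buffer.get(timeout=timeout_s)
    except queue.Empty:
        return []  # nothing arrived within the timeout
    records = [first]
    while True:
        try:
            records.append(buffer.get_nowait())
        except queue.Empty:
            return records


buffer = queue.Queue()
empty_result = poll(buffer, timeout_s=0.05)  # waits briefly, then gives up
buffer.put("m1")
buffer.put("m2")
batch = poll(buffer, timeout_s=0.05)         # returns as soon as data exists
```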

Partition allocation policy

Allocation timing

Kafka has two default partition assignment strategies: Range and RoundRobin. Kafka reassigns partitions when any of the following events occurs:

  • A new consumer joins the Consumer Group
  • A consumer leaves the Consumer Group, whether by shutting down or by crashing
  • New partitions are added to a subscribed topic
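
The two strategies named above can be sketched for the simple case of one topic. The function names are hypothetical and the sketch assumes every consumer subscribes to that single topic; with several topics, Range is applied to each topic separately while RoundRobin spreads the partitions of all topics together.

```python
from typing import Dict, List


def range_assign(consumers: List[str], partitions: int) -> Dict[str, List[int]]:
    """Range: each consumer gets a contiguous block of partitions; the first
    (partitions % consumers) consumers get one extra partition."""
    members = sorted(consumers)
    base, extra = divmod(partitions, len(members))
    result, start = {}, 0
    for i, member in enumerate(members):
        size = base + (1 if i < extra else 0)
        result[member] = list(range(start, start + size))
        start += size
    return result


def round_robin_assign(consumers: List[str], partitions: int) -> Dict[str, List[int]]:
    """RoundRobin: partitions are dealt out to the consumers one at a time."""
    members = sorted(consumers)
    result = {member: [] for member in members}
    for p in range(partitions):
        result[members[p % len(members)]].append(p)
    return result


group = ["c0", "c1", "c2"]
by_range = range_assign(group, partitions=7)
by_round_robin = round_robin_assign(group, partitions=7)
```

Calling either function again with a changed consumer list models the reassignment that follows one of the events listed above.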

offset maintenance

Within a consumer group, a partition can be consumed by only one consumer at a time. The group is set with group.id in the consumer.properties configuration file.

Kafka efficient data reading and writing

Sequential write disk

The data produced by Kafka's producers is written to log files, and each write appends to the end of the file, i.e. a sequential write. Data on the official website shows that on the same disk, sequential writes can reach 600 MB/s while random writes reach only 100 KB/s. This is down to the mechanical structure of the disk: sequential writing is fast because it saves a great deal of head seek time.

Zero-copy technology

The role of Zookeeper in Kafka

One broker in the Kafka cluster is elected as the controller through Zookeeper. The controller is responsible for managing brokers going online and offline in the cluster, the partition replica assignment of all topics, and leader election.

Kafka transaction

Kafka introduced transaction support in version 0.11. Building on Exactly Once semantics, transactions guarantee that production and consumption across partitions and sessions either all succeed or all fail.

Producer transaction

To support transactions across partitions and sessions, a globally unique Transaction ID is introduced, and the PID obtained by the producer is bound to that Transaction ID. This way, when the producer restarts, it can recover its original PID through the Transaction ID of the transaction in progress. To manage transactions, Kafka introduces a new component, the Transaction Coordinator. The Producer obtains the status of the task that belongs to its Transaction ID by interacting with the Transaction Coordinator. The Transaction Coordinator also writes the transaction information to an internal Kafka Topic, so that even if the whole service restarts, the saved state allows in-progress transactions to be restored and continued.
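
The binding between Transaction ID and PID can be sketched with a toy coordinator. The class `SketchTransactionCoordinator`, its method names and the Transaction ID "iot-ingest" are all hypothetical; two dictionaries stand in for the coordinator's state and for the internal topic, and none of this mirrors Kafka's actual interfaces.

```python
import itertools


class SketchTransactionCoordinator:
    def __init__(self):
        self._next_pid = itertools.count(1)
        self._pid_by_txn_id = {}  # Transaction ID -> PID
        self._status_log = {}     # stands in for the internal topic

    def init_producer(self, transaction_id: str) -> int:
        """Return the PID bound to this Transaction ID, creating it on first use."""
        if transaction_id not in self._pid_by_txn_id:
            self._pid_by_txn_id[transaction_id] = next(self._next_pid)
        return self._pid_by_txn_id[transaction_id]

    def set_status(self, transaction_id: str, status: str) -> None:
        self._status_log[transaction_id] = status

    def status(self, transaction_id: str) -> str:
        return self._status_log.get(transaction_id, "none")


coordinator = SketchTransactionCoordinator()
pid_before = coordinator.init_producer("iot-ingest")
coordinator.set_status("iot-ingest", "ongoing")
# The producer restarts and registers again with the same Transaction ID.
pid_after = coordinator.init_producer("iot-ingest")
restored_status = coordinator.status("iot-ingest")
```

Because the PID survives the restart, the sequence-number check on the broker keeps working across sessions, which is the gap that idempotence alone leaves open.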

Consumer transaction

The transaction mechanism above is considered mainly from the Producer's side. For the Consumer, the transactional guarantee is weaker. In particular, there is no guarantee that committed messages are consumed exactly. This is because a Consumer can access any message by offset, and different segment files have different life cycles, so messages of the same transaction may already have been deleted after a restart.

Keywords: Java kafka

Added by nblackwood on Sat, 22 Jan 2022 21:25:10 +0200