Internet of Things (IoT) Kafka configuration + Kafka theory + ZooKeeper pitfalls
Kafka is the message middleware in the company's Internet of Things (IoT) project. It is a new technology for the company, and it currently runs as a single node, not a cluster.
Kafka configuration for the IoT project
ZooKeeper: registers and discovers Kafka; version apache-zookeeper-3.7.0-bin.tar.gz
Kafka: version kafka_2.13-2.8.0
Kafka Eagle: views and manages ZooKeeper, Kafka, topics, etc.; version kafka-eagle-bin-1.3.7
MySQL: stores the data Kafka Eagle uses to manage and monitor Kafka; version 5.5
# Environment variables on server 237
# JDK_HOME
export JAVA_HOME=/etc/alternatives/java_sdk_1.8.0
export CLASSPATH=.:$CLASSPATH:$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin
# KAFKA_HOME
export KAFKA_HOME=/project/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# EAGLE_HOME
export KE_HOME=/project/module/eagle
export PATH=$PATH:$KE_HOME/bin
-----------------------------------------------------------------------------------
# ZooKeeper configuration
# zkData directory: the myid file contains the server id 0
# zoo.cfg
dataDir=/project/module/zookeeper/zkData
-----------------------------------------------------------------------------------
# Kafka configuration
# For cluster setups, edit bin/kafka-server-start.sh and change
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
# to
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # JDK 8 ignores -XX:PermSize (PermGen was removed), so MetaspaceSize is used instead
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
# Change the Kafka port to 9849
# 1. config/server.properties (if no port is specified, the default 9092 is used)
listeners=PLAINTEXT://:9849
# (the older port=9849 setting is deprecated and is ignored once listeners is set)
# 2. config/connect-distributed.properties
bootstrap.servers=localhost:9849
# 3. config/producer.properties
bootstrap.servers=localhost:9849
# 4. config/connect-standalone.properties
bootstrap.servers=localhost:9849
# 5. config/consumer.properties
bootstrap.servers=localhost:9849
-----------------------------------------------------------------------------------
# Kafka Eagle configuration
# Kafka Eagle web UI port: 9850
# ZooKeeper connection: localhost:2181
# MySQL connection: 222.85.156.248:9811 (MySQL 5.5)
# After startup, open http://222.85.156.248:9850/ke
-----------------------------------------------------------------------------------
# MySQL configuration
# Start MySQL in a container, exposing port 9811, with its data volume at /project/module/kafka/ke-mysql
docker run --name mysql \
  --restart=always \
  -p 9811:3306 \
  -v /project/module/kafka/ke-mysql:/var/lib/mysql \
  -e MYSQL_ROOT_PASSWORD=123456 \
  -e TZ=Asia/Shanghai \
  -d mysql:5.5
Kafka errors
- Both ZooKeeper and Kafka run on Java, so install the JDK first.
- When Kafka Eagle connects to MySQL, kafka-eagle-bin-1.3.7 requires a MySQL version later than 5.0 and earlier than 7 (only 5.5 has been tested).
- com.mysql.jdbc.Driver is in MySQL Connector/J 5, while com.mysql.cj.jdbc.Driver is in MySQL Connector/J 6 and later; use the version 5 driver for the database.
- Open the required ports in the firewall, such as Kafka Eagle's 8048, Kafka's 9092, and ZooKeeper's 2181.
- ZooKeeper connection timeout: try deleting the version-2 folder and the zookeeper_server.pid file in ZooKeeper's dataDir, then start ZooKeeper again.
- "Error while fetching metadata with correlation id": enable the listeners setting and set it to the configured port (the IoT project uses port 9849).
- Kafka cannot be stopped: use jps to find the Kafka process ID, then run kill -9 <pid> to kill the process.
  [root@iZ88rapw1kcZ ~]# jps
  9511 Kafka
  30377 Jps
  7130 QuorumPeerMain
  17901 Bootstrap
  kill -9 9511
- "The broker is trying to join the wrong cluster": the configured zookeeper.connect may be wrong; also try deleting the meta.properties file under /project/module/kafka/logs (the log.dirs directory).
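For the firewall item above, a quick check is to probe each port from another machine. The Python sketch below is a minimal example, not part of the original setup: `is_port_open` is a hypothetical helper, and the host in the demo is one of the cluster IPs used later in this article, so substitute your own.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, or timed out: the port is closed or blocked by a firewall.
        return False


if __name__ == "__main__":
    host = "192.168.124.135"  # example host; replace with your server
    for name, port in [("Kafka Eagle", 8048), ("Kafka", 9092), ("ZooKeeper", 2181)]:
        state = "open" if is_port_open(host, port, timeout=1.0) else "closed or filtered"
        print(f"{name:<12} {host}:{port} {state}")
```

A "closed or filtered" result from another machine while the service is running locally usually points at the firewall rather than the service.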
Kafka theory
Definition
Kafka is a distributed message queue based on the publish/subscribe model, used mainly for real-time processing in the big data field.
Message queue
Benefits of using message queuing
- Decoupling: lets you extend or modify the processing on either side independently, as long as both sides follow the same interface constraints.
- Recoverability: a failure in some components does not bring down the whole system. Because the message queue reduces coupling between processes, even if a process that handles messages crashes, the messages already in the queue can still be processed after the system recovers.
- Buffering: helps control and optimize the speed of data flowing through the system, and smooths out the mismatch between how fast messages are produced and how fast they are consumed.
- Flexibility and peak handling: applications must keep working when traffic spikes. A message queue lets key components withstand sudden bursts of load.
- Asynchronous communication: some messages do not need to be processed immediately. You can put them in the queue and process them later, when needed.
Two modes of message queuing
Point-to-point mode (one-to-one; consumers actively pull data, and a message is removed once it has been received)
The producer sends messages to a Queue, and a consumer then takes them from the Queue. Once consumed, a message leaves the Queue and cannot be consumed again. A Queue can have multiple consumers, but each message is consumed by only one of them.
Publish/subscribe mode (one-to-many; messages are not removed after consumers read them)
When a producer publishes a message to a Topic, multiple consumers can subscribe to it, and every message published to the Topic is consumed by all subscribers.
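The difference between the two modes can be sketched in a few lines of Python. The class names are hypothetical and the code is an in-memory illustration, not Kafka's API; `PubSubTopic` roughly mirrors Kafka's approach of keeping messages in a log and letting each subscriber track its own read position.

```python
from collections import deque


class PointToPointQueue:
    """One-to-one: a message is removed as soon as one consumer receives it."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Take the oldest message; it leaves the queue, so no other consumer can get it.
        return self._messages.popleft() if self._messages else None


class PubSubTopic:
    """One-to-many: messages stay in an append-only log; each subscriber keeps its own position."""

    def __init__(self):
        self._log = []
        self._positions = {}

    def subscribe(self, subscriber):
        self._positions.setdefault(subscriber, 0)

    def publish(self, message):
        self._log.append(message)

    def poll(self, subscriber):
        # Return everything this subscriber has not read yet; the log itself is untouched.
        start = self._positions[subscriber]
        self._positions[subscriber] = len(self._log)
        return self._log[start:]


if __name__ == "__main__":
    q = PointToPointQueue()
    q.send("reading-1")
    print(q.receive(), q.receive())  # the second receive finds the queue empty

    t = PubSubTopic()
    t.subscribe("dashboard")
    t.subscribe("alerting")
    t.publish("reading-1")
    print(t.poll("dashboard"), t.poll("alerting"))  # both subscribers see the message
```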
Kafka infrastructure
- Producer: the message producer is the client that sends messages to Kafka Broker.
- Consumer: message consumer, the client that fetches messages from Kafka Broker.
- Consumer Group (CG): a group made up of one or more consumers. Each consumer in the group consumes data from different partitions, and a partition can be consumed by only one consumer within a group. Consumer groups do not affect each other.
- Broker: a Kafka server is a broker. A cluster consists of multiple brokers. A broker can have multiple topics.
- Topic: it can be understood as a queue. Both producers and consumers face the same topic. A topic can be distributed to multiple brokers.
- Partition: partition. A Topic can be divided into multiple partitions. Each partition is an ordered queue.
- Replica: replica, which is used to ensure that the Partition data on the node (Broker) will not be lost in case of node failure.
- Leader: the "master" replica of each partition. Each partition has exactly one leader, and it is the replica that producers send data to and consumers consume data from.
- Follower: a "slave" replica of a partition, which synchronizes data from the leader in real time. When the leader fails, a follower becomes the new leader.
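A minimal Python model of how these pieces relate (topic, partitions, replicas, leader, followers), assuming for simplicity that the first replica starts as leader and that failover just picks the first surviving replica. Real Kafka elects the new leader from the ISR, described later; the class names here are illustrative only.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Partition:
    """One partition of a topic and the brokers holding its replicas (simplified model)."""

    topic: str
    number: int
    replicas: List[int]           # broker ids that store a copy of this partition
    leader: Optional[int] = None  # assumption for the sketch: the first replica starts as leader

    def __post_init__(self):
        if self.leader is None:
            self.leader = self.replicas[0]

    @property
    def followers(self) -> List[int]:
        return [b for b in self.replicas if b != self.leader]

    def broker_failed(self, broker_id: int) -> None:
        """Remove a failed broker; if it led this partition, promote a surviving replica."""
        self.replicas = [b for b in self.replicas if b != broker_id]
        if self.leader == broker_id:
            # Kafka would choose from the ISR; here we simply take the first survivor.
            self.leader = self.replicas[0] if self.replicas else None


@dataclass
class Topic:
    name: str
    partitions: List[Partition] = field(default_factory=list)


if __name__ == "__main__":
    first = Topic("first", [Partition("first", p, [p % 3, (p + 1) % 3, (p + 2) % 3]) for p in range(3)])
    first.partitions[0].broker_failed(0)
    for p in first.partitions:
        print(p.topic, p.number, "leader:", p.leader, "followers:", p.followers)
```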
Kafka in practice
Installation and deployment
Cluster planning
Machine 0 192.168.124.135 | Machine 1 192.168.124.136 | Machine 2 192.168.124.137 |
---|---|---|
zookeeper | zookeeper | zookeeper |
kafka | kafka | kafka |
Zookeeper: used to register and discover kafka. Version zookeeper-3.4.10 is used
kafka: use version kafka_2.11-0.11.0.0
Eagle: view and manage zk, Kafka, topic, etc. using version kafka-eagle-bin-1.3.7
Cluster deployment
Zookeeper basic introduction + deployment
## ZooKeeper features
# 1) A ZooKeeper cluster consists of one Leader and multiple Followers.
# 2) As long as more than half of the nodes in the cluster are alive, the ZooKeeper cluster can serve normally.
# 3) Global data consistency: every server keeps a copy of the same data, so a client sees consistent data no matter which server it connects to.
# 4) Ordered updates: update requests from the same client are executed in the order they were sent.
# 5) Atomic updates: a data update either succeeds completely or fails.
# 6) Real time: within a certain time window, a client can read the latest data.
## ZooKeeper cluster installation
# (1) Extract the ZooKeeper installation package to /opt/module/
tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
# (2) Synchronize the contents of /opt/module/zookeeper-3.4.10 to the other servers
# (3) Create zkData in /opt/module/zookeeper-3.4.10/
mkdir -p zkData
# (4) Create a myid file in /opt/module/zookeeper-3.4.10/zkData. Create it on Linux; a file created in Notepad++ may contain garbled characters
touch myid
# (5) Edit the myid file; on this server it contains just the number 2
vi myid
# (6) Copy the configured ZooKeeper to the other machines and set myid to 3 and 4 on the other two servers respectively (each myid must match a server.A entry in zoo.cfg)
# (7) Rename zoo_sample.cfg in /opt/module/zookeeper-3.4.10/conf to zoo.cfg
mv zoo_sample.cfg zoo.cfg
# (8) Open zoo.cfg
vi zoo.cfg
---------------------------------------------------------------------------------------
# Modify the data storage path
dataDir=/opt/module/zookeeper-3.4.10/zkData
# Add the following, replacing the IPs with those of your servers
#######################cluster##########################
server.2=192.168.124.135:2888:3888
server.3=192.168.124.136:2888:3888
server.4=192.168.124.137:2888:3888
---------------------------------------------------------------------------------------
# (9) Sync zoo.cfg to the other servers
# (10) Meaning of the cluster line server.A=B:C:D
# A is a number identifying the server. In cluster mode, the file myid in the dataDir directory holds this value; on startup ZooKeeper reads it and compares it with the entries in zoo.cfg to determine which server it is.
# B is the server's address.
# C is the port Followers use to exchange information with the Leader.
# D is the election port: if the Leader fails, the servers communicate over this port to elect a new Leader.
server.A=B:C:D
# (11) Start ZooKeeper on each server
bin/zkServer.sh start
# (12) Check the status
bin/zkServer.sh status
# (13) Start the client
bin/zkCli.sh
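Two of the rules above can be checked mechanically: the myid on each machine must appear as some `server.A` in zoo.cfg, and the ensemble stays available only while a majority is alive. A small Python sketch under those assumptions; `parse_servers`, `quorum_size`, and `tolerated_failures` are hypothetical helpers, and the parser only understands the simple `server.A=B:C:D` form.

```python
import re

# Matches zoo.cfg cluster lines of the form server.A=B:C:D
SERVER_LINE = re.compile(r"^server\.(\d+)=([^:\s]+):(\d+):(\d+)")


def parse_servers(zoo_cfg: str) -> dict:
    """Map server id A -> (host B, quorum port C, election port D)."""
    servers = {}
    for line in zoo_cfg.splitlines():
        match = SERVER_LINE.match(line.strip())
        if match:
            sid, host, quorum_port, election_port = match.groups()
            servers[int(sid)] = (host, int(quorum_port), int(election_port))
    return servers


def quorum_size(ensemble: int) -> int:
    """Smallest number of servers that is more than half of the ensemble."""
    return ensemble // 2 + 1


def tolerated_failures(ensemble: int) -> int:
    """How many servers may fail while a majority is still alive."""
    return ensemble - quorum_size(ensemble)


ZOO_CFG = """
dataDir=/opt/module/zookeeper-3.4.10/zkData
server.2=192.168.124.135:2888:3888
server.3=192.168.124.136:2888:3888
server.4=192.168.124.137:2888:3888
"""

if __name__ == "__main__":
    servers = parse_servers(ZOO_CFG)
    myid = 2  # example value read from zkData/myid on this machine
    print("myid listed in zoo.cfg:", myid in servers)
    print("quorum:", quorum_size(len(servers)), "tolerated failures:", tolerated_failures(len(servers)))
```

The arithmetic also shows why ensembles are usually odd-sized: 4 servers tolerate no more failures than 3.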
Kafka cluster deployment
# 1) Extract the Kafka installation package
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
# 2) Rename the extracted directory
mv kafka_2.11-0.11.0.0/ kafka
# 3) Create the logs folder in /opt/module/kafka
mkdir logs
# 4) Modify the configuration file
vi config/server.properties
--------------------------------------------------------
# Globally unique broker number; it must not be repeated, e.g. broker.id=1, broker.id=2
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka stores its data (the message log segments)
log.dirs=/opt/module/kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used to recover and clean up data
num.recovery.threads.per.data.dir=1
# Maximum retention time of segment files; older segments are deleted
log.retention.hours=168
# ZooKeeper cluster address
zookeeper.connect=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
--------------------------------------------------------
# 5) Configure environment variables
--------------------------------------------------------
vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
--------------------------------------------------------
# 6) Repeat steps 1-5 on the other servers; broker.id must not be repeated
# 7) Start Kafka on each of the three nodes in turn
# -daemon starts Kafka in the background
bin/kafka-server-start.sh -daemon config/server.properties
# Start in the foreground
bin/kafka-server-start.sh config/server.properties
# Stop
bin/kafka-server-stop.sh config/server.properties
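Because each server.properties is usually copied from the first machine, a duplicated broker.id is an easy mistake. A rough Python check, assuming you have the three files' contents as strings; the helpers are hypothetical and the properties reader handles only plain key=value lines.

```python
def parse_properties(text: str) -> dict:
    """Minimal reader for key=value lines; skips blank lines and # or ! comments.
    Escapes and line continuations of the full .properties format are not handled."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def duplicate_broker_ids(configs: dict) -> dict:
    """configs maps host -> server.properties text; returns {broker.id: [hosts]} for ids used twice or more."""
    owners = {}
    for host, text in configs.items():
        broker_id = parse_properties(text).get("broker.id")
        owners.setdefault(broker_id, []).append(host)
    return {bid: hosts for bid, hosts in owners.items() if len(hosts) > 1}


if __name__ == "__main__":
    configs = {
        "192.168.124.135": "broker.id=0\nzookeeper.connect=192.168.124.135:2181",
        "192.168.124.136": "broker.id=1\nzookeeper.connect=192.168.124.135:2181",
        "192.168.124.137": "broker.id=1\nzookeeper.connect=192.168.124.135:2181",  # copied without editing
    }
    print(duplicate_broker_ids(configs))
```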
Kafka Eagle service monitoring deployment
# (1) Modify the start command in bin/kafka-server-start.sh
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------
# Change it to
--------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    # JDK 8 ignores -XX:PermSize (PermGen was removed), so MetaspaceSize is used instead
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    # JMX port that Kafka Eagle reads metrics from
    export JMX_PORT="9999"
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
--------------------------------------------------------
# (2) Make the same change on the other servers before starting Kafka
# (3) Upload kafka-eagle-bin-1.3.7.tar.gz to /opt/module on the cluster and extract it there
tar -zxvf kafka-eagle-bin-1.3.7.tar.gz
# (4) Enter the extracted directory and extract kafka-eagle-web-1.3.7-bin.tar.gz to /opt/module
tar -zxvf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module
# (5) Rename it
mv kafka-eagle-web-1.3.7/ eagle
# (6) Make the startup script executable
chmod +x bin/ke.sh
# (7) Modify the configuration file conf/system-config.properties
--------------------------------------------------------
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.124.135:2181,192.168.124.136:2181,192.168.124.137:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
# MySQL 5.5 is used here, so the Connector/J 5 driver class is required
kafka.eagle.driver=com.mysql.jdbc.Driver
# Replace hadoop102:3306 with the host and port of your MySQL instance
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
--------------------------------------------------------
# (8) Add environment variables
--------------------------------------------------------
vi /etc/profile
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
--------------------------------------------------------
# (9) Start Kafka Eagle. Note: ZooKeeper and Kafka must already be running
bin/ke.sh start
Kafka command line operation
# Note: these --zookeeper forms match Kafka 0.11. The console consumer's --zookeeper option was removed in Kafka 2.0,
# and kafka-topics.sh accepts --bootstrap-server from Kafka 2.2 on, so on the Kafka 2.8 IoT setup prefer --bootstrap-server localhost:9849.
# 1) List all topics on the current server
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --list
# 2) Create a topic. --topic: topic name; --replication-factor: number of replicas; --partitions: number of partitions
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --create --replication-factor 3 --partitions 1 --topic first
# 3) Delete a topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --delete --topic first
# 4) Send messages
bin/kafka-console-producer.sh --broker-list 192.168.124.135:9092 --topic first
# 5) Consume messages. --from-beginning: read all existing data in the topic; --bootstrap-server: the newer form
bin/kafka-console-consumer.sh --zookeeper 192.168.124.135:2181 --topic first
bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.135:9092 --from-beginning --topic first
# 6) Describe a topic
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --describe --topic first
# 7) Change the number of partitions (it can only be increased)
bin/kafka-topics.sh --zookeeper 192.168.124.135:2181 --alter --topic first --partitions 6
Kafka architecture in depth
Topic: it is a logical concept. Messages are classified by topic. Producer production messages and consumer consumption messages are all topic oriented.
Partition: a physical concept. Each partition corresponds to a log file that stores the data written by the producer. New data is appended to the end of the log, and each message has its own offset.
Consumer Group (CG): consumers in a group record their consumption offset in real time, so that after a failure they can resume from where they left off.
Segment:
- As data keeps being appended to a partition, the log is split into segments once it reaches a configured size (log.segment.bytes, 1 GB by default). Kafka combines segmentation with an index so that it can locate data quickly.
- Each segment corresponds to two files, an ".index" file and a ".log" file, both named after the offset of the first message in the segment.
- These files live in a folder named after the topic plus the partition number. For example, if topic first has three partitions, the folders are first-0, first-1, and first-2.
- index file: sparse entries, with a message's sequence number within this segment on the left and its physical position (byte offset) in the .log file on the right. Positions grow by the size of each message.
- log file: the message data. On the left is the message's physical position in this segment file; on the right is the message's global offset in the whole partition.
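The lookup path implied by the segment, index, and log descriptions can be sketched with binary search: pick the segment whose base offset is the largest one not above the target, then use the sparse index to find where to start scanning the .log file. This is a simplified Python model, not Kafka's code; the offsets and byte positions in the demo are made-up example values.

```python
import bisect
from typing import List, Tuple


def segment_file_name(base_offset: int, suffix: str = ".log") -> str:
    """Segment files are named after the offset of their first message, zero-padded to 20 digits."""
    return f"{base_offset:020d}{suffix}"


def find_segment(base_offsets: List[int], offset: int) -> int:
    """Binary-search the sorted base offsets for the segment that contains `offset`."""
    i = bisect.bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise ValueError(f"offset {offset} is before the first segment")
    return base_offsets[i]


def index_lookup(index: List[Tuple[int, int]], base_offset: int, offset: int) -> int:
    """`index` holds sparse (relative offset, byte position) pairs. Return the position of the
    nearest entry at or before the target; the .log file is then scanned forward from there."""
    relative = offset - base_offset
    i = bisect.bisect_right([rel for rel, _ in index], relative) - 1
    return index[i][1] if i >= 0 else 0


if __name__ == "__main__":
    bases = [0, 170410, 239430]  # example segments in the folder first-0
    base = find_segment(bases, 170417)
    sparse_index = [(0, 0), (3, 497), (6, 1407)]  # hypothetical entries of that segment's .index file
    print(segment_file_name(base), "start scanning at byte", index_lookup(sparse_index, base, 170417))
```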
Kafka producer
Partition policy
Reasons for partitioning
- (1) Easy scaling within the cluster: each partition can be sized to fit the machine it lives on, and a topic can consist of many partitions, so the cluster as a whole can handle data of any size.
- (2) Higher concurrency, because reads and writes happen per partition.
Partitioning rules
The data the producer sends must be wrapped in a ProducerRecord object.
- (1) If a partition is specified, that value is used directly as the partition.
- (2) If no partition is specified but a key is present, the partition is the hash of the key modulo the number of partitions in the topic.
- (3) If neither a partition nor a key is given, an integer is generated randomly on the first call (and incremented on each later call), and the partition is this value modulo the number of available partitions in the topic. This is the round-robin algorithm.
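The three rules can be sketched as a toy partitioner in Python. It is an illustration under stated assumptions, not Kafka's implementation: Kafka's default partitioner hashes key bytes with murmur2, whereas `zlib.crc32` stands in here, so real partition numbers will differ; the round-robin counter starts at a fixed value instead of a random one so the output is reproducible. Kafka 2.4 and later also use a "sticky" partitioner for key-less records rather than strict per-record round robin.

```python
import itertools
import zlib
from typing import Optional


class SketchPartitioner:
    """Applies rules (1)-(3) in order. Hypothetical class, not the Kafka client API."""

    def __init__(self, num_partitions: int, start: int = 0):
        self.num_partitions = num_partitions
        # Kafka seeds this counter randomly; a fixed start keeps the example deterministic.
        self._counter = itertools.count(start)

    def partition(self, key: Optional[bytes] = None, partition: Optional[int] = None) -> int:
        if partition is not None:
            return partition  # rule (1): an explicit partition wins
        if key is not None:
            return zlib.crc32(key) % self.num_partitions  # rule (2): hash(key) mod partition count
        return next(self._counter) % self.num_partitions  # rule (3): round robin


if __name__ == "__main__":
    p = SketchPartitioner(num_partitions=3)
    print(p.partition(partition=2))          # explicit partition
    print(p.partition(key=b"sensor-1"))      # the same key always lands on the same partition
    print([p.partition() for _ in range(4)]) # key-less records rotate across partitions
```

Rule (2) is what keeps all records for one key (for example one IoT device) in a single partition, and therefore in order.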
Data reliability assurance
To make sure the data the producer sends reliably reaches the specified topic, each partition of the topic must send an ack to the producer after receiving the data. If the producer receives the ack, it sends the next round of data; otherwise it resends the data.
Replica data synchronization policy
Scheme | Advantage | Disadvantage |
---|---|---|
Send the ack once more than half of the replicas have synced | Low latency | To tolerate n failed nodes when electing a new leader, 2n+1 replicas are needed |
Send the ack once all replicas have synced | To tolerate n failed nodes when electing a new leader, only n+1 replicas are needed | High latency |
Kafka chose the second scheme for the following reasons:
- To tolerate n failed nodes, the first scheme needs 2n+1 replicas while the second needs only n+1. Because each Kafka partition holds a large amount of data, the first scheme would cause a lot of data redundancy.
- Although the second scheme has higher network latency, that latency has relatively little impact on Kafka.
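The replica counts in the table follow from a short count. Let r be the number of replicas and n the number of failed nodes. Under majority acks, the survivors must still form a majority so that at least one of them holds every acknowledged message; under all-replica acks, any single survivor already holds everything:

```latex
\begin{aligned}
\text{Majority ack:}\quad & r - n \ge \lfloor r/2 \rfloor + 1 \;\Rightarrow\; r_{\min} = 2n+1 \\
\text{All-replica ack:}\quad & r - n \ge 1 \;\Rightarrow\; r_{\min} = n+1 \\
n = 2:\quad & 2n+1 = 5 \text{ replicas vs. } n+1 = 3 \text{ replicas}
\end{aligned}
```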
ISR
To deal with followers that synchronize slowly, the leader maintains a dynamic in-sync replica set (ISR): the set of followers that are in sync with the leader. Once the followers in the ISR have finished synchronizing the data, the leader sends the ack to the producer. If a follower fails to sync with the leader for too long, it is removed from the ISR; this time threshold is set by the replica.lag.time.max.ms parameter. If the leader fails, a new leader is elected from the ISR.
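ISR maintenance as described can be sketched as a filter over followers by how long ago each one last caught up with the leader. A minimal Python sketch; `Follower` and `current_isr` are hypothetical names, and the 10-second threshold in the demo is an example value for `replica.lag.time.max.ms`, not a claim about your broker's default.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Follower:
    broker_id: int
    last_caught_up_ms: int  # last time this follower's log had caught up with the leader's


def current_isr(leader_id: int, followers: List[Follower], now_ms: int, lag_time_max_ms: int) -> List[int]:
    """Leader plus every follower that caught up within lag_time_max_ms (replica.lag.time.max.ms)."""
    isr = [leader_id]
    for f in followers:
        if now_ms - f.last_caught_up_ms <= lag_time_max_ms:
            isr.append(f.broker_id)
    return isr


if __name__ == "__main__":
    followers = [Follower(1, last_caught_up_ms=95_000), Follower(2, last_caught_up_ms=60_000)]
    # Broker 2 has not caught up for 40 s, longer than the example threshold, so it is dropped.
    print(current_isr(0, followers, now_ms=100_000, lag_time_max_ms=10_000))
```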
ACK response mechanism
acks parameter configuration:
- 0 (At Most Once): the producer does not wait for the broker's ack. This gives the lowest latency, but data may be lost if the broker fails.
- 1: the producer waits for the broker's ack, which the partition leader sends as soon as it has written the data, before the followers have synced. If the leader fails before the followers sync, the data is lost.
- -1 (all, At Least Once): the producer waits for the broker's ack, which is sent only after the leader and all followers in the ISR have written the data. If the leader fails after the followers have synced but before the ack is sent, the producer resends the data and it is duplicated.
Fault handling details
LEO (Log End Offset): the end offset of each replica's log, i.e. the position just after its last message.
HW (High Watermark): the smallest LEO among the replicas in the ISR; consumers can only see messages below the HW.
- Follower failure: a failed follower is temporarily removed from the ISR. After it recovers, it reads the last HW recorded on its local disk, truncates the part of its log above that HW, and syncs from the leader starting at the HW. Once the follower's LEO is greater than or equal to the partition's HW, i.e. it has caught up with the leader, it can rejoin the ISR.
- Leader failure: a new leader is elected from the ISR. To keep the data consistent, the remaining followers first truncate the part of their logs above the HW and then sync data from the new leader.
- Note: this only guarantees consistency between replicas; it does not guarantee that data is neither lost nor duplicated.
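The HW rule and the truncate-then-resync recovery can be sketched in Python, assuming offsets start at 0 so that a replica's LEO equals its log length. The helper names are hypothetical; this models only the bookkeeping, not replication itself.

```python
from typing import Dict, Iterable, List


def high_watermark(leo: Dict[int, int], isr: Iterable[int]) -> int:
    """HW = smallest log end offset (LEO) among the replicas in the ISR."""
    return min(leo[r] for r in isr)


def truncate_to_hw(log: List[str], hw: int) -> List[str]:
    """Drop everything at or above HW; with offsets starting at 0 the kept log has LEO == hw."""
    return log[:hw]


def can_rejoin_isr(follower_leo: int, partition_hw: int) -> bool:
    """A recovered follower rejoins the ISR once it has caught up to the partition's HW."""
    return follower_leo >= partition_hw


if __name__ == "__main__":
    leo = {0: 10, 1: 8, 2: 9}  # leader 0 and two followers
    hw = high_watermark(leo, isr=[0, 1, 2])
    follower_log = [f"m{i}" for i in range(leo[2])]
    print("HW:", hw, "follower 2 after truncation:", truncate_to_hw(follower_log, hw))
```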
Exactly Once semantics
- With acks=0 you get At Most Once: data is never duplicated, but it may be lost.
- With acks=-1 you get At Least Once: data is not lost, but it may be duplicated.
- Exactly Once requires data to be neither lost nor duplicated. Since version 0.11, Kafka achieves it by combining idempotence with At Least Once.
- To enable idempotence, set enable.idempotence=true in the producer configuration. Before 0.11, deduplication had to be done downstream, which performed poorly; since 0.11 it is done upstream, in Kafka itself.
- When initialized, the producer is assigned a PID, and every message it sends to a given Partition carries a Sequence Number. The broker caches <PID, Partition, SeqNumber> and persists only one message per such primary key.
- The PID changes after a restart, and different partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions or sessions.
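The broker-side deduplication described above can be sketched for a single partition. This is a simplified Python model with hypothetical names: the real broker keeps more state and reports errors differently, but the core idea, one accepted message per <PID, Partition, SeqNumber>, is the same. The last demo call shows why a restart (new PID) defeats it.

```python
from typing import Dict, List


class IdempotentPartition:
    """Broker-side sketch for ONE partition: track the last sequence number per producer id (PID)
    so that retried messages are stored only once."""

    def __init__(self):
        self.log: List[str] = []
        self._last_seq: Dict[int, int] = {}

    def append(self, pid: int, seq: int, value: str) -> bool:
        last = self._last_seq.get(pid, -1)
        if seq <= last:
            return False  # duplicate, e.g. a retry after a lost ack: not written again
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq} for PID {pid}, expected {last + 1}")
        self._last_seq[pid] = seq
        self.log.append(value)
        return True


if __name__ == "__main__":
    p = IdempotentPartition()
    p.append(pid=7, seq=0, value="t=21.5")
    p.append(pid=7, seq=0, value="t=21.5")  # producer retry: dropped
    p.append(pid=8, seq=0, value="t=21.5")  # after a restart the PID changes, so this is NOT deduplicated
    print(p.log)
```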
Kafka consumer
Consumption mode
- Consumers read data from brokers in pull mode. Push mode has difficulty suiting every consumer, because the broker would decide the sending rate for consumers that process data at different speeds.
- The drawback of pull mode is that if Kafka has no data, the consumer may loop and keep getting empty responses. To avoid this, a consumer passes a timeout when it fetches data: if no data is available, it waits up to that long before returning.
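The timeout behavior can be illustrated with Python's `queue.Queue` standing in for the data fetched from a broker. It is a sketch of the idea, not Kafka's consumer: `poll` is a hypothetical helper that waits up to the timeout for the first record and returns an empty list if none arrives, and `max_records` is an illustrative cap.

```python
import queue
import time


def poll(records: "queue.Queue", timeout_s: float, max_records: int = 500) -> list:
    """Pull-mode fetch: block up to timeout_s for the first record, then take whatever else is ready.
    Returning an empty list after the timeout lets the caller loop again without busy-spinning."""
    batch = []
    try:
        batch.append(records.get(timeout=timeout_s))
    except queue.Empty:
        return batch
    while len(batch) < max_records:
        try:
            batch.append(records.get_nowait())
        except queue.Empty:
            break
    return batch


if __name__ == "__main__":
    fetched = queue.Queue()
    start = time.monotonic()
    print(poll(fetched, timeout_s=0.5), f"after {time.monotonic() - start:.1f}s")  # nothing to read yet
    fetched.put("t=21.5")
    fetched.put("t=21.7")
    print(poll(fetched, timeout_s=0.5))
```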
Partition allocation policy
Allocation timing
Kafka has two partition assignment strategies, Range (the default) and RoundRobin. Kafka reassigns partitions when any of the following happens:
- A new consumer joins the Consumer Group
- A consumer leaves the Consumer Group, by shutting down or crashing
- A subscribed topic gets new partitions
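The two strategies can be sketched in Python for the simple case where every consumer in the group subscribes to the same topics. The function names are hypothetical and the code illustrates the allocation rules rather than reproducing Kafka's assignor classes.

```python
from typing import Dict, List, Tuple


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Range, for one topic: sorted consumers get contiguous blocks; the first
    (num_partitions % n) consumers receive one extra partition."""
    members = sorted(consumers)
    per_consumer, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


def round_robin_assign(partitions: List[Tuple[str, int]], consumers: List[str]) -> Dict[str, List[Tuple[str, int]]]:
    """RoundRobin: sort every (topic, partition) pair and deal them out one at a time.
    Assumes all consumers subscribe to the same topics."""
    members = sorted(consumers)
    assignment = {m: [] for m in members}
    for i, tp in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(tp)
    return assignment


if __name__ == "__main__":
    group = ["c0", "c1", "c2"]
    print(range_assign(7, group))
    print(round_robin_assign([("first", p) for p in range(7)], group))
```

With 7 partitions and 3 consumers, Range gives c0 partitions 0-2, c1 3-4, and c2 5-6, while RoundRobin gives c0 partitions 0, 3, and 6. Because Range works topic by topic, the first consumers get the extra partition for every topic they subscribe to, which is why RoundRobin usually spreads load more evenly across many topics.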
Offset maintenance
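Within a consumer group, each partition is consumed by only one consumer at a time. Consumers are grouped by the group.id setting in consumer.properties. Since Kafka 0.9, consumers commit their offsets to the internal topic __consumer_offsets by default; earlier versions stored them in ZooKeeper.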
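Offset bookkeeping per group can be pictured as a map keyed by group.id, topic, and partition. A minimal Python sketch: `OffsetStore` is a hypothetical in-memory stand-in for the internal __consumer_offsets topic, showing that each group resumes from its own committed position.

```python
from typing import Dict, Tuple


class OffsetStore:
    """Committed offsets keyed by (group.id, topic, partition); a dict stands in for durable storage."""

    def __init__(self) -> None:
        self._committed: Dict[Tuple[str, str, int], int] = {}

    def commit(self, group_id: str, topic: str, partition: int, next_offset: int) -> None:
        # Store the offset of the next message this group should read.
        self._committed[(group_id, topic, partition)] = next_offset

    def resume_from(self, group_id: str, topic: str, partition: int, default: int = 0) -> int:
        """Where a consumer of this group restarts; other groups are unaffected."""
        return self._committed.get((group_id, topic, partition), default)


if __name__ == "__main__":
    store = OffsetStore()
    store.commit("iot-dashboard", "first", 0, 42)
    print(store.resume_from("iot-dashboard", "first", 0))  # this group continues where it stopped
    print(store.resume_from("iot-alerting", "first", 0))   # a different group has its own position
```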
Kafka efficient data reading and writing
Sequential write disk
The producer's data is written to the log file by appending it sequentially to the end of the file. Figures on the official website show that on the same disk, sequential writes can reach 600 MB/s while random writes reach only 100 KB/s. This comes from the mechanics of the disk: sequential writes save most of the head-seek time.
Zero-copy technology
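Zero copy lets the broker hand log-file data to a network socket through the OS `sendfile` call instead of copying it through user-space buffers; Kafka does this through Java's `FileChannel.transferTo`. The Python sketch below shows the same system call via `socket.sendfile()`, which uses `os.sendfile()` where the platform supports it and otherwise falls back to ordinary `send()`; `send_segment` is a hypothetical helper, and a local socket pair stands in for a consumer connection.

```python
import os
import socket
import tempfile


def send_segment(path: str, sock: socket.socket) -> int:
    """Stream a file to a socket and return the number of bytes sent. With os.sendfile() available,
    the kernel moves bytes from the page cache to the socket without a user-space copy."""
    with open(path, "rb") as f:
        return sock.sendfile(f)


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"segment bytes")
    sender, receiver = socket.socketpair()
    try:
        sent = send_segment(tmp.name, sender)
        sender.shutdown(socket.SHUT_WR)  # signal end of data to the reader
        received = b"".join(iter(lambda: receiver.recv(4096), b""))
        print(sent, received)
    finally:
        sender.close()
        receiver.close()
        os.unlink(tmp.name)
```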
The role of Zookeeper in Kafka
With ZooKeeper's help, one broker in the Kafka cluster is elected as the Controller. The Controller manages brokers joining and leaving the cluster, replica assignment for the partitions of all topics, and leader election.
Kafka transaction
Kafka introduced transaction support in version 0.11. Building on Exactly Once semantics, transactions let production and consumption span partitions and sessions: either everything succeeds or everything fails.
Producer transaction
To support transactions across partitions and sessions, Kafka introduces a globally unique Transaction ID (the producer's transactional.id setting) and binds the PID the producer obtains to it. When the producer restarts, it can get its original PID back through the same Transaction ID. To manage transactions, Kafka also introduces a new component, the Transaction Coordinator. The producer obtains the status of the transaction for its Transaction ID by talking to the Transaction Coordinator, which also writes the transaction information to an internal Kafka topic. Because the transaction status is persisted there, an in-progress transaction can be recovered and continued even if the whole service restarts.
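The coordinator's bookkeeping can be sketched as a map from Transaction ID to PID plus a transaction status. This toy Python model uses hypothetical names and simplified status strings; it shows why a restarted producer with the same Transaction ID gets its old PID back and why all partitions in one transaction share a single outcome. Kafka keeps this state in the internal __transaction_state topic rather than in memory.

```python
import itertools
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]


class TransactionCoordinatorSketch:
    """Binds each Transaction ID to a PID and tracks transaction status (in memory, for illustration)."""

    def __init__(self) -> None:
        self._next_pid = itertools.count(1000)  # arbitrary starting PID for the example
        self._pid: Dict[str, int] = {}
        self._status: Dict[str, str] = {}
        self._partitions: Dict[str, Set[TopicPartition]] = {}

    def init_producer(self, transactional_id: str) -> int:
        """A restarted producer with the same Transaction ID gets its old PID back."""
        if transactional_id not in self._pid:
            self._pid[transactional_id] = next(self._next_pid)
            self._status[transactional_id] = "Empty"
            self._partitions[transactional_id] = set()
        return self._pid[transactional_id]

    def add_partitions(self, transactional_id: str, partitions: Set[TopicPartition]) -> None:
        self._partitions[transactional_id] |= partitions
        self._status[transactional_id] = "Ongoing"

    def end(self, transactional_id: str, commit: bool) -> Set[TopicPartition]:
        """Finish the transaction; every partition it touched gets the same outcome."""
        self._status[transactional_id] = "CompleteCommit" if commit else "CompleteAbort"
        touched, self._partitions[transactional_id] = self._partitions[transactional_id], set()
        return touched

    def status(self, transactional_id: str) -> str:
        return self._status[transactional_id]


if __name__ == "__main__":
    coordinator = TransactionCoordinatorSketch()
    pid = coordinator.init_producer("iot-tx-1")
    coordinator.add_partitions("iot-tx-1", {("first", 0), ("first", 1)})
    print("PID after restart:", coordinator.init_producer("iot-tx-1"), "same as before:", pid)
    print("committed partitions:", sorted(coordinator.end("iot-tx-1", commit=True)))
```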
Consumer transaction
The transaction mechanism above is designed mainly around the Producer. For the Consumer, the guarantees are relatively weak; in particular, Kafka cannot guarantee that all messages of a committed transaction are consumed. This is because a consumer can seek to any offset, and different segment files have different lifetimes, so some messages of the same transaction may already have been deleted by the time a consumer restarts and reads them.