Introduction
Kafka was originally developed at LinkedIn. It is a distributed, partitioned, multi-replica messaging system coordinated by ZooKeeper. Its biggest feature is that it can process large amounts of data in real time to meet a wide range of demand scenarios: Hadoop-based batch processing systems, low-latency real-time systems, Storm/Spark streaming engines, web/Nginx logs, access logs, messaging services, and so on. Kafka is written in Scala; LinkedIn contributed it to the Apache Foundation in 2010 and it became a top-level open source project.
Usage scenarios of Kafka
Kafka basic concepts
Kafka is a distributed, partitioned message service (officially called a commit log). It provides the functions a messaging system should have, but with its own special design. You could say Kafka borrows ideas from the JMS specification, but it does not fully follow that specification.
First, let's look at the basic terms related to messages:
| Name | Explanation |
| --- | --- |
| Broker | Message-oriented middleware processing node; a Kafka node is a broker. One or more brokers form a Kafka cluster. |
| Topic | Kafka categorizes messages by topic. Every message published to the Kafka cluster must specify a topic. |
| Producer | Message producer: the client that sends messages to the broker. |
| Consumer | Message consumer: the client that reads messages from the broker. |
| ConsumerGroup | Each consumer belongs to a specific consumer group. One message can be consumed by multiple different consumer groups, but only one consumer within a given consumer group can consume that message. |
| Partition | Physically, a topic can be divided into multiple partitions; within each partition, messages are ordered. |
Therefore, at a higher level, producers send messages to the Kafka cluster over the network and consumers consume them, as shown in the figure below:
Communication between brokers and clients (producers and consumers) is done over the TCP protocol.
Topic
A topic can be understood as the name of a category; messages of the same kind are sent to the same topic. Each topic can have multiple partition log files:
A partition is an ordered message sequence. Messages are appended sequentially to a file called a commit log. Each message in a partition has a unique number called its offset, which uniquely identifies the message within that partition.
Each partition corresponds to one commit log file. The offsets of messages within a partition are unique, but messages in different partitions may have the same offset.
Kafka generally does not delete messages, whether or not they have been consumed. Messages are only deleted according to the configured log retention time (log.retention.hours); by default, the messages of the last week are retained. Kafka's performance is unrelated to how much message data is retained, so keeping a large amount of message data has no impact.
Each consumer works based on its own consumption progress (offset) in the commit log. In Kafka, the consumption offset is maintained by the consumer itself. Normally we consume the messages in the commit log one by one, in order; of course, by specifying an offset we can also repeatedly consume certain messages or skip some messages.
This means that consumers in Kafka have very little impact on the cluster: adding or removing a consumer has essentially no effect on the cluster or on other consumers, because each consumer maintains its own consumption offset.
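For example, since the offset is held on the consumer side, a consumer can rewind or skip within a partition simply by seeking to an explicit offset. A minimal sketch, reusing the broker addresses and topic name from the code examples later in this article (the partition number, group name and offset 100 are made-up values for illustration):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo-group"); // hypothetical group name
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-replicated-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            // Re-consume from offset 100 (or skip ahead) purely by the consumer's own choice;
            // the broker does not track or enforce this position.
            consumer.seek(tp, 100L);
            consumer.poll(Duration.ofMillis(1000))
                    .forEach(r -> System.out.println(r.offset() + " -> " + r.value()));
        }
    }
}
```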
Why partition the data under a topic?
1. A commit log file is limited by the file system of the machine it lives on. After partitioning, different partitions can be placed on different machines, which amounts to storing the data in a distributed way; in theory, a topic can then handle an arbitrary amount of data.
2. To improve parallelism (see the sketch below).
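As a hedged illustration of partitioning a topic, the AdminClient API can create a topic whose partitions are spread across the brokers. The broker addresses reuse the cluster from the code examples later in this article; the topic name, partition count and replication factor are assumptions for the sketch:

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreatePartitionedTopic {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions spread over the brokers, each partition with 2 replicas
            NewTopic topic = new NewTopic("order-topic", 3, (short) 2); // hypothetical topic name
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```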
Partition election mechanism
Kafka core controller (Controller)
In a Kafka cluster there are one or more brokers, and one of them is elected as the controller (Kafka Controller). The controller is responsible for managing the state of all partitions and replicas in the whole cluster.
When the leader replica of a partition fails, the controller is responsible for electing a new leader replica for that partition.
When a change in the ISR set of a partition is detected, the controller is responsible for notifying all brokers to update their metadata.
When the kafka-topics.sh script is used to increase the number of partitions of a topic, the controller is also responsible for making the new partitions known to the other nodes.
Controller election mechanism
When the Kafka cluster starts, one broker is automatically elected as the controller to manage the whole cluster. The election works as follows: every broker in the cluster tries to create an ephemeral node /controller on ZooKeeper; ZooKeeper guarantees that only one broker can create it successfully, and that broker becomes the controller of the cluster.
When the broker acting as controller goes down, the ZooKeeper ephemeral node disappears. The other brokers in the cluster keep watching this ephemeral node; when they find that it has disappeared, they compete to create it again, which is exactly the election mechanism described above. ZooKeeper guarantees that exactly one broker becomes the new controller.
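The mechanism can be sketched with the plain ZooKeeper client: whichever broker manages to create the ephemeral /controller node wins, and the others watch the node so they can compete again once it disappears. This is only an illustration of the idea, not Kafka's actual controller code; the ZooKeeper address and broker id are assumptions:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ControllerElectionSketch {
    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper address and broker id, for illustration only
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        String brokerId = "1";
        try {
            // Ephemeral node: it is removed automatically when its creator's session ends,
            // so the node only exists while the winning broker is alive
            zk.create("/controller", brokerId.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("Broker " + brokerId + " is now the controller");
        } catch (KeeperException.NodeExistsException e) {
            // Lost the race: watch the node and compete again when it goes away
            zk.exists("/controller", event -> System.out.println("controller gone, re-elect"));
            System.out.println("Another broker is already the controller");
        }
    }
}
```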
A broker with the controller role has more responsibilities than an ordinary broker, specifically:
1. Monitor broker-related changes: add a BrokerChangeListener for the /brokers/ids/ node in ZooKeeper to handle brokers being added or removed.
2. Monitor topic-related changes: add a TopicChangeListener for the /brokers/topics node in ZooKeeper to handle changes to topics, and add a TopicDeletionListener for the /admin/delete_topics node in ZooKeeper to handle topic deletion.
3. Read all current information about topics, partitions and brokers from ZooKeeper and manage it accordingly. For every topic, add a PartitionModificationsListener to the corresponding /brokers/topics/[topic] node in ZooKeeper to listen for changes to the topic's partition allocation.
4. Update the cluster metadata and synchronize it to the other ordinary broker nodes.
Partition replica leader election mechanism
When the controller senses that the broker hosting a partition's leader has gone down (the controller watches many ZooKeeper nodes and can therefore tell whether a broker is alive), it selects the first broker in the ISR list as the new leader (on the premise that the parameter unclean.leader.election.enable=false). Being first in the ISR list means it was placed there earliest, so it is likely the replica with the most synchronized data. If the parameter unclean.leader.election.enable is true, then when all the replicas in the ISR list are down, a replica outside the ISR list can be elected leader. This improves availability, but the newly elected leader may have much less data.
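For reference, this trade-off is controlled by the unclean.leader.election.enable configuration (broker-wide or per topic). Below is a minimal sketch of setting it on a single topic with AdminClient; the broker addresses reuse the cluster from the code examples later in this article, and the topic name is hypothetical:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UncleanElectionConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "order-topic"); // hypothetical
            AlterConfigOp op = new AlterConfigOp(
                    // false (the default): only ISR replicas may become leader, favouring consistency
                    new ConfigEntry("unclean.leader.election.enable", "false"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, Collections.singletonList(op))).all().get();
        }
    }
}
```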
There are two conditions for a replica to enter the ISR list:
1. The replica node must not be partitioned from the cluster: it must be able to keep its session with ZooKeeper and keep a network connection to the leader replica.
2. The replica must be able to replicate all write operations on the leader and must not lag too far behind. (Whether a lagging replica still counts as in sync is determined by the replica.lag.time.max.ms configuration; a replica that has not synchronized with the leader for longer than this time is removed from the ISR list.)
The following explains the output: the first line is the summary information for all partitions, and each subsequent line describes one partition.
leader: the node responsible for all read and write requests for the given partition. The leader replicas of different partitions of the same topic are generally on different brokers (for fault tolerance).
replicas: the brokers on which this partition is replicated, regardless of whether they are the leader or even alive; a dead node is still listed.
isr: a subset of replicas. It lists only the nodes that are currently alive and are synchronized backups of the partition.
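The same leader/replicas/isr information can also be read programmatically. A hedged sketch using AdminClient; the broker addresses and topic name reuse the values from the code examples later in this article:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("my-replicated-topic"))
                    .all().get().get("my-replicated-topic");
            desc.partitions().forEach(p ->
                    // leader handles reads/writes; replicas lists all backups; isr only the in-sync ones
                    System.out.println("partition " + p.partition()
                            + " leader=" + p.leader()
                            + " replicas=" + p.replicas()
                            + " isr=" + p.isr()));
        }
    }
}
```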
Producers
The producer sends messages to a topic and is responsible for choosing which partition of the topic each message goes to. The simplest way is round-robin, for basic load balancing; messages can also be routed according to a key in the message. The second method is usually used more.
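As a hedged sketch of the round-robin idea, a custom partitioner that ignores the key and simply cycles through the partitions might look like the following (the class name is made up; it only needs to implement Kafka's Partitioner interface and be registered via the partitioner.class producer config):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical round-robin partitioner: ignores the key and cycles through the partitions
public class SimpleRoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Mask to a non-negative value before taking the modulo, so overflow is harmless
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

It would be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimpleRoundRobinPartitioner.class.getName()). When a key is supplied instead, the default partitioner hashes the key, so messages with the same key always land in the same partition.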
Consumers
Traditionally there are two messaging models: the queue and publish-subscribe. Kafka provides a single consumer abstraction that covers both: the consumer group (see the sketch after this list).
Queue mode: all consumers are in the same consumer group.
Publish-subscribe mode: every consumer has its own unique consumer group.
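A minimal sketch of the difference, using nothing but the group.id setting (the group names are arbitrary examples): consumers that share a group split the partitions between them (queue semantics), while consumers in distinct groups each receive every message (publish-subscribe semantics).

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class GroupModes {
    // Queue mode: all consumer instances join the same group, so each message of the topic
    // is processed by only one of them.
    static Properties queueConsumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service"); // shared group name (example)
        return props;
    }

    // Publish-subscribe mode: every consumer instance uses its own group,
    // so each of them independently receives all messages of the topic.
    static Properties pubSubConsumerProps(String instanceId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "subscriber-" + instanceId); // unique group per consumer
        return props;
    }
}
```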
Consumption order
A partition can be consumed by only one consumer instance within a consumer group at a time, which guarantees the consumption order.
The number of consumer instances in a consumer group cannot exceed the number of partitions of a topic; otherwise, the extra consumers cannot consume any messages.
Kafka only guarantees the local ordering of message consumption within the scope of a partition; it cannot guarantee a total consumption order across multiple partitions of the same topic.
If a guaranteed total consumption order is needed, the number of partitions of the topic can be set to 1 and the number of consumer instances in the consumer group also set to 1, but this hurts performance, so Kafka's strictly ordered consumption is rarely used.
Message sender code analysis
```java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MsgProducer {

    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        /*
         Message persistence (acks) parameter:
         (1) acks=0: the producer does not wait for any broker to acknowledge the message before sending
             the next one. Highest performance, but messages are most easily lost.
         (2) acks=1: wait at least until the leader has successfully written the data to its local log,
             but do not wait for all followers to write it, then send the next message. If the followers
             have not backed up the data and the leader then goes down, the message is lost.
         (3) acks=-1 or all: wait until the number of replicas configured by min.insync.replicas
             (default 1, recommended >= 2) have successfully written the message to their logs. As long
             as one backup survives, no data is lost. This is the strongest guarantee and is generally
             used only in financial-grade scenarios or scenarios dealing with money.
        */
        //props.put(ProducerConfig.ACKS_CONFIG, "1");
        /*
         Retries happen when a send fails; the default retry interval is 100 ms. Retrying improves the
         reliability of message sending, but it may also cause duplicate sends (for example on network
         jitter), so the receiver must handle message idempotency.
        */
        //props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Retry interval
        //props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        // Local buffer for sending messages: messages are first written to this buffer, which improves
        // send performance. The default is 33554432, i.e. 32 MB.
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        /*
         A local kafka thread fetches data from the buffer and sends it to the broker in batches.
         This sets the batch size; the default is 16384, i.e. 16 KB, meaning a batch is sent once it
         holds 16 KB of messages.
        */
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        /*
         The default is 0: the message is sent immediately, which hurts performance. It is usually set
         to about 10 ms: a message first enters a local batch; if the batch fills 16 KB within 10 ms it
         is sent together with the batch, otherwise it is sent anyway once the 10 ms elapse, so the send
         delay does not become too long.
        */
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // Serialize the key from a String into a byte array
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serialize the message value from a String into a byte array
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        int msgNum = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);
            // Send to an explicitly specified partition
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME,
                    0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            // No partition specified; the target partition is computed as hash(key) % partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME,
                    order.getOrderId().toString(), JSON.toJSONString(order));

            // Synchronous (blocking) send: wait for the message to be sent successfully
            /*RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("Sync send result: topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());*/

            // Send the message asynchronously with a callback
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message:");
                        exception.printStackTrace();
                    }
                    if (metadata != null) {
                        System.out.println("Async send result: topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
            // TODO: send points
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
}
```
Message receiver code
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MsgConsumer {

    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        // Consumer group name
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // Whether to commit offsets automatically; the default is true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval between automatic offset commits
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /*
         When the consumer group is new, or an offset is requested that no longer exists,
         where should consumption start?
         latest (default): only consume messages sent to the topic after this consumer starts
         earliest: start from the beginning the first time, then continue from the recorded offset
                   (different from consumer.seekToBeginning, which always starts from the beginning)
        */
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        /*
         Interval at which the consumer sends heartbeats to the broker. The broker responds through the
         heartbeat, for example to deliver a rebalance plan to the consumer. This can be fairly short.
        */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /*
         If the broker does not receive a heartbeat within this time, it considers the consumer dead,
         kicks it out of the consumer group and reassigns its partitions to other consumers.
         The default is 10 seconds.
        */
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        // Maximum number of records pulled by one poll. Set it larger if the consumer processes
        // quickly, smaller if processing is slower.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        /*
         If the interval between two polls exceeds this time, the broker considers the consumer's
         processing capacity too weak, kicks it out of the consumer group and assigns its partitions
         to other consumers.
        */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // Consume a specific partition
        //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        // Rewind consumption to the beginning
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/

        // Consume from a specified offset
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/

        // Start consuming from a specified point in time (uncomment to use, and remove the
        // subscribe() call above, because subscribe() and assign() are mutually exclusive)
        /*
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        // Start from one hour ago
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            // Seek to the offset determined by the timestamp
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
        */

        while (true) {
            /*
             * poll() is a long poll that pulls messages from the broker
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Message received: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // With manual commit, commitSync blocks the current thread until the offsets have been
                // committed; synchronous commit is common when no further logic follows the commit
                consumer.commitSync();
                // Manual asynchronous commit: does not block the current thread, so the following
                // program logic can continue while the offsets are committed
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for " + offsets);
                            exception.printStackTrace();
                        }
                    }
                });
            }
        }
    }
}
```