There are a lot of delay operations in Kafka, such as delayed production, delayed pull and delayed deletion. Kafka does not use the Timer or DelayQueue provided by JDK to realize the delay function, but implements a Timer (SystemTimer) for the delay function based on the time wheel algorithm.
The average time complexity of inserting and deleting a single task in Timer and DelayQueue in JDK is O(logN), which can not meet the high performance requirements of Kafka. Based on time wheel, the time complexity of inserting and deleting tasks can be reduced to O(1).
The following figure shows Kafka's time wheel structure:
Kafka's timing wheel is a ring queue for storing scheduled tasks. The bottom layer is implemented by array. Each element in the array can store a TimerTaskList, or task slot. TimerTaskList is a circular two-way linked list. Each item in the linked list represents a timertaskentry, which encapsulates the real TimerTask.
The time wheel is composed of multiple time grids, and each time grid represents the basic time span (tickMs) of the current time wheel. The number of time grids of the time wheel is fixed and can be expressed by wheelSize. Then the overall time span (interval) of the whole time wheel can be expressed by the formula tickMs × wheelSize is calculated. The time wheel also has a dial pointer (currentTime), which is used to represent the current time of the time wheel. currentTime is an integer multiple of tickMs. currentTime can divide the whole time wheel into expired part and unexpired part. The time frame currently pointed to by currentTime also belongs to the expired part, which means that it just expires and all tasks in the TimerTaskList corresponding to this time frame need to be processed.
Let's explain the time wheel algorithm through Kafka's source code.
Task addition
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // Cancelled false } else if (expiration < currentTime + tickMs) { // Already expired false } else if (expiration < currentTime + interval) { // Put in its own bucket val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) // Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) } true } else { // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } }
Take task 222 as an example to explain the process of adding tasks to the time wheel:
The addTimerTaskEntry method of SystemTimer calls the add method of TimeingWheel. If the task fails to be added, it proves that the current task has expired, and the task is directly handed over to the working thread for execution;
The add method of TimeingWheel first obtains the expiration time of the task, which is 222 here; Here comes the judgment logic:
- If expiration < currenttime + tick proves that the current task has expired, it will directly return to fasle and hand over the task to the working thread for execution;
Assuming that the creation time of SystemTimer is 0, the currentTime of the TimeingWheel created by SystemTimer is also 0. Since 222 > 0 + 1, it does not meet the first judgment and enters the second judgment.
- If expiration < currenttime + interval, prove that the time wheel of the current level can accommodate the task, and put the task into the corresponding slot of the time wheel;
Since 222 > 0 + 10, it does not meet the second judgment and enters the third judgment.
- If expiration > = currenttime + interval, it is proved that the time wheel of this level cannot accommodate the task, and it is necessary to try the time wheel of the previous level up;
After obtaining the time wheel of the previous layer, continue to execute the add method directly on the time wheel of the previous layer.
tick=10 and interval=100 for the time wheel of the second layer. Since 222 > 0 + 100, it still enters the third judgment and continues to obtain the time wheel of the previous layer.
The tick of the time wheel on the third layer is 100 and the interval is 1000. Since 222 < 0 + 1000, enter the second judgment and execute the task addition process.
Next, let's look at the process of adding tasks:
(1) The first is to calculate the slot position;
val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) virtualId = 222 / 100 = 2 bucket = 2 % 10 = 2 That is, the second slot, corresponding to[200-300]Scope of
(2) Obtain the task linked list on the slot and add the task to the linked list;
bucket.add(timerTaskEntry)
(3) If the task is added to the linked list for the first time, you need to set the expiration time of the linked list and add the linked list to the DelayQueue of SystemTimer.
// Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { queue.offer(bucket) } Expiration time is 2*100=200
Take a look at timertasklist Setexpiration method:
def setExpiration(expirationMs: Long): Boolean = { expiration.getAndSet(expirationMs) != expirationMs }
It can be found that if the expiration time of the linked list is the same as that set before, it will directly return False to avoid adding the linked list to the DelayQueue of the Timer repeatedly.
Time wheel push
Next, let's look at how to push the time wheel. Suppose we create a SystemTimer and add six scheduled tasks with expiration times of 9, 88, 222, 520, 521 and 522, numbered ① to ⑥ respectively.
The time wheel after adding a task is shown as follows:
The SystemTimer constructor is as follows:
@threadsafe class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer { // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable)) private[this] val delayQueue = new DelayQueue[TimerTaskList]() private[this] val taskCounter = new AtomicInteger(0) private[this] val timingWheel = new TimingWheel( tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue )
SystemTimer relies on DelayQueue to advance the time wheel.
def advanceClock(timeoutMs: Long): Boolean = { // Get the first element of DelayQueue (the fastest expired task slot) var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { // If the task slot is not null, it will continue to cycle (while ensures the continuous advancement of the time wheel) while (bucket != null) { // Cascade update the time wheel of each level. currentTime is the expiration time of the time slot timingWheel.advanceClock(bucket.getExpiration) // Delete the slot and add the tasks in the slot to the time wheel again bucket.flush(addTimerTaskEntry) // Continue to get the first element of DelayQueue (the fastest expired task slot) bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false } }
- The slot where task 1 is located is the first element of DelayQueue, and its expiration time is 9. Then, the currentTime of time wheels at all levels is updated to 9;
- Perform flush operation on each element in the slot where task 1 is located;
// Remove all task entries and apply the supplied function to each of them def flush(f: TimerTaskEntry => Unit): Unit = { synchronized { var head = root.next while (head ne root) { // Delete each element in the slot remove(head) // Execute the passed in function f(head) head = root.next } // Set the expiration time of the original time slot to - 1 expiration.set(-1L) } }
The function passed in by flush is addTimerTaskEntry of SystemTimer:
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // Try to add a task TimerTaskEntry to the time wheel if (!timingWheel.add(timerTaskEntry)) { // Already expired or cancelled // If the addition fails, it proves that the task has been cancelled or expired if (!timerTaskEntry.cancelled) // Overdue tasks are submitted directly to the worker thread for execution taskExecutor.submit(timerTaskEntry.timerTask) } }
Task 1 is added to the time wheel again. At this time:
currentTime=9 expiration=9 tick=1 interval=10 expiration < currentTime + tick,If it proves that the current task has expired, return directly fasle,Leave the task to the worker thread to execute
-
Continue with delayqueue The poll () method returns the slot where task ② is located, and its expiration time is 80. Then, the currentTime of time wheels at all levels is updated to 80;
-
Task ② is added to the time wheel again. At this time:
currentTime=80 expiration=88 tick=1 interval=10 expiration < currentTime + interval,Prove that the time wheel of the current level can accommodate the task, and put the task into the corresponding slot of the time wheel; virtualId = expiration / tickMs = 88 / 1 = 88 bucket = virtualId % wheelSize = 88 % 10 = 8 That is, the 8th slot. Set the expiration time of the slot to 88 and add it to the delay queue delayQueue in
-
Continue with delayqueue The poll () method returns the slot where task ② is located, and its expiration time is 88. Then, the currentTime of time wheels at all levels is updated to 88;
-
Task ② is added to the time wheel again. At this time:
currentTime=88 expiration=88 tick=1 interval=10 expiration < currentTime + tick,If it proves that the current task has expired, return directly fasle,Leave the task to the worker thread to execute
Other tasks, and so on.
Focus on 2 points:
(1) How currenttime evolves;
(2) How the task is downgraded from the time wheel to the small wheel.
Task slot
SystemTimer relies on DelayQueue to advance the time wheel, and the element in DelayQueue is the slot TimerTaskList in the time wheel.
Elements added to the delay queue must implement the getDelay and compareTo methods of the Delayed interface:
def getDelay(unit: TimeUnit): Long = { unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[TimerTaskList] java.lang.Long.compare(getExpiration, other.getExpiration) }
When the delay of the task slot is < = 0, the task slot will be poll ed out of the delay queue, then traverse the elements in the slot and re add to the time wheel in turn;
When a task in a time slot is added to the time wheel again, the task slot will be degraded or the task will be directly submitted to the worker thread for execution.
Every time you re add a slot, you start with the smallest time wheel:
For example, for task ③, its initial slot is in the second slot of the third layer time wheel. When it is taken out and added to the time wheel again, first try from the first layer time wheel:
currentTime=200 expiration=222 tick=1 interval=10 expiration >= currentTime + interval,Prove that the time wheel at this level cannot accommodate the task. You need to try the time wheel at the previous level up
Then try the layer 2 time wheel:
currentTime=200 expiration=222 tick=10 interval=100 expiration < currentTime + interval,Prove that the time wheel of the current level can accommodate the task, and put the task into the corresponding slot of the time wheel; virtualId = 222 / 10 = 22 bucket = 22 % 10 = 2 That is, the second slot, and set the expiration time of the slot to virtualId * tickMs = 22*10 = 220
It can be found that task ③ is degraded from the second time slot of the layer 3 time wheel (the expiration time is 200) to the second time slot of the layer 2 time wheel (the expiration time is 220).
It is deduced in sequence that the next degradation will be degraded from the second time slot (expiration time 220) of the layer 2 time wheel to the second time slot (expiration time 222) of the layer 1 time wheel.
Then demote again. At this time, there is no time wheel with lower accuracy. Expiration < currenttime + tick indicates that the current task has expired and the task is handed over to the working thread for execution.
To sum up, with the continuous advancement of the time wheel, the task will be repeatedly added to the time wheel, its slot position will be degraded, and the accuracy of the expiration time will be gradually improved until the accuracy reaches the accuracy of the minimum time wheel, indicating that the task is really due and submitted for execution.
Through analysis, it can be found that the TimingWheel in Kafka is dedicated to inserting and deleting TimerTaskEntry, while the DelayQueue is dedicated to the task of time advance. Imagine that the expiration of the first timeout task list in the DelayQueue is 200ms and the second timeout task is 840ms. Here, only O(1) time complexity is required to obtain the queue head of the DelayQueue (the new queue head will be switched out again after obtaining it). If the timing propulsion per millisecond is adopted, 199 of the 200 propulsion performed when obtaining the first timeout task list belong to "empty propulsion", and 639 "empty propulsion" are required when obtaining the second timeout task, which will waste the performance resources of the machine for no reason. Here, DelayQueue is used to assist in exchanging a small amount of space for time, so as to achieve "accurate propulsion". The timer in Kafka can be described as "knowing people and making good use of them". TimjngWheel is used to do the best task addition and deletion operations, while DelayQueue is used to do the best time promotion work. The two complement each other.
reference:
[1] https://github.com/apache/kafka/tree/trunk/core/src/main/scala/kafka/utils/timer
[2] In depth understanding of Kafka: core design and practice principles, author: Zhu Zhonghua, publishing house: Electronic Industry Press