Implementation and analysis of the time wheel, including the time wheel in go-zero

Time wheel

A time wheel is used to schedule delayed tasks.

Scenario

Imagine that I need to maintain a connection pool in which every connection has a timeout. Each connection sends heartbeat packets at a fixed frequency, for example every 5s, and a connection expires if no keep-alive has been received within 30s. An expired conn is disconnected at its expiration time. How should this connection pool be designed?

Scenario abstraction

The scenario can be restated as follows: after receiving a request, I need to perform an action after a delay of 30 seconds, and if the same request arrives again within those 30 seconds, the action is postponed by another 30 seconds. How can this be done?

Feasible solution

Assume the connection pool holds at most 1000 connections. The simple method is to record the most recent keep-alive time for each connection, start a timer, traverse all connections every second, and disconnect those whose time is up. When a heartbeat is received, update the time of that connection. But what if there are 10000 connections? Then every second takes 10000 iterations just to decide which connections to drop, which wastes resources.

Better solution

Use the time wheel algorithm. Its basic idea is to spread the delayed tasks out instead of keeping them all in one place, which avoids the cost of traversing every task on every tick.
How are the tasks spread out? By the maximum supported delay and the timer's tick interval. For example, if I support a maximum delay of 60 seconds and scan once per second, I can create an array called timeWheel with 60 slots. Each slot holds a connList, the list of connections that expire at that slot, and a cur_index points to the current slot of timeWheel.
At run time, cur_index moves forward one slot every second, wraps to 0 after reaching the maximum index, and continues the cycle. Each move disconnects all connections in the slot it lands on.
When a heartbeat is received, the connection is removed from its current slot and placed in slot (cur_index + 30) % 60; the modulo keeps the index within bounds.

Optimization: expiry no longer scans every connection each second, but receiving a heartbeat now means searching the wheel for the connection. To avoid that scan, keep a map from conn to its slot index in timeWheel, so the connection can be located without searching the whole wheel, and update that index whenever the conn moves to another slot.
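
A minimal sketch of this single-layer wheel with the map index might look like the following. The wheel type and its method names are hypothetical, and each slot is stored as a set rather than a list so that removal is O(1); that is a choice of this sketch, not something the text prescribes.

```go
package main

import "fmt"

const (
	numSlots = 60 // one slot per second, 60s maximum delay
	timeout  = 30 // seconds without a heartbeat before disconnecting
)

// wheel is a hypothetical single-layer time wheel for connection IDs.
type wheel struct {
	slots    [numSlots]map[string]struct{} // connList per slot
	index    map[string]int                // conn -> slot it currently sits in
	curIndex int                           // cur_index from the text
}

func newWheel() *wheel {
	w := &wheel{index: map[string]int{}}
	for i := range w.slots {
		w.slots[i] = map[string]struct{}{}
	}
	return w
}

// heartbeat moves conn to the slot that is timeout ticks ahead.
// The map lookup avoids scanning the wheel for the connection.
func (w *wheel) heartbeat(conn string) {
	if old, ok := w.index[conn]; ok {
		delete(w.slots[old], conn)
	}
	pos := (w.curIndex + timeout) % numSlots
	w.slots[pos][conn] = struct{}{}
	w.index[conn] = pos
}

// tick advances one slot and returns the connections that expire there.
func (w *wheel) tick() []string {
	w.curIndex = (w.curIndex + 1) % numSlots
	var expired []string
	for conn := range w.slots[w.curIndex] {
		expired = append(expired, conn)
		delete(w.index, conn)
	}
	w.slots[w.curIndex] = map[string]struct{}{}
	return expired
}

func main() {
	w := newWheel()
	w.heartbeat("a")
	w.heartbeat("b")
	for i := 1; i <= 10; i++ {
		w.tick()
	}
	w.heartbeat("b") // b is postponed by another 30 ticks
	for i := 11; i <= 40; i++ {
		if gone := w.tick(); len(gone) > 0 {
			fmt.Println("tick", i, "expired", gone)
		}
	}
}
```

Each tick now touches only the connections in one slot, and a heartbeat costs one map lookup plus one move.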

Drawback

In this scenario the maximum delay is 60s, so a timeWheel of size 60 is enough. What if the maximum delay is one day? One day is 86400s, so a timeWheel of size 86400 would be needed. This can be optimized by layering.

Multilayer time wheel

The multilayer time wheel is an optimization of the single-layer time wheel. It addresses the case where the time range is large but the tick interval is small, which would otherwise force timeWheel to grow very large.

Scenario

Extend the time range of the scenario above to 7 days, i.e. 86400s * 7 = 604800s.

How to layer

For this scenario, we can use four time wheels.
The first layer, timeWheel1, represents seconds: its size is 60 and each element represents 1s.
The second layer, timeWheel2, represents minutes: its size is 60 and each element represents 1m.
The third layer, timeWheel3, represents hours: its size is 24 and each element represents 1h.
The fourth layer, timeWheel4, represents days: its size is 7 and each element represents 1d.

Execution process

Premise setting

Suppose the current timeWheel state is as follows:
All 60 indexes of timeWheel1 are empty, cur_index1 is at index 0
All 60 indexes of timeWheel2 are empty, cur_index2 is at index 0
All 24 indexes of timeWheel3 are empty, cur_index3 is at index 0
All 7 indexes of timeWheel4 are empty, cur_index4 is at index 0

Task addition process

Task 1 is received and must be executed after a delay of 7100s.
Check whether it fits in timeWheel1: 7100 is larger than the maximum range 60 of timeWheel1, so it cannot be placed in timeWheel1.
Check whether it fits in timeWheel2: 7100 / 60 = 118.333, rounded down to 118. 118 is larger than the maximum range 60 of timeWheel2, so it cannot be placed in timeWheel2.
Check whether it fits in timeWheel3: 7100 / (60 * 60) = 1.97222, rounded down to 1. 1 is smaller than the maximum range 24 of timeWheel3, so it can be placed in timeWheel3. Compute the time left over for the lower layers: 7100 % (60 * 60) = 3500, so 3500s remain. The task is stored at cur_index3 + 1, together with the remaining time 3500.
This is the upgrade process of the time wheel.
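
The upgrade steps above can be sketched as a small function. The level type, the levels table and place are hypothetical names chosen for this illustration, and delays are plain seconds.

```go
package main

import "fmt"

// level describes one wheel: how many seconds one slot spans and how many slots it has.
type level struct {
	name  string
	span  int // seconds per slot
	slots int
}

// The four wheels from the text: seconds, minutes, hours, days.
var levels = []level{
	{"timeWheel1", 1, 60},
	{"timeWheel2", 60, 60},
	{"timeWheel3", 3600, 24},
	{"timeWheel4", 86400, 7},
}

// place returns the lowest wheel that can hold delay (in seconds), the number of
// slots ahead of that wheel's cur_index, and the remaining seconds stored with the task.
func place(delay int) (wheel string, offset, remaining int, ok bool) {
	for _, l := range levels {
		if n := delay / l.span; n < l.slots {
			return l.name, n, delay % l.span, true
		}
	}
	return "", 0, 0, false // longer than the largest wheel can cover
}

func main() {
	fmt.Println(place(7100)) // timeWheel3 1 3500 true
}
```

The four wheels together need only 60 + 60 + 24 + 7 = 151 slots, compared with 604800 slots for a single-layer wheel covering 7 days at one-second resolution.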

Time wheel operation process

Running the wheel involves the downgrade process of the time wheel.
Take the four-layer wheel as an example.
A simple implementation is to fix the number of layers in advance and start one timer per layer.
When an upper-layer timer fires, the tasks at its cur_index are moved down into the lower-layer wheel. The processing order of the wheels must be synchronized; otherwise, by the time a task is moved down, the lower layer may already have passed the target position.

Another implementation uses only one timer. It advances the bottom layer first. When the bottom layer completes a full circle, it moves to the layer above and checks the remaining time of each task at that layer's cur_index: a task with no remaining time is executed immediately, and a task with remaining time is placed into the bottom layer. If that layer has also completed a circle, it goes one layer further up to fetch tasks, and so on. This implementation is more complex.

The go-zero implementation is a two-level scheme: it maintains a single bottom-level array of slots plus a per-task lap count, circle. Each tick scans more entries, but the implementation is simple; see the source code below.

Implementations of the time wheel in Go

Implementation of time wheel in Go language
High performance time wheel implementation fully compatible with golang timer
Implementation of time wheel in go-zero

Notes on the time wheel source code in go-zero

The TimingWheel in go-zero is used to expire entries in the program's internal cache. The notes below cover only the implementation of the time wheel itself.

// Time wheel structure
type TimingWheel struct {
        interval      time.Duration //Time span of each slot of the time wheel
        ticker        timex.Ticker //Ticker that drives the wheel
        slots         []*list.List //Slot array
        timers        *SafeMap //SafeMap (avoids memory leaks) from key to *positionEntry, used to find a task by key
        tickedPos     int //Slot the wheel currently points to
        numSlots      int //Number of slots
        execute       Execute //Function executed when a task is due
        // The exported methods interact with the time wheel through channels.
        // The advantage of this is that callers that don't care about the result are decoupled and need not wait synchronously.
        setChannel    chan timingEntry //Set (add) channel
        moveChannel   chan baseEntry //Move (reschedule) channel
        removeChannel chan interface{} //Remove channel
        drainChannel  chan func(key, value interface{}) //Drain channel
        stopChannel   chan lang.PlaceholderType //Stop channel
}

//Time wheel element structure
type timingEntry struct {
        baseEntry
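        value   interface{}
        circle  int //Number of full laps left before the task is due
        diff    int //Pending slot offset recorded by moveTask, applied when the task is next scanned
        removed bool //Marked as deleted; unlinked when its slot is scanned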
}

//Base field
type baseEntry struct {
        delay time.Duration
        key   interface{}
}

//Records the slot position of a key together with a pointer to its timingEntry element.
//It is the value stored in timers and is used to find an element quickly by key
type positionEntry struct {
        pos  int
        item *timingEntry
}

//Key and value of a delayed task that is due to run
type timingTask struct {
        key   interface{}
        value interface{}
}



//Method definitions
//The following methods are exported. Each sends a signal through a channel; the run method listens on every channel and calls the corresponding method when a signal arrives.
//Execute all tasks immediately
func (tw *TimingWheel) Drain(fn func(key, value interface{}))
//Update the delay of a task before it fires. If the new delay is shorter than one interval, the task is executed immediately.
func (tw *TimingWheel) MoveTimer(key interface{}, delay time.Duration)
//Remove a task
func (tw *TimingWheel) RemoveTimer(key interface{})
//Add a task
func (tw *TimingWheel) SetTimer(key, value interface{}, delay time.Duration)
//Stop time wheel
func (tw *TimingWheel) Stop()

//The following methods are the actual implementations
func (tw *TimingWheel) drainAll(fn func(key, value interface{}))
//pos is derived from delay / interval: take the number of slots to advance, add the current slot position as the offset, and wrap around on overflow, so pos = (tickedPos+d/interval)%numSlots
//circle = (d/interval-1)/numSlots
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int)
func (tw *TimingWheel) initSlots()
func (tw *TimingWheel) moveTask(task baseEntry)
func (tw *TimingWheel) onTick()
func (tw *TimingWheel) removeTask(key interface{})
func (tw *TimingWheel) run()
func (tw *TimingWheel) runTasks(tasks []timingTask)
func (tw *TimingWheel) scanAndRunTasks(l *list.List)

// Constructor: initializes the slots and starts the run loop in its own goroutine
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
    *TimingWheel, error) {
    tw := &TimingWheel{
        interval:      interval,
        ticker:        ticker,
        slots:         make([]*list.List, numSlots),
        timers:        NewSafeMap(),
        tickedPos:     numSlots - 1, // at previous virtual circle
        execute:       execute,
        numSlots:      numSlots,
        setChannel:    make(chan timingEntry),
        moveChannel:   make(chan baseEntry),
        removeChannel: make(chan interface{}),
        drainChannel:  make(chan func(key, value interface{})),
        stopChannel:   make(chan lang.PlaceholderType),
    }

    tw.initSlots()
    go tw.run()

    return tw, nil
}

// Drain drains all items and executes them.
func (tw *TimingWheel) Drain(fn func(key, value interface{})) {
    tw.drainChannel <- fn
}

//The run method listens on all channels. It is started when the time wheel is created.
func (tw *TimingWheel) run() {
    for {
        select {
        case <-tw.ticker.Chan(): //When the time is up, execute the tasks to be executed on the corresponding slot.
            tw.onTick()
        case task := <-tw.setChannel: //Add a task to the time wheel
            tw.setTask(&task)
        case key := <-tw.removeChannel: //Delete a task from the time wheel
            tw.removeTask(key)
        case task := <-tw.moveChannel: //Update the execution time of a task on a time wheel
            tw.moveTask(task)
        case fn := <-tw.drainChannel:
            tw.drainAll(fn)
        case <-tw.stopChannel:
            tw.ticker.Stop()
            return
        }
    }
}

//First, adding a task: the setTask method
//Call chain:
//  run -> setTask
//Logic:
//  Use the map index to determine whether the task already exists
//      If it exists, update its value and move it through moveTask
//      If it does not exist, compute the task's slot relative to the current tickedPos and the number of laps to wait, put the task into the slot and record it in the map index
func (tw *TimingWheel) setTask(task *timingEntry) {
    if task.delay < tw.interval {
        task.delay = tw.interval
    }

    if val, ok := tw.timers.Get(task.key); ok {
        entry := val.(*positionEntry)
        entry.item.value = task.value
        tw.moveTask(task.baseEntry)
    } else {
        pos, circle := tw.getPositionAndCircle(task.delay)
        task.circle = circle
        tw.slots[pos].PushBack(task)
        tw.setTimerPosition(pos, task)
    }
}


// Having seen how a task is added, let's see how it is executed. Assume the tick has reached the slot where the task sits.
// onTick first advances the scanned position, then takes the list from that slot and hands it to scanAndRunTasks.
func (tw *TimingWheel) onTick() {
    tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
    l := tw.slots[tw.tickedPos]
    tw.scanAndRunTasks(l)
}

// The main work happens in scanAndRunTasks, which processes the list once it has been fetched
//Logic:
//  Traverse the whole list. First unlink tasks marked as removed, then decrement circle for tasks that still have laps left (circle > 0).
//  The remaining tasks are valid tasks with circle 0. Because a move only records the change and defers the relocation until the task is scanned, diff is checked to see whether the task was moved; if so, it is relocated to its new slot instead of being executed.
//  Whatever is left after this filtering is due in this scan. These tasks are appended to the execution queue and executed concurrently by runTasks, which limits the degree of concurrency.
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
    var tasks []timingTask

    for e := l.Front(); e != nil; {
        task := e.Value.(*timingEntry)
        if task.removed {
            next := e.Next()
            l.Remove(e)
            e = next
            continue
        } else if task.circle > 0 {
            task.circle--
            e = e.Next()
            continue
        } else if task.diff > 0 {
            next := e.Next()
            l.Remove(e)
            // (tw.tickedPos+task.diff)%tw.numSlots
            // cannot be the same value of tw.tickedPos
            pos := (tw.tickedPos + task.diff) % tw.numSlots
            tw.slots[pos].PushBack(task)
            tw.setTimerPosition(pos, task)
            task.diff = 0
            e = next
            continue
        }

        tasks = append(tasks, timingTask{
            key:   task.key,
            value: task.value,
        })
        next := e.Next()
        l.Remove(e)
        tw.timers.Del(task.key)
        e = next
    }

    tw.runTasks(tasks)
}



// Update the delay of a task that already exists in the time wheel
// Call chains:
//    run -> setTask -> moveTask (setTask calls moveTask when the key already exists)
//    run -> moveTask
func (tw *TimingWheel) moveTask(task baseEntry) {
    val, ok := tw.timers.Get(task.key)
    if !ok {
        return
    }

    timer := val.(*positionEntry)
    //If the new delay is shorter than one interval, execute the task right away
    if task.delay < tw.interval {
        threading.GoSafe(func() {
            tw.execute(timer.item.key, timer.item.value)
        })
        return
    }

    // The task is not due yet, so its position must change: compute the new pos and circle from the new delay
    pos, circle := tw.getPositionAndCircle(task.delay)
    //Compare the new pos and circle with the old position and only record the change on the task; the task is actually relocated when it is next scanned.
    //Deferring the change means that keys which are updated frequently do not trigger frequent relocations, and relocation has to be concurrency-safe.
    if pos >= timer.pos {
        //If the new pos is greater than or equal to the old pos, update the task's circle but leave it in the old slot, and set diff to new pos - old pos. Why?
        //Consider a tick followed by a move: the tick launches the task in a separate goroutine, and a move request may arrive while that task is still executing, which would cause concurrency problems.
        //The diff recorded here marks the task as moved. The pos update is applied the next time the scan reaches this task.
        timer.item.circle = circle
        timer.item.diff = pos - timer.pos
    } else if circle > 0 {
        //The new pos is before the old pos, but the task is not due in the current lap, so compute the diff offset and the number of laps to wait
        circle-- //The wrap-around in diff already accounts for one lap
        timer.item.circle = circle
        //diff adds numSlots back so that it stays positive. Suppose the new pos is 1, the old pos is 2 and numSlots is 5: diff is 5 + 1 - 2 = 4, and when it is applied, (2 + 4) % 5 = 1, the new pos
        timer.item.diff = tw.numSlots + pos - timer.pos
    } else {
        //The new pos is before the old pos and the task is due within the current lap: mark the old entry as removed and add a new one
        // Is concurrency ignored here? If the task is being executed at this moment, is there a problem?
        // Creating a new entry that will run again is reasonable even if the old one has just been executed.
        // If concurrency is ignored, the same key is executed only once; taking it into account, the task may be executed twice.
        timer.item.removed = true
        newItem := &timingEntry{
            baseEntry: task,
            value:     timer.item.value,
        }
        tw.slots[pos].PushBack(newItem)
        tw.setTimerPosition(pos, newItem)
    }
}


//Finally, deletion: the task is found through the index and simply marked as removed.
//The logic that handles removed nodes was shown in scanAndRunTasks above
func (tw *TimingWheel) removeTask(key interface{}) {
    val, ok := tw.timers.Get(key)
    if !ok {
        return
    }

    timer := val.(*positionEntry)
    timer.item.removed = true
    tw.timers.Del(key)
}
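
The three branches of moveTask are easier to check with the bookkeeping isolated from the wheel. The sketch below is a hypothetical helper, planMove, written only for illustration; it reproduces the circle and diff arithmetic from the listing and does not touch any go-zero types.

```go
package main

import "fmt"

// planMove mirrors the three branches of moveTask.
// oldPos is the slot the task sits in; newPos and circle come from the new delay.
// It returns the lap count and diff to record on the task, or reinsert=true when
// the old entry must be marked removed and a fresh entry pushed into newPos.
func planMove(oldPos, newPos, circle, numSlots int) (newCircle, diff int, reinsert bool) {
	switch {
	case newPos >= oldPos:
		// Stay in the old slot; the scan moves the task forward by diff later.
		return circle, newPos - oldPos, false
	case circle > 0:
		// Wrapping past the end of the ring uses up one lap.
		return circle - 1, numSlots + newPos - oldPos, false
	default:
		// Due within the current lap at an earlier slot: replace the entry.
		return 0, 0, true
	}
}

func main() {
	// The example from the comments: new pos 1, old pos 2, numSlots 5.
	c, d, re := planMove(2, 1, 1, 5)
	fmt.Println(c, d, re, (2+d)%5) // 0 4 false 1
}
```

The last printed value is the slot the task ends up in once scanAndRunTasks applies diff, which matches the new pos.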

Keywords: Go go-zero

Added by cavedave on Sat, 19 Feb 2022 17:38:08 +0200