GroupCache source code analysis (2): singleflight
(1) Overview:
There is a problem in the groupcache distributed cache: if at some moment the node at the tail of the LRU list is evicted, and then a large number of requests for that evicted data suddenly pour in, cache breakdown may occur.
Cache avalanche: all caches fail at the same time, producing a huge burst of DB requests and a sudden spike in load, which results in an avalanche. Cache avalanches are usually caused by a cache server going down, or by many cache keys sharing the same expiration time.
Cache breakdown: an existing key's cache entry expires at a moment when there are many concurrent requests for it. These requests all break through to the DB, producing a huge burst of DB requests and a sudden spike in load.
Cache penetration: a query for data that does not exist. Since the data does not exist, nothing is written to the cache, so every request goes to the DB; if the instantaneous traffic is large enough, it penetrates to the DB and can bring it down.
Therefore, singleflight is used to solve this problem. Its core function is to make sure that, no matter how many identical requests arrive at the same time, only one of them is executed and every caller receives the same result.
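To make the breakdown scenario concrete, here is a minimal sketch (not from the groupcache source): queryDB and dbHits are hypothetical stand-ins for the real data source, and every concurrent cache miss falls through to it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// queryDB is a hypothetical stand-in for the real data source.
var dbHits int64

func queryDB(key string) string {
	atomic.AddInt64(&dbHits, 1) // count how often the DB is actually hit
	return "value-for-" + key
}

func main() {
	var wg sync.WaitGroup
	// Simulate 100 concurrent requests for the same key whose cache
	// entry has just been evicted: every one of them reaches the DB.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = queryDB("hot-key")
		}()
	}
	wg.Wait()
	fmt.Println("DB hits without singleflight:", dbHits) // prints 100
}
```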
(2) Source code:
```go
// call is an in-flight or completed Do call
// call is the data structure of one request; its WaitGroup implements the waiting
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}
```
Take a look at the use of sync.WaitGroup:
WaitGroup methods

Method name | Function |
---|---|
(wg *WaitGroup) Add(delta int) | Adds delta to the wait group counter |
(wg *WaitGroup) Done() | Decrements the wait group counter by one |
(wg *WaitGroup) Wait() | Blocks while the wait group counter is not zero, and returns once it reaches 0 |
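A minimal sketch of the three methods working together (the worker loop is illustrative, not from groupcache):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1) // counter +1 for each goroutine we start
		go func(n int) {
			defer wg.Done() // counter -1 when this goroutine finishes
			fmt.Println("worker", n, "done")
		}(i)
	}
	wg.Wait() // block until the counter drops back to 0
	fmt.Println("all workers finished")
}
```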
```go
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
// Group is the master data structure of singleflight; it manages the calls
// for all the different keys, and uses mu to keep duplicate requests from executing
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}
```
Take a look at the use of sync.Mutex:
The following introduction to mutexes comes from the Jianshu author junhow520.
sync.Mutex is a mutual exclusion lock that can be locked and unlocked by different goroutines.
The Go standard library provides the sync.Mutex type with two methods, Lock and Unlock. You can call Lock before a piece of code and Unlock after it to guarantee that the code executes mutually exclusively, or use a defer statement to make sure the mutex is eventually unlocked. Once a goroutine acquires the lock by calling Lock, any other goroutine that calls Lock will block until the lock is released.
A mutex can be held by only one goroutine at a time; all other goroutines block until the mutex is unlocked, and then compete for the lock again. Note that unlocking a mutex that is not locked causes a runtime error.
sync.Mutex does not distinguish read locks from write locks: blocking only happens between one Lock() and another Lock(). If Lock() is used in one place but another piece of code reads or modifies the shared data directly without locking, the sync.Mutex type allows it, because a mutex is not associated with any goroutine or with the data it protects. To distinguish read and write locks, use sync.RWMutex.
The code between Lock() and Unlock() is called the critical section of the resource. Code there is strictly protected by the lock and is thread safe: at any point in time, at most one goroutine can be executing it.
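A minimal sketch of Lock/Unlock protecting a critical section (the counter is illustrative, not from groupcache):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu      sync.Mutex
		counter int
		wg      sync.WaitGroup
	)
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()         // enter the critical section
			defer mu.Unlock() // guarantee the unlock, even on early return
			counter++         // only one goroutine can be here at a time
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 1000; without the lock it could be less
}
```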
Finally, the Do function, the core implementation of singleflight:
```go
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// In short: no matter how many identical requests arrive at the same time,
// Do lets only one of them execute and returns the same result to all of them.
// The parameter fn is the function that actually fetches the value.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	// Lock first: once one request holds the lock, identical requests
	// arriving at the same time cannot touch the map concurrently.
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call) // lazily initialized
	}
	// (On a first reading you can skip this if-block and come back after the rest.)
	// A duplicate request arrives while a call for this key is already in progress:
	if c, ok := g.m[key]; ok {
		// Release the lock.
		g.mu.Unlock()
		// Wait blocks until the counter becomes 0. The first request's fn
		// may not have returned the value yet, so wait for it to finish,
		// then return its result.
		c.wg.Wait()
		return c.val, c.err
	}
	c := new(call)
	// Counter +1: a request is now in flight.
	c.wg.Add(1)
	// Record that the call for this key is in progress.
	g.m[key] = c
	// Unlock.
	g.mu.Unlock()

	// Call fn to obtain val.
	c.val, c.err = fn()
	// The call has finished; counter -1 wakes every waiter.
	c.wg.Done()

	// Locking ensures that the key is deleted exactly once.
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()

	return c.val, c.err
}
```
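To see the deduplication in action, here is a hedged usage sketch. It assumes the call and Group definitions above live in the same package; the key name and the fnCalls counter are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Assumes the call and Group types and the Do method shown above
// are defined in this same package.
func main() {
	var (
		g       Group
		wg      sync.WaitGroup
		fnCalls int64 // how many times fn actually ran
	)
	// 100 concurrent requests for the same (hypothetical) hot key.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err := g.Do("hot-key", func() (interface{}, error) {
				atomic.AddInt64(&fnCalls, 1)
				return "value-for-hot-key", nil
			})
			_, _ = v, err // every caller gets the same value and error
		}()
	}
	wg.Wait()
	// Typically prints 1: all overlapping duplicates shared one execution.
	fmt.Println("fn executed", atomic.LoadInt64(&fnCalls), "time(s)")
}
```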
That's all for singleflight.