A token-bucket rate limiter in Go
Introduction
When traffic exceeds what a downstream system can handle, it has to be limited. It is like crowd control at a subway station: the goal is to slow down the upstream's access to the downstream.
Rate limiting caps how often a service can be called, protecting it from overload, request floods, and similar problems.
Go's official extension package time (golang.org/x/time/rate) provides a rate limiter based on the token bucket algorithm.
How it works
- Token: a request may proceed only after it obtains a token.
- Bucket: its maximum capacity is fixed. Tokens are added to the bucket at a fixed rate until it is full.
- Each request consumes one token.
- When the rate limiter is initialized, the bucket is generally full.
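The four points above can be sketched with a buffered channel standing in for the bucket. This only illustrates the principle and is not how golang.org/x/time/rate is implemented; the names `bucket`, `newBucket`, `add`, and `take` are hypothetical.

```go
package main

import "fmt"

// bucket is a hypothetical token bucket: the channel's buffer is the
// bucket, and each buffered value is one token.
type bucket chan struct{}

// newBucket returns a bucket with the given capacity that starts full.
func newBucket(capacity int) bucket {
	b := make(bucket, capacity)
	for i := 0; i < capacity; i++ {
		b <- struct{}{}
	}
	return b
}

// add puts one token into the bucket, or drops it if the bucket is full.
// In a real program a time.Ticker would call add at a fixed interval.
func (b bucket) add() {
	select {
	case b <- struct{}{}:
	default: // bucket is full; the token is discarded
	}
}

// take consumes one token and reports whether one was available.
func (b bucket) take() bool {
	select {
	case <-b:
		return true
	default:
		return false
	}
}

func main() {
	b := newBucket(2)
	fmt.Println(b.take(), b.take(), b.take()) // true true false
	b.add()
	fmt.Println(b.take()) // true
}
```

Calling `add` by hand keeps the sketch deterministic; driving it from a ticker would give the fixed refill rate described in the list.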
Usage
    package limiter

    import (
        "fmt"
        "testing"
        "time"

        "golang.org/x/time/rate"
    )

    func TestLimter(t *testing.T) {
        limiter := rate.NewLimiter(rate.Every(time.Millisecond*31), 2)
        //time.Sleep(time.Second)
        for i := 0; i < 10; i++ {
            var ok bool
            if limiter.Allow() {
                ok = true
            }
            time.Sleep(time.Millisecond * 20)
            fmt.Println(ok, limiter.Burst())
        }
    }
Execution results:
    === RUN TestLimter
    true 2
    true 2
    true 2
    false 2
    true 2
    true 2
    false 2
    true 2
    true 2
    false 2
    --- PASS: TestLimter (0.21s)
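The output shows that the bucket starts full with two tokens. (The second column is limiter.Burst(), the bucket capacity, which is always 2.) A token is added every 31ms but a request arrives every 20ms, so the refill interval is 11ms (31-20) longer than the request interval. Once the initial burst is spent, one request fails after every two that succeed.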
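The arithmetic behind this output can be replayed without any real timing. The sketch below assumes idealized timing (requests exactly 20ms apart, no scheduling jitter) and uses a hypothetical helper `simulate`; it is not part of golang.org/x/time/rate.

```go
package main

import "fmt"

// simulate replays n requests spaced requestMs apart against a bucket of
// size burst that gains one token every refillMs. It is a hypothetical
// helper that assumes perfectly regular timing.
func simulate(n int, refillMs, requestMs float64, burst int) []bool {
	results := make([]bool, 0, n)
	tokens := float64(burst) // the bucket starts full
	for i := 0; i < n; i++ {
		if i > 0 {
			// Tokens accrued since the previous request, capped at burst.
			tokens += requestMs / refillMs
			if tokens > float64(burst) {
				tokens = float64(burst)
			}
		}
		ok := tokens >= 1
		if ok {
			tokens-- // a granted request consumes one token
		}
		results = append(results, ok)
	}
	return results
}

func main() {
	fmt.Println(simulate(10, 31, 20, 2))
}
```

With these inputs the sketch prints [true true true false true true false true true false], the same sequence as the test run: each 20ms step adds 20/31 of a token, so after two grants the bucket holds less than one token and the third request is rejected.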
Implementation details
Let's first look at how a limiter is created, with NewLimiter:
    func NewLimiter(r Limit, b int) *Limiter {
        return &Limiter{
            limit: r,
            burst: b,
        }
    }
Next, the Limiter data structure:
    // The methods AllowN, ReserveN, and WaitN consume n tokens.
    type Limiter struct {
        mu     sync.Mutex
        limit  Limit
        burst  int
        tokens float64
        // last is the last time the limiter's tokens field was updated
        last time.Time
        // lastEvent is the latest time of a rate-limited event (past or future)
        lastEvent time.Time
    }
- burst: the capacity of the bucket
- limit: the rate at which tokens are added to the bucket
- tokens: the number of tokens remaining
- last: the last time the tokens field was updated
- lastEvent: the time of the latest rate-limited event (past or future)
When tokens are issued, the grant is recorded in a Reservation object, defined as follows. A Reservation describes how many tokens can be obtained once the time timeToAct is reached.
    type Reservation struct {
        ok        bool      // whether the request can be granted
        lim       *Limiter  // the limiter that issued the tokens
        tokens    int       // number of tokens reserved
        timeToAct time.Time // time at which the tokens become usable
        limit     Limit     // token issuing rate
    }
How the limiter limits the rate
The official limiter supports several modes: blocking until tokens are available (Wait), an immediate yes-or-no check (Allow), and holding a reservation (Reserve). The rate-limiting logic itself lives in reserveN.
In the usage above, the Allow() method is called for every request:
    // Allow is shorthand for AllowN(time.Now(), 1).
    func (lim *Limiter) Allow() bool {
        return lim.AllowN(time.Now(), 1)
    }

    // AllowN reports whether n events may happen at time now.
    // Use this method if you intend to drop / skip events that exceed the rate limit.
    // Otherwise use Reserve or Wait.
    func (lim *Limiter) AllowN(now time.Time, n int) bool {
        return lim.reserveN(now, n, 0).ok
    }
Next, the reserveN algorithm.
Method description:
- It takes three parameters: now, n, and maxFutureReserve.
- It requests n tokens at time now, waiting at most maxFutureReserve.
- It returns a Reservation object describing the reserved tokens.
    // maxFutureReserve specifies the maximum reservation wait duration allowed.
    // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
    func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
        lim.mu.Lock()

        // If the rate is infinite, no limiting applies: grant immediately.
        if lim.limit == Inf {
            lim.mu.Unlock()
            return Reservation{
                ok:        true,
                lim:       lim,
                tokens:    n,
                timeToAct: now,
            }
        }

        // Compute the number of tokens available as of now.
        // last is the time the token count was previously updated.
        now, last, tokens := lim.advance(now)

        // Calculate the remaining number of tokens resulting from the request.
        tokens -= float64(n)

        // Calculate the wait duration.
        // A negative token count means the caller has to wait.
        var waitDuration time.Duration
        if tokens < 0 {
            waitDuration = lim.limit.durationFromTokens(-tokens)
        }

        // Decide result. The request is granted only if:
        // 1. it does not ask for more than the bucket capacity, and
        // 2. the wait does not exceed the maximum allowed wait.
        ok := n <= lim.burst && waitDuration <= maxFutureReserve

        // Prepare reservation.
        r := Reservation{
            ok:    ok,
            lim:   lim,
            limit: lim.limit,
        }
        if ok {
            r.tokens = n
            r.timeToAct = now.Add(waitDuration)
        }

        // Update state.
        if ok {
            lim.last = now
            lim.tokens = tokens
            lim.lastEvent = r.timeToAct
        } else {
            lim.last = last
        }

        lim.mu.Unlock()
        return r
    }
In terms of implementation, the limiter does not refill the bucket on a timer. Instead, it records the time of the last access and the number of tokens in the bucket at that time. On the next access, it computes the current number of tokens from the time elapsed since then, and uses that to decide whether tokens can be issued.
References
- Semaphore-based limiter: https://github.com/golang/net/blob/master/netutil/listen.go
- An open-source rate-limiting middleware for HTTP requests: https://github.com/didip/tollbooth
- Uber's open-source rate limiter based on the leaky bucket algorithm: https://github.com/uber-go/ratelimit
- https://blog.csdn.net/weixin_46825193/article/details/107032136
- https://mp.weixin.qq.com/s?__biz=MzA3MjgwNTQ3OA%3D%3D&chksm=9f19ff27a86e76319e144c77418bdc64bf36abe9aa59edc0bf7c058a6c0003efbf096c4cc049&idx=1&mid=2247483950&scene=21&sn=46e30a8383d50a2fb76347076e415ade#wechat_redirect
- https://www.cnblogs.com/Me1onRind/p/13191506.html
- https://segmentfault.com/a/1190000023033365