Go concurrency control and cache breakdown protection: singleflight

Background

In high-concurrency scenarios, the same resource is often requested many times at once. These concurrent requests carry the same parameters and produce the same response. If every request queries the resource independently, the system does redundant work and comes under unnecessary load.

To protect the resource, concurrent requests for the same resource can be intercepted: only one request is allowed to run the query, and its result is then shared with the others. This removes the cost of the repeated queries and is especially useful against cache breakdown, where many concurrent requests miss the cache for the same key and all fall through to the backing store.

Solution

Go singleflight

The Go team maintains a singleflight package, "golang.org/x/sync/singleflight", which makes this easy to implement. It is used as follows:

var g = singleflight.Group{}

type Info struct {
	ID string
}

// handle request
func handle(id string) (Info, error) {
	val, err, _ := g.Do(id, func() (interface{}, error) {
		// Query info
		info, err := GetInfo(id)
		return info, err
	})
	if err != nil {
		return Info{}, err
	}
	return val.(Info), err
}

Simulate client calls:

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 100000; i++ { // kept small enough that all goroutines fit in memory
		wg.Add(1)
		go func(j int) { // Simulate concurrent query requests
			defer wg.Done()
			_, err := handle(strconv.Itoa(j % 1000))
			fmt.Println(err)
		}(i)
	}
	wg.Wait()
}

When handle is called concurrently for the same id, only the first request to arrive executes GetInfo(id). While that request is still in progress, the other requests block inside g.Do(...). When the first request obtains the result, the blocked requests are woken up and receive the same result.

The core method Do(key string, fn func() (interface{}, error)) takes two parameters:

  • key: the request ID. It identifies requests for the same resource, so all requests for the same resource must use the same key.
  • fn func() (interface{}, error): the function that fetches the resource. It returns two values: the query result and an error. The request that wins the right to execute shares not only a successful result but also the error: if fn returns an error, every request in the same concurrent batch receives that error.
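Because fn returns an interface{}, the caller has to convert the shared value back to a concrete type. The following is only a sketch of one way to do that defensively: toInfo is a hypothetical helper (it is not part of singleflight) that passes a shared error through unchanged and uses the two-value type assertion so that an unexpected type becomes an error rather than a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// Info mirrors the result type used in the handle example.
type Info struct {
	ID string
}

// toInfo is a hypothetical helper that converts the untyped (value, error)
// pair returned by Do into a typed Info.
func toInfo(val interface{}, err error) (Info, error) {
	if err != nil {
		// The error shared by the executing request is returned as-is.
		return Info{}, err
	}
	info, ok := val.(Info)
	if !ok {
		// The two-value assertion reports a mismatch instead of panicking.
		return Info{}, fmt.Errorf("unexpected result type %T", val)
	}
	return info, nil
}

func main() {
	fmt.Println(toInfo(Info{ID: "7"}, nil))          // normal result
	fmt.Println(toInfo(nil, errors.New("db down"))) // shared error
	fmt.Println(toInfo("oops", nil))                // wrong type
}
```

In handle, the first two return values of g.Do(...) could be passed to such a helper in place of the bare val.(Info) assertion.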

Source code analysis

With the non-essential code removed, the core logic can be simplified as follows:

// Each in-flight request is wrapped in a call
type call struct {
	wg   sync.WaitGroup
	val  interface{}
	err  error
	dups int // number of duplicate callers waiting on this call
}

type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil { // m is unexported and lazily initialized, so it is nil on first use
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok { // a call for this key is already in flight, so this request waits for it
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	// No call is in flight for this key: create one and register it in the Group's map
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// doCall runs fn, then calls c.wg.Done() to wake the waiting requests so they can read the result from the call, and removes the key from the map.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	defer func() {
		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done()
		delete(g.m, key)
	}()
	c.val, c.err = fn()
}

The singleflight implementation is very simple:

  • Each in-flight request is wrapped in a call object, which stores the returned value and error. Most importantly, it contains a sync.WaitGroup, whose blocking Wait() method is what makes the other requests wait.
  • The Group holds a shared map, map[string]*call, which records the keys currently being queried and their corresponding call. The first request for a key creates a call and stores it in the map.
  • Other requests with the same key fetch that call from the map and invoke Wait() on its WaitGroup, so their goroutines block.
  • When the first request finishes and the result is stored, c.wg.Done() is executed to signal completion. The current goroutine and the waiting goroutines can then read the result from the call and return.
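The steps above can be tried out without the real package. The sketch below uses only the standard library: miniGroup is a hypothetical, stripped-down stand-in for singleflight.Group, and waiting and runDemo are helpers invented for this demonstration. The query is held open on a channel until every other caller has joined the in-flight call, so the run shows the query executing once while all callers receive the result.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call holds the result of one in-flight execution.
type call struct {
	wg   sync.WaitGroup
	val  interface{}
	err  error
	dups int // callers that joined this call
}

// miniGroup is a hypothetical stand-in for singleflight.Group.
type miniGroup struct {
	mu sync.Mutex
	m  map[string]*call
}

func (g *miniGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error, bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok { // join the call already in flight
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val, c.err = fn() // only the first caller runs the query

	g.mu.Lock()
	c.wg.Done() // wake the waiting callers
	delete(g.m, key)
	g.mu.Unlock()
	return c.val, c.err, c.dups > 0
}

// waiting reports how many callers have joined the in-flight call for key.
func (g *miniGroup) waiting(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.m[key]; ok {
		return c.dups
	}
	return 0
}

// runDemo starts n concurrent callers for one key and returns how many times
// the query ran and how many callers received the result.
func runDemo(n int) (queries int64, served int64) {
	var g miniGroup
	release := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, _ := g.Do("42", func() (interface{}, error) {
				atomic.AddInt64(&queries, 1)
				<-release // hold the query open until all callers have joined
				return "info-42", nil
			})
			if err == nil && v.(string) == "info-42" {
				atomic.AddInt64(&served, 1)
			}
		}()
	}
	for g.waiting("42") < n-1 { // wait until the other n-1 callers are blocked
		time.Sleep(time.Millisecond)
	}
	close(release)
	wg.Wait()
	return queries, served
}

func main() {
	queries, served := runDemo(100)
	fmt.Printf("queries=%d served=%d\n", queries, served) // prints: queries=1 served=100
}
```

The real package handles more cases than this sketch, so it is meant only as an illustration of the call, map, and WaitGroup interplay described above.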

Best practices

When using singleflight, you need to define a key for similar requests. The keys of similar requests must be the same. Theoretically, you can use the same singleflight instance for different request types. You only need to define different keys for them, such as:

  • Get user information. The key format is: "user" + user ID
  • Query department information. The key format is: "department" + department ID

In general, however, these are two different kinds of query that address two separate concerns, so it is better to give each its own singleflight instance. Likewise, the query logic and the concurrency control are separate concerns, and their code should not be coupled. Two ways of implementing this, for different scenarios, follow.
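If a single Group is shared across request types anyway, the key prefixes listed above can be produced by small helper functions. This is only a sketch: userKey and departmentKey are hypothetical names, and the ":" separator is an assumption added here to keep the prefix and the ID visually distinct.

```go
package main

import (
	"fmt"
	"strconv"
)

// userKey builds the singleflight key for a user query (hypothetical helper).
func userKey(id int) string {
	return "user:" + strconv.Itoa(id)
}

// departmentKey builds the singleflight key for a department query (hypothetical helper).
func departmentKey(id int) string {
	return "department:" + strconv.Itoa(id)
}

func main() {
	// The same numeric ID yields different keys for different request types,
	// so a user query and a department query never share one in-flight call.
	fmt.Println(userKey(11), departmentKey(11)) // prints: user:11 department:11
}
```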

Function mode

When the query logic is implemented as a plain function, a closure can be used to add concurrency protection around it. For example:

The function that queries a user is:

func FindUser(db *gorm.DB, id int) (*User, error) {
	var u = &User{}
	return u, db.First(u, id).Error
}

Modifying FindUser directly to add concurrency control would violate the single responsibility principle. A better way is to add a new function with concurrency protection that has the same parameters and return values:

var FindUserSingle = findUserSingleWrap(FindUser)

// Add anti breakdown protection
func findUserSingleWrap(fn func(db *gorm.DB, id int) (*User, error)) func(db *gorm.DB, id int) (*User, error) {
	single := &singleflight.Group{}
	return func(db *gorm.DB, id int) (*User, error) {
		u, err, _ := single.Do(strconv.Itoa(id), func() (interface{}, error) {
			return fn(db, id)
		})
		if err != nil {
			return nil, err
		}
		return u.(*User), nil
	}
}

Calling findUserSingleWrap returns a function with concurrency protection added. The client uses it like this:

var u, err = FindUserSingle(db, 11)

However, the parameter and return types of findUserSingleWrap are rather long. They can be shortened by defining a named type for the function:

type findUserFn func(db *gorm.DB, id int) (*User, error)

In this way, findUserSingleWrap can be simplified to:

// Add anti breakdown
func findUserSingleWrap(fn findUserFn) findUserFn {
	single := &singleflight.Group{}
	return func(db *gorm.DB, id int) (*User, error) {
		u, err, _ := single.Do(strconv.Itoa(id), func() (interface{}, error) {
			return fn(db, id)
		})
		if err != nil {
			return nil, err
		}
		return u.(*User), nil
	}
}

Struct method

If FindUser is implemented as a method on a struct:

// DBUserFinder is the plain, database-backed implementation.
type DBUserFinder struct {
	db *gorm.DB
}

func (r *DBUserFinder) FindUser(id int) (*User, error) {
	var u = &User{}
	return u, r.db.First(u, id).Error
}

func NewUserFinder(db *gorm.DB) *DBUserFinder {
	return &DBUserFinder{
		db: db,
	}
}

Consider adding a type that implements the same interface:

type UserFinder interface {
	FindUser(id int) (*User, error)
}

type FindUserSingle struct {
	single *singleflight.Group
	finder UserFinder
}

func (r *FindUserSingle) FindUser(id int) (*User, error) {
	ret, err, _ := r.single.Do(strconv.Itoa(id), func() (interface{}, error) {
		return r.finder.FindUser(id)
	})
	if err != nil {
		return nil, err
	}
	return ret.(*User), nil
}

func NewFindUserSingle(finder UserFinder) *FindUserSingle {
	return &FindUserSingle{
		single: &singleflight.Group{},
		finder: finder,
	}
}

Client usage:

userFinder := NewFindUserSingle(NewUserFinder(db))
u, err := userFinder.FindUser(11)

My blog: https://itart.cn/blogs/2022/explore/go-singleflight.html

Keywords: Go Concurrent Programming Cache

Added by system_critical on Thu, 24 Feb 2022 16:51:56 +0200