Go Language Bible - Chapter 9: Concurrency with Shared Variables - 9.7 Example: Concurrent Non-Blocking Cache

Chapter 9: Concurrency with Shared Variables

In the previous chapter we expressed concurrency using the direct and natural mechanisms of goroutines and channels, but these approaches can still run into problems.

In this chapter we take a closer look at the mechanics of concurrency, especially at variables shared between goroutines, and also discuss some of the technical differences between goroutines and operating system threads.

9.7 Example: Concurrent Non-Blocking Cache

In this section we will build a concurrent non-blocking cache. Memoizing a function, that is, caching its result so that it is computed only once and can simply be looked up on later uses, is a problem that comes up often when writing concurrent programs, and there is no ready-made library that solves it for us. We also want to avoid hiding the whole cache behind a single lock that every goroutine has to compete for in order to use it.

Let's start with the following function:

func httpGetBody(url string) (interface{},error) {
	resp,err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}

httpGetBody is the kind of function we want to memoize: it fetches the body of a URL. Its final statement returns the two results of ioutil.ReadAll directly, since they already satisfy httpGetBody's declared return types (an interface{} holding a []byte, and an error).

Now let's look at the cache:

type Memo struct {
	f Func
	cache map[string]result
}

type Func func(key string) (interface{},error)

type result struct {
	value interface{}
	err error
}

func New(f Func) *Memo {
	return &Memo{f:f, cache: make(map[string]result)}
}

// Get returns the memoization of f(key).
// NOTE: this version is not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

A Memo instance records the function f to memoize and the cache contents. Each result is the pair of values returned by a call to f: a value and an error. Here is how Memo might be used:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
	start := time.Now()
	value, err := m.Get(url)
	if err != nil {
		log.Print(err)
	}
	fmt.Printf("%s, %s, %d bytes\n",
		url, time.Since(start), len(value.([]byte)))
}

For each incoming URL we call Get, then log the latency of the call and the number of bytes it returned.
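The incomingURLs helper is not shown in this article. A minimal, hypothetical version that simply replays a fixed list of URLs (with duplicates, matching the test output below) over a channel might look like this:

func incomingURLs() <-chan string {
	ch := make(chan string)
	go func() {
		// Send each URL, including repeats, then close the channel
		// so that range loops over it terminate.
		for _, url := range []string{
			"https://golang.org",
			"https://godoc.org",
			"https://play.golang.org",
			"http://gopl.io",
			"https://golang.org",
			"https://godoc.org",
			"https://play.golang.org",
			"http://gopl.io",
		} {
			ch <- url
		}
		close(ch)
	}()
	return ch
}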

We can use the testing package to measure the effect of the cache systematically. From the test output below we can see that the URL stream contains duplicates: the first (*Memo).Get call for each URL takes hundreds of milliseconds, while subsequent calls for the same URL return the full data in a tiny fraction of a millisecond.

$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

This test makes all its calls sequentially.
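The test function itself is not reproduced in this article. A minimal sketch of what a sequential test might look like, wrapping the loop shown earlier (it assumes the httpGetBody and incomingURLs helpers above, and the usual fmt, log, testing, and time imports; the real test that produced the output above is simply named Test):

// TestSequential is a hypothetical name for a test that exercises the
// cache from a single goroutine, producing output like that shown above.
func TestSequential(t *testing.T) {
	m := memo.New(httpGetBody)
	for url := range incomingURLs() {
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
		}
		fmt.Printf("%s, %s, %d bytes\n",
			url, time.Since(start), len(value.([]byte)))
	}
}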

From earlier chapters we know that these independent HTTP requests are a good candidate for concurrency, so let's change the loop to use a sync.WaitGroup and wait for all requests to complete before returning.

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
	n.Add(1)
	go func(url string) {
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
		}
		fmt.Printf("%s, %s, %d bytes\n",
			url, time.Since(start), len(value.([]byte)))
		n.Done()
	}(url)
}
n.Wait()

If we test this version it runs faster, but it does not run correctly every time: there are occasional unexpected cache misses, cache hits that return the wrong value, or even outright crashes. So we run the test with the -race flag, and the race detector prints a report for us:

$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

memo.go line 32 appears twice, telling us that two goroutines updated the cache map without any synchronization: Get is not concurrency-safe, and there is a data race.

func (memo *Memo) Get(key string) (interface{}, error) {
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	return res.value, res.err
}

The simplest way to make the cache concurrency-safe is to use monitor-based synchronization: add a sync.Mutex to Memo, acquire it at the start of Get, and release it before returning, so that the cache operations happen inside the critical section.

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

func (memo *Memo) Get(key string) (interface{}, error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	if !ok {
		res.value, res.err = memo.f(key)
		memo.cache[key] = res
	}
	memo.mu.Unlock()
	return res.value, res.err
}

The tests still run concurrently, and now the race detector is silent. Unfortunately the locking means that the I/O operations that Get could have performed in parallel are now serialized, which is not what we want.

In the next Get implementation, the calling goroutine acquires the lock twice: once for the lookup, and, if the lookup returned nothing, once more for the update. Between those two critical sections, other goroutines are free to use the cache.

func (memo *Memo) Get(key string) (interface{}, error) {
	memo.mu.Lock()
	res, ok := memo.cache[key]
	memo.mu.Unlock()
	if !ok {
		res.value, res.err = memo.f(key)

		// Between the two critical sections, several goroutines
		// may race to compute f(key) and to update the map.
		memo.mu.Lock()
		memo.cache[key] = res
		memo.mu.Unlock()
	}
	return res.value, res.err
}

This change improves performance again, but now some URLs are fetched twice. This happens when two or more goroutines call Get for the same URL at roughly the same time: both check the cache, find no value, and both call f; once the results arrive, both update the map, and one result overwrites the other.

That work is redundant, and ideally we would avoid it; this is sometimes called duplicate suppression. In the following version of Memo, each map element is a pointer to an entry, and each entry holds the memoized result of a call to f, as before. Unlike before, each entry also contains a ready channel; after the entry's result has been set, this channel is closed to broadcast to the other goroutines that it is now safe for them to read the result from the entry.

type entry struct {
	res result
	ready chan struct{}
}

func New(f Func) *Memo {
	return &Memo{f:f,cache:make(map[string]*entry)}
}

type Memo struct {
	f Func
	mu sync.Mutex
	cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
	memo.mu.Lock()
	e := memo.cache[key]
	if e == nil {
		// This is the first request for this key.
		// This goroutine becomes responsible for computing
		// the value and broadcasting the ready condition.
		e = &entry{ready: make(chan struct{})}
		memo.cache[key] = e
		memo.mu.Unlock()

		e.res.value, e.res.err = memo.f(key)

		close(e.ready) // broadcast ready condition
	} else {
		// This is a repeat request for this key.
		memo.mu.Unlock()

		<-e.ready // wait for ready condition
	}
	return e.res.value, e.res.err
}

A call to Get now involves the following steps:

Acquire the mutex that guards the shared cache map and look up the entry for the given key. If no entry is found, allocate and insert a new, not-yet-ready entry and release the mutex. If an entry exists but its value has not been written yet, the goroutine must wait until it is ready before reading the entry's result; to find out whether it is ready, it simply receives from the ready channel, since that receive blocks until the channel is closed.

In the not-found case, the current goroutine has inserted a not-yet-ready entry into the map, and it is now responsible for calling the slow function, updating the entry, and broadcasting that the entry is ready to be read by any other goroutines.

The e.res.value and e.res.err variables in the entry are shared among multiple goroutines: the goroutine that creates the entry sets their values, and other goroutines read them as soon as they receive the "ready" broadcast. Although the variables are accessed by several goroutines, no mutex is needed: the closing of the ready channel happens before any other goroutine receives the broadcast, so the first goroutine's writes to these variables happen before the other goroutines' reads. There is no data race.

With this, our concurrent, duplicate-suppressing, non-blocking cache is complete.

The Memo implementation above uses a mutex to guard the map variable shared by every goroutine that calls Get. It is instructive to compare this design with the alternative mentioned earlier, in which the map variable is confined to a single monitor goroutine to which callers of Get must send a message.

The declarations of Func, result, and entry are the same as before:

type Func func(key string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

type entry struct {
	res   result
	ready chan struct{}
}

However, the Memo type now contains a channel called requests, which callers of Get use to communicate with the monitor goroutine. The element type of this channel is request. With this structure, the caller of Get sends the monitor goroutine both the key, that is, the argument to the memoized function, and another channel called response over which the result should be sent back when it becomes available. The response channel carries only a single value.

type request struct {
	key      string
	response chan<- result
}
type Memo struct {
	requests chan request
}

func New(f Func) *Memo {
	memo := &Memo{requests: make(chan request)}
	go memo.server(f)
	return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
	response := make(chan result)
	memo.requests <- request{key, response}
	res := <-response
	return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

The Get method above creates a response channel, puts it in a request, sends the request to the monitor goroutine, and then immediately receives the result from the response channel.

The cache variable is confined to the monitor goroutine, (*Memo).server, shown below. The monitor reads requests in a loop until the request channel is closed by the Close method. For each request it looks up the cache, and if no entry is found it creates and inserts a new one.

func (memo *Memo) server(f Func) {
	cache := make(map[string]*entry)
	for req := range memo.requests {
		e := cache[req.key]
		if e == nil {
			e = &entry{ready: make(chan struct{})}
			cache[req.key] = e
			go e.call(f, req.key)
		}
		go e.deliver(req.response)
	}
}
func (e *entry) deliver(response chan<- result) {
	// Wait for the ready condition.
	<-e.ready
	// Send the result to the client.
	response <- e.res
}

As in the mutex-based version, the first request for a given key becomes responsible for calling f on that key, storing the result in the entry, and closing the ready channel to broadcast that the entry is ready. This is done by (*entry).call.
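The call method is not listed in this article; based on the description above, it would look something like this:

func (e *entry) call(f Func, key string) {
	// Evaluate the function.
	e.res.value, e.res.err = f(key)
	// Broadcast the ready condition.
	close(e.ready)
}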

A subsequent request for the same key waits for the result to become ready and then sends it over the response channel to the client goroutine that called Get; this is done by (*entry).deliver. The call and deliver methods must each be invoked in their own goroutine so that the monitor goroutine does not block and can keep processing new requests.
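From the caller's point of view nothing changes: the channel-based Memo is used just like the mutex-based one, except that it also offers Close. A rough usage sketch, reusing httpGetBody and the incomingURLs helper from earlier (and the same imports as before, plus sync):

m := memo.New(httpGetBody)
defer m.Close() // shut down the monitor goroutine when we are done

var n sync.WaitGroup
for url := range incomingURLs() {
	n.Add(1)
	go func(url string) {
		defer n.Done()
		start := time.Now()
		value, err := m.Get(url)
		if err != nil {
			log.Print(err)
			return
		}
		fmt.Printf("%s, %s, %d bytes\n",
			url, time.Since(start), len(value.([]byte)))
	}(url)
}
n.Wait()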

This example shows that concurrent programs can be built either with shared variables and locks or with communication.

Both approaches work; which to choose depends on the situation, although Go tends to favor communication between goroutines.

