Chapter 9. Concurrency with Shared Variables
In the previous chapter we used goroutines and channels, the direct and natural tools for expressing concurrency in Go. However, they are not always sufficient on their own.
In this chapter we look at the mechanics of concurrency in more detail, especially the sharing of variables between goroutines. We will also cover some of the technical differences between goroutines and operating system threads.
9.7 Example: Concurrent Non-Blocking Cache
In this section we will build a concurrent non-blocking cache. Memoizing a function, that is, saving the result of a computation so that later requests for the same value can reuse it instead of recomputing it, is a problem that comes up frequently when writing concurrent programs, yet there is no ready-made library solution. We also want the cache to be non-blocking: we must avoid locking the whole cache and forcing every goroutine that wants to use it to compete for a single lock.
Let's start with the function we want to memoize:
func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}
httpGetBody fetches a URL and returns its body. The final return statement calls ioutil.ReadAll directly: ReadAll's two results, a []byte and an error, are assignable to httpGetBody's declared result types, so no intermediate variables are needed.
Now let's look at the cache:
// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the value and error returned by a call to f.
type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}
A Memo instance records the function f to be memoized and the cache contents. Each result is the pair of values returned by a call to f: a value and an error. Here is an example of Memo in use:
m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
}
For each incoming URL, we call Get and log the latency of the call and the number of bytes of data it returned.
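incomingURLs itself is not defined in this section; a minimal sketch, assuming it simply replays a fixed list of URLs in which each URL appears twice so that the second request can hit the cache, might look like this:

// incomingURLs is a hypothetical helper, shown here only for
// illustration: it streams a fixed list of URLs, each appearing twice.
func incomingURLs() <-chan string {
    ch := make(chan string)
    go func() {
        for _, url := range []string{
            "https://golang.org", "https://godoc.org",
            "https://play.golang.org", "http://gopl.io",
            "https://golang.org", "https://godoc.org",
            "https://play.golang.org", "http://gopl.io",
        } {
            ch <- url
        }
        close(ch)
    }()
    return ch
}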
We can use the testing package to measure the effect of the cache systematically. From the test output below, we can see that the URL stream contains duplicates: the first call to (*Memo).Get for each URL takes hundreds of milliseconds, but a second request for the same URL returns the full data in well under a millisecond.
$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok      gopl.io/ch9/memo1   1.257s
This test makes all of its calls sequentially.
From the previous chapters we know that independent HTTP requests like these are good candidates for concurrency, so let's change the test to make the requests concurrently, using a sync.WaitGroup to wait for all of them to complete before returning.
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n",
            url, time.Since(start), len(value.([]byte)))
        n.Done()
    }(url)
}
n.Wait()
This version of the test runs much faster, but unfortunately it does not work correctly every time: there are unexpected cache misses, cache hits that return the wrong value, and even outright crashes. So we run the test with the -race flag, and the race detector prints a report for us:
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s
The reference to memo.go line 32 appears twice, which tells us that two goroutines updated the cache map without any synchronization. Get is not concurrency-safe: it has a data race.
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}
The simplest way to make the cache concurrency-safe is to use monitor-based synchronization: add a sync.Mutex to Memo, acquire the mutex at the start of Get, and release it before returning, so that both cache operations happen within the critical section.
type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe. Note the pointer receiver: a value receiver
// would copy the Mutex, defeating the locking.
func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}
Now the concurrent test passes and the race detector is silent. Unfortunately, because the lock is held for the duration of each call to f, this change serializes all the I/O operations that we had hoped Get would run in parallel, which defeats our purpose.
In the next implementation of Get, the calling goroutine acquires the lock twice: once for the lookup, and then, if the lookup returned nothing, once more for the update. In between these two critical sections, other goroutines are free to use the cache.
func (memo *Memo) Get(key string) (interface{}, error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and to update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}
This change improves performance again, but now some URLs are fetched twice. This happens when two or more goroutines call Get for the same URL at about the same time: both check the cache, find no entry, and both call f. When the results arrive, both update the map, and one result overwrites the other.
Ideally we would avoid this redundant work; this property is sometimes called duplicate suppression. In the version of Memo below, each map element is a pointer to an entry, and each entry contains the memoized result of a call to f, as before. Unlike before, each entry also contains a channel called ready. Just after the entry's result has been set, this channel is closed, broadcasting to any other goroutines that it is now safe for them to read the result from the entry.
type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)
        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        <-e.ready // wait for ready condition
    }
    return e.res.value, e.res.err
}
A call to Get now involves the following steps:
acquire the mutex that guards the cache map, look up a pointer to an entry with the given key, and, if no entry was found, allocate and insert a new not-yet-ready entry before releasing the mutex. If an entry was found but its value is not ready yet (another goroutine may still be calling the slow function f), the goroutine must wait for the entry to become ready before reading its result. It finds out by reading from the ready channel, since that receive operation blocks until the channel is closed.
In the not-found case, by inserting the not-yet-ready entry into the map, the current goroutine becomes responsible for calling the slow function, updating the entry, and broadcasting to any other waiting goroutines that the entry is ready to be read.
The variables e.res.value and e.res.err in the entry are shared among multiple goroutines: the goroutine that creates the entry sets their values, and other goroutines read them once the "ready" condition has been broadcast. Despite being accessed by several goroutines, no mutex is required. The closing of the ready channel happens before any other goroutine's receive on it completes, so the first goroutine's writes to these variables happen before the other goroutines' reads. There is no data race.
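The same guarantee can be seen in miniature in the following sketch (the variable names and values here are illustrative, not part of the Memo code): the write to x happens before close(done), which happens before the receive completes, which happens before the read of x.

var x int
done := make(chan struct{})
go func() {
    x = 42      // write the shared variable
    close(done) // broadcast that x is ready
}()
<-done          // blocks until the channel is closed
fmt.Println(x)  // guaranteed to observe 42; no mutex needed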
With that, our concurrent, duplicate-suppressing, non-blocking cache is complete.
The Memo implementation above uses a mutex to guard a map variable that is shared by every goroutine that calls Get. It is worth comparing this design with the approach discussed earlier in the chapter, in which the map variable is confined to a single monitor goroutine to which callers of Get must send a message.
The declarations of Func, result, and entry are the same as before:
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}
However, the Memo type now consists of a channel, requests, through which callers of Get communicate with the monitor goroutine. The element type of this channel is request. In this structure the caller of Get fills in two fields: key, the argument to the memoized function, and response, a channel over which exactly one result value will be sent back to the caller.
// A request is a message asking that the Func be applied to key.
type request struct {
    key      string
    response chan<- result // the client wants a single result
}

type Memo struct {
    requests chan request
}

// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}
    res := <-response
    return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }
The Get method above creates a response channel, puts it in a request, sends the request to the monitor goroutine, and then immediately receives from the response channel.
The cache variable is confined to the monitor goroutine, (*Memo).server, shown below. The monitor reads requests in a loop until the request channel is closed by the Close method. For each request it consults the cache, creating and inserting a new entry if none is found.
func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // This is the first request for this key.
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // call f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) deliver(response chan<- result) {
    // Wait for the ready condition.
    <-e.ready
    // Send the result to the client.
    response <- e.res
}
As in the mutex-based version, the first request for a given key becomes responsible for calling f with that key, storing the result in the entry, and closing the ready channel to broadcast that the entry is ready. This work is done by (*entry).call.
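The call method itself is not shown above; a minimal sketch consistent with this description would be:

func (e *entry) call(f Func, key string) {
    // Evaluate the function.
    e.res.value, e.res.err = f(key)
    // Broadcast the ready condition.
    close(e.ready)
}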
A later request for the same key waits for the result to become ready and then sends the result over the response channel to the client goroutine; this is done by (*entry).deliver. The call and deliver methods must each be invoked in their own goroutine so that the monitor goroutine is never blocked and can keep processing new requests.
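From the client's point of view, usage is unchanged except for shutdown: a caller that knows it has issued its last request should call Close so that the monitor goroutine's loop can terminate. A usage sketch (deferring Close is just one reasonable choice, an assumption rather than something shown in the original):

m := memo.New(httpGetBody)
defer m.Close() // let the monitor goroutine's loop finish
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
}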
This example shows that it is possible to build this kind of concurrent structure either with locks or with communication.
Both approaches are reasonable; choose whichever fits the situation better, bearing in mind that Go generally favors sharing by communicating.