WaitGroup
Preface
- WaitGroup is a concurrency control mechanism commonly used in Go application development
- The name WaitGroup
- Can be read as "wait for a goroutine group"
- That is, it waits for a group of goroutines to finish
- For example, when one goroutine needs to wait for several other goroutines to complete
- WaitGroup makes this easy to implement
- The following program shows one goroutine waiting for two other goroutines to finish:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	// Set the counter to the number of goroutines to wait for
	wg.Add(2)
	go func() {
		// Do some work
		time.Sleep(1 * time.Second)
		fmt.Println("Goroutine 1 finished!")
		// When the goroutine finishes, decrement the counter by one
		wg.Done()
	}()
	go func() {
		// Do some work
		time.Sleep(2 * time.Second)
		fmt.Println("Goroutine 2 finished!")
		wg.Done()
	}()
	// The main goroutine blocks until the counter reaches 0
	wg.Wait()
	fmt.Println("All goroutines finished!")
}
- Put simply
- wg maintains an internal counter in the program above:
- Before starting the goroutines, Add(2) sets the counter to the number of goroutines to be started
- After starting them, the main goroutine calls Wait() to block itself until the counter reaches 0
- When each goroutine finishes, it decrements the counter by 1 by calling Done()
- Once the counter reaches 0, the blocked goroutine is woken up
- WaitGroup can also make one group of goroutines wait for another group of goroutines
- That is a bit like acrobatics
- It is very error-prone
- Especially if you do not understand how it is implemented
- In fact, the source code that implements WaitGroup is quite simple
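To make the "one group waiting for another group" case concrete, here is a minimal sketch. The helper runGroups and its variable names are hypothetical and exist only for this illustration: several waiter goroutines all call Wait() on the same WaitGroup, and each one checks that every worker had finished by the time Wait() returned.

```go
package main

import (
	"fmt"
	"sync"
)

// runGroups (hypothetical helper) starts `workers` goroutines that do the work
// and `waiters` goroutines that all block on the same WaitGroup.
// It returns how many waiters saw every worker finished after Wait returned.
func runGroups(workers, waiters int) int {
	var work sync.WaitGroup    // counts unfinished workers
	var waiting sync.WaitGroup // lets the caller wait for the waiters themselves
	var mu sync.Mutex
	finished := 0 // workers that have completed
	observed := 0 // waiters that saw finished == workers

	work.Add(workers) // set the counter before any Wait call can run
	waiting.Add(waiters)

	for i := 0; i < waiters; i++ {
		go func() {
			defer waiting.Done()
			work.Wait() // every waiter blocks here until the counter reaches 0
			mu.Lock()
			if finished == workers {
				observed++
			}
			mu.Unlock()
		}()
	}
	for i := 0; i < workers; i++ {
		go func() {
			mu.Lock()
			finished++
			mu.Unlock()
			work.Done() // decrement only after the work is recorded
		}()
	}

	waiting.Wait()
	return observed
}

func main() {
	fmt.Println("waiters released after all workers:", runGroups(3, 2))
}
```

Note that Add() is called for both groups before any goroutine is started, which is what keeps this pattern from going wrong.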
Basic knowledge
Semaphore
- A semaphore is a mechanism provided by Unix systems to protect shared resources
- It is used to prevent multiple threads from accessing a resource at the same time
- A semaphore can be understood simply as a value:
- When semaphore > 0
- The resource is available, and acquiring the semaphore automatically decrements it by 1
- When semaphore == 0
- The resource is temporarily unavailable, and a thread that tries to acquire the semaphore goes to sleep
- It is woken up when the semaphore becomes positive
- Semaphores are used in the WaitGroup implementation
- Hence this brief introduction
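The semaphore behavior described above can be sketched in ordinary Go. This is not the runtime semaphore that WaitGroup uses internally; the Semaphore type and its methods are hypothetical names, built on sync.Mutex and sync.Cond purely to show the "sleep at 0, wake when positive" rule.

```go
package main

import (
	"fmt"
	"sync"
)

// Semaphore is a hypothetical counting semaphore for illustration only.
type Semaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	value int
}

// NewSemaphore returns a semaphore holding the given initial value.
func NewSemaphore(initial int) *Semaphore {
	s := &Semaphore{value: initial}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire sleeps while the value is 0, then decrements it by 1.
func (s *Semaphore) Acquire() {
	s.mu.Lock()
	for s.value == 0 {
		s.cond.Wait() // releases the lock while sleeping
	}
	s.value--
	s.mu.Unlock()
}

// Release increments the value by 1 and wakes one sleeper, if any.
func (s *Semaphore) Release() {
	s.mu.Lock()
	s.value++
	s.mu.Unlock()
	s.cond.Signal()
}

// Value reports the current value.
func (s *Semaphore) Value() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func main() {
	sem := NewSemaphore(0)
	done := make(chan struct{})
	go func() {
		sem.Acquire() // sleeps until Release makes the value positive
		close(done)
	}()
	sem.Release()
	<-done
	fmt.Println("value after acquire:", sem.Value())
}
```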
WaitGroup
Data structure
- state1, the field inside WaitGroup, is an array of length 3
- It holds the state and a semaphore
- The state is actually two counters:
- counter: the number of goroutines that have not yet finished
- waiter count: the number of goroutines waiting for the group to finish
- That is, how many waiters there are
- semaphore: the semaphore that the waiters block on
- Depending on whether the array is 64-bit aligned
- The three values sit at different positions within it
- For simplicity
- Assume it is aligned
- Then the 64-bit state holds counter in its high 32 bits and waiter in its low 32 bits, and the semaphore occupies the remaining 32 bits
- WaitGroup exposes three methods:
- Add(delta int): adds delta to counter
- Wait(): increments waiter by 1
- And blocks waiting on the semaphore
- Done(): decrements counter by 1
- And, once counter reaches 0, releases the semaphore once for each waiter
- The implementation details of these three methods are described below
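Since counter and waiter share one 64-bit state value, a small sketch of the packing may help before reading the method bodies. The helper names pack, counterOf and waitersOf are hypothetical; the shifts match the ones used in the pseudocode in this article (counter in the high 32 bits, waiter in the low 32 bits).

```go
package main

import "fmt"

// pack combines a counter and a waiter count into one 64-bit state value.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// counterOf extracts the counter from the high 32 bits.
func counterOf(state uint64) int32 { return int32(state >> 32) }

// waitersOf extracts the waiter count from the low 32 bits.
func waitersOf(state uint64) uint32 { return uint32(state) }

func main() {
	state := pack(2, 1)

	// Adding delta<<32 changes only the counter half, even for a negative delta:
	// the unsigned addition wraps around and leaves the low 32 bits untouched.
	delta := -1
	state += uint64(delta) << 32

	fmt.Println(counterOf(state), waitersOf(state)) // prints "1 1"
}
```

Keeping both values in one word is what lets a single atomic operation update the counter and read the waiter count consistently.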
Add(delta int)
- Add() does two things
- First, it adds delta to counter
- Because delta can be negative
- counter may become 0 or negative
- So the second thing happens when counter becomes 0
- It releases the semaphore once for each waiter
- Waking up all the waiting goroutines
- If counter becomes negative, Add() panics
- Pseudocode for Add() is as follows:
func (wg *WaitGroup) Add(delta int) {
	// Get pointers to the state and the semaphore
	statep, semap := wg.state()
	// Shift delta left by 32 bits and add it to state, that is, add it to counter
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// Get the counter value
	v := int32(state >> 32)
	// Get the waiter value
	w := uint32(state)
	// If the counter is negative after the addition, panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// From here on, counter >= 0
	// If counter is positive, there is nothing to release yet, so return
	// If waiter is 0, nobody is waiting and there is nothing to release, so return
	if v > 0 || w == 0 {
		return
	}
	// At this point counter must be 0 and waiter must be greater than 0 (waiter is never negative)
	// Reset the whole state to 0 (which clears waiter), then release the semaphore once per waiter
	*statep = 0
	for ; w != 0; w-- {
		// Each release wakes up one waiting goroutine
		runtime_Semrelease(semap, false)
	}
}
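The negative-counter check in Add() can be observed from ordinary code. The sketch below uses a hypothetical helper, addPanics, that applies a sequence of deltas to a fresh sync.WaitGroup and reports whether any call panicked.

```go
package main

import (
	"fmt"
	"sync"
)

// addPanics (hypothetical helper) applies each delta to a fresh WaitGroup
// and returns the recovered panic value, or nil if no call panicked.
func addPanics(deltas ...int) (recovered interface{}) {
	defer func() { recovered = recover() }()
	var wg sync.WaitGroup
	for _, d := range deltas {
		wg.Add(d)
	}
	return nil
}

func main() {
	// Counter goes 2, 1, 0: never negative, so no panic.
	fmt.Println(addPanics(2, -1, -1))
	// Counter would go 1, -1: Add panics on the second call.
	fmt.Println(addPanics(1, -2))
}
```

In practice this panic usually means Done() was called more times than Add() accounted for.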
Wait()
- Wait() also does two things
- First, it increments waiter
- Second, it blocks waiting on the semaphore
func (wg *WaitGroup) Wait() {
	// Get pointers to the state and the semaphore
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep) // Get the state value
		v := int32(state >> 32)            // Get the counter value
		w := uint32(state)                 // Get the waiter value (unused in this simplified version)
		// If counter is 0, all goroutines have finished: no need to wait, return directly
		if v == 0 {
			return
		}
		// Use CAS (compare-and-swap) to increment waiter. The CAS may fail; if so, retry via the for loop
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// waiter was incremented; sleep until the semaphore wakes this goroutine
			runtime_Semacquire(semap)
			return
		}
	}
}
- A CAS (compare-and-swap) loop is used here
- It guarantees that even when multiple goroutines call Wait() at the same time
- waiter is still incremented correctly
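The load, compare-and-swap, retry pattern that Wait() relies on can be tried in isolation. This is a minimal sketch with hypothetical helper names (casIncrement, countConcurrently); it only demonstrates that no increment is lost when many goroutines race on the same word.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// casIncrement adds 1 to *addr using a load / compare-and-swap retry loop.
func casIncrement(addr *uint64) {
	for {
		old := atomic.LoadUint64(addr)
		if atomic.CompareAndSwapUint64(addr, old, old+1) {
			return
		}
		// Another goroutine changed *addr between the load and the swap; retry.
	}
}

// countConcurrently runs n goroutines that each increment a shared value once.
func countConcurrently(n int) uint64 {
	var state uint64
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			casIncrement(&state)
		}()
	}
	wg.Wait()
	return atomic.LoadUint64(&state)
}

func main() {
	fmt.Println(countConcurrently(100)) // prints 100
}
```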
Done()
- Done() does only one thing
- It decrements counter by 1
- Since Add() accepts negative values
- Done() simply calls Add(-1)
- So the execution logic of Done() is that of Add()
- In effect, the last goroutine to finish is the one that wakes up the waiters
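To tie Add(), Wait() and Done() together, here is a deliberately simplified stand-in. toyGroup is a hypothetical type that uses a mutex and a condition variable rather than the packed state and runtime semaphore of the real WaitGroup, so it is only a sketch of the logic: Done() is Add(-1), and the call that brings the counter to 0 wakes every waiter.

```go
package main

import (
	"fmt"
	"sync"
)

// toyGroup is a hypothetical, simplified stand-in for WaitGroup.
type toyGroup struct {
	mu      sync.Mutex
	cond    *sync.Cond
	counter int
}

func newToyGroup() *toyGroup {
	g := &toyGroup{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Add adjusts the counter; the call that brings it to 0 wakes all waiters.
func (g *toyGroup) Add(delta int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.counter += delta
	if g.counter < 0 {
		panic("toyGroup: negative counter")
	}
	if g.counter == 0 {
		g.cond.Broadcast()
	}
}

// Done is nothing more than Add(-1).
func (g *toyGroup) Done() { g.Add(-1) }

// Wait blocks until the counter is 0.
func (g *toyGroup) Wait() {
	g.mu.Lock()
	for g.counter > 0 {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// runToy starts n goroutines, waits for them, and returns how many ran.
func runToy(n int) int {
	g := newToyGroup()
	results := make([]int, n)
	g.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer g.Done()
			results[i] = 1
		}(i)
	}
	g.Wait()
	sum := 0
	for _, r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(runToy(5)) // prints 5
}
```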
Programming Tips
- Add() must be called before Wait()
- The total passed to Add() must match the number of goroutines actually being waited for
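Both tips can be followed with a common pattern, sketched below with a hypothetical function squareAll: call Add(1) in the launching goroutine right before each go statement, and pair it with exactly one deferred Done() inside the goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (hypothetical example) squares each number in its own goroutine.
func squareAll(nums []int) []int {
	out := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1) // runs in the launching goroutine, before `go` and before Wait
		go func(i, n int) {
			defer wg.Done() // exactly one Done per Add(1), even on an early return
			out[i] = n * n
		}(i, n)
	}
	wg.Wait() // every Add has already happened by the time Wait is called
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // prints [1 4 9]
}
```

Calling Add(1) inside the new goroutine instead would be a mistake: Wait() could run before any Add() and return immediately.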