[Go advanced concurrent programming] WaitGroup

WaitGroup is a synchronization primitive that is used very often in day-to-day development. It lets a goroutine wait until a group of other goroutines has finished.
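As a minimal sketch of typical usage (the helper name sumSquares is made up for this illustration), each task is registered with Add, signals completion with Done, and the caller blocks in Wait:

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares is a hypothetical helper: it squares each input in its own
// goroutine and waits for all of them before adding up the results.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make([]int, len(nums))

	for i, n := range nums {
		wg.Add(1) // register the task before starting its goroutine
		go func(i, n int) {
			defer wg.Done() // mark the task as finished
			results[i] = n * n
		}(i, n)
	}

	wg.Wait() // block until every goroutine has called Done

	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 1 + 4 + 9 + 16
}
```

Each goroutine writes to its own slice element, so no extra locking is needed for the results.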

Implementation principle

Data structure

The WaitGroup data structure consists of a noCopy auxiliary field and a state1 array that records the state of the WaitGroup. This is the layout in the Go source the article is based on; newer Go releases have reorganized these fields.

  • noCopy, an auxiliary field;
  • state1, a field with a compound meaning: it holds the WaitGroup count, the number of waiters blocked at the checkpoint, and the semaphore.

type WaitGroup struct {
    // Copy guard: lets the vet tool report code that copies a WaitGroup
    noCopy noCopy
    // 64 bits (8 bytes) hold the state in two segments: the high 32 bits are the count value and the low 32 bits are the number of waiters
    // The remaining 32 bits are used as the semaphore
    // 64-bit atomic operations require 64-bit alignment, which 32-bit compilers do not guarantee, so which array elements hold the state differs between architectures. See the state method below for the details
    // In short, the aligned 64 bits are used as the state and the remaining 32 bits as the semaphore
    state1 [3]uint32
}

// state returns the address of the state word and the address of the semaphore
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // The address is 64-bit aligned: the first two elements of the array form the state and the last element is the semaphore
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    } else {
        // The address is only 32-bit aligned: the last two elements of the array, which are 64-bit aligned and therefore usable for 64-bit atomic operations, form the state, and the first element is the semaphore
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    }
}

When state1 is 64-bit aligned (the usual case on a 64-bit little-endian platform), the first element of state1 is the number of waiters, the second element is the WaitGroup count, and the third element is the semaphore.
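To make the packing concrete, here is a small sketch of how a count and a waiter count share one 64-bit word. The function names pack and unpack are illustrative only and do not exist in the sync package:

```go
package main

import "fmt"

// pack combines a task counter and a waiter count into one 64-bit word:
// counter in the high 32 bits, waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits the word again, using the same shifts and conversions
// that Add and Wait apply to the loaded state.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	state := pack(3, 2)
	v, w := unpack(state)
	fmt.Printf("state=%#x counter=%d waiters=%d\n", state, v, w)
}
```

Keeping both values in one word is what allows a single atomic operation to read or update them together.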

noCopy: assisting the vet check

The noCopy field tells the vet tool that this data structure must not be copied by value. More precisely, it must not be copied after its first use.

vet statically checks data types that implement the Locker interface and issues a warning whenever a value of such a type is copied. WaitGroup itself does not implement Locker, so it gets the same protection by including a noCopy field, whose type implements the Lock and Unlock methods. Because the noCopy field is unexported and is not embedded, WaitGroup does not expose Lock/Unlock methods of its own.

If you want vet to report copies of a data structure you define yourself, you can use the same trick: sync.noCopy is unexported, so define your own noCopy type and add a field of that type to your struct.
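A minimal sketch of that trick follows. The Counter type and its methods are hypothetical, and the local noCopy type is a re-declaration, because the one in the sync package is unexported:

```go
package main

import "fmt"

// noCopy is a local copy guard. It has Lock and Unlock methods on its
// pointer receiver, which is what vet's copylocks check looks for.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Counter is a hypothetical type that must not be copied after first use.
type Counter struct {
	noCopy noCopy // named, unexported field: Lock/Unlock are not promoted
	n      int
}

func (c *Counter) Inc()       { c.n++ }
func (c *Counter) Value() int { return c.n }

func main() {
	var c Counter
	c.Inc()
	c.Inc()
	fmt.Println(c.Value())

	// Copying c (for example assigning it to another variable or passing
	// it by value) still compiles, but `go vet` reports it as copying a
	// lock value.
}
```

The guard costs nothing at run time: noCopy is an empty struct and its methods are never called.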

Methods

Add & Done

The Add method mainly operates on the count part of state. With the race-detection and sanity-check code removed, its implementation is as follows:

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // The high 32bit is the count value v, so move the delta left by 32 and increase it to the count
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // Current count value
    w := uint32(state) // waiter count
    
    if v > 0 || w == 0 {
        return
    }
    
    // Reaching this point means the count value v is 0 and the number of waiters w is not 0, so the value of state equals the number of waiters
    // Reset the number of waiters to 0. Because v is also 0, the combined *statep can simply be set to 0. Then wake up all waiters
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

// The Done method is actually a counter minus one
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}
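The expression uint64(delta)<<32 also handles negative deltas, which is why Done can be written as Add(-1). The sketch below reproduces that arithmetic without atomics; applyDelta is a made-up name for this illustration:

```go
package main

import "fmt"

// applyDelta mimics how Add folds delta into the packed state word.
// delta is shifted into the high 32 bits (the counter); unsigned
// wrap-around makes a negative delta behave like a subtraction while
// leaving the low 32 bits (the waiter count) untouched.
func applyDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	state := uint64(3)<<32 | 2 // counter = 3, waiters = 2

	state = applyDelta(state, -1) // what Done does
	fmt.Println(int32(state>>32), uint32(state)) // prints "2 2"
}
```

If the counter were already 0, the same arithmetic would produce a high half that reads as -1 when converted to int32, which is the condition the real Add method checks for before panicking.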

Wait

The Wait method repeatedly checks the value of state. If the count value is 0, all tasks have completed, so the caller does not need to wait and returns immediately. If the count value is greater than 0, some tasks are still unfinished, so the caller becomes a waiter: it adds itself to the waiter count and blocks.

The main implementation code is as follows:

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // Current count value
        w := uint32(state) // Number of waiters
        if v == 0 {
            // The count value is 0, so the goroutine calling this method does not have to wait and can continue with the code that follows
            return
        }
        // Otherwise, add 1 to the number of waiters. Other goroutines may call Wait concurrently, so the increment may fail; that is why the whole body is wrapped in a for loop
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // Blocking sleep wait
            runtime_Semacquire(semap)
            // Wake up, no longer blocked, return
            return
        }
    }
}
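The retry loop around CompareAndSwapUint64 can be tried in isolation. The sketch below is not the WaitGroup code itself: registerWaiter and registerConcurrently are hypothetical helpers that only reproduce the load, compare-and-swap, retry pattern on a plain uint64:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// registerWaiter adds 1 to the low 32 bits of *statep using a
// compare-and-swap retry loop, the same pattern Wait uses.
func registerWaiter(statep *uint64) {
	for {
		old := atomic.LoadUint64(statep)
		if atomic.CompareAndSwapUint64(statep, old, old+1) {
			return
		}
		// Another goroutine changed the state in between: reload and retry.
	}
}

// registerConcurrently runs n goroutines that each register once and
// returns the resulting waiter count.
func registerConcurrently(n int) uint32 {
	var state uint64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			registerWaiter(&state)
		}()
	}
	wg.Wait()
	return uint32(atomic.LoadUint64(&state))
}

func main() {
	fmt.Println(registerConcurrently(100))
}
```

Because a failed swap is simply retried against the freshly loaded value, no increment is lost even when many goroutines register at the same time.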

Common errors

The counter is set to a negative value

The counter of a WaitGroup must always be greater than or equal to 0. Whenever the count is changed, WaitGroup checks the new value, and a negative count causes a panic.

In general, the counter can become negative in two ways:

  1. A negative number is passed to Add. If the counter is still greater than or equal to 0 after the negative number has been added, there is no problem; otherwise the call panics.
  2. Done is called too many times, more often than the count value of the WaitGroup allows.
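The second case can be reproduced in a few lines. In this sketch, extraDone is a hypothetical helper that calls Done once too often and uses recover to capture the panic value instead of crashing the program:

```go
package main

import (
	"fmt"
	"sync"
)

// extraDone calls Done one more time than Add allows and returns the
// panic value it recovered, or nil if no panic occurred.
func extraDone() (recovered interface{}) {
	defer func() { recovered = recover() }()

	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // the counter would drop below 0, so this call panics
	return nil
}

func main() {
	fmt.Println("recovered:", extraDone())
}
```

In real code the fix is to pair every Add(1) with exactly one Done, typically with defer wg.Done() at the top of the goroutine, rather than to recover from the panic.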

Add timing error

When using WaitGroup, follow one rule: call Wait only after all the Add calls have been made. Otherwise Wait may see a count of 0 and return before the tasks have even been registered, which leads to a panic or to unexpected results.
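A sketch of the safe pattern, assuming a hypothetical helper runTasks: Add is called in the launching goroutine before any worker starts, never inside the worker itself.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runTasks starts n goroutines and returns how many of them had finished
// by the time Wait returned.
func runTasks(n int) int64 {
	var wg sync.WaitGroup
	var finished int64

	wg.Add(n) // every Add happens before the goroutines start and before Wait
	for i := 0; i < n; i++ {
		go func() {
			// Calling wg.Add(1) here instead would be a timing error:
			// Wait could run before this goroutine is scheduled.
			defer wg.Done()
			atomic.AddInt64(&finished, 1)
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runTasks(50))
}
```

With Add placed before the go statements, Wait cannot return until all n goroutines have called Done.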

Reuse WaitGroup before the previous Wait is over

Once the count value of a WaitGroup has returned to zero, it can be treated as a newly created WaitGroup and reused. However, if we reuse it before the previous Wait call has returned, the program may panic. Take the following example: the count value of the WaitGroup is initially set to 1, and a goroutine is started that first calls Done and then calls Add. That Add call may execute concurrently with the Wait call in the main goroutine.

package main

import (
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        time.Sleep(time.Millisecond)
        wg.Done() // Counter minus 1
        wg.Add(1) // Counter plus 1
    }()
    wg.Wait() // The main goroutine waits; this may run concurrently with the wg.Add(1) call in the goroutine above
}

In this example, the wg.Done() call brings the count of the WaitGroup back to 0. However, the main goroutine is waiting in wg.Wait(), and if that waiter, right after being woken up, races with the wg.Add(1) call that follows Done, a panic occurs.

Although a WaitGroup can be reused, there is a precondition: the next round of Add/Wait may start only after the previous round's Wait has returned. If Add for the next round is called before that Wait has returned, a panic may occur.
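One way to respect this precondition is to let the same goroutine call both Add and Wait, round after round. The following is a sketch under that assumption; runRounds is a hypothetical helper:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runRounds reuses a single WaitGroup for several rounds of workers and
// returns how many workers finished in each round.
func runRounds(rounds, workers int) []int64 {
	var wg sync.WaitGroup
	finished := make([]int64, rounds)

	for r := 0; r < rounds; r++ {
		wg.Add(workers) // safe: the previous round's Wait has already returned
		for i := 0; i < workers; i++ {
			go func(round int) {
				defer wg.Done()
				atomic.AddInt64(&finished[round], 1)
			}(r)
		}
		wg.Wait() // the round ends here; only afterwards does the next Add run
	}
	return finished
}

func main() {
	fmt.Println(runRounds(3, 4))
}
```

Because Add and Wait are issued sequentially from one goroutine, the next round's Add can never overlap with a Wait that is still returning.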

Keywords: Go

Added by jimmayhugh on Thu, 03 Mar 2022 10:46:48 +0200