go concurrency control - WaitGroup

WaitGroup

preface

  • WaitGroup is a concurrency control technology often used in the process of Golang application development
  • WaitGroup
    • It can be understood as wait goroutine group
    • That is, wait for a group of goroutine s to end
  • For example, a goroutine needs to wait for other goroutines to complete
    • Then it can be easily implemented using WaitGroup
  • The following program shows an example of a goroutine waiting for the other two goroutines to finish:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	// Set the counter. The value is the number of goroutine s
	wg.Add(2)
	go func() {
		// Do some work
		time.Sleep(1 * time.Second)

		fmt.Println("Goroutine 1 finished!")
		// After goroutine execution, the counter is decremented by one
		wg.Done()
	}()

	go func() {
		// Do some work
		time.Sleep(2 * time.Second)

		fmt.Println("Goroutine 2 finished!")
		wg.Done()
	}()

	// The main goroutine blocking wait counter changes to 0
	wg.Wait()
	fmt.Printf("All Goroutine finished!")
}
  • Simply put
  • A counter is wg internally maintained in the above program:
    • Before starting goroutine, set the counter to the number of goroutines to be started through Add(2)
    • After starting goroutine, use the Wait() method to block yourself and wait for the counter to change to 0
    • At the end of each goroutine execution, the counter is decremented by 1 through the Done() method
    • After the counter becomes 0, the blocked goroutine is awakened
  • In fact, WaitGroup can also implement one group of goroutines waiting for another group of goroutines
    • It's kind of like acrobatics
      • Very error tolerant
      • This is especially true if you do not understand its implementation principle
  • In fact, the implementation source code of WaitGroup is very simple

Basic knowledge

Semaphore

  • Semaphore is a mechanism provided by Unix system to protect shared resources
    • Used to prevent multiple threads from accessing a resource at the same time
  • It can be simply understood that the semaphore is a value:
    • When semaphore > 0
      • Indicates that resources are available, and the system automatically reduces the semaphore by 1 when acquiring the semaphore
    • When semaphore = = 0
      • Indicates that the resource is temporarily unavailable. When obtaining the semaphore, the current thread will go to sleep
      • Wakes up when the semaphore is positive
  • Because semaphores are also used in WaitGroup implementation
    • Here is a brief introduction

WaitGroup

data structure

  • state1 is an array of length 3
    • It contains state and a semaphore
    • state is actually two counters:
      • Counter: the goroutine counter that has not been executed yet
      • waiter count: the number of goroutines waiting for the goroutine group to end
        • That is, how many people are waiting
      • Semaphore: semaphore
  • Consider whether the bytes are aligned
    • The three appear in different positions
    • for simplicity
      • By byte aligned
      • The locations of the three in memory are as follows:

  • WaitGroup provides three external interfaces:
    • Add(delta int): add the delta value to the counter
    • Wait(): wait is incremented by 1
      • And block the waiting semaphore semaphore
    • Done(): counter decrements by 1
      • Release the semaphore for corresponding times according to the waiter value
  • The implementation details of these three functions are described below

Add(delta int)

  • Add() does two things
    • First, add the delta value to the counter
      • Because delta can be negative
      • In other words, counter may become 0 or negative
    • So the second thing is when the counter value becomes 0
      • Release the same amount of semaphore according to the waiter value
      • Wake up all the waiting goroutine s
    • If counter becomes negative
      • panic
  • Add() pseudo code is as follows:
func (wg *WaitGroup) Add(delta int) {
	//Get state and semaphore address pointers
	statep, semap := wg.state()

	//Shift delta left by 32 bits and add it to state, that is, add it to counter
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	//Get counter value
	v := int32(state >> 32)
	//Get waiter value
	w := uint32(state)
	//After accumulation, the counter value becomes a negative value, panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	 }

	 //After accumulation, at this time, counter > = 0
	 //If the counter is positive, it means there is no need to release the semaphore and exit directly
	 //If the waiter is zero, it means that there is no waiting person and there is no need to release the semaphore. Exit directly
	 if v > 0 || w == 0 {
	 return
	}
	//At this time, the counter must be equal to 0, and the water must be greater than 0 (the internal maintenance of water will not be less than 0),
	//First set the counter to 0, and then release the semaphore of the number of waiter s
	*statep = 0
	for ; w != 0; w-- {
		//Release the semaphore, release one at a time, and wake up one waiting person
		runtime_Semrelease(semap, false)
	}
}

wait()

  • The Wait() method also does two things
    • One is to accumulate waiter
    • The second is blocking waiting semaphore
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state() //Get state and semaphore address pointers
	for {
		state := atomic.LoadUint64(statep) //Get state value
		v := int32(state >> 32) //Get counter value
		w := uint32(state) //Get waiter value
		if v == 0 { //If the counter value is 0, it means that all goroutine s have exited. There is no need to wait and return directly
			return
		}

		// Use CAS (comparison and exchange algorithm) to accumulate waiter s. Accumulation may fail. After failure, try again through for loop
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap) //After successful accumulation, wait for the semaphore to wake you up
			return
		}
	}
}
  • CAS algorithm is used here
    • When multiple goroutine s are guaranteed to execute Wait() at the same time
    • It can also correctly accumulate waiter s

Done()

  • Done() does only one thing
    • That is, subtract counter by 1
  • We know that Add() can accept negative values
    • So Done actually just called Add(-1)

  • The execution logic of Done() goes to Add()
  • In fact, it is the last goroutine that wakes up the waiting person

Programming Tips

  • Add() operation must be earlier than Wait()
    • Otherwise it will panic
  • The value set by Add() must be consistent with the number of goroutine s actually waiting
    • Otherwise it will panic

Keywords: Go Back-end

Added by darkhappy on Tue, 14 Dec 2021 08:33:34 +0200