Go concurrent programming - Mutex source code implementation

Mutex: how to solve the problem of concurrent access to resources

Multiple threads access a shared resource under mutual exclusion, which essentially means locking shared memory. (Across multiple processes an in-process mutex is not enough; that case calls for a cross-process or distributed lock.)
Usage scenarios for synchronization primitives:

  • Shared resources. Reading and writing a shared resource concurrently causes data races, so concurrency primitives such as Mutex and RWMutex are needed to protect it.
  • Task scheduling. Goroutines must run according to certain rules, with some goroutines waiting for or depending on others. WaitGroup or Channel is commonly used for this.
  • Messaging. Channel is commonly used for thread-safe data exchange between goroutines.
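As a minimal sketch of the first two scenarios, the following program protects a shared counter with sync.Mutex and uses sync.WaitGroup to wait for the workers. The function name countWithMutex and the worker counts are illustrative assumptions, not part of the original article.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithMutex (hypothetical helper) starts `workers` goroutines that each
// increment a shared counter `perWorker` times. The mutex guards the counter;
// the WaitGroup makes the caller wait until every goroutine has finished.
func countWithMutex(workers, perWorker int) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				mu.Lock()
				counter++ // only one goroutine at a time executes this line
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithMutex(10, 1000)) // prints 10000
}
```

Without the Lock()/Unlock() pair the increments would race, and the final value would usually be lower than expected.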

Mutex: concrete implementation

In the first version, CAS is used to increase the key value by one on every Lock() and decrease it by one on every Unlock(). Unlock() does not check which goroutine is calling it, so any goroutine is allowed to release the lock, and this behavior has been retained to this day. When the lock is released and a goroutine is waiting, that goroutine is woken up and gets the lock directly. In other words, the woken waiter gets the lock before any newcomer.
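The rule that any goroutine may release the lock still holds for today's sync.Mutex. A small sketch, in which the helper name handOff is an assumption made for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// handOff (hypothetical helper) locks mu in the calling goroutine and lets a
// second goroutine release it. It returns true once the caller has acquired
// the lock a second time.
func handOff() bool {
	var mu sync.Mutex
	mu.Lock()
	go func() {
		mu.Unlock() // released by a goroutine that never called Lock
	}()
	mu.Lock() // blocks until the other goroutine has unlocked
	defer mu.Unlock()
	return true
}

func main() {
	fmt.Println(handOff()) // prints true
}
```

Because the mutex does not record an owner, the caller is responsible for making sure that only the logical holder calls Unlock().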

// CAS operation, atomic package was not abstracted at that time
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)

// The structure of a mutex, which contains two fields
type Mutex struct {
    key  int32 // Identification of whether the lock is held
    sema int32 // Semaphore dedicated to block / wake goroutine
}

// Ensure that the delta value is successfully added to val
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val
        if cas(val, v, v+delta) {
            return v + delta
        }
    }
    panic("unreached")
}

// Request lock
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 { // Increment key; a result of 1 means the lock was free and is now held
        return
    }
    semacquire(&m.sema) // Otherwise, block and wait
}

func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 { // Decrement key; a result of 0 means there are no waiters
        return
    }
    semrelease(&m.sema) // Wake up one blocked goroutine
}

The xadd() function uses CAS internally to make the update atomic: when several goroutines call Lock() at the same time, no increment of key is lost or applied twice. Every call to Lock() adds one to key. If the result is 1, this goroutine is the only one requesting the lock, so Lock() returns immediately and the lock is held. If the result is not 1, another goroutine already holds the lock, and the caller blocks on the semaphore, which places it in a FIFO queue. When the lock is released and key minus one is not 0, other goroutines are blocked waiting for the lock. Unlock() then uses the semaphore to wake the goroutine at the head of the FIFO queue; that goroutine returns from semacquire(), its Lock() call returns, and it now holds the lock.
So in this simple design, all goroutines that request the lock while it is held are put into a FIFO queue, and each release hands the lock to the goroutine at the head of the queue.
Note that the lock word must be updated with an atomic CAS. If two goroutines execute cas(&m.key, 0, 1) concurrently, only one succeeds and the other fails, which is why xadd() retries the CAS in a for loop.
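The first-version design can be re-created as a runnable sketch with today's sync/atomic package. The type firstMutex, the constructor newFirstMutex and the helper countWithFirstMutex are hypothetical names, and a buffered channel stands in for the runtime semaphore, which is an assumption: the real semacquire/semrelease are runtime internals, and the Go specification does not promise FIFO wake-up order for channel receivers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// firstMutex is a hypothetical re-creation of the first-version design.
type firstMutex struct {
	key  int32         // number of goroutines holding or waiting for the lock
	sema chan struct{} // stand-in for the runtime semaphore (assumption)
}

// newFirstMutex creates the lock. Capacity 1 is enough: under correct use at
// most one wake-up token is outstanding at any time.
func newFirstMutex() *firstMutex {
	return &firstMutex{sema: make(chan struct{}, 1)}
}

// xadd retries the CAS until it succeeds, so no concurrent update is lost.
func xadd(val *int32, delta int32) int32 {
	for {
		v := atomic.LoadInt32(val)
		if atomic.CompareAndSwapInt32(val, v, v+delta) {
			return v + delta
		}
	}
}

func (m *firstMutex) Lock() {
	if xadd(&m.key, 1) == 1 {
		return // key went 0 -> 1: the lock was free
	}
	<-m.sema // otherwise wait for a wake-up token
}

func (m *firstMutex) Unlock() {
	if xadd(&m.key, -1) == 0 {
		return // nobody is waiting
	}
	m.sema <- struct{}{} // hand the lock to one waiter
}

// countWithFirstMutex increments a shared counter under the sketch lock.
func countWithFirstMutex(workers, perWorker int) int {
	m := newFirstMutex()
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				m.Lock()
				counter++
				m.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithFirstMutex(8, 1000)) // prints 8000
}
```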

In the second version, to give new goroutines a chance, a released lock is no longer handed to the goroutine at the head of the FIFO queue. Instead, the woken goroutine competes with newly arriving goroutines, so a newcomer has a chance to get the lock.

type Mutex struct {
    state int32
    sema  uint32
}


const (
    mutexLocked = 1 << iota // 1 << 0 = 1: the lowest bit, set while the mutex is locked
    mutexWoken              // 1 << 1 = 2: the second-lowest bit, set while a woken goroutine exists
    mutexWaiterShift = iota // iota is 2 here: the waiter count is stored from bit 2 upward
)


func (m *Mutex) Lock() {
    // Fast path: Lucky case, which can directly obtain the lock
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false
    for {
        old := m.state
        new := old | mutexLocked // New state: set the lock bit
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift // Lock is held: increment the waiter count
        }
        if awoke {
            // This goroutine was woken up,
            // so clear the woken flag in the new state
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) { // Install the new state
            if old&mutexLocked == 0 { // The lock was free before the CAS, so it is now held
                break
            }
            runtime.Semacquire(&m.sema) // Block on the semaphore
            awoke = true
        }
    }
}


func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked) // Clear the lock bit
    if (new+mutexLocked)&mutexLocked == 0 { // The lock bit was not set: the mutex was not locked
        panic("sync: unlock of unlocked mutex")
    }

    old := new
    for {
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // No waiters, or a goroutine has already been woken, or the lock has been taken again
            return
        }
        new = (old - 1<<mutexWaiterShift) | mutexWoken // New state: one waiter fewer, with the woken flag set
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime.Semrelease(&m.sema)
            return
        }
        old = m.state
    }
}

This version still updates state when Lock() is called, but the update is no longer a simple increment: state is a bit field in which each part has its own meaning. The way success is detected also differs. In the first version, a result of 1 after the increment showed that the lock had been free, and Lock() returned. Here, Lock() explicitly checks that the lock bit was clear before the CAS, and if the CAS then succeeds it returns directly with the lock held. The behavior after sleeping differs as well. In the first version no statement followed the semaphore wait, so returning from it meant the goroutine held the lock. Here, a woken goroutine goes around the loop again instead of returning, so it does not get the lock automatically. If new goroutines call Lock() at that moment, they compete with it, and whichever CAS succeeds gets the lock. This is what "giving newcomers a chance" means.
Some points to note about the state field:
  • The lowest bit is the lock bit. If Lock() is called while it is 1, the caller is certain to go to sleep once its CAS succeeds, just as a caller in the first version slept when key was not 1 after the increment.
  • The second-lowest bit is the woken bit. It indicates that a sleeping goroutine has been woken up to compete for the lock. It is set only by the goroutine executing Unlock(), under certain conditions, and Unlock() wakes a goroutine only after setting it.
  • The remaining high bits hold the number of goroutines waiting for the lock. The count increases when a goroutine blocks in Lock() and decreases when Unlock() wakes one.
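The state transitions of the second version can be traced without any concurrency. The sketch below models one successful CAS of the Lock() loop and one Unlock() as pure functions; lockStep, unlockStep and describe are hypothetical helpers written for this illustration, and CAS retries are deliberately left out.

```go
package main

import "fmt"

const (
	mutexLocked      = 1 << iota // 1: lock bit
	mutexWoken                   // 2: woken bit
	mutexWaiterShift = iota      // 2: the waiter count starts at bit 2
)

// lockStep computes the state that one iteration of the Lock loop installs,
// and whether a successful CAS means the lock was acquired.
func lockStep(old int32, awoke bool) (next int32, acquired bool) {
	next = old | mutexLocked
	if old&mutexLocked != 0 {
		next = old + 1<<mutexWaiterShift // lock is held: join the waiters
	}
	if awoke {
		next &^= mutexWoken // a woken goroutine clears the woken bit
	}
	return next, old&mutexLocked == 0
}

// unlockStep computes the state after Unlock and whether a waiter is woken.
// It assumes the lock bit is set in old.
func unlockStep(old int32) (next int32, wake bool) {
	next = old - mutexLocked
	if next>>mutexWaiterShift == 0 || next&(mutexLocked|mutexWoken) != 0 {
		return next, false
	}
	return (next - 1<<mutexWaiterShift) | mutexWoken, true
}

// describe decodes the three parts of the state field.
func describe(state int32) string {
	return fmt.Sprintf("locked=%t woken=%t waiters=%d",
		state&mutexLocked != 0, state&mutexWoken != 0, state>>mutexWaiterShift)
}

func main() {
	s, _ := lockStep(0, false) // first goroutine takes the free lock
	fmt.Println(describe(s))   // locked=true woken=false waiters=0
	s, _ = lockStep(s, false)  // second goroutine has to queue
	fmt.Println(describe(s))   // locked=true woken=false waiters=1
	s, _ = unlockStep(s)       // holder unlocks and wakes the waiter
	fmt.Println(describe(s))   // locked=false woken=true waiters=0
	s, _ = lockStep(s, true)   // woken goroutine retries and wins
	fmt.Println(describe(s))   // locked=true woken=false waiters=0
}
```

Between the third and fourth steps the lock bit is clear, which is exactly the window in which a newcomer's CAS can win instead of the woken goroutine.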

To summarize, the state field is interpreted bit by bit. The lowest bit, mutexLocked, indicates whether the Mutex is currently locked. The second-lowest bit, mutexWoken, indicates whether a goroutine has been woken up and is about to try to acquire the lock. The remaining high bits hold the number of goroutines currently waiting to acquire the lock.
The basic idea is: when several goroutines call Lock() at the same time and nobody holds the lock, each calls the atomic CompareAndSwapInt32() to change state from 0 to mutexLocked (that is, 1). The goroutine whose CAS succeeds has acquired the lock and returns immediately without blocking.
Every other goroutine computes a new state value from the current state field and decides whether it must sleep and wait.
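The fast path can be observed in isolation. In this sketch several goroutines race to change a state word from 0 to mutexLocked, and exactly one CAS succeeds because nobody ever resets the word. The helper raceForLock is a hypothetical name, and the slow path is intentionally omitted.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const mutexLocked = 1

// raceForLock (hypothetical helper) lets n goroutines attempt the fast-path
// CAS on the same state word and returns how many of them succeeded.
func raceForLock(n int) int32 {
	var (
		state   int32
		winners int32
		wg      sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Succeeds only for the goroutine that still sees state == 0.
			if atomic.CompareAndSwapInt32(&state, 0, mutexLocked) {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(raceForLock(100)) // prints 1
}
```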

In the third version, priority is given to goroutines that are already running on a CPU, which reduces goroutine context switches. When a goroutine tries to get the lock, it first spins a limited number of times (it keeps occupying the CPU while spinning). If another goroutine releases the lock during that time, the spinning goroutine takes it at once, so performance is better. That is, "give newcomers more opportunities": in most cases it is a newly arrived goroutine that gets the lock.
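The idea of spinning before giving up the CPU can be sketched as follows. This is not how sync.Mutex is written: the real implementation decides whether to spin inside the runtime and then parks on a semaphore. Here maxSpins is an arbitrary assumed value, spinLock and countWithSpinLock are hypothetical names, and runtime.Gosched() stands in for going to sleep.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// maxSpins is an assumed bound chosen for illustration only.
const maxSpins = 4

// spinLock is a hypothetical lock that spins briefly before yielding.
type spinLock struct {
	state int32 // 0 = free, 1 = locked
}

func (l *spinLock) Lock() {
	for {
		// Spin phase: stay on the CPU and retry the CAS a few times.
		for i := 0; i < maxSpins; i++ {
			if atomic.CompareAndSwapInt32(&l.state, 0, 1) {
				return
			}
		}
		// Spinning did not help: let other goroutines run, then try again.
		runtime.Gosched()
	}
}

func (l *spinLock) Unlock() {
	atomic.StoreInt32(&l.state, 0)
}

// countWithSpinLock increments a shared counter under the sketch lock.
func countWithSpinLock(workers, perWorker int) int {
	var (
		l       spinLock
		wg      sync.WaitGroup
		counter int
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				l.Lock()
				counter++
				l.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithSpinLock(8, 1000)) // prints 8000
}
```

Spinning pays off only when critical sections are short; a goroutine that spins on a long-held lock burns CPU without making progress, which is why the number of spins is bounded.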

The fourth version prevents a sleeping goroutine from failing to get the lock indefinitely, which could happen in the third version. It puts a 1ms limit on how long a waiter may go without the lock; once a waiter exceeds that limit, the Mutex enters starvation mode. In starvation mode, the lock is handed directly to the goroutine at the head of the FIFO queue. Newly arriving goroutines neither spin nor try to grab the lock and are placed at the tail of the FIFO queue.

Keywords: Go

Added by Funk001 on Wed, 24 Nov 2021 10:44:39 +0200