[Go advanced concurrent programming] Mutex

A mutex is the most basic means of controlling access to critical resources in a concurrent program. sync.Mutex is the native mutual-exclusion lock implementation in Go.

Data structure

The data structure of Mutex is defined in the source file src/sync/mutex.go:

type Mutex struct {
    state int32
    sema  uint32
}

The state field holds the status of the mutex as a 32-bit integer. Internally, this variable is split into four parts that record four pieces of state:

  • Locked: whether the mutex is locked;
  • Woken: whether a goroutine has been woken up in normal mode;
  • Starving: whether the mutex is in starvation mode;
  • Waiter: the number of goroutines blocked waiting on the mutex.

The sema field is a semaphore. A goroutine that fails to acquire the lock blocks waiting on this semaphore, and the goroutine that releases the lock releases the semaphore to wake up one of the waiting goroutines.
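
For reference, these flag bits and the waiter shift are defined in sync/mutex.go roughly as follows (excerpted and lightly commented; starvationThresholdNs is the 1ms threshold discussed below):

const (
    mutexLocked = 1 << iota // the mutex is locked
    mutexWoken              // a goroutine has been woken up
    mutexStarving           // the mutex is in starvation mode
    mutexWaiterShift = iota // the waiter count occupies the remaining high bits

    starvationThresholdNs = 1e6 // 1ms: waiting longer than this switches to starvation mode
)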

Normal mode and starvation mode

Mutex has two modes: normal mode and starvation mode. Starvation mode is an optimization introduced in Go 1.9 to ensure fairness and keep waiting goroutines from starving. By default a Mutex is in normal mode.

In normal mode, a goroutine that fails to acquire the lock does not immediately join the waiting queue; it first checks whether the conditions for spinning are met, and spins if they are.

When the goroutine holding the lock releases it, it releases the semaphore to wake up one goroutine in the waiting queue. However, if another goroutine is spinning at that moment, the lock is usually snatched by the spinning goroutine, and the woken goroutine has to block again. Before blocking, it checks how long it has been waiting since it first blocked; if this exceeds 1ms, it marks the Mutex as being in starvation mode.

In starvation mode, goroutines that newly try to acquire the lock do not spin; they simply queue up at the tail of the waiting queue. When the mutex is released, it is handed directly to the goroutine at the head of the waiting queue. If the goroutine that obtains the mutex is the last one in the queue, or its waiting time is less than 1ms, the mutex switches back to normal mode.

Methods

Mutex provides two methods, Lock and Unlock: call Lock before entering the critical section and Unlock when leaving it.
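
As a minimal usage sketch, a Mutex protecting a shared counter looks like this:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    var count int
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock() // enter the critical section
            count++
            mu.Unlock() // leave the critical section
        }()
    }
    wg.Wait()
    fmt.Println(count) // always prints 10
}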

Lock

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

When the lock state is 0, the CAS sets it to mutexLocked (1); this is the simplest case. If the state of the mutex is not 0, the lockSlow method is called. The process of acquiring the lock can be broken down into several parts:

  1. Determine whether the current goroutine can spin;
  2. Spin, waiting for the mutex to be released;
  3. Compute the latest state of the mutex;
  4. Update the state of the mutex and acquire the lock.

Determine whether the current goroutine can spin

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // starvation flag for this goroutine
    awoke := false    // wakeup flag
    iter := 0         // spin iteration count
    old := m.state
    for {
        // Spin only if the mutex is locked but not starving, and spinning is still allowed
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Try to set the mutexWoken flag so that Unlock does not wake other blocked goroutines
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

The conditions for a goroutine to enter the spin are quite strict:

  • the Mutex can only be spun on in normal mode;
  • runtime.sync_runtime_canSpin must return true (a rough sketch of this function follows the list):

    • the program is running on a multi-CPU machine;
    • the current goroutine has spun fewer than 4 times trying to acquire this lock;
    • there is at least one other running processor P on the machine, and the local run queue of the current P is empty.
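
For reference, sync_runtime_canSpin in runtime/proc.go (linked to sync.runtime_canSpin) looks roughly like this; the shape below is a simplified excerpt, with active_spin being 4:

func sync_runtime_canSpin(i int) bool {
    // Spin only a few times, only on multicore machines, and only if there is
    // at least one other running P and the local run queue is empty.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}
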
Spin, waiting for the mutex to be released

Once the current goroutine is allowed to spin, it calls runtime.sync_runtime_doSpin and runtime.procyield, which execute the PAUSE instruction 30 times; this only occupies the CPU and burns CPU time:

func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET

Compute the latest state of the mutex

After the spin-related logic has been handled, the latest state of the mutex is computed from the current context. Several conditions update different pieces of information stored in the state field: mutexLocked, mutexStarving, mutexWoken and the waiter count (mutexWaiterShift):

        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked // not starving: try to acquire the lock
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift // lock held or starving: we will wait, so increment the waiter count
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving // switch the mutex to starvation mode
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // clear the woken flag in the new state
        }

Update the state of the mutex and acquire the lock

After the new mutex state has been computed, it is applied with a CAS. If the lock was not obtained, runtime.sync_runtime_SemacquireMutex is called; it uses the semaphore to guarantee that the resource is not acquired by two goroutines at once. runtime.sync_runtime_SemacquireMutex keeps trying to acquire the semaphore inside the call and puts the goroutine to sleep until the semaphore is released. Once the current goroutine acquires the semaphore, it returns immediately and continues executing the remaining code.

  • In normal mode, the code below sets the wakeup flag, recomputes the starvation flag, resets the iteration count, and re-executes the lock-acquisition loop;
  • In starvation mode, the current goroutine obtains the mutex directly; if it is the only goroutine in the waiting queue, the mutex also exits starvation mode.

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // The old state was unlocked and not starving: we have acquired the lock, return
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }

            // We failed to acquire the lock directly and must wait

            // If we have already waited before, queue at the front of the wait queue (LIFO)
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // Block waiting for the semaphore
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // After waking up, check whether the mutex should switch to starvation mode
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // If the mutex is already in starvation mode, take the lock directly and return
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // Acquire the lock and decrement the waiter count by 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // Exit starvation mode if this goroutine is not starving or it was the last waiter
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

Unlock

func (m *Mutex) Unlock() {
    // Fast path: drop the locked bit
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // There are waiters, or the woken/starving bits are set: take the slow path
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        // Normal mode
        old := new
        for {
            // No waiters, or the mutex is already locked/woken/starving: nothing to do
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Decrement the waiter count, set the woken flag, then wake one waiter
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // Starvation mode: hand the mutex directly to the next waiter
        runtime_Semrelease(&m.sema, true, 1)
    }
}

The unlock path of Mutex is relatively simple: atomic.AddInt32 performs the fast unlock, and the slow path unlockSlow runs only if the resulting state is not 0. unlockSlow first validates the lock state: if the mutex was already unlocked, an exception is thrown. It then handles normal mode and starvation mode separately according to the current state:

  • In normal mode, the code above proceeds as follows:

    • if the mutex has no waiters, or any of the mutexLocked, mutexStarving or mutexWoken bits is already set, the method returns directly without waking any waiter;
    • if the mutex has waiters, it wakes one of them via runtime_Semrelease; the woken goroutine then competes with new arrivals for the lock;
  • In starvation mode, the code calls runtime_Semrelease with handoff set to true, handing the lock directly to the waiter at the head of the queue; that waiter holds the lock as soon as it wakes up. The mutex does not exit starvation mode at this point.

Error-prone scenarios

There are four common mistakes when using Mutex: Lock/Unlock not appearing in pairs, copying a Mutex that is already in use, reentry, and deadlock. The other three are relatively simple; here we focus on reentry.

Reentry

The standard library Mutex is not a reentrant lock, which means the same goroutine cannot acquire the same lock more than once. If you want to implement a reentrant lock on top of Mutex, there are two schemes:

  • Obtain the goroutine id through a hack and record which goroutine id holds the lock. This scheme can implement the Locker interface.
  • Have the goroutine supply a token to identify itself when calling Lock/Unlock, instead of obtaining the goroutine id through a hack. However, this scheme does not satisfy the Locker interface.

A reentrant lock solves the deadlock caused by reentrant code or recursive calls. It also brings another benefit: we can require that only the goroutine holding the lock may unlock it. This is easy to enforce, because both schemes above already record which goroutine holds the lock.

Scheme 1: goroutine id

The key first step of this scheme is obtaining the goroutine id. There are two ways: a simple way and a hack.

The simple way is to call runtime.Stack, which returns the stack information of the current goroutine; the first line of that output contains the goroutine id.
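
A minimal sketch of this approach (goroutineID is an illustrative helper name; the first line of the stack dump looks like "goroutine 18 [running]:"):

import (
    "runtime"
    "strconv"
    "strings"
)

// goroutineID parses the current goroutine id from the output of runtime.Stack.
func goroutineID() uint64 {
    buf := make([]byte, 64)
    n := runtime.Stack(buf, false)
    // buf[:n] starts with "goroutine <id> [running]:"
    fields := strings.Fields(string(buf[:n]))
    id, err := strconv.ParseUint(fields[1], 10, 64)
    if err != nil {
        panic("cannot parse goroutine id: " + err.Error())
    }
    return id
}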

Next, the hack: we take the pointer to the current g at runtime and decode the corresponding g struct. Each running thread stores the pointer to the g struct of its currently running goroutine in a TLS (thread-local storage) slot.

  1. First get the TLS object;
  2. Then get the g pointer of the goroutine struct from TLS;
  3. Then read the goroutine id from the g struct through that pointer.

There is no need to reinvent the wheel; we can use a third-party library to obtain the goroutine id. There are several mature libraries, such as petermattis/goid. Next, we implement a usable reentrant lock:

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/petermattis/goid"
)

// RecursiveMutex wraps a Mutex to allow reentry
type RecursiveMutex struct {
    sync.Mutex
    owner     int64 // goroutine id of the goroutine currently holding the lock
    recursion int32 // number of times that goroutine has re-entered
}

func (m *RecursiveMutex) Lock() {
    gid := goid.Get()
    // If the goroutine holding the lock is the caller, this is a reentry
    if atomic.LoadInt64(&m.owner) == gid {
        m.recursion++
        return
    }
    m.Mutex.Lock()
    // First acquisition by this goroutine: record its goroutine id and set the count to 1
    atomic.StoreInt64(&m.owner, gid)
    m.recursion = 1
}

func (m *RecursiveMutex) Unlock() {
    gid := goid.Get()
    // A goroutine that does not hold the lock is trying to release it: incorrect usage
    if atomic.LoadInt64(&m.owner) != gid {
        panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    }
    // Decrement the reentry count
    m.recursion--
    if m.recursion != 0 { // this goroutine has not fully released the lock yet
        return
    }
    // Last release by this goroutine: actually release the lock
    atomic.StoreInt64(&m.owner, -1)
    m.Mutex.Unlock()
}
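
With this wrapper, the same goroutine can acquire the lock recursively without deadlocking. A hypothetical usage sketch (foo and bar are illustrative names):

func foo(l *RecursiveMutex) {
    l.Lock()
    defer l.Unlock()
    bar(l) // re-enters the same lock on the same goroutine
}

func bar(l *RecursiveMutex) {
    l.Lock() // reentry: the recursion count goes to 2 instead of deadlocking
    defer l.Unlock()
    // ... critical section ...
}
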
Scheme 2: token

Scheme 1 uses the goroutine id as the goroutine's identifier. We can also have the goroutine supply an identifier itself. After all, the Go team does not want users to rely on the goroutine id, which is why no API for obtaining it is exposed.

The design is as follows: the caller supplies a token, passing it in when acquiring the lock and again when releasing it. The goroutine id of scheme 1 is replaced by the caller-supplied token; the rest of the logic is identical to scheme 1.
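
A minimal sketch of scheme 2 (TokenRecursiveMutex is an illustrative name; the token is any non-zero int64 chosen by the caller):

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// TokenRecursiveMutex implements scheme 2: the caller supplies a token
// to identify itself instead of a goroutine id.
type TokenRecursiveMutex struct {
    sync.Mutex
    token     int64 // token of the caller currently holding the lock
    recursion int32 // number of reentries by that caller
}

func (m *TokenRecursiveMutex) Lock(token int64) {
    if atomic.LoadInt64(&m.token) == token { // same token: reentry
        m.recursion++
        return
    }
    m.Mutex.Lock()
    atomic.StoreInt64(&m.token, token) // first acquisition: record the token
    m.recursion = 1
}

func (m *TokenRecursiveMutex) Unlock(token int64) {
    if atomic.LoadInt64(&m.token) != token { // released by a caller that does not hold it
        panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
    }
    m.recursion--
    if m.recursion != 0 { // not fully released yet
        return
    }
    atomic.StoreInt64(&m.token, 0) // last release: clear the token and unlock
    m.Mutex.Unlock()
}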

Extensions

TryLock

We can add a TryLock method to Mutex, i.e. an attempt to obtain the lock. When a goroutine calls TryLock, if the lock is not held by another goroutine, the calling goroutine takes the lock and the method returns true. If the lock is already held by another goroutine, or is about to be handed to an awakened goroutine, TryLock returns false immediately without blocking.

import (
    "sync"
    "sync/atomic"
    "unsafe"
)

// Copy the constants defined by Mutex
const (
    mutexLocked = 1 << iota // the lock bit
    mutexWoken              // the woken bit
    mutexStarving           // the starving bit
    mutexWaiterShift = iota // the bit offset where the waiter count starts
)

// Extend a Mutex structure
type Mutex struct {
    sync.Mutex
}

// TryLock attempts to acquire the lock without blocking
func (m *Mutex) TryLock() bool {
    // Fast path: grab the lock if it is completely free
    if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
        return true
    }
    // If the mutex is locked, woken or starving, do not compete; return false
    old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
        return false
    }
    // Otherwise try to acquire the lock; other goroutines may still win the race
    new := old | mutexLocked
    return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

The first CAS is the fast path: if no other goroutine is competing for the lock, the requesting goroutine obtains it and TryLock returns true immediately.

If the lock is already held by another goroutine, is about to be handed to an awakened goroutine, or the mutex is starving, TryLock returns false immediately without requesting the lock; this is the check against the loaded state.

If the lock is not held, no awakened goroutine is competing for it, and the mutex is not starving, the final CAS tries to acquire the lock, and its result is returned either way. Other goroutines may be competing for the lock at the same time, so success is not guaranteed.
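
A hypothetical usage sketch (tryDoSomething is an illustrative name): skip the work instead of blocking when the lock is busy.

func tryDoSomething(mu *Mutex) bool {
    if !mu.TryLock() {
        return false // the lock is busy: give up instead of blocking
    }
    defer mu.Unlock() // Unlock is promoted from the embedded sync.Mutex
    // ... critical section ...
    return true
}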

Obtaining metrics such as the number of waiters

The Mutex data structure contains two fields, state and sema; the first four bytes are the state field. The state field carries a lot of meaning: from it you can tell whether the lock is held by a goroutine, whether the mutex is starving, whether a waiting goroutine has been woken, how many goroutines are waiting, and so on. However, the state field is not exported. How do we read an unexported field? It is straightforward with the unsafe package.

import (
    "sync"
    "sync/atomic"
    "unsafe"
)

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

type Mutex struct {
    sync.Mutex
}

// Count returns the number of goroutines currently holding or waiting for the lock
func (m *Mutex) Count() int {
    // Read the state field of the embedded sync.Mutex via unsafe
    v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    count := v >> mutexWaiterShift // the number of waiters
    count += v & mutexLocked       // plus the holder of the lock, 0 or 1
    return int(count)
}

In this example, we read the value of the state field through unsafe, then obtain the current number of waiters by shifting right by mutexWaiterShift (whose value is 3). If the lock is currently held by a goroutine, we add 1, which roughly gives the total number of goroutines currently holding or waiting for this lock.
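
A hypothetical usage sketch: sample the metric while one goroutine holds the lock and the others wait for it (the timings are illustrative; in addition to the imports above, this needs fmt and time):

func main() {
    var mu Mutex
    for i := 0; i < 100; i++ {
        go func() {
            mu.Lock()
            time.Sleep(time.Second) // hold the lock so the others queue up
            mu.Unlock()
        }()
    }
    time.Sleep(100 * time.Millisecond) // let the goroutines start and block
    fmt.Println("holder + waiters:", mu.Count())
}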
