A CAS Operational Use Scenario for Go

About a year ago, there was such a problem:
There are N routines executed concurrently in the program, which will write data to a channel with size n. The N routines have high concurrency and load, so they don't want to be stuck when writing data, so they use this code.

if len(c) < n {
    c <- something // Write in
}

The original meaning is to ensure that it can be written to prevent the worker routine from getting stuck. However, in the actual operation process, routine cards are written in channel for very simple reasons.

Multiple routines simultaneously determine that len(c) is not full and enter the code written to channel. When channel is full, if the processing is not timely, the routine written later will block here.

Using a sync.Mutex to protect the length of the check and the code written to channel can certainly be solved, but considering that this Mutex may affect performance, it is actually a relatively low solution.

const (
    _CHAN_SIZE  = 10
    _GUARD_SIZE = 10
)

var c chan interface{} = make(_CHAN_SIZE + _GUARD_SIZE) // An extra protective space was allocated.

func write(val interface{}) {
    if len(c) < _CHAN_SIZE {
        c <- val
    }
}

Among the multiple routines R1, R2... Rn executed concurrently, only one routine is allowed to perform one operation at the same time, and other routines can use CAS operations when they need non-blocking knowledge that they are not authorized to operate and return.

For these worker routine s, this is probably the case:

"A weak glance at that position (operation), no one occupies us, no one else occupies us, not the same, go straight."

The more elegant way is to use atomic.CompareAndSwap functions in the go standard library.

// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
...

These functions are simple. When the value of a given address is equal to old, they are set to a new value and return true, otherwise false.
This function is an atomic operation.

Description in Wikipedia:
Compare and swap (CAS)

So the above code can be written as follows:

func writeMsgWithCASCheck(val interface{}) {
    if atomic.CompareAndSwapInt64(&flag, 0, 1) {
        if len(c) < _CHAN_SIZE {
            c <- val
            atomic.StoreInt64(&obj.flag, 0)
            return nil
        }
        atomic.StoreInt64(&obj.flag, 0)
    }
}

If you want to make sure that it's written in, you can put another for on the outside of atomic.

func writeMsgWithCASCheck(val interface{}) {
    for {
        if atomic.CompareAndSwapInt64(&flag, 0, 1) {
            if len(c) < _CHAN_SIZE {
                ...
        }
    }
}

But the effect is as busy as the direct card in C < - val.

In response to this situation, I wrote a simple test program:

$ go run cas.go
R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(2)+1 R(3)+1 R(1)+1 R(0)+1 R(1)+1 R(2)+1 R(3)+1 Chan overflow, len: 13.
quit.
$ go run cas.go cas
R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(0)+1 R(3)+1 R(1)+1 R(2)+1 R(1)+1 R(0)+1 R(3)+1 R(2)+1 R(1)+1 R(3)+1 R(3)+1 R(3)+1 R(3)+1 R(1)+1 R(2)+1 R(2)+1 R(2)+1 R(3)+1 R(1)+1 R(2)+1 R(3)+1 R(1)+1 R(1)+1 R(2)+1 R(1)+1 R(2)+1 <nil>
quit.

It's easy to write more than expected when you keep writing four routine s.

The complete code is cas.go as follows:

package main

import (
    "errors"
    "fmt"
    "os"
    "sync/atomic"
    "time"
)

const (
    _CHAN_SIZE  = 10
    _GUARD_SIZE = 10

    _TEST_CNT = 32
)

type Obj struct {
    flag int64
    c    chan interface{}
}

func (obj *Obj) readLoop() error {
    counter := _TEST_CNT
    for {
        time.Sleep(5 * time.Millisecond)
        if len(obj.c) > _CHAN_SIZE {
            return errors.New(fmt.Sprintf("Chan overflow, len: %v.", len(obj.c)))
        } else if len(obj.c) > 0 {
            <-obj.c
            counter--
        }
        if counter <= 0 {
            return nil
        }
    }
}

func (obj *Obj) writeMsg(idx int, v interface{}) (err error) {
    for {
        if len(obj.c) < _CHAN_SIZE {
            obj.c <- v
            fmt.Printf("R(%v)+1 ", idx)
            return nil
        }
    }
}

func (obj *Obj) writeMsgWithCASCheck(idx int, v interface{}) (err error) {
    for {
        if atomic.CompareAndSwapInt64(&obj.flag, 0, 1) {
            if len(obj.c) < _CHAN_SIZE {
                obj.c <- v
                atomic.StoreInt64(&obj.flag, 0)
                fmt.Printf("R(%v)+1 ", idx)
                return nil
            } else {
                atomic.StoreInt64(&obj.flag, 0)
            }
        }
    }

    return nil
}

func main() {
    useCAS := false
    if len(os.Args) > 1 && os.Args[1] == "cas" {
        useCAS = true
    }
    routineCnt := 4
    tryCnt := _TEST_CNT / routineCnt
    var obj = &Obj{c: make(chan interface{}, _CHAN_SIZE+_GUARD_SIZE)}

    for idx := 0; idx < routineCnt; idx++ {
        go func(nameIdx int) {
            for tryIdx := 0; tryIdx < tryCnt; tryIdx++ {
                if useCAS {
                    obj.writeMsgWithCASCheck(nameIdx, nil)
                } else {
                    obj.writeMsg(nameIdx, nil)
                }
            }
        }(idx)
    }

    // fmt.Println(casObj.readLoop())
    fmt.Println(obj.readLoop())
    fmt.Println("quit.")
}

Keywords: Go

Added by peeps on Tue, 03 Sep 2019 15:08:57 +0300