Deeply Understanding the Principles of Go's channel and select

Besides goroutines, the other most attractive thing about Go is the channel, and I've long wondered how channel and select actually work under the hood. As in my previous article, irrelevant code has been omitted from the listings.

1. Overview of structure

1.1. hchan

This is the runtime structure that backs a channel.

type hchan struct {
   qcount   uint           // total data currently in the queue
   dataqsiz uint           // size of the ring buffer; > 0 means buffered, == 0 means unbuffered
   buf      unsafe.Pointer // pointer to the ring buffer of elements
   elemsize uint16         // size of a single element
   closed   uint32         // whether the channel is closed
   elemtype *_type         // element type; to be covered in a later article on interfaces
   sendx    uint           // send index into the ring buffer, for c <- i
   recvx    uint           // receive index into the ring buffer, for <-c
   recvq    waitq          // list of goroutines waiting to receive data
   sendq    waitq          // list of goroutines waiting to send data
   lock     mutex
}

1.2. waitq

type waitq struct {
   first *sudog
   last  *sudog
}

1.3. sudog

A sudog represents a g waiting in a queue (on a channel or a semaphore).

type sudog struct {
   g *g

   // isSelect indicates g is participating in a select, so
   // g.selectDone must be CAS'd to win the wake-up race.
   isSelect bool
   next     *sudog
   prev     *sudog
   elem     unsafe.Pointer // data element; for c <- 1, this points at the 1

   // The following fields are never accessed concurrently.
   // For channels, waitlink is only accessed by g.
   // For semaphores, all fields (including the ones above)
   // are only accessed when holding a semaRoot lock.

   acquiretime int64
   releasetime int64
   ticket      uint32
   parent      *sudog // semaRoot binary tree
   waitlink    *sudog // g.waiting list or semaRoot
   waittail    *sudog // semaRoot
   c           *hchan // channel
}

1.4. scase

This is the structure the compiler generates for each case in a select.

type scase struct {
   c           *hchan         // chan
   elem        unsafe.Pointer // data element
   kind        uint16  // kind of case: caseNil, caseRecv, caseSend, or caseDefault
   pc          uintptr // race pc (for race detector / msan)
   releasetime int64
}

From these structures we can see that a channel is essentially a ring buffer plus two wait queues (sendq and recvq). So how does data move between them? A diagram found online illustrates this vividly.

1.5. Illustration

1.5.1. Unbuffered (synchronous)

1.5.2. Buffered (asynchronous)

Combining the structures and illustrations above, we can infer the channel's send/recv process:

  1. For a recv (<-c) operation, first check whether a sender is waiting in the sendq queue.

    1. If sendq is not empty and the buffer is not empty, the buffer must be full (senders only queue once it is). The receiving g takes an element from the head of the buffer, then moves the data carried by the first g in sendq into the buffer.
    2. If sendq is not empty but the buffer is empty, this is an unbuffered channel: take the data directly from the first g in sendq and we're done.
    3. If sendq is empty but the buffer has data, take an element from the buffer and go.
    4. If sendq is empty and the buffer is also empty, block and wait here.
  2. For a send, the process mirrors recv.
  3. If the channel is closed at this point, wake every g waiting in the queues (sendq or recvq) and let them see channel.closed is set.
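
Before tracing the runtime, here is a minimal program that exercises the conjectured behavior from user code (the sleep only makes it likely the sender parks in sendq first; it is illustrative, not required for correctness):

package main

import (
    "fmt"
    "time"
)

func main() {
    buffered := make(chan int, 2)
    // Buffered: both sends fit in the ring buffer, so neither blocks.
    buffered <- 1
    buffered <- 2
    fmt.Println(<-buffered, <-buffered) // 1 2

    unbuffered := make(chan int)
    go func() {
        unbuffered <- 42 // parks in sendq until the receive below
    }()
    time.Sleep(10 * time.Millisecond) // let the sender park first
    fmt.Println(<-unbuffered)         // 42, handed over directly from the waiting sender
}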

Next, let's trace the source code to confirm and correct these conjectures.

2. Source code analysis

2.1. Receiving and Sending

2.1.1. main

Let's use the go tool to see how channel creation, c1 <- 1, and <-c1 are implemented at the bottom.

func main() {
    c1 := make(chan int)
    c2 := make(chan int, 2)
    go func() {
        c1 <- 1
        c2 <- 2
    }()
    <-c1
    <-c2
    close(c1)
    close(c2)
}

go build -gcflags=all="-N -l" main.go

go tool objdump -s "main.main" main

Filtering the output for CALL instructions:

▶ go tool objdump -s "main\.main" main | grep CALL
  main.go:4             0x4548d5                e806fbfaff              CALL runtime.makechan(SB)               
  main.go:5             0x4548f8                e8e3fafaff              CALL runtime.makechan(SB)               
  main.go:6             0x454929                e822a1fdff              CALL runtime.newproc(SB)                
  main.go:10            0x454940                e81b08fbff              CALL runtime.chanrecv1(SB)              
  main.go:11            0x454957                e80408fbff              CALL runtime.chanrecv1(SB)              
  main.go:12            0x454965                e82605fbff              CALL runtime.closechan(SB)              
  main.go:13            0x454973                e81805fbff              CALL runtime.closechan(SB)              
  main.go:3             0x454982                e8d981ffff              CALL runtime.morestack_noctxt(SB)       
  main.go:7             0x454a32                e899fcfaff              CALL runtime.chansend1(SB)              
  main.go:8             0x454a4c                e87ffcfaff              CALL runtime.chansend1(SB)              
  main.go:6             0x454a5b                e80081ffff              CALL runtime.morestack_noctxt(SB) 
  • makechan: creates the channel; the same function handles buffered and unbuffered channels
  • chanrecv1: the function called for <-c1
  • closechan: the function called by close(c1) to close the channel
  • chansend1: the function called for c1 <- 1, i.e. sending data

2.1.2. makechan

Creating a channel mainly means allocating memory for the structure and the buf buffer, then initializing the fields of hchan.

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    // Check the size limit of elem
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    // Alignment restriction
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // size is the 2 in make(chan int, 2) (zero if omitted); check its lower and upper bounds
    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case size == 0 || elem.size == 0:
        // Queue or element size is zero: no buffer is allocated.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        // buf points at the hchan itself; no extra memory is allocated
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // Elements do not contain pointers:
        // allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers: allocate the hchan normally
        // and allocate buf separately so the GC can scan it.
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }
    // Initialize the properties of hchan
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    return c
}
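
As a rough mapping from user code to the three switch branches above (a sketch based on the code shown; the exact allocation strategy is an implementation detail of this Go version):

package main

func main() {
    c1 := make(chan int)     // size == 0: no buffer; buf points at the hchan itself
    c2 := make(chan int, 4)  // int contains no pointers: hchan and buf in a single allocation
    c3 := make(chan *int, 4) // *int contains pointers: buf allocated separately so the GC can scan it
    _, _, _ = c1, c2, c3
}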

2.1.3. chanrecv1

chanrecv1 delegates to chanrecv, which receives data from the channel and writes it into ep.

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            // Channel closed and drained: zero the memory at ep (a typed zero value)
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    if sg := c.sendq.dequeue(); sg != nil {
        // A sender's sudog is waiting. If the channel is unbuffered, take the data directly from the sudog; otherwise (the buffer must be full) take the element at the head of the buffer and append the sudog's element to the tail.
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        // Receive directly from queue
        // The buffer has data and no sender is waiting in sendq (buffered channel, not full): take the element at index recvx
        qp := chanbuf(c, c.recvx)
        // If ep is not nil, copy the element at qp into ep
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // Zero the memory at qp
        typedmemclr(c.elemtype, qp)
        // Update the index of the next recv
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // Update qcount
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    // No waiting sender and no buffered data: this g must block
    gp := getg()
    // Get the structure of a sudog and update the properties of the sudog
    mysg := acquireSudog()
    mysg.releasetime = 0
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // Put this sudog in the recv queue
    c.recvq.enqueue(mysg)
    // Park this g; when it is woken, execution resumes here
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    // After cleaning up the properties of sudog, release sudog
    releaseSudog(mysg)
    return true, !closed
}
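
The closed-channel branch at the top of chanrecv (c.closed != 0 && c.qcount == 0) is easy to observe from user code: a closed channel first drains any buffered data and only then starts returning zero values. A small demo using the comma-ok receive form:

package main

import "fmt"

func main() {
    c := make(chan int, 1)
    c <- 7
    close(c)

    v, ok := <-c
    fmt.Println(v, ok) // 7 true: qcount > 0, buffered data survives close

    v, ok = <-c
    fmt.Println(v, ok) // 0 false: closed and drained, typedmemclr yields the zero value
}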

From this logic we can see the four possible data-transfer paths on the receive side:

  • sendq is not empty and buf is empty (unbuffered channel, blocked sender): take the sudog at the head of sendq and copy sudog.elem to the target address ep
  • sendq is not empty and buf is not empty (buffered channel, full, blocked sender): copy the head element of buf to the target address ep, take the sudog at the head of sendq, copy its elem to the tail of buf, then release the sudog
  • sendq is empty and buf is not empty (buffered channel, nobody blocked): copy the head element of buf to the target address ep
  • sendq is empty and buf is empty: block; acquire a sudog, enqueue it on the channel's recvq, and wait for a sending g to wake us after copying its data to the target address

If you think about it, you may notice a puzzle: after goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) parks the g, it is later woken and resumes right there, yet none of the code that follows appears to actually fetch the data. The g blocked precisely because it was waiting for a value, so where does it get it?

The recv function that follows answers this.

2.1.3.1. recv

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // If it is a channel without buffer
    if c.dataqsiz == 0 {
        if ep != nil {
            // copy data from sender
            // Copy data directly between two g
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Here's the logic of buffers.
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        // A sudog was dequeued from sendq, which means the buffer is full. Get the address of the head element of buf via recvx
        qp := chanbuf(c, c.recvx)
        // copy data from queue to receiver
        if ep != nil {
            // Copy the data in buf into ep
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        // Copy the sudog's data from sendq into the slot just vacated; advancing recvx below moves the head of the buffer forward
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    // Clear sudog data
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // Wake up the g in sendq that owns this sudog
    goready(gp, skip+1)
}

Combining this with the earlier logic: before the blocked g is woken, the channel has already consumed or filled the sudog's data on its behalf, so after waking up there is nothing left for it to do with the data.

2.1.3.2. acquireSudog

acquireSudog gets a sudog structure using a two-level cache: each p holds a local slice of cached sudogs (much like the local/global run queues the scheduler uses for g's), while the global sched structure maintains a linked list of cached sudogs. When the local cache of a p runs dry or overflows, it rebalances against the global sched cache.

func acquireSudog() *sudog {
    // Acquire the current m (disables preemption)
    mp := acquirem()
    pp := mp.p.ptr()
    // If the local cache has no sudog, pull a batch from the global sched cache into the current p
    if len(pp.sudogcache) == 0 {
        lock(&sched.sudoglock)
        // First, try to grab a batch from central cache.
        for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
            s := sched.sudogcache
            sched.sudogcache = s.next
            s.next = nil
            pp.sudogcache = append(pp.sudogcache, s)
        }
        unlock(&sched.sudoglock)
        // If the central cache is empty, allocate a new one.
        if len(pp.sudogcache) == 0 {
            pp.sudogcache = append(pp.sudogcache, new(sudog))
        }
    }
    // Pop the last sudog from the local cache and shrink the slice
    n := len(pp.sudogcache)
    s := pp.sudogcache[n-1]
    pp.sudogcache[n-1] = nil
    pp.sudogcache = pp.sudogcache[:n-1]
    if s.elem != nil {
        throw("acquireSudog: found s.elem != nil in cache")
    }
    // Release the m (re-enables preemption)
    releasem(mp)
    return s
}

2.1.3.3. releaseSudog

releaseSudog returns a used sudog to the cache, rebalancing between the local cache of the p and the global list.

func releaseSudog(s *sudog) {
    mp := acquirem() // avoid rescheduling to another P
    pp := mp.p.ptr()
    // If the local cache of the p is full, move half of it to the global sched cache
    if len(pp.sudogcache) == cap(pp.sudogcache) {
        // Transfer half of local cache to the central cache.
        var first, last *sudog
        for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
            n := len(pp.sudogcache)
            p := pp.sudogcache[n-1]
            pp.sudogcache[n-1] = nil
            pp.sudogcache = pp.sudogcache[:n-1]
            if first == nil {
                first = p
            } else {
                last.next = p
            }
            last = p
        }
        lock(&sched.sudoglock)
        last.next = sched.sudogcache
        sched.sudogcache = first
        unlock(&sched.sudoglock)
    }
    // Put the released sudog in the slice of the local cache
    pp.sudogcache = append(pp.sudogcache, s)
    releasem(mp)
}
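
The same two-level pattern (a small per-worker free list backed by a mutex-protected global list) can be sketched in plain Go. This is an illustrative analogy with invented names (node, worker, globalList), not the runtime's actual code:

package main

import "sync"

type node struct{ next *node } // stand-in for sudog

var (
    globalMu   sync.Mutex
    globalList *node // analog of sched.sudogcache
)

// worker plays the role of a p with its local sudogcache slice.
type worker struct{ local []*node }

func newWorker() *worker { return &worker{local: make([]*node, 0, 16)} }

func (w *worker) acquire() *node {
    if len(w.local) == 0 {
        globalMu.Lock()
        // Refill the local cache to half capacity from the global list.
        for len(w.local) < cap(w.local)/2 && globalList != nil {
            n := globalList
            globalList, n.next = n.next, nil
            w.local = append(w.local, n)
        }
        globalMu.Unlock()
        if len(w.local) == 0 {
            return new(node) // both caches empty: allocate fresh
        }
    }
    n := w.local[len(w.local)-1]
    w.local = w.local[:len(w.local)-1]
    return n
}

func (w *worker) release(n *node) {
    if len(w.local) == cap(w.local) {
        // Local cache full: push half of it onto the global list.
        globalMu.Lock()
        for len(w.local) > cap(w.local)/2 {
            p := w.local[len(w.local)-1]
            w.local = w.local[:len(w.local)-1]
            p.next = globalList
            globalList = p
        }
        globalMu.Unlock()
    }
    w.local = append(w.local, n)
}

func main() {
    w := newWorker()
    n := w.acquire()
    w.release(n)
}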

2.1.4. chansend1

The sending logic is similar to the receiving logic.

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)
    // Get a sudog from the recvq queue
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // qcount < dataqsiz means this is a buffered channel with free space: append ep at the tail of buf
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // Update qcount
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    // Reaching here means buf is full (or there is no buf) and recvq is empty: block the current g until another g receives the data
    gp := getg()
    // Get a sudog and initialize the relevant properties
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // Enqueue the sudog on sendq
    c.sendq.enqueue(mysg)
    // Park the current g; a receiving g will copy the data and wake it
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    // Release sudog
    releaseSudog(mysg)
    return true
}
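
The gp.param == nil check after the wakeup is what turns close into a panic for parked senders, and it is observable from user code (the sleep only makes it likely the sender parks before close runs; it is not load-bearing):

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan int)
    done := make(chan struct{})
    go func() {
        defer func() {
            fmt.Println("recovered:", recover()) // send on closed channel
            close(done)
        }()
        c <- 1 // parks in sendq; woken by close(c) with gp.param == nil
    }()
    time.Sleep(10 * time.Millisecond)
    close(c)
    <-done
}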

2.1.4.1. send

send mirrors recv, but since a sudog was dequeued from recvq the buffer must be empty, so send never needs to put data into the buffer. It is simpler than recv: it just copies the data directly to the receiver and wakes the g.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

2.1.5. closechan

With sending and receiving covered, all that remains is closing the channel.

func closechan(c *hchan) {
    // nil chan check
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // closed chan check
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    // Set c to closed state
    c.closed = 1

    var glist *g

    // release all readers
    // Drain recvq: clear each sudog's data and chain the corresponding g's into a linked list (glist)
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        gp.schedlink.set(glist)
        glist = gp
    }

    // release all writers (they will panic)
    // Drain sendq the same way, chaining its g's onto the same glist
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // Wake up all g collected above
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

After the channel is closed, every sudog blocked in recvq and sendq (in practice at most one of the two queues can be non-empty) has its data and state cleared, and gp.param is set to nil so the upper-level logic knows the wakeup was caused by close.

After being woken, each g resumes the remaining logic in chansend or chanrecv, which releases its own sudog (this is why closechan does not release sudogs itself).
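
A quick demonstration of this wake-everyone behavior: all blocked receivers return at once with the zero value and ok == false:

package main

import (
    "fmt"
    "sync"
)

func main() {
    c := make(chan int)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            v, ok := <-c                       // parks in recvq until close(c)
            fmt.Println("receiver", id, v, ok) // receiver N 0 false
        }(i)
    }
    close(c) // collects all parked g's into glist and readies them
    wg.Wait()
}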

2.1.6. Summary

Words only go so far. While searching online I found two flow charts that are worth studying together.

Send process (flow chart)

Recv process (flow chart)

2.2. select

2.2.1. main

We have now traced the channel's send and receive paths in full. But hand in hand with channels goes select, so what does its selection process look like?

Let's analyze it with the go tool again.

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    go func() {
        time.Sleep(time.Second)
        <-c2
        c1 <- 1
    }()
    select {
    case v := <-c1:
        fmt.Printf("%d <- c1", v)
    case c2 <- 1:
        fmt.Println("c2 <- 1")
    }
}

Filtering the disassembly for CALL:

  main.go:9             0x4a05c6                e81542f6ff                      CALL runtime.makechan(SB)               
  main.go:10            0x4a05ec                e8ef41f6ff                      CALL runtime.makechan(SB)               
  main.go:11            0x4a0620                e82b3bf9ff                      CALL runtime.newproc(SB)                
  main.go:16            0x4a0654                e82c94fbff                      CALL 0x459a85                           
  main.go:16            0x4a06e3                e8d8b7f9ff                      CALL runtime.selectgo(SB)               
  main.go:18            0x4a074c                e8df8df6ff                      CALL runtime.convT2E64(SB)              
  main.go:18            0x4a07ec                e8cf89ffff                      CALL fmt.Printf(SB)                     
  main.go:18            0x4a0806                e8f587fbff                      CALL runtime.gcWriteBarrier(SB)         
  main.go:20            0x4a088c                e87f8bffff                      CALL fmt.Println(SB)                    
  main.go:8             0x4a0898                e85369fbff                      CALL runtime.morestack_noctxt(SB)       
  main.go:12            0x4a0945                e8868efaff              CALL time.Sleep(SB)                     
  main.go:13            0x4a095c                e8ff4bf6ff              CALL runtime.chanrecv1(SB)              
  main.go:14            0x4a0976                e85541f6ff              CALL runtime.chansend1(SB)              
  main.go:11            0x4a0985                e86668fbff              CALL runtime.morestack_noctxt(SB) 

As you can see, the implementation of select depends on the selectgo function.

I thought that was the whole story and started analyzing the selectgo function, but some idle experimentation turned up another case.

func main() {
    c1 := make(chan int)
    go func() {
        time.Sleep(time.Second)
        c1 <- 1
    }()
    select {
    case <-c1:
        fmt.Printf("c1 <- 1")
    default:
        fmt.Println("default")
    }
}

The results are as follows:

  main.go:9             0x49eca8                e8335bf6ff              CALL runtime.makechan(SB)               
  main.go:11            0x49eccf                e85c54f9ff              CALL runtime.newproc(SB)                
  main.go:17            0x49ece6                e83570f6ff              CALL runtime.selectnbrecv(SB)           
  main.go:18            0x49ed1c                e88f8bffff              CALL fmt.Printf(SB)                     
  main.go:22            0x49ed8f                e86c8dffff              CALL fmt.Println(SB)                    
  main.go:8             0x49ed96                e8556cfbff              CALL runtime.morestack_noctxt(SB)       
  main.go:12            0x49ee35                e87692faff              CALL time.Sleep(SB)                     
  main.go:13            0x49ee4f                e87c5cf6ff              CALL runtime.chansend1(SB)              
  main.go:11            0x49ee5e                e88d6bfbff              CALL runtime.morestack_noctxt(SB) 

As you can see, this select is implemented by the underlying selectnbrecv function. And if there is a selectnbrecv, might there also be a selectnbsend? Let's keep trying.

func main() {
    c1 := make(chan int)
    go func() {
        time.Sleep(time.Second)
        <- c1
    }()
    select {
    case c1 <- 1:
        fmt.Printf("c1 <- 1")
    default:
        fmt.Println("default")
    }
}

The analysis results:

  main.go:9             0x49ecb3                e8285bf6ff                      CALL runtime.makechan(SB)               
  main.go:11            0x49ecda                e85154f9ff                      CALL runtime.newproc(SB)                
  main.go:17            0x49ed05                e81670f6ff                      CALL runtime.selectnbsend(SB)           
  main.go:18            0x49ed3b                e8708bffff                      CALL fmt.Printf(SB)                     
  main.go:22            0x49edb4                e8478dffff                      CALL fmt.Println(SB)                    
  main.go:8             0x49edbb                e8306cfbff                      CALL runtime.morestack_noctxt(SB)       
  main.go:12            0x49ee65                e84692faff              CALL time.Sleep(SB)                     
  main.go:13            0x49ee7c                e8df66f6ff              CALL runtime.chanrecv1(SB)              
  main.go:11            0x49ee8b                e8606bfbff              CALL runtime.morestack_noctxt(SB)

Here the select statement is implemented with the selectnbsend function. Continuing to experiment leads to the following conclusions (a user-level sketch of these forms follows the list):

  • If a select has a single case that receives from a channel, plus a default, it compiles to selectnbrecv
  • If a select has a single case that sends to a channel, plus a default, it compiles to selectnbsend
  • If a select has multiple cases sending to or receiving from one or more channels, it compiles to selectgo
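
In user code these two forms are just the standard non-blocking "try" idiom; a sketch of how they line up with the runtime entry points (tryRecv and trySend are invented helper names, not runtime functions):

package main

import "fmt"

func tryRecv(c chan int) (int, bool) {
    select {
    case v := <-c: // single recv case + default: compiles to selectnbrecv
        return v, true
    default:
        return 0, false
    }
}

func trySend(c chan int, v int) bool {
    select {
    case c <- v: // single send case + default: compiles to selectnbsend
        return true
    default:
        return false
    }
}

func main() {
    c := make(chan int, 1)
    fmt.Println(trySend(c, 1)) // true: buffer has room
    fmt.Println(trySend(c, 2)) // false: buffer full, default taken
    fmt.Println(tryRecv(c))    // 1 true
    fmt.Println(tryRecv(c))    // 0 false: empty, default taken
}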

OK, let's trace selectgo. But before that we should look at reflect_rselect; otherwise the parameters of the selectgo function will be thoroughly confusing.

2.2.2. reflect_rselect

func reflect_rselect(cases []runtimeSelect) (int, bool) {
    // A select with no cases blocks the current goroutine forever
    if len(cases) == 0 {
        block()
    }
    sel := make([]scase, len(cases))
    order := make([]uint16, 2*len(cases))
    for i := range cases {
        rc := &cases[i]
        switch rc.dir {
        case selectDefault:
            sel[i] = scase{kind: caseDefault}
        case selectSend:
            // For a send case, c <- 1, rc.val is the address of the 1
            sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
        case selectRecv:
            // For a receive case, v := <-c, rc.val is the address of v
            sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
        }
    }
    return selectgo(&sel[0], &order[0], len(cases))
}
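
reflect_rselect is the runtime side of the reflect.Select API, which lets you build the case list at runtime. A minimal usage example (both cases are made ready up front, so either may be chosen):

package main

import (
    "fmt"
    "reflect"
)

func main() {
    c1 := make(chan int, 1)
    c1 <- 10                // makes the recv case ready
    c2 := make(chan int, 1) // buffer room makes the send case ready

    cases := []reflect.SelectCase{
        {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c1)},                            // becomes caseRecv
        {Dir: reflect.SelectSend, Chan: reflect.ValueOf(c2), Send: reflect.ValueOf(20)}, // becomes caseSend
    }
    chosen, recv, recvOK := reflect.Select(cases)
    // recv is only a valid Value when the chosen case is a receive.
    fmt.Println(chosen, recvOK, recv.IsValid())
}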

2.2.3. selectgo

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // order0 points at 2*ncases uint16s: order[0:ncases] becomes pollorder and order[ncases:2*ncases] becomes lockorder
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

    // Replace send/receive cases involving nil channels with
    // caseNil so logic below can assume non-nil channel.
    for i := range scases {
        cas := &scases[i]
        if cas.c == nil && cas.kind != caseDefault {
            *cas = scase{}
        }
    }

    // The compiler rewrites selects that statically have
    // only 0 or 1 cases plus default into simpler constructs.
    // The only way we can end up with such small sel.ncase
    // values here is for a larger select in which most channels
    // have been nilled out. The general code handles those
    // cases correctly, and they are rare enough not to bother
    // optimizing (and needing to test).

    // generate permuted order
    // Determine the order of polling
    for i := 1; i < ncases; i++ {
        j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }

    // sort the cases by Hchan address to get the locking order.
    // simple heap sort, to guarantee n log n time and constant stack footprint.
    // The locking order is determined by hchan address; heap sort guarantees O(n log n) time and constant stack
    for i := 0; i < ncases; i++ {
        j := i
        // Start with the pollorder to permute cases on the same channel.
        c := scases[pollorder[i]].c
        for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
            k := (j - 1) / 2
            lockorder[j] = lockorder[k]
            j = k
        }
        lockorder[j] = pollorder[i]
    }
    for i := ncases - 1; i >= 0; i-- {
        o := lockorder[i]
        c := scases[o].c
        lockorder[i] = lockorder[0]
        j := 0
        for {
            k := j*2 + 1
            if k >= i {
                break
            }
            if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                k++
            }
            if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                lockorder[j] = lockorder[k]
                j = k
                continue
            }
            break
        }
        lockorder[j] = o
    }

    // lock all the channels involved in the select
    // Lock all the channels, in the lockorder determined above
    sellock(scases, lockorder)

    var (
        gp     *g
        sg     *sudog
        c      *hchan
        k      *scase
        sglist *sudog
        sgnext *sudog
        qp     unsafe.Pointer
        nextp  **sudog
    )

loop:
    // pass 1 - look for something already waiting
    var dfli int
    var dfl *scase
    var casi int
    var cas *scase
    var recvOK bool
    for i := 0; i < ncases; i++ {
        // According to pollorder, get the current polling case
        casi = int(pollorder[i])
        cas = &scases[casi]
        c = cas.c

        switch cas.kind {
        // nil-channel case: skip it and continue
        case caseNil:
            continue

        case caseRecv:
            // recv case: if a sudog is waiting in sendq, jump to the recv label
            sg = c.sendq.dequeue()
            if sg != nil {
                goto recv
            }
            // No sender queued in sendq; if buf has data, jump to bufrecv
            if c.qcount > 0 {
                goto bufrecv
            }
            // Neither sendq nor buf yielded data: check whether the channel is closed.
            // Note the ordering: this is why a closed buffered channel still yields its previously stored data first.
            if c.closed != 0 {
                goto rclose
            }

        case caseSend:
            // send type case, first confirm whether the channel is closed
            if c.closed != 0 {
                goto sclose
            }
            // If a sudog is waiting in recvq, jump to the send label
            sg = c.recvq.dequeue()
            if sg != nil {
                goto send
            }
            // If buf has a free slot to put our data in, jump to the bufsend label
            if c.qcount < c.dataqsiz {
                goto bufsend
            }

        case caseDefault:
            // Record the index and address of the default case
            dfli = casi
            dfl = cas
        }
    }
    // Any immediately ready case would have jumped to its label (recv, bufrecv, send, ...) above.
    // Reaching here means every case would block: if a default exists (dfl != nil), take it instead of waiting for another g.
    if dfl != nil {
        selunlock(scases, lockorder)
        casi = dfli
        cas = dfl
        // If there is default, execute default directly
        goto retc
    }

    // pass 2 - enqueue on all chans
    // Reaching here, every case must wait and there is no default.
    gp = getg()
    if gp.waiting != nil {
        throw("gp.waiting != nil")
    }
    nextp = &gp.waiting
    // In lockorder, create a sudog for each case and enqueue it on the recvq or sendq of that case's channel
    for _, casei := range lockorder {
        casi = int(casei)
        cas = &scases[casi]
        if cas.kind == caseNil {
            continue
        }
        c = cas.c
        // Acquire a sudog for this case and bind it to the current g
        sg := acquireSudog()
        sg.g = gp
        sg.isSelect = true
        // No stack splits between assigning elem and enqueuing
        // sg on gp.waiting where copystack can find it.
        sg.elem = cas.elem
        sg.releasetime = 0
        if t0 != 0 {
            sg.releasetime = -1
        }
        sg.c = c
        // Construct waiting list in lock order.
        // Chain the sudogs together via sudog.waitlink, in lock order
        *nextp = sg
        nextp = &sg.waitlink

        switch cas.kind {
        case caseRecv:
            // If recv, put it in the recvq queue
            c.recvq.enqueue(sg)

        case caseSend:
            // If it is send, put it in the sendq queue
            c.sendq.enqueue(sg)
        }
    }

    // wait for someone to wake us up
    // Park and wait to be woken
    gp.param = nil
    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    // Woken up: re-acquire all the channel locks
    sellock(scases, lockorder)

    gp.selectDone = 0
    sg = (*sudog)(gp.param)
    gp.param = nil

    // pass 3 - dequeue from unsuccessful chans
    // otherwise they stack up on quiet channels
    // record the successful case, if any.
    // We singly-linked up the SudoGs in lock order.
    casi = -1
    cas = nil
    sglist = gp.waiting
    // Clear all elem before unlinking from gp.waiting.
    // Before unlinking the waiting list, clear the data pointers: we only get here because another goroutine completed a send or recv with us, so the elem pointers are no longer needed.
    for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
        sg1.isSelect = false
        sg1.elem = nil
        sg1.c = nil
    }
    gp.waiting = nil

    for _, casei := range lockorder {
        k = &scases[casei]
        if k.kind == caseNil {
            continue
        }
        if sglist.releasetime > 0 {
            k.releasetime = sglist.releasetime
        }
        if sg == sglist {
            // sg has already been dequeued by the G that woke us up.
            // This sudog won the wakeup: record its case
            casi = int(casei)
            cas = k
        } else {
            // Dequeue the losing sudogs from their channel wait queues
            c = k.c
            if k.kind == caseSend {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
        }
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        releaseSudog(sglist)
        sglist = sgnext
    }

    if cas == nil {
        // cas == nil means no case completed against us (e.g. the wakeup came from a channel being closed); loop back and poll the cases again
        goto loop
    }

    c = cas.c
    if cas.kind == caseRecv {
        recvOK = true
    }
    selunlock(scases, lockorder)
    goto retc

bufrecv:
    // can receive from buffer
    // recv case with data in buf: take it from the buffer
    recvOK = true
    qp = chanbuf(c, c.recvx)
    if cas.elem != nil {
        typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    // Update the index of recvx in buf
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    // Update the number of data in buf
    c.qcount--
    // Unlock all the channels
    selunlock(scases, lockorder)
    goto retc

bufsend:
    // can send to buffer
    // send case with free space in buf: copy our data to the tail of the buffer
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
    // Update the index of sendx in buf
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    // Update the number of data in buf
    c.qcount++
    // Unlock all the channels
    selunlock(scases, lockorder)
    goto retc

recv:
    // can receive from sleeping sender (sg)
    // recv case with a sender parked in sendq: take the data via the recv function
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

rclose:
    // read at end of closed channel
    // recv case on a closed (and drained) channel
    selunlock(scases, lockorder)
    recvOK = false
    if cas.elem != nil {
        typedmemclr(c.elemtype, cas.elem)
    }
    goto retc

send:
    // can send to a sleeping receiver (sg)
    // send case with a receiver parked in recvq
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

retc:
    // Return
    return casi, recvOK

sclose:
    // send on closed channel
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))
}
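
The fastrandn shuffle that builds pollorder is why select chooses uniformly at random among ready cases. A quick empirical check:

package main

import "fmt"

func main() {
    c1 := make(chan int, 1)
    c2 := make(chan int, 1)
    counts := map[string]int{}
    for i := 0; i < 10000; i++ {
        c1 <- 1
        c2 <- 2
        select { // both cases are always ready here
        case <-c1:
            counts["c1"]++
            <-c2 // drain the other so the next iteration can refill
        case <-c2:
            counts["c2"]++
            <-c1
        }
    }
    fmt.Println(counts) // roughly map[c1:5000 c2:5000]
}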

2.2.4. selectnbrecv

When a select has a single receive case plus a default, the compiler implements it with selectnbrecv.

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)
    return
}

Notice that selectnbrecv simply calls chanrecv with block=false: the same function behind the plain <-c1 we analyzed earlier. The select is effectively lowered back to a non-blocking <-c.

2.2.5. selectnbsend

Likewise, when a select has a single send case plus a default, it is lowered to a non-blocking c1 <- 1 via chansend with block=false.

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}

2.2.6. Summary

So the select process is roughly as follows:

  1. Poll every case; if one can proceed without blocking, jump straight to its handling code.
  2. If every send/receive would block, check for a default case and execute it if present.
  3. If there is no default, create a sudog for each case and enqueue it on the sendq or recvq of that case's channel.
  4. When one sudog wins the wakeup, clear every sudog's data and other fields, and dequeue the losing sudogs from their wait queues.
  5. At that point the select operation is complete.

3. Summary

I can't resist one gripe: selectgo is a magnificent function of more than 300 lines, littered with gotos, that apparently really couldn't be split up. Still, as runtime code goes, it commands real respect.
