Two of the most attractive things about Go are goroutines and channels, and I have long wondered how select actually works. As in my previous article, code that is irrelevant to the discussion is simply omitted.
1. Overview of structure
1.1. hchan
This is the structure of channel.
    type hchan struct {
        qcount   uint           // total number of elements in the queue
        dataqsiz uint           // size of the ring queue; > 0 means buffered, 0 means unbuffered
        buf      unsafe.Pointer // pointer to the array of elements
        elemsize uint16         // size of a single element
        closed   uint32         // non-zero once the channel is closed
        elemtype *_type         // element type (described later, in the article on interfaces)
        sendx    uint           // send index into the ring, used by c <- i
        recvx    uint           // receive index into the ring, used by <-c
        recvq    waitq          // list of goroutines waiting to receive
        sendq    waitq          // list of goroutines waiting to send
        lock     mutex
    }
1.2. waitq
    type waitq struct {
        first *sudog
        last  *sudog
    }
1.3. sudog
sudog represents a waiting g
    type sudog struct {
        g *g

        // isSelect indicates g is participating in a select, so
        // g.selectDone must be CAS'd to win the wake-up race.
        isSelect bool
        next     *sudog
        prev     *sudog
        elem     unsafe.Pointer // data element; for c <- 1 this points to the 1

        // The following fields are never accessed concurrently.
        // For channels, waitlink is only accessed by g.
        // For semaphores, all fields (including the ones above)
        // are only accessed when holding a semaRoot lock.
        acquiretime int64
        releasetime int64
        ticket      uint32
        parent      *sudog // semaRoot binary tree
        waitlink    *sudog // g.waiting list or semaRoot
        waittail    *sudog // semaRoot
        c           *hchan // channel
    }
1.4. scase
This is the structure generated for each case in a select.
    type scase struct {
        c           *hchan         // chan
        elem        unsafe.Pointer // data element
        kind        uint16         // type of this case: nil, recv, send or default
        pc          uintptr        // race pc (for race detector / msan)
        releasetime int64
    }
From the structures above, we can see that a channel is essentially a buffer plus two wait queues (sendq and recvq). So how does data move between them? The diagrams below, found online, give a more vivid picture.
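To make the "buffer" half of that picture concrete, here is a minimal sketch of a ring buffer that uses the same bookkeeping fields as hchan (qcount, dataqsiz, sendx, recvx). It is a toy model, not the runtime's code: the type ring and its methods push and pop are names invented for this illustration, elements are plain ints, and there is no locking and no wait queue.

```go
package main

import "fmt"

// ring is a toy model of the buffer bookkeeping in hchan.
// The field names mirror hchan; everything else is hypothetical.
type ring struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity of the ring
	sendx    int // next slot to write
	recvx    int // next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// push stores v at sendx and reports false when the ring is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop removes the element at recvx and reports false when the ring is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the ring is full
	v, _ := r.pop()
	fmt.Println(v) // 1: elements leave in FIFO order
}
```

In the real channel, a full ring is where a sender would be parked on sendq, and an empty ring is where a receiver would be parked on recvq.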
1.5. Illustration
1.5.1. Buffer-free (synchronization)
1.5.2. Buffered (asynchronous)
Combining the structures and illustrations above, we can make a rough guess at how a channel handles send and receive.
- For a receive (<-channel), first check whether a g is waiting in sendq to deliver data.
  - If sendq is not empty and the buffer is not empty, a sender is waiting for room in the buffer. The receiving g takes an element from the buffer, and the data carried by the first g in sendq is then placed into the buffer.
  - If sendq is not empty but the buffer is empty, this must be an unbuffered channel. Take the data directly from the first g in sendq and that is all.
  - If sendq is empty, look in the buffer; if it holds data, take it and return.
  - If sendq is empty and the buffer has no data, wait here.
- A send follows the same flow as a receive, mirrored.
- If the channel is closed, wake up every g in the wait queues (sendq or recvq) and tell them that the channel's closed flag is set.
Next we trace the source code to confirm and correct this guess.
2. Source code analysis
2.1. Receiving and Sending
2.1.1. main
Let's use the go tool to see how creating a channel, c <- i, and <-c are implemented at the lowest level.
    package main

    func main() {
        c1 := make(chan int)
        c2 := make(chan int, 2)
        go func() {
            c1 <- 1
            c2 <- 2
        }()
        <-c1
        <-c2
        close(c1)
        close(c2)
    }
go build -gcflags=all="-N -l" main.go
go tool objdump -s "main.main" main
Filtering the output down to the CALL instructions:
    ▶ go tool objdump -s "main\.main" main | grep CALL
      main.go:4   0x4548d5  e806fbfaff  CALL runtime.makechan(SB)
      main.go:5   0x4548f8  e8e3fafaff  CALL runtime.makechan(SB)
      main.go:6   0x454929  e822a1fdff  CALL runtime.newproc(SB)
      main.go:10  0x454940  e81b08fbff  CALL runtime.chanrecv1(SB)
      main.go:11  0x454957  e80408fbff  CALL runtime.chanrecv1(SB)
      main.go:12  0x454965  e82605fbff  CALL runtime.closechan(SB)
      main.go:13  0x454973  e81805fbff  CALL runtime.closechan(SB)
      main.go:3   0x454982  e8d981ffff  CALL runtime.morestack_noctxt(SB)
      main.go:7   0x454a32  e899fcfaff  CALL runtime.chansend1(SB)
      main.go:8   0x454a4c  e87ffcfaff  CALL runtime.chansend1(SB)
      main.go:6   0x454a5b  e80081ffff  CALL runtime.morestack_noctxt(SB)
- makechan: creates a channel; the same function is used with or without a buffer
- chanrecv1: the function called for <-c1
- closechan: the function called when close(c1) closes a channel
- chansend1: the function called for c1 <- 1, i.e. sending data
2.1.2. makechan
Creating a channel mainly consists of allocating memory for the hchan structure and its buf, and then initializing the fields of hchan.
    func makechan(t *chantype, size int) *hchan {
        elem := t.elem

        // compiler checks this but be safe.
        // Check the size limit of elem.
        if elem.size >= 1<<16 {
            throw("makechan: invalid channel element type")
        }
        // Alignment restriction.
        if hchanSize%maxAlign != 0 || elem.align > maxAlign {
            throw("makechan: bad alignment")
        }

        // size is the 2 in make(chan int, 2) and defaults to 0.
        // Check its lower and upper bounds.
        if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
            panic(plainError("makechan: size out of range"))
        }

        var c *hchan
        switch {
        case size == 0 || elem.size == 0:
            // Queue or element size is zero: no buffer is allocated.
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // Race detector uses this location for synchronization.
            // buf points into hchan itself; no extra memory is allocated.
            c.buf = c.raceaddr()
        case elem.kind&kindNoPointers != 0:
            // Elements do not contain pointers.
            // Allocate hchan and buf in one call, as a single block of memory.
            c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
            c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
            // Elements contain pointers.
            // Allocate hchan normally and allocate buf separately.
            c = new(hchan)
            c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
        }

        // Initialize the fields of hchan.
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
        return c
    }
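The two fields that makechan and the send path maintain can be observed from ordinary Go code: for a channel, the built-in cap reports the buffer size (dataqsiz) and len reports the number of queued elements (qcount). The small sketch below shows this; the helper bufferState is a name made up for the example.

```go
package main

import "fmt"

// bufferState creates a channel with the given buffer size, sends n
// values into it, and returns len and cap of the channel.
// n must not exceed size, otherwise the send would block forever.
func bufferState(size, n int) (int, int) {
	c := make(chan int, size)
	for i := 0; i < n; i++ {
		c <- i
	}
	return len(c), cap(c)
}

func main() {
	fmt.Println(bufferState(0, 0)) // 0 0: unbuffered, dataqsiz is 0
	fmt.Println(bufferState(2, 1)) // 1 2: one element queued, room for two
}
```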
2.1.3. chanrecv1
chanrecv1 is a thin wrapper around chanrecv, which receives a value from the channel and writes it to ep.
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        lock(&c.lock)

        if c.closed != 0 && c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                // Zero the value at the address without changing its type.
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }

        if sg := c.sendq.dequeue(); sg != nil {
            // Got a sudog that is waiting to send. If the channel has no buffer,
            // take the data straight from the sudog. If it has a buffer, take the
            // head element of the buffer and append the sudog's element to the tail.
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }

        if c.qcount > 0 {
            // Receive directly from queue
            // The buffer has data and no sender is waiting in sendq;
            // fetch the element at index recvx.
            qp := chanbuf(c, c.recvx)
            // If ep is not nil, copy qp to ep.
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // Clear the data at qp.
            typedmemclr(c.elemtype, qp)
            // Advance the index of the next receive.
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            // Update qcount.
            c.qcount--
            unlock(&c.lock)
            return true, true
        }

        if !block {
            unlock(&c.lock)
            return false, false
        }

        // no sender available: block on this channel.
        // No sudog is waiting to send and the buffer has no data, so we must block.
        gp := getg()
        // Get a sudog and fill in its fields.
        mysg := acquireSudog()
        mysg.releasetime = 0
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.param = nil
        // Put this sudog in the recv queue.
        c.recvq.enqueue(mysg)
        // Park this g. When it is woken up, execution continues from here.
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

        // someone woke us up
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if mysg.releasetime > 0 {
            // t0 is set by the profiling code omitted from this listing.
            blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        // The sudog's fields have been cleaned up; release it.
        releaseSudog(mysg)
        return true, !closed
    }
From the logic above, we can see four possible ways for the data to be delivered.
- sendq is not empty and there is no buffer (unbuffered channel, a sending g is blocked): take the sudog at the head of sendq and copy sudog.elem directly to the target address ep.
- sendq is not empty and the buffer is full (buffered channel, a sending g is blocked): copy the head element of buf to the target address ep, take the sudog at the head of sendq, copy its sudog.elem to the tail of buf, and wake up the sending g.
- sendq is empty but buf is not empty (buffered channel, nobody is blocked): copy the head element of buf to the target address ep.
- sendq is empty and buf is empty: the receiver has to block. It gets a sudog, puts it in the channel's recvq, and waits for a sending g to copy its data to the target address and wake it up.
If you think about it, there seems to be a problem. After goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) parks the g, the g is eventually woken up and continues from that point. The receiving g blocked precisely because it was waiting for data, yet none of the code after the wake-up touches any data. Where does the value come from?
The recv function below makes this clear.
2.1.3.1. recv
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // Unbuffered channel.
        if c.dataqsiz == 0 {
            if ep != nil {
                // copy data from sender
                // Copy the data directly between the two g's.
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // Buffered channel.
            // Queue is full. Take the item at the
            // head of the queue. Make the sender enqueue
            // its item at the tail of the queue. Since the
            // queue is full, those are both the same slot.
            // A sudog was waiting in sendq, so the buffer must be full.
            // Get the address of the head element of buf from recvx.
            qp := chanbuf(c, c.recvx)
            // copy data from queue to receiver
            if ep != nil {
                // Copy the data in buf into ep.
                typedmemmove(c.elemtype, ep, qp)
            }
            // copy data from sender to queue
            // Copy the sender's data into the slot that was just read, then
            // advance recvx, i.e. move the head of the buf queue one slot back.
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        // Clear the sudog's data.
        sg.elem = nil
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // Wake up the g that owns the sudog taken from sendq.
        goready(gp, skip+1)
    }
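Putting this together: before a blocked g is woken up, the data in its sudog has already been handled by the g on the other side of the channel. Here recv consumes the blocked sender's sg.elem before calling goready; symmetrically, send (shown later) copies the value into the blocked receiver's sg.elem through sendDirect before waking it. That is why the woken g has no data-transfer logic left to run.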
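The buffered branch of recv is what keeps a channel strictly FIFO even when a sender is parked: the receiver takes the head of the buffer, and the parked sender's value goes into the slot that was just freed, which is the tail. The sketch below shows the observable effect from user code. The function name drainWithBlockedSender is invented for this example, and the short sleep is only there to make it likely that the third sender is really parked on sendq before the first receive; the resulting order does not depend on it.

```go
package main

import (
	"fmt"
	"time"
)

// drainWithBlockedSender fills a buffer of size 2, starts a third sender
// that cannot proceed because the buffer is full, then receives three values.
func drainWithBlockedSender() []int {
	c := make(chan int, 2)
	c <- 1
	c <- 2
	go func() { c <- 3 }() // blocks until a slot is freed

	// Illustration only: give the sender time to park on sendq.
	time.Sleep(50 * time.Millisecond)

	out := make([]int, 0, 3)
	for i := 0; i < 3; i++ {
		out = append(out, <-c)
	}
	return out
}

func main() {
	fmt.Println(drainWithBlockedSender()) // [1 2 3]
}
```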
2.1.3.2. acquireSudog
acquireSudog returns a sudog. It uses a two-level cache: each P keeps a local cache (a slice of sudogs), much like the per-P run queue of g's in the scheduler, and the global sched structure maintains a linked list of cached sudogs. When a P's local cache runs empty or overflows, it is balanced against the global sched.
    func acquireSudog() *sudog {
        // Pin the M so that this g is not rescheduled to another P.
        mp := acquirem()
        pp := mp.p.ptr()
        // If the local cache has no sudog, pull a batch of cached sudogs
        // from the global sched into the current P.
        if len(pp.sudogcache) == 0 {
            lock(&sched.sudoglock)
            // First, try to grab a batch from central cache.
            for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
                s := sched.sudogcache
                sched.sudogcache = s.next
                s.next = nil
                pp.sudogcache = append(pp.sudogcache, s)
            }
            unlock(&sched.sudoglock)
            // If the central cache is empty, allocate a new one.
            if len(pp.sudogcache) == 0 {
                pp.sudogcache = append(pp.sudogcache, new(sudog))
            }
        }
        // Take the last sudog from the local cache and shrink the slice.
        n := len(pp.sudogcache)
        s := pp.sudogcache[n-1]
        pp.sudogcache[n-1] = nil
        pp.sudogcache = pp.sudogcache[:n-1]
        if s.elem != nil {
            throw("acquireSudog: found s.elem != nil in cache")
        }
        // Unpin the M.
        releasem(mp)
        return s
    }
2.1.3.3. releaseSudog
releaseSudog returns the sudog that is no longer in use to the cache and balances the P's local cache against the global list.
    func releaseSudog(s *sudog) {
        mp := acquirem() // avoid rescheduling to another P
        pp := mp.p.ptr()
        // If the local cache of this P is full, move half of its sudogs
        // to the global sched.
        if len(pp.sudogcache) == cap(pp.sudogcache) {
            // Transfer half of local cache to the central cache.
            var first, last *sudog
            for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
                n := len(pp.sudogcache)
                p := pp.sudogcache[n-1]
                pp.sudogcache[n-1] = nil
                pp.sudogcache = pp.sudogcache[:n-1]
                if first == nil {
                    first = p
                } else {
                    last.next = p
                }
                last = p
            }
            lock(&sched.sudoglock)
            last.next = sched.sudogcache
            sched.sudogcache = first
            unlock(&sched.sudoglock)
        }
        // Put the released sudog into the local cache.
        pp.sudogcache = append(pp.sudogcache, s)
        releasem(mp)
    }
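The balancing rule shared by acquireSudog and releaseSudog (refill up to half the local capacity when empty, spill half when full) can be tried out in isolation. The following is a simplified model written for this article, not runtime code: item, central, and local are hypothetical stand-ins for sudog, sched.sudogcache, and pp.sudogcache, and a sync.Mutex replaces the runtime lock.

```go
package main

import (
	"fmt"
	"sync"
)

// item stands in for sudog; only the link field matters here.
type item struct{ next *item }

// central models the global list: a linked list guarded by a lock.
type central struct {
	mu   sync.Mutex
	head *item
}

// size counts the items currently held in the central list.
func (c *central) size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for it := c.head; it != nil; it = it.next {
		n++
	}
	return n
}

// local models the per-P cache: a fixed-capacity slice owned by one worker.
type local struct{ items []*item }

// newLocal creates a local cache; capacity must be at least 1.
func newLocal(capacity int) *local {
	return &local{items: make([]*item, 0, capacity)}
}

// acquire refills from central up to half capacity when the local cache
// is empty, allocates if central is empty too, then pops the last item.
func (l *local) acquire(c *central) *item {
	if len(l.items) == 0 {
		c.mu.Lock()
		for len(l.items) < cap(l.items)/2 && c.head != nil {
			it := c.head
			c.head = it.next
			it.next = nil
			l.items = append(l.items, it)
		}
		c.mu.Unlock()
		if len(l.items) == 0 {
			l.items = append(l.items, new(item))
		}
	}
	n := len(l.items)
	it := l.items[n-1]
	l.items[n-1] = nil
	l.items = l.items[:n-1]
	return it
}

// release spills half of a full local cache to central, then stores it.
func (l *local) release(c *central, it *item) {
	if len(l.items) == cap(l.items) {
		var first, last *item
		for len(l.items) > cap(l.items)/2 {
			n := len(l.items)
			p := l.items[n-1]
			l.items[n-1] = nil
			l.items = l.items[:n-1]
			if first == nil {
				first = p
			} else {
				last.next = p
			}
			last = p
		}
		c.mu.Lock()
		last.next = c.head
		c.head = first
		c.mu.Unlock()
	}
	l.items = append(l.items, it)
}

func main() {
	c := &central{}
	l := newLocal(4)
	for i := 0; i < 5; i++ {
		l.release(c, new(item))
	}
	fmt.Println(len(l.items), c.size()) // 3 2
}
```

The fifth release finds the local cache full, moves two items to the central list, and then stores the new one, which is why the counts end at 3 and 2.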
2.1.4. chansend1
The sending logic is similar to the receiving logic.
    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        lock(&c.lock)

        // Try to take a sudog from the recvq queue.
        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }

        // If qcount < dataqsiz, this channel has a buffer and the buffer is
        // not full: simply append ep to the tail of buf.
        if c.qcount < c.dataqsiz {
            // Space is available in the channel buffer. Enqueue the element to send.
            qp := chanbuf(c, c.sendx)
            typedmemmove(c.elemtype, qp, ep)
            c.sendx++
            if c.sendx == c.dataqsiz {
                c.sendx = 0
            }
            // Update qcount.
            c.qcount++
            unlock(&c.lock)
            return true
        }

        if !block {
            unlock(&c.lock)
            return false
        }

        // Block on the channel. Some receiver will complete our operation for us.
        // Reaching this point means buf is full (or there is no buf) and recvq
        // is empty, so the current g must block until another g receives.
        gp := getg()
        // Get a sudog and initialize its fields.
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        // Put the sudog into sendq.
        c.sendq.enqueue(mysg)
        // Park the current g and wait for another g to receive the data;
        // once it has, that g wakes this one up.
        goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

        // someone woke us up.
        if mysg != gp.waiting {
            throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if gp.param == nil {
            if c.closed == 0 {
                throw("chansend: spurious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        gp.param = nil
        if mysg.releasetime > 0 {
            blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        // Release the sudog.
        releaseSudog(mysg)
        return true
    }
2.1.4.1. send
send mirrors recv. Because the sudog was taken from recvq, a receiver was waiting, which means the buffer is empty, so send never needs to put data into the buffer. That makes send simpler than recv: it only has to hand over the data and wake up the g.
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if sg.elem != nil {
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1)
    }
2.1.5. closechan
That covers sending and receiving. The last operation is closing the channel.
    func closechan(c *hchan) {
        // nil chan check
        if c == nil {
            panic(plainError("close of nil channel"))
        }

        lock(&c.lock)
        // closed chan check
        if c.closed != 0 {
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }

        // Mark c as closed.
        c.closed = 1

        var glist *g

        // release all readers
        // Walk recvq, clear each sudog's data, and chain the g of every
        // sudog into a linked list.
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil
            }
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = nil
            gp.schedlink.set(glist)
            glist = gp
        }

        // release all writers (they will panic)
        // Walk sendq, clear each sudog's data, and chain the g of every
        // sudog onto the same list as the g's from recvq.
        for {
            sg := c.sendq.dequeue()
            if sg == nil {
                break
            }
            sg.elem = nil
            if sg.releasetime != 0 {
                sg.releasetime = cputicks()
            }
            gp := sg.g
            gp.param = nil
            if raceenabled {
                raceacquireg(gp, c.raceaddr())
            }
            gp.schedlink.set(glist)
            glist = gp
        }
        unlock(&c.lock)

        // Ready all Gs now that we've dropped the channel lock.
        // Wake up every g collected above.
        for glist != nil {
            gp := glist
            glist = glist.schedlink.ptr()
            gp.schedlink = 0
            goready(gp, 3)
        }
    }
When a channel is closed, closechan walks every sudog blocked in recvq and sendq (at most one of the two queues can be non-empty), clears part of each sudog's data and state, and sets gp.param = nil so that the calling code can tell the wake-up was caused by the close.
Once woken, each g continues with the remaining logic in chansend or chanrecv, which includes releasing its sudog. That is why closechan itself does not need to release any sudog.
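The effects of closechan and of the closed checks in chanrecv and chansend are easy to observe from user code. The sketch below (helper names drainClosed and sendOnClosed are made up for this example) shows that data already in the buffer can still be received after close, that a receive then yields ok == false, and that a send on a closed channel panics.

```go
package main

import "fmt"

// drainClosed closes a buffered channel that still holds data and then
// receives until the channel reports ok == false.
func drainClosed() ([]int, bool) {
	c := make(chan int, 2)
	c <- 1
	c <- 2
	close(c)

	var values []int
	for {
		v, ok := <-c
		if !ok {
			return values, ok
		}
		values = append(values, v)
	}
}

// sendOnClosed returns the message of the panic raised by sending on a
// closed channel, or "no panic" if none occurred.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	c := make(chan int, 1)
	close(c)
	c <- 1
	return "no panic"
}

func main() {
	fmt.Println(drainClosed())  // [1 2] false
	fmt.Println(sendOnClosed()) // send on closed channel
}
```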
2.1.6. Summary
Words only go so far. While looking for material online I found two flow charts, which are worth reading alongside the code above.
[Figure: send flow]
[Figure: receive (recv) flow]
2.2. select
2.2.1. main
The send and receive flows of a channel have now been traced and are clear. But channels also come with select, so how does a select pick its case?
Let's analyze it with the go tool again.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        c1 := make(chan int)
        c2 := make(chan int)
        go func() {
            time.Sleep(time.Second)
            <-c2
            c1 <- 1
        }()
        select {
        case v := <-c1:
            fmt.Printf("%d <- c1", v)
        case c2 <- 1:
            fmt.Println("c2 <- 1")
        }
    }
Filter the analysis results for CALL:
    main.go:9   0x4a05c6  e81542f6ff  CALL runtime.makechan(SB)
    main.go:10  0x4a05ec  e8ef41f6ff  CALL runtime.makechan(SB)
    main.go:11  0x4a0620  e82b3bf9ff  CALL runtime.newproc(SB)
    main.go:16  0x4a0654  e82c94fbff  CALL 0x459a85
    main.go:16  0x4a06e3  e8d8b7f9ff  CALL runtime.selectgo(SB)
    main.go:18  0x4a074c  e8df8df6ff  CALL runtime.convT2E64(SB)
    main.go:18  0x4a07ec  e8cf89ffff  CALL fmt.Printf(SB)
    main.go:18  0x4a0806  e8f587fbff  CALL runtime.gcWriteBarrier(SB)
    main.go:20  0x4a088c  e87f8bffff  CALL fmt.Println(SB)
    main.go:8   0x4a0898  e85369fbff  CALL runtime.morestack_noctxt(SB)
    main.go:12  0x4a0945  e8868efaff  CALL time.Sleep(SB)
    main.go:13  0x4a095c  e8ff4bf6ff  CALL runtime.chanrecv1(SB)
    main.go:14  0x4a0976  e85541f6ff  CALL runtime.chansend1(SB)
    main.go:11  0x4a0985  e86668fbff  CALL runtime.morestack_noctxt(SB)
As you can see, the implementation of select depends on the selectgo function.
I thought that was the whole story and was about to start on selectgo. But on a whim I tried one more variation and found a different situation.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        c1 := make(chan int)
        go func() {
            time.Sleep(time.Second)
            c1 <- 1
        }()
        select {
        case <-c1:
            fmt.Printf("c1 <- 1")
        default:
            fmt.Println("default")
        }
    }
The results are as follows:
    main.go:9   0x49eca8  e8335bf6ff  CALL runtime.makechan(SB)
    main.go:11  0x49eccf  e85c54f9ff  CALL runtime.newproc(SB)
    main.go:17  0x49ece6  e83570f6ff  CALL runtime.selectnbrecv(SB)
    main.go:18  0x49ed1c  e88f8bffff  CALL fmt.Printf(SB)
    main.go:22  0x49ed8f  e86c8dffff  CALL fmt.Println(SB)
    main.go:8   0x49ed96  e8556cfbff  CALL runtime.morestack_noctxt(SB)
    main.go:12  0x49ee35  e87692faff  CALL time.Sleep(SB)
    main.go:13  0x49ee4f  e87c5cf6ff  CALL runtime.chansend1(SB)
    main.go:11  0x49ee5e  e88d6bfbff  CALL runtime.morestack_noctxt(SB)
As you can see, this select is implemented by the selectnbrecv function. And if there is a selectnbrecv, is there also a selectnbsend? Let's keep trying.
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        c1 := make(chan int)
        go func() {
            time.Sleep(time.Second)
            <-c1
        }()
        select {
        case c1 <- 1:
            fmt.Printf("c1 <- 1")
        default:
            fmt.Println("default")
        }
    }
The results:
    main.go:9   0x49ecb3  e8285bf6ff  CALL runtime.makechan(SB)
    main.go:11  0x49ecda  e85154f9ff  CALL runtime.newproc(SB)
    main.go:17  0x49ed05  e81670f6ff  CALL runtime.selectnbsend(SB)
    main.go:18  0x49ed3b  e8708bffff  CALL fmt.Printf(SB)
    main.go:22  0x49edb4  e8478dffff  CALL fmt.Println(SB)
    main.go:8   0x49edbb  e8306cfbff  CALL runtime.morestack_noctxt(SB)
    main.go:12  0x49ee65  e84692faff  CALL time.Sleep(SB)
    main.go:13  0x49ee7c  e8df66f6ff  CALL runtime.chanrecv1(SB)
    main.go:11  0x49ee8b  e8606bfbff  CALL runtime.morestack_noctxt(SB)
This time the select statement is implemented by selectnbsend. After a few more experiments, I drew the following conclusions:
- If the select has exactly one case that receives from a channel, plus a default, selectnbrecv is called.
- If the select has exactly one case that sends to a channel, plus a default, selectnbsend is called.
- If the select has multiple cases that send to or receive from one or more channels, selectgo is called.
So we start tracing from selectgo. Before that, though, we need to look at reflect_rselect; otherwise the parameters of selectgo are completely baffling.
2.2.2. reflect_rselect
    func reflect_rselect(cases []runtimeSelect) (int, bool) {
        // With no cases at all, park the current goroutine forever.
        if len(cases) == 0 {
            block()
        }
        sel := make([]scase, len(cases))
        order := make([]uint16, 2*len(cases))
        for i := range cases {
            rc := &cases[i]
            switch rc.dir {
            case selectDefault:
                sel[i] = scase{kind: caseDefault}
            case selectSend:
                // For a send, c <- 1, rc.val is the address of the 1.
                sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
            case selectRecv:
                // For a receive, v := <-c, rc.val is the address of v.
                sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
            }
        }
        return selectgo(&sel[0], &order[0], len(cases))
    }
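reflect_rselect is the runtime side of the public reflect.Select function, which builds a select out of a slice of cases at run time. A short usage sketch may help connect the runtimeSelect fields (dir, ch, val) to something familiar. The helper selectReady is a name invented for this example; reflect.Select, reflect.SelectCase, reflect.SelectRecv, and reflect.SelectDefault are the standard library's own.

```go
package main

import (
	"fmt"
	"reflect"
)

// selectReady runs a dynamic select with one receive case on c and a
// default case. It returns the index of the chosen case, the received
// value (0 if nothing was received), and whether a value was received.
func selectReady(c chan int) (int, int, bool) {
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)},
		{Dir: reflect.SelectDefault},
	}
	chosen, recv, ok := reflect.Select(cases)
	if !ok {
		return chosen, 0, false
	}
	return chosen, int(recv.Int()), true
}

func main() {
	c := make(chan int, 1)
	c <- 42
	fmt.Println(selectReady(c)) // 0 42 true: the receive case was ready
	fmt.Println(selectReady(c)) // 1 0 false: nothing to receive, default chosen
}
```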
2.2.3. selectgo
    func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
        cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
        order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

        // order has length 2*ncases: order[0:ncases] is used as pollorder
        // and order[ncases:2*ncases] as lockorder.
        scases := cas1[:ncases:ncases]
        pollorder := order1[:ncases:ncases]
        lockorder := order1[ncases:][:ncases:ncases]

        // Replace send/receive cases involving nil channels with
        // caseNil so logic below can assume non-nil channel.
        for i := range scases {
            cas := &scases[i]
            if cas.c == nil && cas.kind != caseDefault {
                *cas = scase{}
            }
        }

        // The compiler rewrites selects that statically have
        // only 0 or 1 cases plus default into simpler constructs.
        // The only way we can end up with such small sel.ncase
        // values here is for a larger select in which most channels
        // have been nilled out. The general code handles those
        // cases correctly, and they are rare enough not to bother
        // optimizing (and needing to test).

        // generate permuted order
        // Determine the polling order.
        for i := 1; i < ncases; i++ {
            j := fastrandn(uint32(i + 1))
            pollorder[i] = pollorder[j]
            pollorder[j] = uint16(i)
        }

        // sort the cases by Hchan address to get the locking order.
        // simple heap sort, to guarantee n log n time and constant stack footprint.
        // The locking order is determined by the hchan addresses; heap sort
        // keeps the time complexity bounded.
        for i := 0; i < ncases; i++ {
            j := i
            // Start with the pollorder to permute cases on the same channel.
            c := scases[pollorder[i]].c
            for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
                k := (j - 1) / 2
                lockorder[j] = lockorder[k]
                j = k
            }
            lockorder[j] = pollorder[i]
        }
        for i := ncases - 1; i >= 0; i-- {
            o := lockorder[i]
            c := scases[o].c
            lockorder[i] = lockorder[0]
            j := 0
            for {
                k := j*2 + 1
                if k >= i {
                    break
                }
                if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                    k++
                }
                if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                    lockorder[j] = lockorder[k]
                    j = k
                    continue
                }
                break
            }
            lockorder[j] = o
        }

        // lock all the channels involved in the select
        // Lock the channels one by one in the lock order determined above.
        sellock(scases, lockorder)

        var (
            gp     *g
            sg     *sudog
            c      *hchan
            k      *scase
            sglist *sudog
            sgnext *sudog
            qp     unsafe.Pointer
            nextp  **sudog
        )

    loop:
        // pass 1 - look for something already waiting
        var dfli int
        var dfl *scase
        var casi int
        var cas *scase
        var recvOK bool
        for i := 0; i < ncases; i++ {
            // Pick the current case according to pollorder.
            casi = int(pollorder[i])
            cas = &scases[casi]
            c = cas.c

            switch cas.kind {
            // A nil case is ignored; continue with the next one.
            case caseNil:
                continue

            case caseRecv:
                // Receive case: check whether a sudog is waiting in sendq to
                // send data. If there is one, jump to recv.
                sg = c.sendq.dequeue()
                if sg != nil {
                    goto recv
                }
                // No sudog is queued in sendq; check whether buf has data.
                // If it does, jump to bufrecv.
                if c.qcount > 0 {
                    goto bufrecv
                }
                // Neither sendq nor buf can supply data; finally check whether
                // the channel is closed. This ordering is why data stored in a
                // buffered channel can still be read after the channel is closed.
                if c.closed != 0 {
                    goto rclose
                }

            case caseSend:
                // Send case: first check whether the channel is closed.
                if c.closed != 0 {
                    goto sclose
                }
                // Then check whether a sudog is waiting in recvq to receive
                // data. If there is one, jump to send.
                sg = c.recvq.dequeue()
                if sg != nil {
                    goto send
                }
                // Check whether buf has a free slot for our data. If it does,
                // jump to bufsend.
                if c.qcount < c.dataqsiz {
                    goto bufsend
                }

            case caseDefault:
                // Record the index and address of the default case.
                dfli = casi
                dfl = cas
            }
        }

        // Any case that could proceed without waiting has already jumped to
        // its label (recv, bufrecv, send, and so on). Reaching this point means
        // no case can send or receive right away. Use dfl to see whether there
        // is a default.
        if dfl != nil {
            selunlock(scases, lockorder)
            casi = dfli
            cas = dfl
            // There is a default: execute it directly.
            goto retc
        }

        // pass 2 - enqueue on all chans
        // Every case has to wait and there is no default to run.
        gp = getg()
        if gp.waiting != nil {
            throw("gp.waiting != nil")
        }
        nextp = &gp.waiting
        // In lockorder, create a sudog for each case and put it into the recvq
        // or sendq of that case's channel.
        for _, casei := range lockorder {
            casi = int(casei)
            cas = &scases[casi]
            if cas.kind == caseNil {
                continue
            }
            c = cas.c
            // Each case gets its own sudog, bound to the sendq or recvq of the
            // channel for that case.
            sg := acquireSudog()
            sg.g = gp
            sg.isSelect = true
            // No stack splits between assigning elem and enqueuing
            // sg on gp.waiting where copystack can find it.
            sg.elem = cas.elem
            sg.releasetime = 0
            if t0 != 0 {
                sg.releasetime = -1
            }
            sg.c = c
            // Construct waiting list in lock order.
            // Chain the sudogs together through sudog.waitlink, in lockorder.
            *nextp = sg
            nextp = &sg.waitlink

            switch cas.kind {
            case caseRecv:
                // A receive goes into the recvq queue.
                c.recvq.enqueue(sg)

            case caseSend:
                // A send goes into the sendq queue.
                c.sendq.enqueue(sg)
            }
        }

        // wait for someone to wake us up
        // Park and wait to be woken.
        gp.param = nil
        gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)

        // Woken up: lock all the channels again.
        sellock(scases, lockorder)

        gp.selectDone = 0
        sg = (*sudog)(gp.param)
        gp.param = nil

        // pass 3 - dequeue from unsuccessful chans
        // otherwise they stack up on quiet channels
        // record the successful case, if any.
        // We singly-linked up the SudoGs in lock order.
        casi = -1
        cas = nil
        sglist = gp.waiting
        // Clear all elem before unlinking from gp.waiting.
        // We only get here because another goroutine has completed a send or
        // receive on one of the channels, so the data has already been
        // transferred and the elem pointers are no longer needed.
        for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
            sg1.isSelect = false
            sg1.elem = nil
            sg1.c = nil
        }
        gp.waiting = nil

        for _, casei := range lockorder {
            k = &scases[casei]
            if k.kind == caseNil {
                continue
            }
            if sglist.releasetime > 0 {
                k.releasetime = sglist.releasetime
            }
            if sg == sglist {
                // sg has already been dequeued by the G that woke us up.
                // This is the sudog that caused our wake-up; record its case.
                casi = int(casei)
                cas = k
            } else {
                // Remove the other waiting sudogs from their wait queues.
                c = k.c
                if k.kind == caseSend {
                    c.sendq.dequeueSudoG(sglist)
                } else {
                    c.recvq.dequeueSudoG(sglist)
                }
            }
            sgnext = sglist.waitlink
            sglist.waitlink = nil
            releaseSudog(sglist)
            sglist = sgnext
        }

        if cas == nil {
            // cas is nil, so we were woken for some other reason; go round again.
            goto loop
        }

        c = cas.c

        if cas.kind == caseRecv {
            recvOK = true
        }

        selunlock(scases, lockorder)
        goto retc

    bufrecv:
        // can receive from buffer
        // Receive case and buf is not empty: take the data from buf.
        recvOK = true
        qp = chanbuf(c, c.recvx)
        if cas.elem != nil {
            typedmemmove(c.elemtype, cas.elem, qp)
        }
        typedmemclr(c.elemtype, qp)
        // Advance recvx in buf.
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // Update the number of elements in buf.
        c.qcount--
        // Unlock all the channels.
        selunlock(scases, lockorder)
        goto retc

    bufsend:
        // can send to buffer
        // Send case and buf has a free slot: copy our data to the tail of buf.
        typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
        // Advance sendx in buf.
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // Update the number of elements in buf.
        c.qcount++
        // Unlock all the channels.
        selunlock(scases, lockorder)
        goto retc

    recv:
        // can receive from sleeping sender (sg)
        // Receive case with a sudog waiting in sendq: get the data through recv.
        recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        recvOK = true
        goto retc

    rclose:
        // read at end of closed channel
        // Receive case, but the channel is closed.
        selunlock(scases, lockorder)
        recvOK = false
        if cas.elem != nil {
            typedmemclr(c.elemtype, cas.elem)
        }
        goto retc

    send:
        // can send to a sleeping receiver (sg)
        // Send case with a sudog waiting in recvq.
        send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
        goto retc

    retc:
        // Return
        return casi, recvOK

    sclose:
        // send on closed channel
        selunlock(scases, lockorder)
        panic(plainError("send on closed channel"))
    }
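The small loop that fills pollorder is worth isolating, because it is what makes select choose among ready cases in random order. It builds a random permutation of 0..n-1 in place: at step i it picks a random position j in [0, i], moves whatever is at j to position i, and writes i at j. The sketch below reproduces that loop outside the runtime, with math/rand standing in for the runtime's fastrandn as an assumption of this example; permute and isPermutation are names invented here.

```go
package main

import (
	"fmt"
	"math/rand"
)

// permute builds a random permutation of 0..n-1 with the same loop
// shape as the pollorder loop in selectgo.
func permute(n int) []uint16 {
	order := make([]uint16, n)
	for i := 1; i < n; i++ {
		j := rand.Intn(i + 1) // random index in [0, i]
		order[i] = order[j]
		order[j] = uint16(i)
	}
	return order
}

// isPermutation reports whether order contains each of 0..len-1 exactly once.
func isPermutation(order []uint16) bool {
	seen := make([]bool, len(order))
	for _, v := range order {
		if int(v) >= len(order) || seen[v] {
			return false
		}
		seen[v] = true
	}
	return true
}

func main() {
	order := permute(5)
	fmt.Println(order, isPermutation(order))
}
```

The lock order, by contrast, is deterministic: sorting by channel address means any two selects that share channels always lock them in the same order, which avoids deadlock between them.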
2.2.4. selectnbrecv
When a select has exactly one case plus a default, and that case receives from a channel, the select is implemented by a call to selectnbrecv.
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }
Here you can see that selectnbrecv is implemented by calling chanrecv, the same function behind the <-c1 we analyzed above; the only difference is that block is false. In other words, this form of select degenerates into a single non-blocking <-c.
2.2.5. selectnbsend
As with selectnbrecv, when a select has exactly one case plus a default, and that case sends to a channel, it degenerates into a single non-blocking c <- 1.
    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
    }
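From the user's side, these two runtime functions correspond to the familiar "try" idiom: a select with one case and a default never blocks. A minimal sketch, with tryRecv and trySend as hypothetical helper names:

```go
package main

import "fmt"

// tryRecv attempts a receive without blocking. The bool reports whether
// the receive case was selected (note that a closed channel also selects
// the case, yielding the zero value).
func tryRecv(c chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		return 0, false
	}
}

// trySend attempts a send without blocking and reports whether it happened.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has room
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full
	fmt.Println(tryRecv(buffered))    // 1 true
	fmt.Println(tryRecv(buffered))    // 0 false
}
```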
2.2.6. Summary
So the select process is roughly as follows:
- In poll order, check whether any case can proceed without blocking; if one can, jump straight to executing it.
- If every send and receive would block, check whether there is a default; if so, execute the default.
- If there is no default, create a sudog for each case and put it in the sendq or recvq of that case's channel, then park.
- When one of the sudogs is lucky enough to be woken, clear the data and other fields of all the sudogs and remove the remaining sudogs from their queues.
- At this point, the select operation is complete.
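The first step of that summary, polling in a randomized order, has a visible consequence: when several cases are ready at once, no case is systematically favored. The sketch below counts which case wins when both channels always have a value ready. countSelections is a name invented for this example, and the exact split varies from run to run; with 1000 rounds it is overwhelmingly likely, though not strictly guaranteed, that both counts are non-zero.

```go
package main

import "fmt"

// countSelections runs a two-case select `rounds` times with both
// channels ready each time, and counts how often each case is chosen.
func countSelections(rounds int) (a, b int) {
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)
	for i := 0; i < rounds; i++ {
		c1 <- 1
		c2 <- 2
		select {
		case <-c1:
			a++
			<-c2 // drain the other channel so both start empty next round
		case <-c2:
			b++
			<-c1
		}
	}
	return a, b
}

func main() {
	a, b := countSelections(1000)
	fmt.Println(a, b, a+b) // the split varies; the sum is always 1000
}
```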
3. Summary
I still have to grumble a little: selectgo runs to more than 300 lines and jumps around with a number of gotos. Couldn't it really be split up? Still, this is code written by masters, and it deserves respect.
4. Reference Documents
- Go Language Learning Notes--Rain Traces
- Analysis of Go Channel Source Code
- Deep Understanding of Go Channel
- Analysis of the Implementation Principle of Go channel