How chansend/chanrecv work on Go channels when block is false

Preface

This article walks through how send and recv are handled in a select with two cases (one send/recv case and one default case).


chansend

select {
case c <- v:
	... foo
default:
	... bar
}

The underlying runtime function is selectnbsend.

selectnbsend

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

As the comment shows, a select with two cases (one send/recv case and one default case) is compiled into an if...else form.

Note: block is passed as false in this call.
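
A minimal sketch of what this rewrite means for user code. trySend is a hypothetical helper introduced here for illustration (it is not a runtime function); it wraps the two-case select so the caller sees the same bool that selectnbsend reports.

```go
package main

import "fmt"

// trySend is a hypothetical helper, not part of the runtime.
// It wraps the two-case select, which the compiler lowers to
// a single selectnbsend call (chansend with block == false).
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true // the send completed
	default:
		return false // the send could not proceed right now
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer has room
	fmt.Println(trySend(c, 2)) // false: the buffer is full and nobody is receiving
}
```

The caller never parks: the second call returns false immediately instead of waiting for a receiver.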

chansend implementation

The source below omits the logic for block == true and part of the debug and race logic.

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block { // Non-blocking (select) send on a nil chan returns false immediately
			return false
		}
		...
	}

	...
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) { // Non-blocking send: the chan is not closed and the buffer is full, so return false
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock) // Acquire the lock

	if c.closed != 0 { // If chan is closed, release the lock and panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil { // A receiver is waiting in recvq, so hand the value to it directly
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// The remaining cases handle a send with no waiting receiver.
	if c.qcount < c.dataqsiz { // The buffer still has room, so enqueue the element
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx) // Address of the buffer slot at sendx
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep) // Copy the value into that slot
		c.sendx++ // Advance to the next slot
		if c.sendx == c.dataqsiz { // Past the last slot: wrap around to the start
			c.sendx = 0
		}
		c.qcount++ // One more element in the buffer
		unlock(&c.lock) // Release the lock
		return true // Sent successfully
	}
	// The buffer is full and no receiver is waiting.
	if !block { // Non-blocking send: release the lock and report failure
		unlock(&c.lock)
		return false
	}

	...
	return true // Send Successfully
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil { // sg.elem is the receiver's destination for the value
		sendDirect(c.elemtype, sg, ep) // Copy the value directly into the receiver's memory
		sg.elem = nil // Clear the pointer
	}
	gp := sg.g // The receiving goroutine
	unlockf()
	gp.param = unsafe.Pointer(sg) // Pass sg to the receiver as its wake-up parameter
	sg.success = true // Mark the operation as successful
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // Make the receiving goroutine runnable
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack. This is a cross-stack copy.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // Run the write barrier for the pointer words before copying
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size) // Copy the value to dst, i.e. sg.elem
}
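
The buffered branch of chansend above (the c.qcount < c.dataqsiz case) is a plain ring-buffer enqueue. The following is a simplified sketch of that bookkeeping only. The ring type and its method are hypothetical names made up for this illustration, and the model has no lock, no wait queues and no element type information.

```go
package main

import "fmt"

// ring is a hypothetical stand-in for the buffer fields of hchan.
// It is an illustration only, not the runtime's data structure.
type ring struct {
	buf    []int
	qcount int // number of elements currently stored
	sendx  int // index of the next slot to write
}

// trySend mirrors the buffered branch of chansend with block == false:
// store at sendx, advance with wrap-around, then bump qcount.
func (r *ring) trySend(v int) bool {
	if r.qcount >= len(r.buf) {
		return false // buffer full: the non-blocking send gives up
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around to the start of the buffer
	}
	r.qcount++
	return true
}

func main() {
	r := &ring{buf: make([]int, 2)}
	fmt.Println(r.trySend(10), r.trySend(20), r.trySend(30)) // true true false
	fmt.Println(r.sendx, r.qcount)                            // 0 2
}
```

After two sends into a buffer of size 2, sendx has wrapped back to 0 while qcount records that the buffer is full, which is why the third send is rejected.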

When using send in this scenario, keep the following in mind:

  • block is false, so the call never blocks
  • Sending to a nil chan returns false immediately
  • If the chan is not closed and the buffer is full, the lock-free fast path returns false
  • Sending to a closed chan panics
  • If a receiver is already waiting, the value is handed to it directly (copied straight into the receiver's memory)
  • If the buffer is not full, the value is stored in the buffer
  • If the buffer is full, send returns false
  • In the blocking path omitted above, a recv or close wakes up a parked send goroutine: after a close it panics, and after a recv it returns true
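
The points above can be observed from ordinary Go code. The sketch below is illustrative only: sendOutcome is a hypothetical helper that performs the non-blocking send and reports whether it was sent, not sent, or panicked.

```go
package main

import (
	"fmt"
	"runtime"
)

// sendOutcome is a hypothetical helper: it performs a non-blocking
// send and reports "sent", "not sent", or "panic" if the send panicked.
func sendOutcome(c chan int, v int) (outcome string) {
	defer func() {
		if r := recover(); r != nil {
			outcome = "panic"
		}
	}()
	select {
	case c <- v:
		return "sent"
	default:
		return "not sent"
	}
}

func main() {
	var nilChan chan int
	fmt.Println(sendOutcome(nilChan, 1)) // not sent: nil chan

	buffered := make(chan int, 1)
	fmt.Println(sendOutcome(buffered, 1)) // sent: the buffer has room
	fmt.Println(sendOutcome(buffered, 2)) // not sent: the buffer is full

	close(buffered)
	fmt.Println(sendOutcome(buffered, 3)) // panic: send on closed channel

	// On an unbuffered chan the send only succeeds once a receiver is waiting.
	unbuffered := make(chan int)
	done := make(chan int)
	go func() { done <- <-unbuffered }()
	for sendOutcome(unbuffered, 4) != "sent" {
		runtime.Gosched() // retry until the receiver is parked
	}
	fmt.Println(<-done) // 4
}
```

Note that the default case does not protect against the panic: a closed chan is checked after the fast path, so the send still panics.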

chanrecv

// recv1
select {
case v = <-c:
	... foo
default:
	... bar
}

// recv2
select {
case v, ok = <-c: 
	... foo
default:
	... bar
}

The two forms are handled the same way; recv1 and recv2 map to the runtime functions selectnbrecv and selectnbrecv2, respectively.

selectnbrecv and selectnbrecv2

// compiler implements
//
//	select {
//	case v = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbrecv(&v, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if c != nil && selectnbrecv2(&v, &ok, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(c, elem, false)
	return
}

selectnbrecv and selectnbrecv2 differ only in whether the received result is reported back to the caller. Note that block is again passed as false.
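
A short sketch of why the second form exists. tryRecv and tryRecv2 are hypothetical helpers written for this illustration; they mirror the recv1 and recv2 select forms, and the ok result of tryRecv2 corresponds to received.

```go
package main

import "fmt"

// tryRecv mirrors the recv1 form. It is a hypothetical helper.
// selected reports whether the receive case ran.
func tryRecv(c chan int) (v int, selected bool) {
	select {
	case v = <-c:
		return v, true
	default:
		return 0, false
	}
}

// tryRecv2 mirrors the recv2 form. ok corresponds to the
// received result of chanrecv.
func tryRecv2(c chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-c:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	c := make(chan int)
	close(c)

	fmt.Println(tryRecv(c))  // 0 true
	fmt.Println(tryRecv2(c)) // 0 false true
}
```

With the one-value form, a zero value from a closed chan looks exactly like a real 0 that was sent; only the two-value form exposes the difference through ok.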

chanrecv implementation

The source below omits the logic for block == true and part of the debug and race logic.

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block { // Non-blocking (select) receive from a nil chan returns (false, false) immediately
			return
		}
		...
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) { // Non-blocking receive and there is nothing to receive
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 { // The chan is not closed: return (false, false)
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) { // Check again that there is still nothing to receive
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep) // Zero the memory at ep
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 { // Re-check under the lock: closed and drained
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil { // A sender is waiting in sendq
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 { // The buffer holds at least one element
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) // Address of the buffer slot at recvx
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil { // Copy the element out to ep
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp) // Zero the slot that was just read
		c.recvx++ // Advance to the next slot
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount-- // One fewer element in the buffer
		unlock(&c.lock)
		return true, true
	}

	if !block { // Non-blocking receive with nothing buffered: return (false, false)
		unlock(&c.lock)
		return false, false
	}
	...
	return true, success
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size) // Copy Data
}
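
The comment in chanrecv about a waiting sender on a full buffer ("both map to the same buffer slot because the queue is full") can be pictured with a small model. This is a sketch under stated assumptions: the queue type and recvWithWaitingSender are hypothetical names for illustration, the model has no lock, and the parked sender is reduced to the single value it wants to send.

```go
package main

import "fmt"

// queue is a hypothetical model of a full buffered chan.
// It is not the runtime's hchan.
type queue struct {
	buf   []int
	recvx int // index of the oldest element
	sendx int // index of the next slot to write
}

// recvWithWaitingSender takes the head of the full queue and stores
// the parked sender's value in the slot that was just freed.
func (q *queue) recvWithWaitingSender(senderValue int) int {
	v := q.buf[q.recvx]
	q.buf[q.recvx] = senderValue // the freed head slot becomes the new tail
	q.recvx++
	if q.recvx == len(q.buf) {
		q.recvx = 0
	}
	q.sendx = q.recvx // still full, so both indexes coincide
	return v
}

func main() {
	q := &queue{buf: []int{1, 2}} // full buffer, 1 is the oldest element
	fmt.Println(q.recvWithWaitingSender(3)) // 1
	fmt.Println(q.buf, q.recvx)             // [3 2] 1
}
```

The receiver gets the oldest element, and the sender's value lands behind the remaining one, so FIFO order (1, 2, 3) is preserved without the buffer ever being non-full.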

When using recv in this scenario, keep the following in mind:

  • block is false, so the call never blocks
  • Receiving from a nil chan returns selected=false, received=false immediately
  • Receiving from a closed chan with an empty buffer yields the zero value of the chan's element type, with selected=true, received=false
  • If a sender is already waiting,
    • on an unbuffered chan, the value is received directly from the sender (copied from sender to receiver)
    • otherwise, the element at the receive index is taken from the buffer and the sender's value is stored at the send index
  • If the buffer holds data, the element at the receive index is taken
  • If the buffer is empty, recv returns selected=false, received=false immediately
  • A send or close changes what a later recv sees (and wakes up a parked recv goroutine in the blocking path omitted above):
    • After a close, recv returns buffered data while any remains, and the zero value once the buffer is empty
    • After a send, recv returns (true, true)
Summary

Finally, one diagram summarizes the processing logic of send/recv in a two-case select (one send/recv case, one default case):

The biggest difference from a standalone send/recv (including a select with a single send/recv case) is that the goroutine is never blocked: when the operation cannot proceed, the result is returned immediately.

Keywords: Go source code analysis select Channel block

Added by justinma on Wed, 09 Feb 2022 08:18:10 +0200