Go channel knowledge summary

What is channel

brief introduction

One of the core ideas of Go language is "sharing memory by means of communication", and channel is its best embodiment.
channel provides a mechanism to synchronize two functions executed concurrently, and allows two functions to communicate by passing specific types of values to each other
channel can be initialized in two ways: with cache and without cache

make(chan int, 0)
make(chan int, 10)

usage method:

c := make(int)
defer close(c)
go func() {
	c<-1
}()
n := <- c
fmt.Println(n)

Internal structure of channel

The implementation of chan is in runtime / chan Go is a structure of hchan

type hchan struct {
	qcount uint 		// Number of data in the queue
	dataqsiz uint 		// The size of the wake-up queue. The channel itself is a ring queue
	buf unsafe.Pointer 	// Pointer for storing actual data, use unsafe Pointer stores the address to avoid gc
	elemsize uint16
	closed uint32		// Identifies whether the channel is closed
	elemtype *_type 	// Data element type
	sendx uint 			// index of send
	recvx uint 			// index of recv
	recvq waitq 		// Queue blocked in recv
	sendq waitq 		// Blocking the queue at send

	lock mutex 			// lock
}

It can be seen that the channel itself is a ring buffer, and the data is stored on the heap. The synchronization of the channel is realized through the lock, not the imagined lock free way. There are two queues in the channel, one is the sending blocking queue and the other is the receiving blocking queue. When sending data to a full channel is blocked, the sending process will be added to sendq. Similarly, when receiving data to an empty channel, the receiving process will also be blocked and put into recvq.

waitq is a linked list, which simply encapsulates the g structure

Create channel

When we create a channel through make in the code, we actually call the following function

CALL runtime.makechan(SB)

makechan is implemented as follows

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	// Judge element type and size
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	
	// Judge alignment limits
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// Judge whether the non negative sum of size is greater than the maxAlloc limit
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

According to the above code, we can see that creating channel s can be divided into three cases:

  1. When the buffer size is 0, you only need to allocate hchansize memory
  2. The buffer size is not 0, and the channel type does not contain pointers. At this time, buf is the continuous memory of hchannesize + element size * number of elements
  3. If the buffer size is not 0 and the channel type contains pointers, you cannot simply apply for memory according to the size of the element. You need to allocate memory through mallocgc

Send data:
Sending data calls Chan The following interfaces in go:

CALL runtime.chansend1(SB)

chansend1 will call the chansend interface. The implementation of chansend is as follows:

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

c is the specific channel, ep is the data sent, and block is true, which means the transmission is blocked. Generally, the data sent to the channel is blocked. If the channel data is full, it will be blocked here all the time. However, in select, if a case listens to the sending of a channel, the block parameter is false at this time, which will be discussed in the subsequent analysis of the implementation of select.

select {
case <-c: // This is non blocking transmission
	// do some thing
default:
	// do some thing
}

The chansend interface will judge some conditions

If you send data to a nil channel, if it is blocked, the transmission will be blocked all the time:

if c == nil {
	if !block {
		return false
	}

	gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
	throw("unreachable")
}

First, it will lock to ensure atomicity. If you send data to a closed channel, it will panic

lock(&c, lock)
if c.close != 0 {
	unlock(&c, lock)
	
	panic(plainError("send on closed channel"))
}

If there is a waiting process in recvq at this time, directly call the send function to copy the data to the receiver. The implementation is as follows:

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

If there is no waiting process at this time and the data is not full, copy the data into the ring buffer and move the position back one bit

if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

If the ring buffer is full and the transmission is blocked, the sender will be placed in the sendq queue

Receive data:
Receiving data will call the following interface

CALL runtime.chanrecv1(SB)

chanrecv1 will call the chanrecv interface, and the chanrecv method is as follows

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

c refers to the channel that needs to be operated, and the received data will be written to ep. block is the same as that in send, indicating whether it is blocking reception or non blocking reception. Non blocking i reception refers to receiving a channel value in case in select

select {
	case a := <-c: // This is non blocking reception and no data is returned directly
	// do some thing
	default:
	// do some thing
}

First, chanrecv will also do some parameter verification

If the channel is nil and is in non blocking mode, return directly. If it is in blocking mode, wait forever

if c == nil {
	if !block {
		return
	}
	
	gopark(nil,nil,waitReasonChanReceiveNilChan, traceEvGoStop, 2)
	throw("unreachable")
}

It will then be locked to prevent competitive reading and writing

lock(&c.lock)

If you receive data from a closed channel and there is still data in the channel, you can still receive data, which belongs to the normal situation of receiving data.
If you receive data from a closed channel and there is no data in the channel, you will return (true, false) at this time, indicating that there is a value returned, but it is not the value we need

if c.closed != 0 && c.qcount == 0 {
	if raceenabled {
		raceacquire(c.raceaddr())
	}

	unlock(&c.lock)
	if ep != nil {
		typedmemclr(c.elemtype, ep)
	}

	return true, false
}

Reception can also be divided into three cases:
If the sender is blocking in sendq at this time, the recv function will be called

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

At this time, the sender is waiting, which means that the data in the channel is full. At this time, the data in the head of the channel will be copied to the receiver, and then the data of the sender in the head of the sender's queue will be copied to that location. This involves two copy operations.
In the second case, if there is no sender waiting, the data will be copied to the channel

if c.qcount > 0 {
	// receive directly from queue
	qp := chanbuf(c, c.recvx)
	if raceenabled {
		raceacquire(qp)
		racerelease(qp)
	}

	if ep != nil {
		typedmemmove(c.elemtype, ep, qp)
	}
	
	typedmemclr(c.elemtype, qp)
	c.recvx++

	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	
	c.qcount--
	unlock(&c.lock)

	return true, true
}

Close channel
When the channel is closed, the following interface will be called

func closechan(c *hchan)

First, I will do some data verification

if c == nil {
	panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
	unlock(&c.lock)
	panic(plainError("close of closed channel"))
}

if raceenabled {
	callerpc := getcallerpc()
	racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
	racerelease(c.raceaddr())
}

c.closed = 1 // Set close flag bit

If you initiate a close operation to a nil channel or a closed channel, you will panic.

Then it will wake up all the co processes in recvq or sendq

var glist gList

  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  // release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)

If there is a receiver, set the received data to 0 through typedmemclr
If there are senders, all senders will be panic

  • summary
    To sum up, there are several points to pay attention to when using channel
    Ensure that all data is sent before closing the channel, which is closed by the sender
    Do not close the channel repeatedly
    Do not send a value to a channel that is nil
    Do not receive values from channel s that are nil
    When receiving data, you can judge whether it is ok by the return value
n. ok := <- c
if ok {
	// do some thing
}

Keywords: Go Back-end

Added by amma on Thu, 24 Feb 2022 17:23:23 +0200