Select Scheduling in Go

We will explain how to use select through an example, in the spirit of Go's motto: share memory by communicating.

Problem: suppose we own a coal mine with two workers and one truck that hauls the coal away. We need to get all the coal mined as quickly as possible and monitor the work status in real time.

First, let's "create" workers:

func generator() chan int {
	out := make(chan int)

	go func() {
		i := 0
		for {
			time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

Each worker then produces a piece of coal (a data value) at random intervals and waits for it to be taken away.

Note: the sending loop runs in a separate goroutine, so generator returns immediately instead of blocking when no receiver is ready.
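To illustrate why the send lives in its own goroutine, here is a minimal sketch. The names produce and collect are hypothetical helpers, not part of the program in this post; unlike generator, produce sends a fixed number of values and closes its channel so that the example terminates.

```go
package main

import "fmt"

// produce is a hypothetical, finite variant of generator: it returns at once,
// while a separate goroutine performs the blocking sends.
func produce(count int) <-chan int {
	out := make(chan int) // unbuffered: each send waits for a receiver
	go func() {
		defer close(out) // lets the receiver's range loop end
		for i := 0; i < count; i++ {
			out <- i
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(produce(3))) // prints [0 1 2]
}
```

If the loop in produce ran directly in the calling function, the first send on the unbuffered channel would block before the channel could be returned to any receiver.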

 

Next, let's "recruit" a driver for transportation. First, list what the driver must be able to do.

Above all, the driver needs to be able to haul coal:

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("work id %d received %d \n", id, n)
	}
}

Create a driver/truck:

func createWork(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}
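The return type chan<- int means callers of createWork can only send on the channel. The sketch below shows the same idea with a clean shutdown; startSink and sumThroughSink are hypothetical names introduced for illustration, and the sync.WaitGroup is an addition that the program in this post does not use.

```go
package main

import (
	"fmt"
	"sync"
)

// startSink is a hypothetical variant of createWork. Callers get a send-only
// channel; wg lets them wait until the receiving goroutine has finished.
func startSink(wg *sync.WaitGroup, total *int) chan<- int {
	c := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for n := range c { // the loop ends when c is closed
			*total += n
		}
	}()
	return c // chan int converts implicitly to chan<- int
}

// sumThroughSink sends values through the sink and returns what it added up.
func sumThroughSink(values []int) int {
	var wg sync.WaitGroup
	total := 0
	w := startSink(&wg, &total)
	for _, v := range values {
		w <- v // allowed: w is send-only; a receive (<-w) would not compile
	}
	close(w)  // closing a send-only channel is permitted
	wg.Wait() // after this, reading total is safe
	return total
}

func main() {
	fmt.Println(sumThroughSink([]int{1, 2, 3})) // prints 6
}
```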

 

With everything in place, we can start mining:

1. Create two workers and one truck

var c1, c2 = generator(), generator()
w := createWork(0)

2. Start transportation work:

Idea: both workers start digging at the same time, and whatever either one digs up is loaded onto the truck immediately.


	var c1, c2 = generator(), generator()
	w := createWork(0)

	
	n := 0
	for {
		select {
		case n = <-c1: // n is kept in a variable only so that it can be printed
			w <- n
			fmt.Println("Received c1 data:", n)
		case n = <-c2:
			w <- n
			fmt.Println("Received c2 data:", n)
		}
	}

Receiving into n first, rather than forwarding straight to w, also makes it possible to relieve pressure on the receiver later.
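The core of this step is a select that waits on two channels at once and takes whichever is ready. Below is a minimal sketch that terminates; emit, merge and mergeSorted are hypothetical helpers, and closing the channels (which generator never does) is an assumption made so the example can finish.

```go
package main

import (
	"fmt"
	"sort"
)

// emit sends the given values on a new channel, then closes it.
func emit(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// merge receives from both channels with select until both are closed.
func merge(a, b <-chan int) []int {
	var got []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a closed channel is always ready; nil disables the case
				continue
			}
			got = append(got, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			got = append(got, v)
		}
	}
	return got
}

// mergeSorted sorts the result, because the interleaving of a and b is not
// deterministic.
func mergeSorted(a, b <-chan int) []int {
	got := merge(a, b)
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergeSorted(emit(1, 3, 5), emit(2, 4))) // prints [1 2 3 4 5]
}
```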

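In the loop above, w <- n blocks until the receiver takes the value, so both workers are held up whenever the receiver is slow. Moving the send into its own select case while keeping the single variable n does not help either: if the senders are faster than the receiver, a new value overwrites n before the old one has been consumed. A single n is therefore the wrong choice here.

Instead, we should store the incoming data and let the receiver work through it at its own pace. A []int slice serves as the queue.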

	var val []int
	for {
		select {
		case n := <-c1:
			val = append(val, n)
		case n := <-c2:
			val = append(val, n)
		}
	}

If the senders far outpace the receiver, the backlog in the []int queue grows too large. We use the time package to report the backlog at regular intervals.

	var val []int
	tick := time.Tick(time.Second)
	for {
		// activeW stays nil (its case is disabled) until val holds data.
		var activeW chan<- int
		var activeV int
		if len(val) > 0 {
			activeW = w
			activeV = val[0]
		}
		select {
		case n := <-c1:
			val = append(val, n)
		case n := <-c2:
			val = append(val, n)
		case activeW <- activeV: // hand the oldest queued value to the truck
			val = val[1:]
		case <-tick: // backlog check, once per second
			fmt.Println("queue len =", len(val), val)
		}
	}

time.Tick returns a receive-only channel of type <-chan time.Time, which delivers the current time at each interval.

If too much data arrives, or some of it takes too long to process, the service may slow down or crash. Each wait should therefore be bounded by a timeout so that it stays within an acceptable range.

Moreover, if a large backlog builds up unprocessed, the service will run under heavy load for a long time. Limiting the total run time bounds the amount of data the service handles.

	var val []int
	tm := time.After(10 * time.Second)
	for {
		var activeW chan<- int
		var activeV int
		if len(val) > 0 {
			activeW = w
			activeV = val[0]
		}
		select {
		case n := <-c1:
			val = append(val, n)
		case n := <-c2:
			val = append(val, n)
		case activeW <- activeV: // hand the oldest queued value to the truck
			val = val[1:]
		case <-time.After(80 * time.Millisecond): // timeout: nothing was ready within 80 ms
			fmt.Println("timeout")
		case <-tm: // stop after the overall run time
			fmt.Println("bye")
			return
		}
	}

tm is created with time.After outside the for loop, so it is initialized only once and fires once, 10 seconds after the start, which ends the loop. The time.After inside the select is created anew on every iteration, so it acts as a per-iteration timeout: it fires only if no other case becomes ready within 80 ms.

 

Finally, the complete code:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)

	go func() {
		i := 0
		for {
			time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("work id %d received %d \n", id, n)
	}
}

func createWork(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	var c1, c2 = generator(), generator()
	w := createWork(0)

	// Queue received values so that a slow receiver never causes unconsumed data to be overwritten
	var val []int
	tm := time.After(10 * time.Second)
	tick := time.Tick(time.Second)
	for {
		var activeW chan<- int
		var activeV int
		if len(val) > 0 {
			activeW = w
			activeV = val[0]
		}
		select {
		case n := <-c1:
			val = append(val, n)
		case n := <-c2:
			val = append(val, n)
		case activeW <- activeV: // a case on a nil channel is never selected (it blocks)
			val = val[1:]
		case <-time.After(80 * time.Millisecond): // timeout: nothing was ready within 80 ms
			fmt.Println("timeout")
		case <-tick: // backlog check, once per second
			fmt.Println("queue len =", len(val), val)
		case <-tm: // stop after the overall run time
			fmt.Println("bye")
			return
		}
	}

}

A send on a nil channel blocks forever, so a select case that sends on a nil channel is never chosen. activeW is declared without being initialized, so it is nil until data is available; only when val is non-empty is w assigned to it. Until then the case activeW <- activeV cannot proceed.
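The nil-channel technique can be reduced to a small, terminating sketch. relay and relayAll are hypothetical helpers; closing the input channel is an assumption added so that the loop can end, which the program above does not do.

```go
package main

import "fmt"

// relay forwards everything from in to out through a slice-backed queue,
// enabling the send case only while the queue holds data.
func relay(in <-chan int, out chan<- int) {
	defer close(out)
	var queue []int
	for in != nil || len(queue) > 0 {
		var activeOut chan<- int // nil: the send case below is disabled
		var next int
		if len(queue) > 0 {
			activeOut = out // enable the send case only when there is data
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input finished: disable the receive case
				continue
			}
			queue = append(queue, v)
		case activeOut <- next:
			queue = queue[1:]
		}
	}
}

// relayAll pushes values through relay and returns them in arrival order.
func relayAll(values []int) []int {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(relayAll([]int{1, 2, 3, 4, 5})) // prints [1 2 3 4 5]
}
```

Because the queue is first in, first out and there is a single sender, the values come out in the order they went in.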

As mentioned above, if the select sent directly to w with a case w <- n, that case could be chosen before anything had been received from c1 or c2. The receiver would then get 0 (the zero value of int, not real data), which makes the data inaccurate.
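A minimal sketch of that pitfall follows. sendBeforeReceive is a hypothetical function in which src and dst stand in for c1 and w; dst is buffered here only so that the send case is ready immediately.

```go
package main

import "fmt"

// sendBeforeReceive mimics a select in which the send case can win before any
// value has been received.
func sendBeforeReceive() int {
	src := make(chan int)    // nobody sends here, so the receive case never fires
	dst := make(chan int, 1) // buffered so that the send case is ready at once

	n := 0 // zero value: no real data has arrived yet
	select {
	case n = <-src:
	case dst <- n: // sends the placeholder 0
	}
	return <-dst
}

func main() {
	fmt.Println(sendBeforeReceive()) // prints 0, a value no producer ever sent
}
```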

 

These are my study notes; corrections are welcome.

Added by hayson1991 on Sat, 18 May 2019 20:36:57 +0300