First, we will explain how to use select through an example. (Share memory by communicating.)
Problem: Suppose we own a coal mine. Two workers dig the coal and one truck hauls it away. We need to dig out all the coal as quickly as possible and be able to check the work status in real time.
First, let's "create" workers:
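    func generator() chan int {
        out := make(chan int)
        // Send from a separate goroutine so generator can return immediately.
        go func() {
            i := 0
            for {
                // Digging takes a random amount of time, up to 150 ms.
                time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }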
Each worker digs up a piece of coal (an int value) at random intervals and then waits for it to be taken away.
Note: the sending happens in a separate goroutine, so that generator itself does not block when there is no receiver yet.
Next, let's "recruit" a driver for transportation. First, list the driver's requirements.
Above all, the driver must know how to transport coal:
    func worker(id int, c chan int) {
        for n := range c {
            fmt.Printf("work id %d received %d \n", id, n)
        }
    }
Create a driver/truck:
    func createWork(id int) chan<- int {
        c := make(chan int)
        go worker(id, c)
        return c
    }
With everything in place, we can start digging:
1. Create two workers and a truck:
    var c1, c2 = generator(), generator()
    w := createWork(0)
2. Start transportation work:
Idea: both workers dig at the same time, and whenever either of them digs up coal, it is loaded onto the truck immediately.
    var c1, c2 = generator(), generator()
    w := createWork(0)
    n := 0
    for {
        select {
        case n = <-c1:
            // n exists only so the value can be printed; w <- <-c1 would also work.
            w <- n
            fmt.Println("Received c1 data:", n)
        case n = <-c2:
            w <- n
            fmt.Println("Received c2 data:", n)
        }
    }
Not having the workers send directly to w also eases the pressure on the receiver.
If the senders are faster than the receiver, a newly received value can overwrite n before the receiver has had a chance to consume the previous one, so holding the data in a single variable n is not correct.
Instead, we should store the data and let the receiver work through it at its own pace. A []int slice serves as the queue.
    var val []int
    for {
        select {
        case n := <-c1:
            val = append(val, n)
        case n := <-c2:
            val = append(val, n)
        }
    }
If the senders far outpace the receiver, the []int queue accumulates a large backlog. We use the time package to monitor that backlog.
    var val []int
    tick := time.Tick(time.Second)
    for {
        var activeW chan<- int
        var activeV int
        if len(val) > 0 {
            activeW = w
            activeV = val[0]
        }
        select {
        case n := <-c1:
            val = append(val, n)
        case n := <-c2:
            val = append(val, n)
        case activeW <- activeV:
            // Hand the oldest queued value to the truck, then drop it from the queue.
            val = val[1:]
        case <-tick:
            // Backlog check: print the queue once per second.
            fmt.Println("queue len = : ", len(val), " ", val)
        }
    }
time.Tick returns a receive-only channel of type <-chan time.Time, on which the current time is delivered once per interval.
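To make the channel type concrete, here is a minimal sketch that collects a few ticks. The helper name collectTicks and its millisecond parameter are made up for this illustration and are not part of the coal-mine program.

```go
package main

import (
	"fmt"
	"time"
)

// collectTicks (hypothetical helper) receives n values from time.Tick
// and returns them in the order they arrived.
func collectTicks(intervalMs, n int) []time.Time {
	// The explicit type shows what time.Tick returns.
	var tick <-chan time.Time = time.Tick(time.Duration(intervalMs) * time.Millisecond)
	ticks := make([]time.Time, 0, n)
	for len(ticks) < n {
		ticks = append(ticks, <-tick) // blocks until the next tick
	}
	return ticks
}

func main() {
	for _, t := range collectTicks(50, 3) {
		fmt.Println("tick at", t.Format("15:04:05.000"))
	}
}
```

Each received value is a time.Time, which is why the program above can simply discard it with `case <-tick:` when only the timing matters.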
If too much data arrives, or some items take too long to process, the service may slow down or crash. We should therefore bound how long each item is allowed to take and require processing to finish within a set time.
Likewise, if a large backlog of unprocessed data builds up, the service runs under heavy load for a long time. Limiting the total execution time caps the amount of data the service processes.
    var val []int
    tm := time.After(10 * time.Second)
    for {
        var activeW chan<- int
        var activeV int
        if len(val) > 0 {
            activeW = w
            activeV = val[0]
        }
        select {
        case n := <-c1:
            val = append(val, n)
        case n := <-c2:
            val = append(val, n)
        case activeW <- activeV:
            // Hand the oldest queued value to the truck, then drop it from the queue.
            val = val[1:]
        case <-time.After(80 * time.Millisecond):
            // Timeout: no other case became ready within 80 ms.
            fmt.Println("timeout")
        case <-tm:
            // Overall time limit reached: stop.
            fmt.Println("bye")
            return
        }
    }
tm is created by time.After outside the for loop, so it is initialized only once and fires when the overall time limit is reached, at which point the loop returns. The time.After inside the select is created anew on every iteration, so it limits how long a single iteration may wait: it fires only if no other case becomes ready within 80 ms.
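The difference between the two timers can be seen in isolation with a small sketch. The function countTimeouts, its millisecond parameters, and the idle channel are assumptions made for this illustration only. Nothing is ever sent on idle, so every iteration ends in a per-iteration timeout until the overall deadline fires.

```go
package main

import (
	"fmt"
	"time"
)

// countTimeouts (hypothetical helper) returns how many per-iteration
// timeouts fire before the overall deadline ends the loop.
func countTimeouts(totalMs, perIterMs int) int {
	idle := make(chan int) // nothing is ever sent on this channel
	// Created once, outside the loop: an overall deadline.
	deadline := time.After(time.Duration(totalMs) * time.Millisecond)
	timeouts := 0
	for {
		select {
		case <-idle:
			// never reached in this sketch
		case <-time.After(time.Duration(perIterMs) * time.Millisecond):
			// A fresh timer is created on every iteration.
			timeouts++
		case <-deadline:
			return timeouts
		}
	}
}

func main() {
	fmt.Println("per-iteration timeouts:", countTimeouts(200, 20))
}
```

The exact count depends on scheduling, but it cannot exceed the overall limit divided by the per-iteration limit.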
Finally, the complete code:
    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    func generator() chan int {
        out := make(chan int)
        go func() {
            i := 0
            for {
                time.Sleep(time.Duration(rand.Intn(150)) * time.Millisecond)
                out <- i
                i++
            }
        }()
        return out
    }

    func worker(id int, c chan int) {
        for n := range c {
            fmt.Printf("work id %d received %d \n", id, n)
        }
    }

    func createWork(id int) chan<- int {
        c := make(chan int)
        go worker(id, c)
        return c
    }

    func main() {
        var c1, c2 = generator(), generator()
        w := createWork(0)
        // Queue the values so that a slow receiver never has
        // unconsumed data overwritten by newer data.
        var val []int
        tm := time.After(10 * time.Second)
        tick := time.Tick(time.Second)
        for {
            var activeW chan<- int
            var activeV int
            if len(val) > 0 {
                activeW = w
                activeV = val[0]
            }
            select {
            case n := <-c1:
                val = append(val, n)
            case n := <-c2:
                val = append(val, n)
            case activeW <- activeV:
                // While activeW is nil this send blocks, so select skips this case.
                val = val[1:]
            case <-time.After(80 * time.Millisecond):
                // Timeout: no other case became ready within 80 ms.
                fmt.Println("timeout")
            case <-tick:
                // Backlog check: print the queue once per second.
                fmt.Println("queue len = : ", len(val), " ", val)
            case <-tm:
                // Overall time limit reached: stop.
                fmt.Println("bye")
                return
            }
        }
    }
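A select case that operates on a nil channel is never chosen, because a send to (or a receive from) a nil channel blocks forever. activeW is declared without being initialized, so it is nil until data is available, and w is assigned to it only when val is non-empty. While activeW is nil, nothing can be sent on it and the case activeW <- activeV cannot proceed.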
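The nil-channel behavior can be checked on its own with a short sketch. The helper trySend is made up for this illustration; it uses a default case so the program reports the result instead of blocking.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether a send on ch
// can proceed immediately.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// Reached when the send would block, which is always
		// the case for a nil channel.
		return false
	}
}

func main() {
	var nilCh chan int            // the zero value of a channel type is nil
	buffered := make(chan int, 1) // has room for one value

	fmt.Println(trySend(nilCh, 1))    // false
	fmt.Println(trySend(buffered, 1)) // true
}
```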
As mentioned above, if we sent to w directly instead, the case w <- n could be chosen before any data had been received from c1 or c2. The receiver would then get 0 (the zero value of int), which makes the data inaccurate.
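As a self-contained check that the queue plus nil-channel pattern delivers every value exactly once and in order, here is a sketch. The functions relay and collect are hypothetical names, and closing the input channel to end the run is an assumption added for the example (the program above runs until its time limit instead).

```go
package main

import "fmt"

// relay (hypothetical helper) forwards every value from in to out through
// a queue. It returns once in is closed and the queue has been drained.
func relay(in <-chan int, out chan<- int) {
	var queue []int
	for in != nil || len(queue) > 0 {
		var activeOut chan<- int // stays nil while the queue is empty
		var next int
		if len(queue) > 0 {
			activeOut = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // a nil channel disables this case from now on
				continue
			}
			queue = append(queue, v)
		case activeOut <- next: // skipped while activeOut is nil
			queue = queue[1:]
		}
	}
	close(out)
}

// collect (hypothetical helper) sends 0..n-1 through relay and returns
// the values in the order the receiver saw them.
func collect(n int) []int {
	in, out := make(chan int), make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			in <- i
		}
		close(in)
	}()
	go relay(in, out)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(5)) // [0 1 2 3 4]
}
```

Because the send case is disabled whenever the queue is empty, the receiver never sees a spurious zero value.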
This post is a record of my own notes; corrections are welcome.