Typical application scenarios of Channel

Message exchange (producers and consumers)

One goroutine can safely insert data into the Channel, and another goroutine can safely read data from the Channel. Goroutine can safely realize information exchange, that is, producer and consumer mode.
A work pool is a set of processes that wait for tasks and process them.

 1 package main
 2 
 3 import (
 4    "fmt"
 5    "math/rand"
 6    "sync"
 7    "time"
 8 )
 9 
10 type Task struct {
11    id        int
12    randomNum int
13 }
14 
15 type Result struct {
16    task               Task
17    randomNumDigitsSum int
18 }
19 
20 var taskCh = make(chan Task, 10)
21 var resultCh = make(chan Result, 10)
22 
23 func createTasks(taskCount int) {
24    for i := 0; i < taskCount; i++ {
25       taskCh <- Task{i + 1, rand.Intn(999)}
26    }
27    close(taskCh)
28 }
29 
30 func getRandomNumDigitsSum(num int) int {
31    sum := 0
32    for num > 0 {
33       sum += num % 10
34       num /= 10
35    }
36    time.Sleep(2 * time.Second)
37    return sum
38 }
39 
40 func work(wg *sync.WaitGroup) {
41    for task := range taskCh {
42       result := Result{task, getRandomNumDigitsSum(task.randomNum)}
43       resultCh <- result
44    }
45    wg.Done()
46 }
47 
48 func createWorkers(workerCount int) {
49    var wg sync.WaitGroup
50    for i := 0; i < workerCount; i++ {
51       wg.Add(1)
52       go work(&wg)
53    }
54 
55    wg.Wait()
56    close(resultCh)
57 }
58 
59 func getResults(done chan bool) {
60    for result := range resultCh {
61       fmt.Printf("task id is %d, randomNum is %d, randomNumDigitsSum is %d\n",
62          result.task.id, result.task.randomNum, result.randomNumDigitsSum)
63    }
64    done <- true
65 }
66 
67 func main() {
68    startTime := time.Now()
69 
70    taskCount := 100
71    go createTasks(taskCount)
72 
73    workerCount := 10
74    go createWorkers(workerCount)
75 
76    done := make(chan bool)
77    go getResults(done)
78    <-done
79 
80    endTime := time.Now()
81    fmt.Printf("cost time: %fs\n", endTime.Sub(startTime).Seconds())
82 }

Here, the task of the work pool is to calculate the sum of each bit of the entered number.
The channel capacity of taskCh and resultCh is 10.
Ten work collaborations continuously obtain values from taskCh and put the results into resultCh.
After the first batch of 10 tasks enter taskCh, wait for 2s after each work collaboration is obtained, and the second batch starts to enter.
After 2s, the second batch will be obtained by each work cooperation process, and the first batch of results will be printed.
Until the end, it takes about 20s.

 

Operation results

 1 -----Wait 2 seconds
 2 task id is 4, randomNum is 983, randomNumDigitsSum is 20
 3 task id is 2, randomNum is 636, randomNumDigitsSum is 15
 4 task id is 10, randomNum is 150, randomNumDigitsSum is 6
 5 task id is 7, randomNum is 520, randomNumDigitsSum is 7
 6 task id is 5, randomNum is 895, randomNumDigitsSum is 22
 7 task id is 9, randomNum is 904, randomNumDigitsSum is 13
 8 task id is 8, randomNum is 998, randomNumDigitsSum is 26
 9 task id is 3, randomNum is 407, randomNumDigitsSum is 11
10 task id is 1, randomNum is 878, randomNumDigitsSum is 23
11 task id is 6, randomNum is 735, randomNumDigitsSum is 15
12 -----Wait 2 seconds
13 task id is 12, randomNum is 538, randomNumDigitsSum is 16
14 task id is 11, randomNum is 212, randomNumDigitsSum is 5
15 task id is 14, randomNum is 362, randomNumDigitsSum is 11
16 task id is 15, randomNum is 436, randomNumDigitsSum is 13
17 task id is 13, randomNum is 750, randomNumDigitsSum is 12
18 task id is 17, randomNum is 630, randomNumDigitsSum is 9
19 task id is 16, randomNum is 215, randomNumDigitsSum is 8
20 task id is 20, randomNum is 914, randomNumDigitsSum is 14
21 task id is 19, randomNum is 20, randomNumDigitsSum is 2
22 task id is 18, randomNum is 506, randomNumDigitsSum is 11
23 -----Wait 2 seconds
24 Up to 100...
25 cost time: 20.024336s

 

Processing millions of data requests per minute through Go: https://blog.csdn.net/tybaoerge/article/details/50392386

 

 

taskCh in taskChPool is a non buffered channel.
The worker puts his own taskCh into the taskChPool, and the dispatch takes out the taskCh from the taskChPool and puts the task into the extracted taskCh.
After the worker takes out the task from his taskCh and executes it, he puts his taskCh into the taskChPool again and waits for a new task.

 

 1 package main
 2 
 3 import (
 4    "fmt"
 5    "math/rand"
 6    "time"
 7 )
 8 
 9 type Task struct {
10    id        int
11    randomNum int
12 }
13 
14 type Worker struct {
15    taskCh chan Task
16 }
17 
18 var taskCh = make(chan Task, 10)
19 var taskChPool chan chan Task
20 
21 func createTasks(taskCount int) {
22    for i := 0; i < taskCount; i++ {
23       taskCh <- Task{i + 1, rand.Intn(999)}
24    }
25    close(taskCh)
26 }
27 
28 func getRandomNumDigitsSum(num int) int {
29    sum := 0
30    for num > 0 {
31       sum += num % 10
32       num /= 10
33    }
34    time.Sleep(2 * time.Second)
35    return sum
36 }
37 
38 func (w *Worker) work() {
39    for {
40       taskChPool <- w.taskCh
41       if task, ok := <-w.taskCh; ok {
42          fmt.Printf("task id is %d, randomNum is %d, randomNumDigitsSum is %d\n",
43             task.id, task.randomNum, getRandomNumDigitsSum(task.randomNum))
44       }
45    }
46 }
47 
48 func createWorkers(workerCount int) {
49    for i := 0; i < workerCount; i++ {
50       worker := Worker{make(chan Task)}
51       go worker.work()
52    }
53 }
54 
55 func dispatch() {
56    for task := range taskCh {
57       go func(task Task) {
58          taskCh := <-taskChPool
59          taskCh <- task
60       }(task)
61    }
62 }
63 
64 func main() {
65    taskCount := 100
66    go createTasks(taskCount)
67 
68    workerCount := 10
69    go createWorkers(workerCount)
70 
71    taskChPool = make(chan chan Task, workerCount)
72    go dispatch()
73 
74    time.Sleep(30 * time.Second)
75 }

 

Comparison of the two methods:
Whether the request will be blocked due to the slow processing of the request.
In the first scheme, if the processing of the request is too slow, the task will be put into taskCh, which is equivalent to blocking the request.
In the second scheme, if the processing of the request is too slow, the request is asynchronously cached by creating a co process, and then executed when the worker is free, instead of blocking the request processing until the worker is empty.

 

Data transfer (token)

There are four goroutines, numbered 1, 2, 3 and 4. Every second, a goroutine will print out its own number. You are required to write a program to make the output number always print out in the order of 1, 2, 3, 4, 1, 2, 3, 4.
In order to realize sequential data transmission, a token variable is defined. Whoever gets the token can print his own number once, and pass the token to the next goroutine at the same time.

 1 package main
 2 
 3 import (
 4    "fmt"
 5    "time"
 6 )
 7 
 8 type Token struct{}
 9 
10 func newWorker(id int, ch chan Token, nextCh chan Token) {
11    for {
12       token := <-ch         // Get token
13       fmt.Println(id + 1) // Print id
14       time.Sleep(time.Second)
15       nextCh <- token
16    }
17 }
18 
19 func main() {
20    chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}
21 
22    // Create 4 worker That is, start four processes to read one respectively channel
23    for i := 0; i < 4; i++ {
24       go newWorker(i, chs[i], chs[(i+1)%4])
25    }
26 
27    //First give the token to the first one worker
28    chs[0] <- Token{}
29 
30    select {}
31 }

Operation results

 1 1                                                                               
 2 2                                                                               
 3 3                                                                               
 4 4                                                                               
 5 1                                                                               
 6 2                                                                               
 7 3                                                                               
 8 4                                                                               
 9 1                                                                               
10 2

Define a Token type.
Define the function to create the worker (read the token from your own chan). Which goroutine gets the token can print out its own number. Because the data needs to be printed once a second, sleep for 1 second and then give the token to its next home.
Start the goroutine of each worker on line 24, and give the token to the first worker on line 28.

Signal notification (program exit)

 1 func main() {
 2   go func() {
 3       ...... // Execute business processing
 4   }()
 5 
 6   // handle CTRL+C Equal interrupt signal
 7   termChan := make(chan os.Signal)
 8   signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
 9   <-termChan
10 
11   // Perform the cleanup action before exiting
12   doCleanup()
13   
14   fmt.Println("Graceful exit")
15 }

Doclean up can be a time-consuming operation, for example, it takes more than ten minutes to complete. If the program needs to wait so long to exit, the user cannot accept it. Therefore, set a maximum waiting time. As long as this time is exceeded, the program will no longer wait and can exit directly. Therefore, the exit is divided into two stages: closing represents the exit of the program, but the cleaning work has not been done; closed means that the cleaning work has been completed.

 1 func main() {
 2     var closing = make(chan struct{})
 3     var closed = make(chan struct{})
 4 
 5     go func() {
 6         // Simulated service processing
 7         for {
 8             select {
 9             case <-closing:
10                 return
11             default:
12                 // ....... Business calculation
13                 time.Sleep(100 * time.Millisecond)
14             }
15         }
16     }()
17 
18     // handle CTRL+C Equal interrupt signal
19     termChan := make(chan os.Signal)
20     signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
21     <-termChan
22 
23       // Stop business processing
24     close(closing)
25     // Perform the cleanup action before exiting
26     go doCleanup(closed)
27 
28     select {
29     case <-closed:
30     case <-time.After(time.Second):
31         fmt.Println("Cleaning timeout, wait")
32     }
33     fmt.Println("Graceful exit")
34 }
35 
36 func doCleanup(closed chan struct{}) {
37     time.Sleep((time.Minute))
38     close(closed)
39 }

 

Added by nellson on Tue, 25 Jan 2022 00:32:41 +0200