Message exchange (producers and consumers)
One goroutine can safely send data into a channel while another goroutine safely receives it. Channels therefore let goroutines exchange information safely, which is the producer-consumer pattern.
A worker pool is a set of goroutines that wait for tasks and process them.
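Before moving on to a full pool, the basic exchange can be sketched with one producer and one consumer. This is a minimal illustration only; the names produce and consume are hypothetical and do not appear in the programs below.

```go
package main

import "fmt"

// produce sends the integers 1..n into ch and then closes it,
// which tells the consumer that no more values will arrive.
func produce(n int, ch chan<- int) {
	for i := 1; i <= n; i++ {
		ch <- i
	}
	close(ch)
}

// consume receives values until ch is closed and returns their sum.
func consume(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	ch := make(chan int) // unbuffered: each send waits for a matching receive
	go produce(5, ch)
	fmt.Println(consume(ch)) // 1+2+3+4+5 = 15
}
```

Closing the channel on the producer side is what lets the consumer's range loop end.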
```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	id        int
	randomNum int
}

type Result struct {
	task               Task
	randomNumDigitsSum int
}

var taskCh = make(chan Task, 10)
var resultCh = make(chan Result, 10)

// createTasks produces taskCount tasks and closes taskCh when done.
func createTasks(taskCount int) {
	for i := 0; i < taskCount; i++ {
		taskCh <- Task{i + 1, rand.Intn(999)}
	}
	close(taskCh)
}

// getRandomNumDigitsSum returns the sum of the decimal digits of num.
// The 2s sleep simulates a slow task.
func getRandomNumDigitsSum(num int) int {
	sum := 0
	for num > 0 {
		sum += num % 10
		num /= 10
	}
	time.Sleep(2 * time.Second)
	return sum
}

// work consumes tasks until taskCh is closed and drained.
func work(wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range taskCh {
		result := Result{task, getRandomNumDigitsSum(task.randomNum)}
		resultCh <- result
	}
}

// createWorkers starts the workers and closes resultCh once all of them exit.
func createWorkers(workerCount int) {
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go work(&wg)
	}

	wg.Wait()
	close(resultCh)
}

// getResults prints every result and signals done when resultCh is closed.
func getResults(done chan bool) {
	for result := range resultCh {
		fmt.Printf("task id is %d, randomNum is %d, randomNumDigitsSum is %d\n",
			result.task.id, result.task.randomNum, result.randomNumDigitsSum)
	}
	done <- true
}

func main() {
	startTime := time.Now()

	taskCount := 100
	go createTasks(taskCount)

	workerCount := 10
	go createWorkers(workerCount)

	done := make(chan bool)
	go getResults(done)
	<-done

	endTime := time.Now()
	fmt.Printf("cost time: %fs\n", endTime.Sub(startTime).Seconds())
}
```
Here, the task of the worker pool is to calculate the sum of the digits of a given number.
Both taskCh and resultCh have a buffer capacity of 10.
Ten worker goroutines continuously receive tasks from taskCh and send the results into resultCh.
Once the first batch of 10 tasks is in taskCh, each worker takes one task and spends 2s on it; meanwhile the second batch enters taskCh.
After 2s, the workers pick up the second batch and the results of the first batch are printed.
This repeats until all tasks are done: 100 tasks handled by 10 workers at 2s per task take about 20s in total.
Output
```
-----Wait 2 seconds
task id is 4, randomNum is 983, randomNumDigitsSum is 20
task id is 2, randomNum is 636, randomNumDigitsSum is 15
task id is 10, randomNum is 150, randomNumDigitsSum is 6
task id is 7, randomNum is 520, randomNumDigitsSum is 7
task id is 5, randomNum is 895, randomNumDigitsSum is 22
task id is 9, randomNum is 904, randomNumDigitsSum is 13
task id is 8, randomNum is 998, randomNumDigitsSum is 26
task id is 3, randomNum is 407, randomNumDigitsSum is 11
task id is 1, randomNum is 878, randomNumDigitsSum is 23
task id is 6, randomNum is 735, randomNumDigitsSum is 15
-----Wait 2 seconds
task id is 12, randomNum is 538, randomNumDigitsSum is 16
task id is 11, randomNum is 212, randomNumDigitsSum is 5
task id is 14, randomNum is 362, randomNumDigitsSum is 11
task id is 15, randomNum is 436, randomNumDigitsSum is 13
task id is 13, randomNum is 750, randomNumDigitsSum is 12
task id is 17, randomNum is 630, randomNumDigitsSum is 9
task id is 16, randomNum is 215, randomNumDigitsSum is 8
task id is 20, randomNum is 914, randomNumDigitsSum is 14
task id is 19, randomNum is 20, randomNumDigitsSum is 2
task id is 18, randomNum is 506, randomNumDigitsSum is 11
-----Wait 2 seconds
Up to 100...
cost time: 20.024336s
```
A second scheme, following "Processing millions of data requests per minute through Go": https://blog.csdn.net/tybaoerge/article/details/50392386
Each taskCh in taskChPool is an unbuffered channel.
A worker puts its own taskCh into taskChPool; the dispatcher takes a taskCh out of taskChPool and sends a task into that channel.
After the worker receives the task from its taskCh and executes it, it puts its taskCh back into taskChPool and waits for a new task.
```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Task struct {
	id        int
	randomNum int
}

type Worker struct {
	taskCh chan Task
}

var taskCh = make(chan Task, 10)
var taskChPool chan chan Task

func createTasks(taskCount int) {
	for i := 0; i < taskCount; i++ {
		taskCh <- Task{i + 1, rand.Intn(999)}
	}
	close(taskCh)
}

// getRandomNumDigitsSum returns the sum of the decimal digits of num.
// The 2s sleep simulates a slow task.
func getRandomNumDigitsSum(num int) int {
	sum := 0
	for num > 0 {
		sum += num % 10
		num /= 10
	}
	time.Sleep(2 * time.Second)
	return sum
}

// work registers the worker's own channel in the pool, waits for a task
// on it, processes the task, and then registers again.
func (w *Worker) work() {
	for {
		taskChPool <- w.taskCh
		if task, ok := <-w.taskCh; ok {
			fmt.Printf("task id is %d, randomNum is %d, randomNumDigitsSum is %d\n",
				task.id, task.randomNum, getRandomNumDigitsSum(task.randomNum))
		}
	}
}

func createWorkers(workerCount int) {
	for i := 0; i < workerCount; i++ {
		worker := Worker{make(chan Task)}
		go worker.work()
	}
}

// dispatch starts one goroutine per task; each goroutine waits for an
// idle worker's channel and hands the task over.
func dispatch() {
	for task := range taskCh {
		go func(task Task) {
			workerTaskCh := <-taskChPool // blocks until a worker is idle
			workerTaskCh <- task
		}(task)
	}
}

func main() {
	taskCount := 100
	go createTasks(taskCount)

	workerCount := 10
	// Create the pool before starting the workers: a send on a nil
	// channel blocks forever, and assigning it later is a data race.
	taskChPool = make(chan chan Task, workerCount)
	go createWorkers(workerCount)

	go dispatch()

	// Crude wait: 100 tasks on 10 workers at 2s each need about 20s.
	time.Sleep(30 * time.Second)
}
```
Comparison of the two methods:
The two schemes differ in whether a request is blocked when processing is slow.
In the first scheme, if processing is too slow, taskCh fills up and the send into taskCh blocks, which is equivalent to blocking the request.
In the second scheme, if processing is too slow, each request is parked asynchronously in its own goroutine and executed when a worker becomes free, so accepting requests is not blocked while all workers are busy.
Data transfer (token)
There are four goroutines, numbered 1, 2, 3 and 4. Every second, one goroutine prints its own number. Write a program whose output always appears in the order 1, 2, 3, 4, 1, 2, 3, 4, and so on.
To pass control along in sequence, a token is defined. Whichever goroutine holds the token prints its own number once and then passes the token to the next goroutine.
```go
package main

import (
	"fmt"
	"time"
)

type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
	for {
		token := <-ch         // Get the token
		fmt.Println(id + 1)   // Print the worker's number
		time.Sleep(time.Second)
		nextCh <- token
	}
}

func main() {
	chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}

	// Create 4 workers: four goroutines, each reading from its own channel
	for i := 0; i < 4; i++ {
		go newWorker(i, chs[i], chs[(i+1)%4])
	}

	// First give the token to the first worker
	chs[0] <- Token{}

	select {}
}
```
Output
```
1
2
3
4
1
2
3
4
1
2
```
Define a Token type.
Define the worker function, which reads the token from its own channel. Whichever goroutine holds the token prints its own number. Because one number must be printed per second, the worker sleeps for 1 second and then passes the token to the next worker.
The loop in main starts a goroutine for each worker (line 24), and chs[0] <- Token{} gives the token to the first worker (line 28).
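The same idea works for any number of workers. The sketch below is a variation, not the program above: it omits the one-second sleep, uses channels with a buffer of 1 so that a ring of a single worker also works, and puts a remaining counter into the token (an assumption added here) so the ring can stop and report the order. runRing and token are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// token carries the number of prints still allowed. The counter is an
// assumption of this sketch; the Token type above is empty.
type token struct{ remaining int }

// runRing connects n workers in a ring, lets the token travel until
// n*rounds numbers have been recorded, and returns them in order.
func runRing(n, rounds int) []int {
	if n <= 0 || rounds <= 0 {
		return nil
	}
	chs := make([]chan token, n)
	for i := range chs {
		chs[i] = make(chan token, 1)
	}

	var (
		wg    sync.WaitGroup
		order []int // only the current token holder appends to it
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int, in <-chan token, next chan<- token) {
			defer wg.Done()
			defer close(next) // closing next tells the next worker to stop
			for t := range in {
				order = append(order, id+1)
				t.remaining--
				if t.remaining == 0 {
					return // last number recorded, start the shutdown
				}
				next <- t
			}
		}(i, chs[i], chs[(i+1)%n])
	}

	chs[0] <- token{remaining: n * rounds} // hand the token to worker 1
	wg.Wait()
	return order
}

func main() {
	fmt.Println(runRing(4, 2)) // [1 2 3 4 1 2 3 4]
}
```

Each channel is closed exactly once, by the worker that sends on it, so the shutdown travels around the ring the same way the token does.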
Signal notification (program exit)
```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	go func() {
		// ...... Execute business processing
	}()

	// Handle CTRL+C and other termination signals.
	// signal.Notify does not block when delivering, so the channel is buffered.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

	// Perform the cleanup before exiting (doCleanup is defined elsewhere)
	doCleanup()

	fmt.Println("Graceful exit")
}
```
doCleanup can be a time-consuming operation; for example, it may take more than ten minutes to complete. Users will not accept a program that takes that long to exit, so a maximum waiting time is set: once it is exceeded, the program stops waiting and exits directly. The exit is therefore split into two stages: closing means the program is shutting down but the cleanup has not finished yet; closed means the cleanup has completed.
```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	var closing = make(chan struct{})
	var closed = make(chan struct{})

	go func() {
		// Simulated business processing
		for {
			select {
			case <-closing:
				return
			default:
				// ....... Business calculation
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	// Handle CTRL+C and other termination signals.
	// signal.Notify does not block when delivering, so the channel is buffered.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

	// Stop business processing
	close(closing)
	// Perform the cleanup before exiting
	go doCleanup(closed)

	select {
	case <-closed:
	case <-time.After(time.Second):
		fmt.Println("Cleanup timed out, not waiting any longer")
	}
	fmt.Println("Graceful exit")
}

// doCleanup simulates a slow cleanup and closes closed when it finishes.
func doCleanup(closed chan struct{}) {
	time.Sleep(time.Minute)
	close(closed)
}
```