Channels do more than transfer data safely between goroutines. After reading Concurrency in Go, I was struck by how many usage patterns channels support. Below I summarize the common ones I came across while studying it.
Pipeline
Concept
Take a web crawler as an example. A crawler typically works in the following steps:
Fetch page -> Parse page -> Integrate and analyze data -> Store analysis results
Putting all of these steps in one function makes the code ugly and hard to maintain. To decouple them, we can start four goroutines, each playing a different role: goroutine 1 fetches pages, goroutine 2 parses them, and so on. Each stage receives a piece of data, processes it, and hands the result to the next stage over a channel. That is the basic idea of a pipeline: each stage is responsible only for its own work.
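A minimal sketch of the four crawler stages wired together as a Go pipeline. The stage functions (`fetch`, `parse`, `analyze`, `store`) and the `page` type are hypothetical; `fetch` fakes the page body instead of making HTTP requests so the example runs offline.

```go
package main

import (
	"fmt"
	"strings"
)

// page is a hypothetical stand-in for a fetched web page.
type page struct {
	url  string
	body string
}

// fetch is stage 1. No network I/O happens here: the body is faked
// so the sketch stays self-contained.
func fetch(urls []string) <-chan page {
	out := make(chan page)
	go func() {
		defer close(out)
		for _, u := range urls {
			out <- page{url: u, body: "<title>" + u + "</title>"}
		}
	}()
	return out
}

// parse is stage 2: it extracts the text between the <title> tags.
func parse(in <-chan page) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for p := range in {
			out <- strings.TrimSuffix(strings.TrimPrefix(p.body, "<title>"), "</title>")
		}
	}()
	return out
}

// analyze is stage 3: here it only upper-cases each title.
func analyze(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for t := range in {
			out <- strings.ToUpper(t)
		}
	}()
	return out
}

// store is stage 4: it collects results in a slice; a real crawler
// would write them to a database instead.
func store(in <-chan string) []string {
	var results []string
	for r := range in {
		results = append(results, r)
	}
	return results
}

func main() {
	results := store(analyze(parse(fetch([]string{"a.example", "b.example"}))))
	fmt.Println(results) // [A.EXAMPLE B.EXAMPLE]
}
```

Each stage owns exactly one goroutine and closes its output when its input is exhausted, so closing propagates down the pipeline and `store` returns once everything has been processed.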
Example
Given a number $n$, compute $(n \times 2 + 1) \times 2$:
```go
package main

import "fmt"

func pipeline() {
	// generator turns a list of integers into a stream.
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		inStream := make(chan int)
		go func() {
			defer close(inStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case inStream <- i:
				}
			}
		}()
		return inStream
	}
	// add adds increment to every value in the stream.
	add := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range inStream {
				select {
				case <-done:
					return
				case addStream <- i + increment:
				}
			}
		}()
		return addStream
	}
	// multiply multiplies every value in the stream by multiplier.
	multiply := func(done <-chan interface{}, inStream <-chan int, multiplier int) <-chan int {
		multiplyStream := make(chan int)
		go func() {
			defer close(multiplyStream)
			for i := range inStream {
				select {
				case <-done:
					return
				case multiplyStream <- i * multiplier:
				}
			}
		}()
		return multiplyStream
	}

	// Closing done stops every stage, even if the consumer exits early.
	done := make(chan interface{})
	defer close(done)

	inStream := generator(done, 1, 2, 3, 4, 5, 6, 7)
	// (n*2 + 1) * 2
	results := multiply(done, add(done, multiply(done, inStream, 2), 1), 2)
	for v := range results {
		fmt.Println(v) // 6, 10, 14, ..., 30
	}
}

func main() {
	pipeline()
}
```
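The `add` and `multiply` stages above differ only in the arithmetic they apply. As a sketch, assuming Go 1.18 or later for type parameters, a single hypothetical `mapStage` helper can replace both, with a hypothetical `generate` playing the role of `generator`. It uses `chan struct{}` for `done`, a common idiom for a channel that only signals.

```go
package main

import "fmt"

// mapStage is a hypothetical generic stage (Go 1.18+): it applies f to
// every value from in and stops early when done is closed.
func mapStage[T, U any](done <-chan struct{}, in <-chan T, f func(T) U) <-chan U {
	out := make(chan U)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case <-done:
				return
			case out <- f(v):
			}
		}
	}()
	return out
}

// generate emits the given integers, stopping early if done is closed.
func generate(done <-chan struct{}, ints ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, i := range ints {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done)

	double := func(n int) int { return n * 2 }
	inc := func(n int) int { return n + 1 }

	// (n*2 + 1) * 2, the same computation as the pipeline example.
	results := mapStage(done, mapStage(done, mapStage(done, generate(done, 1, 2, 3), double), inc), double)
	for v := range results {
		fmt.Println(v) // 6, then 10, then 14
	}
}
```

The trade-off is that every stage now allocates its own goroutine and channel regardless of how trivial `f` is; for cheap arithmetic like this, composing the functions in one stage would be faster, but separate stages keep each step independently replaceable.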
Fan-in and fan-out
A pipeline processes data as an efficient stream, but it is only as fast as its slowest stage. Suppose a pipeline has three stages, A, B, and C, and B is very slow: C then spends most of its time waiting on B. Running several instances of B in parallel weakens the effect of the slow stage on the whole pipeline. Distributing A's output across multiple B instances is fan-out; merging the outputs of those B instances back into a single channel for C is fan-in.
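A minimal sketch of fan-out and fan-in around a slow stage B. `slowSquare`, `merge`, and `fanOutFanIn` are hypothetical names, and the 100 ms sleep is an arbitrary stand-in for B's cost.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// slowSquare stands in for the slow stage B.
func slowSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			time.Sleep(100 * time.Millisecond) // simulated cost
			out <- n * n
		}
	}()
	return out
}

// merge is the fan-in step: it forwards values from every input to one
// output and closes the output once all inputs are drained.
func merge(ins ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(ins))
	for _, c := range ins {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOutFanIn sends 1..n from stage A to `workers` copies of stage B
// (fan-out) and merges their outputs for stage C (fan-in).
func fanOutFanIn(n, workers int) []int {
	src := make(chan int) // stage A
	go func() {
		defer close(src)
		for i := 1; i <= n; i++ {
			src <- i
		}
	}()

	bs := make([]<-chan int, workers)
	for i := range bs {
		bs[i] = slowSquare(src) // all workers read from the same channel
	}

	var results []int // stage C
	for v := range merge(bs...) {
		results = append(results, v)
	}
	sort.Ints(results) // fan-in does not preserve input order
	return results
}

func main() {
	start := time.Now()
	fmt.Println(fanOutFanIn(8, 4)) // [1 4 9 16 25 36 49 64]
	fmt.Println("elapsed:", time.Since(start).Round(100*time.Millisecond))
}
```

With four workers, the eight 100 ms jobs should take roughly 200 ms instead of about 800 ms with a single B. Note that fan-in interleaves values in whatever order the workers finish, so the sketch sorts the results; only fan out stages whose outputs do not depend on order.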
Example
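```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func FanInFanOut() {
	// producer emits its integers slowly, simulating a slow stage.
	producer := func(integers ...int) <-chan interface{} {
		inStream := make(chan interface{})
		go func() {
			defer close(inStream)
			for _, v := range integers {
				time.Sleep(5 * time.Second)
				inStream <- v
			}
		}()
		return inStream
	}
	// fanIn merges several channels into one; the merged channel is
	// closed once every input channel has been drained.
	fanIn := func(channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexStream := make(chan interface{})
		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				multiplexStream <- i
			}
		}
		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}
		go func() {
			wg.Wait()
			close(multiplexStream)
		}()
		return multiplexStream
	}
	consumer := func(inStream <-chan interface{}) {
		for v := range inStream {
			fmt.Println(v)
		}
	}

	// Fan out: one producer per CPU, all running concurrently, so the
	// run takes about 5 seconds in total rather than 5 seconds each.
	nums := runtime.NumCPU()
	producerStreams := make([]<-chan interface{}, nums)
	for i := 0; i < nums; i++ {
		producerStreams[i] = producer(i)
	}
	// Fan in: merge all producer outputs for a single consumer.
	consumer(fanIn(producerStreams...))
}

func main() {
	FanInFanOut()
}
```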
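Tee channel
Concept
Suppose you receive SQL statements from a channel and want to log, analyze, and execute each one. Every statement must then be forwarded to three channels, one per task. A tee channel does exactly this: like the Unix tee command, it copies every value from one input channel to several output channels. The example below splits one channel into two.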
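The SQL scenario needs three outputs rather than two. A minimal sketch of an N-way tee, where `teeN` and `dispatch` are hypothetical names and the three "tasks" only record the statements they receive instead of actually logging, analyzing, or executing them.

```go
package main

import (
	"fmt"
	"sync"
)

// teeN is a hypothetical N-way tee: every value read from in is
// delivered to all n outputs before the next value is read.
func teeN(in <-chan string, n int) []<-chan string {
	outs := make([]chan string, n)
	ro := make([]<-chan string, n)
	for i := range outs {
		outs[i] = make(chan string)
		ro[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range in {
			// Sends happen in order; each blocks until that reader is ready,
			// so every output must be drained by its own goroutine.
			for _, o := range outs {
				o <- v
			}
		}
	}()
	return ro
}

// dispatch feeds queries through teeN and returns what each task received.
func dispatch(queries, tasks []string) map[string][]string {
	src := make(chan string)
	go func() {
		defer close(src)
		for _, q := range queries {
			src <- q
		}
	}()

	outs := teeN(src, len(tasks))
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		got = make(map[string][]string)
	)
	for i, name := range tasks {
		wg.Add(1)
		go func(name string, c <-chan string) {
			defer wg.Done()
			for q := range c {
				// A real system would log, analyze, or execute q here.
				mu.Lock()
				got[name] = append(got[name], q)
				mu.Unlock()
			}
		}(name, outs[i])
	}
	wg.Wait()
	return got
}

func main() {
	tasks := []string{"log", "analyze", "execute"}
	got := dispatch([]string{"SELECT 1", "SELECT 2"}, tasks)
	for _, task := range tasks {
		fmt.Println(task, got[task])
	}
}
```

Sending to the outputs one after another is simpler than the nil-channel `select` trick used in the two-way tee, at the cost of letting the slowest consumer set the pace for all of them.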
Example
```go
package main

import "fmt"

func teeChannel() {
	producer := func(integers ...int) <-chan interface{} {
		inStream := make(chan interface{})
		go func() {
			defer close(inStream)
			for _, v := range integers {
				inStream <- v
			}
		}()
		return inStream
	}
	// tee copies every value from in to both out1 and out2.
	tee := func(in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out1)
			defer close(out2)
			for val := range in {
				// Shadow the outputs locally. Setting one to nil after a
				// successful send disables that select case, so each value
				// reaches each output exactly once, in whichever order the
				// readers are ready.
				out1, out2 := out1, out2
				for i := 0; i < 2; i++ {
					select {
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

	out1, out2 := tee(producer(1, 2, 3, 4, 5))
	for val1 := range out1 {
		fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
	}
}

func main() {
	teeChannel()
}
```
Bridge channel
Concept
In the pipeline and fan-in/fan-out patterns above, each goroutine consumes a single channel. In practice, however, we may need to consume values from a sequence of channels, and as consumers we don't care which channel each value came from. In that case we are dealing with a channel of channels, and there may be many inner channels. If we define a function that flattens a channel of channels into a single channel, consumers can focus on the work at hand. This is the idea of the bridge channel.
Example
```go
package main

import "fmt"

func bridge() {
	// gen returns a channel of channels; each inner channel carries one value.
	gen := func() <-chan <-chan interface{} {
		in := make(chan (<-chan interface{}))
		go func() {
			defer close(in)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				in <- stream
			}
		}()
		return in
	}
	// bridge flattens the channel of channels into a single channel,
	// draining each inner channel in turn.
	bridge := func(in <-chan (<-chan interface{})) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for stream := range in {
				for val := range stream {
					valStream <- val
				}
			}
		}()
		return valStream
	}

	for val := range bridge(gen()) {
		fmt.Println(val) // 0 through 9, in order
	}
}

func main() {
	bridge()
}
```
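The bridge above has no way to stop early: if the consumer quits reading, the goroutines inside `gen` and `bridge` block forever on their sends. A minimal sketch of a cancellable variant, assuming a `done` channel like the one in the pipeline example and switching to `int` values for type safety; `gen`, `forward`, and `take` are hypothetical helper names.

```go
package main

import "fmt"

// gen emits n inner channels, each carrying a single value.
// It stops early if done is closed.
func gen(done <-chan struct{}, n int) <-chan (<-chan int) {
	chanStream := make(chan (<-chan int))
	go func() {
		defer close(chanStream)
		for i := 0; i < n; i++ {
			stream := make(chan int, 1)
			stream <- i
			close(stream)
			select {
			case chanStream <- stream:
			case <-done:
				return
			}
		}
	}()
	return chanStream
}

// forward copies values from stream to out until stream is closed.
// It returns false if done was closed first.
func forward(done <-chan struct{}, stream <-chan int, out chan<- int) bool {
	for {
		select {
		case <-done:
			return false
		case v, ok := <-stream:
			if !ok {
				return true
			}
			select {
			case out <- v:
			case <-done:
				return false
			}
		}
	}
}

// bridge flattens a channel of channels into one channel and stops as
// soon as done is closed.
func bridge(done <-chan struct{}, chanStream <-chan (<-chan int)) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case stream, ok := <-chanStream:
				if !ok || !forward(done, stream, out) {
					return
				}
			}
		}
	}()
	return out
}

// take reads at most k values (k > 0) from total inner channels, then
// closes done on return so the remaining goroutines exit.
func take(total, k int) []int {
	done := make(chan struct{})
	defer close(done)
	var got []int
	for v := range bridge(done, gen(done, total)) {
		got = append(got, v)
		if len(got) == k {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(take(10, 3)) // [0 1 2]
	fmt.Println(take(4, 10)) // [0 1 2 3]
}
```

Every blocking send and receive now sits in a `select` alongside `<-done`, which is what lets a consumer abandon the stream without leaking goroutines.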