Some Patterns of Channel in Go Learning

In addition to transferring data safely between goroutine s, after reading Concurrency in Go, I lament that channel has so many patterns to use. In my personal learning, I summarize the following common patterns.

pipeline

concept

We take reptiles as an example. Generally, reptiles are divided into the following steps:

Grab Page - > Parse Page - > Integrate Data Analysis - > Store Analysis Results

If you put all the steps above in one function, it will be ugly and difficult to maintain. Considering decoupling, we can start four processes and take on different roles. For example, process 1 is responsible for crawling pages, process 2 is responsible for parsing pages, and so on. Each process gets a data. Later, it is handed over to the next process, which is the basic idea of pipeline. Each role is only responsible for caring about its own things.

Example

Given a number n, perform (n2 + 1) 2 operations

func pipeline() {
    generator := func(done chan interface{}, intergers ...int) <-chan int {
        inStream := make(chan int)
        go func() {
            defer close(inStream)
            for _, i := range intergers {
                select {
                case <-done:
                    return
                case inStream <- i:
                }
            }
        }()
        return inStream
    }

    add := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
        addInStream := make(chan int)
        go func() {
            defer close(addInStream)
            for i := range inStream {
                select {
                case <-done:
                    return
                case addInStream <- i + increment:
                }
            }
        }()
        return addInStream
    }

    multiply := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
        multiplyInStream := make(chan int)
        go func() {
            defer close(multiplyInStream)
            for i := range inStream {
                select {
                case <-done:
                    return
                case multiplyInStream <- i * increment:
                }
            }
        }()
        return multiplyInStream
    }

    done := make(chan interface{})
    defer close(done)
    inStream := generator(done, []int{1, 2, 3, 4, 5, 6, 7}...)
    pipeline := multiply(done, add(done, multiply(done, inStream, 2), 1), 2)

    for v := range pipeline {
        fmt.Println(v)
    }
}

Fan in and out

In pipeline model, it is an efficient streaming processing. But if there are three links in pipeline, a,b,c, and B links, the processing of B links is very slow, then it will affect the processing of C links. If the number of processing of B links is increased, the effect of slow processing of B links on the whole pipeline can be weakened, then a-> more than B links. The process is fan-in. Many B-links output data to c-links, which is fan-out.

Example

func FanInFanOut() {
    producer := func(intergers ...int) <-chan interface{} {
        inStream := make(chan interface{})
        go func() {
            defer close(inStream)
            for _, v := range intergers {
                time.Sleep(5 * time.Second)
                inStream <- v
            }
        }()
        return inStream
    }

    fanIn := func(channels ...<-chan interface{},
    ) <-chan interface{} {
        var wg sync.WaitGroup
        multiplexStream := make(chan interface{})

        multiplex := func(c <-chan interface{}) {
            defer wg.Done()
            for i := range c {
                multiplexStream <- i
            }
        }

        wg.Add(len(channels))
        for _, c := range channels {
            go multiplex(c)
        }
        go func() {
            wg.Wait()
            close(multiplexStream)
        }()
        return multiplexStream
    }

    consumer := func(inStream <-chan interface{}) {
        for v := range inStream {
            fmt.Println(v)
        }
    }

    nums := runtime.NumCPU()
    producerStreams := make([]<-chan interface{}, nums)
    for i := 0; i < nums; i++ {
        producerStreams[i] = producer(i)
    }

    consumer(fanIn(producerStreams...))
}

tee- channel

concept

If you get an SQL statement from channel, and you want to record, analyze and execute the sql, then you need to forward the SQL to the corresponding channel of the three tasks, tee-channel does this.

Example

func teeChannel() {
    producer := func(intergers ...int) <-chan interface{} {
        inStream := make(chan interface{})
        go func() {
            defer close(inStream)
            for _, v := range intergers {
                inStream <- v
            }
        }()
        return inStream
    }
    tee := func(in <-chan interface{}) (_, _ <-chan interface{}) {
        out1 := make(chan interface{})
        out2 := make(chan interface{})
        go func() {
            defer close(out1)
            defer close(out2)

            for val := range in {
                out1, out2 := out1, out2
                for i := 0; i < 2; i++ {
                    select {
                    case out1 <- val:
                        out1 = nil
                    case out2 <- val:
                        out2 = nil
                    }
                }
            }
        }()
        return out1, out2
    }

    out1, out2 := tee(producer(1, 2, 3, 4, 5))
    for val1 := range out1 {
        fmt.Printf("out1: %v, out2: %v", val1, <-out2)
    }
}

Bridge channel

concept

Whether pipeline or fan-in-fan-out mentioned above, each goroutine consumes a channel, but in the actual scenario, there may be multiple channels for us to consume. As consumers, we don't care which channel these values come from. In this case, we deal with a channel-filled ch. Annel may be a lot. If we define a function, we can decompose a channel full of channels into a simple channel, which will enable consumers to focus more on the work at hand. This is the idea of bridging channels.

Example

func bridge() {
    gen := func() <-chan <-chan interface{} {
        in := make(chan (<-chan interface{}))
        go func() {
            defer close(in)
            for i := 0; i < 10; i++ {
                stream := make(chan interface{}, 1)
                stream <- i
                close(stream)
                in <- stream
            }
        }()
        return in
    }

    bridge := func(in <-chan (<-chan interface{})) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                stream := make(<-chan interface{})
                select {
                case maybeStream, ok := <-in:
                    if ok == false {
                        return
                    }
                    stream = maybeStream
                }
                for val := range stream {
                    valStream <- val
                }
            }
        }()
        return valStream
    }

    for val := range bridge(gen()) {
        fmt.Println(val)
    }
}

Keywords: Go SQL

Added by ExpertAlmost on Thu, 15 Aug 2019 17:04:02 +0300