Golang collaboration and channel usage

brief introduction

Xie Cheng is a major feature and selling point of golang A coroutine is a lightweight execution thread. You can quickly start a coroutine by using the go keyword to a function or lamba expression The return value of the covariance function is discarded Thread scheduling is managed by the operating system, which is preemptive scheduling. The coordination process is different. The coordination process needs to cooperate with each other and take the initiative to hand over the executive power.

to configure

GOMAXPROCS sets the number of logical CPUs. Generally, the number of CPU cores is used Use enough threads to improve the parallel execution efficiency of golang If your business is IO intensive, you can set the value of several times and the number of CPU cores to get better performance If the Go program is executed in the container, you need to reduce the value according to the situation, because all the cores of the host cannot be used in the container Setting a smaller value can avoid the overhead of thread switching

Use channel for collaborative communication

Define channel

ch1 := make(chan string) //Defines an unbuffered string channel
ch2 := make(chan string , 4) //Defines a four element string channel

Channel operator

ch1 <- "Chengdu" //Write data to channel
itemOfCh1 := <- ch1 //Read a piece of data from ch1 channel
<- ch1 //Read the next value of the channel
var in_only chan<- int //Only receive channels
var out_only <-chan int //Read only channels
close(ch) //Close channel

Channel blocking

By default, the channel is synchronous and unbuffered, and the sender is blocked before the receiver is ready If there is no data in the channel, the receiver is also blocked

package main

import (
    "fmt"
    "time"
)

func f1(in chan int) {
    data := <-in
    fmt.Println(data)
}

func main() {
    out := make(chan int)
    out <- 2
    fmt.Println(v)
    go f1(out)
    time.Sleep(100 * time.Millisecond)
}

The above program will panic out, because there is no receiver for the data written by out, so the main coroutine is blocked The following code will never be executed, so the channel will never have data, resulting in deadlock Modify out: = make (Chan, int, 1) so that the channel has a buffer and will not deadlock Or start the read process before writing Or reading in another coroutine can solve this problem

Use semaphore

Semaphores can be used to make the main process wait for the completion of the sub process to exit execution

package main

import (
    "fmt"
    "time"
)

func f1(in chan int, done chan int) {
    data := <-in
    fmt.Println(data)
    time.Sleep(10e9)
    done <- 1
}

func main() {
    out := make(chan int)
    done := make(chan int)
    go f1(out, done)
    out <- 2
    <-done
}

The program will exit 10 seconds after output 2, so we don't need to use sleep to let the main process execute

Close channel

Explicitly close the channel. Closing the channel means that the sender will not send new data to the receiver Only the sender needs to close the channel

ch := make(chan int )
defer close(ch)

data,ok := <-ch //If the data is received, ok is true. Use ok to detect whether the channel is closed or blocked

In the following case, the read channel will not report a deadlock error in the main process, because after checking that the channel is closed, it will not read the channel and jump out of the loop, so it will not continue to read the channel that has not been written So there is no deadlock

package main

import "fmt"

func makeStream(n int) chan bool {
    ch := make(chan bool, n)
    go func() {
        for i := 0; i < n; i++ {
            ch <- true
        }
        close(ch)
    }()
    return ch
}

func main() {
    stream := makeStream(5)

    for {
        v, ok := <-stream
        if !ok {
            break
        }
        fmt.Println(v)
    }
}

Use select to switch coroutines

The select keyword can be used for rotation training to obtain values from different concurrent processes Usually used with a for loop

  • If both are blocked, wait for one of them to be processed
  • If more than one can be processed, select one at random If there is circulation in the outer layer, deal with the rest next time
  • If there is no channel to handle and there is default, execute default Otherwise, it will be blocked all the time
  • If there is no case, the select will always be blocked
  • You can use break to jump out of select
package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        for i := 0; i < 10; i++ {
            ch1 <- fmt.Sprintf("A%d", i)
        }
    }()

    go func() {
        for i := 0; i < 10; i++ {
            ch2 <- fmt.Sprintf("B%d", i)
        }

    }()

    go func() {
        for {
            select {
            case v := <-ch1:
                fmt.Println(v)
            case v := <-ch2:
                fmt.Println(v)
            }
        }
    }()

    time.Sleep(1e9)
}

This mode can be used as the server to process customer requests in a circular manner

Timer (Ticker)

type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
    r runtimeTimer
}

The C variable of the timer will write the time to the channel within a given time according to the timer time you create

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTicker(time.Second)

    go func() {
        for {
            v := <-t.C
            fmt.Println(v)
        }
    }()

    time.Sleep(10e9) // <-time. After (10e9) uses the channel to set the timeout
}

Use time Tick (duration) can directly obtain channels, similar to time NewTicker(1e9). C

time.After(duration) is sent only once You can use this channel to handle timeouts

Recovery of coordination process

When a cooperative process encounters panic, it will exit safely without affecting other cooperative processes

package main

import (
    "log"
    "time"
)

func doWork() {
    time.Sleep(4e9)
    panic("fk")
}

func main() {

    go func() {
        for {
            log.Printf("another worker")
            time.Sleep(1e9)
        }
    }()

    go func() {

        defer func() {
            if err := recover(); err != nil {
                log.Printf("Something's wrong %s", err)
            }
        }()

        doWork()
    }()

    time.Sleep(10e9)
}

Lock or channel

In one scenario, there are multiple tasks, and one worker handles one task This scenario is suitable for using channels and coroutines to solve problems

package main

type Task struct{}
type Result struct{}

func process(Task *Task) Result {
    return Result{}
}

func main() {

    tasks, results := make(chan Task), make(chan Result)

    workCount := 10

    //Create task
    go func() {
        for i := 0; i < workCount; i++ {
            tasks <- Task{}
        }
    }()

    //Start worker
    for i := 0; i < workCount; i++ {
        go func() {
            for {
                t := <-tasks
                result := process(&t) //Processing data
                results <- result     //Write structure
            }
        }()
    }

    //Consumption results

}
  • Scenario of using lock:

    • Accessing cached information in shared data structures
    • Save application context and state information data
  • Scenario of using channel:

    • Interact with the results of asynchronous operations
    • Distribute tasks
    • Transfer data ownership

Keywords: Go

Added by Stalingrad on Thu, 17 Feb 2022 04:23:26 +0200