Go, you said you wouldn't be concurrent?

Author: lomtom

Personal website: lomtom.cn

Official account No. Bosio Park

Your support is my biggest motivation.

Go series:

  1. Go (I) basic introduction
  2. Go (II) structure
  3. Go (III) go configuration file
  4. Go (IV) Redis operation
  5. Go (V) go doesn't know how to use Gorm?
  6. Go (VI) to teach you how to call remotely
  7. Go (VII) you said you wouldn't be concurrent?

Do not communicate through shared memory, but share memory through communication.

goroutine

Go coroutine has a simple model: it is a function running in the same address space concurrently with other go coroutines. It is lightweight, and almost all consumption is the allocation of stack space. And stacks are very small at first, so they are cheap and change with the allocation (and release) of heap space only when needed.

Go coroutines can be multiplexed on multi-threaded operating systems, so if one thread is blocked, such as waiting for I/O, other threads will run.

The design of Go collaboration hides many complexities of thread creation and management.

The go keyword can be added to the function or method before it can be invoked in the new Go Association. When the call is completed, the go coroutine will also exit quietly. (the effect is a bit like the & symbol in Unix Shell, which allows commands to run in the background.)

go myFunc()  // Running myFunc at the same time does not require waiting

Anonymous functions are very convenient to invoke in association.

func TestGo(t *testing.T) {
	s := "how are you"
	go func() {
		fmt.Println(s)
	}()
	ss := "I'm not good at Taoism"
	fmt.Println(ss)
}

Result output:

I'm not good at Taoism
 how are you

In Go, anonymous functions are closures: in fact, it is now ensured that the life cycle of the referenced variables in the function is the same as the activity time of the function.

Therefore, it is worth noting that if the main function is executed and the method behind go is not executed, the program will also stop.
For example, in the method behind go, let the function sleep for 1 second, so that the main program will exit after running without outputting s

func TestGo(t *testing.T) {
	s := "how are you"
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println(s)
	}()
	ss := "I'm not good at Taoism"
	fmt.Println(ss)
}

Output:

I'm not good at Taoism

So how to avoid this situation? In the follow-up meeting

These functions are not practical because they do not implement signal processing at completion. Therefore, we need channels.

channel

Why channel?
Simply executing functions concurrently is meaningless. Functions need to exchange data between functions to reflect the significance of concurrent execution of functions.

channel in Go language is a special type.

The channel is like a conveyor belt or queue. It always follows the First In First Out rule to ensure the order of sending and receiving data. Each channel is a conduit of a specific type, that is, the element type needs to be specified when declaring the channel.

Like mapping, pipes need to allocate memory through make.

The resulting value acts as a reference to the underlying data structure. If an optional integer parameter is provided, it sets buffer size for channel.

The default value is zero for unbuffered or synchronized channels.

  1. Create (keyword chan)
c := make(chan int)            // Integer unbuffered channel
c := make(chan int, 0)         // Integer unbuffered channel
c := make(chan *os.File, 100)  // Buffer channel for pointer to file
  1. insert
c <- a
  1. read
num := <- c
  1. Declare read / write pipeline
var c int
  1. Declare write only pipeline
var c chan<- int
  1. Declare read-only pipeline
var c <-chan int

example:

func TestGo(t *testing.T) {
	// Create an unbuffered channel of type integer
	c := make(chan int)
	// Execute custom methods; At the end of the method, a signal will be sent on the channel
	go func() {
		// doSomething
		for i := 0; i < 5; i++ {
			// Send a signal
			c <- i
		}
		// Close the pipe
		close(c)
	}()
	// doSomething
	// Wait for the execution of the custom method to complete, and then get the value from the channel
	for i := range c{
		fmt.Println(i)
	}
}

The receiver will block until it receives the data.

  1. If the channel is unbuffered, the sender will block until the receiver receives the value;
  2. If the channel is buffered, the sender blocks only before the value is copied to the buffer;
  3. If buffer is full, sender waits until a receiver fetches a value.

sync

sync.WaitGroup

In Go, you can use chan to realize communication. Similarly, Go also provides WaitGroup to realize synchronization.

The above example well illustrates the synchronization problem without using WaitGroup

func TestGo(t *testing.T) {
	s := "Hello, world"
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println(s)
	}()
	ss := "I'm not good at Taoism"
	fmt.Println(ss)
}

Here is another example. I output it ten times in the main function and ten times in myFunc. In theory, it will be output.

In order to simulate when myFunc is not finished but the main program is finished, add time.Sleep(time.Second*1) to myFunc

func TestGo1(t *testing.T) {
	go myFunc()
	for i := 0; i < 10; i++ {
		fmt.Println("main()Test, this is the second" + strconv.Itoa(i) + "second")
	}
	wg.Wait()
}

func myFunc() {
	for i := 0; i < 10; i++ {
		fmt.Println("test()Test, this is the second" + strconv.Itoa(i) + "second")
		time.Sleep(time.Second*1)
	}
}

The output is as follows:

main()Test, this is the 0 th time
main()Test, this is the first time
main()Test, this is the second time
main()Test, this is the third time
main()Test, this is the fourth time
main()Test, this is the fifth time
main()Test, this is the sixth time
main()Test, this is the seventh time
main()Test, this is the eighth time
main()Test, this is the ninth time
test()Test, this is the 0 th time

This is obviously not the result we want. The simplest thing is to add time.Sleep(time.Second*1) to the loop of the main function

func TestGo1(t *testing.T) {
	go myFunc()
	for i := 0; i < 10; i++ {
		fmt.Println("main()Test, this is the second" + strconv.Itoa(i) + "second")
		time.Sleep(time.Second*1)
	}
	wg.Wait()
}

func myFunc() {
	for i := 0; i < 10; i++ {
		fmt.Println("test()Test, this is the second" + strconv.Itoa(i) + "second")
		time.Sleep(time.Second*1)
	}
}

The output is as follows:

main()Test, this is the 0 th time
test()Test, this is the 0 th time
main()Test, this is the first time
test()Test, this is the first time
main()Test, this is the second time
test()Test, this is the second time
test()Test, this is the third time
main()Test, this is the third time
main()Test, this is the fourth time
test()Test, this is the fourth time
test()Test, this is the fifth time
main()Test, this is the fifth time
main()Test, this is the sixth time
test()Test, this is the sixth time
test()Test, this is the seventh time
main()Test, this is the seventh time
main()Test, this is the eighth time
test()Test, this is the eighth time
test()Test, this is the ninth time
main()Test, this is the ninth time

Although this has achieved our expected effect, under formal circumstances, we do not know the execution speed and time of the code, so this second is feasible in theory, but it is very broken in practice.

Then you can use WaitGroup to control.

  1. As long as a coroutine is started, Add(1) indicates that a coroutine is started
  2. Do() is required after the execution of the coprocessor, indicating that it is deleted from the coprocessor waiting group
  3. Only when all the coroutines are Done() will the subsequent code of Wait() continue to be executed.
  4. The number of coroutines of Add must correspond to the number of coroutines of Done, otherwise the deadlock will report an error
func TestGo1(t *testing.T) {
	var wg sync.WaitGroup
	go myFunc(&wg)
	for i := 0; i < 10; i++ {
		fmt.Println("main()Test, this is the second" + strconv.Itoa(i) + "second")
		//time.Sleep(time.Second*1)
	}
	wg.Wait()
}

func myFunc(wg *sync.WaitGroup) {
	wg.Add(1)
	for i := 0; i < 10; i++ {
		fmt.Println("test()Test, this is the second" + strconv.Itoa(i) + "second")
		time.Sleep(time.Second*1)
	}
	wg.Done()
}

Output:

main()Test, this is the 0 th time
main()Test, this is the first time
main()Test, this is the second time
main()Test, this is the third time
main()Test, this is the fourth time
main()Test, this is the fifth time
test()Test, this is the 0 th time
main()Test, this is the sixth time
main()Test, this is the seventh time
main()Test, this is the eighth time
main()Test, this is the ninth time
test()Test, this is the first time
test()Test, this is the second time
test()Test, this is the third time
test()Test, this is the fourth time
test()Test, this is the fifth time
test()Test, this is the sixth time
test()Test, this is the seventh time
test()Test, this is the eighth time
test()Test, this is the ninth time

sync.Once

In many programming scenarios, we need to ensure that some operations are executed only once in high concurrency scenarios, such as loading the configuration file only once
Close the channel only once, etc.

The sync package in the Go language provides a solution for a scenario that is executed only once: sync.Once.

sync.Once has only one Do method for external operations:

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

In use, we only need to pass in the method to be executed.

var db *gorm.DB

var loadDbConf sync.Once

// GetDb get connection
func GetDb() *gorm.DB {
	loadDbConf.Do(DbInit)
	return db
}

// DbInit database connection pool initialization
func DbInit() {
	newLogger := logger.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
		logger.Config{
			SlowThreshold:             time.Second, // Slow SQL threshold
			LogLevel:                  logger.Info, // Log level
			IgnoreRecordNotFoundError: true,        // Ignore ErrRecordNotFound error for logger
			Colorful:                  true,        // Disable color
		},
	)
	conn, err1 := gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{
		Logger: newLogger,
	})
	if err1 != nil {
		log.Printf("mysql connect get failed.%v", err1)
		return
	}
	db = conn
	log.Printf("mysql init success")
}
type Once struct {
	done uint32
	m    Mutex
}

sync.Once actually contains a mutex and a Boolean value. The mutex ensures the security of Boolean values and data, and Boolean values are used to remember
Check whether the initialization is completed.

This design can ensure that the initialization operation is concurrent and safe, and the initialization operation will not be executed many times.

sync.Map

The built-in map in the Go language is not concurrency safe.

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func TestGo5(t *testing.T) {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
			wg.Done()
		}(i)
	}
	wg.Wait()
}

There may be no problem when the above code starts a small number of goroutine s. When there is more concurrency, the above code will report a fatal error: concurrent map writes error.

k=:0,v:=0
k=:2,v:=2
k=:4,v:=4
k=:5,v:=5
k=:6,v:=6
k=:7,v:=7
k=:1,v:=1
k=:8,v:=8
k=:3,v:=3
k=:11,v:=11
k=:10,v:=10
k=:12,v:=12
fatal error: concurrent map writes
k=:13,v:=13
k=:14,v:=14

goroutine 38 [running]:
runtime.throw(0xe6715c, 0x15)
	E:/program/go/src/runtime/panic.go:1117 +0x79 fp=0xc000337ec8 sp=0xc000337e98 pc=0x7ac6f9
runtime.mapassign_faststr(0xd9b800, 0xc00003c2a0, 0xe8046a, 0x2, 0x0)

In this scenario, it is necessary to lock the map to ensure concurrent security. The sync package of Go language provides a concurrent secure version of map - sync.Map out of the box.

Out of the box means that you can use it directly without using the make function initialization like the built-in map. Meanwhile, sync.Map has built-in operation methods such as Store, Load, LoadOrStore, Delete and Range.

var m1 = sync.Map{}

func TestGo6(t *testing.T) {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			k := strconv.Itoa(n)
			m1.Store(k,n)
			v, _ := m1.Load(k)
			fmt.Printf("k=:%v,v:=%v\n", k, v)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

Parallelization

Sum of 1 - n

Another application of these designs is to implement parallel computing on multi CPU cores.

If the calculation process can be divided into several independent processes, it can send signals to the channel at the end of each calculation, so as to realize parallel processing.

In the following example, I need to calculate the sum of 1 to n. generally speaking, it is easy to do it directly in a loop.

func TestGo(t *testing.T){
	n := 100000
	var s int
	for i := 0;i < n;i++{
		s += i
	}
	log.Println(s)
}

However, for a large amount of data, it is obviously not suitable to appear in our code, so parallel computing can be used.

func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	for i := 0;i < num;i++ {
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
		}()
	}
	var s int
	for i := 0;i < num;i++ {
		s += <- c
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

Of course, you can also borrow WaitGroup

func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}()
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

We start a separate processing block in the loop, and each CPU will execute one processing.

They may be completed and ended in disorder, but it doesn't matter; We only need to receive after all Go processes start and count the completion signals in the channel.

In addition to setting the num constant value directly, we can also ask the runtime for a reasonable value.

function runtime.NumCPU You can return the number of cores on the hardware CPU and use it as follows:

var num = runtime.NumCPU()

Another function you need to know is runtime.GOMAXPROCS, which returns the number of available CPU s set by the user. By default, the value of runtime.NumCPU is used, but it can be changed by the command line environment variable, or this function can be called and passed as a positive integer. If the parameter 0 is passed, the value will be returned. If we respect the user's allocation of resources,

It should be written as follows:

var numCPU = runtime.GOMAXPROCS(0)

Be careful not to confuse the concepts of concurrency and parallelism:
Concurrency is a method of constructing programs with independently executable components, and parallelism is to calculate in parallel on multiple CPU s for efficiency.

Although the concurrency of Go can make some problems easier to construct parallel computing, Go is still a concurrent rather than parallel language, and Go's model is not suitable for all parallel problems.

problem

However, if you run through the previous code, you will find a problem that the later calculation is actually inaccurate.

The value of 1 - 100 should be 4950, and its output value will not be the same every time, let alone 4950

The problem is in the for loop of Go. The loop variable will be reused in each iteration, so the i variable will be shared among all Go coroutines, which is not what we want.

We need to ensure that i is unique for each Go collaboration.

There are several ways to achieve this:

  1. First: write one more layer call
    Extract the following anonymous method into a general method and pass in i as a parameter.
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		go myFunc1(n,num,i,c,&wg)
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

func myFunc1(n,num,i int,c chan int,wg *sync.WaitGroup) {
	start := n / num * i
	end := n / num * (i + 1)
	var s int
	for j := start; j < end;j++ {
		s += j
	}
	c <- s
	wg.Done()
}
  1. The second type: closures of incoming parameters (i as the parameter)
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		// Pass in with i as parameter
		go func(i int) {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}(i)
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}
  1. Third: Restatement
func TestGo3(t *testing.T){
	t1 := time.Now()
	n := 100
	num := 10
	c := make(chan int,num)
	var wg sync.WaitGroup
	for i := 0;i < num;i++ {
		wg.Add(1)
		// Restate
		i := i
		go func() {
			start := n / num * i
			end := n / num * (i + 1)
			var s int
			for j := start; j < end;j++ {
				s += j
			}
			c <- s
			wg.Done()
		}()
	}
	wg.Wait()
	close(c)
	var s int
	for item := range c{
		s += item
	}
	t2 := time.Since(t1)
	log.Println(t2)
	log.Println(s)
}

i: The expression of = I may seem strange, but it is legal and common in Go.

You get a new version of the variable with the same name, so as to deliberately shield the loop variable locally and make it unique to each Go coroutine.

Performance comparison

Traditional methods and parallel computing:

The cross-border situation (i.e. the correctness of calculation) is not considered here, because the results are still calculated every time.

n valuetraditionparallel
100000003.7146ms1.058ms
10000000039.9925ms7.1645ms
1000000000344.0599ms49.9023ms
100000000003.4797346s501.5713ms
10000000000034.5406926s4.650136s

The result is obvious.

Keywords: Java node.js Go Database

Added by skbanta on Sun, 28 Nov 2021 23:52:49 +0200