How to limit the number of concurrent goroutines in Go

Preface

When tasks are processed concurrently, the number of goroutines cannot grow without bound. Network requests and database queries, for example, are limited by what the services behind them can handle. For throughput, concurrency should be as high as those services can sustain, which means capping it at a maximum. This article explores two ways to implement such a cap in Go: one uses a buffered channel, the other uses locks.

1. Limiting concurrency with a buffered channel

1.1 Scheme details

The code comes first; the logic is very simple:

package golimit

type GoLimit struct {
    ch chan int
}

func NewGoLimit(max int) *GoLimit {
    return &GoLimit{ch: make(chan int, max)}
}

func (g *GoLimit) Add() {
    g.ch <- 1
}

func (g *GoLimit) Done() {
    <-g.ch
}

Create a buffered channel whose capacity is the maximum number of goroutines allowed to run at once. Call Add() before starting a goroutine to write one value into the channel, and call Done() when the goroutine finishes to read one value back out. If the value cannot be written, the channel is full, meaning the number of running goroutines has reached the maximum: Add() blocks, and no new goroutine can be started until a running one finishes and calls Done() to take a value out of the channel.

The following is an example:

package main

import (
    "golimit"
    "log"
    "time"
)

func main() {
    log.Println("Start test...")
    g := golimit.NewGoLimit(2) // maximum allowed concurrency is set to 2

    for i := 0; i < 10; i++ {
        // Try to add a goroutine; this blocks while the maximum concurrency has been reached.
        g.Add()

        go func(g *golimit.GoLimit, i int) {
            defer g.Done() // this goroutine has finished

            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }

    log.Println("End of loop")

    time.Sleep(time.Second * 3) // wait for the remaining goroutines to finish
    log.Println("End of test")
}
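
One way to check that the limit really holds is to count how many goroutines are inside the guarded section at the same moment. The sketch below is not part of the original package: `limiter` is a local copy of the channel-based type so that the program compiles on its own, and `runLimited`, `running`, and `peak` are illustrative names. It also waits with a sync.WaitGroup rather than a fixed sleep, which is an assumption about what a caller would prefer.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// limiter is a local stand-in for the channel-based GoLimit shown above.
type limiter struct{ ch chan int }

func newLimiter(max int) *limiter { return &limiter{ch: make(chan int, max)} }
func (l *limiter) Add()           { l.ch <- 1 }
func (l *limiter) Done()          { <-l.ch }

// runLimited starts `tasks` goroutines with at most `max` running at a time
// and returns the highest number observed running simultaneously.
func runLimited(max, tasks int) int64 {
    var (
        wg      sync.WaitGroup
        running int64 // goroutines currently between Add and Done
        peak    int64 // highest value of running seen so far
    )
    l := newLimiter(max)

    for i := 0; i < tasks; i++ {
        l.Add() // blocks while max goroutines are already running
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer l.Done() // runs before wg.Done: deferred calls run last-in, first-out

            n := atomic.AddInt64(&running, 1)
            // Record a new peak with a compare-and-swap loop.
            for {
                p := atomic.LoadInt64(&peak)
                if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
                    break
                }
            }
            time.Sleep(10 * time.Millisecond) // simulated work
            atomic.AddInt64(&running, -1)
        }()
    }

    wg.Wait() // returns once every goroutine has finished
    return atomic.LoadInt64(&peak)
}

func main() {
    fmt.Println("peak concurrency:", runLimited(2, 10))
}
```

Because each goroutine increments `running` only after Add() has succeeded and decrements it before Done() frees the slot, the reported peak can never exceed the configured maximum.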

1.2 Evaluation summary

  • Advantages: the logic is simple and clear, so the code is easy to understand and maintain. When it meets the requirements, this scheme is the preferred choice in general scenarios.
  • Concern: the channel's buffer size represents the maximum concurrency. If the allowed concurrency is large, say tens of thousands or more, I am not sure whether the channel's performance and memory footprint become a problem. If anyone knows, please let me know.
  • Deficiency: the maximum concurrency is hard to adjust at run time, and some scenarios need that. For example, service A depends on service B, and B is scaled up or down while A must keep running. A then has to adjust its maximum number of concurrent requests to B's interfaces on the fly.
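
On the memory question, a buffered channel allocates its buffer once, at creation, sized as capacity times element size, so a `chan int` with capacity 50,000 takes roughly 400 KB on a 64-bit platform. A common variation is to use `chan struct{}`: the element is zero bytes wide, so in the current Go runtime the buffer needs no memory beyond the channel header. The sketch below illustrates that variation. The names `semaphore`, `Acquire`, `Release`, `Wait`, and `sumSquares` are illustrative and do not come from the original package.

```go
package main

import "fmt"

// semaphore is an illustrative variant of the channel-based limiter.
// struct{} values occupy zero bytes, so the capacity does not add to
// the memory used by the channel.
type semaphore chan struct{}

func newSemaphore(max int) semaphore { return make(semaphore, max) }

// Acquire blocks while max slots are already taken.
func (s semaphore) Acquire() { s <- struct{}{} }

// Release frees one slot.
func (s semaphore) Release() { <-s }

// Wait blocks until no slot is held, by taking every slot itself and then
// giving them all back. It is meant to be called after the last Acquire.
func (s semaphore) Wait() {
    for i := 0; i < cap(s); i++ {
        s <- struct{}{}
    }
    for i := 0; i < cap(s); i++ {
        <-s
    }
}

// sumSquares adds up the squares of 0..n-1 with at most max goroutines
// running at once; each goroutine writes only its own slice element.
func sumSquares(n, max int) int {
    sem := newSemaphore(max)
    results := make([]int, n)
    for i := 0; i < n; i++ {
        sem.Acquire()
        go func(i int) {
            defer sem.Release()
            results[i] = i * i
        }(i)
    }
    sem.Wait() // every worker has released its slot by now

    total := 0
    for _, v := range results {
        total += v
    }
    return total
}

func main() {
    fmt.Println(sumSquares(10, 3)) // prints 285
}
```

Filling the channel to capacity in Wait() is a way to wait for completion without a separate counter. It does not address the run-time resizing problem, since a channel's capacity is fixed when it is created.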

2. Limiting concurrency with locks

2.1 Scheme details

Again, the code comes first (note: it is open source on GitHub: https://github.com/zh-five/golimit)

// Package golimit limits the number of concurrently running goroutines.
package golimit

import (
    "sync"
)

type GoLimit struct {
    max       uint             // maximum number of concurrent goroutines
    count     uint             // current number of concurrent goroutines
    isAddLock bool             // whether addLock is being held to block Add()
    zeroChan  chan interface{} // closed to broadcast that count has reached 0
    addLock   sync.Mutex       // serializes Add(); stays locked while at the maximum
    dataLock  sync.Mutex       // guards the fields above
}

func NewGoLimit(max uint) *GoLimit {
    return &GoLimit{max: max, count: 0, isAddLock: false, zeroChan: nil}
}

// Add increments the concurrency count. Once the count reaches max,
// subsequent Add calls block until the count drops below max again.
func (g *GoLimit) Add() {
    g.addLock.Lock()
    g.dataLock.Lock()

    g.count++

    if g.count < g.max { // still below the maximum: unlock so further Add calls can proceed
        g.addLock.Unlock()
    } else { // maximum reached: keep addLock held and record it; Done or SetMax unlocks it later
        g.isAddLock = true
    }

    g.dataLock.Unlock()
}

// Done decrements the concurrency count.
// If the count drops below max, a blocked Add() is released promptly.
func (g *GoLimit) Done() {
    g.dataLock.Lock()

    g.count--

    // Release addLock if it was being held at the maximum.
    if g.isAddLock && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }

    // Broadcast that the count has reached 0.
    if g.count == 0 && g.zeroChan != nil {
        close(g.zeroChan)
        g.zeroChan = nil
    }

    g.dataLock.Unlock()
}

// SetMax updates the maximum concurrency. If the maximum is raised,
// a blocked Add() is released promptly.
func (g *GoLimit) SetMax(n uint) {
    g.dataLock.Lock()

    g.max = n

    // The count is now below the maximum: release addLock.
    if g.isAddLock && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }

    // The count is now at or above the maximum: take addLock so further Add calls block.
    if !g.isAddLock && g.count >= g.max {
        g.isAddLock = true
        g.addLock.Lock()
    }

    g.dataLock.Unlock()
}

// WaitZero returns immediately if the concurrency count is 0;
// otherwise it blocks until the count drops to 0.
func (g *GoLimit) WaitZero() {
    g.dataLock.Lock()

    // Nothing is running, so there is nothing to wait for.
    if g.count == 0 {
        g.dataLock.Unlock()
        return
    }

    // Create the broadcast channel if it does not exist yet.
    if g.zeroChan == nil {
        g.zeroChan = make(chan interface{})
    }

    // Copy the channel before unlocking: Done() sets g.zeroChan to nil after
    // closing it, and receiving from a nil channel would block forever.
    c := g.zeroChan
    g.dataLock.Unlock()

    <-c
}

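// Count returns the current concurrency count.
func (g *GoLimit) Count() uint {
    g.dataLock.Lock() // read under the lock to avoid a data race with Add/Done
    defer g.dataLock.Unlock()
    return g.count
}

// Max returns the maximum concurrency.
func (g *GoLimit) Max() uint {
    g.dataLock.Lock() // read under the lock to avoid a data race with SetMax
    defer g.dataLock.Unlock()
    return g.max
}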

Two locks are used. The first, dataLock, protects the fields: it is locked before they are modified and unlocked afterwards. The second, addLock, controls whether another goroutine may be added. Add() first acquires addLock and then increments the count. If the count is still below the maximum, addLock is released; otherwise it stays locked, so the next Add() blocks when it tries to acquire it, and that is what limits the number of concurrent goroutines. A usage example follows:

package main

import (
    "github.com/zh-five/golimit"
    "log"
    "time"
)

func main() {
    log.Println("Start test...")
    g := golimit.NewGoLimit(2) // maximum allowed concurrency is set to 2

    for i := 0; i < 10; i++ {
        // Increment the concurrency count; blocks while the count has reached the maximum.
        g.Add()

        // The maximum concurrency can be changed at any time while running.
        //g.SetMax(3)

        go func(g *golimit.GoLimit, i int) {
            defer g.Done() // decrement the concurrency count

            time.Sleep(time.Second * 2)
            log.Println(i, "done")
        }(g, i)
    }

    log.Println("End of loop")

    g.WaitZero() // block until all goroutines have finished
    log.Println("End of test")
}

Compared with scheme 1, the GoLimit in scheme 2 adds a SetMax() method for changing the maximum concurrency at run time. For fun, and out of laziness, it also adds a WaitZero() method that blocks until all goroutines have finished (the same thing can be done quickly with an external sync.WaitGroup). A typical use: a large number of URLs must be collected with limited concurrency, and the main program can wait for every collection goroutine to finish simply by calling WaitZero().
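
The external sync.WaitGroup approach mentioned above could look roughly like the sketch below. It is an illustration under several assumptions: `collect` and `collectAll` are hypothetical names, `collect` only sleeps instead of performing a real request, the example.com URLs are placeholders, and a plain buffered channel stands in for GoLimit so that the program compiles without the external package.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// collect is a hypothetical placeholder for the real collection work
// (for example an HTTP request); here it only sleeps briefly.
func collect(url string) int {
    time.Sleep(5 * time.Millisecond)
    return len(url)
}

// collectAll processes every URL with at most max collections in flight
// and returns once all of them have finished.
func collectAll(urls []string, max int) []int {
    limit := make(chan struct{}, max) // stands in for GoLimit
    sizes := make([]int, len(urls))
    var wg sync.WaitGroup

    for i, u := range urls {
        limit <- struct{}{} // same role as g.Add()
        wg.Add(1)
        go func(i int, u string) {
            defer wg.Done()
            defer func() { <-limit }() // same role as g.Done()
            sizes[i] = collect(u)      // each goroutine writes only its own element
        }(i, u)
    }

    wg.Wait() // same role as g.WaitZero()
    return sizes
}

func main() {
    urls := []string{
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c",
    }
    fmt.Println(collectAll(urls, 2))
}
```

The trade-off is one extra object to carry around: the limiter bounds how many goroutines run at once, while the WaitGroup tracks when all of them are finished.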

2.2 Evaluation summary

  • Advantages: the implementation shows that performance cost and memory consumption do not grow linearly with the maximum concurrency. There is also plenty of room for extension.
  • Disadvantages: the implementation logic is complex.
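
Much of that complexity comes from coordinating two mutexes. Lock ordering is one thing to watch: Add() takes addLock before dataLock, whereas SetMax() may take addLock while already holding dataLock, so a SetMax() call that lowers the limit while an Add() is in progress could leave the two waiting on each other. A different technique, a single mutex with a condition variable (sync.Cond), avoids that ordering question. The sketch below is an illustrative alternative and not the code from the repository; `condLimit` and `newCondLimit` are made-up names.

```go
package main

import (
    "fmt"
    "sync"
)

// condLimit is an illustrative alternative to the two-mutex design.
// One mutex guards both fields, and the condition variable wakes
// waiters whenever count or max changes.
type condLimit struct {
    mu    sync.Mutex
    cond  *sync.Cond
    max   int
    count int
}

func newCondLimit(max int) *condLimit {
    l := &condLimit{max: max}
    l.cond = sync.NewCond(&l.mu)
    return l
}

// Add blocks while count >= max, then takes one slot.
func (l *condLimit) Add() {
    l.mu.Lock()
    defer l.mu.Unlock()
    for l.count >= l.max {
        l.cond.Wait() // releases mu while waiting, reacquires it on wake-up
    }
    l.count++
}

// Done gives one slot back and wakes all waiters so each rechecks its condition.
func (l *condLimit) Done() {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.count--
    l.cond.Broadcast()
}

// SetMax changes the limit at run time. Raising it lets blocked Add calls
// proceed; lowering it only affects Add calls that have not yet succeeded.
func (l *condLimit) SetMax(n int) {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.max = n
    l.cond.Broadcast()
}

// WaitZero blocks until no slot is in use.
func (l *condLimit) WaitZero() {
    l.mu.Lock()
    defer l.mu.Unlock()
    for l.count > 0 {
        l.cond.Wait()
    }
}

// Count returns the number of slots currently in use.
func (l *condLimit) Count() int {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.count
}

func main() {
    l := newCondLimit(2)
    for i := 0; i < 6; i++ {
        l.Add()
        go func() {
            defer l.Done()
        }()
    }
    l.WaitZero()
    fmt.Println("in use after WaitZero:", l.Count())
}
```

Broadcast() wakes every waiter on each change, which is simple but may cause extra wake-ups under heavy contention; whether that matters in practice would need measuring.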

Other

I would actually like to benchmark the two schemes against each other, especially at large maximum concurrency, but I have not found a good way to test this yet. If anyone has a method or an idea, please share it.
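
One possible starting point is the standard library's testing.Benchmark function, which runs a benchmark from an ordinary program without `go test`. The sketch below is only a rough idea of such a comparison: `adder`, `chanLimit`, `lockLimit`, and `benchLimiter` are illustrative names, and `lockLimit` is a trimmed copy of scheme 2 with only Add() and Done(). It measures the cost of Add/Done pairs issued from parallel goroutines at two different maximums, and nothing else (memory use, for example, is not measured).

```go
package main

import (
    "fmt"
    "sync"
    "testing"
)

// adder is the part of both schemes that the benchmark exercises.
type adder interface {
    Add()
    Done()
}

// chanLimit is scheme 1: a buffered channel.
type chanLimit struct{ ch chan int }

func (c *chanLimit) Add()  { c.ch <- 1 }
func (c *chanLimit) Done() { <-c.ch }

// lockLimit is a trimmed copy of scheme 2 (Add and Done only).
type lockLimit struct {
    max, count uint
    isAddLock  bool
    addLock    sync.Mutex
    dataLock   sync.Mutex
}

func (g *lockLimit) Add() {
    g.addLock.Lock()
    g.dataLock.Lock()
    g.count++
    if g.count < g.max {
        g.addLock.Unlock()
    } else {
        g.isAddLock = true
    }
    g.dataLock.Unlock()
}

func (g *lockLimit) Done() {
    g.dataLock.Lock()
    g.count--
    if g.isAddLock && g.count < g.max {
        g.isAddLock = false
        g.addLock.Unlock()
    }
    g.dataLock.Unlock()
}

// benchLimiter measures Add/Done pairs issued from several goroutines at once.
func benchLimiter(l adder) testing.BenchmarkResult {
    return testing.Benchmark(func(b *testing.B) {
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                l.Add()
                l.Done()
            }
        })
    })
}

func main() {
    for _, max := range []int{10, 10000} {
        fmt.Println("max", max, "channel:", benchLimiter(&chanLimit{ch: make(chan int, max)}))
        fmt.Println("max", max, "locks:  ", benchLimiter(&lockLimit{max: uint(max)}))
    }
}
```

RunParallel starts a number of goroutines tied to GOMAXPROCS, so with a large maximum the limit is never actually reached and the numbers reflect uncontended cost only. Testing behaviour at the limit would need more goroutines than slots, for instance through b.SetParallelism.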

Keywords: Go

Added by gooney0 on Tue, 25 Jan 2022 11:57:03 +0200