Go in depth: using the context package for concurrency control

When a request reaches the backend, a large number of tasks often have to be started concurrently to assemble the final response. How should we design and implement the following: the request times out after X seconds; on timeout or error, return immediately and cancel all concurrent tasks? With Go's context package this problem can be solved gracefully and naturally, and once you understand Context you may well think: "Wow, that's how it should be designed!"

Package context was introduced in Go 1.7 and is one of that release's important features. In practice, however, it seems to get little attention; kubernetes/client-go, for example, only began gradually integrating package context in 2020.

There was once a question on our company's technical forum: "Do you have any experience to share on optimizing latency in Go backend services?" Many answers mentioned concurrency (and indeed we use Go's concurrency as fully as we can in our projects). We also found that package context makes managing concurrent work much easier. This article summarizes the techniques from the GopherCon UK 2017 talk "The Context of the Package context" together with our own practical experience.

TL;DR

  • context.Context manages the work done on behalf of a single request: controlling timeout and cancellation, carrying request-scoped values, and so on.
  • WithValue(parent Context, key interface{}, val interface{}) Context attaches request-scoped data such as authentication information (note that it is not designed for passing function parameters; see below for details). The key should be an unexported global variable of an unexported type, and the package should provide type-safe accessor functions for the value.
  • WithCancel(parent Context) (ctx Context, cancel CancelFunc) provides the cancellation entry point of a Context.
  • WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) adds a timeout to a Context. WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) does the same with an absolute time.
  • The returned cancel function should be deferred immediately with defer cancel(), so that the Context's resources are released promptly once the work completes. See Cancelling in-progress operations.
  • The Contexts created by the functions above automatically form a tree from parent to child. Once a parent times out or is cancelled, all of its descendants time out or are cancelled as well.
  • To check whether a Context has timed out or been cancelled, receive from the channel returned by its Done() method.
  • The root node (in main or in tests) can be created with context.Background(); if you are not sure which Context to use, use context.TODO().

Start concurrent tasks and wait for all of them to complete

The logic is fairly simple: just use errgroup. Here is an example with two concurrent tasks:

import (
    "context"

    "golang.org/x/sync/errgroup"
)

// Handler calls f1 and f2 concurrently and returns both strings.
// If either call fails, Handler returns that error.
// f1 and f2 stand for any functions of type func(context.Context) (string, error).
func Handler(ctx context.Context) ([]string, error) {
    eg, ctx := errgroup.WithContext(ctx)
    ret := make([]string, 2)

    eg.Go(func() error { // start a goroutine that obtains s1
        s1, err := f1(ctx)
        if err != nil {
            return err
        }
        ret[0] = s1
        return nil
    })

    eg.Go(func() error { // start a goroutine that obtains s2
        s2, err := f2(ctx)
        if err != nil {
            return err
        }
        ret[1] = s2
        return nil
    })
    // Wait for both goroutines to finish. If one of them returns an error, the derived ctx
    // is cancelled, so the other goroutine is notified through ctx.Done(); Wait then
    // returns the first error.
    if err := eg.Wait(); err != nil {
        return nil, err
    }
    return ret, nil
}


Start concurrent tasks and wait for the fastest result

import (
    "context"
    "math/rand"
    "strconv"
    "time"
)

func do(ctx context.Context, i int) (string, error) {
    r1, r2 := rand.Intn(10000), rand.Intn(10000)
    time.Sleep(time.Duration(r1) * time.Millisecond) // simulated preprocessing
    select {
    case <-ctx.Done(): // cancelled or timed out during preprocessing: exit right away
        return "", ctx.Err()
    default: // otherwise, start the actual work
        println("now we begin:", i)
    }
    time.Sleep(time.Duration(r2) * time.Millisecond) // simulated execution
    return strconv.Itoa(i), nil
}

func Vroom(ctx context.Context, goroutineNum int) string {
    // Use a buffered channel so that goroutines finishing after the first result
    // can still send and exit, instead of blocking forever and leaking.
    ch := make(chan string, goroutineNum)
    ctx, cancel := context.WithCancel(ctx) // derive a cancellable context
    defer cancel()                         // on return, tasks that have not started executing are cancelled
    for i := 0; i < goroutineNum; i++ {
        go func(index int) {
            x, err := do(ctx, index)
            if err != nil {
                println(index, ":", err.Error())
                return
            }
            println("now we got:", x)
            ch <- x // the channel is buffered, so this does not block even without a receiver
        }(i)
    }
    return <-ch // return the first result to arrive
}

func main() {
    x := Vroom(context.Background(), 3)
    println("in main:", x)
    time.Sleep(10 * time.Second) // keep the process alive to observe the remaining goroutines
}

Output of one run:

now we begin: 0
now we begin: 1
now we got: 1
in main: 1
now we got: 0
2 : context canceled

(End of the summary)

Why Context?

What a service needs when handling requests

When handling a request, our service should be able to:

  • Detect timeouts of the modules and services it depends on;
  • Cancel the remaining concurrent tasks when their results are no longer needed (for example, because one task returned an error or timed out);
  • Control the timeout of the task as a whole.

For example, after receiving a query request, our service needs to authenticate the caller, query the associated information in the database, query the status of the corresponding Pod, query the related events if the Pod failed, and fetch the Pod's log from either ElasticSearch or k8s (whichever responds first). We want the request to be processed as concurrently as possible, so the calls are arranged as in the following figure:

flowchart LR
  begin(Query request received) --> auth[Authenticate]
  begin --> db[Query database] --> k8s[Query Pod status] --> event[Query events]
  db --> log[Query log] -.-> k8s_log[Get log from k8s]
  log -.-> es[Get log from ElasticSearch]

Suppose the request must return within 3 seconds or else time out. Naturally, we require that:

  • If the request times out, all work in progress is cancelled;
  • Once the log has been obtained from either ES or k8s, the query on the other channel is cancelled;
  • If any stage fails, all other concurrent tasks in progress are cancelled and the corresponding error is returned directly.

So, how should our code be designed?

Context capabilities

With context.Context, we can easily control the timeout and cancellation of concurrent tasks.

To satisfy the requirements above, it is enough to call context.WithTimeout(ctx, 3*time.Second) at the start of request handling and pass the returned Context to all subsequent tasks. Cancelling everything when one concurrent task fails is handled by errgroup, as shown earlier. Here are the function that obtains the Pod status and the function that handles the query request:

func getPodResult(ctx context.Context, podName string) (
  status, failureReason string, err error) {
    select {
    case <-ctx.Done(): // already cancelled or timed out: return immediately
        return "", "", ctx.Err()
    default:
    }
    podStatus := getPodStatus(ctx, podName)
    if isSuccessful(podStatus) {
        return podStatus, "", nil
    }
    select {
    case <-ctx.Done(): // check again before the next query
        return "", "", ctx.Err()
    default:
    }
    failureReason, err = getFailureReason(ctx, podName) // "=" because both are named results
    return podStatus, failureReason, err
}

func Handle(ctx context.Context, req *GetInfoRequest) (*GetInfoResponse, error) {
    ctx, cancel := context.WithTimeout(ctx, 3*time.Second) // overall timeout of 3 seconds for the request
    defer cancel()
    eg, ctx := errgroup.WithContext(ctx) // the derived ctx is cancelled as soon as one task returns an error
    eg.Go(
        func() error {
            ok, err := Auth(ctx, req)
            if err != nil {
                return err
            }
            if !ok {
                return NoPermissionError
            }
            return nil
        })
    var rsp *GetInfoResponse
    eg.Go(
        func() error {
            var err error
            rsp, err = query(ctx, req.TaskID) // queries the DB, the Pod and the logs, again using errgroup
            if err != nil {
                return err
            }
            return nil
        })
    if err := eg.Wait(); err != nil {
        return nil, err
    }
    return rsp, nil
}

The Context of context.Context

The Context interface

The Context interface in package context looks like this (the doc comments below are translated; see the original source for the exact wording). Go's comments also explain in detail how a Context should be used, especially its Value:

type Context interface {
    // Deadline returns the time at which work done on behalf of this context should be cancelled. If no deadline is set, it returns ok==false.
    // Successive calls to Deadline return the same results.
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that is closed when work done on behalf of this context should be cancelled. If the context can never be cancelled, Done may return nil. Successive calls to Done return the same value.
    // The Done channel may be closed asynchronously, after the cancel function returns.
    //
    // WithCancel arranges for Done to be closed when cancel is called;
    // WithDeadline arranges for Done to be closed when the deadline expires;
    // WithTimeout arranges for Done to be closed when the timeout elapses.
    //
    // Done can be used for select statements:
    //
    //  // Stream generates values with DoSomething and sends them to out
    //  // until DoSomething returns an error or ctx.Done is closed.
    //  func Stream(ctx context.Context, out chan<- Value) error {
    //      for {
    //          v, err := DoSomething(ctx)
    //          if err != nil {
    //              return err
    //          }
    //          select {
    //          case <-ctx.Done():
    //              return ctx.Err()
    //          case out <- v:
    //          }
    //      }
    //  }
    //
    // For more examples of task cancellation using the Done channel, see:
    // https://blog.golang.org/pipelines
    Done() <-chan struct{}

    // If Done is not yet closed, Err returns nil.
    // If Done is closed, Err returns a non-nil error explaining why:
    // Canceled if the context was cancelled, or
    // DeadlineExceeded if the context's deadline passed.
    // After Err returns a non-nil error, successive calls to Err return the same error.
    Err() error

    // Value returns the value associated with key in this context, or nil if no value is associated with key.
    // Successive calls to Value with the same key return the same result.
    //
    // Use context values only for request-scoped data that crosses process and API boundaries,
    // not for passing optional parameters to functions.
    //
    // A key identifies a specific value in a Context.
    // Functions that want to store values in a Context typically allocate a key in a global variable and use it as the argument to context.WithValue and Context.Value.
    // A key can be any type that supports equality comparison; packages should define keys as an unexported type to avoid collisions.
    //
    // Packages that define a Context key should provide type-safe accessors for the values stored under that key:
    //
    //     // Package user defines a User type that is stored in Contexts.
    //     package user
    //
    //     import "context"
    //
    //     // User is the type of value stored in the Contexts.
    //     type User struct {...}
    //
    //     // key is an unexported type for keys defined in this package.
    //     // This prevents collisions with keys defined in other packages.
    //     type key int
    //
    //     // userKey is the key for user.User values in Contexts.
    //     // It is unexported; clients use user.NewContext and user.FromContext
    //     // instead of using this key directly.
    //     var userKey key
    //
    //     // NewContext returns a new Context that carries value u.
    //     func NewContext(ctx context.Context, u *User) context.Context {
    //         return context.WithValue(ctx, userKey, u)
    //     }
    //
    //     // FromContext returns the User value stored in ctx, if any.
    //     func FromContext(ctx context.Context) (*User, bool) {
    //         u, ok := ctx.Value(userKey).(*User)
    //         return u, ok
    //     }
    Value(key interface{}) interface{}
}

Notes on Context.Value

  • Value should only store data tied to the life cycle of the Context; it is not a way to pass optional parameters;
    • Therefore, a value stored in a Context should not be modified after it has been set;
  • The key should be an unexported global variable of an unexported type defined in the package, and values should be accessed through the type-safe functions provided by that package rather than by calling Context.Value directly;
  • When a value is a required input of a function, it should not be fetched from the Context, for the sake of readability. For example, func IsAdmin(ctx context.Context, uin string) (bool, error) should take uin explicitly as a parameter, even though uin is tightly bound to the request in the Context. Context.Value should inform, not control. –Jack Lindamood

The Context tree

A newly created Context is always a child of the Context it was derived from, and it is affected by the parent's deadline, cancellation and values. For example:

func testWithTimeout() {
    a1 := context.Background()
    b2, cancelB2 := context.WithTimeout(a1, time.Minute)
    defer cancelB2()
    c3, cancelC3 := context.WithTimeout(b2, time.Hour)
    defer cancelC3()
    d3, cancelD3 := context.WithTimeout(b2, time.Second)
    defer cancelD3()
    e2, cancelE2 := context.WithCancel(a1)
    defer cancelE2()
    fmt.Println(a1.Deadline())
    fmt.Println(b2.Deadline())
    fmt.Println(c3.Deadline())
    fmt.Println(d3.Deadline())
    cancelB2()
    if err := d3.Err(); err != nil {
        fmt.Println("d3 is canceled:", err)
    }
    if err := e2.Err(); err == nil {
        fmt.Println("e2 is not canceled yet")
    }
}
/*
0001-01-01 00:00:00 false
2021-11-29 20:29:44.839665 m=+60.001770296 true
2021-11-29 20:29:44.839665 m=+60.001770296 true // Capped by the parent's 1-minute timeout; the 1-hour timeout set on c3 has no effect
2021-11-29 20:28:45.839691 m=+1.001796617 true  // The newly set 1-second timeout applies because it is earlier than the parent's
d3 is canceled: context canceled                // Because its parent b2 was cancelled
e2 is not canceled yet                          // Unaffected by nodes that are not its ancestors
*/

Other notes and summary

See the summary at the top of this article. For a walkthrough of the context source code, see the next article.

Added by abkdesign on Tue, 14 Dec 2021 07:41:11 +0200