gRPC Go service discovery and load balancing

https://blog.cong.moe/post/2021-03-06-grpc-go-discovery-lb/

 

gRPC is a high-performance, open-source RPC framework from Google that supports multiple languages. It is widely used for calls between services inside a cluster. To handle large-scale traffic and avoid single points of failure, services are usually deployed as multiple instances, so load balancing is a hard requirement.

Note: everything in this article is based on grpc/grpc-go. Implementations in other languages differ, and this will not be pointed out again below.

Basic introduction

Because a gRPC client holds a long-lived connection to the server, connection-based load balancing does not make much sense, so gRPC balances load per call. That is, the requests sent by a single client should be spread across all servers.

Client load balancing

Generally speaking, a load balancer is an independent component placed between service consumers and providers. Such a proxy usually has to keep a copy of each request and response, which costs performance and adds latency. When the request volume is large, the load balancer may become a bottleneck, and it is then a single point of failure that affects the whole service.

gRPC adopts client-side load balancing. The general principle is as follows:

  1. When a server starts, it registers its address with the registry
  2. The client queries the target service address list from the registry, selects the target service through a certain load balancing strategy, and initiates a request

In this way, the client sends requests directly to the server, so there is no extra proxy hop and no associated overhead. In this mode the client establishes connections with multiple servers: behind a gRPC client connection a group of subConnections is maintained, each connected to one server. For details, refer to the document Load Balancing in gRPC.
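The per-call selection described above can be illustrated with a minimal round-robin sketch. This is not the grpc-go implementation; the roundRobin type and its pick method are hypothetical names, and the sketch assumes a fixed, non-empty address list.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical picker: it holds a fixed list of backend
// addresses and hands them out in turn, one per call.
type roundRobin struct {
	next  uint64 // number of picks made so far
	addrs []string
}

// pick returns the backend for the next call. The atomic counter makes it
// safe to call from many goroutines. It assumes addrs is not empty.
func (p *roundRobin) pick() string {
	n := atomic.AddUint64(&p.next, 1) - 1
	return p.addrs[n%uint64(len(p.addrs))]
}

func main() {
	p := &roundRobin{addrs: []string{"localhost:8080", "localhost:8081"}}
	for i := 0; i < 4; i++ {
		// Each call, not each connection, gets its own backend.
		fmt.Printf("call %d -> %s\n", i, p.pick())
	}
}
```

In the real client the list is not fixed: it is replaced whenever service discovery reports a change.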

How to use

From the analysis above, the focus of load balancing is actually service discovery, because service discovery provides the mapping "server -> addrs". The load balancer that follows only has to pick, according to some policy, one of the connections in the existing addrs list for each request.

In the gRPC Go client, the module responsible for resolving "server -> addrs" is google.golang.org/grpc/resolver.

When the client establishes a connection, it selects the resolver registered globally in the resolver module that matches the URI scheme. The selected resolver is then responsible for resolving the URI's Endpoint into the corresponding addrs. We can therefore implement our own service discovery by registering a resolver for a custom scheme. For details, see the gRPC Name Resolution documentation.

The core of extending the resolver is implementing the resolver.Builder interface:

// m is a map from scheme to resolver builder.
var	m = make(map[string]Builder)

type Target struct {
	Scheme    string
	Authority string
	Endpoint  string
}

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// State contains the current Resolver state relevant to the ClientConn.
type State struct {
	// Addresses is the latest set of resolved addresses for the target.
	Addresses []Address

	// ServiceConfig contains the result from parsing the latest service
	// config.  If it is nil, it indicates no service config is present or the
	// resolver does not provide service configs.
	ServiceConfig *serviceconfig.ParseResult

	// Attributes contains arbitrary data about the resolver intended for
	// consumption by the load balancing policy.
	Attributes *attributes.Attributes
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}

When a gRPC client establishes a connection, address resolution roughly involves the following steps:

  1. Look up the resolver Builder in the global resolver map (m in the code above) by the Scheme of the address passed in
  2. Parse the address into a Target and pass it to the Builder's Build method to instantiate a Resolver
  3. The user-implemented Resolver calls cc.UpdateState with a State, and connections are established to the addresses in State.Addresses

For example, after registering a test resolver, m becomes {test: testResolver}. When the connection address is test:///xxx, it is matched to testResolver, the address is parsed into &Target{Scheme: "test", Authority: "", Endpoint: "xxx"}, and that Target is passed as a parameter to testResolver's Build method.

To summarize:

  1. Each Scheme corresponds to one Builder
  2. Each distinct target under the same Scheme corresponds to one Resolver, instantiated through builder.Build
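The two relationships above can be sketched with simplified stand-ins. The builder interface, resolverInstance type, and registry map here are hypothetical and deliberately smaller than the real resolver.Builder and resolver.Resolver; they only show one builder per scheme and one resolver per target.

```go
package main

import "fmt"

// builder and resolverInstance are simplified stand-ins, not the gRPC types.
type builder interface {
	Scheme() string
	Build(endpoint string) *resolverInstance
}

type resolverInstance struct{ endpoint string }

// registry plays the role of the global map m: scheme -> builder.
var registry = map[string]builder{}

func register(b builder) { registry[b.Scheme()] = b }

type testBuilder struct{}

func (testBuilder) Scheme() string { return "test" }

// Build returns a fresh resolver for every target it is asked about.
func (testBuilder) Build(endpoint string) *resolverInstance {
	return &resolverInstance{endpoint: endpoint}
}

func main() {
	register(testBuilder{})
	b := registry["test"] // one builder per scheme
	r1, r2 := b.Build("svc-a"), b.Build("svc-b")
	// Two targets under the same scheme get two distinct resolvers.
	fmt.Println(r1.endpoint, r2.endpoint, r1 != r2)
}
```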

Static resolver example

The following example implements a hard-coded routing table:

// Define Scheme name
const exampleScheme = "example"

type exampleResolverBuilder struct {
	addrsStore map[string][]string
}

func NewExampleResolverBuilder(addrsStore map[string][]string) *exampleResolverBuilder {
	return &exampleResolverBuilder{addrsStore: addrsStore}
}

func (e *exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// Initialize the resolver and pass addrsStore in
	r := &exampleResolver{
		target:     target,
		cc:         cc,
		addrsStore: e.addrsStore,
	}
	// Call start to push the initial addresses
	r.start()
	return r, nil
}
func (e *exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) start() {
	// Look up the addrs for this Endpoint in the static routing table
	addrStrs := r.addrsStore[r.target.Endpoint]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	// Wrap the addrs list in a State and call cc.UpdateState to update the addresses
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

It can be used as follows:

// Register our resolver
resolver.Register(NewExampleResolverBuilder(map[string][]string{
  "test": []string{"localhost:8080", "localhost:8081"},
}))

// Establish a connection to the corresponding scheme and configure load balancing
conn, err := grpc.Dial("example:///test", grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))

The principle is simple: exampleResolver just pushes the addrs found in the routing table to the underlying connection.

Resolver based on etcd

Service discovery on top of etcd works as follows:

  1. When a server starts, it saves a key {serverName}/{addr} to etcd with a short Lease
  2. The server uses KeepAlive to renew the lease periodically
  3. When a client starts, it gets all keys with the {serverName}/ prefix to obtain the current service list
  4. The client watches keys with the {serverName}/ prefix to receive change events for the service list
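The key layout used in the steps above can be sketched with plain string handling. The helper names registrationKey and addrFromKey, the service name, and the address are hypothetical. The sketch also shows why the prefix should end with "/": without it, a service named "user" would also match the keys of "user-service".

```go
package main

import (
	"fmt"
	"strings"
)

// registrationKey builds the etcd key under which one instance registers.
func registrationKey(service, addr string) string {
	return service + "/" + addr
}

// addrFromKey recovers the instance address from a key. ok is false when
// the key does not belong to the given service.
func addrFromKey(service, key string) (addr string, ok bool) {
	prefix := service + "/"
	if !strings.HasPrefix(key, prefix) {
		return "", false
	}
	return strings.TrimPrefix(key, prefix), true
}

func main() {
	key := registrationKey("user-service", "10.0.0.1:8080")
	addr, ok := addrFromKey("user-service", key)
	fmt.Println(key, addr, ok)

	// The trailing slash keeps "user" from matching "user-service/...".
	_, ok = addrFromKey("user", key)
	fmt.Println(ok)
}
```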

The implementation:

1. Server registration

func Register(ctx context.Context, client *clientv3.Client, service, self string) error {
	resp, err := client.Grant(ctx, 2)
	if err != nil {
		return errors.Wrap(err, "etcd grant")
	}
	_, err = client.Put(ctx, strings.Join([]string{service, self}, "/"), self, clientv3.WithLease(resp.ID))
	if err != nil {
		return errors.Wrap(err, "etcd put")
	}
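	// respCh must be drained, otherwise the etcd client logs a warning
	respCh, err := client.KeepAlive(ctx, resp.ID)
	if err != nil {
		return errors.Wrap(err, "etcd keep alive")
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case _, ok := <-respCh:
			// A closed channel means the keepalive has stopped. Return
			// instead of spinning on the closed channel.
			if !ok {
				if ctx.Err() != nil {
					return nil
				}
				return errors.New("etcd keep alive channel closed")
			}
		}
	}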
}

The code is straightforward and needs little explanation. Note that Register blocks until ctx is cancelled, so it is normally run in its own goroutine.

2. Client

const (
	// Scheme is the URI scheme the etcd resolver is responsible for
	Scheme      = "etcd"
	defaultFreq = time.Minute * 30
)

type Builder struct {
	client *clientv3.Client
	// Global routing table snapshot; not strictly necessary
	store  map[string]map[string]struct{}
}

func NewBuilder(client *clientv3.Client) *Builder {
	return &Builder{
		client: client,
		store:  make(map[string]map[string]struct{}),
	}
}

func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	b.store[target.Endpoint] = make(map[string]struct{})

	// Initialize the etcd resolver
	r := &etcdResolver{
		client: b.client,
		target: target,
		cc:     cc,
		store:  b.store[target.Endpoint],
		stopCh: make(chan struct{}, 1),
		rn:     make(chan struct{}, 1),
		t:      time.NewTicker(defaultFreq),
	}

	// Start the background update goroutine
	go r.start(context.Background())
	// Trigger an initial full refresh of the service addresses
	r.ResolveNow(resolver.ResolveNowOptions{})

	return r, nil
}

func (b *Builder) Scheme() string {
	return Scheme
}

type etcdResolver struct {
	client *clientv3.Client
	target resolver.Target
	cc     resolver.ClientConn
	store  map[string]struct{}
	stopCh chan struct{}
	// rn channel is used by ResolveNow() to force an immediate resolution of the target.
	rn chan struct{}
	t  *time.Ticker
}

func (r *etcdResolver) start(ctx context.Context) {
	target := r.target.Endpoint

	w := clientv3.NewWatcher(r.client)
	rch := w.Watch(ctx, target+"/", clientv3.WithPrefix())
	for {
		select {
		case <-r.rn:
			r.resolveNow()
		case <-r.t.C:
			r.ResolveNow(resolver.ResolveNowOptions{})
		case <-r.stopCh:
			w.Close()
			return
		case wresp := <-rch:
			for _, ev := range wresp.Events {
				switch ev.Type {
				case mvccpb.PUT:
					r.store[string(ev.Kv.Value)] = struct{}{}
				case mvccpb.DELETE:
					delete(r.store, strings.Replace(string(ev.Kv.Key), target+"/", "", 1))
				}
			}
			r.updateTargetState()
		}
	}
}

func (r *etcdResolver) resolveNow() {
	target := r.target.Endpoint
	resp, err := r.client.Get(context.Background(), target+"/", clientv3.WithPrefix())
	if err != nil {
		r.cc.ReportError(errors.Wrap(err, "get init endpoints"))
		return
	}

	for _, kv := range resp.Kvs {
		r.store[string(kv.Value)] = struct{}{}
	}

	r.updateTargetState()
}

func (r *etcdResolver) updateTargetState() {
	addrs := make([]resolver.Address, len(r.store))
	i := 0
	for k := range r.store {
		addrs[i] = resolver.Address{Addr: k}
		i++
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}

// It will be called concurrently, so multiple full refreshes at the same time are prevented here
func (r *etcdResolver) ResolveNow(o resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:

	}
}

func (r *etcdResolver) Close() {
	r.t.Stop()
	close(r.stopCh)
}

The core of the code above is func (r *etcdResolver) start(ctx context.Context), which does the following three things:

  1. It watches the corresponding key prefix in etcd; when a change event occurs, it updates the local cache and the addrs of the underlying connection
  2. It listens on the r.rn channel and performs a full refresh when a message arrives; messages on r.rn are produced when ResolveNow is called
  3. It runs a ticker with a 30-minute period; each time the ticker fires, a full refresh is performed
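The way ResolveNow avoids piling up full refreshes relies on a non-blocking send to a channel with a buffer of one. The pattern can be isolated as follows; the trigger function is a hypothetical name for the body of ResolveNow.

```go
package main

import "fmt"

// trigger queues a refresh request unless one is already pending.
// It never blocks: extra requests are dropped, which is fine because the
// pending one will already cause a full refresh.
func trigger(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	rn := make(chan struct{}, 1)
	fmt.Println(trigger(rn)) // true: request queued
	fmt.Println(trigger(rn)) // false: one is already pending
	<-rn                     // the background loop picks the request up
	fmt.Println(trigger(rn)) // true: the slot is free again
}
```

Because callers never block, gRPC can call ResolveNow from many goroutines at once while the background loop performs at most one refresh per pending signal.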

Usage is similar to the static resolver. The complete code and examples are available at zcong1993/grpc-example.

Load balancing

Having said so much about resolvers, it is time to talk about load balancing. As mentioned, the hard part of load balancing is service discovery. Once service discovery is implemented, the built-in load balancer only needs one simple option: grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`).
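The service config is plain JSON, so it can also be generated instead of hand-written. The sketch below builds the list-style loadBalancingConfig form, which the gRPC service config format defines alongside the loadBalancingPolicy field used in this article. The serviceConfig helper is a hypothetical name, and whether a given grpc-go version accepts this form should be checked against its documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceConfig is a hypothetical helper that renders a service config
// selecting a single load balancing policy by name, with empty policy options.
func serviceConfig(policy string) (string, error) {
	cfg := map[string]interface{}{
		"loadBalancingConfig": []map[string]interface{}{
			{policy: map[string]interface{}{}},
		},
	}
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := serviceConfig("round_robin")
	if err != nil {
		panic(err)
	}
	// The resulting string would be passed to grpc.WithDefaultServiceConfig.
	fmt.Println(s)
}
```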

Previously you could use grpc.WithBalancerName("round_robin"), but that option has been deprecated. Personally, I think the latter is clearer. There is a discussion of this on GitHub in grpc-go/issues/3003 for those interested.

Final words

Studying gRPC load balancing shows the advantages and disadvantages of the different kinds of load balancers. The client-side load balancing adopted by gRPC solves the performance problem, but it also adds a lot of complexity to the client code, even though we as users do not feel it. As mentioned at the beginning, gRPC supports multiple languages, which means the client for each language has to implement all of this itself. In practice, the time it takes for new features to land differs greatly between language clients: C++, Go and Java get the best support for new features, while languages such as Node.js lag behind, which has been a long-standing problem for gRPC. For example, at the time of writing the Node.js client library is still callback-based and still does not support server interceptors.

Keywords: Microservices

Added by barrygar on Thu, 17 Feb 2022 20:55:07 +0200