gRPC Go service discovery and load balancing
https://blog.cong.moe/post/2021-03-06-grpc-go-discovery-lb/
gRPC is a high-performance RPC framework open-sourced by Google that supports multiple languages, and it is widely used for calls between services inside a cluster. To handle large-scale traffic and avoid single points of failure, services are usually deployed as multiple instances, which makes load balancing a hard requirement.
Note: everything in this article is based on grpc/grpc-go. Implementations in other languages differ, and this will not be pointed out again.
Basic introduction
Because a gRPC client keeps a long-lived connection to the server, connection-level load balancing makes little sense: once the connection is established, every request over it goes to the same server. gRPC therefore balances per call, so that requests sent by a single client are spread across all servers.
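To make the difference concrete, here is a toy simulation (not gRPC code; perConnectionCounts and perCallCounts are hypothetical names) that counts where calls from one client land when balancing happens once per connection versus once per call. It assumes, for simplicity, that the connection-level balancer happened to pick the first backend.

```go
package main

// perConnectionCounts simulates connection-level balancing: the backend is
// chosen once, when the long-lived connection is opened, so every later call
// from this client lands on that same backend.
func perConnectionCounts(backends []string, calls int) map[string]int {
	counts := make(map[string]int)
	chosen := backends[0] // assumption: the balancer picked the first backend at connect time
	for i := 0; i < calls; i++ {
		counts[chosen]++
	}
	return counts
}

// perCallCounts simulates call-level (round-robin) balancing: every call
// moves on to the next backend in turn.
func perCallCounts(backends []string, calls int) map[string]int {
	counts := make(map[string]int)
	for i := 0; i < calls; i++ {
		counts[backends[i%len(backends)]]++
	}
	return counts
}
```

With three backends and nine calls, the per-connection model sends all nine calls to one backend, while the per-call model sends three to each.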
Client load balancing
Traditionally, a load balancer is an independent component placed between service consumers and providers. As a proxy it usually has to hold a copy of each request and response, which costs performance and adds latency. Under heavy traffic the load balancer can become a bottleneck, and since it is a single point of failure, an outage there affects the whole service.
gRPC adopts client-side load balancing. The general principle is as follows:
- When a server starts, it registers its address with the registry
- The client queries the registry for the target service's address list, picks a server according to a load balancing policy, and sends the request
In this way the client talks to the servers directly, with no extra hop and hence no extra performance overhead. In this mode the client holds connections to multiple servers: behind a gRPC client connection (ClientConn) sits a group of SubConns, each connected to one server. See the document Load Balancing in gRPC for details.
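The two steps above, plus the per-call pick over SubConns, can be modeled in a few lines. This is a minimal in-memory sketch, not grpc-go's implementation: Registry, clientConn, subConn, dial and pick are hypothetical names, the "connections" are plain structs, and round robin stands in for whatever policy is configured.

```go
package main

import (
	"errors"
	"sync"
	"sync/atomic"
)

// Registry is a hypothetical in-memory registration center.
type Registry struct {
	mu    sync.RWMutex
	addrs map[string][]string
}

func NewRegistry() *Registry { return &Registry{addrs: make(map[string][]string)} }

// Register is what a server does on startup.
func (r *Registry) Register(service, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs[service] = append(r.addrs[service], addr)
}

// Lookup returns a copy of the current address list for service.
func (r *Registry) Lookup(service string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return append([]string(nil), r.addrs[service]...)
}

// subConn stands in for one connection to one server.
type subConn struct{ addr string }

// clientConn holds one subConn per resolved address.
type clientConn struct {
	subConns []*subConn
	next     uint64 // round-robin cursor, updated atomically
}

// dial queries the registry and creates one subConn per address.
func dial(reg *Registry, service string) *clientConn {
	cc := &clientConn{}
	for _, a := range reg.Lookup(service) {
		cc.subConns = append(cc.subConns, &subConn{addr: a})
	}
	return cc
}

var errNoSubConn = errors.New("no available subConn")

// pick chooses the subConn for a single call; safe for concurrent use.
func (c *clientConn) pick() (*subConn, error) {
	if len(c.subConns) == 0 {
		return nil, errNoSubConn
	}
	n := atomic.AddUint64(&c.next, 1) - 1
	return c.subConns[n%uint64(len(c.subConns))], nil
}
```

The atomic counter keeps pick safe for concurrent calls without a lock, which matters because many RPCs share one client connection.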
How to use
From the analysis above, the crux of load balancing is really service discovery: service discovery provides the "server -> addrs" mapping, and the load balancer then only has to pick one of the connections for the existing addrs according to its policy when sending each request.
In the gRPC-Go client, the package responsible for resolving "server -> addrs" is google.golang.org/grpc/resolver.
When the client establishes a connection, it uses the URI scheme to select the matching resolver registered globally in the resolver package. The selected resolver is responsible for resolving the URI's Endpoint into addrs. We can therefore implement our own service discovery by globally registering a resolver for a custom scheme. For details, see the gRPC Name Resolution documentation.
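As a rough illustration of how a dial string splits into Scheme, Authority and Endpoint, the sketch below parses it with net/url. It is a simplified stand-in, not grpc-go's parser (which, among other things, falls back to a default scheme when none matches); Target here only copies the three fields of resolver.Target, and parseTarget is a hypothetical helper.

```go
package main

import (
	"net/url"
	"strings"
)

// Target mirrors the three fields of resolver.Target.
type Target struct {
	Scheme    string
	Authority string
	Endpoint  string
}

// parseTarget splits "scheme://authority/endpoint". ok is false when the
// string has no "scheme://" prefix (e.g. a bare "host:port").
func parseTarget(s string) (Target, bool) {
	if !strings.Contains(s, "://") {
		return Target{}, false
	}
	u, err := url.Parse(s)
	if err != nil {
		return Target{}, false
	}
	return Target{
		Scheme:    u.Scheme,
		Authority: u.Host,
		Endpoint:  strings.TrimPrefix(u.Path, "/"),
	}, true
}
```

For example, "test:///xxx" has an empty authority and the endpoint "xxx", while "etcd://127.0.0.1:2379/greeter" carries the authority "127.0.0.1:2379".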
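The core of extending the resolver is implementing the resolver.Builder interface. The excerpt below reflects grpc-go at the time of writing; later releases added a parsed URL field to Target and turned Endpoint into a method.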
// m is a map from scheme to resolver builder.
var m = make(map[string]Builder)

type Target struct {
	Scheme    string
	Authority string
	Endpoint  string
}

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// State contains the current Resolver state relevant to the ClientConn.
type State struct {
	// Addresses is the latest set of resolved addresses for the target.
	Addresses []Address
	// ServiceConfig contains the result from parsing the latest service
	// config. If it is nil, it indicates no service config is present or the
	// resolver does not provide service configs.
	ServiceConfig *serviceconfig.ParseResult
	// Attributes contains arbitrary data about the resolver intended for
	// consumption by the load balancing policy.
	Attributes *attributes.Attributes
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}
When a gRPC client establishes a connection, address resolution roughly goes through the following steps:
- Look up the resolver Builder in the global resolver map (m in the code above) by the Scheme of the dial address
- Parse the address into a Target and pass it to the Builder's Build method, which instantiates a Resolver
- The Resolver calls cc.UpdateState with a State, and the ClientConn establishes connections to the addresses in State.Addresses
For example, after registering a test resolver, m becomes {test: testResolver}. When the dial address is test:///xxx, it matches testResolver, the address is parsed into &Target{Scheme: "test", Authority: "", Endpoint: "xxx"}, and that Target is passed to testResolver's Build method.
To sum up:
- Each Scheme corresponds to one Builder
- Each distinct target under the same Scheme gets its own Resolver, instantiated by builder.Build
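The lookup-and-build flow described above can be mirrored with a stripped-down set of types. Everything here (Target, ClientConn, Resolver, Builder, Register, staticBuilder, recordingConn, dial) is a hypothetical miniature written for illustration; the real interfaces in google.golang.org/grpc/resolver have more methods and parameters.

```go
package main

import "fmt"

type Target struct{ Scheme, Endpoint string }

// ClientConn receives address updates from a Resolver.
type ClientConn interface{ UpdateState(addrs []string) }

type Resolver interface{ Close() }

type Builder interface {
	Build(t Target, cc ClientConn) (Resolver, error)
	Scheme() string
}

// m maps scheme -> Builder, like the global map in the resolver package.
var m = make(map[string]Builder)

// Register installs b under its scheme: one Builder per Scheme.
func Register(b Builder) { m[b.Scheme()] = b }

// staticBuilder resolves endpoints from a fixed table.
type staticBuilder struct{ table map[string][]string }

type staticResolver struct{}

func (staticResolver) Close() {}

func (b staticBuilder) Scheme() string { return "test" }

// Build creates one Resolver per target and pushes its addresses right away.
func (b staticBuilder) Build(t Target, cc ClientConn) (Resolver, error) {
	cc.UpdateState(b.table[t.Endpoint])
	return staticResolver{}, nil
}

// recordingConn is a fake ClientConn that records the last update.
type recordingConn struct{ addrs []string }

func (c *recordingConn) UpdateState(addrs []string) { c.addrs = addrs }

// dial finds the Builder by scheme and builds a Resolver for this target.
func dial(t Target) (*recordingConn, error) {
	b, ok := m[t.Scheme]
	if !ok {
		return nil, fmt.Errorf("no resolver registered for scheme %q", t.Scheme)
	}
	cc := &recordingConn{}
	if _, err := b.Build(t, cc); err != nil {
		return nil, err
	}
	return cc, nil
}
```

Registering staticBuilder makes m equal to {test: staticBuilder}; dialing the target for test:///xxx then builds a resolver for endpoint xxx, which pushes its addresses into the ClientConn.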
Static resolver example
Here is an example with a hard-coded routing table:
import "google.golang.org/grpc/resolver"

// Define the Scheme name
const exampleScheme = "example"

type exampleResolverBuilder struct {
	addrsStore map[string][]string
}

func NewExampleResolverBuilder(addrsStore map[string][]string) *exampleResolverBuilder {
	return &exampleResolverBuilder{addrsStore: addrsStore}
}

func (e *exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// Initialize the resolver and pass addrsStore in
	r := &exampleResolver{
		target:     target,
		cc:         cc,
		addrsStore: e.addrsStore,
	}
	// Call start to push the initial addresses
	r.start()
	return r, nil
}

func (e *exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) start() {
	// Look up the addrs for this Endpoint in the static routing table
	addrStrs := r.addrsStore[r.target.Endpoint]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	// Wrap the addrs in a State and call cc.UpdateState to update the addresses
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}

// The table is static, so there is nothing to re-resolve or clean up.
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}
It can be used as follows:
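import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
)

func main() {
	// Register our resolver
	resolver.Register(NewExampleResolverBuilder(map[string][]string{
		"test": {"localhost:8080", "localhost:8081"},
	}))
	// Dial the custom scheme and configure load balancing.
	// grpc.Dial fails without transport credentials, so plaintext is enabled explicitly.
	conn, err := grpc.Dial("example:///test",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()
}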
The principle is simple: exampleResolver just pushes the addrs it finds in the routing table down to the underlying connection.
Resolver based on etcd
The main principles of using etcd for service discovery are:
- When a server starts, it writes the key {serviceName}/{addr} to etcd and attaches a short Lease
- The server calls KeepAlive to renew the lease periodically; if the server dies, renewals stop, the lease expires and etcd deletes the key
- When the client starts, it gets all keys with the {serviceName}/ prefix to obtain the current service list
- The client watches the {serviceName}/ prefix to receive service list change events
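The lease mechanics in these steps can be simulated without a real etcd. The sketch below is a hypothetical in-memory model (leaseKV and its methods are invented names, not the clientv3 API) driven by a logical clock instead of wall time: a key disappears once its TTL passes without a KeepAlive, which is how a crashed server drops out of the list.

```go
package main

import (
	"sort"
	"strings"
)

// leaseKV is a toy key-value store with per-key leases.
type leaseKV struct {
	now     int64
	kv      map[string]string
	expires map[string]int64 // key -> logical time at which its lease expires
	ttl     map[string]int64
}

func newLeaseKV() *leaseKV {
	return &leaseKV{kv: map[string]string{}, expires: map[string]int64{}, ttl: map[string]int64{}}
}

// PutWithLease stores key=val bound to a lease of ttl ticks (Grant + Put).
func (s *leaseKV) PutWithLease(key, val string, ttl int64) {
	s.kv[key] = val
	s.ttl[key] = ttl
	s.expires[key] = s.now + ttl
}

// KeepAlive renews the lease on key, as the server's KeepAlive loop does.
func (s *leaseKV) KeepAlive(key string) {
	if _, ok := s.kv[key]; ok {
		s.expires[key] = s.now + s.ttl[key]
	}
}

// Advance moves the clock forward and deletes keys whose lease expired.
func (s *leaseKV) Advance(ticks int64) {
	s.now += ticks
	for k, exp := range s.expires {
		if s.now >= exp {
			delete(s.kv, k)
			delete(s.expires, k)
			delete(s.ttl, k)
		}
	}
}

// GetPrefix returns the sorted values of all keys under prefix.
func (s *leaseKV) GetPrefix(prefix string) []string {
	var vals []string
	for k, v := range s.kv {
		if strings.HasPrefix(k, prefix) {
			vals = append(vals, v)
		}
	}
	sort.Strings(vals)
	return vals
}
```

The trailing "/" in the prefix matters: without it, a prefix query for greeter would also match the keys of a service named greeter2.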
Now for the implementation:
1. Server registration
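import (
	"context"
	"strings"

	"github.com/pkg/errors"
	clientv3 "go.etcd.io/etcd/client/v3" // etcd v3.5+; v3.4 used go.etcd.io/etcd/clientv3
)

func Register(ctx context.Context, client *clientv3.Client, service, self string) error {
	// Grant a short lease (TTL in seconds)
	resp, err := client.Grant(ctx, 2)
	if err != nil {
		return errors.Wrap(err, "etcd grant")
	}
	// Put {service}/{self} -> self, bound to the lease
	_, err = client.Put(ctx, strings.Join([]string{service, self}, "/"), self, clientv3.WithLease(resp.ID))
	if err != nil {
		return errors.Wrap(err, "etcd put")
	}
	// respCh needs to be consumed, otherwise there will be warnings
	respCh, err := client.KeepAlive(ctx, resp.ID)
	if err != nil {
		return errors.Wrap(err, "etcd keep alive")
	}
	for {
		select {
		case <-ctx.Done():
			return nil
		case _, ok := <-respCh:
			// respCh is closed when ctx ends or keepalive gives up; without
			// this check the loop would spin on the closed channel
			if !ok {
				if ctx.Err() != nil {
					return nil
				}
				return errors.New("etcd keep alive channel closed")
			}
		}
	}
}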
The code is simple: it grants a 2-second lease, puts {service}/{self} bound to that lease, and keeps the lease alive until ctx is cancelled.
2. Client
import (
	"context"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"go.etcd.io/etcd/api/v3/mvccpb"      // etcd v3.5+; v3.4 used go.etcd.io/etcd/mvcc/mvccpb
	clientv3 "go.etcd.io/etcd/client/v3" // etcd v3.5+; v3.4 used go.etcd.io/etcd/clientv3
	"google.golang.org/grpc/resolver"
)

const (
	// scheme that the etcd resolver is responsible for
	Scheme      = "etcd"
	defaultFreq = time.Minute * 30
)

type Builder struct {
	client *clientv3.Client
	mu     sync.Mutex // guards store; Build may run for several dials concurrently
	// Global routing table snapshot; not strictly necessary
	store map[string]map[string]struct{}
}

func NewBuilder(client *clientv3.Client) *Builder {
	return &Builder{
		client: client,
		store:  make(map[string]map[string]struct{}),
	}
}

func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	store := make(map[string]struct{})
	b.mu.Lock()
	b.store[target.Endpoint] = store
	b.mu.Unlock()
	// Initialize the etcd resolver
	r := &etcdResolver{
		client: b.client,
		target: target,
		cc:     cc,
		store:  store,
		stopCh: make(chan struct{}, 1),
		rn:     make(chan struct{}, 1),
		t:      time.NewTicker(defaultFreq),
	}
	// Start the background update goroutine
	go r.start(context.Background())
	// Request a full update of the service addresses
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}

func (b *Builder) Scheme() string { return Scheme }

type etcdResolver struct {
	client *clientv3.Client
	target resolver.Target
	cc     resolver.ClientConn
	store  map[string]struct{}
	stopCh chan struct{}
	// rn channel is used by ResolveNow() to force an immediate resolution of the target.
	rn chan struct{}
	t  *time.Ticker
}

func (r *etcdResolver) start(ctx context.Context) {
	prefix := r.target.Endpoint + "/"
	w := clientv3.NewWatcher(r.client)
	rch := w.Watch(ctx, prefix, clientv3.WithPrefix())
	for {
		select {
		case <-r.rn:
			r.resolveNow()
		case <-r.t.C:
			r.ResolveNow(resolver.ResolveNowOptions{})
		case <-r.stopCh:
			w.Close()
			return
		case wresp, ok := <-rch:
			if !ok {
				// The watch was cancelled (e.g. after compaction): re-watch and do a
				// full refresh instead of spinning on the closed channel
				rch = w.Watch(ctx, prefix, clientv3.WithPrefix())
				r.resolveNow()
				continue
			}
			for _, ev := range wresp.Events {
				switch ev.Type {
				case mvccpb.PUT:
					r.store[string(ev.Kv.Value)] = struct{}{}
				case mvccpb.DELETE:
					delete(r.store, strings.TrimPrefix(string(ev.Kv.Key), prefix))
				}
			}
			r.updateTargetState()
		}
	}
}

func (r *etcdResolver) resolveNow() {
	resp, err := r.client.Get(context.Background(), r.target.Endpoint+"/", clientv3.WithPrefix())
	if err != nil {
		r.cc.ReportError(errors.Wrap(err, "get init endpoints"))
		return
	}
	// Rebuild the cache from scratch so that stale addresses are dropped
	for k := range r.store {
		delete(r.store, k)
	}
	for _, kv := range resp.Kvs {
		r.store[string(kv.Value)] = struct{}{}
	}
	r.updateTargetState()
}

func (r *etcdResolver) updateTargetState() {
	addrs := make([]resolver.Address, 0, len(r.store))
	for k := range r.store {
		addrs = append(addrs, resolver.Address{Addr: k})
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}

// ResolveNow may be called concurrently; the non-blocking send on the buffered
// rn channel collapses overlapping calls into a single full refresh
func (r *etcdResolver) ResolveNow(o resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:
	}
}

func (r *etcdResolver) Close() {
	r.t.Stop()
	close(r.stopCh)
}
The core of the code above is func (r *etcdResolver) start(ctx context.Context), which does three things:
- Watches the key prefix in etcd and, on each change event, updates the local cache and the addrs of the underlying connection
- Listens on the r.rn channel and performs a full refresh whenever a message arrives; ResolveNow is what sends that message
- Runs a global 30-minute ticker and performs a full refresh each time it fires
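Two pieces of start() are easy to test in isolation: applying watch events to the local address set, and collapsing concurrent ResolveNow calls. The sketch below mirrors both with hypothetical types (event, applyEvents, resolveNowSignal) instead of the etcd and gRPC ones. The DELETE branch derives the address from the key because, in etcd, a delete event carries the key but not the old value unless the watch asks for previous KVs.

```go
package main

import "strings"

// eventType stands in for etcd's PUT/DELETE event types.
type eventType int

const (
	put eventType = iota
	del
)

type event struct {
	typ   eventType
	key   string // e.g. "greeter/10.0.0.1:50051"
	value string // the address; empty for a DELETE
}

// applyEvents updates the local address set the way start() does:
// PUT adds the value, DELETE removes the address taken from the key.
func applyEvents(store map[string]struct{}, prefix string, evs []event) {
	for _, ev := range evs {
		switch ev.typ {
		case put:
			store[ev.value] = struct{}{}
		case del:
			delete(store, strings.TrimPrefix(ev.key, prefix))
		}
	}
}

// resolveNowSignal coalesces refresh requests with a buffered channel of
// size 1 and a non-blocking send.
type resolveNowSignal struct{ rn chan struct{} }

func newResolveNowSignal() *resolveNowSignal {
	return &resolveNowSignal{rn: make(chan struct{}, 1)}
}

// ResolveNow reports whether this call queued a new refresh; while one is
// already pending, further calls are dropped.
func (s *resolveNowSignal) ResolveNow() bool {
	select {
	case s.rn <- struct{}{}:
		return true
	default:
		return false
	}
}
```

Any number of ResolveNow calls made while a refresh is pending therefore cost at most one extra Get against etcd.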
Usage is similar to the static resolver. The complete code and examples are in zcong1993/grpc-example.
Load balancing
After all this about resolvers, it's time to talk about load balancing itself. As noted earlier, the hard part of LB is service discovery. Once service discovery is in place, using a built-in LB policy takes just one option: grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`). Without it, grpc-go uses the default pick_first policy, which sends every call to the first address it can connect to.
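Since the service config is a JSON string, a typo in it is easy to miss. As a small sketch, it can be generated with encoding/json instead of written by hand; lbServiceConfig and lbServiceConfigList are hypothetical helpers. The second form uses the list-based loadBalancingConfig field of the service config, where gRPC uses the first policy in the list that it supports.

```go
package main

import "encoding/json"

// serviceConfig models only the field used in this article.
type serviceConfig struct {
	LoadBalancingPolicy string `json:"loadBalancingPolicy"`
}

// serviceConfigList models loadBalancingConfig: each entry maps a policy
// name to its (here empty) policy-specific config.
type serviceConfigList struct {
	LoadBalancingConfig []map[string]struct{} `json:"loadBalancingConfig"`
}

// lbServiceConfig returns a JSON string for grpc.WithDefaultServiceConfig.
func lbServiceConfig(policy string) (string, error) {
	b, err := json.Marshal(serviceConfig{LoadBalancingPolicy: policy})
	return string(b), err
}

// lbServiceConfigList builds the list form, in order of preference.
func lbServiceConfigList(policies ...string) (string, error) {
	cfg := serviceConfigList{}
	for _, p := range policies {
		cfg.LoadBalancingConfig = append(cfg.LoadBalancingConfig, map[string]struct{}{p: {}})
	}
	b, err := json.Marshal(cfg)
	return string(b), err
}
```

lbServiceConfig("round_robin") yields exactly the string used above, and lbServiceConfigList("round_robin") yields {"loadBalancingConfig":[{"round_robin":{}}]}.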
Previously you could use grpc.WithBalancerName("round_robin"), but that option has been deprecated. Personally, I find WithBalancerName clearer. There is a discussion of this on GitHub in grpc-go/issues/3003 if you are interested.
Closing thoughts
Studying gRPC load balancing shows the trade-offs between the different kinds of load balancers. Client-side load balancing solves the performance problem, but it moves a lot of complexity into the client. Users don't notice it, but as mentioned at the start, gRPC supports multiple languages, which means every language's client has to implement all of this. In practice, new features arrive on very different timelines across languages: the C++, Go and Java clients get the best support, while languages such as Node.js lag behind, a long-standing problem for gRPC. For example, as of this writing the Node.js library is still callback-based and still does not support server interceptors.