Preface
Part I introduced how to use etcd to implement service discovery. Building on that etcd-based service discovery, this part explains how to implement load balancing in a gRPC client.
gRPC load balancing
The official gRPC documentation, Load Balancing in gRPC, describes load balancing schemes designed for gRPC. We analyze them below.
1. Per-call load balancing
Load balancing in gRPC is done per call, not per connection. In other words, even if all requests come from a single client, we still want them to be balanced across all servers.
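To make the per-call versus per-connection distinction concrete, here is a toy model that uses only the standard library and involves no real networking. The three backend addresses are hypothetical, and round robin is used as the per-call policy purely for illustration.

```go
package main

import "fmt"

// backends is a hypothetical set of three server addresses.
var backends = []string{"localhost:8000", "localhost:8001", "localhost:8002"}

// perConnection models connection-level balancing: a backend is chosen once,
// when the connection is opened, so every call on it goes to the same server.
func perConnection(calls int) map[string]int {
	counts := make(map[string]int)
	chosen := backends[0] // picked once at "dial" time
	for i := 0; i < calls; i++ {
		counts[chosen]++
	}
	return counts
}

// perCall models call-level balancing: a backend is chosen for every call
// (round robin here), so a single client spreads its calls over all servers.
func perCall(calls int) map[string]int {
	counts := make(map[string]int)
	for i := 0; i < calls; i++ {
		counts[backends[i%len(backends)]]++
	}
	return counts
}

func main() {
	fmt.Println("per connection:", perConnection(6))
	fmt.Println("per call:      ", perCall(6))
}
```

With six calls from one client, the per-connection model sends all six to one server. The per-call model sends two to each server, which is the behaviour gRPC aims for.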
2. Load balancing method
- Centralized (Proxy Model)
An independent load balancer (LB) sits between service consumers and service providers. It is usually either a dedicated hardware appliance such as F5 or software such as LVS or HAProxy. The LB holds an address mapping table for all services, usually registered through operations configuration. When a consumer calls a target service, it sends the request to the LB, which forwards it to a service instance according to some policy, such as round robin. The LB usually performs health checks and can automatically remove unhealthy service instances.
The main drawback of this scheme is the extra hop between consumer and provider. It adds performance overhead and becomes inefficient when request volume is high.
Readers may also point out that centralized load balancing is a single point of failure: if the load balancing service goes down, the whole system becomes unavailable.
Solution: put DNS load balancing in front of the load balancing service. Configure multiple IP addresses for one domain name and have DNS return them in rotation on each resolution. This gives simple DNS-based load balancing across the LB instances.
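The rotation described above can be modelled with a small in-memory table. This is a toy sketch rather than a DNS server. The domain name and IP addresses are hypothetical, and real round-robin DNS behaviour also depends on resolver caching, which is not modelled here.

```go
package main

import "fmt"

// rrDNS is a toy model of round-robin DNS: one name maps to several LB
// addresses, and each lookup returns them rotated by one position, so
// clients that use the first answer land on different load balancers.
type rrDNS struct {
	records map[string][]string
	next    map[string]int
}

func newRRDNS() *rrDNS {
	return &rrDNS{records: map[string][]string{}, next: map[string]int{}}
}

// add appends address records for name.
func (d *rrDNS) add(name string, ips ...string) {
	d.records[name] = append(d.records[name], ips...)
}

// lookup returns all records for name, starting at a rotating offset.
func (d *rrDNS) lookup(name string) []string {
	ips := d.records[name]
	if len(ips) == 0 {
		return nil
	}
	start := d.next[name] % len(ips)
	d.next[name]++
	out := make([]string, 0, len(ips))
	out = append(out, ips[start:]...)
	return append(out, ips[:start]...)
}

func main() {
	d := newRRDNS()
	d.add("lb.example.com", "10.0.0.1", "10.0.0.2", "10.0.0.3")
	for i := 0; i < 4; i++ {
		// A client that simply takes the first answer.
		fmt.Println(d.lookup("lb.example.com")[0])
	}
}
```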
- Balancing-aware client
This scheme addresses the shortcomings of the first one by integrating the LB functionality into the service consumer's process. It is also known as soft load balancing or client-side load balancing. When a service provider starts, it registers its address in the service registry. It then sends heartbeats to the registry periodically to show that it is alive, which amounts to a health check. When a consumer wants to call a service, its built-in LB component queries the service registry and caches the target service's address list, refreshing it periodically. The component then selects a target address according to a load balancing policy, and the consumer sends the request to that address. LB and service discovery are thus distributed across the consumers' processes. Consumers also call providers directly, with no extra hop, so performance is good.
The main drawback of this scheme is that the load balancing policy has to be written and maintained for clients in multiple languages and versions. This significantly complicates client code.
- Independent LB service (External Load Balancing Service)
This scheme is a compromise that addresses the shortcoming of the second scheme, and its principle is largely the same.
The difference is that the LB and service discovery functions move out of the consumer's process into a separate process on the same host. When one or more services on a host need to call a target service, they all perform service discovery and load balancing through the local LB process. This is still a distributed scheme with no single point of failure. Because the consumer and the LB run on the same host, calls between them are fast. The scheme also keeps service consumers simple, since there is no need to develop client libraries for each language.
This article covers the second method: client-side load balancing.
Load balancing of gRPC client
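gRPC already provides simple load balancing policies, such as round robin. To balance load on the gRPC client, we only need to implement the Builder and Resolver interfaces provided by gRPC. Note that the code in this article targets an older grpc-go release. Newer releases make the following changes:
- BuildOption and ResolveNowOption are renamed to BuildOptions and ResolveNowOptions.
- ClientConn.NewAddress is deprecated in favor of UpdateState.
- grpc.WithBalancerName is deprecated.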
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	Scheme() string
}
Builder interface: creates a resolver (called service discovery in this article) that watches for name resolution updates.
Build method: creates a new resolver for the given target. It is executed when grpc.Dial() is called.
Scheme method: returns the scheme supported by this resolver. For the scheme definition, see: https://github.com/grpc/grpc/blob/master/doc/naming.md
type Resolver interface {
	ResolveNow(ResolveNowOption)
	Close()
}
Resolver interface: watches for updates to the specified target, including address updates and service config updates.
ResolveNow method: called by gRPC to ask the resolver to resolve the target name again. It is only a hint, so implementations may ignore it.
Close method: closes the resolver.
Based on these two interfaces, we put the service discovery logic in the Build method. Build passes the discovered service addresses to the gRPC client and watches for service changes to update the client connection.
The modified service discovery code, discovery.go:
package etcdv3

import (
	"context"
	"log"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

const schema = "grpclb"

// ServiceDiscovery implements resolver.Builder and resolver.Resolver on top of etcd
type ServiceDiscovery struct {
	cli        *clientv3.Client            // etcd client
	cc         resolver.ClientConn         // gRPC client connection to update
	serverList map[string]resolver.Address // service list: etcd key -> address
	lock       sync.Mutex
}

// NewServiceDiscovery creates a new service discovery builder
func NewServiceDiscovery(endpoints []string) resolver.Builder {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	return &ServiceDiscovery{
		cli: cli,
	}
}

// Build creates a new resolver for the given target; it is executed when grpc.Dial() is called
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	log.Println("Build")
	s.cc = cc
	s.serverList = make(map[string]resolver.Address)
	prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
	// Get the existing keys under the prefix
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}
	s.cc.NewAddress(s.getServices())
	// Watch the prefix and apply changes to the server list
	go s.watcher(prefix)
	return s, nil
}

// ResolveNow is only a hint from gRPC; this resolver relies on the etcd watch instead
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow")
}

// Scheme returns the scheme
func (s *ServiceDiscovery) Scheme() string {
	return schema
}

// Close closes the etcd client, which also closes the watch channel
func (s *ServiceDiscovery) Close() {
	log.Println("Close")
	s.cli.Close()
}

// watcher watches the prefix and updates the server list
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case clientv3.EventTypePut: // add or modify
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case clientv3.EventTypeDelete: // delete
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

// SetServiceList adds a service address and pushes the new list to gRPC
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = resolver.Address{Addr: val}
	s.cc.NewAddress(s.getServices())
	log.Println("put key :", key, "val:", val)
}

// DelServiceList deletes a service address and pushes the new list to gRPC
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	s.cc.NewAddress(s.getServices())
	log.Println("del key:", key)
}

// getServices returns the current service addresses
func (s *ServiceDiscovery) getServices() []resolver.Address {
	addrs := make([]resolver.Address, 0, len(s.serverList))
	for _, v := range s.serverList {
		addrs = append(addrs, v)
	}
	return addrs
}
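The bookkeeping that the watcher performs can be examined without an etcd server. The following sketch models it with the standard library only. `EventType`, `Event`, and `serverList` are hypothetical stand-ins for the etcd watch events and the `serverList` map. The prefix check is defensive only, since a real prefix watch delivers only matching keys.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// EventType and Event are hypothetical stand-ins for etcd PUT/DELETE events.
type EventType int

const (
	Put EventType = iota
	Delete
)

type Event struct {
	Type  EventType
	Key   string
	Value string
}

// serverList mirrors ServiceDiscovery.serverList: etcd key -> address.
type serverList map[string]string

// apply updates the list for one event (ignoring keys outside prefix) and
// returns the address slice that would be pushed to the gRPC ClientConn.
func (s serverList) apply(prefix string, ev Event) []string {
	if !strings.HasPrefix(ev.Key, prefix) {
		return s.addresses()
	}
	switch ev.Type {
	case Put:
		s[ev.Key] = ev.Value
	case Delete:
		delete(s, ev.Key)
	}
	return s.addresses()
}

// addresses returns the current addresses, sorted because Go map
// iteration order is random.
func (s serverList) addresses() []string {
	out := make([]string, 0, len(s))
	for _, v := range s {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

func main() {
	prefix := "/grpclb/simple_grpc/"
	s := serverList{}
	for _, a := range []string{"localhost:8000", "localhost:8001", "localhost:8002"} {
		s.apply(prefix, Event{Put, prefix + a, a})
	}
	// localhost:8000 shuts down, then comes back.
	fmt.Println(s.apply(prefix, Event{Type: Delete, Key: prefix + "localhost:8000"}))
	fmt.Println(s.apply(prefix, Event{Put, prefix + "localhost:8000", "localhost:8000"}))
}
```

Every event produces the full current address list rather than a diff. This matches how `SetServiceList` and `DelServiceList` call `NewAddress` with the whole list each time.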
The main changes to the code are:
- The discovered service addresses are converted to resolver.Address values and passed to the gRPC client connection.
- The key format is changed to follow the scheme naming rules.
For service registration, the main change is the key storage format, register.go:
package etcdv3

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// ServiceRegister registers a service in etcd under a lease
// (schema is defined in discovery.go in the same package)
type ServiceRegister struct {
	cli     *clientv3.Client // etcd client
	leaseID clientv3.LeaseID // lease ID
	// channel that receives lease keepalive responses
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string // key
	val           string // value
}

// NewServiceRegister creates a new registration service
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	ser := &ServiceRegister{
		cli: cli,
		key: "/" + schema + "/" + serName + "/" + addr,
		val: addr,
	}
	// Grant a lease with the given TTL and keep it alive
	if err := ser.putKeyWithLease(lease); err != nil {
		cli.Close()
		return nil, err
	}
	return ser, nil
}

// putKeyWithLease grants a lease, binds the key to it, and starts keepalive
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	// Grant a lease with a TTL of `lease` seconds
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	// Register the service and bind it to the lease
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	// Renew the lease periodically so the key does not expire
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s val:%s success!", s.key, s.val)
	return nil
}

// ListenLeaseRespChan consumes keepalive responses
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("Renewal successful", leaseKeepResp)
	}
	log.Println("Close renewal")
}

// Close deregisters the service by revoking the lease
func (s *ServiceRegister) Close() error {
	// Revoke the lease; etcd deletes the keys bound to it
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("Cancellation of lease")
	return s.cli.Close()
}
On the client, only the code that connects to the gRPC service needs to change:
func main() {
	r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
	resolver.Register(r)
	// Connect to the server; the resolver watches /grpclb/simple_grpc/ in etcd
	// (the authority part, 8.8.8.8, is not used by this resolver)
	conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()
	// Create the gRPC client
	grpcClient = pb.NewSimpleClient(conn)
	// ... issue RPCs with grpcClient
}
gRPC has a built-in simple load balancing policy, round robin, which takes turns among the resolved addresses when dispatching calls.
When the server starts, register the service address in etcd:
func main() {
	// Listen on the local port
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// Create a gRPC server instance
	grpcServer := grpc.NewServer()
	// Register our service with the gRPC server
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})
	// Register the service in etcd with a 5-second lease
	ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
	if err != nil {
		log.Fatalf("register service err: %v", err)
	}
	defer ser.Close()
	// Consume keepalive responses in the background
	go ser.ListenLeaseRespChan()
	// Serve() blocks until the process is killed or Stop() is called
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}
Results
Start three services and register them in etcd.
Then run the client to make calls.
The requests received by each server:
Shut down the localhost:8000 service; the remaining localhost:8001 and localhost:8002 services receive the requests.
Restart the localhost:8000 service.
As you can see, gRPC client load balancing works well.
Summary
This article implements basic client-side load balancing for gRPC. However, supporting other languages is cumbersome, because each language needs its own implementation of service discovery and load balancing policy. Any more complex load balancing policy also requires changes to client code.
The next part describes how to implement the officially recommended External Load Balancing Service.
Source address: https://github.com/Bingjian-Zhu/etcd-example
References: