gRPC load balancing (client-side load balancing)

Preface

The previous article introduced how to use etcd to implement service discovery. Building on that etcd-based service discovery, this article shows how to implement load balancing in a gRPC client.

gRPC load balancing

The official gRPC document "Load Balancing in gRPC" describes load balancing schemes designed for gRPC; we analyze them below.

1. Per-call load balancing

Load balancing in gRPC is done per call, not per connection. In other words, even if all requests come from a single client, we still want them to be balanced across all servers.
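To make "per call" concrete, here is a minimal sketch of a per-call picker in plain Go; it is an illustration, not gRPC's actual implementation. The type `rrPicker` and its `Pick` method are hypothetical names, and the backend addresses reuse the localhost:8000 to 8002 services from the demo later in this article.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rrPicker is a hypothetical per-call picker: every RPC asks Pick for a
// backend, so calls from one client are spread over all servers instead of
// being pinned to whichever backend the connection was first set up with.
type rrPicker struct {
	next     uint64 // first field keeps it 64-bit aligned for atomic use on 32-bit platforms
	backends []string
}

// Pick returns the next backend in round-robin order; safe for concurrent calls.
func (p *rrPicker) Pick() string {
	n := atomic.AddUint64(&p.next, 1) - 1
	return p.backends[n%uint64(len(p.backends))]
}

func main() {
	p := &rrPicker{backends: []string{"localhost:8000", "localhost:8001", "localhost:8002"}}
	for call := 0; call < 6; call++ {
		fmt.Printf("call %d -> %s\n", call, p.Pick())
	}
}
```

Six calls from one client land on 8000, 8001, 8002, 8000, 8001, 8002.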

2. Load balancing approaches

  • Centralized (Proxy Model)

An independent load balancer (LB) sits between service consumers and service providers. It is usually implemented with dedicated hardware such as F5, or with software such as LVS or HAProxy. The LB holds an address mapping table for all services, usually registered through operations configuration. When a service consumer calls a target service, it sends the request to the LB, which forwards it to the target service according to some strategy, such as round robin. The LB usually performs health checks and can automatically remove unhealthy service instances.

The main drawback of this scheme is the extra hop between the service consumer and the provider, which adds performance overhead and becomes inefficient when the request volume is large.

Some readers may worry that centralized load balancing is a single point of failure: once the load balancer goes down, the whole system becomes unavailable.
Solution: apply DNS load balancing to the load balancers themselves. Give one domain name several IP addresses and have DNS return them in rotation on each resolution; this yields simple DNS load balancing across the load balancer instances.
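The rotation a DNS server performs can be sketched as follows, under the assumption that clients connect to the first address in the answer (many do, but not all). `rotate` is a hypothetical helper and the 10.0.0.x addresses are placeholders for load balancer instances.

```go
package main

import "fmt"

// rotate is a hypothetical sketch of DNS round robin: the server holds several
// A records for one name and rotates their order on each query, so clients
// that take the first address end up spread across the LB instances.
func rotate(records []string, query int) []string {
	if len(records) == 0 {
		return nil
	}
	k := query % len(records)
	out := make([]string, 0, len(records)) // new slice: records is not modified
	out = append(out, records[k:]...)
	return append(out, records[:k]...)
}

func main() {
	records := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
	for q := 0; q < 4; q++ {
		answer := rotate(records, q)
		fmt.Println("query", q, "-> client uses", answer[0])
	}
}
```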

  • Balancing-aware client

To address the shortcomings of the first scheme, this scheme moves the LB functionality into the service consumer's process; it is also called soft load balancing or client-side load balancing. When a service provider starts, it first registers its address in the service registry and then periodically sends heartbeats to the registry to show that it is alive, which amounts to a health check. When a service consumer wants to access a service, its built-in LB component queries the registry, caches and periodically refreshes the list of target addresses, selects one address according to a load balancing policy, and finally sends the request to that target. LB and service discovery are distributed across every consumer process, and the consumer calls the provider directly, so there is no extra hop and performance is good.

The main drawback of this scheme is that the load balancing strategy has to be written and maintained for clients in every language and version, which greatly complicates client code.

  • Independent LB service (External Load Balancing Service)

This scheme is a compromise that addresses the shortcomings of the second scheme, and it works on essentially the same principle.

The difference is that the LB and service discovery functions move out of the consumer's process into a separate process on the same host. When one or more services on the host want to access a target service, they all perform service discovery and load balancing through this independent LB process. This is still a distributed scheme with no single point of failure. Because the caller and the LB run on the same host, the inter-process calls between them are cheap. The scheme also keeps the service caller simple, since there is no need to develop client libraries for each language.

This article covers the second approach: client-side load balancing.

Load balancing in the gRPC client

gRPC already provides simple load balancing policies (such as round robin). To load balance a gRPC client, we only need to implement the Builder and Resolver interfaces that gRPC provides.

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
	Scheme() string
}

Builder interface: creates a resolver (called service discovery in this article) that watches for name resolution updates.
Build method: creates a new resolver for the given target; it is executed when grpc.Dial() is called.
Scheme method: returns the scheme supported by this resolver. For the scheme definition, see: https://github.com/grpc/grpc/blob/master/doc/naming.md
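A dial target has the form scheme://authority/endpoint. The sketch below approximates that split with Go's net/url; the grpc-go version used in this article has its own target parser that fills the Scheme, Authority and Endpoint fields of resolver.Target, so treat this as an illustration rather than gRPC's code. `splitTarget` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitTarget is a hypothetical helper approximating how a dial target
// scheme://authority/endpoint is split into its three parts.
func splitTarget(target string) (scheme, authority, endpoint string, err error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", "", "", err
	}
	return u.Scheme, u.Host, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	// The target the client in this article dials.
	scheme, authority, endpoint, err := splitTarget("grpclb://8.8.8.8/simple_grpc")
	if err != nil {
		panic(err)
	}
	fmt.Println(scheme)    // grpclb: selects the Builder registered for this scheme
	fmt.Println(authority) // 8.8.8.8: not used by the etcd resolver
	fmt.Println(endpoint)  // simple_grpc: the service name used in the etcd prefix
}
```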

type Resolver interface {
	ResolveNow(ResolveNowOption)
	Close()
}

Resolver interface: watches for updates to the specified target, including address updates and service config updates.
ResolveNow method: called by gRPC to ask the resolver to resolve the target name again. It is only a hint, so the resolver may ignore it.
Close method: closes the resolver.

Based on these two interfaces, we put the service discovery logic in the Build method: it passes the service addresses fetched from etcd to the client connection, then watches for service changes and updates the connection's address list.
Modify the service discovery code in discovery.go:

package etcdv3

import (
	"context"
	"log"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb" // same module as clientv3, so ev.Type matches mvccpb.PUT/DELETE
	"google.golang.org/grpc/resolver"
)

const schema = "grpclb"

//ServiceDiscovery service discovery
type ServiceDiscovery struct {
	cli        *clientv3.Client //etcd client
	cc         resolver.ClientConn
	serverList map[string]resolver.Address //Service list
	lock       sync.Mutex
}

//NewServiceDiscovery creates a resolver.Builder backed by an etcd client
func NewServiceDiscovery(endpoints []string) resolver.Builder {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	return &ServiceDiscovery{
		cli: cli,
	}
}

//Build creates a new 'resolver' for a given target, which is executed when 'grpc.Dial()' is called
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
	log.Println("Build")
	s.cc = cc
	s.serverList = make(map[string]resolver.Address)
	prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
	//Get the existing key according to the prefix
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	for _, ev := range resp.Kvs {
		s.SetServiceList(string(ev.Key), string(ev.Value))
	}
	s.cc.NewAddress(s.getServices())
	//Watch the prefix and apply changes to the server list
	go s.watcher(prefix)
	return s, nil
}

// ResolveNow is a re-resolution hint from gRPC; the etcd watch already keeps the list current, so it only logs
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
	log.Println("ResolveNow")
}

//Scheme return schema
func (s *ServiceDiscovery) Scheme() string {
	return schema
}

//Close closes the resolver and its etcd client, which also ends the watch
func (s *ServiceDiscovery) Close() {
	log.Println("Close")
	s.cli.Close()
}

//watcher listening prefix
func (s *ServiceDiscovery) watcher(prefix string) {
	rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: //Add or modify
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: //delete
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

//SetServiceList add service address
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.serverList[key] = resolver.Address{Addr: val}
	s.cc.NewAddress(s.getServices())
	log.Println("put key :", key, "val:", val)
}

//DelServiceList delete service address
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	delete(s.serverList, key)
	s.cc.NewAddress(s.getServices())
	log.Println("del key:", key)
}

//getServices converts serverList into a slice for gRPC; callers should hold s.lock once the watcher is running
func (s *ServiceDiscovery) getServices() []resolver.Address {
	addrs := make([]resolver.Address, 0, len(s.serverList))

	for _, v := range s.serverList {
		addrs = append(addrs, v)
	}
	return addrs
}

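The main changes to the code are:

  1. The discovered service addresses are converted to resolver.Address values and passed to the gRPC client connection.

  2. The key format follows the scheme naming rules: keys are stored as /<scheme>/<service name>/<address>, so the resolver can derive the watch prefix from the dial target.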

For service registration, the main change is the key storage format, in register.go:

package etcdv3

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

//ServiceRegister registers a service key in etcd bound to a lease
type ServiceRegister struct {
	cli     *clientv3.Client //etcd client
	leaseID clientv3.LeaseID //Lease ID
	//Channel of lease keepalive responses
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string //key
	val           string //value
}

//NewServiceRegister registers addr under serName with a lease TTL of lease seconds
func NewServiceRegister(endpoints []string, serName, addr string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	ser := &ServiceRegister{
		cli: cli,
		key: "/" + schema + "/" + serName + "/" + addr,
		val: addr,
	}

	//Grant a lease, bind the key to it and start keepalive
	if err := ser.putKeyWithLease(lease); err != nil {
		return nil, err
	}

	return ser, nil
}

//putKeyWithLease grants a lease, writes the key bound to it and starts keepalive
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	//Grant a lease with a TTL of lease seconds
	resp, err := s.cli.Grant(context.Background(), lease)
	if err != nil {
		return err
	}
	//Register service and bind lease
	_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//Keep the lease alive: clientv3 sends renewal requests in the background
	leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

	if err != nil {
		return err
	}
	s.leaseID = resp.ID
	s.keepAliveChan = leaseRespChan
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

//ListenLeaseRespChan listens for renewal
func (s *ServiceRegister) ListenLeaseRespChan() {
	for leaseKeepResp := range s.keepAliveChan {
		log.Println("Renewal successful", leaseKeepResp)
	}
	log.Println("Close renewal")
}

// Close deregisters the service
func (s *ServiceRegister) Close() error {
	//Revoke the lease; etcd deletes the keys attached to it
	if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
		return err
	}
	log.Println("Cancellation of lease")
	return s.cli.Close()
}

On the client, modify the code that connects to the gRPC service:

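func main() {
	r := etcdv3.NewServiceDiscovery(EtcdEndpoints)
	resolver.Register(r)
	// Connect to the server. The target is scheme://authority/endpoint: the scheme
	// selects the resolver registered above, the authority (8.8.8.8) is ignored by it,
	// and the endpoint (simple_grpc) is the service name used in the etcd prefix.
	// grpc.WithBalancerName exists in the grpc-go version used here; newer releases
	// select round_robin through a service config instead.
	conn, err := grpc.Dial(r.Scheme()+"://8.8.8.8/simple_grpc", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
	if err != nil {
		log.Fatalf("net.Connect err: %v", err)
	}
	defer conn.Close()

	// Create the gRPC client stub (grpcClient is a package-level variable)
	grpcClient = pb.NewSimpleClient(conn)
	// ... make RPC calls with grpcClient here; each call is balanced across the servers
}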

gRPC has a built-in round_robin load balancing policy, which sends calls to the resolved addresses in turn.
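The failover seen in the demo below follows from round_robin choosing only among connections that are ready. Here is a plain-Go sketch of that idea, not grpc-go's picker; `readyPicker` and `newReadyPicker` are hypothetical names, and the ready map stands in for gRPC's connectivity state tracking.

```go
package main

import "fmt"

// readyPicker is a hypothetical round-robin picker over READY backends only,
// so a server that goes down simply drops out of the rotation.
type readyPicker struct {
	ready []string
	next  int
}

// newReadyPicker keeps, in order, only the backends marked ready.
func newReadyPicker(backends []string, isReady map[string]bool) *readyPicker {
	p := &readyPicker{}
	for _, b := range backends {
		if isReady[b] {
			p.ready = append(p.ready, b)
		}
	}
	return p
}

// Pick returns the next ready backend, or "" if none is ready.
// Not safe for concurrent use; a real picker needs locking or atomics.
func (p *readyPicker) Pick() string {
	if len(p.ready) == 0 {
		return ""
	}
	b := p.ready[p.next%len(p.ready)]
	p.next++
	return b
}

func main() {
	backends := []string{"localhost:8000", "localhost:8001", "localhost:8002"}
	state := map[string]bool{"localhost:8000": false, "localhost:8001": true, "localhost:8002": true}
	p := newReadyPicker(backends, state)
	for i := 0; i < 4; i++ {
		fmt.Println(p.Pick())
	}
}
```

With localhost:8000 marked down, the calls alternate between 8001 and 8002; rebuilding the picker once 8000 is ready again puts it back into rotation.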

When the server starts, register the service address in etcd:

func main() {
	// Listen to local port
	listener, err := net.Listen(Network, Address)
	if err != nil {
		log.Fatalf("net.Listen err: %v", err)
	}
	log.Println(Address + " net.Listing...")
	// New gRPC server instance
	grpcServer := grpc.NewServer()
	// Register our service with gRPC server
	pb.RegisterSimpleServer(grpcServer, &SimpleService{})
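	//Register the service in etcd with a 5-second lease
	ser, err := etcdv3.NewServiceRegister(EtcdEndpoints, SerName, Address, 5)
	if err != nil {
		log.Fatalf("register service err: %v", err)
	}
	//Revoke the lease on normal exit; if the process is killed, the key is removed when the lease expires
	defer ser.Close()
	//Drain keepalive responses so the channel does not fill up
	go ser.ListenLeaseRespChan()
	//Serve() blocks, accepting connections on listener, until the process is killed or Stop() is called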
	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("grpcServer.Serve err: %v", err)
	}
}

Results

Start three services and register them in etcd.

Then the client makes calls.

The servers show the requests they received.

Shut down the localhost:8000 service; the remaining localhost:8001 and localhost:8002 services receive the requests.

Restart the localhost:8000 service.

As you can see, the gRPC client load balancing works as expected.

Summary

This article introduced an implementation of gRPC client-side load balancing that provides basic load balancing. However, supporting other languages is cumbersome: each language needs its own implementation of service discovery and load balancing policies, and a more complex policy requires modifying the client code.

The next article describes how to implement the officially recommended External Load Balancing Service.

Source code: https://github.com/Bingjian-Zhu/etcd-example

Added by zszucs on Tue, 19 May 2020 04:47:29 +0300