[Golang] gRPC service discovery and load balancing based on ETCD


Three modes of load balancing: https://grpc.io/blog/grpc-load-balancing/

  • Proxy
  • Thick client (client side)
  • Lookaside Load Balancing (Client side)

Basic usage

Server-side service registration

Register the service information in etcd (the key-value registry) when the service starts:

import (
    ...
    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/naming/endpoints"
    ...
)

func Register(etcdBrokers []string, srvName, addr string, ttl int64) error {
	var err error
	if cli == nil {
		cli, err = clientv3.New(clientv3.Config{
			Endpoints:   etcdBrokers,
			DialTimeout: 5 * time.Second,
		})
		if err != nil {
			log.Printf("connect to etcd err:%s", err)
			return err
		}
	}

	em, err := endpoints.NewManager(cli, srvName)
	if err != nil {
		return err
	}

	err = em.AddEndpoint(context.TODO(), fmt.Sprintf("%v/%v", srvName, addr), endpoints.Endpoint{Addr: addr})
	if err != nil {
		return err
	}
	// TODO: withAlive (attach a lease and keep it alive; see the lease section below)
	return nil
}

func UnRegister(srvName, addr string) error {
	if cli != nil {
		em, err := endpoints.NewManager(cli, srvName)
		if err != nil {
			return err
		}
		err = em.DeleteEndpoint(context.TODO(), fmt.Sprintf("%v/%v", srvName, addr))
		if err != nil {
			return err
		}
		return nil
	}

	return nil
}

When the service starts, call Register to register the listening address with the registry; when the service stops, call UnRegister to remove the registration.
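For example, a minimal sketch of wiring these two calls into a service's lifecycle (the etcd endpoints, service name, and listen address below are hypothetical, and the concrete gRPC service registration is left as a comment):

import (
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

func main() {
	etcdBrokers := []string{"127.0.0.1:2379"} // hypothetical etcd endpoints
	srvName := "file-srv"                     // hypothetical service name
	addr := "127.0.0.1:8080"                  // address the gRPC server listens on

	if err := Register(etcdBrokers, srvName, addr, 5); err != nil {
		log.Fatalf("register service err: %v", err)
	}

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("listen err: %v", err)
	}
	s := grpc.NewServer()
	// pb.RegisterFileSrvServer(s, &fileSrv{}) // register the concrete service implementation here
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Printf("serve err: %v", err)
		}
	}()

	// On SIGINT/SIGTERM, deregister first so clients stop resolving this address,
	// then stop the server gracefully.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	if err := UnRegister(srvName, addr); err != nil {
		log.Printf("unregister service err: %v", err)
	}
	s.GracefulStop()
}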

Client-side service discovery

Implement the Builder interface from the google.golang.org/grpc/resolver package:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

The official gRPC naming and discovery implementation of etcd: https://etcd.io/docs/v3.5/dev-guide/grpc_naming/

Pass the etcd resolver (and the desired load balancing strategy) when the client Dials:

import(
    ...
    "go.etcd.io/etcd/client/v3/naming/resolver"
    ...
)
func NewClient(etcdBrokers []string, srvName string) (*Client, error) {
	cli, err := clientv3.NewFromURLs(etcdBrokers)
	if err != nil {
		return nil, err
	}
	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		return nil, err
	}

	// Set up a connection to the server.
	// kacp is the keepalive.ClientParameters value defined elsewhere in the project.
	addr := fmt.Sprintf("etcd:///%s", srvName) // "scheme://[authority]/service"
	conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithResolvers(etcdResolver))
	if err != nil {
		return nil, err
	}
	return &Client{
		FileSrvClient: NewFileSrvClient(conn),
		conn:          conn,
	}, nil
}
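To actually spread calls across all resolved endpoints, a load balancing policy can also be requested in the default service config at dial time. A minimal sketch using gRPC's built-in round_robin policy (a variant of the Dial call above; the helper name dialWithRoundRobin is hypothetical, and the keepalive option is omitted here):

import (
	"fmt"

	"google.golang.org/grpc"
	gresolver "google.golang.org/grpc/resolver"
)

// dialWithRoundRobin dials the etcd-resolved service and asks gRPC to use
// the round_robin policy instead of the default pick_first, so every
// registered backend receives traffic.
func dialWithRoundRobin(etcdResolver gresolver.Builder, srvName string) (*grpc.ClientConn, error) {
	return grpc.Dial(
		fmt.Sprintf("etcd:///%s", srvName),
		grpc.WithInsecure(),
		grpc.WithResolvers(etcdResolver),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	)
}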

Using a lease

Since a service may exit abnormally without ever calling UnRegister, a lease must be used so that etcd can detect the liveness of the service through heartbeats.

The following warning may appear when using leases ⚠️:

"lease keepalive response queue is full; dropping response send"

Reason:
KeepAlive attempts to keep the given lease valid forever. If the keep-alive responses published to the channel are not consumed in time, the channel can fill up. When it is full, the lease client keeps sending keep-alive requests to the etcd server, but it drops the responses until the channel has room again, which is exactly the branch that prints this warning.

To register a service node with heartbeat:

func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
	em, err := endpoints.NewManager(c, service)
	if err != nil {
		return err
	}
	// Attach the endpoint key to the lease so it disappears when the lease expires.
	return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr: addr}, clientv3.WithLease(lid))
}

// How to get the lease ID and keep it alive
func test(etcdBrokers []string, srvName, addr string, ttl int64) error {
	var err error
	if cli == nil {
		cli, err = clientv3.New(clientv3.Config{
			Endpoints:   etcdBrokers,
			DialTimeout: 5 * time.Second,
		})
		if err != nil {
			log.Printf("connect to etcd err:%s", err)
			return err
		}
	}

	// Get a lease ID by granting a lease with the given TTL (in seconds).
	resp, err := cli.Grant(context.TODO(), ttl)
	if err != nil {
		return err
	}

	err = etcdAdd(cli, resp.ID, srvName, addr)
	if err != nil {
		return err
	}

	// withAlive: keep the lease alive for as long as the process runs.
	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
	if kaerr != nil {
		return kaerr
	}

	go func() {
		// The keep-alive responses must be drained continuously,
		// otherwise the channel fills up and responses are dropped.
		for ka := range ch {
			fmt.Println("ttl:", ka.TTL)
		}
		// ch is closed when the lease expires, is revoked, or the context is canceled.
	}()

	return nil
}

Analysis:
Each LeaseID corresponds to a unique keepAlive. The keepAlive structure is as follows:

type lessor struct {
	...
	keepAlives map[LeaseID]*keepAlive
    ...
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

When KeepAlive is called, a LeaseKeepAliveResponse channel with a buffer size of 16 (LeaseResponseChSize) is created for the lease. It receives the LeaseKeepAliveResponse messages continuously read from the etcd server, and it is appended to the channel slice of the keepAlive bound to that lease:

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
    ...
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}

Each received LeaseKeepAliveResponse is then sent to every channel of the keepAlive bound to the corresponding lease:

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
    ...
	ka, ok := l.keepAlives[karesp.ID]
    ...
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}
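To complete the lease-based registration, the lease should also be released on graceful shutdown: revoking it deletes every key attached to it, including the endpoint written by etcdAdd, and closes the keep-alive channel. A minimal sketch, assuming the same cli and the lease ID returned by Grant above (the helper name etcdRevoke and the 3-second timeout are arbitrary choices; it uses the context, time, and clientv3 packages already imported earlier):

func etcdRevoke(cli *clientv3.Client, leaseID clientv3.LeaseID) error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// Revoking the lease removes the registered endpoint in one call,
	// so a separate DeleteEndpoint is not required.
	_, err := cli.Revoke(ctx, leaseID)
	return err
}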

Customizing the load balancing policy

To use a custom load balancing policy, you need to register a balancer.Builder with gRPC via balancer.Register(balancer.Builder).

The NewBalancerBuilder function in the google.golang.org/grpc/balancer/base package already implements the Balancer interface's management of connectivity state and SubConns for us.

So we only need to do two things (a sketch follows the list):

  1. Define a customPickerBuilder that implements base.PickerBuilder and returns a balancer.Picker.
  2. Define a customPicker that implements balancer.Picker and returns the specific SubConn to use for each RPC.
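A minimal sketch of such a custom policy, here a random picker (the policy name custom_random, the package layout, and the HealthCheck setting are assumptions for illustration, not something the referenced articles prescribe):

package lb

import (
	"math/rand"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

// Name is the (hypothetical) policy name used in the service config.
const Name = "custom_random"

func init() {
	// Register the custom policy with gRPC so it can be selected by name.
	balancer.Register(base.NewBalancerBuilder(Name, &customPickerBuilder{}, base.Config{HealthCheck: true}))
}

type customPickerBuilder struct{}

// Build is called by the base balancer whenever the set of READY SubConns changes.
func (*customPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
	for sc := range info.ReadySCs {
		scs = append(scs, sc)
	}
	return &customPicker{subConns: scs}
}

type customPicker struct {
	mu       sync.Mutex
	subConns []balancer.SubConn
}

// Pick chooses the SubConn for a single RPC; here: uniformly at random.
func (p *customPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	sc := p.subConns[rand.Intn(len(p.subConns))]
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}

The policy can then be selected per connection at dial time with grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"custom_random"}`).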

Reference material

Old version (reference only):

  • [grpc go implements service discovery mechanism based on etcd] http://morecrazy.github.io/2018/08/14/grpc-go%E5%9F%BA%E4%BA%8Eetcd%E5%AE%9E%E7%8E%B0%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0%E6%9C%BA%E5%88%B6/#l3 –l4%E4%BC%A0%E8%BE%93%E5%B1%82%E4%B8%8El7%E5%BA%94%E7%94%A8
  • [gRPC load balancing (custom load balancing strategy)] https://www.cnblogs.com/FireworksEasyCool/p/12924701.html
