Three modes of load balancing: https://grpc.io/blog/grpc-load-balancing/
- Proxy
- Thick client (client side)
- Lookaside load balancing (client side)
Basic usage
Server side: service registration
Register the service's information with the registry (the etcd KV store) when the service starts.
import( ... "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" ... ) func Register(etcdBrokers []string, srvName, addr string, ttl int64) error { var err error if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: etcdBrokers, DialTimeout: 5 * time.Second, }) if err != nil { log.Printf("connect to etcd err:%s", err) return err } } em, err := endpoints.NewManager(cli, srvName) if err != nil { return err } err = em.AddEndpoint(context.TODO(), fmt.Sprintf("%v/%v",srvName, addr), endpoints.Endpoint{Addr: addr}) if err != nil { return err } //TODO withAlive return nil } func UnRegister(srvName, addr string) error { if cli != nil { em, err := endpoints.NewManager(cli, srvName) if err != nil { return err } err = em.DeleteEndpoint(context.TODO(), fmt.Sprintf("%v/%v", srvName, addr)) if err != nil { return err } return err } return nil }
When the service starts, call Register to register the listening address with the registry; when the service stops, call UnRegister to remove the registration.
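As a rough sketch of the lifecycle, a server might wire these two calls around its Serve loop as follows, assuming Register/UnRegister from the block above live in the same package; the etcd endpoints, service name, listen address, and the commented-out service registration are placeholders, not from the original.

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	// Hypothetical values for illustration only.
	etcdBrokers := []string{"127.0.0.1:2379"}
	srvName := "file-srv"
	addr := "127.0.0.1:8080"

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	s := grpc.NewServer()
	// RegisterFileSrvServer(s, &fileSrv{}) // register the concrete service implementation here

	// Announce the listening address to etcd before serving.
	if err := Register(etcdBrokers, srvName, addr, 5); err != nil {
		log.Fatalf("register: %v", err)
	}
	// Remove the endpoint when the server shuts down.
	defer UnRegister(srvName, addr)

	if err := s.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
```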
Client side: service discovery
Implement the Builder interface from the google.golang.org/grpc/resolver package:
```go
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}
```
etcd provides an official implementation of gRPC naming and discovery: https://etcd.io/docs/v3.5/dev-guide/grpc_naming/
Pass the etcd resolver, together with the desired load balancing policy, when the client Dials:
import( ... "go.etcd.io/etcd/client/v3/naming/resolver" ... ) func NewClient(etcdBrokers []string, srvName string) (*Client, error) { cli, err := clientv3.NewFromURLs(etcdBrokers) etcdResolver, err := resolver.NewBuilder(cli) // Set up a connection to the server. addr := fmt.Sprintf("etcd:///%s", srvName) // "schema://[authority]/service" conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithResolvers(etcdResolver)) if err != nil { return nil, err } return &Client{ FileSrvClient: NewFileSrvClient(conn), conn: conn, }, nil }
Use lease
Since the service may exit abnormally without ever calling UnRegister, a lease is needed so that etcd can detect via heartbeats whether the service is still alive.
The following warning may appear when using a lease ⚠️:
"lease keepalive response queue is full; dropping response send"
Reason:
KeepAlive attempts to keep the given lease alive forever. If the keepalive responses published to the channel are not consumed in time, the channel fills up. Once full, the lease client keeps sending keepalive requests to the etcd server but drops the responses until the channel has room again; this is the branch that prints the warning above.
To register a service node with a heartbeat:
```go
func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
	em, err := endpoints.NewManager(c, service)
	if err != nil {
		return err
	}
	// Attach the endpoint to the lease: if the lease expires, etcd removes the endpoint automatically.
	return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr: addr}, clientv3.WithLease(lid))
}

// How to get a LeaseID
func test(etcdBrokers []string, srvName, addr string, ttl int64) error {
	var err error
	if cli == nil {
		cli, err = clientv3.New(clientv3.Config{
			Endpoints:   etcdBrokers,
			DialTimeout: 5 * time.Second,
		})
		if err != nil {
			log.Printf("connect to etcd err:%s", err)
			return err
		}
	}

	// get a LeaseID by granting a lease with the given TTL
	resp, err := cli.Grant(context.TODO(), ttl)
	if err != nil {
		return err
	}

	err = etcdAdd(cli, resp.ID, srvName, addr)
	if err != nil {
		return err
	}

	// withAlive: keep the lease alive for as long as the service runs
	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
	if kaerr != nil {
		return kaerr
	}
	go func() {
		// The keepalive responses must be drained continuously;
		// otherwise the channel fills up and the warning above is logged.
		for ka := range ch {
			fmt.Println("ttl:", ka.TTL)
		}
	}()
	return nil
}
```
Analysis:
Each LeaseID corresponds to a unique keepAlive. The keepAlive structure is as follows:
```go
type lessor struct {
	...
	keepAlives map[LeaseID]*keepAlive
	...
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}
```
When KeepAlive is called, a LeaseKeepAliveResponse channel with a buffer size of 16 (LeaseResponseChSize) is created for the lease. It receives the LeaseKeepAliveResponses continuously read from the etcd server, and is appended to the channel slice of the keepAlive bound to that lease:
```go
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	...
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(ctx, id, ka.donec)
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}
```
The received LeaseKeepAliveResponse is then fanned out to every channel of the keepAlive bound to the corresponding lease:
```go
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	...
	ka, ok := l.keepAlives[karesp.ID]
	...
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
			if l.lg != nil {
				l.lg.Warn("lease keepalive response queue is full; dropping response send",
					zap.Int("queue-size", len(ch)),
					zap.Int("queue-capacity", cap(ch)),
				)
			}
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}
```
Customizing the load balancing policy
To use a custom load balancing policy, you need to register a builder with gRPC's balancer.Register(balancer.Builder).
The google.golang.org/grpc/balancer/base package provides NewBalancerBuilder, which already implements the Balancer interface's management of connectivity state and SubConns.
So we just need two pieces, sketched after this list:
- Define a customPickerBuilder that implements base.PickerBuilder and returns a balancer.Picker.
- Define a customPicker that implements balancer.Picker and picks the concrete SubConn for each RPC.
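A minimal sketch of such a policy, assuming a recent grpc-go release (the balancer/base API has changed across versions); the policy name custom_random and the random pick are illustrative choices, not from the original:

```go
import (
	"math/rand"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
)

// Name is a hypothetical policy name used to select this balancer in the service config.
const Name = "custom_random"

// customPickerBuilder implements base.PickerBuilder.
type customPickerBuilder struct{}

func (b *customPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
	for sc := range info.ReadySCs {
		scs = append(scs, sc)
	}
	return &customPicker{subConns: scs}
}

// customPicker implements balancer.Picker.
type customPicker struct {
	subConns []balancer.SubConn
}

// Pick chooses a ready SubConn at random for each RPC.
func (p *customPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	sc := p.subConns[rand.Intn(len(p.subConns))]
	return balancer.PickResult{SubConn: sc}, nil
}

func init() {
	// base.NewBalancerBuilder wires the picker builder into a full balancer.Builder,
	// which is then registered globally under Name.
	balancer.Register(base.NewBalancerBuilder(Name, &customPickerBuilder{}, base.Config{HealthCheck: true}))
}
```

A client would then select it by name through the service config shown earlier, e.g. grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"custom_random":{}}]}`) in the Dial options.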
Reference material
Old version (reference only):
- [grpc-go implements a service discovery mechanism based on etcd](http://morecrazy.github.io/2018/08/14/grpc-go%E5%9F%BA%E4%BA%8Eetcd%E5%AE%9E%E7%8E%B0%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0%E6%9C%BA%E5%88%B6/#l3-l4%E4%BC%A0%E8%BE%93%E5%B1%82%E4%B8%8El7%E5%BA%94%E7%94%A8)
- [gRPC load balancing (custom load balancing policy)](https://www.cnblogs.com/FireworksEasyCool/p/12924701.html)