We can use zookeeper as the registry to realize service registration and discovery. The curator framework provides curator-x-discovery extension to realize out of the box service registration and discovery, but more often we choose to implement it ourselves. At that time, we need to pay extra attention to a feature of zookeeper, that is, wathcer.
In the micro service scenario, the watcher mechanism mainly provides the service notification function. For example, after Instance1 registers an emphemeral child node under the Service1 service node, one of its service consumers registers a child node watcher on the Service1 node according to the dependency configuration, as shown in the red key in the figure. A watcher of child node type will observe the child node of Service1, that is, the InstanceX node, but will not observe the grandson node config1. Then, after the Instance1 node hangs up, the watcher can notify the service consumer who registered itself. After the notification, the Watcher will be destroyed.
wacther principle framework
The watcher of zookeeper is mainly completed by the cooperation among client, server and watchManager, including two stages: Watcher registration and trigger.
On the client side, the registry is ZkWatchManager, which includes dataWatches, existWatches and childWatches. The server-side registry encapsulates two types of watchmanagers in the DataTree class, namely dataWatches and existWatches. dataWatches represents the data monitoring of the current node, and childWatches represents the monitoring of child nodes. It is also easy to understand the existWatches less than the client, because the client needs to judge whether the node exists.
In the registration phase, the getData and exists requests of the client can register dataWatches, and getChilden can register childWatches. In the trigger phase, the setData request will trigger the dataWatches of the current node, the create request will trigger the dataWatches of the current node and the childWatches of the parent node, and the delete request will trigger the dataWatches of the current node, the parent node and the child node, as well as the childWatches of the parent node.
watchManager contains two very important data structures: watchTable and watch2Paths. The former means path set < watcher >, and the latter means watcher set < Path >. Note that the meaning of watcher here means remote connection, so watchTable means that there may be listening connections of multiple consumers in a directory, and watch2Paths means that a consumer may establish listening to multiple directories. Obviously, listening to multiple directories will reuse one connection.
The transmission data (including the watcher information) in the request phase will be encapsulated in the request and response. For example, the getData request will be encapsulated
getDataRequest/getDataResponse. The watcher notification in the trigger phase communicates through the event. The server side will send a watcherEvent, and the client side will convert it into a watchedEvent for processing.
Each client will maintain two threads. SendThread is responsible for processing the request communication between the client and the server, such as sending getDataRequest, while EventThread is responsible for processing the event notification of the server, that is, the event of the watcher.
watcher registration source code
Let's take a look at some of the source code of watcher registration. First, on the client side, taking the getData method in Zookeeper as an example, a packet with watch true will be queued.
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { ... GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); ... }
You can see that GetDataRequest and GetDataResponse are encapsulated here, and the watch parameter is set to true in the request. Finally, submit the request. What the submitRequest does is put these into the event queue and wait for sendThread scheduling to send.
Then the request will be received by the server, and all requests will be processed by the server in the FinalRequestProcessor#processRequest method.
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ... byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); ... }
Here, we will judge the request type through some case s, or take getData as an example. Finally, we will call the getData method of DataTree. We mentioned that DataTree contains 2 watchers. In addition to obtaining data, it is natural to register dataWatchers.
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException { DataNode n = (DataNode)this.nodes.get(path); if (n == null) { throw new NoNodeException(); } else { synchronized(n) { n.copyStat(stat); if (watcher != null) { this.dataWatches.addWatch(path, watcher); } return n.data; } } }
The addWatch method mainly stores the path of the data node and servercnxn (remote communication information) information into the watchTable and watch2Paths of the watchmanager. So far, the server has accepted the watcher and registered it in the watchmanager.
We will save a watchManager for the client itself, which is actually performed after receiving the getData response in the readresponse - > finishpacket method of the ClientCnxn$SendThread class.
private void finishPacket(ClientCnxn.Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized(p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; this.eventThread.queuePacket(p); } }
You can see that the register method of watchRegistration is called here, and it is loaded into the corresponding watchManager according to the request type (dataWatches, existWatches, childWatches).
The whole general sequence diagram can be referred to the following:
watcher trigger source code
In the wathcer trigger part, we also take the server DataTree class processing setData request as an example.
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { ... dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
You can see that after processing the data, triggerWatch is called. It does watchers from the previous watchManager and then calls process method one by one.
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
After obtaining the listener that needs to be triggered this time, it also removes itself in watchTable and watch2Paths, so the Watcher is single time. Here, the watchedEvent is encapsulated and stuffed into Watcher's process method. The process method is actually sending notifications. Taking NioServerCnxn, an implementation class of Watcher, as an example, calls its sendResponse method to send notification events to the client. Before sending, watchedEvent will be converted into watcherEvent for sending.
Then, the client first receives the request is still the readResponse method of ClientCnxn$sendThread. Here, after the watcherEvent is converted to watchedEvent, it is listed in the event queue of eventThread for subsequent processing.
... WatchedEvent we = new WatchedEvent(event); if (ClientCnxn.LOG.isDebugEnabled()) { ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId)); } ClientCnxn.this.eventThread.queueEvent(we); ...
Let's take a direct look at the run method of EventThread. The method is very simple, that is, constantly taking notification events from the waitingEvents event queue. The processEvent method is then invoked to handle the event.
private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { ...ellipsis }
Here, simply take out the set of watchers to be notified of this event, and then call the process method of each Watcher in a loop. In the scenario of realizing service registration and discovery by ourselves, it is obvious that the process method of the watcher is customized by us.
Refer to the following for the timing diagram of the whole watcher trigger:
At this point, the whole watcher interaction logic of zookeeper has ended.