2021SC@SDUSC
Preface
In the previous chapter, the server processed the connection request and wrote its reply to the client's buffer through the sendBuffer() method of NIOServerCnxn. By that point the ConnectResponse object had already been initialized, so the server calls rsp.serialize(bos, "connect") and sends the serialized ConnectResponse back to the client. What does the client do when it receives this response object? See below.
Processing flow after the client receives the response from the server
In the first chapter, the client's sendThread was left waiting for the server's response after sending its request. When the server sends back the response, sendThread receives it and has to read the contents of the buffer. The process is as follows:
(1) sendThread is in a waiting state, so once the CPU schedules the thread it continues executing run(), which calls clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this). This is the same call the client used to send the request; the difference is which branch of doIO runs inside doTransport. Sending the request used the write branch, and processing the response uses the read branch.
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // If the channel is readable
    if (sockKey.isReadable()) {
        // Read response data into the input buffer (the packet length comes first)
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                + Long.toHexString(sessionId) + ", likely server has closed socket");
        }
        // Once the input buffer is full: each packet is read in two passes, one for the length and one for the content
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            // If the length was just read, allocate an incomingBuffer of that size for the content
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
                // Not initialized means the session is not yet established, so the server must have returned a ConnectResponse
                // Deserialize the contents of incomingBuffer into a ConnectResponse object
                readConnectResult();
                // Continue reading subsequent responses
                enableRead();
                // If there are packets waiting to be written, make sure the write event is enabled
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                // Get ready to read the next response
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                // Session setup completed
                initialized = true;
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    ......
    ......
}
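The two-pass read in doIO (length first, then content) is a length-prefixed framing scheme. The following is a minimal, self-contained sketch of that idea, not ZooKeeper's code: FrameReaderSketch, feed, frames and encode are hypothetical names, and the bytes are fed from arrays rather than from a SocketChannel. Only the roles of lenBuffer and incomingBuffer are taken from the excerpt above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration class; it is not part of ZooKeeper. */
final class FrameReaderSketch {
    // Fixed 4-byte buffer that receives the length prefix, like lenBuffer in doIO.
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Buffer currently being filled: either lenBuffer or a payload buffer.
    private ByteBuffer incomingBuffer = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    /** Accepts bytes as they arrive; a chunk may split a frame at any position. */
    void feed(byte[] chunk) {
        ByteBuffer in = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy as many bytes as the current buffer can still take.
            while (in.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(in.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // current buffer is not full yet, wait for more data
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Pass 1: the length is complete, so allocate a buffer for the content.
                int len = incomingBuffer.getInt();
                if (len < 0) {
                    throw new IllegalStateException("Negative frame length: " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // Pass 2: the content is complete, so hand it over and reset for the next frame.
                byte[] payload = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(payload);
                frames.add(payload);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }

    /** Test helper: prefixes the UTF-8 bytes of a string with their 4-byte big-endian length. */
    static byte[] encode(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Because the reader keeps its position between calls, it does not matter where the network splits the stream: a frame is delivered only when both its length and its content have arrived in full.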
(2) In this scenario the session has not been initialized yet, so doIO calls readConnectResult(). The next step is to analyze how this method deserializes the contents of incomingBuffer into a ConnectResponse object.
void readConnectResult() throws IOException {
    if (LOG.isTraceEnabled()) {
        StringBuilder buf = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            buf.append(Integer.toHexString(b)).append(",");
        }
        buf.append("]");
        if (LOG.isTraceEnabled()) {
            LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
        }
    }
    // Deserialize incomingBuffer into a ConnectResponse
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");
    // Read-only flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }
    // Session ID returned by the server
    this.sessionId = conRsp.getSessionId();
    // Follow-up processing: initialize the client's session parameters and finally queue a WatchedEvent
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
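To make the deserialization step concrete, here is a minimal sketch that decodes a connect reply by hand. It is an illustration under stated assumptions, not the jute-generated code: ConnectResultSketch, decode and encode are hypothetical names, and the field order (protocol version, timeout, session id, length-prefixed password, optional read-only byte) is assumed from the ConnectResponse getters used above. Instead of catching an IOException as readConnectResult does, the sketch checks whether a trailing byte is present.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration class; the field layout is an assumption, see the lead-in. */
final class ConnectResultSketch {
    final int protocolVersion;
    final int timeOut;
    final long sessionId;
    final byte[] passwd;
    final boolean readOnly;

    ConnectResultSketch(int protocolVersion, int timeOut, long sessionId,
                        byte[] passwd, boolean readOnly) {
        this.protocolVersion = protocolVersion;
        this.timeOut = timeOut;
        this.sessionId = sessionId;
        this.passwd = passwd;
        this.readOnly = readOnly;
    }

    /** Decodes the payload of a connect reply (the bytes after the length prefix). */
    static ConnectResultSketch decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        int protocolVersion = buf.getInt();
        int timeOut = buf.getInt();
        long sessionId = buf.getLong();
        int len = buf.getInt();
        byte[] passwd = new byte[Math.max(len, 0)];
        buf.get(passwd);
        // An old server sends nothing after the password; treat that as "not read-only".
        boolean readOnly = buf.hasRemaining() && buf.get() != 0;
        return new ConnectResultSketch(protocolVersion, timeOut, sessionId, passwd, readOnly);
    }

    /** Test helper: builds a payload, optionally without the trailing read-only byte. */
    static byte[] encode(int protocolVersion, int timeOut, long sessionId, byte[] passwd,
                         boolean includeReadOnly, boolean readOnly) {
        ByteBuffer buf = ByteBuffer.allocate(20 + passwd.length + (includeReadOnly ? 1 : 0));
        buf.putInt(protocolVersion).putInt(timeOut).putLong(sessionId);
        buf.putInt(passwd.length).put(passwd);
        if (includeReadOnly) {
            buf.put((byte) (readOnly ? 1 : 0));
        }
        return buf.array();
    }
}
```

The optional trailing byte is what keeps the client compatible with servers that predate read-only mode: when it is missing, the client falls back to isRO = false.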
(3) After deserialization comes the follow-up processing, which happens in onConnected. onConnected is a method of SendThread, an inner class of ClientCnxn. It initializes the client's session parameters and finally queues a WatchedEvent for the EventThread.
void onConnected(
    int _negotiatedSessionTimeout,
    long _sessionId,
    byte[] _sessionPasswd,
    boolean isRO) throws IOException {
    ......
    ......
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    // Initialize the client's session-related parameters
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
    seenRwServerBefore |= !isRO;
    LOG.info(
        "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
        clientCnxnSocket.getRemoteSocketAddress(),
        Long.toHexString(sessionId),
        negotiatedSessionTimeout,
        (isRO ? " (READ-ONLY mode)" : ""));
    // A SyncConnected event is triggered, and the EventThread asynchronously notifies the registered watchers to handle it
    KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
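The two timeout formulas in onConnected are plain integer arithmetic, and a worked example makes the values easier to picture. The sketch below only reproduces the two expressions; SessionTimeoutSketch and its method names are hypothetical, and the 30000 ms timeout with 3 servers is an assumed example, not a value from the source.

```java
/** Hypothetical illustration class reproducing the two formulas from onConnected. */
final class SessionTimeoutSketch {
    /** Two thirds of the negotiated session timeout, truncated by integer division. */
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    /** The negotiated session timeout divided evenly across the known servers. */
    static int connectTimeout(int negotiatedSessionTimeout, int serverCount) {
        return negotiatedSessionTimeout / serverCount;
    }

    public static void main(String[] args) {
        // Assumed example: 30000 ms negotiated timeout, 3 servers in the host list.
        System.out.println(readTimeout(30000));       // 20000
        System.out.println(connectTimeout(30000, 3)); // 10000
    }
}
```

Both results are truncated toward zero, so a negotiated timeout of 40000 ms gives a readTimeout of 26666 ms rather than 26667 ms.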
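(4) To have the EventThread notify the registered watchers asynchronously, onConnected hands the event to queueEvent. The call in onConnected passes only the event, so materializedWatchers is null here and the watchers are looked up from the event itself.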
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    // EventThread syncs its session state
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        // Find the watchers that need to be notified for this event
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    // Submit to the queue for asynchronous processing
    waitingEvents.add(pair);
}
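The first check in queueEvent drops a session event (type None) when the session state has not changed, so watchers are not notified twice for the same state. A minimal sketch of that filter follows. SessionEventFilterSketch, its State enum and queueSessionEvent are hypothetical names, and starting in the Disconnected state is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration class; it models only the duplicate-state check. */
final class SessionEventFilterSketch {
    enum State { Disconnected, SyncConnected, ConnectedReadOnly }

    // Assumption: the session starts out as Disconnected.
    private State sessionState = State.Disconnected;
    private final Deque<State> waitingEvents = new ArrayDeque<>();

    /** Queues a session-state event unless it repeats the current state; returns whether it was queued. */
    boolean queueSessionEvent(State newState) {
        if (sessionState == newState) {
            return false; // same state as before, nothing new to report
        }
        sessionState = newState;
        waitingEvents.add(newState);
        return true;
    }

    int pending() {
        return waitingEvents.size();
    }
}
```

In this sketch, queueing SyncConnected twice in a row leaves a single pending event, and a later Disconnected is queued again because the state really changed.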
(5) The EventThread then executes run(), which takes events from the waitingEvents queue and calls processEvent to handle each one.
public void run() {
    try {
        // Mark the thread as running
        isRunning = true;
        while (true) {
            // Take the next event from the waiting event queue (blocks while the queue is empty)
            Object event = waitingEvents.take();
            // If it is the death event
            if (event == eventOfDeath) {
                // Mark the thread as killed
                wasKilled = true;
            } else {
                // Otherwise, handle the event
                processEvent(event);
            }
            // Once killed, exit only after the queue has been drained
            if (wasKilled) {
                // Check the queue under its lock
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    ......
}
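The loop in run() is a blocking consumer that shuts down through a sentinel object (eventOfDeath) and still drains whatever is left in the queue before exiting. The sketch below shows that pattern in isolation. EventLoopSketch and its method names are hypothetical, events are plain strings, and "processing" just records the event, so it is an illustration of the pattern rather than of EventThread itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration class; it mirrors only the shape of the loop in run(). */
final class EventLoopSketch implements Runnable {
    // Sentinel compared by identity, like eventOfDeath.
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    // Written only by the thread that executes run().
    private final List<String> handled = new ArrayList<>();

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    List<String> handled() {
        return handled;
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks while the queue is empty
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    handled.add((String) event); // stand-in for processEvent(event)
                }
                // After the sentinel, keep going until the queue is drained.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        }
    }
}
```

Usage would normally be new Thread(loop).start(). An event queued after the sentinel but before the loop reaches it is still handled, which is the point of checking isEmpty() instead of exiting as soon as the sentinel is seen.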
(6) What matters most here is how EventThread calls processEvent to handle events. processEvent runs to hundreds of lines, so only the important part is explained here. Watchers are not the main subject of this chapter and are not covered in detail. For more on them, see this blog: https://blog.csdn.net/par_ser?spm=1001.2014.3001.5343
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // Each watcher handles the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        ......
        ......
}
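The try/catch inside the for loop means one failing watcher cannot stop the others from receiving the event. A minimal sketch of that isolation follows. WatcherDispatchSketch and dispatch are hypothetical names, watchers are modeled as Consumer<String> instead of ZooKeeper's Watcher interface, and the failure is counted here where the original logs it.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration class; watchers are modeled as plain Consumer callbacks. */
final class WatcherDispatchSketch {
    /** Delivers the event to every watcher and returns how many of them threw. */
    static int dispatch(List<Consumer<String>> watchers, String event) {
        int failures = 0;
        for (Consumer<String> watcher : watchers) {
            try {
                watcher.accept(event);
            } catch (Throwable t) {
                // Catching Throwable mirrors the excerpt: a broken callback must not
                // abort the loop, so the remaining watchers are still notified.
                failures++;
            }
        }
        return failures;
    }
}
```

With three watchers where the second one throws, the first and third still see the event and dispatch returns 1.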
Summary
This completes the analysis of the network communication process between the client and the server. The session between the two has been established successfully, and subsequent business processing can now be carried out. This article introduced the network communication process between the ZooKeeper client and server and explained in detail how the two sides exchange data. Although the scenario examined is connection establishment, other network communication follows a similar process.