ZooKeeper source code analysis of the complete network communication process

2021SC@SDUSC

Preface

In the previous chapter, the server processed the request and wrote the result to the client's buffer through the sendBuffer() method of NIOServerCnxn. By that point the ConnectResponse object had already been initialized, so the write calls rsp.serialize(bos, "connect") to serialize it and return it to the client. What does the client do when it receives this response object? Read on.

Processing flow after the client receives the response from the server

In the first chapter, the client's sendThread sent a request and then waited for the server's response. When the server sends its response back, sendThread receives it and must read the contents of the buffer. The process is as follows:

(1) sendThread has been waiting, so once it is scheduled again its run() method calls clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this). This is the same call that was made when the client sent its request; the difference lies in which branch of the doIO method inside doTransport is taken. Sending the request took the write branch, while handling the response takes the read branch.

    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // If readable
        if (sockKey.isReadable()) {
            // Read from the socket into incomingBuffer (initially the 4-byte lenBuffer that holds the packet length)
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            // Once incomingBuffer is full, process it. Each packet takes two reads: one for its length, one for its content
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                // If the length was just read, allocate an incomingBuffer of that size for the packet content
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                } else if (!initialized) {// Not initialized means the session is not yet established, so the server's reply must be a ConnectResponse
                    // Read the connect result: deserialize the contents of incomingBuffer into a ConnectResponse object
                    readConnectResult();
                    // Continue reading subsequent responses
                    enableRead();
                    // If there are packets waiting to be sent, register interest in write events
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    // Ready to read the next response
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    // Session setup completed
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        ......
        ......
    }
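
The two-phase read in doIO (first a 4-byte length, then a body of exactly that size) can be tried in isolation. The following is a minimal sketch, not ZooKeeper code: FrameReader, feed, and frames are hypothetical names, a plain byte array stands in for the socket, and it assumes each packet is prefixed with a big-endian 4-byte length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not ZooKeeper code): two-phase reading of length-prefixed frames. */
final class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    // Points at lenBuffer while a length is expected, otherwise at a body-sized buffer.
    private ByteBuffer incomingBuffer = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    /** Accepts bytes as they arrive; a chunk may end anywhere inside a frame. */
    void feed(byte[] chunk) {
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy bytes until the current target buffer is full or the chunk is used up.
            while (src.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(src.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // incomplete: wait for the next chunk
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Phase 1: the 4-byte length is complete, so size the body buffer.
                int len = incomingBuffer.getInt();
                if (len < 0) {
                    throw new IllegalStateException("Negative frame length: " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // Phase 2: the body is complete; store it and expect a length again.
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }

    public static void main(String[] args) {
        FrameReader reader = new FrameReader();
        // Two frames ("hello" and "ok") split across three arbitrary chunks.
        reader.feed(new byte[] {0, 0, 0});
        reader.feed(new byte[] {5, 'h', 'e'});
        reader.feed(new byte[] {'l', 'l', 'o', 0, 0, 0, 2, 'o', 'k'});
        for (byte[] frame : reader.frames()) {
            System.out.println(new String(frame, StandardCharsets.US_ASCII));
        }
    }
}
```

As in doIO, a single reference switches between the length buffer and the body buffer, so the same "buffer is full" check drives both phases.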

(2) In this scenario the session has not yet been initialized, so readConnectResult() is called. We therefore need to look at how this method deserializes the contents of incomingBuffer into a ConnectResponse object.

    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b)).append(",");
            }
            buf.append("]");
            if (LOG.isTraceEnabled()) {
                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
            }
        }
        // Deserialize incomingBuffer into ConnectResponse
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // Read only flag bit
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }
        // Session ID returned by the server
        this.sessionId = conRsp.getSessionId();
        // Follow-up processing: initialize the client's session parameters and finally queue a WatchedEvent for the EventThread
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }
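
To make the deserialization step concrete, here is a minimal sketch that decodes a connect result with a plain DataInputStream instead of BinaryInputArchive. ConnectResultSketch and its encode/decode methods are hypothetical, and the field layout (protocolVersion, timeOut, sessionId, length-prefixed passwd, then an optional trailing readOnly byte) is an assumption made for illustration. The point it shows is the fallback used above: when the trailing flag is missing, the read fails and readOnly stays false.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical stand-in for ConnectResponse decoding; the field layout is an assumption. */
final class ConnectResultSketch {
    final int protocolVersion;
    final int timeOut;
    final long sessionId;
    final byte[] passwd;
    final boolean readOnly;

    private ConnectResultSketch(int protocolVersion, int timeOut, long sessionId,
                                byte[] passwd, boolean readOnly) {
        this.protocolVersion = protocolVersion;
        this.timeOut = timeOut;
        this.sessionId = sessionId;
        this.passwd = passwd;
        this.readOnly = readOnly;
    }

    static ConnectResultSketch decode(byte[] payload) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        int protocolVersion = in.readInt();
        int timeOut = in.readInt();
        long sessionId = in.readLong();
        int passwdLen = in.readInt();
        // Assumption: a negative length is treated as an empty password here.
        byte[] passwd = new byte[Math.max(passwdLen, 0)];
        in.readFully(passwd);
        boolean readOnly = false;
        try {
            readOnly = in.readBoolean();
        } catch (EOFException e) {
            // No trailing flag: treat the peer as an old server without read-only mode.
        }
        return new ConnectResultSketch(protocolVersion, timeOut, sessionId, passwd, readOnly);
    }

    /** Builds a payload in the assumed layout; pass null to omit the readOnly flag. */
    static byte[] encode(int protocolVersion, int timeOut, long sessionId,
                         byte[] passwd, Boolean readOnly) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(protocolVersion);
        out.writeInt(timeOut);
        out.writeLong(sessionId);
        out.writeInt(passwd.length);
        out.write(passwd);
        if (readOnly != null) {
            out.writeBoolean(readOnly);
        }
        out.flush();
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] oldStyle = encode(0, 30000, 0x1234L, new byte[16], null);
        ConnectResultSketch rsp = decode(oldStyle);
        System.out.println("sessionId=0x" + Long.toHexString(rsp.sessionId)
                + " timeOut=" + rsp.timeOut + " readOnly=" + rsp.readOnly);
    }
}
```

The sketch catches only EOFException for the optional flag, which is narrower than the IOException caught in readConnectResult.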

(3) After deserialization comes the follow-up processing, which happens in onConnected. As the call sendThread.onConnected(...) above shows, onConnected is a method of the client's SendThread (an inner class of ClientCnxn). It initializes the client's session parameters and finally queues a WatchedEvent for the EventThread.

        void onConnected(
            int _negotiatedSessionTimeout,
            long _sessionId,
            byte[] _sessionPasswd,
            boolean isRO) throws IOException {
            ......
            ......
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            // Initialize the session related parameters of the client
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
            seenRwServerBefore |= !isRO;
            LOG.info(
                "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
                clientCnxnSocket.getRemoteSocketAddress(),
                Long.toHexString(sessionId),
                negotiatedSessionTimeout,
                (isRO ? " (READ-ONLY mode)" : ""));
            // Queue a SyncConnected (or ConnectedReadOnly) event; the EventThread asynchronously notifies the registered watchers
            KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
        }
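
The two timeout formulas in onConnected use integer arithmetic, so a short worked example helps. This is a minimal sketch with hypothetical names (SessionTimeouts, readTimeout, connectTimeout); the values 30000 ms and 3 servers are made up for illustration.

```java
/** Hypothetical helper reproducing the two timeout formulas from onConnected. */
final class SessionTimeouts {
    /** Two thirds of the negotiated session timeout (integer division). */
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    /** The negotiated session timeout divided evenly among the configured servers. */
    static int connectTimeout(int negotiatedSessionTimeout, int hostCount) {
        return negotiatedSessionTimeout / hostCount;
    }

    public static void main(String[] args) {
        int negotiated = 30000; // milliseconds, example value
        int hosts = 3;          // example size of the server list
        System.out.println("readTimeout=" + readTimeout(negotiated));
        System.out.println("connectTimeout=" + connectTimeout(negotiated, hosts));
    }
}
```

With these example values, readTimeout is 30000 * 2 / 3 = 20000 ms and connectTimeout is 30000 / 3 = 10000 ms. Because the division is integral, a negotiated timeout of 10000 ms gives a readTimeout of 6666 ms.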

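(4) Before the EventThread can notify the registered watchers asynchronously, the event passes through queueEvent. The call in onConnected passes only the event, so the two-argument version shown here runs with materializedWatchers set to null and looks up the watchers from the event itself.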

        private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            // EventThread sync session state
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event:
                // find the registered watchers that need to be notified of this event
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
            // Commit asynchronous queue processing
            waitingEvents.add(pair);
        }
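
The first check in queueEvent drops a session-state event (type None) when the state has not changed, so watchers are not notified twice about the same state. Below is a minimal sketch of that check alone. StateEventQueue and its enums are hypothetical simplifications, the initial DISCONNECTED state is an assumption, and watcher materialization is left out.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the duplicate-state check in queueEvent. */
final class StateEventQueue {
    enum Type { NONE, NODE_DATA_CHANGED }
    enum State { DISCONNECTED, SYNC_CONNECTED }

    static final class Event {
        final Type type;
        final State state;

        Event(Type type, State state) {
            this.type = type;
            this.state = state;
        }
    }

    // Assumption: the session starts out disconnected.
    private volatile State sessionState = State.DISCONNECTED;
    private final LinkedBlockingQueue<Event> waitingEvents = new LinkedBlockingQueue<>();

    /** Returns true if the event was queued, false if it was dropped as a duplicate. */
    boolean queueEvent(Event event) {
        if (event.type == Type.NONE && sessionState == event.state) {
            return false; // pure state event and the state is unchanged
        }
        sessionState = event.state;
        waitingEvents.add(event);
        return true;
    }

    int pending() {
        return waitingEvents.size();
    }

    public static void main(String[] args) {
        StateEventQueue q = new StateEventQueue();
        System.out.println(q.queueEvent(new Event(Type.NONE, State.SYNC_CONNECTED)));
        System.out.println(q.queueEvent(new Event(Type.NONE, State.SYNC_CONNECTED)));
        System.out.println(q.queueEvent(new Event(Type.NODE_DATA_CHANGED, State.SYNC_CONNECTED)));
    }
}
```

Only events of type NONE are subject to the check; a node event that arrives while the state is unchanged is still queued.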

(5) The EventThread's run() method then takes the event from the waitingEvents queue and calls processEvent to handle it.

        public void run() {
            try {
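                // Mark the thread as running
                isRunning = true;
                while (true) {
                    // Take the next event from the waiting queue, blocking until one is available
                    Object event = waitingEvents.take();
                    // eventOfDeath is the marker that asks the thread to shut down
                    if (event == eventOfDeath) {
                        // Remember the shutdown request
                        wasKilled = true;
                    } else {// Otherwise, handle the event
                        processEvent(event);
                    }
                    // After a shutdown request, keep going until the queue is drained
                    if (wasKilled) {
                        // Check the queue while holding its lock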
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Event thread exiting due to interruption", e);
            }
            ......
        }
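
The loop in run() is an instance of the "poison pill" shutdown pattern: a special marker object is queued, and the consumer exits only after it has seen the marker and the queue is empty. The following is a minimal sketch of that pattern under simplifying assumptions. PoisonPillWorker and its methods are hypothetical, processing just records the event, and the synchronized block of the original is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a queue consumer that is shut down with a marker object. */
final class PoisonPillWorker implements Runnable {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final List<Object> processed = new ArrayList<>();

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processed.add(event); // stands in for processEvent(event)
                }
                // Exit only after the marker was seen and nothing is left to process.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the worker on its own thread, waits for it to exit, and returns what it processed. */
    List<Object> runAndWait() {
        Thread thread = new Thread(this, "poison-pill-worker");
        thread.start();
        try {
            thread.join(); // join() makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        PoisonPillWorker worker = new PoisonPillWorker();
        worker.queueEvent("a");
        worker.queueEventOfDeath();
        worker.queueEvent("b"); // queued after the marker, still processed
        System.out.println(worker.runAndWait());
    }
}
```

Because the worker keeps taking events until the queue is empty, an event that is already queued behind the marker is still handled before the thread exits.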

(6) What matters most here is how EventThread's processEvent handles events. processEvent runs to hundreds of lines, so only the important part is shown. Watchers are not the focus of this chapter and are not covered in detail; for more information, see this blog: https://blog.csdn.net/par_ser?spm=1001.2014.3001.5343

        private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // Each watcher handles the event in turn
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher ", t);
                        }
                    }
                    ......
                    ......
        }
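
The try/catch inside the loop matters: each watcher is called in its own try block, so an exception thrown by one watcher does not stop the others from being notified. Here is a minimal sketch of that isolation, using java.util.function.Consumer in place of the Watcher interface. IsolatedDispatch and dispatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: notify every watcher even if some of them throw. */
final class IsolatedDispatch {
    /** Calls each watcher with the event and returns how many of them failed. */
    static <E> int dispatch(Iterable<Consumer<E>> watchers, E event) {
        int failures = 0;
        for (Consumer<E> watcher : watchers) {
            try {
                watcher.accept(event);
            } catch (Throwable t) {
                // Log and continue, so one faulty watcher cannot block the rest.
                failures++;
                System.err.println("Error while calling watcher: " + t);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<Consumer<String>> watchers = new ArrayList<>();
        watchers.add(e -> seen.add("first:" + e));
        watchers.add(e -> { throw new IllegalStateException("boom"); });
        watchers.add(e -> seen.add("third:" + e));
        int failures = dispatch(watchers, "SyncConnected");
        System.out.println(seen + " failures=" + failures);
    }
}
```

Catching Throwable mirrors the excerpt above; in application code a narrower catch is often preferable.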

Summary

This concludes the analysis of the network communication process between the client and the server. The session between them has now been established, and subsequent business requests can be processed. This article walked through how the ZooKeeper client and server exchange data over the network. Although the scenario analyzed is connection establishment, other requests and responses follow a similar path.

Keywords: ZooKeeper

Added by dumbass on Tue, 21 Dec 2021 14:57:21 +0200