ZooKeeper source code analysis: the complete network communication process

2021SC@SDUSC

Preface

Next, we step into the source code and analyze, step by step, how the client and server establish network communication through ClientCnxn and ServerCnxn. The material is split into three chapters. This chapter covers how the client sends a request to the server. The next two chapters cover how the server responds to the client's request and what the client does after receiving the server's response.

Execution process of network communication in ZooKeeper

① The ZooKeeper constructor calls createConnection(), which returns a ClientCnxn: the connection object through which the client's requests are carried to the server. The constructor then calls cnxn.start() to start the connection's SendThread and EventThread.

        // Create a client connection and initialize SendThread and EventThread  
        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
        // Start SendThread and EventThread 
        cnxn.start();
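
The division of labour between the two threads started by cnxn.start() can be pictured as a producer/consumer pipeline. The sketch below is a heavily simplified model, not ZooKeeper's code: TwoThreadSketch, the string "packets" and the STOP marker are hypothetical, and each "reply" is fabricated locally instead of coming from a server. The caller enqueues requests, a send thread drains the outgoing queue, and an event thread delivers the results, loosely mirroring how EventThread processes watcher events and asynchronous callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of ClientCnxn's two threads; not ZooKeeper's actual classes
class TwoThreadSketch {
    private static final String STOP = "STOP"; // poison pill that ends both loops

    final BlockingQueue<String> outgoing = new LinkedBlockingQueue<>();
    final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for SendThread: drains the outgoing queue; a local "reply" replaces real network I/O
    private final Thread sendThread = new Thread(() -> {
        try {
            for (String req = outgoing.take(); !req.equals(STOP); req = outgoing.take()) {
                events.put("reply:" + req);
            }
            events.put(STOP); // let the event thread finish too
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "sketch-SendThread");

    // Stand-in for EventThread: hands results to the application one at a time
    private final Thread eventThread = new Thread(() -> {
        try {
            for (String ev = events.take(); !ev.equals(STOP); ev = events.take()) {
                delivered.add(ev);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "sketch-EventThread");

    // Mirrors cnxn.start(): both threads are started together
    void start() {
        sendThread.start();
        eventThread.start();
    }

    void submit(String request) throws InterruptedException {
        outgoing.put(request);
    }

    void close() throws InterruptedException {
        outgoing.put(STOP);
        sendThread.join();
        eventThread.join();
    }

    public static void main(String[] args) throws InterruptedException {
        TwoThreadSketch cnxn = new TwoThreadSketch();
        cnxn.start();
        cnxn.submit("create /a");
        cnxn.submit("getData /a");
        cnxn.close();
        System.out.println(cnxn.delivered); // [reply:create /a, reply:getData /a]
    }
}
```

Keeping callbacks on a separate thread means slow application code cannot stall the thread that does the network I/O.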

The specific code for creating a client connection is as follows:

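    protected ClientCnxn createConnection(
        // chroot path parsed from the connect string
        String chrootPath,
        // Supplies the server addresses to connect to
        HostProvider hostProvider,
        // Session timeout
        int sessionTimeout,
        ZooKeeper zooKeeper,
        // Client-side watch manager
        ClientWatchManager watcher,
        // Socket implementation used by the connection (NIO by default)
        ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
        // The body passes this and the watchManager field instead of the zooKeeper/watcher
        // parameters; the constructor call above passes those same objects, so the result is identical
        return new ClientCnxn(
            chrootPath,
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            clientCnxnSocket,
            canBeReadOnly);
    }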

② SendThread is an inner class of ClientCnxn that runs as its own thread. Its core logic is the run() method.

(1) In run(), if the client socket is not connected yet, SendThread picks the next server address and calls its startConnect() method to begin an asynchronous connection.

        public void run() {
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;

            while (state.isAlive()) {
                try {
                    // If the client connection is not connected
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        onConnecting(serverAddress);
                        //Asynchronous connection
                        startConnect(serverAddress);
                        // Update now to start the connection timer right after we make a connection attempt
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    // If the client connection is already connected to the server
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                    LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                                    changeZkState(States.AUTH_FAILED);
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    changeZkState(States.AUTH_FAILED);
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent) {
                                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                                if (state == States.AUTH_FAILED) {
                                    eventThread.queueEventOfDeath();
                                }
                            }
                        }
                        // Time left before the read timeout expires
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        // Time left before the connect timeout expires
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    // If the session timed out, including the connection timed out
                    if (to <= 0) {
                        String warnInfo = String.format(
                            "Client session timed out, have not heard from server in %dms for session id 0x%s",
                            clientCnxnSocket.getIdleRecv(),
                            Long.toHexString(sessionId));
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    // While connected, send a ping if nothing has been sent for long enough
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        int timeToNextPing = readTimeout / 2
                                             - clientCnxnSocket.getIdleSend()
                                             - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If it is in read-only mode, look for the R/W server. If it is found, clean up the previous connection and reconnect to the R/W server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                            // Synchronously test whether the next server is an R/W server. If so, throw RWServerFoundException
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    //Process IO
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        // closing so this is expected
                        LOG.warn(
                            "An exception was thrown while closing send thread for session 0x{}.",
                            Long.toHexString(getSessionId()),
                            e);
                        break;
                    } else {
                        LOG.warn(
                            "Session 0x{} for server {}, Closing socket connection. "
                                + "Attempting reconnect except it is a SessionExpiredException.",
                            Long.toHexString(getSessionId()),
                            serverAddress,
                            e);

                        // At this point, there might still be new packets appended to outgoingQueue.
                        // they will be handled in next connection or cleared up if closed.
                        cleanAndNotifyState();
                    }
                }
            }

            synchronized (state) {
                // Fail all pending and outgoing packets and release the socket before the thread exits
                cleanup();
            }
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
        }
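
The timeout arithmetic in run() is easier to follow in isolation. The sketch below copies the formulas for `to` and `timeToNextPing` from the loop above into pure functions; the class name and the example values (for instance a readTimeout of 20000 ms) are hypothetical, and the read-only R/W-server branch is left out.

```java
// Hypothetical helper class reproducing the timer formulas from SendThread.run()
class SendLoopTimers {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in run()

    // Time left before run() throws SessionTimeoutException
    static int remaining(boolean connected, int readTimeout, int connectTimeout, int idleRecv) {
        return (connected ? readTimeout : connectTimeout) - idleRecv;
    }

    // Ping at half the read timeout, one second early once the send side has been idle for over 1 s
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    // How long doTransport() may block in select() while connected
    static int selectTimeout(int readTimeout, int idleRecv, int idleSend) {
        int to = remaining(true, readTimeout, 0, idleRecv);
        if (!shouldPing(readTimeout, idleSend)) {
            // Wake up in time to send the next ping
            to = Math.min(to, timeToNextPing(readTimeout, idleSend));
        }
        return to;
    }

    public static void main(String[] args) {
        // 15000 ms left on the read timeout, but the next ping is due in 1000 ms
        System.out.println(selectTimeout(20000, 5000, 8000)); // 1000
        // Half the read timeout is still far away, yet idleSend exceeds MAX_SEND_PING_INTERVAL
        System.out.println(shouldPing(40000, 10500));         // true
    }
}
```

The second case shows why MAX_SEND_PING_INTERVAL exists: with a large session timeout, pings would otherwise be spaced very far apart.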

(2) startConnect() prepares the attempt (connection state, thread name, optional SASL client) and then calls clientCnxnSocket.connect(addr) to start the asynchronous connection. By default the socket is implemented with NIO.

        private void startConnect(InetSocketAddress addr) throws IOException {
            // Initialize and create connection
            saslLoginFailed = false;
            // On a reconnect, first sleep for a random 0-999 ms
            if (!isFirstConnect) {
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
            // Change state to CONNECTING
            changeZkState(States.CONNECTING);
            // Host port
            String hostPort = addr.getHostString() + ":" + addr.getPort();
            MDC.put("myid", hostPort);
            setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
            if (clientConfig.isSaslClientEnabled()) {
                try {
                    if (zooKeeperSaslClient != null) {
                        zooKeeperSaslClient.shutdown();
                    }
                    zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
                } catch (LoginException e) {
                    LOG.warn(
                        "SASL configuration failed. "
                            + "Will continue connection to Zookeeper server without "
                            + "SASL authentication, if Zookeeper server allows it.", e);
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);
            // Start asynchronous connection
            clientCnxnSocket.connect(addr);
        }
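
Two details of startConnect() can be checked on their own: the random pause before a reconnect and the regular expression that rewrites the thread name to show the target server. The class name and the sample thread names below are hypothetical; the expression and the 1000 ms bound are the ones used above.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helpers isolating two steps of startConnect()
class StartConnectSketch {
    // The first attempt goes out at once; a reconnect first waits 0-999 ms (the bound is exclusive)
    static long reconnectDelayMillis(boolean isFirstConnect) {
        return isFirstConnect ? 0 : ThreadLocalRandom.current().nextLong(1000);
    }

    // Replace everything from the first '(' to the last ')' with "(host:port)"
    static String renameForServer(String threadName, String hostPort) {
        return threadName.replaceAll("\\(.*\\)", "(" + hostPort + ")");
    }

    public static void main(String[] args) {
        String name = renameForServer("main-SendThread()", "127.0.0.1:2181");
        System.out.println(name);                                   // main-SendThread(127.0.0.1:2181)
        System.out.println(renameForServer(name, "10.0.0.2:2182")); // main-SendThread(10.0.0.2:2182)
        System.out.println(reconnectDelayMillis(false) < 1000);     // true
    }
}
```

The random delay spreads out reconnect attempts so that many clients losing the same server do not all hit the next one at the same instant.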

(3) The connect() method creates the connection. This analysis uses the default NIO implementation, ClientCnxnSocketNIO. Its connect() creates a SocketChannel and calls registerAndConnect(sock, addr) to register for the connect event and attempt the connection.

    void connect(InetSocketAddress addr) throws IOException {
        // Create client SocketChannel
        SocketChannel sock = createSock();
        try {
            // Register for OP_CONNECT and attempt the connection
            registerAndConnect(sock, addr);
        } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
            LOG.error("Unable to open socket to {}", addr);
            sock.close();
            throw e;
        }
        //The session has not been initialized
        initialized = false;
        // Reset the read buffers so the next read starts with the 4-byte length header
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

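(4) registerAndConnect() registers the channel with the selector for OP_CONNECT and calls the non-blocking SocketChannel.connect(). If the connection completes immediately, it calls sendThread.primeConnection() to set up the session; otherwise the connection is completed later, when doTransport() handles the OP_CONNECT event.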

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        //Try to connect
        boolean immediateConnect = sock.connect(addr);
        //If the connection is successful, call primeConnection() to create a session and other operations
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }
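
registerAndConnect() relies on the standard java.nio pattern for non-blocking connects: register for OP_CONNECT, call connect(), and if it returns false, complete the connection with finishConnect() once the selector reports the key as connectable. The standalone sketch below exercises both paths against a local ServerSocketChannel on an ephemeral port that stands in for a ZooKeeper server. The class name and the timeout are hypothetical; note that a channel must be in non-blocking mode before it can be registered with a selector.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical standalone demo of the OP_CONNECT pattern; not ZooKeeper code
class NioConnectSketch {
    // Returns true once the non-blocking connect has completed within timeoutMs
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open(); SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false); // register() rejects blocking channels
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true; // immediate connect: the point where primeConnection() would run
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(Math.max(1, deadline - System.currentTimeMillis()));
                // Deferred connect: the same check doTransport() makes for OP_CONNECT
                if (key.isConnectable() && sock.finishConnect()) {
                    return true;
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral local port
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            System.out.println("connected = " + connect(addr, 2000));
        }
    }
}
```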

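(5) primeConnection() builds the ConnectRequest that creates (or re-establishes) the session and places it at the very front of the send queue, outgoingQueue, ahead of any authentication packets and of the SetWatches packets that re-register the client's existing watches.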

        void primeConnection() throws IOException {
            LOG.info(
                "Socket connection established, initiating session, client: {}, server: {}",
                clientCnxnSocket.getLocalSocketAddress(),
                clientCnxnSocket.getRemoteSocketAddress());
            isFirstConnect = false;
            // Send sessionId 0 (a new session) unless a read/write server has been seen before
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            // Build connection request
            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
            // We add backwards since we are pushing into the front
            // Only send if there's a pending watch
            // TODO: here we have the only remaining use of zooKeeper in
            // this class. It's to be eliminated!
            if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
                List<String> dataWatches = zooKeeper.getDataWatches();
                List<String> existWatches = zooKeeper.getExistWatches();
                List<String> childWatches = zooKeeper.getChildWatches();
                List<String> persistentWatches = zooKeeper.getPersistentWatches();
                List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
                if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                        || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
                    Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                    Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                    Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                    Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
                    Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
                    long setWatchesLastZxid = lastZxid;

                    while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                            || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                        List<String> dataWatchesBatch = new ArrayList<String>();
                        List<String> existWatchesBatch = new ArrayList<String>();
                        List<String> childWatchesBatch = new ArrayList<String>();
                        List<String> persistentWatchesBatch = new ArrayList<String>();
                        List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                        int batchLength = 0;

                        // Note, we may exceed our max length by a bit when we add the last
                        // watch in the batch. This isn't ideal, but it makes the code simpler.
                        while (batchLength < SET_WATCHES_MAX_LENGTH) {
                            final String watch;
                            if (dataWatchesIter.hasNext()) {
                                watch = dataWatchesIter.next();
                                dataWatchesBatch.add(watch);
                            } else if (existWatchesIter.hasNext()) {
                                watch = existWatchesIter.next();
                                existWatchesBatch.add(watch);
                            } else if (childWatchesIter.hasNext()) {
                                watch = childWatchesIter.next();
                                childWatchesBatch.add(watch);
                            }  else if (persistentWatchesIter.hasNext()) {
                                watch = persistentWatchesIter.next();
                                persistentWatchesBatch.add(watch);
                            } else if (persistentRecursiveWatchesIter.hasNext()) {
                                watch = persistentRecursiveWatchesIter.next();
                                persistentRecursiveWatchesBatch.add(watch);
                            } else {
                                break;
                            }
                            batchLength += watch.length();
                        }

                        Record record;
                        int opcode;
                        if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                            // maintain compatibility with older servers - if no persistent/recursive watchers
                            // are used, use the old version of SetWatches
                            record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                            opcode = OpCode.setWatches;
                        } else {
                            record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                                    childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                            opcode = OpCode.setWatches2;
                        }
                        RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                        Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                        outgoingQueue.addFirst(packet);
                    }
                }
            }

            for (AuthData id : authInfo) {
                outgoingQueue.addFirst(
                    new Packet(
                        new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                        null,
                        new AuthPacket(0, id.scheme, id.data),
                        null,
                        null));
            }
            // Wrap the ConnectRequest in a Packet and put it at the very front of the send queue. Its requestHeader is null
            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
            // connectionPrimed() registers interest in both read and write events
            clientCnxnSocket.connectionPrimed();
            LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
        }
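
Because every packet in primeConnection() is added with addFirst(), packets are added in the reverse of the order in which they are sent ("We add backwards since we are pushing into the front"). The sketch below models this with strings in an ArrayDeque and reuses the batching rule, under which a batch may overshoot the length limit by its last path. The packet strings are hypothetical, all watch types are flattened into one list, and MAX_BATCH_LENGTH is a small made-up value standing in for SET_WATCHES_MAX_LENGTH.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of primeConnection()'s queue handling; packets are plain strings here
class PrimeOrderSketch {
    // Made-up small limit standing in for SET_WATCHES_MAX_LENGTH
    static final int MAX_BATCH_LENGTH = 10;

    // Keep adding paths while the batch is under the limit, so the last path may push it past
    // the limit; every batch takes at least one path
    static List<List<String>> batch(List<String> paths, int maxLength) {
        List<List<String>> batches = new ArrayList<>();
        int i = 0;
        while (i < paths.size()) {
            List<String> current = new ArrayList<>();
            int length = 0;
            while (i < paths.size() && (length < maxLength || current.isEmpty())) {
                String path = paths.get(i++);
                current.add(path);
                length += path.length();
            }
            batches.add(current);
        }
        return batches;
    }

    // Push SetWatches batches, then auth packets, then the connect request, each with addFirst()
    static Deque<String> prime(Deque<String> outgoing, List<String> watchPaths, List<String> authSchemes) {
        for (List<String> b : batch(watchPaths, MAX_BATCH_LENGTH)) {
            outgoing.addFirst("setWatches" + b);
        }
        for (String scheme : authSchemes) {
            outgoing.addFirst("auth:" + scheme);
        }
        outgoing.addFirst("connect");
        return outgoing;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("getData /a")); // queued before the reconnect
        System.out.println(prime(queue, List.of("/app/x", "/app/y", "/z"), List.of("digest")));
        // [connect, auth:digest, setWatches[/z], setWatches[/app/x, /app/y], getData /a]
    }
}
```

Whatever was already waiting in the queue stays behind the session-setup packets, so ordinary requests are only sent once the connect, auth and SetWatches packets have gone out.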

③ Once requests are in the send queue, SendThread's loop calls doTransport(). It waits on the selector, completes a pending connection, and hands readable or writable keys to doIO(), which sends the queued requests to the server.

    void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // If connect() did not complete immediately, handle the OP_CONNECT event here
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
                // Otherwise, if the key is readable or writable, let doIO() handle it
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        // Once connected, turn on write interest if outgoingQueue holds a packet that can be sent now
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        // Clear the selected-key set so these keys are not handled again after the next select()
        selected.clear();
    }
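
enableWrite() and disableWrite(), used here and in doIO(), switch write interest on and off for the socket's SelectionKey. Their bodies are not shown in this article; the sketch below assumes they set or clear the OP_WRITE bit in the key's interest set and models that as plain int arithmetic, so it runs without a socket. The class and method names are hypothetical.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helpers; they only compute interest-set values and touch no real SelectionKey
class InterestOpsSketch {
    static int enableWrite(int ops) {
        return ops | SelectionKey.OP_WRITE;
    }

    static int disableWrite(int ops) {
        return ops & ~SelectionKey.OP_WRITE;
    }

    static boolean wantsWrite(int ops) {
        return (ops & SelectionKey.OP_WRITE) != 0;
    }

    // Mirrors the end of doTransport(): enable writes only when connected and something is sendable
    static int afterTransport(int ops, boolean connected, boolean hasSendablePacket) {
        return (connected && hasSendablePacket) ? enableWrite(ops) : ops;
    }

    public static void main(String[] args) {
        int primed = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // as after connectionPrimed()
        int idle = disableWrite(primed);                           // queue drained
        System.out.println(wantsWrite(idle) + " " + wantsWrite(afterTransport(idle, true, true)));
        // false true
    }
}
```

Keeping OP_WRITE off while there is nothing to send matters because a connected socket is almost always writable; leaving the bit on would make select() return immediately on every loop.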

④ Once write interest has been enabled (by connectionPrimed(), doTransport() or doIO() itself) and the socket becomes writable, SendThread's call to doIO() in ClientCnxnSocketNIO sends the next request from the send queue. The write branch of doIO() is:

        //If writable
        if (sockKey.isWritable()) {
            // sendThread fetches the request packet from the send queue
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
            // If a sendable packet was found
            if (p != null) {
                // Record the send time (used to schedule pings)
                updateLastSend();
                // Serialize the packet into its ByteBuffer on the first attempt; every request except ping and auth gets an xid (the ConnectRequest has no header)
                if (p.bb == null) {
                    if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    //serialize
                    p.createBB();
                }
                //Write data
                sock.write(p.bb);
                //If there is no remaining data, i.e. the writing is completed, the transmission is successful
                if (!p.bb.hasRemaining()) {
                    // Increment the sent-packet counter
                    sentCount.getAndIncrement();
                    //Delete the request packet from the send queue
                    outgoingQueue.removeFirstOccurrence(p);
                    // Requests other than ping and auth go to pendingQueue so the server's reply can be matched to them; ping, auth and the header-less ConnectRequest are not tracked
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            // Nothing left to send: turn off write interest
            if (outgoingQueue.isEmpty()) {
                disableWrite();
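            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // The session is not initialized yet and the ConnectRequest has been fully written:
                // stop writing until the server's connect response arrives
                disableWrite();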
            } else {
                enableWrite();
            }
        }
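
A single sock.write() call may write only part of a buffer, which is why a packet leaves outgoingQueue only once p.bb.hasRemaining() is false. The sketch below replays that logic against a fake channel that accepts at most a few bytes per call, together with the write-interest decision at the end of the branch. Packet, ThrottledChannel and onWritable() are hypothetical simplifications: the sketch always takes the head of the queue instead of calling findSendablePacket(), and its return value stands for enableWrite() (true) or disableWrite() (false).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PartialWriteSketch {
    // Hypothetical packet; 'tracked' marks requests whose replies must be matched (not connect/ping/auth)
    static final class Packet {
        final String name;
        final boolean tracked;
        final ByteBuffer bb;

        Packet(String name, boolean tracked) {
            this.name = name;
            this.tracked = tracked;
            this.bb = ByteBuffer.wrap(name.getBytes(StandardCharsets.US_ASCII));
        }
    }

    // Fake channel that accepts at most 'limit' bytes per write, like a socket with a full send buffer
    static final class ThrottledChannel implements WritableByteChannel {
        private final int limit;
        final StringBuilder received = new StringBuilder();

        ThrottledChannel(int limit) { this.limit = limit; }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            byte[] chunk = new byte[n];
            src.get(chunk);
            received.append(new String(chunk, StandardCharsets.US_ASCII));
            return n;
        }

        @Override
        public boolean isOpen() { return true; }

        @Override
        public void close() { }
    }

    // One writable event; returns true to keep write interest (enableWrite), false to drop it (disableWrite)
    static boolean onWritable(WritableByteChannel ch, Deque<Packet> outgoing, List<Packet> pending,
                              boolean initialized) throws IOException {
        Packet p = outgoing.peekFirst();
        if (p != null) {
            ch.write(p.bb);
            if (!p.bb.hasRemaining()) {   // fully written: leave the send queue
                outgoing.removeFirst();
                if (p.tracked) {
                    pending.add(p);       // wait for the server's reply
                }
            }
        }
        if (outgoing.isEmpty()) {
            return false;
        }
        // Before the session is initialized, stop once the ConnectRequest has been fully written
        return initialized || p == null || p.bb.hasRemaining();
    }

    public static void main(String[] args) throws IOException {
        ThrottledChannel ch = new ThrottledChannel(4);
        Deque<Packet> outgoing = new ArrayDeque<>(List.of(
            new Packet("connect", false), new Packet("getData /a", true)));
        List<Packet> pending = new ArrayList<>();
        System.out.println(onWritable(ch, outgoing, pending, false)); // true: 3 bytes of "connect" left
        System.out.println(onWritable(ch, outgoing, pending, false)); // false: wait for the connect response
        while (onWritable(ch, outgoing, pending, true)) { }          // session initialized: drain the queue
        System.out.println(ch.received + " pending=" + pending.size()); // connectgetData /a pending=1
    }
}
```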

⑤ The first packet sent is the ConnectRequest. Its Packet has no requestHeader, so once it has been fully written it is removed from outgoingQueue without being added to pendingQueue, and because the session is not yet initialized, doIO() also disables further writes at that point. SendThread still has to read the server's response to confirm the connection and initialize the session. How the server handles the request is covered in the next chapter.

Summary

This chapter traced the client side of the process: ClientCnxn's SendThread opens a TCP connection (through ClientCnxnSocketNIO by default) and sends a ConnectRequest packet to the server.


Added by siko on Mon, 06 Dec 2021 04:20:25 +0200