2021SC@SDUSC
Preface
Next, we step into the source code to analyze how the client and server establish network communication through ClientCnxn/ServerCnxn. The material is divided into three chapters. This chapter covers how the client sends a request to the server. The following two chapters cover how the server responds to the request and what the client does after receiving the response.
Execution process of network communication in ZooKeeper
① The ZooKeeper constructor calls createConnection(), which returns the ClientCnxn that manages the connection between the client and the server. Requests sent by the client reach the server through this connection.
    // Create a client connection and initialize SendThread and EventThread
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        getClientCnxnSocket(),
        canBeReadOnly);
    // Start SendThread and EventThread
    cnxn.start();
The specific code for creating a client connection is as follows:
    protected ClientCnxn createConnection(
        // Chroot path of the client
        String chrootPath,
        // Provider of server addresses
        HostProvider hostProvider,
        // Session timeout
        int sessionTimeout,
        ZooKeeper zooKeeper,
        // Client watch manager
        ClientWatchManager watcher,
        // Client connection socket
        ClientCnxnSocket clientCnxnSocket,
        boolean canBeReadOnly) throws IOException {
        return new ClientCnxn(
            chrootPath,
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            clientCnxnSocket,
            canBeReadOnly);
    }
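The idea of one connection object that owns a sending thread and an event thread, both started by a single start() call, can be sketched in miniature. This is only an illustration of the pattern, not ZooKeeper code: MiniCnxn, pump(), and the "reply:" prefix are hypothetical names, and the "server" is simulated by echoing each request back as an event.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the pattern; none of these names come from ZooKeeper.
final class MiniCnxn {
    private final BlockingQueue<String> outgoing = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> delivered = new LinkedBlockingQueue<>();
    private final Thread sendThread;
    private final Thread eventThread;

    MiniCnxn() {
        // The "send thread" turns each request into a fake server reply.
        sendThread = new Thread(() -> pump(outgoing, events, "reply:"), "MiniSendThread");
        // The "event thread" hands replies over to the application.
        eventThread = new Thread(() -> pump(events, delivered, ""), "MiniEventThread");
        sendThread.setDaemon(true);
        eventThread.setDaemon(true);
    }

    // Counterpart of cnxn.start(): both threads are started together.
    void start() {
        sendThread.start();
        eventThread.start();
    }

    void submit(String request) {
        outgoing.add(request);
    }

    // Waits up to timeoutMs for the next delivered event; returns null on timeout.
    String awaitEvent(long timeoutMs) {
        try {
            return delivered.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void close() {
        sendThread.interrupt();
        eventThread.interrupt();
    }

    // Moves items from one queue to the other until the thread is interrupted.
    private static void pump(BlockingQueue<String> from, BlockingQueue<String> to, String prefix) {
        try {
            while (true) {
                to.add(prefix + from.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() was called
        }
    }
}
```

Keeping the two threads separate means a slow application callback on the event side cannot stall the side that talks to the network.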
② SendThread is an inner class of ClientCnxn and runs as a thread of the ZooKeeper client. Its core is the run() method.
(1) In run(), if the client socket is not yet connected, SendThread's startConnect() method is called to connect asynchronously.
    public void run() {
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = Time.currentElapsedTime();
        final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
        InetSocketAddress serverAddress = null;
        while (state.isAlive()) {
            try {
                // If the client socket is not connected
                if (!clientCnxnSocket.isConnected()) {
                    // don't re-establish connection if we are closing
                    if (closing) {
                        break;
                    }
                    if (rwServerAddress != null) {
                        serverAddress = rwServerAddress;
                        rwServerAddress = null;
                    } else {
                        serverAddress = hostProvider.next(1000);
                    }
                    onConnecting(serverAddress);
                    // Asynchronous connection
                    startConnect(serverAddress);
                    // Update now to start the connection timer right after we make a connection attempt
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }

                // If the client is already connected to the server
                if (state.isConnected()) {
                    // determine whether we need to send an AuthFailed event.
                    if (zooKeeperSaslClient != null) {
                        boolean sendAuthEvent = false;
                        if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                            try {
                                zooKeeperSaslClient.initialize(ClientCnxn.this);
                            } catch (SaslException e) {
                                LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                                changeZkState(States.AUTH_FAILED);
                                sendAuthEvent = true;
                            }
                        }
                        KeeperState authState = zooKeeperSaslClient.getKeeperState();
                        if (authState != null) {
                            if (authState == KeeperState.AuthFailed) {
                                // An authentication error occurred during authentication with the Zookeeper Server.
                                changeZkState(States.AUTH_FAILED);
                                sendAuthEvent = true;
                            } else {
                                if (authState == KeeperState.SaslAuthenticated) {
                                    sendAuthEvent = true;
                                }
                            }
                        }

                        if (sendAuthEvent) {
                            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                            if (state == States.AUTH_FAILED) {
                                eventThread.queueEventOfDeath();
                            }
                        }
                    }
                    // Time remaining before the read timeout expires
                    to = readTimeout - clientCnxnSocket.getIdleRecv();
                } else {
                    // Time remaining before the connect timeout expires
                    to = connectTimeout - clientCnxnSocket.getIdleRecv();
                }

                // The session (or the connection attempt) has timed out
                if (to <= 0) {
                    String warnInfo = String.format(
                        "Client session timed out, have not heard from server in %dms for session id 0x%s",
                        clientCnxnSocket.getIdleRecv(),
                        Long.toHexString(sessionId));
                    LOG.warn(warnInfo);
                    throw new SessionTimeoutException(warnInfo);
                }
                // If nothing has been sent for a while, send a ping packet
                if (state.isConnected()) {
                    //1000(1 second) is to prevent race condition missing to send the second ping
                    //also make sure not to send too many pings when readTimeout is small
                    int timeToNextPing = readTimeout / 2
                                         - clientCnxnSocket.getIdleSend()
                                         - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        sendPing();
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }

                // In read-only mode, look for an R/W server. If one is found,
                // clean up the current connection and reconnect to the R/W server
                if (state == States.CONNECTEDREADONLY) {
                    long now = Time.currentElapsedTime();
                    int idlePingRwServer = (int) (now - lastPingRwServer);
                    if (idlePingRwServer >= pingRwTimeout) {
                        lastPingRwServer = now;
                        idlePingRwServer = 0;
                        pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                        // Synchronously test whether the next server is an R/W server.
                        // If so, throw RWServerFoundException
                        pingRwServer();
                    }
                    to = Math.min(to, pingRwTimeout - idlePingRwServer);
                }

                // Process IO
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    // closing so this is expected
                    LOG.warn(
                        "An exception was thrown while closing send thread for session 0x{}.",
                        Long.toHexString(getSessionId()),
                        e);
                    break;
                } else {
                    LOG.warn(
                        "Session 0x{} for server {}, Closing socket connection. "
                            + "Attempting reconnect except it is a SessionExpiredException.",
                        Long.toHexString(getSessionId()),
                        serverAddress,
                        e);

                    // At this point, there might still be new packets appended to outgoingQueue.
                    // they will be handled in next connection or cleared up if closed.
                    cleanAndNotifyState();
                }
            }
        }

        synchronized (state) {
            // Clean up the connection state after the loop has exited
            cleanup();
        }
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
        }
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
    }
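The ping arithmetic in run() is easier to follow with concrete numbers. The sketch below copies only the timeToNextPing formula and the two ping conditions from the method above into pure functions. The class name PingTiming, the helper names, and the sample values in the comments are assumptions made for illustration, and the read-only-server adjustment is left out.

```java
// Hypothetical helper that isolates the ping timing arithmetic from run().
final class PingTiming {
    static final int MAX_SEND_PING_INTERVAL = 10_000; // 10 seconds, as in run()

    // Same formula as in run(): half the read timeout, minus the time since the
    // last send, minus a 1000 ms safety margin once more than 1 s has passed.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    // A ping is due when the computed time is used up, or when nothing has been
    // sent for longer than MAX_SEND_PING_INTERVAL.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    // The select timeout handed to doTransport() while connected: the time left
    // before the read timeout, shortened so that the next ping is not missed.
    static int selectTimeout(int readTimeout, int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv;
        if (to <= 0) {
            throw new IllegalStateException("session timed out");
        }
        if (!shouldPing(readTimeout, idleSend)) {
            to = Math.min(to, timeToNextPing(readTimeout, idleSend));
        }
        return to;
    }
}
```

With an example readTimeout of 20000 ms, a client that last sent 500 ms ago gets timeToNextPing = 9500, while one that last sent 9000 ms ago gets 0 and pings at once. With a readTimeout of 40000 ms and 10001 ms of send idleness the formula still yields 8999, yet the MAX_SEND_PING_INTERVAL check forces a ping.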
(2) startConnect() calls clientCnxnSocket.connect(addr) to connect asynchronously. By default, the connection is implemented with NIO.
    private void startConnect(InetSocketAddress addr) throws IOException {
        // Initialize and create connection
        saslLoginFailed = false;
        if (!isFirstConnect) {
            // If this is not the first connection attempt, sleep for a random
            // period of up to one second before retrying
            try {
                Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        // Change the state to CONNECTING
        changeZkState(States.CONNECTING);

        // Host and port
        String hostPort = addr.getHostString() + ":" + addr.getPort();
        MDC.put("myid", hostPort);
        setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
        if (clientConfig.isSaslClientEnabled()) {
            try {
                if (zooKeeperSaslClient != null) {
                    zooKeeperSaslClient.shutdown();
                }
                zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
            } catch (LoginException e) {
                LOG.warn(
                    "SASL configuration failed. "
                        + "Will continue connection to Zookeeper server without "
                        + "SASL authentication, if Zookeeper server allows it.", e);
                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
                saslLoginFailed = true;
            }
        }
        logStartConnect(addr);
        // Start asynchronous connection
        clientCnxnSocket.connect(addr);
    }
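Two small details of startConnect() can be tried in isolation: the random sleep before a reconnect, and the regular expression that rewrites the part of the thread name in parentheses. The sketch below uses the same two expressions as the method above. The class ReconnectHelpers, its method names, and the sample thread name "main-SendThread()" are assumptions for illustration.

```java
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helpers mirroring two expressions used in startConnect().
final class ReconnectHelpers {
    // Replaces whatever stands between the parentheses of the thread name with
    // host:port, e.g. "main-SendThread()" -> "main-SendThread(zk1.example.com:2181)".
    static String renameFor(String threadName, InetSocketAddress addr) {
        String hostPort = addr.getHostString() + ":" + addr.getPort();
        return threadName.replaceAll("\\(.*\\)", "(" + hostPort + ")");
    }

    // Random delay in [0, boundExclusive) milliseconds, so that many clients
    // reconnecting at once do not all hit the server at the same instant.
    static long jitterMillis(long boundExclusive) {
        return ThreadLocalRandom.current().nextLong(boundExclusive);
    }
}
```

For instance, renameFor("main-SendThread(old-host:2181)", InetSocketAddress.createUnresolved("zk1.example.com", 2181)) yields "main-SendThread(zk1.example.com:2181)", so the thread name always shows the server currently being tried.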
(3) connect() is the method that actually creates the connection. The default NIO implementation is analyzed here, so the concrete connect() is the one in ClientCnxnSocketNIO. It calls registerAndConnect(sock, addr) to register the connect event and try to connect.
    void connect(InetSocketAddress addr) throws IOException {
        // Create the client SocketChannel
        SocketChannel sock = createSock();
        try {
            // Register the connect event
            registerAndConnect(sock, addr);
        } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
            LOG.error("Unable to open socket to {}", addr);
            sock.close();
            throw e;
        }
        // The session has not been initialized yet
        initialized = false;

        // Reset the two read buffers to prepare for the next read
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
(4) If the connection completes immediately, sendThread.primeConnection() is called to initialize the connection and create the session. Otherwise, the OP_CONNECT event is handled later in doTransport().
    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        // Try to connect
        boolean immediateConnect = sock.connect(addr);
        // If the connection completed immediately, call primeConnection() to set up the session
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }
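The register-then-connect pattern can be reproduced with plain java.nio. The following is a minimal sketch, not the ZooKeeper implementation: it registers OP_CONNECT, starts a non-blocking connect, and finishes it when the selector reports the key as connectable. The class NonBlockingConnectDemo and its methods are hypothetical, and the demo connects to a throwaway server socket on the loopback interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of a non-blocking connect driven by a Selector.
final class NonBlockingConnectDemo {
    // Returns true once the connection is established, false on timeout.
    static boolean connectWithSelector(InetSocketAddress addr, long timeoutMs) {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);
            // Register interest in the connect event before connecting
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true; // connected immediately, no event needed
            }
            long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
            while (System.nanoTime() < deadline) {
                long remainingMs = Math.max(1, (deadline - System.nanoTime()) / 1_000_000L);
                selector.select(remainingMs);
                // The selected-key set must be emptied by hand after each select
                if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                    if (sock.finishConnect()) {
                        return true;
                    }
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Binds a server socket to an ephemeral loopback port and connects to it.
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            return connectWithSelector(addr, 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A non-blocking connect() may return true at once or return false and complete later, so both paths have to lead to the same follow-up step.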
(5) primeConnection() mainly builds the ConnectRequest that creates the session and adds it, together with any watch-reset and auth packets, to the send queue, that is, outgoingQueue.
    void primeConnection() throws IOException {
        LOG.info(
            "Socket connection established, initiating session, client: {}, server: {}",
            clientCnxnSocket.getLocalSocketAddress(),
            clientCnxnSocket.getRemoteSocketAddress());
        isFirstConnect = false;
        // Use session id 0 unless a read-write server has been seen before
        long sessId = (seenRwServerBefore) ? sessionId : 0;
        // Build the connection request
        ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            List<String> persistentWatches = zooKeeper.getPersistentWatches();
            List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
            if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                    || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
                Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
                long setWatchesLastZxid = lastZxid;

                while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                        || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    List<String> persistentWatchesBatch = new ArrayList<String>();
                    List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;

                    // Note, we may exceed our max length by a bit when we add the last
                    // watch in the batch. This isn't ideal, but it makes the code simpler.
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else if (persistentWatchesIter.hasNext()) {
                            watch = persistentWatchesIter.next();
                            persistentWatchesBatch.add(watch);
                        } else if (persistentRecursiveWatchesIter.hasNext()) {
                            watch = persistentRecursiveWatchesIter.next();
                            persistentRecursiveWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }

                    Record record;
                    int opcode;
                    if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                        // maintain compatibility with older servers - if no persistent/recursive watchers
                        // are used, use the old version of SetWatches
                        record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                        opcode = OpCode.setWatches;
                    } else {
                        record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                            childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                        opcode = OpCode.setWatches2;
                    }
                    RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                    Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                    outgoingQueue.addFirst(packet);
                }
            }
        }

        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(
                new Packet(
                    new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                    null,
                    new AuthPacket(0, id.scheme, id.data),
                    null,
                    null));
        }
        // Wrap the ConnectRequest in a Packet and put it at the head of the send queue.
        // For a ConnectRequest, the requestHeader is null
        outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
        // connectionPrimed() makes sure that read and write events are monitored
        clientCnxnSocket.connectionPrimed();
        LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
    }
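Because every packet in primeConnection() is pushed with addFirst(), the packet added last ends up being sent first. The sketch below replays that ordering with plain strings and also imitates the inner batching loop, which stops adding watches only after the length limit has been reached. PrimeOrderDemo and its methods are hypothetical names, and strings stand in for Packet objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical replay of the queue ordering and watch batching in primeConnection().
final class PrimeOrderDemo {
    // Pushes watch batches, then auth packets, then the connect request to the
    // FRONT of the queue, and returns the resulting send order.
    static List<String> primeOrder(List<String> alreadyQueued, List<String> watchBatches, List<String> auths) {
        Deque<String> outgoing = new ArrayDeque<>(alreadyQueued);
        for (String batch : watchBatches) {
            outgoing.addFirst("setWatches:" + batch);
        }
        for (String auth : auths) {
            outgoing.addFirst("auth:" + auth);
        }
        outgoing.addFirst("connect"); // added last, therefore sent first
        return new ArrayList<>(outgoing);
    }

    // Splits paths into batches. Like the original loop, the limit is checked
    // before adding, so a batch may exceed maxLength by its last element.
    static List<List<String>> batchByLength(List<String> paths, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> batch = new ArrayList<>();
            int batchLength = 0;
            while (batchLength < maxLength && it.hasNext()) {
                String path = it.next();
                batch.add(path);
                batchLength += path.length();
            }
            batches.add(batch);
        }
        return batches;
    }
}
```

Pushing to the front is what guarantees that the connect request precedes any ordinary request the application queued while the client was disconnected.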
③ Once the client's request is in the send queue, SendThread calls doTransport(), which sends the requests in the send queue to the server.
    void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // If the earlier connect attempt did not complete immediately, the OP_CONNECT event is handled here
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            // If the channel is readable or writable, process the IO
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        // If the client is already connected
        if (sendThread.getZkState().isConnected()) {
            // If the outgoing queue holds a packet that can be sent, enable writing
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        // Clear the selected keys
        selected.clear();
    }
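doTransport() works on bit masks: readyOps() is tested against the OP_CONNECT, OP_READ and OP_WRITE flags of SelectionKey. The bodies of enableWrite() and disableWrite() are not shown in this article. The sketch below assumes they simply switch the OP_WRITE bit of the key's interest set on and off, and expresses that as pure functions on the int mask. InterestOps and its method names are hypothetical.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helpers working on the int bit masks used by SelectionKey.
final class InterestOps {
    // Assumed effect of enableWrite(): add OP_WRITE to the interest set.
    static int enableWrite(int interestOps) {
        return interestOps | SelectionKey.OP_WRITE;
    }

    // Assumed effect of disableWrite(): remove OP_WRITE, keep everything else.
    static int disableWrite(int interestOps) {
        return interestOps & ~SelectionKey.OP_WRITE;
    }

    // Same test as in doTransport(): is the key ready for reading or writing?
    static boolean readyForIo(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0;
    }

    // Same test as in doTransport(): did the pending connect complete?
    static boolean readyToFinishConnect(int readyOps) {
        return (readyOps & SelectionKey.OP_CONNECT) != 0;
    }
}
```

Write interest is typically kept on only while there is something to send, because a socket with free buffer space is almost always writable and would otherwise wake the selector continuously.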
④ Once doTransport() has called enableWrite() and the channel becomes writable, the requests in the send queue can be sent to the server. SendThread executes the doIO() method in ClientCnxnSocketNIO. Its write branch is as follows:
    // If the channel is writable
    if (sockKey.isWritable()) {
        // Fetch the next sendable request packet from the send queue
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
        // If there is a packet to send
        if (p != null) {
            // Update the last send time
            updateLastSend();
            // Before sending, the packet must be serialized into its byte buffer
            if (p.bb == null) {
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                // Serialize
                p.createBB();
            }
            // Write data
            sock.write(p.bb);
            // No remaining data means the packet has been written completely
            if (!p.bb.hasRemaining()) {
                // Increment the count of sent packets
                sentCount.getAndIncrement();
                // Remove the request packet from the send queue
                outgoingQueue.removeFirstOccurrence(p);
                // A business request is added to the pending queue so that the server's
                // response can be matched to it. Other packets are dropped after sending
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        // If the send queue is empty, turn off write interest
        if (outgoingQueue.isEmpty()) {
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // The session is not initialized yet and the connect request has been written
            // completely: stop writing until the connect response has been received
            disableWrite();
        } else {
            enableWrite();
        }
    }
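The write branch depends on two things: the packet is serialized once into a ByteBuffer, and it leaves the send queue only when that buffer has no bytes remaining, because a non-blocking write may accept only part of it. The sketch below shows this with a 4-byte length prefix in front of the payload and a channel that deliberately accepts a few bytes per call. WritePathDemo, Pkt and ThrottledChannel are hypothetical, and the framing is a simplification rather than the exact wire format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Deque;

// Hypothetical miniature of the write branch of doIO().
final class WritePathDemo {
    // Stand-in for Packet: a flag for "has a request header" plus a lazily built buffer.
    static final class Pkt {
        final boolean hasHeader;
        final byte[] payload;
        ByteBuffer bb; // stays null until the packet is serialized

        Pkt(boolean hasHeader, String payload) {
            this.hasHeader = hasHeader;
            this.payload = payload.getBytes(StandardCharsets.UTF_8);
        }

        // Serializes the packet as a 4-byte length followed by the payload.
        void createBB() {
            bb = ByteBuffer.allocate(4 + payload.length);
            bb.putInt(payload.length);
            bb.put(payload);
            bb.flip();
        }
    }

    // Accepts at most maxPerWrite bytes per call, like a nearly full socket buffer.
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerWrite;

        ThrottledChannel(int maxPerWrite) {
            this.maxPerWrite = maxPerWrite;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(maxPerWrite, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // One pass of the write branch; returns true if the head packet was sent completely.
    static boolean writeOnce(ThrottledChannel ch, Deque<Pkt> outgoing, Deque<Pkt> pending) {
        Pkt p = outgoing.peekFirst();
        if (p == null) {
            return false;
        }
        if (p.bb == null) {
            p.createBB();
        }
        ch.write(p.bb);
        if (p.bb.hasRemaining()) {
            return false; // partial write: keep the packet at the head of the queue
        }
        outgoing.removeFirstOccurrence(p);
        if (p.hasHeader) {
            pending.add(p); // a reply is expected and will be matched later
        }
        return true;
    }
}
```

With an 8-byte limit per write, an 11-byte frame needs two passes. A headerless packet such as the connect request is simply dropped from the queue after the second pass, while a packet with a header moves to the pending queue.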
⑤ The first request packet is the ConnectRequest. The Packet built for it has no request header, so it is not added to the pending queue and is simply dropped from the send queue once it has been sent. SendThread still has to wait for the server's reply to confirm the connection and initialize the session. How the server responds to the request is covered in the next chapter.
Summary
To summarize the network communication process above: the client creates a TCP connection through ClientCnxn and sends a ConnectRequest packet to the server.