Source code analysis of Tomcat's IO entry and exit point: AbstractEndpoint

AbstractEndpoint is the base class for processing socket IO. As the name suggests, it handles the data at one end of a TCP connection, i.e. an endpoint of the communication between two machines.

AbstractEndpoint has different implementations for different IO models:

AbstractEndpoint source code

public abstract class AbstractEndpoint<S> {
    // -------------------------------------------------------------- Constants
    // Message lookup for the "org.apache.tomcat.util.net.res" string bundle
    protected static final StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res");

    /**
     * Base contract for the handlers that endpoints hand sockets to.
     * The endpoint-specific Handler interfaces extend this one.
     */
    public static interface Handler {
        // States a handler can report for a socket after processing it
        public enum SocketState {
            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING_TOMCAT,
            UPGRADING, UPGRADED
        }
        // Global request processor
        public Object getGlobal();
        // Recycle the handler (presumably releases cached state for reuse - confirm against implementations)
        public void recycle();
    }
    // Records whether the server socket is bound and, if so, whether init() or
    // start() performed the bind, so that stop()/destroy() unbind symmetrically
    protected enum BindState {
        UNBOUND, BOUND_ON_INIT, BOUND_ON_START
    }
    /**
     * Base class for socket acceptors: a Runnable dedicated to accepting new
     * connections on the server socket. Concrete endpoints supply the
     * {@link #run()} implementation.
     */
    public abstract static class Acceptor implements Runnable {
        /** Life-cycle states of an acceptor thread. */
        public enum AcceptorState {
            NEW, RUNNING, PAUSED, ENDED
        }

        // volatile: written by the acceptor thread, polled by unlockAccept()
        protected volatile AcceptorState state = AcceptorState.NEW;
        private String threadName;

        /**
         * @return the current state of this acceptor
         */
        public final AcceptorState getState() {
            return state;
        }

        /**
         * Records the name of the thread that runs this acceptor.
         *
         * @param threadName the thread name assigned by the endpoint
         */
        protected final void setThreadName(final String threadName) {
            this.threadName = threadName;
        }

        /**
         * @return the name of the thread that runs this acceptor, or
         *         {@code null} if none has been set
         */
        protected final String getThreadName() {
            return threadName;
        }
    }
    // Bounds for the back-off delay applied after accept errors
    // (presumably milliseconds, used by handleExceptionWithDelay - confirm)
    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;
    // ----------------------------------------------------------------- Fields
    /**
     * Running state of the endpoint.
     */
    protected volatile boolean running = false;
    // Set by pause() and cleared by resume()
    protected volatile boolean paused = false;
    // True when the executor was created by this endpoint (see createExecutor())
    // and therefore must be shut down by it (see shutdownExecutor())
    protected volatile boolean internalExecutor = true;
    // Latch that enforces maxConnections; null when no limit is being enforced
    private volatile LimitLatch connectionLimitLatch = null;

    /**
     * Socket properties
     */
    protected SocketProperties socketProperties = new SocketProperties();
    // Acceptor threads, created by startAcceptorThreads()
    protected Acceptor[] acceptors;
    // ----------------------------------------------------------------- Properties
    // Maximum time, in milliseconds, to wait for the internal thread pool to
    // terminate when the executor is shut down (5 seconds by default)
    private long executorTerminationTimeoutMillis = 5000;
    // Number of acceptor threads, i.e. the size of the acceptors array
    protected int acceptorThreadCount = 0;
    // Maximum number of connections; -1 disables the limit
    private int maxConnections = 10000;
    // Thread pool used to process sockets
    private Executor executor = null;
    // Configured listening port of the server socket
    private int port;
    // Returns the port the server socket is actually bound to
    public abstract int getLocalPort();
    // Address to bind to; null is treated as "no specific address"
    private InetAddress address;

    /**
     * Allows the server developer to specify the acceptCount (backlog) that
     * should be used for server sockets. By default, this value
     * is 100.
     */
    private int backlog = 100;

    // Whether init() binds the server socket (otherwise start() does)
    private boolean bindOnInit = true;
    private BindState bindState = BindState.UNBOUND;
    // Keep-alive timeout; when null, getKeepAliveTimeout() falls back to getSoTimeout()
    private Integer keepAliveTimeout = null;
    // Core (minimum spare) thread count of the internal thread pool
    private int minSpareThreads = 10;
    // Maximum number of threads in the internal thread pool
    private int maxThreads = 200;
    // Maximum number of HTTP requests served over one keep-alive connection
    private int maxKeepAliveRequests=100; // as in Apache HTTPD server
    // Maximum number of headers allowed per request
    // NOTE(review): which value means "unlimited" is not visible here - confirm
    private int maxHeaderCount = 100; // as in Apache HTTPD server
    /**
     * Name of the thread pool, which will be used for naming child threads.
     */
    private String name = "TP";
    // Whether the threads of the internal thread pool are daemon threads
    private boolean daemon = true;


    /**
     * Updates the maximum number of connections and keeps the connection
     * latch consistent with the new value.
     *
     * @param maxCon the new connection limit; -1 releases an existing latch
     */
    public void setMaxConnections(int maxCon) {
        this.maxConnections = maxCon;
        final LimitLatch currentLatch = this.connectionLimitLatch;
        if (currentLatch == null) {
            // No latch yet: create one only when a positive limit is requested
            if (maxCon > 0) {
                initializeConnectionLatch();
            }
            return;
        }
        if (maxCon == -1) {
            // The limit was removed, so the enforcing latch is no longer needed
            releaseConnectionLatch();
            return;
        }
        // Apply the new connection limit to the live latch
        currentLatch.setLimit(maxCon);
    }

    /**
     * Returns the keep-alive timeout, falling back to the socket timeout
     * when no explicit keep-alive timeout has been configured.
     *
     * @return the configured keep-alive timeout, or {@code getSoTimeout()}
     *         when none is set
     */
    public int getKeepAliveTimeout() {
        return (keepAliveTimeout == null) ? getSoTimeout() : keepAliveTimeout.intValue();
    }

    /**
     * Stores the new maximum thread count and, when the endpoint owns a
     * {@code java.util.concurrent.ThreadPoolExecutor}, resizes it on the fly.
     *
     * @param maxThreads the new maximum number of worker threads
     */
    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
        final Executor current = this.executor;
        if (!internalExecutor) {
            // An externally supplied executor is left untouched
            return;
        }
        if (!(current instanceof java.util.concurrent.ThreadPoolExecutor)) {
            return;
        }
        final java.util.concurrent.ThreadPoolExecutor pool =
                (java.util.concurrent.ThreadPoolExecutor) current;
        pool.setMaximumPoolSize(maxThreads);
    }

    // Whether the endpoint uses deferred accept; when true, unlockAccept()
    // must send data on its wake-up connection to unblock the acceptor
    protected abstract boolean getDeferAccept();
    // Generic attribute storage, keyed by attribute name
    protected HashMap<String, Object> attributes = new HashMap<String, Object>();
    /**
     * Reports the current size of the worker pool.
     *
     * @return the pool size; -1 if the executor type does not expose it;
     *         -2 if no executor is set
     */
    public int getCurrentThreadCount() {
        final Executor current = this.executor;
        if (current == null) {
            return -2;
        }
        if (current instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) current).getPoolSize();
        }
        if (current instanceof ResizableExecutor) {
            return ((ResizableExecutor) current).getPoolSize();
        }
        return -1;
    }
    /**
     * Reports the number of worker threads that are currently busy.
     *
     * @return the active thread count; -1 if the executor type does not
     *         expose it; -2 if no executor is set
     */
    public int getCurrentThreadsBusy() {
        final Executor current = this.executor;
        if (current == null) {
            return -2;
        }
        if (current instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor) current).getActiveCount();
        }
        if (current instanceof ResizableExecutor) {
            return ((ResizableExecutor) current).getActiveCount();
        }
        return -1;
    }
    /**
     * Creates the endpoint's own worker thread pool and marks it as internal
     * so that {@code shutdownExecutor()} will later shut it down.
     */
    public void createExecutor() {
        internalExecutor = true;
        // Queue that holds tasks waiting for a worker thread
        final TaskQueue workQueue = new TaskQueue();
        // Worker threads are named "<endpoint name>-exec-N" and use the
        // configured daemon flag and thread priority
        final TaskThreadFactory threadFactory =
                new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, workQueue, threadFactory);
        executor = pool;
        // The queue needs a reference back to the pool it feeds
        workQueue.setParent(pool);
    }
    /**
     * Shuts down the worker thread pool, but only when it was created by
     * this endpoint; an externally supplied executor is left alone.
     */
    public void shutdownExecutor() {
        final Executor current = this.executor;
        if (current == null || !internalExecutor) {
            return;
        }
        this.executor = null;
        if (!(current instanceof ThreadPoolExecutor)) {
            return;
        }
        // This is our internal pool, so we are responsible for stopping it
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) current;
        pool.shutdownNow();
        final long waitMillis = getExecutorTerminationTimeoutMillis();
        if (waitMillis > 0) {
            try {
                // Give running tasks a bounded amount of time to finish
                pool.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        // Break the queue's back-reference to the pool
        final TaskQueue workQueue = (TaskQueue) pool.getQueue();
        workQueue.setParent(null);
    }

    /**
     * Unlock the server socket accept using a bogus connection.
     * <p>
     * Acceptor threads block inside accept(); opening a throw-away connection
     * to the listening port wakes them up so they can notice a state change.
     * This is best effort: a failure is logged at debug level and otherwise
     * ignored.
     */
    protected void unlockAccept() {
        // Only try to unlock the acceptor if it is necessary
        boolean unlockRequired = false;
        for (Acceptor acceptor : acceptors) {
            if (acceptor.getState() == AcceptorState.RUNNING) {
                unlockRequired = true;
                break;
            }
        }
        if (!unlockRequired) {
            return;
        }

        java.net.Socket s = null;
        InetSocketAddress saddr = null;
        try {
            // Need to create a connection to unlock the accept();
            if (address == null) {
                saddr = new InetSocketAddress("localhost", getLocalPort());
            } else if (address.isAnyLocalAddress()) {
                // A wildcard address cannot be connected to directly
                saddr = new InetSocketAddress(getUnlockAddress(address), getLocalPort());
            } else {
                saddr = new InetSocketAddress(address, getLocalPort());
            }
            s = new java.net.Socket();
            // Use at least 2 seconds for both timeouts, more if configured
            int stmo = 2 * 1000;
            int utmo = 2 * 1000;
            if (getSocketProperties().getSoTimeout() > stmo) {
                stmo = getSocketProperties().getSoTimeout();
            }
            if (getSocketProperties().getUnlockTimeout() > utmo) {
                utmo = getSocketProperties().getUnlockTimeout();
            }
            s.setSoTimeout(stmo);
            // TODO Consider hard-coding to s.setSoLinger(true,0)
            s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime());
            s.connect(saddr,utmo);
            if (getDeferAccept()) {
                /*
                 * In the case of a deferred accept / accept filters we need to
                 * send data to wake up the accept. Send OPTIONS * to bypass
                 * even BSD accept filters. The Acceptor will discard it.
                 */
                OutputStreamWriter sw;

                sw = new OutputStreamWriter(s.getOutputStream(), "ISO-8859-1");
                sw.write("OPTIONS * HTTP/1.0\r\n" +
                         "User-Agent: Tomcat wakeup connection\r\n\r\n");
                sw.flush();
            }
            // Wait for up to 1000ms for the acceptor threads to unlock; the
            // budget is shared across all acceptors
            long waitLeft = 1000;
            for (Acceptor acceptor : acceptors) {
                while (waitLeft > 0 &&
                       acceptor.getState() == AcceptorState.RUNNING) {
                    Thread.sleep(5);
                    waitLeft -= 5;
                }
            }
        } catch (Exception e) {
            // Best effort only, but leave a trace instead of swallowing silently
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("endpoint.debug.unlock", "" + getPort()), e);
            }
        } finally {
            if (s != null) {
                try {
                    s.close();
                } catch (Exception e) {
                    // Ignore
                }
            }
        }
    }

    // ---------------------------------------------- Request processing methods

    // Process the given socket with the given status
    // (asynchronously, judging by the name - confirm against implementations)
    public abstract void processSocketAsync(SocketWrapper<S> socketWrapper,
                                            SocketStatus socketStatus);

    // Remove the given socket from the endpoint's waiting requests
    // (inferred from the name - confirm against implementations)
    public abstract void removeWaitingRequest(SocketWrapper<S> socketWrapper);


    // ------------------------------------------------------- Lifecycle methods
    // Called by init() or start() to bind, and by stop() or destroy() to unbind
    public abstract void bind() throws Exception;
    public abstract void unbind() throws Exception;
    // Endpoint-specific start/stop logic, invoked by start() and stop()
    public abstract void startInternal() throws Exception;
    public abstract void stopInternal() throws Exception;

    /**
     * Initializes the endpoint: validates the cipher-order setting and, when
     * bindOnInit is set, binds the server socket right away.
     *
     * @throws Exception if the validation or the bind fails
     */
    public final void init() throws Exception {
        testServerCipherSuitesOrderSupport();
        if (!bindOnInit) {
            // Binding is deferred to start()
            return;
        }
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }

    /**
     * Fails fast when server cipher suite ordering was requested but the
     * running JRE cannot honor it.
     *
     * @throws UnsupportedOperationException if the feature was explicitly
     *         requested and Java 8 support is not available
     */
    private void testServerCipherSuitesOrderSupport() {
        // Only test this feature if the user explicitly requested its use.
        final String requestedOrder = getUseServerCipherSuitesOrder().trim();
        final boolean explicitlyRequested = !"".equals(requestedOrder);
        if (explicitlyRequested && !JreCompat.isJre8Available()) {
            throw new UnsupportedOperationException(
                sm.getString("endpoint.jsse.cannotHonorServerCipherOrder"));
        }
    }

    /**
     * Starts the endpoint, binding the server socket first if that has not
     * happened yet.
     *
     * @throws Exception if the bind or the endpoint-specific start fails
     */
    public final void start() throws Exception {
        final boolean needsBind = (bindState == BindState.UNBOUND);
        if (needsBind) {
            bind();
            // Remember that start() bound the socket so that stop() unbinds it
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

    /**
     * Creates and starts the acceptor threads, one per configured acceptor.
     * Each thread is named "<endpoint name>-Acceptor-<index>".
     */
    protected final void startAcceptorThreads() {
        final int acceptorCount = getAcceptorThreadCount();
        acceptors = new Acceptor[acceptorCount];

        int index = 0;
        while (index < acceptorCount) {
            final Acceptor acceptor = createAcceptor();
            acceptors[index] = acceptor;
            final String threadName = getName() + "-Acceptor-" + index;
            acceptor.setThreadName(threadName);

            final Thread acceptorThread = new Thread(acceptor, threadName);
            acceptorThread.setPriority(getAcceptorThreadPriority());
            // Acceptor threads follow the endpoint's daemon setting
            acceptorThread.setDaemon(getDaemon());
            acceptorThread.start();
            index++;
        }
    }
    // Factory hook: each endpoint implementation supplies its own concrete
    // Acceptor; called once per acceptor thread by startAcceptorThreads()
    protected abstract Acceptor createAcceptor();

    /**
     * Pauses a running endpoint and wakes the acceptor threads so that they
     * notice the paused flag. Does nothing if the endpoint is not running or
     * is already paused.
     */
    public void pause() {
        if (!running || paused) {
            return;
        }
        paused = true;
        unlockAccept();
    }
    /**
     * Clears the paused flag of a running endpoint. Does nothing if the
     * endpoint is not running.
     */
    public void resume() {
        if (!running) {
            return;
        }
        paused = false;
    }

    /**
     * Stops the endpoint and unbinds the server socket if it was bound by
     * {@code start()}.
     *
     * @throws Exception if the endpoint-specific stop or the unbind fails
     */
    public final void stop() throws Exception {
        stopInternal();
        if (bindState != BindState.BOUND_ON_START) {
            // A socket bound during init() stays bound until destroy()
            return;
        }
        unbind();
        bindState = BindState.UNBOUND;
    }

    /**
     * Destroys the endpoint, unbinding the server socket if it was bound by
     * {@code init()}.
     *
     * @throws Exception if the unbind fails
     */
    public final void destroy() throws Exception {
        if (bindState != BindState.BOUND_ON_INIT) {
            return;
        }
        unbind();
        bindState = BindState.UNBOUND;
    }

    // Logger supplied by the concrete endpoint implementation
    protected abstract Log getLog();
    // Flags to indicate optional feature support
    // Some of these are always hard-coded, some are hard-coded to false (i.e.
    // the endpoint does not support them) and some are configurable.
    // Each concrete endpoint reports its own value for every flag.
    public abstract boolean getUseSendfile();
    public abstract boolean getUseComet();
    public abstract boolean getUseCometTimeout();
    public abstract boolean getUsePolling();

    // ------------------------------------------------ Connection limit (LimitLatch)
    /**
     * Lazily creates the latch that enforces the connection limit.
     *
     * @return the connection latch, or {@code null} when maxConnections is -1
     *         (no limit)
     */
    protected LimitLatch initializeConnectionLatch() {
        final boolean unlimited = (maxConnections == -1);
        if (unlimited) {
            return null;
        }
        if (connectionLimitLatch != null) {
            return connectionLimitLatch;
        }
        // First use: create the latch with the configured maximum
        connectionLimitLatch = new LimitLatch(getMaxConnections());
        return connectionLimitLatch;
    }
    /**
     * Registers one more connection with the connection latch via
     * {@code countUpOrAwait()}. Does nothing when the limit is disabled or no
     * latch exists.
     *
     * @throws InterruptedException propagated from the latch
     */
    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections == -1) {
            return;
        }
        final LimitLatch current = connectionLimitLatch;
        if (current == null) {
            return;
        }
        current.countUpOrAwait();
    }
    /**
     * Releases one connection from the connection latch.
     *
     * @return the value returned by the latch's {@code countDown()}, or -1
     *         when the limit is disabled or no latch exists
     */
    protected long countDownConnection() {
        if (maxConnections == -1) {
            return -1;
        }
        final LimitLatch current = connectionLimitLatch;
        return (current == null) ? -1 : current.countDown();
    }
}

It is easy to see from the source code that AbstractEndpoint mainly defines the attributes and behaviors shared by all of the IO models:

  1. Related properties of ServerSocket (binding port, ip, etc.)
  2. Number of receive threads processing the accept() method
  3. The thread pool for processing sockets, together with its related attributes (number of threads, whether they are daemon threads), is defined internally
  4. The life cycle methods init(), start(), stop() and destroy() are defined, while the concrete work in bind(), unbind(), startInternal() and stopInternal() is left to the specific subclass
  5. The abstract receiver Acceptor is defined; each subclass implements a concrete Acceptor that determines how new socket connections are received

JIoEndpoint implementation

JIoEndpoint is the concrete implementation of AbstractEndpoint for traditional synchronous blocking IO (BIO).

Looking at the class structure of JIoEndpoint, it mainly implements the abstract methods of AbstractEndpoint, with a ServerSocket acting as the socket server. It also defines the concrete receiver (Acceptor) and the SocketProcessor that decides how a received socket is processed.

Let's take a look at some important methods of JioEndPoint itself

public class JIoEndpoint extends AbstractEndpoint<Socket> {

    // Listening socket; null until bind() has created it
    protected ServerSocket serverSocket = null;
    // ------------------------------------------------------------- Properties
    // Protocol handler that processes accepted sockets
    protected Handler handler = null;
    public void setHandler(Handler handler ) { this.handler = handler; }
    public Handler getHandler() { return handler; }

    // Factory used to create the server socket and accept connections;
    // chosen in bind() when not already set
    protected ServerSocketFactory serverSocketFactory = null;
    // Sockets parked by SocketProcessor when the handler reports SocketState.LONG
    protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests = new ConcurrentLinkedQueue<SocketWrapper<Socket>>();
    /**
     * @return the port the server socket is actually bound to, or -1 when
     *         there is no server socket
     */
    @Override
    public int getLocalPort() {
        // Read the field once; it may be changed concurrently
        final ServerSocket bound = serverSocket;
        return (bound == null) ? -1 : bound.getLocalPort();
    }

    // Optional feature flags: the blocking IO endpoint supports none of them,
    // so every flag is hard-coded to false
    @Override
    public boolean getUseSendfile() { return false; } // Not supported
    @Override
    public boolean getUseComet() { return false; } // Not supported
    @Override
    public boolean getUseCometTimeout() { return false; } // Not supported
    @Override
    public boolean getDeferAccept() { return false; } // Not supported
    @Override
    public boolean getUsePolling() { return false; } // Not supported


    // ------------------------------------------------ Handler Inner Interface
    /**
     * Handler contract for the blocking IO endpoint; extends the base
     * AbstractEndpoint.Handler with socket-processing callbacks.
     */
    public interface Handler extends AbstractEndpoint.Handler {
        // Process the wrapped socket for the given status and report the resulting state
        public SocketState process(SocketWrapper<Socket> socket,
                                   SocketStatus status);
        // SSL implementation used by bind() to obtain the server socket factory when SSL is enabled
        public SSLImplementation getSslImplementation();
        // NOTE(review): exact semantics of "available" are not visible here - confirm against implementations
        public boolean isAvailable(SocketWrapper<Socket> socket);
    }
    // -------------------- Public methods --------------------
    /**
     * Binds the listening port: picks the acceptor thread count default,
     * selects the server socket factory (SSL or plain) and creates the
     * server socket if it does not exist yet.
     *
     * @throws BindException if the port cannot be bound; the message is
     *         extended with the address and port, and the original exception
     *         is kept as the cause
     * @throws Exception for any other failure while creating the socket
     */
    @Override
    public void bind() throws Exception {

        // Initialize thread count defaults for acceptor
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        if (serverSocketFactory == null) {
            if (isSSLEnabled()) {
                serverSocketFactory = handler.getSslImplementation().getServerSocketFactory(this);
            } else {
                serverSocketFactory = new DefaultServerSocketFactory(this);
            }
        }

        if (serverSocket == null) {
            try {
                if (getAddress() == null) {
                    serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog());
                } else {
                    // Create the ServerSocket on the configured address and start listening
                    serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog(), getAddress());
                }
            } catch (BindException orig) {
                // Re-throw with the address and port included so the failing
                // connector can be identified from the message alone
                String msg;
                if (getAddress() == null) {
                    msg = orig.getMessage() + " <null>:" + getPort();
                } else {
                    msg = orig.getMessage() + " " + getAddress().toString() + ":" + getPort();
                }
                BindException be = new BindException(msg);
                be.initCause(orig);
                throw be;
            }
        }
    }
    /**
     * Starts the endpoint: ensures a worker pool exists, initializes the
     * connection limit, and launches the acceptor and async-timeout threads.
     * Does nothing if the endpoint is already running.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void startInternal() throws Exception {
        if (running) {
            return;
        }
        running = true;
        paused = false;

        // Create the worker pool unless an executor is already available
        if (getExecutor() == null) {
            createExecutor();
        }

        initializeConnectionLatch();

        // Launch the threads that block in accept()
        startAcceptorThreads();

        // Start async timeout thread
        final String timeoutThreadName = getName() + "-AsyncTimeout";
        final Thread timeoutThread = new Thread(new AsyncTimeout(), timeoutThreadName);
        timeoutThread.setPriority(threadPriority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }

    // Supplies the blocking IO Acceptor used by startAcceptorThreads()
    @Override
    protected AbstractEndpoint.Acceptor createAcceptor() {
        return new Acceptor();
    }

    /**
     * Wraps a freshly accepted socket and hands it to the worker pool.
     *
     * @param socket the accepted socket
     * @return {@code true} if the socket was submitted for processing;
     *         {@code false} if the endpoint is not running, the executor
     *         rejected the task, or any other error occurred
     */
    protected boolean processSocket(Socket socket) {
        boolean submitted;
        try {
            final SocketWrapper<Socket> wrapped = new SocketWrapper<Socket>(socket);
            wrapped.setKeepAliveLeft(getMaxKeepAliveRequests());
            wrapped.setSecure(isSSLEnabled());
            // During shutdown, executor may be null - avoid NPE
            if (running) {
                // Once wrapped, the socket is processed on a pool thread
                getExecutor().execute(new SocketProcessor(wrapped));
                submitted = true;
            } else {
                submitted = false;
            }
        } catch (RejectedExecutionException rejected) {
            submitted = false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            submitted = false;
        }
        return submitted;
    }
}

Implementation of BIO receiver Acceptor

protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Cyclic processing
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;// Non operating state, exit
            }
            
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait, check whether the maximum connections are reached
                countUpOrAwaitConnection();

                Socket socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // Receive new socket
                    socket = serverSocketFactory.acceptSocket(serverSocket);
                } catch (IOException ioe) {
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused && setSocketOptions(socket)) {
                    // Handling socket s
                    if (!processSocket(socket)) {
                        countDownConnection();
                        // Close socket right away
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    // Close socket right away
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

The Acceptor mainly calls ServerSocket#accept() in a loop, taking the running state of the container and the current number of connections into account, and hands each received socket over to the thread pool for processing.

How the thread pool threads handle a socket is defined by SocketProcessor.

How does the SocketProcessor handle sockets

/**
 * Task run on a worker thread to process one socket: it passes the socket
 * to the handler and then closes, re-submits or parks the socket depending
 * on the state the handler returns.
 */
protected class SocketProcessor implements Runnable {

    protected SocketWrapper<Socket> socket = null;
    protected SocketStatus status = null;

    /**
     * Creates a processor with no explicit status; run() then processes the
     * socket with SocketStatus.OPEN_READ.
     *
     * @param socket the wrapped socket to process; must not be null
     */
    public SocketProcessor(SocketWrapper<Socket> socket) {
        if (socket == null) {
            throw new NullPointerException();
        }
        this.socket = socket;
    }

    /**
     * Creates a processor for the given socket and status.
     *
     * @param socket the wrapped socket to process; must not be null
     * @param status the status to pass to the handler
     */
    public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
        this(socket);
        this.status = status;
    }

    @Override
    public void run() {
        boolean launch = false;
        synchronized (socket) {
            try {
                SocketState state = SocketState.OPEN;
                // ...  Omit some ssl related codes
                // Hand over the socket to the protocol handler for processing
                if ((state != SocketState.CLOSED)) {
                    if (status == null) {
                        state = handler.process(socket, SocketStatus.OPEN_READ);
                    } else {
                        state = handler.process(socket,status);
                    }
                }

                if (state == SocketState.CLOSED) {
                    // Connection finished: release its slot and close the socket
                    countDownConnection();
                    try {
                        socket.getSocket().close();
                    } catch (IOException e) {
                        // Ignore
                    }
                } else if (state == SocketState.OPEN ||
                           state == SocketState.UPGRADING ||
                           state == SocketState.UPGRADING_TOMCAT  ||
                           state == SocketState.UPGRADED){
                    // Keep-alive connection: schedule another processing round
                    socket.setKeptAlive(true);
                    socket.access();
                    launch = true;
                } else if (state == SocketState.LONG) {
                    // Park the socket in the waiting queue
                    socket.access();
                    waitingRequests.add(socket);
                }
            } finally {
                if (launch) {
                    try {
                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                    } catch (RejectedExecutionException x) {
                        try {
                            //unable to handle connection at this time
                            handler.process(socket, SocketStatus.DISCONNECT);
                        } finally {
                            countDownConnection();
                        }
                    }
                }
            }
        }
        socket = null;
        // Finish up this request
    }
}

JioEndPoint.Handler

The concrete Handler is an interface defined inside JIoEndpoint:

/**
 * Handler contract of the blocking IO endpoint; extends the base
 * AbstractEndpoint.Handler with socket-processing callbacks.
 */
public interface Handler extends AbstractEndpoint.Handler {
    // Process the wrapped socket for the given status and report the resulting state
    public SocketState process(SocketWrapper<Socket> socket,  SocketStatus status);
    // SSL implementation associated with this handler
    public SSLImplementation getSslImplementation();
    // NOTE(review): exact semantics of "available" are not visible here - confirm against implementations
    public boolean isAvailable(SocketWrapper<Socket> socket);
}

It extends AbstractEndpoint.Handler, the base handler interface, and has a different implementation for each protocol; each implementation is dedicated to handling that protocol over BIO.

At this point, the overall structure of Tomcat's Endpoint is roughly clear. First, there is a specific Endpoint implementation for each IO model. Then, for a given IO model, there are different protocol implementations (the XXX Protocol Handlers) that process the received sockets.

Keywords: Java Tomcat server

Added by z0mb1 on Tue, 28 Dec 2021 11:05:09 +0200