AbstractEndpoint is used as the basis for processing Socket IO. From the naming point of view, it means to process the data at both ends of the TCP connection, which is also the endpoint of communication between two machines.
AbstractEndpoint has different implementations for different IO models:
AbstractEndpoint source code
public abstract class AbstractEndpoint<S> { // -------------------------------------------------------------- Constants protected static final StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); public static interface Handler { // Defined Socket status public enum SocketState { OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING_TOMCAT, UPGRADING, UPGRADED } // Global request processor public Object getGlobal(); // reuse public void recycle(); } // protected enum BindState { UNBOUND, BOUND_ON_INIT, BOUND_ON_START } // Socket receiver, which is specially used to execute ServerSocket Accept() method public abstract static class Acceptor implements Runnable { public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED } protected volatile AcceptorState state = AcceptorState.NEW; private String threadName; } private static final int INITIAL_ERROR_DELAY = 50; private static final int MAX_ERROR_DELAY = 1600; // ----------------------------------------------------------------- Fields /** * Running state of the endpoint. */ protected volatile boolean running = false; protected volatile boolean paused = false; // Use internal thread pool protected volatile boolean internalExecutor = true; // Connection counter private volatile LimitLatch connectionLimitLatch = null; /** * Socket properties */ protected SocketProperties socketProperties = new SocketProperties(); // Receiver array (tomcat recommends no more than two, and tomcat defaults to one) protected Acceptor[] acceptors; // ----------------------------------------------------------------- Properties // When the endpoint stop s, the maximum time to wait for the execution of the internal thread pool is 5 seconds by default private long executorTerminationTimeoutMillis = 5000; // Acceptor[] acceptors receiver array size protected int acceptorThreadCount = 0; // maximum connection private int maxConnections = 10000; // Thread pool for internal processing socket private Executor executor = null; // tomcat listening port, ServerSocket port private int port; // Get the actual binding port of ServerSocket public abstract int getLocalPort(); private InetAddress address; /** * Allows the server developer to specify the acceptCount (backlog) that * should be used for server sockets. By default, this value * is 100. */ private int backlog = 100; // Whether to bind ServerSocket in init lifecycle method private boolean bindOnInit = true; private BindState bindState = BindState.UNBOUND; // Blocking time private Integer keepAliveTimeout = null; // Number of internal thread pool core threads private int minSpareThreads = 10; // Maximum number of threads in the internal thread pool private int maxThreads = 200; // Maximum number of http requests per long connection private int maxKeepAliveRequests=100; // as in Apache HTTPD server // The maximum number of headers per request is supported. 0 means unlimited private int maxHeaderCount = 100; // as in Apache HTTPD server /** * Name of the thread pool, which will be used for naming child threads. */ private String name = "TP"; // Is the thread of the connection pool a daemon thread private boolean daemon = true; public void setMaxConnections(int maxCon) { this.maxConnections = maxCon; LimitLatch latch = this.connectionLimitLatch; if (latch != null) { // Update the latch that enforces this if (maxCon == -1) { releaseConnectionLatch(); } else { // Maximum number of threads latch.setLimit(maxCon); } } else if (maxCon > 0) { initializeConnectionLatch(); } } public int getKeepAliveTimeout() { if (keepAliveTimeout == null) { return getSoTimeout(); } else { return keepAliveTimeout.intValue(); } } public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; Executor executor = this.executor; if (internalExecutor && executor instanceof java.util.concurrent.ThreadPoolExecutor) { ((java.util.concurrent.ThreadPoolExecutor) executor).setMaximumPoolSize(maxThreads); } } protected abstract boolean getDeferAccept(); // Property save collection protected HashMap<String, Object> attributes = new HashMap<String, Object>(); public int getCurrentThreadCount() { Executor executor = this.executor; if (executor != null) { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getPoolSize(); } else if (executor instanceof ResizableExecutor) { return ((ResizableExecutor) executor).getPoolSize(); } else { return -1; } } else { return -2; } } // Gets the number of currently busy threads public int getCurrentThreadsBusy() { Executor executor = this.executor; if (executor != null) { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getActiveCount(); } else if (executor instanceof ResizableExecutor) { return ((ResizableExecutor) executor).getActiveCount(); } else { return -1; } } else { return -2; } } // Initialize internal thread pool public void createExecutor() { internalExecutor = true; // Task queue used TaskQueue taskqueue = new TaskQueue(); // Daemon - is it a daemon thread TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); } // Stop internal thread pool public void shutdownExecutor() { Executor executor = this.executor; if (executor != null && internalExecutor) { this.executor = null; if (executor instanceof ThreadPoolExecutor) { //this is our internal one, so we need to shut it down ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; tpe.shutdownNow(); long timeout = getExecutorTerminationTimeoutMillis(); if (timeout > 0) { try { // Wait to stop tpe.awaitTermination(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // Ignore } } TaskQueue queue = (TaskQueue) tpe.getQueue(); queue.setParent(null); } } } /** * Unlock the server socket accept using a bogus connection. */ // Wake up the thread waiting on the accept() method protected void unlockAccept() { // Only try to unlock the acceptor if it is necessary boolean unlockRequired = false; for (Acceptor acceptor : acceptors) { if (acceptor.getState() == AcceptorState.RUNNING) { unlockRequired = true; break; } } if (!unlockRequired) { return; } java.net.Socket s = null; InetSocketAddress saddr = null; try { // Need to create a connection to unlock the accept(); if (address == null) { saddr = new InetSocketAddress("localhost", getLocalPort()); } else if (address.isAnyLocalAddress()) { saddr = new InetSocketAddress(getUnlockAddress(address), getLocalPort()); } else { saddr = new InetSocketAddress(address, getLocalPort()); } s = new java.net.Socket(); int stmo = 2 * 1000; int utmo = 2 * 1000; if (getSocketProperties().getSoTimeout() > stmo) { stmo = getSocketProperties().getSoTimeout(); } if (getSocketProperties().getUnlockTimeout() > utmo) { utmo = getSocketProperties().getUnlockTimeout(); } s.setSoTimeout(stmo); // TODO Consider hard-coding to s.setSoLinger(true,0) s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime()); s.connect(saddr,utmo); if (getDeferAccept()) { /* * In the case of a deferred accept / accept filters we need to * send data to wake up the accept. Send OPTIONS * to bypass * even BSD accept filters. The Acceptor will discard it. */ OutputStreamWriter sw; sw = new OutputStreamWriter(s.getOutputStream(), "ISO-8859-1"); sw.write("OPTIONS * HTTP/1.0\r\n" + "User-Agent: Tomcat wakeup connection\r\n\r\n"); sw.flush(); } // Wait for upto 1000ms acceptor threads to unlock long waitLeft = 1000; for (Acceptor acceptor : acceptors) { while (waitLeft > 0 && acceptor.getState() == AcceptorState.RUNNING) { Thread.sleep(5); waitLeft -= 5; } } } catch(Exception e) { } finally { if (s != null) { try { s.close(); } catch (Exception e) { // Ignore } } } } // ---------------------------------------------- Request processing methods public abstract void processSocketAsync(SocketWrapper<S> socketWrapper, SocketStatus socketStatus); public abstract void removeWaitingRequest(SocketWrapper<S> socketWrapper); // ------------------------------------------------------- Lifecycle methods public abstract void bind() throws Exception; public abstract void unbind() throws Exception; public abstract void startInternal() throws Exception; public abstract void stopInternal() throws Exception; public final void init() throws Exception { testServerCipherSuitesOrderSupport(); if (bindOnInit) { bind(); bindState = BindState.BOUND_ON_INIT; } } private void testServerCipherSuitesOrderSupport() { // Only test this feature if the user explicitly requested its use. if(!"".equals(getUseServerCipherSuitesOrder().trim())) { if (!JreCompat.isJre8Available()) { throw new UnsupportedOperationException( sm.getString("endpoint.jsse.cannotHonorServerCipherOrder")); } } } public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } startInternal(); } // Open the thread that handles the accept() method. The default is one protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); // Is it a daemon thread t.setDaemon(getDaemon()); t.start(); } } // Implementation classes provide concrete receivers protected abstract Acceptor createAcceptor(); public void pause() { if (running && !paused) { paused = true; unlockAccept(); } } public void resume() { if (running) { paused = false; } } public final void stop() throws Exception { stopInternal(); if (bindState == BindState.BOUND_ON_START) { unbind(); bindState = BindState.UNBOUND; } } public final void destroy() throws Exception { if (bindState == BindState.BOUND_ON_INIT) { unbind(); bindState = BindState.UNBOUND; } } protected abstract Log getLog(); // Flags to indicate optional feature support // Some of these are always hard-coded, some are hard-coded to false (i.e. // the endpoint does not support them) and some are configurable. // Some tag properties public abstract boolean getUseSendfile(); public abstract boolean getUseComet(); public abstract boolean getUseCometTimeout(); public abstract boolean getUsePolling(); // Connection data control method -- LimitLatch is realized through synchronizer AQS------------------------------------------------------------------ protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) { return null; } if (connectionLimitLatch==null) { // Initialize maximum connection limit connectionLimitLatch = new LimitLatch(getMaxConnections()); } return connectionLimitLatch; } // Connection data increase protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) { return; } LimitLatch latch = connectionLimitLatch; if (latch!=null) { latch.countUpOrAwait(); } } // Reduced number of connections protected long countDownConnection() { if (maxConnections==-1) { return -1; } LimitLatch latch = connectionLimitLatch; if (latch!=null) { long result = latch.countDown(); return result; } else { return -1; } } }
It is not difficult to see from its source code that AbstractEndpoint mainly defines public attributes and behaviors under various IO models
- Related properties of ServerSocket (binding port, ip, etc.)
- Number of receive threads processing the accept() method
- The thread pool for processing socket s and the related attributes of the thread pool (number of threads, whether it is a daemon thread) are defined internally
- The life cycle methods init(),start(), etc. are defined, and the specific implementation initInternal(), startInternal() method is handed over to the specific subclass implementation
- The abstract receiver of the Acceptor is defined, and the subclass implements the specific Acceptor to define how to receive the new socket connection
JioEndPoint implementation
Jiendpoint is the concrete implementation of AbstractEndpoint for traditional synchronous blocking IO (BIO),
From the class structure of JioEndPoint, it mainly implements the relevant abstract methods of AbstractEndPoint. ServerSocket is the socket server. At the same time, its implementation defines the specific receiver Acceptor and the SocketProcessor processor of how to process the received socket.
Let's take a look at some important methods of JioEndPoint itself
public class JIoEndpoint extends AbstractEndpoint<Socket> { protected ServerSocket serverSocket = null; // ------------------------------------------------------------- Properties protected Handler handler = null; public void setHandler(Handler handler ) { this.handler = handler; } public Handler getHandler() { return handler; } protected ServerSocketFactory serverSocketFactory = null; protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests = new ConcurrentLinkedQueue<SocketWrapper<Socket>>(); @Override public int getLocalPort() { ServerSocket s = serverSocket; if (s == null) { return -1; } else { return s.getLocalPort(); } } @Override public boolean getUseSendfile() { return false; } // Not supported @Override public boolean getUseComet() { return false; } // Not supported @Override public boolean getUseCometTimeout() { return false; } // Not supported @Override public boolean getDeferAccept() { return false; } // Not supported @Override public boolean getUsePolling() { return false; } // Not supported // ------------------------------------------------ Handler Inner Interface public interface Handler extends AbstractEndpoint.Handler { public SocketState process(SocketWrapper<Socket> socket, SocketStatus status); public SSLImplementation getSslImplementation(); public boolean isAvailable(SocketWrapper<Socket> socket); } // -------------------- Public methods -------------------- // Bind the port and start listening for connections @Override public void bind() throws Exception { // Initialize thread count defaults for acceptor if (acceptorThreadCount == 0) { acceptorThreadCount = 1; } if (serverSocketFactory == null) { if (isSSLEnabled()) { serverSocketFactory = handler.getSslImplementation().getServerSocketFactory(this); } else { serverSocketFactory = new DefaultServerSocketFactory(this); } } if (serverSocket == null) { try { if (getAddress() == null) { serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog()); } else { // Create ServerSocket and start listening serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog(), getAddress()); } } catch (BindException orig) { // .... throw be; } } } @Override public void startInternal() throws Exception { if (!running) { running = true; paused = false; // Create worker collection if (getExecutor() == null) { createExecutor(); } initializeConnectionLatch(); // Turn on the accept() receiver method startAcceptorThreads(); // Start async timeout thread Thread timeoutThread = new Thread(new AsyncTimeout(), getName() + "-AsyncTimeout"); timeoutThread.setPriority(threadPriority); timeoutThread.setDaemon(true); timeoutThread.start(); } } @Override protected AbstractEndpoint.Acceptor createAcceptor() { return new Acceptor(); } // Defines how to handle the received socket protected boolean processSocket(Socket socket) { // Process the request from this socket try { SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); wrapper.setSecure(isSSLEnabled()); // During shutdown, executor may be null - avoid NPE if (!running) { return false; } // After packaging, it is handed over to the thread pool for processing getExecutor().execute(new SocketProcessor(wrapper)); } catch (RejectedExecutionException x) { return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); return false; } return true; } }
Implementation of BIO receiver Acceptor
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Cyclic processing while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break;// Non operating state, exit } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait, check whether the maximum connections are reached countUpOrAwaitConnection(); Socket socket = null; try { // Accept the next incoming connection from the server // Receive new socket socket = serverSocketFactory.acceptSocket(serverSocket); } catch (IOException ioe) { countDownConnection(); // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused && setSocketOptions(socket)) { // Handling socket s if (!processSocket(socket)) { countDownConnection(); // Close socket right away closeSocket(socket); } } else { countDownConnection(); // Close socket right away closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } }
The Acceptor is mainly used to handle the ServerSocket#accept() method according to the running state of the container and the number of connections, and hand over the received socket to a thread pool for processing.
How thread pool threads handle sockets is defined by SocketProcessor
How does the SocketProcessor handle sockets
protected class SocketProcessor implements Runnable { protected SocketWrapper<Socket> socket = null; protected SocketStatus status = null; public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) { this(socket); this.status = status; } @Override public void run() { boolean launch = false; synchronized (socket) { try { SocketState state = SocketState.OPEN; // ... Omit some ssl related codes // Hand over the socket to a specific processor for processing if ((state != SocketState.CLOSED)) { if (status == null) { state = handler.process(socket, SocketStatus.OPEN_READ); } else { state = handler.process(socket,status); } } // Other situations if (state == SocketState.CLOSED) { countDownConnection(); try { socket.getSocket().close(); } catch (IOException e) { // Ignore } } else if (state == SocketState.OPEN || state == SocketState.UPGRADING || state == SocketState.UPGRADING_TOMCAT || state == SocketState.UPGRADED){ // Long connection socket.setKeptAlive(true); socket.access(); launch = true; } else if (state == SocketState.LONG) { socket.access(); waitingRequests.add(socket); } } finally { if (launch) { try { getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); } catch (RejectedExecutionException x) { try { //unable to handle connection at this time handler.process(socket, SocketStatus.DISCONNECT); } finally { countDownConnection(); } } } } } socket = null; // Finish up this request } }
JioEndPoint.Handler
The specific Handler definition is the interface defined internally by JioEndPoint
public interface Handler extends AbstractEndpoint.Handler { public SocketState process(SocketWrapper<Socket> socket, SocketStatus status); public SSLImplementation getSslImplementation(); public boolean isAvailable(SocketWrapper<Socket> socket); }
Inherit to abstractendpoint Handler, a tag interface, has different implementations according to different protocols, and is specialized in dealing with different protocols of BIO
At this point, the context of tomcat EndPoint is roughly clear. First, there are specific Endpoint implementations according to different IO models. For a certain IO model, there are different protocol implementations to deal with the received socket processor XXX Protocol Handler.