Life cycle of a single JDK HttpClient request

1. Brief introduction

In the previous part, we walked through the request processing flow of HttpClient using a flow chart and a sequence diagram, focusing on how it modifies user requests and how it handles the multiple request-response exchanges that a single user request may trigger. In this article, we take the most basic case, HTTP/1.1, as an example, dig into the processing of a single request, and follow its complete life cycle.

This article is the core of the HttpClient source code analysis. We will see the management and reuse of connections, the reading and writing of channels, and the use of reactive streams.

The HttpClient described in this article refers to the built-in HttpClient and related classes introduced in JDK 11. The source code analysis is based on Java 17. To read this article, you need to understand the principles and use of the Reactive Streams specification and the corresponding Java Flow API.

2. UML diagrams

For convenience, let's review the flow chart and sequence diagram of HttpClient sending a request:

The following is the UML class diagram of Http1Exchange:

3. Establishment, reuse and downgrade of HTTP connections

In the processing of a single request, the first operation is to establish an HTTP connection. We mainly focus on HTTP/1.1 connections. The connection process can be briefly summarized as follows:

  • Instantiate the appropriate ExchangeImpl according to the request type; it is responsible for the actual request-response process
  • Determine the version of the HTTP connection to instantiate according to the exchange type, and try to obtain an existing connection for the corresponding route from the connection pool
  • If no connection can be obtained from the connection pool, instantiate the corresponding HTTP connection (for the most basic HTTP/1.1 connection, an NIO SocketChannel is opened and wrapped in a tube)
  • If the initialized connection instance is HTTP/2 and negotiation finds that it is not supported, downgrade and establish an HTTP/1.1 connection

We will see that HttpClient can switch freely between HTTP/1.1 and HTTP/2. Depending on whether SSL encryption is used, HttpClient instantiates different subclasses of HttpConnection. For an HTTP/2 connection, an Http2Connection instance that holds an instance of such a subclass is responsible for managing the HTTP/2 connection. Otherwise, the HttpConnection subclass manages the connection itself.

3.1 Call flow, connection establishment and reuse

Now for the detailed analysis. Recall the responseAsyncImpl method of MultiExchange analyzed in the previous article. This method is responsible for filtering the user request, delegating it to the Exchange class for processing, receiving the response and handling multiple requests.

// Processes the one or more requests triggered by a single user request and returns the final response
    private CompletableFuture<Response> responseAsyncImpl() {
        CompletableFuture<Response> cf;
       // ... omitted ...
            Exchange<T> exch = getExchange();
            // 2. get response
            // A single Exchange object handles the current request and returns its response asynchronously
            // This is the method we are about to analyze
            cf = exch.responseAsync()
                     .thenCompose((Response response) -> {
                       // ... omitted ...
        }
        return cf;
    }

Now, let's turn our attention to how the Exchange class handles a single request and its response. We focus on the Exchange::responseAsync method:

/**
 * One request/response exchange (handles 100/101 intermediate response also).
 * depth field used to track number of times a new request is being sent
 * for a given API request. If limit exceeded exception is thrown.
 *
 * Security check is performed here:
 * - uses AccessControlContext captured at API level
 * - checks for appropriate URLPermission for request
 * - if permission allowed, grants equivalent SocketPermission to call
 * - in case of direct HTTP proxy, checks additionally for access to proxy
 *    (CONNECT proxying uses its own Exchange, so check done there)
 *
 */
final class Exchange<T> {   
    // Member variables of the Exchange class
    final HttpRequestImpl request;
    final HttpClientImpl client;
    // The abstract ExchangeImpl member; its concrete type depends on the connection type
    volatile ExchangeImpl<T> exchImpl;
    volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
    volatile CompletableFuture<Void> bodyIgnored;

    // used to record possible cancellation raised before the exchImpl
    // has been established.
    private volatile IOException failed;
    @SuppressWarnings("removal")
    final AccessControlContext acc;
    final MultiExchange<T> multi;
    final Executor parentExecutor;
    volatile boolean upgrading; // to HTTP/2
    volatile boolean upgraded;  // to HTTP/2
    final PushGroup<T> pushGroup;
    final String dbgTag;
    
    // ...... a large amount of code omitted

    // The method called by MultiExchange, as seen above
     // Completed HttpResponse will be null if response succeeded
    // will be a non null responseAsync if expect continue returns an error
    public CompletableFuture<Response> responseAsync() {
        return responseAsyncImpl(null);
    }

    // The implementation method called by responseAsync() above
    CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
        SecurityException e = checkPermissions();
        if (e != null) {
            return MinimalFuture.failedFuture(e);
        } else {
            return responseAsyncImpl0(connection);
        }
    }
    
    // The method that does the actual work; this is where we need to focus
    CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
        // Declares the operation to run once the 407 (Proxy Authentication Required) check has passed
        Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
        bodyIgnored = null;
        if (request.expectContinue()) {
            request.addSystemHeader("Expect", "100-Continue");
            Log.logTrace("Sending Expect: 100-Continue");
            // wait for 100-Continue before sending body
            // If expectContinue was set when building the request, then once the 407 check has passed
            // the client waits for a 100 (Continue) response before sending the body
            after407Check = this::expectContinue;
        } else {
            // send request body and proceed.
            // In most cases the request body is sent directly once the 407 check has passed
            after407Check = this::sendRequestBody;
        }
        // The ProxyAuthorizationRequired can be triggered either by
        // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
        // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
        // Therefore we handle it with a call to this checkFor407(...) after these
        // two places.
        Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
                (ex) -> ex.sendHeadersAsync()
                        .handle((r,t) -> this.checkFor407(r, t, after407Check))
                        .thenCompose(Function.identity());
        return establishExchange(connection)    // first establish the exchange and its connection
            // Check for a 407 response. If there is none, run afterExch407Check above: send the request headers, check for 407 again, then run after407Check
                .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
                .thenCompose(Function.identity());
    }
    

}

As you can see, Exchange::responseAsyncImpl0 is the method that actually handles the request. Its steps match what the flowchart shows:

  1. Attempt to establish the connection
  2. Verify that no 407 error has occurred
  3. Send the request headers
  4. Verify again that no 407 error has occurred
  5. Wait for the 100 (Continue) confirmation if requested, then send the request body
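
The chaining idiom used in responseAsyncImpl0, handle(...) followed by thenCompose(Function.identity()), can be tried in isolation. The following is only a sketch under stated assumptions: the class HandleThenComposeSketch and its methods check, run and demo are hypothetical stand-ins and are not part of the JDK, and status codes are modelled as plain integers. handle receives both the result and the failure and returns a new future, so the result is a future of a future; thenCompose(Function.identity()) flattens it again.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class HandleThenComposeSketch {

    // Hypothetical stand-in for checkFor407: propagate a failure, stop on 407,
    // otherwise hand over to the next asynchronous step.
    static CompletableFuture<String> check(Integer status, Throwable failure,
                                           Function<Integer, CompletableFuture<String>> next) {
        if (failure != null) {
            return CompletableFuture.failedFuture(failure);
        }
        if (status == 407) {
            return CompletableFuture.completedFuture("stopped: proxy authentication required");
        }
        return next.apply(status);
    }

    // handle(...) yields CompletableFuture<CompletableFuture<String>>;
    // thenCompose(Function.identity()) flattens it to CompletableFuture<String>.
    static CompletableFuture<String> run(CompletableFuture<Integer> stage) {
        return stage
                .handle((status, failure) -> check(status, failure,
                        s -> CompletableFuture.completedFuture("continued with " + s)))
                .thenCompose(Function.identity());
    }

    // Convenience wrapper for an already completed stage.
    static String demo(int status) {
        return run(CompletableFuture.completedFuture(status)).join();
    }

    // A failed first stage skips the next step and fails the whole chain.
    static boolean failurePropagates() {
        return run(CompletableFuture.failedFuture(new IOException("connect failed")))
                .isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(demo(200));
        System.out.println(demo(407));
        System.out.println(failurePropagates());
    }
}
```

Using handle rather than thenCompose directly is what lets the check see a failure from the previous stage as well as a normal result, which is why the same check can be applied after both connection establishment and header sending.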

We first focus on the connection establishment process: Exchange::establishExchange(connection)

 // get/set the exchange impl, solving race condition issues with
    // potential concurrent calls to cancel() or cancel(IOException)
    private CompletableFuture<? extends ExchangeImpl<T>>
    establishExchange(HttpConnection connection) {
        if (debug.on()) {
            debug.log("establishing exchange for %s,%n\t proxy=%s",
                      request, request.proxy());
        }
        //Check whether the request has been cancelled
        // check if we have been cancelled first.
        Throwable t = getCancelCause();
        checkCancelled();
        if (t != null) {
            if (debug.on()) {
                debug.log("exchange was cancelled: returned failed cf (%s)", String.valueOf(t));
            }
            return exchangeCF = MinimalFuture.failedFuture(t);
        }

        CompletableFuture<? extends ExchangeImpl<T>> cf, res;
        // This is the key step: a concrete ExchangeImpl is returned asynchronously. ExchangeImpl is an abstract class with three subclasses, and the one chosen depends on the HTTP version of the request
        // We will analyze this method, which creates or reuses the connection
        cf = ExchangeImpl.get(this, connection);
        // We should probably use a VarHandle to get/set exchangeCF
        // instead - as we need CAS semantics.
        synchronized (this) { exchangeCF = cf; };
        res = cf.whenComplete((r,x) -> {
            synchronized(Exchange.this) {
                if (exchangeCF == cf) exchangeCF = null;
            }
        });
        checkCancelled();
        return res.thenCompose((eimpl) -> {
                    // recheck for cancelled, in case of race conditions
                    exchImpl = eimpl;
                    IOException tt = getCancelCause();
                    checkCancelled();
                    if (tt != null) {
                        return MinimalFuture.failedFuture(tt);
                    } else {
                        // Now we're good to go. Because exchImpl is no longer
                        // null cancel() will be able to propagate directly to
                        // the impl after this point ( if needed ).
                        return MinimalFuture.completedFuture(eimpl);
                    } });
    }

We can see that the static method ExchangeImpl.get(Exchange, HttpConnection) asynchronously returns an instance of a concrete implementation class.

Inside the static get method, we can see that a different ExchangeImpl subclass is instantiated depending on the version (HTTP version) of the current exchange. Suppose we specify the HTTP version of the client or of the request when calling:

HttpClient client = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_1_1) // set the client's preferred version to HTTP/1.1
    .build();
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
    .version(HttpClient.Version.HTTP_1_1)  // or set the version of this request to HTTP/1.1
    .GET().build();

Then the get method below instantiates the HTTP/1.1 exchange, Http1Exchange. Otherwise, it tries by default to establish the HTTP/2 exchange, Stream.

    /**
     * Initiates a new exchange and assigns it to a connection if one exists
     * already. connection usually null.
     */
    static <U> CompletableFuture<? extends ExchangeImpl<U>>
    get(Exchange<U> exchange, HttpConnection connection)
    {
        if (exchange.version() == HTTP_1_1) {
            if (debug.on())
                debug.log("get: HTTP/1.1: new Http1Exchange");
            //Create Http1 exchange
            return createHttp1Exchange(exchange, connection);
        } else {
            Http2ClientImpl c2 = exchange.client().client2(); // #### improve
            HttpRequestImpl request = exchange.request();
           	//Get Http2 connection
            CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
            if (debug.on())
                debug.log("get: Trying to get HTTP/2 connection");
            // local variable required here; see JDK-8223553
            //Create Http2 exchange
            CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
                c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
            return fxi.thenCompose(x->x);
        }
    }
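
The version that this branch tests can be approximated from the public API alone. The sketch below assumes only what the public Javadoc states: a request without an explicit version uses the version of the sending client, and a client built with default settings prefers HTTP/2. The class EffectiveVersionSketch and its methods are hypothetical helpers, the URI is a placeholder, and no request is actually sent. The internal Exchange::version may apply further adjustments that this sketch does not model.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;

final class EffectiveVersionSketch {

    // A request-level version wins; otherwise the client's preferred version applies.
    static HttpClient.Version effectiveVersion(HttpClient client, HttpRequest request) {
        return request.version().orElse(client.version());
    }

    // Builds a default client and a request, optionally pinned to HTTP/1.1.
    static HttpClient.Version demo(boolean pinRequestToHttp11) {
        HttpClient client = HttpClient.newHttpClient();   // default settings
        URI uri = URI.create("http://example.com/");      // placeholder, never contacted
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri);
        if (pinRequestToHttp11) {
            builder.version(HttpClient.Version.HTTP_1_1);
        }
        return effectiveVersion(client, builder.build());
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```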

We assume that HTTP/1.1 was specified when calling, and continue to follow the creation of the exchange and the establishment of the connection. The createHttp1Exchange method calls the constructor of Http1Exchange, so we follow it:

Http1Exchange(Exchange<T> exchange, HttpConnection connection)
        throws IOException
    {
        super(exchange);
        this.request = exchange.request();
        this.client = exchange.client();
        this.executor = exchange.executor();
        this.operations = new LinkedList<>();
        operations.add(headersSentCF);
        operations.add(bodySentCF);
        if (connection != null) {
            this.connection = connection;
        } else {
            InetSocketAddress addr = request.getAddress();
            //Get connection
            this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
        }
        this.requestAction = new Http1Request(request, this);
        this.asyncReceiver = new Http1AsyncReceiver(executor, this);
    }

We can see that Http1Exchange holds a reference to the abstract connection and obtains the concrete connection in its constructor. HttpConnection has a total of 6 implementation classes for the different connection types; they differ in whether SSL or a proxy is used. It is worth noting that Http2Connection is not part of this hierarchy. Instead, it holds an HttpConnection as a member, which shows that Http2Connection actually wraps (decorates) an HttpConnection.

Let's go back to HTTP/1.1 and look at HttpConnection::getConnection, the method that the Http1Exchange constructor calls to obtain a connection.

/**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainTunnelingConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose  address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        //The connection type is determined according to whether the request is encrypted or not
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            //Unencrypted connection
            //Trying to get from connection pool
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.checkOpen() /* may have been eof/closed when in the pool */) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                //The connection cannot be obtained from the connection pool. Create a new connection
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                // HTTP/1.1 over SSL: try the connection pool first
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                final HttpConnection conn = c;
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(conn.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                //Create SSL connection
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }

It can be seen that connection acquisition uses pooling. It first tries to obtain a connection from the connection pool, and creates a new one only if none is available. The main benefit of the connection pool is not saving object creation time, but avoiding the time cost of the TCP three-way handshake for each new connection.

So how are HTTP/1.1 connections cached and reused? Let's look at the ConnectionPool class. The connection pool is created when the client is initialized. Internally it uses hash tables to map each route to the list of previously established HTTP connections for it. Encrypted connections are stored in the HashMap named sslPool, while plain connections are stored in plainPool. When fetching a connection, the request address and proxy address are combined into a cache key, and the first connection in the list for that key is removed and returned to the caller.

/**
 * Http 1.1 connection pool.
 */
final class ConnectionPool {

    //Default keepalive time of 20 minutes
    static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
            "jdk.httpclient.keepalive.timeout", 1200); // seconds
    //There is no limit on the size of the connection pool
    static final long MAX_POOL_SIZE = Utils.getIntegerNetProperty(
            "jdk.httpclient.connectionPoolSize", 0); // unbounded
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Pools of idle connections
    //Hash table is used to maintain the mapping relationship between routing and Http connection
    private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
    private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
    private final ExpiryList expiryList;
    private final String dbgTag; // used for debug
    boolean stopped;

    /**
     * Route in the connection pool, i.e. the cache key of the connection map. The combination of destination address and proxy address is used as the cache key.
     * Entries in connection pool are keyed by destination address and/or
     * proxy address:
     * case 1: plain TCP not via proxy (destination only)
     * case 2: plain TCP via proxy (proxy only)
     * case 3: SSL not via proxy (destination only)
     * case 4: SSL over tunnel (destination and proxy)
     */
    static class CacheKey {
        final InetSocketAddress proxy;
        final InetSocketAddress destination;

        CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
            this.proxy = proxy;
            this.destination = destination;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final CacheKey other = (CacheKey) obj;
            if (!Objects.equals(this.proxy, other.proxy)) {
                return false;
            }
            if (!Objects.equals(this.destination, other.destination)) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode() {
            return Objects.hash(proxy, destination);
        }
    }

    ConnectionPool(long clientId) {
        this("ConnectionPool("+clientId+")");
    }

    /**
     * There should be one of these per HttpClient.
     */
    private ConnectionPool(String tag) {
        dbgTag = tag;
        plainPool = new HashMap<>();
        sslPool = new HashMap<>();
        expiryList = new ExpiryList();
    }

    //Omit some codes

    //Method to get connection from connection pool
    synchronized HttpConnection getConnection(boolean secure,
                                              InetSocketAddress addr,
                                              InetSocketAddress proxy) {
        if (stopped) return null;
        // for plain (unsecure) proxy connection the destination address is irrelevant.
        addr = secure || proxy == null ? addr : null;
        CacheKey key = new CacheKey(addr, proxy);
        HttpConnection c = secure ? findConnection(key, sslPool)
                                  : findConnection(key, plainPool);
        //System.out.println ("getConnection returning: " + c);
        assert c == null || c.isSecure() == secure;
        return c;
    }
    
    private HttpConnection findConnection(CacheKey key,
                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
        //Get the corresponding connection list from the connection pool
        LinkedList<HttpConnection> l = pool.get(key);
        if (l == null || l.isEmpty()) {
            return null;
        } else {
            //The first connection corresponding to the request address is the oldest connection
            HttpConnection c = l.removeFirst();
            //Remove this connection from the expiration list
            expiryList.remove(c);
            return c;
        }
    }
    
    //Temporarily omitted
}

The above covers HTTP/1.1. Acquiring an HTTP/2 connection works differently: scheme, host and port are combined into a string key, and the connection is looked up in a separate pool of HTTP/2 connections. We will not expand on it here.
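
To make the HTTP/1.1 cache key rule concrete, here is a small model of the two pools. It is a sketch under stated assumptions and not the JDK class: PoolKeySketch is a hypothetical name, connections are represented by plain strings, a two-element list stands in for CacheKey, and returned connections are assumed to be appended at the end of the list. The only part taken from the source is the key rule in getConnection: for a plain connection through a proxy, the destination is ignored.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

final class PoolKeySketch {

    private final Map<List<InetSocketAddress>, LinkedList<String>> plainPool = new HashMap<>();
    private final Map<List<InetSocketAddress>, LinkedList<String>> sslPool = new HashMap<>();

    // Same rule as getConnection: a plain proxied connection is keyed by the proxy only.
    static List<InetSocketAddress> keyFor(boolean secure, InetSocketAddress addr,
                                          InetSocketAddress proxy) {
        InetSocketAddress destination = (secure || proxy == null) ? addr : null;
        return Arrays.asList(destination, proxy);   // list equality tolerates null elements
    }

    // Assumption of this sketch: idle connections are appended at the end.
    void put(boolean secure, InetSocketAddress addr, InetSocketAddress proxy, String conn) {
        (secure ? sslPool : plainPool)
                .computeIfAbsent(keyFor(secure, addr, proxy), k -> new LinkedList<>())
                .add(conn);
    }

    // Removes and returns the first idle connection for the route, or null.
    String get(boolean secure, InetSocketAddress addr, InetSocketAddress proxy) {
        LinkedList<String> idle = (secure ? sslPool : plainPool).get(keyFor(secure, addr, proxy));
        return (idle == null || idle.isEmpty()) ? null : idle.removeFirst();
    }

    static String demo() {
        PoolKeySketch pool = new PoolKeySketch();
        InetSocketAddress proxy = InetSocketAddress.createUnresolved("proxy.example", 3128);
        InetSocketAddress a = InetSocketAddress.createUnresolved("a.example", 80);
        InetSocketAddress b = InetSocketAddress.createUnresolved("b.example", 80);

        pool.put(false, a, proxy, "conn-1");
        String viaProxy = pool.get(false, b, proxy);  // same proxy, other destination
        String again = pool.get(false, b, proxy);     // the connection was removed above

        pool.put(true, a, proxy, "conn-2");
        String tunnel = pool.get(true, b, proxy);     // tunnels are keyed by destination too
        return viaProxy + "," + again + "," + tunnel;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints "conn-1,null,null"
    }
}
```

Because a fetched connection is removed from its list, a connection is never handed to two exchanges at once; it only becomes available again when it is returned to the pool.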

From here on, our analysis is based on HTTP/1.1 and on PlainHttpConnection.

We return to the getPlainConnection method of HttpConnection, which is called when no connection can be retrieved from the connection pool or the retrieved connection has already been closed. Its purpose is to obtain a new connection. You can see that a different connection class is instantiated depending on the request type and on whether there is a proxy:

    private static HttpConnection getPlainConnection(InetSocketAddress addr,
                                                     InetSocketAddress proxy,
                                                     HttpRequestImpl request,
                                                     HttpClientImpl client) {
        if (request.isWebSocket() && proxy != null)
            return new PlainTunnelingConnection(addr, proxy, client,
                                                proxyTunnelHeaders(request));

        if (proxy == null)
            //Create the most basic Http connection
            return new PlainHttpConnection(addr, client);
        else
            return new PlainProxyConnection(proxy, client);
    }

We enter the constructor of PlainHttpConnection:

/**
 * Plain raw TCP connection direct to destination.
 * The connection operates in asynchronous non-blocking mode.
 * All reads and writes are done non-blocking.
 */
class PlainHttpConnection extends HttpConnection {

    // Some member variables. Note that an NIO SocketChannel is held here
    private final Object reading = new Object();
    protected final SocketChannel chan;
    // Bidirectional socket tube
    private final SocketTube tube; // need SocketTube to call signalClosed().
    private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
    private volatile boolean connected;
    private boolean closed;
    private volatile ConnectTimerEvent connectTimerEvent;  // may be null
    private volatile int unsuccessfulAttempts;

    // Indicates whether a connection attempt has succeeded or should be retried.
    // If the attempt failed, and shouldn't be retried, there will be an exception
    // instead.
    private enum ConnectState { SUCCESS, RETRY }

    //Constructor
    PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
        super(addr, client);
        try {
            // Open a socket channel, assign it to the chan field and set it to non-blocking mode
            this.chan = SocketChannel.open();
            chan.configureBlocking(false);
            // Log the initial buffer sizes, then try to apply the sizes configured on the client
            if (debug.on()) {
                int bufsize = getSoReceiveBufferSize();
                debug.log("Initial receive buffer size is: %d", bufsize);
                bufsize = getSoSendBufferSize();
                debug.log("Initial send buffer size is: %d", bufsize);
            }
            if (trySetReceiveBufferSize(client.getReceiveBufferSize())) {
                if (debug.on()) {
                    int bufsize = getSoReceiveBufferSize();
                    debug.log("Receive buffer size configured: %d", bufsize);
                }
            }
            if (trySetSendBufferSize(client.getSendBufferSize())) {
                if (debug.on()) {
                    int bufsize = getSoSendBufferSize();
                    debug.log("Send buffer size configured: %d", bufsize);
                }
            }
            // Disable Nagle's algorithm so that small writes are sent without delay
            chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
            // wrap the channel in a Tube for async reading and writing
            // Wrap the NIO socket channel in a SocketTube and store it in the tube field
            // Its internal structure and function will be analyzed later
            tube = new SocketTube(client(), chan, Utils::getBuffer);
        } catch (IOException e) {
            throw new InternalError(e);
        }
    }
    
}

It can be seen that when a PlainHttpConnection is instantiated, a non-blocking socket channel is opened and wrapped in a newly created SocketTube, which is the focus of our analysis in the next section. Before that, we first analyze the connection downgrade process.
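
The channel setup performed by the constructor can be reproduced in isolation with public NIO APIs. This is a minimal sketch: the class NonBlockingChannelSketch and its methods are hypothetical names, the channel is never connected, and the SocketTube wrapping is left out because SocketTube is an internal class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

final class NonBlockingChannelSketch {

    // Opens an unconnected channel configured the way the constructor configures chan.
    static SocketChannel open() throws IOException {
        SocketChannel chan = SocketChannel.open();
        chan.configureBlocking(false);                            // non-blocking mode
        chan.setOption(StandardSocketOptions.TCP_NODELAY, true);  // disable Nagle's algorithm
        return chan;
    }

    // Reads the settings back to confirm that they were applied.
    static boolean demo() {
        try (SocketChannel chan = open()) {
            return !chan.isBlocking() && chan.getOption(StandardSocketOptions.TCP_NODELAY);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```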

3.2 Connection downgrade and upgrade

In the previous section, we mentioned that the static get method of ExchangeImpl decides which of its subclasses to instantiate by checking the version. If we do not specify HTTP/1.1 when calling, the get method will attempt to instantiate Stream (the HTTP/2 stream). However, we are requesting a plain http URL. Why instantiate HTTP/2? Isn't that doomed to fail?

In fact, the HTTP/2 specification does not require that it run over SSL (TLS). Now that HTTP/2 is widespread, HttpClient naturally prefers to try HTTP/2 first. When a TLS connection is established, the client and server use ALPN (Application-Layer Protocol Negotiation) to determine the protocol to use. If the server indicates that it only supports HTTP/1.1, HttpClient must downgrade the connection.
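
On the client side, an ALPN offer is expressed through the standard JSSE API. The sketch below shows how the protocol list "h2", "http/1.1" can be attached to an SSLEngine, and how the negotiated value could be interpreted along the lines of the cases discussed in this section. It is an illustration only: the class AlpnOfferSketch and its methods are hypothetical, no handshake is performed, and the interpretation text is this sketch's own wording.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

final class AlpnOfferSketch {

    // Creates a client-mode engine that offers the given protocols, most preferred first.
    static SSLEngine clientEngine(String... protocols) {
        try {
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            engine.setUseClientMode(true);
            SSLParameters params = engine.getSSLParameters();
            params.setApplicationProtocols(protocols);
            engine.setSSLParameters(params);
            return engine;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Interprets a value as returned by SSLEngine.getApplicationProtocol() after a handshake.
    static String interpret(String negotiated) {
        if (negotiated == null) return "ALPN not supported";
        switch (negotiated) {
            case "h2":       return "use HTTP/2";
            case "":         return "no ALPN negotiated, fall back to HTTP/1.1";
            case "http/1.1": return "server chose HTTP/1.1, downgrade";
            default:         return "unexpected ALPN: " + negotiated;
        }
    }

    // Reads the offer back from the engine without performing a handshake.
    static String offered() {
        SSLEngine engine = clientEngine("h2", "http/1.1");
        return String.join(",", engine.getSSLParameters().getApplicationProtocols());
    }

    public static void main(String[] args) {
        System.out.println(offered());
        System.out.println(interpret("http/1.1"));
    }
}
```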

We follow the code and analyze:

static <U> CompletableFuture<? extends ExchangeImpl<U>>
    get(Exchange<U> exchange, HttpConnection connection)
    {
        if (exchange.version() == HTTP_1_1) {
            if (debug.on())
                debug.log("get: HTTP/1.1: new Http1Exchange");
            return createHttp1Exchange(exchange, connection);
        } else {
            // Get the Http2ClientImpl member of HttpClientImpl
            Http2ClientImpl c2 = exchange.client().client2(); // #### improve
            HttpRequestImpl request = exchange.request();
            // Try to get an HTTP/2 connection asynchronously. If none can be obtained, c2f completes with null or exceptionally
            CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
            if (debug.on())
                debug.log("get: Trying to get HTTP/2 connection");
            // local variable required here; see JDK-8223553
            // Handle both outcomes (HTTP/2 connection obtained or not) and decide whether to instantiate Stream or Http1Exchange
            // We will step into this later
            CompletableFuture<CompletableFuture<? extends ExchangeImpl<U>>> fxi =
                c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection));
            return fxi.thenCompose(x->x);
        }
    }

We step into the Http2ClientImpl::getConnectionFor method. When the URL we want to access does not support HTTP/2, there are two cases. For an address beginning with http, obtaining an HTTP/2 connection fails directly and the future completes with null. For an address beginning with https, an attempt is made to establish an HTTP/2 connection, but the negotiation fails and the future completes exceptionally.

CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
                                                        Exchange<?> exchange) {
        URI uri = req.uri();
        InetSocketAddress proxy = req.proxy();
        String key = Http2Connection.keyFor(uri, proxy);

        synchronized (this) {
            // Try to get a connection from the HTTP/2 connection pool. In our scenario there is none
            Http2Connection connection = connections.get(key);
            if (connection != null) {
                try {
                    if (connection.closed || !connection.reserveStream(true)) {
                        if (debug.on())
                            debug.log("removing found closed or closing connection: %s", connection);
                        deleteConnection(connection);
                    } else {
                        // fast path if connection already exists
                        if (debug.on())
                            debug.log("found connection in the pool: %s", connection);
                        return MinimalFuture.completedFuture(connection);
                    }
                } catch (IOException e) {
                    // thrown by connection.reserveStream()
                    return MinimalFuture.failedFuture(e);
                }
            }

            // Case 1: a plain http URL (or an https host whose ALPN negotiation failed before).
            // ALPN is a TLS extension, so it does not apply to plain http. The future completes with null: no HTTP/2 connection was obtained
            if (!req.secure() || failures.contains(key)) {
                // secure: negotiate failed before. Use http/1.1
                // !secure: no connection available in cache. Attempt upgrade
                if (debug.on()) debug.log("not found in connection pool");
                return MinimalFuture.completedFuture(null);
            }
        }
        return Http2Connection
            // Case 2: https. Try to create an HTTP/2 connection. As shown later, this also fails when the server does not support HTTP/2
                .createAsync(req, this, exchange)
                .whenComplete((conn, t) -> {
                    synchronized (Http2ClientImpl.this) {
                        if (conn != null) {
                            try {
                                conn.reserveStream(true);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e); // shouldn't happen
                            }
                            offerConnection(conn);
                        } else {
                            Throwable cause = Utils.getCompletionCause(t);
                            if (cause instanceof Http2Connection.ALPNException)
                                failures.add(key);
                        }
                    }
                });
    }

Following Http2Connection.createAsync leads us to the Http2Connection::checkSSLConfig method. As shown below, when ALPN negotiation does not select HTTP/2, the CompletableFuture that was supposed to produce the Http2Connection object is completed exceptionally.

//Checks the result of the TLS handshake. Called only for https connections
private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
        assert aconn.isSecure();

        Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
            CompletableFuture<Void> cf = new MinimalFuture<>();
            SSLEngine engine = aconn.getEngine();
            String engineAlpn = engine.getApplicationProtocol();
            assert Objects.equals(alpn, engineAlpn)
                    : "alpn: %s, engine: %s".formatted(alpn, engineAlpn);

            DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );

            //If the protocol negotiated through ALPN is not "h2", the server does not support HTTP/2 here, so the client can only try to downgrade
            if (alpn == null || !alpn.equals("h2")) {
                String msg;
                if (alpn == null) {
                    Log.logSSL("ALPN not supported");
                    msg = "ALPN not supported";
                } else {
                    switch (alpn) {
                        case "":
                            Log.logSSL(msg = "No ALPN negotiated");
                            break;
                        case "http/1.1":
                            Log.logSSL( msg = "HTTP/1.1 ALPN returned");
                            break;
                        default:
                            Log.logSSL(msg = "Unexpected ALPN: " + alpn);
                            cf.completeExceptionally(new IOException(msg));
                    }
                }
                //Complete the future exceptionally, which aborts the attempt to create an HTTP/2 connection
                cf.completeExceptionally(new ALPNException(msg, aconn));
                return cf;
            }
            cf.complete(null);
            return cf;
        };

        return aconn.getALPN()
                .whenComplete((r,t) -> {
                    if (t != null && t instanceof SSLException) {
                        // something went wrong during the initial handshake
                        // close the connection
                        aconn.close();
                    }
                })
                .thenCompose(checkAlpnCF);
    }
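
The decision made by checkSSLConfig can be condensed into a small classification. The following is only a sketch of that logic as read from the listing above: the enum and the method name are hypothetical, and the JDK signals these outcomes through exceptions (ALPNException or IOException) rather than through return values.

```java
class AlpnOutcomeSketch {
    /** Hypothetical labels; the JDK completes a future exceptionally instead. */
    enum Outcome { USE_HTTP2, DOWNGRADE_TO_HTTP1, FAIL }

    static Outcome classify(String alpn) {
        if ("h2".equals(alpn)) {
            return Outcome.USE_HTTP2;              // cf.complete(null)
        }
        if (alpn == null || alpn.isEmpty() || alpn.equals("http/1.1")) {
            return Outcome.DOWNGRADE_TO_HTTP1;     // ALPNException
        }
        // Unexpected protocol: the IOException completes the future first,
        // so the later ALPNException has no effect.
        return Outcome.FAIL;
    }

    public static void main(String[] args) {
        for (String alpn : new String[] {"h2", "http/1.1", "", null, "spdy/3"}) {
            System.out.println(alpn + " -> " + classify(alpn));
        }
    }
}
```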

When the HTTP/2 negotiation fails, the future c2f that is returned asynchronously to ExchangeImpl::get carries no connection. As expected, the HTTP/1.1 exchange is then established instead. What else happens?

We will see that HttpClient is quite persistent. For plain http requests, where ALPN negotiation is not possible, it also modifies the request headers to attempt a protocol upgrade.

ExchangeImpl::get calls the createExchangeImpl method, so we follow it there:

private static <U> CompletableFuture<? extends ExchangeImpl<U>>
    createExchangeImpl(Http2Connection c,
                       Throwable t,
                       Exchange<U> exchange,
                       HttpConnection connection)
    {
        if (debug.on())
            debug.log("handling HTTP/2 connection creation result");
        boolean secure = exchange.request().secure();
        if (t != null) {
            if (debug.on())
                debug.log("handling HTTP/2 connection creation failed: %s",
                                 (Object)t);
            t = Utils.getCompletionCause(t);
            if (t instanceof Http2Connection.ALPNException) {
                //If the https server we access only speaks HTTP/1.1, we enter this branch
                Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
                AbstractAsyncSSLConnection as = ee.getConnection();
                if (debug.on())
                    debug.log("downgrading to HTTP/1.1 with: %s", as);
                //Creating Http1Exchange will reuse the original AsyncSSLConnection
                CompletableFuture<? extends ExchangeImpl<U>> ex =
                        createHttp1Exchange(exchange, as);
                return ex;
            } else {
                if (debug.on())
                    debug.log("HTTP/2 connection creation failed "
                                     + "with unexpected exception: %s", (Object)t);
                return MinimalFuture.failedFuture(t);
            }
        }
        if (secure && c== null) {
            if (debug.on())
                debug.log("downgrading to HTTP/1.1 ");
            CompletableFuture<? extends ExchangeImpl<U>> ex =
                    createHttp1Exchange(exchange, null);
            return ex;
        }
        if (c == null) {
            //When the address starts with http://, we enter this branch: establish an HTTP/1.1 connection and then try to upgrade it
            // no existing connection. Send request with HTTP 1 and then
            // upgrade if successful
            if (debug.on())
                debug.log("new Http1Exchange, try to upgrade");
            return createHttp1Exchange(exchange, connection)
                    .thenApply((e) -> {
                        //The upgrade attempt simply adds the Connection, Upgrade and HTTP2-Settings fields to the request headers
                        exchange.h2Upgrade();
                        return e;
                    });
        } else {
            if (debug.on()) debug.log("creating HTTP/2 streams");
            Stream<U> s = c.createStream(exchange);
            CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
            return ex;
        }
    }

We can see that a request to an address starting with http:// attempts to upgrade to HTTP/2: the client first sends an HTTP/1.1 request that asks the server to upgrade, and switches to HTTP/2 if the server agrees. The upgrade itself happens after one request-response round trip. Since it is relevant to this section, let's take a quick look at it as well:

private CompletableFuture<Response>
    checkForUpgradeAsync(Response resp,
                         ExchangeImpl<T> ex) {

        int rcode = resp.statusCode();
    //A 101 status code means that the server has accepted the protocol upgrade to HTTP/2
        if (upgrading && (rcode == 101)) {
            Http1Exchange<T> e = (Http1Exchange<T>)ex;
            // check for 101 switching protocols
            // 101 responses are not supposed to contain a body.
            //    => should we fail if there is one?
            if (debug.on()) debug.log("Upgrading async %s", e.connection());
            return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
                .thenCompose((T v) -> {// v is null
                    debug.log("Ignored body");
                    // we pass e::getBuffer to allow the ByteBuffers to accumulate
                    // while we build the Http2Connection
                    ex.upgraded();
                    upgraded = true;
                    //Establish Http2 connection
                    return Http2Connection.createAsync(e.connection(),
                                                 client.client2(),
                                                 this, e::drainLeftOverBytes)
                        .thenCompose((Http2Connection c) -> {
                            boolean cached = c.offerConnection();
                            if (cached) connectionAborter.disable();
                            Stream<T> s = c.getStream(1);

                         //Omit
                );
        }
        return MinimalFuture.completedFuture(resp);
    }
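
To make the upgrade request more concrete, here is a minimal sketch of the three header fields mentioned above and of the 101 check. The header names and the base64url encoding of the settings payload follow the cleartext upgrade mechanism of RFC 7540; the class, the method names and the chosen setting are hypothetical and are not taken from the JDK source.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

class H2cUpgradeSketch {
    /** Encodes one HTTP/2 setting (16-bit id, 32-bit value) as unpadded base64url. */
    static String encodeSetting(int id, int value) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort((short) id).putInt(value);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    /** Header fields added to an HTTP/1.1 request that asks for an upgrade to h2c. */
    static Map<String, String> upgradeHeaders(String encodedSettings) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Connection", "Upgrade, HTTP2-Settings");
        headers.put("Upgrade", "h2c");
        headers.put("HTTP2-Settings", encodedSettings);
        return headers;
    }

    /** Mirrors the condition in checkForUpgradeAsync: upgrading && rcode == 101. */
    static boolean upgradeAccepted(boolean upgrading, int statusCode) {
        return upgrading && statusCode == 101;
    }

    public static void main(String[] args) {
        // Setting id 0x2 is SETTINGS_ENABLE_PUSH; 0 disables server push.
        upgradeHeaders(encodeSetting(0x2, 0))
                .forEach((name, value) -> System.out.println(name + ": " + value));
        System.out.println("101 accepted: " + upgradeAccepted(true, 101));
        System.out.println("200 accepted: " + upgradeAccepted(true, 200));
    }
}
```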

This concludes the downgrade and upgrade of HTTP connections. Now we come to an exciting part: how the data is sent.

4. Connecting the reactive read / write streams

In the connection setup above, we did not see any TCP connection being established. Indeed: when a connection is created for the first time, JDK HttpClient postpones establishing the socket connection until the methods that send the request headers.

Picking up from the creation of PlainHttpConnection analyzed above, let's see what the SocketTube instantiated at the end is. As the UML diagram below shows, SocketTube is an implementation of the FlowTube interface, whose other implementation is SSLTube.

4.1 structure and function of socket pipeline

So, what is FlowTube? Judging by its structure and Javadoc, it plays both the publisher and the subscriber role of the Java Flow API (Reactive Streams). It acts as a "connector": one end is attached to the socket channel for reading and writing, and the other end to the code that reads and writes HTTP messages.

/**
 * FlowTube is an I/O abstraction that allows reading from and writing to a
 * destination asynchronously. It is not a Flow.Processor<List<ByteBuffer>, List<ByteBuffer>>;
 * it models a publisher source and a subscriber sink in a bidirectional flow.
 * The connectFlows method should be called to connect the bidirectional flow.
 * A FlowTube supports handing over the same read subscription to different
 * sequential read subscribers over time. When connectFlows(writePublisher, readSubscriber)
 * is called, the FlowTube calls dropSubscription on its former readSubscriber
 * and onSubscribe on its new readSubscriber.
 */
public interface FlowTube extends
       Flow.Publisher<List<ByteBuffer>>,
       Flow.Subscriber<List<ByteBuffer>> {

    /**
     * A subscriber for reading from the bidirectional flow. A TubeSubscriber is a Flow.Subscriber that can be cancelled by calling dropSubscription(). Once dropSubscription() has been called, the TubeSubscriber should stop calling any method on its subscription.
     */
    static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {

        default void dropSubscription() { }

        default boolean supportsRecycling() { return false; }

    }

    /**
     * A publisher for writing to the bidirectional flow.
     */
    static interface TubePublisher extends Flow.Publisher<List<ByteBuffer>> {

    }

    /**
     * Connects the bidirectional flows to a write publisher and a read subscriber. This method can be called sequentially several times to switch the existing publisher and subscriber to a new pair of write publisher and read subscriber.
     * @param writePublisher A new publisher for writing to the bidirectional flow.
     * @param readSubscriber A new subscriber for reading from the bidirectional
     *                       flow.
     */
    default void connectFlows(TubePublisher writePublisher,
                              TubeSubscriber readSubscriber) {

        this.subscribe(readSubscriber);
        writePublisher.subscribe(this);
    }

    /**
     * Returns true if this flow was completed, either exceptionally
     * or normally (EOF reached).
     * @return true if the flow is finished
     */
    boolean isFinished();
     
}

Here is a quick recap of how the participants of Reactive Streams interact:

  1. The publisher accepts a subscriber: publisher.subscribe(subscriber)
  2. The publisher hands a subscription to the subscriber: subscriber.onSubscribe(subscription)
  3. The subscriber requests n items: subscription.request(n)
  4. The subscriber receives at most n items: subscriber.onNext(item)
  5. The subscriber may cancel the subscription: subscription.cancel()
  6. The subscriber is notified of errors and completion: subscriber.onError(throwable); subscriber.onComplete()
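
The interaction above can be tried out with the JDK's own SubmissionPublisher. This is a minimal sketch and has nothing to do with the HttpClient internals: the subscriber class and its event log are hypothetical and only record the order of the callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowHandshakeSketch {
    /** Hypothetical subscriber that requests one item at a time and logs each callback. */
    static final class LoggingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {   // step 2
            subscription = s;
            events.add("onSubscribe");
            s.request(1);                                          // step 3
        }
        @Override public void onNext(String item) {                // step 4
            events.add("onNext:" + item);
            subscription.request(1);
        }
        @Override public void onError(Throwable t) {               // step 6
            events.add("onError");
            done.countDown();
        }
        @Override public void onComplete() {                       // step 6
            events.add("onComplete");
            done.countDown();
        }
    }

    static List<String> run() {
        LoggingSubscriber subscriber = new LoggingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);                       // step 1
            publisher.submit("a");
            publisher.submit("b");
        } // close() signals onComplete once the submitted items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Requesting one item at a time is a deliberate choice for the sketch: it makes the back-pressure loop (request, onNext, request again) visible in the event log.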

Let's look at the SocketTube constructor:

/**
 * A SocketTube is a terminal tube plugged directly into the socket.
 * The read subscriber should call {@code subscribe} on the SocketTube before
 * the SocketTube is subscribed to the write publisher.
 */
final class SocketTube implements FlowTube {

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    static final AtomicLong IDS = new AtomicLong();

    private final HttpClientImpl client;
    //nio socket channel
    private final SocketChannel channel;
    private final SliceBufferSource sliceBuffersSource;
    private final Object lock = new Object();
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    private final InternalReadPublisher readPublisher;
    private final InternalWriteSubscriber writeSubscriber;
    private final long id = IDS.incrementAndGet();

    public SocketTube(HttpClientImpl client, SocketChannel channel,
                      Supplier<ByteBuffer> buffersFactory) {
        this.client = client;
        this.channel = channel;
        this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
        //Two objects are instantiated as fields here: the internal read publisher and the internal write subscriber
        this.readPublisher = new InternalReadPublisher();
        this.writeSubscriber = new InternalWriteSubscriber();
    }
    
}

In the constructor, SocketTube instantiates readPublisher and writeSubscriber. Their types are InternalReadPublisher and InternalWriteSubscriber, both inner classes of SocketTube. Their position and purpose can be read from their names:

  • readPublisher reads content from the socket channel and "publishes" it into the tube, where a consumer receives it and parses it into the HTTP response headers and response body
  • writeSubscriber "subscribes" to the HTTP message: it waits for the publisher of the HTTP content to write the message into the SocketTube, then takes the message and writes it to the socket channel

These will be further discussed later in the analysis.

4.2 establishment of socket connection

So how is the socket connection established? The answer is tied to the default method connectFlows of FlowTube (SocketTube overrides it, but only adds a log statement). The method requires the caller to pass in a publisher and a subscriber, so that a two-way subscription is established between the caller and the SocketTube.

    @Override
    public void connectFlows(TubePublisher writePublisher,
                             TubeSubscriber readSubscriber) {
        //connectFlows method of the SocketTube class
        if (debug.on()) debug.log("connecting flows");
        this.subscribe(readSubscriber);
        writePublisher.subscribe(this);
    }

To witness this journey, we must go back to the responseAsyncImpl0 method of Exchange.

CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
        Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
        bodyIgnored = null;
        if (request.expectContinue()) {
            request.addSystemHeader("Expect", "100-Continue");
            Log.logTrace("Sending Expect: 100-Continue");
            // wait for 100-Continue before sending body
            after407Check = this::expectContinue;
        } else {
            after407Check = this::sendRequestBody;
        }
        // The ProxyAuthorizationRequired can be triggered either by
        // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
        // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
        // Therefore we handle it with a call to this checkFor407(...) after these
        // two places.
        Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
            //Focus on sendHeadersAsync: sending the request headers asynchronously is where the connection is actually established
                (ex) -> ex.sendHeadersAsync()
                        .handle((r,t) -> this.checkFor407(r, t, after407Check))
                        .thenCompose(Function.identity());
        return establishExchange(connection)    //First establish a connection
                .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))   
                .thenCompose(Function.identity());
    }
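
The listing uses the idiom handle(...).thenCompose(Function.identity()) twice. handle sees both the result and the exception of the previous stage and here returns another CompletableFuture, so the result is a nested future; thenCompose with the identity function flattens it again. A minimal sketch of the idiom follows. The stage names and the "fallback" recovery are hypothetical and do not reproduce what checkFor407 actually does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class HandleThenComposeSketch {
    /** Hypothetical first stage that either yields a connection name or fails. */
    static CompletableFuture<String> establish(boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("no connection"))
                : CompletableFuture.completedFuture("conn");
    }

    /** Hypothetical check: recover from a failure, otherwise continue with the next async step. */
    static CompletableFuture<String> check(String result, Throwable error,
                                           Function<String, CompletableFuture<String>> next) {
        if (error != null) {
            return CompletableFuture.completedFuture("fallback");
        }
        return next.apply(result);
    }

    static CompletableFuture<String> pipeline(boolean fail) {
        Function<String, CompletableFuture<String>> sendHeaders =
                conn -> CompletableFuture.completedFuture(conn + "+headers");
        return establish(fail)
                // handle returns CompletableFuture<CompletableFuture<String>> here
                .handle((r, t) -> check(r, t, sendHeaders))
                // flatten the nested future back to CompletableFuture<String>
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(pipeline(false).join());
        System.out.println(pipeline(true).join());
    }
}
```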

We now enter the ExchangeImpl::sendHeadersAsync method. Here is the override in Http1Exchange:

    @Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        // create the response before sending the request headers, so that
        // the response can set the appropriate receivers.
        if (debug.on()) debug.log("Sending headers only");
        // If the first attempt to read something triggers EOF, or
        // IOException("channel reset by peer"), we're going to retry.
        // Instruct the asyncReceiver to throw ConnectionExpiredException
        // to force a retry.
        asyncReceiver.setRetryOnError(true);
        if (response == null) {
            //The response object is created here. Internally, its headers reader "subscribes" to asyncReceiver to receive the response headers
            response = new Http1Response<>(connection, this, asyncReceiver);
        }

        if (debug.on()) debug.log("response created in advance");

        CompletableFuture<Void> connectCF;
        if (!connection.connected()) {
            //When the connection is used for the first time, the socket is not yet connected; it is connected here
            if (debug.on()) debug.log("initiating connect async");
            //Establish the socket connection asynchronously. We analyze this shortly
            connectCF = connection.connectAsync(exchange)
                    //Then mark the connection as connected
                    .thenCompose(unused -> connection.finishConnect());
            Throwable cancelled;
            synchronized (lock) {
                if ((cancelled = failed) == null) {
                    operations.add(connectCF);
                }
            }
            if (cancelled != null) {
                if (client.isSelectorThread()) {
                    executor.execute(() ->
                        connectCF.completeExceptionally(cancelled));
                } else {
                    connectCF.completeExceptionally(cancelled);
                }
            }
        } else {
            connectCF = new MinimalFuture<>();
            connectCF.complete(null);
        }

        return connectCF
                .thenCompose(unused -> {
                    CompletableFuture<Void> cf = new MinimalFuture<>();
                    try {
                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
                            if (t != null) {
                                if (debug.on())
                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
                                if (!headersSentCF.isDone())
                                    headersSentCF.completeAsync(() -> this, executor);
                            }
                        });
                        //Finally, FlowTube::connectFlows is called (through Http1Exchange::connectFlows) to set up the two-way subscription.
                        //We analyze it shortly
                        connectFlows(connection);

                        if (debug.on()) debug.log("requestAction.headers");
                        //Fetch request header data from request
                        List<ByteBuffer> data = requestAction.headers();
                        synchronized (lock) {
                            state = State.HEADERS;
                        }
                        if (debug.on()) debug.log("setting outgoing with headers");
                        assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
                        //Put it in the output queue, which we will analyze in the next section
                        appendToOutgoing(data);
                        cf.complete(null);
                        return cf;
                    } catch (Throwable t) {
                        if (debug.on()) debug.log("Failed to send headers: %s", t);
                        headersSentCF.completeExceptionally(t);
                        bodySentCF.completeExceptionally(t);
                        connection.close();
                        cf.completeExceptionally(t);
                        return cf;
                    } })
                .thenCompose(unused -> headersSentCF);
    }

Although this method is named "send headers", it actually does several things:

  1. Establishes the socket connection asynchronously
  2. Establishes a two-way subscription with the tube (not with the socket channel itself)
  3. Takes the request headers, puts them into the outgoing queue, and notifies the subscriber at the tube end to consume them

We analyze the first two steps in this section. First, let's look at how the socket connection is established asynchronously, in the PlainHttpConnection::connectAsync method:

//PlainHttpConnection's implementation of the abstract HttpConnection::connectAsync method
@Override
    public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
        CompletableFuture<ConnectState> cf = new MinimalFuture<>();
        try {
            assert !connected : "Already connected";
            assert !chan.isBlocking() : "Unexpected blocking channel";
            boolean finished;

            if (connectTimerEvent == null) {
                //Register the connect-timeout timer; when it fires, it wakes up the blocked selector thread
                connectTimerEvent = newConnectTimer(exchange, cf);
                if (connectTimerEvent != null) {
                    if (debug.on())
                        debug.log("registering connect timer: " + connectTimerEvent);
                    client().registerTimer(connectTimerEvent);
                }
            }
            //Resolve the address via DNS, then connect the channel's socket to it.
            //Because the channel is in non-blocking mode, connect returns immediately:
            //either the connection is already established (finished = true), or it must be completed later (finished = false)
            PrivilegedExceptionAction<Boolean> pa =
                    () -> chan.connect(Utils.resolveAddress(address));
            try {
                 finished = AccessController.doPrivileged(pa);
            } catch (PrivilegedActionException e) {
               throw e.getCause();
            }
            if (finished) {
                //If the connection succeeds immediately, the asynchronous operation is effectively synchronous
                if (debug.on()) debug.log("connect finished without blocking");
                cf.complete(ConnectState.SUCCESS);
            } else {
                //Otherwise, register a connect event (analyzed below). Once the channel is ready, the selector manager thread dispatches the event
                //and calls its handle method to finish establishing the connection.
                if (debug.on()) debug.log("registering connect event");
                client().registerEvent(new ConnectEvent(cf, exchange));
            }
            cf = exchange.checkCancelled(cf, this);
        } catch (Throwable throwable) {
            cf.completeExceptionally(Utils.toConnectException(throwable));
            try {
                close();
            } catch (Exception x) {
                if (debug.on())
                    debug.log("Failed to close channel after unsuccessful connect");
            }
        }
        return cf.handle((r,t) -> checkRetryConnect(r, t,exchange))
                .thenCompose(Function.identity());
    }

From the method above, we can see two possible outcomes when establishing the socket connection: it succeeds immediately, or it has to wait until the channel becomes connectable. In the latter case a connect event is registered, and the selector thread later calls the event's handle method to complete the connection. The flow chart is as follows:

How the SelectorManager works is described in detail in the article "construction and startup of HttpClient client".

Let's look at the implementation of ConnectEvent, an inner class of PlainHttpConnection.

final class ConnectEvent extends AsyncEvent {
        private final CompletableFuture<ConnectState> cf;
        private final Exchange<?> exchange;

        ConnectEvent(CompletableFuture<ConnectState> cf, Exchange<?> exchange) {
            this.cf = cf;
            this.exchange = exchange;
        }

        @Override
        public SelectableChannel channel() {
            return chan;
        }

        @Override
        public int interestOps() {
            //This event is interested in the connect operation.
            return SelectionKey.OP_CONNECT;
        }

        //Event handler. The selector manager thread (SelectorManager) calls it once the connect event is ready
        @Override
        public void handle() {
            try {
                assert !connected : "Already connected";
                assert !chan.isBlocking() : "Unexpected blocking channel";
                if (debug.on())
                    debug.log("ConnectEvent: finishing connect");
                //The channel is now ready: call finishConnect on the java.nio channel to complete the connection
                boolean finished = chan.finishConnect();
                if (debug.on())
                    debug.log("ConnectEvent: connect finished: %s, cancelled: %s, Local addr: %s",
                              finished, exchange.multi.requestCancelled(), chan.getLocalAddress());
                assert finished || exchange.multi.requestCancelled() : "Expected channel to be connected";
                // complete async since the event runs on the SelectorManager thread
                cf.completeAsync(() -> ConnectState.SUCCESS, client().theExecutor());
            } catch (Throwable e) {
                if (canRetryConnect(e)) {
                    unsuccessfulAttempts++;
                    cf.completeAsync(() -> ConnectState.RETRY, client().theExecutor());
                    return;
                }
                Throwable t = Utils.toConnectException(e);
                client().theExecutor().execute( () -> cf.completeExceptionally(t));
                close();
            }
        }

        @Override
        public void abort(IOException ioe) {
            client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
            close();
        }
    }

The operation here is fairly simple: SocketChannel::finishConnect is called to complete the connection. This concludes the analysis of the asynchronous socket connection.
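
The same two-phase pattern (a non-blocking connect, followed by finishConnect once OP_CONNECT is ready) can be reproduced with plain java.nio. The sketch below assumes a throwaway server socket on the loopback address and a selector polled on the calling thread; HttpClient instead dispatches the event from its SelectorManager thread and completes a CompletableFuture.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch {
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel chan = SocketChannel.open()) {
            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            chan.configureBlocking(false);

            // May return true right away (common on loopback) or false.
            boolean finished = chan.connect(server.getLocalAddress());
            if (!finished) {
                // Same interest as ConnectEvent.interestOps()
                chan.register(selector, SelectionKey.OP_CONNECT);
                while (!finished) {
                    if (selector.select(5000) == 0) {
                        throw new IOException("connect did not complete in time");
                    }
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            finished = chan.finishConnect();
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
            return chan.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLoopback());
    }
}
```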

4.3 establishment of bidirectional read-write relationship

Next, let's look at how the two-way connection is established:

//Private connectFlows method of Http1Exchange
private void connectFlows(HttpConnection connection) {
        FlowTube tube =  connection.getConnectionFlow();
        if (debug.on()) debug.log("%s connecting flows", tube);

        // Connect the flow to our Http1TubeSubscriber:
        //   asyncReceiver.subscriber().
        tube.connectFlows(writePublisher,
                          asyncReceiver.subscriber());
    }

With Section 4.1 in mind, the purpose of this method is clear:

  1. The InternalReadPublisher of SocketTube is connected to the subscriber of asyncReceiver in Http1Exchange

  2. The writePublisher of Http1Exchange (an Http1Publisher) is connected to the InternalWriteSubscriber of SocketTube

Note that the order of these two steps matters. If they were reversed, data could be written to the socket channel while the response data read back has no subscriber yet.
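
The effect of the ordering can be shown with a deliberately simplified toy. Everything in the sketch below is hypothetical: the "tube" echoes written items straight back to its reader, ignores demand, and runs synchronously on one thread, whereas the real SocketTube waits for selector events. It only illustrates that an item arriving before a reader is attached has nowhere to go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ConnectOrderSketch {
    /** Toy tube: echoes every written item to the read subscriber, if there is one. */
    static final class EchoTube implements Flow.Publisher<String>, Flow.Subscriber<String> {
        private Flow.Subscriber<? super String> reader;
        final List<String> dropped = new ArrayList<>();

        @Override public void subscribe(Flow.Subscriber<? super String> s) {
            reader = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }   // demand is ignored in this toy
                @Override public void cancel() { reader = null; }
            });
        }
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) {
            if (reader == null) dropped.add(item); else reader.onNext(item);
        }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Toy writer: emits a single item as soon as demand is signalled. */
    static final class OneShotWriter implements Flow.Publisher<String> {
        @Override public void subscribe(Flow.Subscriber<? super String> s) {
            s.onSubscribe(new Flow.Subscription() {
                private boolean sent;
                @Override public void request(long n) {
                    if (!sent && n > 0) { sent = true; s.onNext("GET / HTTP/1.1"); }
                }
                @Override public void cancel() { }
            });
        }
    }

    static final class Reader implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Returns {items received by the reader, items dropped by the tube}. */
    static int[] run(boolean readerFirst) {
        EchoTube tube = new EchoTube();
        Reader reader = new Reader();
        OneShotWriter writer = new OneShotWriter();
        if (readerFirst) {
            tube.subscribe(reader);      // same order as FlowTube.connectFlows
            writer.subscribe(tube);
        } else {
            writer.subscribe(tube);      // reversed order
            tube.subscribe(reader);
        }
        return new int[] { reader.received.size(), tube.dropped.size() };
    }

    public static void main(String[] args) {
        System.out.println("reader first: " + java.util.Arrays.toString(run(true)));
        System.out.println("writer first: " + java.util.Arrays.toString(run(false)));
    }
}
```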

Let's look at parts of the source code of the classes involved to see what happens on subscription:

Http1Publisher

final class Http1Publisher implements FlowTube.TubePublisher {

        final Logger debug = Utils.getDebugLogger(this::dbgString);
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        volatile boolean cancelled;
        //The publisher of the HTTP/1.1 content holds the subscription through which that content is written
        final Http1WriteSubscription subscription = new Http1WriteSubscription();
        final Demand demand = new Demand();
        //A custom scheduler guarantees that writes run sequentially. We will focus on WriteTask in the next section
        final SequentialScheduler writeScheduler =
                SequentialScheduler.lockingScheduler(new WriteTask());

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
            assert state == State.INITIAL;
            Objects.requireNonNull(s);
            assert subscriber == null;

            subscriber = s;
            if (debug.on()) debug.log("got subscriber: %s", s);
            //(For PlainHttpConnection) hand the subscription to the InternalWriteSubscriber of SocketTube
            s.onSubscribe(subscription);
        }
    
    //..................
}

InternalWriteSubscriber (inner class of SocketTube):

private final class InternalWriteSubscriber
            implements Flow.Subscriber<List<ByteBuffer>> {

        volatile WriteSubscription subscription;
        volatile List<ByteBuffer> current;
        volatile boolean completed;
        //The event that triggers the initial request for HTTP content is kept here
        final AsyncTriggerEvent startSubscription =
                new AsyncTriggerEvent(this::signalError, this::startSubscription);
        final WriteEvent writeEvent = new WriteEvent(channel, this);
        final Demand writeDemand = new Demand();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            WriteSubscription previous = this.subscription;
            if (debug.on()) debug.log("subscribed for writing");
            try {
                //If no write is pending (current == null), a startSubscription event must be registered; the selector thread dispatches it to trigger the write
                boolean needEvent = current == null;
                if (needEvent) {
                    //If we were previously subscribed to a different publisher, drop that subscription
                    if (previous != null && previous.upstreamSubscription != subscription) {
                        previous.dropSubscription();
                    }
                }
                //Accept the new subscription. It is not stored directly; it is wrapped in a thin WriteSubscription proxy
                this.subscription = new WriteSubscription(subscription);
                if (needEvent) {
                    if (debug.on())
                        debug.log("write: registering startSubscription event");
                    client.registerEvent(startSubscription);
                }
            } catch (Throwable t) {
                signalError(t);
            }
        }
}

InternalReadPublisher (inner class of SocketTube):

private final class InternalReadPublisher
            implements Flow.Publisher<List<ByteBuffer>> {
        //The publisher of the socket content holds an internal subscription used to read from the socket
        private final InternalReadSubscription subscriptionImpl
                = new InternalReadSubscription();
        AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
        private volatile ReadSubscription subscription;

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
            Objects.requireNonNull(s);

            TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
            ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
            ReadSubscription previous = pendingSubscription.getAndSet(target);

            //As before, if an earlier subscriber is still pending, it is dropped here
            if (previous != null && previous != target) {
                if (debug.on())
                    debug.log("read publisher: dropping pending subscriber: "
                              + previous.subscriber);
                previous.errorRef.compareAndSet(null, errorRef.get());
                previous.signalOnSubscribe();
                if (subscriptionImpl.completed) {
                    previous.signalCompletion();
                } else {
                    previous.subscriber.dropSubscription();
                }
            }

            if (debug.on()) debug.log("read publisher got subscriber");
            //Depending on the state of the sequential scheduler, this step either registers a subscription event so that the subscription
            //happens asynchronously and the subscriber's onSubscribe method is eventually called, or calls subscriber.onSubscribe directly
            subscriptionImpl.signalSubscribe();
            debugState("leaving read.subscribe: ");
        }
}

Http1TubeSubscriber (inner class) of Http1AsyncReceiver:

final class Http1TubeSubscriber implements TubeSubscriber {
        volatile Flow.Subscription subscription;
        volatile boolean completed;
        volatile boolean dropped;

        public void onSubscribe(Flow.Subscription subscription) {
            // supports being called multiple time.
            // doesn't cancel the previous subscription, since that is
            // most probably the same as the new subscription.
            if (debug.on()) debug.log("Received onSubscribed from upstream");
            if (Log.channel()) {
                Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe());
            }
            assert this.subscription == null || dropped == false;
            //When accepting a subscription, the reference to the subscription is simply maintained here
            this.subscription = subscription;
            dropped = false;
            canRequestMore.set(true);
            if (delegate != null) {
                scheduler.runOrSchedule(executor);
            } else {
                if (debug.on()) debug.log("onSubscribe: read delegate not present yet");
            }
        }
}

You can see that once connectFlows has been called, the publishers and subscribers are essentially in place, but the subscription may still need the SelectorManager thread to be truly completed.

The code above also contains a scheduler member, which in fact controls the whole reading and writing process. We must be clear about its role, otherwise we will get lost in the many "scheduling" steps.

4.4 Brief analysis of SequentialScheduler

The scheduler variable in the above code is of type SequentialScheduler. The SequentialScheduler class is located in the jdk.internal.net.http.common package. The JDK designed it to simplify the sequential execution of a single repeatable task, and it guarantees that runs of that task are mutually exclusive.

The following is the Javadoc comment on the class, followed by a paraphrase.

A scheduler of ( repeatable ) tasks that MUST be run sequentially.
This class can be used as a synchronization aid that assists a number of parties in running a task in a mutually exclusive fashion.
To run the task, a party invokes runOrSchedule. To permanently prevent the task from subsequent runs, the party invokes stop.
The parties can, but do not have to, operate in different threads.
The task can be either synchronous ( completes when its run method returns ), or asynchronous ( completed when its DeferredCompleter is explicitly completed ).
The next run of the task will not begin until the previous run has finished.
The task may invoke runOrSchedule itself, which may be a normal situation.

A scheduler for (repeatable) tasks that must be run sequentially. This class can be used as a synchronization aid that helps multiple parties run a task in a mutually exclusive manner.

To run the task, a party calls runOrSchedule. To permanently prevent the task from running again, the party calls stop.

The parties can, but do not have to, operate in different threads. A task can be synchronous (it completes when its run method returns) or asynchronous (it completes when its DeferredCompleter is explicitly completed).

The next run of the task will not start until the previous run has finished.

The task may call runOrSchedule itself, which can be a normal situation.

Let's take a look at the uml class diagram of SequentialScheduler:

In short, this class uses an atomic integer variable, state, to decide whether a new call to runOrSchedule actually runs the task. It is worth noting that when several threads call runOrSchedule while the task is already running, at most one further run is left pending; the remaining calls are absorbed into it instead of each triggering a run of its own.
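To make the state-based control concrete, here is a minimal sketch of the idea. It is not the JDK's SequentialScheduler: the class name MiniSequentialScheduler and the four state constants are assumptions made for illustration, and the real class additionally supports executors and asynchronous tasks.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified scheduler; only illustrates the atomic-state idea. */
final class MiniSequentialScheduler {
    private static final int IDLE = 0;     // no run in progress
    private static final int RUNNING = 1;  // a run is in progress
    private static final int AGAIN = 2;    // a run is in progress and one more is pending
    private static final int STOPPED = 3;  // stop() was called, the task never runs again

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final Runnable task;

    MiniSequentialScheduler(Runnable task) {
        this.task = task;
    }

    void runOrSchedule() {
        while (true) {
            int s = state.get();
            if (s == IDLE) {
                if (state.compareAndSet(IDLE, RUNNING)) break;   // this caller runs the task
            } else if (s == RUNNING) {
                if (state.compareAndSet(RUNNING, AGAIN)) return; // ask the runner for one more run
            } else {
                return; // AGAIN: a rerun is already pending; STOPPED: nothing to do
            }
        }
        // Only one caller at a time gets here, so runs never overlap.
        while (true) {
            task.run();
            if (state.compareAndSet(RUNNING, IDLE)) return;    // nobody asked for a rerun
            if (state.compareAndSet(AGAIN, RUNNING)) continue; // exactly one pending rerun
            return;                                            // STOPPED during the run
        }
    }

    void stop() {
        state.set(STOPPED);
    }

    public static void main(String[] args) {
        MiniSequentialScheduler scheduler =
                new MiniSequentialScheduler(() -> System.out.println("task run"));
        scheduler.runOrSchedule(); // runs the task
        scheduler.stop();
        scheduler.runOrSchedule(); // ignored after stop()
    }
}
```

Note how any number of runOrSchedule calls arriving during a run collapse into a single AGAIN state, which is why a task must re-check its own conditions on each run rather than count invocations.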

5. Sending of request header and request body

Having seen the intricate connection setup, we now follow the sending of the request header and request body. In short, sending the request header is a publish-subscribe process that follows the Reactive Streams specification. A thread-safe double-ended queue holds the data to be sent (request header + body).

In the process of sending the request header, the flow of the request header data can be described by the following data flow diagram:

Since this section involves the classic reactive flow interaction, which is relatively complex, let's briefly introduce the relevant members involved.

5.1 introduction to publishers and subscribers

In a plain HTTP/1.1 request-response exchange, Http1Exchange acts as the publisher of the request and the subscriber of the response, while SocketTube acts as the subscriber of the request and the publisher of the response. In practice these roles are carried out by their inner class members. We first show the UML class diagrams of Http1Exchange and SocketTube, and then briefly introduce the members involved.

uml class diagram of Http1Exchange:

uml class diagram of SocketTube:

| Class name | Parent class (interface) | Outer class | Role | Function |
|---|---|---|---|---|
| Http1Exchange | ExchangeImpl | - | Manager of one HTTP/1.1 request-response exchange | Implements the parent class methods, manages one HTTP/1.1 request-response exchange, and delegates specific responsibilities to its internal members |
| Http1Publisher | FlowTube.TubePublisher, Flow.Publisher | Http1Exchange | Publisher of HTTP request content | Accepts the registration of the tube subscriber; takes content from the request data buffer queue and delivers it to the subscriber |
| Http1WriteSubscription | Flow.Subscription | Http1Publisher | Subscription for HTTP request content | Acts as the link between publisher and subscriber: receives the tube subscriber's demand for HTTP request content and passes it on to the publisher |
| WriteTask | Runnable | Http1Publisher | Task | The publishing task that Http1Publisher runs to publish HTTP/1 request data |
| DataPair | - | Http1Exchange | Data combination | Combines a fragment of HTTP request data to be sent with an error for that fragment; it is stored in the buffer queue until the publisher takes it out, which is how errors are signalled |
| SocketTube | Flow.Publisher, Flow.Subscriber | - | Socket tube | Manages and maintains the publisher and subscriber on the socket channel side |
| InternalWriteSubscriber | Flow.Subscriber | SocketTube | Subscriber of HTTP request content | Receives the HTTP request data and writes it to the socket channel |
| WriteSubscription | Flow.Subscription | InternalWriteSubscriber | Wrapper around the subscription for HTTP request content | Proxies Http1WriteSubscription, adjusting its behavior, and takes part in requesting data from Http1Publisher |
| SocketFlowEvent | AsyncEvent | SocketTube | Channel flow event | Parent class of the channel read / write events. By changing the interest ops, reading or writing channel data can be paused or resumed |
| WriteEvent | SocketFlowEvent | InternalWriteSubscriber | Channel write event | Interested in OP_WRITE; it is registered with the socket channel to start the write operation |

5.2 Startup process of sending the request header

Let's go back to the sendHeadersAsync method of Http1Exchange:

    @Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        //Pay attention to the comments here. Create a response when sending the request header, so that the response object can set the correct receiver
        // create the response before sending the request headers, so that
        // the response can set the appropriate receivers.
        if (debug.on()) debug.log("Sending headers only");
        // If the first attempt to read something triggers EOF, or
        // IOException("channel reset by peer"), we're going to retry.
        // Instruct the asyncReceiver to throw ConnectionExpiredException
        // to force a retry.
        asyncReceiver.setRetryOnError(true);
        if (response == null) {
            //Create response
            response = new Http1Response<>(connection, this, asyncReceiver);
        }

        if (debug.on()) debug.log("response created in advance");

        CompletableFuture<Void> connectCF;
        //This is the process of establishing an asynchronous socket connection. It has been analyzed and omitted
        //............
        return connectCF
                .thenCompose(unused -> {
                    CompletableFuture<Void> cf = new MinimalFuture<>();
                    try {
                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
                            if (t != null) {
                                if (debug.on())
                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
                                if (!headersSentCF.isDone())
                                    headersSentCF.completeAsync(() -> this, executor);
                            }
                        });
                        connectFlows(connection);

                        if (debug.on()) debug.log("requestAction.headers");
                        //Build and take out the request header from the request, including the construction process of headers string and ByteBuffer. Skip
                        List<ByteBuffer> data = requestAction.headers();
                        //Mark this exchange as being in the "sending request headers" state
                        synchronized (lock) {
                            state = State.HEADERS;
                        }
                        if (debug.on()) debug.log("setting outgoing with headers");
                        assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
                        //This step wraps the request header into a DataPair, puts it into the buffer queue, and triggers publishing to the subscriber
                        //We will step into it below
                        appendToOutgoing(data);
                        cf.complete(null);
                        return cf;
                    } catch (Throwable t) {
                        if (debug.on()) debug.log("Failed to send headers: %s", t);
                        headersSentCF.completeExceptionally(t);
                        bodySentCF.completeExceptionally(t);
                        connection.close();
                        cf.completeExceptionally(t);
                        return cf;
                    } })
                .thenCompose(unused -> headersSentCF);
    }

We see that once the request header data has been obtained, appendToOutgoing(List<ByteBuffer>) is called. Let's step into it, keeping an eye on the outgoing queue.

class Http1Exchange<T> extends ExchangeImpl<T> {  
    //...... Omitted

    /** Holds the outgoing data, either the headers or a request body part. Or
     * an error from the request body publisher. At most there can be ~2 pieces
     * of outgoing data ( onComplete|onError can be invoked without demand ).*/
    //The queue that carries the outgoing data; it acts as the "buffer" of the reactive stream
    final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();

    /** A carrier for either data or an error. Used to carry data, and communicate
     * errors from the request ( both headers and body ) to the exchange. */
    //Static nested classes within Http1Exchange combine a "fragment" of data and errors
    static class DataPair {
        Throwable throwable;
        List<ByteBuffer> data;
        DataPair(List<ByteBuffer> data, Throwable throwable){
            this.data = data;
            this.throwable = throwable;
        }
        @Override
        public String toString() {
            return "DataPair [data=" + data + ", throwable=" + throwable + "]";
        }
    }	

    //These methods are located within Http1Exchange
    void appendToOutgoing(List<ByteBuffer> item) {
        //Wrap the output byteBuffer into a DataPair data pair
        appendToOutgoing(new DataPair(item, null));
    }

    private void appendToOutgoing(DataPair dp) {
        if (debug.on()) debug.log("appending to outgoing " + dp);
        //Add data pairs to the member variable queue named outgoing
        outgoing.add(dp);
        //Trigger the "publish" operation. We keep following it into the publisher's inner class below
        writePublisher.writeScheduler.runOrSchedule();
    }
    
    
    final class Http1Publisher implements FlowTube.TubePublisher {

        final Logger debug = Utils.getDebugLogger(this::dbgString);
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        volatile boolean cancelled;
        //The subscription held by the request publisher. In a reactive stream, the subscription is created by the publisher and handed to the subscriber
        //This class is nested in Http1Publisher. To keep the listing readable, we will analyze it later
        final Http1WriteSubscription subscription = new Http1WriteSubscription();
        final Demand demand = new Demand();
        //The sequential scheduler described in the previous section is used to wrap the write task. We focus on writing tasks
        final SequentialScheduler writeScheduler =
                SequentialScheduler.lockingScheduler(new WriteTask());
        
        //We should focus on the write task, which will be called by both the write request header and the request body
        final class WriteTask implements Runnable {
            @Override
            public void run() {
                assert state != State.COMPLETED : "Unexpected state:" + state;
                if (debug.on()) debug.log("WriteTask");

                if (cancelled) {
                    if (debug.on()) debug.log("handling cancellation");
                    //On cancellation (for example because of an error), the scheduler's stop method is called, after which the task never runs again
                    //At the same time, the sending of the request header and request body is marked as completed asynchronously
                    writeScheduler.stop();
                    getOutgoing();
                    return;
                }

                if (checkRequestCancelled()) return;

                if (subscriber == null) {
                    //No subscriber yet: do nothing for now
                    if (debug.on()) debug.log("no subscriber yet");
                    return;
                }

                if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
                //The condition for taking request data from the buffer queue: the queue is not empty and the subscriber has outstanding demand
                while (hasOutgoing() && demand.tryDecrement()) {
                    //Get the next data pair to be output. Note: the request header occupies just one DataPair.
                    //getOutgoing() also switches the state of the current exchange. Once the request header has been taken out,
                    //the state of the exchange is set to BODY, meaning the request body is sent next, and headersSentCF,
                    //the future that tracks the sending of the request header, is completed
                    DataPair dp = getOutgoing();
                    if (dp == null)
                        break;

                    if (dp.throwable != null) {
                        //If an error occurs, the write task is stopped forever
                        if (debug.on()) debug.log("onError");
                        // Do not call the subscriber's onError, it is not required.
                        writeScheduler.stop();
                    } else {
                        List<ByteBuffer> data = dp.data;
                        if (data == Http1BodySubscriber.COMPLETED) {
                            //If the fetched data to be written is an empty byteBuffer that marks the end of the request body, it is marked that the write request task has been completed
                            synchronized (lock) {
                                assert state == State.COMPLETING : "Unexpected state:" + state;
                                state = State.COMPLETED;
                            }
                            if (debug.on())
                                debug.log("completed, stopping %s", writeScheduler);
                            writeScheduler.stop();
                            // Do nothing more. Just do not publish anything further.
                            // The next Subscriber will eventually take over.

                        } else {
                            if (checkRequestCancelled()) return;
                            if (debug.on())
                                debug.log("onNext with " + Utils.remaining(data) + " bytes");
                            //In the process of writing the request header and body, the onNext method of the subscriber (SocketTube in Plain Http1) is called continuously,
                            //Make it write to socket channel
                            subscriber.onNext(data);
                        }
                    }
                }
            }
        }
        
    }
      

  }

We can see that the operation of the request (header) Publisher is relatively clear:

  • Build and take out the request header from the request, wrap it into a data pair, and put it into the double-ended buffer queue maintained in Http1Exchange
  • Perform the "notification" of request writing. When the conditions are met, take out the data from the queue and notify the subscriber of consumption

So what are the conditions for "when the conditions are met"?

                while (hasOutgoing() && demand.tryDecrement()) 

We can see that the condition has two parts: the buffer queue must not be empty, and the demand must not be zero. What is this demand, and how and when is it initialized and updated? It simply wraps a variable of type AtomicLong that records the number of items the subscriber has requested from the publisher but that have not yet been delivered, as in any reactive stream. Let's look at the structure of Demand first, and then focus on how InternalWriteSubscriber in SocketTube behaves while the request header (and body) is sent.

/**
 * Encapsulates operations with demand (Reactive Streams).
 *
 * <p> Demand is the aggregated number of elements requested by a Subscriber
 * which is yet to be delivered (fulfilled) by the Publisher.
 */
public final class Demand {

    private final AtomicLong val = new AtomicLong();

    public boolean increase(long n) {
        //Increase the demand by n (body omitted here)
    }

    /**
     * Increases this demand by 1 but only if it is fulfilled.
     * @return true if the demand was increased, false otherwise.
     */
    public boolean increaseIfFulfilled() {
        //When the current demand is empty, add 1 demand for startup
        return val.compareAndSet(0, 1);
    }

    public long decreaseAndGet(long n) {
        //Reduce the demand by n if possible; if less than n is available, reduce it to zero. Returns the amount actually removed (body omitted here)
    }

    //Try to reduce the demand by 1; returns true only if there was demand to consume
    public boolean tryDecrement() {
        return decreaseAndGet(1) == 1;
    }

    public boolean isFulfilled() {
        return val.get() == 0;
    }

    public void reset() {
        val.set(0);
    }

    public long get() {
        return val.get();
    }

}
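The listing above omits the bodies of increase and decreaseAndGet. The sketch below fills them in so that the class can be run in isolation. The bodies are an assumption reconstructed from the comments and from tryDecrement, whose comparison with 1 implies that decreaseAndGet returns the amount actually removed; the class name DemandSketch is hypothetical, and this is not presented as the JDK source.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Demand with the omitted bodies filled in. */
final class DemandSketch {
    private final AtomicLong val = new AtomicLong();

    /** Adds n to the demand, saturating at Long.MAX_VALUE; true if the demand was 0 before. */
    boolean increase(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("non-positive subscription request: " + n);
        }
        long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
        return prev == 0;
    }

    /** Raises the demand from 0 to 1; does nothing if there is already demand. */
    boolean increaseIfFulfilled() {
        return val.compareAndSet(0, 1);
    }

    /** Removes up to n from the demand and returns the amount actually removed. */
    long decreaseAndGet(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException(String.valueOf(n));
        }
        long current;
        long taken;
        do {
            current = val.get();
            taken = Math.min(current, n);
        } while (!val.compareAndSet(current, current - taken));
        return taken;
    }

    /** Consumes one unit of demand if any is available. */
    boolean tryDecrement() {
        return decreaseAndGet(1) == 1;
    }

    boolean isFulfilled() {
        return val.get() == 0;
    }

    long get() {
        return val.get();
    }

    public static void main(String[] args) {
        DemandSketch demand = new DemandSketch();
        System.out.println(demand.tryDecrement()); // no demand yet
        demand.increase(1);
        System.out.println(demand.tryDecrement()); // one unit consumed
    }
}
```

With this shape, the loop condition hasOutgoing() && demand.tryDecrement() in WriteTask consumes exactly one unit of demand per item handed to onNext.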

The following is the InternalWriteSubscriber of SocketTube. Let's first analyze the onSubscribe method. It receives a subscription once the write subscriber has subscribed to Http1Publisher. After wrapping that subscription in a thin proxy, it starts requesting List<ByteBuffer> data through the subscription, to be written to the socket channel.

    //This class assumes that the publisher calls onNext sequentially, and that onNext is not called unless request(1) has increased the demand.
    //It has a "queue of 1": it calls request(1) in onSubscribe, and then again only
    //after its 'current' buffer list has been completely written and current has been set to null.

// This class makes the assumption that the publisher will call onNext
    // sequentially, and that onNext won't be called if the demand has not been
    // incremented by request(1).
    // It has a 'queue of 1' meaning that it will call request(1) in
    // onSubscribe, and then only after its 'current' buffer list has been
    // fully written and current set to null;
    private final class InternalWriteSubscriber
            implements Flow.Subscriber<List<ByteBuffer>> {
        //Thin wrapping of received subscriptions
        volatile WriteSubscription subscription;
        volatile List<ByteBuffer> current;
        volatile boolean completed;
        //The event that initiates the consumption data request for the first time is distributed and processed by the selector management thread. We want to focus on this::startSubscription
        final AsyncTriggerEvent startSubscription =
                new AsyncTriggerEvent(this::signalError, this::startSubscription);
        final WriteEvent writeEvent = new WriteEvent(channel, this);
        final Demand writeDemand = new Demand();
        
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            //Once again, we focus on the method of accepting subscriptions. If there is a subscription before, discard it
            WriteSubscription previous = this.subscription;
            if (debug.on()) debug.log("subscribed for writing");
            try {
                boolean needEvent = current == null;
                if (needEvent) {
                    if (previous != null && previous.upstreamSubscription != subscription) {
                        previous.dropSubscription();
                    }
                }
                //Wrap the received subscription in a thin proxy, mainly to control how demand is requested from upstream
                this.subscription = new WriteSubscription(subscription);
                if (needEvent) {
                    if (debug.on())
                        debug.log("write: registering startSubscription event");
                    //Put the event that starts consuming the request header (body) into the SelectorManager's queue of events to register,
                    //and wait for the selector manager thread to dispatch and execute it
                    client.registerEvent(startSubscription);
                }
            } catch (Throwable t) {
                signalError(t);
            }
        }
        
        @Override
        public void onNext(List<ByteBuffer> bufs) {
            //We will analyze the onNext method later, which is responsible for writing data to the socket channel
        }
        
        //......
        
        //The method referenced by the second constructor argument of the startSubscription field
        void startSubscription() {
            try {
                if (debug.on()) debug.log("write: starting subscription");
                if (Log.channel()) {
                    Log.logChannel("Start requesting bytes for writing to channel: {0}",
                            channelDescr());
                }
                assert client.isSelectorThread();
                //Make sure pending read subscriptions are handled first (so that the response can be received once the request is sent)
                // make sure read registrations are handled before;
                readPublisher.subscriptionImpl.handlePending();
                if (debug.on()) debug.log("write: offloading requestMore");
                // start writing;   Ask the publisher to give it message content (subscription)
                client.theExecutor().execute(this::requestMore);
            } catch(Throwable t) {
                signalError(t);
            }
        }

        
        void requestMore() {
           WriteSubscription subscription = this.subscription;
            //Finally, the subscription request method is called
           subscription.requestMore();
        }
   
        //Thin wrapper around the subscription received from Http1Publisher
        final class WriteSubscription implements Flow.Subscription {
            //Upstream subscription, that is, the subscription to s.onSubscribe(subscription) in the subscribe method of Http1Publisher.
            final Flow.Subscription upstreamSubscription;
            volatile boolean cancelled;
            WriteSubscription(Flow.Subscription subscription) {
                this.upstreamSubscription = subscription;
            }

            @Override
            public void request(long n) {
                if (cancelled) return;
                upstreamSubscription.request(n);
            }

            @Override
            public void cancel() {
                if (cancelled) return;
                if (debug.on()) debug.log("write: cancel");
                if (Log.channel()) {
                    Log.logChannel("Cancelling write subscription");
                }
                dropSubscription();
                upstreamSubscription.cancel();
            }

            void dropSubscription() {
                synchronized (InternalWriteSubscriber.this) {
                    cancelled = true;
                    if (debug.on()) debug.log("write: resetting demand to 0");
                    writeDemand.reset();
                }
            }

            //Finally get to the point
            void requestMore() {
                try {
                    if (completed || cancelled) return;
                    boolean requestMore;
                    long d;
                    // don't fiddle with demand after cancel.
                    // see dropSubscription.
                    synchronized (InternalWriteSubscriber.this) {
                        if (cancelled) return;
                        //Initialize the demand: if the current demand is 0, raise it to 1 so that more data is requested below
                        d = writeDemand.get();
                        requestMore = writeDemand.increaseIfFulfilled();
                    }
                    if (requestMore) {
                        //Ask the upstream subscription for data
                        if (debug.on()) debug.log("write: requesting more...");
                        //This call requests one more item (one fragment of request data) from upstream, adding one unit of demand to the upstream subscription
                        upstreamSubscription.request(1);
                    } else {
                        if (debug.on())
                            debug.log("write: no need to request more: %d", d);
                    }
                } catch (Throwable t) {
                    if (debug.on())
                        debug.log("write: error while requesting more: " + t);
                    signalError(t);
                } finally {
                    debugState("leaving requestMore: ");
                }
            }
        }
        
    }

The code above is nested fairly deeply, but the logic is easy to follow once the reactive stream workflow is clear: after receiving the subscription, InternalWriteSubscriber wraps it, then registers an event that starts requesting data, waits for the selector manager thread to dispatch and execute that event, and adds one unit of demand to the upstream subscription. The upstream Http1Publisher then starts handing the request data fragments in the buffer queue to the subscriber (the current InternalWriteSubscriber) through the subscriber.onNext method. Let's look back at Http1WriteSubscription, the inner class of Http1Publisher, that is, at how the "upstream subscription" mentioned above behaves when it receives request(1):

final class Http1WriteSubscription implements Flow.Subscription {

    @Override
    public void request(long n) {
        if (cancelled)
            return;  //no-op
        //Here, n is added to the member variable demand in Http1Publisher (n is 1 in our case), that is, the demand is increased
        demand.increase(n);
        if (debug.on())
            debug.log("subscription request(%d), demand=%s", n, demand);
        //Start the write task of "fetching data from the buffer queue to the subscriber" mentioned above
        writeScheduler.runOrSchedule(client.theExecutor());
    }

    @Override
    public void cancel() {
        if (debug.on()) debug.log("subscription cancelled");
        if (cancelled)
            return;  //no-op
        cancelled = true;
        writeScheduler.runOrSchedule(client.theExecutor());
    }
}

This completes the walkthrough of how the sending of the HTTP/1 request header (and request body) is started. Next, we will focus on the onNext method of the subscriber (InternalWriteSubscriber in SocketTube) to see how it writes data to the socket channel.
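The handshake traced in this section (buffer the data, wait for request(1), deliver one item, request again only after it has been written) can be condensed into a small single-threaded sketch built on java.util.concurrent.Flow. Everything in it is hypothetical: BufferingPublisher stands in for Http1Publisher, OneAtATimeSubscriber for InternalWriteSubscriber, strings replace List<ByteBuffer>, and a plain boolean flag replaces the SequentialScheduler. There are no threads, selector events, or error handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

final class QueueOfOneDemo {

    /** Hypothetical publisher: buffers items and releases them only against demand. */
    static final class BufferingPublisher implements Flow.Publisher<String> {
        private final Deque<String> outgoing = new ArrayDeque<>();
        private long demand;       // items requested but not yet delivered
        private boolean draining;  // stands in for the sequential scheduler
        private Flow.Subscriber<? super String> subscriber;

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { demand += n; drain(); }
                @Override public void cancel() { outgoing.clear(); }
            });
        }

        /** Counterpart of appendToOutgoing: buffer the item, then try to publish. */
        void append(String item) {
            outgoing.add(item);
            drain();
        }

        long pendingDemand() {
            return demand;
        }

        /** Counterpart of WriteTask: deliver while there is both data and demand. */
        private void drain() {
            if (draining || subscriber == null) return; // no re-entrant runs, no subscriber yet
            draining = true;
            try {
                while (!outgoing.isEmpty() && demand > 0) {
                    demand--;
                    subscriber.onNext(outgoing.poll());
                }
            } finally {
                draining = false;
            }
        }
    }

    /** Hypothetical subscriber with a "queue of 1": one outstanding request at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> written = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);            // first unit of demand
        }
        @Override public void onNext(String item) {
            written.add(item);       // "write" the item completely...
            subscription.request(1); // ...and only then ask for the next one
        }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        BufferingPublisher publisher = new BufferingPublisher();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        publisher.append("headers");     // buffered: no subscriber and no demand yet
        publisher.subscribe(subscriber); // request(1) releases the headers
        publisher.append("body-1");      // delivered against the next request(1)
        System.out.println(subscriber.written);
    }
}
```

The design point to notice is that neither side pushes on its own: data leaves the buffer only when both the queue and the demand are non-empty, which is the same condition WriteTask checks.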

5.3 Writing data to the NIO socket channel

Next, we will focus on the onNext method of InternalWriteSubscriber, which involves requesting data from the publisher and writing to the socket channel:

final class SocketTube implements FlowTube {
    //Omit
    
    //Subscribers of request header and request body
    private final class InternalWriteSubscriber
                implements Flow.Subscriber<List<ByteBuffer>> {

        volatile WriteSubscription subscription;
        //Currently held request data (request header / request body)
        volatile List<ByteBuffer> current;
        volatile boolean completed;
        final AsyncTriggerEvent startSubscription =
            new AsyncTriggerEvent(this::signalError, this::startSubscription);
        final WriteEvent writeEvent = new WriteEvent(channel, this);
        final Demand writeDemand = new Demand();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            //This method has been analyzed in the previous section
        }

        //This is the method we want to analyze
        @Override
        public void onNext(List<ByteBuffer> bufs) {
            assert current == null : dbgString() // this is a queue of 1.
                + "w.onNext current: " + current;
            assert subscription != null : dbgString()
                + "w.onNext: subscription is null";
            current = bufs;
            //Flush the current request data to the NIO socket channel
            tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
            // For instance in HTTP/2, a received SETTINGS frame might trigger
            // the sending of a SETTINGS frame in turn which might cause
            // onNext to be called from within the same selector thread that the
            // original SETTINGS frames arrived on. If rs is the read-subscriber
            // and ws is the write-subscriber then the following can occur:
            // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
            // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
            debugState("leaving w.onNext");
        }

        //Method of flushing data to socket channel
        // If this method is invoked in the selector manager thread (because of
        // a writeEvent), then the executor will be used to invoke request(1),
        // ensuring that onNext() won't be invoked from within the selector
        // thread. If not in the selector manager thread, then request(1) is
        // invoked directly.
        void tryFlushCurrent(boolean inSelectorThread) {
            List<ByteBuffer> bufs = current;
            if (bufs == null) return;
            try {
                assert inSelectorThread == client.isSelectorThread() :
                "should " + (inSelectorThread ? "" : "not ")
                    + " be in the selector thread";
                //Gets the length of data to be written
                long remaining = Utils.remaining(bufs);
                if (debug.on()) debug.log("trying to write: %d", remaining);
                //Try to write all pending data to the socket channel in one go. The channel is in non-blocking mode,
                //so this is a "best effort" write: it writes what the channel accepts right now and returns the number of bytes written
                long written = writeAvailable(bufs);
                if (debug.on()) debug.log("wrote: %d", written);
                assert written >= 0 : "negative number of bytes written:" + written;
                assert written <= remaining;
                if (remaining - written == 0) {
                    //When all the data in this batch has been written, request more data from the publisher
                    current = null;
                    if (writeDemand.tryDecrement()) {
                        Runnable requestMore = this::requestMore;
                        if (inSelectorThread) {
                            assert client.isSelectorThread();
                            client.theExecutor().execute(requestMore);
                        } else {
                            assert !client.isSelectorThread();
                            requestMore.run();
                        }
                    }
                } else {
                    //If not everything could be written (the channel is full), a write event is registered;
                    //once the channel is writable again, the SelectorManager thread dispatches the event and writing continues
                    resumeWriteEvent(inSelectorThread);
                }
            } catch (Throwable t) {
                signalError(t);
            }
        }

        //Best effort write method
        private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
            ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
            final long remaining = Utils.remaining(srcs);
            long written = 0;
            while (remaining > written) {
                try {
                    //This ends up calling the write(ByteBuffer[]) method of java.nio.channels.SocketChannel,
                    //which writes the request data to the socket channel
                    long w = channel.write(srcs);
                    assert w >= 0 : "negative number of bytes written:" + w;
                    if (w == 0) {
                        break;
                    }
                    written += w;
                } catch (IOException x) {
                    if (written == 0) {
                        // no bytes were written just throw
                        throw x;
                    } else {
                        // return how many bytes were written, will fail next time
                        break;
                    }
                }
            }
            return written;
        }

        //Calls the resumeEvent method of the enclosing class (SocketTube) to register or resume the write event
        void resumeWriteEvent(boolean inSelectorThread) {
                if (debug.on()) debug.log("scheduling write event");
                resumeEvent(writeEvent, this::signalError);
        }

    }

    //Registers the event if it is not registered yet, otherwise notifies the client that it has been updated
    private void resumeEvent(SocketFlowEvent event,
                             Consumer<Throwable> errorSignaler) {
        boolean registrationRequired;
        synchronized(lock) {
            registrationRequired = !event.registered();
            event.resume();
        }
        try {
            if (registrationRequired) {
                client.registerEvent(event);
            } else {
                client.eventUpdated(event);
            }
        } catch(Throwable t) {
            errorSignaler.accept(t);
        }
    }
}

At this point the loop is closed for the request header: it has travelled from the buffer queue, through the publisher and the write subscriber, into the socket channel.
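To see the "best effort" write in isolation, here is a minimal sketch of the same loop outside the JDK. LimitedChannel is a hypothetical stand-in for a socket channel whose send buffer fills up; it is not a JDK class. The manual makeRoom call only imitates what the write event and the SelectorManager thread do in the real SocketTube.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class BestEffortWriteSketch {

    // Hypothetical channel that accepts at most `capacity` more bytes, like a full send buffer.
    static final class LimitedChannel implements GatheringByteChannel {
        private int capacity;

        LimitedChannel(int capacity) { this.capacity = capacity; }

        // Simulates the socket becoming writable again.
        void makeRoom(int bytes) { capacity += bytes; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(capacity, src.remaining());
            src.position(src.position() + n); // consume n bytes from the buffer
            capacity -= n;
            return n;
        }

        @Override public long write(ByteBuffer[] srcs, int offset, int length) {
            long total = 0;
            for (int i = offset; i < offset + length; i++) total += write(srcs[i]);
            return total;
        }

        @Override public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    // Same shape as writeAvailable above: write until done or until the channel accepts nothing.
    static long writeAvailable(GatheringByteChannel channel, List<ByteBuffer> bufs) throws IOException {
        ByteBuffer[] srcs = bufs.toArray(new ByteBuffer[0]);
        long remaining = 0;
        for (ByteBuffer b : srcs) remaining += b.remaining();
        long written = 0;
        while (remaining > written) {
            long w = channel.write(srcs);
            if (w == 0) break; // channel full: the caller must wait for write readiness
            written += w;
        }
        return written;
    }

    // Returns the bytes written by the first (partial) attempt and by the resumed attempt.
    static long[] demo() {
        List<ByteBuffer> bufs = List.of(
                ByteBuffer.wrap("GET / HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII)),
                ByteBuffer.wrap("Host: example.com\r\n\r\n".getBytes(StandardCharsets.US_ASCII)));
        LimitedChannel channel = new LimitedChannel(10);
        try {
            long first = writeAvailable(channel, bufs);
            channel.makeRoom(100); // stands in for the write-ready event
            long second = writeAvailable(channel, bufs);
            return new long[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println("first attempt: " + result[0] + ", after resume: " + result[1]);
    }
}
```

Because the buffers keep their positions between attempts, the second call simply continues where the first one stopped, which is why tryFlushCurrent can be re-entered from a write event without any extra bookkeeping.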

5.4 Sending the request body

Sending the request body is similar to sending the request header, and the steps that write to the socket channel are the ones presented above. However, getting the request body from the caller to Http1Publisher is another "publish-subscribe" process. We will trace the entire request body write path.

Let's first review the calling code when sending a request with the request body:

HttpRequest request = HttpRequest.newBuilder()
      .uri(URI.create("http://openjdk.java.net/"))   //Set the destination url
      .timeout(Duration.ofMinutes(1))   //Timeout
      .header("Content-Type", "application/json")   //Declare the request body as json
      //.POST(BodyPublishers.ofFile(Paths.get("file.json")))   //Read the body from a json file
      .POST(HttpRequest.BodyPublishers.ofString(   //Send a json string
                            //JSONUtil is a JSON helper class that is not part of the JDK
                            JSONUtil.toJsonStr(Map.of("country", "China"))))
      .build();

As you can see, we specify the request body of the POST method by passing in an HttpRequest.BodyPublisher object. In practice, the helper class BodyPublishers instantiates one of its implementation classes from a string, a file, an input stream or another publisher. The BodyPublisher interface in HttpRequest is defined as follows.

public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

        /**
         * Returns the content length for this request body. May be zero
         * if no request body being sent, greater than zero for a fixed
         * length content, or less than zero for an unknown content length.
         *
         * <p> This method may be invoked before the publisher is subscribed to.
         * This method may be invoked more than once by the HTTP client
         * implementation, and MUST return the same constant value each time.
         *
         * @return the content length for this request body, if known
         */
        long contentLength();
    }

You can see that the BodyPublisher interface extends the Flow.Publisher interface and acts as the publisher in the reactive stream model. Its implementation classes live in the RequestPublishers class, as shown in the following figure. As you will see later, whether its subscriber is a fixed-length or a streaming subscriber is decided by whether contentLength returns a positive value, a negative value or 0.
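The three cases of contentLength can be observed with the public API alone. The following is a small sketch; the sendMode helper and its labels are illustrative names of this example, not part of the JDK.

```java
import java.io.ByteArrayInputStream;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;

final class ContentLengthSketch {

    // Maps the contentLength contract onto the three cases described above (labels are illustrative).
    static String sendMode(BodyPublisher publisher) {
        long length = publisher.contentLength();
        if (length == 0) return "no body";
        return length > 0 ? "fixed length" : "streaming";
    }

    static String demo() {
        BodyPublisher none = BodyPublishers.noBody();
        BodyPublisher text = BodyPublishers.ofString("{\"country\":\"China\"}");
        // The length of an input stream is not known in advance.
        BodyPublisher stream = BodyPublishers.ofInputStream(
                () -> new ByteArrayInputStream(new byte[] {1, 2, 3}));
        return sendMode(none) + "|" + sendMode(text) + "|" + sendMode(stream);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```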

Let's take a look at ByteArrayPublisher, the parent class of the StringPublisher returned by BodyPublishers.ofString(), which is the body publisher callers use most often. ByteArrayPublisher stores the caller's data in a byte array. When it is subscribed to, it creates the real publisher, a PullPublisher, and passes the subscriber on to it.

//This class is also the parent class of the StringPublisher returned by BodyPublishers.ofString()
public static class ByteArrayPublisher implements BodyPublisher {
        private final int length;
        //Byte array received from the caller
        private final byte[] content;
        private final int offset;
        private final int bufSize;

        public ByteArrayPublisher(byte[] content) {
            this(content, 0, content.length);
        }

        public ByteArrayPublisher(byte[] content, int offset, int length) {
            this(content, offset, length, Utils.BUFSIZE);
        }

        /* bufSize exposed for testing purposes */
        ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
            this.content = content;
            this.offset = offset;
            this.length = length;
            this.bufSize = bufSize;
        }

        //Performed on subscription: copies the byte array into a list of ByteBuffers of at most bufSize bytes each
        List<ByteBuffer> copy(byte[] content, int offset, int length) {
            List<ByteBuffer> bufs = new ArrayList<>();
            while (length > 0) {
                ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
                int max = b.capacity();
                int tocopy = Math.min(max, length);
                b.put(content, offset, tocopy);
                offset += tocopy;
                length -= tocopy;
                b.flip();
                bufs.add(b);
            }
            return bufs;
        }

        //Accepts a subscription; called when the request body subscriber subscribes to this publisher
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
            List<ByteBuffer> copy = copy(content, offset, length);
            //For the byte type Http request body, this is the real publisher
            var delegate = new PullPublisher<>(copy);
            delegate.subscribe(subscriber);
        }

        @Override
        //A fixed request body length is returned here, which means a fixed-length subscriber will be created later
        public long contentLength() {
            return length;
        }
    }

In the subscribe method of PullPublisher, the subscriber's onSubscribe method is called.
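To make the subscription handshake concrete, the sketch below subscribes to a BodyPublisher directly, without sending any request. CollectingSubscriber is a hypothetical helper written for this example; inside HttpClient this role is played by the subscribers in Http1Request.

```java
import java.io.ByteArrayOutputStream;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

final class BodyPublisherSubscribeSketch {

    // Hypothetical subscriber that requests one buffer at a time and collects all bytes.
    static final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        final CompletableFuture<byte[]> result = new CompletableFuture<>();
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s; // store first: onNext may be called from inside request(1)
            s.request(1);
        }

        @Override public void onNext(ByteBuffer item) {
            byte[] chunk = new byte[item.remaining()];
            item.get(chunk);
            bytes.write(chunk, 0, chunk.length);
            subscription.request(1); // signal demand for the next buffer
        }

        @Override public void onError(Throwable t) { result.completeExceptionally(t); }

        @Override public void onComplete() { result.complete(bytes.toByteArray()); }
    }

    // Subscribes to the publisher and waits (with a timeout) until it completes.
    static String collect(BodyPublisher publisher) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        publisher.subscribe(subscriber);
        byte[] body = subscriber.result.orTimeout(5, TimeUnit.SECONDS).join();
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(collect(BodyPublishers.ofString("{\"country\":\"China\"}")));
    }
}
```

The subscriber never receives more than it asked for: each onNext is answered with exactly one further request(1), which is the same demand-driven pattern the client's own subscribers follow.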

Now let's turn our attention to the subscribers. After the request header has been sent and the check for a 407 error has passed, Http1Exchange sends the request body immediately. The subscription takes place in the request body sending methods of Exchange and ExchangeImpl. The call chain is:

Exchange::responseAsyncImpl0 -> Exchange::sendRequestBody -> Http1Exchange::sendBodyAsync

Let's review the relevant methods in Http1Exchange and Http1Request:

/**
 * Encapsulates one HTTP/1.1 request/response exchange.
 */
class Http1Exchange<T> extends ExchangeImpl<T> {
    //Omit
    
    @Override
    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
        assert headersSentCF.isDone();
        if (debug.on()) debug.log("sendBodyAsync");
        try {
            //If there is a request body, the subscriber of the Http request body is instantiated and the subscription is completed
            bodySubscriber = requestAction.continueRequest();
            if (debug.on()) debug.log("bodySubscriber is %s",
                    bodySubscriber == null ? null : bodySubscriber.getClass());
            if (bodySubscriber == null) {
                bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
                //If there is no request body to send, the "completed" marker is put directly into the buffer queue mentioned above,
                //that is, the marker is handed to the Http1Publisher described earlier,
                //the publisher that is connected to the SocketTube downstream
                appendToOutgoing(Http1BodySubscriber.COMPLETED);
            } else {
                // start
                bodySubscriber.whenSubscribed
                        .thenAccept((s) -> cancelIfFailed(s))
                        //After the asynchronous subscription has succeeded, request the Http request body from the publisher
                        .thenAccept((s) -> requestMoreBody());
            }
        } catch (Throwable t) {
            cancelImpl(t);
            bodySentCF.completeExceptionally(t);
        }
        return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
    }
    //Omit
}

//A top-level class representing the Http1-specific request action; it is referenced by Http1Exchange
class Http1Request {

    //Omit
    Http1BodySubscriber continueRequest()  {
        Http1BodySubscriber subscriber;
        if (streaming) {
            //If it is a streaming request body, use the streaming subscriber, otherwise use a fixed length subscriber
            subscriber = new StreamSubscriber();
            requestPublisher.subscribe(subscriber);
        } else {
            if (contentLength == 0)
                return null;

            subscriber = new FixedContentSubscriber();
            //Complete the subscription to BodyPublisher above
            requestPublisher.subscribe(subscriber);
        }
        return subscriber;
    }
    
    //Omit
}

Now take a look at FixedContentSubscriber, one of the two subscribers, which is located in Http1Request. We focus on its onNext(ByteBuffer item) method:

        //This method is called with request body data once the subscription has succeeded and data has been requested
        @Override
        public void onNext(ByteBuffer item) {
            if (debug.on()) debug.log("onNext");
            Objects.requireNonNull(item);
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
            } else {
                long writing = item.remaining();
                long written = (contentWritten += writing);

                if (written > contentLength) {
                    cancelSubscription();
                    String msg = connection.getConnectionFlow()
                                  + " [" + Thread.currentThread().getName() +"] "
                                  + "Too many bytes in request body. Expected: "
                                  + contentLength + ", got: " + written;
                    http1Exchange.appendToOutgoing(new IOException(msg));
                } else {
                    //Put the request body into the buffer queue mentioned above, handing it to the next reactive stream,
                    //where it waits to be flushed into the socket channel
                    http1Exchange.appendToOutgoing(List.of(item));
                }
            }
        }

With this, the reactive process of sending the request body is clear as well.
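The length check in onNext can be tried out in isolation. The following is a simplified sketch, not the JDK class: LengthCheckingSubscriber is a hypothetical stand-in for FixedContentSubscriber, the outgoing list plays the role of the buffer queue, the "COMPLETED" string stands in for the completion marker, and the sketch requests the next item eagerly instead of waiting for Http1Publisher to ask for more.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class FixedLengthCheckSketch {

    // Hypothetical, simplified counterpart of FixedContentSubscriber.
    static final class LengthCheckingSubscriber implements Flow.Subscriber<ByteBuffer> {
        final List<Object> outgoing = new ArrayList<>(); // ByteBuffers, a Throwable, or "COMPLETED"
        private final long contentLength;
        private long contentWritten;
        private boolean done;
        private Flow.Subscription subscription;

        LengthCheckingSubscriber(long contentLength) { this.contentLength = contentLength; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override public void onNext(ByteBuffer item) {
            if (done) return;
            contentWritten += item.remaining();
            if (contentWritten > contentLength) {
                // The publisher produced more than contentLength() promised: cancel and report.
                done = true;
                subscription.cancel();
                outgoing.add(new IOException("Too many bytes in request body. Expected: "
                        + contentLength + ", got: " + contentWritten));
            } else {
                outgoing.add(item);
                subscription.request(1); // eager demand, a simplification of this sketch
            }
        }

        @Override public void onError(Throwable t) { done = true; outgoing.add(t); }

        @Override public void onComplete() {
            if (done) return;
            done = true;
            outgoing.add("COMPLETED");
        }
    }

    // Feeds the chunks by hand and returns what ended up in the outgoing queue.
    static List<Object> run(long declaredLength, String... chunks) {
        LengthCheckingSubscriber subscriber = new LengthCheckingSubscriber(declaredLength);
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (String chunk : chunks) {
            subscriber.onNext(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8)));
        }
        subscriber.onComplete();
        return subscriber.outgoing;
    }

    public static void main(String[] args) {
        System.out.println(run(5, "he", "llo")); // two buffers, then the completion marker
        System.out.println(run(3, "he", "llo")); // one buffer, then an IOException
    }
}
```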

5.5 Summary of sending

The sending processes of the request header and the request body are similar, with some differences. Taking an unencrypted HTTP/1.1 connection as an example, both require that a two-way subscription relationship has been established between the publisher of the request content (Http1Publisher) and SocketTube.

The sending process of the request header is as follows:

  • Filter and build request headers from user requests
  • Put the request header into the buffer queue
  • After the two-way subscription relationship is established, the write subscriber in SocketTube requests the request header data from the Http1Publisher publisher
  • Http1Publisher takes the request header from the buffer queue and gives it to the write subscriber in SocketTube
  • The write subscriber writes the request header data to the socket channel

The sending process of the request body is as follows:

  • Instantiate a request body publisher according to the request body passed in by the caller
  • After the request header has been sent, instantiate a fixed-length or streaming request body subscriber according to the request body
  • The request body publisher accepts the subscription of the request body subscriber, and the subscriber requests request body data
  • The request body subscriber puts the request body data into the buffer queue and notifies Http1Publisher to start the publish-subscribe task
  • Http1Publisher takes the request body from the buffer queue and gives it to the write subscriber in SocketTube
  • The write subscriber writes the request body data to the socket channel

The main difference between the two is that the writing process of the request body involves two "publish subscribe" processes, while the request header only involves one.

6. Creation of the response and parsing of the response header

Because one user request may produce multiple request-response exchanges due to redirection and other reasons, HttpClient parses only the response header when each response is received, and parses the response body only once it has determined that the response is the final one.

The parsing of the response header (and the response body) is another round of reactive-stream "publish-subscribe". The publisher is InternalReadPublisher in SocketTube, and the subscriber is Http1TubeSubscriber in Http1AsyncReceiver. However, the Http1TubeSubscriber that receives the response content only puts it into a queue, where it waits for downstream consumers to parse and process it. This is where a series of components such as the readers and parsers take the stage.

Compared with the request, the receiving and parsing of the response is more complex and messy. We first introduce the functions of several classes related to response parsing, and then track the process of response header parsing.

6.1 Introduction to publishers and subscribers

The following figure is the class diagram of Http1Response and Http1AsyncReceiver, as well as the function introduction of several components related to response header and response body resolution:

Class name | Parent class (interface) | Enclosing class | Role | Function
Http1TubeSubscriber | TubeSubscriber (Flow.Subscriber) | Http1AsyncReceiver | Direct recipient of response data | Receives the response data read and published by SocketTube
Http1AsyncDelegate | - | Http1AsyncReceiver | Abstract receiving interface | Receives or processes upstream Http response data
Http1AsyncDelegateSubscription | AbstractSubscription (Flow.Subscription) | Http1AsyncReceiver | Proxy subscription relationship | -
Http1AsyncReceiver | - | - | Response data receiving helper class | Assists in receiving response data
Http1Response | - | - | Http1 response object | Handles one HTTP/1.1 response
HeadersReader | Http1AsyncDelegate | Http1Response | Response header reader | Takes part in reading the response header in ByteBuffer form
BodyReader | Http1AsyncDelegate | Http1Response | Response body reader | Takes part in reading the response body in ByteBuffer form
Http1HeaderParser | - | - | Http1 response header parser | Parses the Http1 response header text
BodyParser | Consumer interface | ResponseContent | Response body parsing interface | Parses the Http1 response body
BodyHandler | - | HttpResponse | Response body result assembler | Assembles the parsed Http1 response body

6.2 Response header parsing

We first focus on the parsing process of the response header. In short, the parsing chain of the response header is as follows:

  1. Before the request data is sent, the HeadersReader subscribes to the Http1AsyncReceiver, gets ready to receive data, and signals its demand

  2. InternalReadPublisher in SocketTube reads response data from socket channel

  3. InternalReadPublisher in SocketTube gives the response data to Http1TubeSubscriber in the form of publish subscribe (responsive flow)

  4. After Http1TubeSubscriber receives the data, it sends the response header data to HeadersReader for processing

  5. HeadersReader calls Http1HeaderParser to complete the parsing of the response header

  6. Switch to the state of reading the response body, and assemble the Response object to be returned
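Steps 4 to 6 can be pictured with a small incremental parser. This is only a sketch under simplifying assumptions: HeaderParseSketch is a hypothetical class, far less strict than the real Http1HeaderParser, and it simply collects bytes until the blank line that ends the header block, leaving any remaining bytes in the buffer for whoever reads the body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class HeaderParseSketch {
    private final StringBuilder text = new StringBuilder();
    private final Map<String, String> headers = new LinkedHashMap<>();
    private int statusCode = -1;

    // Consumes bytes until the blank line that ends the header block.
    // Returns true once the headers are complete; unread bytes stay in `input`.
    boolean parse(ByteBuffer input) {
        while (input.hasRemaining()) {
            text.append((char) (input.get() & 0xFF));
            int len = text.length();
            if (len >= 4 && text.substring(len - 4).equals("\r\n\r\n")) {
                finish();
                return true;
            }
        }
        return false; // need more data from the next onNext
    }

    private void finish() {
        String[] lines = text.toString().split("\r\n");
        statusCode = Integer.parseInt(lines[0].split(" ")[1]); // "HTTP/1.1 200 OK" -> 200
        for (int i = 1; i < lines.length; i++) {
            int colon = lines[i].indexOf(':');
            if (colon <= 0) continue; // this sketch silently skips malformed lines
            headers.put(lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT),
                        lines[i].substring(colon + 1).trim());
        }
    }

    private static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    // Feeds a response that arrives in two chunks, split in the middle of a header name.
    static String demo() {
        HeaderParseSketch parser = new HeaderParseSketch();
        ByteBuffer first = ascii("HTTP/1.1 200 OK\r\nContent-Le");
        ByteBuffer second = ascii("ngth: 5\r\n\r\nhello");
        boolean doneAfterFirst = parser.parse(first);
        boolean doneAfterSecond = parser.parse(second);
        return doneAfterFirst + "|" + doneAfterSecond + "|" + parser.statusCode + "|"
                + parser.headers.get("content-length") + "|" + second.remaining();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The bytes left in the second buffer are the beginning of the response body, which is why the header stage must hand the partially consumed buffer on rather than discard it.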

We will follow up one by one. The parsing of response headers can be traced back to the instantiation of Http1Response in Http1Exchange.

Firstly, in HttpClient, the Response class and the Http1Response class are not related by inheritance:

  • The Response class represents the response header and status code
  • The Http1Response class represents an HTTP/1.1 response, including response header and response body, and holds a Response as a member variable (composition)

Let's first look at how Http1Response is instantiated. As mentioned earlier, this happens in sendHeadersAsync, the method of Http1Exchange that sends the request header.

@Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        // create the response before sending the request headers, so that
        // the response can set the appropriate receivers.
        if (debug.on()) debug.log("Sending headers only");
        // If the first attempt to read something triggers EOF, or
        // IOException("channel reset by peer"), we're going to retry.
        // Instruct the asyncReceiver to throw ConnectionExpiredException
        // to force a retry.
        asyncReceiver.setRetryOnError(true);
        //An Http1Response is created in advance
        if (response == null) {
            response = new Http1Response<>(connection, this, asyncReceiver);
        }

        if (debug.on()) debug.log("response created in advance");
       
        //Omit
    }

We then analyze the constructor of Http1Response. It is very important: both the subscription of the response header reader to the asynchronous receiver of the Http1 response content, and the preparation for reading and parsing the response header, are hidden in this constructor.

class Http1Response<T> {

    private volatile ResponseContent content;
    private final HttpRequestImpl request;
    private Response response;
    private final HttpConnection connection;
    private HttpHeaders headers;
    private int responseCode;
    //A reference to Http1Exchange is kept here. Such mutual references between classes are common in HttpClient
    private final Http1Exchange<T> exchange;
    private boolean return2Cache; // return connection to cache when finished
    //Response header reader
    private final HeadersReader headersReader; // used to read the headers
    //Response body reader
    private final BodyReader bodyReader; // used to read the body
    //Direct recipient of Http response content (from the socket tube)
    private final Http1AsyncReceiver asyncReceiver;
    private volatile EOFException eof;
    //Response body parser
    private volatile BodyParser bodyParser;
    // max number of bytes of (fixed length) body to ignore on redirect
    private final static int MAX_IGNORE = 1024;

    // Revisit: can we get rid of this?
    static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
    private volatile State readProgress = State.INITIAL;

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    final static AtomicLong responseCount = new AtomicLong();
    final long id = responseCount.incrementAndGet();
    //Response header parser
    private Http1HeaderParser hd;

    Http1Response(HttpConnection conn,
                  Http1Exchange<T> exchange,
                  Http1AsyncReceiver asyncReceiver) {
        this.readProgress = State.INITIAL;
        this.request = exchange.request();
        this.exchange = exchange;
        this.connection = conn;
        this.asyncReceiver = asyncReceiver;
        //Instantiate the readers for the response header and the response body
        //The advance callback moves the read state forward once the response header or response body has been read
        headersReader = new HeadersReader(this::advance);
        bodyReader = new BodyReader(this::advance);

        hd = new Http1HeaderParser();
        readProgress = State.READING_HEADERS;
        //Let the response header reader keep a reference to the parser
        headersReader.start(hd);
        //Make the response header reader subscribe to the asynchronous receiver of Http1 response content
        asyncReceiver.subscribe(headersReader);
    }
}

Since the subscription process is similar to the previous one, we will not follow it here. We just need to know that the headersReader is now ready to read the response header data.

Next, the InternalReadPublisher in SocketTube reads the response data from the socket channel. Note that this is asynchronous from the program's point of view, while the I/O itself is synchronous and non-blocking: after the request body has been sent, when the selection key corresponding to the socket channel becomes readable, the selector manager thread dispatches and executes the read event, which finally calls the read() method of InternalReadSubscription in InternalReadPublisher in SocketTube.
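The "wait for readability, then read without blocking" pattern can be reproduced with plain NIO. The sketch below uses a java.nio.channels.Pipe instead of a socket so that it runs without a network; this substitution is an assumption of the example, while the selector calls are the same ones a socket channel would use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class ReadReadinessSketch {

    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // Non-blocking mode plus interest in OP_READ: the "read event".
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer buf = ByteBuffer.allocate(64);
                int before = source.read(buf); // nothing available yet: returns at once

                // The "peer" sends data; the selector reports the channel as readable.
                sink.write(ByteBuffer.wrap("HTTP/1.1 200 OK".getBytes(StandardCharsets.US_ASCII)));
                selector.select(5000);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) source.read(buf);
                }
                selector.selectedKeys().clear();
                buf.flip();
                String text = StandardCharsets.US_ASCII.decode(buf).toString();

                // Closing the writing side is seen by the reader as end of stream.
                sink.close();
                selector.select(5000);
                int atEof = source.read(ByteBuffer.allocate(1));
                return before + "|" + text + "|" + atEof;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The three values returned by demo correspond to the three branches of the read loop shown later: no bytes available, some bytes read, and end of stream.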

So, when is the read event registered? In fact, the registration of the read event happens in the connectFlows() method mentioned earlier. When the two-way read-write relationship is established and the Http1TubeSubscriber in Http1AsyncReceiver subscribes to InternalReadPublisher in SocketTube, the onSubscribe method of Http1TubeSubscriber eventually, after several hops, calls subscription.request(1). In the corresponding subscription, InternalReadSubscription, a read event is then registered in the to-do list of SelectorManager, that is, on the socket channel.
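The link between request(n) and the read event is the demand counter: the event only needs to be resumed when demand goes from zero to non-zero. The following is a minimal sketch of such a counter. DemandSketch is a hypothetical class written for this article; it mimics the increase and tryDecrement calls used in the listings, but it is not the JDK's internal Demand class.

```java
import java.util.concurrent.atomic.AtomicLong;

final class DemandSketch {
    private final AtomicLong val = new AtomicLong();

    // Adds n to the demand. Returns true if the demand was zero before,
    // which is the moment a read event has to be registered or resumed.
    boolean increase(long n) {
        if (n <= 0) throw new IllegalArgumentException("non-positive demand: " + n);
        long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
        return prev == 0;
    }

    // Takes one unit of demand if there is any; returns false when there is none.
    boolean tryDecrement() {
        long prev = val.getAndUpdate(p -> p > 0 ? p - 1 : 0);
        return prev > 0;
    }

    // Walks through the transitions and records each return value.
    static String demo() {
        DemandSketch demand = new DemandSketch();
        StringBuilder sb = new StringBuilder();
        sb.append(demand.increase(1)).append('|');  // zero -> one: resume the read event
        sb.append(demand.increase(1)).append('|');  // already unfulfilled: nothing to do
        sb.append(demand.tryDecrement()).append('|');
        sb.append(demand.tryDecrement()).append('|');
        sb.append(demand.tryDecrement()).append('|'); // no demand left: do not read
        sb.append(demand.increase(1));               // zero -> one again
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Capping the sum at Long.MAX_VALUE follows the Reactive Streams convention that a very large demand is treated as effectively unbounded rather than overflowing.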

Let's briefly look at the request method and the read method of InternalReadSubscription, an inner class of InternalReadPublisher, which is itself an inner class of SocketTube:

        final class InternalReadSubscription implements Flow.Subscription {

            private final Demand demand = new Demand();
            final SequentialScheduler readScheduler;
            private volatile boolean completed;
            private final ReadEvent readEvent;
            private final AsyncEvent subscribeEvent;
            
            @Override
            public final void request(long n) {
                if (n > 0L) {
                    boolean wasFulfilled = demand.increase(n);
                    if (wasFulfilled) {
                        if (debug.on()) debug.log("got some demand for reading");
                        //This method registers a read event to the channel
                        resumeReadEvent();
                        // if demand has been changed from fulfilled
                        // to unfulfilled register read event;
                    }
                } else {
                    signalError(new IllegalArgumentException("non-positive request"));
                }
                debugState("leaving request("+n+"): ");
            }

            /** The body of the task that runs in SequentialScheduler. */
            final void read() {
                // It is important to only call pauseReadEvent() when stopping
                // the scheduler. The event is automatically paused before
                // firing, and trying to pause it again could cause a race
                // condition between this loop, which calls tryDecrementDemand(),
                // and the thread that calls request(n), which will try to resume
                // reading.
                try {
                    while(!readScheduler.isStopped()) {
                        if (completed) return;

                        // make sure we have a subscriber
                        if (handlePending()) {
                            if (debug.on())
                                debug.log("pending subscriber subscribed");
                            return;
                        }

                        // If an error was signaled, we might not be in the
                        // the selector thread, and that is OK, because we
                        // will just call onError and return.
                        ReadSubscription current = subscription;
                        Throwable error = errorRef.get();
                        if (current == null)  {
                            assert error != null;
                            if (debug.on())
                                debug.log("error raised before subscriber subscribed: %s",
                                          (Object)error);
                            return;
                        }
                        TubeSubscriber subscriber = current.subscriber;
                        if (error != null) {
                            completed = true;
                            // safe to pause here because we're finished anyway.
                            pauseReadEvent();
                            if (debug.on())
                                debug.log("Sending error " + error
                                          + " to subscriber " + subscriber);
                            if (Log.channel()) {
                                Log.logChannel("Raising error with subscriber for {0}: {1}",
                                        channelDescr(), error);
                            }
                            current.errorRef.compareAndSet(null, error);
                            current.signalCompletion();
                            if (debug.on()) debug.log("Stopping read scheduler");
                            readScheduler.stop();
                            debugState("leaving read() loop with error: ");
                            return;
                        }

                        // If we reach here then we must be in the selector thread.
                        assert client.isSelectorThread();
                        if (demand.tryDecrement()) {
                            // we have demand.
                            try {
                                //In this step, read the readable response data from the socket channel
                                List<ByteBuffer> bytes = readAvailable(current.bufferSource);
                                if (bytes == EOF) {
                                    //EOF means the channel has been read to its end: the corresponding read() call in readAvailable returned -1
                                    if (!completed) {
                                        if (debug.on()) debug.log("got read EOF");
                                        if (Log.channel()) {
                                            Log.logChannel("EOF read from channel: {0}",
                                                        channelDescr());
                                        }
                                        completed = true;
                                        // safe to pause here because we're finished
                                        // anyway.
                                        //Stop the read event and mark the read complete
                                        pauseReadEvent();
                                        current.signalCompletion();
                                        if (debug.on()) debug.log("Stopping read scheduler");
                                        readScheduler.stop();
                                    }
                                    debugState("leaving read() loop after EOF: ");
                                    return;
                                } else if (Utils.remaining(bytes) > 0) {
                                    // the subscriber is responsible for offloading
                                    // to another thread if needed.
                                    if (debug.on())
                                        debug.log("read bytes: " + Utils.remaining(bytes));
                                    assert !current.completed;
                                    //Give the received data to Http1TubeSubscriber
                                    subscriber.onNext(bytes);
                                    // we could continue looping until the demand
                                    // reaches 0. However, that would risk starving
                                    // other connections (bound to other socket
                                    // channels) - as other selected keys activated
                                    // by the selector manager thread might be
                                    // waiting for this event to terminate.
                                    // So resume the read event and return now...
                                    //As the comment above explains, the loop does not keep reading here. Instead, the read event is re-registered
                                    //and dispatched again, so that other channels get a fair share of the selector thread
                                    resumeReadEvent();
                                    if (errorRef.get() != null) continue;
                                    debugState("leaving read() loop after onNext: ");
                                    return;
                                } else {
                                    // nothing available!
                                    if (debug.on()) debug.log("no more bytes available");
                                    // re-increment the demand and resume the read
                                    // event. This ensures that this loop is
                                    // executed again when the socket becomes
                                    // readable again.
                                    //Nothing was read, so the channel is not readable right now: restore the demand and register a new read event
                                    demand.increase(1);
                                    resumeReadEvent();
                                    if (errorRef.get() != null) continue;
                                    debugState("leaving read() loop with no bytes");
                                    return;
                                }
                            } catch (Throwable x) {
                                signalError(x);
                                continue;
                            }
                        } else {
                            if (debug.on()) debug.log("no more demand for reading");
                            // the event is paused just after firing, so it should
                            // still be paused here, unless the demand was just
                            // incremented from 0 to n, in which case, the
                            // event will be resumed, causing this loop to be
                            // invoked again when the socket becomes readable:
                            // This is what we want.
                            // Trying to pause the event here would actually
                            // introduce a race condition between this loop and
                            // request(n).
                            if (errorRef.get() != null) continue;
                            debugState("leaving read() loop with no demand");
                            break;
                        }
                    }
                } catch (Throwable t) {
                    if (debug.on()) debug.log("Unexpected exception in read loop", t);
                    signalError(t);
                } finally {
                    if (readScheduler.isStopped()) {
                        if (debug.on()) debug.log("Read scheduler stopped");
                        if (Log.channel()) {
                            Log.logChannel("Stopped reading from channel {0}", channelDescr());
                        }
                    }
                   
                    handlePending();
                }
            }
        }
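
The fairness trade-off in the loop above can be illustrated with a small single-threaded simulation. This is only a sketch under simplifying assumptions: the class DemandGatedReadSketch and its methods are hypothetical names, a queue of strings stands in for the socket channel, and no real selector is involved. Each fired read event delivers at most one chunk, gives the demand back if nothing was read, and stays paused once the demand reaches zero.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical simulation: one delivery per read event, gated by subscriber demand. */
final class DemandGatedReadSketch {
    private final Queue<String> channel = new ArrayDeque<>(); // stands in for the socket
    private final List<String> delivered = new ArrayList<>(); // what the subscriber received
    private long demand;
    private boolean readEventArmed;

    void dataArrives(String chunk) { channel.add(chunk); }

    /** Like Subscription.request(n): adds demand and resumes the read event. */
    void request(long n) { demand += n; readEventArmed = true; }

    /** Simulates the selector firing the read event; returns false if the event is paused. */
    boolean fireReadEvent() {
        if (!readEventArmed) return false;
        readEventArmed = false;              // the event is paused right after firing
        if (demand == 0) return true;        // no demand: leave the event paused
        demand--;
        String chunk = channel.poll();
        if (chunk == null) {
            demand++;                        // nothing read: give the demand back
        } else {
            delivered.add(chunk);            // corresponds to subscriber.onNext(bytes)
        }
        readEventArmed = true;               // re-arm and return instead of looping
        return true;
    }

    static List<String> demo() {
        DemandGatedReadSketch reader = new DemandGatedReadSketch();
        reader.dataArrives("a");
        reader.dataArrives("b");
        reader.dataArrives("c");
        reader.request(2);                   // the subscriber only asked for two items
        for (int i = 0; i < 4; i++) reader.fireReadEvent();
        return reader.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this simulation the third chunk stays in the simulated channel until request is called again, which mirrors how the real loop leaves data in the socket when there is no demand.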

We see that the read() method calls subscriber.onNext(bytes). We now focus on how Http1TubeSubscriber, as the subscriber, handles the response data it receives.

Tracing the call chain, we find that after receiving the response data, the tube subscriber Http1TubeSubscriber puts it into a queue maintained by its enclosing class, Http1AsyncReceiver, and then triggers Http1AsyncReceiver to read the data and deliver it to the headersReader:

//Internal class of Http1AsyncReceiver
final class Http1TubeSubscriber implements TubeSubscriber {
        volatile Flow.Subscription subscription;
        volatile boolean completed;
        volatile boolean dropped;
     @Override
        public void onNext(List<ByteBuffer> item) {
            canRequestMore.set(item.isEmpty());
            for (ByteBuffer buffer : item) {
                //Calls the asyncReceive method of the enclosing class Http1AsyncReceiver,
                //which we analyze below
                asyncReceive(buffer);
            }
        }
}
/**
 * A helper class that will queue up incoming data until the receiving
 * side is ready to handle it.
 */
class Http1AsyncReceiver {
    //A queue that holds received response data
    private final ConcurrentLinkedDeque<ByteBuffer> queue
            = new ConcurrentLinkedDeque<>();
    //Scheduler that runs the task of draining the queue; we focus on the flush method
    private final SequentialScheduler scheduler =
            SequentialScheduler.lockingScheduler(this::flush);
    final MinimalFuture<Void> whenFinished;
    private final Executor executor;
    //Holds an instance of the inner tube subscriber
    private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
    //... omitted
    
    //Method of the enclosing class that the inner subscriber calls; we analyze it here
    // Callback: Consumer of ByteBuffer
    private void asyncReceive(ByteBuffer buf) {
        if (debug.on())
            debug.log("Putting %s bytes into the queue", buf.remaining());
        received.addAndGet(buf.remaining());
        //Put the response data into the buffer queue
        queue.offer(buf);
        //Schedule the flush method, which takes data from the queue and hands it on for consumption
        //We analyze it in depth below
        //Note: this call originates from a read event dispatched directly by the SelectorManager thread,
        //so the task is handed to another thread to avoid blocking the selector thread
        // This callback is called from within the selector thread.
        // Use an executor here to avoid doing the heavy lifting in the
        // selector.
        scheduler.runOrSchedule(executor);
    }
    
    //The key method to analyze: reads data from the queue and passes it on for consumption
    private void flush() {
        ByteBuffer buf;
        try {
            // we should not be running in the selector here,
            // except if the custom Executor supplied to the client is
            // something like (r) -> r.run();
           assert !client.isSelectorThread()
                   || !(client.theExecutor().delegate() instanceof ExecutorService) :
                    "Http1AsyncReceiver::flush should not run in the selector: "
                    + Thread.currentThread().getName();

            //The handlePendingDelegate method calls the onSubscribe method of the headersReader
            // First check whether we have a pending delegate that has
            // just subscribed, and if so, create a Subscription for it
            // and call onSubscribe.
            handlePendingDelegate();

            //Get response data from queue
            // Then start emptying the queue, if possible.
            while ((buf = queue.peek()) != null && !stopRequested) {
                Http1AsyncDelegate delegate = this.delegate;
                if (debug.on())
                    debug.log("Got %s bytes for delegate %s",
                              buf.remaining(), delegate);
                if (!hasDemand(delegate)) {
                    // The scheduler will be invoked again later when the demand
                    // becomes positive.
                    return;
                }

                assert delegate != null;
                if (debug.on())
                    debug.log("Forwarding %s bytes to delegate %s",
                              buf.remaining(), delegate);
                // The delegate has demand: feed it the next buffer.
                //This step gives the data taken from the queue to the headersReader, that is, the response header reader
                //Earlier, in the constructor of Http1Response, the headersReader subscribed to the AsyncReceiver,
                //so delegate points to the headersReader. We analyze this method below.
                //If the method returns false, the delegate has finished with the data (the response header has been fully parsed) and the flush stops here
                //Any unparsed bytes stay in the queue, which guarantees a correct hand-over to the response body reader
                if (!delegate.tryAsyncReceive(buf)) {
                    final long remaining = buf.remaining();
                    if (debug.on()) debug.log(() -> {
                        // If the scheduler is stopped, the queue may already
                        // be empty and the reference may already be released.
                        String remstr = scheduler.isStopped() ? "" :
                                " remaining in ref: "
                                + remaining;
                        remstr +=  remstr
                                + " total remaining: " + remaining();
                        return "Delegate done: " + remaining;
                    });
                    canRequestMore.set(false);
                    // The last buffer parsed may have remaining unparsed bytes.
                    // Don't take it out of the queue.
                    return; // done.
                }

                // removed parsed buffer from queue, and continue with next
                // if available
                ByteBuffer parsed = queue.remove();
                canRequestMore.set(queue.isEmpty() && !stopRequested);
                assert parsed == buf;
            }
            //Once the queue is empty, request more response data. This step triggers the registration of a read event
            //Because flush is also called during onSubscribe(), this is what actually starts the read process
            // queue is empty: let's see if we should request more
            checkRequestMore();

        } catch (Throwable t) {
            Throwable x = error;
            if (x == null) error = t; // will be handled in the finally block
            if (debug.on()) debug.log("Unexpected error caught in flush()", t);
        } finally {
            // Handles any pending error.
            // The most recently subscribed delegate will get the error.
            checkForErrors();
        }
    }
    
    

We can see that Http1TubeSubscriber, the direct recipient of the HTTP response data, hands each buffer to Http1AsyncReceiver in its onNext(bytes) method. The resulting flush() call first completes the headersReader's pending subscription to the Http1AsyncReceiver and then delivers the response data (first the response header, then the response body) to the current delegate, starting with the headersReader.
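
The queue hand-over described above can be sketched in a few lines. This is a simplified illustration, not the JDK implementation: QueueFlushSketch is a hypothetical class, a Predicate stands in for Http1AsyncDelegate.tryAsyncReceive, and synchronized methods replace the SequentialScheduler and executor. The point it shows is the peek-then-remove pattern: a buffer leaves the queue only after the delegate has accepted it, so bytes left over by the header reader are still there for the body reader.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for the queue and flush logic of Http1AsyncReceiver. */
final class QueueFlushSketch {
    private final ConcurrentLinkedDeque<ByteBuffer> queue = new ConcurrentLinkedDeque<>();
    // The delegate returns false once it is done; leftover bytes then stay in the buffer.
    private Predicate<ByteBuffer> delegate;

    synchronized void subscribe(Predicate<ByteBuffer> newDelegate) {
        delegate = newDelegate;
        flush();
    }

    synchronized void asyncReceive(ByteBuffer buf) {
        queue.offer(buf);
        flush();
    }

    private void flush() {
        ByteBuffer buf;
        while ((buf = queue.peek()) != null) {
            if (delegate == null) return;    // nobody subscribed: keep the data queued
            if (!delegate.test(buf)) {       // delegate finished: do not remove the buffer
                delegate = null;
                return;
            }
            queue.remove();                  // buffer fully consumed
        }
    }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    static String[] demo() {
        QueueFlushSketch receiver = new QueueFlushSketch();
        StringBuilder headers = new StringBuilder();
        StringBuilder body = new StringBuilder();
        // "Header reader": stops at the first blank line and leaves the rest unread.
        receiver.subscribe(buf -> {
            while (buf.hasRemaining()) {
                headers.append((char) buf.get());
                if (headers.toString().endsWith("\n\n")) return false;
            }
            return true;
        });
        receiver.asyncReceive(ascii("H: 1\n\nBO"));
        receiver.asyncReceive(ascii("DY"));
        // "Body reader": subscribes later and receives the leftover bytes first.
        receiver.subscribe(buf -> {
            body.append(StandardCharsets.US_ASCII.decode(buf));
            return true;
        });
        return new String[] { headers.toString(), body.toString() };
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("headers=" + result[0].trim() + " body=" + result[1]);
    }
}
```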

Tracing the headersReader further, we eventually reach its handle method, which calls the parser's parse method to parse the response header.

		@Override
        final void handle(ByteBuffer b,
                          Http1HeaderParser parser,
                          CompletableFuture<State> cf) {
            assert cf != null : "parsing not started";
            assert parser != null : "no parser";
            try {
                count += b.remaining();
                if (debug.on())
                    debug.log("Sending " + b.remaining() + "/" + b.capacity()
                              + " bytes to header parser");
                //Call the parse method of the response header parser
                //It loops internally, records its progress in the state field, and returns true only when parsing is complete
                if (parser.parse(b)) {
                    count -= b.remaining();
                    if (debug.on())
                        debug.log("Parsing headers completed. bytes=" + count);
                    //After the response header is parsed, advance the state so that Http1Response moves on to reading the response body
                    onComplete.accept(State.READING_HEADERS);
                    //Complete the placeholder future: the response header has been read
                    cf.complete(State.READING_HEADERS);
                }
            } catch (Throwable t) {
                if (debug.on())
                    debug.log("Header parser failed to handle buffer: " + t);
                cf.completeExceptionally(t);
            }
        }

In the code above, the response header appears to be parsed by a single call to the parser's parse method. But the response data does not necessarily arrive all at once, so how does this work? In fact, Http1HeaderParser keeps an internal state field so that parse can be called many times. It returns true only when parsing is finally complete, and false otherwise:

    /**
     * Parses HTTP/1.X status-line and headers from the given bytes. Must be
     * called successive times, with additional data, until returns true.
     *
     * All given ByteBuffers will be consumed, until ( possibly ) the last one
     * ( when true is returned ), which may not be fully consumed.
     *
     * @param input the ( partial ) header data
     * @return true iff the end of the headers block has been reached
     */
    boolean parse(ByteBuffer input) throws ProtocolException {
        requireNonNull(input, "null input");

        while (canContinueParsing(input)) {
            //The state field implements a state machine whose initial value is INITIAL. Within one call to parse, the while loop updates the state according to what has been read
            //and stops in some intermediate state once the readable content of input is exhausted. Over multiple calls the state keeps advancing until parsing completes and the method returns true
            switch (state) {
                case INITIAL                                    ->  state = State.STATUS_LINE;
                case STATUS_LINE                                ->  readResumeStatusLine(input);
                case STATUS_LINE_FOUND_CR, STATUS_LINE_FOUND_LF ->  readStatusLineFeed(input);
                case STATUS_LINE_END                            ->  maybeStartHeaders(input);
                case STATUS_LINE_END_CR, STATUS_LINE_END_LF     ->  maybeEndHeaders(input);
                case HEADER                                     ->  readResumeHeader(input);
                case HEADER_FOUND_CR, HEADER_FOUND_LF           ->  resumeOrLF(input);
                case HEADER_FOUND_CR_LF                         ->  resumeOrSecondCR(input);
                case HEADER_FOUND_CR_LF_CR                      ->  resumeOrEndHeaders(input);

                default -> throw new InternalError("Unexpected state: " + state);
            }
        }
        //Returns true only on completion
        return state == State.FINISHED;
    }
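
To make the resumable behaviour concrete, here is a much smaller state machine built on the same idea. It is a sketch under stated assumptions, not a reimplementation of Http1HeaderParser: ResumableLineParserSketch and its three states are hypothetical, it only splits CRLF-terminated lines, and it does no header validation. What it shares with the real parser is that the state survives between calls, so input can arrive in arbitrary fragments.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified resumable parser: collects CRLF-terminated lines until a blank line. */
final class ResumableLineParserSketch {
    private enum State { LINE, FOUND_CR, FINISHED }

    private State state = State.LINE;               // survives between parse() calls
    private final StringBuilder current = new StringBuilder();
    private final List<String> lines = new ArrayList<>();

    /** Consumes input and returns true once the blank line ending the header block is seen. */
    boolean parse(ByteBuffer input) {
        while (input.hasRemaining() && state != State.FINISHED) {
            char c = (char) (input.get() & 0xFF);
            switch (state) {
                case LINE:
                    if (c == '\r') state = State.FOUND_CR;
                    else current.append(c);
                    break;
                case FOUND_CR:
                    if (c != '\n') throw new IllegalStateException("expected LF after CR");
                    if (current.length() == 0) {
                        state = State.FINISHED;     // blank line: end of headers
                    } else {
                        lines.add(current.toString());
                        current.setLength(0);
                        state = State.LINE;
                    }
                    break;
                default:
                    throw new IllegalStateException("unexpected state " + state);
            }
        }
        return state == State.FINISHED;
    }

    List<String> lines() { return lines; }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        ResumableLineParserSketch parser = new ResumableLineParserSketch();
        System.out.println(parser.parse(ascii("HTTP/1.1 200 OK\r")));
        System.out.println(parser.parse(ascii("\nContent-Len")));
        ByteBuffer last = ascii("gth: 4\r\n\r\nBODY");
        System.out.println(parser.parse(last));
        System.out.println(parser.lines() + ", unread bytes: " + last.remaining());
    }
}
```

As in the real parser, the last buffer is not fully consumed: the bytes after the blank line remain unread and belong to the response body.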

With that, we have covered most of the process of reading and parsing the response header. The final step is to assemble a Response object. This happens in the Exchange class after the request body has been sent. The relevant methods are as follows:

    //The sendRequestBody method of the Exchange class sends the request body, then calls the implementation class's method that receives and parses the response header.
// After sending the request headers, if no ProxyAuthorizationRequired
    // was raised and the expectContinue flag is off, we can immediately
    // send the request body and proceed.
    private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
        assert !request.expectContinue();
        if (debug.on()) debug.log("sendRequestBody");
        CompletableFuture<Response> cf = ex.sendBodyAsync()
                //We follow the call into this method
                .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
        //Wrap for protocol upgrade and logging; protocol upgrade does not apply to an HTTP/1.1 exchange
        cf = wrapForUpgrade(cf);
        cf = wrapForLog(cf);
        return cf;
    }

If we enter the getResponseAsync method of the implementation class Http1Exchange of ExchangeImpl, we will find that its main work is to call the readHeadersAsync method of Http1Response. So, we go directly to the readHeadersAsync method of Http1Response.

This method defines the operations to be performed after parsing the Http response header:

//readHeadersAsync method of Http1Response
public CompletableFuture<Response> readHeadersAsync(Executor executor) {
        if (debug.on())
            debug.log("Reading Headers: (remaining: "
                      + asyncReceiver.remaining() +") "  + readProgress);

    //First time this method is called
        if (firstTimeAround) {
            if (debug.on()) debug.log("First time around");
            firstTimeAround = false;
        } else {
            // with expect continue we will resume reading headers + body.
            asyncReceiver.unsubscribe(bodyReader);
            bodyReader.reset();

            hd = new Http1HeaderParser();
            readProgress = State.READING_HEADERS;
            headersReader.reset();
            headersReader.start(hd);
            asyncReceiver.subscribe(headersReader);
        }
   	
        //This returns a future that acts as a placeholder for the completion of response header reading. We break it down later
        CompletableFuture<State> cf = headersReader.completion();
        assert cf != null : "parsing not started";
        if (debug.on()) {
            debug.log("headersReader is %s",
                    cf == null ? "not yet started"
                            : cf.isDone() ? "already completed"
                            : "not yet completed");
        }

  		//Define the operation after parsing the Response header: return a Response object
        Function<State, Response> lambda = (State completed) -> {
                assert completed == State.READING_HEADERS;
                if (debug.on())
                    debug.log("Reading Headers: creating Response object;"
                              + " state is now " + readProgress);
            	//Unsubscribe the response header reader from the Http response receiver
                asyncReceiver.unsubscribe(headersReader);
                responseCode = hd.responseCode();
                headers = hd.headers();

                response = new Response(request,
                                        exchange.getExchange(),
                                        headers,
                                        connection,
                                        responseCode,
                                        HTTP_1_1);

                if (Log.headers()) {
                    StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
                    Log.dumpHeaders(sb, "    ", headers);
                    Log.logHeaders(sb.toString());
                }

                return response;
            };

    	//Perform this operation after parsing the response header
        if (executor != null) {
            return cf.thenApplyAsync(lambda, executor);
        } else {
            return cf.thenApply(lambda);
        }
    }

This completes the receiving and parsing of the response header. The whole process relies on asynchronous programming and non-blocking I/O, which is one of the main features of this section.
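
The placeholder pattern used by readHeadersAsync can be reduced to a few lines of plain CompletableFuture code. In this sketch, HeaderFutureSketch is a hypothetical class, a String status line stands in for the parsed headers, and an Integer status code stands in for the Response object. The follow-up action is attached before the future completes and runs only once the "parser" side completes it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical sketch of the "placeholder future plus follow-up action" pattern. */
final class HeaderFutureSketch {

    /** Attaches the response-building step to the future that marks the end of header parsing. */
    static CompletableFuture<Integer> readHeadersAsync(CompletableFuture<String> headersParsed,
                                                       Executor executor) {
        // Runs only after headersParsed completes; extracts the status code from the status line.
        Function<String, Integer> toStatus =
                statusLine -> Integer.parseInt(statusLine.split(" ")[1]);
        return executor != null
                ? headersParsed.thenApplyAsync(toStatus, executor)
                : headersParsed.thenApply(toStatus);
    }

    public static void main(String[] args) {
        CompletableFuture<String> headersParsed = new CompletableFuture<>();
        CompletableFuture<Integer> status = readHeadersAsync(headersParsed, null);
        System.out.println("done before parsing? " + status.isDone());
        headersParsed.complete("HTTP/1.1 200 OK");  // what the parser side does when it finishes
        System.out.println("status: " + status.join());
    }
}
```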

7. Parsing of the response body

As seen in the previous section, once the response header has been parsed, the state of Http1Response advances to READING_BODY. However, this does not mean that the body of this response is parsed and returned immediately. As shown in the previous article, one user request may lead to multiple request-response exchanges. The body of the final response is parsed only after HttpClient has applied the response header filters and found that no new request was generated.

We will see that parsing the response body introduces a new reactive publish-subscribe process.

Let's review how we initiate a request: the second argument passed to send() or sendAsync() is an object produced by the HttpResponse.BodyHandlers::ofString method. In other words, the role of this object is to construct the actual response body subscriber.

client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) 

Let's start with the uml class diagram: we can see that BodyHandler is a functional interface nested in the HttpResponse interface. It has a single method named apply, which produces the actual subscriber of the response body: a concrete implementation of BodySubscriber. The two utility classes BodyHandlers and BodySubscribers in HttpResponse provide convenient static methods for creating concrete implementations of BodyHandler and BodySubscriber respectively.
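
Because BodyHandler is a functional interface, a handler can be written as a lambda that inspects the response metadata and picks a subscriber. The following is a minimal sketch, assuming a made-up policy (decode the body only for status 200): BodyHandlerSketch, textOrPlaceholder, fakeInfo and deliver are hypothetical names, and deliver plays the publisher role by hand, so no network access is needed. Only public java.net.http API is used.

```java
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;

/** Hypothetical sketch: a BodyHandler chooses the BodySubscriber from the response metadata. */
final class BodyHandlerSketch {

    /** Decodes the body as text for 200 responses, otherwise discards it and yields a placeholder. */
    static BodyHandler<String> textOrPlaceholder() {
        return info -> info.statusCode() == 200
                ? BodySubscribers.ofString(StandardCharsets.UTF_8)
                : BodySubscribers.replacing("<no body for status " + info.statusCode() + ">");
    }

    /** Minimal ResponseInfo for the demonstration; a real one is supplied by the client. */
    static ResponseInfo fakeInfo(int status) {
        HttpHeaders headers = HttpHeaders.of(Map.of(), (name, value) -> true);
        return new ResponseInfo() {
            @Override public int statusCode() { return status; }
            @Override public HttpHeaders headers() { return headers; }
            @Override public HttpClient.Version version() { return HttpClient.Version.HTTP_1_1; }
        };
    }

    /** Plays the publisher role by hand: subscribe, deliver one chunk, complete. */
    static String deliver(BodySubscriber<String> subscriber, String body) {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        subscriber.onNext(List.of(ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8))));
        subscriber.onComplete();
        return subscriber.getBody().toCompletableFuture().join();
    }

    static String demo(int status, String body) {
        return deliver(textOrPlaceholder().apply(fakeInfo(status)), body);
    }

    public static void main(String[] args) {
        System.out.println(demo(200, "hello"));
        System.out.println(demo(404, "oops"));
    }
}
```

In real use, such a handler would simply be passed as the second argument of send() or sendAsync().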

Let's trace what happens when we call BodyHandlers::ofString:

      
        public static BodyHandler<String> ofString() {
            return (responseInfo) -> BodySubscribers.ofString(charsetFrom(responseInfo.headers()));
        }

You can see that when the BodyHandler is applied, the character set is obtained from the response headers. We then trace the BodySubscribers::ofString method.

public static BodySubscriber<String> ofString(Charset charset) {
            Objects.requireNonNull(charset);
            return new ResponseSubscribers.ByteArraySubscriber<>(
                    bytes -> new String(bytes, charset)
            );
        }

You can see that when the BodyHandler is applied, a concrete implementation of BodySubscriber, ByteArraySubscriber, is created. It is a subscriber in the standard Flow API sense, and it maintains a buffer list for receiving response body data plus a function for collecting that data: finisher. When response data arrives in the onNext method, it is added to the buffer list received. When reception is complete, the onComplete method uses finisher to turn the collected data into the final result handed to the user. In the BodySubscribers::ofString method above, finisher simply builds a string from the received byte array.

//A concrete implementation of BodySubscriber
public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
        private final Function<byte[], T> finisher;
        private final CompletableFuture<T> result = new MinimalFuture<>();
        //List that collects the received response body data
        private final List<ByteBuffer> received = new ArrayList<>();

        private volatile Flow.Subscription subscription;

        public ByteArraySubscriber(Function<byte[],T> finisher) {
            this.finisher = finisher;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
            //On subscription, request an effectively unbounded amount of response body data
            // We can handle whatever you've got
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            // incoming buffers are allocated by http client internally,
            // and won't be used anywhere except this place.
            // So it's free simply to store them for further processing.
            assert Utils.hasRemaining(items);
            //When the response data is received, it is saved in the buffer list
            received.addAll(items);
        }

        @Override
        public void onError(Throwable throwable) {
            received.clear();
            result.completeExceptionally(throwable);
        }

        static private byte[] join(List<ByteBuffer> bytes) {
            int size = Utils.remaining(bytes, Integer.MAX_VALUE);
            byte[] res = new byte[size];
            int from = 0;
            for (ByteBuffer b : bytes) {
                int l = b.remaining();
                b.get(res, from, l);
                from += l;
            }
            return res;
        }

        @Override
        public void onComplete() {
            try {
                //On completion, first join the ByteBuffer data into a byte array, then apply the finisher to it
                result.complete(finisher.apply(join(received)));
                received.clear();
            } catch (IllegalArgumentException e) {
                result.completeExceptionally(e);
            }
        }

        @Override
        public CompletionStage<T> getBody() {
            return result;
        }
    }
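
For contrast, a subscriber does not have to buffer the whole body the way ByteArraySubscriber does. The sketch below is a hypothetical BodySubscriber, CountingSubscriberSketch, that only counts the bytes it sees and asks for one item at a time instead of Long.MAX_VALUE. The demo method stands in for the publisher and delivers an item only when demand is available. It is meant to illustrate the Flow protocol, not to suggest how the JDK implements its own subscribers.

```java
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical BodySubscriber that counts body bytes instead of buffering them. */
final class CountingSubscriberSketch implements HttpResponse.BodySubscriber<Long> {
    private final CompletableFuture<Long> result = new CompletableFuture<>();
    private Flow.Subscription subscription;
    private long count;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (subscription != null) {          // reject a second subscription
            s.cancel();
            return;
        }
        subscription = s;
        s.request(1);                        // ask for one item at a time
    }

    @Override
    public void onNext(List<ByteBuffer> items) {
        for (ByteBuffer b : items) count += b.remaining();
        subscription.request(1);             // ready for the next item
    }

    @Override
    public void onError(Throwable t) { result.completeExceptionally(t); }

    @Override
    public void onComplete() { result.complete(count); }

    @Override
    public CompletionStage<Long> getBody() { return result; }

    /** Hand-written publisher side: delivers a chunk only when demand is available. */
    static long demo() {
        CountingSubscriberSketch subscriber = new CountingSubscriberSketch();
        AtomicLong demand = new AtomicLong();
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { demand.addAndGet(n); }
            @Override public void cancel() { }
        });
        for (byte[] chunk : List.of(new byte[3], new byte[5], new byte[2])) {
            if (demand.getAndDecrement() <= 0) throw new IllegalStateException("no demand");
            subscriber.onNext(List.of(ByteBuffer.wrap(chunk)));
        }
        subscriber.onComplete();
        return subscriber.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println("bytes seen: " + demo());
    }
}
```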

That covers the response body subscriber. So, what is the publisher? And how is the response body processed?

Let's go back to the method that runs after HttpClient has handled the multiple exchanges: MultiExchange.responseAsync0(CompletableFuture start)

private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        //Everything analyzed so far (the management of multiple exchanges, the sending of a single request,
        //the receiving of its response and the parsing of the response header) happens inside this responseAsyncImpl method
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        //Get the current final exchange
                        Exchange<T> exch = getExchange();
                        //Check for status code 204, for which no response body is permitted
                        if (bodyNotPermitted(r)) {
                            if (bodyIsPresent(r)) {
                                IOException ioe = new IOException(
                                    "unexpected content length header with 204 response");
                                exch.cancel(ioe);
                                return MinimalFuture.failedFuture(ioe);
                            } else
                                //Handle the case where there is no response body
                                return handleNoBody(r, exch);
                        }
                        //This is the key method for our analysis
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                //After parsing the response body, return a response object to be finally given to the caller
                                this.response =
                                    new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                return this.response;
                            });
                    }).exceptionallyCompose(this::whenCancelled);
    }

We can see that this procedure calls the readBodyAsync(responseHandler) method of the Exchange class.

Tracing further, we find that it directly calls the readBodyAsync method of Http1Exchange, so we follow it there.

@Override
    CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
                                       boolean returnConnectionToPool,
                                       Executor executor)
    {
        //Apply the passed in BodyHandler to generate a specific bodySubscriber
        BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
                                                                  response.responseHeaders(),
                                                                  HTTP_1_1));
        //Read the response, we will enter later
        CompletableFuture<T> bodyCF = response.readBody(bs,
                                                        returnConnectionToPool,
                                                        executor);
        return bodyCF;
    }

The Http1Response.readBody method is the focus of our analysis, so we follow the call into it:

//Http1Response.readBody method
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
                                         boolean return2Cache,
                                         Executor executor) {
        if (debug.on()) {
            debug.log("readBody: return2Cache: " + return2Cache);
            if (request.isWebSocket() && return2Cache && connection != null) {
                debug.log("websocket connection will be returned to cache: "
                        + connection.getClass() + "/" + connection );
            }
        }
        assert !return2Cache || !request.isWebSocket();
        this.return2Cache = return2Cache;
        //Here, the BodySubscriber produced by the BodyHandler we passed in is wrapped in an Http1BodySubscriber object
        //The purpose is to prevent the onError method from being called more than once when an error occurs
        //This wrapper mostly acts as a forwarding proxy: it delegates the actual work to our BodySubscriber
        final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);

        final CompletableFuture<U> cf = new MinimalFuture<>();

        //Determine the content length: it may be a fixed length, -1 (chunked) or -2 (unknown)
        long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
        final long clen = fixupContentLen(clen0);

        //Unsubscribe the response header reader from the asynchronous receiver of the Http1 response (in fact, it has already been unsubscribed)
        // expect-continue reads headers and body twice.
        // if we reach here, we must reset the headersReader state.
        asyncReceiver.unsubscribe(headersReader);
        headersReader.reset();
        ClientRefCountTracker refCountTracker = new ClientRefCountTracker();

        // We need to keep hold on the client facade until the
        // tracker has been incremented.
        connection.client().reference();
        executor.execute(() -> {
            try {
                //Generate Http1 response body object
                content = new ResponseContent(
                        connection, clen, headers, subscriber,
                        this::onFinished
                );
                if (cf.isCompletedExceptionally()) {
                    // if an error occurs during subscription
                    connection.close();
                    return;
                }
                // increment the reference count on the HttpClientImpl
                // to prevent the SelectorManager thread from exiting until
                // the body is fully read.
                refCountTracker.acquire();
                //Instantiate the response body parser. The argument is the completion callback, which marks the bodyReader's future as completed
                //The type of BodyParser (fixed length, chunked or unknown length) is chosen according to the clen computed above
                bodyParser = content.getBodyParser(
                    (t) -> {
                        try {
                            if (t != null) {
                                try {
                                    subscriber.onError(t);
                                } finally {
                                    cf.completeExceptionally(t);
                                }
                            }
                        } finally {
                            bodyReader.onComplete(t);
                            if (t != null) {
                                connection.close();
                            }
                        }
                    });
                bodyReader.start(bodyParser);
                CompletableFuture<State> bodyReaderCF = bodyReader.completion();
                //Subscribe the response body reader to the Http1AsyncReceiver
                //This calls the flush method and the internal handlePendingDelegate method,
                //which completes the subscription
                asyncReceiver.subscribe(bodyReader);
                assert bodyReaderCF != null : "parsing not started";
                // Make sure to keep a reference to asyncReceiver from
                // within this
                CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
                    t = Utils.getCompletionCause(t);
                    try {
                        if (t == null) {
                            if (debug.on()) debug.log("Finished reading body: " + s);
                            assert s == State.READING_BODY;
                        }
                        if (t != null) {
                            subscriber.onError(t);
                            cf.completeExceptionally(t);
                        }
                    } catch (Throwable x) {
                        // not supposed to happen
                        asyncReceiver.onReadError(x);
                    } finally {
                        // we're done: release the ref count for
                        // the current operation.
                        refCountTracker.tryRelease();
                    }
                });
                connection.addTrailingOperation(trailingOp);
            } catch (Throwable t) {
               if (debug.on()) debug.log("Failed reading body: " + t);
                try {
                    subscriber.onError(t);
                    cf.completeExceptionally(t);
                } finally {
                    asyncReceiver.onReadError(t);
                }
            } finally {
                connection.client().unreference();
            }
        });

        //Obtain the final response body asynchronously. For a ByteArraySubscriber this is simply its result future
        ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> {
            cf.completeExceptionally(t);
            asyncReceiver.setRetryOnError(false);
            asyncReceiver.onReadError(t);
        });

        return cf.whenComplete((s,t) -> {
            if (t != null) {
                // If an exception occurred, release the
                // ref count for the current operation, as
                // it may never be triggered otherwise
                // (BodySubscriber ofInputStream)
                // If there was no exception then the
                // ref count will be/have been released when
                // the last byte of the response is/was received
                refCountTracker.tryRelease();
            }
        });
    }

We can see that the readBody method of Http1Response subscribes the response body reader, bodyReader, to the Http1AsyncReceiver. From then on, the data received by Http1AsyncReceiver is handed to bodyReader for processing, which happens in the flush method covered in the previous section. Once all the response data has been parsed, the final result is returned.

Let's revisit the flush method of Http1AsyncReceiver from the previous section. The call to delegate.tryAsyncReceive deserves our attention here, because it is what drives the BodyReader.

//Only a short excerpt of flush is shown here; it contains the tryAsyncReceive call
//Because bodyReader is now subscribed to Http1AsyncReceiver, the delegate here is the bodyReader
//The call in the if condition is where the response body is received and parsed; the branch is taken when it returns false, i.e. the delegate is done
if (!delegate.tryAsyncReceive(buf)) {
        final long remaining = buf.remaining();
        if (debug.on()) debug.log(() -> {
            // If the scheduler is stopped, the queue may already
            // be empty and the reference may already be released.
            String remstr = scheduler.isStopped() ? "" :
            " remaining in ref: "
                + remaining;
            remstr +=  remstr
                + " total remaining: " + remaining();
            return "Delegate done: " + remaining;
        });
        canRequestMore.set(false);
        // The last buffer parsed may have remaining unparsed bytes.
        // Don't take it out of the queue.
        return; // done.
    }
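
The excerpt above follows a simple pattern: peek at the head of the queue, offer it to the delegate, and remove it only if the delegate accepts more data. A minimal sketch of that pattern follows. DrainSketch and drain are hypothetical names, and a plain Predicate stands in for the delegate; this is not the JDK's flush method, which also handles demand, errors and scheduling.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical illustration of the drain loop; not the JDK implementation.
final class DrainSketch {
    /**
     * Offers queued buffers to the delegate in order.
     * Returns the number of buffers that were removed from the queue.
     */
    static int drain(Deque<ByteBuffer> queue, Predicate<ByteBuffer> delegate) {
        int removed = 0;
        ByteBuffer buf;
        while ((buf = queue.peek()) != null) {
            if (!delegate.test(buf)) {
                // The delegate is done. The buffer may still hold
                // unparsed bytes, so it stays at the head of the queue.
                return removed;
            }
            queue.remove();   // fully handed off, continue with the next one
            removed++;
        }
        return removed;
    }
}
```

Leaving the last buffer in the queue matters because the unread bytes may belong to whatever is parsed next on the same connection.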

Following the call chain all the way down, we find that the method ultimately invoked is the BodyReader's handle override:

        @Override
        final void handle(ByteBuffer b,
                          BodyParser parser,
                          CompletableFuture<State> cf) {
            assert cf != null : "parsing not started";
            assert parser != null : "no parser";
            try {
                if (debug.on())
                    debug.log("Sending " + b.remaining() + "/" + b.capacity()
                              + " bytes to body parser");
                //Finally, the accept method of BodyParser is called, similar to the earlier parsing of the response headers
                //We follow it below
                parser.accept(b);
            } catch (Throwable t) {
                if (debug.on())
                    debug.log("Body parser failed to handle buffer: " + t);
                if (!cf.isDone()) {
                    cf.completeExceptionally(t);
                }
            }
        }

We follow parser.accept(b) to see how the response body parser parses the response body. As mentioned earlier, there are three types of response body parsers:

FixedLengthBodyParser, ChunkedBodyParser, UnknownLengthBodyParser
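
As a rough illustration of how one of these three kinds might be picked from the response headers, here is a minimal sketch. BodyKindSketch, BodyKind, choose and headersOf are hypothetical names, and the rule shown (chunked Transfer-Encoding first, then Content-Length, otherwise read until the connection closes) is our reading of the behavior described in this article, not code copied from the JDK.

```java
import java.net.http.HttpHeaders;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of the selection rule; not the JDK source.
final class BodyKindSketch {
    enum BodyKind { FIXED_LENGTH, CHUNKED, UNKNOWN_LENGTH }

    static BodyKind choose(HttpHeaders headers) {
        boolean chunked = headers.firstValue("Transfer-Encoding")
                .map(v -> v.toLowerCase(Locale.ROOT).contains("chunked"))
                .orElse(false);
        if (chunked) {
            return BodyKind.CHUNKED;
        }
        // Content-Length present: read exactly that many bytes.
        if (headers.firstValueAsLong("Content-Length").isPresent()) {
            return BodyKind.FIXED_LENGTH;
        }
        // Neither header: read until the connection is closed.
        return BodyKind.UNKNOWN_LENGTH;
    }

    /** Small helper to build HttpHeaders from single-valued entries. */
    static HttpHeaders headersOf(Map<String, String> single) {
        Map<String, List<String>> multi = new HashMap<>();
        single.forEach((k, v) -> multi.put(k, List.of(v)));
        return HttpHeaders.of(multi, (name, value) -> true);
    }
}
```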

This time, we select the chunked response body parser, ChunkedBodyParser, and look at how it parses. It keeps a ChunkState field so that parsing can resume where it left off across multiple calls. On each call, the received data is parsed and the parsed result is handed to the bodySubscriber.

//The following code is located inside the ResponseContent class

//The chunk state enumeration is defined here
static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
//Chunked response body parser
class ChunkedBodyParser implements BodyParser {
        final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
        final Consumer<Throwable> onComplete;
        final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";

        volatile Throwable closedExceptionally;
        volatile int partialChunklen = 0; // partially read chunk len
        volatile int chunklen = -1;  // number of bytes in chunk
        volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
        volatile boolean cr = false;  // tryReadChunkLength has found CR
        volatile int chunkext = 0;    // number of bytes already read in the chunk extension
        volatile int digits = 0;      // number of chunkLength bytes already read
        volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
        //The initial chunk state is READING_LENGTH; this variable drives the switch between reading states
        volatile ChunkState state = ChunkState.READING_LENGTH; // current state
        volatile AbstractSubscription sub;
        ChunkedBodyParser(Consumer<Throwable> onComplete) {
            this.onComplete = onComplete;
        }
            
        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                if (debug.on())
                    debug.log("already closed: " + closedExceptionally);
                return;
            }
            // debugBuffer(b);
            boolean completed = false;
            try {
                List<ByteBuffer> out = new ArrayList<>();
                do {
                    //This method reads and parses the ByteBuffer, and determines and switches the chunk reading state based on the line terminators
                    //It returns true only when the final chunk has been read
                    if (tryPushOneHunk(b, out))  {
                        // We're done! (true if the final chunk was parsed).
                        if (!out.isEmpty()) {
                            // push what we have and complete
                            // only reduce demand if we actually push something.
                            // we would not have come here if there was no
                            // demand.
                            boolean hasDemand = sub.demand().tryDecrement();
                            assert hasDemand;
                            //Deliver the last chunk of content to the subscriber. pusher is the Http1BodySubscriber mentioned above,
                            //which wraps the BodySubscriber produced by the supplied BodyHandler
                            pusher.onNext(Collections.unmodifiableList(out));
                            if (debug.on()) debug.log("Chunks sent");
                        }
                        if (debug.on()) debug.log("done!");
                        assert closedExceptionally == null;
                        assert state == ChunkState.DONE;
                        //Cleanup of asyncReceiver
                        onFinished.run();
                        //After all chunks have been delivered, notify the subscriber so that it can assemble the result
                        pusher.onComplete();
                        if (debug.on()) debug.log("subscriber completed");
                        completed = true;
                        onComplete.accept(closedExceptionally); // should be null
                        break;
                    }
                    // the buffer may contain several hunks, and therefore
                    // we must loop while it's not exhausted.
                } while (b.hasRemaining());

                if (!completed && !out.isEmpty()) {
                    // push what we have.
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    //Deliver the next chunk of content to the subscriber. pusher is the Http1BodySubscriber mentioned above,
                    //which wraps the BodySubscriber produced by the supplied BodyHandler
                    pusher.onNext(Collections.unmodifiableList(out));
                    if (debug.on()) debug.log("Chunk sent");
                }
                assert state == ChunkState.DONE || !b.hasRemaining();
            } catch(Throwable t) {
                if (debug.on())
                    debug.log("Error while processing buffer: %s", (Object)t );
                closedExceptionally = t;
                if (!completed) onComplete.accept(t);
            }
        }
            
            
            
 }
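
The state-driven parsing above can be illustrated with a much smaller sketch. The class below is not the JDK's ChunkedBodyParser: ChunkedSketch and all of its members are hypothetical names, it ignores chunk extensions and trailers, it assumes well-formed input, and it collects the decoded bytes instead of pushing them to a subscriber. It only shows how a state field lets parsing resume across several accept calls, wherever the previous buffer happened to end.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified illustration; not the JDK implementation.
final class ChunkedSketch {
    enum State { READING_LENGTH, READING_DATA, READING_CRLF, DONE }

    private State state = State.READING_LENGTH;
    private final StringBuilder lengthLine = new StringBuilder(); // hex digits seen so far
    private int remaining;      // bytes of the current chunk still to read
    private int crlfRemaining;  // bytes of the CRLF after the chunk data still to skip
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    /** Consumes what is available in b; may be called many times. */
    void accept(ByteBuffer b) {
        while (b.hasRemaining() && state != State.DONE) {
            switch (state) {
                case READING_LENGTH -> {
                    char c = (char) (b.get() & 0xFF);
                    if (c == '\n') {
                        remaining = Integer.parseInt(lengthLine.toString().trim(), 16);
                        lengthLine.setLength(0);
                        // A size of 0 marks the last chunk (trailers are ignored here).
                        state = remaining == 0 ? State.DONE : State.READING_DATA;
                    } else if (c != '\r') {
                        lengthLine.append(c);
                    }
                }
                case READING_DATA -> {
                    int n = Math.min(remaining, b.remaining());
                    byte[] tmp = new byte[n];
                    b.get(tmp);
                    body.write(tmp, 0, n);
                    remaining -= n;
                    if (remaining == 0) {
                        crlfRemaining = 2;
                        state = State.READING_CRLF;
                    }
                }
                case READING_CRLF -> {
                    b.get(); // skip one byte of the CRLF that ends the chunk data
                    if (--crlfRemaining == 0) {
                        state = State.READING_LENGTH;
                    }
                }
                default -> throw new IllegalStateException("unexpected state " + state);
            }
        }
    }

    boolean isDone() { return state == State.DONE; }

    String bodyAsString() { return body.toString(StandardCharsets.UTF_8); }
}
```

Feeding the wire bytes of a chunked body in two arbitrary pieces gives the same result as feeding them in one piece, because everything the parser needs to continue is stored in its fields rather than on the call stack.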

To sum up, the parsing of the response body proceeds through the following steps:

  1. SocketTube continuously reads response data from the socket channel and passes it to Http1TubeSubscriber, which places the data in a buffer queue maintained by its enclosing object, Http1AsyncReceiver
  2. Once the response headers have been read and parsed, Http1AsyncReceiver pauses moving response data out of the buffer queue and delivering it to the downstream subscriber
  3. A BodySubscriber is instantiated according to the response body handling supplied by the user, and a matching BodyParser is instantiated according to the response header information
  4. Http1AsyncReceiver accepts the subscription of bodyReader, resumes moving response data, and delivers the response body to bodyReader
  5. The bodyParser is invoked repeatedly, parses the response body, and hands the parsed results to the downstream bodySubscriber
  6. When parsing of the response body is complete, the bodySubscriber assembles the received data into the response result and returns it
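
The steps above can be sketched with the standard Flow API. In the sketch below, a SubmissionPublisher stands in for the upstream (the socket reader plus the receiver's queue) and CollectingSubscriber stands in for the body subscriber. Both BodyPipelineSketch and CollectingSubscriber are hypothetical names; the JDK classes discussed in this article are considerably more involved.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the data flow; not the JDK implementation.
final class BodyPipelineSketch {

    /** Accumulates bytes and completes a future when the stream ends. */
    static final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final CompletableFuture<String> result = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);               // signal demand for the first buffer
        }
        @Override public void onNext(ByteBuffer b) {
            byte[] tmp = new byte[b.remaining()];
            b.get(tmp);
            out.write(tmp, 0, tmp.length);
            subscription.request(1);    // ask for one more buffer
        }
        @Override public void onError(Throwable t) {
            result.completeExceptionally(t);
        }
        @Override public void onComplete() {
            result.complete(out.toString(StandardCharsets.UTF_8));
        }
    }

    /** Publishes each part as one buffer and returns the assembled body. */
    static String run(String... parts) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<ByteBuffer> upstream = new SubmissionPublisher<>()) {
            upstream.subscribe(subscriber);
            for (String p : parts) {
                upstream.submit(ByteBuffer.wrap(p.getBytes(StandardCharsets.UTF_8)));
            }
        } // close() signals onComplete once the submitted buffers are delivered
        return subscriber.result.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

Calling BodyPipelineSketch.run("Hello, ", "world") returns the two parts joined into one string, which mirrors how the body subscriber only yields its result after onComplete has been signalled.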

The flow of response body data can be described by the following data flow diagram:

8. Summary

After a long journey, we have finally followed the full life cycle of a single unencrypted HTTP/1.1 request-response exchange. Understanding the reactive streams model is the key to understanding this process.

Keywords: Java http

Added by AndrewJ1313 on Wed, 05 Jan 2022 23:04:27 +0200