    if (result == null) throw new IOException("Canceled");
    return result;
  } catch (IOException e) {
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this);
  }
}
Here we use the `client` object (the `OkHttpClient` that was passed in when the `RealCall` above was created). Its `dispatcher()` method returns a `Dispatcher`, and calling that object's `executed()` method adds the current `RealCall` to a double-ended queue. The definition of `executed(RealCall)` follows; `runningSyncCalls` has type `Deque<RealCall>`:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
Back in `execute()`: once the `RealCall` has been added to the double-ended queue, we call `getResponseWithInterceptorChain()`, which is defined as follows:
Response getResponseWithInterceptorChain() throws IOException {
//Add a series of interceptors and pay attention to the order of addition
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
//Bridge interceptor
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//Cache Interceptor: get data from cache
interceptors.add(new CacheInterceptor(client.internalCache()));
//Network connection Interceptor: establish network connection
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//Server request Interceptor: Send a request to the server to obtain data
interceptors.add(new CallServerInterceptor(forWebSocket));
//Build a chain of responsibility
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//Handling the chain of responsibility
return chain.proceed(originalRequest);
}
Here we create a list and add to it, **in order**, the interceptors configured on the `client`, the retry-and-follow-up interceptor, the bridge interceptor, the cache interceptor, the connect interceptor, the call-server interceptor, and so on. We then use this list to build an interceptor chain. This is the chain-of-responsibility design pattern: each interceptor either calls the next interceptor or returns a result without calling it. The response we finally get is whatever the chain returns once it has run. Any interceptor we define ourselves is added to this chain as well.

We have met several new pieces here, such as `RealCall`, `Dispatcher`, and the chain of responsibility. Below we analyze how these classes relate to each other and what each link in the chain does. First, here is a rough outline of the whole request process as a sequence diagram:

![OkHttp Request sequence diagram](https://user-gold-cdn.xitu.io/2018/10/19/1668c58f05078818?imageView2/0/w/1280/h/960/ignore-error/1)

### 2.2 The dispatcher: Dispatcher

The `Dispatcher` class mentioned above is used to dispatch requests. In the example code at the start of the article, OkHttp creates a `RealCall` and adds it to a double-ended queue. Note, however, that this queue is named `runningSyncCalls`: the request is synchronous and runs immediately on the current thread, so the `getResponseWithInterceptorChain()` call that follows executes synchronously. When it finishes, `Dispatcher`'s `finished(RealCall)` method is called to remove the request from the queue. A synchronous request therefore does not show the dispatcher doing any real "dispatching".

Besides synchronous requests there are asynchronous ones: after obtaining a `RealCall`, we call its `enqueue(Callback responseCallback)` method and pass in a callback.
This method executes the following line of code:
client.dispatcher().enqueue(new AsyncCall(responseCallback));
This wraps the callback in an `AsyncCall` and passes it to `enqueue(AsyncCall)`. `AsyncCall` indirectly implements `Runnable`, so it is an executable object, and the `run()` method of `Runnable` ends up calling `AsyncCall`'s `execute()` method. `AsyncCall.execute()` is similar to `RealCall.execute()`: both use the chain of responsibility to complete a network request, but the former can run on a background thread. `Dispatcher`'s `enqueue(AsyncCall)` also adds the `AsyncCall` to a queue and removes it when the request completes, except that the queue is `runningAsyncCalls` or `readyAsyncCalls`. Both are double-ended queues that hold asynchronous requests. The difference is that `runningAsyncCalls` holds the calls that are executing; once it reaches its limit, new calls are placed in the ready queue `readyAsyncCalls`:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
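To make the admission rule concrete, here is a minimal plain-Java sketch of the two queues. It is not OkHttp code: `DispatcherSketch`, its `String` calls, and its single `maxRequests` limit are assumptions made for illustration, the per-host limit is left out, and the promotion step in `finished()` is this sketch's assumption about how parked calls get picked up, since the text above only describes removal.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for the dispatcher's asynchronous bookkeeping; not OkHttp code. */
final class DispatcherSketch {
  private final int maxRequests;
  private final Deque<String> readyCalls = new ArrayDeque<>();
  private final Deque<String> runningCalls = new ArrayDeque<>();

  DispatcherSketch(int maxRequests) {
    this.maxRequests = maxRequests;
  }

  /** Admit the call if there is capacity, otherwise park it in the ready queue. */
  synchronized void enqueue(String call) {
    if (runningCalls.size() < maxRequests) {
      runningCalls.add(call); // a real dispatcher would also hand the call to its executor here
    } else {
      readyCalls.add(call);
    }
  }

  /** Remove a finished call, then promote parked calls while capacity remains (assumed behavior). */
  synchronized void finished(String call) {
    if (!runningCalls.remove(call)) {
      throw new IllegalStateException("Call wasn't running: " + call);
    }
    while (runningCalls.size() < maxRequests && !readyCalls.isEmpty()) {
      runningCalls.add(readyCalls.poll());
    }
  }

  synchronized int runningCount() {
    return runningCalls.size();
  }

  synchronized int readyCount() {
    return readyCalls.size();
  }

  public static void main(String[] args) {
    DispatcherSketch dispatcher = new DispatcherSketch(2);
    dispatcher.enqueue("a");
    dispatcher.enqueue("b");
    dispatcher.enqueue("c"); // over the limit, so this one waits
    System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " ready");
    dispatcher.finished("a");
    System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " ready");
  }
}
```

With a limit of two, the third call waits in the ready queue until one of the running calls finishes.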
Once a call has been added to the running queue, we immediately hand the `AsyncCall` to a thread pool, so the request's chain of responsibility runs asynchronously on a pool thread. The thread pool is returned by the `executorService()` method:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
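The constructor arguments above (no core threads, an effectively unbounded maximum, and a `SynchronousQueue`) mean that a task is never queued: it is handed to an idle worker, or a new thread is started for it. The sketch below uses only `java.util.concurrent` to illustrate that behavior. `DirectHandoffPoolSketch` and `largestPoolSizeFor` are hypothetical names, and the latch exists only to keep every task busy until all of them have started.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative only: shows how a pool configured like the one above reacts to concurrent tasks. */
final class DirectHandoffPoolSketch {

  /** Submits {@code tasks} tasks that all stay busy at once; returns how many threads the pool created. */
  static int largestPoolSizeFor(int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    CountDownLatch allStarted = new CountDownLatch(tasks);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          allStarted.countDown();
          try {
            allStarted.await(); // stay busy until every task has a thread of its own
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      if (!allStarted.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not all start");
      }
      return pool.getLargestPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("threads created for 4 busy tasks: " + largestPoolSizeFor(4));
  }
}
```

Because nothing is ever buffered in the queue, the pool itself does not cap concurrency; in the code shown earlier that job falls to the `maxRequests` and `maxRequestsPerHost` checks in `enqueue()`.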
As `executorService()` shows, the thread pool is created lazily the first time it is needed. Alternatively, when building the `OkHttpClient` we can supply our own `Dispatcher` and pass a thread pool to its constructor. By analogy with the synchronous case, here is a sequence diagram for an asynchronous OkHttp request; comparing the two diagrams shows how the two implementations differ:

![OkHttp Asynchronous request](https://user-gold-cdn.xitu.io/2018/10/19/1668c5c04f04eab2?imageView2/0/w/1280/h/960/ignore-error/1)

That is the logic of the `Dispatcher`, and it is not very complicated. The analysis above also shows that the request itself is not carried out here. The dispatcher only decides which thread runs the request and tracks the calls in double-ended queues; the actual work happens in the chain of responsibility. Next we analyze how the chain of responsibility is executed in OkHttp.

### 2.3 Execution of the chain of responsibility

In the typical chain-of-responsibility pattern, objects are linked into a chain, each holding a reference to its successor. A request is passed along the chain until some object decides to handle it. The client issuing the request does not know which object will eventually handle it, so the system can reorganize the chain and reassign responsibilities dynamically without affecting the client. A real-life example is a series of job interviews: if the interviewer in one round decides you are not qualified for the next round, they can reject you; otherwise they pass you on to the next interviewer.

In OkHttp the pattern is implemented slightly differently. Here we focus on how the chain itself works.
The specific logic of each interceptor is explained one by one later. Going back to the code in 2.1, two things deserve attention:

1. When creating the chain `RealInterceptorChain`, the fifth argument we pass is 0. The parameter is named `index` and is assigned to an instance field of the same name in `RealInterceptorChain`.
2. To start the chain, we call its `proceed(Request)` method.

Here is the definition of `proceed(Request)`:
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
It delegates to an internal overload of `proceed()`, shown here in simplified form:
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
// ...
//Call the next interceptor in the responsibility chain
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// ...
return response;
}
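The hand-off can be reproduced outside OkHttp in a few lines. The following is a simplified sketch, not the library's code: `ChainSketch`, `Step`, and the `String` request and response types are made up for illustration. It shows the two behaviors that matter here: each `proceed()` call builds a chain with `index + 1`, and a step may return without calling `proceed()` at all.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of an index-based interceptor chain; names are illustrative. */
final class ChainSketch {

  interface Step {
    String intercept(Chain chain);
  }

  static final class Chain {
    private final List<Step> steps;
    private final int index;
    private final String request;

    Chain(List<Step> steps, int index, String request) {
      this.steps = steps;
      this.index = index;
      this.request = request;
    }

    String request() {
      return request;
    }

    String proceed(String request) {
      if (index >= steps.size()) throw new AssertionError("no step left to handle the request");
      // The chain handed to the current step already points at the step after it.
      Chain next = new Chain(steps, index + 1, request);
      return steps.get(index).intercept(next);
    }
  }

  /** Runs a three-step chain; the middle step short-circuits when {@code cacheHit} is true. */
  static String run(String request, boolean cacheHit) {
    List<Step> steps = new ArrayList<>();
    // Wraps whatever the rest of the chain returns.
    steps.add(chain -> "logged(" + chain.proceed(chain.request()) + ")");
    // Either answers directly or delegates to the next step.
    steps.add(chain -> cacheHit ? "cached:" + chain.request() : chain.proceed(chain.request()));
    // Last step: never calls proceed().
    steps.add(chain -> "network:" + chain.request());
    return new Chain(steps, 0, request).proceed(request);
  }

  public static void main(String[] args) {
    System.out.println(run("/a", false));
    System.out.println(run("/a", true));
  }
}
```

The first step sees the result of everything after it, which is why interceptors earlier in the list can both modify the outgoing request and post-process the incoming response.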
Note that while handling the request, `proceed()` creates the next chain object and passes `index + 1` as its `index`. It then uses its own `index` to take an interceptor from the list and calls that interceptor's `intercept()` method with the next chain as the argument. If the interceptor wants the next level to continue processing the request, it calls `proceed()` on the chain it was given; if no further processing is needed, it returns a `Response` directly. Because each new chain's `index` is one greater than the current one, every call to `proceed()` takes the next interceptor from the list.

Note also the retry interceptor mentioned earlier. Internally it runs a `while` loop and calls the chain's `proceed()` method in the loop body to retry the request. This works because the `index` of the chain it holds is fixed, so every call to `proceed()` runs the chain again starting from the level below the retry interceptor.

The following diagram shows how the chain executes:

![Responsibility chain execution process](https://user-gold-cdn.xitu.io/2018/10/19/1668c5c6363ea20f?imageView2/0/w/1280/h/960/ignore-error/1)

Now that we have seen how OkHttp executes the interceptor chain, let's look at the logic of each interceptor.

### 2.3 Retry and redirection: RetryAndFollowUpInterceptor

`RetryAndFollowUpInterceptor` is mainly used to retry a failed request and to follow redirects when necessary. As described above, the chain calls the `intercept()` method of its first interceptor when it starts processing. If we did not add any custom interceptors when creating the OkHttp client, `RetryAndFollowUpInterceptor` is the first interceptor called in the chain.
@Override public Response intercept(Chain chain) throws IOException {
// ...
//Note that here we initialize a StreamAllocation and assign it to a field. Its role is explained later
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
//Used to record the number of redirects
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
    Response response;
    boolean releaseConnection = true;
    try {
      // Here, the responsibility chain is executed from the current responsibility chain, which is a retry logic
      response = realChain.proceed(request, streamAllocation, null, null);
      releaseConnection = false;
    } catch (RouteException e) {
      // Call the recover method to recover from the failure. If it can be recovered, it returns true; otherwise, it returns false
      if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
        throw e.getLastConnectException();
      }
      releaseConnection = false;
      continue;
    } catch (IOException e) {
      // Try connecting to the server again
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
      releaseConnection = false;
      continue;
    } finally {
      // If releaseConnection is true, it indicates that an exception has occurred in the middle and resources need to be released
      if (releaseConnection) {
        streamAllocation.streamFailed(null);
        streamAllocation.release();
      }
    }
    // Use the previous response priorResponse to build a response whose response body is empty
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder().body(null).build())
          .build();
    }
    // Depending on the response, you may add some authentication information, redirect or process timeout requests
    // If the request cannot be processed or the error does not need to be processed, null will be returned
    Request followUp = followUpRequest(response, streamAllocation.route());
    // Unable to redirect, return the previous response directly
    if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
      }
      return response;
    }
    // close resource
    closeQuietly(response.body());
    // When the maximum number of redirects is reached, an exception is thrown
    if (++followUpCount > MAX_FOLLOW_UPS) {
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }
    if (followUp.body() instanceof UnrepeatableRequestBody) {
      streamAllocation.release();
      throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }
    // Here, judge whether the new request can reuse the previous connection. If it cannot be reused, create a new connection
    if (!sameConnection(response, followUp.url())) {
      streamAllocation.release();
      streamAllocation = new StreamAllocation(client.connectionPool(),
          createAddress(followUp.url()), call, eventListener, callStackTrace);
      this.streamAllocation = streamAllocation;
    } else if (streamAllocation.codec() != null) {
      throw new IllegalStateException("Closing the body of " + response
          + " didn't close its backing stream. Bad interceptor?");
    }
    request = followUp;
    priorResponse = response;
  }
}
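Stripped of connection handling, the loop above is a bounded follow-up loop. Here is a minimal sketch of that shape, assuming requests and responses are plain strings and that a map from URL to redirect target stands in for `followUpRequest()`. The class name and the limit of 20 are choices of this sketch, and the `recover()` retry path is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative follow-up loop; a map lookup plays the role of followUpRequest(). */
final class FollowUpSketch {
  static final int MAX_FOLLOW_UPS = 20; // limit chosen for this sketch

  /** Follows entries in {@code redirects} (source URL to target URL) until a URL has no follow-up. */
  static String fetch(String url, Map<String, String> redirects) {
    int followUpCount = 0;
    String request = url;
    while (true) {
      String followUp = redirects.get(request); // null means "no follow-up needed"
      if (followUp == null) {
        return "200 " + request;
      }
      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
      }
      request = followUp; // the next iteration retries with the new request
    }
  }

  public static void main(String[] args) {
    Map<String, String> redirects = new HashMap<>();
    redirects.put("/old", "/moved");
    redirects.put("/moved", "/new");
    System.out.println(fetch("/old", redirects));
  }
}
```

The counter is what keeps a redirect cycle from looping forever: two URLs that point at each other end in an exception instead of an endless loop.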
The `RetryAndFollowUpInterceptor` code above mainly reacts to errors and to the server's response. It decides from the information returned by the server whether the request should be redirected or retried. If a retry is worthwhile, it creates a new connection or reuses the previous one and retries the request in the next loop iteration. Otherwise it wraps the response it obtained and returns it to the user.

We also met the `StreamAllocation` object here, which acts as a manager: it maintains the relationship between connections to the server, concurrent streams, and calls. It is also responsible for initializing a `Socket` connection for the call and obtaining its input and output streams. Note, too, that `client.connectionPool()` passes in a connection pool object, `ConnectionPool`. In this method we only initialize these classes and do not actually use them; they are passed on to the later interceptors, which obtain the response from the server. Their purpose and relationships are explained later.

### 2.4 BridgeInterceptor

The bridge interceptor `BridgeInterceptor` builds a network request from the user's request, uses it to access the network, and finally builds a user response from the network response. Its logic is comparatively simple: it only wraps the request and converts the server's response into a user-friendly one:
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
//Get network request builder from user request
Request.Builder requestBuilder = userRequest.newBuilder();
// ...
//Execute network request
Response networkResponse = chain.proceed(requestBuilder.build());
// ...
//Get user response builder from network response
Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
// ...
//Return user response
return responseBuilder.build();
}
}
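The elided parts of the method are where the request is adapted for the network. As a rough sketch of that idea, and not a copy of what `BridgeInterceptor` actually does, the following plain-Java helper fills in transport headers only when the caller has not set them. The header set, the `sketch-client/0.1` value, and the names `BridgeSketch` and `toNetworkHeaders` are all assumptions for illustration; header names are compared case-sensitively here for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: turns caller-supplied headers into "network" headers by adding defaults. */
final class BridgeSketch {

  /** Returns a copy of {@code userHeaders} with defaults added for headers the caller left out. */
  static Map<String, String> toNetworkHeaders(Map<String, String> userHeaders, String host) {
    Map<String, String> network = new LinkedHashMap<>(userHeaders); // never mutate the caller's map
    network.putIfAbsent("Host", host);
    network.putIfAbsent("Connection", "Keep-Alive");
    network.putIfAbsent("User-Agent", "sketch-client/0.1"); // hypothetical default value
    return network;
  }

  public static void main(String[] args) {
    Map<String, String> user = new LinkedHashMap<>();
    user.put("User-Agent", "my-app/2.0");
    System.out.println(toNetworkHeaders(user, "example.test"));
  }
}
```

Working on a copy mirrors the `userRequest.newBuilder()` step above: the request the caller built stays untouched, and only the network-facing version gains the extra headers.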
### 2.5 Using the cache: CacheInterceptor

The cache interceptor decides, from the request and the cached response, whether a usable cached response exists. If one does, it is returned to the user; otherwise the chain continues and the response is obtained from the server. Once the response arrives, it is written to the disk cache. The logic is as follows:
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
long now = System.currentTimeMillis();
//Judge whether the cache is available according to the information in the request and cached response
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest; // If the request does not use the network, it is empty
Response cacheResponse = strategy.cacheResponse; // If the request does not use the cache, it is empty
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
//The request does not use the network and does not use the cache, which is equivalent to being intercepted here. It is not necessary to hand it over to the next level (network request interceptor) for execution
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
//The request uses the cache, but does not use the network: the result is taken from the cache and does not need to be handed over to the next level (network request interceptor) for execution
if (networkRequest == null) {
return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
}
Response networkResponse = null;
try {
//Here, the processing method of the execution chain is called, which is actually handed over to its next level for execution
networkResponse = chain.proceed(networkRequest);
} finally {
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//This point is reached after the next level has finished and the network response has been received. If the cache is used, the result of the request is updated into the cache.
if (cacheResponse != null) {
//The result returned by the server is 304, which returns the result in the cache
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
//Update cache
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//Put the result of the request into the cache
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
}
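The branching in `intercept()` comes down to which of `networkRequest` and `cacheResponse` is null. A compact way to see the four cases is the sketch below; `CacheDecisionSketch`, `Action`, and `decide` are hypothetical names, and each boolean stands for "this value is non-null".

```java
/** Illustrative decision table for the (networkRequest, cacheResponse) pair. */
final class CacheDecisionSketch {

  enum Action {
    FAIL_504,            // neither network nor cache may be used
    SERVE_FROM_CACHE,    // cache only, the chain is not continued
    CONDITIONAL_NETWORK, // go to the network; a 304 answer means the cached copy is served
    PLAIN_NETWORK        // go to the network with nothing cached to fall back on
  }

  static Action decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
    if (!hasNetworkRequest && !hasCacheResponse) return Action.FAIL_504;
    if (!hasNetworkRequest) return Action.SERVE_FROM_CACHE;
    return hasCacheResponse ? Action.CONDITIONAL_NETWORK : Action.PLAIN_NETWORK;
  }

  public static void main(String[] args) {
    for (boolean network : new boolean[] {false, true}) {
      for (boolean cached : new boolean[] {false, true}) {
        System.out.println("network=" + network + ", cached=" + cached
            + " -> " + decide(network, cached));
      }
    }
  }
}
```

Only the last two cases call `chain.proceed()`, which is why a cache hit never reaches the connect or call-server interceptors.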
For caching, `CacheInterceptor` uses the field `cache`, which is of type `InternalCache`. `InternalCache` is an interface, and OkHttp has a single implementation of it, `Cache`. Internally, `Cache` uses `DiskLruCache` to save the cached data to disk. `DiskLruCache` and `LruCache` are two commonly used caching tools on Android: the former caches on disk, the latter in memory. Both are built on the Least Recently Used (LRU) algorithm. We will cover these two caching frameworks in detail in a future article.

In addition, the decision about whether a usable cached response exists is made by `CacheStrategy`, which goes through many checks to produce the two fields `networkRequest` and `cacheResponse`. This involves knowledge of HTTP caching; if you are interested, you can read the source code.

### 2.6 Connection reuse: ConnectInterceptor

The connect interceptor `ConnectInterceptor` opens a network connection to the target server and hands it to the next interceptor. Here we only open the connection and do not yet send the request; fetching data from the server is left to the next interceptor. Even though this interceptor only opens a connection, there is a lot here worth attention, because the connection pool `ConnectionPool` is used to reuse connections when obtaining the connection object.
public final class ConnectInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}
Here `HttpCodec` encodes requests and decodes responses, and `RealConnection` is the connection to the server. Both are used by the next interceptor to get the response from the server. The next interceptor's logic is not complicated: once everything is ready, it simply reads data from the server. It is fair to say that the core of OkHttp is here, so let's first analyze how connections are reused through the connection pool when a connection is created.

From the code above, calling `newStream()` on `streamAllocation` eventually reaches, after a series of checks, the `findConnection()` method in `StreamAllocation`:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
// ...
synchronized (connectionPool) {
// ...
//Try to use an already-allocated connection. Be careful: that connection may have been restricted from creating new streams
releasedConnection = this.connection;
//Release the current connection if it can no longer create new streams; in that case a Socket is returned so that it can be closed
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
//A connection has been assigned and is available
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
//If the connection was never reported as acquired, do not report it as released. reportedAcquired is set through the acquire() method
releasedConnection = null;
}
    if (result == null) {
      // Try to get a connection from the connection pool
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        foundPooledConnection = true;
        result = connection;
      } else {
        selectedRoute = route;
      }
    }
  }
  // Close the socket of the released connection, if any
  closeQuietly(toClose);
  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // We already have an allocated or pooled connection, so return it
    return result;
  }
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }
  synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");
    if (newRouteSelection) {
      // Now that we have a set of IP addresses, try the connection pool again
      List<Route> routes = routeSelection.getAll();
      for (int i = 0, size = routes.size(); i < size; i++) {
        Route route = routes.get(i);
        // Get a connection from the connection pool
        Internal.instance.get(connectionPool, address, this, route);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
          this.route = route;
          break;
        }
      }
    }
    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }
      // Create a new connection and assign it immediately so that it can be canceled before the handshake
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result, false);
    }
  }
  // If we found a pooled connection on the second attempt, return it
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }
  // TCP and TLS handshake
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  routeDatabase().connected(result.route());
  Socket socket = null;
  synchronized (connectionPool) {
    reportedAcquired = true;
    // Put the connection into the connection pool
    Internal.instance.put(connectionPool, result);
    // If another multiplexed connection to the same address was created concurrently, release this one and use that one
    if (result.isMultiplexed()) {
      socket = Internal.instance.deduplicate(connectionPool, address, this);
      result = connection;
    }
  }
  closeQuietly(socket);
  eventListener.connectionAcquired(call, result);
  return result;
}
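Reduced to its essentials, the lookup order is: the connection this allocation already holds, then the shared pool, then a new connection that is handshaken and pooled. The sketch below models only that order. It is a toy under stated assumptions: `Pool`, `Allocation`, and `Connection` are hypothetical stand-ins keyed by an address string, a counter replaces the real TCP/TLS handshake, and routes, health checks, and deduplication are ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "allocated, then pooled, then new"; not OkHttp's classes. */
final class ConnectionReuseSketch {

  static final class Connection {
    final String address;

    Connection(String address) {
      this.address = address;
    }
  }

  /** Shared by all allocations, like a connection pool shared by all calls of one client. */
  static final class Pool {
    final Map<String, Connection> byAddress = new HashMap<>();
    int handshakes = 0; // counts how many "expensive" connection setups happened
  }

  /** One per call: remembers the connection it has been given. */
  static final class Allocation {
    private final Pool pool;
    private final String address;
    private Connection connection;

    Allocation(Pool pool, String address) {
      this.pool = pool;
      this.address = address;
    }

    Connection find() {
      // 1. Prefer the connection already allocated to this call.
      if (connection != null) return connection;
      // 2. Otherwise try the shared pool.
      Connection pooled = pool.byAddress.get(address);
      if (pooled != null) {
        connection = pooled;
        return pooled;
      }
      // 3. Otherwise create a connection, "handshake", and put it into the pool.
      Connection created = new Connection(address);
      pool.handshakes++;
      pool.byAddress.put(address, created);
      connection = created;
      return created;
    }
  }

  public static void main(String[] args) {
    Pool pool = new Pool();
    new Allocation(pool, "example.test:443").find();
    new Allocation(pool, "example.test:443").find(); // reuses the pooled connection
    System.out.println("handshakes: " + pool.handshakes);
  }
}
```

Two calls to the same address share one connection object, so the handshake cost is paid once.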
`findConnection()` is called repeatedly in a loop until it yields a usable connection. It prefers the connection that is already allocated, then a connection from the pool, and creates a new one only if neither is available. The code above therefore falls into three parts:

1. Check whether the current connection can be used, that is, whether its stream has been closed and whether it has been restricted from creating new streams;
2. If the current connection cannot be used, get a connection from the connection pool;
3. If the pool has no usable connection either, create a new connection, perform the handshake, and put it into the pool.

A connection is obtained from the pool through the `get()` method of `Internal`. `Internal` has a static instance that is initialized in a static initializer block of `OkHttpClient`. Inside `Internal`'s `get()`, the connection pool's own `get()` method is called to obtain a connection.

The code above also shows one benefit of connection reuse: it skips the TCP and TLS handshake. Establishing a connection takes time, so reusing connections makes network access more efficient. How, then, are connections managed once they are in the pool? We will analyze below how OkHttp's `ConnectionPool` manages them.

### 2.7 CallServerInterceptor

The call-server interceptor `CallServerInterceptor` sends the request to the server and reads the response. It is the last interceptor in the chain, so instead of calling the chain's `proceed()` method it processes the response it receives and returns it directly to the interceptor above it:
public final class CallServerInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // Get the HttpCodec initialized in ConnectInterceptor
    HttpCodec httpCodec = realChain.httpStream();
    // Get the StreamAllocation initialized in RetryAndFollowUpInterceptor
    StreamAllocation streamAllocation = realChain.streamAllocation();
    // Get the RealConnection initialized in ConnectInterceptor
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();
    long sentRequestMillis = System.currentTimeMillis();
    realChain.eventListener().requestHeadersStart(realChain.call());
    // Write the request headers here
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      // Write the request body here
      if (responseBuilder == null) {
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        // Write request body
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();