OKHttp source code analysis

Okhttp is a commonly used network framework. Okhttp is developed by square company. It is necessary to master the usage of okhttp and its internal working principle.

1, Introduction mode

 

1.gradle introduction

 

   implementation 'com.squareup.okhttp3:okhttp:3.14.7'
    implementation 'com.squareup.okio:okio:1.17.5'

2. Authority

<uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
    <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />

 

2, Mode of use

Steps:

  • Build the OkHttpClient object.
  • Build the Request object.
  • Generate a Call object.
  • Call initiates synchronous and asynchronous requests.

 

2.1 Get request

1. Asynchronous request

 

 val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

2. Synchronization request

val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
Thread() {
    try {
        val execute = newCall.execute()
        Log.v(OKHTTP_TAG, execute.toString())
    } catch (e: Exception) {
        e.printStackTrace()
    }
}.start()

 

2.2 Post request

The following example is:

1.post request submission String.

 

val okHttpClient = OkHttpClient()
val contentType = MediaType.parse("text/x-markdown; charset=utf-8")
val content = "Hello"
val body = RequestBody.create(contentType, content)

val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/chapters/json")
    .post(body)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

 

 

2.post request submission form.

val okHttpClient = OkHttpClient()
val formBody = FormBody.Builder().add("username", "zy")
    .add("password", "123")
    .build()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .post(formBody)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

 

 

Post request complex body

val okHttpClient = OkHttpClient()
//image file
val imageFile = File(Environment.getExternalStorageDirectory(), "test_ic.png")
//Build fileBody through RequestBody
val fileBody = RequestBody.create(MediaType.parse("image/jpg"), imageFile)
//MultipartBody builds multiple types (user name, password, Avatar)
val multipartBody = MultipartBody.Builder()
    .setType(MultipartBody.FORM)
    .addFormDataPart("username", "zy")
    .addFormDataPart("phone", "123456")
    .addFormDataPart("Test", "test_ic.png", fileBody)
    .build()

val getRequest = Request.Builder()
    .url("https://wanandroid.com/wxarticle/chapters/json")
    .post(multipartBody)
    .build()

val newCall = okHttpClient.newCall(getRequest)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

 

 

2.3 request configuration item

In the following example, the timeout duration, cache location and size of the request are set, and the request sent by OKHttp is monitored (global configuration)

val okHttpClient = OkHttpClient.Builder()
    .connectTimeout(15, TimeUnit.SECONDS)
    .readTimeout(10, TimeUnit.SECONDS)
    .writeTimeout(10, TimeUnit.SECONDS)
    .cache(Cache(externalCacheDir, 500 * 1024 * 1024))
    .addInterceptor {
        val request = it.request()
        val url = request.url().toString()
        Log.v(
            OKHTTP_TAG,
            "intercept:proceed start: url" + url + ", at " + System.currentTimeMillis()
        )
        val response = it.proceed(request)
        val body = response.body()
        Log.v(
            OKHTTP_TAG,
            "intercept:proceed end: url" + url + ", at " + System.currentTimeMillis()
        )
        Log.v(OKHTTP_TAG, body?.toString() ?: "")
        response
    }.build()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

Request separate configuration

In the following example:

The request header is added using the addHeader() method.

Use cacheControl(CacheControl.FORCE_NETWORK) to set that the request can use the network without caching.

val okHttpClient = OkHttpClient()

val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get()
    .addHeader("key", "value")
    .cacheControl(CacheControl.FORCE_CACHE)
    .build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object :Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

 

 

3, OKHTTP request process

Analyze the request flow of OKHttp through a simple asynchronous request.

val okHttpClient = OkHttpClient()
val request = Request.Builder().url("https://wanandroid.com/wxarticle/chapters/json")
    .get().build()

val newCall = okHttpClient.newCall(request)
newCall.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        e.printStackTrace()
    }

    override fun onResponse(call: Call, response: Response) {
        Log.v(OKHTTP_TAG, response.toString())
        Log.v(OKHTTP_TAG, Thread.currentThread().name)
    }
})

 

1. Request creation

1.1 okhttpclient creation

 

Build an instance through the OkHttpClient constructor, and the configuration item is the default value in the Builder construction method.

OKHttpClient. Constructor for builder

public Builder() {
  dispatcher = new Dispatcher();//Distributor, which is mainly used to execute policies when asynchronous requests are made
  protocols = DEFAULT_PROTOCOLS;//http protocol
  connectionSpecs = DEFAULT_CONNECTION_SPECS;//Connection configuration
  eventListenerFactory = EventListener.factory(EventListener.NONE);//Request listening factory
  proxySelector = ProxySelector.getDefault();//surrogate selector 
  if (proxySelector == null) {
    proxySelector = new NullProxySelector();
  }
  cookieJar = CookieJar.NO_COOKIES;//cookie
  socketFactory = SocketFactory.getDefault();//Socket factory
  hostnameVerifier = OkHostnameVerifier.INSTANCE;//Host name authentication
  certificatePinner = CertificatePinner.DEFAULT;//Certificate chain
  proxyAuthenticator = Authenticator.NONE;//Proxy authentication
  authenticator = Authenticator.NONE;//Source server authentication
  connectionPool = new ConnectionPool();//Connection pool
  dns = Dns.SYSTEM;//dns domain name
  followSslRedirects = true;//Follow ssl redirection
  followRedirects = true;//Follow redirection
  retryOnConnectionFailure = true;//Do you want to retry when the connection fails
  callTimeout = 0;//request timeout
  connectTimeout = 10_000;//connection timed out
  readTimeout = 10_000;//Read timeout
  writeTimeout = 10_000;//Write timeout
  pingInterval = 0;//ping interval
}

 

1.2Request instance creation

Request is also created using Builder mode

 

 

 

 

 

 

// Request link
  HttpUrl url;
  // Request method
  String method;
  // Request header
  Headers.Builder headers;
  // Request body
  RequestBody body;
    // label
  Map<Class<?>, Object> tags = Collections.emptyMap();

 

 

1.3 creation of call object

 

 

 

public interface Call extends Cloneable {
 
  //request
  Request request();
  //Synchronization request
  Response execute() throws IOException;
  //Asynchronous request
  void enqueue(Callback responseCallback);
  //Cancel request
  void cancel();

 //Is it in the process of request
  boolean isExecuted();
 //Cancel request
  boolean isCanceled();


  Timeout timeout();

  
  Call clone();
  //Factory interface
  interface Factory {
    Call newCall(Request request);
  }
}

 

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  //OKHttpClient instance
  this.client = client;
  //Initial Request
  this.originalRequest = originalRequest;
  //Whether web socket communication is supported
  this.forWebSocket = forWebSocket;
}

 

The Transmitter internally holds OkHttpClient, connection pool, call and event listener.

public Transmitter(OkHttpClient client, Call call) {
  this.client = client; //OKHttpClient
  //Connection pool
  this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
  //Call instance
  this.call = call;
  //event listeners 
  this.eventListener = client.eventListenerFactory().create(call);
  this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}

 

 

 

2. Request scheduling

2.1 asynchronous request

 

 

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.callStart();//Start of callback request listener
  client.dispatcher().enqueue(new AsyncCall(responseCallback));//Start scheduling
}

 

First, judge whether it has been requested, and the callback request starts. Then call the enqueue method of Dispatcher and import the parameter AsyncCall. AsynCall is essentially a Runnable.

final class AsyncCall extends NamedRunnable {
    ... ...
}
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

 

 

#Dispatcher

Put the request into the double ended queue readyAsyncCalls, find the request with the same host from the executing request runningAsyncCalls or the waiting request readyAsyncCalls, and reuse the callsPerHost to the current request. callsPerHost looks at the name and feels like the number of requests with the same host. The type is AtomicInteger.

void enqueue(AsyncCall call) {
  synchronized (this) {
      //Add AsyncCall to the queue waiting for execution
    readyAsyncCalls.add(call);

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.get().forWebSocket) {
        //Request to find the same host from runningAsyncCalls or readyAsyncCalls
        //The same host, using the same host calls counter
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  promoteAndExecute();
}

 

//Request to find the same host from runningAsyncCalls or readyAsyncCalls
@Nullable private AsyncCall findExistingCallWithHost(String host) {
  for (AsyncCall existingCall : runningAsyncCalls) {
    if (existingCall.host().equals(host)) return existingCall;
  }
  for (AsyncCall existingCall : readyAsyncCalls) {
    if (existingCall.host().equals(host)) return existingCall;
  }
  return null;
}

 

#Dispatcher
private int maxRequests = 64; //The maximum number of simultaneous requests is 64
private int maxRequestsPerHost = 5;//The maximum number of simultaneous requests per host is 5

//Note that a double ended queue is used

/** Asynchronous task queue to be run */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous task queue*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronization task queue */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    //Traverse the asynchronous task queue waiting for execution
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        //Get an AsyncCall to be executed
      AsyncCall asyncCall = i.next();
        //When the maximum number of concurrent requests is less than 64 and the number of requests of the current requested host is less than 5, it is added to the asynchronous request queue in processing
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
        
      i.remove();
      //The calls counter under the host is incremented by 1
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());//Execute request
  }

  return isRunning;
}

 

Traverse readyAsyncCalls and judge that if the number of runningAsyncCalls is greater than the maximum number of concurrent requests 64, break, or the number of requests from the same host is greater than 5, continue. If the above two conditions are not met, it will be removed from the waiting queue readyAsyncCalls, callsPerHost will increase by 1, put it into the collection executableCalls, and add it to the queue runningAsyncCalls, indicating the asynchronous requests being executed. The value of readyAsyncCalls is to control the buffer of the maximum concurrent number: when the concurrent number of asynchronous requests reaches 64 and the asynchronous requests of the same host reach 5, they should be put into the waiting queue.

 

#RealCall

void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
      //Execute AsyncCall through the incoming thread pool
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);//Callback failed
  } finally {
    if (!success) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}

 

#Dispatcher

This thread pool is very similar to the thread pool of CachedThreadPool} executing the request RealCall.

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

 

#AsyncCall 

@Override protected void execute() {
  boolean signalledCallback = false;
  //Timeout timing
  transmitter.timeoutEnter();
  try {
      //Get request Response
    Response response = getResponseWithInterceptorChain();
    signalledCallback = true;
    //Callback request result
    responseCallback.onResponse(RealCall.this, response);
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
        //Walk failure callback
      responseCallback.onFailure(RealCall.this, e);
    }
  } catch (Throwable t) {
    cancel();
    if (!signalledCallback) {
      IOException canceledException = new IOException("canceled due to " + t);
      canceledException.addSuppressed(t);
      // Walk failure callback
      responseCallback.onFailure(RealCall.this, canceledException);
    }
    throw t;
  } finally {
      //End request
    client.dispatcher().finished(this);
  }
}

Call the getResponseWithInterceptorChain() method to get the Response, use the responseCallback callback to callback the result, and finally call the dispatcher's finish method at the end of the request.

 

3. Execution of requests

The execution request will eventually be executed into the getResponseWithInterceptorChain() method of RealCall

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  //Application interceptor (user defined interceptor)
  interceptors.addAll(client.interceptors());
  //Retry and redirect interceptor
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  //Bridge interceptor
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  //Cache interceptor
  interceptors.add(new CacheInterceptor(client.internalCache()));
  //Connection interceptor
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
      //Network interceptor (user defined)
    interceptors.addAll(client.networkInterceptors());
  }
  //Request service interceptor
  interceptors.add(new CallServerInterceptor(forWebSocket));
  //Interceptor chain 
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try {
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}
package okhttp3;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

 
    @Nullable Connection connection();

    Call call();

    int connectTimeoutMillis();

    Chain withConnectTimeout(int timeout, TimeUnit unit);

    int readTimeoutMillis();

    Chain withReadTimeout(int timeout, TimeUnit unit);

    int writeTimeoutMillis();

    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}

 

 

 

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
    originalRequest, this, client.connectTimeoutMillis(),
    client.readTimeoutMillis(), client.writeTimeoutMillis());

 

 

When instantiating RealInterceptorChain, the index value is 0 and the exchange value is null. First, the first interceptor is obtained, its interceptor method is called, and the result is returned and verified.

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
    index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

The parameter passed in by calling the interceptor method of the application Interceptor: interceptor chain instance next. Next is to add index + 1. The proceed method of next is invoked in the application interceptor.

Except for the last interceptor CallServerInterceptor, the interceptor methods of all interceptors call the incoming RealInterceptorChain instance. The interceptor method of each interceptor gets the response processed by the next interceptor after calling the processed method of chain, and then returns it to the previous interceptor.

 

4. Interceptor

All interceptors will be combined into an interceptor chain, and each interceptor will be executed in turn in a typical responsibility chain mode. Different interceptors have different responsibilities. The interceptors on the chain will be processed one by one in order. Before the Request is sent and the Response is returned, some custom logic will be inserted to facilitate the expansion of requirements.

 

#RealCall

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try {
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}

 

 

 

 

 

1.RetryAndFollowUpInterceptor

 

 

 

@Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Transmitter transmitter = realChain.transmitter();

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {
      //Ready to connect
    transmitter.prepareToConnect(request);

    if (transmitter.isCanceled()) {
      throw new IOException("Canceled");
    }

    Response response;
    boolean success = false;
    try {
        //Continue with the next Interceptor
      response = realChain.proceed(request, transmitter, null);
      success = true;
    } catch (RouteException e) {
      // The attempt to connect via a route failed. The request will not have been sent.
      //The connection route is abnormal, and the request has not been sent at this time.
      if (!recover(e.getLastConnectException(), transmitter, false, request)) {
        throw e.getFirstConnectException();
      }
      continue;
    } catch (IOException e) {
      // An attempt to communicate with a server failed. The request may have been sent.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      //IO exception, the request may have been sent.
      if (!recover(e, transmitter, requestSendStarted, request)) throw e;
      continue;
    } finally {
      // The network call threw an exception. Release any resources.
      if (!success) {
          //The request was unsuccessful. Free up resources.
        transmitter.exchangeDoneDueToException();
      }
    }

    // Attach the prior response if it exists. Such responses never have a body.
    // Associate previous response
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
          .build();
    }

    Exchange exchange = Internal.instance.exchange(response);
    Route route = exchange != null ? exchange.connection().route() : null;
    //The follow-up result is mainly used to process the request according to the response code. When the return request is not empty, it will be redirected - get the redirected request.
     //Obtain the response code to determine whether redirection is required
    Request followUp = followUpRequest(response, route);
    //followUp is empty. You don't need to try again. You can return it directly
    if (followUp == null) {
      if (exchange != null && exchange.isDuplex()) {
        transmitter.timeoutEarlyExit();
      }
      return response;
    }

    RequestBody followUpBody = followUp.body();
    if (followUpBody != null && followUpBody.isOneShot()) {
      return response;
    }

    closeQuietly(response.body());
    if (transmitter.hasExchange()) {
      exchange.detachWithViolence();
    }
    //Retry up to 20 times
    if (++followUpCount > MAX_FOLLOW_UPS) {
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    request = followUp;
    priorResponse = response;
  }
}

 

2.BridgeInterceptor

@Override public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    //Get the length of RequestBody and add header "content length"
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
        //Otherwise, add "transfer encoding"
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  //"Accept encoding: gzip" to receive and return gzip encoded compressed data
  //If "accept encoding: gzip" is added manually, the following if statement will not be entered. transparentGzip is false, and we need to handle data decompression ourselves.
  //If "accept encoding: gzip" is not added manually and transparentGzip is true, it will be added automatically and decompressed automatically in the future.
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }
    //Obtain the cookie from the cookie jar and add it to the Header.
  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }
    //"User agent" needs to be added as a public Header
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

    //Leave it to the next interceptor to process the request
  Response networkResponse = chain.proceed(requestBuilder.build());
   //Get the header "set cookie" from the networkResponse and store it in the cookie jar
  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);
  //If "accept encoding: gzip" is not added manually, a ResponseBody gzipsource that can be decompressed automatically will be created automatically
  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }
  //Build a new Response and return it
  return responseBuilder.build();
}

 

BridgeInterceptor interceptor adds a header to the request: "content type", "content length" or "transfer encoding", "Host", "Connection", "accept encoding", "cookie" and "user agent", that is, the truly executable request of the network layer. There is no cookie processing by default. We need to configure our own cookie jar when initializing OkhttpClient.

After obtaining the response, first store the cookie in the response header into the cookie jar, and then if the request header "accept encoding: gzip" is not manually added, it will be returned by creating a responseBody that can be automatically decompressed - GzipSource, and then building a new response.

 

3.CacheInterceptor

 

CacheInterceptor, a cache interceptor, provides access to the network request cache. Using local cache reasonably through CacheInterceptor can effectively reduce network overhead and response delay.

 

1. Understand the following http caching mechanism:

First request:

 

Second request:

 

 

 

 

@Override public Response intercept(Chain chain) throws IOException {
    //Use the url of the request to get the response from the cache as a candidate (CacheStrategy determines whether to use it)
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();
    //Obtain cache policy according to request and candidate response
    //The caching policy determines whether to use caching: strategy Networkrequest is null and does not use the network; strategy.cacheResponse is null, cache is not used.
   //There are two types of caching policies
   //networkRequest
   //cacheResponse cached response
   CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;
  
  //Update the statistical indicators according to the cache policy: number of requests, number of network requests and number of cache usage
  if (cache != null) {
      //Count requests and caches
    cache.trackResponse(strategy);
  }
  //There is a cache but it cannot be used. Close it
  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
   //If the network request and cache cannot be used, 504 is returned 
   //If the use of the network is prohibited and the cache is insufficient, a Response of 504 and an empty body is returned
   if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // If we don't need the network, we're done.
  //If the network is not used, the cacheResponse is definitely not empty. Then the cache is used. It is over and the interceptor behind it will not be used.
  //If the network cannot be used in the policy, the cached response is encapsulated and returned
   if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }
 //Here, networkrequest= Null (cacheResponse may or may not be null)
 //networkRequest != null is a network request, so the interceptor continues to process it.
  Response networkResponse = null;
  try {
      //Call the process of interceptor chain to get data from the network
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  //If the network request returns 304, indicating that the server-side resources have not been modified, combine the network response and cache response, then update the cache, return and end.
  //If there is a cached Response
  if (cacheResponse != null) {
      //If the code returned by the network request is 304, it means that the resource has not been modified
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        //Directly encapsulate the cached Response and return it
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))//Combine header
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())//Request event
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())//Accept event
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
        //If it is not 304, it indicates that the server-side resources have been updated, and then close the cache body
      closeQuietly(cacheResponse.body());
    }
  }

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
      //Network response can be cached (the header cache control of request and response is not "no store")
    //Determine whether there is a principal and whether it can be cached for subsequent use
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      // Write cache
      //Add cache
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }
    //OkHttp only caches get requests by default, because the data requested by get is generally persistent
    //post is generally an interactive operation, which does not make much sense to cache
    //Remove cache instead of get request
    //If the request method is invalid, it is remove d from the cache
    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}

We usually use network caching in OkHttp to improve access efficiency.

  • When there is a network: frequent requests in a short time, and subsequent requests use the resources in the cache.
  • When there is no network: obtain the previously cached data for temporary page display. When the network is updated, refresh the data of the current activity and refresh the interface to avoid the scene of blank interface.

 

 

3.ConnectInterceptor

This interceptor is about TCP connections. The connection interceptor is to find an available connection, that is, a TCP connection, which is used for HTTP requests and responses.

  • Open a network connection to the specified server
  • The logic to hand over to the next interceptor, CallServerInterceptor, to process the request and get the data

 

 

 

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  // If the request is in GET format, some additional checks are required
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  return realChain.proceed(request, transmitter, exchange);
}
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }
  // Looking for an ExchangeCodec object
  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  // Build an Exchange object from the codec object found
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
  // Assign some variables
  synchronized (connectionPool) {
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  }
}

First, through the Exchange finder The find , method searches for the , exchangecode , and finds the corresponding , exchangecode , object. Then, an , Exchange , object is constructed through this codec object and returned.

  • Exchange codec: a codec used for a connection to encode HTTP requests and decode HTTP responses.
  • Exchange: a tool class that encapsulates this codec, which is used to manage exchange codec and handle actual I/O.

 

 

public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  int connectTimeout = chain.connectTimeoutMillis();
  int readTimeout = chain.readTimeoutMillis();
  int writeTimeout = chain.writeTimeoutMillis();
  int pingIntervalMillis = client.pingIntervalMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
      // The findHealthyConnection method to get the RealConnection object
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
        //Realconnection. Was called The newcodec method gets the ExchangeCodec object
    return resultConnection.newCodec(client, chain);
  } catch (RouteException e) {
    trackFailure();
    throw e;
  } catch (IOException e) {
    trackFailure();
    throw new RouteException(e);
  }
}

Find available connections

Let's first see the findHealthyConnection method:

 

 

/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  while (true) {
      //You can see that this is a loop, constantly calling the findConnection method to find a connection,
      //If a Healthy connection cannot be found, continue the cycle until it is found.
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      candidate.noNewExchanges();
      continue;
    }

    return candidate;
  }
}

 

Method to find connection: findConnection method.

 

/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    // 1. Reuse the current connection and check whether the connection is available and reusable
       releasedConnection = transmitter.connection;
        //If the connection cannot create a Stream, release the resource and return the close socket to be closed
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;
     //Prove that the connection is available
    if (transmitter.connection != null) {
      // We had an already-allocated connection and it's good.
      //There are allocated connections available. Reuse the current connection
      result = transmitter.connection;
      releasedConnection = null;
    }
     //If there is no connection available, go to the connection pool to find it
    if (result == null) {
      // Attempt to get a connection from the pool.
      //2. Get available connections from connection pool
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
      } else if (retryCurrentRoute()) {
        selectedRoute = transmitter.connection.route();
      }
    }
  }
  closeQuietly(toClose);
  //Callback
  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
     //A connection that has been allocated or in the connection pool is found. The process ends and the connection is returned
    return result;
  }

  // If we need a route selection, make one. This is a blocking operation.
  //Otherwise, we need a routing information, which is a blocking operation
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      //3. Obtain available connections from the connection pool (through a group of routing routes), and obtain connections from the connection pool again through more comprehensive routing information
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }
     //If no new connection is found
    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
     // 4. Create a new connection
       result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  //If the connection is found from the connection pool, it indicates that it is reusable. Not newly generated
    //If a new connection is generated, you need to connect to the server to use it
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  //This indicates that it is a newly generated connection
    //tcp and tls handshake, blocking operation, connecting to server
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
      //Add the route information to the whitelist of routeDatabase to prove that the route can connect to the specified server
  connectionPool.routeDatabase.connected(result.route());

//The last attempt at join merge will only happen if we try it many times
      //Concurrent connection of the same host
  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    // //5. Obtain another connection to prevent other competing connections from being created during the new connection process
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      //Close the created connection and return to the connection pool
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      //  It is possible to obtain an unhealthy connection. If this is the case, the route that has just successfully connected will be retried
      nextRouteToTry = selectedRoute;
    } else {
        6,Use the new connection created, put it into the connection pool, and return
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

The priority of available connections is: current connections > connections in the connection pool > new connections.

  • If the current connection is available, the current connection is preferred
  • If the current connection is not available, get the connection from the connection pool
  • If the connection pool acquisition fails, a new connection is created, TCP and TSL handshakes are performed, and then placed in the connection pool

Connection multiplexing can eliminate the handshake process between TCP and TLS, so as to improve the efficiency of network access.

 

The process of obtaining connections is very complex. The general process is as follows:

1. Check if the current connection is available. Determine whether this connection is available through nonexchanges. If not available, the transmitter The connection is set to null. When the request fails and needs to be retried or redirected, the connection is still there and can be reused directly.

2. Get available connections from the connection pool.

The difference between the following two codes:

 

 

connectionPool.callAcquirePooledConnection(address, call, null, false)
connectionPool.callAcquirePooledConnection(address, call, routes, false)

 

1. There is one more routes field.

A technology related to HTTP/2 is called HTTP/2 CONNECTION COALESCING. Assuming that there are two domain names that can be resolved to the same IP address and can use the same TLS Certificate (such as wildcard certificate), the client can reuse the same TCP connection to obtain resources from the two domain names.

This routes is the set of ip addresses that can be resolved by the current domain name (host name). The difference between the two methods is that one transmits the routing address and the other does not.

 

Continue to look at the callAcquirePooledConnection Code:

 

 

boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
    @Nullable List<Route> routes, boolean requireMultiplexed) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    if (requireMultiplexed && !connection.isMultiplexed()) continue;
    if (!connection.isEligible(address, routes)) continue;
    transmitter.acquireConnectionNoEvents(connection);
    return true;
  }
  return false;
}

First, the incoming requiremeultiplexed is false and iseligible (address address, @ nullable list < route > routes). Judge the host name, port number, etc. if the requests are exactly the same, return the connection directly. If the host name is different, you can also judge whether it is an HTTP/2 request. If so, continue to judge the routing address and certificate. If they can match, the connection is also available.

 

3. Create a new connection.

If no new connection is obtained from the connection pool, a new connection is created, which is actually called to socket Connect for TCP connection.

 

4. Get another connection from the connection pool to prevent other competing connections from being created during the new connection process.

During the creation process, there may be other requests to create a new connection with you, so we need to get another connection. If there is something available, we can use it directly to prevent resource waste.

In fact, this involves a knowledge point of HTTP2: multiplexing. That is, you do not need to make the next request after the last request of the current connection is completed. As long as there is a connection, you can use it directly. HTTP/2 can ensure that only one connection can be established in the same domain name, and requests can be made concurrently.

5. Put the new connection into the connection pool and return.

 

Connection multiplexing pool: RealConnectionPool

1. Reference count

Use the Transmitter class in OkHttp to count.

The operations of counting plus one and counting minus one are actually changing the size of the List < reference > List. The maintenance class of List < reference > is RealConnection, which is the packaging of socket physical connection. The number of Transmitter weak references in the List is the count of socket references. When the count is 0, it means that the connection is idle.

/** Current calls carried by this connection. */
final List<Reference<Transmitter>> transmitters = new ArrayList<>();

 

#Transmitter
void acquireConnectionNoEvents(RealConnection connection) {
  assert (Thread.holdsLock(connectionPool));

  if (this.connection != null) throw new IllegalStateException();
  this.connection = connection;
  //Reference count plus 1
  connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}

/**
 * Remove the transmitter from the connection's list of allocations. Returns a socket that the
 * caller should close.
 */
@Nullable Socket releaseConnectionNoEvents() {
  assert (Thread.holdsLock(connectionPool));

  int index = -1;
  for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
    Reference<Transmitter> reference = this.connection.transmitters.get(i);
    if (reference.get() == this) {
      index = i;
      break;
    }
  }

  if (index == -1) throw new IllegalStateException();

  RealConnection released = this.connection;
  //Reference count minus one
  released.transmitters.remove(index);
  this.connection = null;

  if (released.transmitters.isEmpty()) {
    released.idleAtNanos = System.nanoTime();
    if (connectionPool.connectionBecameIdle(released)) {
      return released.socket();
    }
  }

  return null;
}

 

  • 2.RealConnectionPool
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
    Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
    new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
//Maximum number of free socket connections	  
private final int maxIdleConnections;
 //keepAlive time of socket
private final long keepAliveDurationNs;
//Two way queue, which maintains RealConnection, that is, the packaging of socket physical connection
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  //Record the blacklist of failed routes. When the connection fails, the failed routes will be added
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning; //Are you cleaning up
public final class ConnectionPool {
  final RealConnectionPool delegate;
  
  public ConnectionPool() {
  this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
  this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
}
    ... ...
}

The proxy mode is used here. Through the ConnectionPool class, we can find that the maximum number of connections of the default idle socket is 5, the keep alive time of the socket is 5 minutes, and the RealConnectionPool object is actually constructed when constructing the ConnectionPool object.

 

 

  • 3. Cache operation

3.1 placing connections

void put(RealConnection connection) {
  assert (Thread.holdsLock(this));
  if (!cleanupRunning) {
    cleanupRunning = true;
      //Use the thread pool to perform cleanup tasks
    executor.execute(cleanupRunnable);
  }
  //Add a connection to a two ended queue
  connections.add(connection);
}

There are two jobs to put into the connection:

  • If the current connection pool is not cleaning up connections, first use the thread pool to perform the cleaning task and set the flag bit being cleaned up to true
  • Adds the current connection to the double ended queue

 

3.2 cleaning connections

When putting in the connection, we will clean up the connection and call the thread pool to execute the task of cleanupRunnable. Let's take a look at this task first

//The thread continuously calls cleanup to clean up, and returns the interval of the next cleaning
private final Runnable cleanupRunnable = () -> {
  while (true) {
      //Clean up the connection and return the interval for the next time you need to clean up
    long waitNanos = cleanup(System.nanoTime());
    if (waitNanos == -1) return;
    if (waitNanos > 0) {
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      synchronized (RealConnectionPool.this) {
        try {
          RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
        } catch (InterruptedException ignored) {
        }
      }
    }
  }
};

It can be found that once the cleaning task starts to execute, the cleaning connection will be made every specified interval.

 

RealConnectionPool#cleanup

long cleanup(long now) {
 //Number of connections in use
  int inUseConnectionCount = 0;
  //Number of idle connections
  int idleConnectionCount = 0;
  //Long idle connections
  RealConnection longestIdleConnection = null;
  //Maximum idle time
  long longestIdleDurationNs = Long.MIN_VALUE;

  // Find either a connection to evict, or the time that the next eviction is due.
  synchronized (this) {
      // Traversal connection
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
      RealConnection connection = i.next();

     //Query the number of references of transmitter s for this connection
        //If the number of references is greater than 0, the number inUseConnectionCount plus 1 is used
        //Otherwise, the idle quantity idleConnectionCount is increased by 1
      // If the connection is in use, keep searching.
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++;
        continue;
      }

      idleConnectionCount++;

      // If the connection is ready to be evicted, we're done.
      //Find the connection that has been idle for the longest time
      //Get free time
      long idleDurationNs = now - connection.idleAtNanos;
      if (idleDurationNs > longestIdleDurationNs) {
          //Maximum idle time
        longestIdleDurationNs = idleDurationNs;
        //Longest idle connection
        longestIdleConnection = connection;
      }
    }
  //If the idle time of idle connections exceeds 5 minutes, or the number of idle connections exceeds 5, remove the longest idle connection
    if (longestIdleDurationNs >= this.keepAliveDurationNs
        || idleConnectionCount > this.maxIdleConnections) {
      // We've found a connection to evict. Remove it from the list, then close it below (outside
      // of the synchronized block).
      connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
      // A connection will be ready to evict soon.
       //If the number of idle connections is greater than 0, the time when this link is about to expire is returned
      return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
      // All connections are in use. It'll be at least the keep alive duration 'til we run again.
       //If all connections are in use, clean up after 5 minutes
       return keepAliveDurationNs;
    } else {
      // No connections, idle or in use.
       //If there are no connections, jump out of the loop
      cleanupRunning = false;
      return -1;
    }
  }

  closeQuietly(longestIdleConnection.socket());

  // Cleanup again immediately.
  return 0;
}

 

 

 

3.3 get the number of references of the connection

When cleaning up the connection, call the pruneAndGetAllocationCount method to judge the idle connection and active connection.

 

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
  List<Reference<Transmitter>> references = connection.transmitters;
  //Traverse weak reference list
  for (int i = 0; i < references.size(); ) {
    Reference<Transmitter> reference = references.get(i);
     //Skip if the Transmitter is used
    if (reference.get() != null) {
      i++;
      continue;
    }

    // We've discovered a leaked transmitter. This is an application bug.
    TransmitterReference transmitterRef = (TransmitterReference) reference;
    String message = "A connection to " + connection.route().address().url()
        + " was leaked. Did you forget to close a response body?";
    Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
    //If the Transmitter is not used, remove the reference
    references.remove(i);
    connection.noNewExchanges = true;

    // If this was the last allocation, the connection is eligible for immediate eviction.
    //If the list is empty, it indicates that this connection is not referenced, and 0 is returned, indicating that this connection is idle
    //Indicates that this connection is idle
    if (references.isEmpty()) {
        //
      connection.idleAtNanos = now - keepAliveDurationNs;
      return 0;
    }
  }
  //Otherwise, a non-zero number is returned, indicating that the connection is active
  return references.size();
}

 

4.IO interceptor (CallServerInterceptor)

 

@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Exchange exchange = realChain.exchange();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  //Write request header
  exchange.writeRequestHeaders(request);

  boolean responseHeadersStarted = false;
  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      exchange.flushRequest();
      responseHeadersStarted = true;
      exchange.responseHeadersStart();
      responseBuilder = exchange.readResponseHeaders(true);
    }
      
    if (responseBuilder == null) {
      if (request.body().isDuplex()) {
        // Prepare a duplex body so that the application can send a request body later.
        exchange.flushRequest();
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, true));
            //Write request body
        request.body().writeTo(bufferedRequestBody);
      } else {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, false));
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    } else {
      exchange.noRequestBody();
      if (!exchange.connection().isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        exchange.noNewExchangesOnConnection();
      }
    }
  } else {
    exchange.noRequestBody();
  }

  if (request.body() == null || !request.body().isDuplex()) {
    exchange.finishRequest();
  }

  if (!responseHeadersStarted) {
    exchange.responseHeadersStart();
  }
  //Read response header
  if (responseBuilder == null) {
    responseBuilder = exchange.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(exchange.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();
  //Read response body
  int code = response.code();
  if (code == 100) {
    // server sent a 100-continue even though we did not request one.
    // try again to read the actual response
    response = exchange.readResponseHeaders(false)
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    code = response.code();
  }

  exchange.responseHeadersEnd(response);

  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    exchange.noNewExchangesOnConnection();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }
  //Return final response
  return response;
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Keywords: Android OkHttp Framework

Added by Thoughtless on Tue, 08 Feb 2022 02:57:03 +0200