Analysis of OkHttp source code of Android network framework -- interceptor analysis

Android network framework -- OkHttp source code analysis

This article is based on the Kotlin language.

Previous article:
Analysis of OkHttp source code of Android network framework (1) -- request process
Having covered OkHttp's request process, this article moves on to the second part: OkHttp's interceptors.

OkHttp source code analysis: interceptors

In the request flow covered in the previous article there is one important method, getResponseWithInterceptorChain(). It assembles a list of interceptors that process the request before it is sent to the server; once the server returns a response, the same interceptors process the response on its way back to the caller. Interceptors are a core feature of OkHttp, and the individual interceptors implement OkHttp's caching and connection mechanisms:

// getResponseWithInterceptorChain method of RealCall
Response getResponseWithInterceptorChain() throws IOException {
    // Build the full list of interceptors
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors()); // Custom application interceptors
    interceptors.add(retryAndFollowUpInterceptor); // Retries and redirects
    interceptors.add(new BridgeInterceptor(client.cookieJar())); // Converts between user and network requests/responses
    interceptors.add(new CacheInterceptor(client.internalCache())); // Caching
    interceptors.add(new ConnectInterceptor(client)); // Connection management
    if (!forWebSocket) {
        // Custom network interceptors; usually none are configured unless there is a special need
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket)); // Sends the request and reads the response
    // Create the chain
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    // Call the chain's proceed(request) method to process the request; it returns the response
    return chain.proceed(originalRequest);
}

From the code above, the interceptors fall into the following categories:

  • custom application interceptors
  • RetryAndFollowUpInterceptor
  • BridgeInterceptor
  • CacheInterceptor (cache interceptor)
  • ConnectInterceptor
  • CallServerInterceptor (the interceptor that talks to the server)
  • networkInterceptors() (custom network interceptors; rarely used and not covered in this article)
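
To make the hand-off between interceptors concrete, here is a minimal sketch of the chain-of-responsibility pattern in plain Java. SketchInterceptor, SketchChain and ChainDemo are hypothetical stand-ins invented for this illustration, not OkHttp classes, and requests and responses are reduced to plain strings:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for OkHttp's Interceptor.
interface SketchInterceptor {
    String intercept(SketchChain chain);
}

// Hypothetical stand-in for RealInterceptorChain: a list plus the current index.
final class SketchChain {
    private final List<SketchInterceptor> interceptors;
    private final int index;
    private final String request;

    SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    // Hands the request to the interceptor at `index`, giving it a chain
    // that points at the following interceptor.
    String proceed(String request) {
        if (index >= interceptors.size()) {
            throw new IllegalStateException("no interceptor left to produce a response");
        }
        SketchChain next = new SketchChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

final class ChainDemo {
    static String run(String request) {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        // These two wrap the call: they edit the request on the way in
        // and the response on the way out.
        interceptors.add(chain -> "[retry " + chain.proceed(chain.request() + ">retry") + "]");
        interceptors.add(chain -> "[bridge " + chain.proceed(chain.request() + ">bridge") + "]");
        // The last interceptor produces the response and never calls proceed.
        interceptors.add(chain -> "response(" + chain.request() + ")");
        return new SketchChain(interceptors, 0, request).proceed(request);
    }
}
```

Calling ChainDemo.run("GET /") passes the request through the interceptors in list order and unwinds the response in reverse order, which is the same shape as the interceptor list built in getResponseWithInterceptorChain().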

1, Custom interceptor

Let's first look at OkHttp's Interceptor interface:

fun interface Interceptor {
  //The interception logic
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  companion object {
    //Lets an Interceptor be created from a lambda
    inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
      Interceptor { block(it) }
  }

  interface Chain {
    //Returns the Request
    fun request(): Request

    //Processes the Request by passing it down the chain and returns the Response
    @Throws(IOException::class)
    fun proceed(request: Request): Response

    //The connection the request runs on; generally null for custom application interceptors
    fun connection(): Connection?

    //Returns the corresponding Call object
    fun call(): Call

    //Connect timeout
    fun connectTimeoutMillis(): Int

    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain

    //Read timeout
    fun readTimeoutMillis(): Int

    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain

    //Write timeout
    fun writeTimeoutMillis(): Int

    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

The Interceptor interface consists mainly of the intercept(Chain) method and the nested Chain interface. Here is a custom interceptor:

import java.io.IOException
import okhttp3.Interceptor
import okhttp3.Request
import okhttp3.Response

class MyInterceptor : Interceptor {
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {

        //1. Get the Request
        val request: Request = chain.request()

        //2. Process the Request; this logic is up to you
        //...

        //3. Call the chain's proceed(request) method to process the request and get the Response
        val response: Response = chain.proceed(request)

        //4. Process the Response; this logic is up to you
        //...

        //5. Return the Response
        return response
    }
}

Once the custom interceptor is written, how do we add it to the OkHttp configuration? See the following Java code:

OkHttpClient client = new OkHttpClient.Builder()
     .addInterceptor(new MyInterceptor())//Add a custom interceptor through the addInterceptor() method
     .build();

2, RetryAndFollowUpInterceptor

//intercept method of RetryAndFollowUpInterceptor class
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //Get request object
    var request = chain.request
    //Get RealCall object
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      //Note 1: create exchange finder internally
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          //Call proceed, which invokes the intercept method of the next interceptor, BridgeInterceptor (the chain mechanics were explained in the previous article)
          response = realChain.proceed(request)
          //The request went through, so the next pass of the loop (a follow-up request) starts with a new ExchangeFinder
          newExchangeFinder = true
        } catch (e: RouteException) {
          //Connecting over a route failed before the request was sent; recover() decides whether the request can be retried
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          //The failure is recoverable: set newExchangeFinder to false so that the retry reuses the existing ExchangeFinder instead of creating a new one
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          //Communication with the server failed; recover() decides whether the request can be retried
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          //The failure is recoverable: set newExchangeFinder to false so that the retry reuses the existing ExchangeFinder instead of creating a new one
          newExchangeFinder = false
          continue
        }
        //If there was a prior response (for example a redirect), attach it to the current response with its body removed
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        val followUp = followUpRequest(response, exchange)

        //followUp is null, no redirection is required, and the Response is returned directly
        if (followUp == null) {
		//...
          return response
        }

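        //followUp is not null, so a follow-up request (for example a redirect) is needed; a one-shot request body cannot be sent again, so in that case the response is returned as is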
        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()
        //Too many follow-up requests: throw an exception
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        //Loop again with the follow-up request
        request = followUp
        priorResponse = response
      } finally {
        //Always leave the exchange scope; the active exchange is closed unless closeActiveExchange was set to false
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }
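
The control flow of this loop can be hard to see among the error handling. The following is a simplified sketch of just the follow-up part, written in plain Java. FollowUpSketch, Reply and the canned `server` map are hypothetical names used only for this illustration, the limit of 20 is an assumption of the sketch, and retry-after-failure handling is left out entirely:

```java
import java.net.ProtocolException;
import java.util.Map;

final class FollowUpSketch {
    // Assumed limit for this sketch.
    static final int MAX_FOLLOW_UPS = 20;

    // Hypothetical response model: a status code plus an optional redirect target.
    static final class Reply {
        final int code;
        final String location; // null when there is nothing to follow

        Reply(int code, String location) {
            this.code = code;
            this.location = location;
        }
    }

    // Returns the next URL to request, or null if the response is final.
    static String followUpRequest(Reply response) {
        switch (response.code) {
            case 301: case 302: case 303: case 307: case 308:
                return response.location;
            default:
                return null;
        }
    }

    // `server` maps a URL to its canned reply and stands in for realChain.proceed(request).
    static Reply execute(String url, Map<String, Reply> server) throws ProtocolException {
        String request = url;
        int followUpCount = 0;
        while (true) {
            Reply response = server.get(request);
            if (response == null) {
                throw new IllegalArgumentException("no canned reply for " + request);
            }
            String followUp = followUpRequest(response);
            if (followUp == null) {
                return response; // no follow-up needed: this is the final response
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            request = followUp; // loop again with the follow-up request
        }
    }
}
```

A chain such as "/a" redirecting to "/b" returns the reply for "/b", while a URL that redirects to itself ends with a ProtocolException once the counter passes the limit.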

Note 1:

  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)
	//...
    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

The code above shows that a new ExchangeFinder is created only when newExchangeFinder is true. The job of this object is to find a usable TCP connection for the request. One of its constructor arguments is an Address built from request.url; here is how createAddress is implemented:

  private fun createAddress(url: HttpUrl): Address {
    var sslSocketFactory: SSLSocketFactory? = null
    var hostnameVerifier: HostnameVerifier? = null
    var certificatePinner: CertificatePinner? = null
    if (url.isHttps) {
      sslSocketFactory = client.sslSocketFactory
      hostnameVerifier = client.hostnameVerifier
      certificatePinner = client.certificatePinner
    }

    return Address(
        uriHost = url.host,
        uriPort = url.port,
        dns = client.dns,
        socketFactory = client.socketFactory,
        sslSocketFactory = sslSocketFactory,
        hostnameVerifier = hostnameVerifier,
        certificatePinner = certificatePinner,
        proxyAuthenticator = client.proxyAuthenticator,
        proxy = client.proxy,
        protocols = client.protocols,
        connectionSpecs = client.connectionSpecs,
        proxySelector = client.proxySelector
    )
  }

As the return statement shows, an Address instance is created from the url and the client configuration. An Address describes a connection to the server; it can be understood as the request's target address together with its connection configuration.

Address plays an important role: HTTP requests with the same Address can share a connection. It is what OkHttp uses to decide whether a connection can be reused, for both HTTP/1.1 and HTTP/2.

For now, just remember that the intercept method of RetryAndFollowUpInterceptor creates an ExchangeFinder first; it will be used later!
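
As an illustration of why an address object with value equality is useful for sharing connections, here is a small sketch. AddressKey and ConnectionTable are hypothetical names, the key holds only three fields, and real pooling has more conditions (for example, whether a connection is currently busy), so treat this as a simplified model rather than OkHttp's behaviour:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, much smaller cousin of Address: equal keys mean "same destination and settings".
final class AddressKey {
    final String host;
    final int port;
    final boolean https;

    AddressKey(String host, int port, boolean https) {
        this.host = host;
        this.port = port;
        this.https = https;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof AddressKey)) return false;
        AddressKey other = (AddressKey) o;
        return host.equals(other.host) && port == other.port && https == other.https;
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port, https);
    }
}

// Hands out one connection id per distinct address.
final class ConnectionTable {
    private final Map<AddressKey, Integer> connectionIds = new HashMap<>();
    private int nextId = 1;

    int connectionFor(AddressKey address) {
        Integer existing = connectionIds.get(address);
        if (existing != null) {
            return existing; // same address: share the existing connection
        }
        int id = nextId++;
        connectionIds.put(address, id);
        return id;
    }
}
```

Two requests to the same host, port and scheme get the same id, while changing any one of the fields yields a different one.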

3, BridgeInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      //Encapsulate Header
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    //If we add an "Accept-Encoding: gzip" header field ourselves, we are also responsible for decompressing the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }
    //Load cookies for this URL from the cookieJar configured on the OkHttpClient
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    //Call proceed, which invokes the intercept method of the next interceptor, CacheInterceptor (the chain mechanics were explained in the previous article)
    val networkResponse = chain.proceed(requestBuilder.build())

	//The following steps build a user response based on the network response
    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

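As its name suggests, BridgeInterceptor is a bridge between application code and network code. It converts the request built by the user into the request actually sent to the server, and converts the response returned by the server into a user-friendly response.

  • In the request phase, it applies user configuration and adds request headers such as Content-Type, Content-Length, Host, Connection, Accept-Encoding, Cookie and User-Agent.
  • In the response phase, if it added the gzip header itself, it decompresses the body and removes the Content-Encoding and Content-Length response headers.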
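
The request-phase rules can be condensed into a few lines. The sketch below mirrors the header defaults seen in the listing above using a plain map; BridgeHeadersSketch and withDefaults are hypothetical names, header names are matched by exact case (real HTTP header names are case-insensitive), and cookies and User-Agent are left out:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BridgeHeadersSketch {
    // contentLength: body length in bytes, -1 if unknown, null if the request has no body.
    static Map<String, String> withDefaults(Map<String, String> userHeaders,
                                            String host, Long contentLength) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);

        if (contentLength != null) {
            if (contentLength != -1L) {
                // Known length: send Content-Length, never Transfer-Encoding.
                headers.put("Content-Length", Long.toString(contentLength));
                headers.remove("Transfer-Encoding");
            } else {
                // Unknown length: fall back to chunked transfer.
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }

        // Only fill in what the user did not set.
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");

        // Ask for gzip only when the user set neither Accept-Encoding nor Range.
        if (!userHeaders.containsKey("Accept-Encoding") && !userHeaders.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }
}
```

For a body-less request with no user headers, the result contains Host, Connection and Accept-Encoding; adding a Range header suppresses the automatic Accept-Encoding, matching the transparentGzip condition in the listing.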

4, CacheInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    //1. Read candidate cache
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    //2. Compute the cache strategy (forced caching, comparison caching, etc.)
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

	//According to the cache policy, update the statistical indicators: number of requests, number of network requests and number of cache usage
    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    if (cacheCandidate != null && cacheResponse == null) {
      //The cache candidate is not applicable, so close its body
      cacheCandidate.body?.closeQuietly()
    }

    //3. According to the policy, if the network is not used and there is no cache, an error is directly reported and the error code 504 is returned
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    //4. If the network is not needed, the cached response is usable, so return it
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      //5. Call the proceed method, which calls the intercept method of the next interceptor ConnectInterceptor (explained in the previous article)
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    //6. After receiving the network result, if the response code is 304, the cache is used and the cache result is returned
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        //Update cacheResponse
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }
	//7. Read network results
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()
	//8. Cache data
    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            // This will log a conditional cache miss only.
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

CacheStrategy is OkHttp's implementation of the caching policy, and it follows the HTTP caching rules. It is worth reading the following article first; once the HTTP caching rules are clear, come back to CacheStrategy:
Detailed explanation of browser HTTP protocol caching mechanism

Note 2: call the compute method of CacheStrategy class to obtain the cache policy:

    fun compute(): CacheStrategy {
      val candidate = computeCandidate()

      // We're forbidden from using the network and the cache is insufficient.
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)
      }

      return candidate
    }

The computeCandidate() method is called internally:

   private fun computeCandidate(): CacheStrategy {
      //No cached response
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      //The request is HTTPS but the cached response is missing the required handshake, so drop the cached response
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      //The cached response should not have been stored, so do not use it
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

      val requestCaching = request.cacheControl
      //The request has a no-cache directive, or it already contains If-Modified-Since or If-None-Match (the server must verify whether the local cache can still be used), so go to the network
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }
	  //This indicates that the Response cache is available

	  //Get the cache control field CacheControl of the Response
      val responseCaching = cacheResponse.cacheControl
	  //Get the length of time that the Response has been cached
      val ageMillis = cacheResponseAge()
      //Get the length of time that the Response can be cached
      var freshMillis = computeFreshnessLifetime()

      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }
      //Decide whether the cached Response can still be used: it can if age + min-fresh < freshness lifetime + max-stale
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        //If the cache has not expired, use the Response cache directly
        return CacheStrategy(null, builder.build())
      }

      //The cache has expired; check whether the cached response carries a validator such as ETag or Last-Modified
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }
        //The cached response has no validator such as ETag or Last-Modified, so make a regular network request
        else -> return CacheStrategy(request, null)
      }
      //The cached response has an ETag or Last-Modified value, so add an If-None-Match or If-Modified-Since header and let the server decide whether the cache is still usable
      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
      return CacheStrategy(conditionalRequest, cacheResponse)
    }

As you can see, OkHttp's cache chooses the combination of networkRequest and cacheResponse according to the HTTP caching rules:

  • 1. Forced caching: the client decides whether the cache can still be used. When the client requests the data for the first time, the server returns the cache's expiration time in the Expires or Cache-Control header. On the next request the client checks that expiration time: if the cache has not expired it is used directly, otherwise it is not used and the client requests the server again.
  • 2. Comparison caching: the server decides whether the cache can still be used. When the client requests the data for the first time, the server returns a cache identifier (Last-Modified or ETag) along with the data. On the next request the client sends the identifier back (If-Modified-Since or If-None-Match) and the server checks it. If the resource has not changed, the server returns 304 and the client continues to use its cache; otherwise the client must use the new response returned by the server.
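
The two kinds of caching map onto the final part of computeCandidate(). As a rough sketch under simplifying assumptions (FreshnessSketch and its methods are hypothetical names; the no-cache checks and the served-date fallback are omitted), the decision looks like this:

```java
final class FreshnessSketch {
    enum Decision { USE_CACHE, CONDITIONAL_REQUEST, NETWORK }

    // All durations are in milliseconds; etag and lastModified may be null.
    static Decision decide(long ageMillis, long freshMillis, long minFreshMillis,
                           long maxStaleMillis, String etag, String lastModified) {
        // Forced caching: still fresh enough, so no network request at all.
        if (ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            return Decision.USE_CACHE;
        }
        // Comparison caching: stale, but a validator lets the server answer with 304.
        if (etag != null || lastModified != null) {
            return Decision.CONDITIONAL_REQUEST;
        }
        // Stale and nothing to validate with: plain network request.
        return Decision.NETWORK;
    }

    // Picks the conditional header, preferring the ETag like the `when` block does.
    static String conditionHeader(String etag, String lastModified) {
        if (etag != null) return "If-None-Match: " + etag;
        if (lastModified != null) return "If-Modified-Since: " + lastModified;
        return null;
    }
}
```

For example, a response cached 30 seconds ago with a 60-second freshness lifetime is served from the cache, while the same response after 90 seconds triggers a conditional request if it carries an ETag.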

5, ConnectInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //initExchange returns an Exchange instance, which in effect means obtaining a usable connection
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    //Proceed to the next interceptor with a chain that now carries the exchange
    return connectedChain.proceed(realChain.request)
  }

Take a look at how initExchange is implemented:

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }
    //1. Get the ExchangeFinder, which was created in RetryAndFollowUpInterceptor
    val exchangeFinder = this.exchangeFinder!!
    //2. Get the codec
    val codec = exchangeFinder.find(client, chain)
    //3. Create the Exchange
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

Note 2: the find method of ExchangeFinder, called in step 2 above:

  fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      //1. Obtain a healthy, usable connection according to the configuration of the client
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //2. Return the codec of this connection
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

First enter the findHealthyConnection method to see what kind of connection is returned:

  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      //Find connection
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Available connections are returned
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
		
      //Otherwise the connection is unhealthy; the rest of the loop (omitted here) discards it and the next iteration looks for another connection
      //...
    }
  }

Next, follow the call into findConnection. This method is fairly involved, so read the comments carefully:

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    //Try to reuse the connection already attached to this call; it may no longer be allowed to carry new exchanges
    val callConnection = call.connection 
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        //If the connection accepts no new exchanges, or its host and port differ from the request's, release it from the call (call.connection becomes null in releaseConnectionNoEvents), which returns the Socket to close
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      //1. If the created connection can still be used, use it directly as the result
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    //Reaching this point means a new connection is needed, so reset the failure statistics.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    //2. Try to find an available connection from the connection pool for the first time
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    //Choose the route to try next
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      routes = null
      route = routeSelection!!.next()
    } else {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      //The second attempt to find an available connection from the connection pool (one more routing parameter than the first)
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    //Still no connection, so create a new one here; its Socket is connected below
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    //If we have previously created a multiplexed connection with the same address, release the newly created connection and obtain the previous connection
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      //Gets the available multiplexed connections in the pool
      val result = call.connection!!
      nextRouteToTry = route
      //Release the newly generated connection
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      //Put the newly created connection into the connection pool
      connectionPool.put(newConnection)
      //Attach the new connection to the call
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

First enter the callAcquirePooledConnection method to see how to get the available connection:

  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    //Traverse the connection pool
    for (connection in connections) {
      synchronized(connection) {
        //If a multiplexed (HTTP/2) connection is required but this one is not multiplexed, skip it
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        //If the connection is not eligible for this address, skip it; otherwise assign it to the call
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }
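
Note that `return@synchronized` only leaves the `synchronized` block, so inside the loop it behaves like `continue`: the search moves on to the next pooled connection. A minimal sketch of the same traversal, using hypothetical types (`FakeConnection`, `FakePool`) and a deliberately trivial eligibility rule, could look like this:

```kotlin
// Simplified model; FakeConnection and FakePool are hypothetical, not OkHttp classes.
data class FakeConnection(
    val host: String,
    val isMultiplexed: Boolean,   // true for HTTP/2-style connections
    val activeCalls: Int = 0,
    val allocationLimit: Int = 1  // assumption: one call at a time unless multiplexed
) {
    // Trivial stand-in for the real eligibility check.
    fun isEligible(targetHost: String): Boolean =
        activeCalls < allocationLimit && host == targetHost
}

class FakePool(private val connections: List<FakeConnection>) {
    // Returns the first eligible connection, or null if none qualifies.
    fun acquire(targetHost: String, requireMultiplexed: Boolean): FakeConnection? {
        for (connection in connections) {
            // Skip (do not abort the search) when multiplexing is required but missing.
            if (requireMultiplexed && !connection.isMultiplexed) continue
            if (!connection.isEligible(targetHost)) continue
            return connection
        }
        return null
    }
}
```

With a pool that holds only a non-multiplexed connection to a host, `acquire(host, requireMultiplexed = true)` returns null because that entry is skipped, while `acquire(host, requireMultiplexed = false)` returns it.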

Now let's see what makes a connection eligible for reuse:

  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    assertThreadHoldsLock()

    //If the connection has reached its call limit or no longer accepts new exchanges, return false
    if (calls.size >= allocationLimit || noNewExchanges) return false

    //If the non-host fields of the address differ, return false
    if (!this.route.address.equalsNonHost(address)) return false

    //Returns true if the host is the same
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }
    
    //The remaining checks decide whether an HTTP/2 connection can be shared across hosts (connection coalescing)
    //If this is not an HTTP/2 connection, return false
    if (http2Connection == null) return false

    //The routes must share an IP address, otherwise return false
    if (routes == null || !routeMatchesAny(routes)) return false

    //This connection's server certificate must cover the new host
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    if (!supportsUrl(address.url)) return false

    //Certificate pinning must also match the host
    try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
      return false
    }

    return true
  }
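
The order of these checks can be summarized in a small model. The sketch below is an illustration under simplifying assumptions, not the library's code: `SimpleAddress`, `SimpleConnection` and `isEligibleSketch` are hypothetical, the non-host comparison is reduced to port and scheme, and the certificate and pinning checks are collapsed into a set of host names that the certificate is assumed to cover.

```kotlin
// Hypothetical, simplified stand-ins for OkHttp's Address and RealConnection.
data class SimpleAddress(val host: String, val port: Int, val scheme: String)

data class SimpleConnection(
    val address: SimpleAddress,
    val ip: String,
    val isHttp2: Boolean,
    val certificateHosts: Set<String>, // assumption: hosts covered by the server certificate
    val activeCalls: Int,
    val allocationLimit: Int,
    val noNewExchanges: Boolean = false
)

fun isEligibleSketch(
    conn: SimpleConnection,
    target: SimpleAddress,
    targetIps: List<String>? // resolved IPs of the target, or null if not yet known
): Boolean {
    // 1. Capacity: the connection must still accept another call.
    if (conn.activeCalls >= conn.allocationLimit || conn.noNewExchanges) return false
    // 2. Everything except the host must match (reduced here to port and scheme).
    if (conn.address.port != target.port || conn.address.scheme != target.scheme) return false
    // 3. Same host: a perfect match.
    if (conn.address.host == target.host) return true
    // 4. Different host: only an HTTP/2 connection can be coalesced.
    if (!conn.isHttp2) return false
    // 5. Both hosts must resolve to the IP this connection uses.
    if (targetIps == null || conn.ip !in targetIps) return false
    // 6. The certificate must also cover the new host.
    return target.host in conn.certificateHosts
}
```

In this model a second host is accepted only when the routes are known, which matches the description above: a pool lookup that passes no routes can succeed only on an exact host match.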

After these steps, a RealConnection instance is returned. Next, go back to note 2 at the top of this interceptor and look at what the statement resultConnection.newCodec(client, chain) does:

  internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection
    //If this is an HTTP/2 connection, return an HTTP/2 codec
    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      //Otherwise, return an HTTP/1 codec
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

These codecs convert OkHttp requests into HTTP request messages on the wire and parse the returned response messages back into OkHttp Response objects.

After the codec is obtained, it is wrapped together with the ExchangeFinder in an Exchange object, which is returned.

Each Exchange carries exactly one request and its response. The Exchange is responsible for sending the request and reading the response data, while the ExchangeCodec inside it does the actual sending and receiving.
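
The layering described here, an Exchange that delegates the I/O to a codec and adds bookkeeping around it, can be sketched roughly as follows. All types are hypothetical (`SimpleCodec`, `RecordingCodec`, `SimpleExchange`); the event names are plain strings chosen for illustration, and the real Exchange does more than this.

```kotlin
// Hypothetical codec interface: the part that actually talks to the wire.
interface SimpleCodec {
    fun writeRequestHeaders(requestLine: String)
    fun readResponseStatus(): Int
}

// Test double: records what was "sent" and answers with a canned status code.
class RecordingCodec(private val cannedStatus: Int) : SimpleCodec {
    val written = mutableListOf<String>()

    override fun writeRequestHeaders(requestLine: String) {
        written += requestLine
    }

    override fun readResponseStatus(): Int = cannedStatus
}

// Wraps one codec for one request/response pair and reports events around each step.
class SimpleExchange(
    private val codec: SimpleCodec,
    private val events: MutableList<String>
) {
    fun writeRequestHeaders(requestLine: String) {
        events += "requestHeadersStart"
        codec.writeRequestHeaders(requestLine)
        events += "requestHeadersEnd"
    }

    fun readResponseStatus(): Int {
        events += "responseHeadersStart"
        val status = codec.readResponseStatus()
        events += "responseHeadersEnd"
        return status
    }
}
```

Because the interceptor only talks to the wrapper, the same calling code works whether the codec behind it speaks HTTP/1 or HTTP/2.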

6, CallServerInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    
    //1. Write the request headers through the Exchange object created by the previous interceptor
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      //...
      //Write the request body through Okio
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        //...
      }
    } else {
      exchange.noRequestBody()
    }
    //...

    //Read the response headers through Exchange's readResponseHeaders(boolean) method
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    //With the Response.Builder in hand, build the Response
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      //The server sent a 100-continue even though we did not request one. Try again to read the actual response status.
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }
    //Finished reading the response headers
    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      //The connection is upgrading, but we need to ensure that interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          //Return a response that carries the response body
          .body(exchange.openResponseBody(response))
          .build()
    }
    //...
    return response
  }

ConnectInterceptor has already established the connection to the server and obtained the input and output streams. The intercept(Chain) method of CallServerInterceptor therefore only has to send the request to the server and read the server's response.

The whole process of this interceptor is:

  1. Write request header
  2. Write request body
  3. Read response header
  4. Read response body
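
To make the four steps concrete, here is a minimal sketch that performs them over plain Java streams for an HTTP/1.1 message. It is an illustration under strong assumptions, not how OkHttp does it: `callServer` and `SimpleResponse` are hypothetical names, the body length is taken from Content-Length only, and chunked encoding, 100-continue, and HTTP/2 are ignored.

```kotlin
import java.io.InputStream
import java.io.OutputStream

// Hypothetical result type for this sketch.
data class SimpleResponse(val code: Int, val headers: Map<String, String>, val body: String)

// Reads one line of ASCII text, dropping the trailing CRLF.
private fun readLine(input: InputStream): String {
    val sb = StringBuilder()
    while (true) {
        val b = input.read()
        if (b == -1 || b == '\n'.code) break
        if (b != '\r'.code) sb.append(b.toChar())
    }
    return sb.toString()
}

fun callServer(
    out: OutputStream,
    input: InputStream,
    host: String,
    path: String,
    body: String? // null means a GET without a body, otherwise a POST
): SimpleResponse {
    val method = if (body == null) "GET" else "POST"
    val bodyBytes = body?.toByteArray(Charsets.UTF_8)

    // 1. Write request header
    val head = StringBuilder("$method $path HTTP/1.1\r\nHost: $host\r\n")
    if (bodyBytes != null) head.append("Content-Length: ${bodyBytes.size}\r\n")
    head.append("\r\n")
    out.write(head.toString().toByteArray(Charsets.US_ASCII))

    // 2. Write request body
    if (bodyBytes != null) out.write(bodyBytes)
    out.flush()

    // 3. Read response header: status line, then headers up to the blank line
    val code = readLine(input).split(" ")[1].toInt()
    val headers = mutableMapOf<String, String>()
    while (true) {
        val line = readLine(input)
        if (line.isEmpty()) break
        val idx = line.indexOf(':')
        if (idx <= 0) continue // ignore malformed header lines
        headers[line.substring(0, idx).trim().lowercase()] = line.substring(idx + 1).trim()
    }

    // 4. Read response body (Content-Length bytes, or nothing if the header is absent)
    val length = headers["content-length"]?.toInt() ?: 0
    val bytes = ByteArray(length)
    var read = 0
    while (read < length) {
        val n = input.read(bytes, read, length - read)
        if (n == -1) break
        read += n
    }
    return SimpleResponse(code, headers, String(bytes, 0, read, Charsets.UTF_8))
}
```

The function can be exercised without a network by passing a ByteArrayOutputStream as `out` and a ByteArrayInputStream holding a canned response as `input`.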



Added by seriousdamage on Sat, 15 Jan 2022 11:46:01 +0200