Source code analysis of OkHttp -- Analysis of connection interceptor

We know that OkHttp is connected through Socket, but so far, we haven't seen the operations related to Socket connection. This section is mainly to see how OkHttp connects through ConnectInterceptor

catalogue

1, Mainstream process

2, Core code

3, Process combing

4, Summary

1, Mainstream process

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    //first line
    val realChain = chain as RealInterceptorChain
    //Second line
    val exchange = realChain.call.initExchange(chain)
    //Third line
    val connectedChain = realChain.copy(exchange = exchange)
    //Fourth line
    return connectedChain.proceed(realChain.request)
  }
}

See the code of ConnectInterceptor, isn't it very concise, but it's not simple

Line 1: obtain the chain of responsibility, which is also available in other interceptors

Line 2: take the responsibility chain as a parameter and obtain the Exchange object through initExchange of RealCall. What is this Exchange object? It's so mysterious and it's hard to understand the literal meaning. Let's take a look at the notes of Exchange

/**
 * Transmits a single HTTP request and a response pair. This layers connection management and events
 * on [ExchangeCodec], which handles the actual I/O.
 */

Transport a single HTTP request and response pair. The connection management and events of this layer are in exchange code, which handles the actual I/O

We can know that Exchange is responsible for managing connections through Exchange code

Line 3: transfer the Exchange object as a parameter into the responsibility chain, and copy other parameters to obtain a new responsibility chain

Line 4: return the response data through the new responsibility chain

Through the above code, we know that the core step is Exchange in the second line. Next, we take Exchange as the entry

2, Core code

Let's first look at initExchange, which is responsible for creating or finding an available connection in the connection pool

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }
    //It is created in RetryAndFollowUpInterceptor, mentioned in the previous chapter, and used here in RealCall
    val exchangeFinder = this.exchangeFinder!!
    //The core code is here
    //The find method returns ExchangeCodec, which is responsible for the encoding and decoding of requests and responses
    //There are two specific implementations: Http2ExchangeCodec and Http1ExchangeCodec
    //Http2ExchangeCodec is a socket connection that supports HTTP/2, and Http1ExchangeCodec supports HTTP/1.1            
    //Socket connection
    val codec = exchangeFinder.find(client, chain)
    //Create exchange. Exchange is the role of managing connections and handles I/O operations through exchange code
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

The above key code has been annotated. I mainly focus on the socket connection. Let's take a look at the find method

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

Create a RealConnection through the findHealthyConnection method, and complete socket connection, tls connection and other operations in the RealConnection. Let's look at the findHealthyConnection method first

The findHealthyConnection method is an endless loop to find an available connection. There are two results: either an available connection is found or an exception is thrown

private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
  }

Focus on the findConnection method inside

private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Attempt to reuse the connection from the call.
    //1
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Attempt to get a connection from the pool.
    //2
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      //3
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      //4
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      //5 take this branch for the first time
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    //6
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
  1. First check whether the last connection is available, and return directly if available
  2. The last connection was unavailable. Check the connection pool to see if there are any available connections
  3. If not in the ConnectionPool , use to find a route from a previously merged connection
  4. If the route in step 3 cannot be found, use the route in the existing routeSelection
  5. If no available routes are found in step 4, get a route set routes through the RouteSelector, find the available connections through the IP address and route set again, return if found, and return a route through localRouteSelection if not found
  6. Create a new connection RealConnection(connectionPool, route) through the route returned in a step of 3 / 4 / 5. The new connection completes the tunnel establishment, socket connection, TLS handshake, encryption and other operations through the connect method

This process involves RouteSelector, Selection and Route. Let's take a look at the relationship between them first

 

Let's first look at the class annotation of RouteSelector

/**
 * Selects routes to connect to an origin server. Each connection requires a choice of proxy server,
 * IP address, and TLS mode. Connections may also be recycled.
 */

Select the route to connect to the source server. Each connection needs to select proxy server, IP address and TLS mode. Connections can also be recycled.

That's very clear. It's responsible for routing

Selection can simply understand a set of router routes. The above code:

val localRouteSelection = localRouteSelector.next()

The return is RouteSelection, and the next step is the core Route class. Let's continue to take a look

class Route(
  @get:JvmName("address") val address: Address,
  /**
   * Returns the [Proxy] of this route.
   *
   * **Warning:** This may disagree with [Address.proxy] when it is null. When
   * the address's proxy is null, the proxy selector is used.
   */
  @get:JvmName("proxy") val proxy: Proxy,
  @get:JvmName("socketAddress") val socketAddress: InetSocketAddress
) {
//ellipsis
}

It can be seen that route encapsulates address, proxy and soketAddress, and Selection is an iterator of list < route >, which is responsible for traversing route.

Call path of RouteSelector:

RouteSelector-->>nextProxy-->>resetNextInetSocketAddres

In resetNextInetSocketAddres, the final IP address is obtained by calling address dns. Lookup, DNS is client DNS is the DNS passed in during the construction of OkHttpclient System, in the lookup method, call InetAddress The getallbyname method obtains the IP of the corresponding domain name, that is, the default DNS implementation

val addresses = address.dns.lookup(socketHost)

Here, the DNS resolution is completed, the RealConnection is found, and then the socket connection is completed

Look directly at the connect method of RealConnection

Omit front
    while (true) {
      try {
        //Returns true if this route transmits HTTPS through an HTTP proxy tunnel
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          //socket connection
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        //Various protocol connections are completed in this step; https and non https
        //https will connect through TLS protocol. After the handshake is successful, if HTTP/2 is supported, HTTP2 is preferred;
        //Non Https takes precedence over HTTP/2. If it is not supported, go to HTTP/1
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        //ellipsis
      }
    }

This involves a tunnel establishment problem. You can see that in the connectTunnel method, you first judge whether to use the tunnel connection. If not, you can use the socket connection instead of the socket connection; The meaning of tunnel connection is that the proxy server can only forward and cannot tamper with data. On the contrary, it is the opposite;

That's all for the basic process. Let's sort it out through a diagram

3, Process combing

The above is the sequence diagram of ConnectInterceptor, involving some core classes; Return the ExchangeCodec object through newCodec in RealConnection. This class has two implementations: Http1ExchangeCodec (HTTP/1.1 protocol) and Http2Exchangecodec (HTTP/2 protocol)

4, Summary

  • ConnectInterceptor involves DNS resolution, TLS connection and socket connection
  • DNS resolution in the process of route finding by RouteSelector, IP address is found by DNS of address, and DNS of address is passed in DNS when building client System
  • The establishment of socket connection and tunnel connection should be judged. Either the connect method is executed or the socket connection is established
  • RealConnection.connectTls is a TLS connection, involving handshake, certificate verification, etc

Added by [UW] Jake on Wed, 05 Jan 2022 03:56:13 +0200