Android network framework -- OkHttp source code analysis
This article is based on Kotlin language
Previous article:
Analysis of OkHttp source code of Android network framework (1) -- request process
After explaining the request process of OkHttp, this article enters the second part - OkHttp interceptor analysis.
Interceptor analysis of OkHttp source code analysis
In the process of sending the request in the previous chapter, there is an important method getResponseWithInterceptorChain(), which will create many interceptors. They will process the request before sending it to the server. After the server returns the response, these interceptors will return after processing the response. Interceptor is an important core function of okhttp. The implementation of each interceptor function will also involve okhttp's caching and connection mechanism:
// getResponseWithInterceptorChain method of RealCall Response getResponseWithInterceptorChain() throws IOException { // Create an interceptor chain List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors());//Add custom application interceptor interceptors.add(retryAndFollowUpInterceptor);.//Add interceptor responsible for retrying redirection interceptors.add(new BridgeInterceptor(client.cookieJar()));//Add interceptor responsible for transforming request response interceptors.add(new CacheInterceptor(client.internalCache()));//Add interceptor responsible for caching interceptors.add(new ConnectInterceptor(client));//Add interceptors responsible for managing connections if (!forWebSocket) { //Generally, the interceptor is not added without special requirements interceptors.addAll(client.networkInterceptors());//Add our custom network interceptor } interceptors.add(new CallServerInterceptor(forWebSocket));//Add an interceptor responsible for initiating a request and obtaining a response //Create chain Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); //Call the Chain's processed (request) method to process the request (this method returns a response) return chain.proceed(originalRequest); }
From the above code, we can see that interceptors can be divided into the following categories:
- custom interceptor
- RetryAndFollowUpInterceptor
- BridgeInterceptor
- CacheInterceptor (CACHE interceptor)
- ConnectInterceptor
- CallServerInterceptor (request server interceptor)
- networkInterceptors() (network interceptors are not commonly used and will not be explained in this article)
1, Custom interceptor
OK, let's take a look at the HTTP interceptor first:
fun interface Interceptor { @Throws(IOException::class) //Interceptor logic fun intercept(chain: Chain): Response companion object { inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor = Interceptor { block(it) } } interface Chain { //Return Request fun request(): Request @Throws(IOException::class) //Call the processed method of Chain to process the Request and return the Response fun proceed(request: Request): Response //Custom application interceptors are generally null fun connection(): Connection? //Return the corresponding Call object fun call(): Call //connection timed out fun connectTimeoutMillis(): Int fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain //Read in timeout fun readTimeoutMillis(): Int fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain //Write timeout fun writeTimeoutMillis(): Int fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain } }
It can be seen that the interceptor interface is mainly composed of intercept(Chain) method and internal interface Chain. Here is a customized Interceptor:
class MyInterceptor : Interceptor { @Throws(IOException::class) fun intercept(chain: Chain): Response { //1. Get Request val request: Request = chain.request() //2. To process the Request, the logic part defines itself //... //3. Call the Chain's processed (request) method to process the request and get the Response response = chain.proceed(request) //4. Handle the Response, and the logic part defines it by itself //... //5. Return Response return response } }
After customizing our custom interceptors, how can we add them to our OkHttp configuration? See the following java code:
OkHttpClient client = new OkHttpClient.Builder() .addInterceptor(new MyInterceptor())//Add a custom interceptor through the addInterceptor() method .build();
2, RetryAndFollowUpInterceptor
//intercept method of RetryAndFollowUpInterceptor class @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain //Get request object var request = chain.request //Get RealCall object val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { //Note 1: create exchange finder internally call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { //Call the proceed method, which calls the intercept method of the next interceptor BridgeInterceptor (explained in the previous article) response = realChain.proceed(request) //A successful request indicates that the ExchangeFinder is occupied. Setting newExchangeFinder to true means that a new ExchangeFinder will be created next time newExchangeFinder = true } catch (e: RouteException) { //Call the recover method to detect whether the connection can continue to be used if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } //If the request fails, it means that the exchange finder is idle. If the new exchange finder is set to false, it means that the exchange finder is used instead of creating a new exchange finder next time newExchangeFinder = false continue } catch (e: IOException) { //Call the recover method to detect whether the connection can continue to be used if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } //If the request fails, it means that the exchange finder is idle. If the new exchange finder is set to false, it means that the exchange finder is used instead of creating a new exchange finder next time newExchangeFinder = false continue } //If there is a redirection request before, the response to the redirection request is returned directly if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange) //followUp is null, no redirection is required, and the Response is returned directly if (followUp == null) { //... return response } //followUp is not empty, redirection is required val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() //The redirection times are too many. An exception is thrown if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } //Retry again with the reset backward Request request = followUp priorResponse = response } finally { //Release connection resources under abnormal conditions call.exitNetworkInterceptorExchange(closeActiveExchange) } } }
Note 1:
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) { check(interceptorScopedExchange == null) //... if (newExchangeFinder) { this.exchangeFinder = ExchangeFinder( connectionPool, createAddress(request.url), this, eventListener ) } }
The above code can clearly see that whether an exchange finder object is returned according to the newExchangeFinder value. The purpose of this object is to request to find an available TCP connection, and one of the parameters will be request The URL is encapsulated in the Address object to see its implementation:
private fun createAddress(url: HttpUrl): Address { var sslSocketFactory: SSLSocketFactory? = null var hostnameVerifier: HostnameVerifier? = null var certificatePinner: CertificatePinner? = null if (url.isHttps) { sslSocketFactory = client.sslSocketFactory hostnameVerifier = client.hostnameVerifier certificatePinner = client.certificatePinner } return Address( uriHost = url.host, uriPort = url.port, dns = client.dns, socketFactory = client.socketFactory, sslSocketFactory = sslSocketFactory, hostnameVerifier = hostnameVerifier, certificatePinner = certificatePinner, proxyAuthenticator = client.proxyAuthenticator, proxy = client.proxy, protocols = client.protocols, connectionSpecs = client.connectionSpecs, proxySelector = client.proxySelector ) }
You can see the returned result. Create an address instance using the url and client configuration. Address is the address of the connection to the service, which can be understood as the request address and its configuration.
Address plays an important role: HTTP requests with the same address share the same connection, which is actually for http1 1 and http2 0 judgment of multiplexing connection request.
Here, we first remember that in the intercept method of the RetryAndFollowUpInterceptor class, we first create an ExchangeFinder object, which will be used later!
3, BridgeInterceptor
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() val body = userRequest.body if (body != null) { //Encapsulate Header val contentType = body.contentType() if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()) } val contentLength = body.contentLength() if (contentLength != -1L) { requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.removeHeader("Transfer-Encoding") } else { requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.removeHeader("Content-Length") } } if (userRequest.header("Host") == null) { requestBuilder.header("Host", userRequest.url.toHostHeader()) } if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive") } //If we add a "accept encoding: gzip" header field, we are also responsible for decompressing the transport stream. var transparentGzip = false if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true requestBuilder.header("Accept-Encoding", "gzip") } //Create cookieJar for OkhttpClient configuration val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } //Call the proceed method, which calls the intercept method of the next interceptor, ChcheInterceptor (explained in the previous article) val networkResponse = chain.proceed(requestBuilder.build()) //The following steps build a user response based on the network response cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() }
With its name, it is a network connection bridge, which is responsible for converting the user constructed request into the request sent to the server, and converting the response returned by the server into a user-friendly response.
- In the Request phase, configure user information and add some Request headers.
- In the Response phase, cancel some request headers and unzip them.
4, CacheInterceptor
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() //1. Read candidate cache val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() //2. Create cache policy, enforce cache, compare cache, etc val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() val networkRequest = strategy.networkRequest val cacheResponse = strategy.cacheResponse //According to the cache policy, update the statistical indicators: number of requests, number of network requests and number of cache usage cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE if (cacheCandidate != null && cacheResponse == null) { //If the cache is not available, turn it off cacheCandidate.body?.closeQuietly() } //3. According to the policy, if the network is not used and there is no cache, an error is directly reported and the error code 504 is returned if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } //4. If the cache is available, the data in the cache is returned if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { //5. Call the proceed method, which calls the intercept method of the next interceptor ConnectInterceptor (explained in the previous article) networkResponse = chain.proceed(networkRequest) } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } //6. After receiving the network result, if the response code is 304, the cache is used and the cache result is returned if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() //Update cacheResponse cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { cacheResponse.body?.closeQuietly() } } //7. Read network results val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() //8. Cache data if (cache != null) { if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { // This will log a conditional cache miss only. listener.cacheMiss(call) } } } if (HttpMethod.invalidatesCache(networkRequest.method)) { try { cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } } return response }
CacheStrategy is the implementation of okhttp caching policy, which follows the HTTP caching policy. The following article is recommended. After understanding the HTTP caching policy, let's look at CacheStrategy again:
Detailed explanation of browser HTTP protocol caching mechanism
Note 2: call the compute method of CacheStrategy class to obtain the cache policy:
fun compute(): CacheStrategy { val candidate = computeCandidate() // We're forbidden from using the network and the cache is insufficient. if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) { return CacheStrategy(null, null) } return candidate }
The computeCandidate() method is called internally:
private fun computeCandidate(): CacheStrategy { //No cached response if (cacheResponse == null) { return CacheStrategy(request, null) } //If the required handshake is missing, the cached response is deleted if (request.isHttps && cacheResponse.handshake == null) { return CacheStrategy(request, null) } //If you can't cache if (!isCacheable(cacheResponse, request)) { return CacheStrategy(request, null) } val requestCaching = request.cacheControl Request header nocache Or the request header contains If-Modified-Since perhaps If-None-Match(This means that the local cache has expired, and the server needs to verify whether the local cache can continue to be used) if (requestCaching.noCache || hasConditions(request)) { return CacheStrategy(request, null) } //This indicates that the Response cache is available //Get the cache control field CacheControl of the Response val responseCaching = cacheResponse.cacheControl //Get the length of time that the Response has been cached val ageMillis = cacheResponseAge() //Get the length of time that the Response can be cached var freshMillis = computeFreshnessLifetime() if (requestCaching.maxAgeSeconds != -1) { freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong())) } var minFreshMillis: Long = 0 if (requestCaching.minFreshSeconds != -1) { minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong()) } var maxStaleMillis: Long = 0 if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) { maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong()) } //Judge whether the cache expires and decide whether to use the Response cache: the length of Response cached is < Max stale + Max age if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) { val builder = cacheResponse.newBuilder() if (ageMillis + minFreshMillis >= freshMillis) { builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"") } val oneDayMillis = 24 * 60 * 60 * 1000L if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) { builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"") } //If the cache has not expired, use the Response cache directly return CacheStrategy(null, builder.build()) } //If the cache has expired, judge whether tag such as Etag or last modified is set val conditionName: String val conditionValue: String? when { etag != null -> { conditionName = "If-None-Match" conditionValue = etag } lastModified != null -> { conditionName = "If-Modified-Since" conditionValue = lastModifiedString } servedDate != null -> { conditionName = "If-Modified-Since" conditionValue = servedDateString } //No tag such as Etag or last modified is set in the cache, so the network request is made directly else -> return CacheStrategy(request, null) } //The cache is marked with Etag or last modified, so the if none match or if modified since request header is added to construct the request and submit it to the server to judge whether the cache is available val conditionalRequestHeaders = request.headers.newBuilder() conditionalRequestHeaders.addLenient(conditionName, conditionValue!!) val conditionalRequest = request.newBuilder() .headers(conditionalRequestHeaders.build()) .build() return CacheStrategy(conditionalRequest, cacheResponse) }
It can be seen that the OkHttp cache determines the combination of networkRequest and cacheResponse according to the HTTP cache policy:
- 1. Forced caching: the client participates in the decision-making to decide whether to continue using the cache. When the client requests data for the first time, the server returns the expiration time of the cache: Expires or cache control. When the client requests again, it will judge the expiration time of the cache. If it does not expire, it can continue to use the cache. Otherwise, it will not be used and re request the server.
- 2. Compare cache: the server participates in the decision-making to decide whether to continue using the cache. When the client requests data for the first time, the server will return the cache ID: last modified / if modified since, Etag / if none match and the data to the client. When the client requests again, the client will send the cache ID to the server, and the server will judge according to the cache ID, If the cache has not been updated and can be used, 304 is returned, indicating that the client can continue to use the cache. Otherwise, the client cannot continue to use the cache and can only use the new response returned by the server.
5, ConnectInterceptor
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain //Returning an Exchange instance is actually creating an available connection val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange) //Call the interceptor in the previous article (the interceptor in the next article contains the interceptor) return connectedChain.proceed(realChain.request) }
Take a look at how initExchange is implemented:
internal fun initExchange(chain: RealInterceptorChain): Exchange { synchronized(this) { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) } //1. Get the exchange finder, which is defined in the redirection interceptor val exchangeFinder = this.exchangeFinder!! //2. Get codec val codec = exchangeFinder.find(client, chain) //3. Create connection val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result synchronized(this) { this.requestBodyOpen = true this.responseBodyOpen = true } if (canceled) throw IOException("Canceled") return result }
Note 2:
fun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { //1. Obtain a healthy and available connection according to the configuration of clien val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) //2. Return the codec of this connection return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
First enter the findHealthyConnection method to see what kind of connection is returned:
@Throws(IOException::class) private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { //Find connection val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) // Available connections are returned if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } //If the exception enters the next loop to continue to get the connection
Continue to follow up to findConnection. This method is difficult to understand. Look at the notes to enhance understanding:
@Throws(IOException::class) private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") //Try to use a connection that has already been created, which may have been restricted from creating a new flow val callConnection = call.connection if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { //If the created connection has been restricted from creating a new flow, release the connection (the connection will be empty in releaseConnectionNoEvents) and return the Socket of the connection to close it if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } //1. If the created connection can still be used, use it directly as the result if (call.connection != null) { check(toClose == null) return callConnection } toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } //Coming here shows that we need a new connection and give it new statistics. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 //2. Try to find an available connection from the connection pool for the first time if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } //Configure routing val routes: List<Route>? val route: Route if (nextRouteToTry != null) { routes = null route = nextRouteToTry!! nextRouteToTry = null } else if (routeSelection != null && routeSelection!!.hasNext()) { routes = null route = routeSelection!!.next() } else { var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes if (call.isCanceled()) throw IOException("Canceled") //The second attempt to find an available connection from the connection pool (one more routing parameter than the first) if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } //I still haven't got the connection, so a new connection will be created here, and the Socket connection will be carried out later val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) //If we have previously created a multiplexed connection with the same address, release the newly created connection and obtain the previous connection if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { //Gets the available multiplexed connections in the pool val result = call.connection!! nextRouteToTry = route //Release the newly generated connection newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { //If a new connection is generated, put it into the connection pool connectionPool.put(newConnection) //Replace the new connection with call connection call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
First enter the callAcquirePooledConnection method to see how to get the available connection:
fun callAcquirePooledConnection( address: Address, call: RealCall, routes: List<Route>?, requireMultiplexed: Boolean ): Boolean { //Traverse connection pool for (connection in connections) { synchronized(connection) { //If multiplexing is required but the connection cannot be multiplexed (for HTTP2), return false directly if (requireMultiplexed && !connection.isMultiplexed) return@synchronized //If the connection is qualified, the connection is taken out and assigned to call connection if (!connection.isEligible(address, routes)) return@synchronized call.acquireConnectionNoEvents(connection) return true } } return false }
Let's see what is qualified to use?
internal fun isEligible(address: Address, routes: List<Route>?): Boolean { assertThreadHoldsLock() //If new requests are not accepted or the number of requests exceeds the limit, false is returned if (calls.size >= allocationLimit || noNewExchanges) return false //If the url is different from the non host segment address, fasle is returned if (!this.route.address.equalsNonHost(address)) return false //Returns true if the host is the same if (address.url.host == this.route().address.url.host) { return true // This connection is a perfect match. } //In the case of HTTP2 //The HTTP2 connection is null and returns false if (http2Connection == null) return false //The route must share an IP address, otherwise false is returned if (routes == null || !routeMatchesAny(routes)) return false //This connection service certificate must cover the host if (address.hostnameVerifier !== OkHostnameVerifier) return false if (!supportsUrl(address.url)) return false //The fixed certificate must also match the host try { address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates) } catch (_: SSLPeerUnverifiedException) { return false } return true }
After these steps, we will return a RealConnection instance. Next, we will return to note 2 at the top of this interceptor and look at the statement return resultconnection What does newcodec (client, chain) do:
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec { val socket = this.socket!! val source = this.source!! val sink = this.sink!! val http2Connection = this.http2Connection //If it is an Http2 connection, an Http2 decoder is returned return if (http2Connection != null) { Http2ExchangeCodec(client, this, chain, http2Connection) } else { //Otherwise, an Http1 decoder is returned socket.soTimeout = chain.readTimeoutMillis() source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS) sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS) Http1ExchangeCodec(client, this, source, sink) } }
These codecs internally convert OkHttp requests into HTTP request messages, and convert response messages into user-friendly messages when returning results.
After obtaining the exclusive codec, the decoder and exchangeFinder objects are encapsulated into the Exchange object and returned.
Exchange corresponds to Request one by one. When a new Request is created, an exchange will be created. The exchange is responsible for sending the Request and reading the response data, and the exchange code is used for sending and receiving data.
6, CallServerInterceptor
@Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() //1. Write the requested Header through the Exchange object created by the previous interceptor exchange.writeRequestHeaders(request) var invokeStartEvent = true var responseBuilder: Response.Builder? = null if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { //... //Write the requested Body through okio if (responseBuilder == null) { if (requestBody.isDuplex()) { // Prepare a duplex body so that the application can send a request body later. exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() requestBody.writeTo(bufferedRequestBody) } else { // Write the request body if the "Expect: 100-continue" expectation was met. val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } } else { //... } else { exchange.noRequestBody() } //... //Read the header of the response through the readResponseHeaders(boolean) method of Exchange if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() invokeStartEvent = false } } //After obtaining the Response, construct the Response through the Builder pattern var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code if (code == 100) { //Even if we didn't request, the server sent 100 continue. Try reading the actual response status again. responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() } response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() code = response.code } //End of read request header exchange.responseHeadersEnd(response) response = if (forWebSocket && code == 101) { // The requester has asked the server to switch the protocol, and the server has confirmed and is ready to switch, but we need to ensure that the interceptor sees a non empty response body. response.newBuilder() .body(EMPTY_RESPONSE) .build() } else { response.newBuilder() //Returns a response with a response body .body(exchange.openResponseBody(response)) .build() } //... } return response }
In ConnectInterceptor, we have established a connection, connected to the server and obtained the input and output streams. Therefore, the intercept(Chain) method logic of CallServerInterceptor is to send the request to the server and then obtain the response of the server.
The whole process of this interceptor is:
- Write request header
- Write request body
- Read response header
- Read response body
References for two articles on OkHttp:
Request flow of okhttp3 source code analysis
Interceptor for okhttp3 source code analysis
OKhttp source code analysis series
Okhttp3 source code analysis