Table of Contents
- Simple use
- Source code analysis
- Overall architecture
- Brief introduction
- Interface - Interface Layer
- Protocol - Protocol Layer
- Connection - Connection Layer
- Cache - Cache Layer: Managing local caches
- I/O - I/O Layer: Realization of Real Data Reading and Writing
- Interceptor - Interceptor Layer: Intercept Network Access, Insert Interception Logic
- Interface - Interface Layer
- Interceptor layer
Simple use
OkHttp on GitHub: https://github.com/square/okhttp

- First, add the dependency:

    implementation("com.squareup.okhttp3:okhttp:4.0.1")

- Add the network permission to the Android manifest:

    <uses-permission android:name="android.permission.INTERNET" />

- Then create the OkHttpClient object:

    // Default configuration
    OkHttpClient client = new OkHttpClient();

    // Alternatively, to configure the client, create it through OkHttpClient.Builder
    OkHttpClient configuredClient = new OkHttpClient.Builder()
            .connectTimeout(3, TimeUnit.SECONDS)
            .retryOnConnectionFailure(true)
            //...
            .build();

- Next, create the Request object:

    // Upload a single file (an empty plain-text body stands in for the file contents here)
    RequestBody requestBody = RequestBody.create("", MediaType.parse("text/plain;charset=utf-8"));

    // Upload a multipart form
    RequestBody requestBody1 = new MultipartBody.Builder()
            // Required when uploading a form
            .setType(MultipartBody.FORM)
            .addFormDataPart("key", "value")
            .build();

    // FormBody is a subclass of RequestBody that can only upload forms whose contents are strings
    FormBody formBody = new FormBody.Builder()
            .add("key", "value")
            .build();

    Request request = new Request.Builder()
            .url("https://www.google.com")
            //.get()
            //.method("POST", requestBody)
            .post(formBody)
            // For a request with a body, BridgeInterceptor sets Content-Type from the body itself
            .addHeader("content-type", "application/json")
            .build();

- Wrap the Request object in a Call object:

    Call call = client.newCall(request);

- Decide whether to execute synchronously or asynchronously. A Call can only be executed once, so use one of the two on a given call:

    // Synchronous execution: blocks the calling thread
    try (Response response = call.execute()) {
        String res = response.body().string();
    }

    // Asynchronous execution
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // Callbacks run on an OkHttp worker thread, so post UI work to the main thread
            new Handler(Looper.getMainLooper()).post(() ->
                    Toast.makeText(context, "request was aborted", Toast.LENGTH_SHORT).show());
        }

        @Override
        public void onResponse(Call call, final Response response) throws IOException {
            String res = response.body().string();
        }
    });
Source code analysis
Overall architecture
Brief introduction
As shown in the figure, the overall OkHttp architecture is roughly divided into the following layers:
- Interface - Interface Layer: receives network requests
- Protocol - Protocol Layer: processes protocol logic
- Connection - Connection Layer: manages network connections, sends new requests, and receives server responses
- Cache - Cache Layer: manages local caches
- I/O - I/O Layer: performs the actual reading and writing of data
- Interceptor - Interceptor Layer: intercepts network requests and inserts interception logic
Interface - Interface Layer
The interface layer receives users' network requests and initiates the actual network requests. See the analysis below.
Protocol - Protocol Layer
The protocol layer handles protocol logic. OkHttp supports three protocols: HTTP/1.1, HTTP/2, and WebSocket.
Connection - Connection Layer
This layer is responsible for network connections. It contains a RealConnectionPool, which manages all socket connections in one place. When a user initiates a network request, OkHttp first looks in the connection pool for a connection that meets the requirements. If one exists, the request is sent over that connection; otherwise a new connection is established to send the request.
RealConnection describes a physical socket connection, and the connection pool maintains multiple RealConnection objects. Because HTTP/2 supports multiplexing, one RealConnection can carry several requests at once, so StreamAllocation is introduced to describe each request that is using a connection. A RealConnection corresponds to one or more StreamAllocations, so the StreamAllocations act as a reference count for the RealConnection: when the count drops to zero and the connection is not picked up by another request for a long time, it is released. (StreamAllocation comes from earlier OkHttp 3.x releases; the 4.0.1 code quoted in this article uses Transmitter and Exchange for this role.)
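The reference-counting idea above can be sketched in a few lines of plain Java. This is a simplified model, not OkHttp's implementation: `Connection`, `Pool`, `acquire`, `release`, and `evictIdle` are hypothetical names, every connection is assumed to be multiplexed (HTTP/2 style), and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

final class PooledConnectionSketch {
    /** Hypothetical stand-in for a pooled socket connection. */
    static final class Connection {
        final String host;
        int activeStreams;   // number of requests currently using this connection
        long idleSince;      // time at which activeStreams last dropped to zero

        Connection(String host) { this.host = host; }
    }

    /** Hypothetical pool that reuses connections per host and evicts idle ones. */
    static final class Pool {
        private final List<Connection> connections = new ArrayList<>();
        private final long keepAlive;

        Pool(long keepAlive) { this.keepAlive = keepAlive; }

        /** Reuse a pooled connection to the host if one exists, otherwise open a new one. */
        synchronized Connection acquire(String host) {
            for (Connection c : connections) {
                if (c.host.equals(host)) {
                    c.activeStreams++;
                    return c;
                }
            }
            Connection c = new Connection(host);
            c.activeStreams = 1;
            connections.add(c);
            return c;
        }

        /** Drop one reference; remember when the connection became idle. */
        synchronized void release(Connection c, long now) {
            if (--c.activeStreams == 0) c.idleSince = now;
        }

        /** Remove connections that have had no references for at least keepAlive. */
        synchronized int evictIdle(long now) {
            int before = connections.size();
            connections.removeIf(c -> c.activeStreams == 0 && now - c.idleSince >= keepAlive);
            return before - connections.size();
        }

        synchronized int size() { return connections.size(); }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(100);
        Connection first = pool.acquire("example.com");
        Connection second = pool.acquire("example.com"); // same connection, count is now 2
        pool.release(first, 10);
        pool.release(second, 20);                        // count reaches zero at time 20
        System.out.println("evicted at 50: " + pool.evictIdle(50));
        System.out.println("evicted at 120: " + pool.evictIdle(120));
    }
}
```

The connection survives the first eviction pass because it has been idle for less than the keep-alive period, and is removed by the second pass.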
Cache - Cache layer: Managing local caches
This layer is responsible for maintaining the request cache. When a user's network request has a local cache entry that meets the requirements, OkHttp returns the result directly from the cache. See the CacheInterceptor analysis.
I/O - I/O Layer: Realization of Real Data Reading and Writing
This layer is responsible for the actual reading and writing of data. Its core type is ExchangeCodec, which is only an interface. The implementation classes are Http1ExchangeCodec and Http2ExchangeCodec.
Interceptor - Interceptor Layer: Intercept Network Access, Insert Interception Logic
The interceptor layer provides an AOP-like interface that lets users intercept network access at every level and run their own logic. Details follow below.
Interface - Interface Layer
The interface layer receives users' network requests and initiates the actual network requests. The following classes are involved:
OkHttpClient: the client of the OkHttp framework. Users can configure it in many ways, and all network requests are made through it. Each OkHttpClient maintains its own task queue, connection pool, cache, interceptors, and so on, so a project should share a single instance.
Request: describes the request to be sent. It contains the URL, method, headers, and body, and exposes the cache control directives and whether the request uses HTTPS.
Call: describes an actual network request. Each network request issued by a user is a Call object, and a Call object can only be executed once. Call is an interface; its implementation is RealCall, whose execute method runs the request synchronously. RealCall has an inner class, AsyncCall, which implements Runnable, so each asynchronous call is a task that can be handed to a thread pool: executeOn submits it, and its run method performs the request. RealCall stores the OkHttpClient and the Request, and records whether the call is for a WebSocket.
Dispatcher: OkHttpClient's task queue, which maintains a thread pool internally. When it receives an AsyncCall, it hands the call to the thread pool through the call's executeOn method once the concurrency limits allow.
OkHttpClient
    // Whichever constructor is used, a Builder object is eventually created.
    // A Kotlin class can have a primary constructor and one or more secondary constructors. The primary constructor is part of the class header and follows the class name.
    // If the primary constructor has no annotations or visibility modifiers, the constructor keyword can be omitted.
    // The primary constructor cannot contain any code; initialization code goes in initializer blocks prefixed with the init keyword, which can use the primary constructor's parameters.
    // If a class has a primary constructor, each secondary constructor must delegate to it, directly or indirectly.
    open class OkHttpClient internal constructor(
      builder: Builder
    ) : Cloneable, Call.Factory, WebSocket.Factory {
      //...
      constructor() : this(Builder())
      //...
      init {
        if (builder.sslSocketFactoryOrNull != null || connectionSpecs.none { it.isTls }) {
          this.sslSocketFactoryOrNull = builder.sslSocketFactoryOrNull
          this.certificateChainCleaner = builder.certificateChainCleaner
        } else {
          val trustManager = Platform.get().platformTrustManager()
          Platform.get().configureTrustManager(trustManager)
          this.sslSocketFactoryOrNull = newSslSocketFactory(trustManager)
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager)
        }

        if (sslSocketFactoryOrNull != null) {
          Platform.get().configureSslSocketFactory(sslSocketFactoryOrNull)
        }

        this.certificatePinner = builder.certificatePinner
            .withCertificateChainCleaner(certificateChainCleaner)

        check(null !in (interceptors as List<Interceptor?>)) {
          "Null interceptor: $interceptors"
        }
        check(null !in (networkInterceptors as List<Interceptor?>)) {
          "Null network interceptor: $networkInterceptors"
        }
      }
    }

    // A Builder can be created in two ways: without parameters, or from an existing OkHttpClient.
    class Builder constructor() {
      // Kotlin has four visibility modifiers: private, protected, internal, and public (the default).
      // public: visible everywhere.
      // private: visible inside the containing class, or inside the containing file for top-level declarations.
      // internal: visible within the same module (an IntelliJ IDEA module, a Maven project, a Gradle source set, or a set of files compiled by one invocation of the <kotlinc> Ant task).
      // protected: like private, but also visible in subclasses; not available for top-level declarations.
      internal var dispatcher: Dispatcher = Dispatcher() // Dispatcher
      internal var connectionPool: ConnectionPool = ConnectionPool() // Connection pool
      internal val interceptors: MutableList<Interceptor> = mutableListOf() // Application interceptors
      internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // Network interceptors
      internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() // Event listener factory
      internal var retryOnConnectionFailure = true // Whether to retry if the connection fails
      internal var authenticator: Authenticator = Authenticator.NONE // Authenticator for origin servers
      internal var followRedirects = true // Whether to follow redirects
      internal var followSslRedirects = true // Whether to follow redirects between HTTPS and HTTP
      internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // Cookies
      internal var cache: Cache? = null // Cache
      internal var dns: Dns = Dns.SYSTEM // Domain name resolution
      internal var proxy: Proxy? = null // Proxy
      internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // Proxy selector
      internal var proxyAuthenticator: Authenticator = Authenticator.NONE // Proxy authentication
      internal var socketFactory: SocketFactory = SocketFactory.getDefault() // Socket factory
      internal var sslSocketFactoryOrNull: SSLSocketFactory? = null // SSL socket factory, used for HTTPS
      internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // TLS versions and cipher suites
      internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // Supported protocols
      internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // Verifies host names
      internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // Certificate pinning
      internal var certificateChainCleaner: CertificateChainCleaner? = null // Cleans up the certificate chain
      internal var callTimeout = 0 // Call timeout in milliseconds
      internal var connectTimeout = 10_000 // Connect timeout in milliseconds
      internal var readTimeout = 10_000 // Read timeout in milliseconds
      internal var writeTimeout = 10_000 // Write timeout in milliseconds
      internal var pingInterval = 0 // Interval between keep-alive pings in milliseconds

      internal constructor(okHttpClient: OkHttpClient) : this() {
        this.dispatcher = okHttpClient.dispatcher
        this.connectionPool = okHttpClient.connectionPool
        this.interceptors += okHttpClient.interceptors
        this.networkInterceptors += okHttpClient.networkInterceptors
        this.eventListenerFactory = okHttpClient.eventListenerFactory
        this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
        this.authenticator = okHttpClient.authenticator
        this.followRedirects = okHttpClient.followRedirects
        this.followSslRedirects = okHttpClient.followSslRedirects
        this.cookieJar = okHttpClient.cookieJar
        this.cache = okHttpClient.cache
        this.dns = okHttpClient.dns
        this.proxy = okHttpClient.proxy
        this.proxySelector = okHttpClient.proxySelector
        this.proxyAuthenticator = okHttpClient.proxyAuthenticator
        this.socketFactory = okHttpClient.socketFactory
        this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
        this.connectionSpecs = okHttpClient.connectionSpecs
        this.protocols = okHttpClient.protocols
        this.hostnameVerifier = okHttpClient.hostnameVerifier
        this.certificatePinner = okHttpClient.certificatePinner
        this.certificateChainCleaner = okHttpClient.certificateChainCleaner
        this.callTimeout = okHttpClient.callTimeoutMillis
        this.connectTimeout = okHttpClient.connectTimeoutMillis
        this.readTimeout = okHttpClient.readTimeoutMillis
        this.writeTimeout = okHttpClient.writeTimeoutMillis
        this.pingInterval = okHttpClient.pingIntervalMillis
      }
    }
Request
    Request request = new Request.Builder()
            .url("https://www.baidu.com")
            .get()
            .build();

    // Request's only constructor
    class Request internal constructor(
      @get:JvmName("url") val url: HttpUrl,
      @get:JvmName("method") val method: String,
      @get:JvmName("headers") val headers: Headers,
      @get:JvmName("body") val body: RequestBody?,
      internal val tags: Map<Class<*>, Any>
    )

    // The Request.Builder class
    open class Builder {
      internal var url: HttpUrl? = null // Request URL
      internal var method: String // Request method
      internal var headers: Headers.Builder // Request headers
      internal var body: RequestBody? = null // Request body

      /** A mutable map of tags, or an immutable empty map if we don't have any. */
      internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()

      constructor() { // The constructor we normally call
        this.method = "GET" // Defaults to a GET request
        this.headers = Headers.Builder() // Creates empty default headers
      }

      internal constructor(request: Request) { // Builds a Builder from an existing Request
        this.url = request.url
        this.method = request.method
        this.body = request.body
        this.tags = if (request.tags.isEmpty()) {
          mutableMapOf()
        } else {
          request.tags.toMutableMap()
        }
        this.headers = request.headers.newBuilder()
      }

      //...

      open fun build(): Request {
        return Request(
            checkNotNull(url) { "url == null" },
            method,
            headers.build(),
            body,
            tags.toImmutableMap()
        )
      }
    }
Call
    Call call = client.newCall(request);

    // OkHttpClient.newCall
    // What is actually created is a RealCall object
    override fun newCall(request: Request): Call {
      return RealCall.newRealCall(this, request, forWebSocket = false)
    }

    // RealCall.newRealCall
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      // apply is used here to assign transmitter on the newly created instance.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }

    internal class RealCall private constructor(
      val client: OkHttpClient,
      /** The application's original request unadulterated by redirects or auth headers. */
      val originalRequest: Request,
      val forWebSocket: Boolean
    )
call.execute()
Execution logic:
- Add the call to the dispatcher's running synchronous queue
- Execute the request
- Notify the dispatcher that the call has finished, which removes it from the queue

    override fun execute(): Response {
      synchronized(this) {
        // A Call object can only be executed once
        check(!executed) { "Already Executed" }
        executed = true
      }
      // Calls timeout.enter(), which is the only statement in that method
      transmitter.timeoutEnter()
      // Calls EventListener's callStart method, which is empty by default
      transmitter.callStart()
      try {
        // See the Dispatcher.executed analysis
        client.dispatcher.executed(this)
        // See the getResponseWithInterceptorChain analysis. Runs synchronously and blocks the calling thread
        return getResponseWithInterceptorChain()
      } finally {
        // See the Dispatcher.finished analysis
        client.dispatcher.finished(this)
      }
    }
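The three steps of execute() (one-shot guard, register as running, always unregister in a finally block) can be modelled without OkHttp. The following is a minimal sketch under that reading; `OneShotCall` and the static `runningSyncCalls` deque are hypothetical stand-ins for RealCall and the dispatcher's queue, and the request itself is replaced by an arbitrary `Supplier`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class OneShotCallSketch {
    /** Hypothetical stand-in for the dispatcher's queue of running synchronous calls. */
    static final Deque<OneShotCall<?>> runningSyncCalls = new ArrayDeque<>();

    /** Hypothetical call that may be executed at most once. */
    static final class OneShotCall<T> {
        private final Supplier<T> work;
        private boolean executed;

        OneShotCall(Supplier<T> work) { this.work = work; }

        T execute() {
            synchronized (this) {
                // Same guard as check(!executed) in the Kotlin source
                if (executed) throw new IllegalStateException("Already Executed");
                executed = true;
            }
            synchronized (runningSyncCalls) { runningSyncCalls.add(this); }
            try {
                return work.get(); // runs on the caller's thread
            } finally {
                // Always unregister, even if the work throws
                synchronized (runningSyncCalls) { runningSyncCalls.remove(this); }
            }
        }
    }

    public static void main(String[] args) {
        OneShotCall<String> call = new OneShotCall<>(() -> "response");
        System.out.println(call.execute());
        try {
            call.execute();
        } catch (IllegalStateException e) {
            System.out.println("second execute rejected: " + e.getMessage());
        }
    }
}
```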
call.enqueue(callback)
This method wraps the callback in an AsyncCall, puts it in the dispatcher's asynchronous queue, and lets the dispatcher execute it when the concurrency limits allow.

    override fun enqueue(responseCallback: Callback) {
      synchronized(this) {
        // Likewise, a Call object can only be executed once
        check(!executed) { "Already Executed" }
        executed = true
      }
      // Calls EventListener's callStart method, which is empty by default
      transmitter.callStart()
      // See the Dispatcher.enqueue analysis
      client.dispatcher.enqueue(AsyncCall(responseCallback))
    }
call.getResponseWithInterceptorChain()
This method collects all interceptors into a list, creates a RealInterceptorChain from it, and calls the chain's proceed method to actually execute the request. Each interceptor in the chain runs in turn, and the resulting Response is returned.

    // RealCall.getResponseWithInterceptorChain()
    @Throws(IOException::class)
    fun getResponseWithInterceptorChain(): Response {
      // Build a full stack of interceptors.
      val interceptors = mutableListOf<Interceptor>()
      // User-defined application interceptors
      interceptors += client.interceptors
      // Responsible for retrying after failures and following redirects
      interceptors += RetryAndFollowUpInterceptor(client)
      // Bridge interceptor: adds the necessary request headers and post-processes the response headers
      interceptors += BridgeInterceptor(client.cookieJar)
      // Cache interceptor: serves responses from the cache and updates the cache
      interceptors += CacheInterceptor(client.cache)
      // Connect interceptor: establishes the connection to the server
      interceptors += ConnectInterceptor
      // forWebSocket is false here; it was passed in when the call was created:
      // RealCall.newRealCall(this, request, forWebSocket = false)
      if (!forWebSocket) {
        // User-defined network interceptors, set when configuring OkHttpClient
        interceptors += client.networkInterceptors
      }
      // Call-server interceptor: sends the request data to the server and reads the response data
      interceptors += CallServerInterceptor(forWebSocket)

      // Create the interceptor chain
      val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
          client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

      var calledNoMoreExchanges = false
      try {
        // See the interceptor layer analysis
        val response = chain.proceed(originalRequest)
        if (transmitter.isCanceled) { // false unless the call was canceled
          response.closeQuietly()
          throw IOException("Canceled")
        }
        return response
      } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw transmitter.noMoreExchanges(e) as Throwable
      } finally {
        if (!calledNoMoreExchanges) {
          transmitter.noMoreExchanges(null)
        }
      }
    }
AsyncCall
    // Inner class of RealCall
    internal inner class AsyncCall(
      private val responseCallback: Callback // Callback supplied for asynchronous execution
    ) : Runnable {
      @Volatile private var callsPerHost = AtomicInteger(0)

      fun callsPerHost(): AtomicInteger = callsPerHost

      fun reuseCallsPerHostFrom(other: AsyncCall) {
        this.callsPerHost = other.callsPerHost
      }

      // url is an HttpUrl object. host corresponds to the Host header of the HTTP request, which
      // identifies the virtual host behind an IP address, because one IP can serve several domain names.
      fun host(): String = originalRequest.url.host

      fun request(): Request = originalRequest

      fun get(): RealCall = this@RealCall

      /**
       * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
       * if the executor has been shut down by reporting the call as failed.
       */
      // Submits this object to the thread pool, which runs its run method and therefore
      // getResponseWithInterceptorChain. If submission fails, onFailure is called and the dispatcher is notified.
      fun executeOn(executorService: ExecutorService) {
        assert(!Thread.holdsLock(client.dispatcher))
        var success = false // Whether the AsyncCall was submitted successfully
        try {
          // Run this object's run method in the thread pool
          executorService.execute(this)
          success = true
        } catch (e: RejectedExecutionException) {
          val ioException = InterruptedIOException("executor rejected")
          ioException.initCause(e)
          transmitter.noMoreExchanges(ioException)
          responseCallback.onFailure(this@RealCall, ioException)
        } finally {
          if (!success) {
            client.dispatcher.finished(this) // This call is no longer running!
          }
        }
      }

      override fun run() {
        threadName("OkHttp ${redactedUrl()}") {
          var signalledCallback = false
          transmitter.timeoutEnter()
          try {
            // Execute the request through the interceptor chain. This method belongs to the outer class RealCall and was analyzed above.
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            // Success callback
            responseCallback.onResponse(this@RealCall, response)
          } catch (e: IOException) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
            } else {
              // Failure callback
              responseCallback.onFailure(this@RealCall, e)
            }
          } finally {
            // Notify the dispatcher that the request has finished
            client.dispatcher.finished(this)
          }
        }
      }
    }
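The executeOn pattern (submit to an executor, and turn a rejection into a failure callback instead of letting it propagate) is plain java.util.concurrent behaviour and can be tried in isolation. A minimal sketch, where `FailureCallback` and the standalone `executeOn` helper are hypothetical simplifications of the callback and method shown above:

```java
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class ExecuteOnSketch {
    /** Hypothetical callback, standing in for the failure half of OkHttp's Callback. */
    interface FailureCallback {
        void onFailure(Exception e);
    }

    /** Submits the task; returns false and reports a failure if the executor rejects it. */
    static boolean executeOn(ExecutorService executor, Runnable task, FailureCallback callback) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            callback.onFailure(ioException);
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // a shut-down executor rejects new tasks
        boolean accepted = executeOn(executor, () -> { },
                e -> System.out.println("failed: " + e.getMessage()));
        System.out.println("accepted = " + accepted);
    }
}
```

In the real method the failure branch also tells the dispatcher that the call is no longer running, which this sketch leaves out.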
Dispatcher
Brief introduction
OkHttp's task queue maintains a thread pool internally to execute network requests.
The task queue consists of two parts:
- Dispatcher: finds a suitable execution thread for each task
- The thread pool that runs network request tasks
Dispatcher hands tasks to idle threads so that callers are not blocked and many requests can run concurrently, within the configured limits.

    class Dispatcher constructor() {
      // Maximum number of requests to execute concurrently.
      @get:Synchronized var maxRequests = 64
        set(maxRequests) {
          require(maxRequests >= 1) { "max < 1: $maxRequests" }
          synchronized(this) {
            field = maxRequests
          }
          promoteAndExecute()
        }

      /*
       * Maximum number of requests for each host to execute concurrently. This limits requests by
       * the URL's host name. Note that concurrent requests to a single IP address may still exceed
       * this limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
       * If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
       * will remain in flight. WebSocket connections to hosts do not count against this limit.
       */
      @get:Synchronized var maxRequestsPerHost = 5
        set(maxRequestsPerHost) {
          require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
          synchronized(this) {
            field = maxRequestsPerHost
          }
          promoteAndExecute()
        }

      @set:Synchronized
      @get:Synchronized
      var idleCallback: Runnable? = null

      // Thread pool
      private var executorServiceOrNull: ExecutorService? = null

      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            // Core pool size 0, maximum size Int.MAX_VALUE, and idle threads are reclaimed after 60 seconds.
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
          }
          return executorServiceOrNull!!
        }

      /** Ready async calls in the order they'll be run. */
      // Queue of asynchronous calls waiting to run
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()

      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      // Queue of asynchronous calls that are running
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()

      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      // Queue of synchronous calls that are running
      private val runningSyncCalls = ArrayDeque<RealCall>()
    }
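The thread pool configuration in the executorService getter (zero core threads, an unbounded maximum, a 60-second keep-alive, and a SynchronousQueue) means tasks are never queued inside the pool: each submitted task is handed to an idle thread or gets a new one. The sketch below uses the same ThreadPoolExecutor arguments from Java to show this; the class and method names are hypothetical, and the tasks simply block on a latch so the pool size can be observed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatcherPoolSketch {
    /** Same constructor arguments as the pool created in Dispatcher.executorService. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** Submits blocking tasks and reports how many threads the pool created for them. */
    static int threadsForBlockedTasks(int tasks) {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // keep the thread busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // every task is now running on its own thread
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads used for 3 blocked tasks: " + threadsForBlockedTasks(3));
    }
}
```

Because the pool itself does no queueing, the waiting is done by the dispatcher's own ready queue, and maxRequests and maxRequestsPerHost are what actually bound concurrency.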
Dispatcher.executed
This method simply adds the RealCall to the queue of running synchronous calls. The call itself then runs immediately on the caller's thread.

    @Synchronized internal fun executed(call: RealCall) {
      runningSyncCalls.add(call)
    }
Dispatcher.enqueue
This method adds the AsyncCall to the ready queue and then calls the promoteAndExecute method.

    // Dispatcher.enqueue
    internal fun enqueue(call: AsyncCall) {
      synchronized(this) {
        // private val readyAsyncCalls = ArrayDeque<AsyncCall>()
        // Add the AsyncCall to the queue of calls waiting to run
        readyAsyncCalls.add(call)

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to the same host.
        if (!call.get().forWebSocket) { // forWebSocket is false by default, so this branch normally runs
          val existingCall = findExistingCallWithHost(call.host())
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
      }
      promoteAndExecute() // This is where execution actually starts
    }
Dispatcher.finished
This method removes the call from the runningSyncCalls queue and then promotes any executable requests from the ready asynchronous queue.

    // Dispatcher.finished
    internal fun finished(call: RealCall) {
      finished(runningSyncCalls, call)
    }

    private fun <T> finished(calls: Deque<T>, call: T) {
      val idleCallback: Runnable?
      synchronized(this) {
        // Remove the call from the queue it was running in
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        idleCallback = this.idleCallback
      }

      // See the promoteAndExecute analysis; the return value indicates whether any calls are still running
      val isRunning = promoteAndExecute()

      // If nothing is running, invoke the idle callback
      if (!isRunning && idleCallback != null) {
        idleCallback.run()
      }
    }
Dispatcher.promoteAndExecute
A waiting call is promoted when both conditions are met:
- The number of running asynchronous requests is less than the maximum (maxRequests, 64 by default)
- The number of running requests to the same host is less than the per-host threshold (maxRequestsPerHost, 5 by default)
A promoted call is moved to the running queue and executed. Otherwise it stays in the ready queue.

    private fun promoteAndExecute(): Boolean {
      assert(!Thread.holdsLock(this))

      val executableCalls = mutableListOf<AsyncCall>()
      val isRunning: Boolean
      synchronized(this) {
        val i = readyAsyncCalls.iterator()
        while (i.hasNext()) { // Iterate over the AsyncCalls waiting to run
          val asyncCall = i.next()

          // The number of running requests has reached the maximum (64), so stop promoting
          if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
          // The call's host has reached its maximum (5), so skip this call
          if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

          // Remove from the ready queue
          i.remove()
          // Atomically increment the per-host counter
          asyncCall.callsPerHost().incrementAndGet()
          // Add the call to the running asynchronous queue
          executableCalls.add(asyncCall)
          runningAsyncCalls.add(asyncCall)
        }
        // True if any calls, synchronous or asynchronous, are still running
        isRunning = runningCallsCount() > 0
      }

      for (i in 0 until executableCalls.size) { // Run the promoted calls in the thread pool
        val asyncCall = executableCalls[i]
        asyncCall.executeOn(executorService) // See the executeOn method of AsyncCall
      }

      return isRunning
    }
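The promotion rule is easy to misread: hitting the global limit stops the scan (break), while hitting the per-host limit only skips that call (continue), so a later call to a different host can still be promoted. The sketch below models just that rule in plain Java. `PromoteSketch` is a hypothetical class; calls are represented by their host name only, and a map replaces the shared AtomicInteger counters.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class PromoteSketch {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();    // hosts of calls waiting to run
    private final Deque<String> running = new ArrayDeque<>();  // hosts of calls that are running
    private final Map<String, Integer> callsPerHost = new HashMap<>();

    PromoteSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) { ready.add(host); }

    /** Moves eligible calls from ready to running and returns them in promotion order. */
    synchronized List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> i = ready.iterator();
        while (i.hasNext()) {
            String host = i.next();
            if (running.size() >= maxRequests) break;                               // global limit: stop
            if (callsPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host limit: skip
            i.remove();
            callsPerHost.merge(host, 1, Integer::sum);
            running.add(host);
            promoted.add(host);
        }
        return promoted;
    }

    /** Marks one running call to the host as finished. */
    synchronized void finished(String host) {
        if (running.remove(host)) callsPerHost.merge(host, -1, Integer::sum);
    }

    synchronized int readyCount() { return ready.size(); }

    public static void main(String[] args) {
        PromoteSketch dispatcher = new PromoteSketch(3, 2);
        for (String host : new String[] {"a", "a", "a", "b", "b"}) dispatcher.enqueue(host);
        System.out.println(dispatcher.promote()); // prints "[a, a, b]"
        dispatcher.finished("a");
        System.out.println(dispatcher.promote()); // prints "[a]"
    }
}
```

In the first pass the third call to host a is skipped because of the per-host limit of 2, the first call to b fills the global limit of 3, and the scan stops at the second b.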
Interceptor layer
According to RealCall's getResponseWithInterceptorChain method, the order of interceptors in the interceptor chain is as follows: user-defined interceptors, RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor, network interceptors, CallServerInterceptor.
[Figure: https://i.imgur.com/XRLzWiu.png]
RealInterceptorChain.proceed()
This method does two main things:
- Creates the next interceptor chain. Passing index + 1 means the next chain starts from the next interceptor.
- Calls the intercept method of the interceptor at index and passes the next chain into it.

    // RealInterceptorChain.proceed()
    override fun proceed(request: Request): Response {
      return proceed(request, transmitter, exchange)
    }

    // The work is actually done by this overload.
    @Throws(IOException::class)
    fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
      if (index >= interceptors.size) throw AssertionError()

      calls++

      // If we already have a stream, confirm that the incoming request will use it.
      check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }

      // If we already have a stream, confirm that this is the only call to chain.proceed().
      check(this.exchange == null || calls <= 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }

      // Call the next interceptor in the chain.
      val next = RealInterceptorChain(interceptors, transmitter, exchange,
          index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
      // Get the current interceptor
      val interceptor = interceptors[index]

      // Call the intercept method of the current interceptor and pass it the next chain.
      @Suppress("USELESS_ELVIS")
      val response = interceptor.intercept(next) ?: throw NullPointerException(
          "interceptor $interceptor returned null")

      // Confirm that the next interceptor made its required call to chain.proceed().
      check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }

      check(response.body != null) { "interceptor $interceptor returned a response with no body" }

      return response
    }
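Stripped of its sanity checks, proceed is a chain-of-responsibility step: build a chain that starts one position later, then hand it to the current interceptor. The following sketch shows only that mechanism. The `Interceptor` and `Chain` types here are hypothetical minimal versions that pass strings instead of Request and Response objects, and the last interceptor plays the role of CallServerInterceptor by not calling proceed.

```java
import java.util.Arrays;
import java.util.List;

final class ChainSketch {
    /** Hypothetical minimal interceptor: receives a chain and returns a response string. */
    interface Interceptor {
        String intercept(Chain chain);
    }

    /** Hypothetical minimal chain: an interceptor list, a position, and the current request. */
    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() { return request; }

        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError("no interceptor left");
            // The next chain starts at index + 1, so it can only reach later interceptors
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static String run(String request) {
        // Rewrites the request on the way in and wraps the response on the way out
        Interceptor logging = chain -> "[" + chain.proceed(chain.request() + ">log") + "]";
        // Only rewrites the request
        Interceptor header = chain -> chain.proceed(chain.request() + ">hdr");
        // Terminal interceptor: produces a response without calling proceed
        Interceptor server = chain -> "response(" + chain.request() + ")";
        List<Interceptor> interceptors = Arrays.asList(logging, header, server);
        return new Chain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET")); // prints "[response(GET>log>hdr)]"
    }
}
```

Each interceptor sees the request as modified by the interceptors before it and the response as produced by those after it, which is why the order of the list matters.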
RetryAndFollowUpInterceptor.intercept
If the user adds no custom interceptors, RetryAndFollowUpInterceptor is the first interceptor in the chain. It recovers from failures and follows redirects; retrying on connection failure is enabled by default (retryOnConnectionFailure = true).

    // This interceptor recovers from failures and follows redirects as necessary. It may throw an [IOException] if the call was canceled.
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
      var request = chain.request()
      val realChain = chain as RealInterceptorChain
      // Transmitter is the bridge between OkHttp's application and network layers. It exposes the
      // high-level application layer primitives: connections, requests, responses, and streams.
      val transmitter = realChain.transmitter()
      var followUpCount = 0
      var priorResponse: Response? = null
      while (true) {
        transmitter.prepareToConnect(request)

        if (transmitter.isCanceled) {
          throw IOException("Canceled")
        }

        var response: Response
        var success = false
        try {
          // Call proceed on the chain, which runs the next interceptor.
          response = realChain.proceed(request, transmitter, null)
          success = true
        } catch (e: RouteException) {
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, transmitter, false, request)) {
            throw e.firstConnectException
          }
          continue
        } catch (e: IOException) {
          // An attempt to communicate with a server failed. The request may have been sent.
          val requestSendStarted = e !is ConnectionShutdownException
          if (!recover(e, transmitter, requestSendStarted, request)) throw e
          continue
        } finally {
          // The network call threw an exception. Release any resources.
          if (!success) {
            transmitter.exchangeDoneDueToException()
          }
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = response.exchange
        val route = exchange?.connection()?.route()
        // Works out the HTTP request to make in response to receiving [userResponse]. This adds
        // authentication headers, follows redirects, or handles a client request timeout. Returns
        // null if a follow-up is unnecessary or not applicable.
        val followUp = followUpRequest(response, route)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            transmitter.timeoutEarlyExit()
          }
          return response
        }

        val followUpBody = followUp.body
        // isOneShot returns true if this body expects [writeTo] to be called at most once, so it can
        // be transmitted at most once. It returns false by default.
        if (followUpBody != null && followUpBody.isOneShot()) {
          return response
        }

        response.body?.closeQuietly()
        if (transmitter.hasExchange()) {
          exchange?.detachWithViolence()
        }

        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      }
    }
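The follow-up half of this loop (keep issuing the request that the previous response calls for, and give up after a fixed number of follow-ups) can be isolated in a small model. This is a sketch under loud assumptions: the `redirects` map is a hypothetical stand-in for the network plus followUpRequest, the constant value 20 is an assumption since the quoted code does not show what MAX_FOLLOW_UPS is set to, an unchecked exception replaces ProtocolException to keep the example short, and the retry-after-failure path through recover is not modelled at all.

```java
import java.util.HashMap;
import java.util.Map;

final class FollowUpSketch {
    /** Assumed limit; the article's excerpt does not show the real constant's value. */
    static final int MAX_FOLLOW_UPS = 20;

    /**
     * Follows redirects until a URL has no follow-up.
     * redirects is a hypothetical map from a URL to the URL its response redirects to.
     */
    static String fetch(String url, Map<String, String> redirects) {
        int followUpCount = 0;
        String request = url;
        while (true) {
            // Stand-in for proceed() plus followUpRequest(): null means no follow-up is needed
            String followUp = redirects.get(request);
            if (followUp == null) {
                return "200 " + request;
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                // The Kotlin source throws ProtocolException here
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            request = followUp;
        }
    }

    public static void main(String[] args) {
        Map<String, String> redirects = new HashMap<>();
        redirects.put("http://a.example", "http://b.example");
        redirects.put("http://b.example", "https://c.example");
        System.out.println(fetch("http://a.example", redirects)); // prints "200 https://c.example"
    }
}
```

The counter is what turns a redirect loop into an error instead of an endless request cycle.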
BridgeInterceptor.intercept
BridgeInterceptor bridges application code and network code. It first builds a network request from the user request, then proceeds to call the network, and finally builds the user response from the network response.
// This method mainly deals with message headers, then hands the request to the next
// interceptor in the chain. It:
//   - sets Content-Type and either Content-Length or Transfer-Encoding;
//   - requests gzip compression and, on receipt, decompresses the body and fixes up the
//     Content-Length and Content-Encoding headers;
//   - adds the Cookie, Host, Connection and User-Agent headers.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  // The request as the user built it.
  val userRequest = chain.request()
  // Builder that accumulates the headers for the network request.
  val requestBuilder = userRequest.newBuilder()
  // The request body, if any.
  val body = userRequest.body
  // Set the headers that describe the request body.
  if (body != null) {
    // The media type of the request body.
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }
    // Size of the request body in bytes; -1 means the length is unknown.
    val contentLength = body.contentLength()
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      // Transfer-Encoding is not needed when the length is known.
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }
  // If the user did not set a Host header, derive it from the URL.
  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", userRequest.url.toHostHeader())
  }
  // Use a persistent connection by default.
  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive")
  }
  // If we add an "Accept-Encoding: gzip" header field we're responsible for also
  // decompressing the transfer stream.
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    requestBuilder.header("Accept-Encoding", "gzip")
  }
  // Add cookies.
  val cookies = cookieJar.loadForRequest(userRequest.url)
  if (cookies.isNotEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies))
  }
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", userAgent)
  }
  // Call proceed() to hand the request to the next interceptor in the chain.
  val networkResponse = chain.proceed(requestBuilder.build())
  // Pass the response headers to the cookie jar so it can store any cookies.
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
  // Attach the original user request so the application sees what it sent.
  val responseBuilder = networkResponse.newBuilder()
      .request(userRequest)
  // Decompress the response body if we asked for gzip ourselves.
  if (transparentGzip &&
      "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
      networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      // Content-Encoding and Content-Length no longer describe the decompressed body,
      // so both headers are removed.
      val strippedHeaders = networkResponse.headers.newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }
  return responseBuilder.build()
}
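The request-side header rules can be isolated from the networking. The following is a minimal sketch in plain Java, not OkHttp code: `BridgeHeaders`, `forNetwork`, and `usesTransparentGzip` are hypothetical names, headers are modelled as a case-insensitive map, and cookies and User-Agent are left out for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

final class BridgeHeaders {
  /** Copies the headers into a map that ignores case, as HTTP header names do. */
  private static Map<String, String> caseInsensitive(Map<String, String> headers) {
    Map<String, String> copy = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    copy.putAll(headers);
    return copy;
  }

  /** True when the user set neither Accept-Encoding nor Range, so gzip is added for them. */
  static boolean usesTransparentGzip(Map<String, String> userHeaders) {
    Map<String, String> h = caseInsensitive(userHeaders);
    return !h.containsKey("Accept-Encoding") && !h.containsKey("Range");
  }

  /**
   * Returns the headers that would be sent on the network.
   * A contentLength of -1 means the body length is unknown.
   */
  static Map<String, String> forNetwork(
      Map<String, String> userHeaders, String host, boolean hasBody, long contentLength) {
    Map<String, String> h = caseInsensitive(userHeaders);
    if (hasBody) {
      if (contentLength != -1L) {
        // Known length: announce it and drop any Transfer-Encoding.
        h.put("Content-Length", Long.toString(contentLength));
        h.remove("Transfer-Encoding");
      } else {
        // Unknown length: fall back to chunked transfer.
        h.put("Transfer-Encoding", "chunked");
        h.remove("Content-Length");
      }
    }
    // Defaults apply only when the user has not set the header.
    h.putIfAbsent("Host", host);
    h.putIfAbsent("Connection", "Keep-Alive");
    if (usesTransparentGzip(userHeaders)) {
      h.put("Accept-Encoding", "gzip");
    }
    return h;
  }
}
```

The `Range` check matters because a byte range refers to the encoded representation, so silently decompressing a partial gzip stream would not yield the bytes the caller asked for.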
CacheInterceptor.intercept
Returns the cached response when a usable one exists;
Updates the cache when the content has changed;
Removes cache entries that have become invalid.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  // Look up a cached response for this request.
  val cacheCandidate = cache?.get(chain.request())
  val now = System.currentTimeMillis()
  // Compute the strategy for satisfying the request: use the network, the cache, or both.
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  val networkRequest = strategy.networkRequest
  val cacheResponse = strategy.cacheResponse
  cache?.trackResponse(strategy)
  if (cacheCandidate != null && cacheResponse == null) {
    // The cache candidate wasn't applicable. Close it.
    cacheCandidate.body?.closeQuietly()
  }
  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(HTTP_GATEWAY_TIMEOUT) // 504
        .message("Unsatisfiable Request (only-if-cached)")
        .body(EMPTY_RESPONSE) // empty response body
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
  }
  // If we don't need the network, we're done: return the cached response.
  if (networkRequest == null) {
    return cacheResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build()
  }
  var networkResponse: Response? = null
  try {
    // Call proceed() to hand the request to the next interceptor in the chain.
    networkResponse = chain.proceed(networkRequest)
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      cacheCandidate.body?.closeQuietly()
    }
  }
  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    // 304: the resource is unchanged, so the cached response can be used.
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      val response = cacheResponse.newBuilder()
          // Merge the cached headers with the headers of the network response.
          .headers(combine(cacheResponse.headers, networkResponse.headers))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build()
      // Close the body of the network response.
      networkResponse.body!!.close()
      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache!!.trackConditionalCacheHit()
      cache.update(cacheResponse, response)
      return response
    } else {
      // The cached response is stale. Close it.
      cacheResponse.body?.closeQuietly()
    }
  }
  // Build the response from the network response.
  val response = networkResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build()
  // cache is null by default when the client is created.
  if (cache != null) {
    if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      val cacheRequest = cache.put(response)
      return cacheWritingResponse(cacheRequest, response)
    }
    if (HttpMethod.invalidatesCache(networkRequest.method)) {
      try {
        cache.remove(networkRequest)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
    }
  }
  return response
}
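The branching in this interceptor reduces to a small decision table. The following is a minimal sketch in plain Java, not OkHttp code: `CacheDecision`, `Source`, and `resolve` are hypothetical names, the two booleans stand for "the strategy produced a network request" and "the strategy produced a cached response", and the network call is modelled as a supplier of a status code.

```java
import java.util.function.IntSupplier;

final class CacheDecision {
  /** Where the final response comes from. */
  enum Source { UNSATISFIABLE_504, CACHE, CONDITIONAL_CACHE_HIT, NETWORK }

  /**
   * @param needsNetwork      true when the strategy produced a network request
   * @param hasCachedResponse true when the strategy produced a usable cached response
   * @param network           performs the network call and returns its status code;
   *                          only invoked when the network is actually needed
   */
  static Source resolve(boolean needsNetwork, boolean hasCachedResponse, IntSupplier network) {
    // Network forbidden and nothing usable in the cache: synthesize a 504.
    if (!needsNetwork && !hasCachedResponse) return Source.UNSATISFIABLE_504;
    // No network needed: serve straight from the cache.
    if (!needsNetwork) return Source.CACHE;
    int code = network.getAsInt();
    // Conditional GET answered with 304 Not Modified: reuse the cached body.
    if (hasCachedResponse && code == 304) return Source.CONDITIONAL_CACHE_HIT;
    // Otherwise the network response wins (and may be written to the cache).
    return Source.NETWORK;
  }
}
```

For example, `CacheDecision.resolve(true, true, () -> 304)` yields `CONDITIONAL_CACHE_HIT`, while the same call with a supplier returning 200 yields `NETWORK`.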
ConnectInterceptor.intercept
Opens a connection to the target server, then proceeds to the next interceptor.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val request = realChain.request()
  val transmitter = realChain.transmitter()
  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  val doExtensiveHealthChecks = request.method != "GET"
  // Find an available connection. The key step inside newExchange() is
  //   val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
  // which creates the ExchangeCodec instance.
  val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
  return realChain.proceed(request, transmitter, exchange)
}
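How one interceptor hands a newly acquired resource to the next through `proceed` can be shown with a toy chain. The following is a minimal sketch in plain Java, not OkHttp's `RealInterceptorChain`: `Step`, `MiniChain`, and `MiniChainDemo` are hypothetical names, and the request, connection, and response are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interceptor: receives the chain positioned at the next step. */
interface Step {
  String intercept(MiniChain chain);
}

/** Hypothetical chain: an immutable cursor over the list of steps. */
final class MiniChain {
  private final List<Step> steps;
  private final int index;
  final String request;
  final String connection; // null until some step opens one

  MiniChain(List<Step> steps, int index, String request, String connection) {
    this.steps = steps;
    this.index = index;
    this.request = request;
    this.connection = connection;
  }

  /** Runs the step at the current index, giving it a chain that points one step further. */
  String proceed(String request, String connection) {
    if (index >= steps.size()) throw new IllegalStateException("no more steps");
    MiniChain next = new MiniChain(steps, index + 1, request, connection);
    return steps.get(index).intercept(next);
  }
}

final class MiniChainDemo {
  static String run(String request) {
    List<Step> steps = new ArrayList<>();
    // Plays the role of ConnectInterceptor: acquire a connection, then proceed.
    steps.add(chain -> chain.proceed(chain.request, "conn-to-" + chain.request));
    // Plays the role of CallServerInterceptor: last step, uses the connection, never proceeds.
    steps.add(chain -> "response via " + chain.connection);
    return new MiniChain(steps, 0, request, null).proceed(request, null);
  }
}
```

Because each `proceed` call creates a new chain object instead of mutating a shared one, a step can call it again (for example, to retry) without disturbing the steps that ran before it.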
CallServerInterceptor.intercept
- Write the request headers
- Write the request body (skipped for GET and HEAD requests)
- Finish the request
- Read the response
// The last interceptor in the chain. It makes the network call to the server.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  // Get the interceptor chain.
  val realChain = chain as RealInterceptorChain
  // An Exchange transmits a single HTTP request and response pair. It layers connection
  // management and events on [ExchangeCodec], which handles the actual I/O.
  val exchange = realChain.exchange()
  // The request to send.
  val request = realChain.request()
  // The body of the request to send.
  val requestBody = request.body
  // Record the time at which the request is sent.
  val sentRequestMillis = System.currentTimeMillis()
  // Write the request headers.
  exchange.writeRequestHeaders(request)
  var responseHeadersStarted = false
  var responseBuilder: Response.Builder? = null
  // Write the request body if there is one and the method is neither GET nor HEAD:
  //   fun permitsRequestBody(method: String): Boolean = !(method == "GET" || method == "HEAD")
  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
      // Flush the request headers; this ends up calling BufferedSink.flush().
      exchange.flushRequest()
      responseHeadersStarted = true
      // Notify the EventListener that response headers are starting (a no-op by default).
      exchange.responseHeadersStart()
      // Read the response headers. The result is null if the server replied 100 Continue.
      responseBuilder = exchange.readResponseHeaders(true)
    }
    if (responseBuilder == null) {
      if (requestBody.isDuplex()) { // isDuplex() returns false by default
        // Prepare a duplex body so that the application can send a request body later.
        exchange.flushRequest()
        val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
        requestBody.writeTo(bufferedRequestBody)
      } else {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
        requestBody.writeTo(bufferedRequestBody)
        bufferedRequestBody.close()
      }
    } else {
      exchange.noRequestBody()
      if (!exchange.connection()!!.isMultiplexed) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        exchange.noNewExchangesOnConnection()
      }
    }
  } else {
    // GET or HEAD request, or no request body: there is nothing to write.
    exchange.noRequestBody()
  }
  // isDuplex() returns false by default, so this branch normally runs.
  if (requestBody == null || !requestBody.isDuplex()) {
    // Signal that the request is complete.
    exchange.finishRequest()
  }
  // responseHeadersStarted is true only if the request carried "Expect: 100-continue".
  if (!responseHeadersStarted) {
    exchange.responseHeadersStart()
  }
  // If the response headers have not been read yet, read them now.
  if (responseBuilder == null) {
    responseBuilder = exchange.readResponseHeaders(false)!!
  }
  // Build the response.
  var response = responseBuilder
      .request(request)
      .handshake(exchange.connection()!!.handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build()
  var code = response.code
  if (code == 100) {
    // The server sent a 100-continue even though we did not request one.
    // Try again to read the actual response.
    response = exchange.readResponseHeaders(false)!!
        .request(request)
        .handshake(exchange.connection()!!.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    code = response.code
  }
  exchange.responseHeadersEnd(response)
  response = if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response.newBuilder()
        .body(EMPTY_RESPONSE)
        .build()
  } else {
    response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build()
  }
  if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
      "close".equals(response.header("Connection"), ignoreCase = true)) {
    exchange.noNewExchangesOnConnection()
  }
  if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
    throw ProtocolException(
        "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
  }
  return response
}
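The rules that decide whether a request body is transmitted, and whether a response is acceptable, can be written as pure functions. The following is a minimal sketch in plain Java, not OkHttp code: `CallServerRules`, `shouldSendBody`, and `violatesNoContent` are hypothetical names, and `interimCode` is an assumed parameter standing for the status the server returned in reply to "Expect: 100-continue".

```java
final class CallServerRules {
  /** Mirrors the rule quoted in the comments above: every method except GET and HEAD may carry a body. */
  static boolean permitsRequestBody(String method) {
    return !(method.equals("GET") || method.equals("HEAD"));
  }

  /**
   * Decides whether the request body is transmitted.
   *
   * @param expectHeader value of the "Expect" request header, or null if absent
   * @param interimCode  status the server sent in reply to "Expect: 100-continue";
   *                     ignored when that header is not present
   */
  static boolean shouldSendBody(
      String method, boolean hasBody, String expectHeader, int interimCode) {
    if (!permitsRequestBody(method) || !hasBody) return false;
    if ("100-continue".equalsIgnoreCase(expectHeader)) {
      // Only send the body once the server has agreed with 100 Continue.
      return interimCode == 100;
    }
    return true;
  }

  /** 204 and 205 responses must not announce a body; a negative length means unknown. */
  static boolean violatesNoContent(int code, long contentLength) {
    return (code == 204 || code == 205) && contentLength > 0L;
  }
}
```

With these rules, a POST with "Expect: 100-continue" that receives a 417 instead of 100 never transmits its body, which corresponds to the `exchange.noRequestBody()` branch in the interceptor.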