The Simple Use of OkHttp and Source Code Analysis

Simple use

  1. OkHttp's GitHub repository: https://github.com/square/okhttp

  2. First you need to add dependencies

    implementation("com.squareup.okhttp3:okhttp:4.0.1")
    
  3. Adding Network Permissions

    <uses-permission android:name="android.permission.INTERNET" />
    
  4. Then create the OkHttpClient object

    // Option A: a client with default settings
    OkHttpClient defaultClient = new OkHttpClient();
    // Option B: to customize the client, create it through OkHttpClient.Builder
    OkHttpClient client = new OkHttpClient.Builder()
        .connectTimeout(3, TimeUnit.SECONDS)
        .retryOnConnectionFailure(true)
        //...
        .build();
    
  5. Next, create the Request object

    // A plain-text request body (an empty string here)
    RequestBody requestBody = RequestBody.create("", MediaType.parse("text/plain;charset=utf-8"));
    // A multipart form
    RequestBody requestBody1 = new MultipartBody.Builder()
        // Required for form uploads: marks the body as multipart/form-data
        .setType(MultipartBody.FORM)
        .addFormDataPart("key", "value")
        .build();
    // FormBody is a subclass of RequestBody for URL-encoded forms whose names and values are strings.
    FormBody formBody = new FormBody.Builder()
        .add("key", "value")
        .build();
    
    Request request = new Request.Builder()
        .url("https://www.google.com")
        //.get()
        //.method("POST", requestBody)
        .post(formBody)
        .addHeader("content-type", "application/json")
        .build();
    
  6. Wrap the Request object in a Call object

    Call call = client.newCall(request);
    
  7. Decide whether to execute synchronously or asynchronously

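    // Synchronous execution: blocks the current thread, so never call it on the Android main thread
    Response syncResponse = call.execute();

    // Asynchronous execution. A Call can be executed only once, so use either execute() or enqueue() on a given Call.
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            // Callbacks run on a background thread, so post UI work to the main thread
            new Handler(Looper.getMainLooper()).post(() ->
                Toast.makeText(context, "request was aborted", Toast.LENGTH_SHORT).show());
        }

        @Override
        public void onResponse(Call call, final Response response) throws IOException {
            // string() reads the whole body into memory and closes it
            String res = response.body().string();
        }
    });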
    

Source code analysis

Overall architecture

Brief introduction


OkHttp's overall architecture can be roughly divided into the following layers:

  • Interface layer: receives network requests
  • Protocol layer: handles protocol logic
  • Connection layer: manages network connections, sends requests, and receives server responses
  • Cache layer: manages local caches
  • I/O layer: performs the actual data reading and writing
  • Interceptor layer: intercepts network requests and inserts interception logic

Interface layer

The interface layer receives the user's network requests and initiates the actual network requests. It is analyzed in detail below.

Protocol layer

The protocol layer handles protocol logic. OkHttp supports three protocols: HTTP/1.1, HTTP/2, and WebSocket.

Connection layer

This layer is responsible for network connections. It contains a RealConnectionPool, which manages all Socket connections in one place. When a user initiates a network request, OkHttp first looks in the connection pool for a connection that meets the requirements. If one exists, the request is sent over that connection directly; otherwise a new connection is established to send the request.

RealConnection describes a physical Socket connection, and the connection pool maintains multiple RealConnection objects. Because HTTP/2 supports multiplexing, one RealConnection can carry several network requests, so StreamAllocation is introduced to describe the requests that actually occupy a connection. A RealConnection corresponds to one or more StreamAllocations, so StreamAllocation can be regarded as a reference counter for the RealConnection. When the reference count of a RealConnection drops to zero and no other request reuses it for a long time, the connection is released.
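
The reference-counting idea can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions, not OkHttp's implementation: `PooledConnection` and `SimpleConnectionPool` are hypothetical names, connections are matched by host name alone, every connection is treated as multiplexed, and the current time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a pooled connection; not OkHttp's RealConnection. */
final class PooledConnection {
    final String host;
    private int activeStreams;     // how many requests currently use this connection
    private long idleSinceNanos;   // when the count last dropped to zero

    PooledConnection(String host, long nowNanos) {
        this.host = host;
        this.idleSinceNanos = nowNanos;
    }

    synchronized void acquire() {
        activeStreams++;
    }

    synchronized void release(long nowNanos) {
        if (activeStreams == 0) throw new IllegalStateException("not acquired");
        activeStreams--;
        if (activeStreams == 0) idleSinceNanos = nowNanos;
    }

    synchronized boolean isIdle() {
        return activeStreams == 0;
    }

    synchronized long idleSinceNanos() {
        return idleSinceNanos;
    }
}

/** Hypothetical pool that reuses connections by host and evicts long-idle ones. */
final class SimpleConnectionPool {
    private final List<PooledConnection> connections = new ArrayList<>();
    private final long keepAliveNanos;

    SimpleConnectionPool(long keepAliveNanos) {
        this.keepAliveNanos = keepAliveNanos;
    }

    /** Reuses a connection to the host if one exists, otherwise creates a new one. */
    synchronized PooledConnection acquire(String host, long nowNanos) {
        for (PooledConnection connection : connections) {
            if (connection.host.equals(host)) {
                connection.acquire();
                return connection;
            }
        }
        PooledConnection created = new PooledConnection(host, nowNanos);
        created.acquire();
        connections.add(created);
        return created;
    }

    /** Drops connections that are unused and have been idle for at least the keep-alive time. */
    synchronized int evictIdle(long nowNanos) {
        int before = connections.size();
        connections.removeIf(c -> c.isIdle() && nowNanos - c.idleSinceNanos() >= keepAliveNanos);
        return before - connections.size();
    }

    synchronized int size() {
        return connections.size();
    }
}
```

In this sketch two requests to the same host share one connection, and the connection becomes eligible for eviction only after both have released it and the keep-alive time has passed.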

Cache layer

This layer maintains the request cache. When a local cache entry satisfies the user's network request, OkHttp returns the result directly from the cache. See the CacheInterceptor analysis.

I/O layer

This layer performs the actual data reading and writing. Its core type is ExchangeCodec, which is only an interface. The implementation classes are Http1ExchangeCodec and Http2ExchangeCodec.

Interceptor layer

The interceptor layer provides an AOP-like interface that lets users intercept network requests at each level and run their own logic. It is introduced in detail below.

Interface layer

The interface layer receives the user's network requests and initiates the actual network requests. It consists of the following classes:

OkHttpClient: OkHttpClient is the client of the OkHttp framework. Users can configure it in many ways, and all network requests are made through it. Each OkHttpClient maintains its own task queue, connection pool, cache, interceptors, and so on, so a project should share a single instance.

Request: Request describes the request to be initiated. It includes the request URL, request method, request headers, request body, cache control, and whether the request uses HTTPS.

Call: a Call object describes an actual network request. Each network request made by the user is a Call object, and a Call can be executed only once. Call is an interface; its implementation is RealCall, whose execute method runs the request synchronously. RealCall has an inner class, AsyncCall, which implements the Runnable interface, so each asynchronous call is a task that runs on a thread from the dispatcher's thread pool. Execution of an AsyncCall starts from its executeOn method. RealCall stores the OkHttpClient and the Request, and records whether the call is for a WebSocket.

Dispatcher: this class is OkHttpClient's task queue and maintains a thread pool internally. When it receives an AsyncCall, it hands the call to the thread pool by invoking the call's executeOn method.

OkHttpClient

//Either way, the client is ultimately built from a Builder object.
//A Kotlin class can have a primary constructor and one or more secondary constructors. The primary constructor is part of the class header and follows the class name.
//If the primary constructor has no annotations or visibility modifiers, the constructor keyword can be omitted.
//The primary constructor cannot contain any code. Initialization code goes in initializer blocks prefixed with the init keyword, and those blocks can use the primary constructor's parameters.
//If a class has a primary constructor, each secondary constructor must delegate to it, directly or indirectly.
open class OkHttpClient internal constructor(
    builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
    //...
    constructor() : this(Builder())
    //...
    init {
        if (builder.sslSocketFactoryOrNull != null || connectionSpecs.none { it.isTls }) {
            this.sslSocketFactoryOrNull = builder.sslSocketFactoryOrNull
            this.certificateChainCleaner = builder.certificateChainCleaner
        } else {
            val trustManager = Platform.get().platformTrustManager()
            Platform.get().configureTrustManager(trustManager)
            this.sslSocketFactoryOrNull = newSslSocketFactory(trustManager)
            this.certificateChainCleaner = CertificateChainCleaner.get(trustManager)
        }

        if (sslSocketFactoryOrNull != null) {
            Platform.get().configureSslSocketFactory(sslSocketFactoryOrNull)
        }

        this.certificatePinner = builder.certificatePinner
        .withCertificateChainCleaner(certificateChainCleaner)

        check(null !in (interceptors as List<Interceptor?>)) {
            "Null interceptor: $interceptors"
        }
        check(null !in (networkInterceptors as List<Interceptor?>)) {
            "Null network interceptor: $networkInterceptors"
        }
    }
}
//A Builder can be created in two ways: with no arguments, or from an existing OkHttpClient.
class Builder constructor() {
    //Kotlin has four visibility modifiers: private, protected, internal, and public (the default).
    //public: visible everywhere
    //private: visible only inside the containing class, or inside the containing file for top-level declarations
    //internal: visible within the same module (an IntelliJ IDEA module, a Maven project, a Gradle source set, or a set of files compiled by one invocation of the <kotlinc> Ant task)
    //protected: like private, but also visible in subclasses; not available for top-level declarations
    internal var dispatcher: Dispatcher = Dispatcher()//Dispatcher
    internal var connectionPool: ConnectionPool = ConnectionPool()//Connection pool
    internal val interceptors: MutableList<Interceptor> = mutableListOf()//Application interceptors
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()//Network interceptors
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()//Event listener factory
    internal var retryOnConnectionFailure = true//Whether to retry if the connection fails
    internal var authenticator: Authenticator = Authenticator.NONE//Authenticator for the origin server
    internal var followRedirects = true//Whether to follow redirects
    internal var followSslRedirects = true//Whether to follow redirects between HTTPS and HTTP
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES//Cookie storage
    internal var cache: Cache? = null//Cache
    internal var dns: Dns = Dns.SYSTEM//Domain name resolution
    internal var proxy: Proxy? = null//Proxy
    internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector()//Proxy selector
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE//Proxy authentication
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()//Socket factory
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null//SSL socket factory for HTTPS
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS//TLS versions and cipher suites
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS//Supported protocols
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier//Verifies the host name against the certificate
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT//Certificate pinning
    internal var certificateChainCleaner: CertificateChainCleaner? = null//Certificate chain cleaner
    internal var callTimeout = 0//Call timeout in milliseconds
    internal var connectTimeout = 10_000//Connect timeout in milliseconds
    internal var readTimeout = 10_000//Read timeout in milliseconds
    internal var writeTimeout = 10_000//Write timeout in milliseconds
    internal var pingInterval = 0//Ping interval in milliseconds for long-lived connections

    internal constructor(okHttpClient: OkHttpClient) : this() {
        this.dispatcher = okHttpClient.dispatcher
        this.connectionPool = okHttpClient.connectionPool
        this.interceptors += okHttpClient.interceptors
        this.networkInterceptors += okHttpClient.networkInterceptors
        this.eventListenerFactory = okHttpClient.eventListenerFactory
        this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
        this.authenticator = okHttpClient.authenticator
        this.followRedirects = okHttpClient.followRedirects
        this.followSslRedirects = okHttpClient.followSslRedirects
        this.cookieJar = okHttpClient.cookieJar
        this.cache = okHttpClient.cache
        this.dns = okHttpClient.dns
        this.proxy = okHttpClient.proxy
        this.proxySelector = okHttpClient.proxySelector
        this.proxyAuthenticator = okHttpClient.proxyAuthenticator
        this.socketFactory = okHttpClient.socketFactory
        this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
        this.connectionSpecs = okHttpClient.connectionSpecs
        this.protocols = okHttpClient.protocols
        this.hostnameVerifier = okHttpClient.hostnameVerifier
        this.certificatePinner = okHttpClient.certificatePinner
        this.certificateChainCleaner = okHttpClient.certificateChainCleaner
        this.callTimeout = okHttpClient.callTimeoutMillis
        this.connectTimeout = okHttpClient.connectTimeoutMillis
        this.readTimeout = okHttpClient.readTimeoutMillis
        this.writeTimeout = okHttpClient.writeTimeoutMillis
        this.pingInterval = okHttpClient.pingIntervalMillis
    }
}
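
The second Builder constructor is what makes it cheap to derive a customized client from an existing one: the new builder starts from the old client's fields, so shared resources such as the connection pool are reused rather than recreated. The following is a minimal Java sketch of that pattern only. `MiniClient` and its three fields are hypothetical and stand in for the much larger real classes.

```java
/** Hypothetical miniature of a client with a copying builder; not OkHttp's classes. */
final class MiniClient {
    final Object connectionPool;            // stands in for a shared resource such as the connection pool
    final int connectTimeoutMillis;
    final boolean retryOnConnectionFailure;

    private MiniClient(Builder builder) {
        this.connectionPool = builder.connectionPool;
        this.connectTimeoutMillis = builder.connectTimeoutMillis;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    }

    /** Starts a builder that is pre-filled with this client's settings. */
    Builder newBuilder() {
        return new Builder(this);
    }

    static final class Builder {
        private Object connectionPool = new Object();
        private int connectTimeoutMillis = 10_000;
        private boolean retryOnConnectionFailure = true;

        /** Way 1: start from the defaults. */
        Builder() {
        }

        /** Way 2: copy every field from an existing client. */
        Builder(MiniClient client) {
            this.connectionPool = client.connectionPool;
            this.connectTimeoutMillis = client.connectTimeoutMillis;
            this.retryOnConnectionFailure = client.retryOnConnectionFailure;
        }

        Builder connectTimeoutMillis(int millis) {
            if (millis < 0) throw new IllegalArgumentException("timeout < 0");
            this.connectTimeoutMillis = millis;
            return this;
        }

        Builder retryOnConnectionFailure(boolean retry) {
            this.retryOnConnectionFailure = retry;
            return this;
        }

        MiniClient build() {
            return new MiniClient(this);
        }
    }
}
```

A derived client built with `base.newBuilder().connectTimeoutMillis(3_000).build()` has its own timeout but still points at the same pool object as `base`.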

Request

Request request = new Request.Builder()
    .url("https://wwww.baidu.com")
    .get()
    .build();
//Request's only constructor
class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
)
//Request.Builder class
open class Builder {
    internal var url: HttpUrl? = null//Request URL
    internal var method: String//Request method
    internal var headers: Headers.Builder//Request headers
    internal var body: RequestBody? = null//Request body

    /** A mutable map of tags, or an immutable empty map if we don't have any. */
    internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()

    constructor() {//The constructor we normally call
        this.method = "GET"//Defaults to a GET request
        this.headers = Headers.Builder()//Creates empty default headers
    }

    internal constructor(request: Request) {//Builds a Builder from an existing Request
        this.url = request.url
        this.method = request.method
        this.body = request.body
        this.tags = if (request.tags.isEmpty()) {
            mutableMapOf()
        } else {
            request.tags.toMutableMap()
        }
        this.headers = request.headers.newBuilder()
    }
    //...
    open fun build(): Request {
        return Request(
            checkNotNull(url) { "url == null" },
            method,
            headers.build(),
            body,
            tags.toImmutableMap()
        )
    }
}

Call

Call call = client.newCall(request);
//OkHttpClient.newCall
//What is actually created is a RealCall object
override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, forWebSocket = false)
}
//RealCall.newRealCall
fun newRealCall(
    client: OkHttpClient,
    originalRequest: Request,
    forWebSocket: Boolean
): RealCall {
    // Safely publish the Call instance to the EventListener.
    //apply runs the block with the new RealCall as its receiver, so transmitter is assigned before the instance is returned.
    return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
    }
}
internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
)

call.execute()

Execution logic:

  • Add the call to the dispatcher's queue of running synchronous calls
  • Execute the call
  • When the call completes, have the dispatcher remove it from that queue

override fun execute(): Response {
    synchronized(this) {
        //A call object can only be executed once
        check(!executed) { "Already Executed" }
        executed = true
    }
    //timeoutEnter() does one thing: it calls timeout.enter() to start the call timeout.
    transmitter.timeoutEnter()
    //callStart() calls EventListener.callStart, which does nothing by default.
    transmitter.callStart()
    try {
        //See the analysis of Dispatcher.executed
        client.dispatcher.executed(this)
        //See the analysis of getResponseWithInterceptorChain
        return getResponseWithInterceptorChain()//Runs synchronously and blocks the current thread
    } finally {
        //See the analysis of Dispatcher.finished
        client.dispatcher.finished(this)
    }
}
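
The shape of execute (a one-shot guard, registration with the dispatcher, the work itself, and guaranteed deregistration) can be reproduced in plain Java. This is a hedged sketch, not OkHttp code: `OneShotCall` and `SyncDispatcher` are hypothetical names, and a `Supplier<String>` stands in for getResponseWithInterceptorChain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical dispatcher that only tracks running synchronous calls. */
final class SyncDispatcher {
    private final Deque<OneShotCall> runningSyncCalls = new ArrayDeque<>();

    synchronized void executed(OneShotCall call) {
        runningSyncCalls.add(call);
    }

    synchronized void finished(OneShotCall call) {
        if (!runningSyncCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    }

    synchronized int runningCount() {
        return runningSyncCalls.size();
    }
}

/** Hypothetical call that can be executed only once. */
final class OneShotCall {
    private final SyncDispatcher dispatcher;
    private final Supplier<String> work;   // stands in for the interceptor chain
    private boolean executed;

    OneShotCall(SyncDispatcher dispatcher, Supplier<String> work) {
        this.dispatcher = dispatcher;
        this.work = work;
    }

    String execute() {
        synchronized (this) {
            // A call object can be executed only once.
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        dispatcher.executed(this);
        try {
            return work.get();           // runs on the calling thread and blocks it
        } finally {
            dispatcher.finished(this);   // runs even when the work throws
        }
    }
}
```

The try/finally is the important design choice: the dispatcher's bookkeeping stays correct whether the request succeeds or fails.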

call.enqueue(callback)

This method wraps the callback in an AsyncCall, puts it in the dispatcher's asynchronous queue, and lets the dispatcher run it when capacity allows.

override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
        //Similarly, a call object can only be executed once
        check(!executed) { "Already Executed" }
        executed = true
    }
    //Calls EventListener.callStart, which does nothing by default.
    transmitter.callStart()
    //See Dispatcher.enqueue method analysis
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}

call.getResponseWithInterceptorChain()

This method collects all interceptors into a list, creates a RealInterceptorChain from it, and calls the chain's proceed method. Each interceptor in the chain then runs in turn, and the Response that comes back is returned to the caller.

//RealCall.getResponseWithInterceptorChain()
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors//User-defined application interceptors
    interceptors += RetryAndFollowUpInterceptor(client)//Retries failed requests and follows redirects
    interceptors += BridgeInterceptor(client.cookieJar)//Bridge interceptor: adds required request headers before sending and post-processes the response (for example, cookies and gzip decoding)
    interceptors += CacheInterceptor(client.cache)//Cache interceptor: serves responses from the cache when possible and updates the cache
    interceptors += ConnectInterceptor//Connect interceptor: establishes the connection to the server
    //forWebSocket is false here because newCall passes it when creating the object:
    //RealCall.newRealCall(this, request, forWebSocket = false)
    if (!forWebSocket) {
        interceptors += client.networkInterceptors//User-defined network interceptors, set when configuring OkHttpClient
    }
    interceptors += CallServerInterceptor(forWebSocket)//Call-server interceptor: sends the request to the server and reads the response

    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)//Create the interceptor chain

    var calledNoMoreExchanges = false
    try {
        //See Interceptor Chain Layer Analysis
        val response = chain.proceed(originalRequest)
        if (transmitter.isCanceled) {//false unless the call has been canceled
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
        if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null)
        }
    }
}

AsyncCall

//Inner class of RealCall
internal inner class AsyncCall(
    private val responseCallback: Callback//The callback passed to enqueue
) : Runnable {
    @Volatile private var callsPerHost = AtomicInteger(0)

    fun callsPerHost(): AtomicInteger = callsPerHost

    fun reuseCallsPerHostFrom(other: AsyncCall) {
        this.callsPerHost = other.callsPerHost
    }

    fun host(): String = originalRequest.url.host//url is an HttpUrl object. Its host corresponds to the Host header of the HTTP request, which identifies the virtual host, because one IP address can serve multiple domain names.

    fun request(): Request = originalRequest

    fun get(): RealCall = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    //Submits this object to the thread pool, whose worker thread then calls run() and, through it, getResponseWithInterceptorChain. If the pool rejects the task, onFailure is called and the dispatcher is told that the call has finished.
    fun executeOn(executorService: ExecutorService) {
        assert(!Thread.holdsLock(client.dispatcher))
        var success = false//Whether the task was accepted by the thread pool
        try {
            //Hand this Runnable to the thread pool
            executorService.execute(this)
            success = true
        } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            transmitter.noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
        } finally {
            if (!success) {
                client.dispatcher.finished(this) // This call is no longer running!
            }
        }
    }

    override fun run() {
        threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            transmitter.timeoutEnter()
            try {
                //Execute the request through the interceptor chain. This method belongs to the outer class RealCall and was analyzed above.
                val response = getResponseWithInterceptorChain()
                signalledCallback = true
                //Success callback
                responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
                if (signalledCallback) {
                    // Do not signal the callback twice!
                    Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
                } else {
                    //Failure callback
                    responseCallback.onFailure(this@RealCall, e)
                }
            } finally {
                //Tell the dispatcher that this call has finished
                client.dispatcher.finished(this)
            }
        }
    }
}
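
Two details of AsyncCall are easy to miss: executeOn must clean up when the executor rejects the task (because run will then never be called), and run must not signal the callback twice. The Java sketch below isolates both. It is an illustration under assumptions: `AsyncTaskSketch` and its `Listener` interface are hypothetical, a counter replaces dispatcher.finished, and unchecked exceptions replace IOException because `Supplier` cannot throw checked exceptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical miniature of an async call; names are illustrative, not OkHttp's API. */
final class AsyncTaskSketch implements Runnable {
    /** Hypothetical callback type standing in for okhttp3.Callback. */
    interface Listener {
        void onResponse(String response);
        void onFailure(Exception e);
    }

    private final Supplier<String> work;         // stands in for getResponseWithInterceptorChain()
    private final Listener listener;
    private final AtomicInteger finishedCount;   // stands in for dispatcher.finished(this)

    AsyncTaskSketch(Supplier<String> work, Listener listener, AtomicInteger finishedCount) {
        this.work = work;
        this.listener = listener;
        this.finishedCount = finishedCount;
    }

    /** Hands this task to the executor and reports a failure if the executor rejects it. */
    void executeOn(ExecutorService executor) {
        boolean accepted = false;
        try {
            executor.execute(this);
            accepted = true;
        } catch (RejectedExecutionException e) {
            listener.onFailure(e);
        } finally {
            // When the task was rejected, run() will never be called, so clean up here.
            if (!accepted) finishedCount.incrementAndGet();
        }
    }

    @Override
    public void run() {
        boolean signalled = false;
        try {
            String response = work.get();
            signalled = true;
            listener.onResponse(response);
        } catch (RuntimeException e) {
            // Report a failure only if the success callback has not been invoked yet.
            if (!signalled) listener.onFailure(e);
        } finally {
            finishedCount.incrementAndGet();
        }
    }
}
```

Passing an executor that has already been shut down to executeOn exercises the rejection path: the listener sees exactly one failure and the counter is incremented once.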

Dispatcher

Brief introduction

Dispatcher is OkHttp's task queue. It maintains a thread pool internally to execute the actual network requests.

OkHttp's task queue consists of two parts:

  • Dispatcher: finds a suitable thread for each task
  • The thread pool that runs the network request tasks

The dispatcher hands each task to an idle thread, or to a new one when none is idle, so callers are not blocked and many requests can run concurrently.

class Dispatcher constructor() {
    //Maximum number of requests executed concurrently.
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
        require(maxRequests >= 1) { "max < 1: $maxRequests" }
        synchronized(this) {
            field = maxRequests
        }
        promoteAndExecute()
    }
    
    /*
    Maximum number of requests executed concurrently for each host. This limits requests by the URL's host name. Note that concurrent requests to a single IP address may still exceed this limit: multiple host names may share an IP address or be routed through the same HTTP proxy. If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests remain in flight. WebSocket connections to hosts do not count against this limit.
    */
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
        require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
        synchronized(this) {
            field = maxRequestsPerHost
        }
        promoteAndExecute()
    }
    @set:Synchronized
    @get:Synchronized
    var idleCallback: Runnable? = null
    //Thread pool
    private var executorServiceOrNull: ExecutorService? = null

    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
    get() {
        if (executorServiceOrNull == null) {
            //The pool has 0 core threads and a maximum size of Int.MAX_VALUE, and idle threads are reclaimed after 60 seconds.
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
        }
        return executorServiceOrNull!!
    }

    /** Ready async calls in the order they'll be run. */
    //Asynchronous calls waiting to run
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()

    /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
    //Asynchronous calls that are running
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()

    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
    //Synchronous calls that are running
    private val runningSyncCalls = ArrayDeque<RealCall>()
}
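
The thread pool configuration deserves a closer look. With zero core threads and a SynchronousQueue, a submitted task is never buffered: it is either handed to an idle worker or a new thread is started for it. The Java sketch below builds a pool with the same parameters and shows that behavior. `DispatcherPoolSketch` and the thread name are hypothetical, and the thread factory is a plain lambda rather than OkHttp's helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a pool configured like the dispatcher's executor. */
final class DispatcherPoolSketch {
    /** 0 core threads, effectively unbounded maximum, 60 s keep-alive, no task buffering. */
    static ThreadPoolExecutor newDispatcherStylePool() {
        return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            runnable -> new Thread(runnable, "sketch-dispatcher"));
    }

    /** Submits taskCount blocking tasks and returns the pool size while all of them are running. */
    static int poolSizeWithBlockedTasks(int taskCount) {
        ThreadPoolExecutor pool = newDispatcherStylePool();
        CountDownLatch started = new CountDownLatch(taskCount);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await();   // keep the worker busy until the measurement is done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Because every worker is busy when the next task arrives, each task gets its own thread. This is why the pool itself imposes no practical limit and the dispatcher enforces maxRequests and maxRequestsPerHost separately.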

Dispatcher.executed

This method only records the RealCall in the queue of running synchronous calls. The call itself runs immediately on the calling thread.

@Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
}

Dispatcher.enqueue

This method adds the AsyncCall to the ready queue and then calls promoteAndExecute.

//Dispatcher.enqueue
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //private val readyAsyncCalls = ArrayDeque<AsyncCall>()
        //Add the AsyncCall to the queue of calls waiting to run
        readyAsyncCalls.add(call)

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to the same host.
        if (!call.get().forWebSocket) {//forWebSocket is false by default, so this branch normally runs
            val existingCall = findExistingCallWithHost(call.host())
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
    }
    promoteAndExecute()//Actually starts the eligible calls
}

Dispatcher.finished

This method removes the call from the runningSyncCalls queue and then starts any asynchronous calls in the ready queue that are now allowed to run.

//Dispatcher.finished
internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
        //Remove the call from the given queue (runningSyncCalls for a synchronous call).
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        idleCallback = this.idleCallback
    }
    //See the promoteAndExecute analysis. The return value tells whether any calls are still running.
    val isRunning = promoteAndExecute()
    //If nothing is running, invoke the idle callback
    if (!isRunning && idleCallback != null) {
        idleCallback.run()
    }
}

Dispatcher.promoteAndExecute

A waiting call is promoted when both conditions hold:

  • The number of running requests is below the maximum (64 by default)
  • The number of running requests to the same host is below the per-host limit (5 by default)

A promoted call is moved to the running queue and executed. A call that does not qualify stays in the ready queue.

private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        while (i.hasNext()) {//Iterate over the AsyncCalls waiting to run
            val asyncCall = i.next()

            //Stop once the number of running requests has reached the maximum (64 by default).
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //Skip this call if its host already has the maximum number of running requests (5 by default).
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
            //Remove the call from the ready queue
            i.remove()
            //Atomically increment the per-host counter
            asyncCall.callsPerHost().incrementAndGet()
            //Add the call to the running queue
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
        }
        //True if any synchronous or asynchronous call is still running
        isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {//Run the promoted calls on the thread pool
        val asyncCall = executableCalls[i]
        asyncCall.executeOn(executorService)//See the analysis of AsyncCall.executeOn
    }
    }

    return isRunning
}
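
The promotion rule is easier to see with small limits. The Java sketch below applies the same two checks (a global break and a per-host continue) to a queue in which each call is represented only by its host name. `PromotionSketch` is hypothetical, the per-host count is computed by scanning the running list rather than by a shared AtomicInteger, and enqueue does not trigger promotion by itself so that each step can be observed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the ready and running queues; each call is just its host name. */
final class PromotionSketch {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> running = new ArrayList<>();

    PromotionSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) {
        ready.add(host);
    }

    /** Moves eligible calls from ready to running and returns the hosts promoted in this pass. */
    synchronized List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> iterator = ready.iterator();
        while (iterator.hasNext()) {
            String host = iterator.next();
            if (running.size() >= maxRequests) break;                                   // max capacity
            if (Collections.frequency(running, host) >= maxRequestsPerHost) continue;   // host max capacity
            iterator.remove();
            running.add(host);
            promoted.add(host);
        }
        return promoted;
    }

    /** Marks one running call to the host as finished, then promotes waiting calls. */
    synchronized List<String> finished(String host) {
        if (!running.remove(host)) throw new IllegalStateException("Call wasn't in-flight!");
        return promote();
    }
}
```

With limits of 3 requests overall and 2 per host, enqueueing calls to hosts a, a, a, b, b promotes a, a, b on the first pass: the third call to a is skipped by the per-host check, and the second call to b is stopped by the global check. Finishing one call to a then promotes the waiting call to a.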

Interceptor layer

According to RealCall's getResponseWithInterceptorChain method, the interceptors in the chain run in this order: user-defined interceptors, RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor, network interceptors, CallServerInterceptor.

[Figure: https://i.imgur.com/XRLzWiu.png]

RealInterceptorChain.proceed()

This method does only two things:

  • Create the next chain with index + 1, so that the next chain starts from the next interceptor.
  • Call the intercept method of the interceptor at position index, passing the next chain to it.

//RealInterceptorChain.proceed()
override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
}
//The overload that does the actual work.
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    // If we already have a stream, confirm that the incoming request will use it.
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    check(this.exchange == null || calls <= 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Call the next interceptor in the chain.
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
                                    index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    //Get the current interceptor
    val interceptor = interceptors[index]

    //Call the intercept method of the current interceptor and pass it the next chain.
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}
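
Stripped of the exchange checks, proceed is a small recursive pattern: each chain object knows its position, builds the chain for position + 1, and hands it to the interceptor at its own position. The Java sketch below shows only that pattern. It is not OkHttp's API: `SketchInterceptor`, `SketchChain`, and `ChainDemo` are hypothetical, and plain strings stand in for Request and Response.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interceptor type; strings stand in for requests and responses. */
interface SketchInterceptor {
    String intercept(SketchChain chain);
}

/** Hypothetical chain that mirrors the index + 1 hand-off described above. */
final class SketchChain {
    private final List<SketchInterceptor> interceptors;
    private final int index;
    private final String request;

    SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    String proceed(String request) {
        if (index >= interceptors.size()) throw new AssertionError("no more interceptors");
        // 1. Create the chain that starts at the next interceptor.
        SketchChain next = new SketchChain(interceptors, index + 1, request);
        // 2. Run the current interceptor and give it that chain.
        SketchInterceptor current = interceptors.get(index);
        String response = current.intercept(next);
        if (response == null) throw new NullPointerException("interceptor returned null");
        return response;
    }
}

final class ChainDemo {
    static String run() {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        // Modifies the request on the way in and wraps the response on the way out.
        interceptors.add(chain -> "[" + chain.proceed(chain.request() + "+header") + "]");
        // Terminal interceptor: produces a response without calling proceed.
        interceptors.add(chain -> "response-to(" + chain.request() + ")");
        return new SketchChain(interceptors, 0, "GET /").proceed("GET /");
    }
}
```

`ChainDemo.run()` returns `[response-to(GET /+header)]`, which shows that the first interceptor sees both directions of the exchange while the last one only answers the request it receives.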

RetryAndFollowUpInterceptor.intercept

If the user adds no custom interceptors, this is the first interceptor in the chain. It retries failed requests and follows redirects, and OkHttp enables retry on connection failure by default.
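
The control flow of this interceptor is a loop that keeps calling the rest of the chain until a response comes back or the failure is judged unrecoverable. The Java sketch below shows only that loop shape. It is a deliberate simplification: `RetrySketch` and `Attempt` are hypothetical, and a fixed retry budget replaces the real recover decision, which depends on the exception and the client configuration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical retry loop; a fixed budget stands in for the real recover() decision. */
final class RetrySketch {
    /** Hypothetical stand-in for running the rest of the interceptor chain once. */
    interface Attempt {
        String run(int previousFailures) throws IOException;
    }

    static String executeWithRetry(Attempt attempt, int maxRetries) {
        int failures = 0;
        while (true) {
            try {
                return attempt.run(failures);
            } catch (IOException e) {
                failures++;
                // Give up once the budget is used; otherwise loop and try again.
                if (failures > maxRetries) throw new UncheckedIOException(e);
            }
        }
    }
}
```

An attempt that fails twice and then succeeds returns normally with a budget of three retries, while an attempt that always fails surfaces its last exception once the budget is exhausted.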

//This interceptor recovers from failure and follows redirection when necessary. If the call is cancelled, it may throw [IOException].
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    var request = chain.request()
    val realChain = chain as RealInterceptorChain
    //Transmitter is the bridge between OkHttp's application and network layers. It exposes high-level application-layer primitives: connections, requests, responses, and streams.
    val transmitter = realChain.transmitter()
    var followUpCount = 0
    var priorResponse: Response? = null
    while (true) {
        transmitter.prepareToConnect(request)

        if (transmitter.isCanceled) {
            throw IOException("Canceled")
        }

        var response: Response
        var success = false
        try {
            //Pass the request on to the rest of the interceptor chain.
            response = realChain.proceed(request, transmitter, null)
            success = true
        } catch (e: RouteException) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.lastConnectException, transmitter, false, request)) {
                throw e.firstConnectException
            }
            continue
        } catch (e: IOException) {
            // An attempt to communicate with a server failed. The request may have been sent.
            val requestSendStarted = e !is ConnectionShutdownException
            if (!recover(e, transmitter, requestSendStarted, request)) throw e
            continue
        } finally {
            // The network call threw an exception. Release any resources.
            if (!success) {
                transmitter.exchangeDoneDueToException()
            }
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
            response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                           .body(null)
                           .build())
            .build()
        }

        val exchange = response.exchange
        val route = exchange?.connection()?.route()
        //followUpRequest works out which HTTP request to make in response to receiving this response: it adds authentication headers, follows a redirect, or handles a client request timeout. It returns null if no follow-up is necessary or applicable.
        val followUp = followUpRequest(response, route)

        if (followUp == null) {
            if (exchange != null && exchange.isDuplex) {
                transmitter.timeoutEarlyExit()
            }
            return response
        }

        val followUpBody = followUp.body
        //isOneShot returns true if this body expects at most one call to [writeTo] and can be transmitted at most once. It returns false by default.
        if (followUpBody != null && followUpBody.isOneShot()) {
            return response
        }

        response.body?.closeQuietly()
        if (transmitter.hasExchange()) {
            exchange?.detachWithViolence()
        }

        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
    }
}
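The follow-up part of that loop can be sketched on its own. This is a simplified Java illustration under stated assumptions, not the interceptor itself: FollowUpSketch and its redirect table are hypothetical, the limit of 20 is the value OkHttp is believed to use for MAX_FOLLOW_UPS (the excerpt above does not show it), and an unchecked exception stands in for the ProtocolException thrown by the real code.

```java
import java.util.Map;

final class FollowUpSketch {
    // Assumed value; the excerpt above only shows the name MAX_FOLLOW_UPS.
    static final int MAX_FOLLOW_UPS = 20;

    /**
     * Follows entries in a hypothetical redirect table until a URL has no
     * follow-up, mirroring the "followUp == null -> return response" exit.
     */
    static String resolve(Map<String, String> redirects, String url) {
        int followUpCount = 0;
        String current = url;
        while (true) {
            String followUp = redirects.get(current);
            if (followUp == null) {
                return current; // No follow-up needed: this is the final result.
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                // The real interceptor throws ProtocolException here.
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            current = followUp;
        }
    }
}
```

The counter is what protects the client from a redirect loop: without it, two URLs that redirect to each other would keep the while (true) loop running forever.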
BridgeInterceptor.intercept

Bridges application code and network code. First it builds a network request from the user request. Then it proceeds to call the network. Finally it builds a user response from the network response.

//This method mainly handles the message headers, then hands the request to the next interceptor in the chain.
//It sets Content-Type and either Content-Length or Transfer-Encoding for the request body.
//It adds the Host, Connection and User-Agent headers when they are missing, plus any cookies from the CookieJar.
//It requests gzip compression and, if the response is gzipped, decompresses it and strips Content-Encoding and Content-Length.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    //Get a request
    val userRequest = chain.request()
    //Builder used to derive the network request from the user request
    val requestBuilder = userRequest.newBuilder()
    //Request body
    val body = userRequest.body
    //Processing the request header part.
    if (body != null) {
        //The type of content in the body of the request.
        val contentType = body.contentType()
        //Add a request header to indicate the type of request body.
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString())
        }
        //Length of the request body in bytes, or -1 if it is unknown.
        val contentLength = body.contentLength()
        
        if (contentLength != -1L) {
            requestBuilder.header("Content-Length", contentLength.toString())
            //The length is known, so chunked transfer encoding is not needed.
            requestBuilder.removeHeader("Transfer-Encoding")
        } else {
            requestBuilder.header("Transfer-Encoding", "chunked")
            requestBuilder.removeHeader("Content-Length")
        }
    }
    //If the request has no Host header, derive one from the URL.
    if (userRequest.header("Host") == null) {
        requestBuilder.header("Host", userRequest.url.toHostHeader())
    }
    //Default to a persistent (keep-alive) connection.
    if (userRequest.header("Connection") == null) {
        requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
        transparentGzip = true
        requestBuilder.header("Accept-Encoding", "gzip")
    }
	//Add Cookie
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
        requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
        requestBuilder.header("User-Agent", userAgent)
    }
    //Pass the network request on to the rest of the interceptor chain.
    val networkResponse = chain.proceed(requestBuilder.build())
    //Hand any cookies set by the response to the cookie jar.
    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
    //Attach the original user request, so the application layer sees the request it made alongside the response it got.
    val responseBuilder = networkResponse.newBuilder()
    .request(userRequest)
    //Decompress the response body.
    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
        val responseBody = networkResponse.body
        //Content-Encoding and Content-Length describe the compressed bytes, so both are removed. The decompressed body keeps its content type and gets a length of -1 (unknown).
        if (responseBody != null) {
            val gzipSource = GzipSource(responseBody.source())
            val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
            responseBuilder.headers(strippedHeaders)
            val contentType = networkResponse.header("Content-Type")
            responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
        }
    }

    return responseBuilder.build()
}
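The transparent gzip handling at the end of that method can be illustrated with the JDK alone. This is a rough Java sketch under stated assumptions: TransparentGzipSketch and its methods are hypothetical, headers are modelled as a plain case-sensitive map (real HTTP header names are case-insensitive), and the whole body is held in memory rather than streamed as OkHttp does with GzipSource.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class TransparentGzipSketch {
    /** Plays the role of the server: compresses a text body with gzip. */
    static byte[] gzip(String text) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(text.getBytes(StandardCharsets.UTF_8));
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Decompresses the body if the response is gzipped and strips the two
     * headers that only describe the compressed bytes.
     */
    static String decodeBody(Map<String, String> headers, byte[] body) {
        if (!"gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
            return new String(body, StandardCharsets.UTF_8);
        }
        headers.remove("Content-Encoding");
        headers.remove("Content-Length");
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Removing Content-Length matters because the value the server sent counts compressed bytes; a caller that trusted it after decompression would read the wrong amount of data.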
CacheInterceptor.intercept

Returns the cached response when a usable one exists;

updates the cache when the server returns new or revalidated content;

removes cache entries that the request has invalidated.

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    //Check to see if there is a response cache for this Request.
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    //Compute the cache strategy: decide whether [request] is satisfied by the network, by [cacheResponse], or by both.
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)

    if (cacheCandidate != null && cacheResponse == null) {
        // The cache candidate wasn't applicable. Close it.
        cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
        return Response.Builder()
        .request(chain.request())
        //HTTP1.1
        .protocol(Protocol.HTTP_1_1)
        //504
        .code(HTTP_GATEWAY_TIMEOUT)
        .message("Unsatisfiable Request (only-if-cached)")
        //Empty response body
        .body(EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    }

    // If we don't need the network, we're done: return the cached response.
    if (networkRequest == null) {
        return cacheResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build()
    }

    var networkResponse: Response? = null
    try {
        //Pass the network request on to the rest of the interceptor chain.
        networkResponse = chain.proceed(networkRequest)
    } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
            cacheCandidate.body?.closeQuietly()
        }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
        //304 Not Modified: the resource is unchanged, so the cached response can be used.
        if (networkResponse?.code == HTTP_NOT_MODIFIED) {
            val response = cacheResponse.newBuilder()
            //Merge the cached and network response headers
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            //Time the request was sent
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            //Time the response was received
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            //Cached response, without its body
            .cacheResponse(stripBody(cacheResponse))
            //Network response, without its body
            .networkResponse(stripBody(networkResponse))
            .build()
            //Close the network response body
            networkResponse.body!!.close()

            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache!!.trackConditionalCacheHit()
            cache.update(cacheResponse, response)
            return response
        } else { //The cached response is stale, so close its body.
            cacheResponse.body?.closeQuietly()
        }
    }
    //Build the final response from the network response.
    val response = networkResponse!!.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build()
    //cache is null unless the client was configured with one.
    if (cache != null) {
        if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            val cacheRequest = cache.put(response)
            return cacheWritingResponse(cacheRequest, response)
        }

        if (HttpMethod.invalidatesCache(networkRequest.method)) {
            try {
                cache.remove(networkRequest)
            } catch (_: IOException) {
                // The cache cannot be written.
            }
        }
    }

    return response
}
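The branches of that method reduce to a small decision table. The Java sketch below is only an illustration of the control flow: CacheDecisionSketch, its parameters and its string results are hypothetical, with networkAllowed standing for strategy.networkRequest != null and hasCached for strategy.cacheResponse != null.

```java
final class CacheDecisionSketch {
    /**
     * Returns a label for the branch taken. networkCode is the status code the
     * network returned; it is ignored when the network is not used.
     */
    static String decide(boolean networkAllowed, boolean hasCached, int networkCode) {
        if (!networkAllowed && !hasCached) {
            // Forbidden from using the network and nothing usable in the cache.
            return "504 Unsatisfiable Request (only-if-cached)";
        }
        if (!networkAllowed) {
            // The cache alone satisfies the request.
            return "cached response";
        }
        if (hasCached && networkCode == 304) {
            // Conditional GET: the server confirmed the cached copy is current.
            return "cached response with merged headers";
        }
        // Either there was no cached copy or it turned out to be stale.
        return "network response";
    }
}
```

Only the last branch can write to the cache, which matches the real code: the cache.put() call sits after the point where both cache-only paths and the 304 path have already returned.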
ConnectInterceptor.intercept

Opens a connection to the target server and proceeds to the next interceptor.

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    //Key step: find a usable connection and create the Exchange (backed by an ExchangeCodec) that will carry this request.
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
}
CallServerInterceptor.intercept
  1. Write the request headers
  2. Write the request body, unless the method is GET or HEAD
  3. Finish the request
  4. Read the response
//This is the last interceptor in the chain. It makes the network call to the server.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    //Get the interceptor chain.
    val realChain = chain as RealInterceptorChain
    //An Exchange transmits a single HTTP request and response pair. It layers connection management and events on top of [ExchangeCodec], which handles the actual I/O.
    val exchange = realChain.exchange()
    //Get the request to be sent
    val request = realChain.request()
    //Get the body of the request to be sent
    val requestBody = request.body
    //Record the current system time as the time the request was sent
    val sentRequestMillis = System.currentTimeMillis()
	//Write the request header for the current request
    exchange.writeRequestHeaders(request)

    var responseHeadersStarted = false
    var responseBuilder: Response.Builder? = null
    //Write the request body if there is one and the method is neither GET nor HEAD
    //fun permitsRequestBody(method: String): Boolean = !(method == "GET" || method == "HEAD")
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
        // Continue" response before transmitting the request body. If we don't get that, return
        // what we did get (such as a 4xx response) without ever transmitting the request body.
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
            //Flush the request headers to the server. This ends up calling flush() on the underlying BufferedSink.
            exchange.flushRequest()
            responseHeadersStarted = true
            //Notify the EventListener that reading the response headers is about to start.
            exchange.responseHeadersStart()
            //Read the response header.
            responseBuilder = exchange.readResponseHeaders(true)
        }
        if (responseBuilder == null) {
            if (requestBody.isDuplex()) {//This method returns false by default
                // Prepare a duplex body so that the application can send a request body later.
                exchange.flushRequest()
                val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
                requestBody.writeTo(bufferedRequestBody)
            } else {
                // Write the request body if the "Expect: 100-continue" expectation was met.
                val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
                requestBody.writeTo(bufferedRequestBody)
                bufferedRequestBody.close()
            }
        } else {
            exchange.noRequestBody()
            if (!exchange.connection()!!.isMultiplexed) {
                // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
                // from being reused. Otherwise we're still obligated to transmit the request body to
                // leave the connection in a consistent state.
                exchange.noNewExchangesOnConnection()
            }
        }
    } else { //GET or HEAD request, or no request body: there is nothing to write
        exchange.noRequestBody()
    }
    //requestBody.isDuplex() returns false by default, so finishRequest() normally runs here.
    if (requestBody == null || !requestBody.isDuplex()) {
        //Signal that the request has been fully written
        exchange.finishRequest()
    }
    //responseHeadersStarted is true only when the request carried an Expect: 100-continue header; otherwise it is still false here.
    if (!responseHeadersStarted) {
        exchange.responseHeadersStart()
    }
    //Read the response headers if they have not been read yet
    if (responseBuilder == null) {
        responseBuilder = exchange.readResponseHeaders(false)!!
    }
    //Build the response
    var response = responseBuilder
    .request(request)
    .handshake(exchange.connection()!!.handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build()
    var code = response.code
    if (code == 100) {
        // server sent a 100-continue even though we did not request one.
        // try again to read the actual response
        response = exchange.readResponseHeaders(false)!!
        .request(request)
        .handshake(exchange.connection()!!.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
        code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
        // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
        response.newBuilder()
        .body(EMPTY_RESPONSE)
        .build()
    } else {
        response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
        exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
        throw ProtocolException(
            "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
}
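The first three steps listed for this interceptor can be pictured as plain HTTP/1.1 text on the wire. The Java sketch below is a deliberately simplified illustration, not what ExchangeCodec does internally: CallServerSketch and encode are hypothetical, only the Host and Content-Length headers are emitted, and HTTP/2 framing is ignored. The permitsBody check mirrors the permitsRequestBody rule quoted in the comments above.

```java
import java.nio.charset.StandardCharsets;

final class CallServerSketch {
    /** Serializes a request as HTTP/1.1 text: request line, headers, blank line, optional body. */
    static String encode(String method, String path, String host, String body) {
        // Same rule as the permitsRequestBody comment: GET and HEAD carry no body.
        boolean permitsBody = !(method.equals("GET") || method.equals("HEAD"));
        boolean writeBody = permitsBody && body != null;

        StringBuilder wire = new StringBuilder();
        // Step 1: write the request line and headers.
        wire.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        wire.append("Host: ").append(host).append("\r\n");
        if (writeBody) {
            int length = body.getBytes(StandardCharsets.UTF_8).length;
            wire.append("Content-Length: ").append(length).append("\r\n");
        }
        wire.append("\r\n");
        // Step 2: write the body only when the method permits one.
        if (writeBody) {
            wire.append(body);
        }
        // Step 3: the request is finished; the caller would now read the response.
        return wire.toString();
    }
}
```

With this sketch a body passed alongside a GET is silently dropped, which corresponds to the else branch in the real code that calls exchange.noRequestBody().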

Keywords: network OkHttp encoding DNS

Added by imstupid on Mon, 12 Aug 2019 12:32:45 +0300