Sorting out Kotlin coroutine knowledge points

preface

The term "coroutine" was coined in 1958, when coroutines were invented and used to build assemblers, which shows that coroutines are a programming idea and are not tied to a specific language. I have simply worked through the documentation and recorded some notes. A few less commonly used things have not been studied yet; I will add them later. The article is a little long and may be a little dry, so first, here is my favorite panda picture.

Supplement 1: why does the task manager show so many processes on my 8-core, 16-thread processor?

Take the AMD 3700X, an 8-core, 16-thread processor, as an example. Strictly speaking it has only 8 cores, so at any instant only a limited number of threads can truly run at the same time. Yet whether you look at processes or threads, the task manager shows far more than that. Why? Because modern hardware is so fast that context switching is very quick, which gives the macroscopic impression that many programs are running simultaneously; that is why the task manager shows so many processes and process trees.

Supplement 2: parallelism and concurrency

Concurrency: several programs run on the same CPU over a period of time, but at any single point in time only one of them is actually executing on that CPU.
Parallelism: when the machine has multiple CPUs, threads need not compete for the same CPU. While one CPU executes one thread, another CPU can execute another thread; the two do not preempt each other's CPU resources and can run at the same time. This is called parallelism.

Concurrency means having the ability to handle multiple tasks, though not necessarily at the same instant. Parallelism means having the ability to handle multiple tasks at the same instant, so the emphasis is on "at the same time".

Supplement 3: processes and threads

A process is the smallest unit of resource allocation; a thread is the smallest unit of program execution.
The operating system has its own set of process management mechanisms and distinguishes between kernel mode and user mode. Have you noticed that some processes in the task manager cannot be killed, or that killing them leads straight to a black screen? So, as a user, keep an eye on your own user-level threads and leave the system's processes alone unless there is a good reason.

Supplement 4: threads are already efficient, so why do we need coroutines?

A coroutine is a lightweight, user-mode "thread". Just as a thread occupies less memory than a process and costs less to switch, a coroutine is cheaper still. In essence coroutines run on top of ordinary threads, so they bring no particular gain in CPU-intensive applications, but hardware performance keeps improving and coroutines perform well in most applications.

Important coroutine concepts

CoroutineScope

A CoroutineScope wraps a CoroutineContext and is the scope of a coroutine. A coroutine cannot be used without its scope, and all coroutine builders are declared as extensions on it.

Constructor

ContextScope(context: CoroutineContext)

Examples

//MainScope is a factory function
private val mainScope = MainScope()

CoroutineContext

CoroutineContext is the coroutine context. A coroutine context is a collection of various elements, and its main element is the coroutine's Job. Sometimes we need to combine multiple elements in a coroutine's context; we can do that with the + operator (it looks like an operator, but it is actually the operator function plus 😏).

Example:

//Specify scheduler and name
launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

//The job object is created and the scheduler is specified
ContextScope(SupervisorJob() + Dispatchers.Main)

//Empty coroutine context
EmptyCoroutineContext

CoroutineDispatcher

Coroutine dispatcher. All coroutine builders such as launch and async accept an optional CoroutineContext parameter, which can be used to explicitly specify a dispatcher or other context elements for the new coroutine. If none is specified, the context (and therefore the dispatcher) is inherited from the CoroutineScope that started it.

The following dispatchers are commonly used when working with coroutines inside a scope

  • Dispatchers.Default
  • Dispatchers.Main
  • Dispatchers.Unconfined
  • Dispatchers.IO

Examples

//An ExecutorCoroutineDispatcher is used here
launch(newSingleThreadContext("MyOwnThread")) { // Will get its own new thread
    println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}

withContext(Dispatchers.IO){
    
}

closure

Closures have been supported since Java 8. Kotlin has a piece of syntactic sugar: when the last parameter of a function is a lambda expression, the lambda can be written outside the parentheses.
Examples

//Create a Thread, which is generally written like this
Thread(object : Runnable{
    override fun run() {
        
    }
})

//If the conditions are met, it is transformed into lambda
Thread({    })

//Simplify it to
Thread {    }

In a structure like Thread {...}, {} is a closure.

Threads in Java

  1. If you want to know when a thread finishes executing, use FutureTask;
  2. If you want a thread to give up the CPU, enter a blocked state and wait for another thread to finish executing, use the join method;
  3. To manage multiple threads, use the Executor thread pool framework;
  4. For inter-thread communication Android encapsulates Handler; there is also the now-deprecated AsyncTask.

For more information about threads and locks in Java, see the section on Java thread and lock basics.

What is a coroutine?

First of all, a coroutine is an object, built from functions modified by the suspend keyword; take a look at the source code of withContext and similar functions.
A function modified with the suspend keyword can only be called inside a coroutine scope or from another suspend function. One of the most powerful aspects of coroutines is "non-blocking" suspension, including the automatic dispatching of threads, which we will get to later. For now, just understand that this object is a function.
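
As a minimal sketch (loadUserName is a made-up name used only for illustration), a suspend function looks like this; it can only be called from a coroutine or from another suspend function:

import kotlinx.coroutines.delay

suspend fun loadUserName(): String {
    delay(1000L) //suspends the coroutine here; the underlying thread is free to do other work
    return "Tom"
}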

What is suspension?

When the code block enclosed in a launch, async or withContext function reaches a suspend function, the coroutine is "suspended". In other words, it detaches from the thread that is executing it, and it does not block that thread.

//doSomeThing and doOtherThing are suspend functions that carry out time-consuming operations internally
coroutineScope.launch { //The current thread name is main
    launch { doSomeThing() }  //👈  Executing doSomeThing does not block the main thread
    launch { doOtherThing() } //👈  Executing doOtherThing does not block the main thread
    doXXThing() //Executes normally
}
  • Suspension cannot be discussed apart from multithreading.
  • What gets suspended is the coroutine.
  • It is suspended away from the thread that is currently executing it.
  • After suspension it resumes later, and suspending does not block the current thread.

What suspension is: a thread-dispatching operation in which the coroutine leaves the current thread and is automatically switched back later.

Non-blocking suspension

"Non blocking hang" is a very powerful part of the collaboration process. Blocking is for a single thread, because hanging involves multiple threads. Once the thread is cut, it must be non blocking. After the thread is cut, the original thread is free and there are no obstacles. You can continue to do other things.

Code is essentially blocking; only time-consuming code produces waiting that humans can perceive. A network request, for example, is mostly "waiting": no matter how many threads you switch to, you cannot reduce the time it takes, and that has nothing to do with coroutines. Coroutines simply switch threads for us, so the result looks like asynchronous code written in a synchronous style.
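
To illustrate non-blocking suspension, here is a minimal sketch (fetchDataAsync is a made-up callback-based API, not part of any library) that wraps a callback into a suspend function with suspendCancellableCoroutine: while the coroutine is suspended the calling thread is free, and resume() switches the coroutine back later.

import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

//A made-up callback-based API, used only for illustration
fun fetchDataAsync(callback: (String) -> Unit) { /* ... */ }

//Wrapping the callback as a suspend function: the caller suspends without blocking its thread
suspend fun fetchData(): String = suspendCancellableCoroutine { cont ->
    fetchDataAsync { result -> cont.resume(result) }
}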

preparation

Add the following to the project-level build.gradle:

buildscript {
    ext.kotlin_version = "1.4.31"
     dependencies {
         classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
     }
}

Add the following to the module-level build.gradle:

dependencies {
    // Coroutine core library
    api "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"
    api "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"
}

Starting a coroutine

To use a coroutine, there must be a scope, and the scope must have a context.

Top-level functions, usable in any class

A top-level function is similar to a global static function in Java.

/**
* Suited to unit-test scenarios; not applicable in business development because it blocks the thread
**/
runBlocking {
    //todo
}

Using the global GlobalScope

/**
* It does not block the thread, but it is not recommended: the lifecycle of this scope matches the application's and it cannot be actively cancelled
**/
GlobalScope.launch { // Start a new coroutine in the background and continue
    //delay is a special suspend function; it does not block the thread but suspends the coroutine, and it can only be used inside a coroutine
    delay(1000L) // Non blocking wait for 1 second (default time unit is milliseconds)
    print("World!") // Print out after delay
}
print("Hello,") // The main thread continues while the coroutine is waiting
Thread.sleep(2000L) // Block the main thread for 2 seconds to keep the JVM alive
println("My Name is Tom!") 

The print result of this example is: Hello,World!My Name is Tom!

Some official architecture components encapsulate coroutine support

Just a brief mention here: for example, viewModelScope in ViewModel, and lifecycle.coroutineScope in Activity and Fragment pages.

Create a CoroutineScope yourself

val coroutineScope = CoroutineScope(SupervisorJob()+Dispatchers.Main)
coroutineScope.launch {
    //Do work on the main thread
    withContext(Dispatchers.IO) {
        //Switch to an IO thread
    }
    //Automatically switches back to the main thread after the block completes
}

Common coroutine builder functions

Within a scope, the commonly used builders are async, launch, withContext and actor.

Job

Job is an interface and a central concept of coroutines.

  • Some properties of Job: isActive, isCompleted, isCancelled and children.
  • Some methods of Job: start, cancel, join, invokeOnCompletion, plus (see the sketch below).
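
A minimal sketch of these members (jobDemo is a made-up helper; the scope is passed in):

import kotlinx.coroutines.*

fun jobDemo(coroutineScope: CoroutineScope) {
    val job: Job = coroutineScope.launch {
        delay(5000L)
    }
    println(job.isActive) //true while the coroutine is still running
    job.invokeOnCompletion { cause ->
        //cause is null on normal completion, or a CancellationException if the job was cancelled
    }
    job.cancel() //cancel the coroutine
}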

Deferred

Inherited from Job.

  • Some properties of Deferred: onAwait
  • Some methods of Deferred: await, getCompleted, getCompletionExceptionOrNull

Basic usage of async

Common concurrency

val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")

Lazy start async

//With CoroutineStart.LAZY, the coroutine only starts when start() is called or when its result is awaited
//The default mode is CoroutineStart.DEFAULT
val one = async(start = CoroutineStart.LAZY) {
    
}
one.start()


val one = async { doSomethingUsefulOne() }
one.invokeOnCompletion {
    //If the start mode was LAZY and start() was never called, this callback never fires, which proves the coroutine was never started
}

Characteristics

  • async propagates exceptions: an unhandled exception inside async {...} cancels all the coroutines started in the same scope. It is scope based: whether it is the waiting parent coroutine or a sibling child coroutine, it is affected as long as it is in the same scope.
  • The coroutine returned by async is richer than the one returned by launch: it implements the Deferred interface (Deferred inherits from Job). Deferred means a delayed value.
  • To handle exceptions you can pass in a custom CoroutineExceptionHandler when building the coroutine, and wrap the code with try/catch or runCatching; see the sketch below.
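
A minimal sketch of both approaches (exceptionDemo is a made-up helper, and the coroutineScope created earlier is assumed): a CoroutineExceptionHandler receives uncaught exceptions of the root launch, while runCatching (or try/catch) around await() handles an async failure; supervisorScope is used here so the failing async does not cancel its siblings.

import kotlinx.coroutines.*

fun exceptionDemo(coroutineScope: CoroutineScope) {
    val handler = CoroutineExceptionHandler { _, throwable ->
        println("Caught $throwable") //uncaught exceptions from the launch below end up here
    }
    coroutineScope.launch(handler) {
        supervisorScope {
            val deferred = async { error("boom") } //throws inside async
            runCatching { deferred.await() }       //the exception is rethrown at await()
                .onFailure { println("await failed: $it") }
        }
    }
}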

Basic usage of launch

Much of its usage mirrors async.

One slightly awkward point is that the code gets untidy when you need to switch threads frequently:

//To eliminate this kind of nesting in coroutine code, use withContext
coroutineScope.launch(Dispatchers.IO) {
    ...
    launch(Dispatchers.Main) {
        ...
        launch(Dispatchers.IO) {
            ...
            launch(Dispatchers.Main) {
                ...
            }
        }
    }
}

Characteristics

  • With no start option specified, it starts executing immediately, so there is no need to call Job#start() or join() manually.
  • By default an uncaught exception in the coroutine cancels the parent job in the context (unless a CoroutineExceptionHandler is explicitly specified), which means that when launch is used with the context of another coroutine, any uncaught exception causes the parent coroutine to be cancelled.

Supplement 1: CoroutineStart, the coroutine start option

  • DEFAULT: immediately schedules the coroutine for execution according to its context;
  • LAZY: starts the coroutine lazily, only when it is needed;
  • ATOMIC: atomically (in a non-cancellable way) schedules the coroutine for execution according to its context;
  • UNDISPATCHED: executes the coroutine immediately in the current thread until its first suspension point (see the sketch below).
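
A minimal sketch of the less obvious options (startOptionsDemo is a made-up helper; a scope is assumed to be available): LAZY waits for start()/join()/await(), while UNDISPATCHED begins in the caller's thread and only moves onto its dispatcher after the first suspension point.

import kotlinx.coroutines.*

fun startOptionsDemo(scope: CoroutineScope) {
    val lazyJob = scope.launch(start = CoroutineStart.LAZY) {
        println("runs only after start() or join() is called")
    }
    lazyJob.start()

    scope.launch(Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
        println("before delay: ${Thread.currentThread().name}") //still the caller's thread
        delay(10L)
        println("after delay: ${Thread.currentThread().name}")  //now a Dispatchers.IO thread
    }
}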

Basic usage of withContext

It eliminates the nesting that arises when nested launch calls are used to switch threads inside a coroutine.

coroutineScope.launch(Dispatchers.Main) {
    ...
    withContext(Dispatchers.IO) {
        ...
    }
    ...
    withContext(Dispatchers.IO) {
        ...
    }
    ...
}

Characteristics

  • It uses the dispatcher of the given context. If a different dispatcher is specified, execution of the block moves to that dispatcher's threads and switches back to the original dispatcher when the block completes.

Example analysis 1

//doSomeThing and doOtherThing are suspend functions that perform time-consuming operations
coroutineScope.launch(Dispatchers.Main) {
    Log.e("launch", "I'm working in thread ${Thread.currentThread().name}")
    doSomeThing()
    doOtherThing()
    Log.e("finally", "come here")
}

coroutineScope.launch(Dispatchers.Main) {
    Log.e("launch", "I'm working in thread ${Thread.currentThread().name}")
    withContext(Dispatchers.IO) {
        doSomeThing()
    }
    withContext(Dispatchers.IO) {
        doOtherThing()
    }
    Log.e("finally", "come here")
}

coroutineScope.launch(Dispatchers.Main) {
    Log.e("launch", "I'm working in thread ${Thread.currentThread().name}")
    withContext(Dispatchers.IO) {
        doSomeThing()
    }
    doOtherThing()
    Log.e("finally", "come here")
}

The print result is

launch: I'm working in thread main
doSomeThing: I'm working in thread DefaultDispatcher-worker-1
doOtherThing: I'm working in thread main
finally: come here

Result analysis: only after doSomeThing returns can doOtherThing start executing, and only after doOtherThing returns is "finally: come here" printed.

launch starts a coroutine that runs on the main thread, and the ordering is the same whether you call the functions directly or wrap them with withContext. It looks like asynchronous code written in a synchronous style.

Example analysis 2

Change it a little

coroutineScope.launch(Dispatchers.Main) {
    Log.e("launch", "I'm working in thread ${Thread.currentThread().name}")
    launch { doSomeThing() }
    launch { doOtherThing() }
    Log.e("finally", "come here")
}

The print result is

launch: I'm working in thread main
finally: come here
doSomeThing: I'm working in thread main
doOtherThing: I'm working in thread main

The result shows that doSomeThing and doOtherThing are executed asynchronously, which is completely different from the sequential behaviour in example analysis 1.

Official architecture components' support for coroutines

Android provides first-class coroutine scope support in components that have a lifecycle, such as ViewModel, Lifecycle, LiveData, Room, etc.

We know that a scope is tied to a lifecycle. Here are some examples of using coroutines in different scenarios.

  • In ViewModel

A dedicated scope, viewModelScope, is used. Its context is:

//+ is the operator function plus on the CoroutineContext interface
SupervisorJob() + Dispatchers.Main.immediate

With a scope, you can use a coroutine.

viewModelScope.launch {
   
}

Analysis: the implementation of viewModelScope is very simple: a SupervisorJob combined with Dispatchers.Main.immediate. When the ViewModel's clear() runs, the scope's job.cancel() is called, and because of structured concurrency all coroutines started within viewModelScope are cancelled in cascade.
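
A minimal sketch of that cascading cancellation (MyViewModel is a made-up class): every coroutine launched in viewModelScope is a child of its SupervisorJob, so when the ViewModel is cleared they are all cancelled.

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.*

class MyViewModel : ViewModel() {
    init {
        viewModelScope.launch {
            while (isActive) { //cooperative cancellation check
                delay(1000L)
                //poll or refresh something here
            }
        } //cancelled automatically when the ViewModel is cleared
    }
}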

  • In Activity and Fragment

It is not recommended to use GlobalScope to start coroutines: its lifecycle matches the app's and it cannot be cancelled.
It is recommended to use viewLifecycleOwner.lifecycleScope or lifecycle.coroutineScope instead.

lifecycle.coroutineScope.launch {}
lifecycle.coroutineScope.launchWhenCreated {  }
lifecycle.coroutineScope.launchWhenResumed {  }
lifecycle.coroutineScope.launchWhenStarted {  }

viewLifecycleOwner.lifecycleScope.launch {}
viewLifecycleOwner.lifecycleScope.launchWhenCreated {  }
viewLifecycleOwner.lifecycleScope.launchWhenResumed {  }
viewLifecycleOwner.lifecycleScope.launchWhenStarted {  }

In an Activity or Fragment we sometimes need to wait until a certain lifecycle state is reached, or at least until after a certain lifecycle method, before executing a task. For example, the page state must be at least STARTED before a FragmentTransaction can be executed. For this need the Lifecycle library provides coroutine support: launchWhenCreated, launchWhenStarted and launchWhenResumed. When the page state does not yet satisfy the requirement, the coroutine is suspended.

Note: the coroutine is not restarted when the lifecycle method is called again. For example, jump from Activity A to Activity B and then return to A; A's lifecycle goes onRestart -> onStart -> onResume, and at that point the block inside launchWhenStarted is not invoked again.

One more point: we mentioned above that some resources can be released inside the ViewModel's onCleared method. In an Activity or Fragment this could be done in onDestroy, but since we know the page will be destroyed and the coroutine will be cancelled automatically, part of the cleanup can be written inside the coroutine itself. For example:

 lifecycle.coroutineScope.launchWhenResumed {
     try {
         //do something
         //maybe a suspend function
         delay(10000L) //Simulated time-consuming work
     } finally {
         //If the page is destroyed before the work in the coroutine finishes, execution still reaches here
         //It also reaches here on normal completion, in which case the state is generally not DESTROYED
         if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
             //do something
         }
     }
 }
  • In LiveData

val user: LiveData<User> = liveData (timeoutInMs = 5000) {
    val data = database.loadUser() // loadUser is a suspend function
    emit(data)
}

Here, liveData is a builder function. In the builder block it calls the suspend function loadUser() and then emits the loaded data through emit.
Note that the loading block only actually runs when the LiveData becomes active (i.e. someone observes it). When it becomes inactive and stays idle for timeoutInMs milliseconds, the block is automatically cancelled. If it is cancelled before completing, it restarts when the LiveData becomes active again; if it completed successfully in a previous run, it does not restart. Also note that it only restarts after that automatic cancellation: if the block is cancelled for any other reason (for example, a CancellationException is thrown), it does not restart.

At any time, if the values you want to emit come from another LiveData, you can use emitSource(source: LiveData&lt;T&gt;) to emit multiple values from that source. Calling emit() or emitSource() removes the previously added source.

@Dao
interface UserDao {
    @Query("SELECT * FROM User WHERE id = :id")
    fun getUser(id: String): LiveData<User>
}
 
class MyRepository {
    fun getUser(id: String) = liveData<User> {
        val disposable = emitSource( // The database is used here as the source
            userDao.getUser(id).map {
                Result.loading(it)
            }
        )
        try {
            val user = webservice.fetchUser(id)
            // Stop the previous emission to avoid dispatching the updated user
            // as `loading`.
            disposable.dispose()
            // Update the database.
            userDao.insert(user)
            // Re-establish the emission with success type.
            emitSource( // Reuse the database as the source, because it was disposed before
                userDao.getUser(id).map {
                    Result.success(it)
                }
            )
        } catch(exception: IOException) {
            // Any call to `emit` disposes the previous one automatically so we don't
            // need to dispose it here as we didn't get an updated value.
            emitSource(
                userDao.getUser(id).map {
                    Result.error(exception, it)
                }
            )
        }
    }
}
  • In Room

Room supports coroutines since version 2.1; a usage sketch follows the DAO definition below.

@Dao
interface UsersDao {
 @Query("SELECT * FROM users")
 suspend fun getUsers(): List<User>
 
 @Query("UPDATE users SET age = age + 1 WHERE userId = :userId")
 suspend fun incrementUserAge(userId: String)
 
 @Insert
 suspend fun insertUser(user: User)
 
 @Update
 suspend fun updateUser(user: User)
 
 @Delete
 suspend fun deleteUser(user: User)
}
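
A minimal usage sketch (UsersViewModel is a made-up class): suspend DAO methods are main-safe because Room runs them on its own background executor, so they can be called directly from viewModelScope.

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch

class UsersViewModel(private val usersDao: UsersDao) : ViewModel() {
    fun addUser(user: User) {
        viewModelScope.launch {
            usersDao.insertUser(user)       //runs on Room's background executor
            val users = usersDao.getUsers() //also main-safe
            //update UI state with users here
        }
    }
}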

Notes on using Flow

Recall that a coroutine scope offers both launch and async; the former returns a Job and the latter a Deferred. launch surfaces an uncaught exception immediately through its job, while async holds the exception and rethrows it when await() is called. Either way, if an exception thrown by a coroutine in the scope is not handled, the whole scope's coroutines are cancelled, so it is usually handled with the runCatching function or with try/catch.

Flow interface concept

An asynchronous data stream that emits values sequentially and completes normally or with an exception.
Intermediate operators on a flow (such as map, filter, take, zip, etc.) are functions that are applied to one or more upstream flows and return a downstream flow, to which further operators can be applied.
Intermediate operations do not execute any code in the flow and are not suspending functions themselves. They only set up a chain of operations for future execution and return quickly. This is known as the cold-flow property.
Terminal operators on a flow are either suspending functions such as collect, single, reduce, toList, etc., or the launchIn operator, which starts collection of the flow in the given scope. They are applied to the upstream flow and trigger execution of all the operations. Execution of a flow is also called collecting the flow, and it always happens in a suspending way, without actual blocking. A terminal operator completes normally or exceptionally depending on the success or failure of all the flow operations upstream.
The most basic terminal operator is collect, for example:

try {
    flow.collect { value ->
        println("Received $value")
    }
} catch (e: Exception) {
    println("The flow has thrown an exception: $e")
}

By default a flow is sequential, and all flow operations execute sequentially in the same coroutine, except for a few operators specifically designed to introduce concurrency into flow execution (such as buffer and flatMapMerge).
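
A minimal sketch of such a chain: the intermediate operators filter and map only build the pipeline, and nothing executes until the terminal operator collect is called.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .filter { it % 2 == 1 }  //intermediate: keep odd numbers
        .map { it * it }         //intermediate: square them
        .collect { println(it) } //terminal: prints 1, 9, 25
}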

Usage scenario

Seeing these operators, RxJava is the first thing that comes to mind. What about Flow's usage scenarios? They feel much the same.

  • If the data source keeps emitting values, Flow is convenient to use.
  • If you need to process the data source and then return the results you need, Flow also works well.
//Send a new weather value every 2 seconds
suspend fun fetchWeatherFlow(): Flow<String> = flow {
    var counter = 0
    while(true) {
        counter++
        delay(2000)
        emit(weatherConditions[counter % weatherConditions.size])
    }
}

//Call the method inside a coroutine scope
fetchWeatherFlow().collect {
   // The values finally arrive here in collect. The sample omits exception handling, but normally you would add it:
   // runCatching { /* code that may throw */ }.onSuccess { /* handle */ }.onFailure { /* handle */ }
}

Synchronization between coroutines

Synchronizing coroutines requires a Mutex

val mutex = Mutex()

suspend fun doSomething(i: Int) {
    mutex.withLock {
        println("#$i enter critical section.")

        // do something
        delay(1000) // <- The 'delay' suspension point is inside a critical section

        println("#$i exit critical section.")
    }
}

fun main() = runBlocking {
    repeat(2) { i ->
        launch(Dispatchers.Default) {
            println("#$i thread name: ${Thread.currentThread().name}")
            doSomething(i)
        }
    }
}

Tip: don't assume that adding the @Synchronized annotation to a suspend function will help; it cannot synchronize across suspension at all. If you don't believe it, try it yourself.

Thanks

Fang Zhuo's Kotlin Coroutine series

kotlin language Chinese station

Kaixue.io (码上开学) by hencoder

Android KTX official documents
