Kotlin | Coroutine manual (continuously updated)

Coroutines are a core part of Kotlin, and the cost of starting a coroutine is low. I worked through the demos below against the examples on the official site (the Kotlin Chinese website).

Flow in particular is worth spending extra time on; it is quite interesting.

Starting a coroutine

fun main() {
    GlobalScope.launch {
        println(123)
    }
    Thread.sleep(10) // keep the main thread alive long enough for the coroutine to run
}

Wait for the coroutine to finish before executing the code that follows (join)

    GlobalScope.launch {
        delay(1000)
        println(123)

    }.join()
    println(1231)

delay

A special suspending function: it does not block the thread, it only suspends the coroutine

Coroutine scope builders

runBlocking

Blocks the current thread until the coroutine completes. Generally used in tests

runBlocking {
    launch(Dispatchers.IO) {
        Log.e("demo", Thread.currentThread().name)
        delay(10000)
        println("!23")
    }
}

coroutineScope

Only suspends, releasing the underlying thread for other work instead of blocking it. That is why it is a suspending function

coroutineScope {
    launch(Dispatchers.IO) {
        Log.e("demo", Thread.currentThread().name)
        delay(10000)
        println("!23")
    }
}

Structured concurrency

Although coroutines are easy to use, a coroutine started with GlobalScope.launch is a top-level coroutine that is easy to lose track of, yet it is tempting to keep starting coroutines this way. In practice we prefer to launch a coroutine inside the specific scope of the operation being performed, rather than using GlobalScope arbitrarily.
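For example, a minimal sketch (assuming kotlinx.coroutines.* is imported): coroutines launched inside runBlocking are children of its scope, so the program waits for them to finish, and cancelling the scope would cancel them too.

fun main() = runBlocking {
    // both children belong to runBlocking's scope; runBlocking returns only after they finish
    launch {
        delay(200)
        println("child 1 done")
    }
    launch {
        delay(100)
        println("child 2 done")
    }
    println("runBlocking body finished, now waiting for the children")
}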

Cancellation and timeouts

cancelAndJoin

Cancels a coroutine and waits for it to finish

   runBlocking {
        val startTime = System.currentTimeMillis()
        val job = launch(Dispatchers.Default) {
            var nextPrintTime = startTime
            var i = 0
            while (i < 5) { // A loop that performs calculations just to occupy the CPU
                // Print messages twice per second
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            }
        }
        delay(1300L) // Wait for a while
        println("main: I'm tired of waiting!")
        job.cancelAndJoin() // Cancel a job and wait for it to finish
        println("main: Now I can quit.")
    }

withTimeout

Throw an exception after timeout

//Timeout throw exception
withTimeout(1300L) {
    delay(1400)
}

withTimeoutOrNull

Returns null after the timeout instead of throwing an exception

//Returns null on timeout
withTimeoutOrNull(1300L) {
    delay(1400)
}

Release resources in finally

coroutineScope {
    val a = launch {
        try {
            repeat(1000) { i ->
                println("Daily printing")
                delay(500)
            }
        } finally {
            println("Recycling resources")
        }
    }
    delay(1000)	//Delay for some time
    println("Delayed end")
    a.cancelAndJoin()   //Cancel a job and wait for it to finish
}

Suspend the coroutine again in finally

In practice it may be necessary to suspend again inside the finally block of a cancelled coroutine. To do this, wrap the corresponding code in withContext(NonCancellable).

coroutineScope {
        val a = launch {
            try {
                repeat(1000) { i ->
                    println("Daily printing")
                    delay(500)
                }
            } finally {
                withContext(NonCancellable){
                    println("Suspend a canceled collaboration")
                    delay(1000)
                    println("Pending cancellation")
                }
            }
        }
        delay(1000)
        println("Delayed end")
        a.cancelAndJoin()   //Cancel a job and wait for it to finish
    }

Throwing an exception on timeout

Set a timeout; if the block runs longer than expected, an exception is thrown. This can be used for countdowns, etc.

coroutineScope {
    try {
        withTimeout(1000){
            println("Over 2000 ms Fail")
            delay(2000)
        }
    }catch (e:TimeoutCancellationException){
        println(e.message)
        println("Okay, okay, I see")
    }
}
Over 2000 ms Fail
Timed out waiting for 1000 ms
 Okay, okay, I see

Returning null on timeout

In some cases you may not want an exception to be thrown; instead the call can simply return null.

   coroutineScope {
        val time = withTimeoutOrNull(1000) {
            println("Over 2000 ms Fail")
            delay(2000)
        }
        println(time) //null
    }
Over 2000 ms Fail
null

Composing suspending functions

By default, suspending functions are called sequentially.

measureTimeMillis

suspend fun main() {
    val time = measureTimeMillis {
        playGame()
        playPP()
    }
    println("Yes $time ms")
}

suspend fun playGame() {
    delay(1000)
    println("Beat beans for 1 second")
}

suspend fun playPP() {
    delay(1000)
    println("Hit the ass for 1 second")
}
Beat beans for 1 second
 Hit the ass for 1 second
 Took 2031 ms

async

Concurrent execution

In the example above the calls run sequentially, but in real development we often want them to run concurrently. We can do this with async.

Note

Conceptually, async is similar to launch: it starts a separate coroutine, a lightweight thread that works concurrently with all other coroutines. The difference is that launch returns a Job without any result value, while async returns a Deferred, a lightweight non-blocking future that represents a promise to provide a result later. You can call await() on a Deferred to get its eventual result, but a Deferred is also a Job, so it can be cancelled if necessary.

suspend fun main() {
    measureTimeMillis {
        coroutineScope {
            async { playGame() }
            async { playPP() }
        }
    }.let {
        println("Time spent+ $it")
    }
}

suspend fun playGame() {
    delay(1000)
    println("Beat beans for 1 second")
}

suspend fun playPP() {
    delay(1000)
    println("Hit the ass for 1 second")
}
Beat beans for 1 second
 Hit the ass for 1 second
 Time spent+ 1084

Lazy start async

async can be started lazily by setting its start parameter to CoroutineStart.LAZY. In this mode the coroutine runs only when await is called on its result, or when start() is invoked on the returned Job. Note: if await is called directly without calling start() first, the result is sequential execution.

suspend fun main() {
    measureTimeMillis {
        coroutineScope {
            val game = async(start = CoroutineStart.LAZY) { playGame() }
            val pp = async(start = CoroutineStart.LAZY) { playPP() }
            game.start()
            pp.start()
            println("Start complete")
        }
    }.let(::println)
}

suspend fun playGame() {
    delay(1000)
    println("Beat beans for 1 second")
}

suspend fun playPP() {
    delay(1000)
    println("Hit the ass for 1 second")
}

async style functions

We can define async-style functions that call playGame and playPP asynchronously, using the async coroutine builder with an explicit GlobalScope reference.

suspend fun main() {
  measureTimeMillis {
        val somethingPlayGame = somethingPlayGame()
        val somethingPlayPP = somethingPlayPP()
        runBlocking {
            somethingPlayGame.await()
            somethingPlayPP.await()
        }
    }.let(::println)
}
fun somethingPlayGame() = GlobalScope.async {
    playGame()
}

fun somethingPlayPP() = GlobalScope.async {
    playPP()
}

suspend fun playGame() {
    delay(1000)
    println("Beat beans for 1 second")
}

suspend fun playPP() {
    delay(1000)
    println("Hit the ass for 1 second")
}
Hit the ass for 1 second
 Beat beans for 1 second
1085

Note: this style is not recommended. If the logic between val somethingPlayGame = somethingPlayGame() and somethingPlayGame.await() throws an exception, the program fails while somethingPlayPP is still running in the background. In this example the exception brings the whole program down, so the somethingPlayPP operation is terminated as well.

The case is as follows:

...

suspend fun playGame() {
    delay(500)
    throw ArithmeticException("error")
    println("Beat beans for 1 second")
}

...
Exception in thread "main" java.lang.ArithmeticException: error
	at com.xiecheng_demo.java.TestKt.playGame(Test.kt:46)
	....

Correct structured concurrency using async

Exceptions are propagated along the coroutine hierarchy.

suspend fun main() {
    runBlocking {
        try {
            test()
        } catch (e: ArithmeticException) {
            println("main-${e.message}")
        }
    }
}

suspend fun test() = coroutineScope {
    async { playGame() }
    async { playPP() }
}


suspend fun playGame() {
    try {
        delay(1000)
        println("Beat beans for 1 second")
    } finally {
        println("I'm beating peas. In case of any abnormality...")
    }
}

suspend fun playPP() {
    delay(500)
    throw ArithmeticException("Throw exception")
}
I'm beating peas. In case of any abnormality...
main-Throw exception

Note: if one of the child coroutines fails, the first playGame and the waiting parent coroutine are both cancelled.

Coroutine context and dispatchers

A coroutine context is a set of elements; in essence it stores the information associated with a coroutine.

Dispatchers

coroutineContext contains a dispatcher, which determines and restricts the threads the coroutine runs on. A short sketch follows the list below.

They are as follows:

  • Dispatchers.Default: the shared background thread pool, intended for CPU-intensive work
  • Dispatchers.IO: a thread pool intended for offloading blocking IO
  • Dispatchers.Main: the main (UI) thread
  • Dispatchers.Unconfined: not confined to any particular thread; starts running in the current thread
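A small sketch (assuming kotlinx.coroutines.* is imported) that prints which thread each dispatcher runs on; Dispatchers.Main is left out because it requires a UI environment such as Android.

fun main() = runBlocking {
    launch { println("no dispatcher (inherits runBlocking): ${Thread.currentThread().name}") }
    launch(Dispatchers.Default) { println("Default: ${Thread.currentThread().name}") }
    launch(Dispatchers.IO) { println("IO: ${Thread.currentThread().name}") }
    launch(Dispatchers.Unconfined) { println("Unconfined: ${Thread.currentThread().name}") }
}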

Child coroutines

When a coroutine is launched inside the CoroutineScope of another coroutine, it inherits the context via CoroutineScope.coroutineContext and becomes a child of the parent coroutine. When the parent coroutine is cancelled, all of its children are cancelled as well.

However, a coroutine started with GlobalScope.launch is independent of the parent's scope and runs on its own.

    val a = GlobalScope.launch {
        GlobalScope.launch {
            println("use GlobalScope.launch start-up")
            delay(1000)
            println("GlobalScope.launch-Delayed end")
        }

        launch {
            println("use launch start-up")
            delay(1000)
            println("launch-Delayed end")
        }
    }
    delay(500)
    println("Cancel parent launch")
    a.cancel()
    delay(1000)
use GlobalScope.launch start-up
 use launch start-up
 Cancel parent launch
GlobalScope.launch-Delayed end

join

Use join to wait until the coroutine and all of its children complete.

suspend fun main() {
    val a = GlobalScope.launch {
        GlobalScope.launch {
            println("use GlobalScope.launch start-up")
            delay(1000)
            println("GlobalScope.launch-Delayed end")
        }

        launch {
            println("use launch start-up")
            delay(1000)
            println("launch-Delayed end")
        }
    }
   a.join()
}

In the main function above, the coroutine started by GlobalScope.launch begins running immediately and independently. Without join, main might finish instantly and the effect would not be visible. join suspends the coroutine main is running in until the GlobalScope.launch coroutine has completed.

Naming a coroutine

Use the + operator to combine context elements, such as a dispatcher and a CoroutineName.

   GlobalScope.launch(Dispatchers.Default+CoroutineName("test")){
        println(Thread.currentThread().name)
    }.join()

The JVM parameter -Dkotlinx.coroutines.debug is used here so that the coroutine name appears in the thread name.

How to configure JVM parameters: Android Studio, IntelliJ

Coroutine scope

Now that we understand the concepts above, let's combine them: define a CoroutineScope tied to an Activity.

class Main4Activity : AppCompatActivity() {
    private val mainScope = CoroutineScope(Dispatchers.Default)
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main4)
      
        btn_start.setOnClickListener {
            mainScope.launch {
                //After a wave of operation
                launch(Dispatchers.Main) {
                    toast("one")
                }
            }
            Log.e("demo","123")
            mainScope.launch (Dispatchers.Main) {
                delay(3000)
                    toastCenter("two")
            }
        }

        btn_cancel.setOnClickListener {
            onDestrxx()
        }

    }

    fun onDestrxx() {
        mainScope.cancel()
    }
}

We declare a CoroutineScope in the Activity and launch coroutines from it, so that when the Activity is destroyed we can cancel them all at once.

(The code above is abbreviated to keep it short.)

Effect: after clicking btn_start and then btn_cancel, only the first toast pops up; the second, delayed toast never appears.

Thread-local data

Thread-local data can be passed to a coroutine via ThreadLocal.asContextElement.

suspend fun main() {
    val threadLocal = ThreadLocal<String>()
    threadLocal.set("main")
    GlobalScope.launch(Dispatchers.Default+threadLocal.asContextElement(value = "123")) {
        println("thread-${Thread.currentThread().name},value=${threadLocal.get()}")
        println("thread-${Thread.currentThread().name},value=${threadLocal.get()}")
    }.join()
    println("thread-${Thread.currentThread().name},value=${threadLocal.get()}")
    delay(1000)
    println("thread-${Thread.currentThread().name},value=${threadLocal.get()}")
}
thread-DefaultDispatcher-worker-1 @coroutine#1,value=123
thread-DefaultDispatcher-worker-1 @coroutine#1,value=123
thread-main,value=main
thread-kotlinx.coroutines.DefaultExecutor,value=null

You may wonder why threadLocal.get() returns null after the delay. Note that main is marked suspend, so the body of main itself runs as a coroutine. When we call GlobalScope.launch, we pass the ThreadLocal value into its context explicitly with asContextElement, so it is restored inside that coroutine. The coroutine running main has no such context element; when delay suspends it and it later resumes on another thread (kotlinx.coroutines.DefaultExecutor), that thread's ThreadLocal was never set, so get() returns null.
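As a sketch of the fix (hypothetical, assuming kotlinx.coroutines.* is imported): if the coroutine running the code also carries threadLocal.asContextElement(...), the value is restored on whatever thread it resumes on after delay.

suspend fun main() {
    val threadLocal = ThreadLocal<String>()
    withContext(threadLocal.asContextElement(value = "main")) {
        // the context element restores "main" on every thread this coroutine resumes on
        println("thread-${Thread.currentThread().name},value=${threadLocal.get()}")
        delay(1000)
        println("thread-${Thread.currentThread().name},value=${threadLocal.get()}") // still "main"
    }
}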

Asynchronous Flow

A suspending function can asynchronously return a single value. Flow is what we use to return multiple asynchronously computed values.

Use list to represent multiple values

fun foo(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    foo().forEach { value -> println(value) } 
}
1
2
3

All the values are returned together in an instant. What if we need them to be produced one at a time?

Use Sequence to represent multiple values

A Sequence can return the values one at a time synchronously, but it blocks the thread while doing so.

fun main() {
    foo().forEach(::println)
}

fun foo():Sequence<Int> = sequence {
    println(System.currentTimeMillis())
    for (i in 1..3){
        Thread.sleep(300)
      	//yield generates a value and suspends waiting for the next call
        yield(i)
    }
    println(System.currentTimeMillis())

}
1578822422344
1  //->Interval 300ms
2		//->Interval 300ms
3		//->Interval 300ms
1578822423255

Suspending functions

A function marked with suspend is a suspending function. To the compiler, suspend is just a marker.

We have already seen suspend frequently in the code above. A short sketch follows.
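A tiny sketch (assuming kotlinx.coroutines.* is imported): a suspending function can call other suspending functions such as delay, and it can only be called from a coroutine or from another suspending function. loadData is a hypothetical example, not part of the demos above.

suspend fun loadData(): String {
    delay(100) // suspends the coroutine without blocking the thread
    return "data"
}

fun main() = runBlocking {
    println(loadData()) // legal: we are inside a coroutine
}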

Flow

Returning a List means returning all values at once. A Sequence can return them one by one, but time-consuming operations block the thread.

Flow solves both of these drawbacks.

fun main() {
    runBlocking {
        logThread()
        launch {
            logThread()
            for (i in 1..3) {
                println("lauch block+$i")
                delay(100)
            }
        }
        logThread()
        foo().collect { value ->
            println(value)
        }
    }
}

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun logThread() = println("Current thread----${Thread.currentThread().name}")
Current thread----main @coroutine#1
 Current thread----main @coroutine#1
 Current thread----main @coroutine#2  //lauch{} 
lauch block+1
1
lauch block+2
2
lauch block+3
3

Code inside the flow {} builder can suspend. Values are produced with the emit() function and collected with collect, which is what actually starts the flow.
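A small sketch (assuming kotlinx.coroutines.* and kotlinx.coroutines.flow.* are imported) showing that a flow is cold: the flow {} body does not run until collect is called. coldFoo is a hypothetical name.

fun coldFoo(): Flow<Int> = flow {
    println("flow body started")
    emit(1)
}

fun main() = runBlocking {
    val f = coldFoo()
    println("flow created, nothing has run yet")
    f.collect { println("collected $it") } // only now does the flow body execute
}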

Cancelling a flow

Cancelling a flow really means cancelling the coroutine it runs in. We cannot cancel a Flow directly, but we can cancel the coroutine in which the flow is being collected.

Looking at the demo above, if we print the thread inside foo(), we find it is the same thread as runBlocking; in other words, foo() runs in the runBlocking context.

We changed the code as follows:

fun main(){
	...
	withTimeoutOrNull(200){
            foo().collect { value ->
                println(value)
            }
        }
  ...
}
...
Current thread----main @coroutine#1
 Current thread----main @coroutine#2
lauch block+1
1
lauch block+2
lauch block+3

Why is the launch {} block still running? withTimeoutOrNull only cancels the work started inside its own block; the launch {} coroutine was started directly in runBlocking, so it is a sibling of the collection and is not affected by this cancellation.

Flow builder

flowOf

Defines a flow that emits a fixed set of values

flowOf("123","123").collect{
    value ->
    println(value)
}

asFlow

Converts collections and sequences into a Flow

(1..3).asFlow().collect{value-> println(value)}

Intermediate flow operators

map

map transforms each emitted value

runBlocking {
        (1..3).asFlow()
            .map {
                delay(1000)
                "f-$it"
            }
            .collect { value -> println(value) }
    }

Transform operator

transform

With transform we can emit an arbitrary number of values; for example, emit a string before executing a long-running asynchronous request and then emit the response

    runBlocking {
        (1..3).asFlow()
            .transform {
                request ->
                emit("test-$request")
                //time-consuming operation 
                delay(500)
                emit(request)
            }
            .collect { value -> println(value) }
    }
test-1
1
test-2
2
test-3
3

Size-limiting operators

When the flow reaches the corresponding limit, its execution is cancelled. Cancellation in coroutines is always performed by throwing an exception, so all resource-management functions (try {...} finally {...} blocks) still run normally when the flow is cancelled (see the sketch after the take example below).

take

take(n) takes only the first n emitted values; once the limit is reached, emission is stopped

  runBlocking {
        (1..3).asFlow()
            .take(1)
            .collect { value -> println(value) }
    }
1  //There was only one result
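As mentioned above, reaching the limit cancels the flow by throwing an exception, so finally blocks inside the builder still run. A small sketch (assuming kotlinx.coroutines.* and kotlinx.coroutines.flow.* are imported; numbers() is a hypothetical helper):

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("this line is never reached")
        emit(3)
    } finally {
        println("finally still runs on cancellation")
    }
}

fun main() = runBlocking {
    numbers().take(2).collect { println(it) }
}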

Terminal flow operators

toList

//toList
    runBlocking {
        (1..3).asFlow()
            .toList().let(::println)
    }
[1, 2, 3]

toSet

//toSet
    runBlocking {
        (1..3).asFlow()
            .toSet().let(::println)
    }
[1, 2, 3]

first

Reduces the flow to a single value: first returns the first emitted value and cancels collection of the rest

 runBlocking {
        (1..3).asFlow()
            .first().let(::println)
    }
1

single

single also reduces the flow to a single value, but it expects exactly one element: if the flow emits more than one value, an IllegalStateException is thrown

 //single
    runBlocking {
        flowOf(1,2).single().let(::println)
      	//flowOf(1).single().let(::println)
    }
1 
Exception in thread "main" java.lang.IllegalStateException: Expected only one element

reduce

Data accumulation

runBlocking {
        (1..3).asFlow()
            .reduce { a, b -> a + b }.let(::println)
    }
6

fold

Accumulates the values like reduce, but fold takes an initial value

runBlocking {
        (1..3).asFlow()
            .fold(10) {acc, i -> acc + i }.let(::println)
    }
16

Flows are sequential

In Kotlin, each individual collection of a flow runs sequentially: every intermediate operator processes each emitted value in turn and then hands it on towards the terminal operator.

Put simply, execution goes from top to bottom, and a downstream operator runs only when the upstream condition is satisfied.

 (1..5).asFlow()
        .filter {
            println("filter=$it")
            it % 2 == 0
        }
        .map {
            println("map=$it")
            "map-$it"
        }
        .collect {
            println(
                "collect-$it"
            )
        }
filter=1
filter=2
map=2
collect-map-2
filter=3
filter=4
map=4
collect-map-4
filter=5

Examples of errors in Flow

In flows, withContext is normally used to change the context (it is not quite accurate to say it simply "switches threads", because a coroutine context carries more than a thread, but switching threads is the usual purpose). However, code in the flow {} builder must respect context preservation: it is not allowed to change the context, and it is not allowed to emit values from a different context (for example from inside another launch {}).

The emission context can instead be changed with flowOn.

Error case 1: change context

fun main() {
    runBlocking {
        flow {
            withContext(Dispatchers.Default) {
                for (i in 1..3) {
                    delay(100)
                    emit(i)
                }
            }

        }.collect().let(::println)
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@1f88ae32, BlockingEventLoop@32db9536],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@77a8c47e, DefaultDispatcher].
		Please refer to 'flow' documentation or use 'flowOn' instead
...

Error case 2: emitting values from another coroutine

fun main() {
    runBlocking {
        flow {
            launch {
                emit(123)
                emit(123)
                emit(123)
            }
            delay(100)

        }.collect {

            println(it)
        }
    }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Emission from another coroutine is detected.
		Child of "coroutine#2":StandaloneCoroutine{Active}@379619aa, expected child of "coroutine#1":BlockingCoroutine{Active}@cac736f.
		FlowCollector is not thread-safe and concurrent emissions are prohibited.
		To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
	at 
	...

flowOn

Used to change the context of stream emission.

fun main() {
    runBlocking {
        logThread()
        flow {
            for (i in 1..3) {
                logThread()
                emit(i)
                delay(10)
            }
        }.flowOn(Dispatchers.IO).collect {
            println(it)
        }
    }
}
Current thread----main @coroutine#1
 Current thread----DefaultDispatcher-worker-1 @coroutine#2
1
 Current thread----DefaultDispatcher-worker-1 @coroutine#2
2
 Current thread----DefaultDispatcher-worker-1 @coroutine#2
3

Here we collect on the main thread and emit on an IO thread, which also means that collection and emission now run in two different coroutines.

Buffer

Emission and collection of a flow normally run sequentially. We saw above that running different parts of the flow in different coroutines can significantly reduce the total time. Without flowOn, however, the time to emit and the time to collect simply add up.

For example, if emitting a value takes 100 ms and collecting it takes 200 ms, then emitting and collecting three values takes at least 900 ms.

fun main() {
    runBlocking {
        val start=System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
                delay(100)
            }
        }.collect {
            delay(300)
            println(it)
        }
        println("Spend time-${System.currentTimeMillis()-start}ms")
    }
}
1
2
3
 Spend time-1230ms

We can use the buffer operator to run emission and collection concurrently rather than sequentially.

The changed code is as follows:

fun main() {
    runBlocking {
        val start = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                logThread()
                emit(i)
                delay(100)
            }
        }.buffer().collect {
            logThread()
            delay(300)
            println(it)
        }
        println("Spend time-${System.currentTimeMillis() - start}ms")
    }
}
Current thread----main @coroutine#2
 Current thread----main @coroutine#1
 Current thread----main @coroutine#2
 Current thread----main @coroutine#2
1
 Current thread----main @coroutine#1
2
 Current thread----main @coroutine#1
3
 Spend time-961ms

We can see that buffer runs the emitter in a separate coroutine: buffer and flowOn share the same buffering mechanism, but buffer does not change the dispatcher, so everything here still runs on the main thread.

Conflation

conflate

Used to skip intermediate values and process only the latest values.

fun main() {
    measureTimeMillis {
        runBlocking {
            (1..5).asFlow()
                .conflate()
                .buffer()
                .collect {
                    delay(100)
                    println(it)
                }
        }
    }.let{
    println("Spend time-${it}ms")
    }
}
1
5
 Spend time-325ms

Processing the latest value

collectLatest

Cancels the slow collector and restarts it every time a new value is emitted.

fun main() {
    measureTimeMillis {
        runBlocking {
            (1..5).asFlow()
                .collectLatest {
                    println(it)
                    delay(100)
                    println("Re launch-$it")
                }
        }
    }.let {
        println("Spend time-${it}ms")
    }
}
1
2
3
4
5
 Re launch-5
 Spend time-267ms

Composing multiple flows

zip

Combines the corresponding values of two flows

fun main() {
    measureTimeMillis {
        val strs= flowOf("one","two","three")
        runBlocking {
            (1..5).asFlow()
                .zip(strs){
                    a,b ->

                    "$a -> $b"
                }.collect { println(it) }
        }
    }.let {
        println("Spend time-${it}ms")
    }
}
1 -> one
2 -> two
3 -> three
 Spend time-120ms

combine

Unlike zip, combine re-emits a value whenever either flow emits, combining it with the most recent value from the other flow.

suspend fun main() {
    val nums = (1..3).asFlow().onEach { delay(300) } // emit numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // emit a string every 400 ms
    val startTime = System.currentTimeMillis() // record the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // use combine to pair a number with a string
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
1 -> one at 472 ms from start
2 -> one at 682 ms from start
2 -> two at 874 ms from start
3 -> two at 987 ms from start
3 -> three at 1280 ms from start

Channels

  • A non-blocking communication primitive
  • Conceptually similar to a BlockingQueue plus suspend

Channels provide a way to transfer a stream of values between coroutines.

Channel classification

  • RENDEZVOUS: capacity 0; send suspends until a matching receive arrives
  • UNLIMITED: unlimited capacity; send returns immediately
  • CONFLATED: keeps only the latest value; receive only ever gets the value of the most recent send
  • BUFFERED: default capacity (64 by default, configurable via a system property)
  • FIXED: fixed capacity, specified via the capacity parameter

A short sketch of creating each type follows.
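A minimal sketch (assuming kotlinx.coroutines.* and kotlinx.coroutines.channels.* are imported) of creating the channel types above via the capacity parameter:

fun main() = runBlocking {
    val rendezvous = Channel<Int>()                   // RENDEZVOUS, capacity 0
    val unlimited = Channel<Int>(Channel.UNLIMITED)   // send never suspends
    val conflated = Channel<Int>(Channel.CONFLATED)   // keeps only the latest value
    val buffered = Channel<Int>(Channel.BUFFERED)     // default buffer size (64 by default)
    val fixed = Channel<Int>(10)                      // fixed capacity of 10

    conflated.send(1)
    conflated.send(2)              // overwrites the 1 that was never received
    println(conflated.receive())   // prints 2
}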

Channel basics

suspend fun main() {
   runBlocking {
       val channel = Channel<Int>()
       launch {
           // Here may be asynchronous logic that consumes a lot of CPU operations. We will only square the integer five times and send it
           for (x in 1..5) channel.send(x * x)
       }
// Here we print the received integers five times:
       repeat(5) { println(channel.receive()) }
       println("Done!")
   }
}
1
4
9
16
Done!

Channel closing and iteration

suspend fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            // Here may be asynchronous logic that consumes a lot of CPU operations. We will only square the integer five times and send it
            for (x in 1..5) {
                channel.send(x * x)
            }
            channel.close()
            println("Is the channel closed+${channel.isClosedForSend}")
        }
        for (i in channel) {
            println(i)
        }
        println("Did you receive all the data+${channel.isClosedForReceive}")
    }
}
1
4
9
16
25

consumeEach

To replace the for loop

suspend fun main() {
   runBlocking {
       val channel = Channel<Int>()
       launch {
           // Here may be asynchronous logic that consumes a lot of CPU operations. We will only square the integer five times and send it
           for (x in 1..5) {
               channel.send(x * x)
           }
           channel.close()
       }
       channel.consumeEach(::println)
   }
}

Channel coroutine builders

  • produce starts a producer coroutine and returns a ReceiveChannel
  • actor starts a consumer coroutine and returns a SendChannel (currently considered obsolete)
  • The channel behind each of these builders is closed automatically when the coroutine completes

produce

suspend fun main() {
    val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) {
        for (i in 0..3) {
            send(i)
        }
    }

      GlobalScope.launch {
        receiveChannel.consumeEach(::println)
    }.join()

}
0
1
2
3

actor

suspend fun main() {
    val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) {
        channel.consumeEach(::println)
    }

    GlobalScope.launch {
        for (i in 1..3){
            sendChannel.send(i)
        }
    }.join()

}

BroadcastChannel

  • An element sent to a Channel is received by only one consumer
  • Elements of a BroadcastChannel are delivered to all subscribers
  • BroadcastChannel does not support RENDEZVOUS capacity

suspend fun main() {
    val broadcastChannel = GlobalScope.broadcast {
        for (i in 1..3)
            send(i)
    }

    List(3) {
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            println("-----$it")
            receiveChannel.consumeEach(::println)

        }
    }.joinAll()
}
-----2
-----0
-----1
1
1
1
2
2
3
3
2
3

Select (experimental)

  • Conceptually a form of IO multiplexing
  • In Kotlin, it multiplexes over suspending functions

A select expression waits on several suspending functions simultaneously and selects the first one that becomes available.

Using select with channels

suspend fun main() {
    runBlocking {
        val fizz = fizz()
        val buzz = buzz()
        repeat(7) {
            selectFizzBuzz(fizz, buzz)
        }
        coroutineContext.cancelChildren()
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> {
        // < unit > means that the select expression does not return any results
        fizz.onReceive { value ->
            // This is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->
            // This is the second select clause
            println("buzz -> '$value'")
        }
    }
}

@ExperimentalCoroutinesApi
fun CoroutineScope.fizz() = produce {
    while (true) {
        delay(300)
        send("Fizz")
    }
}

@ExperimentalCoroutinesApi
fun CoroutineScope.buzz() = produce {
    while (true) {
        delay(500)
        send("Buzz")
    }
}
fizz -> 'Fizz'
buzz -> 'Buzz'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz'
fizz -> 'Fizz'
buzz -> 'Buzz'

With the receive suspending function we can only take data from one of the two channels at a time, but select lets us wait on both channels at once using onReceive clauses. Note: onReceive fails on a closed channel and throws an exception; the onReceiveOrNull clause can be used instead to perform a specific action when the channel is closed.
