Know channel
channel is a concurrent and secure queue, which can connect processes and realize the communication of different processes.
Several types of channels are defined in the Library. They can store multiple elements internally, but they are different in terms of whether the send call can be suspended. For all Channel types, the behavior of receive call is the same: if the Channel is not empty, the element will be received, otherwise it will be suspended.
Unlimited channel
An unlimited channel is the closest simulation to a queue: the producer can send elements to this channel, and it will increase indefinitely. The send method will never be suspended. If there is no more memory, OutOfMemoryException will be thrown. Unlike queues, users try to receive messages from an empty channel and are suspended until some new elements are sent to the channel.
Buffered channel
The size of the buffered channel is limited by the specified number. Producers can send elements to this channel until the maximum limit is reached. All elements are stored internally. When the channel is full, the next send call will be suspended until more space is available.
Rendezvous channel
A rendezvous channel is a channel without a buffer. This is the same as establishing a buffered channel with a size of zero. One of the functions (send or receive) is suspended until the other function is called. If the send function is called, but the consumer is not ready to process the element, the receive is suspended, and the send is suspended. Similarly, if the receive function is called and the channel is empty, in other words, the send that is not ready to send the element is suspended - receive will also be suspended.
Conflated channel
The new element sent to the merged channel will overwrite the previously sent element, so the receiver will always be able to get only the latest element. The send call will never be suspended.
When establishing a channel, specify its type or buffer size (if the channel needs to be buffered):
val rendezvousChannel = Channel<String>() val bufferedChannel = Channel<String>(10) val conflatedChannel = Channel<String>(CONFLATED) val unlimitedChannel = Channel<String>(UNLIMITED)
By default, a rendezvous channel is established.
In the following example, a "contract" channel will be established, two producer processes and one consumer process:
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.* fun main() = runBlocking<Unit> { val channel = Channel<String>() launch { channel.send("A1") channel.send("A2") log("A done") } launch { channel.send("B1") log("B done") } launch { repeat(3) { val x = channel.receive() log(x) } } } fun log(message: Any?) { println("[${Thread.currentThread().name}] $message") } The above will print the following results: [main @coroutine#4] A1 [main @coroutine#4] B1 [main @coroutine#2] A done [main @coroutine#3] B done [main @coroutine#4] A2
channel is actually a queue. There must be a buffer in the queue. When the buffer is full and no one has called the receive take function, send needs to be suspended. It deliberately slows down the rhythm of the receiver. It is found that send is always suspended and will not continue to execute until after receiving.
fun run1() { val channel = Channel<Int>(Channel.UNLIMITED) //Channel inter process communication, concurrent secure queue runBlocking { //producer val p = launch { for (i in 1..5) { channel.send(i) log("send = $i") } } //consumer val c = launch { //Receive data normally while (true) { //Deliberately slow down the receive r. It is found that send is always suspended and will not continue to execute until after receiving delay(2000) val el = channel.receive() log("re = $el") } //Receive data through iterator //val iterator = channel.iterator() //while (iterator.hasNext()) { // delay(2000) // log("iterator = ${iterator.next()}") //} } joinAll(p,c) } } Print: com.z.zjetpack V/zx: send = 1 com.z.zjetpack V/zx: send = 2 com.z.zjetpack V/zx: send = 3 com.z.zjetpack V/zx: send = 4 com.z.zjetpack V/zx: send = 5 com.z.zjetpack V/zx: re = 1 com.z.zjetpack V/zx: re = 2 com.z.zjetpack V/zx: re = 3 com.z.zjetpack V/zx: re = 4 com.z.zjetpack V/zx: re = 5
Production and actor
- A convenient way to construct producers and consumers
- We can start a producer process through the produce method and return a reply channel, which can be used by other processes to receive data. In turn, we can start a consumer collaboration process with actor.
fun run2(){ runBlocking { //Quickly create a producer collaboration and return a receiving Channel val receiveChannel = produce<Int> { repeat(5){ delay(1000) send(it) } } val job2 = launch { for (i in receiveChannel) { log("receiveChannel = $i") } } job2.join() } runBlocking { //A convenient way to construct consumers val sendChannel = actor<Int> { while (true) { val re = receive() log("re = $re") } } val p = launch { for (i in 1..3) { sendChannel.send(i) } } p.join() } } Print: com.z.zjetpack V/zx: receiveChannel = 0 com.z.zjetpack V/zx: receiveChannel = 1 com.z.zjetpack V/zx: receiveChannel = 2 com.z.zjetpack V/zx: receiveChannel = 3 com.z.zjetpack V/zx: receiveChannel = 4 com.z.zjetpack V/zx: re = 1 com.z.zjetpack V/zx: re = 2 com.z.zjetpack V/zx: re = 3
Closing of channel
- The channels returned by both the production and the actor will be closed after the execution of the corresponding collaboration. In this way, the channel will be called hot data flow
- For a channel, if we call its close method, it will immediately stop receiving new elements, and its isClosedForSend will immediately return true. Due to the existence of channel buffer, some elements may not be processed, so isClosedForReceive will return true only after all elements are read
- The life cycle of the channel is best maintained by the leading Party. It is recommended that the leading Party close it.
fun run3(){ runBlocking { val channel = Channel<Int>(3) //producer launch { List(3){ channel.send(it) log("send = $it") } channel.close() log("isClosedForSend = ${channel.isClosedForSend}") log("isClosedForReceive = ${channel.isClosedForReceive}") } //consumer launch { for (c in channel) { log("re = $c") delay(1000) } log("consumption isClosedForSend = ${channel.isClosedForSend}") log("consumption isClosedForReceive = ${channel.isClosedForReceive}") } } } Print: com.z.zjetpack V/zx: send = 0 com.z.zjetpack V/zx: send = 1 com.z.zjetpack V/zx: send = 2 com.z.zjetpack V/zx: isClosedForSend = true com.z.zjetpack V/zx: isClosedForReceive = false com.z.zjetpack V/zx: re = 0 com.z.zjetpack V/zx: re = 1 com.z.zjetpack V/zx: re = 2 com.z.zjetpack V/zx: consumption isClosedForSend = true com.z.zjetpack V/zx: consumption isClosedForReceive = true
BroadcastChannel
There is a one to many scenario between the sender and receiver in the channel. Although there are multiple receivers, the same element will only be read by one receiver, but the broadcast is different. Multiple receivers do not have mutually exclusive behavior.
fun run4() { runBlocking { //Create directly // val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED) //broadcast method creation val channel = Channel<Int>() val broadcastChannel = channel.broadcast(Channel.BUFFERED) //Create 3 processes to receive List(3) { launch { val receiveChannel = broadcastChannel.openSubscription() for (r in receiveChannel) { log("Synergetic process $it, re = $r") } } } launch { List(3) { broadcastChannel.send(it) } broadcastChannel.close() } } } Print: com.z.zjetpack V/zx: Synergy 0, re = 0 com.z.zjetpack V/zx: Synergy 0, re = 1 com.z.zjetpack V/zx: Synergy 0, re = 2 com.z.zjetpack V/zx: Synergetic process 1, re = 0 com.z.zjetpack V/zx: Synergetic process 1, re = 1 com.z.zjetpack V/zx: Synergetic process 1, re = 2 com.z.zjetpack V/zx: Synergetic process 2, re = 0 com.z.zjetpack V/zx: Synergetic process 2, re = 1 com.z.zjetpack V/zx: Synergetic process 2, re = 2
Multiplexing
Reuse multiple await s
The two APIs obtain data from the network and local respectively, and display which one is expected to return first
fun CoroutineScope.getFromLocal() = async { delay(1000) "Return local data" } fun CoroutineScope.getFromNet() = async { "Return network data" } fun run5() { runBlocking { launch { val local = getFromLocal() val net = getFromNet() val res = select<String> { local.onAwait { it } net.onAwait { it } } log("value = $res") }.join() } } Print: com.z.zjetpack V/zx: value = Return network data
Multiplexing multiple channel s
Similar to await, it will receive the fastest channel message
fun run6() { runBlocking { val channels = listOf(Channel<Int>(), Channel<Int>()) launch { delay(100) channels[0].send(1) } launch { delay(500) channels[1].send(5) } val result = select<Int> { channels.forEach { re -> re.onReceive{it} } } log("result = $result") } } Print: com.z.zjetpack V/zx: result = 1
SelectClause
Which events can be select ed? SelectClause type
include:
SelectClause0: the corresponding event has no return value. For example, join has no return value. The corresponding onJoin is of this type. When used, the onJoin parameter is a parameterless function:
public val onJoin: SelectClause0 runBlocking { val job1 = launch { delay(100) log("job1") } val job2 = launch { delay(10) log("job2") } select<Unit> { job1.onJoin { log("job1.onJoin") } job2.onJoin { log("job2.onJoin") } } } Print: com.z.zjetpack V/zx: job2 com.z.zjetpack V/zx: job2.onJoin com.z.zjetpack V/zx: job1
SelectClause1: the corresponding event has a return value. The previous onAwait and onReceive are both such cases.
public val onAwait: SelectClause1<T> public val onReceive: SelectClause1<E>
SelectClause2: the corresponding event has a return value. In addition, an additional parameter is required, such as Channel Onsend has two parameters. The first is a value of Channel data type, indicating the value to be sent, and the second is the callback when the transmission is successful.
If we want to confirm whether the pending function supports select, we can check whether the corresponding SelectClauseN type can be called back.
//Return to SelectClause2 public val onSend: SelectClause2<E, SendChannel<E>> runBlocking { val channels = listOf(Channel<Int>(), Channel<Int>()) launch { select<Unit> { launch { delay(100) channels[0].onSend(1) { sendChannel -> log("send on $sendChannel") } } launch { delay(500) channels[1].onSend(5) { sendChannel -> log("send on $sendChannel") } } } } launch { for (c in channels) { log("data = ${c.receive()}") } } } Print: com.z.zjetpack V/zx: send on RendezvousChannel@63db1bf{EmptyQueue} com.z.zjetpack V/zx: data = 1
Flow multiplexing
coroutineScope { val login = "..." listOf(::getUserFromApi, ::getUserFromLocal) ... ① .map { function -> function.call(login) ... ② } .map { deferred -> flow { emit(deferred.await()) } ... ③ } .merge() ... ④ .onEach { user -> println("Result: $user") }.launchIn(this) }
Among them, ① creates a List composed of two function references; ② Call them to get deferred; ③ For each deferred, we create a separate Flow and send the deferred The result returned by await(), that is, the returned User object; Now that we have two Flow instances, we need to integrate them into one Flow for processing and call the merge function.
Concurrency security of collaborative processes
In addition to the common means to solve concurrency security problems in threads, coroutines provide some concurrency security tools
- Channel: concurrent secure message channel
- Mutex: lightweight lock. Lock and unlock are similar to thread lock. Lightweight means that it will not block the thread when it cannot obtain the lock, but hang and wait for the release of the lock.
- Semaphore: lightweight semaphore. There can be multiple semaphores. After obtaining the semaphore, the collaborative process can perform concurrent operations. When the parameter of semaphore is 1, the effect is equivalent to Mutex
fun run7() { runBlocking { var count = 0 List(10000) { //GlobalScope is thread unsafe GlobalScope.launch { count++ } }.joinAll() log("default count = $count") } //Solving concurrency problems with volatile runBlocking { var count = AtomicInteger(0) List(10000) { //GlobalScope is thread unsafe GlobalScope.launch { count.incrementAndGet() } }.joinAll() log("volatile count = ${count.get()}") } //Solving concurrency problems with Mutex runBlocking { var count = 0 var mutex = Mutex() List(10000) { //GlobalScope is thread unsafe GlobalScope.launch { mutex.withLock { count++ } } }.joinAll() log("Mutex count = $count") } //Solving concurrency problems with Semaphore runBlocking { var count = 0 var semaphore = Semaphore(1) List(10000) { //GlobalScope is thread unsafe GlobalScope.launch { semaphore.withPermit { count++ } } }.joinAll() log("Semaphore count = $count") } }
Print:
com.z.zjetpack V/zx: default count = 9991 com.z.zjetpack V/zx: volatile count = 10000 com.z.zjetpack V/zx: Mutex count = 10000 com.z.zjetpack V/zx: Semaphore count = 10000
In addition to using these tools to solve the concurrency problem, it can also avoid accessing external variable states. When writing a function, it is required that it cannot access external states. It can only perform operations based on input parameters and provide operation results through return values.
runBlocking { var count = 0 //count has no concurrency problem outside the collaboration process val result = count + List(10000){ GlobalScope.async { 1 } }.map { it.await() }.sum() log("count count = $result") }