Kotlin's concurrency

Know channel

channel is a concurrent and secure queue, which can connect processes and realize the communication of different processes.

Several types of channels are defined in the Library. They can store multiple elements internally, but they are different in terms of whether the send call can be suspended. For all Channel types, the behavior of receive call is the same: if the Channel is not empty, the element will be received, otherwise it will be suspended.

Unlimited channel

An unlimited channel is the closest simulation to a queue: the producer can send elements to this channel, and it will increase indefinitely. The send method will never be suspended. If there is no more memory, OutOfMemoryException will be thrown. Unlike queues, users try to receive messages from an empty channel and are suspended until some new elements are sent to the channel.

Buffered channel

The size of the buffered channel is limited by the specified number. Producers can send elements to this channel until the maximum limit is reached. All elements are stored internally. When the channel is full, the next send call will be suspended until more space is available.

Rendezvous channel

A rendezvous channel is a channel without a buffer. This is the same as establishing a buffered channel with a size of zero. One of the functions (send or receive) is suspended until the other function is called. If the send function is called, but the consumer is not ready to process the element, the receive is suspended, and the send is suspended. Similarly, if the receive function is called and the channel is empty, in other words, the send that is not ready to send the element is suspended - receive will also be suspended.

Conflated channel

The new element sent to the merged channel will overwrite the previously sent element, so the receiver will always be able to get only the latest element. The send call will never be suspended.
When establishing a channel, specify its type or buffer size (if the channel needs to be buffered):

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

By default, a rendezvous channel is established.

In the following example, a "contract" channel will be established, two producer processes and one consumer process:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

The above will print the following results:

[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2

channel is actually a queue. There must be a buffer in the queue. When the buffer is full and no one has called the receive take function, send needs to be suspended. It deliberately slows down the rhythm of the receiver. It is found that send is always suspended and will not continue to execute until after receiving.

    fun run1() {
        val channel = Channel<Int>(Channel.UNLIMITED)
        //Channel inter process communication, concurrent secure queue
        runBlocking {
            //producer
            val p = launch {
                for (i in 1..5) {
                    channel.send(i)
                    log("send = $i")
                }
            }
            //consumer
            val c = launch {
                //Receive data normally
                while (true) {
                    //Deliberately slow down the receive r. It is found that send is always suspended and will not continue to execute until after receiving
                    delay(2000)
                    val el = channel.receive()
                    log("re = $el")
                }
                //Receive data through iterator
                //val iterator = channel.iterator()
                //while (iterator.hasNext()) {
                //    delay(2000)
                //    log("iterator = ${iterator.next()}")
                //}

            }

            joinAll(p,c)
        }


    }

Print:
com.z.zjetpack V/zx: send = 1
com.z.zjetpack V/zx: send = 2
com.z.zjetpack V/zx: send = 3
com.z.zjetpack V/zx: send = 4
com.z.zjetpack V/zx: send = 5
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: re = 3
com.z.zjetpack V/zx: re = 4
com.z.zjetpack V/zx: re = 5

Production and actor

  • A convenient way to construct producers and consumers
  • We can start a producer process through the produce method and return a reply channel, which can be used by other processes to receive data. In turn, we can start a consumer collaboration process with actor.
    fun run2(){
       runBlocking {
           //Quickly create a producer collaboration and return a receiving Channel
          val receiveChannel = produce<Int> {
               repeat(5){
                   delay(1000)
                   send(it)
               }
          }

           val job2 = launch {
               for (i in receiveChannel) {
                   log("receiveChannel = $i")
               }
           }
           job2.join()
       }

        runBlocking {
            //A convenient way to construct consumers
            val sendChannel = actor<Int> {
                while (true) {
                    val re = receive()
                    log("re = $re")
                }
            }
           val p =  launch {
                for (i in 1..3) {
                    sendChannel.send(i)
                }

            }
            p.join()
        }
    }

Print:
com.z.zjetpack V/zx: receiveChannel = 0
com.z.zjetpack V/zx: receiveChannel = 1
com.z.zjetpack V/zx: receiveChannel = 2
com.z.zjetpack V/zx: receiveChannel = 3
com.z.zjetpack V/zx: receiveChannel = 4
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: re = 3

Closing of channel

  • The channels returned by both the production and the actor will be closed after the execution of the corresponding collaboration. In this way, the channel will be called hot data flow
  • For a channel, if we call its close method, it will immediately stop receiving new elements, and its isClosedForSend will immediately return true. Due to the existence of channel buffer, some elements may not be processed, so isClosedForReceive will return true only after all elements are read
  • The life cycle of the channel is best maintained by the leading Party. It is recommended that the leading Party close it.
    fun run3(){
        runBlocking {
            val channel = Channel<Int>(3)
            //producer
            launch {
                List(3){
                    channel.send(it)
                    log("send = $it")
                }

                channel.close()
                log("isClosedForSend = ${channel.isClosedForSend}")
                log("isClosedForReceive = ${channel.isClosedForReceive}")
            }

            //consumer
            launch {
                for (c in channel) {
                    log("re = $c")
                    delay(1000)
                }

                log("consumption isClosedForSend = ${channel.isClosedForSend}")
                log("consumption isClosedForReceive = ${channel.isClosedForReceive}")
            }

        }
    }

Print:
com.z.zjetpack V/zx: send = 0
com.z.zjetpack V/zx: send = 1
com.z.zjetpack V/zx: send = 2
com.z.zjetpack V/zx: isClosedForSend = true
com.z.zjetpack V/zx: isClosedForReceive = false
com.z.zjetpack V/zx: re = 0
com.z.zjetpack V/zx: re = 1
com.z.zjetpack V/zx: re = 2
com.z.zjetpack V/zx: consumption isClosedForSend = true
com.z.zjetpack V/zx: consumption isClosedForReceive = true

BroadcastChannel

There is a one to many scenario between the sender and receiver in the channel. Although there are multiple receivers, the same element will only be read by one receiver, but the broadcast is different. Multiple receivers do not have mutually exclusive behavior.

    fun run4() {
        runBlocking {
            //Create directly
            // val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
            //broadcast method creation
            val channel = Channel<Int>()
            val broadcastChannel = channel.broadcast(Channel.BUFFERED)
            //Create 3 processes to receive
            List(3) {
                launch {
                    val receiveChannel = broadcastChannel.openSubscription()
                    for (r in receiveChannel) {
                        log("Synergetic process $it, re = $r")
                    }
                }
            }
            launch {
                List(3) {
                    broadcastChannel.send(it)
                }
                broadcastChannel.close()
            }


        }
    }

Print:
com.z.zjetpack V/zx: Synergy 0, re = 0
com.z.zjetpack V/zx: Synergy 0, re = 1
com.z.zjetpack V/zx: Synergy 0, re = 2
com.z.zjetpack V/zx: Synergetic process 1, re = 0
com.z.zjetpack V/zx: Synergetic process 1, re = 1
com.z.zjetpack V/zx: Synergetic process 1, re = 2
com.z.zjetpack V/zx: Synergetic process 2, re = 0
com.z.zjetpack V/zx: Synergetic process 2, re = 1
com.z.zjetpack V/zx: Synergetic process 2, re = 2

Multiplexing

Reuse multiple await s

The two APIs obtain data from the network and local respectively, and display which one is expected to return first

    fun CoroutineScope.getFromLocal() = async {
        delay(1000)
        "Return local data"
    }

    fun CoroutineScope.getFromNet() = async {
        "Return network data"
    }

    fun run5() {
        runBlocking {
            launch {
                val local = getFromLocal()
                val net = getFromNet()

                val res = select<String> {
                    local.onAwait { it }
                    net.onAwait { it }
                }

                log("value = $res")
            }.join()


        }
    }

Print:
com.z.zjetpack V/zx: value = Return network data

Multiplexing multiple channel s

Similar to await, it will receive the fastest channel message

    fun run6() {
        runBlocking {
            val channels = listOf(Channel<Int>(), Channel<Int>())
            launch {
                delay(100)
                channels[0].send(1)
            }

            launch {
                delay(500)
                channels[1].send(5)
            }


            val result = select<Int> {
                channels.forEach { re ->
                    re.onReceive{it}
                }
            }

            log("result = $result")
        }
    }
Print:
com.z.zjetpack V/zx: result = 1

SelectClause

Which events can be select ed? SelectClause type
include:
SelectClause0: the corresponding event has no return value. For example, join has no return value. The corresponding onJoin is of this type. When used, the onJoin parameter is a parameterless function:

   public val onJoin: SelectClause0


        runBlocking {
            val job1 = launch {
                delay(100)
                log("job1")
            }
            val job2 = launch {
                delay(10)
                log("job2")
            }

            select<Unit> {
                job1.onJoin {
                    log("job1.onJoin")
                }
                job2.onJoin {
                    log("job2.onJoin")
                }
            }

        }


Print:
com.z.zjetpack V/zx: job2
com.z.zjetpack V/zx: job2.onJoin
com.z.zjetpack V/zx: job1

SelectClause1: the corresponding event has a return value. The previous onAwait and onReceive are both such cases.

    public val onAwait: SelectClause1<T>
    public val onReceive: SelectClause1<E>

SelectClause2: the corresponding event has a return value. In addition, an additional parameter is required, such as Channel Onsend has two parameters. The first is a value of Channel data type, indicating the value to be sent, and the second is the callback when the transmission is successful.
If we want to confirm whether the pending function supports select, we can check whether the corresponding SelectClauseN type can be called back.

	//Return to SelectClause2
    public val onSend: SelectClause2<E, SendChannel<E>>


        runBlocking {
            val channels = listOf(Channel<Int>(), Channel<Int>())
            launch {
                select<Unit> {
                    launch {
                        delay(100)
                        channels[0].onSend(1) { sendChannel ->
                            log("send on $sendChannel")

                        }
                    }

                    launch {
                        delay(500)
                        channels[1].onSend(5) { sendChannel ->
                            log("send on $sendChannel")
                        }
                    }

                }
            }


            launch {
                for (c in channels) {
                    log("data  = ${c.receive()}")
                }
            }

        }
Print:
com.z.zjetpack V/zx: send on RendezvousChannel@63db1bf{EmptyQueue}
com.z.zjetpack V/zx: data  = 1

Flow multiplexing

coroutineScope {
  val login = "..."
  listOf(::getUserFromApi, ::getUserFromLocal) ... ①
    .map { function ->
      function.call(login) ... ②
    }
    .map { deferred ->
      flow { emit(deferred.await()) } ... ③
    }
    .merge() ... ④
    .onEach { user ->
      println("Result: $user")
    }.launchIn(this)
}

Among them, ① creates a List composed of two function references; ② Call them to get deferred; ③ For each deferred, we create a separate Flow and send the deferred The result returned by await(), that is, the returned User object; Now that we have two Flow instances, we need to integrate them into one Flow for processing and call the merge function.

Concurrency security of collaborative processes

In addition to the common means to solve concurrency security problems in threads, coroutines provide some concurrency security tools

  • Channel: concurrent secure message channel
  • Mutex: lightweight lock. Lock and unlock are similar to thread lock. Lightweight means that it will not block the thread when it cannot obtain the lock, but hang and wait for the release of the lock.
  • Semaphore: lightweight semaphore. There can be multiple semaphores. After obtaining the semaphore, the collaborative process can perform concurrent operations. When the parameter of semaphore is 1, the effect is equivalent to Mutex
    fun run7() {
        runBlocking {
            var count = 0
            List(10000) {
                //GlobalScope is thread unsafe
                GlobalScope.launch {
                    count++
                }

            }.joinAll()
            log("default count = $count")

        }
        //Solving concurrency problems with volatile
        runBlocking {
            var count = AtomicInteger(0)
            List(10000) {
                //GlobalScope is thread unsafe
                GlobalScope.launch {
                    count.incrementAndGet()
                }

            }.joinAll()
            log("volatile count = ${count.get()}")

        }
        //Solving concurrency problems with Mutex
        runBlocking {
            var count = 0
            var mutex = Mutex()
            List(10000) {
                //GlobalScope is thread unsafe
                GlobalScope.launch {
                    mutex.withLock {
                        count++
                    }
                }

            }.joinAll()
            log("Mutex count = $count")

        }
        //Solving concurrency problems with Semaphore
        runBlocking {
            var count = 0
            var semaphore = Semaphore(1)
            List(10000) {
                //GlobalScope is thread unsafe
                GlobalScope.launch {
                    semaphore.withPermit {
                        count++
                    }
                }

            }.joinAll()
            log("Semaphore count = $count")

        }


    }

Print:

com.z.zjetpack V/zx: default count = 9991
com.z.zjetpack V/zx: volatile count = 10000
com.z.zjetpack V/zx: Mutex count = 10000
com.z.zjetpack V/zx: Semaphore count = 10000

In addition to using these tools to solve the concurrency problem, it can also avoid accessing external variable states. When writing a function, it is required that it cannot access external states. It can only perform operations based on input parameters and provide operation results through return values.

           runBlocking {
            var count = 0
            //count has no concurrency problem outside the collaboration process
            val result = count + List(10000){
                GlobalScope.async { 1 }
            }.map {
                it.await()
            }.sum()
            log("count count = $result")

        }

Keywords: Android kotlin jetpack

Added by geo115fr on Mon, 07 Feb 2022 14:33:32 +0200