Channel channel in Kotlin

select multiplexing in channel

  • In the data communication system or computer network system, the bandwidth or capacity of the transmission medium is often greater than the demand for transmitting a single signal. In order to make effective use of the communication line, one channel is expected to transmit multiple signals at the same time, which is the so-called multiplexing technology.

Reuse multiple await s

  • The two API s obtain data from the network and local caches respectively, and display which one is expected to return first.
private val cachePath = "E://coroutine.cache"
    private val gson = Gson()
    data class Response<T>(val value:T, val isLocal:Boolean)
    fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO){
    delay(1000)//Intentional delay
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
    }
    fun CoroutineScope.getUserFromRemote(name:String) = async(Dispatchers.IO){
    userServiceApi.getUser(name)
    }

    @Test
    fun `test select await`() = runBlocking<Unit> {
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            val userRespone = select<Response<User>> {
                localRequest.onAwait{ Response(it, true) }
                remoteRequest.onAwait{Response(it, false)}
            }
            userRespone.value?.let { println(it) }
        }.join()
    }

Multiplexing multiple channels

  • Similar to await, it will receive the fastest channel message.
 @Test
    fun `test select channel`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[1].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)
    }

SelectClause

  • How do we know which events can be selected? In fact, all events that can be selected are of type SelectClauseN, including:
    1.SelectClause0: the corresponding event has no return value. For example, if the join has no return value, onJoin is of type SelectClauseN. When in use,
    The parameter of onJoin is a parameterless function
    2.SelectClause1: the corresponding event has a return value. The previous onAwait and onReceive are both such cases.
    3.SelectClause2: the corresponding event has a return value. In addition, an additional parameter is required, such as Channel Onsend has two parameters. The first is the value of Channel data type, which indicates the value to be sent; The second is the callback parameter when sending successfully.
  • If we want to confirm whether the pending function supports select, we just need to check whether there is a corresponding SelectClauseN type callback.
@Test
    fun `test selectClause0`() = runBlocking<Unit> {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job 1 onJoin") }
            job2.onJoin { println("job 1 onJoin") }
        }
        delay(1000)
    }

    @Test
    fun `test selectClause2`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)

        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel ->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sendChannel ->
                        println("send on $sendChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
        delay(1000)
    }

Multiplexing using Flow

  • In most cases, we can achieve the effect of multiplexing by constructing an appropriate Flow.
@Test
    fun `test select flow`() = runBlocking<Unit> {
    //Function - > coroutine - > flow - > flow merge
    val name = "guest"
    coroutineScope {
    listOf(::getUserFromLocal, ::getUserFromRemote)
    .map{ function ->
    function.call(name)
    }.map{deferred ->
    flow{emit(deferred.await())}
    }.merge().collect{user -> println(user)}
    }
    }

Concurrent security of channel

  • Insecure concurrent access
    When we use threads to solve concurrency problems, we will always encounter thread safety problems, and the implementation of Kotlin coroutine on Java platform will inevitably have concurrent scheduling, so thread safety is also worth paying attention to.
@Test
    fun `test not safe concurrent`() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch { count++ }
        }.joinAll()
        println(count)
    }

    @Test
    fun `test safe concurrent`() = runBlocking<Unit> {
        var count = AtomicInteger(0)
        List(1000) {
            GlobalScope.launch { count.incrementAndGet() }
        }.joinAll()
        println(count)
    }

Concurrent tools for collaborative processes

  • In addition to our common means to solve concurrency problems in threads, the collaborative process framework also provides some concurrency safety tools, including:
    1.Channel: concurrent secure message channel, which we are already very familiar with.
    2.Mutex: lightweight lock. Its lock and unlock are semantically similar to thread locks. The reason why it is lightweight is that it will not block the thread when it cannot obtain the lock, but hang and wait for the release of the lock.
    3.Semaphore: lightweight semaphore. There can be multiple semaphores. The coroutine can perform concurrent operations after obtaining the semaphore. When the parameter of semaphore is 1, the effect is equivalent to 1Mutex
 @Test
    fun `test safe concurrent tools`() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
                mutex.withLock {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

    @Test
    fun `test safe concurrent tools2`() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

Avoid accessing external mutable States

  • When writing a function, it is required that it cannot access the external state. It can only perform operations based on parameters, and provide the operation results through the return value.
  @Test
    fun `test avoid access outer variable`() = runBlocking<Unit> {
        var count = 0
        val result = count + List(1000) {
            GlobalScope.async { 1 }
        }.map { it.await() }.sum()
        println(result)
    }

Keywords: Android kotlin

Added by seodevhead on Fri, 18 Feb 2022 01:55:57 +0200