Kotlin Coroutines III - Flow

Kotlin coroutines series navigation:
Kotlin Coroutines I - Coroutine
Kotlin Coroutines II - Channel
Kotlin Coroutines III - Flow
Kotlin Coroutines IV - Application of Flow and Channel
Kotlin Coroutines V - Using Kotlin coroutines in Android

1, Basic use of Flow

A suspend function in a Kotlin coroutine can perform a task without blocking and return its result, but it can return only a single value. To return multiple computed values over time, you can use Flow.

1.1 Sequence and Flow

Before introducing Flow, take a look at the sequence generator:

val intSequence = sequence<Int> {
        Thread.sleep(1000) // Simulate time-consuming task 1
        yield(1)
        Thread.sleep(1000) // Simulate time-consuming task 2
        yield(2)
        Thread.sleep(1000) // Simulate time-consuming task 3
        yield(3)
    }

intSequence.forEach {
        println(it)
    }

As shown above, to get the values out of the sequence we iterate over it, and the three results are returned one after another as expected.

However, a Sequence is synchronous and blocking: its builder cannot call arbitrary suspend functions such as delay. In practice we often want to run several tasks asynchronously and receive their results one by one, and Flow is designed for exactly this scenario.

The source of the Flow interface is shown below. It declares a single method, collect.

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

A Flow can run multiple tasks without blocking and return multiple results, and other suspend functions can be called inside it. To retrieve the values from a flow, call its collect method. The general form is:

Flow.collect()  // Pseudo code

Because collect is a suspend function, it must be called from a coroutine or from another suspend function.

1.2 Simple use of Flow

The following achieves the same effect as the Sequence example above:

private fun createFlow(): Flow<Int> = flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    createFlow().collect {
        println(it)
    }
}

The code above uses the flow { ... } builder to create a Flow. It has the following characteristics:

  • suspend functions can be called inside flow { ... };
  • createFlow does not need to be marked with suspend, even though its body calls suspend functions (section 1.4 explains why);
  • the emit() method emits values;
  • the collect() method collects the results.

1.3 Common ways to create a Flow

flow{...}

flow {
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

flowOf()

flowOf(1,2,3).onEach {
    delay(1000)
}

The flowOf() builder defines a Flow that emits a fixed set of values. A Flow built with flowOf does not need explicit emit() calls.

asFlow()

listOf(1, 2, 3).asFlow().onEach {
    delay(1000)
}

The asFlow() extension functions convert various collections and sequences into flows, again without explicit emit() calls.
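
As a minimal sketch of the conversions mentioned above, the following builds flows from a range, a list, a sequence and an array. It assumes kotlinx.coroutines is on the classpath; the function name asFlowSources is hypothetical and used only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: builds flows from several kinds of sources
// and gathers what each of them emits.
suspend fun asFlowSources(): List<List<Int>> = listOf(
    (1..3).asFlow().toList(),            // IntRange
    listOf(4, 5).asFlow().toList(),      // Iterable
    sequenceOf(6, 7).asFlow().toList(),  // Sequence
    arrayOf(8, 9).asFlow().toList()      // Array
)

fun main() = runBlocking {
    println(asFlowSources()) // [[1, 2, 3], [4, 5], [6, 7], [8, 9]]
}
```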

1.4 Flow is a cold stream (lazy)

Like Sequences, Flows are lazy: the code inside flow { ... } does not run until a terminal operator (collect is one of them) is called. Such a flow is called a cold flow.

private fun createFlow(): Flow<Int> = flow {
    println("flow started")
    delay(1000)
    emit(1)
    delay(1000)
    emit(2)
    delay(1000)
    emit(3)
}

fun main() = runBlocking {
    val flow = createFlow()
    println("calling collect...")
    flow.collect {
        println(it)
    }
    println("calling collect again...")
    flow.collect {
        println(it)
    }
}

The results are as follows:

calling collect...
flow started
1
2
3
calling collect again...
flow started
1
2
3

This is why a function that returns a Flow is not marked suspend. Even though its body calls suspend functions, calling createFlow returns immediately without waiting for anything. The flow is started from the beginning each time it is collected.

Are there hot flows? Yes. StateFlow and SharedFlow (section 3) are hot: they exist independently of their collectors. A flow built with the channelFlow builder, covered later in this series, is still started by collect, but its producer runs concurrently with the collector and hands values over through a channel.

1.5 Cancellation of Flow

Flow follows the same cooperative cancellation as coroutines. Collection of a flow can be cancelled while the flow is suspended in a cancellable suspend function (such as delay). To cancel a flow, simply cancel the coroutine that collects it.
The following example shows how a flow is cancelled and stops executing its code when the withTimeoutOrNull block it is collected in times out:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250 milliseconds
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Note that the flow in the simple function emits only two numbers before the timeout, producing the following output:

Emitting 1
1
Emitting 2
2
Done

2, Flow operator

2.1 Terminal flow operators

A terminal operator is a suspend function on a flow that starts the collection of the flow. collect is the most basic terminal operator, but there are others that are more convenient to use:

  • toList / toSet / toCollection: convert the flow to a collection
  • first / last / single: get the first value, get the last value, or ensure that the flow emits exactly one value
  • reduce and fold: reduce the flow to a single value
  • count
  • launchIn / produceIn / broadcastIn (broadcastIn is deprecated in recent kotlinx.coroutines releases)
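
The convenience operators in the list above can be sketched as follows. This is only an illustration, assuming kotlinx.coroutines is on the classpath; the function name terminalOperatorsDemo is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies several terminal operators to the same cold flow.
// Each terminal operator triggers a fresh collection of the flow.
suspend fun terminalOperatorsDemo(): List<Any> {
    val numbers = (1..5).asFlow()
    return listOf(
        numbers.toList(),     // collects every value into a List
        numbers.first(),      // first emitted value; collection stops after it
        numbers.count(),      // number of emitted values
        flowOf(42).single()   // the only value; throws if the flow emits zero or several values
    )
}

fun main() = runBlocking {
    println(terminalOperatorsDemo()) // [[1, 2, 3, 4, 5], 1, 5, 42]
}
```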

Let's look at some commonly used terminal operators.

2.1.1 collect

Collects the values emitted by the upstream flow.

2.1.2 reduce

reduce is similar to the reduce function on Kotlin collections: it accumulates the emitted values into a single result. As mentioned earlier, reduce is a terminal operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().reduce { a, b ->
        a + b
    }
    println(sum)
}

Output results:

15

2.1.3 fold

fold is similar to fold on Kotlin collections: it takes an initial value and accumulates the emitted values into it. fold is also a terminal operator.

fun main() = runBlocking {
    val sum = (1..5).asFlow().fold(100) { a, b ->
        a + b
    }
    println(sum)
}

Output results:

115

2.1.4 launchIn

launchIn starts collecting the flow in the given CoroutineScope, which is passed as its only parameter, and returns the Job of the launched coroutine.
Source code:

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

Example:

private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
fun main() {
    val scope = CoroutineScope(mDispatcher)
    (1..5).asFlow().onEach { println(it) }
        .onCompletion { mDispatcher.close() }
        .launchIn(scope)
}

Output results:

1
2
3
4
5

Here is another example:

fun main() = runBlocking{
    val cosTime = measureTimeMillis {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }

        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    println("cosTime: $cosTime")
}

We want the two flows to run in parallel. Here is the output:

1
2
3
4
5
one
two
three
four
five
cosTime: 1645

The flows do not run in parallel. This is easy to understand: collect is a suspend function, so the second collect is not reached until the first one has finished.

The correct approach is to launch a separate coroutine for each Flow:

fun main() = runBlocking<Unit>{
    launch {
        (1..5).asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
    launch {
        flowOf("one", "two", "three", "four", "five")
            .onEach { delay(200) }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }
}

Or use launchIn, which is more elegant:

fun main() = runBlocking<Unit>{

    (1..5).asFlow()
        .onEach { delay(100) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)

    flowOf("one", "two", "three", "four", "five")
        .onEach { delay(200) }
        .flowOn(Dispatchers.IO)
        .onEach { println(it) }
        .launchIn(this)
}

Output results:

1
one
2
3
4
two
5
three
four
five

2.2 Flows are sequential

As with Sequence, each individual collection of a flow is performed sequentially, unless operators designed to work on multiple flows are used. By default, no new coroutine is started. Each emitted value is processed by every intermediate operator from upstream to downstream and is then delivered to the terminal operator.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

Output:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

2.3 onStart: when the flow starts

onStart registers a callback that is invoked before the flow starts to be collected. It can be used, for example, to show a loading indicator before a time-consuming operation.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { delay(200) }
        .onStart { println("onStart") }
        .collect { println(it) }
}

Output results:

onStart
1
2
3
4
5

2.4 onCompletion: when the flow completes

If some action needs to be performed when a flow completes (normally or with an exception), there are two ways to do it:

2.4.1 Using try ... finally

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}

2.4.2 Using the onCompletion operator

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}

Output:

1
2
3
4
5
Done

2.5 Backpressure

Backpressure is one of the features of reactive programming. Flowable in RxJava 2 supports the following backpressure strategies:

  • MISSING: the Flowable is created without a backpressure strategy; values emitted through onNext are neither buffered nor dropped.
  • ERROR: if the data in the Flowable's asynchronous buffer exceeds the limit, a MissingBackpressureException is thrown.
  • BUFFER: the Flowable's asynchronous buffer, like that of Observable, has no fixed size and accepts data without limit. It never throws MissingBackpressureException, but it can lead to an OutOfMemoryError (OOM).
  • DROP: if the Flowable's asynchronous buffer is full, values that would be put into it are dropped.
  • LATEST: like DROP, values are dropped when the buffer is full, except that the most recent value is always kept in the buffer regardless of its state.

A flow's collector receives every emitted value, but if processing each value is itself time-consuming, values may be emitted faster than they can be processed.
In Flow, backpressure is implemented through suspend functions.

2.5.1 buffer

buffer corresponds to the BUFFER strategy of RxJava. The buffer operator sets up a buffer with a given capacity. When the buffer overflows, one of several strategies applies:

  • Suspend the emitting coroutine until values in the buffer are consumed.
  • Drop the latest value (the one being added).
  • Drop the oldest value in the buffer.

The capacity can be set to 0, which means no buffer is used.

Let's first look at an example without a buffer. Suppose that producing and emitting each value takes 100 ms, and processing each value takes 700 ms. The code is as follows:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.collect {
            delay(700)
            println("collect: $it")
        }
    }
    println("cosTime: $cosTime")
}

The results are as follows:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 4
collect: 4
produce data: 5
collect: 5
cosTime: 4069

Because the flow is lazy and sequential, processing all five values takes about 5 × (100 ms + 700 ms) = 4000 ms.

Next, we set a buffer with buffer(). It takes two parameters: capacity, the size of the buffer, and onBufferOverflow, the strategy applied when the buffer overflows. The latter is a value of the BufferOverflow enum and defaults to BufferOverflow.SUSPEND.

The source code of BufferOverflow is as follows:

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,

    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,

    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

Buffer with the SUSPEND strategy

We modify the code and set the buffer capacity to 2:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.SUSPEND)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The results are as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
collect: 1
produce data: 5
collect: 2
collect: 3
collect: 4
collect: 5
cosTime: 3713

The overall time is now about 3713 ms. The buffer operator lets the emitting code and the collecting code run concurrently, which improves efficiency.
Here is a brief analysis of the execution:
The collector takes the first value as soon as it is produced and spends 700 ms processing it. Meanwhile the second and third values fill the buffer (capacity 2). The fourth value is produced, but the buffer is full, so the producer suspends when it tries to emit it. Once the first value has been collected, the collector takes the second value from the buffer, the fourth value moves into the freed slot, and the fifth value is produced.

Buffer that drops the latest value
If the overflow strategy in the code above is changed to BufferOverflow.DROP_LATEST, the code is as follows:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_LATEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output is as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 2
collect: 3
cosTime: 2272

As you can see, the fourth and fifth values are dropped because the buffer is full, so they are never collected.

Buffer that drops the oldest value
If the overflow strategy in the code above is changed to BufferOverflow.DROP_OLDEST, the code is as follows:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.buffer(2, BufferOverflow.DROP_OLDEST)
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

The output results are as follows:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 4
collect: 5
cosTime: 2289

As you can see, when the fourth value enters the buffer the second value is dropped, and when the fifth value enters the buffer the third value is dropped.

2.5.2 conflate

When a flow represents partial results of an operation or status updates, it may not be necessary to process every value, only the most recent one. The conflate operator can be used to skip intermediate values:

fun main() = runBlocking {
    val cosTime = measureTimeMillis {
        (1..5).asFlow().onEach {
            delay(100)
            println("produce data: $it")
        }.conflate()
            .collect {
                delay(700)
                println("collect: $it")
            }
    }
    println("cosTime: $cosTime")
}

Output results:

produce data: 1
produce data: 2
produce data: 3
produce data: 4
produce data: 5
collect: 1
collect: 5
cosTime: 1596

The conflate operator keeps only the most recent value and drops older ones, that is, it applies the DROP_OLDEST strategy. It is equivalent to buffer(0, BufferOverflow.DROP_OLDEST).

2.6 Flow exception handling

2.6.1 The catch operator catches upstream exceptions

The onCompletion operator introduced earlier is invoked when the flow collection finishes, even when it finishes because of an exception.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion {
        println("onCompletion")
    }.collect {
        println("collect: $it")
    }
}

Output:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
onCompletion
Exception in thread "main" java.lang.Exception: test exception
...

In fact, onCompletion can tell whether an exception occurred. Its signature is onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): the cause parameter is non-null if the upstream flow failed with an exception and null if it completed normally. We can therefore check for an exception in onCompletion:

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

However, onCompletion can only detect that an exception occurred; it cannot catch it.

The catch operator can be used to catch the exception.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
flow completed exception
catch exception: test exception

However, if onCompletion and catch are swapped, catch handles the exception first, so the downstream onCompletion is not affected and sees no exception:
Code:

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.onCompletion { cause ->
        if (cause != null) {
            println("flow completed exception")
        } else {
            println("onCompletion")
        }
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception
onCompletion

  • The catch operator preserves exception transparency: it is an intermediate operator that catches only upstream exceptions and cannot catch exceptions thrown downstream of it.
  • Inside catch, you can rethrow the exception with throw, turn it into an emitted value with emit(), or log it and perform other business logic.
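
The two points above can be sketched as follows: one function turns an upstream failure into a fallback value with emit(), the other shows that an exception thrown downstream of catch is not handled by it. This is a minimal illustration assuming kotlinx.coroutines is on the classpath; the function names and the fallback value -1 are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the upstream fails at 3; catch replaces the failure with a fallback value.
suspend fun collectWithFallback(): List<Int> =
    (1..5).asFlow()
        .onEach { if (it == 3) throw IllegalStateException("test exception") }
        .catch { emit(-1) }   // emit() is available because the lambda has a FlowCollector receiver
        .toList()

// Hypothetical helper: the exception is thrown in collect, downstream of catch,
// so catch does not see it and it propagates to the caller.
suspend fun downstreamFailureEscapes(): Boolean =
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { throw IllegalStateException("downstream failure") }
        false
    } catch (e: IllegalStateException) {
        true
    }

fun main() = runBlocking {
    println(collectWithFallback())      // [1, 2, -1]
    println(downstreamFailureEscapes()) // true
}
```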

2.6.2 retry and retryWhen operators

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

If the upstream throws an exception and the retry operator is used, retry restarts the flow, at most the specified number of times.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retry(2) {
        it.message == "test exception"
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Output results:

produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
produce data: 1
collect: 1
produce data: 2
collect: 2
produce data: 3
collect: 3
catch exception: test exception

Note that a retry happens only when an exception occurs and the predicate passed to retry returns true.

retry ultimately delegates to the retryWhen operator. The following code is logically equivalent to the code above.

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        if (it == 4) {
            throw Exception("test exception")
        }
        delay(100)
        println("produce data: $it")
    }.retryWhen { cause, attempt ->
        cause.message == "test exception" && attempt < 2
    }.catch { ex ->
        println("catch exception: ${ex.message}")
    }.collect {
        println("collect: $it")
    }
}

Exercise: what happens if you swap catch and retry/retryWhen in the code above?
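
One way to explore this question is to count how often the upstream block is started in each ordering. The sketch below assumes kotlinx.coroutines is on the classpath; countAttempts and its catchFirst parameter are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many times the upstream flow block was started.
suspend fun countAttempts(catchFirst: Boolean): Int {
    var attempts = 0
    val source = flow {
        attempts++                        // runs once per (re)start of the flow
        emit(attempts)
        throw Exception("test exception")
    }
    val pipeline =
        if (catchFirst) source.catch { }.retry(2)   // catch swallows the failure, retry sees a normal completion
        else source.retry(2).catch { }              // retry sees the failure first and restarts the flow twice
    pipeline.collect()
    return attempts
}

fun main() = runBlocking {
    println(countAttempts(catchFirst = true))   // 1
    println(countAttempts(catchFirst = false))  // 3
}
```

When catch comes first, the exception never reaches retry, so the flow runs only once.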

2.7 Flow thread switching

2.7.1 Collection thread

Flow switches threads based on the CoroutineContext. Because collect is a suspend function and must be called from a coroutine, the thread on which values are collected is determined by the CoroutineContext of the calling coroutine. For example, if collect is called from a coroutine running on Dispatchers.Main, values are collected on the main thread.
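
As a minimal sketch of this rule, the following collects the same flow from two different contexts and reports the thread on which the collect lambda ran. It assumes kotlinx.coroutines is on the classpath; collectingThreadName is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the name of the thread on which the collect lambda runs.
suspend fun collectingThreadName(): String {
    var name = ""
    flowOf(1).collect { name = Thread.currentThread().name }
    return name
}

fun main() = runBlocking {
    // Collected in the context of runBlocking, i.e. on the thread that called main.
    println(collectingThreadName())
    // Collected in Dispatchers.Default, i.e. on a DefaultDispatcher worker thread.
    println(withContext(Dispatchers.Default) { collectingThreadName() })
}
```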

2.7.2 Switching threads with flowOn

RxJava decides the thread that emits data and the thread that observes it with subscribeOn and observeOn. If subscribeOn is called several times, only the call closest to the source takes effect.
Flow switches threads with the flowOn operator. Each flowOn call changes the context of the operators upstream of it, up to the previous flowOn. For example:

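private val mDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

// Assumed helper (not defined in this article): prints a message prefixed with the current thread's id and name.
fun printWithThreadInfo(msg: String) =
    println("thread id: ${Thread.currentThread().id}, thread name: ${Thread.currentThread().name} ---> $msg")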

fun main() = runBlocking {
    (1..5).asFlow().onEach {
        printWithThreadInfo("produce data: $it")
    }.flowOn(Dispatchers.IO)
        .map {
            printWithThreadInfo("$it to String")
            "String: $it"
        }.flowOn(mDispatcher)
        .onCompletion {
            mDispatcher.close()
        }
        .collect {
            printWithThreadInfo("collect: $it")
        }
}

The output results are as follows:

thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 1
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 2
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 3
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 4
thread id: 13, thread name: DefaultDispatcher-worker-1 ---> produce data: 5
thread id: 12, thread name: pool-1-thread-1 ---> 1 to String
thread id: 12, thread name: pool-1-thread-1 ---> 2 to String
thread id: 1, thread name: main ---> collect: String: 1
thread id: 12, thread name: pool-1-thread-1 ---> 3 to String
thread id: 1, thread name: main ---> collect: String: 2
thread id: 12, thread name: pool-1-thread-1 ---> 4 to String
thread id: 1, thread name: main ---> collect: String: 3
thread id: 12, thread name: pool-1-thread-1 ---> 5 to String
thread id: 1, thread name: main ---> collect: String: 4
thread id: 1, thread name: main ---> collect: String: 5

As you can see, the data is emitted on a Dispatchers.IO thread, the map operation runs in our custom thread pool, and collect runs on the main thread, the thread of runBlocking.

2.8 Intermediate transformation operators

2.8.1 map

The map operator has already appeared in earlier examples. map is available not only on Flow but also on List, where it converts each element into a new element, adds it to a new List, and returns that List.
On a Flow, map transforms each emitted value and then emits the result.

fun main() = runBlocking {
    (1..5).asFlow().map { "string: $it" }
        .collect {
            println(it)
        }
}

Output:

string: 1
string: 2
string: 3
string: 4
string: 5

2.8.2 transform

When using the transform operator, you can call emit any number of times, which is the biggest difference between transform and map:

fun main() = runBlocking {
    (1..5).asFlow().transform {
        emit(it * 2)
        delay(100)
        emit("String: $it")
    }.collect {
            println(it)
        }
}

Output results:

2
String: 1
4
String: 2
6
String: 3
8
String: 4
10
String: 5

2.8.3 onEach

Performs an action on each value as it passes through, then emits the value unchanged.

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { println("onEach: $it") }
        .collect { println(it) }
}

Output:

onEach: 1
1
onEach: 2
2
onEach: 3
3
onEach: 4
4
onEach: 5
5

2.8.4 filter

Keeps only the values that satisfy a condition.

fun main() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}

Output results:

2
4

2.8.5 drop / dropWhile

drop skips the first n elements.
dropWhile skips elements from the start for as long as they satisfy the predicate, then emits everything after that.
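
A minimal sketch of both operators, assuming kotlinx.coroutines is on the classpath (dropDemo and dropWhileDemo are hypothetical names). Note that dropWhile stops dropping at the first element that fails the predicate, so later matching elements are kept.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: skips the first two values.
suspend fun dropDemo(): List<Int> =
    (1..5).asFlow().drop(2).toList()

// Hypothetical helper: skips leading values below 3; the trailing 1 and 2 are kept.
suspend fun dropWhileDemo(): List<Int> =
    flowOf(1, 2, 5, 1, 2).dropWhile { it < 3 }.toList()

fun main() = runBlocking {
    println(dropDemo())      // [3, 4, 5]
    println(dropWhileDemo()) // [5, 1, 2]
}
```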

2.8.6 take

The take operator takes only the first n emitted values.

fun main() = runBlocking {
    (1..5).asFlow().take(2).collect {
            println(it)
        }
}

Output:

1
2

2.8.7 zip

zip is an operator that merges two flows pairwise.

fun main() = runBlocking {
    val flowA = (1..6).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and two
3 and three
4 and four
5 and five

The zip operator combines each item of flowA with the corresponding item of flowB. Even though every item in flowB is delayed by delay(), zip waits for the delay to finish before combining.

If flowA and flowB have different numbers of items, the resulting flow has as many items as the shorter of the two:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.zip(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and two
3 and three
4 and four
5 and five

2.8.8 combine

combine also merges flows, but it works differently from zip.

With combine, each time either flow emits a new item, it is combined with the latest item of the other flow. Here, each new item from flowA is combined with the latest item of flowB, and vice versa.

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five", "six", "seven").onEach { delay(200) }
    flowA.combine(flowB) { a, b -> "$a and $b" }
        .collect {
            println(it)
        }
}

Output results:

1 and one
2 and one
3 and one
3 and two
4 and two
5 and two
5 and three
5 and four
5 and five
5 and six
5 and seven

2.8.9 flattenConcat and flattenMerge

flattenConcat
flattenConcat flattens a flow of flows into a single flow, in order, without interleaving the nested flows.
Source code:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

example:

fun main() = runBlocking {
    val flowA = (1..5).asFlow()
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(1000) }

    flowOf(flowA,flowB)
        .flattenConcat()
        .collect{ println(it) }
}

Output:

1
2
3
4
5
// delay 1000ms
one
// delay 1000ms
two
// delay 1000ms
three
// delay 1000ms
four
// delay 1000ms
five

flattenMerge
flattenMerge takes a concurrency parameter that limits the number of flows collected concurrently. Its default value is 16.
Source code:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

It can be seen that the parameter must be greater than 0, and when the parameter is 1, it is consistent with flattenConcat.

fun main() = runBlocking {
    val flowA = (1..5).asFlow().onEach { delay(100) }
    val flowB = flowOf("one", "two", "three","four","five").onEach { delay(200) }

    flowOf(flowA,flowB)
        .flattenMerge(2)
        .collect{ println(it) }
}

Output results:

1
one
2
3
two
4
5
three
four
five

2.8.10 flatMapMerge and flatMapConcat

flatMapMerge is implemented with the map and flattenMerge operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)

example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapMerge {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output results:

1
2
3
4
5
// delay 1000ms
string: 1
string: 2
string: 3
string: 4
string: 5

flatMapConcat is implemented with the map and flattenConcat operators:

@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

example:

fun main() = runBlocking {
    (1..5).asFlow()
        .flatMapConcat {
            flow {
                emit(it)
                delay(1000)
                emit("string: $it")
            }
        }.collect { println(it) }
}

Output results:

1
// delay 1000ms
string: 1
2
// delay 1000ms
string: 2
3
// delay 1000ms
string: 3
4
// delay 1000ms
string: 4
5
// delay 1000ms
string: 5

Both flatMapMerge and flatMapConcat map each value of a flow to another flow and flatten the results.
The difference is that flatMapMerge collects the inner flows concurrently and does not wait for one to complete, whereas flatMapConcat waits for each inner flow to complete before it starts collecting the next one.

2.8.11 flatMapLatest

When the upstream emits a new value, the inner flow created for the previous value is cancelled.

fun main() = runBlocking {
    (1..5).asFlow().onEach { delay(100) }
        .flatMapLatest {
            flow {
                println("begin flatMapLatest $it")
                delay(200)
                emit("string: $it")
                println("end flatMapLatest $it")
            }
        }.collect {
            println(it)
        }
}

Output results:

begin flatMapLatest 1
begin flatMapLatest 2
begin flatMapLatest 3
begin flatMapLatest 4
begin flatMapLatest 5
end flatMapLatest 5
string: 5

3, StateFlow and SharedFlow

StateFlow and SharedFlow are newer APIs intended to replace BroadcastChannel. They are used to emit data upstream that can be collected by multiple subscribers at the same time.

3.1 StateFlow

According to the official documentation, StateFlow is a state-holder observable flow that emits the current state and subsequent state updates to its collectors. The current state can also be read through its value property. To update the state and send it to the flow, assign a new value to the value property of MutableStateFlow.

In Android, StateFlow is a good fit for classes that need to maintain observable mutable state.

StateFlow has two types: StateFlow and MutableStateFlow:

public interface StateFlow<out T> : SharedFlow<T> {
   public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
   public override var value: T
   public fun compareAndSet(expect: T, update: T): Boolean
}

The state is represented by value. Any update to value is delivered to all collectors of the flow.
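
As a minimal sketch of the members listed above (the function name updateCounter is hypothetical), the following shows plain assignment, compareAndSet, and the update extension function from kotlinx.coroutines.flow:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Returns (first compareAndSet result, second compareAndSet result, final value).
fun updateCounter(): Triple<Boolean, Boolean, Int> {
    val counter = MutableStateFlow(0)
    counter.value = 1                          // plain assignment
    val swapped = counter.compareAndSet(1, 2)  // succeeds: the current value is 1
    val stale = counter.compareAndSet(1, 3)    // fails: the current value is now 2
    counter.update { it + 1 }                  // atomic read-modify-write: 2 -> 3
    return Triple(swapped, stale, counter.value)
}

fun main() {
    println(updateCounter()) // prints (true, false, 3)
}
```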

3.1.1 StateFlow basic usage

Usage example:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            val res = getApi()
            _state.value = res
        }
    }

    private suspend fun getApi() = withContext(Dispatchers.IO) {
        delay(2000) // Simulate time consuming requests
        "hello, stateFlow"
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi(this) // Start getting results

    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    launch(Dispatchers.IO) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}

The output is as follows. Note that the program does not terminate.

thread id: 14, thread name: DefaultDispatcher-worker-3 ---> unKnown
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> unKnown
// Wait two seconds
thread id: 14, thread name: DefaultDispatcher-worker-3 ---> hello, stateFlow
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, stateFlow

StateFlow is used in much the same way as LiveData.
MutableStateFlow is mutable, that is, its value can be changed, while StateFlow is read-only. This mirrors the relationship between MutableLiveData and LiveData. For good encapsulation, a class generally exposes only the read-only type.

The output results prove that:

  1. Data emitted by a StateFlow can be collected simultaneously by multiple collectors running in different coroutines.
  2. StateFlow is a hot flow. Whenever the data changes, it emits the new value.

The program did not stop because calling collect on a StateFlow suspends the calling coroutine and never completes normally.
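
When only one particular state is of interest, a terminal operator that stops early avoids the endless collection. The sketch below is an illustration, not the article's own approach: awaitResult is a hypothetical helper that uses first { } to suspend until the state differs from the initial "unKnown" value and then stops collecting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Suspends until the state holds something other than the initial placeholder.
suspend fun awaitResult(state: StateFlow<String>): String =
    state.first { it != "unKnown" }

fun main() = runBlocking {
    val state = MutableStateFlow("unKnown")
    launch {
        delay(200) // Simulate a time-consuming request
        state.value = "hello, stateFlow"
    }
    println(awaitResult(state)) // prints "hello, stateFlow", then the program exits
}
```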

StateFlow differs from LiveData in that:

  1. StateFlow must have an initial value, which is not required for LiveData.
  2. LiveData is bound to the Activity lifecycle. When the View enters the STOPPED state, LiveData.observe() automatically unregisters the observer, whereas collecting from StateFlow or any other flow does not stop automatically.

3.1.2 why StateFlow

We know that LiveData has the following characteristics:

  1. Data can only be updated on the main thread. Even postValue() from a background thread ultimately posts the value to the main thread, where setValue() is called.
  2. LiveData does not debounce ("anti-shake"): setting an equal value still notifies observers.
  3. LiveData transformations run on the main thread.
  4. LiveData requires "sticky events" to be handled properly.

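In view of this, these scenarios are easier to handle with StateFlow: its value can be set from any thread, equal consecutive values are conflated, and flow operators can run on any dispatcher.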
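
A minimal sketch of the first two points, assuming it is run from runBlocking as in main below (observeUpdates is a hypothetical name): the value is written from a background dispatcher, and the repeated value 1 is not delivered a second time.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun observeUpdates(): List<Int> = coroutineScope {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch { state.collect { seen += it } }
    delay(50) // let the collector subscribe and see the initial value
    for (next in listOf(1, 1, 2)) {
        // The value is assigned on a background thread; no main thread is required.
        withContext(Dispatchers.Default) { state.value = next }
        delay(50) // give the collector time to observe the update
    }
    collector.cancel()
    seen
}

fun main() = runBlocking {
    println(observeUpdates()) // the duplicate 1 is conflated: [0, 1, 2]
}
```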

3.1.3 prevent task leakage

There are two solutions:

  1. Instead of collecting the StateFlow directly, convert it to LiveData with asLiveData(). (But then why not use LiveData directly?)
  2. Manually cancel the coroutine in which the StateFlow subscriber runs. In Android, you can collect the flow inside a Lifecycle.repeatOnLifecycle block.

The corresponding code for the second approach is as follows:

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
}

3.1.4 StateFlow only delivers the latest value to subscribers

We modify the above code:

class Test {
    private val _state = MutableStateFlow<String>("unKnown")
    val state: StateFlow<String> get() = _state

    fun getApi1(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, coroutine"
        }
    }

    fun getApi2(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, kotlin"
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: Test = Test()

    test.getApi1(this) // Start getting results
    delay(1000)
    test.getApi2(this) // Start getting results

    val job1 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }
    val job2 = launch(Dispatchers.IO) {
        delay(8000)
        test.state.collect {
            printWithThreadInfo(it)
        }
    }

    // Avoid task leakage and cancel manually
    delay(10000)
    job1.cancel()
    job2.cancel()
}

In this scenario getApi1() is requested first and then getApi2(), so the value of the StateFlow is assigned three times, counting the initial value. The collectors wait until all three assignments have completed before they start collecting.
The output is as follows:

thread id: 13, thread name: DefaultDispatcher-worker-2 ---> hello, kotlin
thread id: 12, thread name: DefaultDispatcher-worker-1 ---> hello, kotlin

The results show that StateFlow only delivers the latest value to subscribers. LiveData, by comparison, keeps a version number: when an observer registers, LiveData compares versions and delivers the previously set data to it. This is the so-called "stickiness". I don't think stickiness is a design defect of LiveData; it is a feature that many scenarios genuinely need. StateFlow offers nothing comparable beyond its current value.

So when this feature is needed, do we have to go back to LiveData? The SharedFlow described below covers this scenario.

3.2 SharedFlow

If you need to manage a series of updates (that is, an event stream) rather than a current state, you can use SharedFlow. This API is handy when subscribers are interested in a sequence of emitted values. Compared with LiveData's version tracking, SharedFlow is more flexible and powerful.

There are also two types of SharedFlow: SharedFlow and MutableSharedFlow:

public interface SharedFlow<out T> : Flow<T> {
   public val replayCache: List<T>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
   suspend fun emit(value: T)
   fun tryEmit(value: T): Boolean
   val subscriptionCount: StateFlow<Int>
   fun resetReplayCache()
}

SharedFlow is a flow with a replayCache that can be read as an atomic snapshot. Each new subscriber first receives the values in the replay cache and then receives newly emitted values.

MutableSharedFlow can emit values from suspending contexts (emit) or non-suspending contexts (tryEmit). Its replayCache can be reset with resetReplayCache(). The number of subscribers is also exposed as a flow (subscriptionCount).

Implementing a custom MutableSharedFlow can be cumbersome, so the library provides a convenient factory function:

public fun <T> MutableSharedFlow(
   replay: Int = 0,   // Number of already-emitted values replayed to each new subscriber
   extraBufferCapacity: Int = 0,  // Number of values buffered in addition to replay
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  // Overflow strategy: SUSPEND, DROP_OLDEST or DROP_LATEST
): MutableSharedFlow<T>

The parameters of MutableSharedFlow are explained in the corresponding comments above.
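
To illustrate the onBufferOverflow parameter, here is a hedged sketch (collectWithDropOldest is a hypothetical name, and the timing assumes the single-threaded runBlocking dispatcher). A burst of five values is sent with tryEmit to a flow that can buffer only two, using BufferOverflow.DROP_OLDEST, while the subscriber has no chance to run in between.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

suspend fun collectWithDropOldest(): List<Int> = coroutineScope {
    val events = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val received = mutableListOf<Int>()
    val collector = launch { events.collect { received += it } }
    delay(50)                          // let the collector subscribe
    for (i in 1..5) events.tryEmit(i)  // burst without any suspension point
    delay(50)                          // let the collector drain the buffer
    collector.cancel()
    received
}

fun main() = runBlocking {
    println(collectWithDropOldest())
}
```

With a buffer of two, the expectation is that only the newest values of the burst reach the subscriber and the older ones are dropped. With the default SUSPEND strategy, emit would instead suspend the sender until the subscriber catches up.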

3.2.1 basic use of SharedFlow

class SharedFlowTest {
    private val _state = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 2)
    val state: SharedFlow<Int> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            for (i in 0..20) {
                delay(200)
                _state.emit(i)
                println("send data: $i")
            }
        }
    }
}

fun main() = runBlocking<Unit> {
    val test: SharedFlowTest = SharedFlowTest()

    test.getApi(this) // Start getting results

    val job = launch(Dispatchers.IO) {
        delay(3000)
        test.state.collect {
            println("---collect1: $it")
        }
    }
    delay(5000)
    job.cancel()  // Cancel the task to avoid leakage
}

The output results are as follows:

send data: 0
send data: 1
send data: 2
send data: 3
send data: 4
send data: 5
send data: 6
send data: 7
send data: 8
send data: 9
send data: 10
send data: 11
send data: 12
send data: 13
---collect1: 11
---collect1: 12
---collect1: 13
send data: 14
---collect1: 14
send data: 15
---collect1: 15
send data: 16
---collect1: 16
send data: 17
---collect1: 17
send data: 18
---collect1: 18
send data: 19
---collect1: 19
send data: 20
---collect1: 20

The results can be analyzed as follows:
SharedFlow emits a value every 200ms. A total of 21 values are emitted, which takes about 4s.
replay is set to 3 and extraBufferCapacity to 2, so the total buffer capacity is 5. The overflow strategy defaults to SUSPEND.
The subscriber starts collecting after 3s. By then 14 values (0-13) have been emitted. Because there was no subscriber during that time, emit never suspended and only the 3 replay values were retained, so the values collected by the subscriber start from 11.

3.2.2 other interfaces of MutableSharedFlow

MutableSharedFlow also has a subscriptionCount property that holds the number of active collectors, so business logic can be optimized accordingly.
MutableSharedFlow also provides a resetReplayCache function, for cases where you do not want the most recently emitted values to be replayed to new subscribers.
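
A small sketch of these members, with the hypothetical helper name replayDemo: it reads replayCache, checks subscriptionCount while one collector is active, and then clears the cache with resetReplayCache.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (replay cache before reset, subscriber count, replay cache after reset).
suspend fun replayDemo(): Triple<List<Int>, Int, List<Int>> = coroutineScope {
    val events = MutableSharedFlow<Int>(replay = 2)
    for (i in 1..3) events.tryEmit(i)   // no subscribers yet: values only feed the replay cache
    val before = events.replayCache     // snapshot of the last two values

    val collector = launch { events.collect { } }
    delay(50)                           // let the collector subscribe
    val subscribers = events.subscriptionCount.value

    events.resetReplayCache()           // later subscribers get no old values
    val after = events.replayCache
    collector.cancel()
    Triple(before, subscribers, after)
}

fun main() = runBlocking {
    println(replayDemo()) // prints ([2, 3], 1, [])
}
```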

3.3 usage scenarios of StateFlow and SharedFlow

As its name suggests, StateFlow delivers only the latest value to subscribers, which makes it suitable for observing state.
SharedFlow can be configured to replay previously emitted values to new subscribers, which makes it suitable for listening to events.
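
The split between state and events might look like the following sketch. DownloadModel and runDownload are hypothetical names chosen for illustration: progress is state, so it is held in a StateFlow, while the one-off "download finished" notification is an event, so it goes through a SharedFlow without replay.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical model class, not taken from the examples above.
class DownloadModel {
    private val _progress = MutableStateFlow(0)   // state: always has a current value
    val progress: StateFlow<Int> = _progress.asStateFlow()

    private val _events = MutableSharedFlow<String>(extraBufferCapacity = 16) // events: no replay
    val events: SharedFlow<String> = _events.asSharedFlow()

    fun advance(percent: Int) {
        _progress.value = percent
        if (percent == 100) _events.tryEmit("download finished")
    }
}

suspend fun runDownload(): Pair<Int, List<String>> = coroutineScope {
    val model = DownloadModel()
    val received = mutableListOf<String>()
    val listener = launch { model.events.collect { received += it } }
    delay(50)            // let the listener subscribe
    model.advance(50)
    model.advance(100)
    delay(50)            // let the listener receive the event
    listener.cancel()
    model.progress.value to received
}

fun main() = runBlocking {
    val (progress, events) = runDownload()
    println("progress=$progress events=$events")
}
```

A subscriber that arrives late can still read the current progress from the StateFlow, but it will not receive the finished event again because replay is 0.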

3.4 converting cold flow to hot flow

Use the shareIn operator to convert a Flow to a SharedFlow. See below for details.
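
A minimal sketch of shareIn, with the hypothetical helper name shareOnce and an arbitrary choice of SharingStarted.Lazily and replay = 3: two subscribers collect from the shared flow, and the counter shows that the cold upstream is started only once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (values seen by subscriber 1, values seen by subscriber 2, upstream start count).
suspend fun shareOnce(): Triple<List<Int>, List<Int>, Int> = coroutineScope {
    var upstreamStarts = 0
    val cold = flow {
        upstreamStarts++                 // counts how often the cold flow is started
        for (i in 1..3) {
            delay(50)
            emit(i)
        }
    }
    // Separate scope for the sharing coroutine so that it can be cancelled explicitly.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val hot = cold.shareIn(sharingScope, SharingStarted.Lazily, replay = 3)

    val first = async { hot.take(3).toList() }
    val second = async { hot.take(3).toList() }
    val result = Triple(first.await(), second.await(), upstreamStarts)
    sharingScope.cancel()                // stop sharing to avoid leaking the coroutine
    result
}

fun main() = runBlocking {
    println(shareOnce()) // prints ([1, 2, 3], [1, 2, 3], 1)
}
```

Collecting the cold flow directly from two coroutines would run its body twice. After shareIn, both subscribers observe the same single run.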

Keywords: kotlin

Added by jaymoore_299 on Fri, 14 Jan 2022 23:56:19 +0200