Flow: asynchronous streams
Getting to know Flow
- Characteristics
- Builders and context
- Launching
- Cancellation and cancellation checks
- Buffering
Operators
- Intermediate (transition) operators
- Terminal (end) operators
- Combining
- Flattening
Exceptions
- Exception handling
- Completion
How to represent multiple values?
A suspend function can return a single value asynchronously, but how can multiple computed values be returned asynchronously?
Options
- Collection
- Sequence
- Suspend function
- Flow
A collection can return multiple values, but not asynchronously.
private fun createList() = listOf<Int>(1, 2, 3)

@Test
fun test_list() {
    createList().forEach { println(it) }
}
A sequence can return a series of integers:
private fun createSequence(): Sequence<Int> {
    return sequence {
        for (i in 1..3) {
            Thread.sleep(1000) // Pretend to compute; this blocks the thread, so nothing else can run
            // delay(1000) // a suspend function such as delay cannot be called here
            yield(i)
        }
    }
}

@Test
fun test_sequence() {
    createSequence().forEach { println(it) }
}
Look at the source code of the sequence builder:
public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> =
    Sequence { iterator(block) }
The block that is passed in is an extension function on SequenceScope.
@RestrictsSuspension
@SinceKotlin("1.3")
public abstract class SequenceScope<in T> internal constructor()
However, @RestrictsSuspension limits the block to the suspending functions that SequenceScope itself provides, such as yield and yieldAll.
createSequence returns multiple values, but it is still synchronous.
// Returns multiple values asynchronously
private suspend fun createList2(): List<Int> {
    delay(5000)
    return listOf<Int>(1, 2, 3)
}

@Test
fun test_list2() = runBlocking<Unit> {
    createList2().forEach { println(it) }
}
A suspend function can return multiple values asynchronously, but they all arrive at once. Can values be returned one at a time, like a stream, while staying asynchronous? Flow solves this problem.
private suspend fun createFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i) // Emit one element
    }
}

@Test
fun test_flow() = runBlocking<Unit> {
    createFlow().collect { println(it) } // collect is a terminal (end) operator, described later
}
An element is produced every second, and the delay suspends the coroutine rather than blocking the thread. The following example demonstrates this:
private suspend fun createFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

@Test
fun test_flow2() = runBlocking<Unit> {
    launch {
        for (i in 1..3) {
            println("I am running and not blocked $i")
            delay(1500)
        }
    }
    createFlow().collect { println(it) }
}
Output:
I am running and not blocked 1
1
I am running and not blocked 2
2
I am running and not blocked 3
3
Process finished with exit code 0
Collecting the results does not block other coroutines. After 1 is printed and the flow suspends in delay, the other task runs. Nothing is blocked; the two tasks alternate.
Flow really does return multiple values asynchronously.
Differences between Flow and other methods
- The builder function for the Flow type is named flow.
- Code inside the flow { ... } block can suspend.
- The function createFlow() no longer needs the suspend modifier; it can be removed from the code above.
- The flow emits values with the emit function.
- The flow's values are collected with the collect function.
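The third point can be checked with a small sketch. The names countdown and collectCountdown are hypothetical and only serve the illustration; the function that builds the flow is an ordinary, non-suspending function, and only the collection needs a coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: note that the function itself has no `suspend` modifier.
fun countdown(from: Int): Flow<Int> = flow {
    for (i in from downTo 1) {
        delay(10) // suspending calls are allowed inside the flow { } block
        emit(i)
    }
}

// Building the flow needs no coroutine; only collecting it does.
fun collectCountdown(): List<Int> {
    val numbers = countdown(3)              // plain call, nothing runs yet
    return runBlocking { numbers.toList() } // collection happens here
}
```

Calling collectCountdown() from regular code returns the emitted values as a list.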
Flow application
On Android, file download is a typical application of Flow.
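As a rough illustration of the download case, the sketch below reports progress through a flow. Everything in it is an assumption made for the example: DownloadStatus, simulatedDownload and runSimulatedDownload are hypothetical names, and the download is simulated with delay instead of real network or file I/O.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical status type for the illustration.
sealed class DownloadStatus {
    data class Progress(val percent: Int) : DownloadStatus()
    object Done : DownloadStatus()
}

// Simulated download: each loop iteration stands in for reading one chunk.
fun simulatedDownload(totalChunks: Int): Flow<DownloadStatus> = flow {
    for (chunk in 1..totalChunks) {
        delay(10) // placeholder for the real I/O work
        emit(DownloadStatus.Progress(chunk * 100 / totalChunks))
    }
    emit(DownloadStatus.Done)
}

// Collects every status update into a list so the sequence can be inspected.
fun runSimulatedDownload(): List<DownloadStatus> = runBlocking {
    simulatedDownload(4).toList()
}
```

In a real application the collector would update a progress bar instead of building a list.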
Cold flow
Like a sequence, a Flow is cold: the code in the flow builder does not run until the flow is collected.
private fun createFlow2() = flow<Int> {
    println("Flow started.")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

@Test
fun test_flow_cold() = runBlocking<Unit> {
    val flow = createFlow2()
    println("calling collect...")
    flow.collect { value -> println(value) }
    println("calling collect again...")
    flow.collect { value -> println(value) }
}
calling collect...
Flow started.
1
2
3
calling collect again...
Flow started.
1
2
3
Process finished with exit code 0
The flow starts running only when collect is called, and it runs again from the beginning on every collection.
Flows are sequential
- Each individual collection of a flow is performed sequentially, unless special operators are used.
- From upstream to downstream, every intermediate (transition) operator processes each emitted value and then passes it on to the terminal (end) operator.
@Test
fun test_flow_continuation() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .map { "string $it" }
        .collect { println("collect $it") }
}
collect string 2
collect string 4
Process finished with exit code 0
This example goes through the following steps: create a flow, keep only the even numbers, convert them to strings, and collect the result.
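To make the sequential order visible, the following sketch records each step in a list instead of printing. The function name traceSequentialPipeline is hypothetical. Each value travels through filter, map and collect before the next value is taken from upstream.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order in which the operators see each value.
fun traceSequentialPipeline(): List<String> {
    val trace = mutableListOf<String>()
    runBlocking {
        (1..3).asFlow()
            .filter {
                trace += "filter $it"
                it % 2 == 0 // keep even numbers only
            }
            .map {
                trace += "map $it"
                "string $it"
            }
            .collect { trace += "collect $it" }
    }
    return trace
}
```

The trace shows that value 2 is filtered, mapped and collected before value 3 is even looked at.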
Flow builders
- The flowOf builder defines a flow that emits a fixed set of values.
- The .asFlow() extension function converts various collections and sequences into flows.
@Test
fun test_flow_builder() = runBlocking<Unit> {
    // flowOf builder
    flowOf("one", "two", "three")
        .onEach { delay(1000) }
        .collect { value -> println(value) }

    // asFlow extension function
    (1..3).asFlow().collect { value -> println(value) }
}
one
two
three
1
2
3
Process finished with exit code 0
Flow context
- Collection of a flow always happens in the context of the calling coroutine. This property of flows is called context preservation.
- Code in the flow { ... } builder must honor context preservation and is not allowed to emit from a different context.
- The flowOn operator changes the context in which the flow emits.
private fun createFlow3() = flow<Int> {
    println("Flow started ${Thread.currentThread()}")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

@Test
fun test_flow_context() = runBlocking<Unit> {
    createFlow3()
        .collect { println("$it, thread: ${Thread.currentThread()}") }
}
Flow started Thread[main @coroutine#1,5,main]
1, thread: Thread[main @coroutine#1,5,main]
2, thread: Thread[main @coroutine#1,5,main]
3, thread: Thread[main @coroutine#1,5,main]
Process finished with exit code 0
Without a context switch, the flow is built and collected in the same context and on the same thread.
Try changing the thread as follows:
private fun createFlow4() = flow {
    withContext(Dispatchers.IO) { // Switch to an IO thread
        println("Flow started ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }
}

@Test
fun test_flow_on() = runBlocking {
    createFlow4().collect { println("collect $it, ${Thread.currentThread()}") }
}
Flow started DefaultDispatcher-worker-1 @coroutine#1
java.lang.IllegalStateException: Flow invariant is violated:
    Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06],
    but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO].
    Please refer to 'flow' documentation or use 'flowOn' instead
The flow body does start on an IO thread, but the emission fails: emitting from a context other than the collector's is not allowed. The error message recommends flowOn instead.
The correct approach is as follows:
private fun createFlow5() = flow {
    println("Flow started ${Thread.currentThread().name}")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}.flowOn(Dispatchers.IO)

@Test
fun test_flow_on2() = runBlocking {
    createFlow5().collect { println("collect $it, ${Thread.currentThread().name}") }
}
Flow started DefaultDispatcher-worker-1 @coroutine#2
collect 1, main @coroutine#1
collect 2, main @coroutine#1
collect 3, main @coroutine#1
Process finished with exit code 0
The flow is built and emits on an IO thread, and it is collected on the main thread.
Launching a flow
By replacing collect with launchIn, the collection of a flow can be launched in a separate coroutine.
private fun events() = (1..3).asFlow()
    .onEach {
        delay(1000)
        println("$it, ${Thread.currentThread().name}")
    }.flowOn(Dispatchers.Default)

@Test
fun testFlowLaunch() = runBlocking<Unit> {
    events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        //.collect()
        .launchIn(CoroutineScope(Dispatchers.IO))
        .join()
}
1, DefaultDispatcher-worker-3 @coroutine#3
Event: 1 DefaultDispatcher-worker-1 @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 DefaultDispatcher-worker-2 @coroutine#2
Process finished with exit code 0
onEach is an intermediate (transition) operator and does not trigger collection. collect is a terminal (end) operator and does trigger it. An intermediate operator is like a filter and a terminal operator is like a tap: no water flows until the tap is opened, no matter how many filters are installed along the pipe.
To choose the coroutine in which the flow is collected, use the terminal operator launchIn(). It takes a scope, and the scope can specify the dispatcher, for example launchIn(CoroutineScope(Dispatchers.IO)).
launchIn returns a Job, which can be used to cancel the collection, for example:
@Test
fun testFlowLaunch2() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(CoroutineScope(Dispatchers.IO))
    delay(2000)
    job.cancel()
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 DefaultDispatcher-worker-3 @coroutine#2
Process finished with exit code 0
As shown above, only one value is collected before the job is cancelled.
In fact, runBlocking itself provides a scope that runs on the calling thread (here, the main thread), and it can be passed to launchIn as follows:
@Test
fun testFlowLaunch3() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(this)
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 main @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 main @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 main @coroutine#2
Process finished with exit code 0
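It may help to see that launchIn(scope) is shorthand for scope.launch { flow.collect() }. The sketch below runs both forms side by side; the function name launchInVersusLaunch is hypothetical, and the values are gathered into lists rather than printed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects the same flow twice: once through launchIn, once through launch + collect.
fun launchInVersusLaunch(): Pair<List<Int>, List<Int>> = runBlocking {
    val viaLaunchIn = mutableListOf<Int>()
    val viaLaunch = mutableListOf<Int>()
    val source = (1..3).asFlow()

    // Form 1: side effect in onEach, collection started by launchIn.
    val job1 = source.onEach { viaLaunchIn += it }.launchIn(this)

    // Form 2: the explicit equivalent.
    val job2 = launch { source.collect { viaLaunch += it } }

    joinAll(job1, job2) // both calls return a Job
    viaLaunchIn to viaLaunch
}
```

Both lists end up with the same values, and either Job can be cancelled or joined in the same way.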
Flow cancellation
Flows follow the same cooperative cancellation as coroutines. Collection of a flow can be cancelled when the flow is suspended in a cancellable suspending function (such as delay).
private fun createFlow6() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        println("emitting $i")
        emit(i)
    }
}

@Test
fun testCancelFlow() = runBlocking<Unit> {
    withTimeoutOrNull(2500) {
        createFlow6().collect { println("collect: $it") }
    }
    println("Done.")
}
emitting 1
collect: 1
emitting 2
collect: 2
Done.
Process finished with exit code 0
The timeout is 2.5 seconds. The third value would be emitted at 3 seconds, so the timeout fires first and the flow is cancelled.
Flow cancellation checks
- For convenience, the flow builder performs an additional ensureActive check for cancellation on each emitted value, so a busy loop emitting from flow { ... } is cancellable.
- For performance reasons, most other flow operators do not perform additional cancellation checks of their own. When a coroutine is in a busy loop, cancellation must be checked explicitly.
- The cancellable operator performs this check.
private fun createFlow7() = flow<Int> {
    for (i in 1..5) {
        delay(1000)
        println("emitting $i")
        emit(i)
    }
}

@Test
fun testCancelFlowCheck() = runBlocking<Unit> {
    createFlow7().collect {
        if (it == 3) cancel()
        println("collect: $it")
    }
    println("Done.")
}
emitting 1
collect: 1
emitting 2
collect: 2
emitting 3
collect: 3
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
...
Process finished with exit code 255
During collection the coroutine is cancelled when 3 arrives, and a JobCancellationException is thrown. The value 3 itself is still collected.
@Test
fun testCancelFlowCheck2() = runBlocking<Unit> {
    (1..5).asFlow().collect {
        if (it == 3) cancel()
        println("collect: $it")
    }
    println("Done.")
}
collect: 1
collect: 2
collect: 3
collect: 4
collect: 5
Done.
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
When the flow is created with asFlow(), cancelling at 3 does not stop it: all remaining elements are still printed before the exception is thrown. To actually stop the flow in the middle of collection, add the cancellable() operator, as follows:
@Test
fun testCancelFlowCheck3() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect {
        if (it == 3) cancel()
        println("collect: $it")
    }
    println("Done.")
}
collect: 1
collect: 2
collect: 3
kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
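The explicit check mentioned above can also be written by hand. The sketch below adds an onEach step that calls ensureActive() on the current coroutine context, which is roughly what cancellable() provides ready-made. The function name collectUntilCancelled is hypothetical, and the collected values are returned instead of printed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Cancels the collecting coroutine at 3 and reports which values were seen.
fun collectUntilCancelled(): List<Int> {
    val seen = mutableListOf<Int>()
    try {
        runBlocking {
            (1..5).asFlow()
                // Explicit cancellation check before each value goes downstream.
                .onEach { currentCoroutineContext().ensureActive() }
                .collect {
                    if (it == 3) cancel() // cancels the runBlocking coroutine
                    seen += it
                }
        }
    } catch (e: CancellationException) {
        // runBlocking rethrows the cancellation of its own coroutine; ignored here.
    }
    return seen
}
```

As with cancellable(), the value 3 is still collected, and the check stops the flow before 4 is delivered.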
Back pressure
- buffer() runs the emitting code concurrently with the collecting code.
- conflate() conflates emissions: the collector skips intermediate values instead of processing every one.
- collectLatest() cancels a slow collector and restarts it each time a new value is emitted.
- The flowOn operator uses the same buffering mechanism when it has to change the CoroutineDispatcher, whereas buffer explicitly requests buffering without changing the execution context.
private fun createFlow8() = flow<Int> {
    for (i in 1..5) {
        delay(100) // It takes 0.1 seconds to produce an element
        println("emitting $i")
        emit(i)
    }
}

@Test
fun testBackPressure() = runBlocking<Unit> {
    val time = measureTimeMillis {
        createFlow8()
            .buffer(50) // Run the emitting code concurrently with the collector
            .collect {
                delay(200) // It takes 0.2 seconds to consume an element
                println("collect: $it")
            }
    }
    println("Done, total $time")
}
emitting 1
emitting 2
collect: 1
emitting 3
emitting 4
collect: 2
emitting 5
collect: 3
collect: 4
collect: 5
Done, total 1188
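With buffer, emission runs concurrently with collection, which shortens the total time. Without it, each element would take about 300 ms (100 ms to emit plus 200 ms to collect), roughly 1500 ms in total.
Switching threads with flowOn() improves efficiency in the same way, because it uses the same buffering.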
private fun createFlow8() = flow<Int> {
    for (i in 1..5) {
        delay(100) // It takes 0.1 seconds to produce an element
        println("emitting $i, ${Thread.currentThread().name}")
        emit(i)
    }
}

@Test
fun testBackPressure2() = runBlocking<Unit> {
    val time = measureTimeMillis {
        createFlow8()
            .flowOn(Dispatchers.Default)
            .collect {
                delay(200) // It takes 0.2 seconds to consume an element
                println("collect: $it, ${Thread.currentThread().name}")
            }
    }
    println("Done, total $time")
}
emitting 1, DefaultDispatcher-worker-1 @coroutine#2
emitting 2, DefaultDispatcher-worker-1 @coroutine#2
emitting 3, DefaultDispatcher-worker-1 @coroutine#2
collect: 1, main @coroutine#1
emitting 4, DefaultDispatcher-worker-1 @coroutine#2
collect: 2, main @coroutine#1
emitting 5, DefaultDispatcher-worker-1 @coroutine#2
collect: 3, main @coroutine#1
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1186
conflate() conflates emissions, so the collector does not process every value.
@Test
fun testBackPressure3() = runBlocking<Unit> {
    val time = measureTimeMillis {
        createFlow8()
            .conflate()
            .collect {
                delay(200) // It takes 0.2 seconds to consume an element
                println("collect: $it, ${Thread.currentThread().name}")
            }
    }
    println("Done, total $time")
}
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
collect: 1, main @coroutine#1
emitting 4, main @coroutine#2
collect: 3, main @coroutine#1
emitting 5, main @coroutine#2
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1016
In the example above, the value 2 is skipped during collection because of conflate().
With collectLatest(), the collector block is cancelled whenever a new value arrives, so here only the last value is fully processed:
@Test
fun testBackPressure4() = runBlocking<Unit> {
    val time = measureTimeMillis {
        createFlow8()
            .collectLatest {
                delay(200) // It takes 0.2 seconds to consume an element
                println("collect: $it, ${Thread.currentThread().name}")
            }
    }
    println("Done, total $time")
}
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
emitting 4, main @coroutine#2
emitting 5, main @coroutine#2
collect: 5, main @coroutine#7
Done, total 913
Process finished with exit code 0
Operators
Intermediate (transition) flow operators
- Operators transform flows, just as they transform collections and sequences.
- An intermediate operator is applied to an upstream flow and returns a downstream flow.
- These operators are cold, just like flows. A call to such an operator is not itself a suspending function.
- The call returns quickly, producing only the definition of a new, transformed flow.
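The last two points can be checked with a small sketch: map is called outside any coroutine, and its lambda does not run until the flow is collected. The function name mapRunsOnlyOnCollect is hypothetical, and a list is used as a log instead of println.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Shows that applying map only defines a new flow; nothing executes yet.
fun mapRunsOnlyOnCollect(): List<String> {
    val log = mutableListOf<String>()

    // Called from ordinary, non-suspending code.
    val squares = (1..2).asFlow().map {
        log += "map $it"
        it * it
    }
    log += "flow defined" // recorded before any "map" entry

    runBlocking { squares.collect { log += "collect $it" } }
    return log
}
```

The "flow defined" entry comes first in the log, which confirms that the map lambda is deferred until collection.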
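private fun createFlow9() = flow<Int> {
    for (i in 1..3) {
        delay(100) // It takes 0.1 seconds to produce an element
        println("emitting $i")
        emit(i)
    }
}

// performRequest is not shown in the original; this definition is an assumption consistent with the output below
private suspend fun performRequest(request: Int): String {
    delay(1000) // Simulate a long-running request
    return "--response $request--"
}

@Test
fun testMap() = runBlocking<Unit> {
    createFlow9()
        .map { data -> performRequest(data) }
        .collect { println("collect: $it") }
}
emitting 1
collect: --response 1--
emitting 2
collect: --response 2--
emitting 3
collect: --response 3--
Process finished with exit code 0
In the example above, the map operator turns a Flow<Int> into a Flow<String>.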
@Test
fun testTransform() = runBlocking<Unit> {
    createFlow9()
        .transform { data ->
            emit("making request $data")
            emit(performRequest(data))
        }
        .collect { println("collect: $it") }
}
emitting 1
collect: making request 1
collect: --response 1--
emitting 2
collect: making request 2
collect: --response 2--
emitting 3
collect: making request 3
collect: --response 3--
Process finished with exit code 0
In the example above, the transform operator emits more than one value for each input value.
Size-limiting operators
The take operator
private fun numbers() = flow<Int> {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally.")
    }
}

@Test
fun testLimitOperator() = runBlocking {
    numbers().take(2).collect { println("collect $it") }
}
collect 1
collect 2
Finally.
Passing 2 to take means only two values are taken. The flow is then cancelled, so the rest of the builder code does not run and the finally block executes.
Terminal (end) flow operators
A terminal operator is a suspending function on a flow that starts collection of the flow. collect is the most basic terminal operator, but there are others that are more convenient:
- Conversion to various collections, such as toList and toSet.
- Operators that get the first value (first) or ensure that the flow emits a single value (single).
- reduce and fold, which reduce a flow to a single value.
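The operators in this list can be tried together in a short sketch. TerminalResults and terminalOperators are hypothetical names used only to return all results at once; the operators themselves (toList, toSet, first, single, fold) come from kotlinx.coroutines.flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical holder for the results of each terminal operator.
data class TerminalResults(
    val asList: List<Int>,
    val asSet: Set<Int>,
    val first: Int,
    val single: Int,
    val folded: Int
)

fun terminalOperators(): TerminalResults = runBlocking {
    val numbers = flowOf(1, 2, 2, 3)
    TerminalResults(
        asList = numbers.toList(),       // all values, in order
        asSet = numbers.toSet(),         // duplicates removed
        first = numbers.first(),         // stops after the first value
        single = flowOf(42).single(),    // requires exactly one value
        folded = numbers.fold(10) { acc, value -> acc + value } // 10 + 1 + 2 + 2 + 3
    )
}
```

Each operator collects the cold flow again from the start, which is why the same numbers flow can be reused several times.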
For example, the reduce operator:
@Test
fun testTerminateOperator() = runBlocking {
    val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
    println(sum)
}
55
This squares the numbers 1 to 5 and sums the squares to get 55.
Combining multiple flows
Like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows.
@Test
fun testZip() = runBlocking {
    val numbers = (1..3).asFlow()
    val strings = flowOf("One", "Two", "Three")
    numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println(it) }
}
This example uses the zip operator to combine a flow of numbers and a flow of strings into a single flow of strings.
1 -> One
2 -> Two
3 -> Three
Process finished with exit code 0
@Test
fun testZip2() = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(300) }
    val strings = flowOf("One", "Two", "Three").onEach { delay(500) }
    val start = System.currentTimeMillis()
    numbers.zip(strings) { a, b -> "$a -> $b" }.collect {
        println("$it, ${System.currentTimeMillis() - start}")
    }
}
If both flows have delays, zip waits for the slower flow before emitting each pair.
1 -> One, 563
2 -> Two, 1065
3 -> Three, 1569
Process finished with exit code 0
Flattening flows
A flow represents a sequence of values received asynchronously, so it is easy to end up in a situation where each value triggers a request for another sequence of values. Because flows are asynchronous, different flattening modes are needed, and there is a family of flattening operators:
- flatMapConcat: concatenating mode
- flatMapMerge: merging mode
- flatMapLatest: latest mode
Using flatMapConcat (concatenating mode)
private fun requestFlow(i: Int) = flow<String> {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

@Test
fun testFlatMapConcat() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        // .map { requestFlow(it) } // with map, the result would be a Flow<Flow<String>>
        .flatMapConcat { requestFlow(it) } // flatMapConcat flattens the nested flows into a single flow
        .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 144 ms
1: Second at 649 ms
2: First at 754 ms
2: Second at 1256 ms
3: First at 1361 ms
3: Second at 1861 ms
Process finished with exit code 0
Using flatMapMerge (merging mode)
@Test
fun testFlatMapMergeConcat() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapMerge { requestFlow(it) }
        .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 202 ms
2: First at 301 ms
3: First at 407 ms
1: Second at 708 ms
2: Second at 805 ms
3: Second at 927 ms
Process finished with exit code 0
After 1: First is emitted, that inner flow is delayed for 500 ms. During this time 2: First and 3: First are emitted and collected. The remaining values are then emitted and collected as each inner delay completes.
Now look at the flatMapLatest operator:
private fun requestFlow(i: Int) = flow<String> {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

@Test
fun testFlatMapLatestConcat() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow()
        .onEach { delay(200) }
        .flatMapLatest { requestFlow(it) }
        .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
}
1: First at 313 ms
2: First at 581 ms
3: First at 786 ms
3: Second at 1291 ms
Process finished with exit code 0
Some intermediate values are skipped: each new upstream value cancels the previous inner flow, so only the latest inner flow runs to completion.
Flow exception handling
When an emitter or code inside an operator throws an exception, there are several ways to handle it:
- try/catch block
- catch operator
Catching downstream exceptions with a try/catch block:
private fun createFlow10() = flow<Int> {
    for (i in 1..3) {
        println("emitting $i")
        emit(i)
    }
}

@Test
fun testException() = runBlocking {
    try {
        createFlow10().collect {
            println("collect: $it")
            check(it <= 1) { "wrong value " }
        }
    } catch (e: Exception) {
        println("handle the exception: $e")
    }
}
emitting 1
collect: 1
emitting 2
collect: 2
handle the exception: java.lang.IllegalStateException: wrong value
Process finished with exit code 0
Catching upstream exceptions with the catch operator:
@Test
fun testException2() = runBlocking {
    flow {
        emit(1)
        1/0
        emit(2)
    }
        .catch { println("$it") }
        .flowOn(Dispatchers.IO)
        .collect { println("collect: $it") }
}
java.lang.ArithmeticException: / by zero
collect: 1
Process finished with exit code 0
A fallback value can be emitted after the exception is caught:
@Test
fun testException3() = runBlocking { // renamed so it does not clash with testException2 above
    flow {
        emit(1)
        1/0
        emit(2)
    }
        .catch {
            println("$it")
            emit(10)
        }
        .flowOn(Dispatchers.IO)
        .collect { println("collect: $it") }
}
java.lang.ArithmeticException: / by zero
collect: 1
collect: 10
Process finished with exit code 0
Of course, the later value 2 is never received.
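The word "upstream" matters here: the catch operator only sees exceptions thrown above it in the chain. The sketch below contrasts the two cases. The function name catchIsUpstreamOnly is hypothetical, and a list is used as a log instead of println.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun catchIsUpstreamOnly(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        // Case 1: the failure happens upstream of catch, so the operator handles it.
        flow<Int> {
            emit(1)
            throw IllegalStateException("upstream failure")
        }
            .catch { e -> log += "caught ${e.message}" }
            .collect { log += "collect $it" }

        // Case 2: the failure happens in collect, downstream of catch.
        // The operator does not see it, so an ordinary try/catch is needed.
        try {
            flowOf(1)
                .catch { e -> log += "caught ${e.message}" }
                .collect { check(it != 1) { "downstream failure" } }
        } catch (e: IllegalStateException) {
            log += "escaped ${e.message}"
        }
    }
    return log
}
```

One way to bring collector logic under the operator's protection is to move it into an onEach placed before catch and finish the chain with a plain collect().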
Flow completion
When collection of a flow completes (normally or exceptionally), an action may need to be performed.
- Imperative finally block
- Declarative handling with onCompletion
finally block:
private fun createFlow11() = flow<Int> {
    emit(1)
    1 / 0
    emit(2)
}

@Test
fun testComplete() = runBlocking {
    try {
        createFlow11().collect { println("collect $it") }
    } catch (e: Exception) {
        println("exception: $e")
    } finally {
        println("Finally.")
    }
}
collect 1
exception: java.lang.ArithmeticException: / by zero
Finally.
Process finished with exit code 0
onCompletion:
@Test
fun testComplete2() = runBlocking {
    createFlow11()
        .onCompletion { println("exception: $it") }
        .collect { println("collect $it") }
}
collect 1
exception: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
onCompletion can observe the exception but does not catch it. To catch the exception, a catch operator is also needed.
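A minimal sketch of that combination follows: onCompletion observes the failure and the catch placed after it handles it, so nothing propagates out of collect. The function name completionThenCatch is hypothetical, and the exception is thrown explicitly instead of dividing by zero.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun completionThenCatch(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        flow<Int> {
            emit(1)
            throw ArithmeticException("/ by zero")
        }
            // Observes the cause (null on normal completion) but lets it continue downstream.
            .onCompletion { cause -> log += "completed with ${cause?.message}" }
            // Actually handles the exception, so collect finishes normally.
            .catch { cause -> log += "caught ${cause.message}" }
            .collect { log += "collect $it" }
    }
    return log
}
```

The order of the log entries shows that onCompletion runs first and catch runs afterwards.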