Kotlin Flow in practice

Flow: asynchronous streams

  • Basics

    • Characteristics
    • Builders and context
    • Launching
    • Cancellation and cancellation checks
    • Buffering
  • Operators

    • Intermediate operators
    • Terminal operators
    • Combining
    • Flattening
  • Exceptions

    • Exception handling
    • Completion

How do we represent multiple values?
A suspend function can return a single value asynchronously, but how can we return multiple computed values asynchronously?

Options

  • Collection
  • Sequence
  • Suspend function
  • Flow
    A collection can return multiple values, but not asynchronously.

    private fun createList() = listOf<Int>(1, 2, 3)
    
    @Test
    fun test_list() {
      createList().forEach { println(it) }
    }

    A sequence can return a series of integers one at a time:

    private fun createSequence(): Sequence<Int> {
      return sequence {
          for (i in 1..3) {
              Thread.sleep(1000) // Simulates a computation; this blocks the thread, so nothing else can run
              // delay(1000) cannot be called here: arbitrary suspend functions are not allowed in a sequence builder
              yield(i)
          }
      }
    }
    
    @Test
    fun test_sequence() {
      createSequence().forEach { println(it) }
    }

    Look at the source code

    public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }

    The block passed in is a suspending lambda with SequenceScope as its receiver.

    @RestrictsSuspension
    @SinceKotlin("1.3")
    public abstract class SequenceScope<in T> internal constructor()

    However, @RestrictsSuspension means that only the suspending functions SequenceScope itself provides, such as yield and yieldAll, can be called inside the block.
    createSequence returns multiple values, but it is still synchronous.

    // Returns multiple values asynchronously, but all at once
    private suspend fun createList2(): List<Int> {
      delay(5000)
      return listOf<Int>(1, 2, 3)
    }
    
    @Test
    fun test_list2() = runBlocking<Unit> {
      createList2().forEach { println(it) } // call the suspending version, not createList()
    }

    A suspend function can return multiple values asynchronously, but all of them arrive at once. Can we return the values one by one, like a stream, and still stay asynchronous? Flow solves this problem.

    private suspend fun createFlow(): Flow<Int> = flow {
      for (i in 1..3) {
          delay(1000)
          emit(i) // Emit one element
      }
    }
    
    @Test
    fun test_flow() = runBlocking<Unit> {
      createFlow().collect { println(it) } // collect is a terminal operator, described later
    }

    An element is produced every second, and while waiting the coroutine is suspended rather than blocked. The following example demonstrates this:

      private suspend fun createFlow(): Flow<Int> = flow {
          for (i in 1..3) {
              delay(1000)
              emit(i)
          }
      }
      @Test
      fun test_flow2() = runBlocking<Unit> {
          launch {
              for (i in 1..3) {
                  println("I am running and not blocked $i")
                  delay(1500)
              }
          }
          createFlow().collect { println(it) }
      }

    Output:

    I am running and not blocked 1
    1
    I am running and not blocked 2
    2
    I am running and not blocked 3
    3
    
    Process finished with exit code 0

    Collecting the results does not block other coroutines. After 1 is printed and the flow suspends in delay, the other coroutine runs. Nothing is blocked; the two coroutines take turns.
    Flow really does return multiple values asynchronously.

Differences between Flow and the other approaches

  • The builder function for the Flow type is named flow.
  • Code inside the flow { ... } block can suspend.
  • The function createFlow() no longer needs the suspend modifier; it can be removed from the code above.
  • A flow emits values with the emit function.
  • Values are collected from a flow with the collect function.

    Flow applications

    On Android, downloading a file is a typical use of Flow.
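
As a rough illustration of the download use case, the sketch below reports progress as a flow of percentages. It is only a simulation under stated assumptions: downloadProgress, totalBytes and chunkSize are hypothetical names, and delay stands in for reading a chunk, so no real network or file I/O takes place.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: simulates a download and reports progress in percent.
// delay() stands in for reading one chunk; nothing is actually downloaded.
fun downloadProgress(totalBytes: Int, chunkSize: Int): Flow<Int> = flow {
    require(totalBytes > 0 && chunkSize > 0) { "sizes must be positive" }
    var downloaded = 0
    while (downloaded < totalBytes) {
        delay(10) // suspend instead of blocking while the "chunk" arrives
        downloaded = minOf(downloaded + chunkSize, totalBytes)
        emit(downloaded * 100 / totalBytes) // progress update for the collector
    }
}

fun main() = runBlocking {
    downloadProgress(totalBytes = 100, chunkSize = 25)
        .collect { percent -> println("downloaded $percent%") }
}
```

A real implementation would read from a stream inside the builder and would probably move that work off the main thread with flowOn, which is covered later in this article.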

Cold flow

Flows are cold streams, similar to sequences: the code in the flow builder does not run until the flow is collected.

private fun createFlow2() = flow<Int> {
    println("Flow started.")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
@Test
fun test_flow_cold() = runBlocking<Unit> {
    val flow = createFlow2()
    println("calling collect...")
    flow.collect { value -> println(value) }
    println("calling collect again...")
    flow.collect { value -> println(value) }
}
calling collect...
Flow started.
1
2
3
calling collect again...
Flow started.
1
2
3

Process finished with exit code 0

The flow starts running only when collect is called, and it runs again from the beginning on every collect call.

Flows are sequential

  • Each individual collection of a flow is performed sequentially, unless special operators are used.
  • From upstream to downstream, each intermediate operator processes each emitted value and then passes it on to the terminal operator.

    @Test
    fun test_flow_continuation() = runBlocking<Unit> {
      (1..5).asFlow()
          .filter { it % 2 == 0 }
          .map { "string $it" }
          .collect { println("collect $it") }
    }
    collect string 2
    collect string 4
    
    Process finished with exit code 0

    This example goes through the following steps: create a flow, keep only the even numbers, convert them to strings, and collect the results.

    Flow builders

  • The flowOf builder defines a flow that emits a fixed set of values.
  • The .asFlow() extension function converts various collections and sequences into flows.

    @Test
    fun test_flow_builder() = runBlocking<Unit> {
      // flowOf builder
      flowOf("one", "two", "three")
          .onEach { delay(1000) }
          .collect { value -> println(value) }
      // asFlow extension function
      (1..3).asFlow().collect { value -> println(value) }
    }
    one
    two
    three
    1
    2
    3
    
    Process finished with exit code 0
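
To show that asFlow() is not limited to ranges, here is a small sketch that converts a List, a Sequence and an Array into flows and collects each with toList(). The function names are made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Each function builds a flow with asFlow() and collects it into a list.
fun fromList(): List<String> = runBlocking { listOf("a", "b", "c").asFlow().toList() }

fun fromSequence(): List<Int> = runBlocking {
    // A lazy sequence 1, 2, 4, 8 turned into a flow
    generateSequence(1) { it * 2 }.take(4).asFlow().toList()
}

fun fromArray(): List<Int> = runBlocking { arrayOf(7, 8, 9).asFlow().toList() }

fun main() {
    println(fromList())
    println(fromSequence())
    println(fromArray())
}
```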

    Flow context

  • Collection of a flow always happens in the context of the calling coroutine. This property of flows is called context preservation.
  • Code in the flow { ... } builder must honor context preservation and is not allowed to emit from a different context.
  • The flowOn operator changes the context in which the flow emits.

    private fun createFlow3() = flow<Int> {
      println("Flow started ${Thread.currentThread()}")
      for (i in 1..3) {
          delay(1000)
          emit(i)
      }
    }
    
    @Test
    fun test_flow_context() = runBlocking<Unit> {
      createFlow3()
          .collect {
              println("$it, thread: ${Thread.currentThread()}")
          }
    }
    Flow started Thread[main @coroutine#1,5,main]
    1, thread: Thread[main @coroutine#1,5,main]
    2, thread: Thread[main @coroutine#1,5,main]
    3, thread: Thread[main @coroutine#1,5,main]
    
    Process finished with exit code 0

    No thread switch takes place: building and collecting happen in the same context and on the same thread.
    Now try switching the thread inside the builder:

    private fun createFlow4() = flow {
      withContext(Dispatchers.IO) { // Switch to the IO dispatcher; emitting from here violates context preservation
          println("Flow started ${Thread.currentThread().name}")
          for (i in 1..3) {
              delay(1000)
              emit(i)
          }
      }
    }
    
    @Test
    fun test_flow_on() = runBlocking {
      createFlow4().collect {
          println("collect $it, ${Thread.currentThread()}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#1
    
    java.lang.IllegalStateException: Flow invariant is violated:
          Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@4600ac86, BlockingEventLoop@1e1d1f06],
          but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79c0c2ed, Dispatchers.IO].
          Please refer to 'flow' documentation or use 'flowOn' instead

    The builder code does run on an IO thread, but the emission fails because it happens in a different context from the one the flow is collected in. This is not allowed, and the error message recommends flowOn instead.
    The correct approach is as follows:

    private fun createFlow5() = flow {
      println("Flow started ${Thread.currentThread().name}")
      for (i in 1..3) {
          delay(1000)
          emit(i)
      }
    }.flowOn(Dispatchers.IO)
    
    @Test
    fun test_flow_on2() = runBlocking {
      createFlow5().collect {
          println("collect $it, ${Thread.currentThread().name}")
      }
    }
    Flow started DefaultDispatcher-worker-1 @coroutine#2
    collect 1, main @coroutine#1
    collect 2, main @coroutine#1
    collect 3, main @coroutine#1
    
    Process finished with exit code 0

    The flow is built and emits on an IO dispatcher thread, while collection happens on the main thread.
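
One detail worth seeing in isolation is that flowOn only affects the operators written before it. The sketch below records the thread name on each side of flowOn; threadsAroundFlowOn is a hypothetical name for this illustration, and the exact thread names depend on the runtime.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (thread name seen upstream of flowOn, thread name seen downstream of it).
fun threadsAroundFlowOn(): Pair<String, String> = runBlocking {
    var upstream = ""
    var downstream = ""
    flowOf(1)
        .onEach { upstream = Thread.currentThread().name }   // runs on Dispatchers.Default
        .flowOn(Dispatchers.Default)                         // affects only the operators above it
        .onEach { downstream = Thread.currentThread().name } // runs in the collector's context
        .collect()
    upstream to downstream
}

fun main() {
    val (up, down) = threadsAroundFlowOn()
    println("upstream ran on: $up")
    println("downstream ran on: $down")
}
```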

    Launching a flow

  • By replacing collect with launchIn, collection of the flow can be started in a separate coroutine.

      private fun events() = (1..3).asFlow()
          .onEach {
              delay(1000)
              println("$it, ${Thread.currentThread().name}")
          }.flowOn(Dispatchers.Default)
    
    
      @Test
      fun testFlowLaunch() = runBlocking<Unit> {
          events()
              .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
              //.collect()
              .launchIn(CoroutineScope(Dispatchers.IO))
              .join()
      }
    1, DefaultDispatcher-worker-3 @coroutine#3
    Event: 1 DefaultDispatcher-worker-1 @coroutine#2
    2, DefaultDispatcher-worker-1 @coroutine#3
    Event: 2 DefaultDispatcher-worker-1 @coroutine#2
    3, DefaultDispatcher-worker-1 @coroutine#3
    Event: 3 DefaultDispatcher-worker-2 @coroutine#2
    
    Process finished with exit code 0

    onEach is an intermediate operator and does not trigger collection; collect is a terminal operator that does. An intermediate operator is like a filter and a terminal operator is like a tap: no water flows until the tap is opened, no matter how many filters sit in between.
    To choose the coroutine in which collection runs, use the terminal operator launchIn(). It takes a scope, and the scope can specify the dispatcher, for example launchIn(CoroutineScope(Dispatchers.IO)).

launchIn returns a Job, which can be used for operations such as cancel. For example:

@Test
fun testFlowLaunch2() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(CoroutineScope(Dispatchers.IO))
    delay(2000)
    job.cancel()
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 DefaultDispatcher-worker-3 @coroutine#2

Process finished with exit code 0

As shown above, only one number is collected before the job is cancelled.
runBlocking itself provides a coroutine scope that runs on the main thread, so this can be passed to launchIn, as follows:

@Test
fun testFlowLaunch3() = runBlocking<Unit> {
    val job = events()
        .onEach { e -> println("Event: $e ${Thread.currentThread().name}") }
        .launchIn(this)
}
1, DefaultDispatcher-worker-1 @coroutine#3
Event: 1 main @coroutine#2
2, DefaultDispatcher-worker-1 @coroutine#3
Event: 2 main @coroutine#2
3, DefaultDispatcher-worker-1 @coroutine#3
Event: 3 main @coroutine#2

Process finished with exit code 0

Flow cancellation

  • Flows follow the same cooperative cancellation as coroutines. Collection of a flow can be cancelled when the flow is suspended in a cancellable suspending function (such as delay).

      private fun createFlow6() = flow<Int> {
          for (i in 1..3) {
              delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlow() = runBlocking<Unit> {
          withTimeoutOrNull(2500) {
              createFlow6().collect {
                  println("collect: $it")
              }
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    Done.
    
    Process finished with exit code 0

    The timeout is 2.5 seconds, so it expires before the third value is emitted at the 3-second mark, and the flow is cancelled.

    Flow cancellation checks

  • For convenience, the flow builder performs an additional ensureActive check on each emitted value, which means that a busy loop emitting from flow { ... } is cancellable.
  • For performance reasons, most other flow operators do not perform additional cancellation checks on their own. When a coroutine is in a busy loop, cancellation must be checked explicitly.
  • The cancellable operator performs this check.

      private fun createFlow7() = flow<Int> {
          for (i in 1..5) {
              delay(1000)
              println("emitting $i")
              emit(i)
          }
      }
    
      @Test
      fun testCancelFlowCheck() = runBlocking<Unit> {
          createFlow7().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    emitting 1
    collect: 1
    emitting 2
    collect: 2
    emitting 3
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
    ...
    
    Process finished with exit code 255

    During collection, the coroutine is cancelled when 3 is encountered, and a JobCancellationException is thrown. The value 3 itself is still collected.

      @Test
      fun testCancelFlowCheck2() = runBlocking<Unit> {
          (1..5).asFlow().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    collect: 4
    collect: 5
    Done.
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

    Here the flow is created with asFlow(). Although the coroutine is cancelled when 3 is encountered, all elements are still printed before the exception is thrown. To actually stop the flow in the middle of collection, add the cancellable() operator, as follows:

      @Test
      fun testCancelFlowCheck3() = runBlocking<Unit> {
          (1..5).asFlow().cancellable().collect {
              if (it == 3) cancel()
              println("collect: $it")
          }
          println("Done.")
      }
    collect: 1
    collect: 2
    collect: 3
    
    kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
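
The two behaviours above can be compared side by side without the test crashing. This is a minimal sketch, assuming the collection runs in a child coroutine that cancels only itself; collectedBeforeStop and checkCancellation are names invented for the illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects 1..5 in a child coroutine that cancels itself at 3,
// then returns the values the collector actually saw.
fun collectedBeforeStop(checkCancellation: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val source = (1..5).asFlow()
    val numbers = if (checkCancellation) source.cancellable() else source
    val job = launch {
        numbers.collect { value ->
            if (value == 3) cancel() // cancels only this launched coroutine
            seen += value
        }
    }
    job.join() // wait for the child to finish or be cancelled
    seen
}

fun main() {
    println("without cancellable(): ${collectedBeforeStop(false)}")
    println("with cancellable():    ${collectedBeforeStop(true)}")
}
```

Because only the child coroutine is cancelled, runBlocking returns normally and the collected values can be inspected afterwards.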

    Back pressure

  • buffer(), which runs the code that emits elements concurrently with the collector.
  • conflate(), which conflates emissions and skips intermediate values instead of processing every one.
  • collectLatest(), which cancels the collector's block and restarts it for each new value.
  • The flowOn operator uses the same buffering mechanism when it has to change the CoroutineDispatcher, whereas buffer explicitly requests buffering without changing the execution context.

    private fun createFlow8() = flow<Int> {
        for (i in 1..5) {
            delay(100) // It takes 0.1 seconds to produce an element
            println("emitting $i")
            emit(i)
        }
    }

    @Test
    fun testBackPressure() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .buffer(50) // Run the emitting code concurrently with the collector
                .collect {
                    delay(200) // It takes 0.2 seconds to consume an element
                    println("collect: $it")
                }
        }

        println("Done, total $time")
    }
emitting 1
emitting 2
collect: 1
emitting 3
emitting 4
collect: 2
emitting 5
collect: 3
collect: 4
collect: 5
Done, total 1188

With buffer, the emitter runs concurrently with the collector, which shortens the total time.
Moving the emitter to another dispatcher with flowOn() has the same effect:

    private fun createFlow8() = flow<Int> {
        for (i in 1..5) {
            delay(100) // It takes 0.1 seconds to produce an element
            println("emitting $i, ${Thread.currentThread().name}")
            emit(i)
        }
    }

    @Test
    fun testBackPressure2() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .flowOn(Dispatchers.Default)
                .collect {
                    delay(200) // It takes 0.2 seconds to consume an element
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, DefaultDispatcher-worker-1 @coroutine#2
emitting 2, DefaultDispatcher-worker-1 @coroutine#2
emitting 3, DefaultDispatcher-worker-1 @coroutine#2
collect: 1, main @coroutine#1
emitting 4, DefaultDispatcher-worker-1 @coroutine#2
collect: 2, main @coroutine#1
emitting 5, DefaultDispatcher-worker-1 @coroutine#2
collect: 3, main @coroutine#1
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1186

conflate() conflates emissions: when the collector is slower than the emitter, intermediate values are dropped and only the most recent one is processed.

    @Test
    fun testBackPressure3() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .conflate()
                .collect {
                    delay(200) // It takes 0.2 seconds to consume an element
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
collect: 1, main @coroutine#1
emitting 4, main @coroutine#2
collect: 3, main @coroutine#1
emitting 5, main @coroutine#2
collect: 4, main @coroutine#1
collect: 5, main @coroutine#1
Done, total 1016

In the example above, the value 2 is skipped when collecting with conflate().
With collectLatest(), the collector's block is cancelled and restarted whenever a new value arrives, so here only the last value is collected in full:

    @Test
    fun testBackPressure4() = runBlocking<Unit> {
        val time = measureTimeMillis {
            createFlow8()
                .collectLatest {
                    delay(200) // It takes 0.2 seconds to consume an element
                    println("collect: $it, ${Thread.currentThread().name}")
                }
        }

        println("Done, total $time")
    }
emitting 1, main @coroutine#2
emitting 2, main @coroutine#2
emitting 3, main @coroutine#2
emitting 4, main @coroutine#2
emitting 5, main @coroutine#2
collect: 5, main @coroutine#7
Done, total 913

Process finished with exit code 0

Operators

Intermediate flow operators

  • You can use operators to transform flows, just as you would with collections and sequences.
  • Intermediate operators are applied to an upstream flow and return a downstream flow.
  • These operators are cold, just like flows. A call to such an operator is not itself a suspending function.
  • It returns quickly, yielding the definition of a new, transformed flow.

      private fun createFlow9() = flow<Int> {
          for (i in 1..3) {
              delay(100) // It takes 0.1 seconds to produce an element
              println("emitting $i")
              emit(i)
          }
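      }

      // Assumed helper, not shown in the original: simulates an asynchronous request.
      // The return format is chosen to match the output printed by the tests.
      private suspend fun performRequest(request: Int): String {
          delay(500) // stand-in for real work; the duration is arbitrary
          return "--response $request--"
      }
    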
      @Test
      fun testMap() = runBlocking<Unit> {
          createFlow9()
              .map { data -> performRequest(data) }
              .collect {
                  println("collect: $it")
              }
      }
    emitting 1
    collect: --response 1--
    emitting 2
    collect: --response 2--
    emitting 3
    collect: --response 3--
    
    Process finished with exit code 0

    In the above example, the map operator turns a flow of Int into a flow of String.

      @Test
      fun testTransform() = runBlocking<Unit> {
          createFlow9()
              .transform { data ->
                  emit("making request $data")
                  emit(performRequest(data))
              }
              .collect {
                  println("collect: $it")
              }
      }
    emitting 1
    collect: making request 1
    collect: --response 1--
    emitting 2
    collect: making request 2
    collect: --response 2--
    emitting 3
    collect: making request 3
    collect: --response 3--
    
    Process finished with exit code 0

    In the above example, the transform operator emits more than one value for each upstream value.
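
Because transform decides itself how often to call emit, it can also drop values. The sketch below, with the made-up name expandEvens, filters and expands in a single step.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// For each even input, emit the value and ten times the value; odd inputs emit nothing.
fun expandEvens(): List<Int> = runBlocking {
    (1..5).asFlow()
        .transform<Int, Int> { value ->
            if (value % 2 == 0) {
                emit(value)
                emit(value * 10)
            }
        }
        .toList()
}

fun main() {
    println(expandEvens()) // [2, 20, 4, 40]
}
```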

    Size-limiting operators

    The take operator

      private fun numbers() = flow<Int> {
          try {
              emit(1)
              emit(2)
              println("This line will not execute")
              emit(3)
          } finally {
              println("Finally.")
          }
      }
    
      @Test
      fun testLimitOperator() = runBlocking {
          numbers().take(2).collect {
              println("collect $it")
          }
      }
    collect 1
    collect 2
    Finally.

    With take(2), only the first two values are taken. The flow is then cancelled, which is why the finally block in numbers() runs.

    Terminal flow operators

    A terminal operator is a suspending function on a flow that starts collection of the flow. collect is the most basic terminal operator, but there are other, more convenient ones:

  • Conversion to various collections, such as toList and toSet.
  • Operators that get the first value (first) or ensure that a flow emits a single value (single).
  • Reducing a flow to a single value with reduce and fold.
    For example, the reduce operator:

      @Test
      fun testTerminateOperator() = runBlocking {
          val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
          println(sum)
      }
    55

    This squares the numbers 1 to 5 and sums the results, giving 55.
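
The other terminal operators listed above work in the same way. The following sketch exercises toList, first, single and fold; the wrapper function names are invented for the illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// toList: collect every value into a list, in emission order
fun asList(): List<Int> = runBlocking { flowOf(3, 1, 2).toList() }

// first: take the first value and stop collecting
fun firstValue(): Int = runBlocking { flowOf(3, 1, 2).first() }

// single: return the only value; throws if the flow emits zero or several values
fun onlyValue(): String = runBlocking { flowOf("only").single() }

// fold: like reduce, but starts from an explicit initial value
fun sumOfSquares(): Int = runBlocking { (1..5).asFlow().fold(0) { acc, v -> acc + v * v } }

fun main() {
    println(asList())       // [3, 1, 2]
    println(firstValue())   // 3
    println(onlyValue())    // only
    println(sumOfSquares()) // 55
}
```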

    Combining multiple flows

    Like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows.

      @Test
      fun testZip() = runBlocking {
          val numbers = (1..3).asFlow()
          val strings = flowOf("One", "Two", "Three")
          numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println(it) }
      }

    This example uses the zip operator to combine a flow of numbers and a flow of strings into a single flow of strings.

1 -> One
2 -> Two
3 -> Three

Process finished with exit code 0
    @Test
    fun testZip2() = runBlocking {
        val numbers = (1..3).asFlow().onEach { delay(300) }
        val strings = flowOf("One", "Two", "Three").onEach { delay(500) }
        val start = System.currentTimeMillis()
        numbers.zip(strings) { a, b -> "$a -> $b" }.collect { println("$it, ${System.currentTimeMillis() - start}") }
    }

If each of the two flows has a delay, zip waits for the slower one before emitting each pair.

1 -> One, 563
2 -> Two, 1065
3 -> Three, 1569

Process finished with exit code 0
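
Both flows above have three elements. When the lengths differ, zip stops as soon as the shorter flow completes, as this small sketch shows (zipUneven is a name made up for the illustration).

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Zips five numbers with two letters; the result has only two pairs.
fun zipUneven(): List<String> = runBlocking {
    val numbers = (1..5).asFlow()
    val letters = flowOf("a", "b")
    numbers.zip(letters) { n, s -> "$n$s" }.toList()
}

fun main() {
    println(zipUneven()) // [1a, 2b]
}
```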

Flattening flows

A flow represents an asynchronously received sequence of values, so it is easy to end up in a situation where each value triggers a request for another sequence of values. Because flows are asynchronous, there are several ways to flatten the resulting flow of flows, and therefore a family of flattening operators:

  • flatMapConcat: concatenating mode
  • flatMapMerge: merging mode
  • flatMapLatest: latest-value mode

Using flatMapConcat

    private fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun testFlatMapConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            // .map { requestFlow(it) } // with map, the result would be a Flow<Flow<String>>
            .flatMapConcat { requestFlow(it) } // flatMapConcat flattens the nested flows into a single flow
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 144 ms
1: Second at 649 ms
2: First at 754 ms
2: Second at 1256 ms
3: First at 1361 ms
3: Second at 1861 ms

Process finished with exit code 0
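
The ordering guarantee of flatMapConcat does not depend on timing, which a delay-free sketch makes easy to check. The function name concatenated is made up. The opt-in annotation is an assumption about the library version: these operators have carried experimental or preview markers in kotlinx.coroutines, and the annotation only silences the corresponding warning.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Each inner flow is collected to completion before the next one starts,
// so the results keep the upstream order.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun concatenated(): List<String> = runBlocking {
    (1..2).asFlow()
        .flatMapConcat { i -> flowOf("$i: First", "$i: Second") }
        .toList()
}

fun main() {
    println(concatenated()) // [1: First, 1: Second, 2: First, 2: Second]
}
```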

Using flatMapMerge

    @Test
    fun testFlatMapMergeConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapMerge { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 202 ms
2: First at 301 ms
3: First at 407 ms
1: Second at 708 ms
2: Second at 805 ms
3: Second at 927 ms

Process finished with exit code 0

After 1: First is emitted, that inner flow waits 500 ms. During this time 2: First and 3: First are emitted and collected; the remaining values follow as each inner flow's delay completes.
Next, the flatMapLatest operator:

    private fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }
    
    @Test
    fun testFlatMapLatestConcat() = runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(200) }
            .flatMapLatest { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms") }
    }
1: First at 313 ms
2: First at 581 ms
3: First at 786 ms
3: Second at 1291 ms

Process finished with exit code 0

Each new upstream value cancels collection of the previous inner flow, so intermediate values are skipped and only the latest inner flow runs to completion.

Exception handling in flows

When the emitter or code inside an operator throws an exception, there are several ways to handle it:

  • try/catch block
  • catch operator
    Catching downstream exceptions with a try/catch block:

      private fun createFlow10() = flow<Int> {
          for (i in 1..3) {
              println("emitting  $i")
              emit(i)
          }
      }
    
      @Test
      fun testException() = runBlocking {
          try {
              createFlow10().collect {
                  println("collect: $it")
                  check(it <= 1) { "wrong value " }
              }
          } catch (e: Exception) {
              println("handle the exception: $e")
          }
      }
    emitting  1
    collect: 1
    emitting  2
    collect: 2
    handle the exception: java.lang.IllegalStateException: wrong value 
    
    Process finished with exit code 0

    Catching upstream exceptions with the catch operator:

      @Test
      fun testException2() = runBlocking {
          flow {
              emit(1)
              1/0
              emit(2)
          }
              .catch { println("$it") }
              .flowOn(Dispatchers.IO)
              .collect { println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    
    Process finished with exit code 0

    After catching an exception, you can emit a fallback value:

      @Test
      fun testException3() = runBlocking {
          flow {
              emit(1)
              1/0
              emit(2)
          }
              .catch {
                  println("$it")
                  emit(10)
              }
              .flowOn(Dispatchers.IO)
              .collect { println("collect: $it") }
      }
    java.lang.ArithmeticException: / by zero
    collect: 1
    collect: 10
    
    Process finished with exit code 0

    Of course, the 2 that follows the failure is never received.
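
Since catch only sees exceptions from the operators above it, its position in the chain matters. The sketch below shows one exception that catch recovers from and one that escapes it; both function names are invented for the illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The check fails upstream of catch, so catch handles it and emits a fallback value.
fun recoverUpstream(): List<Int> = runBlocking {
    flowOf(1, 2, 3)
        .onEach { check(it < 3) { "bad value $it" } }
        .catch { emit(-1) }
        .toList()
}

// The collector fails downstream of catch, so the exception escapes to the caller.
fun downstreamFailureEscapes(): Boolean = runBlocking {
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { error("collector failed") }
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(recoverUpstream())          // [1, 2, -1]
    println(downstreamFailureEscapes()) // true
}
```

Moving the failing code into an onEach placed before catch is one way to bring collector-side logic within reach of the catch operator.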

    Flow completion

    When collection of a flow completes (normally or with an exception), you may need to run some action.

  • Imperative finally block
  • Declarative handling with onCompletion
    Using a finally block:

      private fun createFlow11() = flow<Int> {
          emit(1)
          1 / 0
          emit(2)
      }
    
      @Test
      fun testComplete() = runBlocking {
          try {
              createFlow11().collect { println("collect $it") }
          } catch (e: Exception) {
              println("exception: $e")
          } finally {
              println("Finally.")
          }
      }
    
    collect 1
    exception: java.lang.ArithmeticException: / by zero
    Finally.
    
    Process finished with exit code 0

    Using onCompletion:

      @Test
      fun testComplete2() = runBlocking {
          createFlow11()
              .onCompletion { println("exception: $it") }
              .collect { println("collect $it") }
      }
    collect 1
    exception: java.lang.ArithmeticException: / by zero
    
    java.lang.ArithmeticException: / by zero

    onCompletion receives the exception but does not catch it, so the exception still propagates. To handle it, you also need a catch operator.
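
A minimal sketch of that combination follows, assuming onCompletion is placed before catch so that it still observes the failure. The name completionThenCatch and the log messages are made up for the illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records what each stage sees: the collector, onCompletion, and catch.
fun completionThenCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .onCompletion { cause -> log += "completed with ${cause?.message}" } // observes the failure
        .catch { e -> log += "caught ${e.message}" }                         // handles it
        .collect { log += "value $it" }
    log
}

fun main() {
    completionThenCatch().forEach(::println)
}
```

With the catch operator in place, collect returns normally and the test no longer fails with ArithmeticException-style stack traces.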

Keywords: Android kotlin coroutine

Added by Mohit_Prog on Wed, 05 Jan 2022 19:11:37 +0200