Kill RxJava Series, Part 2: A Handwritten FlowBus to Replace RxBus/EventBus/LiveDataBus

Shortcomings of LiveData

  • LiveData is an observable data holder class specific to Android that is lifecycle-aware. Its design is deliberately simple, which makes it easy for developers to get started, but it has the following shortcomings:
  1. LiveData can only update data on the main thread (postValue also switches to the main thread under the hood, and values may be lost when it is called several times before the main thread runs);
  2. LiveData's operators are not powerful enough. For complex, interactive data stream scenarios, RxJava or Flow is recommended;
  3. LiveData is tightly coupled to the Android platform. Although it works well in the presentation layer, it is not suitable for the domain layer, which is better kept platform-independent;
  • LiveData is still a viable solution for Java developers, beginners, or simple scenarios. In an MVVM architecture, the View and ViewModel can interact through LiveData (StateFlow, covered below, can also be used), and the ViewModel and Repository can interact through Flow;

Deficiencies of RxJava

  • RxJava is still quite powerful. With chained calls over event streams, support for long-running tasks, and thread switching, it is a good asynchronous library, but it also has some shortcomings for Android development:
  1. Powerful also means complex, and its wide range of operators is a nightmare for beginners;
  2. It is not an official library, so Google will not put much effort into promoting and optimizing it;
  3. It increases the package size of the project;

Flow

  • Flow is a "cold stream". A cold stream is a data source whose producer runs each time a collector starts consuming (that is, if nothing consumes, nothing is produced, whereas a LiveData sender does not depend on its receivers), so a new data stream is created on each subscription (when there are multiple subscribers, their events are independent of each other). Once the consumer stops collecting or the producer block finishes, the stream is closed automatically.
  • Flow combines Kotlin coroutines with the reactive programming model. It supports thread switching and backpressure, and it cleans up automatically through coroutine cancellation, so it is well suited to heavier tasks.
  • Operators such as take, first, and toList simplify testing Flow-related code.
  • Flow itself knows nothing about the Android lifecycle, and it does not pause or resume collectors automatically when the lifecycle state changes. You can start the collecting coroutine with a LifecycleCoroutineScope extension such as launchWhenStarted; such collectors are paused and resumed in step with the component's Lifecycle.
  • Compared with Channel, a Flow's terminal operator triggers execution of the stream and, depending on what happens on the producer side, either completes successfully or throws an exception. A Flow therefore closes the stream automatically and does not leak resources on the producer side. If a Channel is not closed properly, the producer may fail to release expensive resources, so Channels are more prone to resource leaks.
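The "cold" behaviour and the testing-friendly operators mentioned above can be checked outside Android. The following is a minimal sketch that assumes only the kotlinx-coroutines-core library; `producerRuns`, `numbers`, and `coldFlowDemo` are illustrative names, not part of any API.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the producer block has been started (illustrative only).
var producerRuns = 0

// A cold flow: the block below runs from the top for every collector.
fun numbers(): Flow<Int> = flow {
    producerRuns++
    for (i in 1..5) emit(i)
}

// Collects the same flow three times with different terminal operators.
fun coldFlowDemo(): Triple<List<Int>, Int, Int> = runBlocking {
    producerRuns = 0
    val source = numbers()                    // nothing runs yet, the flow is cold
    val firstThree = source.take(3).toList()  // producer run #1, stopped after 3 values
    val head = source.first()                 // producer run #2, stopped after 1 value
    source.toList()                           // producer run #3, runs to completion
    Triple(firstThree, head, producerRuns)
}

fun main() {
    println(coldFlowDemo()) // prints ([1, 2, 3], 1, 3)
}
```

Each terminal operator restarts the producer, which is why the counter ends at 3. Because take, first, and toList return plain values, they can be asserted directly in a unit test.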

Some common operators of Flow

//        val flow = flowOf(1,2,3,4,5)
//        val flow: Flow<Int> = flow {
//            List(20) {
//                emit(it) // send data
//                delay(300)
//            }
//        }
val flow = (1..10).asFlow()
lifecycleScope.launch {
    flow.flowOn(Dispatchers.IO)//Set the dispatcher for the flow. flowOn only affects the operators upstream of it
        .onStart { log("onStart") }
        .flowOn(Dispatchers.Main)
        .onEach {
            log("onEach:$it")
            delay(300)
        }
        .filter {//Filter: keep only even values
            it % 2 == 0
        }
        .map {//Transform each value
            log("map:$it*$it")
            it * it
        }
        .transform<Int,String> {//transform must call emit explicitly, and may emit any number of values
            emit("num=$it")
//                    emit("num1=$it")
//                    emit("num2=$it")
        }
        .flowOn(Dispatchers.IO)
        .onCompletion {//Runs when the flow completes; `it` is the failure cause, or null on normal completion
            log("onCompletion: $it")
        }
        .catch {//Catches exceptions from the Flow. catch only sees exceptions thrown upstream of it
            log("catch: $it")
        }
        .flowOn(Dispatchers.Main)
        .collect {//Collect (consume) the Flow
            log("collect1_1: $it")
        }
            log("collect1_1: $it")
        }
    //A cold Flow can be collected repeatedly
    flow.collect { log("collect1_2: $it") }
    //Besides consuming elements in collect, you can also handle them in onEach.
    //That way the consuming logic does not have to sit next to the terminal operator, and collect() can be called elsewhere.
    //onEach returns a new Flow, so keep the result and collect that
    val loggedFlow = flow.onEach {
        log("onEach2:$it")
    }
    withContext(Dispatchers.IO) {
        delay(1000)
        loggedFlow.collect()
    }
    //Instead of collecting in a child coroutine, launchIn collects the Flow in a new coroutine of the given scope
    flow.onEach {
        log("onEach3:$it")
    }.launchIn(CoroutineScope(Dispatchers.IO))
        .join()//Suspend the current coroutine until that coroutine finishes
}

Cancellation of Flow

lifecycleScope.launch(Dispatchers.IO) {
    val flow2 = (1..10).asFlow().onEach { delay(1000) }
    val job: Job = lifecycleScope.launch {
        log("lifecycleScope.launch")
        flow2.flowOn(Dispatchers.IO)//Set the dispatcher for the upstream flow
            .collect {//Collect the Flow
                log("flow2:$it")
            }
    }
    delay(2000)
    job.cancelAndJoin()
}
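The same cancellation pattern can be tried without an Android lifecycle. This is a minimal sketch assuming kotlinx-coroutines-core; `cancelDemo` is an illustrative name and the delays are arbitrary. Cancellation is cooperative: it takes effect at suspension points such as delay.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Collects a slow flow in a child coroutine and cancels it part-way through.
fun cancelDemo(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val slowFlow = (1..10).asFlow().onEach { delay(100) } // one value every 100 ms
    val job = launch {
        slowFlow.collect { received += it }
    }
    delay(350)           // let a few values through
    job.cancelAndJoin()  // cancels the collector; the flow stops at its next delay()
    received
}

fun main() {
    // Only the first few values arrive; the exact count depends on timing.
    println(cancelDemo())
}
```

Cancelling the collecting coroutine is all that is needed; the flow has no separate "unsubscribe" call.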

Back pressure of Flow

  • Wherever there is reactive programming, there is backpressure. Let's look at what backpressure is.
  • Backpressure arises when the producer emits faster than the consumer can process. To avoid losing data, we can add a buffer to ease the problem:
//Add buffer for Flow
flow {
    List(5) {
        emit(it)
    }
}.buffer().collect {
    log("flow buffer collect:$it")
}
  • You can also specify a capacity for the buffer. However, a buffer only postpones the problem: if the rates stay mismatched, data will still pile up.
  • The root cause is the mismatch between production and consumption rates. Besides directly optimizing the consumer's performance, we can also accept some trade-offs.
  • The first is conflate. Like the CONFLATED mode of Channel, new data overwrites old data that has not been consumed yet:
flow {
    List(10) {
        emit(it)
    }
}
.conflate()
.collect { value ->
    log("flow conflate Collecting $value")
    delay(100)
    log("$value collected flow conflate ")
}
  • The second is collectLatest. As the name suggests, it only finishes processing the latest data. This looks the same as conflate, but it is quite different: new data does not overwrite old data; every value starts being processed, but if the next value arrives before the previous one has finished, the processing of the previous one is cancelled. Besides collectLatest, mapLatest, flatMapLatest, and similar operators serve the same purpose.
flow {
    List(10) {
        emit(it)
    }
}.collectLatest { value ->
    log("flow collectLatest Collecting $value")
    delay(100)
    log("$value collected flow collectLatest ")
}
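To compare the three strategies side by side, the sketch below records which values were fully processed under buffer, conflate, and collectLatest. It assumes kotlinx-coroutines-core; `fastProducer` and `backPressureDemo` are illustrative names, and the 20 ms delay stands in for a slow consumer.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// A fast producer: emits 0..9 without any delay.
fun fastProducer(): Flow<Int> = flow {
    for (i in 0 until 10) emit(i)
}

// Returns the values whose slow processing ran to completion under each strategy.
fun backPressureDemo(): Map<String, List<Int>> = runBlocking {
    val buffered = mutableListOf<Int>()
    fastProducer().buffer().collect { delay(20); buffered += it }

    val conflated = mutableListOf<Int>()
    fastProducer().conflate().collect { delay(20); conflated += it }

    val latest = mutableListOf<Int>()
    fastProducer().collectLatest { delay(20); latest += it }

    mapOf("buffer" to buffered, "conflate" to conflated, "collectLatest" to latest)
}

fun main() {
    backPressureDemo().forEach { (name, values) -> println("$name -> $values") }
}
```

With buffer every value is eventually processed. With conflate intermediate values are skipped but the last one always arrives. With collectLatest only the final value finishes here, because each earlier processing block is cancelled at its delay when the next value shows up.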

Collect Android UI data streams in a safer way

  • In Android development, use LifecycleOwner.addRepeatingJob, the suspend function Lifecycle.repeatOnLifecycle, or Flow.flowWithLifecycle to collect flows safely from the UI layer (provided by the lifecycle-runtime-ktx library, version 2.4.+).
lifecycleScope.launch {
    delay(500)
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect { log("collect2: $it") }
    }
}
lifecycleScope.launchWhenStarted {
    delay(1000)
    flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
    delay(1500)
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
        .collect { log("collect4: $it") }
}

SharedFlow

  • A cold flow serves each subscriber with its own independent stream. When one flow has to be shared by several subscribers, we need a hot flow, and SharedFlow is one kind of hot flow.
  • Its factory function is as follows:
public fun <T> MutableSharedFlow(
    replay: Int = 0,//Number of already-emitted values replayed to a new subscriber. The default is 0, so by default a new subscriber does not receive earlier values
    extraBufferCapacity: Int = 0,//Number of values buffered in addition to replay. The default is 0
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//Overflow policy, i.e. what the flow does when the buffer is full
    //BufferOverflow.SUSPEND: suspend the emitter (the default)
    //BufferOverflow.DROP_OLDEST: drop the oldest value in the buffer
    //BufferOverflow.DROP_LATEST: drop the value that is being added
)
  • Simple use is as follows
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    sharedFlow.emit("aaa")
    delay(1000)
    sharedFlow.emit("bbb")
    delay(1000)
    sharedFlow.emit("ccc")
}
lifecycleScope.launch {
    delay(500)
    sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
    delay(3500)
    sharedFlow.collect { log("collect4:$it") }
}
  • Convert cold Flow to SharedFlow
lifecycleScope.launch {
    (1..5).asFlow().shareIn(
        //1. The coroutine scope in which sharing is started
        scope = lifecycleScope,
        //2. The strategy that controls when sharing starts and stops
        // started = SharingStarted.Lazily, // starts when the first subscriber appears and stops when the scope is cancelled
        // started = SharingStarted.Eagerly, // starts immediately and stops when the scope is cancelled
        //For one-shot operations, Lazily or Eagerly is fine. If you are observing other flows, WhileSubscribed offers a subtle but important optimization
        //The WhileSubscribed strategy cancels the upstream flow when there are no collectors
        started = SharingStarted.WhileSubscribed(
            500,//stopTimeoutMillis: delay in milliseconds between the last subscriber leaving and the upstream flow being stopped. The default is 0 (a non-zero value helps, for example, when the user rotates the device and the view is destroyed and recreated shortly afterwards)
            Long.MAX_VALUE//replayExpirationMillis: how long the replay cache is kept after sharing stops. If the user leaves the app for a long time and should not see stale data, use this parameter
        ),
        //3. Number of values replayed to new subscribers
        replay = 0
    ).collect { log("shareIn.collect:$it") }
}
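The effect of shareIn can be observed without Android: two collectors share a single run of the upstream. The sketch below assumes kotlinx-coroutines-core; `shareInDemo`, `upstreamStarts`, and `sharingScope` are illustrative names, and Lazily with replay = 3 is chosen only so that the late subscriber still sees every value.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Two subscribers collect one shared flow; the cold upstream is started only once.
fun shareInDemo(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    val upstreamStarts = AtomicInteger(0)
    val cold = flow {
        upstreamStarts.incrementAndGet()
        for (i in 1..3) {
            delay(50)
            emit(i)
        }
    }
    // A separate scope owns the sharing coroutine so it can be cancelled at the end.
    val sharingScope = CoroutineScope(Dispatchers.Default + Job())
    val shared = cold.shareIn(sharingScope, SharingStarted.Lazily, replay = 3)

    val early = async { shared.take(3).toList() }
    val late = async { delay(80); shared.take(3).toList() } // joins after the first value
    val result = Triple(early.await(), late.await(), upstreamStarts.get())
    sharingScope.cancel() // a shared flow never completes on its own
    result
}

fun main() {
    println(shareInDemo()) // prints ([1, 2, 3], [1, 2, 3], 1)
}
```

With replay = 0, as in the snippet above, the late subscriber would miss whatever was emitted before it subscribed.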

StateFlow

  • StateFlow inherits from SharedFlow and is a specialized variant of it
  • Its factory function is as follows. You only need to pass in an initial value
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
  • StateFlow behaves like a SharedFlow with a replay of 1 and no extra buffer, so a new subscriber receives the current value immediately (the initial value, if nothing else has been set)
  • Collectors are notified only when the value actually changes. If an update sets an equal value, collect is not called back, which differs from LiveData
  • StateFlow is the closest counterpart to LiveData because:
  1. It always has a value.
  2. It holds only a single value at a time.
  3. It can be shared by multiple observers (so it is a shared data stream).
  4. It only ever replays the latest value to subscribers, regardless of the number of active observers.
  • Simple use
log("StateFlow Default value:111")
val stateFlow = MutableStateFlow("111")

lifecycleScope.launch {
    delay(500)
    stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    stateFlow.collect { log("StateFlow collect3:$it") }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(5000)
    log("StateFlow re emit:111")
    stateFlow.emit("111")
    delay(1000)
    log("StateFlow emit:222")
    stateFlow.emit("222")
}
  • Convert normal Flow to StateFlow
val stateFlow2: StateFlow<Int> = flow {
    List(10) {
        delay(300)
        emit(it)
    }
}.stateIn(
    scope = lifecycleScope,
    started = WhileSubscribed(5000),//Stop the upstream flow if there is still no subscriber 5 seconds after the last one leaves
    initialValue = 666//Initial value
)
lifecycleScope.launchWhenStarted {//Collection starts in the STARTED state and continues in RESUMED. Below STARTED the coroutine is suspended rather than cancelled
    stateFlow2.collect { log("StateFlow stateIn.collect:$it") }
}

The difference between StateFlow and SharedFlow

  1. SharedFlow is more flexible to configure: replay, buffer size, and so on can all be set. StateFlow is a specialized SharedFlow with replay fixed at 1 and no extra buffer;
  2. StateFlow is similar to LiveData and exposes the current state through myFlow.value. If you need that, use StateFlow;
  3. SharedFlow can emit and collect duplicate values, while StateFlow does not call collect back when the new value equals the current one;
  4. For new subscribers, StateFlow replays only the current value, while SharedFlow lets you configure how many values are replayed (0 by default, i.e. no replay);
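Points 2 and 3 can be seen in a small experiment that sends the same sequence of values, including duplicates, through both kinds of flow. This is a sketch assuming kotlinx-coroutines-core; `duplicateDemo` is an illustrative name, and the short delays only give the collectors time to run.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends "111", "111", "222" through a StateFlow (initial value "111") and a SharedFlow.
fun duplicateDemo(): Pair<List<String>, List<String>> = runBlocking {
    val state = MutableStateFlow("111")
    val shared = MutableSharedFlow<String>(extraBufferCapacity = 16)
    val fromState = mutableListOf<String>()
    val fromShared = mutableListOf<String>()

    val stateJob = launch { state.collect { fromState += it } }
    val sharedJob = launch { shared.collect { fromShared += it } }
    delay(50) // let both collectors subscribe

    for (v in listOf("111", "111", "222")) {
        state.value = v   // equal values are ignored by StateFlow
        shared.emit(v)    // SharedFlow delivers every emission
        delay(50)
    }
    check(state.value == "222") // the current state is always readable

    stateJob.cancelAndJoin()
    sharedJob.cancelAndJoin()
    fromState to fromShared
}

fun main() {
    println(duplicateDemo()) // prints ([111, 222], [111, 111, 222])
}
```

The StateFlow collector sees the initial "111" once and then only the change to "222", while the SharedFlow collector sees all three emissions.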

Encapsulating FlowBus based on SharedFlow

Create message class EventMessage

class EventMessage {
    /**
     * key of message
     */
    var key: Int

    /**
     * Body of the message
     */
    var message: Any? = null
    private var messageMap: HashMap<String, Any?>? = null

    constructor(key: Int, message: Any?) {
        this.key = key
        this.message = message
    }

    constructor(key: Int) {
        this.key = key
    }

    fun put(key: String, message: Any?) {
        if (messageMap == null) {
            messageMap = HashMap<String, Any?>()
        }
        messageMap?.set(key, message)
    }

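    // Note: T is erased at runtime, so the cast below is unchecked. A type mismatch
    // surfaces as a ClassCastException at the call site, not inside this try block.
    @Suppress("UNCHECKED_CAST")
    operator fun <T> get(key: String?): T? {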
        if (messageMap != null) {
            try {
                return messageMap!![key] as T?
            } catch (e: ClassCastException) {
                e.printStackTrace()
            }
        }
        return null
    }
}
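If a type-safe lookup is wanted, one possible variation is a reified inline accessor that returns null on a type mismatch. The sketch below is not the article's EventMessage: `TypedPayload`, `raw`, and `getOrNull` are hypothetical names used only to show the idea in isolation.

```kotlin
// Hypothetical helper class, not part of the article's EventMessage.
class TypedPayload {
    private val values = HashMap<String, Any?>()

    fun put(key: String, value: Any?) {
        values[key] = value
    }

    // Untyped access; public so that the inline function below may call it.
    fun raw(key: String): Any? = values[key]

    // T is reified, so `as?` performs a real runtime type check and yields
    // null on a mismatch instead of failing later at the call site.
    inline fun <reified T> getOrNull(key: String): T? = raw(key) as? T
}

fun main() {
    val payload = TypedPayload()
    payload.put("key1", 123)
    payload.put("key2", "abc")
    println(payload.getOrNull<Int>("key1"))    // prints 123
    println(payload.getOrNull<String>("key2")) // prints abc
    println(payload.getOrNull<String>("key1")) // prints null (stored value is an Int)
}
```

The trade-off is that the caller has to name the expected type explicitly or let it be inferred from the assignment target.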

Create FlowBus

class FlowBus : ViewModel() {
    companion object {
        val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
    }

    //Normal events
    private val events = mutableMapOf<String, Event<*>>()

    //Sticky events
    private val stickyEvents = mutableMapOf<String, Event<*>>()

    fun with(key: String, isSticky: Boolean = false): Event<Any> {
        return with(key, Any::class.java, isSticky)
    }

    fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
        return with(eventType.name, eventType, isSticky)
    }

    @Synchronized
    fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
        val flows = if (isSticky) stickyEvents else events
        if (!flows.containsKey(key)) {
            flows[key] = Event<T>(key, isSticky)
        }
        return flows[key] as Event<T>
    }


    class Event<T>(private val key: String, isSticky: Boolean) {

        // private mutable shared flow
        private val _events = MutableSharedFlow<T>(
            replay = if (isSticky) 1 else 0,
            extraBufferCapacity = Int.MAX_VALUE
        )

        // publicly exposed as read-only shared flow
        val events = _events.asSharedFlow()

        /**
         * Must be called on the main thread
         */
        fun observeEvent(
            lifecycleOwner: LifecycleOwner,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
            minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
            action: (t: T) -> Unit
        ) {
            lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                override fun onDestroy(owner: LifecycleOwner) {
                    super.onDestroy(owner)
                    LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                    val subscriptCount = _events.subscriptionCount.value
                    if (subscriptCount <= 0)
                        instance.events.remove(key)
                }
            })
            lifecycleOwner.lifecycleScope.launch(dispatcher) {
                lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            LjyLogUtil.d("key=$key , error=${e.message}")
                        }
                    }
                }
            }
        }

        /**
         * send value
         */
        suspend fun setValue(
            event: T,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
        ) {
            withContext(dispatcher) {
                _events.emit(event)
            }

        }
    }
}
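Stripped of the lifecycle handling, the core of the bus is a map of MutableSharedFlow instances in which sticky events are simply flows created with replay = 1. The following lifecycle-free sketch assumes kotlinx-coroutines-core; `SimpleBus`, `events`, `post`, and `busDemo` are hypothetical names and not the FlowBus API above.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical, lifecycle-free variant of the bus, for illustration only.
class SimpleBus {
    private val channels = HashMap<String, MutableSharedFlow<Any>>()

    // One flow per key; sticky and normal flows are stored under different keys.
    @Synchronized
    private fun channel(key: String, sticky: Boolean): MutableSharedFlow<Any> =
        channels.getOrPut(if (sticky) "sticky:$key" else key) {
            MutableSharedFlow(replay = if (sticky) 1 else 0, extraBufferCapacity = 64)
        }

    fun events(key: String, sticky: Boolean = false): SharedFlow<Any> =
        channel(key, sticky).asSharedFlow()

    suspend fun post(key: String, event: Any, sticky: Boolean = false) {
        channel(key, sticky).emit(event)
    }
}

fun busDemo(): Pair<Any, List<Any>> = runBlocking {
    val bus = SimpleBus()

    // Sticky: posted before anyone subscribes, still delivered through replay.
    bus.post("login", "user-42", sticky = true)
    val stickyValue = bus.events("login", sticky = true).first()

    // Normal: an event posted while there is no subscriber is lost.
    bus.post("toast", "missed")
    val received = mutableListOf<Any>()
    val job = launch { bus.events("toast").collect { received += it } }
    delay(50) // give the collector time to subscribe
    bus.post("toast", "hello")
    delay(50)
    job.cancelAndJoin()

    stickyValue to received
}

fun main() {
    println(busDemo()) // prints (user-42, [hello])
}
```

FlowBus adds two things on top of this: collection tied to a LifecycleOwner, and removal of an event's flow once its last observer is destroyed.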

Using FlowBus

FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
    withContext(Dispatchers.Main) {//withContext does not start a new coroutine; it runs the block in the given context and can switch threads
        FlowBus.instance.with(EventMessage::class.java)
            .observeEvent(this@EventBusActivity) {
                LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
            }
    }
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {

    val event = EventMessage(111)
    LjyLogUtil.d(
        "FlowBus:send1_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
    delay(2000)
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(101))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(102))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(103))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(104))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
    delay(4000)
    val event = EventMessage(222, "bbb")
    LjyLogUtil.d(
        "FlowBus:send2_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
    delay(6000)
    withContext(Dispatchers.Main) {
        val event = EventMessage(333, "ccc")
        event.put("key1", 123)
        event.put("key2", "abc")
        LjyLogUtil.d(
            "FlowBus:send3_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
}

Further optimization

  • Using extension functions, an application-scoped ViewModelStoreOwner, and passing EventMessage::class.java in advance makes the bus easier to use in the current project
//Using extension function
fun LifecycleOwner.observeEvent(
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    isSticky: Boolean = false,
    action: (t: EventMessage) -> Unit
) {
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .with(EventMessage::class.java, isSticky = isSticky)
        .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}

fun postValue(
    event: EventMessage,
    delayTimeMillis: Long = 0,
    isSticky: Boolean = false,
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
    LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .viewModelScope
        .launch(dispatcher) {
            delay(delayTimeMillis)
            ApplicationScopeViewModelProvider
                .getApplicationScopeViewModel(FlowBus::class.java)
                .with(EventMessage::class.java, isSticky = isSticky)
                .setValue(event)
        }
}

private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {

    private val eventViewModelStore: ViewModelStore = ViewModelStore()

    override fun getViewModelStore(): ViewModelStore {
        return eventViewModelStore
    }

    private val mApplicationProvider: ViewModelProvider by lazy {
        ViewModelProvider(
            ApplicationScopeViewModelProvider,
            ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
        )
    }

    fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
        return mApplicationProvider[modelClass]
    }

}

object FlowBusInitializer {
    lateinit var application: Application
    //Initialize in Application
    fun init(application: Application) {
        FlowBusInitializer.application = application
    }
}
  • Usage
//observeEvent registers a LifecycleObserver, so call it on the main thread (for example directly in onCreate)
observeEvent {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
observeEvent(Dispatchers.IO) {
    LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}

observeEvent(Dispatchers.Main) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    postValue(EventMessage(100))
    postValue(EventMessage(101), 1000)
    postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
    val event3 = EventMessage(103, "ccc")
    event3.put("key1", 123)
    event3.put("key2", "abc")
    postValue(event3, 2000, dispatcher = Dispatchers.Main)
}

Keywords: Android kotlin rxjava

Added by rednax on Fri, 31 Dec 2021 11:29:45 +0200