Use of Transformer in RxJava

Transformer purpose

Transformer, as the name suggests, means converter. As early as rxjava 1 The X version has Observable Transformer,Single.Transformer and complete Transformer, in 2 In version x, it becomes Observable transformer, SingleTransformer, completable transformer, Flowable transformer and maybtransformer. Among them, FlowableTransformer and maybtransformer are new. Because RxJava2 splits Observable into Observable and Flowable, there is an additional FlowableTransformer. At the same time, may is a new type of RxJava2, so there is a maybtransformer.

As like as two peas, Transformer can transform a Observable/Flowable/Single/Completable/Maybe object into another Observable/Flowable/Single/Completable/Maybe object, which is exactly the same as calling a series of inline operators.

For a simple example, write a transformer() method to convert an Observable that emits an integer into an Observable that emits a string.

public static <String> ObservableTransformer<Integer, java.lang.String> transformer() {
        return new ObservableTransformer<Integer, java.lang.String>() {
            @Override
            public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) {
                return upstream.map(new Function<Integer, java.lang.String>() {
                    @Override
                    public java.lang.String apply(@NonNull Integer integer) throws Exception {
                        return java.lang.String.valueOf(integer);
                    }
                });
            }
        };
    }

The next step is to use the transformer() method through the operation of standard RxJava.

Observable.just(123,456)
       .compose(transformer())
       .subscribe(new Consumer<String>() {
              @Override
               public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                   System.out.println("s="+s);
                    }
                });

Finally, it was printed twice, respectively

s=123
s=456

Through this example, you can simply and intuitively understand the role of Transformer.

In fact, there are similar transform concepts in the famous picture loading frameworks glass and Picasso, which can transform graphics.

Combined with the compose operator

compose operates in the whole data stream and can get the original observable < T > / flowable < T >
When creating Observable/Flowable The compose operator will be executed immediately, unlike other operators, which need to be executed after the onNext() call.

The foreigner's article on the compose operator is good Don't break the chain: use RxJava's compose() operator
There are also corresponding translations in China Avoid breaking the chain structure: use compose() operator

Common scenarios

1. Switch to the main thread

For network requests, we often do the following operations to switch threads.

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

So I made a simple package.

import io.reactivex.FlowableTransformer
import io.reactivex.ObservableTransformer
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers

/**
 * Created by Tony Shen on 2017/7/13.
 */
object RxJavaUtils {

    @JvmStatic
    fun <T> observableToMain():ObservableTransformer<T, T> {

        return ObservableTransformer{
            upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }

    @JvmStatic
    fun <T> flowableToMain(): FlowableTransformer<T, T> {

        return FlowableTransformer{
            upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }
}

The above code was written by Kotlin. Why not use Java? I am used to writing some tool classes with Kotlin, and it is more intuitive to use lambda expressions.

For the operation of switching from Flowable to the main thread, you can use this method

.compose(RxJavaUtils.flowableToMain())

2. Lifecycle transformer in rxlifecycle

trello RxLifecycle It can cooperate with the life cycle of Android and prevent App memory leakage, among which lifecycle transformer is used.
Zhihu made a similar one RxLifecycle , can do the same thing.

I also used RxLifecycle in my project. According to my personal habits and hobbies, I made some modifications to lifecycle Transformer and merged five transformers into one.

import org.reactivestreams.Publisher;

import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.CompletableTransformer;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.processors.BehaviorProcessor;

/**
 * Created by Tony Shen on 2017/5/25.
 */

public class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
        FlowableTransformer<T, T>,
        SingleTransformer<T, T>,
        MaybeTransformer<T, T>,
        CompletableTransformer {

    private final BehaviorProcessor<Integer> lifecycleBehavior;

    private LifecycleTransformer() throws IllegalAccessException {
        throw new IllegalAccessException();
    }

    public LifecycleTransformer(@NonNull BehaviorProcessor<Integer> lifecycleBehavior) {
        this.lifecycleBehavior = lifecycleBehavior;
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return upstream.ambWith(
                lifecycleBehavior.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event == LifecyclePublisher.ON_DESTROY_VIEW ||
                                event == LifecyclePublisher.ON_DESTROY ||
                                event == LifecyclePublisher.ON_DETACH;
                    }
                }).take(1).flatMapCompletable(new Function<Integer, Completable>() {
                    @Override
                    public Completable apply(Integer flowable) throws Exception {
                        return Completable.complete();
                    }
                })
        );
    }

    @Override
    public Publisher<T> apply(final Flowable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                }).toObservable()
        );
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }
}

3. Use of cache

For caching, we generally write this

cache.put(key,value);

A more elegant approach is to use AOP, which is roughly written like this

@Cacheable(key = "...")
getValue() {
    ....
}

If you want to use caching in RxJava's chain calls, you can also consider using transformer. I've written a simple method below

/**
 * Created by Tony Shen on 2017/7/13.
 */

public class RxCache {

    public static <T> FlowableTransformer<T, T> transformer(final String key, final Cache cache) {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(@NonNull Flowable<T> upstream) {

                return upstream.map(new Function<T, T>() {
                    @Override
                    public T apply(@NonNull T t) throws Exception {
                        cache.put(key,(Serializable) t);
                        return t;
                    }
                });

            }
        };
    }
}

Combined with the above three usage scenarios, a method for obtaining content is encapsulated. Here, the network framework uses Retrofit. Although Retrofit itself supports adding Cache through Interceptor, you may still want to use your own Cache in some business scenarios, so you can use the following similar encapsulation.

    /**
     * Get content
     * @param fragment
     * @param param
     * @param cacheKey
     * @return
     */
    public Flowable<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) {

        return apiService.loadVideoContent(param)
                .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
                .compose(RxJavaUtils.<ContentModel>flowableToMain())
                .compose(RxCache.<ContentModel>transformer(cacheKey,App.getInstance().cache));
    }

4. Track RxJava usage

Beginners may be confused about the internal data flow of RxJava, so I wrote a class to track the use of RxJava, which is quite helpful for debugging code.

Let's start with a simple example

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

The following figure shows the data flow in the above code.

Trace for the first time png

Then, add a map operator based on the code just now to convert lowercase strings to uppercase.

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .map(new Function<String, String>() {


                    @Override
                    public String apply(@io.reactivex.annotations.NonNull String s) throws
                            Exception {
                        return s.toUpperCase();
                    }
                })
                .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

Look at how the data flows this time. Because the display is not large enough, the screenshot is a little less: (, but you can see the display of the log.

Do trace for the second time png

Finally, add monitoring oncomplete and OnTerminate

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .map(new Function<String, String>() {


                    @Override
                    public String apply(@io.reactivex.annotations.NonNull String s) throws
                            Exception {
                        return s.toUpperCase();
                    }
                })
                .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                .compose(RxJavaUtils.<String>observableToMain())
                .compose(RxTrace.<String>logObservable("third",RxTrace.LOG_COMPLETE|RxTrace.LOG_TERMINATE))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

The screenshot shown above will not be displayed, but the final oncomplete and OnTerminate will be displayed.

Do trace for the third time png

summary

The combination of compose operator and Transformer can make the code look more concise on the one hand, and improve the reusability of the code on the other hand. RxJava advocates chain calling, and compose can prevent the chain from being broken.

Keywords: rxjava

Added by sd9sd on Mon, 21 Feb 2022 17:49:20 +0200