Transformer purpose
Transformer, as the name suggests, means converter. As early as RxJava 1.x there were Observable.Transformer, Single.Transformer and Completable.Transformer; in 2.x these became ObservableTransformer, SingleTransformer, CompletableTransformer, FlowableTransformer and MaybeTransformer. Of these, FlowableTransformer and MaybeTransformer are new. Because RxJava 2 splits Observable into Observable and Flowable, there is an additional FlowableTransformer. Likewise, Maybe is a new type in RxJava 2, so there is a MaybeTransformer.
A Transformer turns one Observable/Flowable/Single/Completable/Maybe object into another Observable/Flowable/Single/Completable/Maybe object, exactly as if you had called the same series of operators inline.
For a simple example, write a transformer() method that converts an Observable emitting integers into an Observable emitting strings.
public static ObservableTransformer<Integer, String> transformer() {
    return new ObservableTransformer<Integer, String>() {
        @Override
        public ObservableSource<String> apply(@NonNull Observable<Integer> upstream) {
            // Convert each emitted Integer to its String form.
            return upstream.map(new Function<Integer, String>() {
                @Override
                public String apply(@NonNull Integer integer) throws Exception {
                    return String.valueOf(integer);
                }
            });
        }
    };
}
The next step is to apply the transformer() method with RxJava's standard compose operator.
Observable.just(123, 456)
        .compose(transformer())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                System.out.println("s=" + s);
            }
        });
Finally, two lines are printed:
s=123
s=456
Through this example, you can simply and intuitively understand the role of Transformer.
In fact, the well-known image loading frameworks Glide and Picasso have a similar transform concept, which they use to transform images.
Combined with the compose operator
compose operates on the whole data stream: the Transformer receives the original Observable<T>/Flowable<T> itself rather than individual items.
The compose operator is executed immediately, when the Observable/Flowable chain is created. This differs from an operator such as flatMap, whose function runs only after onNext() is called.
A good English article on the compose operator is "Don't break the chain: use RxJava's compose() operator".
A Chinese translation is also available: "Avoid breaking the chain structure: use compose() operator".
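The timing difference described above can be observed directly. The following is a minimal sketch, assuming RxJava 2.x (io.reactivex) on the classpath and Java 8 lambdas; the class name ComposeTimingSketch and the event strings are made up for illustration. It records when the Transformer passed to compose runs and when the function passed to flatMap runs.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava or of the original article.
public final class ComposeTimingSketch {

    private ComposeTimingSketch() {}

    public static List<String> run() {
        List<String> events = new ArrayList<>();

        // The transformer body runs once, when compose() is called on the chain.
        ObservableTransformer<Integer, String> toText = upstream -> {
            events.add("compose: transformer applied");
            return upstream.map(i -> String.valueOf(i));
        };

        Observable<String> composed = Observable.just(1, 2).compose(toText);

        // The flatMap function runs later, once per item delivered via onNext().
        Observable<String> flatMapped = Observable.just(1, 2).flatMap(i -> {
            events.add("flatMap: function called for " + i);
            return Observable.just(String.valueOf(i));
        });

        events.add("assembled, nothing subscribed yet");

        composed.subscribe(s -> events.add("composed item " + s));
        flatMapped.subscribe(s -> events.add("flatMapped item " + s));
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

In the recorded events, the compose entry appears before the "assembled" marker, while the flatMap entries appear only after subscription.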
Common scenarios
1. Switch to the main thread
For network requests, we often do the following operations to switch threads.
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
So I wrote a simple wrapper.
import io.reactivex.FlowableTransformer
import io.reactivex.ObservableTransformer
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers

/**
 * Created by Tony Shen on 2017/7/13.
 */
object RxJavaUtils {

    @JvmStatic
    fun <T> observableToMain(): ObservableTransformer<T, T> {
        return ObservableTransformer { upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }

    @JvmStatic
    fun <T> flowableToMain(): FlowableTransformer<T, T> {
        return FlowableTransformer { upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }
}
The code above is written in Kotlin. Why not Java? I am used to writing utility classes in Kotlin, and lambda expressions make the code more intuitive.
To switch a Flowable to the main thread, use the method like this:
.compose(RxJavaUtils.flowableToMain())
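For readers who prefer Java, or who want to exercise the same idea off-device, here is a rough Java counterpart. It is a sketch under assumptions: RxJava 2.x on the classpath, Java 8 lambdas, and a hypothetical class SchedulerTransformers whose flowableOn method takes both schedulers as parameters instead of hard-coding AndroidSchedulers.mainThread().

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

// Hypothetical helper; the article's own utility is the Kotlin RxJavaUtils object.
public final class SchedulerTransformers {

    private SchedulerTransformers() {}

    // Subscribes upstream on one scheduler and delivers items on another.
    public static <T> FlowableTransformer<T, T> flowableOn(
            Scheduler subscribeScheduler, Scheduler observeScheduler) {
        return upstream -> upstream
                .subscribeOn(subscribeScheduler)
                .observeOn(observeScheduler);
    }

    public static void main(String[] args) {
        // On a plain JVM, Schedulers.single() stands in for the Android main thread.
        List<Integer> items = Flowable.range(1, 3)
                .compose(SchedulerTransformers.<Integer>flowableOn(
                        Schedulers.io(), Schedulers.single()))
                .toList()
                .blockingGet();
        System.out.println(items);
    }
}
```

On Android, the second argument would be AndroidSchedulers.mainThread(); in unit tests, passing Schedulers.trampoline() for both keeps everything on the calling thread.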
2. LifecycleTransformer in RxLifecycle
Trello's RxLifecycle works with the Android lifecycle to prevent memory leaks in an app, and it uses LifecycleTransformer to do so.
Zhihu made a similar RxLifecycle that does the same thing.
I also use RxLifecycle in my project. To suit my own habits, I modified LifecycleTransformer and merged the five transformers into one.
import org.reactivestreams.Publisher;

import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.CompletableTransformer;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.processors.BehaviorProcessor;

/**
 * Created by Tony Shen on 2017/5/25.
 */
public class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
        FlowableTransformer<T, T>,
        SingleTransformer<T, T>,
        MaybeTransformer<T, T>,
        CompletableTransformer {

    private final BehaviorProcessor<Integer> lifecycleBehavior;

    private LifecycleTransformer() throws IllegalAccessException {
        throw new IllegalAccessException();
    }

    public LifecycleTransformer(@NonNull BehaviorProcessor<Integer> lifecycleBehavior) {
        this.lifecycleBehavior = lifecycleBehavior;
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return upstream.ambWith(
                lifecycleBehavior.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event == LifecyclePublisher.ON_DESTROY_VIEW ||
                                event == LifecyclePublisher.ON_DESTROY ||
                                event == LifecyclePublisher.ON_DETACH;
                    }
                }).take(1).flatMapCompletable(new Function<Integer, Completable>() {
                    @Override
                    public Completable apply(Integer flowable) throws Exception {
                        return Completable.complete();
                    }
                })
        );
    }

    @Override
    public Publisher<T> apply(final Flowable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                }).toObservable()
        );
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }
}
3. Use of cache
For caching, we usually write something like this:
cache.put(key,value);
A more elegant approach is to use AOP, which looks roughly like this:
@Cacheable(key = "...") getValue() { .... }
If you want to use caching inside RxJava's chained calls, you can also consider a Transformer. I have written a simple one below.
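import java.io.Serializable;

import org.reactivestreams.Publisher;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;

/**
 * Created by Tony Shen on 2017/7/13.
 */
public class RxCache {

    public static <T> FlowableTransformer<T, T> transformer(final String key, final Cache cache) {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(@NonNull Flowable<T> upstream) {
                return upstream.map(new Function<T, T>() {
                    @Override
                    public T apply(@NonNull T t) throws Exception {
                        // Store each emitted item under the given key, then pass it on unchanged.
                        cache.put(key, (Serializable) t);
                        return t;
                    }
                });
            }
        };
    }
}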
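RxCache.transformer calls cache.put(key, (Serializable) t), but the Cache type itself is not listed in this article. As a stand-in for experimenting, here is a minimal in-memory sketch that assumes only that put(String, Serializable) contract. The class name InMemoryCacheSketch and its get method are hypothetical, and it uses only the Java standard library.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the article's Cache type; only put(...) is known from the source.
public final class InMemoryCacheSketch {

    // ConcurrentHashMap rejects null values, which matches RxJava 2 never emitting null items.
    private final Map<String, Serializable> entries = new ConcurrentHashMap<>();

    // Stores the value under the key, replacing any previous value for that key.
    public void put(String key, Serializable value) {
        entries.put(key, value);
    }

    // Returns the stored value, or null when the key is absent (assumed accessor).
    public Serializable get(String key) {
        return entries.get(key);
    }
}
```

Because the transformer is created with a single fixed key, a Flowable that emits several items overwrites the cached entry on each emission, so the last item is the one that stays in the cache.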
Combining the three scenarios above, here is a method that wraps fetching content. The network layer uses Retrofit. Although Retrofit itself supports caching through an Interceptor, some business scenarios still call for your own cache, in which case a wrapper like the following can be used.
/**
 * Get content
 * @param fragment
 * @param param
 * @param cacheKey
 * @return
 */
public Flowable<ContentModel> getContent(Fragment fragment, ContentParam param, String cacheKey) {
    return apiService.loadVideoContent(param)
            .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
            .compose(RxJavaUtils.<ContentModel>flowableToMain())
            .compose(RxCache.<ContentModel>transformer(cacheKey, App.getInstance().cache));
}
4. Track RxJava usage
Beginners may be confused about the internal data flow of RxJava, so I wrote a class to track the use of RxJava, which is quite helpful for debugging code.
Let's start with a simple example
Observable.just("tony", "cafei", "aaron")
        .compose(RxTrace.<String>logObservable("first", RxTrace.LOG_SUBSCRIBE | RxTrace.LOG_NEXT_DATA))
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                System.out.println("s=" + s);
            }
        });
The following figure shows the data flow in the above code.
[Figure: log output of the first trace]
Then, add a map operator based on the code just now to convert lowercase strings to uppercase.
Observable.just("tony", "cafei", "aaron")
        .compose(RxTrace.<String>logObservable("first", RxTrace.LOG_SUBSCRIBE | RxTrace.LOG_NEXT_DATA))
        .map(new Function<String, String>() {
            @Override
            public String apply(@io.reactivex.annotations.NonNull String s) throws Exception {
                return s.toUpperCase();
            }
        })
        .compose(RxTrace.<String>logObservable("second", RxTrace.LOG_NEXT_DATA))
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                System.out.println("s=" + s);
            }
        });
Look at how the data flows this time. The screen was not large enough to capture everything, so the screenshot is cut short, but the log output is still visible.
[Figure: log output of the second trace]
Finally, add logging for onComplete and onTerminate.
Observable.just("tony", "cafei", "aaron")
        .compose(RxTrace.<String>logObservable("first", RxTrace.LOG_SUBSCRIBE | RxTrace.LOG_NEXT_DATA))
        .map(new Function<String, String>() {
            @Override
            public String apply(@io.reactivex.annotations.NonNull String s) throws Exception {
                return s.toUpperCase();
            }
        })
        .compose(RxTrace.<String>logObservable("second", RxTrace.LOG_NEXT_DATA))
        .compose(RxJavaUtils.<String>observableToMain())
        .compose(RxTrace.<String>logObservable("third", RxTrace.LOG_COMPLETE | RxTrace.LOG_TERMINATE))
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                System.out.println("s=" + s);
            }
        });
The earlier part of the log is cut off in this screenshot, but the final onComplete and onTerminate entries are shown.
[Figure: log output of the third trace]
Summary
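The RxTrace class used in these examples is not listed in this article. One plausible way to build such a tracing Transformer is with RxJava's doOnSubscribe, doOnNext, doOnComplete and doOnTerminate operators. The following is only a sketch under assumptions: RxJava 2.x and Java 8 lambdas, a hypothetical class name RxTraceSketch, made-up numeric values for the flags (only their names come from the examples above), and an extra overload with a sink parameter added here so the output can be captured.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

import java.util.function.Consumer;

// Hypothetical sketch; the author's real RxTrace implementation may differ.
public final class RxTraceSketch {

    // Flag names follow the article's usage; the numeric values are assumptions.
    public static final int LOG_SUBSCRIBE = 1;
    public static final int LOG_NEXT_DATA = 1 << 1;
    public static final int LOG_COMPLETE = 1 << 2;
    public static final int LOG_TERMINATE = 1 << 3;

    private RxTraceSketch() {}

    // Same shape as the call sites above: a tag plus a bit mask, logging to stdout.
    public static <T> ObservableTransformer<T, T> logObservable(String tag, int bitMask) {
        return logObservable(tag, bitMask, System.out::println);
    }

    // Assumed overload: sends log lines to a caller-supplied sink.
    public static <T> ObservableTransformer<T, T> logObservable(
            String tag, int bitMask, Consumer<String> sink) {
        return upstream -> {
            Observable<T> traced = upstream;
            // Attach one side-effect operator per requested event type.
            if ((bitMask & LOG_SUBSCRIBE) != 0) {
                traced = traced.doOnSubscribe(d -> sink.accept(tag + " onSubscribe"));
            }
            if ((bitMask & LOG_NEXT_DATA) != 0) {
                traced = traced.doOnNext(item -> sink.accept(tag + " onNext: " + item));
            }
            if ((bitMask & LOG_COMPLETE) != 0) {
                traced = traced.doOnComplete(() -> sink.accept(tag + " onComplete"));
            }
            if ((bitMask & LOG_TERMINATE) != 0) {
                traced = traced.doOnTerminate(() -> sink.accept(tag + " onTerminate"));
            }
            return traced;
        };
    }
}
```

Because each flag only adds a side-effect operator, the items themselves pass through unchanged, which is why the transformer can be inserted at any point in a chain, as the "first", "second" and "third" traces do.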
Combining the compose operator with a Transformer makes the code more concise and also improves its reusability. RxJava encourages chained calls, and compose keeps the chain from being broken.