introduce
The operator map, literally, is mapping. How to use this operator?
Take a chestnut
1. Code example
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 1");
e.onNext(1);
Logger("Emit 2");
e.onNext(2);
e.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
}
@Override
public void onNext(String object) {
Logger(" onNext " + object);
}
@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}
@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "i am " + integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
2. Operation results
You can see that the onSubscribe method is executed first, and then two onNext events are sent. After conversion, the two Integer parameters become String types, and then they are passed to the Observer for action. This is the role of Map.
3. Source code analysis
Here is the source code of the map operator.
//Observe the two parameters of the Function in the map. One is T, which is the type before the change. R is the type after the change. The last parameter returned is the observerable of R type (let's say that)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//Routine non empty inspection
ObjectHelper.requireNonNull(mapper, "mapper is null");
//It's onAssembly again. It's not important. Pay attention to ObservableMap
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Look at the construction method of ObservableMap. The input parameter, source is the observable < T > before conversion. The second parameter is our function.
ObservableMap inherits AbstractObservableWithUpstream, and the latter inherits Observable. It can be seen that the Observable object after map conversion is the type of ObservableMap.
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
Since the conversion is of the ObservableMap type, there must be a subscribeActual method
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
What is MapObserver?
//MapObserver inherits BasicFuseableObserver, which implements the Observer interface
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
//function in map operator
final Function<? super T, ? extends U> mapper;
//actual is the Observer of the subscription behind us in the chestnut
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//Done is true after the implementation of onError or onComplete. For details, see the parent class BasicFuseableObserver. When done is true, no processing is done.
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//U type, converted type, String in chestnut
U v;
try {
//Routine non empty check, but there is an apply operation. The apply operation is the operation of converting Integer to string in the chestnut above. v is the converted String type
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//The Observer performs the onNext operation. v is the converted type, such as String in chestnut
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
Part of the content of the middle pass is missing, which does not affect our analysis of the main process. Interested friends can analyze the part of the poll, and then leave a message to me.
Some friends may have questions:
Why the onNext of the Observer does not execute immediately after the launch, because the threads of subscribeOn and observeOn in my chestnut are different.
Last
[code Taoist], one week more, full of content.