RxJava2 Beginner's Manual (3) - map


The map operator does what its name says: it maps each item emitted upstream to a new value, possibly of a different type. How is this operator used?

Here is an example.

1. Code example

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Logger("Emit 1");
        e.onNext(1);
        Logger("Emit 2");
        e.onNext(2);
    }
});

Observer<String> observer = new Observer<String>() {
    public void onSubscribe(Disposable d) {
    }

    public void onNext(String object) {
        Logger(" onNext " + object);
    }

    public void onError(Throwable e) {
        Logger("onError e = " + e.getMessage());
    }

    public void onComplete() {
    }
};

// The subscribeOn/observeOn calls of the original chain are not shown here.
observable
        .map(new Function<Integer, String>() {
            public String apply(Integer integer) throws Exception {
                return "i am " + integer;
            }
        })
        .subscribe(observer);

2. Results

The onSubscribe method runs first, and then two onNext events are sent. map converts the two Integer values to String before they reach the Observer, which then handles them. That is the role of map.
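To make the role of map concrete without pulling in RxJava itself, here is a minimal sketch in plain Java. The Source interface and the map helper are hypothetical stand-ins invented for this illustration, not RxJava types. They only model the idea that map wraps the upstream and transforms each item on its way to the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapRoleSketch {
    /** Hypothetical push-style source: subscribing pushes every item into the consumer. */
    interface Source<T> {
        void subscribe(Consumer<? super T> onNext);
    }

    /** Hypothetical map helper: returns a new source that transforms each upstream item. */
    static <T, R> Source<R> map(Source<T> upstream, Function<? super T, ? extends R> mapper) {
        return downstream -> upstream.subscribe(item -> downstream.accept(mapper.apply(item)));
    }

    static List<String> run() {
        // Upstream emits the Integers 1 and 2, like the emitter in the example.
        Source<Integer> numbers = onNext -> {
            onNext.accept(1);
            onNext.accept(2);
        };
        List<String> received = new ArrayList<>();
        // The subscriber only ever sees Strings.
        map(numbers, i -> "i am " + i).subscribe(received::add);
        return received;
    }
}
```

Calling MapRoleSketch.run() collects the two converted values, "i am 1" and "i am 2", in emission order.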

3. Source code analysis

Here is the source code of the map operator.

    // The Function passed to map has two type parameters: T is the type before conversion and R is the type after. map returns an Observable<R>.
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        // Routine null check
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        // onAssembly again; it is not important here. Pay attention to ObservableMap
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

Look at the constructor of ObservableMap. The first parameter, source, is the Observable<T> before conversion. The second parameter is our Function.
ObservableMap extends AbstractObservableWithUpstream, which in turn extends Observable. So the Observable returned by map is an ObservableMap.

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

Since the result of the conversion is an ObservableMap, which is itself an Observable, it must have a subscribeActual method:

    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
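The chain of calls described so far (map creates an object that remembers source and function, and subscribeActual subscribes a wrapping observer to the source) can be modeled in a few lines of plain Java. Everything named Mini* below is a hypothetical, cut-down stand-in written for illustration, not RxJava's real classes. onSubscribe, onError, disposal and fusion are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SubscribeActualSketch {
    /** Hypothetical, cut-down observer: only the callbacks needed here. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical, cut-down observable base class. */
    abstract static class MiniObservable<T> {
        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(MiniObserver<? super T> observer);

        final <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            return new MiniMap<T, R>(this, mapper);
        }
    }

    /** Plays the role of ObservableMap: remembers the upstream and the function. */
    static final class MiniMap<T, U> extends MiniObservable<U> {
        final MiniObservable<T> source;
        final Function<? super T, ? extends U> function;

        MiniMap(MiniObservable<T> source, Function<? super T, ? extends U> function) {
            this.source = source;
            this.function = function;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super U> downstream) {
            // Subscribe a wrapping observer (the MapObserver role) to the upstream.
            source.subscribe(new MiniObserver<T>() {
                @Override public void onNext(T item) { downstream.onNext(function.apply(item)); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        }
    }

    static List<String> run() {
        MiniObservable<Integer> source = new MiniObservable<Integer>() {
            @Override
            protected void subscribeActual(MiniObserver<? super Integer> observer) {
                observer.onNext(1);
                observer.onNext(2);
                observer.onComplete();
            }
        };
        List<String> log = new ArrayList<>();
        source.map(i -> "i am " + i).subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext " + item); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }
}
```

The point of the design is that nothing happens until subscribe is called on the mapped object: only then does the wrapper subscribe to the upstream and start forwarding converted items.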

What is MapObserver?

    // MapObserver extends BasicFuseableObserver, which implements the Observer interface
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        // The function passed to the map operator
        final Function<? super T, ? extends U> mapper;

        // actual is the downstream Observer, i.e. the one subscribed in the example
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        public void onNext(T t) {
            // done becomes true once onError or onComplete has run (see the parent class BasicFuseableObserver). When done is true, the item is ignored.
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            // U is the converted type, String in the example
            U v;

            try {
                // Routine null check wrapped around apply. apply is the Integer-to-String conversion from the example above; v is the converted String
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // The downstream Observer's onNext is called with v, the converted value (String in the example)
            actual.onNext(v);
        }

        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
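The guard logic in onNext (skip when done, apply the mapper inside try/catch, forward the result) can be tried out in isolation. The sketch below is a simplified model, not the real MapObserver: the Downstream interface and MiniMapObserver class are hypothetical names, java.util.Objects stands in for ObjectHelper, and the failure path is reduced to setting done and calling onError, which is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class DoneFlagSketch {
    /** Hypothetical stand-in for the downstream Observer. */
    interface Downstream<U> {
        void onNext(U value);
        void onError(Throwable error);
    }

    /** Cut-down model of the onNext path: done check, guarded apply, then forward. */
    static final class MiniMapObserver<T, U> {
        final Downstream<? super U> actual;
        final Function<? super T, ? extends U> mapper;
        boolean done;

        MiniMapObserver(Downstream<? super U> actual, Function<? super T, ? extends U> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        void onNext(T t) {
            if (done) {
                return; // a terminal event was already delivered: drop the item
            }
            U v;
            try {
                v = Objects.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                done = true;        // simplified failure path (assumption of this sketch)
                actual.onError(ex);
                return;
            }
            actual.onNext(v);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniMapObserver<Integer, String> observer = new MiniMapObserver<Integer, String>(
                new Downstream<String>() {
                    @Override public void onNext(String value) { log.add("onNext " + value); }
                    @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
                },
                i -> {
                    if (i == 2) {
                        throw new IllegalStateException("boom");
                    }
                    return "i am " + i;
                });
        observer.onNext(1); // mapped and forwarded
        observer.onNext(2); // mapper throws: onError is delivered, done becomes true
        observer.onNext(3); // ignored because done is true
        return log;
    }
}
```

In this model the third item never reaches the downstream, which is the behavior the done comment in the listing describes.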

Some of the intermediate steps are left out here, which does not affect our analysis of the main flow. Interested readers can analyze the poll part and then leave me a comment.
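As a starting point for the poll part, here is a hedged sketch of the idea as I read it: instead of items being pushed through onNext, the consumer pulls them from a queue, and the mapper is applied at the moment of the pull. MappedQueue is a hypothetical class built on java.util.Queue. It is not RxJava's fusion machinery and ignores fusion modes entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Function;

class PollSketch {
    /** Hypothetical pull-style wrapper: maps an item only when the consumer polls. */
    static final class MappedQueue<T, U> {
        final Queue<T> qs; // upstream queue (field name borrowed from the listing)
        final Function<? super T, ? extends U> mapper;

        MappedQueue(Queue<T> qs, Function<? super T, ? extends U> mapper) {
            this.qs = qs;
            this.mapper = mapper;
        }

        /** Returns the next mapped item, or null when the upstream queue is empty. */
        U poll() {
            T t = qs.poll();
            return t != null
                    ? Objects.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.")
                    : null;
        }
    }

    static List<String> run() {
        Queue<Integer> upstream = new ArrayDeque<>();
        upstream.add(1);
        upstream.add(2);
        MappedQueue<Integer, String> mapped =
                new MappedQueue<Integer, String>(upstream, i -> "i am " + i);

        // Pull until the queue reports empty by returning null.
        List<String> pulled = new ArrayList<>();
        for (String s = mapped.poll(); s != null; s = mapped.poll()) {
            pulled.add(s);
        }
        return pulled;
    }
}
```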
Some readers may have a question:
Why doesn't the Observer's onNext run immediately after each emission? Because in my example subscribeOn and observeOn specify different threads, so emitting and receiving happen on different threads.
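The effect of the thread hop can be reproduced without RxJava. In the sketch below, two plain java.util.concurrent executors stand in for the two schedulers. The thread names "emitter" and "observer" are made up, and doing the mapping on the emitting thread is a simplifying assumption. Because each onNext is handed to another thread, the relative order of the "Emit" and "onNext" lines is not fixed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-ins for the thread chosen by subscribeOn and the one chosen by observeOn.
        ExecutorService emitter = Executors.newSingleThreadExecutor(r -> new Thread(r, "emitter"));
        ExecutorService observer = Executors.newSingleThreadExecutor(r -> new Thread(r, "observer"));

        emitter.execute(() -> {
            for (int i = 1; i <= 2; i++) {
                final String mapped = "i am " + i;
                log.add("Emit " + i + " on " + Thread.currentThread().getName());
                // Hand the item over: onNext runs later, on the observer thread.
                observer.execute(() ->
                        log.add("onNext " + mapped + " on " + Thread.currentThread().getName()));
            }
        });

        try {
            // Wait for the emitter first, so every hand-off has been submitted.
            emitter.shutdown();
            emitter.awaitTermination(5, TimeUnit.SECONDS);
            observer.shutdown();
            observer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return log;
    }
}
```

The log always contains two lines ending in "on emitter" and two ending in "on observer", but both "Emit" lines may well appear before the first "onNext" line.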


[code Taoist]: updated weekly, packed with practical content.

Added by ggkfc on Fri, 01 May 2020 19:31:31 +0300