RxJava2 source code analysis: thread switching

Usage

Let's start with the most basic use:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "s = " + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

Compared with the example in the previous article, there are two new calls: subscribeOn and observeOn. subscribeOn sets the thread of the observable (the emission source), while observeOn sets the thread of the observer (the processor).

observeOn

Let's first look at the source code of observeOn to see how it controls the threads of the processor:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        // Ignore the null/argument checks and the hook mechanism; look directly at ObservableObserveOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

Both observeOn and subscribeOn are methods on the decorator Observable, and both return an Observable (the decorator pattern described in the previous article). The same applies here, so we only need to look at the ObservableObserveOn class.

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        //Store upstream source locally
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // A TrampolineScheduler works on the current thread, so subscribe downstream directly without wrapping
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

The key is the subscribeActual method. The scheduler we passed to observeOn is stored when ObservableObserveOn is constructed. subscribeActual first checks whether that scheduler is a TrampolineScheduler. What is a TrampolineScheduler? Here is the official Javadoc:

/**
 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
 * after the current unit of work is completed.
 */
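The queue-then-run behavior described in that comment can be modeled in a few lines. The sketch below is a single-threaded toy and not RxJava's TrampolineScheduler; the class name TrampolineSketch and its schedule method are hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical toy model, NOT RxJava's TrampolineScheduler: a task scheduled
// while another task is running is queued and runs after that task finishes.
final class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // nested call: the outer drain loop will pick this task up
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TrampolineSketch trampoline = new TrampolineSketch();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner")); // queued, not run yet
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [outer start, outer end, inner]
    }
}
```

The nested task runs only after the outer task returns, on the same thread, which is why observeOn has nothing to switch in this case.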

That is clear enough: if the scheduler works on the current thread, the downstream observer is subscribed to the upstream directly, with no extra wrapping. Otherwise, a new ObserveOnObserver is created and subscribed to the upstream decorator (upstream in the sense of the call chain, that is, the decorator on which observeOn was called). Does this look familiar? Let's go back to the subscribeActual method of ObservableCreate from the previous article and compare the two.

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

Both create a new object internally and connect it to their upstream (a decorator or the emission source). The difference is that ObservableCreate creates an emitter, CreateEmitter, while ObservableObserveOn creates a processor, ObserveOnObserver. ObservableCreate hands the emitter to the emission source it stored earlier, and ObservableObserveOn subscribes the processor to the upstream decorator.
Based on this, we can divide decorators into two styles:

  • Start decorator: like ObservableCreate, it sits at the beginning of a pipeline. It is itself a decorator, its upstream is the emission source, and it creates an emitter internally to drive the initial emission.
  • Process decorator: like ObservableObserveOn, it sits in the middle of a pipeline. It is itself a decorator, its upstream is another decorator, it creates a new processor to handle upstream events, and its downstream is a processor or another decorator.
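The two styles can be sketched without RxJava. Everything below (MiniObserver, MiniObservable, Create, Tag) is a hypothetical stand-in reduced to onNext, meant only to show who creates what when subscribe is called; it is not the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical reduction of the decorator chain; not RxJava classes.
final class DecoratorStyles {
    interface MiniObserver<T> {
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        final void subscribe(MiniObserver<T> observer) {
            subscribeActual(observer);
        }

        abstract void subscribeActual(MiniObserver<T> observer);
    }

    // Start decorator: its upstream is the emission source, and it builds an emitter.
    static final class Create<T> extends MiniObservable<T> {
        private final Consumer<MiniObserver<T>> source;

        Create(Consumer<MiniObserver<T>> source) {
            this.source = source;
        }

        @Override
        void subscribeActual(MiniObserver<T> observer) {
            MiniObserver<T> emitter = observer::onNext; // plays the role of CreateEmitter
            source.accept(emitter);
        }
    }

    // Process decorator: its upstream is another decorator, and it builds a new processor.
    static final class Tag extends MiniObservable<String> {
        private final MiniObservable<String> source;
        private final String tag;

        Tag(MiniObservable<String> source, String tag) {
            this.source = source;
            this.tag = tag;
        }

        @Override
        void subscribeActual(MiniObserver<String> observer) {
            // The internal processor handles upstream events, then calls downstream.
            source.subscribe(value -> observer.onNext(tag + value));
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        MiniObservable<String> start = new Create<String>(e -> {
            e.onNext("1");
            e.onNext("2");
        });
        new Tag(new Tag(start, "a"), "b").subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ba1, ba2]
    }
}
```

Subscription travels upstream (outer Tag, inner Tag, Create), while values travel downstream through the processors created along the way.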

Let's take a look at the source code of ObserveOnObserver.

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // Related to sync/async queue fusion; ignore for now
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //The point is this
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                // ObserveOnObserver implements Runnable, so it can be handed to the worker as a task; its run() method then executes on the worker's thread.
                worker.schedule(this);
            }
        }
        // Runnable implementation, executed on the worker's thread
        @Override
        public void run() {
            // The fused branch is related to sync/async fusion; look directly at drainNormal()
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    // ... (some checks omitted)
                    v = q.poll();
                    // This runs inside run(), so the downstream onNext executes on the worker's thread.
                    a.onNext(v);
                }
                // ...
            }
        }
        
    }

We won't go into detail here (the next article covers it). What matters is that when the upstream calls onNext on this processor, the value is queued and the downstream onNext is then executed on the worker created from the scheduler we set. This is where the thread switch happens, and it carries through to onNext of every downstream processor. If observeOn is called again further downstream, the processors after that call switch to the other thread.
Therefore, observeOn can be called multiple times, and each call affects all downstream processors until the next observeOn.
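The enqueue-then-drain handoff can be modeled with the standard library. This is a simplified sketch, not the real ObserveOnObserver: HopObserver is a hypothetical name, an ExecutorService stands in for Scheduler.Worker, and completion, errors, disposal and fusion are left out.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of the observeOn handoff; not RxJava's implementation.
final class ObserveOnSketch {
    static final class HopObserver<T> implements Runnable {
        private final Consumer<T> downstream;
        private final ExecutorService worker; // stands in for Scheduler.Worker
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger();

        HopObserver(Consumer<T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        void onNext(T value) {
            queue.offer(value);
            // Only the call that moves the counter from 0 schedules a drain.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    downstream.accept(value); // executes on the worker's thread
                }
                // Leave only if no onNext arrived while we were draining.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "hop-worker"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        HopObserver<String> hop = new HopObserver<String>(value -> {
            seen.add(Thread.currentThread().getName() + ":" + value);
            done.countDown();
        }, worker);
        try {
            hop.onNext("1"); // called on the caller's thread
            hop.onNext("2");
            hop.onNext("3");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [hop-worker:1, hop-worker:2, hop-worker:3]
    }
}
```

Chaining a second HopObserver with a different executor as the downstream of the first would move everything after it to that executor, which mirrors calling observeOn twice.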

subscribeOn

Next, let's look at the source code of subscribeOn.

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        // Ignore the null check and the hook mechanism; look directly at ObservableSubscribeOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

The pattern is the same as before, so we go straight to the subscribeActual method of the ObservableSubscribeOn class.

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //Create SubscribeOnObserver internal class object
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //The downstream onSubscribe is called here
        observer.onSubscribe(parent);
        // scheduler.scheduleDirect returns a Disposable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

ObservableSubscribeOn is also a process decorator. When subscribe is called, it creates a new processor that wraps the downstream processor and is handed to it through onSubscribe. Unlike ObservableObserveOn, it does not immediately call subscribe to connect the upstream decorator to the internal processor. Instead it executes:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

Let's see what SubscribeTask is:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //The source here is the upstream decorator and the parent is the internal processor
            source.subscribe(parent);
        }
    }

SubscribeTask implements Runnable, and its run method simply calls subscribe on the upstream decorator. Now let's look at scheduleDirect. In the base Scheduler class, the one-argument call used above delegates to the overload below with a delay of zero.

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

createWorker is implemented differently by each scheduler. We won't dig into that code; it is enough to know that subscribeActual schedules a task on a thread provided by the scheduler we set. On that thread, the upstream subscribe method is called with the internal processor SubscribeOnObserver. SubscribeOnObserver is not very different from an ordinary processor, so its code is not shown here.
As a result, subscribe runs on another thread. From the basic flow covered earlier, this determines the thread on which every upstream subscribe method runs, so subscribeOn() switches the thread of all upstream subscribe calls. The thread of the emission source depends only on the subscribeOn() nearest to it. Therefore subscribeOn() only needs to be called once: when it appears several times in a chain, only the first call (the one closest to the source) decides where the source emits.
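The "nearest subscribeOn wins" rule follows directly from the subscribe call hopping threads once per decorator. A minimal sketch, assuming an ExecutorService in place of a Scheduler and a hypothetical Source interface reduced to a single callback; it is not RxJava's ObservableSubscribeOn:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of subscribeOn; not RxJava's implementation.
final class SubscribeOnSketch {
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    // Like SubscribeTask: the upstream subscribe call is run on the scheduler.
    static Source subscribeOn(Source upstream, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> upstream.subscribe(observer));
    }

    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String demo() {
        ExecutorService nearSource = named("near-source");
        ExecutorService farFromSource = named("far-from-source");
        CompletableFuture<String> emittedOn = new CompletableFuture<>();
        // The emission source reports the thread it emits on.
        Source source = observer -> observer.accept(Thread.currentThread().getName());
        // Equivalent in shape to source.subscribeOn(nearSource).subscribeOn(farFromSource).
        Source chain = subscribeOn(subscribeOn(source, nearSource), farFromSource);
        try {
            chain.subscribe(emittedOn::complete);
            return emittedOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            nearSource.shutdown();
            farFromSource.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // near-source
    }
}
```

The outer subscribeOn does run, but all it does is hop to its own thread and then call the inner one, which hops again before the source is reached.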

onSubscribe

The terminal Observer has an onSubscribe method, where we usually do some initialization, and onSubscribe appears many times in the source code above. Which thread does it run on? Let's look for the answer in the source.
To find out, we only need to see which thread calls onSubscribe() in the upstream decorator nearest to the terminal processor. Let's check whether subscribeOn and observeOn change that thread, starting with observeOn, since it is the one that switches threads for downstream processors.

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

From the source above, subscribeActual in ObservableObserveOn contains nothing related to onSubscribe. Let's look at ObserveOnObserver:

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                // ... (unrelated code omitted)
                //Downstream here refers to the downstream processor
                downstream.onSubscribe(this);
            }
        }

When onSubscribe is called on ObserveOnObserver, it calls the downstream onSubscribe directly, passing itself as the parameter. In other words, observeOn does not switch the thread of onSubscribe.
As for subscribeOn, it calls the onSubscribe method of the downstream processor directly in subscribeActual, before scheduling any work:

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

Neither of these two thread-switching methods changes the thread of onSubscribe(). Therefore, in the chains examined here, onSubscribe() of the terminal Observer runs on the same thread that calls subscribe() from outside.
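This ordering can be checked with a small model. The sketch below assumes an ExecutorService in place of a Scheduler and uses hypothetical MiniObserver and Source interfaces; it copies only the call order of ObservableSubscribeOn.subscribeActual (onSubscribe first, then schedule) and is not the library's code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of where onSubscribe runs; not RxJava's implementation.
final class OnSubscribeThreadSketch {
    interface MiniObserver {
        void onSubscribe();
        void onNext(String value);
    }

    interface Source {
        void subscribe(MiniObserver observer);
    }

    static Source subscribeOn(Source upstream, ExecutorService scheduler) {
        return downstream -> {
            downstream.onSubscribe(); // still on the caller's thread
            scheduler.execute(() -> upstream.subscribe(new MiniObserver() {
                @Override
                public void onSubscribe() {
                    // Not forwarded: downstream was already notified above.
                }

                @Override
                public void onNext(String value) {
                    downstream.onNext(value);
                }
            }));
        };
    }

    private static String where(Thread caller) {
        Thread current = Thread.currentThread();
        return current == caller ? "caller" : current.getName();
    }

    static List<String> demo() {
        Thread caller = Thread.currentThread();
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-sketch"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // Start of the chain: notifies onSubscribe, then emits one value.
        Source source = observer -> {
            observer.onSubscribe();
            observer.onNext("1");
        };
        try {
            subscribeOn(source, io).subscribe(new MiniObserver() {
                @Override
                public void onSubscribe() {
                    log.add("onSubscribe:" + where(caller));
                }

                @Override
                public void onNext(String value) {
                    log.add("onNext:" + where(caller));
                    done.countDown();
                }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onSubscribe:caller, onNext:io-sketch]
    }
}
```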

To sum up

  • observeOn acts on all downstream processors and can be called multiple times. The thread each processor runs on is determined by the nearest upstream observeOn.
  • subscribeOn acts on the upstream emission source and specifies the thread on which the subscribe method runs. For the emission source, only the nearest downstream subscribeOn takes effect, so additional subscribeOn calls further downstream do not change the thread of the source.
  • onSubscribe() does not follow the internal thread switches. It runs on the thread that calls subscribe() on the chain from outside.

Keywords: rxjava

Added by harryman100 on Sat, 19 Feb 2022 17:41:20 +0200