RxJava source code analysis - thread switching source code

Summary

The source code behind a simple RxJava subscribe call was covered in an earlier article. Interested readers can take a look:
Rxjava source code analysis (I) - subscribe source code

Building on that analysis, this article looks at how RxJava switches threads.

Demo

The main logic is as follows:

  1. Observable calls create() and returns ObservableCreate
  2. ObservableCreate calls map and returns ObservableMap
  3. ObservableMap calls subscribeOn and returns ObservableSubscribeOn
  4. ObservableSubscribeOn calls observeOn and returns ObservableObserveOn
  5. ObservableObserveOn calls subscribe to execute the final logic.

Observable
        .create(new ObservableOnSubscribe<String>() {

          @Override
          public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
            Log.i("RxJavaTest", "subscribe");
            emitter.onNext("123");
          }
        })
        .map(new Function<String, String>() {

          @Override
          public String apply(String s) throws Throwable {
            return s + "456";
          }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {

          @Override
          public void onSubscribe(@NonNull Disposable d) {
            Log.i("RxJavaTest", "onSubscribe");
          }

          @Override
          public void onNext(@NonNull String s) {
            Log.i("RxJavaTest", "onNext: " + s);
          }

          @Override
          public void onError(@NonNull Throwable e) {
            Log.i("RxJavaTest", "onError");
          }

          @Override
          public void onComplete() {
            Log.i("RxJavaTest", "onComplete");
          }
        });

Output

RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456

Schematic diagram


The figure above is a flow chart of the key logic in the demo.
The main points are as follows:

  • Each Observable has a corresponding Observer, usually defined as a nested class of that Observable. For example, MapObserver is a nested class of ObservableMap.
  • Each Observable stores the Observable before it in the chain, that is, its source. In the demo, the source of ObservableMap is ObservableCreate.
  • Each Observer stores the Observer after it in the chain, that is, its downstream. In the demo, the downstream of MapObserver is SubscribeOnObserver, and CreateEmitter in turn holds MapObserver.
  • Once subscribe is called on the outermost Observable, subscribe is called layer by layer up the chain, and each call ends in that Observable's subscribeActual method. Each subclass implements it differently.
    For example, subscribeActual of ObservableObserveOn creates the Worker that will run the Observer, wraps the Observer and that Worker in an ObserveOnObserver, and passes it to source.subscribe.
    For another example, ObservableMap wraps the Observer and the Function given to map in a MapObserver and passes it to source.subscribe.
  • The diagram uses Observer.onNext as the example, but the same holds for other methods such as onError. Each Observer holds a reference to the next Observer, i.e. downstream. The last wrapping Observer in the demo is ObserveOnObserver, whose downstream is the Observer passed to subscribe.
    Each Observer also handles onNext and the other methods differently. Two examples:
    In MapObserver.onNext, the value is first transformed by the function passed to map, and the result is then passed to downstream.onNext.
    In ObserveOnObserver.onNext, the value is queued and the call to downstream.onNext is handed to the Worker, so it runs on the thread the Scheduler provides.
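
The source/downstream wiring described in the points above can be sketched without RxJava at all. The following is a minimal illustration, not the library's code: MiniObservable, MiniObserver, MiniJust, MiniMap and ChainDemo are hypothetical names invented here, and only onNext is modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for RxJava's Observer (onNext only).
interface MiniObserver<T> {
    void onNext(T value);
}

// Hypothetical, simplified stand-in for RxJava's Observable.
abstract class MiniObservable<T> {
    // Mirrors the shape of Observable.subscribe: delegate to the subclass.
    final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    abstract void subscribeActual(MiniObserver<? super T> observer);

    // Like map(): returns a new Observable that keeps this one as its source.
    <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniMap<>(this, mapper);
    }

    // Head of the chain: emits a single value when subscribed to.
    static <T> MiniObservable<T> just(T value) {
        return new MiniJust<>(value);
    }
}

final class MiniJust<T> extends MiniObservable<T> {
    private final T value;

    MiniJust(T value) {
        this.value = value;
    }

    @Override
    void subscribeActual(MiniObserver<? super T> observer) {
        observer.onNext(value);
    }
}

final class MiniMap<T, R> extends MiniObservable<R> {
    private final MiniObservable<T> source;                 // previous Observable
    private final Function<? super T, ? extends R> mapper;

    MiniMap(MiniObservable<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribeActual(MiniObserver<? super R> downstream) {
        // Wrap downstream in an observer that applies the mapper,
        // then subscribe that wrapper to the source.
        source.subscribe(value -> downstream.onNext(mapper.apply(value)));
    }
}

final class ChainDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniObservable.just("123")
                .map(s -> s + "456")
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [123456]
    }
}
```

Subscription travels from the last operator back to the head (each subscribeActual subscribes to its source), while values travel the other way through the chain of wrapped observers.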

Key source code analysis

Observable.subscribe

The subscribe method of Observable eventually calls subscribeActual. subscribeActual is an abstract method that each subclass implements.

    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

Observable.subscribeOn

subscribeOn finally creates an ObservableSubscribeOn object.

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

ObservableSubscribeOn.subscribeActual

subscribeOn determines the thread on which the subscription runs, so the subscription has to be handed off to that thread.

subscribeActual eventually calls Scheduler.scheduleDirect, which creates a new Worker and schedules the SubscribeTask on it. The task subscribes to source, so the subscription logic and everything upstream of it run on the thread supplied by the Scheduler.

    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
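
To make the effect of that hand-off concrete, here is a minimal sketch that assumes a plain ExecutorService in place of a Scheduler and its Worker. SimpleSource and SubscribeOnSketch are hypothetical names used only for illustration, and disposal is left out entirely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for an upstream Observable: subscribing runs its emission logic.
interface SimpleSource<T> {
    void subscribe(Consumer<? super T> observer);
}

final class SubscribeOnSketch {
    // Like ObservableSubscribeOn: the call to source.subscribe is posted to the
    // executor, so the upstream emission code runs on the executor's thread.
    static <T> SimpleSource<T> subscribeOn(SimpleSource<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // Returns the name of the thread on which the source's subscribe logic ran.
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> emitThread = new AtomicReference<>();
        SimpleSource<String> source = observer -> {
            emitThread.set(Thread.currentThread().getName());
            observer.accept("123");
        };

        subscribeOn(source, io).subscribe(value -> { });

        // Let the queued subscription finish before reading the result.
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitThread.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints sketch-io
    }
}
```

The caller of subscribe returns immediately; the emission code only starts once the executor picks up the task, which is why the demo's "subscribe" log line comes from the io thread.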

ObservableSubscribeOn.SubscribeOnObserver.onNext

SubscribeOnObserver.onNext simply forwards the value to the downstream Observer's onNext. It does not switch threads itself.

This is a common point of confusion, so to spell it out:
Observable.subscribeOn: defines the thread on which the subscription (and therefore the upstream emission logic) runs
Observable.observeOn: defines the thread on which the downstream Observer receives events

        public void onNext(T t) {
            downstream.onNext(t);
        }

Observable.observeOn

The observeOn method ultimately returns an ObservableObserveOn object.

    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn.subscribeActual

Here a Worker is created, which determines the thread the Observer will run on. The Observer is then wrapped in an ObserveOnObserver and passed to source.subscribe. If the Scheduler is a TrampolineScheduler, the Observer is passed to source.subscribe unchanged.

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

ObservableObserveOn.ObserveOnObserver.onNext

observeOn determines the thread on which the Observer receives events, so the Observer's callbacks hand their work off to that thread.

onNext queues the value and then calls schedule, which calls worker.schedule to run the ObserveOnObserver on the corresponding thread. The getAndIncrement() == 0 check ensures the task is scheduled only when it is not already pending or running.

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
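
The listing above stops at worker.schedule(this) and does not show what the scheduled task does. The sketch below illustrates the general queue-and-drain idea under stated assumptions: an ExecutorService stands in for the Worker, an AtomicInteger field stands in for the counter behind getAndIncrement(), and the run() loop is a common drain pattern written for this example rather than RxJava's actual code. ObserveOnSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified model of an observeOn-style observer: onNext enqueues the value,
// and a drain task on the executor delivers queued values downstream.
final class ObserveOnSketch<T> implements Runnable {
    private final Consumer<? super T> downstream;
    private final ExecutorService worker; // assumption: stand-in for Scheduler.Worker
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter

    ObserveOnSketch(Consumer<? super T> downstream, ExecutorService worker) {
        this.downstream = downstream;
        this.worker = worker;
    }

    void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    private void schedule() {
        // Only the caller that moves the counter from 0 to 1 submits a drain task.
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item); // delivered on the worker thread
            }
            // Subtract the requests already handled; stop only if none arrived meanwhile.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Emits two values from the calling thread and records where they were received.
    static List<String> run2() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-observe"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ObserveOnSketch<String> observer = new ObserveOnSketch<>(
                item -> log.add(item + "@" + Thread.currentThread().getName()), worker);

        observer.onNext("a");
        observer.onNext("b");

        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run2()); // prints [a@sketch-observe, b@sketch-observe]
    }
}
```

The counter is what keeps the design cheap: producers never block, and at most one drain task is in flight however many times onNext is called.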

Keywords: Java Android source code rxjava

Added by itreP on Wed, 22 Dec 2021 14:12:24 +0200