summary
The source code of rxjava simple subscribe has been mentioned earlier. Interested readers can have a look.
Rxjava source code analysis (I) - subscribe source code
Based on the previous analysis, this article will continue to talk about rxjava interrupt thread switching.
Demo
The main logic is as follows:
- Observable calls create() to create ObservableCreate
- ObservableCreate calls map and returns ObservableMap
- ObservableMap calls subscribeOn and returns ObservableSubscribeOn
- Return ObservableSubscribeOn, call observaveon, and return ObservableSubscribeOn
- ObservableSubscribeOn calls subscribe to execute the final logic.
Observable .create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable { Log.i("RxJavaTest", "subscribe"); emitter.onNext("123"); } }).map(new Function<String, String>() { @Override public String apply(String s) throws Throwable { return s + "456"; } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.i("RxJavaTest", "onSubscribe"); } @Override public void onNext(@NonNull String s) { Log.i("RxJavaTest", "onNext: " + s); } @Override public void onError(@NonNull Throwable e) { Log.i("RxJavaTest", "onError"); } @Override public void onComplete() { Log.i("RxJavaTest", "onComplete"); } });
Operation results
RxJavaTest: onSubscribe RxJavaTest: subscribe RxJavaTest: onNext: 123456
schematic diagram
The above figure is the flow chart of key logic sorted according to the demo.
The main contents are as follows:
- Each Observable corresponds to an observer. Observer is the internal class of Observable. For example, MapObserver is the internal class of ObservableMap.
- The current Observable will store the previous Observable, that is, the source object. Like in Demo
The source of ObservableMap in is ObservableCreate. - The Observer will store the last Observer, that is, downstream. For example, in the Demo, the downstream of MapObserver is CreateEmitter.
- Since the first subscribe method is called, the Observable subscribe method will be called layer by layer, and finally the Observable subscribeActual method will be called. The logic of each subclass is different.
For example, the subscribeActual method of ObservableObserveOn will get the Worker executing the Observer and then pass it to the source.
For another example, ObservableMap will pass the Function object defining the map method to source. - The example used here is Observer The onnext method, of course, can also be other methods such as onError. Each Observer will refer to the previous Observer, i.e. downstream. The last Observer in the Demo is ObserveOnObserver.
The processing logic of each Observer for onNext and other methods is also different. Let's give an example.
For example, in the onNext of MapObserver, the result is first executed by the logic in the map, and then passed to the onNext method of downstream for processing.
For example, in the onNext of ObserveOnObserver, the onNext method of source will be thrown into the Worker for processing, or into a specific thread for processing.
Key source code analysis
Observable.subscribe
In the subscribe method of Observable, the subscribeActual method will eventually be called. The subscribeActual method is a virtual method and will be implemented in subclasses
public final void subscribe(@NonNull Observer<? super T> observer) { Objects.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
Observable.subscribeOn
subscribeOn finally creates an ObservableSubscribeOn object.
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler)); }
ObservableSubscribeOn.subscribeActual
The subscribeOn method is used to determine the thread to execute during subscription, so you need to throw it to the corresponding thread to run.
In subscribeActual, the Scheduler will eventually be called Scheduledirect method, in which the logic in the Observer will be run in the new Worker, that is, in the thread set by the Scheduler.
public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
ObservableSubscribeOn. SubscribeOnObserver.onNext
The onnext of SubscribeOnObserver only calls the onnext method of the next previous Observer.
Some readers here can be a little confused. Please explain it additionally.
Observable.subscribeOn: defines the thread to execute when subscribing
Observable.observeOn: defines the thread to execute when listening
public void onNext(T t) { downstream.onNext(t); }
Observable.observeOn
The ObservableObserveOn method finally returns the ObservableObserveOn object.
public final Observable<T> observeOn(@NonNull Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { Objects.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)); }
ObservableObserveOn.subscribeActual
Here, a Worker will be created, that is, the thread to be run by the Observer will be defined, and then the Observer will be created and passed to source Subscribe method.
protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); } }
ObservableObserveOn.ObserveOnObserver.onNext
The observeOn method is used to determine the thread to execute when listening. Therefore, the logic will be thrown into the corresponding thread for execution in the Observer method.
This will eventually call worker The schedule method puts the logic into the corresponding thread for execution.
public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }