Implementing one RxJS operator a day: combineLatest (2)

The previous article described how to use combineLatest. The core of its implementation is listed below:

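// OuterSubscriber and subscribeToResult are RxJS internals (imports omitted in this excerpt).
// Sentinel marking a slot that has not received a value yet.
const none = {};

class CombineLatestSubscriber extends OuterSubscriber {
  constructor(destination) {
    super(destination);
    this.values = [];
    this.observables = [];
  }

  // Called once for every observable emitted by the ArrayObservable.
  _next(observable) {
    this.values.push(none);
    this.observables.push(observable);
  }

  // Called when the ArrayObservable completes: subscribe to every stored observable.
  _complete() {
    const observables = this.observables;
    const len = observables.length;
    if (len === 0) {
      this.destination.complete();
    } else {
      this.toRespond = len;
      for (let i = 0; i < len; i++) {
        const observable = observables[i];
        subscribeToResult(this, observable, observable, i);
      }
    }
  }

  // Called each time one of the inner observables emits a value.
  notifyNext(outerValue, innerValue, outerIndex) {
    const values = this.values;
    const oldVal = values[outerIndex];
    // Decrement toRespond only the first time a given observable emits.
    const toRespond = !this.toRespond
      ? 0
      : oldVal === none ? --this.toRespond : this.toRespond;
    values[outerIndex] = innerValue;
    if (toRespond === 0) {
      this.destination.next(values.slice());
    }
  }
}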

export class CombineLatestOperator {
  call(subscriber, source) {
    return source.subscribe(new CombineLatestSubscriber(subscriber));
  }
}

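export function higherOrderCombineLatest(observables) {
  // Wrap the source and the argument observables in an ArrayObservable,
  // then lift the operator onto that new observable.
  return source =>
    source.lift.call(new ArrayObservable([source, ...observables]), new CombineLatestOperator());
}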

You can see in the code:

  1. When combineLatest is called, the source Observable and the Observables passed as arguments are wrapped in a new ArrayObservable, that is, an Observable that emits each of them in turn as its values.
  2. When the ArrayObservable is subscribed to, each observable it emits is stored in the observables array of CombineLatestSubscriber by its _next method.
  3. Once the ArrayObservable has emitted everything, it completes, which calls the _complete method of CombineLatestSubscriber. That method subscribes to every stored observable through subscribeToResult, so notifyNext is called each time any of them emits a value.
  4. In notifyNext, the values array stores the latest value emitted by each observable. The next method of destination is called only once every observable has emitted at least one value, that is, when toRespond reaches 0. From then on, every new emission produces a new combined array.
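
The bookkeeping in steps 2 to 4 can be sketched without RxJS at all. The snippet below is only an illustration under simplifying assumptions: `combineLatestSketch`, `makeSource` and `NONE` are hypothetical names that do not exist in RxJS, a "source" is just a function that accepts a callback, and completion, errors and unsubscription are ignored.

```javascript
// Sentinel marking "no value received yet" (plays the role of `none` in the listing).
const NONE = {};

// Hypothetical helper, not part of RxJS: each source is a function that
// takes a callback and later calls it with every value it emits.
function combineLatestSketch(sources, onNext) {
  const values = sources.map(() => NONE);
  let toRespond = sources.length; // number of sources that have not emitted yet

  sources.forEach((subscribe, index) => {
    subscribe(value => {
      // Count a source only the first time it emits.
      if (toRespond > 0 && values[index] === NONE) {
        toRespond--;
      }
      values[index] = value;
      // Emit a snapshot once every source has produced at least one value.
      if (toRespond === 0) {
        onNext(values.slice());
      }
    });
  });
}

// Hypothetical manually driven source, used only for the demonstration.
function makeSource() {
  let listener = () => {};
  return {
    subscribe: cb => { listener = cb; },
    emit: value => listener(value),
  };
}

const a = makeSource();
const b = makeSource();
const results = [];
combineLatestSketch([a.subscribe, b.subscribe], combined => results.push(combined));

a.emit(1);   // nothing is emitted yet: b has not produced a value
b.emit('x'); // first combined value: [1, 'x']
a.emit(2);   // latest of a paired with latest of b: [2, 'x']
```

Copying the array with `slice()` before handing it to the consumer mirrors the listing above: the internal `values` array keeps being mutated, so each emission should get its own snapshot.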

Added by Scott_J on Fri, 27 Dec 2019 00:02:11 +0200