[Understanding RxSwift source code]-Operators

Opening chapter


RxSwift.png

stay Last article In this article, we analyze the entire subscription process in RxSwift. Before introducing the operation of transformation, we should first clarify the concept of Sink. Students who are not clear can turn to the analysis in the previous article. Simply put, a Sink convection operation is performed before each subscription operation. If the flow in Rx is treated as water, then Sink is equivalent to the filter mesh of each water pipe faucet, and the final processing is carried out before the water is discharged.


Sink.png

Every subscribe can be analogized to an effluent. Before each effluent, RxSwift does one thing:

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

From the source code above, we can see that whenever an Observable is subscribed, the Observable will execute the run method, and what the run method does is Sink's related processing operations.

Simply put, Sink does two things:

  1. Forwarding Next, Complete, Error events;
  2. Advanced changes before convection forwarding.

And our transformation operations are basically operated in a variety of Sinks, why is it basically? Because in some cases of high-order change (nested transformation), Sink is not the place where the change takes place, the specific situation will be discussed slowly below.

Example

Here's the simplest example code. Let's start with the most common map. Let's see how Krunoslav Zaher handles the map.

Observable.of(1, 2, 3)
    .map { $0 * $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

We can stick a breakpoint on top of the map method. After the program runs, we can see that the method definition stopped below.

extension ObservableType {
    public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }
}

As you can see, there are two things to do here. First, make sure that the caller is converted to Observable, because objects that conform to ObservableType may be ControlEvent, ControlProperty or something like that. Then we call the composeMap method to pass in the closure of the desired transformation operation.

OK, let's go one more level and see what composeMap does:

internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }

As you can see, here Observable calls its own _map private method:

internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
    return Map(source: source, transform: transform)
}
final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType
    private let _source: Observable<SourceType>
    private let _transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        _source = source
        _transform = transform
    }
    override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
        let originalSelector = _transform
        return Map<SourceType, R>(source: _source, transform: { (s: SourceType) throws -> R in
            let r: ResultType = try originalSelector(s)
            return try selector(r)
        })
    }
    
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
        let subscription = _source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

We can see that the so-called _map actually returns a Map object based on the Producer class (Producer inherits from Observable, and Observable class is the place where composeMap was first defined, and this integration chain is important for the next understanding). Here are three main things to do:

  1. First, the observable sequence and the transformation operation are saved by the constructor for reserve.
  2. Rewrite the composeMap of the parent class, from the original direct use of the incoming "transformation operation" (transform) to construct the Map object to use the Map object's own "transformation operation" for a transformation, and then use the incoming "transformation operation" for a transformation. Such recursive processing can achieve the purpose of nesting map operations, as follows: Observable < Int >. of (1, 3, 5). map (+). map (+).
  3. Rewrite the run method of the parent class. As mentioned earlier, the run method executes before the subscription, and uses various kinds of links to process the "data source" when passing data. In this case, the Sink is MapSink, which uses the incoming transform to process the data source at each Next event, and then passes the processed data source out.

So far, all map operations have been completed. We can see that map operations are actually "inert". That is to say, when you use map operations, they will not perform transformation operations immediately unless you use nested maps or subscribe to observation sequences.

Producer-consumer model

In the design and implementation of RxSwift, it is also true. Producer-consumer pattern Practical application. In RxSwift, all observable sequences act as producers, so we can transform operations to return a subclass inherited from the Producer class (except some Subjects, Subjects are special, which will be discussed later).


Overview of Producer Inheritance. png

The BrainMap above shows the subclasses derived from Producer. We can see that whether our usual "initialization" methods are just, of, from, or our usual transformation methods: map, flatMap,merge, their corresponding implementations are a kind of Producer.

We can see that it is also due to the implementation of the producer-consumer model that RxSwift can customize the process before the end of each pipeline in the observable sequence, just like the pipeline in the factory.


Work Flow.png

summary

Next, we can have a look at RxSwift's operation of event transformation. Here are some logical sorting out work. I hope you can see more clearly.

1. Protocol Extension

Start with an agreement. - WWDC 2015

We know that RxSwift's observable sequences are based on Observable Type, so when we need to add a transformation operation to all observable sequences, we just need to add a public method through extension, and then implement it.

extension ObservableType {

    public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }

    public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
        -> Observable<O.E> {
            return FlatMap(source: asObservable(), selector: selector)
    }

    public func concat<O: ObservableConvertibleType>(_ second: O) -> Observable<E> where O.E == E {
        return Observable.concat([self.asObservable(), second.asObservable()])
    }

    public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2)
            -> Observable<(O1.E, O2.E)> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: { ($0, $1) }
        )
    }

    // More and more ....
}

The code I listed above is placed in the same extension for the sake of centralized presentation. In the actual source code, they are scattered in different swift files. So we know that all the transformation operations we use are extended to ObservableType protocol through extension.

Looking at the source code, we can see that all the above transformation operations actually do one thing, that is, return a specific subclass of Producer. For example, map returns instance objects of Map class and combineLatest returns instance objects of CombineLatest2 class.

2. Representational Producer

So what has been done to the Producer subclass returned by the extension method?

First, the representational Producer must rewrite the override func run < O: Observer Type >(observer: O, cancel: Cancelable) - > (sink: Disposable, subscription: Disposable) where O.E== Accumulate method, in which RxSwift processes the data source through the representational Sink, and then lets the source observable sequence perform subscriptions.

Secondly, Producer receives at least two parameters at initialization: one is the observable sequence passed, and the other is the closure of the transformation operation. Of course, some transformations may require three parameters because of their characteristics. Scan operations, for example, need not only closure accumulator, but also a seed, which is also determined by the characteristics of Scan operations, not to mention here. After the Producer saves the necessary parameters for these transformations, sink in the run method can perform these transformations before subscribing to the output and then output them to the subscriber.

It is worth noting that because of the recursive calls between the run method and subscribe method, such an implementation pattern also naturally supports nested transformation operations.

3. Coolie Sink

So the execution of transformed closures is in all kinds of Sinks, such as MapSink:

func on(_ event: Event<SourceType>) {
    switch event {
    case .next(let element):
        do {
            /// Transform operations
            let mappedElement = try _selector(element, try incrementChecked(&_index))
            /// Forward the event after the transformation operation to the original observer
            forwardOn(.next(mappedElement))
        }
        catch let e {
            forwardOn(.error(e))
            dispose()
        }
    case .error(let error):
        forwardOn(.error(error))
        dispose()
    case .completed:
        forwardOn(.completed)
        dispose()
    }
}

As we can see, here we finally do the transformation operation, and the result is forwarded to the observer after the transformation operation.

So far, the whole transformation chain has been converted.

Design regret

On the basis of the definition method of composeMap, we can see the following annotation:

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
    /// Optimizations for map operator

In the summary of the previous section, we know that the nesting of transformation operations in RxSwift is solved by recursive calls to run and subscribe methods. But there are problems, for example, when you nest 10 flatMap methods, each onNext will result in 10 recursive calls to transform operations, and then generate the final value to pass to the subscriber. It's like this in simple functional expressions:

10(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1)(+1) = 20

So why can't we just do that?

10(+10) = 20

Based on this consideration, we can see that the default implementation of map is special. It does not return a Map object directly, but a Map object through composeMap, and then rewrite the composeMap in the Map object to optimize the functional call when nested calls occur:

final fileprivate class Map<SourceType, ResultType>: Producer<ResultType> {

    // ....

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: _transform, observer: observer, cancel: cancel)
        let subscription = _source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

It is for such an optimization that it seems ugly, which is also a design pity.

Keywords: Swift

Added by GetPutDelete on Tue, 04 Jun 2019 04:26:09 +0300