Examples of higher-order functions of RXSwift

    let lgError = NSError.init(domain: "com.error.cn", code: 10090, userInfo: nil)

    var disposeBag = DisposeBag()

1. Combination operators

1. startWith -- emits a specified sequence of elements before starting to emit elements from observable sources

let ss = "12"

Observable.of("1", "2", "3", "4")  // The type of element in it determines what type to add later.

            .startWith("A")

            .startWith("a")

            .startWith("a", "c", "s")

            .startWith(ss)

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Results:12.

            a

            c

            s

            a

            A

            1

            2

            3

            4 

2. merge -- Combines elements in a source observable sequence into a new observable sequence, and emits each element just like elements emitted from each source observable sequence.

let subject1 = PublishSubject<String>()

let subject2 = PublishSubject<String>()

let subject3 = PublishSubject<String>()

 

Observable.of(subject1, subject2, subject3) // Any number can be added, but it must be of the same type

            .merge()

            .subscribe(onNext: { print(($0)) })

            .disposed(by: disposeBag)

 

subject1.onNext("a")

subject2.onNext("b")

subject1.onNext("aaa")

subject2.onNext("bbb")

subject2.onNext("ccc")

subject3.onNext("ccc")

Result: a

           b

           aaa

           bbb

           ccc

           ccc

3. zip -- Combine up to eight source observable sequences into a new observable sequence, and emit elements of each source observable sequence at the corresponding index from the combined observable sequence

let stringSubject1 = PublishSubject<String>()

let intSubject2 = PublishSubject<Int>()

        

Observable.zip(stringSubject1, intSubject2) { stringElement, intElement in // You can also have multiple sequences

            "\(stringElement) \(intElement)"

            }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

 

stringSubject1.onNext("aa")

intSubject2.onNext(23)

intSubject2.onNext(13)

intSubject2.onNext(26)

intSubject2.onNext(34)

stringSubject1.onNext("bb")

stringSubject1.onNext("cc")   // Two sequences must be paired to have value in order to respond.

Result: aa 23

           bb 13

           cc 26

4. Combine Latest -- Combine eight source observable sequences into a new observation sequence, and start to emit the latest element observable sequence for each source of the joint observation sequence once all emission source sequences have at least one element, and when any new element is emitted from the source observable sequence.

let stringSub1 = PublishSubject<String>()

let intSub2 = PublishSubject<Int>()

        

Observable.combineLatest(stringSub1, intSub2){stringElement, intElement in // You can also have multiple sequences

            "\(stringElement) \(intElement)"

            }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

        

stringSub1.onNext("X")

intSub2.onNext(12)

stringSub1.onNext("Q")

intSub2.onNext(23)

intSub2.onNext(34)   // Similar zip,Pairs of values are required to respond, but the latter will override the former.

Result  X 12

         Q 12

         Q 23

         Q 34

Application:For example, the account and password are satisfied at the same time.->To be able to land. It doesn't matter how the password of the account changes. Just look at the final value.

5. switchLatest -- Converts elements emitted from observable sequences into observable sequences and emits elements from the nearest internal observable sequences

let switchLatestSub1 = BehaviorSubject(value: "A")

let switchLatestSub2 = BehaviorSubject(value: "1")

 let switchLatestSub  = BehaviorSubject(value: switchLatestSub1) // Monitor switchLatestSub1

        

switchLatestSub.asObservable()

            .switchLatest()

            .subscribe(onNext: { print(($0)) })

            .disposed(by: disposeBag)

        

switchLatestSub1.onNext("B")

switchLatestSub1.onNext("C")

switchLatestSub2.onNext("2")

switchLatestSub2.onNext("3")    // switchLatestSub No "2" or "3" will be monitored, but it will be saved. By default, the latter will override the former.

switchLatestSub.onNext(switchLatestSub2)    // Switch monitoring switchLatestSub2

switchLatestSub1.onNext("DDD")

switchLatestSub2.onNext("222")

switchLatestSub.onNext(switchLatestSub1) 

 Result   A

           B

           C

           3

           222

           DDD

2. Mapping operators

1. map-transformation closure applies to the elements emitted by the observable sequence and returns the new observable sequence of the transformed element.

 Observable.of(1, 2, 3, 4)

            .map { (number) -> Int in

                return number + 2

            }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result 

3

4

5

6

2. flatMap - Converts the elements emitted by observable sequences into observable sequences, and merges the emission of two observable sequences into one observable sequence.

let boy = Player(score: 100)

let girl = Player(score: 90)

let player = BehaviorSubject(value: boy)

        

player.asObservable()

            .flatMap { $0.score.asObservable() }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

        

boy.score.onNext(60)

player.onNext(girl)   // Response to any new launch of an observable sequence

boy.score.onNext(50)

girl.score.onNext(20)

Result

100

60

90

50

20

 

3. scan -- Start with a default value at the beginning, then apply an accumulator closure to each element emitted by an observable sequence, and return each intermediate result in the form of a single observable sequence of elements

Observable.of(10, 100, 1000)

            .scan(1) { aggregateValue, newValue in

                aggregateValue + newValue //  1 + (10) ,  1 + (100 + 10) , 1 + (1000 + 100 + 10)

            }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

11

111

1111

3. Filter Conditions Operator

1. filter -- Only those elements are emitted from observable sequences that satisfy specified conditions

Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

            .filter{ $0 % 2 == 0 }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

2

4

6

8

10

2. distinct UntilChanged -- Inhibits sequential repetitive elements emitted by observable sequences

Observable.of("1", "1", "2", "2", "2", "3", "4", "2", "2") // It can also be Int type

            .distinctUntilChanged()

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)  // Continuous repetition eliminates duplication

Result

1

2

3

4

2

3. elementAt -- emits elements only at the specified index of all elements emitted by the observable sequence

Observable.of("t", "e", "s", "t")

            .elementAt(2)

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

s

4. single -- emits only the first element emitted by the observable sequence (or the first element that satisfies the criteria). If the observable sequence emits multiple elements, an error is thrown.

Observable.of("bbb")

            .single() // Send out the first, because there is only one, so no error will be thrown.

            .subscribe(onNext: {print($0)} )

            .disposed(by: disposeBag)

        

Observable.of("bbb", "aaa", "aaa", "ccc")

            .single() // Send the first, because there are many, so the error will be thrown.

            .subscribe(onNext: {print($0)} )

            .disposed(by: disposeBag)

               

Observable.of("aaa", "bbb", "aaa", "ccc")

            .single { $0 == "ccc" } // There's only one condition to satisfy, so if you send this, you won't throw an error.

            .subscribe(onNext: {print($0)} )

            .disposed(by: disposeBag)

        

Observable.of("aaa", "bbb", "aaa", "ccc")

            .single { $0 == "aaa" } // There are more than one satisfying condition, so issue this and throw an error.

            .subscribe(onNext: {print($0)} )

            .disposed(by: disposeBag)

Result

bbb

bbb

Unhandled error happened: Sequence contains more than one element.

 subscription called from:

 

ccc

aaa

Unhandled error happened: Sequence contains more than one element.

 subscription called from:

5. take -- emits only a specified number of elements from the beginning of an observable sequence. There is only one sequence of signal s above that will be limited in actual development.

Observable.of("aaa", "bbb", "ccc", "ddd")

            .take(3)

            .subscribe(onNext: { print($0) } )

            .disposed(by: disposeBag)

Result

aaa

bbb

ccc

6. takeLast -- emits a specified number of elements only from the end of an observable sequence

Observable.of("aaa", "bbb", "ccc", "ddd")

            .takeLast(3)

            .subscribe(onNext: { print($0) } )

            .disposed(by: disposeBag)

Result

bbb

ccc

ddd

7. takeWhile -- emits elements from the beginning of an observable sequence as long as the value of the specified condition is true

Observable.of(1, 2, 3, 4, 5, 6, 7, 8)

            .takeWhile{ $0 < 4 }

            .subscribe(onNext: { print($0) } )

            .disposed(by: disposeBag)

Result

1

2

3

8. takeUntil -- emit elements from source observable sequences until reference observable sequences emit elements (cell reuse)

let sourceSequence = PublishSubject<String>()

let referenceSequence = PublishSubject<String>()

        

sourceSequence

            .takeUntil(referenceSequence)

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

        

sourceSequence.onNext("aaa")

sourceSequence.onNext("bbb")

sourceSequence.onNext("ccc")

        

 referenceSequence.onNext("AAA")  // down sourceSequence No more elements are emitted.

sourceSequence.onNext("ddd")

Result

aaa

bbb

ccc

9. skip -- emits elements from source observable sequences until reference observable sequences emit elements

Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 4)

            .skip(4)

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

5

6

7

8

4

    10,skipWhile -- 

Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 4)

            .skipWhile{ $0 < 3 }

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

3

4

5

6

7

8

4

11. skipUntil -- Inhibits elements emitted from source observable sequences until reference observable sequences emit elements

let sourceSeq = PublishSubject<String>()

let referenceSeq = PublishSubject<String>()

        

sourceSeq

            .skipUntil(referenceSeq)

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

        

sourceSeq.onNext("aaa")

sourceSeq.onNext("bbb")

sourceSeq.onNext("ccc")

        

referenceSeq.onNext("AAA")  // down sourceSeq It's just starting to emit elements.

sourceSeq.onNext("ddd")

Result

ddd

4. Collective Control Operators

1. toArray -- Converts an observable sequence into an array, emits the array as a new unit observable sequence, and then terminates.

Observable.range(start: 1, count: 10)

            .toArray()

            .subscribe{ print($0) }

            .disposed(by: disposeBag)

Result

success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

2. reduce -- Start with a set initialization value, then apply the accumulator closure to all elements emitted by an observable sequence, and return the aggregation results in the form of a single observable sequence of elements

Observable.of(10, 100, 1000)

            .reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

        

Observable.of(10, 100, 1000)

            .reduce(1, accumulator: -) // 1 - (10 + 100 + 1000) = -1109

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

1111

-1109

3. concat -- Connect elements from an internal observable sequence of an observable sequence sequentially, waiting for each sequence to terminate successfully before issuing elements from the next sequence

let subject1 = BehaviorSubject(value: "aaa")

let subject2 = BehaviorSubject(value: "AAA")

 

let subjectsSubject = BehaviorSubject(value: subject1)

subjectsSubject.asObservable()

            .concat()

            .subscribe(onNext: {print($0)} )

            .disposed(by: disposeBag)

subject1.onNext("bbb")

subject1.onNext("ccc")

        

subjectsSubject.onNext(subject2)

        

subject1.onNext("ddd")

subject2.onNext("BBB")

subject2.onNext("CCC")

 

subject1.onCompleted() // subject1 Stop hair subject2 Start sending, subject2 The last saved element will be issued

        

subject2.onNext("DDD")

subject1.onNext("eee")

Result

aaa

bbb

ccc

ddd

CCC

DDD

5. Operators Recovered from Error Notification

1. catchError JustReturn -- Recovering from an error event by returning an observable sequence that emits a single element and then terminates

let sequenceThatFails = PublishSubject<String>()

 

sequenceThatFails

            .catchErrorJustReturn("Test")

            .subscribe { print($0) }

            .disposed(by: disposeBag)

 

sequenceThatFails.onNext("AAA")

sequenceThatFails.onNext("BBB") // Successful sending of normal sequence

sequenceThatFails.onError(self.lgError) // Sequences that fail to send,Once the subscription is in place, we return to our previous error plan.

Result

next(AAA)

next(BBB)

next(Test)

completed

2. catchError -- Recovering from an error event by switching to the provided recovery observable sequence

let recoverySequence = PublishSubject<String>()

sequenceThatFails

            .catchError {

                print("Error:", $0)

                return recoverySequence  

            }

            .subscribe { print($0) }

            .disposed(by: disposeBag)

 

sequenceThatFails.onNext("AAA")

sequenceThatFails.onNext("BBB") // Successful sending of normal sequence

sequenceThatFails.onError(lgError) // Send a sequence that fails, then it can be sent successfully in the future.

        

recoverySequence.onNext("CCC")

Result

Error: Error Domain=com.error.cn Code=10090 "(null)"

next(CCC)

3. retry -- Recovering duplicated error events by indefinitely re-subscribing observable sequences

var count = 1 // External variable control flow

let sequenceRetryErrors = Observable<String>.create { observer in

            observer.onNext("AAA")

            observer.onNext("BBB")

            observer.onNext("CCC")

            

            if count == 1 { // When the process comes in, it goes too far - the conditions here can be used as an exit, the number of failures.

                observer.onError(self.lgError)  // Received error sequence,Retrial Sequence Generation

                print("Here comes the error sequence.")

                count += 1

            }

            

            observer.onNext("aaa")

            observer.onNext("bbb")

            observer.onNext("ccc")

            observer.onCompleted()

            

            return Disposables.create()

        }

        

sequenceRetryErrors

            .retry()

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

AAA

BBB

CCC

Here comes the error sequence.

AAA

BBB

CCC

aaa

bbb

ccc

4. retry(:) -- Repeatedly recovering from error events by re-subscribing to observable sequences until the number of retries reaches the max attempt count

let sequenceThatErrors = Observable<String>.create { observer in

            observer.onNext("AAA")

            observer.onNext("BBB")

            observer.onNext("CCC")

            

            if count < 10 { 

                observer.onError(self.lgError)

                print("Here comes the error sequence.")

                count += 1

            }

            

            observer.onNext("aaa")

            observer.onNext("bbb")

            observer.onNext("ccc")

            observer.onCompleted()

            

            return Disposables.create()

        }

        

sequenceThatErrors

            .retry(3) // Maximum repetition number

            .subscribe(onNext: { print($0) })

            .disposed(by: disposeBag)

Result

AAA

BBB

CCC

Here comes the error sequence.

AAA

BBB

CCC

Here comes the error sequence.

AAA

BBB

CCC

Here comes the error sequence.

Unhandled error happened: Error Domain=com.error.cn Code=10090 "(null)"

 subscription called from:

 

 

Added by dragin33 on Fri, 16 Aug 2019 08:48:10 +0300