let lgError = NSError.init(domain: "com.error.cn", code: 10090, userInfo: nil)
var disposeBag = DisposeBag()
1. Combination operators
1. startWith -- emits a specified sequence of elements before starting to emit elements from observable sources
let ss = "12"
Observable.of("1", "2", "3", "4") // The type of element in it determines what type to add later.
.startWith("A")
.startWith("a")
.startWith("a", "c", "s")
.startWith(ss)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Results:12.
a
c
s
a
A
1
2
3
4
2. merge -- Combines elements in a source observable sequence into a new observable sequence, and emits each element just like elements emitted from each source observable sequence.
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
let subject3 = PublishSubject<String>()
Observable.of(subject1, subject2, subject3) // Any number can be added, but it must be of the same type
.merge()
.subscribe(onNext: { print(($0)) })
.disposed(by: disposeBag)
subject1.onNext("a")
subject2.onNext("b")
subject1.onNext("aaa")
subject2.onNext("bbb")
subject2.onNext("ccc")
subject3.onNext("ccc")
Result: a
b
aaa
bbb
ccc
ccc
3. zip -- Combine up to eight source observable sequences into a new observable sequence, and emit elements of each source observable sequence at the corresponding index from the combined observable sequence
let stringSubject1 = PublishSubject<String>()
let intSubject2 = PublishSubject<Int>()
Observable.zip(stringSubject1, intSubject2) { stringElement, intElement in // You can also have multiple sequences
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject1.onNext("aa")
intSubject2.onNext(23)
intSubject2.onNext(13)
intSubject2.onNext(26)
intSubject2.onNext(34)
stringSubject1.onNext("bb")
stringSubject1.onNext("cc") // Two sequences must be paired to have value in order to respond.
Result: aa 23
bb 13
cc 26
4. Combine Latest -- Combine eight source observable sequences into a new observation sequence, and start to emit the latest element observable sequence for each source of the joint observation sequence once all emission source sequences have at least one element, and when any new element is emitted from the source observable sequence.
let stringSub1 = PublishSubject<String>()
let intSub2 = PublishSubject<Int>()
Observable.combineLatest(stringSub1, intSub2){stringElement, intElement in // You can also have multiple sequences
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub1.onNext("X")
intSub2.onNext(12)
stringSub1.onNext("Q")
intSub2.onNext(23)
intSub2.onNext(34) // Similar zip,Pairs of values are required to respond, but the latter will override the former.
Result X 12
Q 12
Q 23
Q 34
Application:For example, the account and password are satisfied at the same time.->To be able to land. It doesn't matter how the password of the account changes. Just look at the final value.
5. switchLatest -- Converts elements emitted from observable sequences into observable sequences and emits elements from the nearest internal observable sequences
let switchLatestSub1 = BehaviorSubject(value: "A")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub = BehaviorSubject(value: switchLatestSub1) // Monitor switchLatestSub1
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print(($0)) })
.disposed(by: disposeBag)
switchLatestSub1.onNext("B")
switchLatestSub1.onNext("C")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") // switchLatestSub No "2" or "3" will be monitored, but it will be saved. By default, the latter will override the former.
switchLatestSub.onNext(switchLatestSub2) // Switch monitoring switchLatestSub2
switchLatestSub1.onNext("DDD")
switchLatestSub2.onNext("222")
switchLatestSub.onNext(switchLatestSub1)
Result A
B
C
3
222
DDD
2. Mapping operators
1. map-transformation closure applies to the elements emitted by the observable sequence and returns the new observable sequence of the transformed element.
Observable.of(1, 2, 3, 4)
.map { (number) -> Int in
return number + 2
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
3
4
5
6
2. flatMap - Converts the elements emitted by observable sequences into observable sequences, and merges the emission of two observable sequences into one observable sequence.
let boy = Player(score: 100)
let girl = Player(score: 90)
let player = BehaviorSubject(value: boy)
player.asObservable()
.flatMap { $0.score.asObservable() }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
boy.score.onNext(60)
player.onNext(girl) // Response to any new launch of an observable sequence
boy.score.onNext(50)
girl.score.onNext(20)
Result
100
60
90
50
20
3. scan -- Start with a default value at the beginning, then apply an accumulator closure to each element emitted by an observable sequence, and return each intermediate result in the form of a single observable sequence of elements
Observable.of(10, 100, 1000)
.scan(1) { aggregateValue, newValue in
aggregateValue + newValue // 1 + (10) , 1 + (100 + 10) , 1 + (1000 + 100 + 10)
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
11
111
1111
3. Filter Conditions Operator
1. filter -- Only those elements are emitted from observable sequences that satisfy specified conditions
Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.filter{ $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
2
4
6
8
10
2. distinct UntilChanged -- Inhibits sequential repetitive elements emitted by observable sequences
Observable.of("1", "1", "2", "2", "2", "3", "4", "2", "2") // It can also be Int type
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag) // Continuous repetition eliminates duplication
Result
1
2
3
4
2
3. elementAt -- emits elements only at the specified index of all elements emitted by the observable sequence
Observable.of("t", "e", "s", "t")
.elementAt(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
s
4. single -- emits only the first element emitted by the observable sequence (or the first element that satisfies the criteria). If the observable sequence emits multiple elements, an error is thrown.
Observable.of("bbb")
.single() // Send out the first, because there is only one, so no error will be thrown.
.subscribe(onNext: {print($0)} )
.disposed(by: disposeBag)
Observable.of("bbb", "aaa", "aaa", "ccc")
.single() // Send the first, because there are many, so the error will be thrown.
.subscribe(onNext: {print($0)} )
.disposed(by: disposeBag)
Observable.of("aaa", "bbb", "aaa", "ccc")
.single { $0 == "ccc" } // There's only one condition to satisfy, so if you send this, you won't throw an error.
.subscribe(onNext: {print($0)} )
.disposed(by: disposeBag)
Observable.of("aaa", "bbb", "aaa", "ccc")
.single { $0 == "aaa" } // There are more than one satisfying condition, so issue this and throw an error.
.subscribe(onNext: {print($0)} )
.disposed(by: disposeBag)
Result
bbb
bbb
Unhandled error happened: Sequence contains more than one element.
subscription called from:
ccc
aaa
Unhandled error happened: Sequence contains more than one element.
subscription called from:
5. take -- emits only a specified number of elements from the beginning of an observable sequence. There is only one sequence of signal s above that will be limited in actual development.
Observable.of("aaa", "bbb", "ccc", "ddd")
.take(3)
.subscribe(onNext: { print($0) } )
.disposed(by: disposeBag)
Result
aaa
bbb
ccc
6. takeLast -- emits a specified number of elements only from the end of an observable sequence
Observable.of("aaa", "bbb", "ccc", "ddd")
.takeLast(3)
.subscribe(onNext: { print($0) } )
.disposed(by: disposeBag)
Result
bbb
ccc
ddd
7. takeWhile -- emits elements from the beginning of an observable sequence as long as the value of the specified condition is true
Observable.of(1, 2, 3, 4, 5, 6, 7, 8)
.takeWhile{ $0 < 4 }
.subscribe(onNext: { print($0) } )
.disposed(by: disposeBag)
Result
1
2
3
8. takeUntil -- emit elements from source observable sequences until reference observable sequences emit elements (cell reuse)
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSequence.onNext("aaa")
sourceSequence.onNext("bbb")
sourceSequence.onNext("ccc")
referenceSequence.onNext("AAA") // down sourceSequence No more elements are emitted.
sourceSequence.onNext("ddd")
Result
aaa
bbb
ccc
9. skip -- emits elements from source observable sequences until reference observable sequences emit elements
Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 4)
.skip(4)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
5
6
7
8
4
10,skipWhile --
Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 4)
.skipWhile{ $0 < 3 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
3
4
5
6
7
8
4
11. skipUntil -- Inhibits elements emitted from source observable sequences until reference observable sequences emit elements
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.skipUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
sourceSeq.onNext("aaa")
sourceSeq.onNext("bbb")
sourceSeq.onNext("ccc")
referenceSeq.onNext("AAA") // down sourceSeq It's just starting to emit elements.
sourceSeq.onNext("ddd")
Result
ddd
4. Collective Control Operators
1. toArray -- Converts an observable sequence into an array, emits the array as a new unit observable sequence, and then terminates.
Observable.range(start: 1, count: 10)
.toArray()
.subscribe{ print($0) }
.disposed(by: disposeBag)
Result
success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
2. reduce -- Start with a set initialization value, then apply the accumulator closure to all elements emitted by an observable sequence, and return the aggregation results in the form of a single observable sequence of elements
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Observable.of(10, 100, 1000)
.reduce(1, accumulator: -) // 1 - (10 + 100 + 1000) = -1109
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
1111
-1109
3. concat -- Connect elements from an internal observable sequence of an observable sequence sequentially, waiting for each sequence to terminate successfully before issuing elements from the next sequence
let subject1 = BehaviorSubject(value: "aaa")
let subject2 = BehaviorSubject(value: "AAA")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe(onNext: {print($0)} )
.disposed(by: disposeBag)
subject1.onNext("bbb")
subject1.onNext("ccc")
subjectsSubject.onNext(subject2)
subject1.onNext("ddd")
subject2.onNext("BBB")
subject2.onNext("CCC")
subject1.onCompleted() // subject1 Stop hair subject2 Start sending, subject2 The last saved element will be issued
subject2.onNext("DDD")
subject1.onNext("eee")
Result
aaa
bbb
ccc
ddd
CCC
DDD
5. Operators Recovered from Error Notification
1. catchError JustReturn -- Recovering from an error event by returning an observable sequence that emits a single element and then terminates
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("Test")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // Successful sending of normal sequence
sequenceThatFails.onError(self.lgError) // Sequences that fail to send,Once the subscription is in place, we return to our previous error plan.
Result
next(AAA)
next(BBB)
next(Test)
completed
2. catchError -- Recovering from an error event by switching to the provided recovery observable sequence
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AAA")
sequenceThatFails.onNext("BBB") // Successful sending of normal sequence
sequenceThatFails.onError(lgError) // Send a sequence that fails, then it can be sent successfully in the future.
recoverySequence.onNext("CCC")
Result
Error: Error Domain=com.error.cn Code=10090 "(null)"
next(CCC)
3. retry -- Recovering duplicated error events by indefinitely re-subscribing observable sequences
var count = 1 // External variable control flow
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count == 1 { // When the process comes in, it goes too far - the conditions here can be used as an exit, the number of failures.
observer.onError(self.lgError) // Received error sequence,Retrial Sequence Generation
print("Here comes the error sequence.")
count += 1
}
observer.onNext("aaa")
observer.onNext("bbb")
observer.onNext("ccc")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
AAA
BBB
CCC
Here comes the error sequence.
AAA
BBB
CCC
aaa
bbb
ccc
4. retry(:) -- Repeatedly recovering from error events by re-subscribing to observable sequences until the number of retries reaches the max attempt count
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AAA")
observer.onNext("BBB")
observer.onNext("CCC")
if count < 10 {
observer.onError(self.lgError)
print("Here comes the error sequence.")
count += 1
}
observer.onNext("aaa")
observer.onNext("bbb")
observer.onNext("ccc")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3) // Maximum repetition number
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Result
AAA
BBB
CCC
Here comes the error sequence.
AAA
BBB
CCC
Here comes the error sequence.
AAA
BBB
CCC
Here comes the error sequence.
Unhandled error happened: Error Domain=com.error.cn Code=10090 "(null)"
subscription called from: