Continue to the previous article: Data filtering details and examples of Rxjava2 Observable (1)
6. Filter
Only data items filtered by the function are emitted.
Instance code:
// filter(Predicate<? super Integer> predicate) // Verify data and decide whether to transmit data Observable.range(1, 10) .filter(new Predicate<Integer>() { @Override public boolean test(Integer t) throws Exception { // Test to see if launch data is required return t > 5 ? true : false; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept filter: " + t); } });
Output:
--> accept filter: 6 --> accept filter: 7 --> accept filter: 8 --> accept filter: 9 --> accept filter: 10
Javadoc: filter(predicate)
7. Frist
Only the First item or the First item of data satisfying a certain condition is transmitted. If you are only interested in the First data that is emitted by Observable, or the First data that satisfies a certain condition, you can use the First operator.
The Frist operator has the following operations:
7.1 firstElement()
Only the first data is transmitted when the data exists.
Instance code:
// 1. firstElement() // Transmit only the first data Observable.range(1, 10) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept firstElement(1): " + t); } });
Output:
--> accept firstElement(1): 1
Javadoc: firstElement()
7.2 first(defaultItem)
first(defaultItem) is similar to firstElement(), but emits a defaultItem default value that you specify in the parameter when the observer does not emit any data.
Instance code:
// 2. first(Integer defaultItem) // Launch the first data item. If there is no data item, send the default default item Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).first(999) // When no data is sent, send the default value 999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept first(2): " + t); } });
Output:
--> accept first(2): 999
Javadoc: first(defaultItem)
7.3 firstOrError()
Launch the first data item, and if there is no data item, a NoSuchElementException notification will be sent.
Instance code:
// 3. first(Integer defaultItem) // Launch the first data item. If there is no data item, there will be Error: NoSuchElementException Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).firstOrError() // NoSuchElementException notification will be sent when no data is sent .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> accept onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> acctpt onError(3): " + e); } });
Output:
--> onSubscribe: --> acctpt onError(3): java.util.NoSuchElementException
Javadoc: firstOrError()
8. Single
single is similar to first, but if the original Observable does not launch data exactly once before it completes, it will throw a notification of NoSuchElementException.
Single has the following operations:
8.1 singleElement()
If more than one instance of data is sent, a NoSuchElementException notification will be sent.
Instance code:
// 1.singleElement() // If more than one instance of data is transmitted, no suchelementexception will occur Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }).singleElement() // Send a single data, more than 1 data will have Error notification .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept singleElement(1): " + t); } },new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("--> OnError(1): " + t); } });
Output:
--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!
Javadoc: singleElement()
8.2 single(defaultItem)
Send single instance data. If no data item is received, send the specified default item data.
Instance code:
// 2. single(Integer defaultItem) // Send single instance data, no data item send the specified default item Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).single(999) // Send default data if no data is received 999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept single(2): " + t); } });
Output:
--> accept single(2): 999
Javadoc: single(defaultItem)
8.3 singleOrError()
Send a single instance of data. If the data source has no data items, send a NoSuchElementException notification.
Instance code:
// 3.singleOrError() // Send a single instance of data. If there is no data item in the data source, send a NoSuchElementException exception notification Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).singleOrError() // If no data item is sent, a NoSuchElementException exception notification is sent .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
Output:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: singleOrError()
9. ElementAt
The ElementAt operator takes the data sequence emitted by the original Observable, specifies the data item at the index location, and then emits it as its own unique data.
The ElementAt operator has the following operations:
9.1 elementAt(index)
The index item data of the index position is emitted (counting from 0). If the data does not exist, the IndexOutOfBoundsException exception will occur.
Instance code:
// 1. elementAt(long index) // Specifies to transmit the nth item of data (counting from 0). If the data does not exist, IndexOutOfBoundsException will occur Observable.range(1, 10) .elementAt(5) // Data item with index 5 in the emission data sequence, index starts from 0 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ElementAt(1): " + t); } });
Output:
--> accept ElementAt(1): 6
Javadoc: elementAt(index)
9.2 elementAt(index, defaultItem)
Launch the index item data of index location (counting from 0). If the data does not exist, send the default defaultItem data.
Instance code:
// 2. elementAt(long index, Integer defaultItem) // Specifies to send the nth item of data (counting from 0). If the data does not exist, send the default defaultItem Observable.range(1, 10) .elementAt(20, 0) // Send the 20th data of index. If this data does not exist, send the default data 0 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept elementAt(2): " + t); } });
Output:
--> accept elementAt(2): 0
Javadoc: elementAt(index, defaultItem)
9.3 elementAtOrError(index)
Launch index position index item data (counting from 0). If the specified launch data does not exist, NoSuchElementException exception notification will be launched.
Instance code:
// 3. elementAtOrError(long index) // NoSuchElementException is thrown if the specified emitted data does not exist Observable.range(1, 10) .elementAtOrError(50) // Launch data with index 50, send NoSuchElementException exception notification if it does not exist .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
Output:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: elementAtOrError(index)
10. ignoreElements
No data is transmitted, only the termination notice of Observable is transmitted.
The IgnoreElements operator suppresses all data emitted by the original Observable and only allows its termination notification (onError or onCompleted) to pass.
Parsing: if you don't care about the data emitted by an Observable, but want to be notified when it is finished or when it encounters an error termination, you can use the ignoreElements operator on the Observable, which ensures that the observer's onNext() method will never be called.
Instance code:
// ignoreElements() // Only accept the notification of onError or onCompleted and intercept the onNext event (do not care about the launched data, only hope to receive the notification in case of success or failure) Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // int i = 1/0; emitter.onComplete(); } }).ignoreElements() .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
Output:
--> onSubscribe --> onComplete
Javadoc: ignoreElements()
11. Last
Only the last item (or the last item satisfying a certain condition) data is transmitted.
If you are only interested in the Last data that is emitted by Observable, or the Last data that satisfies a certain condition, you can use the Last operator.
Last has the following operations:
11.1 lastElement()
Only the last data is transmitted, and the last operator without parameters is used. If there is no data transmission in Observable, there is no data transmission.
Instance code:
// 1. lastElement() // Accept last data Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).lastElement() // If there is data transmission, the last data is transmitted, otherwise there is no data transmission .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept lastElement(1): " + t); } });
Output:
--> accept lastElement(1): 3
Javadoc: lastElement()
11.2 last(defaultItem)
Only the last data is sent. If no data is sent in the Observable, the specified default value, defaultItem, is sent.
Instance code:
// 2. last(Integer defaultItem) // Accept the last data. If no data is sent, send the default data: defaultItem Observable.range(0, 0) .last(999) // Accept the last data and send the default data 999 if there is no data .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: last(2): " + t); } });
Output:
--> accept: last(2): 999
Javadoc: last(defaultItem)
11.3 lastOrError()
Accept the last data, if no data is sent, throw NoSuchElementException exception notification.
Instance code:
// 3. lastOrError() // Accept the last data, if no data is sent, throw onError: NoSuchElementException Observable.range(0, 0) .lastOrError() // Accept the last data, and if there is no data, reflect NoSuchElementException exception notification .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3)"); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
Output:
--> onSubscribe: --> onError(3): java.util.NoSuchElementException
Javadoc: lastOrError()
12. Take
Using the Take operator allows you to modify the Observable behavior, return only the previous N items of data, and then send a completion notification to ignore the remaining data.
The Take operator has the following operations:
12.1 take(count)
If you use the take(n) operator for an Observable, and the data emitted by that Observable is less than N items, the Observable generated by the take operation will not throw an exception or send an onError notification, and only a small amount of the same data will be emitted before completion.
Instance code:
// 1. take(long count) // Return the previous count data Observable.range(1, 100) .take(5) // Return to the first 5 items of data .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept take(1): " + t); } });
Output:
--> accept take(1): 1 --> accept take(1): 2 --> accept take(1): 3 --> accept take(1): 4 --> accept take(1): 5
Javadoc: take(count)
12.2 take(timeout, TimeUnit)
Take the data within a certain time interval, with the optional parameter scheduler to specify the thread scheduler.
Instance code:
// 2. take(long time, TimeUnit unit,[Scheduler] scheduler) // Take the data within a certain time interval. The optional parameter scheduler specifies the thread scheduler Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS) .take(5, TimeUnit.SECONDS) // Return the data items of the first 5 seconds .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept take(2): " + t); } });
Output:
--> accept take(2): 1 --> accept take(2): 2 --> accept take(2): 3 --> accept take(2): 4 --> accept take(2): 5
Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)
13. TakeLast
Use the TakeLast operator to modify the original Observable. You can only transmit the last N items of data emitted by the Observable and ignore the previous data.
This variation of takeLast is executed on the calculation scheduler by default, but you can specify other schedulers with the third parameter.
TakeLast generally has the following operations:
13.1 takeLast(count)
By using the takeLast(count) operator, you can only transmit the last count data (or the count data before onCompleted() of the original Observable) and ignore the previous data. Note: This delays any data items that were emitted by the original Observable until it is complete.
Instance code:
// 1. takeLast(int count) // Accept Count data before the completion of Observable data transmission, and ignore the previous data Observable.range(1, 10) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(1): "); } }) .takeLast(5) // Send 5 items of data before the completion of data transmission .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept takeLast(1): " + t); } });
Output:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 6 --> accept(1): 7 --> accept(1): 8 --> accept(1): 9 --> accept(1): 10 --> onCompleted(1): --> accept takeLast(1): 6 --> accept takeLast(1): 7 --> accept takeLast(1): 8 --> accept takeLast(1): 9 --> accept takeLast(1): 10
Javadoc: takeLast(count)
13.2 takeLast(time, TimeUnit)
There is also a takeLast variant that takes a time rather than a quantity parameter. It will emit data that was emitted in the last period of the original Observable's life cycle. The duration and time unit are specified by parameters.
Note: This delays any data items that were emitted by the original Observable until it is complete.
Instance code:
// 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // Optional parameter scheduler: specify work scheduler delayError: delay Error notification bufferSize: specify cache size // Accept the data items transmitted at the specified time interval before the completion of Observable data transmission Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(2): "); } }) .takeLast(3, TimeUnit.SECONDS) // Send data within 3 seconds before the completion of data transmission .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(2): " + t); } });
Output:
--> accept(2): 1 --> accept(2): 2 --> accept(2): 3 --> accept(2): 4 --> accept(2): 5 --> onCompleted(2): --> accept takeLast(2): 3 --> accept takeLast(2): 4 --> accept takeLast(2): 5
Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
13.3 takeLast(count, time, TimeUnit)
Collect count data and launch in time period before receiving Observable launch.
Example code:
// 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // Optional parameter scheduler: specify work scheduler delayError: delay Error notification bufferSize: specify cache size // Collect count data and launch before receiving Observable data transmission Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(3): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(3): "); } }) .takeLast(2, 500, TimeUnit.MILLISECONDS) // Receive 2 items of data within 500ms before the completion of the original data transmission .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(3): " + t); } });
Output:
--> accept(3): 1 --> accept(3): 2 --> accept(3): 3 --> accept(3): 4 --> accept(3): 5 --> accept(3): 6 --> accept(3): 7 --> accept(3): 8 --> accept(3): 9 --> accept(3): 10 --> onCompleted(3): --> accept takeLast(3): 9 --> accept takeLast(3): 10
Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
14. OfType
ofType is a special form of filter operator. It filters an Observable to return only data of the specified type.
Example code:
Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)}; // ofType(Class clazz) // Filter data, return only specific types of data Observable.fromArray(dataObjects) .ofType(Integer.class) // Filter data of type Integer .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ofType: " + t); } });
Output:
--> accept ofType: 1 --> accept ofType: 5
Javadoc: ofType(Class clazz)
Summary:
The operators of data filtering mainly filter the data sequence transmitted by the Observable, filter the data items according to the specified rules, and ignore and discard other data. The actual development scenarios, such as network data filtering, database data filtering and so on, are important and common operations in development.
Rx introduction and explanation and complete catalog reference: Rxjava2 introduction and detailed examples
Instance code: