Data filtering details and examples of RxJava2 Observable

Continued from the previous article: Data filtering details and examples of RxJava2 Observable (1)

6. Filter

Emits only the data items that pass the test of the predicate function.

Example code:

	// filter(Predicate<? super Integer> predicate)
	// Test each item and decide whether to emit it
	Observable.range(1, 10)
			.filter(new Predicate<Integer>() {

				@Override
				public boolean test(Integer t) throws Exception {
					// Return true to emit the item, false to drop it
					return t > 5;
				}
			}).subscribe(new Consumer<Integer>() {

				@Override
				public void accept(Integer t) throws Exception {
					System.out.println("--> accept filter: " + t);
				}
			});

Output:

--> accept filter: 6
--> accept filter: 7
--> accept filter: 8
--> accept filter: 9
--> accept filter: 10

Javadoc: filter(predicate)
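
The rule that filter applies can be modelled without RxJava at all. The following is a minimal plain-Java sketch of that rule, not the library's implementation; the class name FilterSketch and the method keep() are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter(); FilterSketch and keep() are hypothetical names.
class FilterSketch {

    // Returns, in source order, only the items for which the predicate returns true.
    static <T> List<T> keep(List<T> source, Predicate<? super T> predicate) {
        List<T> emitted = new ArrayList<>();
        for (T item : source) {
            if (predicate.test(item)) {
                emitted.add(item); // stands in for forwarding onNext downstream
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(keep(source, t -> t > 5)); // prints [6, 7, 8, 9, 10]
    }
}
```

The lambda t -> t > 5 plays the same role as the anonymous Predicate in the example above.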

7. First

Emits only the first item, or the first item that satisfies a condition. If you are only interested in the first item emitted by an Observable, or the first item that meets some condition, you can use the First operator.

The First operator has the following variants:

7.1 firstElement()

Emits only the first item if the source emits any; if the source is empty, it completes without emitting.

Example code:

	// 1. firstElement()
	// Emit only the first item
	Observable.range(1, 10)
		.firstElement()
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept firstElement(1): "  + t);
			}
		});

Output:

--> accept firstElement(1): 1

Javadoc: firstElement()

7.2 first(defaultItem)

first(defaultItem) is similar to firstElement(), but when the source Observable does not emit any data it emits the defaultItem value that you pass as the parameter.

Example code:

	// 2. first(Integer defaultItem)
	// Emit the first item. If there is no item, emit the default item
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).first(999) // When no data is sent, send the default value 999
	  .subscribe(new Consumer<Integer>() {

		  @Override
		  public void accept(Integer t) throws Exception {
			  System.out.println("--> accept first(2): " + t);
		  }
	  });

Output:

--> accept first(2): 999

Javadoc: first(defaultItem)

7.3 firstOrError()

Emits the first data item. If the source emits no items, a NoSuchElementException error notification is sent.

Example code:

	// 3. firstOrError()
	// Emit the first item. If there is no item, onError receives a NoSuchElementException
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).firstOrError() // NoSuchElementException notification will be sent when no data is sent
	  .subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe: ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> accept onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
	  });

Output:

--> onSubscribe: 
--> onError(3): java.util.NoSuchElementException

Javadoc: firstOrError()
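
The three variants differ only in what they do with an empty source. As a rough plain-Java model of that difference (the class FirstSketch and its methods are hypothetical, and a List stands in for the Observable), one could write:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Plain-Java model of the three First variants (hypothetical names, not RxJava code).
class FirstSketch {

    // Models firstElement(): an empty Optional stands for "completed without a value".
    static <T> Optional<T> firstElement(List<T> source) {
        return source.isEmpty() ? Optional.empty() : Optional.of(source.get(0));
    }

    // Models first(defaultItem): falls back to the default when the source is empty.
    static <T> T first(List<T> source, T defaultItem) {
        return source.isEmpty() ? defaultItem : source.get(0);
    }

    // Models firstOrError(): the exception is thrown here, whereas RxJava
    // delivers it to the observer's onError callback.
    static <T> T firstOrError(List<T> source) {
        if (source.isEmpty()) {
            throw new NoSuchElementException();
        }
        return source.get(0);
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println(firstElement(Arrays.asList(1, 2, 3))); // Optional[1]
        System.out.println(firstElement(empty));                  // Optional.empty
        System.out.println(first(empty, 999));                    // 999
        try {
            firstOrError(empty);
        } catch (NoSuchElementException e) {
            System.out.println("error: " + e); // error: java.util.NoSuchElementException
        }
    }
}
```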

8. Single

Single is similar to First, but it expects the source Observable to emit exactly one item before it completes. If the source emits more than one item, an IllegalArgumentException error notification is sent; what happens for an empty source depends on the variant used.

Single has the following variants:

8.1 singleElement()

Emits the item if the source emits exactly one. If more than one item is emitted, an IllegalArgumentException error notification is sent.

Example code:

	// 1. singleElement()
	// If more than one item is emitted, onError receives an IllegalArgumentException
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			emitter.onNext(2);
			emitter.onComplete();
		}
	}).singleElement() // Send a single data, more than 1 data will have Error notification
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept singleElement(1): " + t);
			}
	  },new Consumer<Throwable>() {

		@Override
		public void accept(Throwable t) throws Exception {
			System.out.println("--> OnError(1): " + t);
		}
	});

Output:

--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!

Javadoc: singleElement()

8.2 single(defaultItem)

Emits the single item. If the source emits no items, the specified default item is emitted instead.

Example code:

	// 2. single(Integer defaultItem)
	// Emit the single item; if the source is empty, emit the specified default item
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).single(999) // Emit the default value 999 if no data is received
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept single(2): " + t);
			}
	  });

Output:

--> accept single(2): 999

Javadoc: single(defaultItem)

8.3 singleOrError()

Emits the single item. If the source emits no items, a NoSuchElementException error notification is sent.

Example code:

	// 3. singleOrError()
	// Emit the single item. If the source is empty, onError receives a NoSuchElementException
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onComplete();
		}
	}).singleOrError() // If no data item is sent, a NoSuchElementException exception notification is sent
	  .subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe(3): ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

Output:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: singleOrError()
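
Putting the three variants side by side: all of them reject a source with more than one item, and they differ only for an empty source. The sketch below models this in plain Java under the assumption that a List stands in for the Observable; SingleSketch and its methods are hypothetical names, and errors are thrown instead of being delivered to onError.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Plain-Java model of the Single variants (hypothetical names, not RxJava code).
class SingleSketch {

    // Shared rule: more than one item is always an error.
    private static void requireAtMostOne(List<?> source) {
        if (source.size() > 1) {
            throw new IllegalArgumentException("Sequence contains more than one element!");
        }
    }

    // Models singleElement(): an empty source just completes (empty Optional).
    static <T> Optional<T> singleElement(List<T> source) {
        requireAtMostOne(source);
        return source.isEmpty() ? Optional.empty() : Optional.of(source.get(0));
    }

    // Models single(defaultItem): an empty source yields the default item.
    static <T> T single(List<T> source, T defaultItem) {
        requireAtMostOne(source);
        return source.isEmpty() ? defaultItem : source.get(0);
    }

    // Models singleOrError(): an empty source is an error.
    static <T> T singleOrError(List<T> source) {
        requireAtMostOne(source);
        if (source.isEmpty()) {
            throw new NoSuchElementException();
        }
        return source.get(0);
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println(single(empty, 999)); // 999
        try {
            singleElement(Arrays.asList(1, 2));
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e);
        }
        try {
            singleOrError(empty);
        } catch (NoSuchElementException e) {
            System.out.println("error: " + e);
        }
    }
}
```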

9. ElementAt

The ElementAt operator takes the item at the specified index in the sequence emitted by the source Observable and emits it as its only item.

The ElementAt operator has the following variants:

9.1 elementAt(index)

Emits the item at the given index (counting from 0). If the source emits too few items to reach that index, the result completes without emitting anything. A negative index throws an IndexOutOfBoundsException.

Example code:

	// 1. elementAt(long index)
	// Emit the item at the given index (counting from 0). If it does not exist, nothing is emitted; a negative index throws IndexOutOfBoundsException
	Observable.range(1, 10)
		.elementAt(5) // Data item with index 5 in the emission data sequence, index starts from 0
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept ElementAt(1): " + t);
			}
		});

Output:

--> accept ElementAt(1): 6

Javadoc: elementAt(index)

9.2 elementAt(index, defaultItem)

Emits the item at the given index (counting from 0). If that item does not exist, the default value defaultItem is emitted instead.

Example code:

	// 2. elementAt(long index, Integer defaultItem)
	// Emit the item at the given index (counting from 0). If it does not exist, emit defaultItem
	Observable.range(1, 10)
		.elementAt(20, 0) // Emit the item at index 20. If it does not exist, emit the default value 0
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept elementAt(2): " + t);
			}
		});

Output:

--> accept elementAt(2): 0

Javadoc: elementAt(index, defaultItem)

9.3 elementAtOrError(index)

Emits the item at the given index (counting from 0). If that item does not exist, a NoSuchElementException error notification is sent.

Example code:

	// 3. elementAtOrError(long index)
	// If the item at the given index does not exist, onError receives a NoSuchElementException
	Observable.range(1, 10)
		.elementAtOrError(50) // Emit the item at index 50; send a NoSuchElementException notification if it does not exist
		.subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe(3): ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3): " + t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

Output:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: elementAtOrError(index)
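
How the three variants treat a missing index can be summarised in a small plain-Java model. This is only a sketch under the assumption that a List stands in for the emitted sequence; ElementAtSketch and its methods are hypothetical names, and the error case throws instead of calling onError.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Plain-Java model of the ElementAt variants (hypothetical names, not RxJava code).
class ElementAtSketch {

    // Models elementAt(index): empty Optional when the sequence is too short.
    static <T> Optional<T> elementAt(List<T> source, long index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
        }
        return index < source.size()
                ? Optional.of(source.get((int) index))
                : Optional.empty();
    }

    // Models elementAt(index, defaultItem): fall back to the default item.
    static <T> T elementAt(List<T> source, long index, T defaultItem) {
        return elementAt(source, index).orElse(defaultItem);
    }

    // Models elementAtOrError(index): a missing item is an error.
    static <T> T elementAtOrError(List<T> source, long index) {
        return elementAt(source, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(elementAt(source, 5));     // Optional[6]
        System.out.println(elementAt(source, 20, 0)); // 0
        System.out.println(elementAt(source, 50));    // Optional.empty
    }
}
```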

10. ignoreElements

Emits no data items; only the termination notification of the Observable is passed on.

The IgnoreElements operator suppresses all data emitted by the source Observable and only lets its termination notification (onError or onComplete) through.

If you do not care about the data emitted by an Observable but want to be notified when it completes or terminates with an error, apply ignoreElements to it; this guarantees that the observer's onNext() method is never called.

Example code:

	// ignoreElements()
	// Receive only the onError or onComplete notification; onNext events are dropped
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			//	int i = 1/0;
			emitter.onComplete();
		}
	}).ignoreElements()
	  .subscribe(new CompletableObserver() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError: " + e);
			}

			@Override
			public void onComplete() {
				System.out.println("--> onComplete");
			}
	  });

Output:

--> onSubscribe
--> onComplete

Javadoc: ignoreElements()

11. Last

Emits only the last item (or the last item that satisfies a condition).

If you are only interested in the last item emitted by an Observable, or the last item that satisfies some condition, you can use the Last operator.

Last has the following variants:

11.1 lastElement()

Emits only the last item. If the Observable emits no data, nothing is emitted and the result simply completes.

Example code:

	// 1. lastElement()
	// Emit only the last item
	Observable.create(new ObservableOnSubscribe<Integer>() {

		@Override
		public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
			emitter.onNext(1);
			emitter.onNext(2);
			emitter.onNext(3);
			emitter.onComplete();
		}
	}).lastElement() // Emit the last item if there is one; otherwise emit nothing
	  .subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept lastElement(1): " + t);
			}
		});

Output:

--> accept lastElement(1): 3

Javadoc: lastElement()

11.2 last(defaultItem)

Emits only the last item. If the Observable emits no data, the specified default value defaultItem is emitted.

Example code:

	// 2. last(Integer defaultItem)
	// Emit the last item. If no data is emitted, emit the default value defaultItem
	Observable.range(0, 0)
		.last(999) // Emit the last item, or the default value 999 if there is no data
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept: last(2): " + t);
			}
		});

Output:

--> accept: last(2): 999

Javadoc: last(defaultItem)

11.3 lastOrError()

Emits the last item. If no data is emitted, a NoSuchElementException error notification is sent.

Example code:

	// 3. lastOrError()
	// Emit the last item. If no data is emitted, onError receives a NoSuchElementException
	Observable.range(0, 0)
		.lastOrError() // Emit the last item; send a NoSuchElementException notification if there is no data
		.subscribe(new SingleObserver<Integer>() {

			@Override
			public void onSubscribe(Disposable d) {
				System.out.println("--> onSubscribe: ");
			}

			@Override
			public void onSuccess(Integer t) {
				System.out.println("--> onSuccess(3)");
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("--> onError(3): " + e);
			}
		});

Output:

--> onSubscribe: 
--> onError(3): java.util.NoSuchElementException

Javadoc: lastOrError()

12. Take

The Take operator modifies an Observable so that it emits only the first N items, then sends a completion notification and ignores the remaining data.

The Take operator has the following variants:

12.1 take(count)

If you apply take(n) to an Observable that emits fewer than n items, the Observable produced by take does not throw an exception or send an onError notification; it simply emits those fewer items and then completes.

Example code:

	// 1. take(long count)
	// Emit the first count items
	Observable.range(1, 100)
		.take(5) // Emit the first 5 items
		.subscribe(new Consumer<Integer>() {

			@Override
			public void accept(Integer t) throws Exception {
				System.out.println("--> accept take(1): " + t);
			}
		});

Output:

--> accept take(1): 1
--> accept take(1): 2
--> accept take(1): 3
--> accept take(1): 4
--> accept take(1): 5

Javadoc: take(count)
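
The behaviour for a source that is shorter than count is easy to check with a plain-Java model. The sketch below assumes an Iterator stands in for the Observable; TakeSketch and take() are hypothetical names, not RxJava code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Plain-Java model of take(count) (hypothetical names, not RxJava code).
class TakeSketch {

    // Pulls at most count items, then stops; a short source is not an error.
    static <T> List<T> take(Iterator<T> source, long count) {
        List<T> emitted = new ArrayList<>();
        while (emitted.size() < count && source.hasNext()) {
            emitted.add(source.next());
        }
        return emitted; // returning stands in for the completion notification
    }

    public static void main(String[] args) {
        Iterator<Integer> hundred = IntStream.rangeClosed(1, 100).boxed().iterator();
        System.out.println(take(hundred, 5));                         // [1, 2, 3, 4, 5]
        System.out.println(take(Arrays.asList(1, 2).iterator(), 5));  // [1, 2]
    }
}
```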

12.2 take(timeout, TimeUnit)

Emits the data produced within the given time span. An optional Scheduler parameter specifies the scheduler to use.

Example code:

	// 2. take(long time, TimeUnit unit, [Scheduler scheduler])
	// Emit the data produced within the given time span; the optional scheduler parameter specifies the scheduler
	Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS)
		.take(5, TimeUnit.SECONDS) // Emit the items of the first 5 seconds
		.subscribe(new Consumer<Long>() {

			@Override
			public void accept(Long t) throws Exception {
				System.out.println("--> accept take(2): " + t);
			}
		});

Output:

--> accept take(2): 1
--> accept take(2): 2
--> accept take(2): 3
--> accept take(2): 4
--> accept take(2): 5

Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)

13. TakeLast

The TakeLast operator modifies the source Observable so that it emits only the last N items and ignores the earlier ones.

The time-based variants of takeLast run on the computation scheduler by default, but you can specify another scheduler with the Scheduler parameter.

TakeLast has the following variants:

13.1 takeLast(count)

With takeLast(count), only the last count items that the source Observable emitted before its onComplete() are emitted, and the earlier ones are ignored. Note: this delays the emission of every item until the source Observable completes.

Example code:

    // 1. takeLast(int count)
    // Emit the last count items that were emitted before the Observable completed, and ignore the earlier ones
    Observable.range(1, 10)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(1): ");
                }
            })
            .takeLast(5) // Emit the last 5 items once the source completes
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept takeLast(1): " + t);
                }
            });

Output:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
--> onCompleted(1): 
--> accept takeLast(1): 6
--> accept takeLast(1): 7
--> accept takeLast(1): 8
--> accept takeLast(1): 9
--> accept takeLast(1): 10

Javadoc: takeLast(count)
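
The output order above (all ten source items first, then the last five) follows from the fact that the operator cannot know which items are "last" until the source has finished. A bounded buffer is one natural way to get this behaviour; the plain-Java sketch below illustrates the idea and is not taken from RxJava's source. TakeLastSketch and takeLast() are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of takeLast(count) using a bounded buffer (hypothetical names).
class TakeLastSketch {

    static <T> List<T> takeLast(Iterable<T> source, int count) {
        Deque<T> buffer = new ArrayDeque<>();
        for (T item : source) {          // every source item is consumed first
            buffer.addLast(item);
            if (buffer.size() > count) {
                buffer.pollFirst();      // drop the oldest item once the buffer is full
            }
        }
        // Only after the source is exhausted ("completed") is anything handed on.
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(takeLast(source, 5)); // [6, 7, 8, 9, 10]
    }
}
```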

13.2 takeLast(time, TimeUnit)

There is also a takeLast variant that takes a duration rather than a count. It emits the items that the source Observable emitted during the last part of its lifetime; the duration and its time unit are given as parameters.

Note: this delays the emission of every item until the source Observable completes.

Example code:

    // 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // Optional parameters: scheduler (the scheduler to use), delayError (delay the error notification), bufferSize (buffer size hint)
    // Emit the items that were emitted within the given time span before the Observable completed
    Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(2): ");
                }
            })
            .takeLast(3, TimeUnit.SECONDS) // Emit the items from the last 3 seconds before completion
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(2): " + t);
                }
            });

Output:

--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
--> onCompleted(2): 
--> accept takeLast(2): 3
--> accept takeLast(2): 4
--> accept takeLast(2): 5

Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

13.3 takeLast(count, time, TimeUnit)

Emits at most count items, taken from those that the source Observable emitted within the given time span before it completed.

Example code:

    // 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // Optional parameters: scheduler (the scheduler to use), delayError (delay the error notification), bufferSize (buffer size hint)
    // Emit at most count items from the given time span before the Observable completed
    Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(3): ");
                }
            })
            .takeLast(2, 500, TimeUnit.MILLISECONDS) // Emit at most 2 items from the last 500 ms before the source completes
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(3): " + t);
                }
            });

Output:

--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
--> onCompleted(3): 
--> accept takeLast(3): 9
--> accept takeLast(3): 10

Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
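
The combination of a time window and a count limit can be modelled with explicit timestamps. The following plain-Java sketch makes several assumptions: emission times are passed in as an array instead of being read from a scheduler, an item emitted exactly at the start of the window is kept (RxJava's boundary handling may differ), and TimedTakeLastSketch with its parameters is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of takeLast(count, time, unit) (hypothetical names, not RxJava code).
class TimedTakeLastSketch {

    static List<Long> takeLast(long[] values, long[] emittedAtMs, long completedAtMs,
                               int count, long windowMs) {
        long windowStart = completedAtMs - windowMs;
        Deque<Long> buffer = new ArrayDeque<>();
        for (int i = 0; i < values.length; i++) {
            if (emittedAtMs[i] < windowStart) {
                continue;                // emitted before the window opened
            }
            buffer.addLast(values[i]);
            if (buffer.size() > count) {
                buffer.pollFirst();      // keep only the newest count items
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        // Mirrors the example above: values 1..10, first after 1 ms, then every 100 ms.
        long[] values = new long[10];
        long[] emittedAtMs = new long[10];
        for (int i = 0; i < 10; i++) {
            values[i] = i + 1;
            emittedAtMs[i] = 1 + 100L * i;
        }
        long completedAtMs = emittedAtMs[9]; // assume completion right after the last item
        System.out.println(takeLast(values, emittedAtMs, completedAtMs, 2, 500)); // [9, 10]
        // With no effective count limit this models the time-only variant from 13.2.
        System.out.println(takeLast(values, emittedAtMs, completedAtMs, Integer.MAX_VALUE, 300)); // [7, 8, 9, 10]
    }
}
```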

14. OfType

ofType is a special form of filter operator. It filters an Observable to return only data of the specified type.

Example code:

    Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", Integer.valueOf(5)};
    // ofType(Class clazz)
    // Filter the data and emit only items of the given type
    Observable.fromArray(dataObjects)
            .ofType(Integer.class) // Filter data of type Integer
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept ofType: " + t);
                }
            });

Output:

--> accept ofType: 1
--> accept ofType: 5

Javadoc: ofType(Class clazz)
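
Conceptually, ofType is a type check followed by a cast. The plain-Java sketch below shows that idea on an array; it is an illustration under that assumption rather than RxJava's implementation, and OfTypeSketch and ofType() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of ofType(Class) (hypothetical names, not RxJava code).
class OfTypeSketch {

    // Keeps only the items that are instances of clazz and casts them to that type.
    static <U> List<U> ofType(Object[] source, Class<U> clazz) {
        List<U> emitted = new ArrayList<>();
        for (Object item : source) {
            if (clazz.isInstance(item)) {
                emitted.add(clazz.cast(item));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Object[] data = {1, "Hello", 2.1f, 8.88, "1", Integer.valueOf(5)};
        System.out.println(ofType(data, Integer.class)); // [1, 5]
        System.out.println(ofType(data, String.class));  // [Hello, 1]
    }
}
```

Note that 2.1f and 8.88 are boxed as Float and Double, so they do not pass the Integer check.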

Summary:

The data filtering operators work on the data sequence emitted by an Observable: they keep the items that match the specified rules and discard the rest. Filtering of this kind, for example of network data or database query results, is a common and important operation in real-world development.

For an introduction to Rx and a complete catalog, see: RxJava2 introduction and detailed examples

Added by sk8erh4x0r on Thu, 02 Jan 2020 23:31:44 +0200