RxJava3.x Getting Started Guide (IV) -- filter operator
1, Filter events based on specified event criteria
1.filter()
- Function: filter the events emitted by the Observable with a predicate. An event is passed downstream if the predicate returns true and dropped otherwise
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Keep only values whose remainder modulo 3 is 2
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return integer % 3 == 2;
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
2.ofType()
- ofType is a special form of the filter operator. It filters an Observable and returns only data of the specified type.
Demo code:
Observable
        .just(1, 2, 3, 4, 5, "Hello,World!")
        // Only Integer items pass; the String is dropped
        .ofType(Integer.class)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
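As a cross-check on which values reach onNext in the filter() and ofType() demos, the same selections can be modelled with java.util.stream. This is only an analogy, not RxJava code: the class FilterModel and its method names are made up for illustration, and a stream is pulled synchronously whereas an Observable pushes events.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class: a plain-Java model, not part of RxJava.
final class FilterModel {

    /** Mirrors the filter() demo: keep values whose remainder modulo 3 is 2. */
    static List<Integer> remainderTwoModThree(List<Integer> source) {
        return source.stream()
                .filter(value -> value % 3 == 2)
                .collect(Collectors.toList());
    }

    /** Mirrors ofType(): keep only items that are instances of the given class. */
    static <T> List<T> ofType(List<?> source, Class<T> type) {
        return source.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [2, 5]
        System.out.println(remainderTwoModThree(Arrays.asList(1, 2, 3, 4, 5)));
        // prints [1, 2, 3, 4, 5]
        System.out.println(ofType(Arrays.asList(1, 2, 3, 4, 5, "Hello,World!"), Integer.class));
    }
}
```

In the RxJava demos the corresponding values would appear as individual onNext calls, followed by onComplete.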
3.skip() & skipLast()
- skip() function: ignore the first N items emitted by the Observable and pass on only the items after them.
- skipLast() function: drop the last N items, counted back from the end of the sequence.
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Drop the first two items
        .skip(2)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Observable
        .just(1, 2, 3, 4, 5)
        // Drop the last two items
        .skipLast(2)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
4.distinct()/distinctUntilChanged()
- distinct() function: drop every event that has already appeared earlier in the sequence
- distinctUntilChanged() function: drop only consecutive duplicates, i.e. an event equal to the one immediately before it
Demo code:
Observable
        .just(1, 1, 2, 3, 2, 2, 4, 3, 5)
        // Each value is emitted at most once
        .distinct()
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Observable
        .just(1, 1, 2, 3, 2, 2, 4, 3, 5)
        // A value is dropped only if it equals the previous one
        .distinctUntilChanged()
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
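The difference between the two operators is easiest to see on the demo sequence 1,1,2,3,2,2,4,3,5. The sketch below is a plain-Java model of the two rules, not the RxJava implementation; DistinctModel and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

// Hypothetical helper class: a plain-Java model, not part of RxJava.
final class DistinctModel {

    /** Models distinct(): every value passes at most once, in first-seen order. */
    static List<Integer> distinct(List<Integer> source) {
        return new ArrayList<>(new LinkedHashSet<>(source));
    }

    /** Models distinctUntilChanged(): drop a value only if it equals its immediate predecessor. */
    static List<Integer> distinctUntilChanged(List<Integer> source) {
        List<Integer> emitted = new ArrayList<>();
        Integer previous = null;
        boolean first = true;
        for (Integer value : source) {
            if (first || !Objects.equals(previous, value)) {
                emitted.add(value);
            }
            previous = value;
            first = false;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 1, 2, 3, 2, 2, 4, 3, 5);
        System.out.println(distinct(source));             // prints [1, 2, 3, 4, 5]
        System.out.println(distinctUntilChanged(source)); // prints [1, 2, 3, 2, 4, 3, 5]
    }
}
```

Note that distinct() has to remember every value seen so far, while distinctUntilChanged() only needs the previous one.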
2, Filter events based on the specified number of events
1.take()
- Function: emit only the first N items
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Keep the first three items
        .take(3)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
2.takeLast()
- Function: emit only the last N items emitted by the Observable
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Keep the last three items
        .takeLast(3)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
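take() and takeLast(), together with skip() and skipLast() from the previous section, all select a contiguous slice of the sequence by count. A rough plain-Java model of that index arithmetic, assuming a finite sequence and N >= 0, might look like the following; SliceModel and its methods are hypothetical names, not RxJava APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class: models the four count-based operators on a finite list.
final class SliceModel {

    /** Models skip(n): everything after the first n items. */
    static <T> List<T> skip(List<T> source, int n) {
        return source.subList(Math.min(n, source.size()), source.size());
    }

    /** Models skipLast(n): everything except the last n items. */
    static <T> List<T> skipLast(List<T> source, int n) {
        return source.subList(0, Math.max(source.size() - n, 0));
    }

    /** Models take(n): the first n items. */
    static <T> List<T> take(List<T> source, int n) {
        return source.subList(0, Math.min(n, source.size()));
    }

    /** Models takeLast(n): the last n items. */
    static <T> List<T> takeLast(List<T> source, int n) {
        return source.subList(Math.max(source.size() - n, 0), source.size());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(skip(source, 2));     // prints [3, 4, 5]
        System.out.println(skipLast(source, 2)); // prints [1, 2, 3]
        System.out.println(take(source, 3));     // prints [1, 2, 3]
        System.out.println(takeLast(source, 3)); // prints [3, 4, 5]
    }
}
```

A list knows its own length, but an Observable does not know where the sequence ends until onComplete arrives, so the "last" variants have to buffer items: takeLast() can only emit once the source completes, and skipLast() holds items back until enough later ones have arrived.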
3, Filter events according to the specified time
1.throttleFirst()/throttleLast()
- throttleFirst() function: send only the first event in a certain period of time
- throttleLast() function: only send the last event in a certain period of time
Demo code:
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Throwable {
                // Emit 1..9 with pauses of 300-500 ms between items
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();
            }
        })
        // Let one item through, then ignore further items for 1 second
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Throwable {
                // Same emission schedule as in the throttleFirst() demo
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();
            }
        })
        // Every 1 second, emit the most recent item seen during that second
        .throttleLast(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
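Which items survive depends on when each one is emitted. Adding up the Thread.sleep() calls in the demo gives approximate emission times of 0, 500, 900, 1200, 1500, 1800, 2200, 2500 and 2800 ms, with completion at about 3100 ms. The sketch below replays those timestamps through a simplified plain-Java model of the two rules. ThrottleModel and its methods are hypothetical names, and the model assumes exact timestamps and treats an item that arrives exactly when a throttleFirst() window ends as passing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class: a timestamp-based model, not the RxJava implementation.
final class ThrottleModel {

    /** Models throttleFirst(): emit an item, then ignore items until windowMs has elapsed. */
    static List<Integer> throttleFirst(long[] timesMs, int[] values, long windowMs) {
        List<Integer> emitted = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            if (timesMs[i] >= gateOpensAt) {
                emitted.add(values[i]);
                gateOpensAt = timesMs[i] + windowMs;
            }
        }
        return emitted;
    }

    /** Models throttleLast(): at each periodic tick, emit the latest item since the previous tick. */
    static List<Integer> throttleLast(long[] timesMs, int[] values, long periodMs, long completedAtMs) {
        List<Integer> emitted = new ArrayList<>();
        int next = 0;
        for (long tick = periodMs; tick <= completedAtMs; tick += periodMs) {
            Integer latest = null;
            while (next < values.length && timesMs[next] < tick) {
                latest = values[next++];
            }
            if (latest != null) {
                emitted.add(latest);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(throttleFirst(timesMs, values, 1000));      // prints [1, 4, 7]
        System.out.println(throttleLast(timesMs, values, 1000, 3100)); // prints [3, 6, 9]
    }
}
```

In a real run, item 7 sits right on the edge of the window opened by item 4 (1200 ms + 1000 ms), so whether it passes throttleFirst() depends on scheduler timing and sleep overshoot.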
2.sample()
- Function: send only the latest (last) event in a certain period of time
sample() behaves the same way as throttleLast() and can be substituted for it directly, so no separate demo is given.
3.throttleWithTimeout()/debounce()
- Function: an event is emitted only if no newer event arrives within the specified time after it. If the next event arrives sooner, the earlier event is discarded and the wait starts again from the newer event
Demo code:
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Throwable {
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                // The only pause longer than the 1-second debounce interval
                Thread.sleep(1100);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();
            }
        })
        // Emit an item only after 1 second has passed with no newer item
        .debounce(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: " + d);
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
Code run result:
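The debounce rule can be checked by looking at the gap after each item. From the Thread.sleep() calls in the demo, the approximate emission times are 0, 500, 900, 1200, 2300, 2600, 3000, 3300 and 3600 ms. The sketch below is a simplified plain-Java model with hypothetical names (DebounceModel, debounce). It assumes exact timestamps, that a gap must be strictly longer than the interval for an item to pass, and that the source completes, in which case the last pending item is flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class: a timestamp-based model, not the RxJava implementation.
final class DebounceModel {

    /** An item passes if the next item arrives more than quietMs later, or if it is the last one. */
    static List<Integer> debounce(long[] timesMs, int[] values, long quietMs) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            if (isLast || timesMs[i + 1] - timesMs[i] > quietMs) {
                emitted.add(values[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timesMs = {0, 500, 900, 1200, 2300, 2600, 3000, 3300, 3600};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(debounce(timesMs, values, 1000)); // prints [4, 9]
    }
}
```

Item 4 passes because it is followed by a 1100 ms pause, and item 9 passes because the sequence completes while it is still pending.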
4, Filter events based on the specified event location
1.firstElement()/lastElement()
- Function: select only the first / last element
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Only the first item is delivered
        .firstElement()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept: " + integer);
            }
        });

Observable
        .just(1, 2, 3, 4, 5)
        // Only the last item is delivered
        .lastElement()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept: " + integer);
            }
        });
Code run result:
2.elementAt()
- Function: receive only the element at the specified position (zero-based index)
- Feature: an out-of-range index is allowed, i.e. index >= the number of events emitted. In that case no element is delivered and no error is raised
Demo code:
Observable
        .just(1, 2, 3, 4, 5)
        // Index 3 is the fourth item
        .elementAt(3)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept: " + integer);
            }
        });
Code run result:
3.elementAtOrError()
- Function: same as elementAt(), except that an out-of-range index (index >= the number of events emitted) is reported as an error (a NoSuchElementException) instead of being silently ignored
Demo code:
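Observable
        .just(1, 2, 3, 4, 5)
        // Index 6 is out of range, so an error is signalled instead of a value
        .elementAtOrError(6)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.d(TAG, "accept: " + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
                // Without this second Consumer the error is not handled by the subscriber
                Log.d(TAG, "error: " + throwable);
            }
        });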
Code run result:
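The difference between elementAt() and elementAtOrError() for an out-of-range index can be modelled with java.util.Optional. This is a plain-Java analogy under stated assumptions, not the RxJava implementation: ElementAtModel and its methods are hypothetical names, an empty Optional stands in for "no element delivered", and a thrown exception stands in for an error delivered to the subscriber.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical helper class: a plain-Java model, not part of RxJava.
final class ElementAtModel {

    /** Models elementAt(): the item at the index, or nothing if the sequence is too short. */
    static <T> Optional<T> elementAt(List<T> source, int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
        }
        return index < source.size() ? Optional.of(source.get(index)) : Optional.empty();
    }

    /** Models elementAtOrError(): the item at the index, or an error if the sequence is too short. */
    static <T> T elementAtOrError(List<T> source, int index) {
        return elementAt(source, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(elementAt(source, 3)); // prints Optional[4]
        System.out.println(elementAt(source, 6)); // prints Optional.empty
        try {
            elementAtOrError(source, 6);
        } catch (NoSuchElementException e) {
            System.out.println("error: " + e);    // reached because index 6 is out of range
        }
    }
}
```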