RxJava 3.x Getting Started Guide (IV): Filter Operators

1, Filter events based on specified event criteria

1.filter()

  • Function: test each event emitted by the Observable against a predicate. If the predicate returns true, the event is passed on to the observer; otherwise it is dropped

Demo code:

        Observable
                .just(1,2,3,4,5)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Throwable {
                        return integer % 3 == 2;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

Code run result: onNext is logged for 2 and 5 (the only values for which integer % 3 == 2), followed by onComplete.

2.ofType()

  • ofType is a special form of the filter operator. It filters an Observable and returns only data of the specified type.

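One way to picture ofType() is as a type check followed by a cast. The sketch below models that idea in plain Java on a List, so it runs without RxJava on the classpath. The class OfTypeSketch and its ofType helper are hypothetical names used only for illustration; they are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of ofType(): keep the items that are instances of the
// requested class and cast them. Illustration only, not RxJava code.
final class OfTypeSketch {

    static <T> List<T> ofType(List<?> items, Class<T> type) {
        List<T> result = new ArrayList<>();
        for (Object item : items) {
            if (type.isInstance(item)) {       // the "filter" step
                result.add(type.cast(item));   // the "cast" step
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object> source = Arrays.<Object>asList(1, 2, 3, 4, 5, "Hello,World!");
        System.out.println(ofType(source, Integer.class)); // prints [1, 2, 3, 4, 5]
    }
}
```
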
Demo code:

        Observable
                .just(1,2,3,4,5,"Hello,World!")
                .ofType(Integer.class)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

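Code run result: onNext is logged for 1, 2, 3, 4 and 5, followed by onComplete. The String "Hello,World!" is filtered out.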

3.skip() & skipLast()

  • skip() function: ignore the first N items emitted by the Observable and keep only the items after them.
  • skipLast() function: ignore the last N items emitted by the Observable and keep only the items before them.

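skip() only needs a counter, but skipLast() cannot know that an item is one of the last N until the source completes, so it has to hold items back. The following is a minimal plain-Java sketch of both rules, assuming a finite list stands in for the event sequence. SkipSketch and its methods are hypothetical names for illustration, not RxJava APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of skip(n) and skipLast(n). Illustration only.
final class SkipSketch {

    // skip(n): count the items and drop the first n.
    static <T> List<T> skip(List<T> items, int n) {
        List<T> out = new ArrayList<>();
        int seen = 0;
        for (T item : items) {
            if (seen++ >= n) {
                out.add(item);
            }
        }
        return out;
    }

    // skipLast(n): an item is released only once n newer items have arrived,
    // so the n items still buffered at completion are never emitted.
    static <T> List<T> skipLast(List<T> items, int n) {
        List<T> out = new ArrayList<>();
        ArrayDeque<T> buffer = new ArrayDeque<>();
        for (T item : items) {
            buffer.offer(item);
            if (buffer.size() > n) {
                out.add(buffer.poll());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(skip(source, 2));     // prints [3, 4, 5]
        System.out.println(skipLast(source, 2)); // prints [1, 2, 3]
    }
}
```
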
Demo code:

        Observable
                .just(1,2,3,4,5)
                .skip(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
        Observable
                .just(1,2,3,4,5)
                .skipLast(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

Code run result: skip(2) delivers 3, 4 and 5; skipLast(2) delivers 1, 2 and 3. Each sequence ends with onComplete.

4.distinct()/distinctUntilChanged()

  • distinct() function: drop every event that has already appeared earlier in the sequence
  • distinctUntilChanged() function: drop only events that repeat the immediately preceding event

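The difference between the two operators comes down to how much they remember: every value seen so far versus only the previous value. Here is a small plain-Java sketch of that difference, using the same input as the demo. DistinctSketch and its methods are hypothetical names for illustration, not RxJava APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Plain-Java model of distinct() and distinctUntilChanged(). Illustration only.
final class DistinctSketch {

    // distinct(): remember every value seen so far.
    static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seen.add(item)) {   // add() returns false for a repeat
                out.add(item);
            }
        }
        return out;
    }

    // distinctUntilChanged(): remember only the previous value.
    static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> out = new ArrayList<>();
        boolean hasPrevious = false;
        T previous = null;
        for (T item : items) {
            if (!hasPrevious || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            hasPrevious = true;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 1, 2, 3, 2, 2, 4, 3, 5);
        System.out.println(distinct(source));             // prints [1, 2, 3, 4, 5]
        System.out.println(distinctUntilChanged(source)); // prints [1, 2, 3, 2, 4, 3, 5]
    }
}
```
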
Demo code:

        Observable
                .just(1,1,2,3,2,2,4,3,5)
                .distinct()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
        Observable
                .just(1,1,2,3,2,2,4,3,5)
                .distinctUntilChanged()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

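Code run result: distinct() delivers 1, 2, 3, 4, 5; distinctUntilChanged() delivers 1, 2, 3, 2, 4, 3, 5. Each sequence ends with onComplete.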

2, Filter events based on the specified number of events

1.take()

  • Function: emit only the first N items, then complete

Demo code:

        Observable
                .just(1,2,3,4,5)
                .take(3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

Code run result: onNext is logged for 1, 2 and 3, followed by onComplete.

2.takeLast()

  • Function: emit only the last N items emitted by the Observable

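Because the operator cannot tell which items are the last N until the source completes, takeLast() has to buffer and can deliver nothing before completion. A minimal plain-Java sketch of that buffering, assuming a finite list stands in for the event sequence, might look like this. TakeLastSketch is a hypothetical name for illustration, not an RxJava API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of takeLast(n). Illustration only.
final class TakeLastSketch {

    // Keep at most n items in a queue, evicting the oldest one on overflow.
    // Whatever is left when the input ends is the result.
    static <T> List<T> takeLast(List<T> items, int n) {
        ArrayDeque<T> buffer = new ArrayDeque<>();
        for (T item : items) {
            buffer.offer(item);
            if (buffer.size() > n) {
                buffer.poll();
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(takeLast(Arrays.asList(1, 2, 3, 4, 5), 3)); // prints [3, 4, 5]
    }
}
```
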
Demo code:

        Observable
                .just(1,2,3,4,5)
                .takeLast(3)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

Code run result: onNext is logged for 3, 4 and 5, followed by onComplete.

3, Filter events according to the specified time

1.throttleFirst()/throttleLast()

  • throttleFirst() function: emit an event, then ignore all further events until the specified time window has passed
  • throttleLast() function: at the end of each time window, emit only the most recent event received during that window

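To see which events survive, it helps to replay the demo's timeline by hand. The sketch below does that in plain Java. It assumes the nominal emission times implied by the Thread.sleep() calls in the demo (0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800 ms, with completion at 3100 ms) and treats an event that arrives exactly when a window ends as outside that window. ThrottleSketch and its methods are hypothetical names for illustration. This models the rules; it is not how RxJava implements them.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of throttleFirst / throttleLast over fixed timestamps.
// Illustration only; real runs depend on thread timing.
final class ThrottleSketch {

    // throttleFirst: emit an event, then drop everything for `windowMs`.
    static List<Integer> throttleFirst(long[] timesMs, int[] values, long windowMs) {
        List<Integer> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            if (timesMs[i] >= gateOpensAt) {
                out.add(values[i]);
                gateOpensAt = timesMs[i] + windowMs;
            }
        }
        return out;
    }

    // throttleLast: at every tick (period, 2 * period, ...) up to completion,
    // emit the newest event that arrived since the previous tick, if any.
    static List<Integer> throttleLast(long[] timesMs, int[] values, long periodMs, long completeAtMs) {
        List<Integer> out = new ArrayList<>();
        int next = 0;
        for (long tick = periodMs; tick <= completeAtMs; tick += periodMs) {
            Integer latest = null;
            while (next < values.length && timesMs[next] < tick) {
                latest = values[next];
                next++;
            }
            if (latest != null) {
                out.add(latest);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 500, 900, 1200, 1500, 1800, 2200, 2500, 2800};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(throttleFirst(times, values, 1000));      // prints [1, 4, 7]
        System.out.println(throttleLast(times, values, 1000, 3100)); // prints [3, 6, 9]
    }
}
```

Note that event 7 arrives at 2200 ms, exactly when the window opened by event 4 ends. The sketch lets it through, but on a real run it races the end of that window, so it may occasionally be dropped.
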
Demo code:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();

            }
        }).throttleFirst(1, TimeUnit.SECONDS) // after an event passes, drop further events for 1 second
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(300);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();

            }
        }).throttleLast(1, TimeUnit.SECONDS) // every 1 second, emit the most recent event
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

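Code run result: throttleFirst(1, TimeUnit.SECONDS) usually delivers 1, 4 and 7; throttleLast(1, TimeUnit.SECONDS) delivers 3, 6 and 9. Event 7 is sent almost exactly when the 1-second window opened by event 4 ends, and Thread.sleep() is not exact, so the throttleFirst() output can differ slightly between runs.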

2.sample()

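  • Function: at the end of each time window, emit only the most recent event received during that window

sample() behaves the same as throttleLast(); in RxJava, throttleLast(time, unit) simply delegates to sample(time, unit). The two can be swapped directly, so no separate demo is given.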

3.throttleWithTimeout()/debounce()

  • Function: an event is emitted only after the specified time has passed without a newer event arriving. If a newer event arrives before that, the previous event is discarded and the timer restarts. throttleWithTimeout() is an alias for debounce()

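The rule above can be replayed by hand on the demo's timeline. The plain-Java sketch below assumes the nominal emission times implied by the Thread.sleep() calls in the demo (0, 500, 900, 1200, 2300, 2600, 3000, 3300, 3600 ms) and a source that completes normally, in which case the event still pending at completion is emitted as well. DebounceSketch is a hypothetical name for illustration. It models the rule, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of debounce over fixed timestamps. Illustration only.
final class DebounceSketch {

    // An event survives if no newer event arrives within `timeoutMs` of it.
    // The final event survives because it is still pending at completion.
    static List<Integer> debounce(long[] timesMs, int[] values, long timeoutMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            if (isLast || timesMs[i + 1] - timesMs[i] > timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0, 500, 900, 1200, 2300, 2600, 3000, 3300, 3600};
        int[] values = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(debounce(times, values, 1000)); // prints [4, 9]
    }
}
```

Only the 1100 ms pause after event 4 is longer than the 1-second timeout, which is why 4 is the only event emitted before completion.
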
Demo code:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2);
                Thread.sleep(400);
                e.onNext(3);
                Thread.sleep(300);
                e.onNext(4);
                Thread.sleep(1100);
                e.onNext(5);
                Thread.sleep(300);
                e.onNext(6);
                Thread.sleep(400);
                e.onNext(7);
                Thread.sleep(300);
                e.onNext(8);
                Thread.sleep(300);
                e.onNext(9);
                Thread.sleep(300);
                e.onComplete();

            }
        }).debounce(1, TimeUnit.SECONDS) // quiet period required before an event is emitted
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: "+d);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: "+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

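Code run result: onNext is logged for 4 (the only event followed by a pause longer than 1 second) and for 9 (the event still pending when the source completes), followed by onComplete.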

4, Filter events based on the specified event position

1.firstElement()/lastElement()

  • Function: select only the first / last element

Demo code:

        Observable
                .just(1,2,3,4,5)
                .firstElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
        Observable
                .just(1,2,3,4,5)
                .lastElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: " + integer);
                    }
                });

Code run result: firstElement() logs accept: 1; lastElement() logs accept: 5.

2.elementAt()

  • Function: receive only the element at the given position (a zero-based index)
  • Feature: an out-of-range index is allowed. If the index is greater than or equal to the number of events emitted, no element is delivered and the returned Maybe simply completes

Demo code:

        Observable
                .just(1,2,3,4,5)
                .elementAt(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: " + integer);
                    }
                });

Code run result: accept: 4 is logged, because index 3 refers to the fourth element.

3.elementAtOrError()

  • Function: like elementAt(), but if the index is out of range (index >= the number of events emitted), the returned Single signals a NoSuchElementException through onError

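The contrast between elementAt() and elementAtOrError() is the same as the contrast between returning "nothing" and throwing. The following plain-Java sketch models it with Optional and an exception, assuming a finite list stands in for the event sequence. ElementAtSketch and its methods are hypothetical names for illustration, not RxJava APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Plain-Java model of elementAt(index) and elementAtOrError(index).
// Illustration only.
final class ElementAtSketch {

    // elementAt: an index past the end yields an empty result, not an error.
    static <T> Optional<T> elementAt(List<T> items, int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
        }
        return index < items.size() ? Optional.of(items.get(index)) : Optional.empty();
    }

    // elementAtOrError: an index past the end is reported as a failure.
    static <T> T elementAtOrError(List<T> items, int index) {
        return elementAt(items, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(elementAt(source, 3)); // prints Optional[4]
        System.out.println(elementAt(source, 6)); // prints Optional.empty
        try {
            elementAtOrError(source, 6);
        } catch (NoSuchElementException e) {
            System.out.println("error: " + e);    // index 6 is out of range
        }
    }
}
```
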
Demo code:

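        Observable
                .just(1,2,3,4,5)
                .elementAtOrError(6)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Throwable {
                        Log.d(TAG, "accept: " + integer);
                    }
                }, new Consumer<Throwable>() {
                    // Index 6 is out of range, so the error is delivered here
                    @Override
                    public void accept(Throwable throwable) throws Throwable {
                        Log.d(TAG, "onError: " + throwable);
                    }
                });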

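Code run result: the sequence has only five elements, so index 6 is out of range and a NoSuchElementException is signalled instead of a value.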


Added by stb74 on Tue, 04 Jan 2022 20:54:56 +0200