RxJava3. Getting started with X -- creating and transforming operators

RxJava3. Getting started with X (2) -- create operators and transform operators

Create Operator

1, Foundation creation

1.create()

Use the Create operator to Create a complete Observable, which can pass onNext, onError, onCompleted and other events.

Demo code:

        Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

2, Quick create

1.just()

  • Features: send the incoming events directly
    matters needing attention:
  • just() can only send up to 9 parameters
  • If you pass null to just, it will return an Observable that emits null values

Demo code:

Observable.just(1, 2, 3)
               .subscribe(new Observer<Integer>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Integer integer) {
                       Log.d(TAG, "onNext: "+integer);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: "+e);
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete");
                   }
               });

2.fromArray()

  • Features: directly send the data array passed in
  • fromArray converts the data in the array into an Observable object

Application scenario:

The Observable object sends more than 10 events (in array form), and the array is traversed

Integer[] nums = {1, 2, 3};
        Observable.fromArray(nums)
               .subscribe(new Observer<Integer>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Integer integer) {
                       Log.d(TAG, "onNext: "+integer);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: "+e);
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete");
                   }
               });

3.fromIterable()

  • Features: send the incoming collection List data directly

Application scenario:

  1. Quickly create an Observable object and send more than 10 events (collection form)
  2. Collection element traversal
  List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        Observable.fromIterable(list)
               .subscribe(new Observer<Integer>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Integer integer) {
                       Log.d(TAG, "onNext: "+integer);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: "+e);
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete");
                   }
               });
    }

4. Other quick creation operators

  • empty(): only the Complete event is sent and the completion is notified directly
  • error(): send only the error event and directly notify the exception
  • never(): do not send any events
 Observable.empty()
               .subscribe(new Observer<Object>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Object o) {
                       Log.d(TAG, "onNext: "+o);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: ");
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete: ");
                   }
               });
 Observable.error(new RuntimeException())
               .subscribe(new Observer<Object>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Object o) {
                       Log.d(TAG, "onNext: "+o);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: ");
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete: ");
                   }
               });
Observable.never()
               .subscribe(new Observer<Object>() {
                   @Override
                   public void onSubscribe(Disposable d) {
                       Log.d(TAG, "onSubscribe: "+d);
                   }

                   @Override
                   public void onNext(Object o) {
                       Log.d(TAG, "onNext: "+o);
                   }

                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: ");
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete: ");
                   }
               });

3, Delay creation

Demand scenario:

  • Timing operation: after x seconds, y operation needs to be performed automatically
  • Periodic operation: after every x seconds, y operation needs to be performed automatically

1.timer()

  • timer(long delay, @NonNull TimeUnit unit)
    Parameter Description: parameter 1 = delay time, parameter 2 = time unit
  • Function: create an Observable, which sends a Long value of 0 after a given delay
  • Application: delay the specified event and send a 0, which is generally used for detection

Demo code:

        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                Log.d(TAG, "onNext: "+aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

3.interval()

  • interval(long initialDelay, long period, @NonNull TimeUnit unit)
    Parameter Description: parameter 1 = first delay time, parameter 2 = interval time number, parameter 3 = time unit
  • Function: transmit an infinitely increasing integer sequence at a fixed time interval

Demo code:

        Observable.interval(2, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                Log.d(TAG, "onNext: "+aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Features of the event sequence sent in this example: send the event after a delay of 2s, and generate a number every 2 seconds (increasing by 1 from 0, unlimited)

3.range()

  • range(int start, int count)
    Parameter Description: parameter 1 = starting point of event sequence, parameter 2 = number of events
  • Function: continuously and incrementally send count event sequences from start without delay

Demo code:

        Observable.range(2, 10)
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull Integer i) {
                Log.d(TAG, "onNext: "+i);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Features of the event sequence sent in this example: start sending from 2, and each sending event is incremented by 1. A total of 10 events are sent

Transform operator

1, Function

Process (i.e. transform) the events / the whole event sequence in the event sequence to make it into different events / the whole event sequence

2, Operator summary

1.map()

  • Function: apply a function to each data transmitted by Observable to perform transformation operation
  • Application scenario: data type conversion

Demo code:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return "map After function processing"+integer+"Becomes a string type";
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Code run result:

2.flatMap()

  • Function: transform an Observable transmitting data into multiple Observables, and then merge their transmitted data into a single Observable
  • Application scenario: unordered transformation of the whole event sequence sent by the observer

Demo code:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Throwable {
                ArrayList<String> list = new ArrayList<>();
                for(int i = 0 ; i < 3 ; i ++){
                    list.add("I'm data"+integer+"Branch events"+i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Code run result:
Note: the unordered sending event is not reflected here. Note that it is an unordered sending event when using it

3.concatMap()

  • Function: transform an Observable transmitting data into multiple Observables, and then merge their transmitted data into a single Observable
  • For scenario: orderly transform the whole event sequence sent by the observer

Demo code:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Throwable {
                ArrayList<String> list = new ArrayList<>();
                for(int i = 0 ; i < 3 ; i ++){
                    list.add("I'm data"+integer+"Branch events"+i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Code run result:

4.buffer()

  • Function: regularly obtain a certain number of events from the events that the observable needs to send, put them into the cache, and finally send them
  • Application scenario: cache events sent by observers

Demo code:

        Observable.just(1,2,3,4,5)
                .buffer(3,1)//Buffer size & step size
                .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(@NonNull List<Integer> list) {
                Log.d(TAG, "Number of events in the cache = " + list.size());
                for(Integer integer: list)
                Log.d(TAG, "onNext: event"+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

Code run result:

reference

Rxjava2.0 for Android X 2 -- rxjava creation operator
Rxjava2.0 for Android X 3 -- rxjava transform operator

Keywords: Android Design Pattern Functional Programming

Added by Qazsad on Wed, 05 Jan 2022 12:44:40 +0200