RxJava3. Getting started with X (2) -- create operators and transform operators
Create Operator
1, Foundation creation
1.create()
Use the Create operator to Create a complete Observable, which can pass onNext, onError, onCompleted and other events.
Demo code:
Observable.create((ObservableOnSubscribe<Integer>) e -> { e.onNext(1); e.onNext(2); e.onNext(3); }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "+integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
2, Quick create
1.just()
- Features: send the incoming events directly
matters needing attention: - just() can only send up to 9 parameters
- If you pass null to just, it will return an Observable that emits null values
Demo code:
Observable.just(1, 2, 3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "+integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
2.fromArray()
- Features: directly send the data array passed in
- fromArray converts the data in the array into an Observable object
Application scenario:
The Observable object sends more than 10 events (in array form), and the array is traversed
Integer[] nums = {1, 2, 3}; Observable.fromArray(nums) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "+integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
3.fromIterable()
- Features: send the incoming collection List data directly
Application scenario:
- Quickly create an Observable object and send more than 10 events (collection form)
- Collection element traversal
List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); Observable.fromIterable(list) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: "+integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
4. Other quick creation operators
- empty(): only the Complete event is sent and the completion is notified directly
- error(): send only the error event and directly notify the exception
- never(): do not send any events
Observable.empty() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Object o) { Log.d(TAG, "onNext: "+o); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } });
Observable.error(new RuntimeException()) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Object o) { Log.d(TAG, "onNext: "+o); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } });
Observable.never() .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(Object o) { Log.d(TAG, "onNext: "+o); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } });
3, Delay creation
Demand scenario:
- Timing operation: after x seconds, y operation needs to be performed automatically
- Periodic operation: after every x seconds, y operation needs to be performed automatically
1.timer()
- timer(long delay, @NonNull TimeUnit unit)
Parameter Description: parameter 1 = delay time, parameter 2 = time unit - Function: create an Observable, which sends a Long value of 0 after a given delay
- Application: delay the specified event and send a 0, which is generally used for detection
Demo code:
Observable.timer(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull Long aLong) { Log.d(TAG, "onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
3.interval()
- interval(long initialDelay, long period, @NonNull TimeUnit unit)
Parameter Description: parameter 1 = first delay time, parameter 2 = interval time number, parameter 3 = time unit - Function: transmit an infinitely increasing integer sequence at a fixed time interval
Demo code:
Observable.interval(2, 2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull Long aLong) { Log.d(TAG, "onNext: "+aLong); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Features of the event sequence sent in this example: send the event after a delay of 2s, and generate a number every 2 seconds (increasing by 1 from 0, unlimited)
3.range()
- range(int start, int count)
Parameter Description: parameter 1 = starting point of event sequence, parameter 2 = number of events - Function: continuously and incrementally send count event sequences from start without delay
Demo code:
Observable.range(2, 10) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull Integer i) { Log.d(TAG, "onNext: "+i); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Features of the event sequence sent in this example: start sending from 2, and each sending event is incremented by 1. A total of 10 events are sent
Transform operator
1, Function
Process (i.e. transform) the events / the whole event sequence in the event sequence to make it into different events / the whole event sequence
2, Operator summary
1.map()
- Function: apply a function to each data transmitted by Observable to perform transformation operation
- Application scenario: data type conversion
Demo code:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Throwable { return "map After function processing"+integer+"Becomes a string type"; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Code run result:
2.flatMap()
- Function: transform an Observable transmitting data into multiple Observables, and then merge their transmitted data into a single Observable
- Application scenario: unordered transformation of the whole event sequence sent by the observer
Demo code:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Throwable { ArrayList<String> list = new ArrayList<>(); for(int i = 0 ; i < 3 ; i ++){ list.add("I'm data"+integer+"Branch events"+i); } return Observable.fromIterable(list); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Code run result:
Note: the unordered sending event is not reflected here. Note that it is an unordered sending event when using it
3.concatMap()
- Function: transform an Observable transmitting data into multiple Observables, and then merge their transmitted data into a single Observable
- For scenario: orderly transform the whole event sequence sent by the observer
Demo code:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Throwable { ArrayList<String> list = new ArrayList<>(); for(int i = 0 ; i < 3 ; i ++){ list.add("I'm data"+integer+"Branch events"+i); } return Observable.fromIterable(list); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Code run result:
4.buffer()
- Function: regularly obtain a certain number of events from the events that the observable needs to send, put them into the cache, and finally send them
- Application scenario: cache events sent by observers
Demo code:
Observable.just(1,2,3,4,5) .buffer(3,1)//Buffer size & step size .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "+d); } @Override public void onNext(@NonNull List<Integer> list) { Log.d(TAG, "Number of events in the cache = " + list.size()); for(Integer integer: list) Log.d(TAG, "onNext: event"+integer); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "+e); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });
Code run result:
reference
Rxjava2.0 for Android X 2 -- rxjava creation operator
Rxjava2.0 for Android X 3 -- rxjava transform operator