Asynchronous Programming with RxJava - Introduction

Preface
A while ago I wrote an article with some thoughts on coroutines. Whether you use coroutines or callbacks, what they provide is essentially an asynchronous, non-blocking programming model. That article also covered Java's support for this model, mainly Future and CompletableFuture. Afterwards, some readers mentioned RxJava in the comments, and the book Microservice Design, which I have been reading recently, also refers to Reactive Extensions (Rx). RxJava is the implementation of Rx on the JVM, so I decided to learn more about it.

Introduction to RxJava
RxJava's project page: https://github.com/ReactiveX/RxJava
The project describes itself in one sentence: RxJava - Reactive Extensions for the JVM - a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
In other words, it is a library for the Java VM that uses observable sequences to compose asynchronous, event-based programs.
An article on the Netflix Technology Blog goes into more detail and lists the main features of RxJava:
1. Easy concurrency, to make better use of the server's capabilities.
2. Easy conditional asynchronous execution.
3. A better way to avoid callback hell.
4. A reactive approach.

Comparison with CompletableFuture
As mentioned earlier, CompletableFuture also supports asynchronous programming. A typical usage looks like this:

// timeConsumingFunction() is a placeholder for any slow call that returns an Integer
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> timeConsumingFunction());
Future<Integer> f = future.whenComplete((v, e) -> {
        System.out.println(v);
        System.out.println(e);
});
System.out.println("other...");
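
For readers who want to run the snippet above, here is a minimal self-contained sketch that needs only the JDK. The class name CompletableFutureDemo and the helper slowSquare are made up for illustration, and the 200 ms sleep merely stands in for a time-consuming operation.

```java
import java.util.concurrent.CompletableFuture;

final class CompletableFutureDemo {

    // Hypothetical stand-in for a time-consuming function.
    static int slowSquare(int x) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    // Starts the computation on another thread and returns immediately.
    static CompletableFuture<Integer> squareAsync(int x) {
        return CompletableFuture.supplyAsync(() -> slowSquare(x));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> done = squareAsync(7).whenComplete((v, e) -> {
            System.out.println("value = " + v);
            System.out.println("error = " + e);
        });
        System.out.println("other...");
        // supplyAsync uses daemon threads by default, so wait for the callback before exiting.
        done.join();
    }
}
```

The main thread is not blocked by the computation itself; the join() at the end is only there so that the JVM does not exit before the callback has run.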

Here is a simple example of how RxJava implements the same asynchronous programming style:

Observable<Long> observable = Observable.just(1, 2)
        .subscribeOn(Schedulers.io()).map(new Func1<Integer, Long>() {
            @Override
            public Long call(Integer t) {
                try {
                    Thread.sleep(1000); //Time-consuming operation
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return (long) (t * 2);
            }
        });
observable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("error" + e);
    }
    @Override
    public void onNext(Long result) {
        System.out.println("result = " + result);
    }
});
System.out.println("other...");

Func1 performs the time-consuming operation asynchronously. The Subscriber subscribes to the Observable, and each result is delivered through a callback to the Subscriber's onNext method.
The asynchronous execution is specified by subscribeOn(Schedulers.io()): the work runs on a worker thread taken from a thread pool that grows as needed, instead of on the calling thread.
Structurally, this is similar to CompletableFuture: a time-consuming operation runs asynchronously and the result is pushed to me when it is ready. So what do we need RxJava for? You may have noticed that the example above actually has two items in the stream, [1, 2], and each of them is pushed to me as soon as it has been processed, whereas a CompletableFuture delivers a single result. Of course, this is only one difference. RxJava has many other useful features, which are described below.
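
The "one result versus many" difference can be seen with the JDK alone. The following is a small sketch (the class name SingleResultDemo is made up): a CompletableFuture can be completed only once, so a second value has nowhere to go, while an Observable may call onNext any number of times.

```java
import java.util.concurrent.CompletableFuture;

final class SingleResultDemo {

    // Tries to push two values into one future and reports whether each attempt was accepted.
    static boolean[] completeTwice(CompletableFuture<Integer> future) {
        boolean first = future.complete(1);   // accepted: the future was still pending
        boolean second = future.complete(2);  // rejected: the future is already completed
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        boolean[] accepted = completeTwice(future);
        System.out.println(accepted[0] + " " + accepted[1]); // true false
        System.out.println(future.join());                   // 1
    }
}
```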

Asynchronous Observer Pattern
Does the code above remind you of a design pattern? It closely resembles the observer pattern: first create the observed object, an Observable, then add the observer, a Subscriber, to its list of observers.
RxJava has four roles: Observable, Observer, Subscriber, and Subject.
Observable and Subject are the observed side; Observer and Subscriber are the observing side.
We can also look at the source code and compare Observer in the JDK with Observer in RxJava.
Observer in the JDK:

public interface Observer {
    void update(Observable o, Object arg);
}

Observer in RxJava:

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

Looking further, Subscriber also implements Observer:

public abstract class Subscriber<T> implements Observer<T>, Subscription

Compared with the JDK version, RxJava's Observer introduces two new methods: onCompleted() and onError().
onCompleted(): notifies the observer that the Observable has no more data and that the event sequence is complete.
onError(): triggered when an exception occurs during event handling. The sequence then terminates automatically and no further events may be emitted.
RxJava can handle events both synchronously and asynchronously, and I think the asynchronous way is where RxJava is most valuable, so I call this the asynchronous observer pattern here.
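
To make the contract of the three callbacks concrete, here is a plain-JDK sketch. It is not how RxJava is implemented: SimpleObserver, emitAll, and record are hypothetical names, and the interface simply redeclares the three methods shown above so that no RxJava dependency is needed. It shows that a sequence ends with exactly one terminal event, either onCompleted or onError.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ObserverContractSketch {

    // Same three callbacks as the RxJava Observer above, redeclared for a JDK-only sketch.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    // Pushes every item to the observer, then signals exactly one terminal event.
    static <T> void emitAll(List<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e); // terminal: no further onNext and no onCompleted
            return;
        }
        observer.onCompleted();
    }

    // Records the callbacks as strings; a negative number provokes an error during handling.
    static List<String> record(List<Integer> items) {
        final List<String> log = new ArrayList<>();
        emitAll(items, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) {
                if (item < 0) {
                    throw new IllegalArgumentException("negative: " + item);
                }
                log.add("onNext " + item);
            }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2)));     // [onNext 1, onNext 2, onCompleted]
        System.out.println(record(Arrays.asList(1, -1, 2))); // [onNext 1, onError]
    }
}
```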

With that, let's move on to RxJava's flexible operators. This is only a brief introduction with a simple example for each; the scenarios in which to use them will be covered in a future article.

Maven Dependency

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.2.4</version>
</dependency>

Create Observable
1.create() creates an Observable and defines event trigger rules for it

Observable<Integer> observable = Observable
            .create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> observer) {
                    for (int i = 0; i < 5; i++) {
                        observer.onNext(i);
                    }
                    observer.onCompleted();
                }
            });
observable.subscribe(new Observer<Integer>() {...});

2.from() creates an Observable from a list, which emits each element in the list

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items);
observable.subscribe(new Observer<Integer>() {...});

3.just() emits the arguments passed to it, in order

Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Observer<Integer>() {...});

Filter Observable
1.filter() removes unwanted values from the observed sequence

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).filter(
        new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer t) {
                return t == 1;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

2.take() and takeLast() take the first n and the last n elements, respectively

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// Observable<Integer> observable = Observable.from(items).take(3);  // emits 0, 1, 2
Observable<Integer> observable = Observable.from(items).takeLast(2); // emits 3, 4
observable.subscribe(new Observer<Integer>() {...});

3.distinct() and distinctUntilChanged()
distinct() filters out duplicate values

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(10);
Observable<Integer> observable = Observable.from(items).distinct();
observable.subscribe(new Observer<Integer>() {...});

distinctUntilChanged() notifies us only when the Observable emits a value that differs from the previous one

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(100);
items.add(100);
items.add(200);
Observable<Integer> observable = Observable.from(items).distinctUntilChanged();
observable.subscribe(new Observer<Integer>() {...});
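
The difference between the two operators is easy to miss, so here is a plain-JDK sketch of what each one lets through. These are list-based stand-ins with made-up names, not RxJava's implementation: distinct() remembers every value seen so far, while distinctUntilChanged() only compares with the immediate predecessor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

final class DistinctSketch {

    // distinct(): drop every value that has appeared anywhere earlier in the sequence.
    static <T> List<T> distinct(List<T> source) {
        return new ArrayList<>(new LinkedHashSet<>(source));
    }

    // distinctUntilChanged(): drop a value only if it equals the one directly before it.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : source) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 100, 100, 200, 100);
        System.out.println(distinct(source));             // [1, 100, 200]
        System.out.println(distinctUntilChanged(source)); // [1, 100, 200, 100]
    }
}
```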

4.first() and last() take the first and last element, respectively

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// Observable<Integer> observable = Observable.from(items).first();
Observable<Integer> observable = Observable.from(items).last();
observable.subscribe(new Observer<Integer>() {...});

5.skip() and skipLast() skip the first n and the last n elements, respectively

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
// Observable<Integer> observable = Observable.from(items).skip(2);
Observable<Integer> observable = Observable.from(items).skipLast(2);
observable.subscribe(new Observer<Integer>() {...});

6.elementAt() emits only the element at the given zero-based index

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).elementAt(2);
observable.subscribe(new Observer<Integer>() {...});

7.sample() emits the most recent item at a fixed time interval

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 50000; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).sample(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});

8.timeout() emits an error if no value is emitted within the given time interval

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).timeout(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...onError()...});

9.debounce() emits an item only if the given time span has passed without the source emitting another one, so of a rapid burst only the last item gets through

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).debounce(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});

Convert Observable
1.map() receives a specified Func object and applies it to each value emitted by Observable

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).map(
        new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer t) {
                return t * 2;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

2.flatMap() maps each item to an Observable and then flattens the result by merging the items emitted by these Observables into a single sequence

final Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(3));
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).flatMap(
        new Func1<Integer, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Integer t) {
                List<Integer> items = new ArrayList<Integer>();
                items.add(t);
                items.add(99999);
                return Observable.from(items).subscribeOn(scheduler);
            }
        });
observable.subscribe(new Observer<Integer>() {...});

One important caveat concerns the merging step: it allows interleaving. This means that flatMap() does not guarantee that the resulting Observable emits its items in the same order as the source Observable.

3.concatMap() solves the interleaving problem of flatMap(): it flattens by concatenating the inner Observables instead of merging them, so the values emitted by each inner Observable stay together and in order.
Using the same example as above with flatMap replaced by concatMap, the output comes out in order

4.switchMap() is similar to flatMap(), except that whenever the source Observable emits a new item, it unsubscribes from the Observable generated for the previous item and starts listening to the one generated for the current item.
Using the same example as above with flatMap replaced by switchMap, the output typically contains only the values of the last inner Observable

5.scan() is an accumulator: it applies a function to each item emitted by the source Observable, emits the result, and feeds that result back in as the first argument when the next item arrives.

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).scan(
        new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer t1, Integer t2) {
                System.out.println(t1 + "+" + t2);
                return t1 + t2;
            }
        });
observable.subscribe(new Observer<Integer>() {...});
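
To see which values a scan without an initial seed produces, here is a list-based sketch in plain Java. It is only a stand-in to show the arithmetic, not RxJava's implementation, and the names are made up. The first item passes through unchanged, and every later output is the accumulator applied to the previous output and the next item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanSketch {

    // Emits the first item as is, then accumulator(previousResult, nextItem) for each later item.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // Running totals of 0, 1, 2, 3, 4
        System.out.println(scan(Arrays.asList(0, 1, 2, 3, 4), Integer::sum)); // [0, 1, 3, 6, 10]
    }
}
```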

6.groupBy() to group elements

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<GroupedObservable<Integer, Integer>> observable = Observable
                .from(items).groupBy(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer t) {
                        return t % 3;
                    }
                });
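observable.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(final GroupedObservable<Integer, Integer> t) {
        // Each group is itself an Observable and has to be subscribed to
        t.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                System.out.println("key:" + t.getKey() + ", value:" + value);
            }
        });
    }
});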

7.The buffer() function transforms the source Observable into a new Observable that emits a list of values at a time instead of one value.

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<List<Integer>> observable = Observable.from(items).buffer(2);
observable.subscribe(new Observer<List<Integer>>() {...});
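
For the count-based variant used here, the chunking can be sketched in plain Java as follows. This is an illustration with made-up names, not RxJava's implementation. Note that the last list may be shorter than the requested size when the source runs out of items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferSketch {

    // Splits the source into consecutive chunks of at most `count` items.
    static <T> List<List<T>> buffer(List<T> source, int count) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < source.size(); i += count) {
            int end = Math.min(i + count, source.size());
            out.add(new ArrayList<>(source.subList(i, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(0, 1, 2, 3, 4), 2)); // [[0, 1], [2, 3], [4]]
    }
}
```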

8.The window() function is similar to buffer(), but it emits Observables instead of lists

List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Observable<Integer>> observable = Observable.from(items).window(2);
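observable.subscribe(new Observer<Observable<Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Observable<Integer> window) {
        // Each window is itself an Observable and has to be subscribed to
        window.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                System.out.println("this Action1 = " + this + ", result = " + value);
            }
        });
    }
});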

9.cast() casts each item emitted by the source Observable to the specified Class

List<Father> items = new ArrayList<Father>();
items.add(new Son());
items.add(new Son());
items.add(new Father());
items.add(new Father());
// The two Son instances are emitted; the first plain Father cannot be cast to Son,
// so the sequence then ends with onError(ClassCastException)
Observable<Son> observable = Observable.from(items).cast(Son.class);
observable.subscribe(new Observer<Son>() {...});
class Father {
}
class Son extends Father {
}

Combining Observables
1.merge() combines two or more Observables into a single Observable that emits the items of all of them

List<Integer> items1 = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
List<Integer> items2 = new ArrayList<Integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
Observable<Integer> observable1 = Observable.from(items1);
Observable<Integer> observable2 = Observable.from(items2);
Observable<Integer> observableMerge = Observable.merge(observable1,observable2);
observableMerge.subscribe(new Observer<Integer>() {...});

2.zip() pairs up the items emitted by two or more Observables in order, applies the specified function Func* to each pair, and emits the result as a new value

List<Integer> items1 = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
    items1.add(i);
}
List<Integer> items2 = new ArrayList<Integer>();
for (int i = 5; i < 10; i++) {
    items2.add(i);
}
Observable<Integer> observable1 = Observable.from(items1);
Observable<Integer> observable2 = Observable.from(items2);
Observable<Integer> observableZip = Observable.zip(observable1,
        observable2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer t1, Integer t2) {
                return t1 * t2;
            }
        });
observableZip.subscribe(new Observer<Integer>() {...});
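
The pairing rule of zip can be sketched with two lists in plain Java. This is an illustration with made-up names, not RxJava's implementation: the n-th item of one source is combined with the n-th item of the other, and the result ends as soon as the shorter source runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    // Combines items index by index; surplus items of the longer list are dropped.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> zipper) {
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(zipper.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items1 = Arrays.asList(0, 1, 2, 3, 4);
        List<Integer> items2 = Arrays.asList(5, 6, 7, 8, 9);
        // Same inputs and function as the RxJava example: 0*5, 1*6, 2*7, 3*8, 4*9
        System.out.println(zip(items1, items2, (a, b) -> a * b)); // [0, 6, 14, 24, 36]
    }
}
```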

3.combineLatest() combines the results of two Observables: whenever either Observable emits an item, that item is combined with the most recent item of the other Observable using the specified function.

Observable<Long> observable1 = Observable.interval(1000,TimeUnit.MILLISECONDS);
Observable<Long> observable2 = Observable.interval(1000,TimeUnit.MILLISECONDS);
Observable.combineLatest(observable1, observable2,
        new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long t1, Long t2) {
                System.out.println("t1 = " + t1 + ",t2 = " + t2);
                return t1 + t2;
            }
        }).subscribe(new Observer<Long>() {...});
Thread.sleep(100000);
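
Because the interval-based example depends on timing, the rule itself is easier to follow in a deterministic simulation. The following is a single-threaded sketch in plain Java with made-up names (onLeft and onRight stand for the two sources emitting). It is not RxJava's implementation and ignores threading and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

final class CombineLatestSketch {
    private final BinaryOperator<Long> combiner;
    private final List<Long> emitted = new ArrayList<>();
    private Long latestLeft;  // null until the left source has emitted
    private Long latestRight; // null until the right source has emitted

    CombineLatestSketch(BinaryOperator<Long> combiner) {
        this.combiner = combiner;
    }

    void onLeft(long value) {
        latestLeft = value;
        emitIfReady();
    }

    void onRight(long value) {
        latestRight = value;
        emitIfReady();
    }

    // Nothing is emitted until both sides have produced at least one item.
    private void emitIfReady() {
        if (latestLeft != null && latestRight != null) {
            emitted.add(combiner.apply(latestLeft, latestRight));
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CombineLatestSketch c = new CombineLatestSketch(Long::sum);
        c.onLeft(0);  // nothing yet, the right side has no value
        c.onRight(0); // 0 + 0
        c.onLeft(1);  // 1 + 0
        c.onRight(1); // 1 + 1
        System.out.println(c.emitted()); // [0, 1, 2]
    }
}
```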

4.join() is similar to combineLatest(), but it lets you control how long each emitted item stays valid: an item from one Observable is combined with the items that the other Observable emits while that validity window is still open.

Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS);
observable1.join(observable2, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long t) {
        System.out.println("left=" + t);
        return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS);
    }
}, new Func1<Long, Observable<Long>>() {
    @Override
    public Observable<Long> call(Long t) {
        System.out.println("right=" + t);
        return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS);
    }
}, new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long t1, Long t2) {
        return t1 + t2;
    }
}).subscribe(new Observer<Long>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable  completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no!  Something   wrong   happened!");
    }
    @Override
    public void onNext(Long t) {
        System.out.println("[result=]" + t);
    }
});
Thread.sleep(100000);

5.switchOnNext() converts an Observable that emits Observables into a single Observable. It always mirrors the most recently emitted inner Observable: as soon as a new inner Observable arrives, the previous one is unsubscribed, so only items from the latest one reach the subscriber

Observable<Observable<Long>> observable = Observable.interval(2, TimeUnit.SECONDS)
        .map(new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.interval(1, TimeUnit.MILLISECONDS).take(5);
            }
        }).take(2);
 
Observable.switchOnNext(observable).subscribe(new Observer<Long>() {...});
Thread.sleep(1000000);

6.startWith() emits the given items before the source Observable starts to emit its own

Observable.just(1000, 2000).startWith(1, 2).subscribe(new Observer<Integer>() {...});

Summary
This article gave a brief introduction to RxJava, looked at it from the angle of asynchronous programming, and took a short tour of Observable's filtering, transforming, and combining APIs. Of course, what we care about more is where RxJava should actually be applied.

Keywords: Java Programming jvm JDK

Added by linux1880 on Fri, 20 Mar 2020 04:42:10 +0200