The core principle of spring cloud RPC: RxJava responsive programming framework, creation operator

Create type operator

The creation operator is used to create an Observable object, Observable subject object and pop up data. RxJava has many creation operators, which are roughly as follows:

(1) Create (): create an Observable topic object from scratch using the function.

(2) defer(): Observable topic objects are created only when subscribers subscribe, and a new Observable topic object is created for each subscription.

(3) Range (): creates an Observable subject object that ejects a sequence of integers in a specified range.

(4) Interval (): creates an Observable subject object that ejects an integer sequence at a given time interval.

(5) timer (): creates an Observable subject object that ejects a single data after a given delay.

(6) empty (): create an Observable subject object that does nothing and notifies the completion directly.

(7) error (): create an Observable subject object that does nothing to directly notify errors.

(8) never (): create an Observable subject object that does not eject any data.

Next, take just, from, range, interval and defer operators as examples.

just operator

The just operator of Observable is used to create an Observable topic and will eject the argument data. The just operator can receive multiple arguments, all of which will be ejected one by one.

The demonstration code of just operator is as follows:

package com.crazymaker.demo.rxJava.basic;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import rx.Observable;
@Slf4j
public class CreaterOperatorDemo {
 /**
 *Demonstrate the basic use of just
 */
 @Test
 public void justDemo() {
 //Send a string "hello world"
 Observable.just("hello world")
 .subscribe(s -> log.info("just string->" + s));
 //Send four integers 1, 2, 3 and 4 one by one
 Observable.just(1, 2, 3, 4)
 .subscribe(i -> log.info("just int->" + i));
 }
}

The results after operation are roughly as follows:

20:53:17.653 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->hello world
20:53:17.658 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->1
20:53:17.659 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->2
20:53:17.659 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->3
20:53:17.659 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->4

Note that the just operator is simply ejected as is. If the argument is an array or iteratable iterator object, the array or iteratable will be ejected as a single data.

Although the just operator can eject multiple data, the upper limit is 9.

from operator

The from operator takes objects such as arrays and iteratable iterators as input, creates an Observable topic object, and then ejects the data elements in the arguments (such as arrays and iteratable iterators) one by one.

The demonstration code of the from operator is as follows:

...
@Slf4j
public class CreaterOperatorDemo {
 /***Demonstrate the basic use of from */
 @Test
 public void fromDemo() {
 //Send each element in an array one by one
 String[] items = {"a", "b", "c", "d", "e", "f"};
 Observable.from(items)
.subscribe(s -> log.info("just string->" + s));
 //Send each element in the iterator one by one
 Integer[] array = {1, 2, 3, 4};
 List<Integer> list = Arrays.asList(array);
 Observable.from(list)
.subscribe(i -> log.info("just int->" + i));
}
...
}

Run the above demonstration code, and the results are as follows:

21:10:18.537 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->a
21:10:18.540 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->b
21:10:18.540 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->c
21:10:18.540 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->d
21:10:18.540 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->e
21:10:18.541 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just string->f
21:10:18.543 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->1
21:10:18.544 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->2
21:10:18.544 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->3
21:10:18.545 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->4

As can be seen from the above output, the from() operation splits the incoming array or iteratable into a single element and ejects it in turn.

range operator

The range operator takes a set of integer ranges as input, creates an Observable subject object, and ejects all integers contained in the integer range.

The demonstration code of the range operator is as follows:

package com.crazymaker.demo.rxJava.basic;
...
@Slf4j
public class CreaterOperatorDemo {
 /**Demonstrate the basic use of range */
 @Test
 public void rangeDemo() {
 //Send a series of integers in a range one by one
 Observable.range(1, 10)
 .subscribe(i -> log.info("just int->" + i));
 }
}

Run the above demonstration code, and the output results are as follows:

21:24:50.507 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->1
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->2
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->3
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->4
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->5
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->6
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->7
21:24:50.513 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->8
21:24:50.514 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->9
21:24:50.514 [main] INFO c.c.d.r.b.CreaterOperatorDemo - just int->10

Observable.range (1,10) represents the data ejected in the range [1,10], and its range includes the upper and lower limits of the range.

interval operator

The interval operator creates an Observable topic object (message flow), which will emit an integer sequence at a fixed time interval. The demonstration code of the interval operator is as follows:

package com.crazymaker.demo.rxJava.basic;
...
@Slf4j
public class OtherOperatorDemo
{
 /**
 *Demonstrate interval conversion
 */
 @Test
 public void intervalDemo() throws InterruptedException
 {
 Observable
 .interval(100, TimeUnit.MILLISECONDS)
 .subscribe(aLong -> log.info(aLong.toString()));
 Thread.sleep(Integer.MAX_VALUE);
 }
...
}

The ejection interval of the interval operator in the demonstration code is 100 milliseconds. Run this demo program and the output is as follows:

[RxComputationScheduler-1] INFO c.c.d.r.b.OtherOperatorDemo - 0
[RxComputationScheduler-1] INFO c.c.d.r.b.OtherOperatorDemo - 1
[RxComputationScheduler-1] INFO c.c.d.r.b.OtherOperatorDemo - 2
[RxComputationScheduler-1] INFO c.c.d.r.b.OtherOperatorDemo - 3
[RxComputationScheduler-1] INFO c.c.d.r.b.OtherOperatorDemo - 4
...

defer operator

just, from, range, and other creation operators eject data when creating a topic, not when subscribed. The defer operator does not eject data when creating a topic. It will wait until an observer subscribes to it.

The demo code of defer operator is as follows:

package com.crazymaker.demo.rxJava.defer;
...
@Slf4j
public class SimpleDeferDemo
{
 /**
 *Demonstrates the defer creation operator
 */
 @Test
 public void deferDemo()
 {
 AtomicInteger foo = new AtomicInteger(100);
 Observable observable = Observable.just(foo.get());
 /**
 *Delay creation
 */
 Observable dObservable = Observable.defer(
() -> Observable.just(foo.get()));
 /**
 *Modify the value of the object
 */
 foo.set(200);
 /**
 *Observer subscription
 */
 observable.subscribe(
integer -> log.info("just emit {}", String.valueOf(integer)));
 /**
 *Observer subscription
 */
 dObservable.subscribe(
integer -> log.info("defer just emit {}", String.valueOf(integer)));
 }
}

Run this demo program and the output is as follows:

[main] INFO c.c.d.r.defer.SimpleDeferDemo - just emit 100
[main] INFO c.c.d.r.defer.SimpleDeferDemo - defer just emit 200

In essence, a new Observable topic will be created when the observer subscribes to the topic created through defer. Therefore, although each subscriber thinks he subscribes to the same Observable, in fact, each subscriber obtains an independent message sequence.

The content of this article is the core principle of spring cloud RPC remote call: RxJava responsive programming framework and creation operator

  1. The next article will explain the core principles of spring cloud RPC remote call: RxJava responsive programming framework and filter operators;
  2. Friends who think the article is good can forward this article and pay attention to the Xiaobian;
  3. Thank you for your support!

Keywords: Java Programming Android Programmer rxjava

Added by rossh on Sun, 23 Jan 2022 09:04:58 +0200