RxJava 1.x from introduction to abandonment to RxJava 2.x (2)

Preface

RxJava 1.x from introduction to abandonment to RxJava 2.x (1)
The previous article covered the most basic introduction. In this installment we will learn about RxJava thread scheduling together.

Main text

The previous article opened by saying that RxJava is used to handle asynchronous tasks. By default, however, events are produced on whichever thread calls subscribe(), and they are consumed on that same thread. So how do we make it asynchronous? RxJava provides Scheduler for thread scheduling. Let's see how the Schedulers that RxJava provides are used.
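Before adding any Scheduler, the default behaviour can be checked with a small sketch. It assumes RxJava 2 (the io.reactivex package) on a plain JVM rather than Android; the class name DefaultThreadDemo and its run() method are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class DefaultThreadDemo {

    /** Returns [upstream thread name, downstream thread name] when no Scheduler is specified. */
    static List<String> run() {
        List<String> threads = new ArrayList<>();
        Observable.<Integer>create(emitter -> {
            // Upstream: record the thread that produces the event.
            threads.add(Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onComplete();
        }).subscribe(value -> {
            // Downstream: record the thread that consumes the event.
            threads.add(Thread.currentThread().getName());
        });
        // No Scheduler is involved, so subscribe() has already finished synchronously here.
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("upstream / downstream: " + run());
    }
}
```

Both recorded names should match the thread that called run(), which is exactly why a Scheduler is needed to get asynchronous behaviour.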

Let's look at a piece of code:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "upstream's thread:"+Thread.currentThread().getName());
        emitter.onNext(1);
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "downstream's thread:"+Thread.currentThread().getName());
            }
        });

Log output after running:

04-23 02:25:21.323 4135-4162/com.example.dawn4get.myapplication D/rx: upstream's thread:RxNewThreadScheduler-1

04-23 02:25:21.361 4135-4135/com.example.dawn4get.myapplication D/rx: downstream's thread:main

As you can see, the upstream emits events on a worker thread and the downstream receives them on the main thread. The lines that make this happen are:

.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

Simply put, subscribeOn() specifies the thread on which the upstream emits events, and observeOn() specifies the thread on which the downstream receives them.

If the upstream thread is specified more than once, only the first specification takes effect. In other words, when subscribeOn() is called several times, only the first call (the one closest to the source) decides where the source emits, and the rest have no visible effect.

The downstream thread, by contrast, can be specified as many times as you like: every call to observeOn() switches the downstream thread once more.
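The rule about repeated subscribeOn() calls can be tried out with the following sketch. It assumes RxJava 2 on a plain JVM; SubscribeOnTwiceDemo and upstreamThread() are hypothetical names, and blockingSubscribe() is used only so that the calling thread waits for the result.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class SubscribeOnTwiceDemo {

    /** Returns the name of the thread on which the source actually emitted. */
    static String upstreamThread() {
        String[] name = new String[1];
        Observable.<Integer>create(emitter -> {
            name[0] = Thread.currentThread().getName();
            emitter.onNext(1);
            emitter.onComplete();
        })
                .subscribeOn(Schedulers.newThread()) // first call: decides where the source runs
                .subscribeOn(Schedulers.io())        // later call: does not change where the source runs
                .blockingSubscribe();                // wait on the calling thread until onComplete
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("upstream's thread: " + upstreamThread());
    }
}
```

The printed name should start with RxNewThreadScheduler, the same prefix that appears in the log above, and not with the IO scheduler's prefix.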
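A minimal sketch of switching the downstream thread twice is shown here. It assumes RxJava 2 on a plain JVM, so Schedulers.newThread() and Schedulers.io() stand in for AndroidSchedulers.mainThread(); ObserveOnTwiceDemo and run() are hypothetical names.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ObserveOnTwiceDemo {

    /** Returns the thread names seen after the first and after the second observeOn(). */
    static List<String> run() {
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        Observable.just(1)
                .observeOn(Schedulers.newThread()) // first switch
                .doOnNext(v -> threads.add(Thread.currentThread().getName()))
                .observeOn(Schedulers.io())        // second switch
                .doOnNext(v -> threads.add(Thread.currentThread().getName()))
                .blockingSubscribe();              // wait on the calling thread until onComplete
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each doOnNext() runs on the thread chosen by the observeOn() directly above it, so the two recorded names should differ.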

An example

Fetching data from Gank.io with Retrofit + RxJava:

Add the Gradle dependencies:

 //retrofit
 compile 'com.squareup.retrofit2:retrofit:2.1.0'
 //Gson converter
 compile 'com.squareup.retrofit2:converter-gson:2.1.0'
 //RxJava2 Adapter
 compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
 //okhttp
 compile 'com.squareup.okhttp3:okhttp:3.4.1'
 compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'

Create a factory that builds the Retrofit services:

public class ServiceFactory {

    private final Gson mGson;
    private OkHttpClient.Builder mBuilder;


    private ServiceFactory() {
        mGson = new GsonBuilder()
                .setDateFormat("yyyy-MM-dd HH:mm:ss") // HH = 24-hour clock; hh without an AM/PM marker is ambiguous
                .create();

        mBuilder = new OkHttpClient.Builder();
        mBuilder.readTimeout(10, TimeUnit.SECONDS);
        mBuilder.connectTimeout(9, TimeUnit.SECONDS);
    }


    private static class SingletonHolder {
        private static final ServiceFactory INSTANCE = new ServiceFactory();
    }

    public static ServiceFactory getInstance() {
        return SingletonHolder.INSTANCE;
    }


    public <S> S createService(Class<S> serviceClass) {
        String baseUrl = "";
        try {
            // Read the service interface's public static BASE_URL constant via reflection.
            Field field = serviceClass.getField("BASE_URL");
            baseUrl = (String) field.get(null); // static field, so no instance is needed
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl)
                .client(mBuilder.build())
                .addConverterFactory(GsonConverterFactory.create(mGson))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        return retrofit.create(serviceClass);
    }
}

Define the API interface:

public interface GankService {

    String BASE_URL = "http://www.gank.io/api/";


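    /**
     * Query Gank ("dry goods") entries by category, 20 per page.
     *
     * @param category  the category to query
     * @param pageIndex the page number to fetch
     * @return an Observable that emits the parsed response
     */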
    @GET("data/{category}/20/{pageIndex}")
    Observable<HttpResult<List<GanHuoData>>> getGanHuo(@Path("category") String category
            , @Path("pageIndex") int pageIndex);


}

Make the request:

ServiceFactory.getInstance()
        .createService(GankService.class)
        .getGanHuo("welfare", 3)
        .subscribeOn(Schedulers.io()) // run the network request on an IO thread
        .observeOn(AndroidSchedulers.mainThread()) // switch back to the main thread to handle the result
        .subscribe(new Observer<HttpResult<List<GanHuoData>>>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(HttpResult<List<GanHuoData>> value) {
                Log.d(TAG, "value'size:" + value.results.size());
            }

            @Override
            public void onError(Throwable e) {
                // e.getMessage() may be null, so log the Throwable itself.
                Log.e(TAG, "request failed", e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

This example is rough: it fetches 20 items from the "welfare" category, and I was lazy and only log the size of the result instead of displaying it, but that is good enough here. The output after running is as follows:

04-23 03:48:17.410 7881-7881/com.example.dawn4get.myapplication D/rx: value'size:20
04-23 03:48:17.410 7881-7881/com.example.dawn4get.myapplication D/rx: onComplete

It looks perfect, but we have overlooked one case: if the Activity has already exited while the request is still in flight, and we then switch back to the main thread to update the UI, the app will crash. What can we do? In the previous article we talked about Disposable, which acts as a switch: calling its dispose() method cuts the water pipe, so the downstream no longer receives events and therefore no longer updates the UI. So we can keep this Disposable in the Activity and dispose of it when the Activity exits.
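The effect of dispose() can be sketched without Android. The following assumes RxJava 2 on a plain JVM and uses a PublishSubject as a stand-in for the network request so that events can be pushed by hand; DisposeDemo and run() are hypothetical names, and in a real app the dispose() call would sit in the Activity's onDestroy().

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class DisposeDemo {

    /** Returns the values that reached the downstream. */
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        // Stand-in for the upstream (an assumption of this sketch, not part of the article's code).
        PublishSubject<Integer> upstream = PublishSubject.create();

        // Keep the Disposable returned by subscribe(), as the Activity would.
        Disposable disposable = upstream.subscribe(received::add);

        upstream.onNext(1);   // delivered: the pipe is still connected
        disposable.dispose(); // what the Activity would do when it exits
        upstream.onNext(2);   // not delivered: the pipe has been cut

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the value emitted before dispose() reaches the downstream, so nothing would try to touch the UI afterwards.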

What if there is more than one Disposable? RxJava already has a built-in container, CompositeDisposable. Whenever we get a Disposable, we call CompositeDisposable.add() to put it into the container. When we exit, we call CompositeDisposable.clear() to cut off all the pipes at once.
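Collecting several subscriptions in one container might look like the sketch below. It assumes RxJava 2 on a plain JVM, with two PublishSubject instances standing in for two independent requests; CompositeDisposableDemo and run() are hypothetical names.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.subjects.PublishSubject;

public class CompositeDisposableDemo {

    /** Returns how many events each of the two downstreams received. */
    static int[] run() {
        CompositeDisposable disposables = new CompositeDisposable();
        PublishSubject<Integer> first = PublishSubject.create();
        PublishSubject<Integer> second = PublishSubject.create();
        int[] counts = new int[2];

        // Add every Disposable to the container as soon as we get it.
        disposables.add(first.subscribe(v -> counts[0]++));
        disposables.add(second.subscribe(v -> counts[1]++));

        first.onNext(1);
        second.onNext(1);

        disposables.clear(); // cuts off every pipe in the container

        first.onNext(2);     // ignored
        second.onNext(2);    // ignored
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + ", " + counts[1]);
    }
}
```

After clear() the container is empty and can accept new Disposables, whereas calling dispose() on the container itself also marks it as disposed, so anything added later is disposed immediately.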

OK, that's all for this installment. The next article in this series is about learning the RxJava operators together. See you then :)


Added by RickyF on Sat, 06 Jul 2019 23:36:13 +0300