Java tips: using RxJava to create an observable data holder, RxLiveData

1. Problem scenario

In practice, we often need to share data between different objects and modules, and this data usually changes over time. This creates a problem: when the data changes, the objects or modules that depend on it are not informed and do not update in time. What we want is to notify them whenever the data changes, so that everything stays in sync. The first thing that comes to mind is the observer pattern with listener callbacks. Here is a less elegant implementation I have seen before:

import java.util.LinkedList;
import java.util.List;

class User {
    // ... JavaBean fields omitted
}

interface Listener {
    void onUserUpdated(User user);
}

class UserManager {
    private static UserManager manager = new UserManager();

    private UserManager() {
    }

    public static UserManager getInstance() {
        return manager;
    }

    private User user;
    private List<Listener> listeners = new LinkedList<>();

    public void addUserListener(Listener listener) {
        listeners.add(listener);
    }

    public void removeUserListener(Listener listener) {
        listeners.remove(listener);
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
        for (Listener listener : listeners) {
            listener.onUserUpdated(this.user);
        }
    }
}

This approach has the following disadvantages:

  1. It is not reusable (the listener plumbing must be reimplemented for every new piece of data);
  2. It increases the risk of memory leaks (whoever calls addUserListener may forget to call removeUserListener);
  3. It pollutes the setter (which now does more than set a value).
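
To make the first two points concrete, here is a minimal plain-JDK sketch (no RxJava) of a generic holder that removes the per-field boilerplate and returns an unsubscribe handle. The names ValueHolder, listen, get and set are hypothetical and chosen only for illustration; this is not the RxLiveData implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical generic holder: one class serves every data type. */
class ValueHolder<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private volatile T value;

    /** Registers a listener and returns a handle that removes it again. */
    Runnable listen(Consumer<T> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    T get() {
        return value;
    }

    /** Stores the new value and notifies every registered listener. */
    void set(T newValue) {
        value = newValue;
        for (Consumer<T> listener : listeners) {
            listener.accept(newValue);
        }
    }
}

class ValueHolderDemo {
    public static void main(String[] args) {
        ValueHolder<String> name = new ValueHolder<>();
        Runnable unsubscribe = name.listen(v -> System.out.println("name changed: " + v));
        name.set("Alice");   // the listener is called
        unsubscribe.run();   // the caller still has to remember this step
        name.set("Bob");     // the listener is no longer called
    }
}
```

Generics take care of reusability, but the leak risk remains: the caller must still keep the handle and run it at the right time.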

RxJava, the Observable and Flow APIs in the JDK, and LiveData on Android all offer ways to solve this kind of problem, but none of them is very convenient to use directly in day-to-day development. This article introduces RxLiveData, a small utility class I built on RxJava that makes observable data easier to work with (the complete code is at the end of the article).
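
For comparison, this is roughly what publishing and receiving a single value looks like with the JDK's own Flow API (java.util.concurrent.Flow, Java 9 or later). It is only a sketch of the ceremony involved; the class FlowDemo and the method publishOnce are hypothetical names used for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemo {
    /** Publishes one item and returns what the subscriber received (null on timeout). */
    static String publishOnce(String item) {
        CountDownLatch done = new CountDownLatch(1);
        String[] received = new String[1];
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Nothing is delivered until the subscriber requests items.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String value) {
                    received[0] = value;
                    done.countDown();
                }

                @Override
                public void onError(Throwable error) {
                    error.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            });
            publisher.submit(item); // delivered asynchronously on another thread
            try {
                done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(publishOnce("user changed"));
    }
}
```

Four callbacks, explicit demand and asynchronous delivery are a lot of machinery when all you want is "call me when the user changes".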

2. Usage example

Here is a short but complete example:

/* JavaBean data class used for testing */
class User {
    // ... JavaBean fields omitted
}

/* A singleton */
class UserManager {
    private static final UserManager manager = new UserManager();

    private UserManager() {
    }

    public static UserManager getInstance() {
        return manager;
    }

    private final RxLiveData<User> userData = new RxLiveData<>();

    public RxLiveData<User> getUserData() {
        return userData;
    }
}

class A {
    public void init() {
        // Subscribe to the observable so that we are called back whenever the data changes
        UserManager.getInstance().getUserData().getObservable()
                .subscribe((User user) -> { // lambda version
                    update(user); // called every time the user information changes
                });

        // The observable does not replay the current value to new subscribers,
        // so read it once explicitly (it is null if nothing has been posted yet)
        update(UserManager.getInstance().getUserData().getValue());
    }

    private void update(User user) {
        System.out.println("user changed");
    }
}

class B {
    public B() {
        // Method reference version
        UserManager.getInstance().getUserData().getObservable().subscribe(this::update);
    }

    private void update(User user) {
        System.out.println("user changed");
    }
}

public class Main {
    public static void main(String[] args) {
        A a = new A();
        a.init();
        B b = new B();
        // Update the data in UserManager; the update methods in A and B are then called
        UserManager.getInstance().getUserData().postData(new User());
    }
}

Here the data in UserManager is shared between objects of class A and class B. When the data in UserManager changes, A and B are notified and can react accordingly.

If you now want to add another piece of data to UserManager, for example a time value of type Long, you only need to add one field and one getter:

private final RxLiveData<Long> timeData = new RxLiveData<>();

public RxLiveData<Long> getTimeData() {
    return timeData;
}

In Android development, you can additionally use RxAndroid and RxLifecycle to control which thread the callback runs on and to cancel the subscription when the screen is destroyed, for example:

userManager.getUserData().getObservable()
        .compose(bindUntilEvent(ActivityEvent.DESTROY)) // unsubscribe when onDestroy is called
        .observeOn(AndroidSchedulers.mainThread()) // deliver callbacks on the main thread
        .subscribe(user -> {
            // update the UI here
        }, Throwable::printStackTrace);

3. Main methods

3.1 getObservable method

Method signature: public Observable<T> getObservable()

This method returns the RxJava Observable, which you then subscribe to in order to receive the data. All of RxJava's features remain available on it (for example stream-style operators, choosing threads, and lifecycle control).

3.2 postData method

Method signature: public void postData(T value)

This method updates the data. It replaces the value stored in the RxLiveData object and triggers the observers' callbacks through RxJava's ObservableEmitter.

Note: RxJava does not accept null items, so when the argument is null the implementation only stores it and does not trigger the observers' callbacks.

3.3 getValue method

Method signature: public T getValue()

This method only returns the data currently stored in the RxLiveData object; it does not subscribe to anything.

3.4 optValue method

Method signature: public Optional<T> optValue()

The Optional version of the getValue method: it returns an empty Optional when no value is stored.
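
As a small illustration of why the Optional variant can be handy, the sketch below maps the stored value and falls back to a default without an explicit null check. ValueBox is a hypothetical stand-in that mirrors only the value-storing part of RxLiveData (no RxJava involved), and OptValueDemo and displayName are likewise made up for this example.

```java
import java.util.Optional;

/** Hypothetical stand-in that mirrors only the value-storing part of RxLiveData. */
class ValueBox<T> {
    private T value;

    void postData(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    Optional<T> optValue() {
        return Optional.ofNullable(value);
    }
}

class OptValueDemo {
    /** Returns the stored name in upper case, or "guest" if nothing is stored. */
    static String displayName(ValueBox<String> box) {
        return box.optValue().map(String::toUpperCase).orElse("guest");
    }

    public static void main(String[] args) {
        ValueBox<String> name = new ValueBox<>();
        System.out.println(displayName(name)); // nothing posted yet
        name.postData("alice");
        System.out.println(displayName(name)); // a value is stored now
    }
}
```

With getValue the same logic would need an explicit null check before calling toUpperCase.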

4. Complete implementation

import io.reactivex.rxjava3.core.Observable; // For RxJava 2, drop the rxjava3 (and core) package segments, e.g. io.reactivex.Observable
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.Optional;

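// Note: this simple version does no synchronization of its own.
public class RxLiveData<T> {
    private final Observable<T> observable;
    private Disposable disposable; // connection handle of the shared source (stored, not used here)
    private T value;               // latest value, readable without subscribing
    private ObservableEmitter<T> emitter;

    public RxLiveData() {
        observable = Observable
                // Capture the emitter so that postData can push values later
                .create((ObservableEmitter<T> emitter) -> this.emitter = emitter)
                // Share one source among all subscribers (hot, no replay)
                .publish()
                // 0 required subscribers: connect immediately, so the emitter is set right away
                .autoConnect(0, disposable -> this.disposable = disposable);
    }

    public Observable<T> getObservable() {
        return observable;
    }

    public void postData(T value) {
        this.value = value;
        // RxJava does not accept null items, so null is stored but not emitted
        if (emitter != null && value != null) {
            emitter.onNext(value);
        }
    }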

    public T getValue() {
        return value;
    }

    public Optional<T> optValue() {
        return Optional.ofNullable(value);
    }
}

Keywords: Java, RxJava, LiveData

Added by God Ownz on Sun, 30 Jan 2022 10:59:04 +0200