Guava learning: the observer pattern

The observer pattern is a common behavioral design pattern. In Java's native implementation, the observer implements the Observer interface, and the observed object extends the Observable class.

Below is a simple implementation using the JDK API.

Observer code:

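import java.util.Observable;
import java.util.Observer;

public class MyObserver implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        // Only react to notifications that come from MyObservable
        if (o instanceof MyObservable) {
            System.out.println(arg);
        }
    }
}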

Observable (the observed side):

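import java.util.Observable;

public class MyObservable extends Observable {

    @Override
    public void notifyObservers(Object message) {
        // Mark the state as changed; otherwise notifyObservers delivers nothing
        super.setChanged();
        super.notifyObservers(message);
    }

}

The subject class that binds observers to the observable: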

import java.util.Observable;

public class Subject {

    private final Observable observable = new MyObservable();

    public void registerObserver(MyObserver observer) {
        observable.addObserver(observer);
    }

    public void removeObserver(MyObserver observer) {
        observable.deleteObserver(observer);
    }

    public void notifyObservers(String message) {
        observable.notifyObservers(message);
    }

}

Test code:

public static void main(String[] args) {
    Subject subject = new Subject();
    MyObserver observer = new MyObserver();
    subject.registerObserver(observer);
    subject.notifyObservers("hi, I am subject Observable");
}

With the JDK implementation, an observer that needs to process messages asynchronously has to manage threads itself, which couples business code with non-business code.
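
To make the coupling concrete, here is a minimal sketch of what an asynchronous observer tends to look like with the JDK API alone. The names AsyncJdkObserver and AsyncJdkObserverDemo, and the choice of a single-thread executor, are assumptions made for illustration; they are not part of the example project.

```java
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical observer: thread handling sits right next to the business logic.
@SuppressWarnings("deprecation") // Observer/Observable are deprecated since Java 9
class AsyncJdkObserver implements Observer {

    // Non-business concern: the observer owns and manages its own executor.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Records handled messages so the effect can be inspected.
    final List<String> handled = new CopyOnWriteArrayList<>();

    @Override
    public void update(Observable o, Object arg) {
        // Non-business concern: hand the work to another thread.
        executor.execute(() -> {
            // Business logic: process the message.
            handled.add("handled: " + arg);
        });
    }

    // Non-business concern: shut down and wait for pending work.
    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

@SuppressWarnings("deprecation")
class AsyncJdkObserverDemo {
    static List<String> run() {
        Observable observable = new Observable() {
            @Override
            public void notifyObservers(Object message) {
                setChanged(); // required, otherwise nothing is delivered
                super.notifyObservers(message);
            }
        };
        AsyncJdkObserver observer = new AsyncJdkObserver();
        observable.addObserver(observer);
        observable.notifyObservers("hi");
        observer.shutdownAndWait();
        return observer.handled;
    }
}
```

Only one line of AsyncJdkObserver is business logic; the rest deals with threads and shutdown.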

Guava wraps the observer pattern in its EventBus and makes asynchronous handling easy. Talk is cheap, so let's look at the code first:

Define two observers:

// Both observer classes need these imports
import com.google.common.eventbus.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AObserver {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Subscribe
    public void handleMessage(String msg) {
        logger.info("AObserver received message: {}", msg);
    }
}

public class BObserver {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Subscribe
    public void handleMessage(String msg) {
        logger.info("BObserver received message: {}", msg);
    }
}

EventBusUtil class:

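import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.MoreExecutors;

public class EventBusUtil {

    public static EventBus getEventBus() {
        return EventBusFactory.getAsyncInstance();
    }

    public static class EventBusFactory {
        // LocalThreadPoolExecutor is not a Guava class; it is assumed to be a helper in the example project
        private static final EventBus asyncEventBus =
                new AsyncEventBus(LocalThreadPoolExecutor.getExecutor());
        // directExecutor() runs each handler in the thread that posts the event
        private static final EventBus syncEventBus =
                new AsyncEventBus(MoreExecutors.directExecutor());

        public static EventBus getAsyncInstance() {
            return asyncEventBus;
        }

        public static EventBus getSyncInstance() {
            return syncEventBus;
        }

    }
}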

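Note: MoreExecutors.directExecutor() looks like a thread pool, but it is not one. It creates no threads: each task runs immediately in the thread that calls execute(), as the Javadoc in the Guava source explains. The "sync" bus above therefore handles events in the posting thread.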
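
This behaviour can be checked with a stand-in. The sketch below does not call Guava; it assumes that `Runnable::run` behaves like the executor returned by MoreExecutors.directExecutor(), in that it runs the task in the calling thread. The class name DirectExecutorSketch and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DirectExecutorSketch {

    // Stand-in for MoreExecutors.directExecutor(): the task runs in the caller's thread.
    static final Executor DIRECT = Runnable::run;

    // Returns true if the executor ran the task on the thread that called execute().
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        CompletableFuture<Thread> ranOn = new CompletableFuture<>();
        executor.execute(() -> ranOn.complete(Thread.currentThread()));
        return ranOn.join() == caller; // join() waits until the task has run
    }

    // Same check against a real single-thread pool, for contrast.
    static boolean poolRunsOnCallerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return runsOnCallerThread(pool);
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct stand-in the check returns true, while the single-thread pool runs the task on its own worker thread and the check returns false.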

Test code:

public class TestEventBus{

    public static void main(String[] args){
        EventBus eventBus = EventBusUtil.getEventBus();
        eventBus.register(new AObserver());
        eventBus.register(new BObserver());

        for (int j = 0; j < 2; j++) {
            eventBus.post("hi, observer" + j);
        }
    }
}

Let's look at the implementation in Guava:

1) Registration in EventBus: any object can be registered as an observer

  /**
   * Registers all subscriber methods on {@code object} to receive events.
   *
   * @param object object whose subscriber methods should be registered.
   */
  public void register(Object object) {
    subscribers.register(object);
  }

 

Every observer class marks its event-handling methods with the @Subscribe annotation. When an observer is registered, EventBus finds the annotated methods in its class and registers them, as shown in the code below.

The register method of SubscriberRegistry, which calls findAllSubscribers:

/** Registers all subscriber methods on the given listener object. */
  void register(Object listener) {
    // Find all methods annotated with @Subscribe
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) { // no subscribers registered for this event type yet
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }
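
The registration logic above can be imitated with the JDK alone. The following is a simplified sketch, not Guava's code: SketchSubscribe is a hypothetical annotation standing in for @Subscribe, SketchRegistry and SketchListener are made-up names, and the scan only covers methods declared directly on the listener's class, whereas Guava also walks the type hierarchy.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical marker annotation standing in for Guava's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchSubscribe {}

class SketchRegistry {

    // Event type -> handler methods registered for that type.
    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Method>> handlers =
            new ConcurrentHashMap<>();

    void register(Object listener) {
        for (Method method : listener.getClass().getDeclaredMethods()) {
            // Only single-argument methods carrying the marker annotation qualify.
            if (!method.isAnnotationPresent(SketchSubscribe.class)
                    || method.getParameterCount() != 1) {
                continue;
            }
            Class<?> eventType = method.getParameterTypes()[0];
            CopyOnWriteArraySet<Method> set = handlers.get(eventType);
            if (set == null) {
                // putIfAbsent keeps the set of whichever thread got there first.
                CopyOnWriteArraySet<Method> newSet = new CopyOnWriteArraySet<>();
                CopyOnWriteArraySet<Method> existing = handlers.putIfAbsent(eventType, newSet);
                set = (existing != null) ? existing : newSet;
            }
            set.add(method);
        }
    }

    int handlerCount(Class<?> eventType) {
        CopyOnWriteArraySet<Method> set = handlers.get(eventType);
        return set == null ? 0 : set.size();
    }
}

class SketchListener {
    @SketchSubscribe
    public void onText(String msg) {}

    @SketchSubscribe
    public void onNumber(Integer n) {}

    public void notAHandler(String msg) {} // no annotation, so it is ignored
}
```

Registering a SketchListener yields one handler for String and one for Integer. The key point is that the event type is taken from the handler's single parameter, which is why AObserver and BObserver above both receive String events.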

2) Notifications in EventBus

/**
   * Posts an event to all registered subscribers. This method will return successfully after the
   * event has been posted to all subscribers, and regardless of any exceptions thrown by
   * subscribers.
   *
   * <p>If no subscribers have been subscribed for {@code event}'s class, and {@code event} is not
   * already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.
   *
   * @param event event to post.
   */
  public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

As you can see from the code above, notifications are made through the dispatcher.dispatch method, whose code looks like the following:

@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

The code above shows that each event is paired with a subscriber in an EventWithSubscriber object and added to a concurrent queue. The entries are then polled off the queue one by one, and each subscriber is asked to process its event.
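
The enqueue-then-drain idea can be sketched with JDK classes only. This is an illustration under assumptions, not Guava's dispatcher: QueueDispatchSketch and Pending are hypothetical names, and a subscriber is modelled as a plain Consumer<Object>.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class QueueDispatchSketch {

    // Pairs one event with one subscriber, like the EventWithSubscriber holder.
    private static final class Pending {
        final Object event;
        final Consumer<Object> subscriber;

        Pending(Object event, Consumer<Object> subscriber) {
            this.event = event;
            this.subscriber = subscriber;
        }
    }

    private final Queue<Pending> queue = new ConcurrentLinkedQueue<>();

    void dispatch(Object event, Iterator<Consumer<Object>> subscribers) {
        // Step 1: enqueue one (event, subscriber) pair per subscriber.
        while (subscribers.hasNext()) {
            queue.add(new Pending(event, subscribers.next()));
        }
        // Step 2: drain the queue and let each subscriber handle its event.
        Pending p;
        while ((p = queue.poll()) != null) {
            p.subscriber.accept(p.event);
        }
    }

    // Dispatches one event to two recording subscribers and returns what they saw.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        List<Consumer<Object>> subs = new ArrayList<>();
        subs.add(e -> log.add("A:" + e));
        subs.add(e -> log.add("B:" + e));
        new QueueDispatchSketch().dispatch("hi", subs.iterator());
        return log;
    }
}
```

In Guava itself the subscriber does not run the handler directly; its dispatchEvent method hands the call to an executor: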

/** Dispatches {@code event} to this subscriber using the proper executor. */
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

The executor here is the one we passed in when constructing the EventBus. Finally, the subscriber method is invoked through Java reflection:

/**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }
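
The last two steps (hand the call to an executor, then invoke the handler reflectively) can be combined in a small JDK-only sketch. ReflectiveSubscriberSketch and GreetingHandler are hypothetical names, the handler method name "handle" is an assumption, and collecting failures in a list is a simplification of what the bus does with subscriber exceptions.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

class ReflectiveSubscriberSketch {
    private final Object target;
    private final Method method;
    private final Executor executor;

    // Exceptions thrown by the handler, collected instead of propagated.
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    ReflectiveSubscriberSketch(Object target, Method method, Executor executor) {
        this.target = target;
        this.method = method;
        this.executor = executor;
        method.setAccessible(true);
    }

    // Looks up a public handle(String) method and runs it in the calling thread.
    static ReflectiveSubscriberSketch forHandler(Object target) {
        try {
            Method m = target.getClass().getMethod("handle", String.class);
            return new ReflectiveSubscriberSketch(target, m, Runnable::run);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no handle(String) method", e);
        }
    }

    void dispatchEvent(Object event) {
        executor.execute(() -> {
            try {
                // Reflection: call the handler method on the target object.
                method.invoke(target, event);
            } catch (InvocationTargetException e) {
                // The handler itself threw; record the real cause.
                failures.add(e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Method not accessible: " + method, e);
            }
        });
    }
}

class GreetingHandler {
    final List<String> seen = new CopyOnWriteArrayList<>();

    public void handle(String msg) {
        if (msg.isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
        seen.add(msg);
    }
}
```

A failing handler does not break the caller: the exception arrives wrapped in an InvocationTargetException, and only its cause is recorded. Swapping `Runnable::run` for a thread pool makes the same call asynchronous without touching the handler.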

That concludes the code analysis. For more of the Guava source, see:

https://github.com/google/guava

The example code for this article is here:

https://github.com/jinjunzhu/myguava.git

 


Keywords: Java github Google git

Added by manitoon on Tue, 17 Mar 2020 04:11:21 +0200