ROS source code learning XIII. Message queue

2021SC@SDUSC

1, Preface

This installment analyzes the message queue mechanism of ROS, using the Java implementation (rosjava). As middleware, a message queue decouples applications, allows asynchronous message delivery, and smooths out traffic peaks. The message queue in ROS is built on the same idea in order to optimize parts of the data transmission path.

2, LazyMessage

public class LazyMessage<T> {

  private final ChannelBuffer buffer;
  private final MessageDeserializer<T> deserializer;
  private final Object mutex;

  private T message;


  public LazyMessage(ChannelBuffer buffer, MessageDeserializer<T> deserializer) {
    this.buffer = buffer;
    this.deserializer = deserializer;
    mutex = new Object();
  }

  @VisibleForTesting
  LazyMessage(T message) {
    this(null, null);
    this.message = message;
  }

  public T get() {
    synchronized (mutex) {
      if (message != null) {
        return message;
      }
      message = deserializer.deserialize(buffer);
    }
    return message;
  }
}

LazyMessage is the element type carried by the message queue. By deferring decoding until the message is actually read, it can save memory and CPU time.

LazyMessage has three final fields. buffer is a ChannelBuffer (a JBoss Netty class) that holds the message's byte stream. deserializer is a MessageDeserializer, the decoder that turns the buffer into a message the first time the message is accessed. mutex is a lock object used for mutually exclusive access. A fourth, non-final field, message, caches the decoded result.

The public constructor of LazyMessage takes two parameters, the byte stream buffer and the decoder. It stores them in the corresponding final fields and creates the mutex object. A second, package-private constructor marked @VisibleForTesting takes an already decoded object of type T. It passes null for both the buffer and the decoder and assigns the object directly to the message field, so get never needs to decode.

The get method is used to access the message data. It acquires the lock on mutex, enters the synchronized block, and checks whether message is null. If it is not null, it is returned directly. Otherwise the decoder decodes the byte stream in the buffer, the result is assigned to the message field, and it is returned. Later calls therefore reuse the cached object instead of decoding again.

A LazyMessage is thus temporary storage for a byte stream that also knows how to decode itself. If the message is never accessed, it is never decoded, which saves space and CPU time. This is where the name LazyMessage comes from.
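The decode-on-first-access idea can be illustrated with a minimal sketch that does not depend on rosjava or Netty. Here LazyValue and LazyValueDemo are hypothetical names, a byte[] stands in for ChannelBuffer, and a java.util.function.Function stands in for MessageDeserializer. The counter only exists to show how often the decoder runs.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for LazyMessage: decodes at most once, on first get().
class LazyValue<T> {
  private final byte[] raw;                  // stands in for ChannelBuffer
  private final Function<byte[], T> decoder; // stands in for MessageDeserializer
  private final Object mutex = new Object();
  private T value;                           // cached result, null until decoded

  LazyValue(byte[] raw, Function<byte[], T> decoder) {
    this.raw = raw;
    this.decoder = decoder;
  }

  T get() {
    synchronized (mutex) {
      if (value == null) {
        value = decoder.apply(raw);
      }
      return value;
    }
  }
}

class LazyValueDemo {
  // Returns how many times the decoder ran after `reads` calls to get().
  static int decodeCount(int reads) {
    AtomicInteger calls = new AtomicInteger();
    LazyValue<String> lazy = new LazyValue<>(
        "hello".getBytes(StandardCharsets.UTF_8),
        bytes -> {
          calls.incrementAndGet();
          return new String(bytes, StandardCharsets.UTF_8);
        });
    for (int i = 0; i < reads; i++) {
      lazy.get();
    }
    return calls.get();
  }

  public static void main(String[] args) {
    System.out.println(decodeCount(0)); // 0: never read, so never decoded
    System.out.println(decodeCount(3)); // 1: read three times, decoded once
  }
}
```

A message that is dropped from the queue before anyone reads it costs only the bytes it occupied, never a decode.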

3, MessageReceiver

public class MessageReceiver<T> extends AbstractNamedChannelHandler {

  private static final boolean DEBUG = false;
  private static final Log log = LogFactory.getLog(MessageReceiver.class);

  private final CircularBlockingDeque<LazyMessage<T>> lazyMessages;
  private final MessageDeserializer<T> deserializer;

  public MessageReceiver(CircularBlockingDeque<LazyMessage<T>> lazyMessages,
      MessageDeserializer<T> deserializer) {
    this.lazyMessages = lazyMessages;
    this.deserializer = deserializer;
  }

  @Override
  public String getName() {
    return "IncomingMessageQueueChannelHandler";
  }

  @Override
  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
    if (DEBUG) {
      log.info(String.format("Received %d byte message.", buffer.readableBytes()));
    }
  
    lazyMessages.addLast(new LazyMessage<T>(buffer.copy(), deserializer));
    super.messageReceived(ctx, e);
  }
}

MessageReceiver listens for incoming messages and caches them in the message queue as LazyMessage objects. It has two final instance fields: lazyMessages is a ring (circular) blocking queue, and deserializer is the decoder for the message format. The blocking queue is thread safe, so MessageReceiver does not need any explicit synchronized blocks of its own. The ring queue has a fixed size, so it reuses the space it has already allocated, which improves performance.

The constructor of MessageReceiver takes the ring queue and the decoder as parameters and uses them to initialize the two fields mentioned above.

The messageReceived method takes two parameters: the context ctx passed in by the message listener callback and the message event e. It first calls e.getMessage() and casts the result to ChannelBuffer. It then copies the buffer, constructs a LazyMessage from the copy and the decoder, and appends the LazyMessage to the ring queue with addLast. Finally it calls super.messageReceived to pass the event on. The ChannelBuffer is copied before being stored because Netty may reuse it. Without the copy, a queued message that has not been decoded yet could be overwritten by later data.
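The behavior expected from such a queue can be sketched as follows. This is not rosjava's CircularBlockingDeque; DropOldestDeque is a hypothetical class, and the sketch assumes that adding to a full queue discards the oldest element rather than blocking the producer, which the listing above does not show. Only the two operations used in this article, addLast and takeFirst, are modeled.

```java
import java.util.ArrayDeque;

// Hypothetical bounded deque: addLast never blocks, takeFirst blocks while empty.
class DropOldestDeque<T> {
  private final ArrayDeque<T> items = new ArrayDeque<>();
  private final int limit;

  DropOldestDeque(int limit) {
    if (limit < 1) {
      throw new IllegalArgumentException("limit must be positive");
    }
    this.limit = limit;
  }

  synchronized void addLast(T item) {
    if (items.size() == limit) {
      items.removeFirst(); // assumption: a full queue discards its oldest entry
    }
    items.addLast(item);
    notifyAll(); // wake any consumer waiting in takeFirst
  }

  synchronized T takeFirst() throws InterruptedException {
    while (items.isEmpty()) {
      wait(); // releases the monitor until a producer calls notifyAll
    }
    return items.removeFirst();
  }

  synchronized int size() {
    return items.size();
  }
}

class DropOldestDequeDemo {
  public static void main(String[] args) throws InterruptedException {
    DropOldestDeque<Integer> deque = new DropOldestDeque<>(2);
    deque.addLast(1);
    deque.addLast(2);
    deque.addLast(3); // the queue is full, so 1 is discarded
    System.out.println(deque.takeFirst()); // 2
    System.out.println(deque.takeFirst()); // 3
  }
}
```

With this policy a slow consumer loses old messages instead of stalling the network thread that produces them.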
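Why the copy matters can be shown with a small sketch. It uses java.nio.ByteBuffer as a stand-in for Netty's ChannelBuffer, and the reuse method is a hypothetical simulation of a transport overwriting its receive buffer with the next message. BufferCopyDemo and its helper methods are illustrative names only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferCopyDemo {
  // Reads the readable bytes of buf as text without moving its position.
  static String text(ByteBuffer buf) {
    ByteBuffer view = buf.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Copies the readable bytes into a new, independent buffer.
  static ByteBuffer copy(ByteBuffer buf) {
    ByteBuffer view = buf.duplicate();
    ByteBuffer out = ByteBuffer.allocate(view.remaining());
    out.put(view);
    out.flip();
    return out;
  }

  // Simulates the transport overwriting its buffer with the next message.
  static void reuse(ByteBuffer buf, String next) {
    buf.clear();
    buf.put(next.getBytes(StandardCharsets.UTF_8));
    buf.flip();
  }

  public static void main(String[] args) {
    ByteBuffer shared = ByteBuffer.allocate(16);
    reuse(shared, "first");
    ByteBuffer queuedReference = shared;  // no copy: same underlying bytes
    ByteBuffer queuedCopy = copy(shared); // independent snapshot
    reuse(shared, "second");
    System.out.println(text(queuedReference)); // second
    System.out.println(text(queuedCopy));      // first
  }
}
```

Because decoding is deferred, the gap between receiving the bytes and reading them can be long, which makes an independent copy all the more important.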

4, MessageDispatcher

public class MessageDispatcher<T> extends CancellableLoop {

  private static final boolean DEBUG = false;
  private static final Log log = LogFactory.getLog(MessageDispatcher.class);

  private final CircularBlockingDeque<LazyMessage<T>> lazyMessages;
  private final ListenerGroup<MessageListener<T>> messageListeners;


  private final Object mutex;

  private boolean latchMode;
  private LazyMessage<T> latchedMessage;

  public MessageDispatcher(CircularBlockingDeque<LazyMessage<T>> lazyMessages,
      ExecutorService executorService) {
    this.lazyMessages = lazyMessages;
    messageListeners = new ListenerGroup<MessageListener<T>>(executorService);
    mutex = new Object();
    latchMode = false;
  }


  public void addListener(MessageListener<T> messageListener, int limit) {
    if (DEBUG) {
      log.info("Adding listener.");
    }
    synchronized (mutex) {
      EventDispatcher<MessageListener<T>> eventDispatcher =
          messageListeners.add(messageListener, limit);
      if (latchMode && latchedMessage != null) {
        eventDispatcher.signal(newSignalRunnable(latchedMessage));
      }
    }
  }


  public boolean removeListener(MessageListener<T> messageListener) {
    if (DEBUG) {
      log.info("Removing listener.");
    }
    synchronized (mutex) {
      return messageListeners.remove(messageListener);
    }
  }


  public void removeAllListeners() {
    if (DEBUG) {
      log.info("Removing all listeners.");
    }
    synchronized (mutex) {
      messageListeners.shutdown();
    }
  }


  private SignalRunnable<MessageListener<T>> newSignalRunnable(final LazyMessage<T> lazyMessage) {
    return new SignalRunnable<MessageListener<T>>() {
      @Override
      public void run(MessageListener<T> messageListener) {
        messageListener.onNewMessage(lazyMessage.get());
      }
    };
  }


  public void setLatchMode(boolean enabled) {
    latchMode = enabled;
  }


  public boolean getLatchMode() {
    return latchMode;
  }

  @Override
  public void loop() throws InterruptedException {
    LazyMessage<T> lazyMessage = lazyMessages.takeFirst();
    synchronized (mutex) {
      latchedMessage = lazyMessage;
      if (DEBUG) {
        log.info("Dispatching message: " + latchedMessage.get());
      }
      messageListeners.signal(newSignalRunnable(latchedMessage));
    }
  }

  @Override
  protected void handleInterruptedException(InterruptedException e) {
    messageListeners.shutdown();
  }
}

MessageDispatcher distributes the LazyMessage objects cached in the message queue to the registered listeners.

MessageDispatcher has five instance fields. lazyMessages is the ring blocking queue that serves as the buffer, messageListeners is the group of listeners subscribed to the messages, and mutex is the lock object. latchMode is the flag that turns latching on or off, and latchedMessage is a reference to the most recently dispatched message.

The constructor of MessageDispatcher takes two parameters: lazyMessages is the message queue buffer, and executorService is a java.util.concurrent.ExecutorService (a thread pool) on which the listener tasks are executed. The constructor initializes the listener group with executorService, creates a new mutex object, and turns latch mode off.

The addListener method takes a listener object and an integer limit, which it passes on to the add method of messageListeners. Judging from that call, limit bounds the number of pending messages buffered for this listener rather than the number of listeners in the group. After acquiring the lock, the method enters the synchronized block and calls messageListeners.add, which returns an EventDispatcher for the new listener. If latch mode is on and a message has already been latched, that message is sent to the new subscriber through this EventDispatcher.

removeListener takes a reference to a listener object and removes it from the listener group, returning whether the removal succeeded. removeAllListeners acquires the mutex and then calls the shutdown method of the listener group, which shuts down all listeners.

The newSignalRunnable method takes a LazyMessage object and returns a SignalRunnable object. A SignalRunnable is not a thread but a callback: its run method receives a MessageListener and calls onNewMessage on it with lazyMessage.get(). The message is therefore decoded only when the first listener actually runs.

setLatchMode and getLatchMode set and query latch mode. When latch mode is enabled, the dispatcher sends the most recently dispatched message to each new subscriber when it joins. Note that loop updates latchedMessage whether or not latch mode is on. The flag only controls whether addListener replays it.

The loop method moves messages from the buffer queue to the listeners. It calls takeFirst on the buffer queue, which blocks until a message is available and returns the message at the head of the queue. After acquiring the lock it updates latchedMessage and then calls the signal method of messageListeners to send the message to every listener.

The handleInterruptedException method is the interruption callback. When an InterruptedException occurs, it shuts down all listeners.
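The effect of latching on a subscriber that joins late can be sketched in a simplified, synchronous form. LatchingDispatcher is a hypothetical class: it calls listeners directly instead of going through a ListenerGroup and an ExecutorService, and it uses java.util.function.Consumer in place of MessageListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, synchronous stand-in for the latching part of MessageDispatcher.
class LatchingDispatcher<T> {
  private final List<Consumer<T>> listeners = new ArrayList<>();
  private final Object mutex = new Object();
  private boolean latchMode;
  private T latched;

  void setLatchMode(boolean enabled) {
    synchronized (mutex) {
      latchMode = enabled;
    }
  }

  void addListener(Consumer<T> listener) {
    synchronized (mutex) {
      listeners.add(listener);
      if (latchMode && latched != null) {
        listener.accept(latched); // replay the last message to the newcomer
      }
    }
  }

  void dispatch(T message) {
    synchronized (mutex) {
      latched = message; // remembered whether or not latch mode is on
      for (Consumer<T> listener : listeners) {
        listener.accept(message);
      }
    }
  }
}

class LatchingDemo {
  // Returns what a listener that joins after two dispatches has received.
  static List<String> lateJoiner(boolean latchMode) {
    LatchingDispatcher<String> dispatcher = new LatchingDispatcher<>();
    dispatcher.setLatchMode(latchMode);
    dispatcher.dispatch("a");
    dispatcher.dispatch("b");
    List<String> received = new ArrayList<>();
    dispatcher.addListener(received::add);
    return received;
  }

  public static void main(String[] args) {
    System.out.println(lateJoiner(false)); // []
    System.out.println(lateJoiner(true));  // [b]
  }
}
```

Latching suits topics that publish rarely, such as a static map, where a new subscriber would otherwise wait a long time for its first message.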
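The take, dispatch, and shut-down-on-interrupt cycle can be sketched with standard library classes only. This is an illustration under assumptions, not the CancellableLoop implementation: DispatchLoopDemo, startLoop, and run are hypothetical names, a LinkedBlockingDeque replaces the ring queue, and the onInterrupted callback plays the role of handleInterruptedException.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DispatchLoopDemo {
  // Repeatedly takes the head of the queue and hands it to every listener.
  // An interrupt surfaces as InterruptedException once takeFirst() has to wait.
  static Thread startLoop(BlockingDeque<String> queue,
      List<Consumer<String>> listeners, Runnable onInterrupted) {
    Thread thread = new Thread(() -> {
      try {
        while (true) {
          String message = queue.takeFirst(); // blocks while the queue is empty
          for (Consumer<String> listener : listeners) {
            listener.accept(message);
          }
        }
      } catch (InterruptedException e) {
        onInterrupted.run(); // counterpart of handleInterruptedException
      }
    });
    thread.start();
    return thread;
  }

  // Sends two messages, waits for delivery, then interrupts the loop.
  static List<String> run() throws InterruptedException {
    BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    List<String> log = new CopyOnWriteArrayList<>();
    CountDownLatch delivered = new CountDownLatch(2);
    List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    listeners.add(message -> {
      log.add(message);
      delivered.countDown();
    });

    Thread loop = startLoop(queue, listeners, () -> log.add("shutdown"));
    queue.addLast("a");
    queue.addLast("b");
    delivered.await(5, TimeUnit.SECONDS);
    loop.interrupt();
    loop.join(5000);
    return log;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // [a, b, shutdown]
  }
}
```

Blocking in takeFirst means the loop consumes no CPU while the queue is empty, and interruption gives a clean way to stop it.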

Keywords: Middleware

Added by Bunkermaster on Mon, 20 Dec 2021 17:26:20 +0200