ActiveMQ-05 of Message Oriented Middleware

Advanced use

Queue browser

A QueueBrowser lets you look at the messages in a queue without consuming them. It only reads what is currently on the queue; there is no subscription mechanism.

package com.zjw.activemq.browser;

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * @author zjwblog <co.zjwblog@gmail.com>
 * @version 1.0
 * @date 2021/12/16 12:04 AM
 */
public class BrowserQueue {
  private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
  private static final String USERNAME = null;
  private static final String PASSWORD = null;

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
    Connection conn = factory.createConnection();
    conn.start();
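    // Browsing does not need a transaction, so a plain auto-acknowledge session is enough
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    QueueBrowser browser = session.createBrowser(new ActiveMQQueue("test"));

    // The enumeration walks the messages currently on the queue without removing them
    Enumeration<?> enumeration = browser.getEnumeration();

    while (enumeration.hasMoreElements()) {
      Object element = enumeration.nextElement();
      // Only text messages carry a String body; skip other message types
      if (element instanceof TextMessage) {
        System.out.println("textMessage:" + ((TextMessage) element).getText());
      }
    }

    browser.close();
    conn.close();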
  }
}

JMSCorrelationID

JMSCorrelationID links related messages to each other, for example a reply to the request it answers, so that an exchange reads like a conversation.

http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

Example

package com.zjw.activemq.correlation;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author zjwblog <co.zjwblog@gmail.com>
 * @version 1.0
 * @date 2021/12/16 12:20 AM
 */
public class CorrelationIDQueueReceiver {

  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        ActiveMQConnectionFactory.DEFAULT_USER,
        ActiveMQConnectionFactory.DEFAULT_PASSWORD,
        "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"
    );
    Connection conn = factory.createConnection();
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("test");

    MessageConsumer consumer = session.createConsumer(queue);

    consumer.setMessageListener(message -> {
      if(message instanceof TextMessage) {
        TextMessage msg = (TextMessage) message;
        try {
          System.out.println(msg.getJMSCorrelationID());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    });
  }


}


package com.zjw.activemq.correlation;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author zjwblog <co.zjwblog@gmail.com>
 * @version 1.0
 * @date 2021/12/16 12:13 AM
 */
public class CorrelationIDQueueSender {

  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        ActiveMQConnectionFactory.DEFAULT_USER,
        ActiveMQConnectionFactory.DEFAULT_PASSWORD,
        "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"
    );
    Connection conn = factory.createConnection();
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Queue queue = session.createQueue("test");

    MessageProducer producer = session.createProducer(queue);

    Message message = session.createTextMessage("Message from ServerA" );
    message.setJMSCorrelationID("movie");
    producer.send(message);

    // Close the connection so the client threads stop and the JVM can exit
    conn.close();
  }
}
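
The sender above stamps a correlation ID on the message and the receiver prints it, but neither class shows how a requester would use the ID to pair a reply with its request. The following is a minimal, broker-free sketch of that bookkeeping in plain Java. It assumes the reply copies the correlation ID of the request it answers; ReplyCorrelator and its methods are hypothetical names for illustration, not part of JMS or ActiveMQ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: pairs incoming replies with pending requests by correlation ID.
final class ReplyCorrelator {
  // Requests that are still waiting for a reply, keyed by correlation ID
  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  // Registers a request under the given correlation ID (the value that would be
  // passed to message.setJMSCorrelationID) and returns a future for its reply.
  CompletableFuture<String> expect(String correlationId) {
    CompletableFuture<String> reply = new CompletableFuture<>();
    pending.put(correlationId, reply);
    return reply;
  }

  // Called for every reply; returns false when no request is waiting for this ID
  boolean onReply(String correlationId, String body) {
    CompletableFuture<String> waiter = pending.remove(correlationId);
    if (waiter == null) {
      return false;
    }
    waiter.complete(body);
    return true;
  }

  public static void main(String[] args) {
    ReplyCorrelator correlator = new ReplyCorrelator();
    CompletableFuture<String> movieReply = correlator.expect("movie");
    CompletableFuture<String> musicReply = correlator.expect("music");

    // Replies may arrive in any order; the ID decides which request they belong to
    correlator.onReply("music", "reply about music");
    correlator.onReply("movie", "reply about movies");

    System.out.println(movieReply.join());
    System.out.println(musicReply.join());
  }
}
```

In a real client the onReply call would sit inside a MessageListener and take its first argument from getJMSCorrelationID().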

JMSReplyTo

JMSReplyTo names the destination on which the sender wants to receive replies, such as a confirmation that the message was consumed. This makes the exchange resemble a synchronous call.

Set replyTo on the message before sending it:

message.setJMSReplyTo(queue);

The receiver reads replyTo from the incoming message and sends its confirmation to that destination:

Destination replyTo = message.getJMSReplyTo();

QueueRequestor, described next, is built on replyTo. The sending client creates a temporary queue, waits for messages on it, and sets replyTo to that queue. The receiver replies to that address, which gives the effect of a synchronous call.

QueueRequestor synchronous messages

QueueRequestor sends messages synchronously, which runs against the asynchronous communication principle of MQ, although the broker still provides application decoupling and interoperability between heterogeneous systems. After sending a message, QueueRequestor waits for the receiver's reply. If no reply ever arrives, the caller blocks indefinitely and appears to hang. The request method also has no timeout parameter.

Receiver example:

package com.zjw.activemq.sync;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

/**
 * @author zjwblog <co.zjwblog@gmail.com>
 * @version 1.0
 * @date 2021/12/15 9:13 AM
 */
public class QueueRequestorReceiver {
  private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
  private static final String USERNAME = null;
  private static final String PASSWORD = null;
  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);

    Connection conn = factory.createConnection();
    conn.start();

    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("test"));

    consumer.setMessageListener(message -> {
      System.out.println("Received a message; sending back a confirmation.");
      try {
        Destination replyTo = message.getJMSReplyTo();
        System.out.println("replyTo:" + replyTo);
        if (replyTo == null) {
          // The sender did not ask for a reply, so there is nowhere to send one
          return;
        }
        MessageProducer producer = session.createProducer(replyTo);
        producer.send(session.createTextMessage(replyTo.toString()));
        // Release the per-reply producer so producers do not accumulate
        producer.close();
      } catch (JMSException e) {
        e.printStackTrace();
      }
    });

  }
}

Sender example:

package com.zjw.activemq.sync;

import javax.jms.Queue;
import javax.jms.QueueRequestor;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author zjwblog <co.zjwblog@gmail.com>
 * @version 1.0
 * @date 2021/12/15 9:12 AM
 *
 * @description The sender blocks and does not return until the receiver sends a response, which is equivalent to a synchronous request
 */
public class QueueRequestorSender {
  private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
  private static final String USERNAME = null;
  private static final String PASSWORD = null;
  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
    ActiveMQConnection conn = (ActiveMQConnection)factory.createConnection();
    // start() is required here because the reply has to be received; it could be omitted only if the client just sent messages
    conn.start();

    QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    Queue queue = session.createQueue("test");

    QueueRequestor requestor = new QueueRequestor(session, queue);

    TextMessage message = session.createTextMessage("Hello");
    System.out.println("Sending synchronous request");
    TextMessage respMsg = (TextMessage)requestor.request(message);
    System.out.println("Response received");
    System.out.println("Response message content: [" + respMsg.getText() +"]");

    // Release the requestor (and its session) and the connection so the JVM can exit
    requestor.close();
    conn.close();
  }
}
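
Because QueueRequestor waits without a time limit, a common workaround is to build the request/reply exchange by hand and bound the wait. The sketch below models that idea without a broker: an in-memory BlockingQueue stands in for each JMS queue, and a fresh reply queue per request plays the role of the temporary queue. TimedRequestor and Request are hypothetical names. With the real API the bounded wait would presumably be a MessageConsumer.receive(long timeout) call on a consumer of the temporary queue, which likewise returns null when the time runs out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Broker-free model of request/reply with a bounded wait. All names are hypothetical.
final class TimedRequestor {
  // A request carries its body and the queue the reply should go to (the JMSReplyTo role)
  static final class Request {
    final String body;
    final BlockingQueue<String> replyTo;

    Request(String body, BlockingQueue<String> replyTo) {
      this.body = body;
      this.replyTo = replyTo;
    }
  }

  private final BlockingQueue<Request> requestQueue;

  TimedRequestor(BlockingQueue<Request> requestQueue) {
    this.requestQueue = requestQueue;
  }

  // Sends a request and waits at most timeoutMillis for the reply.
  // Returns null when no reply arrives in time or the wait is interrupted.
  String request(String body, long timeoutMillis) {
    BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
    requestQueue.offer(new Request(body, replyQueue));
    try {
      return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    TimedRequestor requestor = new TimedRequestor(queue);

    // Receiver thread: answers exactly one request, then stops
    Thread receiver = new Thread(() -> {
      try {
        Request request = queue.take();
        request.replyTo.put("ack:" + request.body);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    receiver.setDaemon(true);
    receiver.start();

    // The first request is answered by the receiver thread
    System.out.println(requestor.request("Hello", 1000));
    // Nobody is left to answer the second one, so the caller gets null after the timeout
    System.out.println(requestor.request("Anyone?", 200));
  }
}
```

Returning null on timeout keeps the caller in control: it can retry, log, or fail fast instead of hanging.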

Several factors affecting performance in a production environment

OOM

Configure the JVM heap in the ActiveMQ startup script:

ACTIVEMQ_OPTS="-Xms1G -Xmx1G"

Then adjust the memory percentage in the broker configuration file:

<memoryUsage percentOfJvmHeap="70" />

The systemUsage configuration caps how much memory and disk space the broker may use. When usage exceeds these limits, ActiveMQ slows producers down (producer flow control), so these settings matter a great deal.

Persistent and non-persistent

Asynchronous message sending

Keeping the default is recommended. If asynchronous sending is forced on, messages may be lost.

Messages are lost in the following scenario: the producer sets useAsyncSend=true and keeps sending with producer.send(msg). Because send no longer blocks, the producer assumes that every message it sent has reached the broker. If the server suddenly goes down, every message still held in the producer's memory and not yet delivered to the broker is lost.

Asynchronous sending can be enabled in any of the following ways:

new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);
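
The loss scenario comes down to the gap between "handed to send()" and "confirmed by the broker". As an illustration only, the plain-Java sketch below tracks that gap so a producer can tell which messages are still at risk. PendingSendTracker and its method names are hypothetical, and the sketch assumes the client has some way to learn about broker confirmations (for example a completion callback); it does not show how ActiveMQ implements asynchronous sending.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical bookkeeping for asynchronous sends; not an ActiveMQ class.
final class PendingSendTracker {
  // Messages handed to the transport but not yet confirmed by the broker, in send order
  private final Map<Long, String> unconfirmed = new LinkedHashMap<>();
  private long nextId = 1;

  // Records the message as in flight and returns the ID used to confirm it later
  synchronized long send(String body) {
    long id = nextId++;
    unconfirmed.put(id, body);
    return id;
  }

  // Called when the broker confirms that it has accepted the message
  synchronized void onBrokerConfirm(long id) {
    unconfirmed.remove(id);
  }

  // Messages that would be lost if the producer or connection died right now
  synchronized List<String> atRisk() {
    return new ArrayList<>(unconfirmed.values());
  }

  public static void main(String[] args) {
    PendingSendTracker tracker = new PendingSendTracker();
    long first = tracker.send("order-1");
    long second = tracker.send("order-2");
    tracker.send("order-3");

    // Only the first two sends were confirmed before the simulated failure
    tracker.onBrokerConfirm(first);
    tracker.onBrokerConfirm(second);

    System.out.println("At risk: " + tracker.atRisk());
  }
}
```

With synchronous sending the at-risk set never holds more than one message per producer, which is the safety that asynchronous sending trades for throughput.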

Batch confirmation

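ActiveMQ can acknowledge (confirm) messages in batches through the optimizeAcknowledge option, which cuts acknowledgement traffic and can improve throughput. The ActiveMQ 5.x connection options list it as disabled unless set, so check the default for the client version you use.

It can be turned off explicitly in any of the following ways: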

new ActiveMQConnectionFactory("tcp://localhost:61616?jms.optimizeAcknowledge=false");
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);
((ActiveMQConnection)connection).setOptimizeAcknowledge(false);
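
To see why batching helps, it is enough to count how many acknowledgement commands travel to the broker. The following plain-Java model does that. BatchAcker is a hypothetical class and the batch size is an arbitrary illustration value; the real client chooses when to flush by its own rules, which this sketch does not try to reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of batched acknowledgement; not ActiveMQ code.
final class BatchAcker {
  private final int batchSize;
  private final List<Integer> buffered = new ArrayList<>();
  private int ackCommandsSent = 0;

  BatchAcker(int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1");
    }
    this.batchSize = batchSize;
  }

  // Records a consumed message and sends one ack command once the batch is full
  void consumed(int messageId) {
    buffered.add(messageId);
    if (buffered.size() >= batchSize) {
      flush();
    }
  }

  // Sends a single ack command covering everything buffered so far
  void flush() {
    if (!buffered.isEmpty()) {
      ackCommandsSent++;
      buffered.clear();
    }
  }

  int ackCommandsSent() {
    return ackCommandsSent;
  }

  // Consumes messageCount messages and returns how many ack commands went to the broker
  static int ackCommandsFor(int messageCount, int batchSize) {
    BatchAcker acker = new BatchAcker(batchSize);
    for (int id = 0; id < messageCount; id++) {
      acker.consumed(id);
    }
    acker.flush(); // acknowledge the incomplete final batch
    return acker.ackCommandsSent();
  }

  public static void main(String[] args) {
    System.out.println("one by one: " + ackCommandsFor(10, 1));   // 10 ack commands
    System.out.println("batches of 4: " + ackCommandsFor(10, 4)); // 3 ack commands
  }
}
```

The cost of the saving is that messages sitting in the buffer are consumed but not yet acknowledged, so they may be delivered again if the client fails before the flush.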

Consumer buffer and message backlog: prefetchSize

On the consumer side, faster consumption is generally better, and the smaller the backlog on the broker, the better.

With transactions and client acknowledgement, however, a consumer that fetches many messages at once without acknowledging them makes the transaction context larger and leaves more messages in a "half-consumed" state on the broker. ActiveMQ therefore has a prefetchSize parameter that limits how many messages a consumer may receive ahead of acknowledging them.

Prefetch defaults

consumer type     default value
queue             1000
queue browser     500
topic             32766
durable topic     100

prefetchSize can be set in three ways

For all consumers, when creating the connection

String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.all=50";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);

Separately for queues and topics, when creating the connection

String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);

Per destination

Destination topic = session.createTopic("test?consumer.prefetchSize=10");

Note: a prefetchSize set on a destination overrides the value set on the connection.

Is the message pushed or pulled?

When a message is sent, the producer pushes it to the broker.

When a message is received:

  • By default, the broker pushes messages to the consumer one by one

  • The broker stops pushing once the consumer's prefetchSize is full

  • When the consumer's prefetchSize == 0, the consumer pulls messages
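
The three delivery rules above can be sketched as a small plain-Java model with one consumer. PrefetchModel is a hypothetical class that only mirrors the rules as stated here (push until the window is full, free a slot on acknowledgement, pull when the limit is 0); it is not how the broker is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of push dispatch bounded by a prefetch limit; not ActiveMQ code.
final class PrefetchModel {
  private final Deque<String> brokerQueue = new ArrayDeque<>();
  private final int prefetchSize;
  private int unacknowledged = 0; // pushed to the consumer but not yet acknowledged

  PrefetchModel(int prefetchSize) {
    this.prefetchSize = prefetchSize;
  }

  // Producer side: the message is always pushed to the broker
  void publish(String message) {
    brokerQueue.addLast(message);
  }

  // Broker side: push messages until the consumer's prefetch window is full.
  // With prefetchSize == 0 nothing is pushed; the consumer has to pull.
  int push() {
    int pushed = 0;
    while (!brokerQueue.isEmpty() && unacknowledged < prefetchSize) {
      brokerQueue.removeFirst();
      unacknowledged++;
      pushed++;
    }
    return pushed;
  }

  // Consumer side: acknowledging a message frees one slot in the window
  void acknowledge() {
    if (unacknowledged > 0) {
      unacknowledged--;
    }
  }

  // Consumer side when prefetchSize == 0: ask the broker for one message (null if none)
  String pull() {
    return brokerQueue.pollFirst();
  }

  // Messages still waiting on the broker
  int backlog() {
    return brokerQueue.size();
  }

  public static void main(String[] args) {
    PrefetchModel model = new PrefetchModel(2);
    for (int i = 1; i <= 5; i++) {
      model.publish("msg-" + i);
    }
    System.out.println("pushed: " + model.push() + ", backlog: " + model.backlog());
    model.acknowledge();
    System.out.println("pushed after one ack: " + model.push() + ", backlog: " + model.backlog());
  }
}
```

A small prefetchSize keeps more of the backlog on the broker, where other consumers can still take it; a large one moves it into a single consumer's buffer.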

Keywords: Java ActiveMQ

Added by jay_bo on Wed, 15 Dec 2021 19:00:53 +0200