ActiveMQ coding implementation

1. ActiveMQ coding implementation

Create a new Maven project. The relevant part of its pom.xml is shown below. The JDK version is 1.8, and the rest of the configuration is generated automatically.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>
</dependencies>

2. Producer

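import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JmsProduce {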

    public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[]args) throws JMSException {
        //1. Create connection factory
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Obtain a connection from the connection factory and start it
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3. Create session
        //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create a destination (specifically queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5. Create the producer of the message
        MessageProducer messageProducer = session.createProducer(queue);
        //6. Use message producer to produce 3 messages and send them to the queue
        for (int i = 0; i < 3; i++) {
            // 7. Create message
            TextMessage textMessage = session.createTextMessage("Msg---" + i);
            //8. Message producer sends to mq
            messageProducer.send(textMessage);
        }
        //9. Close the resources in the reverse order of their creation
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println("**** Message to MQ complete ****");
    }

}

Run the main method. The result is shown in the figure.

Web console page

Number Of Pending Messages: the number of messages still in the queue, waiting to be consumed. Formula: pending = total enqueued - total dequeued.

Number Of Consumers: the number of consumers currently attached to the queue.

Messages Enqueued: the total number of messages that have entered the queue, including those that have already left it. This number only increases; it never decreases.

Messages Dequeued: the number of messages that have left the queue, that is, the number consumed by consumers.

The code ran successfully and sent 3 messages to the queue queue01.
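As a rough illustration of the pending formula, the sketch below models the console counters as plain numbers. QueueCounters is a hypothetical helper, not an ActiveMQ class, and the broker's real bookkeeping may be more involved, so treat this as a simplification.

```java
// Hypothetical helper, NOT part of ActiveMQ: models the console counters as plain numbers.
class QueueCounters {
    private long enqueued; // total messages that have entered the queue (never decreases)
    private long dequeued; // total messages consumed so far

    void onEnqueue() {
        enqueued++;
    }

    void onDequeue() {
        if (dequeued >= enqueued) {
            throw new IllegalStateException("nothing left to dequeue");
        }
        dequeued++;
    }

    long enqueued() {
        return enqueued;
    }

    long dequeued() {
        return dequeued;
    }

    // pending = total enqueued - total dequeued
    long pending() {
        return enqueued - dequeued;
    }

    public static void main(String[] args) {
        QueueCounters counters = new QueueCounters();
        for (int i = 0; i < 3; i++) {
            counters.onEnqueue(); // the producer sent 3 messages
        }
        System.out.println("enqueued=" + counters.enqueued()
                + " dequeued=" + counters.dequeued()
                + " pending=" + counters.pending());
    }
}
```

With three messages sent and none consumed, the three counters correspond to the state of queue01 right after the producer has run.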

3. Consumer: receive mode 1, receive()

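import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JmsConsumer {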

    public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[]args) throws JMSException {
        //1. Create connection factory
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Obtain a connection from the connection factory and start it
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3. Create session
        //Two parameters: the first is whether the session is transacted, the second is the acknowledgement mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create a destination (queue or Topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5. Create the message consumer
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true) {
            //receive() with no argument blocks until a message arrives
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (null != textMessage) {
                System.out.println("***Consumer received message:" + textMessage.getText());
            } else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }

}

Note that the red stop button in the IDE stays lit, which means the program is still running. This is because the receive() method blocks while it waits for messages. Even after the three messages have been consumed, it keeps waiting and the program does not terminate.


 

The web console shows one consumer, which has consumed the three messages.

 

Click the red stop button to end the program.

After the program ends, look at the web console again. The consumer count in the red box drops from 1 to 0: the program is over, so the consumer is gone.

When receive() is called with the argument 4000, that is receive(4000), the consumer waits at most 4 seconds. If no message arrives within that time, receive() returns null, the consumer leaves, and the program ends.
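The timeout behaviour can be tried without a broker. The sketch below is only an analogy: a JDK BlockingQueue stands in for queue01, and poll(timeout) plays the role of receive(timeout), returning null when nothing arrives in time. TimedReceiveSketch and drain are hypothetical names, and none of this is the JMS API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a JDK BlockingQueue stands in for the broker queue.
class TimedReceiveSketch {

    // Consumes messages until none arrives within timeoutMillis, then returns the count.
    static int drain(BlockingQueue<String> queue, long timeoutMillis) {
        int consumed = 0;
        while (true) {
            String text;
            try {
                // Like receive(timeout): waits up to the timeout, null if nothing arrived
                text = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                break;
            }
            if (text == null) {
                break; // timed out: the consumer leaves
            }
            System.out.println("***Consumer received message:" + text);
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            queue.add("Msg---" + i);
        }
        int consumed = drain(queue, 4000); // waits 4 seconds after the last message
        System.out.println("consumed " + consumed + " messages, then timed out");
    }
}
```

The loop has the same shape as the consumer above. The only difference is that the null branch is actually reached once the timeout expires, so the program can end on its own.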

Summary

/**
 * Mode 1
 * Synchronous blocking mode: receive()
 * The subscriber or receiver calls the MessageConsumer's receive() method to receive a message.
 * receive() blocks until a message arrives (or the timeout expires).
 **/

4. Consumer: receive mode 2, MessageListener

        //Register a listener (this replaces the receive() loop of mode 1)
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****Consumer received message:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //Keep the main thread alive so the listener can consume messages as they arrive.
        //System.in.read() throws IOException, so main must declare it in addition to JMSException.
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();

Summary

/**
 * Mode 2
 * Consume messages through a listener registered on MessageConsumer messageConsumer = session.createConsumer(queue);
 * Asynchronous non-blocking mode (listener onMessage())
 * The subscriber or receiver registers a message listener through the MessageConsumer's setMessageListener(MessageListener listener) method.
 * When a message arrives, the system automatically calls the listener's onMessage(Message message) method.
 */
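To make the difference from mode 1 concrete, here is a broker-free sketch of the callback pattern. It is an analogy under stated assumptions: TextListener is a hypothetical stand-in for MessageListener, a JDK BlockingQueue stands in for the queue, and a background thread plays the part of the dispatching that the JMS provider would do. A CountDownLatch is used in place of System.in.read() so the example ends by itself.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: none of these types belong to JMS or ActiveMQ.
class ListenerSketch {

    // Hypothetical callback, playing the role of MessageListener.onMessage.
    interface TextListener {
        void onMessage(String text);
    }

    // Starts a daemon thread that hands each queued message to the listener.
    static Thread startDispatcher(BlockingQueue<String> queue, TextListener listener) {
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    listener.onMessage(queue.take()); // the blocking happens here, not in the caller
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop dispatching when interrupted
            }
        }, "dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
        return dispatcher;
    }

    // Registers a listener, sends count messages, and returns what the listener saw.
    static List<String> deliver(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);

        Thread dispatcher = startDispatcher(queue, text -> {
            received.add(text);
            done.countDown();
        });

        for (int i = 0; i < count; i++) {
            queue.add("Msg---" + i);
        }
        try {
            done.await(5, TimeUnit.SECONDS); // wait for the callbacks instead of System.in.read()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.interrupt();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver(3));
    }
}
```

The thread that registers the listener never calls a blocking receive itself. It only has to stay alive long enough for the callbacks to run, which is the job System.in.read() does in the consumer above.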

 

Keywords: Session Junit Maven xml

Added by IcedEarth on Wed, 24 Jun 2020 08:20:44 +0300