1. ActiveMq coding implementation
Create a new maven project corresponding to pom.xml Some contents of the file are as follows. jdk is 1.8 and other configurations are generated automatically.
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> </dependencies>
2. Producers
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[]args) throws JMSException { //1. Create connection factory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Through the connection factory, obtain the connection and start the access Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3. Create session //Two eucalyptus trees, the first is business, the second is sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create a destination (specifically queue or topic) Queue queue = session.createQueue(QUEUE_NAME); //5. Create the producer of the message MessageProducer messageProducer = session.createProducer(queue); //6. Use message producer to produce 3 messages and send them to the queue for (int i = 0; i <3 ; i++) { // 7. Create message TextMessage textMessage = session.createTextMessage("Msg---" + i); //8. Message producer sends to mq messageProducer.send(textMessage); } //9. Turn off the resources and turn them on backward messageProducer.close(); session.close(); connection.close(); System.out.println("**** Message to MQ complete ****"); } }
Execute the main method, as shown in the figure
Front page
Number of Pending Message the number of messages waiting to be consumed that are not currently queued. Formula = total received - Total outgoing
Number of Consumer data quantity of consumers
Messages Enqueued the total number of messages in the queue, including those out of the queue. This number only increases or does not decrease
Messages Dequeued messages means the number of messages consumed by consumers
Code execution successfully sent 3 messages to mq queue queue01
3 consumer receive() this is the first way
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://10.5.96.48:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[]args) throws JMSException { //1. Create connection factory ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Through the connection factory, obtain the connection and start the access Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3. Create session //Two parameters, the first is transaction and the second is sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create a destination (queue or Topic) Queue queue = session.createQueue(QUEUE_NAME); //Create consumers MessageConsumer messageConsumer = session.createConsumer(queue); while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(); if( null !=textMessage ){ System.out.println("***Consumer receives news:" + textMessage.getText()); }else{ break; } } messageConsumer.close(); session.close(); connection.close(); }
Note that the red running light in the background is always on, indicating that the program has been running. This is that the receive() method has been waiting for messages. Even after three messages have been eliminated, the wait is still terminated.
The front end shows that there is a consumer consuming three messages
When you click the "red" button at the end of operation
At the end of the program, look at the changes in the front desk. One of the consumers in the red box becomes 0. The program is over, and the consumers are gone
When the receive() method is passed into the parameter 4000, it means that the message can not be received in 4 seconds, the consumer leaves and the program is finished
Summary
/** *Mode 1 *Synchronous blocking mode receive() *The subscriber or receiver calls the MessageConsumer's receive() method to receive the message. The receive method can receive the message *Blocking until message (or timeout) **/
4 receive mode 2
//Add listener messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("****Consumer received message:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); //Ensure that the console runs all the time, consume when there is a message, monitor when there is no message messageConsumer.close(); session.close(); connection.close();
Summary
/**Mode 2 *Consume message messageconsumer messageconsumer through listener= session.createConsumer (); *Asynchronous non blocking mode (listener onMessage()) *The subscriber or receiver registers a message listener through the setMessageListener(MessageListener listener) of MessageConsumer *When the message arrives, the system automatically calls the onMessage(Message Message) method of the listener */