Code usage of ActiveMQ

Use by code

1. Point to point mode
Producer code
import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author Producer
 */
public class Producer {

    //Default connection user name
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //Default connection password
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //Default connection address
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // Default connection user name
        String USERNAME = ActiveMQConnection.DEFAULT_USER;
        // Default connection password
        String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        // Default connection address
        String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
        // When it's not the default address, it needs to be written like this
        // String BROKER_URL = "failover://tcp://120.79.150.160:61616";
        // 1. Create a connection factory object
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // The second way
        // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        try {
            // 2. Use the factory object to create a Connection object
            Connection connection = connectionFactory.createConnection();
            // 3. Open the Connection and call the start method of the Connection object
            connection.start();
            // 4. Create a Session object
            // The first parameter: whether to start a transaction. If the transaction is true, two parameters are meaningless. Generally, the transaction is not false
            // The second parameter: answer mode. General automatic response or manual response. General automatic response
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Use the Session object to create a Destination object. There are two forms of queue,topic. Now you should use queue
            Destination destination = session.createQueue("MyFirstQueueTest");
            // 6. Create a Producer object using the Session object
            MessageProducer producer = session.createProducer(destination);
            // The setting is not persistent. Learn here. It can be determined according to the project. It can not be set
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 7. Create a Message object and use Message
            TextMessage message = session.createTextMessage("woshi ceshi xiaoxi");
            // Message message = session.createTextMessage("woshi ceshi xiaoxi");
            // 8. Send message
            producer.send(message);
            // Commit a transaction. You need to commit when the response mode is transaction
            session.commit();
            // 9. Close resources
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Consumer code
public void consumer() throws Exception{
    //1. Create connection factory
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    //2. Create a connection
    Connection connection = factory.createConnection();
    //3. Open connection
    connection.start();
    //4. Create session object
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5. To create a destination, it needs to be consistent with the destination of the message sent by the producer, otherwise the message cannot be received
    Queue queue = session.createQueue("MyFirstQueueTest");
    //Using Session object to create a consumer object
    MessageConsumer consumer = session.createConsumer(queue);
    //receive messages
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {

            TextMessage textMessage = (TextMessage)message;
            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //Waiting to receive the message, the consumption will be finished
    System.in.read();//Wait until we hit the keyboard
    //close resource
    consumer.close();
    session.close();
    connection.close();
}

PS: the process of creating a session is the same, but the subsequent operations are different

2. Broadcast mode (the difference between PTP mode and PTP mode is that session creates Topic instead of Destination)
Producer code
public void topicProducer() throws Exception{
    //1. Create connection factory
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    //2. Create a connection
    Connection connection = factory.createConnection();
    //3. Open connection
    connection.start();
    //4. Create session object
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5. Create a Destination, which creates a Topic
    Topic topic = session.createTopic("MyFirstQueueTest");
    //6. Create producers
    MessageProducer producer = session.createProducer(topic);
    //7. Create message
    TextMessage message = session.createTextMessage("hello topic");
    //8. Send message
    producer.send(message);
    //9. Close resources
    producer.close();
    session.close();
    connection.close();
}
Consumer code
public void topicConsumer() throws Exception{
    //1. Create connection factory
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    //2. Create a connection
    Connection connection = factory.createConnection();
    //3. Open connection
    connection.start();
    //4. Create session object
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5. Create a Destination, which creates a Topic
    Topic topic = session.createTopic("MyFirstQueueTest");
    //6. Create consumers
    MessageConsumer consumer = session.createConsumer(topic);
    //7. Receive message
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage)message;
            //Get information and print
            try {
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("----Consumer 1 ready----");
    System.in.read();
    //9. Close resources
    consumer.close();
    session.close();
    connection.close();
}
3. Set the mode of message reply

Set response mode (that is, ack message mode, automatic consumption, or manual consumption through code call, etc.)

//The first parameter: whether to start a transaction. If the transaction is true, two parameters are meaningless (session.session ﹣ transacted mode). Generally, transaction false is not enabled
//The second parameter: answer mode. General automatic response or manual response. General automatic response
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Several response modes

Session.AUTO_ACKNOWLEDGE Automatic answer
Session.CLIENT_ACKNOWLEDGE Manual response
Session.DUPS_OK_ACKNOWLEDGE Delayed submission
Session.SESSION_TRANSACTED affair

Manual consumption message

If the mode is client "acknowedge mode, i.e. manual response consumption
The input parameter message of the onMessage rewritten in the listener needs to be manually ack in the method body

message.acknowledge();

Reference article:
https://www.jianshu.com/p/9769a1749778
https://blog.csdn.net/hyhanyu/article/details/79816279

Keywords: Session Apache

Added by Rado001 on Sat, 09 Nov 2019 20:25:32 +0200