ActiveMQ Message Queuing implements Point-to-Point (Queue) and Publish/Subscribe (Topic)

(Message Queue Message Queue) JMS

Preface

JMS, Java Message Service Application Interface, is a Java platform API for Message-Oriented Middleware (MOM), used to send messages between two applications for asynchronous communication
JMS is a vendor-independent API for accessing and receiving system messages, similar to JDBC(Java Database Connectivity). Here, JDBC is an API that can be used to access many different relational databases, while JMS provides vendor-independent access methods to access messaging services.

1. What is ActiveMQ?

ActiveMQ is the most popular and powerful open source messaging bus from Apache. ActiveMQ is a fully supported JMS1. JMS implementation of 1 and J2EE 1.4 specifications, although it has been a long time since the JMS specification was introduced, JMS still plays a special role (sending messages between two applications) in today's J2EE applications.

2. Start MQ

Double-click ActiveMQ directly. Ba is OK, note that ActiveMQ in win64 is selected according to the system selection (window 64 bit)

Start succeeded.

Url:http://127.0.0.1:8161/admin/index.jsp
Default username and password: admin/admin

ActiveMQ's console, the published messages can be seen.

3. Differences between Queue and Topic

1. Queue

2. Publish/Subscribe (Topic)

3. Differences between peer-to-peer and publish/subscribe

4. Implementation of ActiveMQ Point-to-Point Code

1. Sender

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Production message to Queue
{
    public static void main(String[] args) {
        //Create Connection Factory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "tcp://localhost:61616");
        //Declare Connection
        Connection connection = null;
        //Create Session: for sending and receiving messages
        Session session = null;
        //Create the destination of the message
        Destination destination = null;
        //Declare message producer
        MessageProducer producer = null;


        try {
            //1, get a connection to ActiveMQ
            connection = connectionFactory.createConnection();
            //2, Open this connection
            connection.start();
            //3, create a Session object: this will be a thread for sending or receiving messages
            //The first createSession parameter, transacted, indicates whether transactions are supported
            //The second parameter acknowledgeMode has the following
            //1.AUTO_ACKNOWLEDGE automatic confirmation mode, which does not require client confirmation (usually used)
            //2.CLIENT_ACKNOWLEDGE Client Confirmation
            //A message is required after the client gets the message.acknowledge();
            //3.DUPS_OK_ACKNOWLEDGE allows duplicate messages
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            //4, create the destination and indicate the name of the queue where the message is placed
            destination = session.createQueue("Xiangfang Railway Bureau");//Queue is a Destination subinterface
            //5, create a producer that sends messages to that destination
            producer = session.createProducer(destination);
            //6. Set persistence mode
            // Persistence Mode NON_PERSISTENT is not persistent, PERSISTENT persistent, persistent by default
            //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // Default is persistent
            //Send three messages
            for(int cs=1;cs<=3;cs++) {
                //7, Create a message to send
                TextMessage message = session.createTextMessage("T1978is over  "+cs);
                //8, Send message to destination
                producer.send(message);
            }
            //9, submit a message
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        finally
        {
            //Close Connection
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

2. Message Receiver

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class From the specified Queue receive messages
{
    public static void main(String[] args) {
        //Declare connection factory
        ConnectionFactory connectionFactory=null;
        //Declare Connection
        Connection conn = null;
        //Declare Session
        Session session = null;
        //Declare Destination
        Destination destination =null;
        //Declare Recipient
        MessageConsumer consumer =null;

        try
        {
            //Create Connection Factory
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
         /*   connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");*/
            //Create Connection Object
            conn = connectionFactory.createConnection();
            //Open Connection
            conn.start();//This can't be forgotten
            //Create Session, false does not start transaction, AUTO_ACKNOWLEDGE automatic confirmation mode, no client confirmation required
            session = conn.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            //Create Destination
            destination = session.createQueue("Xiangfang Railway Bureau");
            //Create Message Receiver
            consumer = session.createConsumer(destination);
   /*         TextMessage textMessage =  (TextMessage)consumer.receive();

            System.out.println("The message received is: "+textMessage.getText()";*/
            while(true)
            {
                //consumer.receive();
                    //Returns null if none of the values in the message queue in one second
                    TextMessage textMessage = (TextMessage) consumer.receive(1000);
		//End the loop if no information is available
                    if(textMessage == null)
                    {
                        break;
                    }
                    System.out.println(textMessage.getText());
            }
	session.commit();//Messages will only be removed (removed) from MQ if submitted

        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            try {
                conn.close();

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

5. ActiveMQ Publish/Subscribe Code Implementation

1. Message Publisher

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class PublisherDemo
{
    public static void main(String[] args) {
        //Declare connection factory
        ConnectionFactory connectionFactory = null;
        //Declare Connection
        Connection connection = null;
        //Declare Session
        Session session =null;
        //Declare message destination
        Destination destination =null;
        //Declare message publisher
        MessageProducer messageProducer =null;
        try
        {
            //Create Connection Factory
            connectionFactory = new ActiveMQConnectionFactory(
                    "admin",
                    "admin",
                    "tcp://localhost:61616"
            );
            //Create Connection
            connection = connectionFactory.createConnection();
            //Start Connecting
            connection.start();
            //Create Session
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            //Create Destination
            destination = session.createTopic("Acheng Railway Administration");//createTopic means publish/subscribe mode
            //Create message publisher
            messageProducer = session.createProducer(destination);
            //send message
            TextMessage textMessage =session.createTextMessage("T1983 is over");
            messageProducer.send(textMessage);
            //Submit
            session.commit();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
}

2. Message Subscribers

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class SubscriberDemo
{
    public static void main(String[] args) {
        //Declare connection factory
        ConnectionFactory connectionFactory = null;
        //Declare Connection
        Connection connection = null;
        //Declare Session
        Session session =null;
        //Declare message destination
        Destination destination =null;
        //Declare message publisher
        MessageConsumer messageConsumer =null;
        try
        {
            //Create Connection Factory
            connectionFactory = new ActiveMQConnectionFactory(
                    "admin",
                    "admin",
                    "tcp://localhost:61616"
            );
            //Create Connection
            connection = connectionFactory.createConnection();
            //Start Connecting
            connection.start();
            //Create Session
            session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            //Create Destination
            destination = session.createTopic("Acheng Railway Administration");//createTopic means publish/subscribe mode
            //Create message subscribers
            messageConsumer = session.createConsumer(destination);
            //receive messages
           messageConsumer.setMessageListener(new MessageListener(){//Implement MessageListener interface with anonymous internal class object
               @Override
               public void onMessage(Message message) {//Receive Subscriptions
                   TextMessage textMessage = (TextMessage) message;
                   try {
                       System.out.println("Subscriber:"+textMessage.getText());
                   } catch (JMSException e) {
                       e.printStackTrace();
                   }
               }
           });
            //Submit
            session.commit();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }//Note as a Subscriber: 1. Make sure to start the publication waiting for the message first, 2. The connection cannot be closed in the code
    }
}

summary

Many vendors support ActiveMQ, which we are learning today (ActiveMQ is just one of the Java products that implement the JMS operational standard, that is, all Java products that implement the JMS operational standard can be called MQ, and its main application is to enable messages to be sent between two applications asynchronously) (main application: Weather Forecast Service).

Keywords: Java Big Data RabbitMQ queue ActiveMQ

Added by Bac on Wed, 22 Dec 2021 07:36:45 +0200