Message oriented middleware ActiveMQ

ActiveMQ

Introduction to ActiveMQ

ActiveMQ is the most popular and powerful open source message bus produced by Apache. ActiveMQ is a
Fully support JMS1 1 and J2EE 1.4 specifications, although the JMS specification has been issued for a long time
But JMS still plays a special role in today's J2EE applications.

What is news

A message is a unit of data transmitted between two computers. Messages can be very simple, such as containing only text strings;
It can also be more complex and may contain embedded objects.

What is a queue

What is a message queue

Message queue is a container that holds messages during transmission

Common message service applications

  • ActiveMQ
    ActiveMQ is the most popular and powerful open source message bus produced by Apache. ActiveMQ is a fully supported JMS 1 1 and J2EE 1.4 specifications.
  • RabbitMQ
    RabbitMQ is a reusable enterprise message system based on AMQP. He follows the Mozilla Public License open source agreement. The development language is Erlang
  • RocketMQ
    A set of message queue application services defined and developed by Alibaba

Application scenario of message service

The main feature of message queue is asynchronous processing. The main purpose is to reduce request response time and decoupling. Therefore, the main usage scenario is to put the operation that is time-consuming and does not need to return the result in real time (synchronization) into the message queue as a message. At the same time, due to the use of message queue, as long as the message format remains unchanged, the sender and receiver of the message do not need to contact each other or be affected by each other, that is, decoupling and

Asynchronous processing

User registration

User registration process:

  • Register, process and write database
  • Send SMS with successful registration
  • Send mail information of successful registration

If you use message oriented middleware: you can create two threads to do these things and send messages directly to message oriented middleware,
Then let the email service and SMS service go to the message middleware to get the message, and then do the corresponding business operation after getting the message. It's so convenient

Applied decoupling

Order processing

Order generation process:

  1. Click settlement in the shopping cart
  2. Complete payment
  3. Create order
  4. Call inventory system

After the order is completed, the order system does not directly call the inventory system, but sends a message to the message middleware and writes a message
Order information. The inventory system obtains the information from the message middleware, then processes the delivery and updates the inventory, which can
Fast enough to realize the pursuit of Internet applications. After the inventory system reads the order, the operation of inventory application is also non-standard
Often fast, so having message oriented middleware is also a good direction for decoupling.

Peak clipping of flow

Second kill function

Second kill process:

  1. User clicks second kill
  2. Send request to seckill app
  3. Put the request into the message queue before requesting the second kill application
  4. The seckill application obtains the request from the message queue and processes it.

For example, the system holds second kill activities and popular goods. The flow is swarming to 100 goods, and 100000 people are crowded in. What's the matter
What? 100000 second kill operation, put it into the message queue. The second kill application processes the top 100 of the 100000 requests in the message queue
One, others call back, notification failed. The traffic peak is controlled at the message queue, and the second kill application will not be killed instantly

JMS

JMS (Java Messaging Service) is a technical specification of message oriented middleware on Java platform, which is convenient for
Java applications in the message system exchange messages, and provide standard access to generate, send and receive messages
To simplify the development of enterprise applications.

Point to point model

The producer sends a message to the queue, and only one consumer can receive it.

Publish / subscribe model

Only subscribers who subscribe to topic will receive messages sent by publishers to topic

ActiveMQ installation

You can download the installation package on the ActiveMQ official website: http://activemq.apache.org

be careful:

  • ActiveMQ5. Version 10. X and above must use jdk1 8 can be used normally.
  • ActiveMQ5.9.x and below use jdk1 7 can be used normally

Extract the installation files

tar -zxf apache-activemq-5.9.0-bin.tar.gz

Copy applied to local directory

cp -r apache-activemq-5.9.0 /usr/local/activemq

Start ActiveMQ

/usr/local/activemq/bin/activemq start

If activemq does not have permission to execute the file, it needs to be added through the chmod command

Testing ActiveMQ

ps aux | grep activemq

Seeing the following indicates successful startup

Management interface

Use the browser to access the ActiveMQ management application. The address is as follows:
http://ip:8161/admin/
User name: admin
Password: admin

Modify access port:

vim /usr/local/activemq/conf/jetty.xml

Modify user name and password

vim /usr/local/activemq/conf/users.properties

Then set the following key value pairs

Change to
User defined user name: user defined password
that will do

Note: whether you want to change the port or the user name or password, you need to restart activemq before it takes effect

Restart ActiveMQ

/usr/local/activemq/bin/activemq restart

Turn off ActiveMQ

/usr/local/activemq/bin/activemq stop

Configuration file ActiveMQ xml

In the configuration file, the core configuration information of ActiveMQ is configured Is the configuration used when providing services Can be modified
Access port to start That is, the access port to access ActiveMQ in java programming
The default port is 61616
The protocol used is: tcp protocol
Modify port:

vim /usr/local/activemq/conf/activemq.xml

After modifying the port, save and restart ActiveMQ service

ActiveMQ directory introduction

  • bin stores script files
  • conf stores the basic configuration file
  • data stores log files
  • docs stores instruction documents
  • Examples store simple examples
  • lib stores the jar package required by activemq
  • webapps is the directory used to store the project

ActiveMQ terminology

Destination

Destination: the JMS Provider (Message Middleware) is responsible for maintaining the object used to manage messages.
MessageProducer needs to specify a Destination to send messages, and MessageReceiver needs to specify a Destination to receive messages.

Producer

The Message generator is responsible for sending the Message to the destination.

Consumer | Receiver

The Message consumer is responsible for consuming [process | monitor | subscribe] messages from the destination

Message

Message encapsulates the content of a communication

ActiveMQ application

Introduction to ActiveMQ common API s

  • ConnectionFactory: link factory, the type of factory used to create the link
  • Connection: link The type used to establish a connection to ActiveMQ, created by the link factory
  • Session: session, a persistent and stateful access Created by link
  • Destination & Queue:
    Destination, used to describe the message access destination of ActiveMQ this time That is, the specific queue in ActiveMQ service Created by session
    interface Queue extends Destination·
  • MessageProducer:
    Message consumer [message subscriber, message handler], a tool used to obtain messages from ActiveMQ service in a valid session Created by session
  • Message:
    Message, the data carrier object used by the message generator when sending a message to ActiveMQ service or the data carrier object used by the message consumer when getting a message from ActiveMQ service It is the top-level interface of specific types of all messages [text message, object message, etc.) It can be created through a session or obtained from ActiveMQ service through a session

ActiveMQ simple instance

  • pom.xml dependency
<!--https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.9.0</version>
</dependency>
  • Message producer (first maven project)
public class HelloWorldProducer {
	public void sendHelloWorldActiveMQ(String 
msgTest){
		//Define link factory
		ConnectionFactory connectionFactory = null;
		//Define linked objects
		Connection connection = null;
		//Define session
		Session session = null;
		//destination
		Destination destination = null;
		//Defines the sender of the message
		MessageProducer producer = null;
		//Define message
		Message message = null;
		try{

			/**
			* userName:The user name to access the ActiveMQ service. User password. The default is admin.
			* password:The user name to access the ActiveMQ service. User password. The default is admin.
			* brokerURL:The path address to access the ActiveMQ service. Path structure: protocol name: / / host address: port number
			*/
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.70.151:61616");
	
			//Create connection object
			connection = 
			connectionFactory.createConnection();
			//Start connection
			connection.start();
			/**
			* transacted:Whether to use transactions. The optional values are: true|false
				* true:Use transactions when setting this variable value. Session.SESSION_TRANSACTED
				* false:Not applicable to transactions. If this variable is set, the acknowledgeMode parameter must be set
			* acknowledgeMode:
				* Session.AUTO_ACKNOWLEDGE:Automatic message confirmation
				mechanism
				* Session.CLIENT_ACKNOWLEDGE:Client confirmation
				mechanism
				* Session.DUPS_OK_ACKNOWLEDGE:Guest with copy
			Client confirmation message mechanism
			*/
			session = connection.createSession(false, 
	Session.AUTO_ACKNOWLEDGE);
	
			//Create a destination. The destination name is the name of the queue. The consumer of the message needs to access the corresponding queue through this name
			destination = session.createQueue("helloworld-destination");
			//Create the producer of the message
			producer = session.createProducer(destination);
			//Create message object
			message = session.createTextMessage(msgTest);
			//send message
			producer.send(message);
		}catch(Exception e){
			e.printStackTrace();
		}finally{
		//Reclaim message sender resources
			if(producer != null){
				try {
					producer.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			}
			
			if(session != null){
				try {
					session.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			}
			
			if(connection != null){
				try {
					connection.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			} 
		} 
	} 
}
  • Message consumer (second maven project)
public class HelloWorldConsumer {
	public void readHelloWorldActiveMQ() {
		// Define link factory
		ConnectionFactory connectionFactory = null;
		// Define linked objects
		Connection connection = null;
		// Define session
		Session session = null;
		// destination
		Destination destination = null;
		// Defines the sender of the message
		MessageConsumer consumer = null;
		// Define message
		Message message = null;
		try {
			/** 
			* userName:The user name to access the ActiveMQ service. The default is admin.
			* password:The user name to access the ActiveMQ service. The default is admin.
			* brokerURL:The path address to access the ActiveMQ service.
			* Path structure: protocol name: / / host address: port number
			*/
			connectionFactory = new
			ActiveMQConnectionFactory("admin", "admin","tcp://192.168.70.151:61616");
			// Create connection object
			connection =  connectionFactory.createConnection();
			// Start connection
			connection.start();
			/**
			* transacted:Whether to use transaction. The optional values are:
			* true|false 
				* true:Use transactions when setting the value of this variable. Session.SESSION_TRANSACTED 
				* false:If this variable is not applicable to transactions, the acknowledgeMode parameter must be set
			* acknowledgeMode:
				* Session.AUTO_ACKNOWLEDGE:Automatic message confirmation mechanism
				* Session.CLIENT_ACKNOWLEDGE:Client confirmation mechanism
				* Session.DUPS_OK_ACKNOWLEDGE:Client acknowledgement message mechanism with replica
			*/
			session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
			// Create a destination. The destination name is the name of the queue. The consumer of the message needs to access the corresponding queue through this name
			destination = session.createQueue("helloworld-destination");
			// Consumer who created the message
			consumer =session.createConsumer(destination);
			// Create message object
			message = consumer.receive();
			//Processing messages
			String msg = ((TextMessage)message).getText();
			System.out.println("from ActiveMQ Text information obtained from the service "+msg);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// Reclaim message sender resources
			if (consumer != null) {
				try {
					consumer.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			}
			if (session != null) {
				try {
					session.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			}
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
				// TODO Auto-generated catch block
					e.printStackTrace();
				} 
			} 
		} 
	} 
}
  • test
    producer
public class Test {
	public static void main(String[] args) {
		HelloWorldProducer producer = new HelloWorldProducer();
		producer.sendHelloWorldActiveMQ("HelloWorld");
	} 
}

consumer

public class Test {
	public static void main(String[] args) {
		HelloWorldConsumer consumer = newHelloWorldConsumer();
		consumer.readHelloWorldActiveMQ();
	}
}

ActiveMQ processing object information

The class of the object that can be processed must implement the Serializable interface

  • Changes in producer code
//When processing string information in the above simple example:
// message = session.createTextMessage(msgTest);

//When processing object information
//Create message object
message = session.createObjectMessage(users);
  • Changes in consumer code
//When processing string information in the above simple example: 
//String msg = ((TextMessage)message).getText();

//When processing object information
// Create message object
message = consumer.receive();
//Processing messages
ObjectMessage objMessage = (ObjectMessage)message;
Users users = (Users)objMessage.getObject();

JMS - implement queue service listening

  • No change in producer code
  • Changes in consumer code
//In the above simple example: 
//String msg = ((TextMessage)message).getText();

//Queue service
consumer.setMessageListener(new MessageListener() {
	//Method of ActiveMQ callback. This method passes the message to the consumer
	@Override
	public void onMessage(Message message) {
		//Processing messages
		String msg=null;
		try {
			msg = ((TextMessage)message).getText();
		} catch (JMSException e) {
		// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("from ActiveMQ Text information obtained from the service "+msg);
	}
});

Publish/Subscribe processing mode (Topic)

  • Changes in producer code
//In the above simple example: 
//destination = session.createQueue("helloworld-destination");

//Publish/Subscribe processing mode
destination = session.createTopic("test-topic");
  • Changes in consumer code
//In the above simple example: 
//destination = session.createQueue("helloworld-destination");
//String msg = ((TextMessage)message).getText();

//Publish/Subscribe processing mode
destination = session.createTopic("test-topic");

consumer.setMessageListener(new MessageListener() {
	//Method of ActiveMQ callback. This method passes the message to the consumer
	@Override
	public void onMessage(Message message) {
		//Processing messages
		String msg=null;
		try {
			msg = ((TextMessage)message).getText();
		} catch (JMSException e) {
		// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("from ActiveMQ Text information obtained from the service "+msg);
	}
});

//The run method is overridden in the producer class
@Override
public void run() {
	this.readHelloWorldActiveMQ();
}

Keywords: Java RabbitMQ Middleware message queue

Added by jonwondering on Tue, 08 Mar 2022 18:49:03 +0200