01. Transmission protocol of ActiveMQ
Refer to official documents: http://activemq.apache.org/configuring-version-5-transports.html
summary
The client broker (client MQ server) communication protocols supported by ActiveMQ include TCP, NIO, UDP, SSL, Http(s) and VM. The file for configuring the Transport Connector is in conf / ActiveMQ in the ActiveMQ installation directory Within the < transportconnectors > tag in XML, as follows:
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
In the configuration information given above, the header of URI description information adopts the protocol name: for example
- When describing the listening port of amqp protocol, the URI description format adopted is amqp: / / ······;
- When describing the listening port of stomp protocol, the URI description format is stomp: / / ······;
- Only when the openwire protocol is described, the URI header uses tcp: / / ···········. This is because the default message protocol in ActiveMQ is openwire
Supported transport protocols
be careful:
-
In addition to tcp and nio protocols, other understanding is OK.
-
Each protocol has its own middleware that is good at the protocol. Generally, activemq will not be used to implement these protocols.
-
mqtt is a special protocol for the Internet of things, and the middleware used is generally mosquito.
-
ws is the protocol of websocket, which is commonly used to connect with the front end. Generally, a base station (Middleware) is embedded in java code.
-
stomp seems to be the protocol used by mailboxes. Major mailbox companies have base stations (Middleware).
Different protocols, our code will be different.
Network protocols supported by ActiveMQ:
agreement | describe |
---|---|
TCP | The default protocol has relatively good performance |
NIO | Based on TCP protocol, it has been expanded and optimized, which has better expansibility |
UDP | Performance is better than TCP, but it is not reliable |
SSL | Secure connection |
HTTPS | Based on HTTP or HTTPS |
VM | VM itself is not a protocol. When the client and agent run in the same java virtual machine (VM), they want to communicate directly instead of occupying the network channel |
TCP protocol
(1) Transmission Control Protocol(TCP) is the default, and the Client listening port of TCP is 61616
(2) Before transmitting data over the network, data must be serialized. Messages are serialized into byte streams through a called wire protocol.
(3) The URI form of TCP connection is as follows: tcp://HostName:port?key=value&key=value , the following parameters are optional.
(4) Advantages of TCP transmission:
- TCP protocol has high transmission reliability and strong stability
- High efficiency: byte stream transmission is very efficient
- Effectiveness and availability: it is widely used and supports any platform
(5) For optional configuration parameters of Transport protocol, please refer to the official website http://activemq.apache.org/tcp-transport-reference
NIO protocol
(1) New I/O API Protocol(NIO)
(2) NIO protocol is similar to TCP protocol, but NIO focuses more on the underlying access operations. It allows developers to have more client calls to the same resource and more load on the server.
(3) Scenarios suitable for using NIO protocol:
- There may be a large number of clients connecting to the Broker. Generally, a large number of clients connecting to the Broker are limited by the threads of the operating system. Therefore, NIO implementation requires fewer threads to run than TCP, so NIO protocol is recommended.
- There may be a slow network transmission for Broker, and NIO provides better performance than TCP.
(4) The URI form of NiO connection is as follows: nio://hostname:port?key=value&key=value
(5) For optional configuration parameters of Transport protocol, please refer to the official website http://activemq.apache.org/nio-transport-reference
AMQP protocol (understand)
Official website address: http://activemq.apache.org/amqp
STOMP protocol (understand)
Official website address: http://activemq.apache.org/stomp
MQTT protocol (understand)
Official website address: http://activemq.apache.org/mqtt
NIO protocol case
We have used a lot of TCP protocol cases before, so we won't repeat them. Let's see how NIO protocol is used
- BIO: synchronous blocking IO
- BOI: synchronous non blocking IO, new IO
- AIO: asynchronous non blocking IO
The bottom layer of ActiveMQ protocol transmission uses the IO model of BIO network by default. NiO's IO model is used only when we specify NiO.
Modify the configuration file ActiveMQ xml
If the network listening ports of ActiveMQ are not specifically specified, these ports will use the IO model of BIO network (OpenWire, STOMP, AMQP, etc.), so in order to improve the throughput performance of a single ActiveMQ server, we need to explicitly specify the network IO model of ActiveMQ as NIO, which is a NIO network IO model based on TCP protocol.
-
① Modify the configuration file ActiveMQ XML add the following content under the < transportconnectors > node:
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
-
② Restart activemq after modification:
.\activemq restart
-
③ Viewing the management background, you can see that there is an additional nio on the page.
Producer code
public class Jms_TX_Producer { private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618"; private static final String ACTIVEMQ_QUEUE_NAME = "nio-test"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //queue Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer producer = session.createProducer(queue); try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("tx msg--" + i); producer.send(textMessage); } System.out.println("Message sending completed"); } catch (Exception e) { e.printStackTrace(); } finally { //8. Close resources producer.close(); session.close(); connection.close(); } } }
Consumer code
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61618"; private static final String ACTIVEMQ_QUEUE_NAME = "nio-test"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { try { TextMessage textMessage = (TextMessage) message; System.out.println("***Messages received by consumers: " + textMessage.getText()); } catch (Exception e) { System.out.println("Exception occurs, consumption fails, give up consumption"); } } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
NIO protocol case enhancement
The above is the NIO network IO model used at the bottom of Openwire protocol transmission. How to use NIO network IO model for other protocol transmission bottom layers?
The URI format starts with NiO, indicating that this port uses the NiO network IO model based on TCP protocol. The previous setting method can only make one port support OpenWire protocol, that is, port 6168 supports NiO.
How to make a port support NIO network model and multiple protocols?
That is, no matter which protocol you use, you can access this port, and this port is in the form of NIO
Note: the previous ports only support a specific protocol. If the protocol name and port number do not correspond, they cannot be accessed.
Solution:
Using the auto keyword
Use the + symbol to set multiple properties for the port
Modify the configuration file ActiveMQ xml
Modify the configuration file ActiveMQ XML add the following content under the < transportconnectors > node:
><transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
- auto: for all protocols, it will identify what protocol we are.
- NIO: use NIO network IO model.
- Restart activemq after modifying the configuration file
Producer code
//For the tcp protocol producer using nio model, other codes are the same as before public class Jms_TX_Producer { private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608"; private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio"; public static void main(String[] args) throws JMSException { ...... } }
//nio protocol producer using nio model, other codes are the same as before public class Jms_TX_Producer { private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608"; private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio"; public static void main(String[] args) throws JMSException { ...... } }
Consumer code
public class Jms_TX_Consumer { //For tcp protocol consumers using nio model, other codes are the same as before private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61608"; private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio"; public static void main(String[] args) throws JMSException, IOException { ...... } }
//For nio protocol consumers using nio model, other codes are the same as before public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "nio://127.0.0.1:61608"; private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio"; public static void main(String[] args) throws JMSException, IOException { ...... } }
02. Message storage and persistence of ActiveMQ
Official website document address: http://activemq.apache.org/persistence
summary
(1) Here is the difference between persistence and previous persistence
- MQ high availability: transaction, persistence, and sign in are features of MQ itself. Persistence here is an external force, an external plug-in. The persistence mentioned before is the external expression of MQ, and now the persistence is the underlying implementation.
(2) What is persistence?
What is persistence?
- In a word: ActiveMQ is down, and messages will not be lost.
explain:
- In order to avoid the loss of information after accidental downtime, it is necessary to restore the message queue after restart, and half of the message system will adopt the persistence mechanism.
- ActiveMQ's message persistence mechanisms include JDBC, AMQ, KahaDB and LevelDB. No matter which persistence method is used, the message storage logic is the same, that is, after the sender sends the message, the message center first stores the message in the local data file, memory database or remote database, and then tries to send the message to the receiver, If successful, delete the message from the storage, and if failed, continue to try to send.
- After the message center is started, check whether there are messages that have not been successfully sent in the specified storage location. If so, the messages in the storage location will be sent first.
What are the ways of persistence
(1) AMQ Message Store
- File based storage mechanism, which was the default mechanism in the past, is no longer used.
- AMQ is a file storage form. It has the characteristics of fast writing speed and easy recovery. The default size of the file in the message storage file is 32M. When all the messages in a file have been consumed, the file will be marked as removable. In the next cleaning stage, the file will be deleted.
- AMQ is applicable to activemq5 Versions prior to 3
(2) kahaDB
- Now the default. Let's introduce it in detail below.
(3) JDBC message store
- Let's introduce it in detail below.
(4) LevelDB message store
- Too new technology, now some uncertainty.
(5) JDBC Message Store with ActiveMQ Journal
- Let's introduce it in detail below.
kahaDB message store
summary
Based on the log file, from activemq5 4 (inclusive) start the default persistence plug-in.
Official website documents: http://activemq.apache.org/kahadb , there are other configuration parameters on the official website.
In the configuration file ActiveMQ XML can be configured as follows:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
The log files are stored in:% activemq installation directory% / data/kahadb
explain
- KahaDB is the default storage method. Can it be used in any scenario? It improves performance and recovery.
- The message store uses a transaction log and an index file to store all its addresses.
- KahaDB is a solution specifically for message persistence, which optimizes typical message usage patterns
- The data is appended to the data logs. When the data in the log file is not needed, the log file will be discarded.
Storage principle of KahaDB
KahaDB has only four types of files in the directory where messages are saved. Compared with other file storage engines such as lock and ActiveMQ, this is very concise
- db-<num>. Log, KahaDB stores messages in a data record file with a predefined size, and the file name is DB - < num > Log, when the data file is full, a new file will be created, and the value of num will increase accordingly. When there are no more messages referenced in the data file, the file will be deleted or archived.
- db. The data file contains the persistent BTree index, which indexes the messages in the message data record. It is the message index file, which is essentially a B-Tree. The B-Tree is used as the index to point to DB - < num > Log.
- db.free file, current dB Those pages in the data file are idle. The specific content of the file is the ID of all idle pages.
- db.redo file is used for message reply. If the KahaDB message store is started after forced exit, it is used to recover the B-Tree index.
- Lock file lock, which indicates the broker currently obtaining read-write permission
JDBC message store
One sentence: MQ+MySQL
- Select MySQL as the persistence method
MQ+MySQL
Add the mysql database driver package to the lib folder
Add the mysql database driver package to:% ActiveMQ installation location% / lib
JDBC persistenceadapter configuration
Modify the ActiveMQ configuration file ActiveMQ xml:
-
Before modification:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
-
After modification
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/> </persistenceAdapter>
dataSource Specifies the name of the persistent database that will be referenced bean Name of createTablesOnStartup Whether to create a database table at startup. The default is true, In this way, a database table will be created every time it is started. Generally, it is set to true Later read false
Database connection pool configuration
We need to prepare a mysql database, create a database named ActiveMQ, and then configure it in the ActiveMQ configuration file: ActiveMQ Configuration in XML
<!-- Database configuration --> <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://IP:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="mysql Database user name"/> <property name="password" value="mysql Database password"/> <property name="poolPreparedStatements" value="true"/> </bean>
Note: the newly created database shall adopt latin1 code or ASCII code.
The default is the dbcp database connection pool. If you want to change to another database connection pool, you need to put the connection pool jar package into the lib directory.
Create SQL databases and tables
The database creation statement is as follows, specifying the encoding method:
create database activemq default character set latin1;
Restart activemq and the following three tables will be generated automatically. If they are not generated automatically, we need to manually execute SQL to generate these three tables.
-
ACTIVEMQ_MSGS data table: both queue and topic exist in it
-
ACTIVEMQ_ACKS data table: stores the message of the persistent subscription and the message ID received by the last persistent subscription
-
ACTIVEMQ_LOCK data table: it is only used in the cluster environment. Only one broker can get messages, which is called the master broker. Others can only be backed up and wait until the master broker is unavailable, which can be called the next master broker. This table is used to record which broker is the current master broker
These three tables can generally be generated automatically. If they are not generated, you can view the ActiveMQ startup log to solve the problem. The ER diagram of the generated three tables is as follows:
queue type validation and data table changes
queue type: point-to-point type
- When DeliveryMode is set to non_ During persistence, the message will not be persisted, but will be saved in memory.
- When DeliveryMode is set to PERSISTENCE, the message is saved in the file or database of the Broker response
Note: once a peer-to-peer message is consumed, it will be deleted from the Broker.
give an example:
- After we use queue mode PERSISTENCE (DeliveryMode is set to PERSISTENCE) to produce 3 messages, we will find that there are 3 more data in the ACTIVEMQ_MSGS data table.
- Start the consumer, consume all the messages, and find that the data in the data table disappears.
Summary:
- queue pattern: non persistence does not persist messages to the database.
- queue mode: persistence will persist the message to the database.
code
- Producer Code:
//Message producer for persistent queue public class JmsProduce_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME= "queue_persistence"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue=session.createQueue(QUEUE_NAME); MessageProducer messageProducer = session.createProducer(queue); // Set persistent queue messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // After setting the persistent queue, start the connection connection.start(); for (int i = 1; i < 4 ; i++) { TextMessage textMessage = session.createTextMessage("queue_name--" + i); messageProducer.send(textMessage); MapMessage mapMessage = session.createMapMessage(); } messageProducer.close(); session.close(); connection.close(); System.out.println(" **** QUEUE_NAME Message sent to MQ complete ****"); } }
- Consumer code:
// Message consumer for persistent queue public class JmsConsummer_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String QUEUE_NAME = "queue_persistence"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); // Set the client ID. Register your name with the MQ server connection.setClientID("AISMALL_QUEUE01"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue=session.createQueue(QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); connection.start(); while(true){ TextMessage message = (TextMessage)messageConsumer.receive(); if (null != message){ System.out.println("****Consumer message:"+message.getText()); }else { break; } } messageConsumer.close(); session.close(); connection.close(); } }
topic type validation and data table changes
be careful:
- To start the consumer of the persistent topic, you can see ActiveMQ_ There will be one more message in the acks data table
- ACTIVEMQ_ The acks data table contains the identity information of a consumer, and a record represents the consumer of a persistent topic
give an example:
-
We started the persistence producer to release three data, ActiveMQ_ Three new data items are added to the msgs data sheet.
-
After consumers consume all the data, ActiveMQ_ The data in the msgs data table did not disappear.
-
No matter whether the message of persistent topic is consumed or not, the generated data always exists and only one message is stored.
-
Note: persistent topic may cause performance degradation after large storage. Here is just like the official account. After the consumer consumes, the message will remain.
code
- Producer Code:
public class JmsProduce_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic_persistence"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); // Set persistent topic messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // After setting the persistent topic, start the connection connection.start(); for (int i = 1; i < 4 ; i++) { TextMessage textMessage = session.createTextMessage("topic_name--" + i); messageProducer.send(textMessage); MapMessage mapMessage = session.createMapMessage(); } messageProducer.close(); session.close(); connection.close(); System.out.println(" **** TOPIC_NAME Message sent to MQ complete ****"); } }
- Consumer code:
// Message consumers of persistent topic public class JmsConsummer_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic_persistence"; public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); // Set the client ID. Register your name with the MQ server connection.setClientID("AISMALL_TOPIC01"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); // Create a topic subscriber object. The first parameter is topic and the second parameter is the subscriber name TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark..."); connection.start(); Message message = topicSubscriber.receive(); while (null != message){ TextMessage textMessage = (TextMessage)message; System.out.println(" Persistence received topic : "+textMessage.getText()); message = topicSubscriber.receive(); } session.close(); connection.close(); } }
summary
If queue
The message will be saved to activate without being consumed by the consumer_ In the msgs table, these messages will be deleted as long as they are consumed by any consumer.
If topic
Generally, the consumer subscription is started first, and then the message will be saved to ActiveMQ in the case of production_ In the acks table
Points to note during development: when configuring a relational database as a persistence scheme for ActiveMQ:
-
For the database jar package, remember to put the database jar package in the lib directory of the ActiveMQ installation directory. If you use the database connection pool, you should also put the connection pool jar package in it.
-
createTablesOnStartup property. This property is set to true when starting for the first time. It is also true by default, and then set to false.
JDBC Message Store with ActiveMQ Journal
summary
This method overcomes the shortcomings of JDBC Store. Every time a message comes from JDBC, you need to write to and read from the library. ActiveMQ Journal uses cache write technology to greatly improve the performance.
When the speed of consumers can keep up with the production speed of producer messages in time, the journal file can greatly reduce the messages that need to be written to the DB.
for instance:
- The producer produces 1000 messages, which will be saved to the journal file. If the consumer consumes more than 90% of the messages before the journal file is synchronized to the DB, only the remaining 10% of the messages need to be synchronized to the dB at this time.
- If the speed of the consumer is very slow, the journal file can make the messages written to the DB in batch.
For high performance, this method uses log file storage + database storage. First persist the messages to the log file, wait for a period of time, and then persist the unused messages to the database. This method has higher performance than pure JDBC.
to configure
Modify the ActiveMQ configuration file ActiveMQ XML, based on the above JDBC configuration, make another modification:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/> </persistenceAdapter>
- Modify the following to change the Adapter to Factory:
<persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data"/> </persistenceFactory>
summary
1. jdbc efficiency is low, kahaDB efficiency is high, and jdbc+Journal efficiency is high.
2. Persistent messages mainly refer to the mechanism that messages will not be lost when the server where MQ is located goes down.
3. Evolution process of persistence mechanism:
- From the original AMQ Message Store scheme to the High Performance Journal (high performance transaction support) attachment exited from ActiveMQ V4 version, and synchronously launched the storage scheme for relational databases.
- ActiveMQ5. Version 3 also introduced support for KahaDB (it was used as the default persistence scheme after version 5.4),
- Later, ActiveMQ version 5.8 began to support LevelDB, and now 5.9 provides a standard Zookeeper+LevelDB clustering scheme.
4. ActiveMQ message persistence mechanisms include
AQM | Based on log file |
---|---|
KahaDB | Based on the log file, from activemq5 four |
Start default use | |
JDBC | Based on a third-party database, such as MySQL |
Replicated LevelDB Store | Starting from 5.9, LevelDB and zookeeper data replication methods are provided for the preferred data replication scheme of master slave mode |