Advanced use
Queue browser
A queue browser lets you inspect the messages in a queue without consuming them. It only reads the current contents; there is no subscription function.
    package com.zjw.activemq.browser;

    import java.util.Enumeration;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.QueueBrowser;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;

    /**
     * @author zjwblog <co.zjwblog@gmail.com>
     * @version 1.0
     * @date 2021/12/16 12:04 AM
     */
    public class BrowserQueue {
      private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
      private static final String USERNAME = null;
      private static final String PASSWORD = null;

      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
        Connection conn = factory.createConnection();
        conn.start();
        // Browsing needs neither a transaction nor manual acknowledgement.
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueBrowser browser = session.createBrowser(new ActiveMQQueue("test"));
        // The enumeration lists the queued messages without removing them.
        Enumeration<?> enumeration = browser.getEnumeration();
        while (enumeration.hasMoreElements()) {
          TextMessage textMessage = (TextMessage) enumeration.nextElement();
          System.out.println("textMessage:" + textMessage.getText());
        }
        conn.close();
      }
    }
JMSCorrelationID
JMSCorrelationID links related messages to each other, for example a reply to the request it answers, so that an exchange of messages reads like a conversation.
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
Example
Receiver:

    package com.zjw.activemq.correlation;

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * @author zjwblog <co.zjwblog@gmail.com>
     * @version 1.0
     * @date 2021/12/16 12:20 AM
     */
    public class CorrelationIDQueueReceiver {
      public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
            ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,
            "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"
        );
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("test");
        MessageConsumer consumer = session.createConsumer(queue);
        // Print the correlation ID of every text message that arrives.
        consumer.setMessageListener(message -> {
          if (message instanceof TextMessage) {
            TextMessage msg = (TextMessage) message;
            try {
              System.out.println(msg.getJMSCorrelationID());
            } catch (JMSException e) {
              e.printStackTrace();
            }
          }
        });
      }
    }

Sender:

    package com.zjw.activemq.correlation;

    import javax.jms.Connection;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * @author zjwblog <co.zjwblog@gmail.com>
     * @version 1.0
     * @date 2021/12/16 12:13 AM
     */
    public class CorrelationIDQueueSender {
      public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
            ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,
            "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"
        );
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("test");
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createTextMessage("Message from ServerA");
        // Tag the message so the receiver can tell which conversation it belongs to.
        message.setJMSCorrelationID("movie");
        producer.send(message);
        // Release the connection so the program can exit.
        conn.close();
      }
    }
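To show what the correlation ID buys you, here is a small broker-free sketch of matching replies to requests by ID. The class `CorrelationSketch` and its methods are hypothetical names made up for this illustration; they are not part of JMS or ActiveMQ. With a real broker, the replying side would typically copy the ID onto its reply with `reply.setJMSCorrelationID(request.getJMSCorrelationID())`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Broker-free model (hypothetical): match replies to requests by correlation ID. */
final class CorrelationSketch {
    // Requests still waiting for a reply, keyed by the correlation ID they were sent with.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns a future that the matching reply completes. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    /** Routes a reply to the request with the same ID; returns false if nobody is waiting for it. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        CompletableFuture<String> movie = sketch.register("movie");
        CompletableFuture<String> music = sketch.register("music");
        // Replies may arrive in any order; the ID decides which request each one answers.
        sketch.onReply("music", "reply for music");
        sketch.onReply("movie", "reply for movie");
        System.out.println(movie.join());
        System.out.println(music.join());
    }
}
```

The point of the design is that the requester never has to rely on arrival order: the ID alone ties a reply back to its request.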
JMSReplyTo
JMSReplyTo carries the destination on which the sender wants to receive the consumer's confirmation, which makes the exchange resemble a synchronous call.
Set replyTo on the message before sending it:
message.setJMSReplyTo(queue);
The receiving end reads replyTo from the message and can then send its confirmation to that destination:
Destination replyTo = message.getJMSReplyTo();
QueueRequestor, described below, is built on replyTo: by default the sending client creates a temporary queue, waits on it for the reply, and sets replyTo to that queue. The receiving end sends its confirmation to this address, which gives the effect of a synchronous call.
QueueRequestor synchronous messages
QueueRequestor sends synchronous messages, which essentially goes against MQ's principle of asynchronous communication, although MQ still provides application decoupling and support for heterogeneous systems. After sending a message with QueueRequestor, the sender blocks until the receiving end replies. If no reply ever arrives, the sender hangs and appears dead. Moreover, QueueRequestor offers no way to set a wait timeout.
Receiver example:
    package com.zjw.activemq.sync;

    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;

    /**
     * @author zjwblog <co.zjwblog@gmail.com>
     * @version 1.0
     * @date 2021/12/15 9:13 AM
     */
    public class QueueRequestorReceiver {
      private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
      private static final String USERNAME = null;
      private static final String PASSWORD = null;

      public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("test"));
        consumer.setMessageListener(message -> {
          System.out.println("Received a message; sending back a confirmation message.");
          try {
            Destination replyTo = message.getJMSReplyTo();
            System.out.println("replyTo:" + replyTo);
            if (replyTo == null) {
              return; // The sender did not ask for a reply.
            }
            MessageProducer producer = session.createProducer(replyTo);
            try {
              producer.send(session.createTextMessage(replyTo.toString()));
            } finally {
              producer.close(); // Avoid leaking one producer per message.
            }
          } catch (JMSException e) {
            e.printStackTrace();
          }
        });
      }
    }
Sender example:
    package com.zjw.activemq.sync;

    import javax.jms.Queue;
    import javax.jms.QueueRequestor;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * @author zjwblog <co.zjwblog@gmail.com>
     * @version 1.0
     * @date 2021/12/15 9:12 AM
     *
     * @description The sender blocks and does not return until the receiving end sends a response, which is equivalent to a synchronous request
     */
    public class QueueRequestorSender {
      private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false";
      private static final String USERNAME = null;
      private static final String PASSWORD = null;

      public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
        ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection();
        // Required here, because the client has to receive the reply.
        // A client that only sends messages could omit this call.
        conn.start();
        QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("test");
        QueueRequestor requestor = new QueueRequestor(session, queue);
        TextMessage message = session.createTextMessage("Hello");
        System.out.println("Sending synchronous request");
        // Blocks until a reply arrives; there is no timeout.
        TextMessage respMsg = (TextMessage) requestor.request(message);
        System.out.println("Response received");
        System.out.println("Response message content: [" + respMsg.getText() + "]");
        // Closing the requestor also closes its session and temporary queue.
        requestor.close();
        conn.close();
      }
    }
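Since QueueRequestor cannot time out, it may help to see what a bounded wait looks like. The following is only a broker-free sketch: `TimedRequestSketch`, `Request`, and the in-memory queues are hypothetical stand-ins and not part of JMS or ActiveMQ. With a real broker, the same idea would be to set JMSReplyTo to a temporary queue yourself and call `MessageConsumer.receive(long timeout)` on it instead of using QueueRequestor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Broker-free model (hypothetical): request/reply with a bounded wait. */
final class TimedRequestSketch {
    /** A request that carries its own reply queue, like a message with JMSReplyTo set. */
    static final class Request {
        final String body;
        final BlockingQueue<String> replyTo = new LinkedBlockingQueue<>();

        Request(String body) {
            this.body = body;
        }
    }

    /** Sends a request and waits at most timeoutMillis; returns null if no reply arrived in time. */
    static String request(BlockingQueue<Request> destination, String body, long timeoutMillis) {
        Request request = new Request(body);
        destination.add(request);
        try {
            return request.replyTo.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
        // Stand-in for the receiving end: it answers exactly one request, then stops.
        Thread receiver = new Thread(() -> {
            try {
                Request r = queue.take();
                r.replyTo.add("echo: " + r.body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();
        System.out.println(request(queue, "Hello", 1000));  // a receiver is there to answer
        System.out.println(request(queue, "Anyone?", 200)); // nobody answers, so the wait gives up
        receiver.join();
    }
}
```

The caller gets `null` back when the wait expires and can then decide whether to retry, log, or fail, instead of hanging forever.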
Factors affecting performance in a production environment
OOM
Configure the JVM heap in the ActiveMQ startup script:
ACTIVEMQ_OPTS="-Xms1G -Xmx1G"
Then adjust the memory percentage in the broker configuration file (activemq.xml):
<memoryUsage percentOfJvmHeap="70" />
The SystemUsage configuration sets limits on the memory and disk space the broker may use. When consumption exceeds these limits, ActiveMQ will "slow down the producer" (producer flow control), so these settings are very important.
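As a rough worked example of how the two settings combine, the sketch below computes the memory limit for a given heap size and percentage. It assumes the limit is simply that percentage of the JVM's maximum heap; `MemoryLimitSketch` and `memoryLimitBytes` are hypothetical names used only for this illustration.

```java
/** Hypothetical helper: estimate the memoryUsage limit from heap size and percentOfJvmHeap. */
final class MemoryLimitSketch {
    /** Assumes the limit is percentOfJvmHeap percent of the maximum heap, rounded to whole bytes. */
    static long memoryLimitBytes(long maxHeapBytes, int percentOfJvmHeap) {
        return Math.round(maxHeapBytes * (percentOfJvmHeap / 100.0));
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;          // matches -Xmx1G
        long limit = memoryLimitBytes(oneGiB, 70);  // matches percentOfJvmHeap="70"
        System.out.println(limit / (1024 * 1024) + " MiB"); // prints "716 MiB"
        // The same estimate for the JVM this sketch happens to run in:
        System.out.println(memoryLimitBytes(Runtime.getRuntime().maxMemory(), 70) + " bytes");
    }
}
```

So with a 1G heap and 70 percent, roughly 716 MiB is available for messages before producers start being slowed down.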
Persistent and non-persistent messages
Asynchronous message sending
It is recommended to keep the default setting. If asynchronous sending is forced on, messages may be lost.
The scenario in which asynchronous sending loses messages is this: the producer sets useAsyncSend=true and keeps calling producer.send(msg). Because send does not block, the producer assumes that every message has been delivered to MQ. If the server suddenly goes down, all messages still in the producer's memory that have not yet reached MQ are lost.
Asynchronous sending can be enabled in any of the following ways:
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);
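The loss scenario can be reproduced in miniature without a broker. The sketch below is a toy model only: `AsyncSendLossSketch` and its methods are hypothetical, and the "broker" is just a list in memory. It shows why a send that returns immediately says nothing about whether the message actually arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Broker-free model (hypothetical) of why asynchronous sends can lose messages. */
final class AsyncSendLossSketch {
    // Messages the application has "sent" that still live only in producer memory.
    private final Queue<String> inFlight = new ArrayDeque<>();
    // Messages the simulated broker has actually received.
    private final List<String> onBroker = new ArrayList<>();

    /** Asynchronous send: returns immediately, before the broker has the message. */
    void sendAsync(String message) {
        inFlight.add(message);
    }

    /** Simulates the transport handing up to max buffered messages to the broker. */
    void deliver(int max) {
        for (int i = 0; i < max && !inFlight.isEmpty(); i++) {
            onBroker.add(inFlight.poll());
        }
    }

    /** Simulates a crash: whatever was still in flight is gone. Returns the number lost. */
    int crash() {
        int lost = inFlight.size();
        inFlight.clear();
        return lost;
    }

    int deliveredCount() {
        return onBroker.size();
    }

    public static void main(String[] args) {
        AsyncSendLossSketch producer = new AsyncSendLossSketch();
        for (int i = 1; i <= 5; i++) {
            producer.sendAsync("msg-" + i); // the application believes all five were sent
        }
        producer.deliver(3);                // only three reach the broker before the crash
        int lost = producer.crash();
        System.out.println("delivered=" + producer.deliveredCount() + ", lost=" + lost); // delivered=3, lost=2
    }
}
```

A synchronous send would correspond to calling `deliver` inside `sendAsync` and returning only afterwards, which is slower but leaves nothing in flight to lose.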
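Batch acknowledgement
ActiveMQ supports acknowledging messages in batches through the optimizeAcknowledge option, which can improve system performance. Note that in ActiveMQ 5.x the option is disabled by default, so it has to be switched on explicitly.
It can be set in any of the following ways (true enables it, false disables it):
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.optimizeAcknowledge=true");
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(true);
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);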
Consumer buffering and message backlog: prefetchSize
On the consumer side, generally speaking, the faster messages are consumed the better, and the smaller the backlog on the broker the better.
However, transactions and client acknowledgement complicate this: if a consumer fetches many messages at once without acknowledging them, the transaction context grows and more messages sit in a "half-consumed" state on the broker side. ActiveMQ therefore has a prefetchSize parameter that caps the number of messages a consumer may receive in advance without acknowledging them.
Prefetch defaults
| consumer type | default value |
|---|---|
| queue | 1000 |
| queue browser | 500 |
| topic | 32766 |
| durable topic | 100 |
prefetchSize can be set in three ways.
For all consumer types at once, when creating the connection:
    String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.all=50";
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);
Separately for queues and topics, when creating the connection:
    String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1";
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);
For a single destination:
    Destination topic = session.createTopic("test?consumer.prefetchSize=10");
Note: a prefetchSize set on the destination overrides the value set on the connection.
Is the message pushed or pulled?
When sending, the producer pushes the message to the broker.
When consuming:
- By default, the broker pushes messages to the consumer one by one.
- Once the consumer's prefetch buffer is full (prefetchSize unacknowledged messages), the broker stops pushing.
- When the consumer's prefetchSize is 0, the consumer pulls messages from the broker.
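The push and pull rules above can be modelled in a few lines. This is a simplified, broker-free sketch rather than ActiveMQ's actual dispatch code; `PrefetchSketch` and its methods are hypothetical names chosen for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Broker-free model (hypothetical) of dispatch governed by prefetchSize. */
final class PrefetchSketch {
    private final Deque<String> backlog = new ArrayDeque<>(); // messages waiting on the broker
    private final List<String> unacked = new ArrayList<>();   // dispatched but not yet acknowledged
    private final int prefetchSize;

    PrefetchSketch(int prefetchSize, List<String> messages) {
        this.prefetchSize = prefetchSize;
        this.backlog.addAll(messages);
    }

    /** Broker side: push until the consumer's prefetch window is full. Returns how many were pushed. */
    int push() {
        int pushed = 0;
        while (!backlog.isEmpty() && unacked.size() < prefetchSize) {
            unacked.add(backlog.poll());
            pushed++;
        }
        return pushed;
    }

    /** Consumer side, used when prefetchSize is 0: explicitly pull one message, or null if none is left. */
    String pull() {
        String message = backlog.poll();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    /** The consumer acknowledges its oldest outstanding message, freeing one slot in the window. */
    void ack() {
        if (!unacked.isEmpty()) {
            unacked.remove(0);
        }
    }

    int backlogSize() {
        return backlog.size();
    }

    public static void main(String[] args) {
        PrefetchSketch pushMode = new PrefetchSketch(2, Arrays.asList("m1", "m2", "m3", "m4", "m5"));
        System.out.println(pushMode.push()); // 2: the window fills up
        System.out.println(pushMode.push()); // 0: window full, the broker stops pushing
        pushMode.ack();
        System.out.println(pushMode.push()); // 1: the acknowledgement freed one slot

        PrefetchSketch pullMode = new PrefetchSketch(0, Arrays.asList("m1"));
        System.out.println(pullMode.push()); // 0: with prefetchSize 0 nothing is ever pushed
        System.out.println(pullMode.pull()); // m1: the consumer has to ask for it
    }
}
```

A small window keeps few messages half-consumed on a slow consumer, at the cost of more round trips; a large window does the opposite.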