1. High-availability cluster design
Node A: synchronizes messages with node B and node C, so nodes A, B, and C can all be used as consumer access nodes
Node B: the master node; serves as both a producer access node and a consumer access node
Node C: the slave node; serves as both a producer access node and a consumer access node. When the master node goes down, the slave is automatically promoted to master
2. Steps to build high availability cluster
Normally, the cluster should be built on three independent servers. For this demonstration, all three nodes run on the same server, so each node is given its own ports
Extract the ActiveMQ installation package to three different folders: activemq-a, activemq-b, and activemq-c
Modify activemq-a/conf/activemq.xml
<!-- activemq-a: accepts OpenWire clients on port 61616 -->
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <!-- NOTE: '&' inside an XML attribute value must be written as '&amp;' or the file will not parse -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <!-- The remaining protocols are disabled so that nodes sharing one host do not compete for the same ports -->
    <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<!-- Statically bridge this broker to the brokers listening on 61617 (activemq-b) and 61618 (activemq-c) -->
<networkConnectors>
    <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>
Modify activemq-b/conf/activemq.xml
<!-- activemq-b: accepts OpenWire clients on port 61617 -->
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <!-- NOTE: '&' inside an XML attribute value must be written as '&amp;' or the file will not parse -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <!-- The remaining protocols are disabled so that nodes sharing one host do not compete for the same ports -->
    <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<!-- Statically bridge this broker to the broker listening on 61616 (activemq-a) -->
<networkConnectors>
    <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>
Modify activemq-b/conf/jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- web console port for activemq-b: 8162 rather than the 8161 used by activemq-a, because all nodes run on the same host --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
Modify activemq-c/conf/activemq.xml
<!-- activemq-c: accepts OpenWire clients on port 61618 -->
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <!-- NOTE: '&' inside an XML attribute value must be written as '&amp;' or the file will not parse -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <!-- The remaining protocols are disabled so that nodes sharing one host do not compete for the same ports -->
    <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<!-- Statically bridge this broker to the broker listening on 61616 (activemq-a) -->
<networkConnectors>
    <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>
Modify activemq-c/conf/jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- web console port for activemq-c: 8163 rather than the 8161 used by activemq-a, because all nodes run on the same host --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
3. Simple test with code
package com.codingos.springboot.activemq.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue mode producer cluster test */ public class AppProducer2 { private static final String url = "failover:(tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true"; private static final String queueName = "queue-test1"; public static void main(String[] args) throws JMSException { // 1. Common ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2. Create Connection Connection connection = connectionFactory.createConnection(); // 3. Start connection connection.start(); // 4. Create session (1 Parameter: whether to process in transaction, 2 parameter: use auto answer mode) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. Create a target queue Destination destination = session.createQueue(queueName); // 6. Create producers MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7. Create message TextMessage textMessage = session.createTextMessage("Message " + i); // 8. Send message producer.send(textMessage); System.out.println("send message: " + textMessage.getText()); } // 9. Close connection connection.close(); } }
package com.codingos.springboot.activemq.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue mode consumer cluster test */ public class AppConsumer2 { private static final String url = "failover:(tcp://192.168.159.128:61616,tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true"; private static final String queueName = "queue-test1"; public static void main(String[] args) throws JMSException { // 1. Common ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2. Create Connection Connection connection = connectionFactory.createConnection(); // 3. Start connection connection.start(); // 4. Create session (1 Parameter: whether to process in transaction, 2 parameter: use auto answer mode) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. Create a target queue Destination destination = session.createQueue(queueName); // 6. Create consumers MessageConsumer consumer = session.createConsumer(destination); // 7. Create a listener consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("receive messages: "+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
Finally, open the ActiveMQ web console of each of the three nodes to view the relevant information:
activemq-a http://192.168.159.128:8161
activemq-b http://192.168.159.128:8162
activemq-c http://192.168.159.128:8163
The related project code is available at https://gitee.com/jayking/spring-boot-activemq