ActiveMQ's three server high availability cluster construction scheme

1. High availability cluster construction scheme

Node A: exchanges messages with node B and node C through a network connector, so nodes A, B, and C can all serve as consumer access nodes

Node B: the master node; serves as both a producer access node and a consumer access node

Node C: the slave node; serves as both a producer access node and a consumer access node. When the master node goes down, the slave is automatically promoted to master

2. Steps to build high availability cluster

Normally, the cluster should be built on three independent servers. For this demonstration, all three nodes are set up on a single server

Extract the ActiveMQ installation package to three different folders: activemq-a, activemq-b, and activemq-c

Modify activemq-a/conf/activemq.xml

<!-- Node A listens for OpenWire clients on port 61616; the other protocol connectors are left commented out. -->
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>

<!-- Static network bridge from node A to the local brokers on ports 61617 (node B) and 61618 (node C). -->
<networkConnectors>
	<networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>

Modify activemq-b/conf/activemq.xml

<!-- Node B listens for OpenWire clients on port 61617; the other protocol connectors are left commented out. -->
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<!-- Static network bridge from node B to the local broker on port 61616 (node A). -->
<networkConnectors>
	<networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>

Modify activemq-b/conf/jetty.xml

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
	<!-- web console listen address and port; node B uses 8162 instead of 8161 so the consoles do not collide when all nodes share one host -->
	<property name="host" value="0.0.0.0"/>
	<property name="port" value="8162"/>
</bean>

Modify activemq-c/conf/activemq.xml

<!-- Node C listens for OpenWire clients on port 61618; the other protocol connectors are left commented out. -->
<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
	<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
</transportConnectors>
<!-- Static network bridge from node C to the local broker on port 61616 (node A). -->
<networkConnectors>
	<networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)"/>
</networkConnectors>

Modify activemq-c/conf/jetty.xml

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
	<!-- web console listen address and port; node C uses 8163 instead of 8161 so the consoles do not collide when all nodes share one host -->
	<property name="host" value="0.0.0.0"/>
	<property name="port" value="8163"/>
</bean>

3. Simple test with code

package com.codingos.springboot.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue-mode producer used to test the cluster.
 *
 * <p>Connects through a failover URI that lists the brokers on ports 61617 and
 * 61618, sends 100 text messages to {@code queue-test1}, and then closes the
 * connection.
 */
public class AppProducer2 {

	private static final String url = "failover:(tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true";
	private static final String queueName = "queue-test1";

	/**
	 * Sends 100 numbered text messages to the test queue and echoes each one to stdout.
	 *
	 * @param args unused
	 * @throws JMSException if connecting, creating JMS objects, or sending fails
	 */
	public static void main(String[] args) throws JMSException {
		// 1. Create the ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 2. Create the Connection
		Connection connection = connectionFactory.createConnection();
		try {
			// 3. Start the connection
			connection.start();
			// 4. Create the session (1st parameter: whether the session is transacted,
			//    2nd parameter: acknowledgement mode, here auto-acknowledge)
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 5. Create the target queue
			Destination destination = session.createQueue(queueName);
			// 6. Create the producer
			MessageProducer producer = session.createProducer(destination);

			for (int i = 0; i < 100; i++) {
				// 7. Create the message
				TextMessage textMessage = session.createTextMessage("Message " + i);
				// 8. Send the message
				producer.send(textMessage);
				System.out.println("send message: " + textMessage.getText());
			}
		} finally {
			// 9. Close the connection even when a step above throws, so it is never leaked
			connection.close();
		}
	}
}
package com.codingos.springboot.activemq.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Queue-mode consumer used to test the cluster.
 *
 * <p>Connects through a failover URI that lists the brokers on ports 61616,
 * 61617 and 61618, and registers a listener on {@code queue-test1} that prints
 * every text message it receives.
 */
public class AppConsumer2 {

	private static final String url = "failover:(tcp://192.168.159.128:61616,tcp://192.168.159.128:61617,tcp://192.168.159.128:61618)?randomize=true";
	private static final String queueName = "queue-test1";

	/**
	 * Registers an asynchronous listener on the test queue.
	 *
	 * <p>The connection is intentionally not closed here: closing it would stop
	 * delivery to the listener.
	 *
	 * @param args unused
	 * @throws JMSException if connecting or creating JMS objects fails
	 */
	public static void main(String[] args) throws JMSException {
		// 1. Create the ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 2. Create the Connection
		Connection connection = connectionFactory.createConnection();
		// 3. Start the connection
		connection.start();
		// 4. Create the session (1st parameter: whether the session is transacted,
		//    2nd parameter: acknowledgement mode, here auto-acknowledge)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 5. Create the target queue
		Destination destination = session.createQueue(queueName);
		// 6. Create the consumer
		MessageConsumer consumer = session.createConsumer(destination);
		// 7. Register the listener
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				// Guard the cast: a non-text message on the queue would otherwise
				// raise ClassCastException inside the listener.
				if (!(message instanceof TextMessage)) {
					System.out.println("ignoring non-text message: " + message);
					return;
				}
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("receive messages: "+textMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
	}
}

Finally, visit the ActiveMQ management interface of the three nodes to view relevant information

activemq-a    http://192.168.159.128:8161

activemq-b    http://192.168.159.128:8162

activemq-c    http://192.168.159.128:8163

The related project code is available at https://gitee.com/jayking/spring-boot-activemq

Keywords: Session xml Apache Jetty

Added by solus on Mon, 06 Apr 2020 03:38:17 +0300