ActiveMQ: security mechanism, message persistence

1. Security mechanism

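1.1. Modify the ActiveMQ XML configuration file

Configure the <plugins> element inside the <broker> element. The simpleAuthenticationPlugin near the end of the file below defines a single user, admin.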

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
		
		<!-- Configure the <plugins> element inside the <broker> element -->
		<plugins>
			<simpleAuthenticationPlugin>
				<users>
					<authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
				</users>
			</simpleAuthenticationPlugin>
		</plugins>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>

1.2. Configure user name and password

When no user name and password are configured on the <broker>, you can create a connection by passing ActiveMQConnectionFactory.DEFAULT_USER as the user name and ActiveMQConnectionFactory.DEFAULT_PASSWORD as the password.

// 1. Create connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        ActiveMQConnectionFactory.DEFAULT_USER,
        ActiveMQConnectionFactory.DEFAULT_PASSWORD,
        "tcp://localhost:61616"
);

The source code shows that both constants are null, so this is equivalent to supplying no user name and password at all.

public static final String DEFAULT_USER = null;
public static final String DEFAULT_PASSWORD = null;

Once the simpleAuthenticationPlugin from section 1.1 is configured, connecting with a null user name and password throws an exception:

Exception in thread "main" javax.jms.JMSSecurityException: User name [null] or password is invalid.
	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:52)
	at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1403)
	at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1486)
	at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:329)
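
The rejection can be illustrated without a broker. The following is a minimal sketch, assuming the plugin simply compares the supplied password with the one stored for the user; the class SimpleAuthSketch and its methods are hypothetical names for illustration, not ActiveMQ classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the <users> list of simpleAuthenticationPlugin.
class SimpleAuthSketch {
    private final Map<String, String> passwords = new HashMap<>();

    void addUser(String username, String password) {
        passwords.put(username, password);
    }

    // Throws SecurityException when the user is unknown or the password differs.
    void authenticate(String username, String password) {
        // HashMap.get(null) returns null, so a null user name is treated as unknown.
        String expected = passwords.get(username);
        if (expected == null || !expected.equals(password)) {
            throw new SecurityException(
                    "User name [" + username + "] or password is invalid.");
        }
    }

    public static void main(String[] args) {
        SimpleAuthSketch broker = new SimpleAuthSketch();
        broker.addUser("admin", "admin");

        broker.authenticate("admin", "admin");
        System.out.println("admin/admin accepted");

        try {
            // Same situation as DEFAULT_USER / DEFAULT_PASSWORD, which are both null.
            broker.authenticate(null, null);
        } catch (SecurityException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a user list in place, null credentials can never match an entry, which is why the default constants stop working as soon as the plugin is enabled.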

Pass the user name and password configured in the plugin, and the connection succeeds:

// 1. Create connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
        "admin",
        "admin",
        "tcp://localhost:61616"
);

2. Message persistence

2.1. KahaDB storage

KahaDB is the default persistence mechanism. Messages are appended sequentially to data log files, and a separate index file records where each message is stored in those logs.

2.1.1. File description

In the data/kahadb directory, four kinds of files are generated to implement message persistence:

  1. db.data: the message index file, which is essentially a B-tree. The B-tree entries point into the db-*.log files.
  2. db-*.log: stores the message contents, written in append order. Each file is 32 MB by default; when that threshold is reached, a new log file is created.
  3. db.redo: used for message recovery after a restart.
  4. lock: the lock file, held by the broker that currently has read-write access to KahaDB. Brokers compete for it in a cluster environment.
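
The relationship between the index and the log can be sketched in a few lines. This is an in-memory illustration only, under loud assumptions: a ByteArrayOutputStream stands in for a db-*.log file, a TreeMap stands in for the on-disk B-tree of db.data, and the class AppendLogSketch is a hypothetical name, not KahaDB code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;

class AppendLogSketch {
    // Stand-in for a db-*.log file: bytes are only ever appended.
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();

    // Stand-in for db.data: sorted index from message id to {offset, length}.
    // A TreeMap (red-black tree) replaces the real on-disk B-tree here.
    private final TreeMap<Long, int[]> index = new TreeMap<>();

    // Append the body to the end of the log and record its location in the index.
    void append(long messageId, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        index.put(messageId, new int[] {log.size(), bytes.length});
        log.write(bytes, 0, bytes.length);
    }

    // Look up the location in the index, then read that slice of the log.
    String read(long messageId) {
        int[] location = index.get(messageId);
        if (location == null) {
            return null;
        }
        byte[] all = log.toByteArray();
        return new String(all, location[0], location[1], StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        AppendLogSketch store = new AppendLogSketch();
        store.append(1L, "hello");
        store.append(2L, "world");
        System.out.println(store.read(2L));
    }
}
```

Writes never seek backwards in the log, and reads go through the index instead of scanning the log, which is the property the file layout above is designed around.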

2.1.2. Characteristics

Characteristics of KahaDB:

  1. Messages are stored in append-only form.
  2. The message index is stored in a B-tree structure and can be updated quickly.
  3. JMS transactions are fully supported.
  4. The size of each data file can be limited. When db-1.log reaches the threshold, db-2.log is created to store further messages.
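
The file rollover in point 4 can be sketched as follows. On the kahaDB element the limit is set with the journalMaxFileLength attribute; the exact condition the broker uses to decide when to roll over is an assumption here, and JournalRolloverSketch is a hypothetical class that only tracks file names and lengths.

```java
import java.util.ArrayList;
import java.util.List;

class JournalRolloverSketch {
    private final long maxFileLength;          // threshold in bytes
    private final List<String> files = new ArrayList<>();
    private int currentFile = 1;
    private long currentLength = 0;

    JournalRolloverSketch(long maxFileLength) {
        this.maxFileLength = maxFileLength;
        files.add("db-1.log");
    }

    // Returns the name of the file the record is written to.
    // Assumption: a new file is started when the record would not fit.
    String append(long recordLength) {
        if (currentLength > 0 && currentLength + recordLength > maxFileLength) {
            currentFile++;
            currentLength = 0;
            files.add("db-" + currentFile + ".log");
        }
        currentLength += recordLength;
        return "db-" + currentFile + ".log";
    }

    List<String> files() {
        return files;
    }

    public static void main(String[] args) {
        // A tiny 100-byte threshold instead of the 32 MB default, to keep the demo short.
        JournalRolloverSketch journal = new JournalRolloverSketch(100);
        System.out.println(journal.append(60));
        System.out.println(journal.append(60));
        System.out.println(journal.files());
    }
}
```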

2.2. JDBC storage (MySQL 8)

JDBC storage is slow and generally not used in production environments, but it makes the persisted data easy to inspect.

2.2.1. Configuration file

Add a data source bean under the <beans> element:

<!-- data source Bean -->
<bean id="mysql-source" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true&amp;serverTimezone=UTC&amp;characterEncoding=utf8&amp;useUnicode=true&amp;useSSL=false"/>
	<property name="username" value="root"/>
	<property name="password" value="root"/>
	<property name="maxTotal" value="200"/>
	<property name="poolPreparedStatements" value="true"/>
</bean>

Set the persistence mode to JDBC by changing the persistenceAdapter inside the <broker> element to:

<persistenceAdapter>
	<!-- The tables are created automatically, but you must create the activemq database yourself -->
	<jdbcPersistenceAdapter dataSource="#mysql-source" createTablesOnStartup="true"/>
</persistenceAdapter>

2.2.2. Add JAR packages

Add the following JAR packages to the lib directory:

  • mysql-connector-java-8.0.19.jar
  • commons-pool2-2.6.0.jar

2.2.3. Database and tables

Create the database manually with create database activemq; the tables are created automatically.
After ActiveMQ starts, the following three tables are created:

  • activemq_msgs: message content. A row is added when a message is sent and deleted automatically after it is consumed.
    ID: auto-incrementing primary key
    CONTAINER: the container (destination) of the message
    MSGID_PROD: the ID of the producer client that sent the message
    MSGID_SEQ: the sequence number of the message; MSGID_PROD and MSGID_SEQ together form the JMS message ID
    EXPIRATION: the expiration time of the message, stored in milliseconds
    MSG: the serialized binary data of the message body
    PRIORITY: the priority, from 0 to 9; a higher value means a higher priority
  • activemq_acks: stores subscription relationships. For a persistent (durable) topic, the relationship between the subscriber and the server is saved in this table.
    CONTAINER: the destination of the message
    SUB_DEST: information about the other systems in the cluster
    CLIENT_ID: the subscriber's client ID
    SUB_NAME: the subscriber name
    SELECTOR: the message selector
    LAST_ACKED_ID: the ID of the last message that was consumed
    PRIORITY: the priority
  • activemq_lock: lock table, used to compete for read-write access in a cluster environment.
    ID: primary key
    TIME: the time
    BROKER_NAME: the broker name
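
To make the activemq_msgs columns concrete, here is a small sketch of one row as a plain Java object. The class MessageRowSketch is hypothetical, the ":" separator used to join MSGID_PROD and MSGID_SEQ is an assumption about the ID format, and treating an EXPIRATION of 0 as "never expires" follows the JMS convention.

```java
class MessageRowSketch {
    final long id;             // ID: auto-incrementing primary key
    final String container;    // CONTAINER: destination of the message
    final String msgidProd;    // MSGID_PROD: producer client ID
    final long msgidSeq;       // MSGID_SEQ: sequence number within the producer
    final long expiration;     // EXPIRATION: epoch milliseconds, 0 = never expires
    final int priority;        // PRIORITY: 0 (lowest) to 9 (highest)

    MessageRowSketch(long id, String container, String msgidProd,
                     long msgidSeq, long expiration, int priority) {
        this.id = id;
        this.container = container;
        this.msgidProd = msgidProd;
        this.msgidSeq = msgidSeq;
        this.expiration = expiration;
        this.priority = priority;
    }

    // Assumption: the JMS message ID is the producer ID and sequence joined by ":".
    String jmsMessageId() {
        return msgidProd + ":" + msgidSeq;
    }

    // A message with expiration 0 never expires; otherwise compare with the clock.
    boolean isExpired(long nowMillis) {
        return expiration != 0 && nowMillis > expiration;
    }

    public static void main(String[] args) {
        // All values below are made up for illustration.
        MessageRowSketch row = new MessageRowSketch(
                1L, "queue://demo", "ID:host-1:1:1:1", 5L, 1000L, 4);
        System.out.println(row.jmsMessageId());
        System.out.println(row.isExpired(System.currentTimeMillis()));
    }
}
```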

Keywords: message queue ActiveMQ

Added by erth on Sun, 30 Jan 2022 11:25:04 +0200