Use of JMS in SpringBoot

Abstract: This article is original. Reprints are welcome; please retain the source: https://github.com/jasonGeng88/blog

All services in this article are deployed in Docker containers.

Current environment

  1. Mac OS 10.11.x

  2. docker 1.12.1

  3. JDK 1.8

  4. SpringBoot 1.5

Preface

This article builds on the previous one, "A story tells you what a message queue is," which covered the usage scenarios and related features of message queues. This article mainly describes how to use message services in Java.

There are many message queue technologies on the market. If our code framework had to support several different message implementations, we would have to add our own layer of encapsulation to keep the framework highly extensible.

In Java, that is not necessary, because Java already defines a standard for this: the JMS specification. The specification defines a common set of interfaces and related semantics, and provides message services such as persistence, validation and transactions. Its main purpose is to let Java applications access existing message middleware in a uniform way, much as JDBC does for databases.

Basic concepts

Before describing specific usage, I will briefly introduce some basic JMS knowledge in three parts: connecting to the message queue (MQ), sending messages and receiving messages.

The technology stack chosen here is SpringBoot, JMS and ActiveMQ.

To better understand JMS, the project here is not built with SpringBoot's zero-configuration approach.

MQ Connection

The first step in using MQ is to connect to it. Because the JMS specification is used here, any MQ that complies with it implements the ConnectionFactory interface. So we only need to create a ConnectionFactory object, which establishes the MQ connection and encapsulates a series of MQ-specific parameters.

Example: here we use ActiveMQ.

Maven dependencies:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
</dependencies>

Create ActiveMQ Connection Factory:

@Bean
public ConnectionFactory connectionFactory(){

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(ActiveMQ_URL);
    connectionFactory.setUserName(ActiveMQ_USER);
    connectionFactory.setPassword(ActiveMQ_PASSWORD);
    return connectionFactory;
    
}

Message sending

Messages are sent with the JmsTemplate class from Spring's JMS core package (org.springframework.jms.core). It simplifies the use of JMS because it creates and releases resources for us when sending or synchronously receiving messages. As its role suggests, it needs a reference to the connection factory created above. The code is as follows:

@Bean
public JmsTemplate jmsQueueTemplate(){

    return new JmsTemplate(connectionFactory());
    
}

Once the JmsTemplate is created, we can call its methods to send messages. Two concepts are worth noting:

  1. Where is the message sent? -> We need to specify the Destination of the message, which is a JMS administered object that can be stored in and retrieved from JNDI.

  2. What is the message body? -> An object that implements javax.jms.Message, similar to a Remote object in Java RMI.

Code example:

@Autowired
private JmsTemplate jmsQueueTemplate;

/**
 * Send a raw JMS Message
 */
public void send(){

    jmsQueueTemplate.send("queue1", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("I'm the original news.");
        }
    });
    
}

Optimization: of course, we don't have to create a Message object with an anonymous MessageCreator class every time. JmsTemplate provides a method that automatically converts an object into a Message: convertAndSend(String destinationName, final Object message).

Optimized code example:

/**
 * Send an object that is automatically converted into a JMS Message
 */
public void convertAndSend(){

    jmsQueueTemplate.convertAndSend("queue1", "I am a message that is automatically converted.");
    
}

Note: message conversion can also be customized by implementing the MessageConverter interface.

Message receiving

Now that we have covered sending, let's finish with how messages are received. Since a message is sent to a specified destination as a Message object, receiving it necessarily means going to that destination to fetch it. Here we use a listener to listen for messages at the specified destination, and mark the listener method with the @JmsListener annotation.

Code example:

@Component
public class Listener1 {

    @JmsListener(destination = "queue1")
    public void receive(String msg){
        System.out.println("The contents of the monitored information are as follows: " + msg);
    }
    
}

With a destination and a listener method in place, the listener still has to be associated with MQ before it can work. There may be more than one listener, and it would be inappropriate for each of them to establish its own connection to MQ. Hence the concept of a listener container factory, the JmsListenerContainerFactory interface. It references the MQ connection factory created above, and the containers it creates receive messages and dispatch them to the specified listener. It also covers transaction management, resource acquisition and release, and exception conversion.

Code example:

@Bean
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
    
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    // Number of concurrent consumers: from 3 up to 10
    factory.setConcurrency("3-10");
    // Interval between recovery (reconnection) attempts, in milliseconds
    factory.setRecoveryInterval(1000L);
    return factory;
    
}
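
The "3-10" value passed to setConcurrency is a "lower-upper" range of concurrent consumers; a single number such as "10" is read as an upper limit with a lower limit of 1. The sketch below is not Spring's own parser. It is a hypothetical helper (the class name ConcurrencyRangeSketch and the method parse are made up for illustration) that only shows how such a string breaks down:

```java
/** Hypothetical helper illustrating the "lower-upper" concurrency format; not part of Spring. */
class ConcurrencyRangeSketch {

    /** Returns {lower, upper} for strings such as "3-10" or "10". */
    static int[] parse(String concurrency) {
        int dash = concurrency.indexOf('-');
        if (dash < 0) {
            // A single number is the upper limit; the lower limit defaults to 1.
            return new int[] {1, Integer.parseInt(concurrency.trim())};
        }
        int lower = Integer.parseInt(concurrency.substring(0, dash).trim());
        int upper = Integer.parseInt(concurrency.substring(dash + 1).trim());
        if (lower > upper) {
            throw new IllegalArgumentException("lower limit exceeds upper limit: " + concurrency);
        }
        return new int[] {lower, upper};
    }

    public static void main(String[] args) {
        int[] range = parse("3-10");
        System.out.println("start with " + range[0] + " consumers, scale up to " + range[1]);
    }
}
```

With "3-10", the container starts with 3 consumers and may grow to 10 under load.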

Scenarios

Code address: https://github.com/jasonGeng88/springboot-jms

With a basic understanding of JMS, let's use it in specific scenarios.

First, we need to start ActiveMQ; here we start it in a Docker container.

Start command:

docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq

After a successful startup, check the result in the ActiveMQ web console (http://localhost:8161):

Point-to-point model (single consumer)

Here is one of the most common message queue scenarios, the point-to-point model. The basic concepts are as follows:

  1. Each message can be consumed by only one consumer. Once the message is consumed, it no longer exists in the message queue.

  2. There is no timing dependency between the sender and the receiver: whether or not the receiver is running when the sender sends a message, the message is still delivered to the queue.

  3. After receiving a message successfully, the receiver must acknowledge it to the queue.
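
The properties above can be modelled with a plain in-memory queue. The following is only a sketch built on java.util.concurrent, not JMS or ActiveMQ code. The class name PointToPointSketch and the consumer names are made up for illustration, and acknowledgement is simplified to the removal that poll() performs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model of a point-to-point queue; an illustration, not a JMS implementation. */
class PointToPointSketch {

    /** Sends three messages before any consumer exists, then lets two consumers drain the queue. */
    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // No timing dependency: the messages simply wait until a consumer shows up.
        queue.add("m1");
        queue.add("m2");
        queue.add("m3");

        List<String> log = new ArrayList<>();
        String[] consumers = {"consumer-1", "consumer-2"};
        int turn = 0;
        String message;
        // poll() removes the message from the queue, so it can never reach a second consumer.
        while ((message = queue.poll()) != null) {
            log.add(consumers[turn % consumers.length] + " got " + message);
            turn++;
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each message appears exactly once in the log, regardless of how many consumers take turns.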

Code implementation (to keep the code short, some of it reuses the code above):

  • Application.java

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JmsTemplate for queues
     * connectionFactory() is the ActiveMQ connection factory
     */
    @Bean
    public JmsTemplate jmsQueueTemplate(){
        return new JmsTemplate(connectionFactory());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

The @EnableJms annotation is placed on a @Configuration class to enable support for JMS annotations such as @JmsListener.

  • Message Producer (PtpProducer.java)

@Component
public class PtpProducer {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    /**
     * Send an object that is automatically converted into a JMS Message
     */
    public void convertAndSend(){
        jmsQueueTemplate.convertAndSend("ptp", "I am a message that is automatically converted.");
    }
}

  • Producer caller (PtpController.java)

@RestController
@RequestMapping(value = "/ptp")
public class PtpController {

    @Autowired
    private PtpProducer ptpProducer;

    @RequestMapping(value = "/convertAndSend")
    public Object convertAndSend(){
        ptpProducer.convertAndSend();
        return "success";
    }

}

  • Message listener container factory

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JMS queue listener container factory
     */
    @Bean(name = "jmsQueueListenerCF")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        // Number of concurrent consumers: from 3 up to 10
        factory.setConcurrency("3-10");
        // Interval between recovery (reconnection) attempts, in milliseconds
        factory.setRecoveryInterval(1000L);
        return factory;
    }

   ...
   
}

  • Message listener

@Component
public class PtpListener1 {

    /**
     * Message queue listener
     * destination: name of the queue to listen on
     * containerFactory: listener container factory; must be specified when two or more listener container factories are defined
     */
    @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF")
    public void receive(String msg){
    
        System.out.println("Point-to-point mode 1: " + msg);
        
    }
}

Demonstration

After starting the project, call the message producer through the REST interface to send a message. The request is as follows:

curl -XGET 127.0.0.1:8080/ptp/convertAndSend

Consumer Console Information:

ActiveMQ console information:

List description:

  • Name: Queue name.

  • Number Of Pending Messages: Number of messages waiting for consumption.

  • Number Of Consumers: Number of consumers currently connected. Because the listener container is configured with a concurrency of "3-10", it starts with 3 consumers, so 3 is shown.

  • Messages Enqueued: Total number of messages that have entered the queue, both consumed and pending. This number only increases.

  • Messages Dequeued: Number of messages that have left the queue, which can be understood as the number of messages consumed.

Point-to-point model (multi-consumer)

With the single-consumer pattern above, the consumer may become a bottleneck when many producers send messages to one queue. So we need multiple consumers to share the load (a consumer thread pool relieves some of the pressure, but it lives on a single machine and cannot be distributed, so multiple consumers are still necessary). This leads to the following scenario.

Code implementation

  • Add a new listener

@Component
public class PtpListener2 {

    @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF")
    public void receive(String msg){
    
        System.out.println("Point-to-point mode 2: " + msg);
        
    }
}

Demonstration

Here we send 10 requests to observe how the messages are distributed between the consumers:

Because the listener container uses a thread pool, the order in which the listeners consume messages varies from run to run.
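
The effect of a consumer thread pool on ordering can be reproduced without a broker. The sketch below is an assumption-laden model, not the listener container itself: it hands 10 messages to a fixed pool of 3 threads (mirroring the lower bound of "3-10") and records the order in which they finish. The class name ConcurrentConsumerSketch is made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Models a pool of consumer threads; an illustration, not Spring's listener container. */
class ConcurrentConsumerSketch {

    /** Handles messageCount messages on a pool of the given size and returns them in completion order. */
    static List<String> consume(int messageCount, int threads) {
        ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 1; i <= messageCount; i++) {
            final int id = i;
            // Each message is handled by whichever pool thread is free.
            pool.execute(() -> {
                handled.add("message-" + id);
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        // The printed order may differ between runs; the set of messages does not.
        consume(10, 3).forEach(System.out::println);
    }
}
```

Every message is handled exactly once, but nothing guarantees that message-1 is finished before message-2.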

Publish-subscribe model

In addition to the point-to-point model, publish-subscribe is another common use of message queues. Imagine an instant messaging group: when you send a message to it, everyone in the group (that is, everyone subscribed to it) receives the message.

Basic concepts:

  1. Each message can have multiple consumers.

  2. There is a timing dependency between publishers and subscribers. A subscriber to a topic must create its subscription before it can consume the publisher's messages.

  3. To consume messages, a subscriber must keep running.
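
These rules can also be modelled in memory. The sketch below assumes non-durable subscriptions and is not JMS or ActiveMQ code; the class name TopicSketch and its methods are made up for illustration. It shows that a message published before anyone subscribes is lost, while a later message reaches every subscriber:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory model of a topic with non-durable subscribers; an illustration, not a JMS implementation. */
class TopicSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the message to every subscriber registered at this moment. */
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    static List<String> run() {
        TopicSketch topic = new TopicSketch();
        List<String> log = new ArrayList<>();

        // Published before any subscription exists: nobody receives it.
        topic.publish("early");

        topic.subscribe(m -> log.add("subscriber-1: " + m));
        topic.subscribe(m -> log.add("subscriber-2: " + m));

        // Published after both subscriptions exist: each subscriber gets its own copy.
        topic.publish("hello");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Compare this with the point-to-point model, where a message waits in the queue and is handed to exactly one consumer.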

Code implementation

  • Configure a JmsTemplate that supports publish-subscribe

@SpringBootApplication
@EnableJms
public class Application {

    ...

    @Bean
    public JmsTemplate jmsTopicTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }
    
    ...

}

  • Message Producer (PubSubProducer.java)

@Component
public class PubSubProducer {

    @Autowired
    private JmsTemplate jmsTopicTemplate;

    public void convertAndSend(){
        jmsTopicTemplate.convertAndSend("topic", "I am a message that is automatically converted.");
    }
}

  • Producer caller (PubSubController.java)

@RestController
@RequestMapping(value = "/pubsub")
public class PubSubController {

    @Autowired
    private PubSubProducer pubSubProducer;

    @RequestMapping(value = "/convertAndSend")
    public String convertAndSend(){
        pubSubProducer.convertAndSend();
        return "success";
    }

}

  • Configure a DefaultJmsListenerContainerFactory that supports publish-subscribe

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JMS topic listener container factory
     */
    @Bean(name = "jmsTopicListenerCF")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1");
        factory.setPubSubDomain(true);
        return factory;
    }

   ...
   
}

  • Message listener (set up two subscribers here)

@Component
public class PubSubListener1 {

    @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
    public void receive(String msg){
        System.out.println("Subscriber 1 - " + msg);
    }
}

@Component
public class PubSubListener2 {

    @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
    public void receive(String msg){
        System.out.println("Subscriber 2 - " + msg);
    }
}

Demonstration

curl -XGET 127.0.0.1:8080/pubsub/convertAndSend

Consumer Console Information:

ActiveMQ console information:

Summary

This article gives only a brief description of integrating SpringBoot with JMS and how to use it. For details, see Spring's official documentation. I also had the honor of taking part in the Spring 5 translation initiated by Concurrent Programming Network, where I mainly translated the JMS chapter. That chapter explains the basic concepts mentioned above in detail, and interested readers can take a look. My translation skills are limited, of course, so readers comfortable with English are advised to read the original.

Keywords: Java SpringBoot Spring Docker

Added by rahulephp on Tue, 25 Jun 2019 00:14:52 +0300