Message Middleware - Advanced Features of RabbitMQ

Preface

Before that, we introduced the installation of RabbitMQ, the comparison of major message middleware, the core concept of AMQP, the use of management console, and the Quick Start RabbitMQ. This chapter introduces the advanced features of RabbitMQ. It is introduced in two parts (top/bottom).

  • How can messages be delivered 100% successfully?
  • Explanation of Idempotency Concept
  • How to avoid the problem of repeated consumption of information during the peak period of business generated by mass orders?
  • Confirm confirmation message, Return return message

How to guarantee 100% success of message delivery?

1.1 What is reliability delivery at the production end?

  • Guarantee the successful delivery of the message
  • Ensuring the Successful Receiving of MQ Nodes
  • The sender receives a confirmation response from the MQ node (Broker)
  • Perfect message compensation mechanism

The first three steps do not necessarily guarantee 100% success. So add a fourth step.

BAT/TMD Internet Factory Solutions:
- Message Deposit, Marking Message Status
When sending a message, it is necessary to persist the message to the database and set a state for the message (not sent, sent, arrived). When the state of the message changes, a change needs to be made to the message. Do a round-robin operation for messages that have not arrived and resend them. The number of rotation training also needs to be limited by 3-5 times. Ensure that messages are sent successfully.

  • Delayed delivery of messages, double confirmation, callback check

The concurrent amount of business and message is also needed to decide which solution to adopt.

1.2 The first scheme:

Production End-Reliability Delivery

Illustration:

The blue part indicates that the producer is responsible for sending messages to the Broker side
Biz DB: Order database MSG DB: message data
In the face of small-scale applications, we can adopt the way of adding transactions to ensure the consistency of transactions. However, in the face of high concurrency in large factories, there is no additional transaction, the performance of transaction splicing is very serious, but to make compensation.

For example, send an order message as follows.

Step 1: Store order messages (create orders), store business data, and store messages. Disadvantage: You need to persist twice. (status:0)
Step 2: Send messages on the premise that step 1 succeeds
Step 3: Broker receives the message and confirms it to our production side. Confirm Listener asynchronously listens for messages sent back by Broker.
Step 4: Grab the specified message and update it (status=1) to indicate that the message has been delivered successfully.

Step 5: Distributed timing tasks get message status and fetch data if it is equal to 0.
Step 6: Resend the message
Step 7: Retry limit set 3 times. If the message is retried three times or fails, then (status=2), the message is considered to be a failure.

Why querying these messages fails may require manual query.

Assuming that step 2 is successfully executed, step 3 is due to a network outage. So confirm will never receive messages, so we need to set a rule:
For example, when a message is put into storage, a critical value timeout = 5 minutes is set, and when it exceeds 5 minutes, the data is fetched out.
Or write a timed task that grabs the status=0 message every five minutes. There may be a minor problem: when a message is sent out, the timing task just happens to be executed, and the Confirm has not yet received, the timing task will be executed, resulting in the execution of the message twice.
More elaborate operation: message timeout tolerance restriction. If confirm does not receive a message within 2-3 minutes, it will be sent again.

  • To guarantee MQ, we wonder if the first reliable delivery is appropriate in high concurrency scenarios.

The first scheme stores data twice, business data once and message once. This is a bottleneck for data storage.
In fact, we only need to store business.

  • Delayed delivery of messages, double confirmation, callback check

This approach does not necessarily guarantee 100% success, but it also guarantees 99.99% success. If you encounter a particularly extreme situation, you have to compensate manually, or do regular tasks.
The second way is mainly to reduce the operation of the database.

Look at the second way:

Illustration:

Upstream service: production side
DownStream service: consumer side
Callback service: Callback service

Step 1: The first message is sent after the business message is successfully stored in the library.
Step 2: Similarly, the second message is sent after the message has been successfully stored in the library. Both messages are sent simultaneously. The second message is delayed check, which can be set to delay sending for 2 minutes and 5 minutes.
Step 3: The consumer listens for the specified queue.
Step 4: After the consumer has processed the message, a new message sent confirm is generated internally. Delivery to MQ Broker.
Step 5: Callback Service callback service listens for MQ Broker. If you receive a message sent by Downstream service, you can determine that the message was sent successfully and the execution message is stored in MSG DB.
Step 6: Check Detail checks for messages that listen for delayed step2 delivery. At this point, the two monitored queues are not the same. Five minutes later, the Callback service receives the message and checks the MSG DB. If you find that the previous message has been delivered successfully, you don't need to do anything else. If the check fails, Callback compensates and actively sends RPC communication. Notify the upstream producer to resend the message.

The goal of doing this is to save DB storage once. The focus is not on 100% delivery success, but on performance.

2. Idempotent Concept

What is 2.1 idempotency?

Idempotent (idempotent, idempotence) is a mathematical and computer concept, which is commonly found in abstract algebra, that is, f (x) = f (x). Simply put, the result of multiple executions of an operation is consistent with that of one execution.

  • We can learn from the optimistic locking mechanism of database:
  • For example, we execute an SQL statement to update inventory:
  • UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1

Ensure idempotency by adding version number Version.

2.2 Consumer End-Idempotency Guarantee

How to avoid the problem of repeated consumption of information during the peak period of business generated by mass orders?

In the case of high concurrency, a large number of messages will arrive at MQ, and consumers need to monitor a large number of messages. In this case, there will inevitably be repeated delivery of messages, network flash and so on. If we do not do idempotent, there will be repeated consumption of news.
- The realization of idempotency on the consumer side means that our message will never be consumed many times, even if we receive multiple identical messages, it will only be executed once.

Look at the idempotent operation of the mainstream of Internet manufacturers:
- Unique ID + fingerprint mechanism, using database primary key to de-duplicate.
- Atomic Realization Using Redis
- Other technologies achieve idempotency

2.2.1 Unique ID+Fingerprint Mechanism

  • Unique ID + fingerprint mechanism, using database primary key to de-duplicate.
    Guarantee uniqueness
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = Unique ID + Fingerprint Code
    If the query does not exist, add. There is no need to do any operation, the consumer does not need to consume information.
  • Benefits: Simple implementation
  • Disadvantage: Performance bottlenecks for database writing in high concurrency
  • Solution: Follow-up ID for subdatabase and subtable for algorithm routing
    Allocate flow pressure.

2.2.2 Redis Atomic Characteristic Realization

The easiest way to use Redis is to increase itself.

  • To use Redis for idempotency, we need to consider the problem.
  • Firstly, do we need data dropping out? If dropping out, the key problem is how to make the database and cache atomic?
    Adding transactions is not good, Redis and database transactions are not the same, can not guarantee simultaneous success and failure. What better plan do you have?
  • Second: If you do not drop out, then they are stored in the cache, how to set the timing synchronization strategy?
    How to achieve the stability of cached data?

3. Confirm confirmation message

Understand Confirm message validation mechanism:

  • Message confirmation means that after the producer delivers the message, if Broker receives the message, it will give us a response from the producer.
  • The producer receives a response to determine whether the message is sent to Broker properly, which is also the core guarantee of reliable delivery of the message.

Blue: producer producer red: MQ Broker server

The producer sends the message to the Broker side, which receives the message and sends it back to the producer. Confirm Listener listens for replies.

Operations are asynchronous operations, and when the producer has sent the message, there is no need to take care of it. Confirm Listener listens for MQ Broker's response.

3.1 How to implement Confirm confirmation message?

Step 1: Open confirmation mode on channel: channel.confirmSelect()
The second step is to add a listener on chanel: addConfirmListener, which listens for the return results of success and failure, and then sends the message again or records the log according to the specific results.

3.2 Coding:

Producer:


/**
 * 
* @ClassName: Producer 
* @Description: Producer
* @author Coder programming
* @date 2019 July 30, 2001, 21:27:02 a.m. 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 Create ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
    
        //2 Create a new Channel through Connection
        Channel channel = connection.createChannel();
        
        
        //3 Specify our message delivery mode: message confirmation mode 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        
        //4 Send a message
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        
        //5 Add a confirmation listener for sending messages to the Broker end and then send back messages
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });
    }
}


Consumer:


/**
 * 
* @ClassName: Consumer 
* @Description: Consumer
* @author Coder programming
* @date 2019 21:32:02 a.m. on 30 July 2000 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 Get a connection 
        Connection connection = ConnectionUtils.getConnection();
        
        //2 Create a new Channel through Connection
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";
        
        //3 Declare switches and queues, then bind settings, and finally make routing keys
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //4. Creating Consumers 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            
            System.err.println("Consumer side: " + msg);
        }
        
        
    }
}


Tool class:


/**
 * 
* @ClassName: ConnectionUtils 
* @Description: Connection Tool Class
* @author Coder programming
* @date 2019 22:28:22 a.m. on 21 June 2000 
*
 */
public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //Define connection factories
        ConnectionFactory factory = new ConnectionFactory();
        //Setting Service Address
        factory.setHost("127.0.0.1");
        //port
        factory.setPort(5672);//amqp protocol port similar to mysql 3306
        //Setting account information, username, password, vhost
        factory.setVirtualHost("/vhost_cp");
        factory.setUsername("user_cp");
        factory.setPassword("123456");
        // Getting Connections Through Engineering
        Connection connection = factory.newConnection();
        return connection;
    }
}

Start the consumer side= and then start the production side

3.3 View console:

3.4 Print results:

It can be observed that the consumer receives the message first, and then the producer receives the callback information. If the disk is full, RabbitMQ is abnormal, queue capacity reaches the upper limit, it is possible to receive no ack.

If neither ack nor no ack message is received, that's what I said before. RabbitMQ suffers from network outage, which can be compensated by the message mentioned above.

4. Return message mechanism

  • Return Listener is used to process some non-routing messages!
  • Our message producer, by specifying an Exchange and Routing key, sends the message to a queue, and then our consumers monitor the queue for consumption processing operations!
  • But in some cases, if the current exchange does not exist or the designated routing key cannot be routed when sending a message, then if we need to listen for this unreachable message, we need to use Return Listener!

There is a key configuration item in the basic API:

  • Mandatory: If true, the listener receives a message that is not routed, and then proceeds with subsequent processing. If false, the broker automatically deletes the message!

4.1 Return message mechanism process

Producer producer sends messages to MQ Broker, but NotFind Exchange appears. Exchange of messages sent can not be found at Broker. Or it can be found, but the routing key route cannot reach the specified queue. So it's a wrong message.
At this point, the producer should know that the message sent will not be processed. So MQ Broker provides this Return mechanism to send these unreachable messages to the producer, which needs to set up a Return Listener to receive these unreachable messages. Then log in time to process these messages.

4.2 Code Demo

Producer:


/**
 * 
* @ClassName: Producer 
* @Description: Producer
* @author Coder programming
* @date 2019 July 30, 2000, 22:03:22 a.m. 
*
 */
public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 Create ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        String exchange = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";
        
        String msg = "Hello RabbitMQ Return Message";
        
        
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.err.println("---------handle  return----------");
                //Response Code
                System.err.println("replyCode: " + replyCode);
                //Response text
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        
        //The third parameter, mandatory=true, means that mq will not delete messages if the route is not reached, and false will delete messages automatically.
        channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        //Modify routingkey to test whether messages can be received
        //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    }
}

Consumer:


/**
 * 
* @ClassName: Consumer 
* @Description: Consumer
* @author Coder programming
* @date 2019 July 30, 2000, 22:33:34 a.m. 
*
 */
public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 Create ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("Consumer: " + msg);
        }
    }
}

The ConnectionUtils tool code is above.

Start the consumer side and view the console.

4.3 View Control Console

4.4 View print results

Open the consumer code: channel. basic Publish (exchange, routing Key, true, null, msg. getBytes ());
Consumer printing results:

You can see that the print results are normal, then change the code to:
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

You can see that the producer receives unreachable messages.

Keywords: Programming Database RabbitMQ Redis

Added by njdirt on Mon, 26 Aug 2019 10:54:53 +0300