[sleepy town] Exploring RabbitMQ -- the RabbitMQ Client in practice

Preface

There are two common Java clients for RabbitMQ development: the native API, com.rabbitmq.client, and the Spring Boot integration package, org.springframework.amqp. The latter wraps some technical details in Spring style (annotations) and is easier to use, adding features such as connection / channel management and concurrent consumption. However, it is not well suited to really understanding RabbitMQ, so this chapter starts with the native API.

This series explores RabbitMQ in practical applications in three chapters:

[sleepy town] Exploring RabbitMQ (I) -- building a high availability (HA) cluster

[sleepy town] Exploring RabbitMQ (II) -- the RabbitMQ Client in practice

[sleepy town] Exploring RabbitMQ (III) -- Spring Boot AMQP in practice

1, What is RabbitMQ

First, a look at AMQP (from Baidu Encyclopedia):

AMQP, the Advanced Message Queuing Protocol, is an open application-layer standard that provides unified messaging services and is designed for message-oriented middleware. Clients and middleware built on this protocol can exchange messages regardless of the product or development language on either side.

The biggest difference between AMQP and JMS, which came before it, is that AMQP is a protocol with implementations in many languages, while JMS is limited to a Java API. RabbitMQ is one implementation of AMQP. It is essentially a message broker, responsible for receiving and forwarding messages. In real applications it is mainly used for asynchronous processing, decoupling and peak shaving. Let's get to know Producer, Consumer, Queue, Exchange, RoutingKey and so on through practical application scenarios.

2, Application scenario - asynchronous

In real projects we mostly use a distributed architecture in which services call each other through interfaces, forming a service call chain. When one service in the chain takes a long time to process, we should consider wrapping it as a task that is processed asynchronously, so that the main flow can return early. (This refers to asynchrony between services, not page-level ajax.)

The flow chart is designed as follows:

The implementation code is as follows:

The Queue is responsible for temporarily storing messages in RabbitMQ. Declaring a Queue is like creating a table in a database, and it can be done in advance. (Declaring it in the same place as message sending is not recommended.)

public class QueueAdmin {
    public final static String QUEUE_NAME = "TaskQueue";

    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.102");
        factory.setPort(6001);
        factory.setUsername("admin");
        factory.setPassword("admin");
        return factory.newConnection();
    }

    public static void initQueue() throws Exception {
        try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
            //Declare queue. Parameters: queue name, whether the queue is persistent (not the data in the queue), whether it is exclusive, and whether it is automatically deleted, Other parameters (such as expiration time, dead letter queue, etc.)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Initialize queue:" + QUEUE_NAME);
        }
    }

    public static void main(String[] args) throws Exception {
        QueueAdmin.initQueue();
    }
}

The Consumer is responsible for receiving and processing messages.

public class Consumer {
    public void recevedMessage() throws Exception {
        System.out.println("Waiting to receive message...");
        Connection connection = QueueAdmin.createConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                String message = new String(body, "UTF-8");
                doTask(message);
            }
        };
        //Define the consumer. Parameters: queue name, autoAck (automatic acknowledgement or not), callback
        channel.basicConsume(QueueAdmin.QUEUE_NAME, true, defaultConsumer);
    }

    public void doTask(String message) {
        try {
            Thread.sleep(5000);
            System.out.println(message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        Consumer consumer = new Consumer();
        consumer.recevedMessage();
    }
}

The Producer is responsible for sending messages.

public class Producer {
    public void doMainWork(int i) throws Exception {
        System.out.println("Process main flow" + i);
        sendMessage(i);
    }

    public void sendMessage(int i) throws Exception {
        String message = "Processing time-consuming tasks" + i;
        try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
            //Publish message. Parameters: exchange, routing key, message properties, message body
            channel.basicPublish("", QueueAdmin.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println("Send message:" + message);
        }
    }

    public static void main(String[] args) throws Exception {
        Producer producer = new Producer();
        for (int i = 0; i < 3; i++) {
            producer.doMainWork(i);
        }
    }
}

Explanation:

(1) Observe the direction of messages 1-4 in the flowchart. When a queue has multiple consumers, RabbitMQ by default delivers each message to the next consumer in turn, so every consumer receives roughly the same number of messages. This is called round-robin dispatching. It follows that the more consumers there are, the more messages can be consumed at the same time.
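The round-robin behaviour described in (1) can be pictured with a small in-memory model. This is only a sketch and does not talk to a broker: the class name RoundRobinSketch and the method dispatch are made up for illustration, and consumers are represented simply by their index.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of round-robin dispatch; no RabbitMQ connection is involved.
final class RoundRobinSketch {
    // Returns, for each consumer index, the message numbers that consumer would receive.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int m = 1; m <= messageCount; m++) {
            // Message m goes to the next consumer in turn.
            perConsumer.get((m - 1) % consumerCount).add(m);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        // Four messages, two consumers: consumer 0 gets 1 and 3, consumer 1 gets 2 and 4.
        System.out.println(dispatch(4, 2));
    }
}
```

With four messages and two consumers, each consumer ends up with two messages, which matches the "approximately the same number" behaviour of the default dispatch.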

(2) Declare queue (QueueAdmin line 17)

//Declare queue. Parameters: queue name, whether the queue is persistent (not the data in the queue), whether it is exclusive, and whether it is automatically deleted, Other parameters (such as expiration time, dead letter queue, etc.)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

The first two parameters are easy to understand; the third and fourth need more attention.

exclusive: true means the queue can only be used by the Connection that declared it, and the queue is deleted when that Connection closes.

autoDelete: true means the queue is deleted once it is no longer in use, that is, after its last consumer has cancelled or disconnected.

(3) Define Consumer (line 15 of Consumer)

//Define the consumer. Parameters: queue name, autoAck (automatic acknowledgement or not), callback
channel.basicConsume(QueueAdmin.QUEUE_NAME, true, defaultConsumer);

The key one is the second parameter, autoAck (automatic acknowledgement). If true, a message counts as acknowledged as soon as it is delivered to the Consumer. If false, the Consumer must call channel.basicAck itself after the business processing succeeds. (Described later.)

In addition, the RabbitMQ queue management page shows two message states: Ready and Unacked (unacknowledged). There is effectively a third one, deleted. The state flow is as follows:

A message is created by the Producer and sent to RabbitMQ, where its state is Ready. When RabbitMQ delivers it to a Consumer, the state changes to Unacked. Finally, once the Consumer acknowledges the message, either automatically or manually, RabbitMQ deletes it.
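The Ready -> Unacked -> deleted flow can be modelled in a few lines of plain Java. This is a toy model under stated assumptions, not how the broker is implemented: MessageStateSketch and its method names are hypothetical, and a single queue with a single channel is assumed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of the message states of one queue; no broker involved.
final class MessageStateSketch {
    private final Deque<String> ready = new ArrayDeque<>();      // state: Ready
    private final Map<Long, String> unacked = new HashMap<>();   // state: Unacked
    private long nextDeliveryTag = 1;

    // Producer -> queue: the message becomes Ready.
    void publish(String message) {
        ready.addLast(message);
    }

    // Queue -> consumer: Ready becomes Unacked. Returns the delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Consumer acknowledgement: the message is deleted. Returns false for an unknown tag.
    boolean ack(long deliveryTag) {
        return unacked.remove(deliveryTag) != null;
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With autoAck set to true, deliver and ack effectively happen in one step, so the management page never shows the message as Unacked.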

(4) Publish the message (Producer line 12)

//Publish message. Parameters: exchange, routing key, message properties, message body
channel.basicPublish("", QueueAdmin.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

The first three parameters are the ones to understand.

exchange: the exchange name. routingKey: the routing key.

When exchange is an empty string, the default exchange is used, and the routing key must be the name of the target queue. Put simply, the producer sends the message directly to the target queue. (Described later.)

props: the message properties. MessageProperties.PERSISTENT_TEXT_PLAIN marks the message as persistent.

3, Application scenario - decoupling

When the call chain of a main business flow has to call several secondary interfaces, such as writing logs or sending real-time report data, we should consider turning those calls into a publish / subscribe message model and separating them from the main flow. That is decoupling.

As a first optimization, we replace the calls to the logging interface and the real-time report interface with message sends.

channel.basicPublish("", "LogQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
channel.basicPublish("", "ReportQueue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

Replacing the secondary interface calls with message sends removes the interface dependency, so a failed or timed-out call no longer affects the main flow. However, the Producer is still coupled to multiple queues. If monitoring data also needs to be sent, the Producer has to be modified to publish one more message. This is not real decoupling, and it is not the publish / subscribe pattern either.

We need an intermediary between Producer and Queue, and that is the Exchange. The Producer only sends messages to the Exchange, which receives them and pushes them to every queue bound to it.
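Why the Exchange removes the coupling can be seen in a small in-memory model of a fanout-style exchange. It is a sketch only: FanoutSketch, bind, publish and messagesIn are hypothetical names, and MonitorQueue in the usage note is an assumed example queue, not one from this article's code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of an exchange that copies every message to all bound queues.
final class FanoutSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to the exchange; binding twice has no extra effect.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The producer only knows the exchange, not the queues behind it.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Returns the messages stored in a queue, or null if the queue was never bound.
    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }
}
```

Binding a new queue such as MonitorQueue later only requires another bind call. The publish call on the producer side stays exactly the same, which is the decoupling the text is after.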

As a second optimization, instead of sending one message per queue, we send the message once to the exchange.

The flow chart is as follows:

The code implementation is as follows:

public class QueueAdmin {
    public final static String QUEUE_NAME1 = "LogQueue";
    public final static String QUEUE_NAME2 = "ReportQueue";
    public final static String EXCHANGE_NAME = "TestExchage2";

    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.102");
        factory.setPort(6001);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection =  factory.newConnection();
        return connection;
    }

    public static void initQueue() throws Exception {
        try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
            //Declare exchange. Parameters: exchange name, type, durable
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
            //Declare queue. Parameters: queue name, whether the queue is persistent (not the data in the queue), whether it is exclusive, whether it is automatically deleted, other arguments
            channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
            channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
            //Bind queue. Parameters: queue name, exchange name, routing key
            channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
            channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
            System.out.println("Initialize switch:"
                    + EXCHANGE_NAME + ",Queue:" + QUEUE_NAME1 + ", " + QUEUE_NAME2);
        }
    }

    public static void main(String[] args) throws Exception {
        QueueAdmin.initQueue();
    }
}
public class Producer2 {
    public void doMainWork(int i) throws Exception {
        System.out.println("Process main flow" + i);
        sendMessage(i);
    }

    public void sendMessage(int i) throws Exception {
        String message = "Mainstream business data" + i;
        try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
            //Publish message. Parameters: exchange, routing key, message properties, message body
            channel.basicPublish(QueueAdmin.EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println("Send message:" + message);
        }
    }

    public static void main(String[] args) throws Exception {
        Producer2 producer = new Producer2();
        for (int i = 0; i < 3; i++) {
            producer.doMainWork(i);
        }
    }
}
public class LogConsumer {
    public void recevedMessage() throws Exception {
        System.out.println("Waiting to receive message...");
        Connection connection = QueueAdmin.createConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                String message = new String(body, "UTF-8");
                doTask(message);
            }
        };
        channel.basicConsume(QueueAdmin.QUEUE_NAME1, true, defaultConsumer);
    }

    public void doTask(String message) {
        System.out.println("Log:" + message);
    }

    public static void main(String[] args) throws Exception {
        LogConsumer consumer = new LogConsumer();
        consumer.recevedMessage();
    }
}
public class ReportConsumer {
    public void recevedMessage() throws Exception {
        System.out.println("Waiting to receive message...");
        Connection connection = QueueAdmin.createConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                String message = new String(body, "UTF-8");
                doTask(message);
            }
        };
        channel.basicConsume(QueueAdmin.QUEUE_NAME2, true, defaultConsumer);
    }

    public void doTask(String message) {
        System.out.println("Generate real-time report data:" + message);
    }

    public static void main(String[] args) throws Exception {
        ReportConsumer consumer = new ReportConsumer();
        consumer.recevedMessage();
    }
}

Explanation:

(1) What is the RoutingKey?

The Producer sends messages to an Exchange, the Exchange forwards them to Queues, and the Consumer takes messages from a Queue.

Since one Exchange can be bound to multiple queues, how does it decide which queues receive a message?

The answer is the RoutingKey. It plays a role similar to the foreign keys of a many-to-many join table in a database, and it ties queues and exchanges together.

a. After declaring the Queue and the Exchange, bind them together with a RoutingKey.

//Bind queue. Parameters: queue name, exchange name, routing key
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");

b. When publishing a message, specify the Exchange and the RoutingKey. These two values are matched against the existing bindings to select one or more queues.

(The matching rules depend on the exchange type.)

//Publish message. Parameters: exchange, routing key, message properties, message body
channel.basicPublish(QueueAdmin.EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

(2) When declaring an exchange, specify its type (QueueAdmin line 20)

//Declare exchange. Parameters: exchange name, type, durable
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);

The second parameter is the exchange type. There are four types: direct, fanout, topic and headers. The first three are the commonly used ones and are described here.

a. fanout: the exchange sends each message to every queue bound to it. No matching is involved, so the RoutingKey is ignored by a fanout exchange and is set to an empty string.

b. direct: the message is published with a RoutingKey, and the exchange delivers it to the queues whose binding RoutingKey is exactly equal to it.

channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); //direct exchange
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "rk_test"); //Bound with rk_test, which must match exactly
channel.basicPublish(QueueAdmin.EXCHANGE_NAME, "rk_test", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); //Publish with routing key rk_test

c. topic: like direct, the exchange delivers messages to the queues whose binding matches the RoutingKey. The difference is that direct requires an exact match, while topic matches against a pattern.

channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "com.xy.*"); //* matches exactly one word
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "lazy.#"); //# matches zero or more words
channel.basicPublish(QueueAdmin.EXCHANGE_NAME, "com.xy.rk_test", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); //Publish with routing key com.xy.rk_test

With the code above, queue 1 receives the message and queue 2 does not. If the RoutingKey is changed to lazy.xy.rk_test, queue 2 receives the message and queue 1 does not.
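The topic matching rule can be checked without a broker. The following is a minimal sketch of the rule, not RabbitMQ's own implementation: words are separated by dots, * stands for exactly one word and # for zero or more words. TopicMatchSketch and matches are hypothetical names.

```java
// Minimal sketch of topic-style matching between a binding key and a routing key.
final class TopicMatchSketch {
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Matches pattern[p..] against words[w..] word by word.
    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("com.xy.*", "com.xy.rk_test")); // queue 1 binding
        System.out.println(matches("lazy.#", "com.xy.rk_test"));   // queue 2 binding
    }
}
```

For the routing key com.xy.rk_test, only the binding com.xy.* matches. For lazy.xy.rk_test, only lazy.# matches, which is the behaviour described for queue 1 and queue 2.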

4, Application scenario - peak shaving

When a large number of user requests pour in within a short period and requests arrive much faster than they can be processed, the database connections are exhausted first, then the application server connections, and in the end new requests cannot get through at all. We then need to consider buffering the requests that write to the database and processing them later, at a pace the database can sustain. That is peak shaving.

The flow chart is as follows:

4000 concurrent write requests are converted into 4000 messages and written to RabbitMQ. With a single-machine throughput on the order of 10000, RabbitMQ has no trouble accepting them. We use four consumers, each holding at most 100 messages at a time (by default a consumer processes messages one after another in a blocking fashion; multithreading can be added to increase throughput). The concurrent write pressure on the DB is therefore at most 400, one tenth of the original 4000 concurrent writes. That is peak shaving. (The maximum number of connections in MySQL 5.7 is 16384 and is generally set to around 5000, and a single application scenario cannot be allowed to use all of them.)

To implement this design, three problems need to be solved.

1. How do we throttle the flow of messages from OrderQueue to the consumers? (The 400 in the figure.)

Switch from automatic to manual acknowledgement, and limit the number of unacknowledged messages.

public class OrderConsumer {
    public void recevedMessage() throws Exception {
        System.out.println("Waiting to receive message...");
        Connection connection = com.example.mqtest2.QueueAdmin.createConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(100); //Prefetch limit: at most 100 unacknowledged messages at a time
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                try {
                    String message = new String(body, "UTF-8");
                    doOrder(message);
                } finally {
                    //Manual acknowledgement. Parameters: deliveryTag, multiple. If multiple is true, all messages up to and including deliveryTag are acknowledged
                    channel.basicAck(envelope.getDeliveryTag(), false); 
                }
            }
        };
        channel.basicConsume(QueueAdmin.QUEUE_NAME2, false, defaultConsumer);
    }

    public void doOrder(String message) {
        try {
            Thread.sleep(5000);
            System.out.println("Process order:" + message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        OrderConsumer consumer = new OrderConsumer();
        consumer.recevedMessage();
    }
}

Explanation:

(1) As mentioned above (section 2, Application scenario - asynchronous, explanation (3)), a message goes through several states: it is Ready once sent to the queue, Unacked once delivered to the client, and deleted after the client acknowledges it. Here we need to limit the number of messages that have been delivered on the channel but not yet acknowledged, that is, the number of messages in the Unacked state.

channel.basicQos(100); //Prefetch limit: at most 100 unacknowledged messages at a time

(2) After the business logic has run, acknowledge manually, preferably in finally. Note that an acknowledgement in finally also covers messages whose processing threw an exception, so such failures have to be recorded or compensated separately.

//Manual acknowledgement. Parameters: deliveryTag, multiple. If multiple is true, all messages up to and including deliveryTag are acknowledged
channel.basicAck(envelope.getDeliveryTag(), false);

Both parameters need to be understood:

deliveryTag: a sequence number that the channel assigns to each delivered message, starting from 1 and increasing by 1 each time.

multiple: false means messages are acknowledged one at a time. true means a batch acknowledgement that covers deliveryTag and all earlier unacknowledged messages on the channel.

For example, with deliveryTag 5 and multiple set to true, messages 1 to 5 are acknowledged in one batch; with multiple set to false, only message 5 is acknowledged.
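The interplay of the prefetch limit and the two acknowledgement modes can be simulated in plain Java. This is a simplified model of one channel, written under the assumption that the broker stops delivering once the number of unacknowledged messages reaches the prefetch count. PrefetchWindowSketch and its methods are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.TreeSet;

// Simplified model of the unacknowledged-message window of one channel.
final class PrefetchWindowSketch {
    private final int prefetchCount;
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextDeliveryTag = 1;

    PrefetchWindowSketch(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    // Broker side: deliver only while the window has room. Returns the tag, or -1 if the window is full.
    long tryDeliver() {
        if (unacked.size() >= prefetchCount) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.add(tag);
        return tag;
    }

    // Consumer side: same meaning as the two parameters of basicAck.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge deliveryTag and every earlier unacknowledged tag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With a window of 5, the sixth delivery is refused until something is acknowledged. ack(3, true) frees three slots at once, while ack(5, false) frees only one.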

2. When each Consumer holds 100 messages at the same time, how are they processed concurrently?

The answer is not ideal. The RabbitMQ native API does not process a consumer's messages concurrently; the callbacks of a channel run one after another, in order. Why?

Let's look at the call chain in the source code of channel.basicConsume:

The core method is therefore addWork.

public void addWork(Channel channel, Runnable runnable) { //Called every time the broker delivers a message to the consumer
    if (this.workPool.addWorkItem(channel, runnable)) { //Serialization happens here. Only the call that wakes up the channel returns true; later calls just append the callback to the callback queue and return false
        this.executor.execute(new WorkPoolRunnable()); //Submit a task to the thread pool to drain the callback queue. This only happens when the channel is woken up, so only one thread is working on the channel at a time.
    }
}
public boolean addWorkItem(K key, W item) { //key is the channel and item is the callback method
    VariableLinkedBlockingQueue<W> queue;
    synchronized (this) {
        queue = this.pool.get(key); //A channel corresponds to a callback queue
    }
    // The put operation may block. We need to make sure we are not holding the lock while that happens.
    if (queue != null) {
        enqueueingCallback.accept(queue, item); //Add the new runnable to the callback queue

        synchronized (this) {
            if (isDormant(key)) { //Wake up the channel and return true. If it has been awakened (e.g. it is ready), return false
                dormantToReady(key);
                return true;
            }
        }
    }
    return false;
}
private final class WorkPoolRunnable implements Runnable {

    @Override
    public void run() {
        int size = MAX_RUNNABLE_BLOCK_SIZE;
        List<Runnable> block = new ArrayList<Runnable>(size);
        try {
            Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); //Retrieve the callback method from the callback queue
            if (key == null) return; // nothing ready to run
            try {
                for (Runnable runnable : block) { //Loop execution callback method
                    runnable.run();
                }
            } finally {
                if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { //After the batch has run, check whether new callbacks were added to the callback queue in the meantime
                    ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); //Submit another task to take the next batch, so the callback queue keeps being drained.
                }
            }
        } catch (RuntimeException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Explanation:

(1) The first delivered message wakes up an execution thread (at most one per channel at any time). Later messages are only appended to the blocking queue and do not start another thread. The execution thread takes a batch of callbacks from the blocking queue, runs them in a loop, and then submits a new task to take the next batch. Messages are therefore processed one at a time, in order, within the Consumer.

(2) Given this, the only option with the native API is to start a thread pool inside each Consumer to run the actual business logic and increase throughput. Alternatively, the Spring Boot integrated client has a concurrency setting that supports concurrent consumption.

(3) Note that concurrency breaks message ordering. Business scenarios that depend on message order need special handling, for example putting the send time in the message body and, on receipt, processing the message only if its send time is later than the latest modification time of the data, treating it as stale otherwise.
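Points (2) and (3) can be combined in one sketch: the single callback thread hands each message to a worker pool, and a timestamp check discards messages that arrive out of order. Everything here is an assumption for illustration: the class ConcurrentConsumerSketch, the pool size of 4, the business id used as key, and the in-memory map that stands in for the latest data modification time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: worker pool behind a consumer callback, with a stale-message check.
final class ConcurrentConsumerSketch {
    // Hypothetical stand-in for the latest data modification time, keyed by business id.
    private final Map<String, Long> latestSendTime = new ConcurrentHashMap<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Would be called from handleDelivery; returns immediately so the callback thread is not blocked.
    void onMessage(String businessId, long sendTimeMillis) {
        workers.execute(() -> applyIfNewer(businessId, sendTimeMillis));
    }

    // Applies the message only if its send time is later than the latest one applied so far.
    boolean applyIfNewer(String businessId, long sendTimeMillis) {
        boolean[] applied = {false};
        latestSendTime.compute(businessId, (id, previous) -> {
            if (previous == null || sendTimeMillis > previous) {
                applied[0] = true;   // the real business processing would go here
                return sendTimeMillis;
            }
            return previous;         // stale message: keep the newer state
        });
        return applied[0];
    }

    Long latestSendTimeOf(String businessId) {
        return latestSendTime.get(businessId);
    }

    // Stops accepting work and waits up to 10 seconds for queued tasks to finish.
    void shutdownAndWait() {
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a real consumer with manual acknowledgement, the ack would have to be sent by the worker after the business logic finishes rather than by the callback thread, otherwise the prefetch limit no longer bounds the work in progress.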

3. Once the pressure on the DB has been moved to the RabbitMQ queue, how do we deal with a message backlog?

According to the design in the flow chart, of 4000 concurrent messages at most 400 are being consumed at any moment, so the pressure on the DB is small, but the remaining 3600 messages wait in OrderQueue. For a short time the broker can act as a temporary store. Once the backlog reaches a certain size, say 10 million messages, we can estimate how long it takes to drain: 10,000,000 * 5 (each message takes 5 s to consume) / 400 (at most 400 consumed concurrently) / 3600 (seconds per hour) ≈ 34.7 hours, that is, about a day and a half. In the worst case, when the free disk space on the RabbitMQ node drops below the disk free limit (50 MB by default in current releases, configurable through disk_free_limit), the broker blocks all publishers while consumers keep working normally. So how do we deal with a message backlog?
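The drain-time estimate is simple arithmetic and easy to recompute for other backlog sizes. The helper below is a sketch with hypothetical names (BacklogEstimateSketch, drainSeconds, drainHours); it assumes every message takes the same time and that the stated number of messages is always being processed in parallel.

```java
// Sketch: estimate how long a backlog takes to drain at a fixed consumption rate.
final class BacklogEstimateSketch {
    // backlog messages, each taking secondsPerMessage, with concurrentMessages processed in parallel.
    static double drainSeconds(long backlog, double secondsPerMessage, int concurrentMessages) {
        return backlog * secondsPerMessage / concurrentMessages;
    }

    static double drainHours(long backlog, double secondsPerMessage, int concurrentMessages) {
        return drainSeconds(backlog, secondsPerMessage, concurrentMessages) / 3600.0;
    }

    public static void main(String[] args) {
        // 10 million messages, 5 s each, 400 in parallel.
        System.out.printf("%.1f hours%n", drainHours(10_000_000L, 5.0, 400));
    }
}
```

For 10 million messages at 5 s each and 400 in parallel this gives 125000 seconds, or roughly 34.7 hours. A backlog ten times larger would take ten times as long, about two weeks.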

If such pressure is the norm, the only remedy is to add consumers, raise concurrency, and reduce the time spent consuming each message, so as to improve consumption throughput. If it is a sudden spike, there are two ways to handle it:

(1) Set up a dead letter queue, store the messages in a database, and compensate later

First set a message expiration time, TTL (Time To Live), such as 600000 ms (10 minutes), and specify a dead letter exchange, DLX (spelled DXL in the code below). When a message expires, it is forwarded from the normal queue to the dead letter exchange. Then declare a dead letter queue, bind it to the dead letter exchange, and start several consumers on it. These consumers do no business processing; they only store the messages in a database, and a periodic job compensates later (by republishing the messages or calling the interface directly). Note that because the dead letter queue and the normal business queue are consumed at the same time, message order cannot be guaranteed.

public class QueueAdmin {
    public final static String QUEUE_NAME = "OrderQueue2";
    public final static String EXCHANGE_NAME = "TestExchage4";
    public final static String DXL_EXCHANGE_NAME = "DXLExchage";
    public final static String DXL_QUEUE_NAME = "DXLQueue";
    public final static String DXL_ROUTING_KEY = "DXLRoutingKey";
    public static Connection createConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.102");
        factory.setPort(6001);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection =  factory.newConnection();
        return connection;
    }

    public static void initQueue() throws Exception {
        try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
            channel.exchangeDeclare(DXL_EXCHANGE_NAME, "direct",true);
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 600000); //Message TTL in milliseconds (10 minutes)
            args.put("x-dead-letter-exchange", DXL_EXCHANGE_NAME); //Dead letter exchange
            args.put("x-dead-letter-routing-key", DXL_ROUTING_KEY); //Dead letter routing key
            //Queue name, whether the queue is persistent (not the data in the queue), whether it is exclusive, whether it is automatically deleted, other arguments
            channel.queueDeclare(QUEUE_NAME, true, false, false, args);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            channel.queueDeclare(DXL_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DXL_QUEUE_NAME, DXL_EXCHANGE_NAME, DXL_ROUTING_KEY);
            System.out.println("Initialize switch:"
                    + EXCHANGE_NAME + ",Queue:" + QUEUE_NAME);
        }
    }

    public static void main(String[] args) throws Exception {
        QueueAdmin.initQueue();
    }
}
public class DXLConsumer {
    public void recevedMessage() throws Exception {
        System.out.println("Waiting to receive message...");
        Connection connection = com.example.mqtest2.QueueAdmin.createConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: " + consumerTag);
                try {
                    String message = new String(body, "UTF-8");
                    execute(message);
                } finally {
                    System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QueueAdmin.DXL_QUEUE_NAME, false, defaultConsumer);
    }

    public void execute(String message) {
        System.out.println("Write database wait message compensation");
        System.out.println("Message content:" + message);
    }

    public static void main(String[] args) throws Exception {
        DXLConsumer consumer = new DXLConsumer();
        consumer.recevedMessage();
    }
}
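How TTL and the dead letter queue work together can be pictured with an in-memory model that uses an explicit clock. It is a sketch under simplifying assumptions, not the broker's behaviour in every detail: one queue-wide TTL, expiry checked only at the head of the queue, and hypothetical names throughout (DeadLetterSketch, publish, expire).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: messages older than the TTL move from the work queue to the dead letter queue.
final class DeadLetterSketch {
    private static final class Entry {
        final String body;
        final long enqueuedAtMillis;

        Entry(String body, long enqueuedAtMillis) {
            this.body = body;
            this.enqueuedAtMillis = enqueuedAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> workQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    DeadLetterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long nowMillis) {
        workQueue.addLast(new Entry(body, nowMillis));
    }

    // Moves every message at the head whose age has reached the TTL to the dead letter queue.
    void expire(long nowMillis) {
        while (!workQueue.isEmpty()
                && nowMillis - workQueue.peekFirst().enqueuedAtMillis >= ttlMillis) {
            deadLetterQueue.add(workQueue.pollFirst().body);
        }
    }

    int pending() {
        return workQueue.size();
    }

    List<String> deadLetters() {
        return deadLetterQueue;
    }
}
```

Messages that are consumed before they expire never reach the dead letter queue. Only the part of the backlog that waits longer than the TTL is diverted to the database for later compensation.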

(2) Move the messages

If messages flood into the queue too fast, an excessive backlog builds up before they expire. In that case, temporarily create a queue with about 10 times the capacity on a new RabbitMQ node, and write a message-moving client that transfers messages from the backlogged queue to the new high-capacity queue. At the same time, temporarily add consumer machines on both sides to raise consumption throughput.

5, Prevent message loss

Compared with traditional interface calls, sending messages brings the advantages of asynchrony, decoupling and peak elimination, but it also lengthens the call chain and raises the probability of errors. For example, on the path Producer -> MQ queue -> Consumer, a message can be lost in three places.

1. The message is lost on its way from the Producer to the RabbitMQ queue.

Add a confirm listener on the sending side to solve this.

public class Producer4 {
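    // Shared by the publishing thread, the confirm-listener thread and the closing thread,
    // so a thread-safe sorted set is used (java.util.concurrent.ConcurrentSkipListSet)
    private final ConcurrentSkipListSet<Long> unconfirmSet = new ConcurrentSkipListSet<>();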
    private Connection connection = null;
    private Channel channel = null;

    public void doConfirmWork() throws Exception {
        try {
            createConfirmChannel();
            for (int i = 0; i < 3; i++) {
                sendMessage(i);
            }
        } finally {
            closeConfirmChannel();
        }
    }

    public void sendMessage(int i) throws Exception {
        String message = "need Confirm" + i;
        long nextSeqNo = channel.getNextPublishSeqNo();
        // Record the sequence number before publishing, so that a confirm arriving immediately still finds it
        unconfirmSet.add(nextSeqNo);
        channel.basicPublish(QueueAdmin.EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
        System.out.println("send message..." + message);
    }

    public void createConfirmChannel() throws Exception {
        connection = QueueAdmin.createConnection();
        channel = connection.createChannel();
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("handleAck: deliveryTag="+ deliveryTag + ", multiple=" + multiple);
                if (multiple) {
                    unconfirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    unconfirmSet.remove(deliveryTag);
                }
            }

            @Override
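            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("handleNack: deliveryTag="+ deliveryTag + ", multiple=" + multiple);
                // A nack means the broker did not accept the message. This example only stops tracking it;
                // a production sender would log or resend the message here.
                if (multiple) {
                    unconfirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    unconfirmSet.remove(deliveryTag);
                }
            }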
        });
    }

    public void closeConfirmChannel() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Close the channel and connection once every message is confirmed, or after 60s at the latest
                long time = 0;
                while (!(unconfirmSet.size() == 0 || time > 60000)) {
                    try {
                        Thread.sleep(1000);
                        time += 1000;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    if (channel != null) {
                        channel.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    public static void main(String[] args) throws Exception {
        Producer4 producer = new Producer4();
        producer.doConfirmWork();
    }
}

Explanation:

(1) The code that listens for whether a message has reached the queue is actually very simple:

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
     ......
});

(2) The example above is more complex than that because the channel and connection cannot simply be closed in finally or with try-with-resources, for example:

try(Connection connection = QueueAdmin.createConnection();
            Channel channel = connection.createChannel();) {
    ......
}

If the channel is closed too early, the handleAck and handleNack callbacks are never executed. Therefore, each time we send a message we record its sequence number, which is the deliveryTag the confirm will carry:

unconfirmSet.add(nextSeqNo);

Then, in the callback, remove the confirmed deliveryTag. Note that the number of confirm callbacks does not have to match the number of messages sent. For example, if the deliveryTags sent are 1, 2, 3, 4 and 5, the broker may call back only once, with deliveryTag 5 and multiple set to true. This means that every message up to and including 5 has been forwarded to the queue, so all of those deliveryTags must be removed from unconfirmSet. The broker may also confirm messages one by one, in which case multiple is false.

if (multiple) {
    unconfirmSet.headSet(deliveryTag + 1).clear();
} else {
    unconfirmSet.remove(deliveryTag);
}
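
The same bookkeeping can be isolated in a small class and tried out without a broker. This is a minimal sketch, assuming a hypothetical helper named ConfirmTracker (it is not part of the RabbitMQ client): onPublish would be called with the value of channel.getNextPublishSeqNo(), and onConfirm from handleAck or handleNack.

```java
import java.util.concurrent.ConcurrentSkipListSet;

/** Tracks publish sequence numbers that the broker has not confirmed yet. */
public class ConfirmTracker {
    // Thread-safe sorted set: the publishing thread adds, the confirm-listener thread removes
    private final ConcurrentSkipListSet<Long> unconfirmed = new ConcurrentSkipListSet<>();

    /** Call just before publishing, with the sequence number of the message. */
    public void onPublish(long seqNo) {
        unconfirmed.add(seqNo);
    }

    /** Call from the confirm callbacks with the arguments the broker passed in. */
    public void onConfirm(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed in one go
            unconfirmed.headSet(deliveryTag, true).clear();
        } else {
            // Only this single message is confirmed
            unconfirmed.remove(deliveryTag);
        }
    }

    /** Number of messages still waiting for a confirm. */
    public int pending() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.onPublish(seqNo);
        }
        tracker.onConfirm(3, true);  // one callback confirms 1, 2 and 3
        tracker.onConfirm(5, false); // confirms only 5
        System.out.println("pending = " + tracker.pending()); // only 4 is still unconfirmed
    }
}
```

A concurrent set is chosen here because the confirm callbacks run on a different thread from the one that publishes.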

Finally, the channel and connection are closed only when unconfirmSet is empty (every message confirmed) or the timeout has elapsed.

while (!(unconfirmSet.size() == 0 || time > 60000)) {

2. The message has reached the RabbitMQ queue and is waiting to be consumed, but it is lost when RabbitMQ restarts.

(1) Queue persistence

// Arguments: queue name, durable (the queue itself, not the messages in it), exclusive, autoDelete, additional arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

(2) Exchange persistence

// Arguments: exchange name, exchange type, durable
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);

(3) Message persistence

// Arguments: exchange name, routing key, message properties (persistent delivery mode), message body
channel.basicPublish("", QueueAdmin.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

3. Messages are lost during forwarding from the queue to the consumer.

Use manual acknowledgement, as described above. (IV. application scenario - peak elimination-1)

// Arguments: queue name, autoAck (false means manual acknowledgement), consumer callback
channel.basicConsume(QueueAdmin.QUEUE_NAME, false, defaultConsumer);
try {
......
} finally {
    // Manual acknowledgement. Arguments: deliveryTag, multiple. If multiple is true, all messages up to and including deliveryTag are acknowledged together
    channel.basicAck(envelope.getDeliveryTag(), false);
}

4. No dead letter queue is configured in RabbitMQ. In that case, messages are lost in the following situations:

(1) Message rejected

(2) Message expiration

(3) The queue has reached its maximum length

How to configure the dead letter queue and how to handle dead letters is described above. (IV. application scenario - peak elimination-3)

6, Prevent repeated consumption of messages

When an interface call times out it has to be retried, and when a message times out (or fails for some other reason) it has to be pushed again. As a result, the Producer may send the same message more than once and the Consumer may consume it more than once. Idempotency is the usual way to prevent repeated consumption.

There are many ways to achieve idempotency. A Redis distributed lock is the common choice, and the idea is as follows:

(1) The message body contains a unique id, msg_uniq_id.

(2) When a message is received, use msg_uniq_id to build the key and try to fetch the result from Redis: key = "msg_result_" + msg_uniq_id.

(3) If the value is not empty, the message has been processed before, so return the previous successful result.

(4) If the value is empty, the message has not been processed before. In this case, call the Redis setnx method with key = "msg_uniq_" + msg_uniq_id.

(5) If setnx succeeds, process the business logic normally. On success, save the result in Redis under key = "msg_result_" + msg_uniq_id. If the business logic throws an exception, delete the setnx key.

(6) If setnx fails, there are concurrent requests: only one succeeds and the others fail. In that case, return a prompt saying that processing failed and asking the caller to try again.

(7) Both the setnx key and the result key need an expiry time. The setnx key can expire sooner, for example after 3 minutes, and the result key later, for example after 24 hours.
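
The steps above can be sketched as follows. This is only an illustration under assumptions: an in-memory ConcurrentHashMap stands in for Redis, with putIfAbsent playing the role of setnx, and the expiry times from step (7) are left out. The class name IdempotentConsumer and the handle method are hypothetical, and the business function is assumed to return a non-null result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class IdempotentConsumer {
    // In-memory stand-in for Redis (assumption); a real setup would also set expiry times on both keys
    private final Map<String, String> store = new ConcurrentHashMap<>();

    public String handle(String msgUniqId, String body, Function<String, String> business) {
        String resultKey = "msg_result_" + msgUniqId;
        String lockKey = "msg_uniq_" + msgUniqId;

        // Steps (2) and (3): return the stored result if this message was already processed
        String previous = store.get(resultKey);
        if (previous != null) {
            return previous;
        }
        // Steps (4) and (6): putIfAbsent emulates setnx; a non-null return means another request holds the lock
        if (store.putIfAbsent(lockKey, "1") != null) {
            throw new IllegalStateException("message " + msgUniqId + " is being processed, try again later");
        }
        // Step (5): run the business logic, store the result, release the lock on failure
        try {
            String result = business.apply(body);
            store.put(resultKey, result);
            return result;
        } catch (RuntimeException e) {
            store.remove(lockKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        AtomicInteger executions = new AtomicInteger();
        Function<String, String> business = body -> {
            executions.incrementAndGet();
            return "processed:" + body;
        };
        // The same message is delivered twice, but the business logic runs only once
        System.out.println(consumer.handle("1001", "order-created", business));
        System.out.println(consumer.handle("1001", "order-created", business));
        System.out.println("business executions = " + executions.get());
    }
}
```

Note that a failed attempt removes the lock key, so a redelivery of the same message can be processed again.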

(unfinished)

Keywords: Java RabbitMQ Distribution Middleware message queue

Added by busin3ss on Sun, 23 Jan 2022 21:16:52 +0200