Message Oriented Middleware -- message rejection

In the previous section, I learned how consumers get messages

So when consumers get the message and find that the message should not be solved by themselves, or an exception occurs during processing, how should they deal with it

This is the message rejection in this section

1, Message reject

There are also two ways to Reject messages: Reject and Nack

Reject can reject only one message at a time, while Nack can reject multiple messages at one time

And either way, you can use the request ID

If it is false, it will not be sent again. Generally, this message will be discarded by RabbitMQ

If true, the message has been redelivered

//Reject (the second parameter here determines whether to redelivery)
channel.basicReject(envelope.getDeliveryTag(),true);

//Reject in Nack mode (the second parameter determines whether to batch)
channel.basicNack(envelope.getDeliveryTag(), false, false);
Complete consumer demo
public class RejectRequeuConsumer {

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,"direct");
        String queueName = "rejectrequeue";
        channel.queueDeclare(queueName,false,false,false,null);
        String routekey = "error"; 
        channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);

        System.out.println("waiting for message........");

        /*Declared a consumer*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                try{
                    String message = new String(body, "UTF-8");
                    System.out.println("Processing messages");
                    throw new RuntimeException("Handling exceptions"+message);
                }catch (Exception e){
                    e.printStackTrace();
                    //TODO Reject (the second parameter here determines whether to redelivery)
                    //channel.basicReject(envelope.getDeliveryTag(),true);

                    //Reject in TODO Nack mode (the second parameter determines whether to batch)
                    channel.basicNack(envelope.getDeliveryTag(), false, false);
                }
            }
        };
        /*The consumer officially starts consuming messages on the specified queue*/
        channel.basicConsume(queueName,false,consumer);
    }
}

2, Dead letter exchanger

In the above demonstration, when message rejection is used and request is set to false, the message will be discarded

This method is generally rough and is not recommended

In view of this situation, a dead letter exchanger is proposed to deal with dead messages

There are three situations in which a message becomes a dead letter:
1. The message is rejected and set requeue Parameter is false
2. Message expiration (by default) Rabbit Messages in do not expire, but you can set the expiration time of the queue and the expiration time of messages to achieve the effect of message expiration)
3. The queue reaches the maximum length (generally when the maximum queue length or size is set and reaches the maximum value)
 
The dead letter exchange is just an ordinary exchange
Only when declaring the primary switch, specify the dead letter switch. In this way, the dead letter switch will be sent to the dead letter switch for processing
public class MainConsumer {

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        
        //TODO binding dead letter exchanger
        /*Declare a queue and bind the dead letter exchange*/
        String queueName = "dlx_make";
        Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-dead-letter-exchange", DlxProcessConsumer.DLX_EXCHANGE_NAME);
        
        //TODO dead letter routing key will replace the original routing key of the message
        //args.put("x-dead-letter-routing-key", "deal");
        channel.queueDeclare(queueName,false,true,false,args);

        /*Binding: bind the queue and the switch through the routing key*/
        channel.queueBind(queueName,DlxProducer.EXCHANGE_NAME,"#");
        System.out.println("waiting for message........");

        /*Declared a consumer*/
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                //If TODO is zc's message confirmation
                if(envelope.getRoutingKey().equals("zc")){
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }else{
                    //If TODO is rejected by other messages (queue=false), it becomes a dead letter message
                    channel.basicReject(envelope.getDeliveryTag(),false);
                }

            }
        };
        /*The consumer officially starts consuming messages on the specified queue*/
        channel.basicConsume(queueName,false,consumer);
    }
}

Difference between and standby exchanger

1. If the standby switch is the primary switch that cannot route messages, the messages will be routed to the new standby switch, while the dead letter switch receives expired or rejected messages.

2. The standby switch is contacted when declaring the primary switch, while the dead letter switch is contacted when declaring the queue.

 

Keywords: Java RabbitMQ queue message queue

Added by Lauj on Mon, 31 Jan 2022 02:18:59 +0200