Chapter 4 RabbitMQ advanced in RabbitMQ Practical Guide

Chapter 4 RabbitMQ advanced (I) of RabbitMQ Practical Guide

1, Introduction

The previous chapter describes some basic concepts and usage methods, such as creating switches, queues and binding relationships. However, many details are not stated, some "pits" in the use process are not mentioned, and some advanced usages are not shown. Therefore, the content of this chapter is to make up for these shortcomings. Based on the basic knowledge of RabbitMQ, this chapter expounds some more distinctive details and functions, so as to provide a benchmark for readers to further master RabbitMQ

2, Where will the news go

mandatory and immediate are channel The two parameters in the basicpublish method have the function of returning the message to the producer when the destination is unreachable during message delivery. The Alternate Exchange provided by RabbitMQ can store messages that cannot be routed by the exchange (no binding queue or no matching binding) without returning them to the client

1.mandatory parameter

When the mandatory parameter is set to true, if the switch cannot find a qualified queue according to its type and routing key, RabbitMQ will call basic The return command returns the message to the producer. When the mandatory parameter is set to false, the above situation occurs, and the message is directly discarded

So how do producers get messages that are not routed correctly to the appropriate queue? At this time, you can call channel Addreturnlistener to add the ReturnListener listener implementation

The key codes for using the mandatory parameter are as follows:

channel.basicPublish(EXCHANGE_NAME,
						"",
						true,
						MessageProperties.PERSISTENT_TEXT_PLAIN,
						"mandatory test".getBytes()
						);
channel.addReturnListener(new ReturnListener(){
							public void handleReturn(int replyCode,
														String replyText,
														String exchange,
														String routingKey,
														AMQP.BasicProperties basicProperties,
														byte[] body
														) throws IOException
							{
								String message = new String(body);
								System.out.println("Basic.Return The result returned is:" + message);
							}
						);

In the above code, the producer does not successfully route the message to the queue. At this time, RabbitMQ will pass basic Return returns the message "mandatory test". Then the producer client listens to this event through ReturnListener. The final output of the above code should be "Basic.Return. The returned result is: mandatory test"

From the AMQP protocol level, the corresponding flow process is shown in Figure 4-1

2.immediate parameter

When the immediate parameter is set to true, if the switch finds that there are no consumers on the queue when routing the message to the queue, the message will not be stored in the queue. When all queues matching the routing key have no consumers, the message will pass through basic Return returns to the producer

In summary, the mandatory parameter tells the server to route the message to at least one queue, otherwise the message is returned to the producer. The immediate parameter tells the server that if there are consumers in the queue associated with the message, it will be delivered immediately. If there are no consumers in all matched queues, the message will be returned directly to the producer instead of waiting for consumers

RabbitMQ version 3.0 began to remove the support for the immediate parameter. RabbitMQ's official explanation is that the immediate parameter will affect the performance of the image queue and increase the code complexity. It is recommended to use TTL (expiration time) and DLX (dead letter queue) instead

3. Backup switch

The English name of the backup switch is Alternate Exchange, abbreviated as AE, or more frankly, it is called "spare tire switch". If the producer does not set the mandatory parameter when sending a message, the message will be lost if it is not routed; If the mandatory parameter is set, the programming logic of ReturnListener needs to be added, and the producer's code will become complex. If you do not want to complicate the producer's programming logic and do not want to lose messages, you can use a backup switch to store unrouted messages in RabbitMQ and process them when needed

This can be achieved by adding the alternate exchange parameter when declaring the switch (calling the channel.exchangeDeclare method), or by using the Policy method. If both are used at the same time, the former has higher priority and will override the Policy setting

The key codes for using parameter settings are as follows:

Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange", "direct", true, false, args);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueDeclare("unroutedQueue", true, false, false, null);
channel.queueBind("unroutedQueue", "myAe", "");

The above code declares two switches, normalExchange and myAe, which bind the two queues of normalQueue and unroutedQueue respectively, and sets myAe as the backup switch of normalExchange. Note that the switch type of myAe is fanout

Referring to Figure 4-2, if a message is sent to normalExchange at this time, when the routing key is equal to "normalKey", the message can be correctly routed to the queue of normalQueue. If the routing key is set to other values, such as "erroeKey", that is, the message cannot be correctly routed to any queue bound to normal exchange, it will be sent to myAe and then to the unroutedQueue queue

Similarly, if you use Policy to set the backup switch, you can refer to the following:

rabbitmqctl set_policy AE "^normalExchange$" `{"alternate-exchange": "myAE"}`

In fact, the backup switch is not much different from the ordinary switch. In order to facilitate use, it is recommended to set it to the fan out type. If the reader wants to set it to the direct or topic type, there is nothing wrong. It should be noted that the routing key when the message is re sent to the backup router is the same as that issued from the producer

Consider this situation. If the backup switch is of type direct and has a queue bound to it, assuming that the bound routing key is key1, when a message carrying the routing key key2 is forwarded to the backup switch, the backup switch does not match the appropriate queue, and the message is lost. If the routing key carried by the message is key1, it can be stored in the queue

For backup switches, the following special cases are summarized:

  • If the set backup switch does not exist, there will be no exceptions on the client and RabitMQ server, and the message will be lost
  • If the backup switch is not bound to any queue, there will be no exceptions on the client and RabbitMQ server, and the message will be lost
  • If the backup switch does not have any matching queues, there will be no exceptions on the client and RabbitMQ server, and the message will be lost
  • If the backup switch is used with the mandatory parameter, the mandatory parameter is invalid

3, Expiration time (TTL)

TTL, short for Time to Live, is the expiration time. RabbitMQ can set TTL for messages and queues

1. Set TTL of message

At present, there are two methods to set the TTL of messages. The first method is to set the queue property so that all messages in the queue have the same expiration time. The second method is to set the message itself separately, and the TTL of each message can be different. If the two methods are used together, the TTL of the message takes the smaller value between them. Once the lifetime of the message in the queue exceeds the set TTL value, it will become a "Dead Message", and the consumer will no longer receive the message (this is not absolute, you can refer to the dead letter queue in the next section)

The method of setting message TTL through queue properties is in channel The queuedeclare method is implemented by adding the x-message-ttl parameter. The unit of this parameter is milliseconds

The example code is as follows:

Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);

At the same time, TTL can also be set through Policy, for example:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

You can also set by calling the HTTP API interface:

If TTL is not set, this message will not expire; If TTL is set to 0, it means that the message will be discarded immediately unless it can be delivered directly to the consumer at this time. This feature can partially replace rabbit mq3 The reason why the immediate parameter before version 0 is partially replaced is that the immediate parameter will use basic when delivery fails Return returns the message (this function can be implemented with the dead letter queue)

The method of setting TTL for each message is in channel The attribute parameter of expiration is added to the basicpublish method. The unit is milliseconds

The key codes are as follows:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);		//Persistent message
builder.expiration("60000");	//Set TTL=60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,
						routingKey,
						mandatory,
						properties,
						"ttlTestMessage".getBytes()
						);

You can also use the following methods:

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("600000");
channel.basicPublish(exchangeName,
						routingKey,
						mandatory,
						properties,
						"ttlTestMessage".getBytes()
					);

You can also set through the HTTP API interface:

For the first method of setting the TTL attribute of the queue, once the message expires, it will be erased from the queue. In the second method, even if the message expires, it will not be erased from the queue immediately, because whether each message expires is determined before it is delivered to the consumer

Why are these two methods handled differently? In the first method, the expired messages in the queue must be in the queue head. RabbitMQ only needs to regularly scan the queue head for expired messages. In the second method, the expiration time of each message is different. If you want to delete all expired messages regularly, it is necessary to scan the whole queue, so you might as well wait until the message is about to be consumed to determine whether it expires. If it expires, you can delete it

2. Set TTL of queue

Via channel The x-expires parameter in the queuedeclare method controls how long the queue is unused before it is automatically deleted. Unused means that there are no consumers on the queue, the queue has not been redeclared, and basic has not been called within the expiration period Get command

The TTL in the setting queue can be applied to reply queues similar to RPC. In RPC, many queues will be created, but they are not used

RabbitMQ will ensure that the queue is deleted when the expiration time arrives, but it does not guarantee how timely the deletion action is. After RabbitMQ restarts, the expiration time of the persistent queue will be recalculated

The x-expires parameter used to indicate the expiration time is in milliseconds and subject to the same constraints as x-message-ttl, but cannot be set to 0 For example, if the parameter is set to 1000, it means that the queue will be deleted if it is not used within 1 second

The following code demonstrates creating a queue with an expiration time of 30 minutes:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

4, Dead letter queue

DLX, fully known as dead letter exchange, can be called dead letter exchange or dead letter mailbox. When a message becomes dead message in a queue, it can be re sent to another switch, which is DLX. The queue bound to DLX is called dead message queue

Messages become dead letters generally due to the following situations:

  • The message is rejected (Basic.Reject/Basic.Nack) and the request parameter is set to false
  • Message expiration
  • The queue has reached its maximum length

DLX is also a normal switch. It is no different from a general switch. It can be specified on any queue. In fact, it is to set the properties of a queue. When there is a dead letter in the queue, RabbitMQ will automatically republish the message to the set DLX, and then it will be routed to another queue, that is, the dead letter queue. You can listen to the messages in the queue for corresponding processing. This feature can be used in conjunction with setting the TTL of the message to 0 to compensate for the function of the immediate parameter

Through in channel Set the x-deat-letter-exchange parameter in the queuedeclare method to add DLX to the queue. The code is as follows:

channel.exchangeDeclare("dlx_exchange", "direct");		//Create DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
//Add DLX for queue myqueue
channel.queueDeclare("myqueue", false, false, false, args);

You can also specify a routing key for this DLX. If there is no special specification, the routing key of the original queue is used:

args.put("x-dead-letter-routing-key", "dlx-routing-key");

Of course, it can also be set through Policy:

rabbitmqctl set_policy DLX ".*" '{}"dead-letter-exchange":"dlx_exchange"}' --apply-to queues

Next, create a queue and set TTL, DLX, etc. for it. The code is as follows:

channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 10000);
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
channel.basicPublish("exchange.normal",
						"rk",
						MessageProperties.PERSISTENT_TEXT_PLAIN,
						"dlx".getBytes()
						);

Two exchanges are created here Normal and exchange Dlx, bind two queues respectively Normal and queue dlx

As can be seen from the Web management page (Figure 4-3), both queues are marked with "D", which is the abbreviation of durable, that is, queue persistence is set. queue.normal is also configured with TTL, DLX and DLK, where DLX refers to the attribute x-dead-letter-routing-key

Referring to Figure 4-4, the producer first sends a message with the routing key "rk" and then passes through the exchange Normal is successfully stored in the queue Normal. Due to queue Normal sets the expiration time to 10s. If no consumer consumes this message within 10s, it is determined that this message is expired. Because DLX is set, when it expires, the message is lost to the exchange DLX, and exchange DLX matched queue DLS, the last message is stored in queue DLX is in the dead letter queue

DLX is a very useful feature for rabbit MQ. It can handle exceptions, Messages cannot be consumed correctly by consumers (the consumer calls Basic.Nack or Basic.Reject) and is placed in the dead letter queue. The subsequent analysis program can analyze the abnormal conditions encountered at that time by consuming the contents of the dead letter queue, so as to improve and optimize the system. DLX can also realize the function of delay queue by configuring TTL. See the next section for details

5, Delay queue

The object stored in the delay queue is the corresponding delay message. The so-called "delay message" means that when the message is sent, the consumer does not want to get the message immediately, but waits for a specific time before the consumer can get the message for consumption

There are many usage scenarios for delay queues, such as:

  • In the order system, a user usually has 30 minutes to pay after placing an order. If the payment is not successful within 30 minutes, the order will be handled abnormally. At this time, the delay queue can be used to process these orders
  • Users want to remotely control their smart devices at home through mobile phones and work at a specified time. At this time, the user instruction can be sent to the delay queue. When the set time of the instruction is up, the instruction can be pushed to the intelligent device

In AMQP protocol, RabbitMQ does not directly support the function of delay queue, but the function of delay queue can be simulated through DLX and TTL introduced earlier

In Figure 4-4, not only the usage of dead letter queue, but also the usage of delay queue, for queue Dlx, as a dead letter queue, can also be regarded as a delay queue. Suppose an application needs to set each message to a delay of 10 seconds, and the producer passes exchange Normal this switch stores the messages sent in the queue Normal is in this queue. Consumers do not subscribe to queue Normal this queue, but queue Dlx this queue. When a message is sent from queue Normal is stored in the queue after expiration In the DLX queue, consumers happen to consume this message with a delay of 10 seconds

In real applications, the delay queue can be divided into multiple levels according to the length of delay time, generally divided into five seconds, 10 seconds, 30 seconds, 1 minute, 5 minutes, 10 minutes, 30 minutes and 1 hour. Of course, it can also be further refined

Referring to figure 4-5, in order to simplify the description, only four levels of 5 seconds, 10 seconds, 30 seconds and 1 minute are set here. According to different application requirements, when sending messages, producers set different routing keys to send messages to different queues bound to the switch. Here, the expiration time is set to 5 seconds, 10 seconds, 30 seconds and 1 minute respectively. At the same time, DLX and corresponding dead letter queues are also configured respectively. When the corresponding message expires, it will be transferred to the corresponding dead letter queue (i.e. delay queue), so that consumers can select delay queues with different delay levels for consumption according to their own business conditions

6, Priority queue

Priority queue, as the name suggests, queues with high priority have high priority, and messages with high priority have the privilege of being consumed first

This can be achieved by setting the x-max-priority parameter of the queue. The example code is as follows:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, args);

The identification of "Pri" can be seen through the Web management page, as shown in Figure 4-6

The above code demonstrates how to configure the maximum priority of a queue. After that, you need to set the current priority of the message in the message when sending. The example code is as follows:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority", "rk_priority", properties, ("message").getBytes());

In the above code, the priority of the message is set to 5. The default minimum is 0 and the highest is the maximum priority set for the queue. Messages with high priority can be consumed preferentially, which is also a prerequisite: if the consumption speed of consumers is greater than that of producers and there is no message accumulation in the Broker, setting the priority of sent messages has no practical significance. Because the producer is consumed by the consumer just after sending a message, it is equivalent to that there is only one message in the Broker. For a single message, priority is meaningless

7, RPC implementation

Generally, RPC in RabbitMQ is very simple. The client sends a request message, and the server replies to the response message. In order to receive the response message, we need to send a callback queue in the request message (refer to replyTo in the following code). The default queue can be used. The specific example code is as follows:

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
//then code to read a response message from the callback_queue...

The class BasicProperties involved in the code was explained in the previous chapter when describing sending messages. It contains 14 properties, and two properties are used here:

  • replyTo: usually used to set up a callback queue
  • correlationId: used to associate the request with the response after calling RPC

It would be very inefficient to create a callback queue for each RPC request, as in the code above. Fortunately, however, there is a common solution -- you can create a single callback queue for each client

This creates a new problem. For the callback queue, after it receives a reply message, it does not know which request the message should match. The correlationId attribute is used here. We should set a unique correlationId for each request. After that, when the callback queue receives the reply message, it can match the corresponding request according to this attribute. If the callback queue receives a reply message with unknown correlationId, it can be simply discarded

You might ask, why discard the location message in the callback queue instead of just seeing it as a failure? This can make up for this failure. Referring to figure 4-7, consider such a case, The RPC server may be sending to the callback queue (amq.gen-LhQzlgv3GhDOv8PIDabOXA) and acknowledging receipt of the requested message (messages in the rpc_queue) and then hang up. You only need to restart the RPC server. The RPC service will consume the requests in the rpc_queue again. In this way, there will be no case that the RPC server does not process the requests, and the RPC requests also need to ensure that they are idempotent (supplement: consumers usually process business logic first, and then use Basic.Ack to confirm that the message has been received, so as to prevent unnecessary loss of the message)

As shown in Figure 4-7, the processing flow of RPC is as follows:

  1. When the client starts, create an anonymous callback queue (the name is automatically created by RabbitMQ, and the callback queue in Figure 4-7 is amq.gen-LhQzlgv3GhDOv8PIDabOXA)
  2. The client sets two attributes for the RPC request: replyTo is used to inform the RPC server of the destination queue when replying to the request, that is, the callback queue; The correlationId is used to mark a request
  3. The request is sent to rpc_queue in queue
  4. RPC server listens to RPC_ The request in the queue. When the request arrives, the server will process it and send the message with the result to the client. The received queue is the callback queue set by replyTo
  5. The client listens to the callback queue. When there is a message, check the correlationId attribute. If it matches the request, it will be the result

The following is an example from the RabbitMQ official website. The RPC client calls the server method through RPC to obtain the corresponding Fibonacci value

The first is the key code of the server:

public class RPCServer{
	private static final String RPC_QUEUE_NAME = "rpc_queue";

	public static void main(String args[]) throws Exception {
		//The process of creating Connection and Channel is omitted
		channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
		channel.basicQos(1);
		System.out.println(" [x] Awaiting RPC requests");

		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,
										Envelope envelope,
										AMQP.BasicProperties properties,
										byte[] body
										) throws IOException
			{
				AMQP.BasicProperties replyProps = new AMQP.BasicProperties
															.BasicProperties
															.Builder()
															.correlationId(properties.getCorrelationId())
															.build();
				String response = "";
				try{
					String message = new String(body, "UTF-8");
					int n = Integer.parseInt(message);
					System.out.println(" [.] fib(" + message + ")");
					response += fib(n);
				} catch (RuntimeException e){
					System.out.pringlt(" [.] " + e.toString());
				} finally{
					channel.basicPublish("",
											properties.getReplyTo(),
											replyProps,
											response.getBytes("UTF-8")
										);
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
	}

	private static int fib(int n){
		if(n == 0) return 0;
		if(n == 1) return 1;
		return fib(n-1) + fib(n-2);
	}
}

The key codes of RPC client are as follows:

public class RPCClient{
	private Connection connecion;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws IOException, TimeoutException{
		//The process of creating Connection and Channel is omitted
		replyQueueName - channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}
	
	public String call(String message) throws IOException,
												ShutdownSignalException,
												ConsumerCancelledException,
												InterruptedException
	{
		String response = null;
		String corrId - UUID.randomUUID().toString();

		BasicProperties props = new BasicProperties.Builder()
													.correlationId(corrId)
													.replyTo(replyQueueName)
													.build();
		channel.basicPublish("",
								requestQueueName,
								props,
								message.getBytes()
								);
		
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			if(delivery.getProperties().getCorrelationId().equals(corrId)){
				response = new String(delivery.getBody());
				break;
			}
		}
		return responsel
	}
	
	public void close() throws Exception{
		connection.close();
	}

	public static void main(String args[]) throws Exception{
		RPCClient fibRpc = new RPCClient();
		System.out.println(" [x] Requesting fib(30)");
		String response = fibRpc.call("30");
		System.out.pringln(" [.] Got '"+response+"'");
		fibRpc.close();
	}
}

Keywords: RabbitMQ

Added by crwtrue on Fri, 31 Dec 2021 02:23:51 +0200