Search for "Java class representative" and follow the official account to get more practical Java content.
2 Work queues
In the first tutorial, we wrote two programs to send and receive messages from a named queue. In this tutorial, we will create a work queue that distributes time-consuming tasks among multiple workers.
The idea behind a work queue (or task queue) is to avoid doing a resource-intensive task immediately and having to wait for it to finish (class representative's note: in short, peak shaving). Instead, the task is scheduled to be done later (class representative's note: put plainly, it is executed asynchronously). A worker process running in the background receives the task and executes it. When you run several workers, the tasks in the queue are shared among them.
This idea is especially useful in web applications, where a complex task cannot be completed within the short window of an HTTP request.
Preparation
In the previous tutorial, we sent a string message: "Hello World!". Now we will send strings that stand for tasks of varying complexity. We don't have a real-world task such as resizing images or rendering PDF files, so let's pretend to be busy by calling Thread.sleep(). The number of dots in the string represents the complexity of the task: each dot accounts for one second of "work". For example, a task described by the string Hello... takes 3 seconds.
The Send.java code from the previous example is changed slightly so that arbitrary messages can be passed in from the command line. The program schedules tasks to our work queue, so it is named NewTask.java:
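As a quick check of the dot convention described above, here is a minimal sketch that only counts dots and does not sleep. The class name TaskCost and the method secondsOfWork are made up for this illustration and are not part of the tutorial code.

```java
class TaskCost {
    // Hypothetical helper: each dot in the task string stands for one second of simulated work.
    public static int secondsOfWork(String task) {
        int seconds = 0;
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(secondsOfWork("Hello..."));           // prints 3
        System.out.println(secondsOfWork("Fifth message.....")); // prints 5
    }
}
```

The worker shown later applies the same rule, except that it sleeps for one second per dot instead of counting.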
String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
The old Recv.java program also needs some changes: it has to fake one second of work for every dot in the message body. It receives messages and performs the tasks, so it is named Worker.java:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
        doWork(message);
    } finally {
        System.out.println(" [x] Done");
    }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
The fake task that simulates execution time:
private static void doWork(String task) throws InterruptedException {
    for (char ch : task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}
Compile them as in tutorial 1 (make sure all the required jar files are in the working directory and the CP environment variable is set):
javac -cp $CP NewTask.java Worker.java
On Windows, replace $CP with %CP%; the same applies below. (Class representative's note)
Round-robin dispatching
One advantage of a task queue is that it makes scaling out easy. If a backlog of tasks builds up, we can simply add more workers.
First, let's run two worker instances at the same time. Both will get messages from the queue, but how exactly? Let's find out.
You need three terminals open. Two of them run the worker program. These are our two consumers, C1 and C2.
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
The third terminal is used to publish new tasks. After the consumer starts, several messages can be sent:
# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
Let's see what the two worker terminals print:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
By default, RabbitMQ sends each message to the next consumer in sequence. On average, every consumer receives the same number of messages. This way of distributing messages is called round-robin.
You can try this yourself with three or more workers.
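The assignment seen in the two worker terminals can be reproduced with a small in-memory model. This is only a sketch of the round-robin rule, not RabbitMQ's implementation; the class RoundRobinDemo and its dispatch method are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinDemo {
    // Hypothetical model: message i (counting from 0) goes to consumer i mod consumerCount.
    public static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of(
                "First message.", "Second message..", "Third message...",
                "Fourth message....", "Fifth message.....");
        List<List<String>> inboxes = dispatch(messages, 2);
        for (int c = 0; c < inboxes.size(); c++) {
            System.out.println("C" + (c + 1) + " <- " + inboxes.get(c));
        }
    }
}
```

With two consumers, C1 ends up with the first, third and fifth messages and C2 with the second and fourth, which matches the terminal output above.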
Message acknowledgement
A task can take a while to complete. What happens if a worker dies before its task is finished? In our current code, once RabbitMQ delivers a message to a consumer, it immediately marks that message for deletion. So if a worker is terminated, we lose the message it was processing, along with any messages that were dispatched to it but not yet handled.
But we don't want to lose any tasks. If a worker dies, we want its tasks to be handed over to another worker.
To make sure messages are never lost, RabbitMQ provides a message acknowledgment mechanism. An acknowledgment (ack) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
If a consumer dies without sending an ack (its channel is closed, the connection is closed, or the TCP connection is lost), RabbitMQ concludes that the message was not fully processed and re-queues it. If other consumers are online at that moment, it quickly redelivers the message to one of them. This ensures that no message is lost even if a worker dies unexpectedly.
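Older RabbitMQ releases impose no timeout on message processing: a message is redelivered only when the consumer dies, so even a task that takes a very long time is fine. Note that current releases do enforce a delivery acknowledgment timeout (30 minutes by default), and a consumer that holds a message unacknowledged for longer has its channel closed.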
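The bookkeeping described above can be pictured with a toy in-memory model. It is a sketch under simplifying assumptions (one queue, one consumer, no network) and says nothing about RabbitMQ's internals; the class AckModel and all of its methods are invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();
    // delivery tag -> message, for deliveries that have not been acknowledged yet
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    public void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to the consumer; returns its delivery tag, or -1 if nothing is ready.
    public long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer confirms the work is done, so the message can be forgotten.
    public void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer went away: every unacknowledged message returns to the front of the queue, in order.
    public void consumerDied() {
        List<String> pending = new ArrayList<>(unacked.values());
        Collections.reverse(pending);
        for (String message : pending) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    public int readyCount() {
        return ready.size();
    }

    public int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckModel queue = new AckModel();
        queue.publish("First message.");
        queue.publish("Second message..");

        long tag = queue.deliver(); // the worker takes the first task
        queue.ack(tag);             // and finishes it
        queue.deliver();            // the worker takes the second task
        queue.consumerDied();       // but dies before acknowledging it

        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

In this run the first message is gone for good because it was acknowledged, while the second one is back in the ready queue, waiting for another worker.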
Manual message acknowledgments are turned on by default. In the previous example, we turned them off by setting autoAck=true. Now we set this flag to false and have the worker send an acknowledgment once the work is done.
channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
        doWork(message);
    } finally {
        System.out.println(" [x] Done");
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
With this code, no message is lost even if you stop a worker with CTRL+C while it is processing a message. Soon after the worker dies, all unacknowledged messages are redelivered.
An acknowledgment must be sent on the same channel that received the delivery. Trying to acknowledge on a different channel results in a channel-level protocol exception. See the reference documentation on the acknowledgment mechanism for details.
Forgotten acknowledgment
A common mistake is forgetting to call basicAck. It is an easy error to make, but the consequences are serious. Messages are redelivered when your client exits (which can look like random redelivery), and because RabbitMQ cannot release unacknowledged messages, it consumes more and more memory.
To troubleshoot this kind of problem, you can use the rabbitmqctl tool to print the messages_unacknowledged field:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
On Windows, drop the sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message persistence
We have learned how to make sure a task is not lost when a consumer dies. However, our tasks will still be lost if the RabbitMQ server stops.
Unless told otherwise, RabbitMQ forgets its queues and messages when it stops or crashes. To prevent this, we need to mark both the queue and the messages as durable. First, the queue:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
Although this code is correct on its own, it won't work in our current setup, because we have already declared a non-durable queue named "hello". RabbitMQ does not allow an existing queue to be redefined with different parameters and returns an error to any program that tries. A simple workaround is to declare a queue with a different name, for example task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
This queueDeclare call with the durable parameter set to true must be applied in both the producer and the consumer code.
At this point we can be sure that the task_queue queue won't be lost even if RabbitMQ restarts. Next, we mark our messages as persistent by passing MessageProperties.PERSISTENT_TEXT_PLAIN when publishing.
import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());
Note on message persistence
Marking messages as persistent does not fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short window in which RabbitMQ has accepted a message but has not saved it yet. In addition, RabbitMQ does not perform fsync(2) for every message: the message may only have been written to a cache and not really to disk. The persistence guarantees are not strong, but they are more than enough for our simple task queue. If you need a stronger guarantee, use publisher confirms.
Fair dispatch
Round-robin dispatching does not always do what we want. For example, with two workers, if all odd-numbered messages are heavy and all even-numbered messages are light, one worker is constantly busy while the other does hardly any work. RabbitMQ knows nothing about how hard a message is and still dispatches messages evenly to the two workers.
This happens because RabbitMQ simply dispatches a message when it enters the queue and does not look at the number of unacknowledged messages a consumer has. It just blindly sends every n-th message to the n-th consumer.
To solve this, we can call the basicQos method with its prefetchCount parameter set to 1. This tells RabbitMQ not to give more than one message to a worker at a time. In other words, do not dispatch a new message to a worker until it has processed and acknowledged the previous one. RabbitMQ will instead send the message to another worker that is not busy.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
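To see why this helps, here is a rough simulation of the odd-heavy, even-light scenario described above. It is a sketch under strong assumptions: all tasks are already in the queue at time zero, durations are in seconds, and delivery itself takes no time. The class DispatchComparison and its methods are made up for the example and do not touch RabbitMQ.

```java
class DispatchComparison {
    // Round-robin: task i always goes to worker i mod workers, regardless of how busy it is.
    public static int roundRobinMakespan(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < durations.length; i++) {
            busyUntil[i % workers] += durations[i];
        }
        return max(busyUntil);
    }

    // prefetchCount = 1: a worker gets its next task only after finishing the current one,
    // so each task goes to whichever worker becomes free first.
    public static int prefetchOneMakespan(int[] durations, int workers) {
        int[] busyUntil = new int[workers];
        for (int duration : durations) {
            int earliest = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[earliest]) {
                    earliest = w;
                }
            }
            busyUntil[earliest] += duration;
        }
        return max(busyUntil);
    }

    private static int max(int[] values) {
        int result = 0;
        for (int value : values) {
            result = Math.max(result, value);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] durations = {5, 1, 5, 1, 5, 1}; // odd-numbered tasks are heavy, even-numbered ones are light
        System.out.println("round-robin: " + roundRobinMakespan(durations, 2) + "s"); // prints round-robin: 15s
        System.out.println("prefetch=1:  " + prefetchOneMakespan(durations, 2) + "s"); // prints prefetch=1:  11s
    }
}
```

Under round-robin the first worker receives all three heavy tasks and needs 15 seconds, while the second is idle after 3. With one task in flight per worker, the light-task worker picks up one of the heavy tasks and the whole batch finishes in 11 seconds.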
A note about queue size
If all the workers are busy, the queue can fill up. Keep an eye on its size, and either add more workers or adopt some other strategy (class representative's note: for example, balancing the ratio of producers to consumers).
Putting it all together
The final NewTask.java code is as follows:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
Worker.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
With message acknowledgments and prefetchCount, you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
For more information on Channel and MessageProperties, see the online JavaDocs.
Next, move on to Tutorial 3 to learn how to deliver the same message to multiple consumers.