1. Preface
RabbitMQ was actually the first MQ framework that I came into contact with. I remember that I went to the library to see it by myself when I was in college. Since the English of RabbitMQ website was not too difficult, it was also learned by referencing the website. There were six chapters, which were developed by Node at that time, and I spent the afternoon reading and understanding them.Now looking back, I find I've forgotten about it. Now I'll look back and see it again, as it should be remembered.In case of forgetting again, readers should have a certain MQ base.
2. RabbitMQ
The first thing we need to know is that RabbitMQ is based on Advanced Queue Protocol (AMQP), which is written by Elang. Here we will focus on RabbitMQ queue, switch and RPC.
2.1, Queue
Where messages are stored, multiple producers can send messages to a queue, and multiple consumers can consume messages from the same queue.
Note: When multiple consumers listen on a queue, only one consumer is consumed by the producer sending a message to the queue, and the consumer consumes in the order in which the consumer initiates a round-robin.
2.2. Consumers
The party consuming the message
public class Send { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace(); } } }
public class Recv { private final static String QUEUE_NAME = "hello"; private final static String IP = "172.16.12.162"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }catch (Exception e){ e.printStackTrace(); } } }
2.3. Summary
1. How does Rabbit ensure that messages are consumed?
A: via ACK mechanism.Whenever a message is consumed by the consumer, the consumer can send an ACK to RabbitMQ, so RabbitMQ knows that the message has been consumed completely and can be delete d.; if a message is consumed but no ack is sent, RabbitMQ will think it needs to be re-consumed at this time, and if there are other consumers at this time, RabbitMQ will give the message to it for processing.
Note: The ack mechanism is turned on by autoAck=false;
2. How do messages persist?
- Persist queue by setting channel.queueDeclare (QUEUE_NAME, true, false, null); the second parameter durable is true
- Set message persistence, that is, set MessageProperties.PERSISTENT_TEXT_PLAIN
Note: Message persistence does not necessarily guarantee that messages will not be lost
3. How does RabbitMQ prevent two consumers from being very busy and very idle?
With the following settings, a consumer is guaranteed to consume only one message at a time, and no new messages are sent to RabbitMQ until it has finished consuming and returned ack s.
int prefetchCount = 1 ; channel.basicQos(prefetchCount)
4. How can RabbitMQ exceptions ensure that messages are not consumed repeatedly?
RabbitMQ does not provide a better way to ensure that the business itself is secure.
2.2. Switches
In RabbitMQ, a producer never actually sends a message to a queue, or even does not know which queue the message was sent to.Where was it sent?That's the main point of this section: the switch. Here's its introduction in RabbitMQ.(X is the switch) The producer sends a message to the switch, which then forwards the message to the queue.
The question arises from the figure above: How does X send a message to queue?Does it send messages to all queues or to a specified queue or discard messages?That's the type of switch you see.Let's talk about these types
2.2.1,fanout
fanout: Broadcast mode, which understands better that all queues receive messages from the switch.
As above, both queues can receive messages from the switch.
2.2.2,direct
This mode is equivalent to the Publish/Subscribe mode, where we need to set two parameters when the switch type is direct:
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"); the second parameter, which we can call routeKey
- channel.queueBind(queueName, EXCHANGE_NAME, ""); the third parameter, which we call bindKey
With these two parameters, we can specify which messages we subscribe to.
As shown in the figure, Q1 subscribes to orange's messages and Q2 subscribes to black, green's messages.
2.2.3,topic
In fact, top and direct are a bit similar, which is equivalent to enhancing direct.In direct, the bind routeKey we mentioned above is black, green, it is limited, it can only be absolutely equal to the routeKey, but sometimes our needs are different, we may want a regular match, then Topic comes in handy.
When the type is topic, its bindKey corresponding string needs to be split by'.', and RabbitMQ also provides two symbols:
- Asterisk (*): Indicates a word
- Well sign (#): 0, more than one word
The image above means that all messages with the second word orange send a Q1, all messages with the last word rabbit or the first word lazy send a Q2.
2.2.4,header
There is not much explanation for this type of official demo, nor is it studied here.
2.3,RPC
RabbitMQ also implements RPC (remote procedure call).What is RPC is simply a local call to the remote method.In RabbitMQ, the Client sends a request message, which is returned to the Client after the Server process is complete.Here's a question? How does Server return a response to Client, where RabbitMQ defines a concept: Callback Queue.
Callback Queue
Note that this queue is unique String replyQueueName = channel.queueDeclare().getQueue();.
The first thing we need to understand is why we need this queue?We know that when RabbitMQ is queuing messages, Client simply drops the message into the queue and Server retrieves it from the queue.But one more thing about RabbitMQ as an RPC is that Client still needs to return the result, and then how does the server know to send the message to Client, which is what Callback Queue does.
Correlation Id
As we know above that Server returns data to Client via Callback Queue, do you create a queue for each request?This is a waste of resources and RabbitMQ has a better solution.When we send a request, bind a unique ID (correlation Id), and then take out the ID to match the ID sent out when the message is processed and returned.So a Callback Queue is at the Client level, not at the request level.
Realization
The two most important concepts of implementing RPC with RabbitMQ are described above, whether the code is simple or pasted down.
client side
public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) throws Exception{ RPCClient fibonacciRpc = new RPCClient(); try { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (Exception e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } }
Server
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
3. Summary
Looking back at RabbitMQ this time, I understand the following RabbitMQ again, there is something to chew slowly.Of course, these are also examples of how to get started on the official website, so you can delve into them later if you have the opportunity.