1: HelloWorld simple mode
2: Work Queues work mode
3: Publish/SubScribe mode
4: Routing mode
5: Topics topic mode
6: Publish Comfirm release confirmation mode
Code demonstration
1: HelloWorld simple mode (switches use the default -- binding RoutingKey -- queue)
Consumers actively get messages through listeners
Create maven project and add dependencies
<!--appoint jdk Compiled version--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq Dependent client--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--A dependency on the operation file stream--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
View mq port number default 5672
ps -ef |grep rabbit
Create producer classes and write code
package com.mq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; //Simple mode public class Producer { //Queue name public static final String queue_name = "hello"; //Send a message public static void main(String[] args) throws IOException, TimeoutException { //Create factory ConnectionFactory connectionFactory = new ConnectionFactory(); //ip connectionFactory.setHost("192.168.6.100"); //port will go without writing // connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123"); //Create connection Connection connection = connectionFactory.newConnection(); //Create channel Channel channel = connection.createChannel(); /** * Generate a queue * 1.Queue name * 2.Is the message in the queue persistent? The default message is stored in memory * 3.Whether the queue is only used by one consumer or shared. true can be used by multiple consumers * 4.Whether to automatically delete the queue after the last consumer is disconnected. true: automatically delete * 5.Other parameters */ channel.queueDeclare(queue_name,false,false,false,null); String message="hello world"; /** * Send a message * 1.Send to that switch '' default * 2.Which key is the routing key * 3.Other parameter information * 4.The body of the message sent */ channel.basicPublish("",queue_name,null,message.getBytes()); System.out.println("Message sent"); channel.close(); connection.close(); } }
consumer
package com.mq; import com.rabbitmq.client.*; //Receive messages regardless of channel and connection public class Consumer { //Same name as producer private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.6.100"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("Waiting to receive message...."); //How to perform interface callback for consumption of pushed messages DeliverCallback deliverCallback=(consumerTag, delivery)->{ String message= new String(delivery.getBody()); System.out.println(message); }; //A callback interface for canceling consumption, such as when the queue is deleted during consumption CancelCallback cancelCallback=(consumerTag)->{ System.out.println("Message consumption interrupted"); }; /** * Consumer News * 1.Which queue to consume * 2.Whether to answer automatically after consumption is successful. true means to answer automatically. false means to answer manually * 3.Callback of unsuccessful consumption by consumers */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
2: Work Queues work mode
Compared with the simple model, there are multiple consumers
Message response
Automatic response: high efficiency and fast speed, and message loss will occur
Manual response: messages will be saved in the queue, and consumers will respond only after processing
Code before modification
//How to perform interface callback for consumption of pushed messages DeliverCallback deliverCallback=(consumerTag, delivery)->{ String message= new String(delivery.getBody()); System.out.println(message); //. . . Other codes //Parameter: 1 Identification, 2 Batch channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //A callback interface for canceling consumption, such as when the queue is deleted during consumption CancelCallback cancelCallback=(consumerTag)->{ System.out.println("Message consumption interrupted"); }; /** * Consumer News * 1.Which queue to consume * 2.Whether to answer automatically after consumption is successful. true means to answer automatically. false means to answer manually * 3.Callback of unsuccessful consumption by consumers */ //The manual response message failed and can be retransmitted boolean autoAsk = false; channel.basicConsume(QUEUE_NAME,autoAsk,deliverCallback,cancelCallback); //Only after the above code is executed can the message be deleted from the confrontation queue be answered
When two consumers receive messages, they will poll normally. If one consumer is abnormal, the other consumer will accept his message (the message will automatically rejoin the queue)
Persistence
Ensure that the messages sent by the message producer are not lost after the RabbitMQ service is stopped
Set queue persistence and message persistence
Code before modification
/** * Generate a queue * 1.Queue name * 2.Is the message in the queue persistent? The default message is stored in memory * 3.Whether the queue is only used by one consumer or shared. true can be used by multiple consumers * 4.Whether to automatically delete the queue after the last consumer is disconnected. true: automatically delete * 5.Other parameters */ channel.queueDeclare(queue_name,true,false,false,null); /** * Send a message * 1.Send to that switch '' default * 2.Which key is the routing key * 3.Other parameter information is persisted messageproperties PERSISTENT_ TEXT_ PLAIN * 4.The body of the message sent */ channel.basicPublish("",queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
You need to re create a queue, otherwise an error will be reported, because once the queue is created, you cannot modify whether it is persistent or not
Message distribution
Fair distribution: one of the consumer messages may be overstocked due to unfair polling
//Set fair distribution messages int prefetchCount = 1; //The default is 0 channel.basicQos(prefetchCount);
prefetchCount: prefetchCount: when writing other numbers, the channel will backlog messages. The self-test value should be competitive
Switch
The core idea of RabbitMQ messaging model is that messages produced by producers are never sent directly to queues. In fact, usually producers don't even know which queues these messages are delivered to.
On the contrary, the producer can only send messages to the switch. The work of the switch is very simple. On the one hand, it receives messages from the producer, on the other hand, it pushes them into the queue. The switch must know exactly how to process the received message. Should these messages be put in a specific queue, or should they be put in many queues, or should they be discarded. This depends on the type of switch.
Type: direct, topic, headers, fan out
An empty string indicates the default or unnamed switch: messages can be routed to the queue, which is actually specified by the routing key (binding key) binding key
Temporary queue
Whenever we connect to Rabbit, we need a new empty queue. Therefore, we can create a queue with * * random name * *, or let the server choose a random queue name for us. Secondly, once we disconnect the consumer, the queue will be deleted automatically.
Create a temporary queue as follows:
String queueName = channel.queueDeclare().getQueue();
3. Publish subscriber mode
Mass distribution, each consumer has its own queue
Code: create connection tool class
public class RabbitMqUtils { //Get a connected channel public static Channel getChannel() throws Exception{ //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.6.100"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
producer
public class EmitLog { public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //Mass messaging Scanner sc = new Scanner(System.in); System.out.println("Please enter information"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish("logs", "", null, message.getBytes("UTF-8")); System.out.println("Producer sends message" + message); } } }
Consumer, copying a copy
public class ReceiveLogs01 { public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //Temporary queue String q1 = channel.queueDeclare().getQueue(); // String q2 = channel.queueDeclare().getQueue(); //Switch name and type channel.exchangeDeclare("logs","fanout"); //Bind queue name, switch name, connection name channel.queueBind(q1,"logs",""); // channel.queueBind(q2,"logs",""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("The console prints the received message"+message); }; channel.basicConsume(q1, true, deliverCallback, consumerTag -> { }); } }
Start the consumer first, you can create a binding to the column, and you can receive all messages when you start the producer to send messages
The first two: nameless switch
Subscriber mode: using fan out switch, mass sending
Routing mode: direct switch
Theme mode: theme switch
4. Routing mode
Multiple binding. The binding name can be set to send messages to that consumer
Consumer 1
//Accept log public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "disk"; //queue channel.queueDeclare(queueName, false, false, false, null); //binding channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("Waiting to receive message....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); message="Receive binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message; System.out.println("Error log received"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Consumer 2
//Accept log public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); System.out.println("Waiting to receive message....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Receive binding key:" + delivery.getEnvelope().getRoutingKey() + ",news:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
producer
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //Create multiple bindingkeys Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info","ordinary info information"); bindingKeyMap.put("warning","warning warning information"); bindingKeyMap.put("error","error error information"); //debug does not consume, so all messages received will be lost bindingKeyMap.put("debug","debugging debug information"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("Producer sends message:" + message); } } } }
5. Thematic model
Compared with routing patterns, wildcards can be used for matching
An asterisk can replace a word
#Can replace zero or more words
Code
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); /** * Q1-->Bound is * String with 3 words in the middle (*. orange. *) * Q2-->Bound is * The last word is three words of rabbit (*. *. rabbit) * The first word is multiple words of lazy (lazy. #) * */ Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","By queue Q1Q2 Received"); bindingKeyMap.put("lazy.orange.elephant","By queue Q1Q2 Received"); bindingKeyMap.put("quick.orange.fox","By queue Q1 Received"); bindingKeyMap.put("lazy.brown.fox","By queue Q2 Received"); bindingKeyMap.put("lazy.pink.rabbit","Although two bindings are satisfied, they are only used by the queue Q2 Receive once"); bindingKeyMap.put("quick.brown.fox","Any binding that does not match will not be received by any queue and will be discarded"); bindingKeyMap.put("quick.orange.male.rabbit","If the four words do not match, any binding will be discarded"); bindingKeyMap.put("lazy.orange.male.rabbit","It's four words, but it doesn't match Q2"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("Producer sends message" + message); } } } }
public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //Declare Q1 queue and binding relationship String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("Waiting to receive message....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Receive queue:"+queueName+"Binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //Declare Q2 queue and binding relationship String queueName="Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("Waiting to receive message....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Receive queue:"+queueName+"Binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }