Goal
Understand the ideas behind several common RabbitMQ modes. This article provides code examples for each mode, based on the tutorials on the official RabbitMQ website.
Preparation
- An Erlang environment;
- A RabbitMQ version compatible with the installed Erlang version;
- Ports 5672 and 15672 open.
Official website: RabbitMQ Erlang version requirements: https://www.rabbitmq.com/which-erlang.html
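Before running the examples it can help to confirm that both ports are reachable from the client machine. The following is a minimal sketch using only the JDK; the class name `PortCheck`, the default host `localhost`, and the 2-second timeout are assumptions made for illustration. Port 5672 is the AMQP listener and 15672 serves the management UI when the management plugin is enabled.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unknown: treat as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" is a hypothetical default; pass the broker host as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        for (int port : new int[] {5672, 15672}) {
            System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
        }
    }
}
```

A `false` result only says the TCP connection failed; it does not distinguish a closed firewall port from a broker that is not running.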
Getting to know RabbitMQ
Brief introduction
- Supports multiple messaging protocols, message queues, delivery confirmation, flexible routing to queues, and multiple exchange types.
- Deploy using BOSH, Chef, Docker, and Puppet. Develop cross-language messaging using your favorite programming languages, such as Java, .NET, PHP, Python, JavaScript, Ruby, and Go.
- Distributed deployment for high availability and throughput.
- Pluggable authentication and authorization, supporting TLS and LDAP. Lightweight and easy to deploy in public and private clouds.
- Various tools and plug-ins supporting continuous integration, operational metrics, and integration with other enterprise systems. A flexible plug-in approach for extending RabbitMQ functionality.
- HTTP API, command line tools, and a UI for managing and monitoring RabbitMQ.
Official website: RabbitMQ features: https://www.rabbitmq.com/
Supported protocols
- AMQP 0-9-1
- STOMP
- MQTT
- AMQP 1.0
- HTTP and WebSockets (although HTTP is not a real messaging protocol, RabbitMQ can transmit messages through HTTP in three ways).
Official website: which protocols does RabbitMQ support? https://www.rabbitmq.com/protocols.html
Several common modes
Official website: RabbitMQ tutorials: https://www.rabbitmq.com/getstarted.html
Hello World mode
Description: a producer (sender) sends a message, and a consumer (receiver) receives the message and prints it. This is the simplest model.
Illustration: where P is the producer, C is the consumer, and the red rectangle is the queue.
Work queues mode
Description: the work queue (task queue) mode. To avoid waiting on long-running business logic, tasks are encapsulated as messages and sent to a queue, and several consumers share the work of processing them.
Illustration: where P is the producer and C is the consumer.
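By default RabbitMQ hands the messages of a shared queue to its consumers in turn (round-robin). The sketch below simulates that dispatch order in plain Java with no broker involved; the class name `RoundRobinDispatch` and the task labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinDispatch {
    /** Assigns message i to consumer (i mod consumerCount) and returns one list per consumer. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("t1", "t2", "t3", "t4", "t5", "t6", "t7");
        List<List<String>> result = dispatch(tasks, 3);
        for (int c = 0; c < result.size(); c++) {
            // Prints C1 -> [t1, t4, t7], C2 -> [t2, t5], C3 -> [t3, t6]
            System.out.println("C" + (c + 1) + " -> " + result.get(c));
        }
    }
}
```

Each message goes to exactly one consumer, which is what distinguishes a work queue from the publish/subscribe mode.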
Publish/Subscribe mode
Description: in the publish/subscribe mode, the producer sends messages only to an exchange, and each consumer binds its own queue to that exchange to subscribe to the messages.
Illustration: where P is the producer, C is the consumer, and X is the exchange (type: fanout). The producer does not send the message directly to a queue; the exchange delivers a copy of the message to every queue bound to it.
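The fanout behaviour of the exchange (the X in the illustration) can be sketched without a broker: every bound queue receives its own copy of each message, and the routing key plays no part. This is a simplified in-memory model, not the RabbitMQ client API; the class `FanoutSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout exchange needs no binding key. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is accepted but ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("baidu");
        exchange.bind("alibaba");
        exchange.publish("", "air quality: good");
        System.out.println("baidu   -> " + exchange.messagesIn("baidu"));
        System.out.println("alibaba -> " + exchange.messagesIn("alibaba"));
    }
}
```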
Routing mode
Description: in routing mode, a queue selectively receives messages by routing key: the exchange delivers a message only to queues whose binding key exactly equals the message's routing_key. However, this mode cannot route based on multiple criteria.
Illustration: where P is the producer, C is the consumer, and X is the exchange (type: direct). The queue of C1 is bound only with the key error, so C1 receives only messages whose routing key is error.
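The exact-match rule of a direct exchange (the X in the illustration) can be sketched as a lookup from binding key to queues. The model below is in-memory only and is not the RabbitMQ client API; the class `DirectSketch`, the queue names `q1` and `q2`, and the extra keys `info` and `warning` bound to `q2` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DirectSketch {
    private final Map<String, Set<String>> queuesByBindingKey = new HashMap<>();
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue with one binding key; call again to add further keys to the same queue. */
    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        queuesByBindingKey.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Delivers to queues whose binding key equals the routing key; returns how many queues matched. */
    int publish(String routingKey, String message) {
        Set<String> targets = queuesByBindingKey.getOrDefault(routingKey, Set.of());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("q1", "error");
        exchange.bind("q2", "info");
        exchange.bind("q2", "warning");
        exchange.bind("q2", "error");
        exchange.publish("error", "disk full");
        exchange.publish("info", "user logged in");
        System.out.println("q1 -> " + exchange.messagesIn("q1"));
        System.out.println("q2 -> " + exchange.messagesIn("q2"));
    }
}
```

A message whose routing key matches no binding reaches no queue, which is why `publish` returns the number of matched queues in this sketch.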
Topics mode
Description: topic mode, also called wildcard mode. Compared with routing mode, topic mode is more flexible: queues are bound to the exchange with wildcard-style binding keys made of dot-separated words.
Wildcards:
- `*` matches exactly one word;
- `#` matches zero or more words.
Wildcard example:
Sender routing key | Receiver 1 (`*.henan.#`) | Receiver 2 (`*.henan.*`) |
---|---|---|
china.hunan.changsha | | |
china.hunan.changsha.tianxinqu | | |
china.hunan.changsha.furongqu | | |
china.hunan.yueyang | | |
china.hunan.changde | | |
china.henan.zhengzhou.jinshuiqu | matching | |
china.henan.hebi | matching | matching |
china.henan.shangqiu | matching | matching |
china.beijing | | |
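The matching rules in the table can be checked with a small matcher. This is a sketch of the wildcard semantics only, written in plain Java; it is not how the broker implements topic routing, and the class name `TopicMatcher` is hypothetical.

```java
final class TopicMatcher {
    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"china.hunan.changsha", "china.henan.zhengzhou.jinshuiqu", "china.henan.hebi", "china.beijing"};
        for (String key : keys) {
            System.out.println(key + "  *.henan.# = " + matches("*.henan.#", key)
                    + "  *.henan.* = " + matches("*.henan.*", key));
        }
    }
}
```

The four-word key `china.henan.zhengzhou.jinshuiqu` matches only `*.henan.#`, because the trailing `*` in `*.henan.*` must consume exactly one word.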
Code demonstration
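Dependencies
```xml
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
```
The examples below import classes from `com.rabbitmq.client`, which belong to the RabbitMQ Java client (`com.rabbitmq:amqp-client`), so that library must be available on the compile classpath. The work queues example also imports Gson and Lombok.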
Utility classes
```java
package com.rabbit.utils;

public class RabbitConstant {
    public static final String QUEUE_SMS = "sms";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_ALIBABA = "alibaba";
    public static final String EXCHANGE_FANOUT_AIR_CONDITION = "fanout_air_condition";
    public static final String EXCHANGE_DIRECT_AIR_CONDITION = "direct_air_condition";
    public static final String EXCHANGE_TOPIC_AIR_CONDITION = "topic_air_condition";
}
```
```java
package com.rabbit.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        // Replace the placeholder values with your own broker settings.
        connectionFactory.setHost("my IP");
        connectionFactory.setPort(5672); // 5672 is the default AMQP port of RabbitMQ
        connectionFactory.setUsername("user name");
        connectionFactory.setPassword("password");
        connectionFactory.setVirtualHost("virtual host name");
    }

    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```
Hello World mode
Producer
```java
package com.rabbit.helloworld;

import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/18
 * @describe
 */
public class Producter {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        // Parameter 1: queue name
        // Parameter 2: durable. true means the queue survives a broker restart.
        // Parameter 3: exclusive. false means any connection can use the queue; true restricts it to the connection that declared it.
        // Parameter 4: autoDelete. false means the queue is kept after its consumers disconnect.
        // Parameter 5: additional arguments.
        channel.queueDeclare("helloworld", false, false, false, null);
        // Send the message
        // Parameter 1: exchange name ("" selects the default exchange, so no exchange has to be declared in Hello World mode)
        // Parameter 2: routing key (with the default exchange this is the queue name)
        // Parameter 3: additional message properties
        // Parameter 4: message body
        String msg = "This is the message I sent.";
        channel.basicPublish("", "helloworld", null, msg.getBytes());
        channel.close();
        connection.close();
        System.out.println("Sent successfully");
    }
}
```
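Publishing with an empty exchange name works because every queue is implicitly bound to the default exchange under its own name, so the routing key selects the queue. The following in-memory sketch models just that rule; it is not the RabbitMQ client API, and the class `DefaultExchangeSketch` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class DefaultExchangeSketch {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Declaring a queue implicitly binds it to the default exchange under its own name. */
    void queueDeclare(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Routes to the queue whose name equals the routing key; returns false if none exists. */
    boolean publish(String routingKey, String message) {
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // no queue with that name: the message is not routed anywhere
        }
        queue.addLast(message);
        return true;
    }

    /** Returns the oldest message in the queue, or null if the queue is empty or unknown. */
    String consume(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("helloworld");
        broker.publish("helloworld", "This is the message I sent.");
        System.out.println(broker.consume("helloworld"));
    }
}
```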
Consumer
```java
package com.rabbit.helloworld;

import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/18
 * @describe
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        // Parameter 1: queue name
        // Parameter 2: durable. true means the queue survives a broker restart.
        // Parameter 3: exclusive. false means any connection can use the queue; true restricts it to the connection that declared it.
        // Parameter 4: autoDelete. false means the queue is kept after its consumers disconnect.
        // Parameter 5: additional arguments.
        channel.queueDeclare("helloworld", false, false, false, null);
        // Get data from MQ
        // Parameter 1: queue name
        // Parameter 2: autoAck. false means the code acknowledges each message manually.
        // Parameter 3: an instance of a DefaultConsumer subclass
        channel.basicConsume("helloworld", false, new Reciver(channel));
    }
}
```
Subclass of DefaultConsumer
```java
package com.rabbit.helloworld;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * @author Chen Tianxiang
 * @date 2022/1/19
 * @describe
 */
class Reciver extends DefaultConsumer {
    private Channel channel;

    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        System.out.println(msg);
        System.out.println("Delivery tag of the message: " + envelope.getDeliveryTag());
        // Acknowledge only the current message (multiple = false)
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
```
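The second argument of `basicAck` is the `multiple` flag: `false` acknowledges only the given delivery tag, while `true` acknowledges every outstanding delivery up to and including that tag. The sketch below models the bookkeeping on one channel in plain Java; the class `AckSketch` and its methods are hypothetical and only illustrate the semantics.

```java
import java.util.TreeMap;

final class AckSketch {
    // Delivery tag -> message that has been delivered but not yet acknowledged.
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1; // delivery tags count up from 1 on each channel

    /** Records a delivery and returns its delivery tag. */
    long deliver(String message) {
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    /** Mirrors the meaning of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headMap(deliveryTag, true).clear(); // every tag <= deliveryTag
        } else {
            unacked.remove(deliveryTag);                // only this tag
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        channel.deliver("a");
        long second = channel.deliver("b");
        long third = channel.deliver("c");
        channel.ack(second, false);
        System.out.println("after single ack: " + channel.unackedCount() + " unacked");
        channel.ack(third, true);
        System.out.println("after multiple ack: " + channel.unackedCount() + " unacked");
    }
}
```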
Work queues mode
Producer
```java
package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * SMS sender (producer)
 */
public class OrderSystem {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            // Parameter 1: queue name
            // Parameter 2: durable. true means the queue survives a broker restart.
            // Parameter 3: exclusive. false means any connection can use the queue; true restricts it to the connection that declared it.
            // Parameter 4: autoDelete. false means the queue is kept after its consumers disconnect.
            // Parameter 5: additional arguments.
            channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
            Gson gson = new Gson();
            for (int i = 1; i <= 100; i++) {
                SMS sms = new SMS("passenger" + i, "1890000000" + i, "Your ticket has been booked successfully!");
                // Serialize to JSON with the imported Gson; the consumers treat the body as a JSON string.
                String jsonSMS = gson.toJson(sms);
                channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, jsonSMS.getBytes());
            }
            // Report success only when every message was published without an exception.
            System.out.println("Sending data succeeded.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
SMS entity class
```java
package com.rabbit.workqueues;

import lombok.Data;

/**
 * SMS entity class
 */
@Data
public class SMS {
    /* full name */
    private String name;
    /* mobile phone */
    private String mobile;
    /* content */
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }
}
```
Consumer 1
```java
package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 1 (consumer)
 */
public class SMSSender1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        // Without channel.basicQos(1), MQ distributes the messages evenly across all consumers, however busy each one is.
        // With channel.basicQos(1), MQ no longer pushes several messages to a consumer at once; the consumer gets a new message from the queue only after it has processed and acknowledged the current one.
        // Servers differ in performance: once server A has consumed its message it takes the next one from the queue, even if server B is still busy. This amounts to distribution on demand.
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 1 (consumer) received:" + jsonSMS);
                try {
                    Thread.sleep(100); // simulate 100 ms of processing time
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Consumer 2
```java
package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 2 (consumer)
 */
public class SMSSender2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 2 (consumer) received:" + jsonSMS);
                try {
                    Thread.sleep(200); // simulate 200 ms of processing time
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Consumer 3
```java
package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 3 (consumer)
 */
public class SMSSender3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 3 (consumer) received:" + jsonSMS);
                try {
                    Thread.sleep(300); // simulate 300 ms of processing time
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Publish/Subscribe mode
Producer
```java
package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The exchange needs to be created in advance, with exchange type = fanout
 */
public class Producter {
    public static void main(String[] args) {
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            // The first parameter is the exchange name; the other parameters are the same as before.
            // The routing key is left empty because a fanout exchange ignores it.
            channel.basicPublish(RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "", null, "Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.".getBytes());
            System.out.println("Air quality message sent.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
Consumer 1
```java
package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key (not used by a fanout exchange)
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Consumer 2
```java
package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key (not used by a fanout exchange)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Routing mode
Producer
```java
package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The exchange needs to be created in advance, with exchange type = direct
 */
public class Producter {
    public static Map<String, Object> getData() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("china.hunan.changsha", "Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.tianxinqu", "Air index of Tianxin District of Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.furongqu", "Air index of Furong district, Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.yueyang", "Yueyang air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changde", "Changde air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.zhengzhou", "Zhengzhou air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.hebi", "Hebi air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.shangqiu", "Shangqiu air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> map = getData();
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            for (String key : map.keySet()) {
                System.out.println(map.get(key).toString());
                // First parameter: exchange name
                // Second parameter: the map key, used as the routing key of the message
                channel.basicPublish(RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, key, null, map.get(key).toString().getBytes());
            }
            System.out.println("Air quality message sent.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
Consumer 1
```java
package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, false, null);
        // queueBind binds a queue to an exchange; one queue can be bound with several keys
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.hunan.changsha.tianxinqu");
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.hunan.changsha.furongqu");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Consumer 2
```java
package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange; one queue can be bound with several keys
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.henan.zhengzhou");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.henan.hebi");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Topics mode
Producer
```java
package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The exchange needs to be created in advance, with exchange type = topic
 */
public class Producter {
    public static Map<String, Object> getData() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("china.hunan.changsha", "Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.tianxinqu", "Air index of Tianxin District of Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.furongqu", "Air index of Furong district, Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.yueyang", "Yueyang air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changde", "Changde air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.zhengzhou.jinshuiqu", "Zhengzhou air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.hebi", "Hebi air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.shangqiu", "Shangqiu air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.beijing", "Beijing air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        return map;
    }

    public static void main(String[] args) {
        Map<String, Object> map = getData();
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            for (String key : map.keySet()) {
                System.out.println(key + map.get(key).toString());
                // First parameter: exchange name
                // Second parameter: the routing key of the message
                channel.basicPublish(RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION, key, null, (key + map.get(key).toString()).getBytes());
            }
            System.out.println("Air quality message sent.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
Consumer 1
```java
package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // To make it easy to test different binding rules, delete this queue first.
        channel.queueDelete(RabbitConstant.QUEUE_ALIBABA);
        // Declare the queue (autoDelete = true here)
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, true, null);
        // queueBind binds a queue to an exchange
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key (wildcard pattern)
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION, "*.henan.#");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Consumer 2
```java
package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        // Get a long-lived TCP connection
        Connection connection = RabbitUtils.getConnection();
        // Create a channel (a virtual connection inside the TCP connection)
        final Channel channel = connection.createChannel();
        // To make it easy to test different binding rules, delete this queue first.
        channel.queueDelete(RabbitConstant.QUEUE_BAIDU);
        // Parameter 1: queue name
        // Parameter 2: durable. true means the queue survives a broker restart.
        // Parameter 3: exclusive. false means any connection can use the queue; true restricts it to the connection that declared it.
        // Parameter 4: autoDelete. false means the queue is kept after its consumers disconnect.
        // Parameter 5: additional arguments.
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        // queueBind binds a queue to an exchange
        // Parameter 1: queue name; parameter 2: exchange name; parameter 3: routing key (wildcard pattern)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION, "*.henan.*");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```
Java client API Guide
Official website: https://www.rabbitmq.com/api-guide.html
Project address
Download: https://download.csdn.net/download/qq_39706570/77151462