RabbitMQ several common patterns and API s

catalogue

target

prepare

Getting to know RabbitMQ

brief introduction

Supported protocols

Several common modes

Hello World mode

Work queues mode

Publish/Subscribe mode

Routing mode

Topics mode

Code demonstration

Correlation dependency

Related tools

Hello World mode

Work queues mode

Publish/Subscribe mode

Routing mode

Topics mode

Java client API Guide

Project address

target

Master the ideas of several common modes of RabbitMQ. This article will provide relevant code cases based on the RabbitMQ official website.

prepare

  • Erlang environment;
  • RabbitMQ corresponding to Erlang version;
  • Open 5672 and 15672 ports.

Official website: RabbitMQ Erlang version requirementshttps://www.rabbitmq.com/which-erlang.html

Getting to know RabbitMQ

brief introduction

  1. Support multiple messaging protocols, message queues, delivery confirmation, flexible queue routing, and multiple exchange types.
  2. Deploy using BOSH, Chef, Docker, and Puppet. Develop cross language messaging using your favorite programming languages, such as Java NET, PHP, Python, JavaScript, Ruby, Go, etc.
  3. Distributed deployment for high availability and throughput;
  4. Pluggable authentication and authorization, supporting TLS and LDAP. Lightweight and easy to deploy in public and private clouds.
  5. Various tools and plug-ins supporting continuous integration, operational indicators and integration with other enterprise systems. A flexible plug-in method for extending RabbitMQ functionality.
  6. HTTP-API, command line tools, and UI for managing and monitoring RabbitMQ.

Official website: RabbitMQ functionhttps://www.rabbitmq.com/

Supported protocols

  1. AMQP 0-9-1
  2. STOMP
  3. MQTT
  4. AMQP 1.0
  5. HTTP and WebSockets (although HTTP is not a real messaging protocol, RabbitMQ can transmit messages through HTTP in three ways).

Official website: what protocols does RabbitMQ support?https://www.rabbitmq.com/protocols.html

Several common modes

Official website: RabbitMQ tutorialhttps://www.rabbitmq.com/getstarted.html

Hello World mode

Description: a producer (sender) sends a message, and a consumer (receiver) receives the message and prints it. This is the simplest model.

Illustration: where P is the producer, C is the consumer, and the red rectangle is the queue.

Work queues mode

Description: that is, work queue mode (task queue). In order to avoid long-time waiting for complex business execution, some tasks are encapsulated as messages and sent to the queue. These tasks are shared by these consumers.

Illustration: where P is the producer and C is the consumer.

Publish/Subscribe mode

Description: in the publish / subscribe mode, the producer can only send messages to the switch, and the consumer can specify the switch to subscribe to messages.

Illustration: where P is the producer, C is the consumer, and X is the switch (type: fanout). The producer does not directly send the message to the queue, but puts the message into the queue through the switch.

Routing mode

Description: i.e. routing mode, through routing_key (routing key or binding key) selectively receives messages. However, this mode cannot route based on multiple standards.

Illustration: where P is the producer, C is the consumer, and X is the switch (type: direct). The routing key of C1 has only error, so C1 only receives messages with the routing key of error.

Topics mode

Description: topic mode or wildcard mode. Compared with routing mode, topic mode is more flexible. Switches and queues are bound by wildcard style routing keys.

Wildcard:

  • *Means to replace a word;
  • #Represents the substitution of zero or more words;

Wildcard example:

routingKey of senderReceiver 1 (*. henan. #)Receiver 2 (*. henan. *)
china.hunan.changsh
china.hunan.changsha.tianxinqu
china.hunan.changsha.furongqu
china.hunan.yueyang
china.hunan.changde
china.henan.zhengzhou.jinshuiqumatching
china.henan.hebimatchingmatching
china.henan.shangqiumatchingmatching
china.beijing

Illustration:

Code demonstration

Correlation dependency

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

Related tools

package com.rabbit.utils;
public class RabbitConstant {
    public static final String QUEUE_SMS = "sms";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_ALIBABA = "alibaba";
    public static final String EXCHANGE_FANOUT_AIR_CONDITION = "fanout_air_condition";
    public static final String EXCHANGE_DIRECT_AIR_CONDITION = "direct_air_condition";
    public static final String EXCHANGE_TOPIC_AIR_CONDITION = "topic_air_condition";
}
package com.rabbit.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
    static {
        connectionFactory.setHost("my IP");
        connectionFactory.setPort(5672);//5672 is the default port number for RabbitMQ
        connectionFactory.setUsername("user name");
        connectionFactory.setPassword("password");
        connectionFactory.setVirtualHost("Virtual machine name");
    }
    public static Connection getConnection(){
        Connection conn = null;
        try {
            conn = connectionFactory.newConnection();
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Hello World mode

producer

package com.rabbit.helloworld;

import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/18
 * @describe
 */
public class Producter {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //Parameter 1: queue name
        //Parameter 2: persistent
        //Parameter 3: privatization. false means that all consumers can access it, and true means that only consumers who own it for the first time can access it.
        //Parameter 4: whether to delete automatically. false indicates that the queue will not be deleted after the link is closed.
        //Parameter 5: others.
        channel.queueDeclare("helloworld", false, false, false, null);
        //send message
        //Parameter 1: switch (switch can not be used in hello world mode)
        //Parameter 2: queue name
        //Parameter 3: additional attributes
        //Parameter 4: message
        String msg="This is the message I sent.";
        channel.basicPublish("", "helloworld", null, msg.getBytes());
        channel.close();
        connection.close();
        System.out.println("Sent successfully");
    }
}

consumer

package com.rabbit.helloworld;

import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/18
 * @describe
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //Parameter 1: queue name
        //Parameter 2: persistent
        //Parameter 3: privatization. false means that all consumers can access it, and true means that only consumers who own it for the first time can access it.
        //Parameter 4: whether to delete automatically. false indicates that the queue will not be deleted after the link is closed.
        //Parameter 5: others.
        channel.queueDeclare("helloworld", false, false, false, null);
        //Get data from MQ
        //Parameter 1: queue name
        //Diner 2: false indicates manual programming confirmation message.
        //Parameter 3: object of subclass of DefaultConsumer
        channel.basicConsume("helloworld",false,new Reciver(channel));
    }
}

Subclass of DefaultConsumer

package com.rabbit.helloworld;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * @author Chen Tianxiang
 * @date 2022/1/19
 * @describe
 */
class Reciver extends DefaultConsumer {
    private Channel channel;

    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        System.out.println(msg);
        System.out.println("Label of the message:" + envelope.getDeliveryTag());
        //Confirm only the current message
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

Work queues mode

producer

package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * SMS sender (producer)
 */
public class OrderSystem {
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            //Parameter 1: queue name
            //Parameter 2: persistent
            //Parameter 3: privatization. false means that all consumers can access it, and true means that only consumers who own it for the first time can access it.
            //Parameter 4: whether to delete automatically. false indicates that the queue will not be deleted after the link is closed.
            //Parameter 5: others.
            channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

            for (int i = 1; i <= 100; i++) {
                SMS sms = new SMS("passenger" + i, "1890000000" + i, "Your ticket has been booked successfully!");
                channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, sms.toString().getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Sending data succeeded.");
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
SMS entity class
package com.rabbit.workqueues;

import lombok.Data;

/**
 * SMS entity class
 */
@Data
public class SMS {
    /*full name*/
    private String name;
    /*mobile phone*/
    private String mobile;
    /*content*/
    private String content;

    public SMS(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }
}

Consumer 1

package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 1 (consumer)
 */
public class SMSSender1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        //If you do not write channel Basicqos (1), MQ will automatically send all requests to all consumers on average
        //channel.basicQos,MQ no longer sends multiple requests to consumers at one time, but after the consumer processes a message (after confirmation), it obtains a new request from the queue
        //After processing one by one, the performance of different servers is different. After server A consumes the message, it takes the message out of the queue again. At this time, server B has not consumed it. Equivalent to distribution on demand.
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 1 (consumer) received:" + jsonSMS);

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Consumer 2

package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 2 (consumer)
 */
public class SMSSender2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 2 (consumer) received:" + jsonSMS);

                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Consumer 3

package com.rabbit.workqueues;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Ticket buyer 3 (consumer)
 */
public class SMSSender3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("Ticket buyer 3 (consumer) received:" + jsonSMS);
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Publish/Subscribe mode

producer

package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The switch needs to be created in advance, and the switch type = fanout
 */
public class Producter {
    public static void main(String[] args){
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            //The first parameter is the switch name, and other parameters are the same as before
            channel.basicPublish(RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "", null, "Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.".getBytes());
            System.out.println("Air quality message sent.");
        }catch (IOException e) {
            e.printStackTrace();
        }finally{
            if(channel !=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Consumer 1

package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //Declare queue information
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, false, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key (temporarily unavailable)
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Consumer 2

package com.rabbit.publishsubscribe;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //Declare queue information
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key (temporarily unavailable)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_FANOUT_AIR_CONDITION, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Routing mode

producer

package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The switch needs to be created in advance, and the switch type = direct
 */
public class Producter {
    public static Map<String,Object> getData(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("china.hunan.changsha","Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.tianxinqu","Air index of Tianxin District of Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.furongqu","Air index of Furong district, Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.yueyang","Yueyang air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changde","Changde air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.zhengzhou","Zhengzhou air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.hebi","Hebi air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.shangqiu","Shangqiu air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        return map;
    }
    public static void main(String[] args){
        Map<String, Object> map = getData();
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            for(String key:map.keySet()){
                System.out.println(map.get(key).toString());
                //First parameter switch name
                //The second parameter is used as the routing key of the message
                channel.basicPublish(
                        RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION,
                        key,
                        null,
                        map.get(key).toString().getBytes()
                );
            }
            System.out.println("Air quality message sent.");
        }catch (IOException e) {
            e.printStackTrace();
        }finally{
            if(channel !=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Consumer 1

package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //Declare queue information
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, false, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.hunan.changsha.tianxinqu");
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.hunan.changsha.furongqu");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Consumer 2

package com.rabbit.routing;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //Declare queue information
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.henan.zhengzhou");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_DIRECT_AIR_CONDITION, "china.henan.hebi");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

Topics mode

producer

package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @date 2022/1/21
 * @describe Message producer
 * The switch needs to be created in advance, and the switch type = direct
 */
public class Producter {
    public static Map<String,Object> getData(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("china.hunan.changsha","Changsha air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.tianxinqu","Air index of Tianxin District of Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changsha.furongqu","Air index of Furong district, Changsha: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.yueyang","Yueyang air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.hunan.changde","Changde air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.zhengzhou.jinshuiqu","Zhengzhou air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.hebi","Hebi air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.henan.shangqiu","Shangqiu air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        map.put("china.beijing","Beijing air index: good. Warm tips: outdoor activities should be reduced, masks should be worn when going out, and sensitive people should avoid going out as far as possible.");
        return map;
    }
    
    public static void main(String[] args){
        Map<String, Object> map = getData();
        Channel channel = null;
        Connection connection = null;
        try {
            connection = RabbitUtils.getConnection();
            channel = connection.createChannel();
            for(String key:map.keySet()){
                System.out.println(key+map.get(key).toString());
                //First parameter switch name
                //The second parameter is the routing key of the message
                channel.basicPublish(
                        RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION,
                        key,
                        null,
                        (key+map.get(key).toString()).getBytes()
                );
            }
            System.out.println("Air quality message sent.");
        }catch (IOException e) {
            e.printStackTrace();
        }finally{
            if(channel !=null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Consumer 1

package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class Alibaba {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //To facilitate testing different rules, delete this queue first.
        channel.queueDelete(RabbitConstant.QUEUE_ALIBABA);
        //Declare queue information
        channel.queueDeclare(RabbitConstant.QUEUE_ALIBABA, false, false, true, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_ALIBABA, RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION, "*.henan.#");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_ALIBABA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Alibaba received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Consumer 2

package com.rabbit.topic;

import com.rabbit.utils.RabbitConstant;
import com.rabbit.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @date 2022/1/21
 * @describe
 */
public class BaiDu {
    public static void main(String[] args) throws IOException {
        //Get TCP long connection
        Connection connection = RabbitUtils.getConnection();
        //Get virtual connection
        final Channel channel = connection.createChannel();
        //To facilitate testing different rules, delete this queue first.
        channel.queueDelete(RabbitConstant.QUEUE_BAIDU);
        //Parameter 1: queue name
        //Parameter 2: persistent
        //Parameter 3: privatization. false means that all consumers can access it, and true means that only consumers who own it for the first time can access it.
        //Parameter 4: whether to delete automatically. false indicates that the queue will not be deleted after the link is closed.
        //Parameter 5: others.
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind is used to bind queues to switches
        //Parameter 1: queue name parameter 2: interactive machine name parameter 3: routing key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_TOPIC_AIR_CONDITION, "*.henan.*");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Baidu received air quality information:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

Java client API Guide

Java client API Guidehttps://www.rabbitmq.com/api-guide.html

Project address

Project addresshttps://download.csdn.net/download/qq_39706570/77151462

Keywords: RabbitMQ

Added by busterbry on Tue, 25 Jan 2022 08:07:52 +0200