RabbitMQ has six modes, message response, persistence and message distribution

1: HelloWorld simple mode
2: Work Queues work mode
3: Publish/SubScribe mode
4: Routing mode
5: Topics topic mode
6: Publish Comfirm release confirmation mode

Code demonstration

1: HelloWorld simple mode (switches use the default -- binding RoutingKey -- queue)

Consumers actively get messages through listeners

Create maven project and add dependencies

<!--appoint jdk Compiled version-->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <!--rabbitmq Dependent client-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--A dependency on the operation file stream-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

View mq port number default 5672

 ps -ef |grep rabbit

Create producer classes and write code

package com.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//Simple mode
public class Producer {
    //Queue name
    public static final String queue_name = "hello";
    //Send a message
    public static void main(String[] args) throws IOException, TimeoutException {
        //Create factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //ip
        connectionFactory.setHost("192.168.6.100");
        //port will go without writing
//        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123");
        //Create connection
        Connection connection = connectionFactory.newConnection();
        //Create channel
        Channel channel = connection.createChannel();
        /**
         * Generate a queue
         * 1.Queue name
         * 2.Is the message in the queue persistent? The default message is stored in memory
         * 3.Whether the queue is only used by one consumer or shared. true can be used by multiple consumers
         * 4.Whether to automatically delete the queue after the last consumer is disconnected. true: automatically delete
         * 5.Other parameters
         */
        channel.queueDeclare(queue_name,false,false,false,null);
		 String message="hello world";
        /**
         * Send a message
         * 1.Send to that switch '' default
         * 2.Which key is the routing key
         * 3.Other parameter information
         * 4.The body of the message sent
         */
        channel.basicPublish("",queue_name,null,message.getBytes());
        System.out.println("Message sent");
        channel.close();
        connection.close();
    }
}

consumer

package com.mq;

import com.rabbitmq.client.*;

//Receive messages regardless of channel and connection
public class Consumer {
    //Same name as producer
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("Waiting to receive message....");
        //How to perform interface callback for consumption of pushed messages
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //A callback interface for canceling consumption, such as when the queue is deleted during consumption
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("Message consumption interrupted");
        };
        /**
         * Consumer News
         * 1.Which queue to consume
         * 2.Whether to answer automatically after consumption is successful. true means to answer automatically. false means to answer manually
         * 3.Callback of unsuccessful consumption by consumers
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

2: Work Queues work mode

Compared with the simple model, there are multiple consumers

Message response

Automatic response: high efficiency and fast speed, and message loss will occur

Manual response: messages will be saved in the queue, and consumers will respond only after processing

Code before modification

//How to perform interface callback for consumption of pushed messages
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
            //. . .  Other codes
            //Parameter: 1 Identification, 2 Batch
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
         //A callback interface for canceling consumption, such as when the queue is deleted during consumption
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("Message consumption interrupted");
        };
        /**
         * Consumer News
         * 1.Which queue to consume
         * 2.Whether to answer automatically after consumption is successful. true means to answer automatically. false means to answer manually
         * 3.Callback of unsuccessful consumption by consumers
         */
        //The manual response message failed and can be retransmitted
        boolean autoAsk = false;
        channel.basicConsume(QUEUE_NAME,autoAsk,deliverCallback,cancelCallback);
        //Only after the above code is executed can the message be deleted from the confrontation queue be answered

When two consumers receive messages, they will poll normally. If one consumer is abnormal, the other consumer will accept his message (the message will automatically rejoin the queue)

Persistence

Ensure that the messages sent by the message producer are not lost after the RabbitMQ service is stopped

Set queue persistence and message persistence

Code before modification

/**
         * Generate a queue
         * 1.Queue name
         * 2.Is the message in the queue persistent? The default message is stored in memory
         * 3.Whether the queue is only used by one consumer or shared. true can be used by multiple consumers
         * 4.Whether to automatically delete the queue after the last consumer is disconnected. true: automatically delete
         * 5.Other parameters
         */
        channel.queueDeclare(queue_name,true,false,false,null);
       
        /**
         * Send a message
         * 1.Send to that switch '' default
         * 2.Which key is the routing key
         * 3.Other parameter information is persisted messageproperties PERSISTENT_ TEXT_ PLAIN
         * 4.The body of the message sent
         */
        channel.basicPublish("",queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

You need to re create a queue, otherwise an error will be reported, because once the queue is created, you cannot modify whether it is persistent or not

Message distribution

Fair distribution: one of the consumer messages may be overstocked due to unfair polling

 //Set fair distribution messages
        int prefetchCount = 1;  //The default is 0
        channel.basicQos(prefetchCount);

prefetchCount: prefetchCount: when writing other numbers, the channel will backlog messages. The self-test value should be competitive

Switch

The core idea of RabbitMQ messaging model is that messages produced by producers are never sent directly to queues. In fact, usually producers don't even know which queues these messages are delivered to.

On the contrary, the producer can only send messages to the switch. The work of the switch is very simple. On the one hand, it receives messages from the producer, on the other hand, it pushes them into the queue. The switch must know exactly how to process the received message. Should these messages be put in a specific queue, or should they be put in many queues, or should they be discarded. This depends on the type of switch.

Type: direct, topic, headers, fan out

An empty string indicates the default or unnamed switch: messages can be routed to the queue, which is actually specified by the routing key (binding key) binding key

Temporary queue

Whenever we connect to Rabbit, we need a new empty queue. Therefore, we can create a queue with * * random name * *, or let the server choose a random queue name for us. Secondly, once we disconnect the consumer, the queue will be deleted automatically.

Create a temporary queue as follows:

String queueName = channel.queueDeclare().getQueue();

3. Publish subscriber mode

Mass distribution, each consumer has its own queue

Code: create connection tool class

public class RabbitMqUtils {
    //Get a connected channel
    public static Channel getChannel() throws Exception{
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

producer

public class EmitLog {
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Mass messaging
        Scanner sc = new Scanner(System.in);
        System.out.println("Please enter information");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
            System.out.println("Producer sends message" + message);
        }
    }
}

Consumer, copying a copy

public class ReceiveLogs01 {
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Temporary queue
        String q1 = channel.queueDeclare().getQueue();
//        String q2 = channel.queueDeclare().getQueue();
        //Switch name and type
        channel.exchangeDeclare("logs","fanout");
        //Bind queue name, switch name, connection name
        channel.queueBind(q1,"logs","");
//        channel.queueBind(q2,"logs","");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("The console prints the received message"+message);
        };
        channel.basicConsume(q1, true, deliverCallback, consumerTag -> { });
    }
}

Start the consumer first, you can create a binding to the column, and you can receive all messages when you start the producer to send messages

The first two: nameless switch

Subscriber mode: using fan out switch, mass sending

Routing mode: direct switch

Theme mode: theme switch

4. Routing mode

Multiple binding. The binding name can be set to send messages to that consumer

Consumer 1

//Accept log
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "disk";
        //queue
        channel.queueDeclare(queueName, false, false, false, null);
        //binding
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("Waiting to receive message.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            message="Receive binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message;
            System.out.println("Error log received");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

Consumer 2

//Accept log
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("Waiting to receive message.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Receive binding key:" + delivery.getEnvelope().getRoutingKey() + ",news:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

producer

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //Create multiple bindingkeys
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info","ordinary info information");
            bindingKeyMap.put("warning","warning warning information");
            bindingKeyMap.put("error","error error information");
            //debug does not consume, so all messages received will be lost
            bindingKeyMap.put("debug","debugging debug information");
            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("Producer sends message:" + message);
            }
        }
    }
}

5. Thematic model

Compared with routing patterns, wildcards can be used for matching

An asterisk can replace a word

#Can replace zero or more words

Code

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /**
             * Q1-->Bound is
             *      String with 3 words in the middle (*. orange. *)
             * Q2-->Bound is
             *      The last word is three words of rabbit (*. *. rabbit)
             *      The first word is multiple words of lazy (lazy. #)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit","By queue Q1Q2 Received");
            bindingKeyMap.put("lazy.orange.elephant","By queue Q1Q2 Received");
            bindingKeyMap.put("quick.orange.fox","By queue Q1 Received");
            bindingKeyMap.put("lazy.brown.fox","By queue Q2 Received");
            bindingKeyMap.put("lazy.pink.rabbit","Although two bindings are satisfied, they are only used by the queue Q2 Receive once");
            bindingKeyMap.put("quick.brown.fox","Any binding that does not match will not be received by any queue and will be discarded");
            bindingKeyMap.put("quick.orange.male.rabbit","If the four words do not match, any binding will be discarded");
            bindingKeyMap.put("lazy.orange.male.rabbit","It's four words, but it doesn't match Q2");

            for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("Producer sends message" + message);
            }
        }
    }
}
public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //Declare Q1 queue and binding relationship
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

        System.out.println("Waiting to receive message.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Receive queue:"+queueName+"Binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //Declare Q2 queue and binding relationship
        String queueName="Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("Waiting to receive message.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Receive queue:"+queueName+"Binding key:"+delivery.getEnvelope().getRoutingKey()+",news:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

Keywords: RabbitMQ Distribution Middleware message queue

Added by walshd on Tue, 11 Jan 2022 20:03:21 +0200