Message queue -- six working modes of RabbitMQ

1, Simple mode:

1. Simple mode:

Simple mode declares no exchange of its own: the producer publishes through the broker's default exchange, so in practice you only work with a queue.

2. Related concepts:

⚫ P: Producer, the program that sends the message
⚫ C: Consumer, the receiver of the message; it keeps waiting for messages to arrive
⚫ Queue: the message queue, shown in red in the diagram. Like a mailbox, it buffers messages: the producer delivers messages to it, and the consumer takes messages out of it
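
The mailbox idea can be tried out without a broker. The following is only a minimal in-memory sketch: the class `SimpleModeModel` and its method `roundTrip` are hypothetical names invented for illustration, and a `java.util.concurrent.BlockingQueue` stands in for the RabbitMQ queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for simple mode; no RabbitMQ broker involved.
class SimpleModeModel {

    // Sends one message through a queue and returns what the consumer received.
    static String roundTrip(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // the "mailbox"
        String[] received = new String[1];

        // C: blocks in take() until a message arrives
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put(message); // P: delivers the message to the queue
            consumer.join();    // wait until the consumer has taken it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("body: " + roundTrip("hello rabbitmq~~~"));
    }
}
```

The consumer thread is started before the message is sent, which mirrors a consumer that waits for the message to arrive.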

3. Case:

Step 1: add the RabbitMQ client dependency to both the producer and the consumer projects:

Step 2: write the producer to send a message:

package com.itheima.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * send message
 */
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {


        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/". The virtual host "/itcast" must already exist on the broker
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        //5. Create Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Parameters:
            1. queue: queue name
            2. durable: whether the queue is persistent; a durable queue still exists after the broker restarts
            3. exclusive:
                * Whether the queue is exclusive: only the connection that declared it can use it
                * An exclusive queue is deleted when that connection closes
            4. autoDelete: whether to delete the queue automatically once it has no consumers left
            5. arguments: optional extra arguments

         */
        //If no queue named hello_world exists, it is created; if it already exists, it is not created again
        channel.queueDeclare("hello_world",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        Parameters:
            1. exchange: exchange name. Simple mode uses the default exchange ""
            2. routingKey: routing key. With the default exchange, the message reaches the queue only if the routing key equals the queue name
            3. props: message properties
            4. body: the message data to send

         */

        String body = "hello rabbitmq~~~";

        //6. Send message
        channel.basicPublish("","hello_world",null,body.getBytes());


        //7. Release resources
        channel.close();
        connection.close();

    }
}

Step 3: write the consumer to receive the message:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        //5. Create Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Parameters:
            1. queue: queue name
            2. durable: whether the queue is persistent; a durable queue still exists after the broker restarts
            3. exclusive:
                * Whether the queue is exclusive: only the connection that declared it can use it
                * An exclusive queue is deleted when that connection closes
            4. autoDelete: whether to delete the queue automatically once it has no consumers left
            5. arguments: optional extra arguments

         */
        //If no queue named hello_world exists, it is created; if it already exists, it is not created again
        channel.queueDeclare("hello_world",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

2, Work queue mode:

1. Work queue mode:

2. Related concepts:

⚫ Work Queues: compared with the simple mode of the introductory program, there are multiple consumers, and they consume messages from the same queue together.
⚫ Application scenario: when tasks are heavy or numerous, a work queue can speed up task processing.
⚫ C1, C2: these consumers compete with each other, that is, each message is consumed by only one of them.
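
To make the competing-consumer idea concrete, here is a minimal in-memory sketch. It assumes round-robin dispatch, which is how RabbitMQ hands out messages by default among the consumers of one queue. The class `WorkQueueModel` and its method `dispatch` are hypothetical names used only for illustration; they are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of work queue mode; no RabbitMQ broker involved.
class WorkQueueModel {

    // Hands each message to exactly one consumer, in round-robin order.
    // Returns one list of received messages per consumer.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            messages.add(i + "hello rabbitmq~~~");
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("C1: " + received.get(0));
        System.out.println("C2: " + received.get(1));
    }
}
```

With ten messages and two consumers, each consumer in this model receives five messages and no message is delivered twice.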

3. Case:

Producer Code:

package com.itheima.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * send message
 */
public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        //5. Create Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Parameters:
            1. queue: queue name
            2. durable: whether the queue is persistent; a durable queue still exists after the broker restarts
            3. exclusive:
                * Whether the queue is exclusive: only the connection that declared it can use it
                * An exclusive queue is deleted when that connection closes
            4. autoDelete: whether to delete the queue automatically once it has no consumers left
            5. arguments: optional extra arguments

         */
        //If no queue named work_queues exists, it is created; if it already exists, it is not created again
        channel.queueDeclare("work_queues",true,false,false,null);
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        Parameters:
            1. exchange: exchange name. Work queue mode also uses the default exchange ""
            2. routingKey: routing key. With the default exchange it must equal the queue name
            3. props: message properties
            4. body: the message data to send

         */
        for (int i = 1; i <= 10; i++) {
            String body = i+"hello rabbitmq~~~";

            //6. Send message
            channel.basicPublish("","work_queues",null,body.getBytes());
        }



        //7. Release resources
        channel.close();
        connection.close();

    }
}

Consumer 1's code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_WorkQueues1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        //5. Create Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Parameters:
            1. queue: queue name
            2. durable: whether the queue is persistent; a durable queue still exists after the broker restarts
            3. exclusive:
                * Whether the queue is exclusive: only the connection that declared it can use it
                * An exclusive queue is deleted when that connection closes
            4. autoDelete: whether to delete the queue automatically once it has no consumers left
            5. arguments: optional extra arguments

         */
        //If no queue named work_queues exists, it is created; if it already exists, it is not created again
        channel.queueDeclare("work_queues",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

Consumer 2's code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_WorkQueues2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        //5. Create Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        Parameters:
            1. queue: queue name
            2. durable: whether the queue is persistent; a durable queue still exists after the broker restarts
            3. exclusive:
                * Whether the queue is exclusive: only the connection that declared it can use it
                * An exclusive queue is deleted when that connection closes
            4. autoDelete: whether to delete the queue automatically once it has no consumers left
            5. arguments: optional extra arguments

         */
        //If no queue named work_queues exists, it is created; if it already exists, it is not created again
        channel.queueDeclare("work_queues",true,false,false,null);

        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

3, Pub/Sub subscription mode:

1. Pub/Sub subscription mode:

The exchange must be bound to the queues. Once the bindings exist, a single message can be received by multiple consumers.

2. Related concepts:

⚫ P: Producer, the program that sends the message. It no longer sends to a queue but to X (the exchange)
⚫ C: Consumer, the receiver of the message; it keeps waiting for messages to arrive
⚫ Queue: the message queue, which receives and buffers messages
⚫ Exchange (X): on one hand it receives messages sent by producers; on the other hand it knows how to process them, for example by delivering them to a specific queue, delivering them to all queues, or discarding them. Which of these it does depends on the type of the exchange.
There are three common types of Exchange, plus a rarely used fourth:
∙ Fanout: broadcast; delivers the message to all queues bound to the exchange
∙ Direct: directed; delivers the message to the queues whose binding matches the specified routing key
∙ Topic: wildcard; delivers the message to the queues whose binding pattern matches the routing key
∙ Headers: parameter matching; rarely used

Note: an Exchange only forwards messages and cannot store them. Therefore, if no queue is bound to the exchange, or no queue matches the routing rules, the message is lost!
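
The fanout behaviour and the note about lost messages can be illustrated with a minimal in-memory sketch. The class `FanoutExchangeModel` and its methods `bind`, `publish` and `queue` are hypothetical names chosen for this illustration; they only model the forwarding rule and are not RabbitMQ client calls.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of a fanout exchange; no RabbitMQ broker involved.
class FanoutExchangeModel {
    // queue name -> messages buffered in that queue
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to the exchange (a fanout binding needs no routing key).
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Copies the message into every bound queue and returns how many queues
    // received it. The exchange stores nothing itself, so 0 means "lost".
    int publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    // Returns the buffered messages of a bound queue, or null if not bound.
    Queue<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        System.out.println(exchange.publish("sent before any binding"));
        exchange.bind("test_fanout_queue1");
        exchange.bind("test_fanout_queue2");
        System.out.println(exchange.publish("log message"));
        System.out.println(exchange.queue("test_fanout_queue1"));
        System.out.println(exchange.queue("test_fanout_queue2"));
    }
}
```

The first publish happens before any queue is bound, so no queue ever holds that message; the second one is copied into both queues.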

3. Case:

Producer Code:

package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * send message
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        /*
        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        Parameters:
            1. exchange: exchange name
            2. type: exchange type
                DIRECT("direct"): routes by exact routing key match
                FANOUT("fanout"): broadcast; sends the message to every queue bound to the exchange
                TOPIC("topic"): routes by wildcard pattern
                HEADERS("headers"): routes by matching message headers
            3. durable: whether the exchange is persistent
            4. autoDelete: whether the exchange is deleted automatically
            5. internal: whether the exchange is internal; usually false
            6. arguments: optional extra arguments
         */

        String exchangeName = "test_fanout";
        //5. Create the exchange
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. Create queue
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. Bind the queues to the exchange
        /*
        queueBind(String queue, String exchange, String routingKey)
        Parameters:
            1. queue: queue name
            2. exchange: exchange name
            3. routingKey: routing key, i.e. the binding rule
                For a fanout exchange the routingKey is set to "", because fanout sends messages to all bound queues without any specific routing rule
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "Log information: Zhang San called findAll method...Log level: info...";
        //8. Send message
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. Release resources
        channel.close();
        connection.close();

    }
}

Consumer 1 Code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_fanout_queue1";



        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Print log information to the console.....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

Consumer 2 Code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_PubSub2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue2Name = "test_fanout_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Save log information to database.....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

4. Differences between publish subscribe mode and work queue mode:

  1. Work queue mode does not need to declare an exchange, while publish/subscribe mode does
  2. In publish/subscribe mode the producer sends messages to the exchange; in work queue mode the producer sends messages to the queue (under the hood, the default exchange is used)
  3. Publish/subscribe mode must set up the binding between the queue and the exchange. Work queue mode does not, because the broker binds the queue to the default exchange automatically
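
The role of the default exchange in work queue mode can be sketched in memory as follows. This is only an illustrative model under the assumption stated above, namely that every declared queue is implicitly bound to the default exchange under its own name. The class `DefaultExchangeModel` and its methods are hypothetical names that merely echo the client's `queueDeclare` and `basicPublish`; they are not the real API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the default exchange ""; no RabbitMQ broker involved.
class DefaultExchangeModel {
    // queue name -> buffered messages; the name doubles as the binding key
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declaring a queue implicitly binds it under its own name.
    void queueDeclare(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Publishes with a routing key; delivery succeeds only when a queue
    // with exactly that name exists.
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // no queue of that name: the message is not delivered
        }
        target.add(body);
        return true;
    }

    // Number of messages currently buffered in the named queue.
    int size(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.queueDeclare("work_queues");
        System.out.println(broker.publish("work_queues", "1hello rabbitmq~~~"));
        System.out.println(broker.publish("no_such_queue", "2hello rabbitmq~~~"));
        System.out.println(broker.size("work_queues"));
    }
}
```

This is why the producers in the earlier modes pass the queue name as the routing key and never call queueBind.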

4, Routing mode:

1. Mode:

  1. The binding between a queue and the exchange is no longer arbitrary: a routing key must be specified
  2. When sending a message to the Exchange, the sender must also specify the routing key of the message
  3. The Exchange no longer delivers each message to every bound queue; it decides according to the message's routing key. A queue receives the message only if its binding routing key is exactly the same as the message's routing key

2. Related concepts:

⚫ P: Producer; sends messages to the Exchange and specifies a routing key when sending
⚫ X: Exchange; receives the message from the producer and delivers it to the queues whose binding exactly matches the routing key
⚫ C1: consumer whose queue is bound with the routing key error
⚫ C2: consumer whose queue is bound with the routing keys info, error and warning
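
The exact-match rule can be checked with a small in-memory sketch that uses the same bindings as C1 and C2 above. The class `DirectExchangeModel` and its methods `bind` and `route` are hypothetical names for illustration only, and the queue names "queue1" and "queue2" are placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a direct exchange; no RabbitMQ broker involved.
class DirectExchangeModel {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Binds a queue to the exchange under one routing key.
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this
    // routing key; only exact matches count.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("queue1", "error");   // C1's queue
        exchange.bind("queue2", "info");    // C2's queue
        exchange.bind("queue2", "error");
        exchange.bind("queue2", "warning");

        System.out.println(exchange.route("error"));   // [queue1, queue2]
        System.out.println(exchange.route("warning")); // [queue2]
        System.out.println(exchange.route("debug"));   // []
    }
}
```

A message with the routing key error reaches both queues, a warning reaches only the second queue, and a key that nobody bound reaches no queue at all.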

3. Case:

Producer Code:

package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * send message
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
        /*
        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        Parameters:
            1. exchange: exchange name
            2. type: exchange type
                DIRECT("direct"): routes by exact routing key match
                FANOUT("fanout"): broadcast; sends the message to every queue bound to the exchange
                TOPIC("topic"): routes by wildcard pattern
                HEADERS("headers"): routes by matching message headers
            3. durable: whether the exchange is persistent
            4. autoDelete: whether the exchange is deleted automatically
            5. internal: whether the exchange is internal; usually false
            6. arguments: optional extra arguments
         */

        String exchangeName = "test_direct";
        //5. Create the exchange
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //6. Create queue
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. Bind the queues to the exchange
        /*
        queueBind(String queue, String exchange, String routingKey)
        Parameters:
            1. queue: queue name
            2. exchange: exchange name
            3. routingKey: routing key, i.e. the binding rule
                For a fanout exchange the routingKey is set to ""; for a direct exchange it must match the message's routing key exactly
         */
        //Bind queue 1 with the routing key error
        channel.queueBind(queue1Name,exchangeName,"error");
        //Bind queue 2 with the routing keys info, error and warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

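        String body = "Log information: Zhang San called delete method...Something went wrong... Log level: error...";
        //8. Send message. The routing key here is "warning"; only test_direct_queue2 is bound with that key, so only that queue receives the message
        channel.basicPublish(exchangeName,"warning",null,body.getBytes());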

        //9. Release resources
        channel.close();
        connection.close();

    }
}

Consumer code 1:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: queue name
            2. autoAck: whether to acknowledge messages automatically
            3. callback: callback object

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, executed automatically when a message is received

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and the routing key
                3. properties: message properties
                4. body: message data

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Print log information to the console.....");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        //Do not close the channel or connection here: the consumer must keep listening for messages

    }
}

Consumer 2 Code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//Broker IP address; defaults to localhost
        factory.setPort(5672); //Broker port; defaults to 5672
        factory.setVirtualHost("/itcast");//Virtual host; defaults to "/"
        factory.setUsername("heima");//Username; defaults to guest
        factory.setPassword("heima");//Password; defaults to guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: Queue name
            2. autoAck: Auto confirm
            3. callback: callback object 

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, which will be automatically executed after receiving the message

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and routing key
                3. properties: message properties
                4. body: message payload

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Store log information to the database.....");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        // Do not close the channel or connection: the consumer must stay connected to keep receiving messages

    }
}

5, Topics wildcard mode:

1. Topics wildcard mode:

  1. Like Direct, an Exchange of type Topic routes messages to different queues according to the RoutingKey. The difference is that a Topic Exchange lets a queue use wildcards in the routing key it binds with.
  2. A RoutingKey usually consists of one or more words separated by ".", for example: item.insert
  3. Wildcard rules: # matches zero or more words, * matches exactly one word. For example, item.# matches item.insert.abc or item.insert, while item.* only matches item.insert

⚫ Red Queue: bound with usa.#, so every routing key starting with usa. is matched
⚫ Yellow Queue: bound with #.news, so every routing key ending with .news is matched
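
The wildcard rules can be checked without a broker. The following is a minimal sketch of the matching semantics only, not RabbitMQ's actual implementation; the class name TopicMatcher and its method are hypothetical, and edge cases such as empty routing keys are not treated specially.

```java
// Hypothetical helper that illustrates topic wildcard semantics.
final class TopicMatcher {
    private TopicMatcher() {}

    // Returns true if the binding pattern matches the routing key.
    static boolean matches(String pattern, String routingKey) {
        // Both sides are split into words on ".".
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" absorbs zero words, or one word and then tries again.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        // "*" accepts any single word; anything else must be equal.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("item.#", "item.insert.abc")); // true
        System.out.println(matches("item.*", "item.insert.abc")); // false
        System.out.println(matches("item.*", "item.insert"));     // true
        System.out.println(matches("#.news", "europe.news"));     // true
    }
}
```

Matching word by word rather than character by character is what keeps * from spanning a "." separator.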

2. Case:

Producer Code:

package com.itheima.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * send message
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//ip default value localhost
        factory.setPort(5672); //Port default 5672
        factory.setVirtualHost("/itcast");//Virtual host; the default is "/"
        factory.setUsername("heima");//User name: default guest
        factory.setPassword("heima");//Password default: guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();
       /*

       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       Parameters:
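        1. exchange: Exchange name
        2. type: Exchange type
            DIRECT("direct"): routes by exact routing key match
            FANOUT("fanout"): broadcast, sends the message to every queue bound to the exchange
            TOPIC("topic"): wildcard matching on the routing key
            HEADERS("headers"): matching on message header attributes

        3. durable: Whether the exchange survives a broker restart
        4. autoDelete: Whether the exchange is deleted automatically once it is no longer in use
        5. internal: For internal use; usually false
        6. arguments: Additional arguments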
        */

       String exchangeName = "test_topic";
        //5. Declare the exchange
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //6. Create queue
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. Bind the queues to the exchange
        /*
        queueBind(String queue, String exchange, String routingKey)
        Parameters:
            1. queue: Queue name
            2. exchange: Exchange name
            3. routingKey: Routing key (binding rule)
                If the exchange type is fanout, set routingKey to ""
         */

        // Routing key format: <system name>.<log level>
        // Requirement: all error-level logs and all logs from the order system are stored in the database (queue 1); every log with a two-word routing key is printed to the console (queue 2)
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");

        String body = "Log information: Zhang San called findAll method...Log level: error...";
        //8. Send message
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

        //9. Release resources
        channel.close();
        connection.close();

    }
}
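
To see which queues a given routing key reaches under the three bindings declared in Producer_Topics, the routing decision can be simulated in memory. This is a rough sketch under assumptions: the class TopicRoutingDemo and its methods are hypothetical, no broker is contacted, and the matching logic only approximates what a topic exchange does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the bindings declared in Producer_Topics.
final class TopicRoutingDemo {
    // Queue name -> binding keys, mirroring the three queueBind calls.
    static final Map<String, List<String>> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("test_topic_queue1", Arrays.asList("#.error", "order.*"));
        BINDINGS.put("test_topic_queue2", Arrays.asList("*.*"));
    }

    // Returns the queues that would receive a message with this routing key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        String[] keyWords = routingKey.split("\\.", -1);
        for (Map.Entry<String, List<String>> entry : BINDINGS.entrySet()) {
            for (String bindingKey : entry.getValue()) {
                if (matches(bindingKey.split("\\.", -1), 0, keyWords, 0)) {
                    queues.add(entry.getKey());
                    break; // a queue gets one copy even if several bindings match
                }
            }
        }
        return queues;
    }

    // Word-by-word match: "#" is zero or more words, "*" is exactly one word.
    static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        return j < k.length
                && (p[i].equals("*") || p[i].equals(k[j]))
                && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"goods.error", "order.info", "goods.info", "order.pay.error"};
        for (String key : keys) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

With these bindings, goods.error (the key the producer publishes with) reaches both queues, goods.info reaches only test_topic_queue2, and order.pay.error reaches only test_topic_queue1 because *.* requires exactly two words.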

Consumer 1 Code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//ip default value localhost
        factory.setPort(5672); //Port default 5672
        factory.setVirtualHost("/itcast");//Virtual host; the default is "/"
        factory.setUsername("heima");//User name: default guest
        factory.setPassword("heima");//Password default: guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: Queue name
            2. autoAck: Auto confirm
            3. callback: callback object 

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, which will be automatically executed after receiving the message

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and routing key
                3. properties: message properties
                4. body: message payload

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Store log information into database.......");
            }
        };
        channel.basicConsume(queue1Name,true,consumer);


        // Do not close the channel or connection: the consumer must stay connected to keep receiving messages

    }
}

Consumer 2 Code:

package com.itheima.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //2. Set parameters
        factory.setHost("172.16.98.133");//ip default value localhost
        factory.setPort(5672); //Port default 5672
        factory.setVirtualHost("/itcast");//Virtual host; the default is "/"
        factory.setUsername("heima");//User name: default guest
        factory.setPassword("heima");//Password default: guest
        //3. Create a Connection
        Connection connection = factory.newConnection();
        //4. Create a Channel
        Channel channel = connection.createChannel();


        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";


        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        Parameters:
            1. queue: Queue name
            2. autoAck: Auto confirm
            3. callback: callback object 

         */
        // receive messages
        Consumer consumer = new DefaultConsumer(channel){
            /*
                Callback method, which will be automatically executed after receiving the message

                1. consumerTag: consumer identifier
                2. envelope: delivery metadata, such as the exchange and routing key
                3. properties: message properties
                4. body: message payload

             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              /*  System.out.println("consumerTag: "+consumerTag);
                System.out.println("Exchange: "+envelope.getExchange());
                System.out.println("RoutingKey: "+envelope.getRoutingKey());
                System.out.println("properties: "+properties);*/
                System.out.println("body: "+new String(body));
                System.out.println("Print log information to console.......");
            }
        };
        channel.basicConsume(queue2Name,true,consumer);


        // Do not close the channel or connection: the consumer must stay connected to keep receiving messages

    }
}

6, Summary:

  1. Simple mode HelloWorld
    One producer and one consumer. No exchange needs to be declared (the default exchange is used).
  2. Work Queue mode
    One producer and multiple consumers that compete for messages from the same queue. No exchange needs to be declared (the default exchange is used).
  3. Publish/subscribe mode
    Declare an exchange of type fanout and bind queues to it. When a message is sent to the exchange, the exchange delivers it to every bound queue.
  4. Routing mode
    Declare an exchange of type direct, bind queues to it, and specify a routing key for each binding. When a message is sent to the exchange, the exchange delivers it to the queues whose binding key equals the message's routing key.
  5. Wildcard mode Topic
    Declare an exchange of type topic, bind queues to it, and specify a routing key containing wildcards for each binding. When a message is sent to the exchange, the exchange delivers it to the queues whose binding pattern matches the message's routing key.
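
The difference between fanout and direct routing in the summary can be sketched in a few lines of plain Java. This is an illustrative model only: ExchangeSketch, Kind, Binding and the queue names are all hypothetical, nothing here talks to RabbitMQ, and topic matching is left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of how an exchange picks target queues.
final class ExchangeSketch {
    enum Kind { FANOUT, DIRECT }

    // A binding ties a queue to the exchange under a binding key.
    static final class Binding {
        final String queue;
        final String key;

        Binding(String queue, String key) {
            this.queue = queue;
            this.key = key;
        }
    }

    static List<String> route(Kind kind, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            // Fanout ignores the routing key; direct requires an exact match.
            boolean hit = kind == Kind.FANOUT || b.key.equals(routingKey);
            if (hit && !targets.contains(b.queue)) {
                targets.add(b.queue); // each queue is listed at most once
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = new ArrayList<>();
        bindings.add(new Binding("queue1", "error"));
        bindings.add(new Binding("queue2", "info"));
        bindings.add(new Binding("queue2", "error"));

        System.out.println(route(Kind.FANOUT, bindings, "info"));  // [queue1, queue2]
        System.out.println(route(Kind.DIRECT, bindings, "info"));  // [queue2]
        System.out.println(route(Kind.DIRECT, bindings, "error")); // [queue1, queue2]
    }
}
```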

Keywords: Java RabbitMQ Distribution message queue

Added by sheffrem on Tue, 15 Feb 2022 05:07:13 +0200