What is RabbitMQ
RabbitMQ is an open source message broker and queue server based on the AMQP protocol.
Advantages:
- RabbitMQ is written in Erlang, whose network I/O latency is close to that of native sockets, so performance is very high
- Open source, with excellent performance and stability
- Provides reliable message delivery mechanisms: confirm mode (confirm) and return mode (return)
- Integrates well with Spring AMQP and offers a rich API
- Rich clustering options, including expression-based configuration, HA mode and the mirrored queue model
- Ensures high reliability and availability without losing data
AMQP terminology:
- Server: also known as the Broker
- Connection: the network connection between the application and the broker
- Channel: a network channel, that is, a session multiplexed over a connection; almost all operations are carried out on a channel
- Message: the data being transmitted
- Virtual host: a virtual address used for logical isolation, the top layer of message routing. A virtual host can contain several exchanges and queues, but two exchanges or two queues in the same virtual host cannot share a name
- Exchange: receives messages and forwards them to the bound queues according to the routing key
- Binding: the virtual connection between an Exchange and a Queue. A binding can contain a routing key
- Routing key: a routing rule that the exchange uses to decide where to route a message
- Queue: a container that holds the messages themselves
RabbitMQ message flow process:
Application scenarios / roles
- Asynchronous buffering
  Some operations can run asynchronously. Where eventual consistency is enough and strong consistency is not required, MQ can be used
- Service decoupling
  - Strong dependency: service calls made through Dubbo or Spring Cloud are strong dependencies (for example, registration and discovery rely on other services)
  - Weak dependency: MQ middleware
    - A weak dependency does not mean that it is allowed to fail
    - If it must not fail, the upstream publisher has to guarantee reliable message delivery
Scenario example: after a user places an order, the order service needs to update the inventory
Problems under strong dependency:
1) If the inventory system cannot be reached, the inventory deduction fails, and so does order creation
2) The order module and the inventory module are tightly coupled
3) Starting a separate thread to make the call asynchronously only improves speed; it still cannot guarantee that the call succeeds
Solving the above problems with a weak dependency:
1) Once the order is created, the order service writes a message to the message queue (reliable delivery of the message must be ensured)
2) The inventory system obtains the order information by subscribing to the messages and updates the inventory accordingly
3) If the inventory system is abnormal and fails to consume the message, the message returns to the queue and waits for the next delivery
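Step 3 above (a failed message goes back to the queue and is delivered again) can be sketched in plain Java without a broker. This is only an in-memory illustration, not RabbitMQ's implementation: the class `RequeueOnFailure`, the `maxAttempts` limit and the message names are hypothetical, and a real setup would typically rely on negative acknowledgements and a dead-letter queue instead.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class RequeueOnFailure {
    private RequeueOnFailure() {}

    /**
     * Delivers every message in the queue to the handler.
     * A message whose handler returns false is put back at the tail of the
     * queue and retried later, up to maxAttempts deliveries per message.
     * Returns the number of messages that were consumed successfully.
     */
    static int consume(Deque<String> queue, Predicate<String> handler, int maxAttempts) {
        Map<String, Integer> attempts = new HashMap<>();
        int consumed = 0;
        while (!queue.isEmpty()) {
            String message = queue.poll();
            int attempt = attempts.merge(message, 1, Integer::sum);
            if (handler.test(message)) {
                consumed++;                 // handled: the message leaves the queue for good
            } else if (attempt < maxAttempts) {
                queue.addLast(message);     // failed: back to the queue for the next delivery
            }
            // else: given up after maxAttempts (a real system would dead-letter it)
        }
        return consumed;
    }
}
```

For example, with a handler that fails the first time it sees "order-2" and succeeds afterwards, `consume` still reports both orders as consumed, because the failed one is redelivered.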
- Peak shaving and valley filling
  - When downstream services cannot keep up, the messages can be buffered in one place and processed step by step
  - Peak shaving and valley filling means spreading a short-term backlog of work over a longer period
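The buffering idea above can be illustrated with a small in-memory model: a burst of messages is accepted immediately, while the downstream side handles at most a fixed number per time slice. The class `PeakShavingBuffer` and the `maxPerTick` parameter are assumptions made for this sketch; in a real deployment the queue lives in RabbitMQ and the rate is limited by settings such as the consumer prefetch count.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class PeakShavingBuffer {
    private final Queue<String> backlog = new ArrayDeque<>();
    private final int maxPerTick;

    PeakShavingBuffer(int maxPerTick) {
        this.maxPerTick = maxPerTick;
    }

    /** Upstream side: accepting a message is cheap and never blocks on the consumer. */
    void accept(String message) {
        backlog.add(message);
    }

    /** Downstream side: handles at most maxPerTick messages and returns how many were handled. */
    int tick() {
        int handled = 0;
        while (handled < maxPerTick && !backlog.isEmpty()) {
            backlog.poll();
            handled++;
        }
        return handled;
    }

    int backlogSize() {
        return backlog.size();
    }
}
```

With `maxPerTick` set to 3, a burst of 7 messages is worked off over three ticks (3, 3, then 1) instead of hitting the downstream service all at once.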
Points to consider
- Reliable delivery on the producer side
  - If a message involves money, it must not be lost
  - To achieve 100% delivery on the producer side, sending the message must be atomic with the business data change
- Idempotency on the consumer side
  - If the producer guarantees reliable delivery, the same message may be delivered more than once
  - If the consumer processes it two or more times, the data may become inconsistent
  - Therefore the consumer must produce the same result no matter how many times it consumes the same request
- Properties of MQ itself that need to be considered
  - HA: high availability
  - Low latency
  - Reliability: the data must remain complete
  - Backlog capacity: this determines whether MQ can carry your business volume
  - Scalability: whether it natively supports horizontal scaling, so that capacity can be expanded without the applications noticing
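The consumer-side idempotency requirement can be sketched as follows: the consumer remembers the ids of messages it has already applied and ignores repeats. This is a minimal in-memory sketch; the class `IdempotentConsumer` and the stock-deduction logic are hypothetical, and a production system would usually keep the processed ids in durable storage (for example a database table with a unique constraint) updated in the same transaction as the business data.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class IdempotentConsumer {
    // Ids of messages whose effect has already been applied
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger stock;

    IdempotentConsumer(int initialStock) {
        this.stock = new AtomicInteger(initialStock);
    }

    /**
     * Applies the inventory deduction once per messageId.
     * Returns true if the message was applied, false if it was a duplicate.
     */
    boolean onMessage(String messageId, int quantity) {
        if (!processedIds.add(messageId)) {
            return false; // already seen: consuming it again must not change the result
        }
        stock.addAndGet(-quantity);
        return true;
    }

    int stock() {
        return stock.get();
    }
}
```

Delivering the same message twice then leaves the stock unchanged after the first delivery, which is exactly the property described above.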
Principle analysis of the RabbitMQ cluster architecture
1) Active/standby mode
A master/slave structure that can be understood as a hot backup. The master is responsible for reads and writes. When the master goes down, traffic switches to the slave
2) Mirror mode
One of the more popular models in the industry;
The classic RabbitMQ cluster uses mirror mode, which ensures that no data is lost;
High availability, low data synchronization latency; an odd number of nodes is used.
Disadvantages:
The drawback of a mirrored queue cluster is that it does not scale horizontally well. Every node holds a complete replica, so each write is copied to multiple nodes; adding more mirror nodes increases the load on MQ and reduces throughput
Installation and use of RabbitMQ
I covered the installation of RabbitMQ 3.8.19 in detail in an earlier article, so it is omitted here.
Modify user login and connection heartbeat
- Set loopback_users.guest = false by removing the comment marker in front of it
- Change {heartbeat, 60} to {heartbeat, 10}
Check whether the MQ port is listening (install lsof first: yum -y install lsof)
- lsof -i:5672
Enable the management plugin:
- rabbitmq-plugins enable rabbitmq_management
Check whether the management console has started
- lsof -i:15672
Common commands
    # Start the service
    systemctl start rabbitmq-server
    # or
    rabbitmq-server -detached

    # Enable the web management plugin
    rabbitmq-plugins enable rabbitmq_management

    # Stop the application
    rabbitmqctl stop_app
    # Start the application
    rabbitmqctl start_app
    # Node status
    rabbitmqctl status

    # Add a user with a password
    rabbitmqctl add_user username password
    # Change a user's password
    rabbitmqctl change_password username password
    # List all users
    rabbitmqctl list_users
    # Delete a user
    rabbitmqctl delete_user username

    # List a user's permissions
    rabbitmqctl list_user_permissions username
    # Clear a user's permissions
    rabbitmqctl clear_permissions -p vhostpath username
    # Set a user's permissions
    # The three ".*" patterns correspond to: configure, write, read
    rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
    rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"

    # List all virtual hosts
    rabbitmqctl list_vhosts
    # Create a virtual host
    rabbitmqctl add_vhost vhostpath
    # List the permissions on a virtual host
    rabbitmqctl list_permissions -p vhostpath
    # Delete a virtual host
    rabbitmqctl delete_vhost vhostpath

    # View all queues
    rabbitmqctl list_queues
    # Clear the messages in a queue
    rabbitmqctl -p vhostpath purge_queue queueName

    # Clear all data
    # Run this after the application has been stopped (rabbitmqctl stop_app)
    rabbitmqctl reset
Spring Boot integration with RabbitMQ
Sending
Building the Spring Boot project and its prerequisites is covered in another blog post, "SpringBoot global exception handling, integration Swagger and parameter verification", so I won't repeat it here. All the preparation needed now is in that article.
Add application.yaml

    server:
      port: 8088
    spring:
      rabbitmq:
        host: 192.168.150.130
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 10000
Message entity
    package cn.com.springboot.vo;

    import lombok.Data;

    import java.io.Serializable;

    @Data
    public class OrderInfo implements Serializable {
        private String id;
        private String orderName;
        private String messageId;
    }
Send class
    package cn.com.springboot.web;

    import cn.com.springboot.vo.OrderInfo;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class OrderSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        private static final String ORDER_EXCHANGE = "order_exchange";
        private static final String ORDER_ROUTING_KEY = "order_r_key";

        public void sendOrder(OrderInfo orderInfo) {
            // correlationData: unique message id
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(orderInfo.getMessageId());
            // Arguments: String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
            rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
        }
    }
RabbitMQ preparation
Create exchange
Description of Exchange-related properties
- Name: the name of the exchange
- Type:
  - direct: a routing key is set when the queue is bound to the exchange. The exchange forwards a message to a queue only when the routing keys are exactly the same, which is equivalent to point-to-point delivery
  - fanout: messages are routed directly to all bound queues. The routing key is ignored, so no matching is needed and forwarding is the fastest (broadcast mode)
  - topic: similar to direct, but where direct requires an identical routing key, topic bindings can use the wildcards '*' and '#'. '*' matches exactly one word and '#' matches zero or more words
  - headers: the routing rule is determined by the message headers
  - Summary: direct and topic are generally used for specific routing, fanout is used for broadcast, and the headers type is rarely used
- Durability: Durable means the exchange is persisted to disk
- Auto Delete: if set to yes, the exchange is deleted automatically once the last queue bound to it has been unbound
- Internal: if yes, the exchange is used internally by RabbitMQ and is not offered to external applications. It is generally used when writing custom extensions in Erlang
- Arguments: custom arguments used when extending the AMQP protocol
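The topic wildcard rules described above ('*' matches exactly one word, '#' matches zero or more words, words separated by dots) can be sketched in plain Java. This is an illustrative matcher only, not the algorithm RabbitMQ itself uses; the class name `TopicMatcher` and the example routing keys are hypothetical.

```java
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs no word (advance the pattern)
            // or absorbs one more word (advance the key, stay on '#')
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;                  // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under these rules, a binding of "order.*" accepts "order.created" but not "order.created.eu", while "order.#" accepts both, as well as plain "order".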
Create Queue
Exchange and Queue are associated through Binding and routed by routingkey
Test send
    package cn.com.springboot.web;

    import cn.com.springboot.vo.OrderInfo;
    import cn.com.springboot.vo.ResultVo;
    import io.swagger.annotations.Api;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    @Api("send message")
    @RestController
    public class RabbitMqController {

        @Autowired
        private OrderSender orderSender;

        @PostMapping("/sendOrder")
        public ResultVo sender(@RequestBody OrderInfo orderInfo) {
            orderSender.sendOrder(orderInfo);
            return ResultVo.success();
        }
    }
Test with swagger
The message was successfully sent to the queue
Notes:
- An exchange can be bound to multiple queues. If several queues are bound with the same routing key, a message with that key is sent to all of them
- An exchange can be bound to one queue with any number of routing keys. Every message that satisfies one of those routing key rules is sent to the queue, and the consumer connected to the queue consumes all of them, so one queue can correspond to several message rules
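The two notes above can be checked with a small in-memory model of a direct exchange. It is only a sketch of the binding semantics, not how RabbitMQ is implemented; the class `DirectExchangeModel` and the names `audit_queue` and `refund_r_key` are hypothetical additions for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

final class DirectExchangeModel {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Binds a queue to this exchange with the given routing key. */
    void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, q -> new ArrayDeque<>());
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Copies the message into every queue bound with exactly this key; returns how many queues got it. */
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    /** Removes and returns all messages currently in the queue, in arrival order. */
    List<String> drain(String queueName) {
        List<String> out = new ArrayList<>();
        Queue<String> queue = queues.get(queueName);
        while (queue != null && !queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }
}
```

Binding both `order_queue` and `audit_queue` with `order_r_key` makes one publish reach two queues, and binding `order_queue` with a second key lets that single queue collect messages published under either key.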
Receiving messages
Create another project for the consumer, separate from the producer. The construction steps are basically the same as those above.
application.yaml
    server:
      port: 8080
    spring:
      rabbitmq:
        host: 192.168.150.130
        username: guest
        password: guest
        virtual-host: /
        connection-timeout: 10000
        listener:
          simple:
            concurrency: 5             # Initial number of concurrent consumers
            max-concurrency: 10        # Maximum number of concurrent consumers
            auto-startup: true         # Start listening automatically
            prefetch: 1                # Maximum number of unacknowledged messages each consumer handles at a time (rate limiting)
            acknowledge-mode: manual   # Messages are acknowledged manually
Add consumer class
    package cn.com.springboot.web;

    import cn.com.springboot.vo.OrderInfo;
    import com.rabbitmq.client.Channel;
    import lombok.extern.log4j.Log4j2;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;

    import java.io.IOException;
    import java.util.Map;

    @Log4j2
    @Component
    public class OrderReceiver {

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "order_queue", durable = "true"),
                exchange = @Exchange(value = "order_exchange", type = "topic"), // durable defaults to true
                key = "order_r_key" // My routingKey is order_r_key
        ))
        @RabbitHandler
        public void receiveOrderInfo(@Payload OrderInfo orderInfo,
                                     @Headers Map<String, Object> headers,
                                     Channel channel) throws IOException {
            log.info("Start consumption");
            log.info("orderName: {}, messageId: {}", orderInfo.getOrderName(), orderInfo.getMessageId());
            // Manual acknowledgement: confirm this single delivery
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
Start test
That's all for now; corrections are welcome!