1. What is MQ
MQ (message queue) is message middleware. In the simplest terms, service A sends a message to service B, and service B performs a series of actions after receiving it.
What problems does it solve?
- Peak shaving: when services call each other directly through an interface, whether HTTP or RPC, the call is synchronous, so a burst of requests hits the called service all at once. With MQ in between, the messages are buffered and the called service consumes them at its own pace.
- Decoupling: services no longer exchange data through shared stores such as MySQL or Redis.
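The peak-shaving effect can be illustrated with a small simulation. This is only a sketch under stated assumptions: the tick-based model, the function names `direct_calls` and `through_queue`, and the `capacity` parameter are all invented for illustration and have nothing to do with RocketMQ's real API.

```python
from collections import deque


def direct_calls(arrivals):
    """Synchronous calls: the callee handles every request in the tick it arrives."""
    return list(arrivals)


def through_queue(arrivals, capacity):
    """Queued delivery: the consumer pulls at most `capacity` messages per tick."""
    if capacity <= 0:
        raise ValueError("capacity must be positive")
    backlog = deque()   # stands in for the messages buffered on the broker
    load = []           # messages actually handled by the consumer in each tick
    tick = 0
    while tick < len(arrivals) or backlog:
        if tick < len(arrivals):
            backlog.extend(range(arrivals[tick]))  # enqueue this tick's messages
        handled = min(capacity, len(backlog))
        for _ in range(handled):
            backlog.popleft()
        load.append(handled)
        tick += 1
    return load


arrivals = [2, 50, 3, 1]                       # a burst of 50 requests in tick 1
print(direct_calls(arrivals))                  # [2, 50, 3, 1]
print(through_queue(arrivals, capacity=10))    # [2, 10, 10, 10, 10, 10, 4]
```

With direct calls the callee sees the full spike of 50. With the queue it never handles more than 10 per tick, at the cost of finishing later.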
2. When is MQ used?
When services exchange data but the result is not needed immediately.
For example:
- E-commerce: after the order system generates an order, it notifies the warehouse to generate an issue document and the distribution system to generate a distribution document. Why not use an RPC or HTTP interface here? First, a failed HTTP call can lose the data, and RPC has the same problem. Second, when a large number of orders arrive at once, the warehouse service or distribution service is very likely to go down.
- Log service: multiple online servers send their logs to a unified log service so that online logs are easy to view. This scenario also suits MQ well.
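The e-commerce example, where one order message reaches both the warehouse and the distribution system, can be sketched with a toy in-memory topic. `InMemoryTopic` and the group names `warehouse-group` and `distribution-group` are hypothetical and exist only for this illustration. The sketch assumes each consumer group receives its own copy of every message.

```python
class InMemoryTopic:
    """Toy topic: every subscribed consumer group gets a copy of each message."""

    def __init__(self):
        self._groups = {}   # consumer group name -> handler function

    def subscribe(self, group, handler):
        self._groups[group] = handler

    def publish(self, message):
        for handler in self._groups.values():
            handler(dict(message))   # each group receives its own copy


issue_documents = []          # created by the warehouse
distribution_documents = []   # created by the distribution system

order_topic = InMemoryTopic()
order_topic.subscribe("warehouse-group",
                      lambda m: issue_documents.append(m["orderNo"]))
order_topic.subscribe("distribution-group",
                      lambda m: distribution_documents.append(m["orderNo"]))

# The order system publishes once and does not know who consumes the message.
order_topic.publish({"orderNo": "hc1111", "sku": "1", "quantity": "2"})
print(issue_documents, distribution_documents)
```

The order system only knows the topic. Adding a third downstream system means adding a subscriber, not changing the order code.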
3. Why do testers need to master this skill?
Now that services are split up, microservice scenarios are everywhere and MQ is widely used in them. Without dedicated tests for these scenarios, problems are likely to appear in production.
4. What is the architecture of mq?
The producer sends a message to a topic on the broker, and the broker then delivers the message to the consumer. Each consumer group has an offset that records how far it has consumed, which prevents repeated consumption in the normal case.
The most important issue is idempotency. When there is a network problem, the broker may deliver the same message to a consumer more than once, so the consumer must handle duplicates safely.
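How an offset and a lost acknowledgement interact can be shown with a toy model. This is a simplified sketch, not RocketMQ's actual protocol: `ToyBroker`, its `poll`/`ack` methods, and the group name `demo-group` are assumptions made up for the example.

```python
class ToyBroker:
    """Append-only message log plus one committed offset per consumer group."""

    def __init__(self):
        self.log = []
        self.offsets = {}   # consumer group -> index of the next message to deliver

    def send(self, message):
        self.log.append(message)

    def poll(self, group):
        """Return the next unacknowledged message for the group, or None."""
        offset = self.offsets.get(group, 0)
        return self.log[offset] if offset < len(self.log) else None

    def ack(self, group):
        """Advance the group's offset past the message it just consumed."""
        self.offsets[group] = self.offsets.get(group, 0) + 1


broker = ToyBroker()
broker.send({"orderNo": "hc1111"})

deliveries = []          # every delivery the consumer sees, duplicates included
handled_orders = set()   # business keys that have been processed


def consume(ack_arrives):
    message = broker.poll("demo-group")
    if message is None:
        return
    deliveries.append(message["orderNo"])
    handled_orders.add(message["orderNo"])   # adding to a set is idempotent
    if ack_arrives:
        broker.ack("demo-group")


consume(ack_arrives=False)   # the ack is lost, so the offset does not move
consume(ack_arrives=True)    # the same message is delivered a second time
consume(ack_arrives=True)    # nothing left to deliver

print(deliveries)        # ['hc1111', 'hc1111']
print(handled_orders)    # {'hc1111'}
```

The consumer sees the message twice, but because its processing is idempotent the business result is the same as if it had arrived once.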
5. How to set up RocketMQ locally
1. Download and unzip RocketMQ: rocketmq.apache.org/release_not...
2. Start nameserver
sh bin/mqnamesrv
3. Start broker
sh bin/mqbroker -n localhost:9876
4. Test with built-in tools
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Output:
SendResult [sendStatus=SEND_OK, msgId=C0A80064502A355DA2545C7FD53503E5, offsetMsgId=C0A8006400002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=taomindeMacBook-Pro-2.local, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A80064502A355DA2545C7FD53603E6, offsetMsgId=C0A8006400002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=taomindeMacBook-Pro-2.local, queueId=2], queueOffset=249]
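For automated checks it can help to turn a SendResult line into structured fields instead of reading it by eye. The following is a minimal sketch that assumes the output keeps the exact textual layout shown in this step. The function name `parse_send_result` and the returned key names are made up for illustration.

```python
import re

# Matches the textual layout of a SendResult line as printed by the quickstart Producer.
SEND_RESULT_PATTERN = re.compile(
    r"sendStatus=(?P<status>\w+), msgId=(?P<msg_id>\w+).*?"
    r"topic=(?P<topic>[^,\]]+).*?"
    r"queueId=(?P<queue_id>\d+)\], queueOffset=(?P<queue_offset>\d+)"
)


def parse_send_result(line):
    """Return the main fields of a SendResult line as a dict, or None if it does not match."""
    match = SEND_RESULT_PATTERN.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    fields["queue_id"] = int(fields["queue_id"])
    fields["queue_offset"] = int(fields["queue_offset"])
    return fields


sample = (
    "SendResult [sendStatus=SEND_OK, msgId=C0A80064502A355DA2545C7FD53503E5, "
    "offsetMsgId=C0A8006400002A9F000000000002BC96, "
    "messageQueue=MessageQueue [topic=TopicTest, "
    "brokerName=taomindeMacBook-Pro-2.local, queueId=1], queueOffset=249]"
)
result = parse_send_result(sample)
print(result["status"], result["topic"], result["queue_id"], result["queue_offset"])
```

A test script could then assert that every parsed line has status `SEND_OK` before moving on to the next step.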
The local RocketMQ environment is now set up. The next step is to access RocketMQ from Spring Boot.
5. Create topic and consumer groups using the command line
Create the topic:
sh bin/mqadmin updateTopic -n 192.168.0.103:9876 -b 192.168.0.103:10911 -t message-topic
Create the consumer group:
sh bin/mqadmin updateSubGroup -c DefaultCluster -n 127.0.0.1:9876 -g nexcusdemo
6. Introduce dependencies in the project (pom.xml)
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.5.1</version>
</dependency>
7. Configure in consumer (application.properties)
rocketmq.name-server=localhost:9876
8. Producer configuration
# rocketMQ configuration
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test
9. Producer code
// Controller: HTTP entry point that triggers the send
@GetMapping("/send")
public String sendMessage(String content){
    mqService.sendHello(content);
    return "ok";
}

// Service: builds the payload and sends it to message-topic
@Autowired
private RocketMQTemplate rocketMQTemplate;

@Autowired
private RocketMQProducer<String> stringRocketMQProducer;

public void sendHello(String message){
    UserInfoDto userInfoDto = new UserInfoDto();
    userInfoDto.setPassword("123");
    userInfoDto.setUsername(message);
    userInfoDto.setId(1L);
    rocketMQTemplate.convertAndSend("message-topic", userInfoDto);
}
10. Consumer configuration
rocketmq.name-server=127.0.0.1:9876
11. Consumer code
@Log4j2
@Component
@RocketMQMessageListener(topic = "message-topic", consumerGroup = "nexcusdemo")
public class MessageListener implements RocketMQListener<UserInfoDto> {

    @Override
    public void onMessage(UserInfoDto message) {
        log.info("Message received: {}", message);
    }
}
The code and configuration for sending messages through RocketMQ from Java are now complete. The results are demonstrated in the next step.
12. Demonstration: call the service A interface with Postman, then check the log output on service B
- Call the interface with Postman
- View the service A log, which includes the message-sending log
- View the service B log, which includes the received-message log
6. How to call RocketMQ from Python for automated testing
- Installing C++ dependencies for RocketMQ
macos:
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz
tar -xzf rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz
cd rocketmq-client-cpp
mkdir /usr/local/include/rocketmq
cp include/* /usr/local/include/rocketmq
cp lib/* /usr/local/lib
install_name_tool -id "@rpath/librocketmq.dylib" /usr/local/lib/librocketmq.dylib
centos:
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
- Installing python dependencies
pip install rocketmq-client-python
- As a producer
from rocketmq.client import Producer, Message
import json

producer = Producer("test")
producer.set_name_server_address('127.0.0.1:9876')
producer.start()

msg = Message('message-topic')
# msg.set_keys('XXX')
# msg.set_tags('XXX')
msg.set_body(json.dumps({"username":"python", "password":"python3", "id":2}))
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
- Run the producer code and check whether the Java consumer receives the message
- As a consumer
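import time
from rocketmq.client import PushConsumer, ConsumeStatus

def callback(msg):
    # Called once for every message delivered to this consumer
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer('nexcusdemo')
consumer.set_name_server_address('127.0.0.1:9876')
consumer.subscribe('message-topic', callback)
consumer.start()

try:
    # Keep the process alive so the consumer keeps receiving; stop with Ctrl+C
    while True:
        time.sleep(3600)
except KeyboardInterrupt:
    pass
finally:
    # In the original this call came after the infinite loop and was never reached
    consumer.shutdown()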
- Send the request with Postman and check whether the Python consumer receives the message
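In an automated test, the consumer callback usually has to hand the message over to the test code, which then waits for it with a timeout instead of sleeping forever. The sketch below shows one possible way to do that. `MessageCollector`, `wait_for`, and the `CONSUME_SUCCESS` stand-in are hypothetical names for this example. With the real client the callback would return `ConsumeStatus.CONSUME_SUCCESS`. The fake message object is only there so the sketch runs without a broker, and the bytes/str handling is an assumption about what the body might look like.

```python
import json
import threading
from types import SimpleNamespace

CONSUME_SUCCESS = 0   # stand-in; with the real client use ConsumeStatus.CONSUME_SUCCESS


class MessageCollector:
    """Collects JSON message bodies and lets a test wait for a matching one."""

    def __init__(self):
        self._messages = []
        self._condition = threading.Condition()

    def callback(self, msg):
        body = msg.body
        if isinstance(body, bytes):   # assumption: accept either bytes or str bodies
            body = body.decode("utf-8")
        with self._condition:
            self._messages.append(json.loads(body))
            self._condition.notify_all()
        return CONSUME_SUCCESS

    def wait_for(self, predicate, timeout=5.0):
        """Return the first collected message matching predicate, or None on timeout."""
        def find():
            return next((m for m in self._messages if predicate(m)), None)

        with self._condition:
            self._condition.wait_for(lambda: find() is not None, timeout=timeout)
            return find()


collector = MessageCollector()

# Simulate a broker delivery 50 ms from now with a fake message object.
fake_msg = SimpleNamespace(id="1", body=json.dumps({"username": "python", "id": 2}).encode())
threading.Timer(0.05, collector.callback, args=(fake_msg,)).start()

found = collector.wait_for(lambda m: m.get("username") == "python", timeout=2.0)
print(found)
```

With a real consumer, `collector.callback` would be passed to `consumer.subscribe(...)` in place of the plain `callback` function, and the test would assert on the value returned by `wait_for`.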
7. Common problem: idempotency testing
- The scenario is as follows
A user calls service A's order interface, and service A sends an MQ message to service B, which generates one distribution document. The problem: if network jitter or a similar fault occurs when service A sends the message, the broker may deliver duplicate messages to service B, which then generates two distribution documents. One order should have exactly one distribution document.
- After generating an order, service A (order service) sends an MQ message to service B (distribution service) to generate a distribution document
- View service A code
// Generate order interface
@PostMapping("/order/create")
public Result<String> createOrder(@RequestBody OrderQuery orderQuery){
    // Generate an order and send a message to the distribution service to generate a distribution document
    mqService.createOrder(orderQuery);
    return new Result<String>().success("ok");
}

// Generate the order and send a message to the distribution service
public boolean createOrder(OrderQuery orderQuery){
    HcOrder hcOrder = new HcOrder();
    hcOrder.setSku(orderQuery.getSku());
    hcOrder.setOrderNo(OrderUtil.getLeafNo());
    hcOrder.setQuantity(String.valueOf(orderQuery.getQuantity()));
    // Save the order data to the database
    hcOrderMapper.insert(hcOrder);
    // Send message to delivery service
    sendOrder2Transport(hcOrder);
    return true;
}

// Send message to delivery service
public void sendOrder2Transport(HcOrder hcOrder){
    String order = JSONObject.toJSONString(hcOrder);
    rocketMQTemplate.convertAndSend("order-topic", order);
}
This confirms that service A sends service B a message that is actually JSON: the serialized hcOrder object, which looks like this:
{
    "sku": "1",          // which product it is
    "orderNo": "hc1111", // which order number it is
    "quantity": "2"      // how many pieces were bought
}
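When test scripts build this payload by hand, a typo in a field name is easy to miss. The sketch below keeps the three fields in one place. `OrderMessage` is a hypothetical helper for test code, not part of either service. The field names and the string-typed `quantity` are taken from the sample above. The `//` annotations in the sample are explanatory only, since strict JSON has no comments.

```python
import json
from dataclasses import asdict, dataclass

REQUIRED_FIELDS = {"sku", "orderNo", "quantity"}


@dataclass(frozen=True)
class OrderMessage:
    """Test-side mirror of the order payload; field names follow the sample JSON."""
    sku: str
    orderNo: str
    quantity: str   # sent as a string in the sample payload

    def to_json(self):
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, body):
        data = json.loads(body)
        missing = REQUIRED_FIELDS - set(data)
        if missing:
            raise ValueError(f"missing fields: {sorted(missing)}")
        return cls(sku=data["sku"], orderNo=data["orderNo"], quantity=data["quantity"])


order = OrderMessage(sku="1", orderNo="hc1111", quantity="2")
body = order.to_json()
print(body)
print(OrderMessage.from_json(body) == order)
```

The string returned by `to_json()` is what a test producer would pass to `msg.set_body(...)`.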
- View service B code
@Override
public void onMessage(String order) {
    log.info("Message received: {}", order);
    HcOrder hcOrder = JSONUtil.convertString2Object(order, HcOrder.class);
    String orderNo = hcOrder.getOrderNo();

    // Idempotent handling: query first and return directly if a distribution document already exists
    // (without this check, one order could end up with multiple distribution documents)
    LambdaQueryWrapper<DeliveryOrder> deliveryOrderLambdaQueryWrapper = new LambdaQueryWrapper<>();
    deliveryOrderLambdaQueryWrapper.eq(DeliveryOrder::getOrderNo, orderNo);
    DeliveryOrder deliveryOrder1 = deliveryMapper.selectOne(deliveryOrderLambdaQueryWrapper);
    if (deliveryOrder1 != null){
        log.info("orderNo: {}, corresponding distribution document: {}, already exists", orderNo, deliveryOrder1.getDeliveryNo());
        return;
    }

    DeliveryOrder deliveryOrder = new DeliveryOrder();
    deliveryOrder.setOrderNo(orderNo);
    deliveryOrder.setDeliveryNo(DeliveryOrderUtil.getLeafNo());
    deliveryMapper.insert(deliveryOrder);
}
- How to verify that the distribution service is idempotent
First, check the distribution document table for the document corresponding to hc1624011377626
Then use Python to send the same message to service B again and check whether service B inserts another row into the database. This tells us whether idempotency has been implemented
from rocketmq.client import Message, Producer
import json

producer = Producer("test")
producer.set_name_server_address("127.0.0.1:9876")
producer.start()

msg = Message("order-topic")
msg.set_body(json.dumps({"orderNo":"hc1624011377626", "sku":"1", "quantity":"2"}))
res = producer.send_sync(msg)
print(res.status, res.msg_id, res.offset)
producer.shutdown()
- Run the Python script, then check the log and the database. The log shows that the message was received again, but the order still has only one delivery order, which proves that the idempotent handling works
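The database check in this verification can also be written as assertions. The sketch below reproduces the query-first logic against an in-memory SQLite table so that it runs without any services. The table name `delivery_order`, its columns, and the `d-` delivery number prefix are assumptions made for the example and are not service B's real schema.

```python
import json
import sqlite3


def create_delivery_table(conn):
    conn.execute(
        "CREATE TABLE delivery_order ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "order_no TEXT NOT NULL, "
        "delivery_no TEXT NOT NULL)"
    )


def on_message(conn, body):
    """Query first, insert only if no delivery document exists. Returns True if inserted."""
    order_no = json.loads(body)["orderNo"]
    row = conn.execute(
        "SELECT delivery_no FROM delivery_order WHERE order_no = ?", (order_no,)
    ).fetchone()
    if row is not None:
        return False   # duplicate message: a delivery document already exists
    conn.execute(
        "INSERT INTO delivery_order (order_no, delivery_no) VALUES (?, ?)",
        (order_no, "d-" + order_no),   # hypothetical delivery number
    )
    conn.commit()
    return True


def count_deliveries(conn, order_no):
    return conn.execute(
        "SELECT COUNT(*) FROM delivery_order WHERE order_no = ?", (order_no,)
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
create_delivery_table(conn)
body = json.dumps({"orderNo": "hc1624011377626", "sku": "1", "quantity": "2"})

first = on_message(conn, body)    # original message
second = on_message(conn, body)   # duplicate delivery of the same message
print(first, second, count_deliveries(conn, "hc1624011377626"))
```

In a real test the count query would run against service B's database after the Python producer has sent the duplicate. Note that query-then-insert can still race when two consumers handle the same order at the same moment, so a unique constraint on the order number is a common additional guard.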
8. Closing remarks
MQ middleware is only a small part of microservice testing. There are many more skills to master for microservice testing and development. If you have any questions, you can join the group (323432957) to discuss them.