Microservice test development: how to test RocketMQ when it is used in a project

1. What is MQ

MQ is message middleware. In the simplest terms, service A sends a message to service B, and service B performs a series of actions after receiving it.

What problems does this solve?

  • Peak shaving: when services call each other directly through an interface, whether HTTP or RPC, the call is synchronous, so a burst of requests hits the called service all at once. With MQ in between, the messages are buffered and the consumer processes them at its own pace.
  • Decoupling: services no longer exchange data through shared storage such as MySQL or Redis.
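
The peak-shaving idea can be illustrated with a minimal in-memory sketch. It uses Python's standard `queue.Queue` as a stand-in for the broker, which is an assumption for illustration only and not RocketMQ itself. The function names and the batch size are hypothetical. The producer enqueues a burst of 100 messages immediately, while the consumer drains them in fixed-size batches.

```python
import queue


def produce_burst(broker: queue.Queue, count: int) -> None:
    """Simulate service A publishing a burst of messages without waiting for B."""
    for i in range(count):
        broker.put({"order_no": f"demo-{i}"})


def consume_batch(broker: queue.Queue, batch_size: int) -> list:
    """Simulate service B taking at most batch_size messages per processing round."""
    batch = []
    while len(batch) < batch_size:
        try:
            batch.append(broker.get_nowait())
        except queue.Empty:
            break
    return batch


broker = queue.Queue()
produce_burst(broker, 100)          # the whole burst is accepted at once

rounds = 0
processed = 0
while not broker.empty():           # service B works through the backlog gradually
    processed += len(consume_batch(broker, batch_size=10))
    rounds += 1

print(rounds, processed)
```

The burst never reaches the consumer directly; it only ever sees ten messages per round, which is the effect the broker provides in a real deployment.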

2. When is MQ used?

When services need to exchange data but the result is not needed immediately.

For example:

  1. In an e-commerce scenario, after the order system generates an order, it notifies the warehouse to generate an issue document and the distribution system to generate a distribution document. Why not use an RPC or HTTP interface here? First, if the HTTP call fails, the data is lost, and RPC has the same problem. Second, when a large number of orders are generated within a short time, the warehouse service or distribution service is very likely to be overwhelmed and go down.
  2. Log service: multiple online servers send their logs to a unified log service so that online logs are easy to view. This scenario is also well suited to MQ.

3. Why do testers need to master this skill?

Now that services are routinely split up, microservice scenarios are everywhere and MQ is widely used in them. If the corresponding targeted tests are not carried out, problems are likely to appear in production.

4. What is the architecture of MQ?

The producer sends a message to a topic on the broker, and the broker then delivers the message to the consumer. Each consumer has an offset that records which messages have already been consumed, which helps prevent repeated consumption.

The most important issue is idempotency. When there is a network problem, the broker may deliver the same message to a consumer more than once, so the consumer must produce the same result no matter how many times it receives that message.
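
The relationship between the offset and duplicate delivery can be sketched with a toy model. This is a deliberately simplified stand-in and not how RocketMQ is implemented internally; the class `ToyBroker`, its methods, and the group name `demo-group` are hypothetical. It shows that a message is delivered again if the acknowledgement that advances the offset never arrives.

```python
class ToyBroker:
    """In-memory stand-in for a broker with one queue and per-group offsets."""

    def __init__(self):
        self.messages = []   # append-only message log
        self.offsets = {}    # consumer group -> index of the next message to deliver

    def send(self, body):
        self.messages.append(body)

    def poll(self, group):
        """Return the next unacknowledged message for the group, or None."""
        offset = self.offsets.get(group, 0)
        if offset < len(self.messages):
            return self.messages[offset]
        return None

    def ack(self, group):
        """Advance the group's offset after successful consumption."""
        self.offsets[group] = self.offsets.get(group, 0) + 1


broker = ToyBroker()
broker.send("order-1")

first = broker.poll("demo-group")    # delivered, but assume the ack is lost
second = broker.poll("demo-group")   # the same message is delivered again
broker.ack("demo-group")             # this time the ack arrives
third = broker.poll("demo-group")    # nothing left to deliver

print(first, second, third)
```

Because the consumer can see `order-1` twice, it has to be written so that processing it twice has the same effect as processing it once.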

5. How to set up a local RocketMQ

1. Download and unzip RocketMQ: rocketmq.apache.org/release_not...

2. Start nameserver

sh bin/mqnamesrv

3. Start broker

sh bin/mqbroker -n localhost:9876

4. Test with built-in tools

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=C0A80064502A355DA2545C7FD53503E5, offsetMsgId=C0A8006400002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=taomindeMacBook-Pro-2.local, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A80064502A355DA2545C7FD53603E6, offsetMsgId=C0A8006400002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=taomindeMacBook-Pro-2.local, queueId=2], queueOffset=249]

At this point the local RocketMQ environment is set up. The next step is to access RocketMQ from Spring Boot.

5. Create topic and consumer groups using the command line

Create the topic:
sh bin/mqadmin updateTopic -n 192.168.0.103:9876 -b 192.168.0.103:10911 -t message-topic

Create the consumer group:
sh bin/mqadmin updateSubGroup -c DefaultCluster -n 127.0.0.1:9876 -g nexcusdemo

6. Introduce dependencies in the project (pom.xml)

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.5.1</version>
</dependency>

7. Configure in consumer (application.properties)

rocketmq.name-server=localhost:9876

8. Producer configuration

# rocketMQ configuration
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test

9. Producer code

// Controller in service A: exposes an endpoint that triggers the send
@GetMapping("/send")
public String sendMessage(String content){
    mqService.sendHello(content);
    return "ok";
}

// Service class behind mqService: builds the payload and sends it to the topic
@Autowired
private RocketMQTemplate rocketMQTemplate;

// Not used in this snippet
@Autowired
private RocketMQProducer<String> stringRocketMQProducer;

public void sendHello(String message){
    UserInfoDto userInfoDto = new UserInfoDto();
    userInfoDto.setPassword("123");
    userInfoDto.setUsername(message);
    userInfoDto.setId(1L);
    // Send the object to message-topic
    rocketMQTemplate.convertAndSend("message-topic", userInfoDto);
}

10. Consumer configuration

rocketmq.name-server=127.0.0.1:9876

11. Consumer code

@Log4j2
@Component
@RocketMQMessageListener(topic = "message-topic", consumerGroup = "nexcusdemo")
public class MessageListener implements RocketMQListener<UserInfoDto> {

    @Override
    public void onMessage(UserInfoDto message) {
        log.info("Message received: {}", message);
    }
}

At this point, the code and configuration for sending messages to RocketMQ from Java are complete. The next step demonstrates the result.

12. Call the service A interface through Postman, then check the log output on service B

  1. Call the interface via Postman

  2. View the service A log, which includes the log of the sent message

  3. View the service B log, which includes the log of the received message
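
The Postman call can also be scripted. The following is a minimal sketch using only the Python standard library. It assumes service A listens on `http://localhost:8080` (Spring Boot's default port, not stated in this article), and the helper names `build_send_url` and `call_send` are hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import urlopen


def build_send_url(base_url: str, content: str) -> str:
    """Build the URL for the /send endpoint with the content query parameter."""
    query = urlencode({"content": content})
    return f"{base_url.rstrip('/')}/send?{query}"


def call_send(base_url: str, content: str, timeout: float = 5.0) -> str:
    """Call GET /send on service A and return the response body as text."""
    with urlopen(build_send_url(base_url, content), timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```

With service A running, `call_send("http://localhost:8080", "hello mq")` should return the string `ok` that the controller returns, after which the service B log can be checked as in step 3.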

6. How to call RocketMQ from Python for automated testing

  1. Install the C++ dependencies for RocketMQ

macos:

wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz
tar -xzf rocketmq-client-cpp-2.0.0-bin-release.darwin.tar.gz
cd rocketmq-client-cpp
mkdir /usr/local/include/rocketmq
cp include/* /usr/local/include/rocketmq
cp lib/* /usr/local/lib
install_name_tool -id "@rpath/librocketmq.dylib" /usr/local/lib/librocketmq.dylib

centos:

wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm

  2. Install the Python dependencies

pip install rocketmq-client-python

  3. As a producer

from rocketmq.client import Producer, Message
import json


producer = Producer("test")
producer.set_name_server_address('127.0.0.1:9876')
producer.start()
msg = Message('message-topic')
# msg.set_keys('XXX')
# msg.set_tags('XXX')
msg.set_body(json.dumps({"username":"python", "password":"python3", "id":2}))
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

  4. Run the producer code and check whether the Java consumer receives the message

  5. As a consumer

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS


consumer = PushConsumer('nexcusdemo')
consumer.set_name_server_address('127.0.0.1:9876')
consumer.subscribe('message-topic', callback)
consumer.start()

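try:
    # Keep the process alive so the push consumer can keep receiving messages
    while True:
        time.sleep(3600)
except KeyboardInterrupt:
    pass
finally:
    # Reached on Ctrl+C, so the consumer is always shut down cleanly
    consumer.shutdown()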

  6. Send a request to service A (for example, through Postman) and check whether the Python consumer receives the message
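
Because delivery is asynchronous, an automated check usually has to wait for the message instead of asserting immediately. The following is a minimal polling sketch; the helper `wait_until`, the `received` list, and the `on_message` function are hypothetical names, and the direct call to `on_message` only stands in for the consumer callback firing.

```python
import time


def wait_until(condition, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Poll condition() until it returns a truthy value or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


received = []


def on_message(body: str) -> None:
    """Collect message bodies so the test can assert on them later."""
    received.append(body)


on_message('{"username": "python"}')   # stands in for the consumer callback firing
ok = wait_until(lambda: len(received) == 1, timeout=1.0)
print(ok)
```

In a real test, the consumer callback would append `msg.body` to the list, and the assertion would run after `wait_until` returns.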

7. A common problem: idempotency testing

  1. The scenario is as follows

The user calls the order interface of service A (order service). After generating the order, service A sends an MQ message to service B (distribution service), which generates one distribution document.

The problem: when service A sends the message, network jitter or similar issues may cause the broker to deliver duplicate messages to service B, resulting in two distribution documents for a single order. One order should correspond to exactly one distribution document.

  2. View the code of service A

// Order creation endpoint
@PostMapping("/order/create")
public Result<String> createOrder(@RequestBody OrderQuery orderQuery){
    // Generate an order and send a message to the distribution service to generate a distribution document
    mqService.createOrder(orderQuery);
    return new Result<String>().success("ok");
}

// Create the order and send a message to the distribution service
public boolean createOrder(OrderQuery orderQuery){

    HcOrder hcOrder = new HcOrder();
    hcOrder.setSku(orderQuery.getSku());
    hcOrder.setOrderNo(OrderUtil.getLeafNo());
    hcOrder.setQuantity(String.valueOf(orderQuery.getQuantity()));
    // Persist the order data to the database
    hcOrderMapper.insert(hcOrder);

    // Send message to delivery service
    sendOrder2Transport(hcOrder);
    return true;
}

// Send message to delivery service
public void sendOrder2Transport(HcOrder hcOrder){
    String order = JSONObject.toJSONString(hcOrder);
    rocketMQTemplate.convertAndSend("order-topic", order);
}

So we can confirm that service A sends service B a message that is a JSON string. This JSON is the serialized hcOrder object and looks like this:

{
    "sku": "1",  // Which product it is
    "orderNo": "hc1111",  // The order number
    "quantity": "2"  // How many pieces were bought
}
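
For test scripts it helps to build and check this message in one place. The following is a minimal sketch that assumes the message contains exactly the three fields shown above; the real `HcOrder` may carry more fields. The class `OrderMessage` and the function `parse_order_message` are hypothetical names. Note that the `//` comments above are explanatory only, since real JSON does not allow comments.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class OrderMessage:
    sku: str
    orderNo: str
    quantity: str  # service A sends the quantity as a string

    def to_json(self) -> str:
        """Serialize to the JSON body that is sent to order-topic."""
        return json.dumps(asdict(self))


def parse_order_message(raw: str) -> OrderMessage:
    """Parse a message body and fail loudly if a required field is missing."""
    data = json.loads(raw)
    missing = {"sku", "orderNo", "quantity"} - data.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return OrderMessage(sku=data["sku"], orderNo=data["orderNo"], quantity=data["quantity"])


raw = OrderMessage(sku="1", orderNo="hc1111", quantity="2").to_json()
parsed = parse_order_message(raw)
print(parsed.orderNo)
```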

  3. View the code of service B

@Override
public void onMessage(String order) {
    log.info("Message received: {}", order);
    HcOrder hcOrder = JSONUtil.convertString2Object(order, HcOrder.class);
    String orderNo = hcOrder.getOrderNo();

    // Idempotent handling: query first and return directly if a distribution document already exists (without this check, one order could end up with multiple distribution documents)
    LambdaQueryWrapper<DeliveryOrder> deliveryOrderLambdaQueryWrapper = new LambdaQueryWrapper<>();
    deliveryOrderLambdaQueryWrapper.eq(DeliveryOrder::getOrderNo, orderNo);
    DeliveryOrder deliveryOrder1 = deliveryMapper.selectOne(deliveryOrderLambdaQueryWrapper);
    if (deliveryOrder1 != null){
        log.info("orderNo: {},Corresponding distribution document: {}, Already exists", orderNo, deliveryOrder1.getDeliveryNo());
        return;
    }

    DeliveryOrder deliveryOrder = new DeliveryOrder();
    deliveryOrder.setOrderNo(orderNo);
    deliveryOrder.setDeliveryNo(DeliveryOrderUtil.getLeafNo());
    deliveryMapper.insert(deliveryOrder);
}
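
The same query-then-insert logic can be tried out without RocketMQ or MySQL. The following sketch uses an in-memory SQLite database as a stand-in; the table name `delivery_order`, its columns, and the generated delivery number are hypothetical. The `UNIQUE` constraint on `order_no` is an addition that is not in the Java code above; it is included as a safety net in case two duplicates are processed at the same time and both pass the query.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE delivery_order ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " order_no TEXT NOT NULL UNIQUE,"
    " delivery_no TEXT NOT NULL)"
)


def on_message(raw: str) -> bool:
    """Create a delivery document unless one already exists for the order.

    Returns True if a row was inserted, False if the message was a duplicate.
    """
    order_no = json.loads(raw)["orderNo"]
    row = conn.execute(
        "SELECT delivery_no FROM delivery_order WHERE order_no = ?", (order_no,)
    ).fetchone()
    if row is not None:
        return False  # already processed, nothing to do
    try:
        conn.execute(
            "INSERT INTO delivery_order (order_no, delivery_no) VALUES (?, ?)",
            (order_no, f"dl-{order_no}"),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        return False  # another consumer inserted the same order first
    return True


message = json.dumps({"orderNo": "hc1111", "sku": "1", "quantity": "2"})
first = on_message(message)
second = on_message(message)
count = conn.execute("SELECT COUNT(*) FROM delivery_order").fetchone()[0]
print(first, second, count)
```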

  4. How to verify through testing that idempotency is implemented

First, check the distribution document corresponding to order hc1624011377626 in the distribution document table.

Then use Python to send another message with the same order number to service B and check whether service B inserts another row into the database. This tells us whether idempotency has been implemented.

from rocketmq.client import Message, Producer
import json

producer = Producer("test")
producer.set_name_server_address("127.0.0.1:9876")
producer.start()

msg = Message("order-topic")
msg.set_body(json.dumps({"orderNo":"hc1624011377626", "sku":"1", "quantity":"2"}))
res = producer.send_sync(msg)
print(res.status, res.msg_id, res.offset)
producer.shutdown()

  5. Run the Python script, then check the log and the database. The log shows that the duplicate message was received, yet there is still only one delivery order for the order. This proves that the idempotent handling works.
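
The manual check can be turned into a repeatable test. The following is a minimal sketch in which sending and counting are passed in as functions, so the same check runs against in-memory fakes here and against the real producer and database later. All names (`check_idempotent`, `fake_send`, `fake_count`) are hypothetical, and the fixed wait is a simplification: a slow consumer could make the check pass before the duplicate is processed, so the service B log should still be inspected.

```python
import json
import time
from typing import Callable


def check_idempotent(
    send_message: Callable[[str], None],
    count_delivery_orders: Callable[[str], int],
    order_no: str,
    repeats: int = 2,
    settle_seconds: float = 2.0,
) -> bool:
    """Send the same order message several times and expect exactly one delivery order."""
    body = json.dumps({"orderNo": order_no, "sku": "1", "quantity": "2"})
    for _ in range(repeats):
        send_message(body)
    time.sleep(settle_seconds)  # give the consumer time to process the messages
    return count_delivery_orders(order_no) == 1


# In-memory fakes standing in for the RocketMQ producer and the database query.
table = {"hc1624011377626": 1}


def fake_send(body: str) -> None:
    order_no = json.loads(body)["orderNo"]
    table.setdefault(order_no, 1)  # idempotent consumer: never more than one row


def fake_count(order_no: str) -> int:
    return table.get(order_no, 0)


result = check_idempotent(fake_send, fake_count, "hc1624011377626", settle_seconds=0)
print(result)
```

In a real run, `send_message` would wrap `producer.send_sync` from the script above, and `count_delivery_orders` would run a count query against the distribution document table.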

8. Final words

MQ message-oriented middleware is only a small part of microservice testing. There are many more skills to master for microservice test development. If you have any questions, you can join the group (323432957) to discuss them.

Keywords: Python Java Database MySQL kafka

Added by LOUDMOUTH on Sat, 22 Jan 2022 19:26:56 +0200