This article describes in detail the scenarios in which message queues are applied.

When developing or designing a website, we often need to send bulk SMS messages, bulk emails, or in-site messages to every user of the system, or, in an order system, to record a large volume of logs. If the system is an e-commerce platform that runs rush-buying and second-kill (flash sale) events, the servers face very high concurrency and may simply be unable to bear the instantaneous pressure, even in a distributed deployment. There are many such examples. When we run into these problems, how should we design the system so that it keeps running normally and effectively?

This is where a message queue comes in. A message queue is a piece of middleware that diverts and relieves the pressure caused by high concurrency. So what is a message queue?

Getting to know message queues (principle)
A message queue is middleware with a queue structure: once a message and its content have been put into a container, the caller can return immediately without waiting for the result of any later processing. The contents of the container are then processed one by one, in order, by another program.

A message queue works as follows:
A business system performs the enqueue operation, inserting messages (contents) into the message queue one by one. As soon as an insert succeeds, a success result is returned directly. A message processing system then performs the dequeue operation, taking the records out of the message queue and processing them one by one.
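The enqueue and dequeue flow can be sketched in a few lines. This is only an in-memory illustration that uses PHP's built-in SplQueue in place of real middleware; the function names submitMessage and drainQueue are made up for the example.

```php
<?php
// Minimal in-memory sketch of the enqueue/dequeue flow (not a real broker).
$queue = new SplQueue();

// Producer side: enqueue the message and return immediately, without processing it.
function submitMessage(SplQueue $queue, string $message): bool
{
    $queue->enqueue($message);
    return true; // "success" only means the message was queued
}

// Consumer side: take messages out one by one, oldest first.
function drainQueue(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        // strtoupper() is a stand-in for the real processing work
        $processed[] = strtoupper($queue->dequeue());
    }
    return $processed;
}

submitMessage($queue, 'order 1');
submitMessage($queue, 'order 2');
submitMessage($queue, 'order 3');

$result = drainQueue($queue);
print_r($result);
```

The producer never waits for the consumer, and the consumer sees the messages in the order in which they were enqueued.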

What are the application scenarios of message queues?
Message queues are mainly used for redundancy, decoupling, traffic peak shaving, and asynchronous communication, and they also help with scalability and ordering guarantees. Let's look at these characteristics in more detail.

Data redundancy
Take an order system as an example: when there are many orders, each one must later be converted and recorded rigorously. The message queue can persist this data in the queue, where the order processing program then fetches it. A record is deleted only after its follow-up processing has completed, which ensures that every record gets processed.
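A minimal sketch of the "delete only after successful processing" rule follows. The array stands in for a persistent queue, and handleOrder and processStored are hypothetical names chosen for the example; the simulated failure on 'order B' is also an assumption.

```php
<?php
// Hypothetical in-memory stand-in for a persistent queue: id => payload.
$stored = [1 => 'order A', 2 => 'order B', 3 => 'order C'];

// Hypothetical handler: returns false to simulate a failure on 'order B'.
function handleOrder(string $payload): bool
{
    return $payload !== 'order B';
}

// Process every stored record; remove a record only after it was handled successfully.
function processStored(array &$stored, callable $handler): array
{
    $done = [];
    foreach ($stored as $id => $payload) {
        if ($handler($payload)) {
            $done[] = $id;
            unset($stored[$id]); // delete only after successful processing
        }
        // On failure the record stays in $stored and can be retried later.
    }
    return $done;
}

$done = processStored($stored, 'handleOrder');
print_r($done);   // ids that were processed and removed
print_r($stored); // records still waiting for a retry
```

Because the failed record is never removed, a later run can pick it up again instead of losing it.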

System decoupling
A message queue separates two systems, the enqueuing system and the dequeuing system, which solves the problem of tight coupling between them. Once a message queue is in place, the two systems have no direct relationship: if either one crashes, the other keeps running normally.

Let's explain system decoupling in detail with a case: using a queue between an order system and a distribution system.
Scenario: after submitting an order while shopping online, you can see that your goods are being delivered, so a second system is involved: the distribution system. If the order system and the distribution system are designed as one during architecture, problems arise. First, the order system is under heavy processing pressure, whereas the distribution system does not need to react to that pressure in real time. Second, we do not want a failure in one system to take the other down with it, since then both would be affected at once. Decoupling solves both problems.

Once the two systems are separated, they communicate through a queue table. First, the order system receives the user's orders, processes them, and writes them to the queue table, which is the key link between the two systems. A scheduled program in the distribution system then reads the queue table and processes the records. After processing, the distribution system marks the records as processed. That is the whole process.
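Before looking at the MySQL version, the three steps (claim unprocessed records, process them, mark them as processed) can be sketched in memory. The status codes 0, 1 and 2 follow the queue table defined below; the function names claimRows and completeRows and the sample rows are assumptions made for the example.

```php
<?php
const STATUS_UNPROCESSED = 0;
const STATUS_PROCESSED   = 1;
const STATUS_PROCESSING  = 2;

// Step 1: mark up to $limit unprocessed rows as "processing" and return their ids.
function claimRows(array &$rows, int $limit): array
{
    $claimed = [];
    foreach ($rows as $id => $row) {
        if (count($claimed) >= $limit) {
            break;
        }
        if ($row['status'] === STATUS_UNPROCESSED) {
            $rows[$id]['status'] = STATUS_PROCESSING;
            $claimed[] = $id;
        }
    }
    return $claimed;
}

// Steps 2 and 3: handle each claimed row, then mark it as processed.
function completeRows(array &$rows, array $ids, string $now): void
{
    foreach ($ids as $id) {
        // ... the distribution system would handle the order here ...
        $rows[$id]['status'] = STATUS_PROCESSED;
        $rows[$id]['updated_at'] = $now;
    }
}

// Sample queue rows (made up for the example).
$rows = [
    1 => ['order_id' => 'A', 'status' => STATUS_UNPROCESSED, 'updated_at' => null],
    2 => ['order_id' => 'B', 'status' => STATUS_UNPROCESSED, 'updated_at' => null],
    3 => ['order_id' => 'C', 'status' => STATUS_UNPROCESSED, 'updated_at' => null],
];

$claimed = claimRows($rows, 2);
completeRows($rows, $claimed, date('Y-m-d H:i:s'));
print_r($rows);
```

Only the ids returned by claimRows are completed, so a row that a different worker has put into the processing state is left alone. That is a design choice of this sketch, not something the article's code enforces.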

The detailed design is as follows (MySQL queue example):

First, the order.php file receives the user's order.
It then generates the order number and processes the order. Once the order system has finished, the data required by the distribution system is added to the queue table.

Order queue table

CREATE TABLE `order_queue` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Queue record id',
  `order_id` varchar(32) NOT NULL COMMENT 'Order number (order.php generates 22 digits, too long for int)',
  `mobile` varchar(20) NOT NULL COMMENT 'User''s mobile phone number',
  `address` varchar(100) NOT NULL DEFAULT '' COMMENT 'User''s address',
  `created_at` datetime NOT NULL COMMENT 'Order creation time',
  `updated_at` datetime DEFAULT NULL COMMENT 'Completion time of logistics system processing',
  `status` tinyint(2) NOT NULL DEFAULT 0 COMMENT 'Current status: 0 unprocessed, 1 processed, 2 processing',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Next, a scheduled script starts the distribution processing program every minute. That program, goods.php, processes the data in the queue table. When processing is complete, it changes the status field of the record to processed, which ends the whole flow.

The specific code is as follows:
1. order.php, which receives and processes orders

<?php   
include 'class/db.php';  

if(!empty($_GET['mobile'])){  
    $order_id = rand(10000,99999).date("YmdHis").'688';
    $insert_data = array(
        'order_id'=>$order_id,
        'mobile'=>$_GET['mobile'],      //Remember to validate and filter user input
        'created_at'=>date('Y-m-d H:i:s',time()),
        'status'=>0,    //0, unprocessed status
    );
    $db = DB::getIntance();
    //Put the data into the queue table
    $res = $db->insert('order_queue',$insert_data);  
    if($res){  
        echo $insert_data['order_id']."Saved successfully";  
    }else{  
        echo "Save failed";  
    }  
}else{  
    echo "1";  
}  
?>

2. goods.php, with which the distribution system processes orders

<?php   
//The distribution system processes the order and marks it
include 'class/db.php';  
$db = DB::getIntance();  
//1: First, mark unprocessed records (status 0) as processing (status 2)
$waiting = array('status'=>0,);
$lock = array('status'=>2,);
$res_lock = $db->update('order_queue',$lock,$waiting,2);
//2: Select the records just marked, then hand them to the distribution system
if($res_lock){
    //Select the orders to process
    $res = $db->selectAll('order_queue',$lock);
    //The distribution system then handles them (delivery and other operations)
    //3: Mark the handled records as processed (status 1)
    $success = array(
        'status'=>1,
        'updated_at'=>date('Y-m-d H:i:s',time()),
    );
    $res_last = $db->update('order_queue',$success,$lock);
    if($res_last){  
       echo "Processing succeeded:".$res_last;   
    }else{  
        echo "Processing failed:".$res_last;  
    }  
}else{  
    echo "All processing completed";  
}  
?>

3. goods.sh, the script that is executed every minute

#!/bin/bash
# Print the current date and time
date "+%Y-%m-%d %H:%M:%S"
cd /data/wwwroot/default/mq/
php goods.php

Then a crontab entry runs the script on schedule and appends its output to a log file:

# Run goods.sh every minute; append stdout and stderr to log.log
*/1 * * * * /data/wwwroot/default/mq/goods.sh >> /data/wwwroot/default/mq/log.log 2>&1

Monitoring the log

# Follow the log as new output arrives
tail -f log.log

In this way, the order system and the distribution system are independent of each other, and neither affects the normal operation of the other. That is system decoupling.

Traffic peak shaving
The classic cases here are second-kill (flash sale) and rush-buying events. They produce a huge traffic surge: a large number of requests is concentrated in just a few seconds, which puts great pressure on the server. Combining a Redis cache with a message queue handles this kind of instantaneous traffic effectively and keeps the server from collapsing.

Again, let's use a case to understand it: implementing a second kill with the Redis List type.
We will use these Redis commands:

LPUSH/LPUSHX: insert the value at the head of the list.
RPUSH/RPUSHX: insert the value at the tail of the list (same as above, at the opposite end).
LPOP: remove and return the first element of the list.
RPOP: remove and return the last element of the list.
LTRIM: trim the list so that only the elements within the specified range remain.
LLEN: get the length of the list.
LSET: set the value of a list element by index.
LINDEX: get a list element by index.
LRANGE: get the elements within the specified range of the list.

Scenario
Record which users took part in the second kill, together with the time, so that follow-up processing is easy. User IDs are stored in a Redis list, which acts as the queue. Suppose only the first 10 people are meant to succeed and everyone after them fails: it is then enough to cap the length of the Redis list at 10. Once the list holds 10 entries, the program rejects any further request to add data to it. After the data has been written to Redis, a second program takes the values out again, because data cannot stay in the cache for long: it walks through the Redis values and stores them permanently in the database. Because a second kill does not last long, a script running in a loop can do this scanning.

Detailed description:
First, the receiving program puts the user's request data, mainly the uid and a microsecond timestamp, into Redis. It checks the length of the Redis list and gives up on the request if the limit has already been reached. Finally, a script running in an infinite loop reads the contents of the Redis list and stores them in the database.
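Checking the length with LLEN and then pushing with RPUSH are two separate commands, so under real concurrency several requests could pass the check before any of them has pushed. One possible alternative, sketched below, is to push first and decide from the length that RPUSH returns, then trim the list back to the limit. This is a swapped-in variant, not the article's method. FakeList is an in-memory stand-in written for this example and assumes the phpredis behaviour that rPush() returns the new list length; tryEnter is a hypothetical helper name.

```php
<?php
// In-memory stand-in for the three list calls used below (assumption: it
// mimics phpredis, where rPush() returns the list length after the push).
class FakeList
{
    private array $lists = [];

    public function rPush(string $key, string $value): int
    {
        $this->lists[$key][] = $value;
        return count($this->lists[$key]);
    }

    public function lTrim(string $key, int $start, int $stop): bool
    {
        // Only non-negative indexes are supported in this sketch.
        $this->lists[$key] = array_slice($this->lists[$key] ?? [], $start, $stop - $start + 1);
        return true;
    }

    public function lLen(string $key): int
    {
        return count($this->lists[$key] ?? []);
    }
}

// Returns true when the user is among the first $limit entries in the list.
function tryEnter($redis, string $key, int $uid, int $limit): bool
{
    $length = $redis->rPush($key, $uid . '%' . microtime());
    if ($length > $limit) {
        $redis->lTrim($key, 0, $limit - 1); // drop the entry that arrived too late
        return false;
    }
    return true;
}

// Simulate 500 users competing for 10 places.
$redis = new FakeList();
$winners = 0;
for ($i = 0; $i < 500; $i++) {
    if (tryEnter($redis, 'miaosha', 1000000 + $i, 10)) {
        $winners++;
    }
}
echo $winners, ' winners, list length ', $redis->lLen('miaosha'), PHP_EOL;
```

With a real Redis connection the same tryEnter function could receive a Redis object instead of FakeList, since it only relies on rPush and lTrim.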

Design of the second-kill record table:

CREATE TABLE `redis_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `uid` int(11) NOT NULL DEFAULT '0',
  `time_stamp` varchar(24) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

Program to receive user requests:

<?php

$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis_name = 'miaosha';

//Maximum number of users who can succeed in the second kill
$num = 10;

//Simulate a flood of 500 users hitting the second kill
for ($i = 0; $i < 500; $i++) {
    $uid = rand(1000000,99999999);
    //Check the length of the Redis list (number of users already queued)
    if ($redis->lLen($redis_name) < $num) {
        //Append to the tail of the list
        $redis->rPush($redis_name, $uid.'%'.microtime());
    } else {
        //The list already holds $num users: the second kill is over
    }
}
$redis->close();

Handler (reads the Redis data and writes it into the data table)

<?php
//Pop a value from the head of the queue and check whether it exists. If it does, split out the uid and the time and save them to the database. (Once a value is popped it is no longer in the Redis queue, so if saving fails we need a mechanism that puts the failed data back into the Redis list.)
//Same DB helper as in order.php and goods.php (assumed to be available here)
include 'class/db.php';
$db = DB::getIntance();

$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis_name = 'miaosha';

//Poll the Redis queue in an infinite loop
while(1) {
    $user = $redis->lPop($redis_name);
    if (!$user || $user == 'null') {  //No data in the queue
        //Polling non-stop at full speed would put heavy pressure on the server, so sleep for 2 seconds
        sleep(2);
        //Skip to the next iteration
        continue;
    }
    //Split out the uid and the microsecond timestamp
    $user_arr = explode('%', $user);
    $insert_data = array(
        'uid' => $user_arr[0],
        'time_stamp' => $user_arr[1],
    );
    $res = $db->insert('redis_queue', $insert_data);
    //If the insertion fails
    if (!$res) {
        //Put the value back on the same end it was taken from
        $redis->lPush($redis_name, $user);
        sleep(2);
    }
}
$redis->close();

To test, start the polling script first, then run the second-kill script, and watch the changes in the MySQL database.
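The handler's rule of putting a failed value back on the same end it was taken from can be tried without Redis or MySQL. In the sketch below a plain array plays the role of the Redis list (array_shift for LPOP, array_unshift for LPUSH), and $save is a hypothetical writer that fails on its first call. The function name consume and the sample entries are made up for the example.

```php
<?php
// Hypothetical database writer that fails on its first call only.
$attempts = 0;
$saved = [];
$save = function (array $row) use (&$attempts, &$saved): bool {
    $attempts++;
    if ($attempts === 1) {
        return false; // simulate a failed insert
    }
    $saved[] = $row;
    return true;
};

// Consume the queue from the head; a failed entry goes back to the head.
function consume(array &$queue, callable $save): void
{
    while (($entry = array_shift($queue)) !== null) {
        // Each entry has the form "uid%microtime", as written by the receiving program.
        [$uid, $timeStamp] = explode('%', $entry, 2);
        if (!$save(['uid' => (int) $uid, 'time_stamp' => $timeStamp])) {
            array_unshift($queue, $entry); // taken from the head, so put back at the head
        }
    }
}

// Sample entries (made up for the example).
$queue = ['101%0.50000000 1638999000', '102%0.60000000 1638999000'];
consume($queue, $save);
print_r($saved);
```

Because the failed entry returns to the head, the order in which users entered the second kill is preserved in the database. Unlike the real handler, this sketch retries immediately instead of sleeping.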

Asynchronous communication
Because the enqueuing system can return as soon as its message is in the queue, the program effectively runs asynchronously. A message queue can therefore be used wherever asynchronous processing is appropriate.

Let's look at a specific case.
Basic knowledge points
The following commands are the main ones used to implement our message push.

  • brpop: blocking; removes and returns a value from the right end of the queue
  • brpoplpush: blocking; removes a value from the right end of queue A and pushes it onto the left end of queue B

Logic analysis

  • The normal task script writes the push target into the push_queue queue and stores the content to be pushed for that target under a key that never expires
  • RedisPushQueue uses brpoplpush to take each entry for processing and places it in temp_queue, mainly so that a program crash does not cause the push to be lost
  • RedisAutoDeleteTempqueueItems handles temp_queue, using brpop
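The interplay of the three scripts can be simulated in memory before reading the Redis versions. In this sketch plain arrays stand in for push_queue, temp_queue and the key/value content (index 0 is the left end of a list). All function names are hypothetical, and the $crash flag simulates a worker that dies after moving a key but before finishing the push.

```php
<?php
// In-memory stand-ins (assumptions): two lists and one key/value store.
$pushQueue = [];
$tempQueue = [];
$content = [];

// Producer: store the content, then push the key in from the left.
function enqueuePush(array &$pushQueue, array &$content, string $key, string $text): void
{
    $content[$key] = $text;
    array_unshift($pushQueue, $key);
}

// Like brpoplpush, but non-blocking: pop from the right of $from, push onto the left of $to.
function moveRightToLeft(array &$from, array &$to): ?string
{
    $key = array_pop($from);
    if ($key !== null) {
        array_unshift($to, $key);
    }
    return $key;
}

// Worker: $crash = true simulates a crash before the push has completed.
function work(array &$pushQueue, array &$tempQueue, array &$content, bool $crash): ?string
{
    $key = moveRightToLeft($pushQueue, $tempQueue);
    if ($key === null || $crash) {
        return $key;
    }
    unset($content[$key]); // push succeeded: delete the content
    return $key;
}

// Cleaner: pop keys from temp_queue; a key whose content still exists is re-queued.
function cleanTemp(array &$tempQueue, array &$pushQueue, array $content): void
{
    while (($key = array_pop($tempQueue)) !== null) {
        if (isset($content[$key])) {
            array_unshift($pushQueue, $key);
        }
    }
}

enqueuePush($pushQueue, $content, 'rabbit_push_20_175990', 'Push content');
enqueuePush($pushQueue, $content, 'rabbit_push_21_123456', 'Push content');
work($pushQueue, $tempQueue, $content, false); // uid 20 is pushed successfully
work($pushQueue, $tempQueue, $content, true);  // worker crashes while handling uid 21
cleanTemp($tempQueue, $pushQueue, $content);
print_r($pushQueue);
```

After the cleaner runs, only the key whose push did not complete is back in the push queue, ready to be processed again.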

Code: normal task script

<?php
foreach ($user_list as $item) {
  //Key naming rule: business type_operation_id_random 6-digit value; the content stored here is "Push content"
  $k_name = 'rabbit_push_' . $item['uid'].'_'.rand(100000,999999);
  $redis->set($k_name, 'Push content');//Store the content first so it exists before a consumer can see the key
  $redis->lPush('push_queue',$k_name);//Enter the queue from the left
}

RedisPushQueue

<?php
//Message queue worker that handles the pushes
//
// Runs as a daemon:
// nohup php yourpath/redispushqueue.php &
// After modifying the file, restart the daemon.
// blpop returns a value if there is one and blocks if there is none. It would work here, but it is not safe: if the program crashes while handling a popped value, that queue entry
// is lost permanently.
// brpoplpush is used in blocking mode instead; it requires queue entries to be pushed in from the left.
//
ini_set('default_socket_timeout', -1); //No timeout
require_once 'YOURPATH/Rongcloud.php';

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->select(2);//Switch to db2
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);

// temp_queue is a temporary queue that keeps queue entries from being lost if the program crashes. A timeout of 0 means block forever.
while ($key = $redis->brpoplpush('push_queue', 'temp_queue', 0)) {
  if ($val = $redis->get($key)) {
    //rabbit_push_20_175990
    $arr = explode('_', $key);
    if (count($arr) != 4) {
      continue;
    }
    $id = $arr[2];
    push($id, $val);
    //Delete key content
    $redis->del($key);
  } 
}
function push($id, $v)
{
 //Push operation~
}

RedisAutoDeleteTempqueueItems
Automatically processes the elements in temp_queue. This guards against a crash of RedisPushQueue.

The idea is to use the brpop command to block on temp_queue and pop its keys. If the content stored under a popped key can still be read, RedisPushQueue failed to finish that push, so the key is pushed back (lPush) onto push_queue to be processed again.

brpop is used here because RedisPushQueue uses brpoplpush, which places keys on the left of temp_queue, so the oldest keys come out on the right. Start the daemon with: nohup php yourpath/redisautodeletetempqueueitems.php &
After modifying the file, restart it.

<?php

ini_set('default_socket_timeout', -1); //No timeout
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->select(2);//Switch to db2
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
while ($key_arr = $redis->brPop('temp_queue', 0)) {
  if (count($key_arr) != 2) {
    continue;
  }
  $key = $key_arr[1];
  if ($redis->get($key)) { //If the value can be obtained, it indicates that RedisPushQueue execution failed
    $redis->lPush('push_queue', $key);
  }
}

For more professional message queues, you can use RabbitMQ, ActiveMQ, ZeroMQ, or Kafka. They are not covered here.

Keywords: PHP message queue

Added by zesoft.net on Wed, 08 Dec 2021 23:31:48 +0200