Lifecycle of redis command execution

baiyan

Introduction

First, let's look at a very familiar redis command execution diagram:

Think about what happens when we connect to the redis server and execute a command such as set key1 value1. How does this command reach the redis server, and how does the server parse it, process it, and report back that it executed successfully?

Client-to-server command transfer (request)

Redis defines its own protocol on top of TCP. The protocol marks the boundaries between command parameters, so both the server and the client can easily receive and parse the data carried in the TCP byte stream. Here we use tcpdump to capture the packets when redis-cli sends a command:

tcpdump port 6379 -i lo -X

Now we enter the set key1 value1 command in the client. tcpdump captures the following packets:

The first packet carries the command from the client to the redis server, and the second is the server's response to the client. Let's start with the first packet, which is sent from port 43856 on the client to port 6379 on the redis server. The first 20 bytes are the IP header, and the next 32 bytes are the TCP header (longer than the minimum 20 bytes because it carries TCP options).
We are mainly concerned with the data starting at "2a33", which is where the redis payload begins. The ASCII translation on the right side of the dump also shows the words set, key1, and value1, with some other characters between them. Let's work out the protocol format of redis data transmission from the captured bytes.

  • 2a33: 0x2a is the ASCII code of '*', 0x33 is the ASCII code of '3' (decimal 51)
  • 0d0a: 0x0d is the ASCII code of "\r", 0x0a is the ASCII code of "\n"
  • 2433: the ASCII codes of "$" and "3"
  • 0d0a: the ASCII codes of "\r" and "\n"
  • 7365: the ASCII codes of "s" and "e"
  • 740d: the ASCII codes of "t" and "\r"
  • 0a24: the ASCII codes of "\n" and "$"
  • 340d: the ASCII codes of "4" and "\r"
  • 0a6b: the ASCII codes of "\n" and "k"
  • 6579: the ASCII codes of "e" and "y"
  • 310d: the ASCII codes of "1" and "\r"
  • 0a24: the ASCII codes of "\n" and "$"
  • 360d: the ASCII codes of "6" and "\r"
  • 0a76: the ASCII codes of "\n" and "v"
  • 616c: the ASCII codes of "a" and "l"
  • 7565: the ASCII codes of "u" and "e"
  • 310d: the ASCII codes of "1" and "\r"
  • 0a: the ASCII code of "\n"
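
The byte-by-byte translation above can be double-checked mechanically. The following is a minimal sketch, not part of redis: hex_to_escaped() is a hypothetical helper that decodes a hex dump and writes carriage return and line feed as the visible escapes \r and \n.

```c
#include <ctype.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not from the redis source): decode a hex dump such as
 * "2a33 0d0a" into printable text. CR and LF are written as the two-character
 * escapes \r and \n; other non-printable bytes become '.'.
 * Returns the number of characters written, or -1 on malformed input or if
 * out is too small. */
int hex_to_escaped(const char *hex, char *out, size_t cap) {
    size_t n = 0;
    if (cap == 0) return -1;
    while (*hex) {
        if (isspace((unsigned char)*hex)) { hex++; continue; }
        if (!isxdigit((unsigned char)hex[0]) || !isxdigit((unsigned char)hex[1]))
            return -1;                      /* odd digit count or junk */
        unsigned int byte = 0;
        sscanf(hex, "%2x", &byte);          /* two hex digits -> one byte */
        hex += 2;
        if (n + 3 > cap) return -1;         /* worst case: 2 chars + NUL */
        if (byte == '\r')      { memcpy(out + n, "\\r", 2); n += 2; }
        else if (byte == '\n') { memcpy(out + n, "\\n", 2); n += 2; }
        else out[n++] = isprint((int)byte) ? (char)byte : '.';
    }
    out[n] = '\0';
    return (int)n;
}
```

Feeding it the first bytes of the payload, "2a33 0d0a 2433 0d0a 7365 740d 0a", yields the text *3\r\n$3\r\nset\r\n.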

From this we can read off the following rules:

  • redis uses '*' to mark the beginning of a command. The number immediately following * is the number of parameters (set key1 value1 has three parameters, hence 3)
  • redis uses '$' to mark the beginning of each parameter, followed by the length of the parameter in bytes (for example, key1 has length 4, hence $4)
  • redis uses "\r\n" as the separator between fields, which makes boundaries easy to locate when parsing the TCP byte stream

Taken together, the redis packets sent by the client to the server are in the following format:

*3\r\n$3\r\nset\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n
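
To make the format concrete, here is a minimal sketch of an encoder. The function resp_encode_command() is hypothetical (it is not taken from redis or redis-cli), and it treats arguments as NUL-terminated strings, so unlike the real protocol it is not binary-safe. Note that every parameter, including the command name set itself, gets its own $<length> prefix.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: serialize argc arguments into the request format
 * "*<argc>\r\n" followed by "$<len>\r\n<arg>\r\n" for each argument.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if out is too small. */
int resp_encode_command(int argc, const char **argv, char *out, size_t cap) {
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    size_t used = (size_t)n;

    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n", len, argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;  /* truncated */
        used += (size_t)n;
    }
    return (int)used;
}
```

Calling it with the three arguments set, key1, and value1 produces the 35-byte payload seen in the capture.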

Compared with the FastCGI protocol, redis uses only a few delimiters and special characters to define the syntax and data format of a command. With these delimiters the server can parse the byte stream easily and efficiently. The protocol is simple and fast, which suits the high performance requirements of redis.

Server-side processing of commands

Now that the command has reached the server through the redis protocol, the server can start processing the received byte stream. Because the protocol clearly marks the boundary of each parameter (\r\n), the data is easy for the server to parse.

Step 1: Use of callback functions

Redis is a typical event-driven program. To get high performance out of a single thread, redis uses I/O multiplexing to handle command requests from clients. When it creates a client instance, redis registers the event handler that runs when a request from that client becomes readable:

client *createClient(int fd) {
    
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        anetNonBlock(NULL,fd); //Put the socket in non-blocking mode
        anetEnableTcpNoDelay(NULL,fd); //Disable Nagle's algorithm so that small packets are sent without delay
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive); //Enable TCP keep-alive
        //Note that a file event is created here: readQueryFromClient() is called back when the client socket becomes readable
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR) {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ... 
}

To hold the bytes a client sends, redis keeps a receive buffer per client that caches the data read from the socket. Later stages of command processing read the command from this buffer. The advantage is that parsing is decoupled from socket I/O: the later stages only need to read from the buffer and never touch the socket. The buffer is filled in the client callback function readQueryFromClient() described earlier:

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
   ...
    qblen = sdslen(c->querybuf); //Current length of the buffer
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; 
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //Make sure the sds buffer has room for readlen more bytes
    nread = read(fd, c->querybuf+qblen, readlen); //Read from the socket and append to the buffer
    ...
    //Actually process the command
    processInputBufferAndReplicate(c);
}
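
The idea of appending each read() result to a per-client buffer can be sketched without sds. The querybuf type and querybuf_append() below are hypothetical stand-ins for illustration only; redis itself uses its sds string library and sdsMakeRoomFor() for this.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the sds-based query buffer. */
typedef struct {
    char  *data;  /* accumulated bytes that have not been parsed yet */
    size_t len;   /* number of bytes currently stored */
    size_t cap;   /* number of bytes allocated */
} querybuf;

/* Append one chunk (for example, the result of a single read()) to the
 * buffer, doubling the allocation when it is too small.
 * Returns 0 on success, -1 if memory could not be allocated. */
int querybuf_append(querybuf *q, const char *chunk, size_t n) {
    if (n == 0) return 0;
    if (q->len + n > q->cap) {
        size_t cap = q->cap ? q->cap : 64;
        while (cap < q->len + n) cap *= 2;
        char *p = realloc(q->data, cap);
        if (!p) return -1;
        q->data = p;
        q->cap = cap;
    }
    memcpy(q->data + q->len, chunk, n);
    q->len += n;
    return 0;
}
```

A command that arrives in two pieces simply ends up contiguous in the buffer, which is what lets the parser in the following steps ignore how the bytes were split on the network.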

Step 2: The dispatcher

The code above appends the incoming bytes to the buffer and then calls processInputBufferAndReplicate() to actually process the command. That function in turn simply calls processInputBuffer(). Since the buffer now holds the bytes the client sent, this layer does some preliminary filtering and processing of the data:

void processInputBuffer(client *c) {
    // Keep looping while the buffer still holds unprocessed data
    while(c->qb_pos < sdslen(c->querybuf)) {
         ...
        // Dispatch on the request type
        if (c->reqtype == PROTO_REQ_INLINE) { //INLINE request
            if (processInlineBuffer(c) != C_OK) break;  //Parse the buffer with processInlineBuffer()
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) { //MULTIBULK request
            if (processMultibulkBuffer(c) != C_OK) break; //Parse the buffer with processMultibulkBuffer()
        } else { 
            serverPanic("Unknown request type");
        }

        // Start processing the parsed command
        if (c->argc == 0) { //No arguments were parsed: nothing to execute
            resetClient(c);
        } else { //At least one argument: we have a command
            // Call processCommand() to actually process the command
            if (processCommand(c) == C_OK) {
                ...
            }
        }
    }
}

Readers may be confused at this point. What is INLINE and what is MULTIBULK? In redis, there are two types of request:

  • INLINE type: a plain line of space-separated words, such as a ping command typed directly into a telnet session
  • MULTIBULK type: the string array format analyzed above. Requests sent by redis-cli and client libraries, such as set and get, are of this type

This function is in effect a dispatcher. The raw byte stream carries no type information of its own, so the server records the request type in the client's reqtype field and hands the data to the matching parsing function. Since the commands we usually execute are of type MULTIBULK, we take that type as our example. A MULTIBULK request such as set or get is passed to the processMultibulkBuffer() function.
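
How reqtype gets its value is hidden in the part of processInputBuffer() elided above. A minimal sketch of the rule, assuming the type is chosen from the first byte of the request ('*' means MULTIBULK, anything else is treated as INLINE), could look like this. The enum and detect_request_type() are illustrative names, not redis identifiers.

```c
#include <stddef.h>

/* Illustrative request kinds; redis uses PROTO_REQ_INLINE and
 * PROTO_REQ_MULTIBULK for the last two. */
enum req_type { REQ_NONE, REQ_INLINE, REQ_MULTIBULK };

/* Assumption: the request type is decided by the first unread byte.
 * A leading '*' announces the array format; any other byte starts an
 * inline command. An empty buffer has no type yet. */
enum req_type detect_request_type(const char *buf, size_t len) {
    if (len == 0) return REQ_NONE;
    return buf[0] == '*' ? REQ_MULTIBULK : REQ_INLINE;
}
```

With this rule, the payload captured earlier is classified as MULTIBULK, while PING\r\n typed by hand is classified as INLINE.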

Step 3: Check the data integrity of the receive buffer

TCP is a byte stream and does not preserve message boundaries, so the bytes of several redis commands may be merged into one read or split across several reads. A single read can therefore contain an incomplete command or more than one command. To deal with this, processMultibulkBuffer() parses the command parameters and returns a success status code only if the buffer contains a complete request. Otherwise the outer while loop breaks, the server waits for the next event loop iteration to read the remaining data from the socket, and parsing resumes. This guarantees that a command is executed only when all of its parameters have arrived intact.

int processMultibulkBuffer(client *c) {
    while(c->multibulklen) {
        ...
        /* Read the bytes of one command parameter */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            //Fewer unread bytes than the length announced after $ (+2 for the trailing \r\n):
            //the data is incomplete, so leave the loop and wait for the next read
            break;
        } else { //The parameter is complete: turn it into an argv entry
            if (c->qb_pos == 0 && c->bulklen >= PROTO_MBULK_BIG_ARG && sdslen(c->querybuf) == (size_t)(c->bulklen+2)) {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); 
                sdsIncrLen(c->querybuf,-2); 
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--; //Processing next command parameter
        }
    }
}
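
The completeness check can be tried out in isolation. The sketch below is a simplified, hypothetical parser (parse_multibulk() and arg_view are not redis names). Unlike redis, which remembers multibulklen and bulklen in the client between reads, it is stateless and re-parses from the start of the buffer on every call. It returns the number of bytes consumed by one complete command, 0 if the buffer holds only part of a command, and -1 on a protocol error.

```c
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16

typedef struct {
    const char *ptr;  /* points into the caller's buffer, not NUL-terminated */
    size_t      len;
} arg_view;

/* Parse "<prefix><digits>\r\n" at buf[*pos]. Returns 1 and advances *pos on
 * success, 0 if the line is not complete yet, -1 on a protocol error. */
static int read_count_line(const char *buf, size_t len, size_t *pos,
                           char prefix, long *out) {
    if (*pos >= len) return 0;
    if (buf[*pos] != prefix) return -1;
    const char *start = buf + *pos + 1;
    const char *cr = memchr(start, '\r', len - *pos - 1);
    if (!cr || (size_t)(cr - buf) + 1 >= len) return 0;  /* "\r\n" not here yet */
    if (cr[1] != '\n') return -1;
    if (cr == start || cr - start > 9) return -1;        /* empty or too long */
    long v = 0;
    for (const char *p = start; p < cr; p++) {
        if (*p < '0' || *p > '9') return -1;
        v = v * 10 + (*p - '0');
    }
    *out = v;
    *pos = (size_t)(cr - buf) + 2;
    return 1;
}

long parse_multibulk(const char *buf, size_t len, arg_view *argv, int *argc) {
    size_t pos = 0;
    long count = 0, bulklen = 0;
    int r = read_count_line(buf, len, &pos, '*', &count);
    if (r <= 0) return r;
    if (count < 1 || count > MAX_ARGS) return -1;

    for (long i = 0; i < count; i++) {
        r = read_count_line(buf, len, &pos, '$', &bulklen);
        if (r <= 0) return r;
        size_t blen = (size_t)bulklen;
        /* Same test as in processMultibulkBuffer(): payload plus "\r\n". */
        if (len - pos < blen + 2) return 0;
        if (buf[pos + blen] != '\r' || buf[pos + blen + 1] != '\n') return -1;
        argv[i].ptr = buf + pos;
        argv[i].len = blen;
        pos += blen + 2;
    }
    *argc = (int)count;
    return (long)pos;
}
```

If the set key1 value1 request is cut off after the first 20 bytes, the function returns 0 and the caller waits for more data. When two commands arrive in one read, it consumes only the first, and the caller can invoke it again on the remainder.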

Step 4: Executing the command

Let's go back to the caller. Once processMultibulkBuffer() has returned successfully, the current command is complete and ready to be executed. How would we design a program that runs a different handler for each command? A first attempt could simply be the following:

if (strcmp(command, "get") == 0) {
    doGetCommand(); //handler for the get command
} else if (strcmp(command, "set") == 0) {
    doSetCommand(); //handler for the set command
} else {
    printf("Illegal command\n");
}

This code is very simple: depending on the command requested, it dispatches to a different handler function. Redis works on the same principle, so let's see how redis actually does it:

int processCommand(client *c) {
    //The quit command is answered right here
    if (!strcasecmp(c->argv[0]->ptr,"quit")) { 
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //Look up the command in the dictionary and store its entry (which holds the handler function) in c->cmd
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // Check the lookup result
    if (!c->cmd) { //The command was not found
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || //Wrong number of arguments
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
   // Inside a MULTI transaction, queue the command instead of executing it (except EXEC, DISCARD, MULTI, and WATCH)
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else { //Otherwise execute the command now
        call(c,CMD_CALL_FULL); //The core function
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

Of these, the most important are the calls to lookupCommand() and call(). In redis, all commands are described in a command table, which the server loads into a dictionary keyed by command name at startup. The table looks like this:

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    ...
};

We can see that this table collects all commands, and lookupCommand() uses it to find the information about a command. It is an array of structs, each holding the command name, the command handler function, the number of parameters (arity), and various flags. In effect it is configuration data that maintains the mapping from command name to handler function, which solves the maintenance and extensibility problems of the if-else dispatch we started with.
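
A minimal sketch of table-driven dispatch follows. All names here (command_entry, lookup_command(), dispatch(), the two dummy handlers) are hypothetical, and the lookup is a linear scan for brevity, whereas redis looks commands up in a hash-based dictionary. The arity rule is the one visible in processCommand(): a positive arity means exactly that many arguments, a negative arity means at least that many.

```c
#include <stddef.h>
#include <strings.h>  /* strcasecmp (POSIX) */

typedef int (*command_proc)(int argc, const char **argv);

typedef struct {
    const char  *name;
    command_proc proc;
    int          arity;  /* >0: exactly arity args; <0: at least -arity args */
} command_entry;

/* Dummy handlers that only return an id so the dispatch can be observed. */
static int get_proc(int argc, const char **argv) { (void)argc; (void)argv; return 1; }
static int set_proc(int argc, const char **argv) { (void)argc; (void)argv; return 2; }

static const command_entry command_table[] = {
    {"get", get_proc, 2},
    {"set", set_proc, -3},
};

enum { DISPATCH_UNKNOWN = -1, DISPATCH_BAD_ARITY = -2 };

/* Case-insensitive lookup by command name; NULL if not found. */
const command_entry *lookup_command(const char *name) {
    size_t n = sizeof(command_table) / sizeof(command_table[0]);
    for (size_t i = 0; i < n; i++)
        if (strcasecmp(name, command_table[i].name) == 0)
            return &command_table[i];
    return NULL;
}

/* argc counts the command name itself, as in redis. */
int arity_ok(const command_entry *cmd, int argc) {
    if (cmd->arity > 0) return argc == cmd->arity;
    return argc >= -cmd->arity;
}

int dispatch(int argc, const char **argv) {
    const command_entry *cmd = lookup_command(argv[0]);
    if (!cmd) return DISPATCH_UNKNOWN;
    if (!arity_ok(cmd, argc)) return DISPATCH_BAD_ARITY;
    return cmd->proc(argc, argv);
}
```

Adding a command now means adding one row to the table. The dispatch code itself does not change, which is the point of replacing the if-else chain.
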
Once we have found the command's entry, all that remains is to invoke its handler. The call() function above invokes the handler, which produces the reply for the client. For example, setCommand() is the handler for the set command:

void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

This function first parses the optional NX, XX, EX, and PX arguments, and then calls setGenericCommand() to run the logic shared by the set family of commands:

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }

    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    setKey(c->db,key,val);
    server.dirty++;
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

At the end, the generic reply function addReply() is called to return the result of the execution to the client. Let's see what this function does:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

Reading this code carefully, we cannot find the place where the result is actually sent to the client. The function only adds the reply to the client's output buffer, and with that the command is done. So when is the reply really sent? Recall from the discussion of the event loop that the function beforeSleep() runs each time before the event loop blocks waiting for file events. It performs short housekeeping tasks such as deleting expired keys and writing pending command replies to clients. Deferring the write reduces network overhead: replies to several commands from the same client can accumulate in the buffer and be sent in one go, which reduces the number of writes and improves performance.
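
The effect of queueing replies and flushing them once can be sketched as follows. Everything here is hypothetical and heavily simplified: out_client, queue_reply(), and flush_replies() are illustrative names, the buffer is a fixed-size array, and the "socket" is just a destination array, so the sketch only shows that several queued replies leave in a single write.

```c
#include <stddef.h>
#include <string.h>

#define REPLY_BUF_SIZE 256

typedef struct {
    char   buf[REPLY_BUF_SIZE];  /* pending replies, back to back */
    size_t used;                 /* bytes currently queued */
    int    flushes;              /* how many times we "wrote to the socket" */
} out_client;

/* Counterpart of addReply(): only copies the reply into the buffer.
 * Returns 0 on success, -1 if the fixed buffer is full. */
int queue_reply(out_client *c, const char *reply) {
    size_t n = strlen(reply);
    if (c->used + n > sizeof(c->buf)) return -1;
    memcpy(c->buf + c->used, reply, n);
    c->used += n;
    return 0;
}

/* Counterpart of the flush done before the event loop sleeps: everything
 * queued so far goes out in one write. wire stands in for the socket.
 * Returns the number of bytes written (0 if nothing was pending or wire
 * is too small). */
size_t flush_replies(out_client *c, char *wire, size_t cap) {
    if (c->used == 0 || c->used > cap) return 0;
    size_t n = c->used;
    memcpy(wire, c->buf, n);
    c->used = 0;
    c->flushes++;
    return n;
}
```

Queueing +OK\r\n and :1\r\n and then flushing once transfers nine bytes with a single write instead of two.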

Server-to-client command transfer (response)

After executing the set key1 value1 command, we see "OK", which means the command succeeded. If we look closely at the second packet captured above, what the server actually returns is "+OK". Why the + sign? Because different commands return different kinds of values: set returns a status, get returns a string of arbitrary length, lpush returns an integer representing the length of the list, and so on. A single plain-string representation is far from adequate, so the redis protocol defines five reply types. The client uses the first character of a reply to determine its type:

  • Status reply: the first character is '+'; for example, when the SET command succeeds, "+OK\r\n" is returned to the client.
  • Error reply: the first character is '-'; for example, when a client requests a command that does not exist, "-ERR unknown command 'testcmd'\r\n" is returned to the client.
  • Integer reply: the first character is ':'; for example, when the INCR command is executed, ":100\r\n" is returned to the client.
  • Bulk reply: the first character is '$'; for example, a GET lookup returns "$5\r\nhello\r\n" to the client, where $5 is the length of the returned string.
  • Multi-bulk reply: the first character is '*'; for example, the LRANGE command may return several values in the form "*3\r\n$6\r\nvalue1\r\n$6\r\nvalue2\r\n$6\r\nvalue3\r\n". This is the same format as the request protocol: "*3" is the number of returned values, "$6" is the length of the value that follows, and the fields are separated by "\r\n".

The reply to the set command we executed is of the first type, a status reply. From the + sign the client knows that this is a status reply and how to read the bytes that follow.
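
On the client side, the first step in reading a reply is therefore a switch on its first byte. A minimal sketch, with the illustrative names reply_type and classify_reply() (not taken from any redis client library):

```c
#include <stddef.h>

enum reply_type {
    REPLY_STATUS,     /* '+' */
    REPLY_ERROR,      /* '-' */
    REPLY_INTEGER,    /* ':' */
    REPLY_BULK,       /* '$' */
    REPLY_MULTIBULK,  /* '*' */
    REPLY_INVALID     /* empty buffer or unknown first byte */
};

/* Decide how the rest of the reply has to be read by looking only at
 * its first byte. */
enum reply_type classify_reply(const char *reply, size_t len) {
    if (len == 0) return REPLY_INVALID;
    switch (reply[0]) {
    case '+': return REPLY_STATUS;
    case '-': return REPLY_ERROR;
    case ':': return REPLY_INTEGER;
    case '$': return REPLY_BULK;
    case '*': return REPLY_MULTIBULK;
    default:  return REPLY_INVALID;
    }
}
```

A full client would go on to read a line for the first three types, and a length-prefixed payload or a nested sequence of replies for the last two.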

Summary

We have now followed a redis command through its entire life cycle and, along the way, learned the format and rules of the redis communication protocol. Next, I'll look at the implementation of individual commands, so stay tuned.

Keywords: C Redis ascii socket encoding

Added by luuno on Sun, 15 Sep 2019 10:15:30 +0300