Redis source code reading 3 - finally understand how the main tasks are executed

In Reading 2 of redis source code - finally understand the redis startup process, I introduced the startup process of redis and drew a diagram of it. Today, let me explain in detail how the main tasks of redis are implemented.

Let's look at the flow chart first

There are three main tasks in redis:

  • eventLoop->beforesleep: calls handleClientsWithPendingWrites, which writes pending replies and, if data is left over, creates a write event bound to the handler sendReplyToClient

  • aeProcessEvents: implements the whole main process and main functions

      • Reads the ready fds from epoll and writes the data read into the corresponding client in server.clients

      • Listens on the exposed ip and port (tcp socket); accepts new connections through acceptTcpHandler and creates an fd for each

      • After a readable event fires, parses and executes the command through readQueryFromClient

  • processTimeEvents: runs the scheduled tasks, called inside aeProcessEvents

/**
 * @brief Create a file event and register fd with the polling API
 * The given handler is called back when the event fires
 * @param eventLoop 
 * @param fd fd of the corresponding connection
 * @param mask Event type
 * @param proc Handler called back when the event fires
 * @param clientData Data passed to the handler (pointer to the client)
 * @return int 
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // Locate the slot for this fd in the event table eventLoop->events[fd] (an open fd is unique within the process, so each slot belongs to one fd)
    aeFileEvent *fe = &eventLoop->events[fd];
    // Register the fd with the operating system's polling API. Each platform has its own implementation, and its state is kept in eventLoop->apidata
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    // Give the processor to rfileProc and wfileProc (this callback will be executed later)
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    //Reassign maxfd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
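
The registration above boils down to a table indexed by fd plus a dispatch step that looks the handler up again. Here is a minimal sketch of that idea, not Redis code: the names toy_loop, toy_register, toy_dispatch and the tiny TOY_SETSIZE are made up for illustration, and the call into the polling API (aeApiAddEvent) is left out.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_SETSIZE 16   /* hypothetical, far smaller than a real setsize */
#define TOY_READABLE 1
#define TOY_WRITABLE 2

typedef void toy_proc(int fd, void *client_data, int mask);

typedef struct {
    int mask;            /* 0 means no event is registered for this fd */
    toy_proc *rproc;     /* handler for readable events */
    toy_proc *wproc;     /* handler for writable events */
    void *client_data;   /* passed back to the handler unchanged */
} toy_event;

typedef struct {
    int maxfd;
    toy_event events[TOY_SETSIZE];
} toy_loop;

void toy_init(toy_loop *l) {
    l->maxfd = -1;
    for (int i = 0; i < TOY_SETSIZE; i++) {
        l->events[i].mask = 0;
        l->events[i].rproc = NULL;
        l->events[i].wproc = NULL;
        l->events[i].client_data = NULL;
    }
}

/* Bind proc to fd for the given mask. Returns 0 on success, -1 if fd
 * does not fit in the table (the ERANGE case in the original). */
int toy_register(toy_loop *l, int fd, int mask, toy_proc *proc, void *data) {
    assert(l != NULL && proc != NULL);
    if (fd < 0 || fd >= TOY_SETSIZE) return -1;
    toy_event *e = &l->events[fd];
    e->mask |= mask;
    if (mask & TOY_READABLE) e->rproc = proc;
    if (mask & TOY_WRITABLE) e->wproc = proc;
    e->client_data = data;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

/* Call the handlers registered for fd that match fired_mask.
 * Returns how many handlers were called. */
int toy_dispatch(toy_loop *l, int fd, int fired_mask) {
    if (fd < 0 || fd >= TOY_SETSIZE) return 0;
    toy_event *e = &l->events[fd];
    int fired = 0;
    if (e->mask & fired_mask & TOY_READABLE) {
        e->rproc(fd, e->client_data, fired_mask);
        fired++;
    }
    if (e->mask & fired_mask & TOY_WRITABLE) {
        /* Do not call the same handler twice for one fd */
        if (!fired || e->wproc != e->rproc) {
            e->wproc(fd, e->client_data, fired_mask);
            fired++;
        }
    }
    return fired;
}

/* Example handler: counts its invocations through client_data. */
void toy_count(int fd, void *client_data, int mask) {
    (void)fd; (void)mask;
    (*(int *)client_data)++;
}
```

Registering toy_count on fd 3 for TOY_READABLE and then dispatching a readable event on fd 3 increments the counter once; a writable event on the same fd does nothing because no write handler was bound.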

Main process code analysis - aeProcessEvents

In ae.c

/**
 * @brief Execute eventLoop
 * 
 * @param eventLoop 
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
     //As long as it is not stopped, it will execute in a loop. This is the main thread
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            //Execute beforesleep before each cycle
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

Let's look at the corresponding source code one by one

Look at aeProcessEvents in ae.c

/**
 * @brief Process the pending time events (scheduled tasks) and file events in eventLoop
 * @param eventLoop 
 * @param flags Which events to process:
 * All events, from aeMain: AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
 * File events only, from networking.c: AE_FILE_EVENTS|AE_DONT_WAIT
 * @return int 
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    //Judgment for event type
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */

    /**
     * @brief This means:
     * The eventLoop has at least one registered fd (for example the listening tcp socket), or time events are requested and we are allowed to wait
     * Look up the timer that fires next. If there is one, compute the interval until it is due,
     *  and block waiting for read/write events for at most that long
     *  Otherwise block until an event arrives (or do not block at all if AE_DONT_WAIT is set)
     */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        //time
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
 
        // (elided) Compute the timeout tvp from shortest: the time left until the nearest timer, 0 if AE_DONT_WAIT is set, or NULL to wait without a timeout

        /**
         * @brief Get numevents ready events from epoll and put them into eventLoop->fired. All registered event types are polled here
         * If tvp holds a timeout, block for at most that long,
         * If tvp is NULL, block until an event arrives; in that case no timer is registered, so no scheduled task can be missed while waiting
         */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        //Execute aftersleep
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /**
         * @brief Event triggered by execution
         */
        for (j = 0; j < numevents; j++) {
            /**
             * @brief Look up the event registered for this fd in the event table; the entry was filled in by aeCreateFileEvent 
             */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            //AE_BARRIER inverts the usual read-then-write order, so the writable handler fires first. It is mainly set for persistence (see the persistence code later, TODO explain in detail)
            int invert = fe->mask & AE_BARRIER;

            /**
             * @brief The AE_READABLE handlers include:
             * acceptTcpHandler, registered in initServer
             * readQueryFromClient, registered in acceptTcpHandler -> acceptCommonHandler -> createClient
             * TODO Follow up analysis of other read events
             * 
             * The AE_WRITABLE handlers include:
             * sendReplyToClient, registered in beforeSleep -> handleClientsWithPendingWrites
             * TODO  Follow up analysis of other write events
             */
            if (!invert && fe->mask & mask & AE_READABLE) {
                //rfileProc and wfileProc are the handlers that aeCreateFileEvent bound to the fd. fe->clientData is the clientData passed to aeCreateFileEvent (the client)
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            //Handle write events
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // Inverted order (AE_BARRIER): fire the readable event after the writable one
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            //Processing times + 1
            processed++;
        }
    }
    /* Check time events */
    //Process the time events (periodic tasks), mainly serverCron
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
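
The excerpt above leaves out how tvp is derived from the nearest timer. Below is a minimal sketch of what that step has to do, under my own assumptions: toy_poll_timeout and its parameters are hypothetical names, and the timer is passed in as plain seconds and milliseconds rather than as an aeTimeEvent.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/* Decide how long the poll may block.
 * has_timer: non-zero if a nearest timer was found
 * when_*:    due time of that timer
 * now_*:     current time
 * dont_wait: non-zero if the caller must not block
 * Returns tv filled with the timeout, or NULL meaning "block until an
 * event arrives". */
struct timeval *toy_poll_timeout(int has_timer,
                                 long when_sec, long when_ms,
                                 long now_sec, long now_ms,
                                 int dont_wait, struct timeval *tv) {
    assert(tv != NULL);
    if (has_timer) {
        long long ms = (long long)(when_sec - now_sec) * 1000
                     + (when_ms - now_ms);
        if (ms < 0) ms = 0;   /* timer already overdue: do not block */
        tv->tv_sec = (time_t)(ms / 1000);
        tv->tv_usec = (suseconds_t)((ms % 1000) * 1000);
        return tv;
    }
    if (dont_wait) {          /* no timer, but blocking is not allowed */
        tv->tv_sec = 0;
        tv->tv_usec = 0;
        return tv;
    }
    return NULL;              /* no timer: wait for a file event */
}
```

With a timer due at 10s 250ms and the clock at 9s 900ms, the result is a 350 ms timeout; with no timer and dont_wait unset, the function returns NULL, which corresponds to the blocking poll described in the comment above.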

Let's take a look at aeEventLoop first to help understand

/**
 * @brief Event manager, the whole process has only one
 */
typedef struct aeEventLoop {
    //Highest fd currently registered
    int maxfd;   /* highest file descriptor currently registered */
    //Maximum number of fds tracked (maxclients + 128); this is the size of the events and fired arrays
    int setsize; /* max number of file descriptors tracked */
     //Id to give to the next time event; it increases each time a time event is created
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    //Registered file events, indexed by fd; each entry holds the handlers bound to that fd
    aeFileEvent *events; /* Registered events */
    //Fired events (every ready event returned by epoll is stored here by ae)
    aeFiredEvent *fired; /* Fired events */
    //Head node of the time event linked list
    aeTimeEvent *timeEventHead;
    //Stop flag of the event loop
    int stop;
    //State of the polling API (epoll, kqueue, ...)
    void *apidata; /* This is used for polling API specific data */
    //Called before aeProcessEvents (once per loop iteration)
    aeBeforeSleepProc *beforesleep;
    //Called after aeApiPoll returns
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

Let's see how aeApiPoll gets data from epoll.

Before that, we have to understand how redis selects the polling implementation. In ae.c:

/**
 * @brief The available ae implementations
 * redis picks the first one available, in descending order of performance
 * evport: Solaris
 * epoll: linux
 * kqueue: FreeBSD and macOS
 * select: the fallback when none of the above is available
 */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
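
The same first-match selection can be tried out in isolation. This is only a sketch: the platform macros tested here are my simplification (redis derives HAVE_EVPORT, HAVE_EPOLL and HAVE_KQUEUE in its config header with more detailed checks), it uses #elif instead of nested #ifdef, and toy_backend_name is a made-up helper.

```c
#include <assert.h>
#include <string.h>

/* First match wins, in the same priority order as the include chain. */
#if defined(__sun)
#define TOY_BACKEND "evport"
#elif defined(__linux__)
#define TOY_BACKEND "epoll"
#elif defined(__APPLE__) || defined(__FreeBSD__) || \
      defined(__OpenBSD__) || defined(__NetBSD__)
#define TOY_BACKEND "kqueue"
#else
#define TOY_BACKEND "select"
#endif

/* Returns the name of the backend chosen at compile time. */
const char *toy_backend_name(void) {
    const char *name = TOY_BACKEND;
    assert(strlen(name) > 0);
    return name;
}
```

Because the choice is made by the preprocessor, exactly one backend is compiled in, and the rest of the event loop only ever sees one set of aeApi functions.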

Since my computer is a mac, I can go straight to kqueue. Let's take a look at aeApiPoll in ae_kqueue.c

/**
 * @brief Wait for ready events, blocking for at most the given timeout
 * @param eventLoop 
 * @param tvp Maximum time to block; NULL means block until an event occurs
 * @return int Returns the number of events to be processed
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    /**
     * @brief Get the ready events. Each one carries an fd, and each fd already has its handlers bound
     * If the timeout pointer is NULL, kevent() blocks until an event occurs
     * If a timeout is given, kevent() blocks for at most that long
     */
    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {
       
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }
    //Copy the ready events into eventLoop->fired
    if (retval > 0) {
        int j;

        numevents = retval;
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;

            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            // The ident here is assigned when the event is created
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

Handlers

acceptTcpHandler

Source location: networking.c

Registration time: in initServer at startup

Processing content:

  • Accept connections on the listening ip and ports, creating a new fd for each accepted connection

  • Create a file event for that fd and bind the callback readQueryFromClient to it to handle AE_READABLE events

  • Create a client for the fd; readQueryFromClient is called back with this client when an AE_READABLE event fires on the fd

Let's see how the acceptTcpHandler handles (key points)

/**
 * @brief tcp processor
 * @param el 
 * @param fd fd of current tcp
 * @param privdata The clientData given when the event was registered (unused here)
 * @param mask 
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    /**
     * cport Current port
     * cfd Current fd
     * max Up to 1000 at a time
     */
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    //Fetch tcp request
    while(max--) {
        /**
         * @brief Accept a connection on the listening tcp socket and get a new fd (TODO: study this in detail later)
         * The new fd represents an established connection
         */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        //Handle cfd for new requests
        acceptCommonHandler(cfd,0,cip);
    }
}
/**
 * @brief Handle the fd of a newly accepted connection
 *  Mainly creates a file event so that readQueryFromClient handles AE_READABLE events on it
 *  and appends the client to the tail of server.clients
 * @param fd fd of the accepted connection
 * @param flags 
 * @param ip 
 */
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // Create a client for the accepted fd and append it to the tail of server.clients; the client's transaction (MULTI) state is initialized as well
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
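     //If maxclients is exceeded, reply with an error and close the connection
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        //Best-effort error message; write errors are ignored
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, just to avoid the warning... */
        }
        //Number of rejected connections + 1
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    //Number of connections + 1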
    server.stat_numconnections++;
    c->flags |= flags;
}

The key is createClient
/**
 * @brief Create the client for an fd (important)
 * 1. Set the new connection to non-blocking and no-delay, and enable keepalive according to server.tcpkeepalive
 * 2. Create a file event for the fd and bind the callback readQueryFromClient to it; it is called back when an AE_READABLE event fires on the fd
 * 3. Create a client for this fd and append it to the tail of server.clients
 * @param fd tcp Corresponding fd
 * @return client* 
 */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        //Set tcp non blocking
        anetNonBlock(NULL,fd);
        //Set TCP_NODELAY (disable Nagle's algorithm)
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            //Set KeepAlive for tcp
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        //Register a file event; the callback readQueryFromClient handles AE_READABLE events on the fd
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            //If the creation fails, close and release it
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    //Atomically get and increment the next client id
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    //Bind the fd of the new connection to the client
    c->fd = fd;
    ......
    //A new client starts with no flags set
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    ......
    //Append the newly created client to the tail of server.clients
    if (fd != -1) linkClient(c);
    //Initialize transaction
    initClientMultiState(c);
    return c;
}
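
For readers who have not looked inside the anet helpers, the three socket settings at the top of createClient map onto ordinary POSIX calls. The sketch below shows my understanding of the core of each one; toy_prepare_client_socket is a hypothetical name, and the extra per-platform keepalive tuning (idle time, probe interval) that a real implementation may add is left out.

```c
#include <assert.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Make fd non-blocking, disable Nagle's algorithm and optionally turn on
 * keepalive. Returns 0 on success, -1 on the first failing call. */
int toy_prepare_client_socket(int fd, int keepalive) {
    assert(fd >= 0);

    /* Non-blocking: read/write return EAGAIN instead of stalling the loop */
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) return -1;

    /* TCP_NODELAY: send small replies immediately */
    int yes = 1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1)
        return -1;

    /* SO_KEEPALIVE: let the kernel detect dead peers */
    if (keepalive &&
        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1)
        return -1;
    return 0;
}
```

Non-blocking mode is the setting the single-threaded loop depends on most: a blocking read on one slow client would stall every other client.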


The corresponding flow chart is as follows:

Executing commands - readQueryFromClient

Source location: networking.c

Registration time: in acceptTcpHandler, once a connection is accepted and its fd obtained, readQueryFromClient is bound to that fd

Processing content:

  • Read the request from the tcp buffer and parse it into the client

  • Look up the command and execute it in processCommand in server.c

Trigger code
 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
 
 /**
 * @brief Read the client's request
 * @param el 
 * @param fd fd of the connection
 * @param privdata The client
 * @param mask 
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    //privdata is the client that createClient passed to aeCreateFileEvent
    client *c = (client*) privdata;
    //(elided) read from fd into the query buffer c->querybuf
    //Process the input buffer
    processInputBufferAndReplicate(c);
}


void processInputBufferAndReplicate(client *c) {
    //Ordinary client (not our master)
    if (!(c->flags & CLIENT_MASTER)) {
        //Process input buffer (mainly see here)
        processInputBuffer(c);
    } else {
        //The client is our master (replication): apply the stream, then forward it to our own replicas
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        //Only the part that was actually applied is propagated
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}


/**
 * @brief Process input
 * 
 * @param c 
 */
void processInputBuffer(client *c) {
    server.current_client = c;
    //Keep processing while there is unread data in the query buffer
    while(c->qb_pos < sdslen(c->querybuf)) {
        //(elided) early exits and detection of c->reqtype from the first byte
        /**
         * @brief Read data from the buffer according to the request type
         */
        if (c->reqtype == PROTO_REQ_INLINE) {
            //Parse the request into robj values and store them in c->argv
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (processCommand(c) == C_OK) {
            //(elided) reset the client so it can read the next command
        }
    }
    //Set to null, the next client can continue
    server.current_client = NULL;
}
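
To make the multibulk branch less abstract, here is a small parser for a request such as "*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n". It is a sketch under my own assumptions, not processMultibulkBuffer: toy_request, toy_parse_len and toy_parse_multibulk are invented names, the arguments are left as pointers into the input instead of being turned into robj values, and incomplete and malformed input are both reported as 0.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_ARGS 8   /* hypothetical limit to keep the sketch small */

typedef struct {
    int argc;
    const char *argv[TOY_MAX_ARGS];   /* point into the input buffer */
    size_t arglen[TOY_MAX_ARGS];
} toy_request;

/* Parse "<prefix><digits>\r\n" at p. Stores the number in *out and returns
 * the position after "\r\n", or NULL if incomplete or malformed. */
static const char *toy_parse_len(const char *p, const char *end,
                                 char prefix, long *out) {
    if (p >= end || *p != prefix) return NULL;
    long n = 0;
    const char *q = p + 1;
    while (q < end && *q >= '0' && *q <= '9') {
        if (n > 100000) return NULL;       /* refuse absurd lengths */
        n = n * 10 + (*q - '0');
        q++;
    }
    if (q == p + 1) return NULL;           /* no digits at all */
    if (end - q < 2 || q[0] != '\r' || q[1] != '\n') return NULL;
    *out = n;
    return q + 2;
}

/* Parse one complete multibulk request from buf[0..len).
 * Returns the number of bytes consumed, or 0 if the request is
 * incomplete or invalid. */
size_t toy_parse_multibulk(const char *buf, size_t len, toy_request *req) {
    assert(buf != NULL && req != NULL);
    const char *p = buf, *end = buf + len;
    long count, blen;

    req->argc = 0;
    if ((p = toy_parse_len(p, end, '*', &count)) == NULL) return 0;
    if (count <= 0 || count > TOY_MAX_ARGS) return 0;
    for (long i = 0; i < count; i++) {
        if ((p = toy_parse_len(p, end, '$', &blen)) == NULL) return 0;
        /* The argument must be fully present, followed by "\r\n" */
        if ((size_t)(end - p) < (size_t)blen + 2) return 0;
        if (p[blen] != '\r' || p[blen + 1] != '\n') return 0;
        req->argv[i] = p;
        req->arglen[i] = (size_t)blen;
        p += blen + 2;
    }
    req->argc = (int)count;
    return (size_t)(p - buf);
}
```

The returned byte count plays the role of advancing the read position in the query buffer: whatever follows it is the start of the next pipelined command.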


int processCommand(client *c) {
    //Look up the command in the server.commands dictionary; c->argv[0] is the command name
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //(elided) reject unknown commands and wrong numbers of arguments
    //(elided) other checks
    /**
     * @brief Execute command
     * Inside a transaction (CLIENT_MULTI), every command except EXEC, DISCARD, MULTI and WATCH is queued instead of executed
     * 
     */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        //Execute the command
        call(c,CMD_CALL_FULL);
        //Remember the replication offset reached by this command
        c->woff = server.master_repl_offset;
        //Serve the clients blocked on keys that just became ready
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

//Command execution
void call(client *c, int flags) {
    //Invoke the handler of the redisCommand that was looked up
    c->cmd->proc(c);
}

Look at the code below: proc, the second field, is the function that implements the corresponding command

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ......
}

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    redisGetKeysProc *getkeys_proc;
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};
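
The table-driven dispatch can be reproduced in a few lines. The sketch below is an illustration only: toy_command, toy_lookup and toy_execute are invented names, the handlers just return a number, and a linear scan stands in for the dictionary lookup that redis uses. The arity rule follows the convention visible in the table: a positive arity is an exact argument count, a negative one means "at least that many".

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h>   /* strcasecmp (POSIX) */

typedef int toy_proc(int argc, const char **argv);

typedef struct {
    const char *name;
    toy_proc *proc;
    int arity;   /* >0: exactly this many args; <0: at least -arity */
} toy_command;

static int toy_get(int argc, const char **argv) {
    (void)argc; (void)argv;
    return 1;
}

static int toy_set(int argc, const char **argv) {
    (void)argc; (void)argv;
    return 2;
}

static const toy_command toy_table[] = {
    {"get", toy_get, 2},
    {"set", toy_set, -3},
};

/* Case-insensitive lookup by command name; NULL if unknown. */
const toy_command *toy_lookup(const char *name) {
    assert(name != NULL);
    for (size_t i = 0; i < sizeof(toy_table) / sizeof(toy_table[0]); i++)
        if (strcasecmp(name, toy_table[i].name) == 0) return &toy_table[i];
    return NULL;
}

/* Returns -1 for an unknown command, -2 for a wrong number of
 * arguments, otherwise the handler's result. argv[0] is the name. */
int toy_execute(int argc, const char **argv) {
    if (argc < 1) return -1;
    const toy_command *cmd = toy_lookup(argv[0]);
    if (cmd == NULL) return -1;
    if ((cmd->arity > 0 && cmd->arity != argc) || argc < -cmd->arity)
        return -2;
    return cmd->proc(argc, argv);
}
```

So "GET k" runs toy_get, a bare "get" fails the arity check, and "set k v EX 10" passes because -3 only sets a minimum.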

Scheduled task serverCron

Source location: server.c

Registration time: initServer()

/**
 * @brief Time event handler
 * @param eventLoop The event loop
 * @param id  Id of the time event
 * @param clientData 
 * @return int Milliseconds until the next run
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /**
     * @brief Process the clients in server.clients
     * The list is rotated on each step (the tail node is moved to the head), so that successive calls walk through all clients
     * 1. Handle timed-out clients
     * 2. Shrink the query buffer
     * 3. Leave it alone, TODO
     */
    clientsCron();
    /**
     * 1. Handle expired keys: send expired events and delete them
     * 2. Memory defragmentation
     * 3. rehash
     */
    databasesCron();
    //(simplified) RDB: start a background save when the save conditions are met
    rdbSaveBackground(server.rdb_filename,rsiptr);
    //(simplified) AOF: start a background rewrite when needed
    rewriteAppendOnlyFileBackground();
    //Free the clients queued for asynchronous release
    freeClientsInAsyncFreeQueue();
    //Other sampling
    //Run again after this many milliseconds
    return 1000/server.hz;
}

Writeback event

Receiving the request is done. Now let's look at the write-back side, taking the get command as an example

In server.c
 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
 
 
In t_string.c

void getCommand(client *c) {
    getGenericCommand(c);
} 

int getGenericCommand(client *c) {
    robj *o;
    //Key not found: lookupKeyReadOrReply has already queued the null reply, so return
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    //Found, but the value is not a string
    if (o->type != OBJ_STRING) {
        //Add the wrong-type error reply
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        //Add the value to the reply buffer
        addReplyBulk(c,o);
        return C_OK;
    }
}

It all ends up in addReply, in networking.c

/**
 * @brief Add reply
 * @param c 
 * @param obj robj
 */
void addReply(client *c, robj *obj) {
    //Check whether the client may be written to, and add it to server.clients_pending_write if needed
    if (prepareClientToWrite(c) != C_OK) return;
    //If the reply object is sds-encoded (a string)
    if (sdsEncodedObject(obj)) {
        /**
         * @brief First try _addReplyToBuffer to write into the fixed buffer. If that fails, fall back to _addReplyStringToList
         */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        //The reply object is int-encoded (really saves space), so convert it to a string first
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

int prepareClientToWrite(client *c) {
    /**
     * If neither c->buf nor c->reply holds pending data yet, the client is not in the pending write queue server.clients_pending_write, so add it now
     */
    if (!clientHasPendingReplies(c)) 
       // Add the client to server.clients_pending_write by head insertion
       clientInstallWriteHandler(c);
    return C_OK;
}

/**
 * @brief Write the response into the c->buf array. buf has a fixed length of only 16kb
 * @param c 
 * @param s 
 * @param len 
 * @return int
 */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    //If c->reply already holds data, the response must be appended there to keep the replies in order
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;
    //Copy s to c->buf
    memcpy(c->buf+c->bufpos,s,len);
    //Advance the write position of the output buffer
    c->bufpos+=len;
    return C_OK;
}

/**
 * @brief Append the response content to the c->reply linked list
 * 
 * @param c 
 * @param s 
 * @param len 
 */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    listNode *ln = listLast(c->reply);
    //Get tail
    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
    /**
     * @brief If reply already has a tail block, first fill the free space left in it,
     * If the tail block does not have enough space, create another block for the rest
     * A clientReplyBlock is at least 16k, so a small response will not fill one
     */
    if (tail) {
        size_t avail = tail->size - tail->used;
        size_t copy = avail >= len? len: avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    //len > 0 here means data remains after filling the tail block, or tail was NULL
    if (len) {
        size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
        tail = zmalloc(size + sizeof(clientReplyBlock));
        /* take over the allocation's internal fragmentation */
        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
        tail->used = len;
        memcpy(tail->buf, s, len);
        listAddNodeTail(c->reply, tail);
        c->reply_bytes += tail->size;
    }
    //If the output buffer limit is reached, add the client to server.clients_to_close
    asyncCloseClientOnOutputBufferLimitReached(c);
}
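
The two helpers above form a two-tier output buffer: a fixed array that is tried first, and a list that takes over once the array is full. The toy version below shows just that decision. It is a sketch with invented names (toy_client, toy_add_reply), a deliberately tiny TOY_BUF_SIZE, and one block per overflow write, so it skips the step of topping up the tail block.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BUF_SIZE 8   /* tiny on purpose; the text above mentions 16kb */

typedef struct toy_block {
    struct toy_block *next;
    size_t used;
    char data[];             /* flexible array member holding the bytes */
} toy_block;

typedef struct {
    char buf[TOY_BUF_SIZE];  /* first tier: fixed buffer */
    size_t bufpos;
    toy_block *head, *tail;  /* second tier: overflow list */
    size_t list_len;
} toy_client;

/* Returns 0 if s was copied into the fixed buffer, -1 if it must go to
 * the list instead. */
static int toy_add_to_buffer(toy_client *c, const char *s, size_t len) {
    /* Once the list is in use, later data must follow it to keep order */
    if (c->list_len > 0) return -1;
    if (len > sizeof(c->buf) - c->bufpos) return -1;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return 0;
}

/* Appends s as a new block at the tail of the list. */
static int toy_add_to_list(toy_client *c, const char *s, size_t len) {
    toy_block *b = malloc(sizeof(*b) + len);
    if (b == NULL) return -1;
    b->next = NULL;
    b->used = len;
    memcpy(b->data, s, len);
    if (c->tail) c->tail->next = b; else c->head = b;
    c->tail = b;
    c->list_len++;
    return 0;
}

/* Try the fixed buffer first, then fall back to the list. */
int toy_add_reply(toy_client *c, const char *s, size_t len) {
    assert(c != NULL && s != NULL);
    if (toy_add_to_buffer(c, s, len) == 0) return 0;
    return toy_add_to_list(c, s, len);
}
```

Note the ordering rule: after one reply has spilled into the list, even a reply small enough for the remaining array space goes to the list, otherwise the client would receive the replies out of order.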


The code is very clear

  • All responses go through addReply in networking.c

  • When there is response data for a client, the client is added to server.clients_pending_write

  • addReply writes to the response buffer in a form that depends on the object's encoding

Judging from the above processes, redis5 handles requests in a single thread

Writeback client

Until now, the client has only been added to server.clients_pending_write

The data to write back has been put into c->buf and c->reply.

But nothing has been sent to the client yet. Don't worry. Remember the beforesleep in aeMain?

First look at beforeSleep in server.c

/**
 * @brief Execute before loop processing
 * 
 * @param eventLoop 
 */
void beforeSleep(struct aeEventLoop *eventLoop) {
        //Only when active expiry is enabled and this instance is a master
    if (server.active_expire_enabled && server.masterhost == NULL)
        //Run a fast expire cycle
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    ......
     //Write aof buffer to disk
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    //Process the clients waiting for write-back (see the end)
    handleClientsWithPendingWrites();
}

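In networking.c

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //Try to write the reply to the socket synchronously first
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        //If data is still pending, register sendReplyToClient for AE_WRITABLE (simplified: AE_BARRIER handling omitted)
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el,c->fd,AE_WRITABLE,
                sendReplyToClient,c) == AE_ERR)
            freeClientAsync(c);
    }
    return processed;
}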

int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        //Either c->buf or c->reply has data; c->buf is sent first
        if (c->bufpos > 0) {
            //Call write directly to send the data on fd to the client
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            //(elided) advance c->sentlen; reset c->bufpos once c->buf is fully sent
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
             //Call write directly to send the data on fd to the client
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            //(elided) advance c->sentlen; drop the block once it is fully sent
        }
    }
    //(elided) error handling and freeing the client when required
    return C_OK;
}
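
The reason for sentlen is that write on a non-blocking socket may accept only part of the data. The helper below isolates that pattern. It is a sketch with an invented name (toy_flush) and invented return codes, not the redis function: it keeps writing from where the last call stopped and tells the caller whether to wait for the next writable event.

```c
#include <assert.h>
#include <errno.h>
#include <unistd.h>

/* Try to send buf[*sent .. len) on fd, resuming after earlier partial
 * writes. Returns 1 when everything has been written, 0 if the fd cannot
 * take more right now (try again on the next writable event), -1 on a
 * real error. */
int toy_flush(int fd, const char *buf, size_t len, size_t *sent) {
    assert(buf != NULL && sent != NULL && *sent <= len);
    while (*sent < len) {
        ssize_t n = write(fd, buf + *sent, len - *sent);
        if (n > 0) {
            *sent += (size_t)n;      /* remember how far we got */
            continue;
        }
        if (n == 0) return 0;        /* nothing accepted, retry later */
        if (errno == EINTR) continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK) return 0;
        return -1;
    }
    return 1;
}
```

A return value of 0 is the situation in which handleClientsWithPendingWrites registers sendReplyToClient: the remaining bytes go out later, when the event loop reports the fd as writable.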

So far, redis has formed a closed loop: listening, receiving requests, executing commands and writing back data

Series articles

redis source code reading - Introduction

Reading 2 of redis source code - finally understand the redis startup process

Keywords: Database Redis Cache

Added by psychotomus on Tue, 14 Dec 2021 03:37:44 +0200