Reading 2 of redis source code - finally understand the redis startup process
The startup process of redis has already been introduced, together with a diagram. Today, let me explain in detail how the main tasks of redis are implemented.
Let's look at the flow chart first
There are three main tasks in redis:

- EventLoop->beforesleep: creates the write-back (AE_WRITABLE) file events and binds the handler sendReplyToClient inside handleClientsWithPendingWrites
- aeProcessEvents: implements the whole main process and main functions
  - reads ready fds from epoll and writes the data read into server.clients
  - listens on the exposed ip and port (tcp socket); new connections are accepted through acceptTcpHandler, which creates an fd for each one
  - once a ready read event fires, the command is parsed and executed through readQueryFromClient
- processTimeEvents: scheduled task execution, also inside aeProcessEvents
```c
/**
 * @brief Create a file event: add the fd to the underlying poll and
 *        register the handler to execute when the event fires.
 * @param eventLoop
 * @param fd         fd of the corresponding connection
 * @param mask       event type
 * @param proc       handler executed after the poll reports the event
 * @param clientData bound client (reference address)
 * @return int
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    /* Place the aeFileEvent at the slot for this fd in the registered event
     * table eventLoop->events[fd] (by the nature of tcp, the same fd will
     * not appear twice). */
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Add it to the OS poll. Each OS has its own implementation; the
     * monitored fd/event ends up in eventLoop->apidata. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    /* Store the handler in rfileProc / wfileProc (these callbacks are
     * executed later). */
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    /* Update maxfd. */
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
```
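As a point of reference, this is roughly how initServer uses aeCreateFileEvent to register the read handler for the listening sockets (paraphrased from server.c; error handling may differ slightly between versions):

```c
/* Sketch of the registration done in initServer (server.c): every listening
 * socket fd gets acceptTcpHandler bound to its AE_READABLE event, so a new
 * connection wakes up the event loop. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler, NULL) == AE_ERR)
    {
        serverPanic("Unrecoverable error creating server.ipfd file event.");
    }
}
```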
Main process code analysis - aeProcessEvents
In ae.c:
```c
/**
 * @brief Run the eventLoop.
 * @param eventLoop
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    /* As long as it is not stopped, keep looping. This is the main thread. */
    while (!eventLoop->stop) {
        /* Execute beforesleep before every iteration. */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
```
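For context, this is roughly how main() in server.c hands control over to the loop after initialization (paraphrased; the exact surrounding code varies by version):

```c
/* End of main() in server.c (sketch): install the before/after-sleep hooks
 * and then spin in aeMain until shutdown. */
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
```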
Let's look at the corresponding source code one by one
Look at aeProcessEvents in ae.c:
```c
/**
 * @brief Handle the time events and file events waiting in eventLoop.
 * @param eventLoop
 * @param flags event types:
 *        all events, from aeMain:        AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
 *        file events, from networking.c: AE_FILE_EVENTS|AE_DONT_WAIT
 * @return int
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */

    /* This means: eventLoop is listening on at least one fd, or flags
     * include time events and we are allowed to wait.
     * Find the nearest time event; if there is one, compute the interval
     * and block on the poll for at most that long.
     * Otherwise block until an event arrives. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        /* ... compute the timeout tvp from shortest
         * (sketched right after this block) ... */

        /* Pull numevents ready events from the poll and put them into
         * fired. No event type is specified here.
         * If there is a timeout, block at most that long; otherwise block
         * until there is data. (redis is minimalist - guessing the author's
         * intent: if there is no read/write request, why run the scheduled
         * tasks right now?) */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Execute the fired events. */
        for (j = 0; j < numevents; j++) {
            /* Look up the registered event for this fd in the event table
             * (it was inserted there by aeCreateFileEvent). */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* The inversion is mainly set for persistence: during
             * persistence the barrier is set. See the persistence code
             * later, explained in detail (TODO). */
            int invert = fe->mask & AE_BARRIER;

            /* The AE_READABLE handlers include:
             *   acceptTcpHandler,    registered in initServer
             *   readQueryFromClient, registered in acceptTcpHandler ->
             *                        acceptCommonHandler -> createClient
             *   TODO: follow-up analysis of other read events
             *
             * The AE_WRITABLE handlers include:
             *   sendReplyToClient, registered in beforeSleep ->
             *                      handleClientsWithPendingWrites
             *   TODO: follow-up analysis of other write events */
            if (!invert && fe->mask & mask & AE_READABLE) {
                /* rfileProc/wfileProc were bound to the fd by
                 * aeCreateFileEvent; fe->clientData is the client passed
                 * to aeCreateFileEvent. */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* Inverted call: fire the readable event after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            /* Processed count + 1. */
            processed++;
        }
    }
    /* Check time events: mainly serverCron. */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
```
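For completeness, the timeout computation skipped above looks roughly like this in ae.c (paraphrased from memory of redis 5; the when_sec/when_ms fields come from aeTimeEvent):

```c
/* Turn the nearest time event into a poll timeout. */
if (shortest) {
    long now_sec, now_ms;
    aeGetTime(&now_sec, &now_ms);
    tvp = &tv;
    /* How many milliseconds until the next time event fires? */
    long long ms =
        (shortest->when_sec - now_sec)*1000 +
        shortest->when_ms - now_ms;
    if (ms > 0) {
        tvp->tv_sec = ms/1000;
        tvp->tv_usec = (ms % 1000)*1000;
    } else {
        /* The timer is already due: do not block at all. */
        tvp->tv_sec = 0;
        tvp->tv_usec = 0;
    }
} else {
    if (flags & AE_DONT_WAIT) {
        /* We must return ASAP: zero timeout. */
        tv.tv_sec = tv.tv_usec = 0;
        tvp = &tv;
    } else {
        tvp = NULL; /* No timers: block forever until an fd is ready. */
    }
}
```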
To help understand the above, let's first take a look at aeEventLoop.
```c
/**
 * @brief Event manager; the whole process has only one.
 */
typedef struct aeEventLoop {
    /* Highest fd currently registered. */
    int maxfd;   /* highest file descriptor currently registered */
    /* Up to this many connections can be held (maxclients + 128);
     * this is the size of the events and fired arrays. */
    int setsize; /* max number of file descriptors tracked */
    /* Largest time event id so far; auto-increments as time events are added. */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    /* Registered file event handlers, one per fd; bound in initServer. */
    aeFileEvent *events; /* Registered events */
    /* Fired events (everything pulled from epoll ends up here in ae). */
    aeFiredEvent *fired; /* Fired events */
    /* Head node of the time event linked list. */
    aeTimeEvent *timeEventHead;
    /* Flag that ends the event loop. */
    int stop;
    /* The poll's (epoll/kqueue/...) own data. */
    void *apidata; /* This is used for polling API specific data */
    /* Executed by aeMain before each aeProcessEvents call (once per loop). */
    aeBeforeSleepProc *beforesleep;
    /* Executed after aeApiPoll. */
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
```
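The two arrays above hold these small structs (copied from ae.h for reference; the comments after the fields on the right are the original ones, the rest are mine):

```c
/* One registered file event: which masks we care about for an fd,
 * plus the read/write handlers and the bound client data. */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;  /* handler for AE_READABLE */
    aeFileProc *wfileProc;  /* handler for AE_WRITABLE */
    void *clientData;       /* usually the client* bound in createClient */
} aeFileEvent;

/* One fired event as reported by aeApiPoll: just the fd and the mask. */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
```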
Let's see how aeApiPoll gets events out of the poll.
Before that, we need to understand how redis chooses the poll implementation. In ae.c:
```c
/**
 * @brief The several backends of ae,
 *        sorted from top to bottom by performance:
 *   evport: Solaris
 *   epoll:  Linux
 *   kqueue: FreeBSD-like systems such as macOS
 *   select: the fallback, plain select
 */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
```
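If you want to check which backend your build actually picked, ae.c exposes it through aeGetApiName(), which as far as I can tell is what the multiplexing_api field of INFO server reports. A minimal sketch:

```c
/* Each backend (ae_epoll.c, ae_kqueue.c, ...) defines its own static
 * aeApiName(); ae.c simply forwards it. server.c uses this for the
 * "multiplexing_api" field of INFO server, so
 * `redis-cli info server | grep multiplexing_api` tells you which backend
 * was compiled in. */
char *aeGetApiName(void) {
    return aeApiName();
}
```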
Since my computer is a mac, I go straight to the kqueue backend. Let's look at aeApiPoll in ae_kqueue.c:
```c
/**
 * @brief Fetch the events that become ready within the given time window.
 * @param eventLoop
 * @param tvp maximum time to wait
 * @return int number of events to be processed
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* Fetch the ready events; each one is bound to an fd, which already has
     * its handler attached.
     * If the timeout pointer is NULL, kevent() blocks until an event occurs;
     * if it has a value, it blocks at most that long. */
    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        retval = kevent(state->kqfd, NULL, 0, state->events,
                        eventLoop->setsize, &timeout);
    } else {
        retval = kevent(state->kqfd, NULL, 0, state->events,
                        eventLoop->setsize, NULL);
    }

    /* Fill the events into eventLoop->fired. */
    if (retval > 0) {
        int j;

        numevents = retval;
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;

            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            /* The ident here was assigned when the event was created. */
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
```
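For comparison, the Linux variant in ae_epoll.c is nearly identical, just expressed in terms of epoll_wait (reproduced from memory of redis 5, so treat it as a sketch):

```c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* Same contract as the kqueue version: block at most tvp, -1 = forever. */
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN)  mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
```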
Processors
acceptTcpHandler
Source location: networking.c
Registration time: initServer at startup
Processing content:
- Accepts connections on the open ip and ports, and creates an fd for each received request
- Creates a file event for that fd and binds the callback readQueryFromClient to handle its AE_READABLE events
- Creates a client bound to the fd; readQueryFromClient is called back once an AE_READABLE event is heard on that fd
Let's see how acceptTcpHandler handles it (the key points):
```c
/**
 * @brief tcp accept handler
 * @param el
 * @param fd       fd of the listening tcp socket
 * @param privdata corresponding poll data
 * @param mask
 */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* cport: client port, cfd: client fd, max: at most 1000 accepts per call. */
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    /* Pull pending tcp connections off the listening socket. */
    while(max--) {
        /* Accept on the tcp socket and get a new fd (TODO: study this further).
         * The new fd is an established connection. */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        /* Handle the cfd of the new connection. */
        acceptCommonHandler(cfd,0,cip);
    }
}

/**
 * @brief Handling of the newly accepted connection (fd).
 *        Mainly creates a file event so that readQueryFromClient listens for
 *        AE_READABLE events, and appends the client to the tail of
 *        server.clients.
 * @param fd
 * @param flags
 * @param ip
 */
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    /* Create a client from the accepted fd and append it to the tail of
     * server.clients; the transaction state is also initialized. */
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclients is exceeded, reject the connection outright. */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        /* Rejected connections + 1. */
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    /* Connections + 1. */
    server.stat_numconnections++;
    c->flags |= flags;
}
```

The key is createClient:

```c
/**
 * @brief Create the client information from the fd (important):
 *        1. set the new connection to non-blocking and no-delay, and set
 *           keepalive according to server.tcpkeepalive
 *        2. create a file event for the fd and bind the callback
 *           readQueryFromClient, called back when an AE_READABLE event is
 *           heard on the fd
 *        3. create a client for this fd and append it to the tail of
 *           server.clients
 * @param fd the fd of the tcp connection
 * @return client*
 */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        /* Set the tcp socket non-blocking. */
        anetNonBlock(NULL,fd);
        /* Disable Nagle (tcp no delay). */
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            /* Set tcp keepalive. */
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* Register a file event; the callback readQueryFromClient handles
         * the AE_READABLE events of this fd. */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            /* If the creation fails, close and free. */
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    /* Atomically get the client_id. */
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    /* Bind the new connection to the client, i.e. the fd of the request. */
    c->fd = fd;
    ......
    /* After parsing the tcp connection, flags is 0. */
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    ......
    /* Append the newly created client to the tail of server.clients. */
    if (fd != -1) linkClient(c);
    /* Initialize the transaction state. */
    initClientMultiState(c);
    return c;
}
```
The corresponding flow chart is as follows:
Executing the command - readQueryFromClient
Source location: networking.c
Registration time: after acceptTcpHandler accepts the request and obtains the fd, readQueryFromClient is bound to it
Processing content:
- Reads data from the tcp buffer and parses it into the client
- Looks up the command and executes it in processCommand in server.c
The trigger is the fe->rfileProc(eventLoop,fd,fe->clientData,mask); call in aeProcessEvents.

```c
/**
 * @brief Read the client's input.
 * @param el
 * @param fd       fd corresponding to the request
 * @param privdata the client
 * @param mask
 */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* Process the input stream. */
    processInputBufferAndReplicate(c);
}

void processInputBufferAndReplicate(client *c) {
    /* Not fed by a master: a normal client. */
    if (!(c->flags & CLIENT_MASTER)) {
        /* Process the input buffer (this is the main path). */
        processInputBuffer(c);
    } else {
        /* Input coming from our master: replicate it to our slaves. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        /* Only the applied part of the stream can be propagated. */
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

/**
 * @brief Process the input buffer.
 * @param c
 */
void processInputBuffer(client *c) {
    /* (the surrounding while loop over the query buffer is omitted here) */
    /* Read data from the buffer according to the request type. */
    if (c->reqtype == PROTO_REQ_INLINE) {
        /* Parse the content the client wants to execute into robj objects
         * stored in c->argv. */
        if (processInlineBuffer(c) != C_OK) break;
    } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
        if (processMultibulkBuffer(c) != C_OK) break;
    } else {
        serverPanic("Unknown request type");
    }
    if (processCommand(c) == C_OK) {
        /* ... */
    }
    /* Set to NULL so the next client can be processed. */
    server.current_client = NULL;
}

int processCommand(client *c) {
    /* Look the command up in the server.commands dict; c->argv[0] is the
     * command name. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    /* If the command cannot be found: unknown command / wrong arguments. */
    /* ... other checks ... */

    /* Execute the command.
     * When a transaction is open (CLIENT_MULTI), the command is simply
     * queued in the MULTI queue. */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /* Execute the command callback. */
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
}

/* Command execution. */
void call(client *c, int flags) {
    /* Executing the command means calling the corresponding
     * redisCommand's proc. */
    c->cmd->proc(c);
}
```

Look at the code below: proc, the second field, is the concrete function implementing the command.

```c
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    ......
};

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags; /* Flags as string representation, one char per flag. */
    int flags;    /* The actual flags, obtained from the 'sflags' field. */
    redisGetKeysProc *getkeys_proc;
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
};
```
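lookupCommand itself is tiny: the command table above is loaded into the server.commands dict at startup (populateCommandTable), so looking up a command is a single dict fetch. A minimal sketch based on server.c:

```c
/* server.commands maps command name (sds) -> struct redisCommand*. */
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}
```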
Scheduled task serverCron
Source location: server.c
Registration time: initServer()
```c
/**
 * @brief Time event execution.
 * @param eventLoop
 * @param id
 * @param clientData
 * @return int
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /* Work on the clients in server.clients.
     * Iteration goes from the tail node back towards the head, because
     * clients are appended at the tail: if the last one has timed out, the
     * earlier ones have timed out as well.
     *   1. handle timed-out clients
     *   2. shrink the query buffer
     *   3. the rest: TODO */
    clientsCron();

    /* 1. handle expired keys: send the expired events and delete them
     * 2. memory defragmentation
     * 3. incremental rehash */
    databasesCron();

    /* rdb */
    rdbSaveBackground(server.rdb_filename,rsiptr);
    /* aof */
    rewriteAppendOnlyFileBackground();
    /* Free the clients queued for asynchronous release. */
    freeClientsInAsyncFreeQueue();
    /* ... other sampling ... */
}
```
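For reference, this is roughly how initServer registers it as a time event (paraphrased from server.c); the return value of serverCron, 1000/server.hz milliseconds, is what processTimeEvents uses to schedule the next run:

```c
/* In initServer (server.c): register serverCron to fire 1 ms from now.
 * After each run, processTimeEvents reschedules it using serverCron's
 * return value (1000/server.hz milliseconds). */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}
```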
Writeback event
That covers receiving and executing requests. Now let's look at the write-back side, taking the get command as an example.
In server.c:

```c
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
```

In t_string.c:

```c
void getCommand(client *c) {
    getGenericCommand(c);
}

int getGenericCommand(client *c) {
    robj *o;

    /* Return directly if the key does not exist. */
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;

    if (o->type != OBJ_STRING) {
        /* Wrong type: add an error reply. */
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        /* Add the value to the reply. */
        addReplyBulk(c,o);
        return C_OK;
    }
}
```

It all ends up here, in networking.c:

```c
/**
 * @brief Add a reply.
 * @param c
 * @param obj robj
 */
void addReply(client *c, robj *obj) {
    /* Check the client and, if needed, put it into
     * server.clients_pending_write. */
    if (prepareClientToWrite(c) != C_OK) return;

    /* If the reply content is sds-encoded... */
    if (sdsEncodedObject(obj)) {
        /* Try _addReplyToBuffer first (write into the fixed buffer);
         * if that fails, fall back to _addReplyStringToList. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* The reply is int-encoded (really saves space). */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

int prepareClientToWrite(client *c) {
    /* If c->bufpos and c->reply are both empty, the client has not yet been
     * put into the pending-write queue server.clients_pending_write. */
    if (!clientHasPendingReplies(c))
        /* Add the client to server.clients_pending_write by head insertion. */
        clientInstallWriteHandler(c);
}

/**
 * @brief Write the reply into the c->buf array; buf is limited to 16kb.
 * @param c
 * @param s
 * @param len
 * @return int
 */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    /* Copy s into c->buf. */
    memcpy(c->buf+c->bufpos,s,len);
    /* bufpos tracks how much of the output buffer is used. */
    c->bufpos+=len;
    return C_OK;
}

/**
 * @brief Append the reply to the c->reply linked list.
 * @param c
 * @param s
 * @param len
 */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    listNode *ln = listLast(c->reply);
    /* Get the tail block. */
    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;

    /* Whatever is in reply is written back later anyway.
     * If the tail clientReplyBlock does not have enough free space left,
     * another block is created.
     * A clientReplyBlock is at least 16k; a small reply will not fill one. */
    if (tail) {
        size_t avail = tail->size - tail->used;
        size_t copy = avail >= len? len: avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    /* len still has data: either the tail was NULL or there is a remainder. */
    if (len) {
        size_t size = len < PROTO_REPLY_CHUNK_BYTES?
            PROTO_REPLY_CHUNK_BYTES: len;
        tail = zmalloc(size + sizeof(clientReplyBlock));
        /* take over the allocation's internal fragmentation */
        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
        tail->used = len;
        memcpy(tail->buf, s, len);
        listAddNodeTail(c->reply, tail);
        c->reply_bytes += tail->size;
    }
    /* If the output buffer limit is exceeded, queue the client into
     * server.clients_to_close. */
    asyncCloseClientOnOutputBufferLimitReached(c);
}
```
The code is quite clear:

- All responses go through addReply in networking.c
- A client that has a reply to send is put into server.clients_pending_write (see the sketch of clientInstallWriteHandler below)
- addReply writes into different reply buffers depending on the encoding
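clientInstallWriteHandler, mentioned above, is where the client actually enters the pending-write list. A sketch paraphrased from networking.c (the replication-related conditions are reproduced from memory):

```c
void clientInstallWriteHandler(client *c) {
    /* Schedule the client for a write only if not already scheduled and,
     * for slaves, only if the slave can actually receive writes at this
     * stage of replication. */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Instead of installing a write handler right away, just flag the
         * client and put it into the list of clients that have something
         * to write to the socket. */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}
```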
Judging from the flow above, redis 5 handles commands in a single thread.
Writing back to the client
So far the client has only been added to server.clients_pending_write,
and the reply data has been written into c->buf and c->reply.
But nothing has actually been written to the client yet. Don't worry: remember the beforesleep hook in aeMain?
First look at beforeSleep in server.c:
```c
/**
 * @brief Executed before each event loop iteration.
 * @param eventLoop
 */
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* Run a fast expire cycle (only on masters). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    ......
    /* Flush the aof buffer to disk. */
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    /* Process the pending-write queue (this is what we care about here). */
    handleClientsWithPendingWrites();
}
```

In networking.c (abridged):

```c
int handleClientsWithPendingWrites(void) {
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
    }
}

int writeToClient(int fd, client *c, int handler_installed) {
    while(clientHasPendingReplies(c)) {
        /* Either c->buf or c->reply has data; handle whichever applies. */
        if (c->bufpos > 0) {
            /* Call write() directly to push the data into the fd and
             * answer the client. */
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
            /* Call write() directly to push the data into the fd and
             * answer the client. */
            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
        }
    }
}
```
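This also closes the loop with the very first bullet of this post: when writeToClient cannot flush everything in one go, handleClientsWithPendingWrites installs a real AE_WRITABLE file event with sendReplyToClient as the handler, so the remaining bytes go out on a later loop iteration. Roughly, paraphrased from networking.c:

```c
/* Inside handleClientsWithPendingWrites, after the writeToClient attempt:
 * if the client still has pending replies, register an AE_WRITABLE event
 * so sendReplyToClient can finish the job from the event loop. */
if (clientHasPendingReplies(c)) {
    int ae_flags = AE_WRITABLE;
    /* With AOF fsync=always the reply should only be sent after the data is
     * persisted, hence AE_BARRIER (the read handler of the fd runs after
     * its write handler in that loop iteration). */
    if (server.aof_state == AOF_ON &&
        server.aof_fsync == AOF_FSYNC_ALWAYS)
    {
        ae_flags |= AE_BARRIER;
    }
    if (aeCreateFileEvent(server.el, c->fd, ae_flags,
        sendReplyToClient, c) == AE_ERR)
    {
        freeClientAsync(c);
    }
}
```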
At this point, redis forms a closed loop: listening, accepting requests, executing commands, and writing back the results.
Series articles
redis source code reading - Introduction
Reading 2 of redis source code - finally understand the redis startup process