Redis Source Analysis: AOF

Links to the original text: https://my.oschina.net/zipu888/blog/549702
redis 2.4.4

AOF (Append Only File) is one of Redis's persistence mechanisms. Write operations are logged so that the dataset can be reconstructed when needed.
The AOF persistence logs every write operation received by the server, that will be played again at server startup, reconstructing the original dataset. Commands are logged using the same format as the Redis protocol itself, in an append-only fashion. Redis is able to rewrite the log in the background when it gets too big.
Redis's AOF handling consists of two parts:
1. Writing the AOF log, flushed to disk at a granularity chosen by the user's configuration
2. Rewriting the AOF log, triggered by configuration or by an externally sent command, once the log has grown to a certain size.

Aof related configuration (redis.conf):
appendonly yes
Whether AOF persistence is enabled: yes turns it on, no turns it off

appendfilename appendonly.aof
Specify the AOF log file name, default name: appendonly.aof

appendfsync everysec
When to fsync data to disk. Redis provides three modes:
no: never call fsync; the OS decides when data is flushed to disk. Highest performance.
always: fsync after every write. Safest.
everysec: fsync only when at least 1s has passed since the last fsync. A compromise.

no-appendfsync-on-rewrite no
Whether to skip fsync on AOF writes while a background rewrite (or background save) child is running. If the system encounters latency problems, setting this to yes is recommended (no fsync is forced while the child is running)

auto-aof-rewrite-percentage 100
When the AOF log has grown by more than the specified percentage relative to its base size, the log file is rewritten automatically; set to 0 to disable automatic rewriting

auto-aof-rewrite-min-size 64mb
The minimum size the AOF log must exceed before an automatic rewrite is started

Writing the AOF log when data is updated:

How a client command is executed:

/* call() is the core of Redis command execution: it runs the command bound
 * to the client and then hands the command to the slow log, the AOF, the
 * slaves and the monitors as applicable. */
void call(redisClient *c) {
    long long started_at = ustime();
    long long dirty_before = server.dirty;
    long long changes, elapsed;
    int must_replicate;

    c->cmd->proc(c);    //Run the command implementation

    //A command that updates data changes server.dirty, so the delta tells
    //whether anything was modified
    changes = server.dirty - dirty_before;
    elapsed = ustime() - started_at;
    slowlogPushEntryIfNeeded(c->argv,c->argc,elapsed);

    //AOF is enabled and the command modified data: feed it to the AOF
    if (server.appendonly && changes > 0) {
        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
    }

    //Slaves receive modifying commands and those flagged for forced replication
    must_replicate = changes > 0 ||
                     (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION);
    if (must_replicate && listLength(server.slaves)) {
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    }

    //Monitors receive every command
    if (listLength(server.monitors)) {
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    }

    server.stat_numcommands++;
}


/* Append an executed command to the in-memory AOF buffer (server.aofbuf)
 * and, while a background rewrite child exists, to server.bgrewritebuf too.
 *
 * cmd, argv, argc: the command that was executed and its arguments.
 * dictid:          id of the database the command ran against.
 *
 * NOTE: this is an excerpt. The declaration of `buf` and the code that
 * serializes the command itself are elided at the "....." marker. The
 * visible code only appends to in-memory buffers. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {

    if (dictid != server.appendseldb) { //The target database differs from the last one logged: emit a SELECT command first.
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.appendseldb = dictid; //Remember which database the log now refers to
    }

     .....

    server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); //Append the data to aofbuf

    if (server.bgrewritechildpid != -1)  //A child process is rewriting the AOF log: also append the data to bgrewritebuf
        server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));

    sdsfree(buf);
}


feedAppendOnlyFile does not actually write the AOF log; the write happens in the flushAppendOnlyFile function, which runs before the reply is returned to the client.
/* Write the pending AOF buffer (server.aofbuf) to the append-only file and
 * fsync it according to the configured appendfsync policy.
 *
 * force: if non-zero, perform the write even when appendfsync is everysec
 *        and a background fsync job is still pending.
 *
 * With appendfsync everysec and force == 0, the write is postponed (the data
 * stays in server.aofbuf) while a background fsync job is pending, as long
 * as the postponement has lasted less than 2 seconds. After that the write
 * goes ahead anyway and a notice is logged.
 *
 * A failed or short write is fatal: it is logged and the process exits. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    size_t buflen = sdslen(server.aofbuf);
    int sync_in_progress = 0;

    if (buflen == 0) return;

    //With appendfsync everysec, check whether a background fsync job is still pending
    if (server.appendfsync == APPENDFSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    //everysec and not forced: while a background fsync is pending, postpone
    //the write for less than 2 seconds (the data remains cached in aofbuf)
    if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                //First postponement: remember when it started
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                return;
            }
            //Postponed long enough: fall through and write anyway
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    server.aof_flush_postponed_start = 0;

    //Write the AOF log
    nwritten = write(server.appendfd,server.aofbuf,buflen);
    if (nwritten != (ssize_t)buflen) {
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            //errno is only meaningful when write() returns -1, so a short
            //write reports the byte counts instead of strerror(errno)
            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: wrote %lld of %lld bytes",(long long)nwritten,(long long)buflen);
        }
        exit(1);
    }
    server.appendonly_current_size += nwritten; //Track the log file size

    //Empty aofbuf: reuse the allocation if it is small, otherwise release it
    if ((buflen+sdsavail(server.aofbuf)) < 4000) {
        sdsclear(server.aofbuf);
    } else {
        sdsfree(server.aofbuf);
        server.aofbuf = sdsempty();
    }

    //If no-appendfsync-on-rewrite is yes and a rewrite or save child is
    //running, return without fsyncing
    if (server.no_appendfsync_on_rewrite &&
        (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.appendfsync == APPENDFSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
        server.lastfsync = server.unixtime;
    } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
                server.unixtime > server.lastfsync)) {
        //Queue a background fsync unless one is already pending; lastfsync
        //is updated in both cases
        if (!sync_in_progress) aof_background_fsync(server.appendfd);
        server.lastfsync = server.unixtime;
    }
}


/* Queue a background job of type REDIS_BIO_AOF_FSYNC for descriptor `fd`. */
void aof_background_fsync(int fd) {
    //The descriptor is carried as the job's first pointer argument
    void *fd_arg = (void*)(long)fd;

    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,fd_arg,NULL,NULL);
}
/* Create a background job of the given type carrying three opaque
 * arguments, append it to that type's job list and signal that type's
 * condition variable.
 *
 * type:             index into the per-type job list / mutex / condvar arrays.
 * arg1, arg2, arg3: stored in the job as-is. */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *entry;

    entry = zmalloc(sizeof(*entry));
    entry->arg1 = arg1;
    entry->arg2 = arg2;
    entry->arg3 = arg3;
    entry->time = time(NULL);   //Creation timestamp

    //Enqueue and notify while holding the per-type mutex
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],entry);
    bio_pending[type]++;
    pthread_cond_signal(&bio_condvar[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
When appendfsync is set to everysec, the fsync is done by a separate thread.

Rewriting the AOF log: Redis rewrites the AOF log in two cases:
1. The configured automatic-rewrite threshold is reached
2. A client sends the bgrewriteaof command

The function executed when the bgrewriteaof command is received:

/* BGREWRITEAOF command handler.
 *
 * Replies with an error if a rewrite child already exists, schedules the
 * rewrite if a background save child exists, and otherwise tries to start
 * the background rewrite right away. */
void bgrewriteaofCommand(redisClient *c) {
    //A rewrite child is already running: refuse
    if (server.bgrewritechildpid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    //A background save child is running: only set the scheduled flag
    if (server.bgsavechildpid != -1) {
        server.aofrewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    //No child at all: start the rewrite now
    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}

/* Excerpt of serverCron() showing only the logic related to AOF rewriting;
 * everything else is elided at the "......" markers.
 *
 * Visible behavior:
 *  - starts a rewrite that was scheduled while a background save was running;
 *  - when a save or rewrite child exists, polls for its termination without
 *    blocking and dispatches to the matching "done" handler;
 *  - otherwise checks whether the automatic rewrite conditions are met. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
   	......
   	
    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
        server.aofrewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    //A save or rewrite child exists: check (WNOHANG, so without blocking)
    //whether it has terminated and run the matching post-processing handler.
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        int statloc;
        pid_t pid;

        //NOTE(review): a wait3() error (-1) also satisfies "!= 0" and is
        //then handled as a finished rewrite child - confirm this is intended.
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc);
            } else {
                backgroundRewriteDoneHandler(statloc);
            }
            updateDictResizePolicy();
        }
    } else {
         time_t now = time(NULL);

        ......

         //Check whether the configured conditions for an automatic AOF
         //rewrite are met: automatic rewrite enabled (non-zero percentage)
         //and current size above the configured minimum.
         if (server.bgsavechildpid == -1 &&
             server.bgrewritechildpid == -1 &&
             server.auto_aofrewrite_perc &&
             server.appendonly_current_size > server.auto_aofrewrite_min_size)
         {
            //Use 1 as the base when no base size is recorded, which avoids
            //a division by zero.
            long long base = server.auto_aofrewrite_base_size ?
                            server.auto_aofrewrite_base_size : 1;
            //Growth in percent relative to the base size.
            long long growth = (server.appendonly_current_size*100/base) - 100;
            if (growth >= server.auto_aofrewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
	......
}
Perform rewriting:
/* Start an AOF rewrite in a forked child process (excerpt; parts are elided
 * at the "......" markers).
 *
 * The child writes the rewritten log to a temporary file named after its
 * pid and exits with status 0 on success, 1 on failure. The parent records
 * the child's pid in server.bgrewritechildpid and returns REDIS_OK.
 *
 * NOTE(review): handling of a failed fork() (-1) is not visible in this
 * excerpt - presumably it is in the elided parent-side code; verify. */
int rewriteAppendOnlyFileBackground(void) {
    ......
    if ((childpid = fork()) == 0) { //Fork a child process; the child performs the rewrite
        char tmpfile[256];

        //The code below runs in the child process
        if (server.vm_enabled) vmReopenSwapFile();
        //Close the descriptors server.ipfd and server.sofd in the child
        //(presumably the listening sockets - verify)
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        //_exit() status reports the outcome of the rewrite to the parent
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            _exit(0);
        } else {
            _exit(1);
        }
    } else {
        //The code below runs in the parent process
        ......
        server.bgrewritechildpid = childpid;
        ......
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}


The child process performs the rewrite: relying on copy-on-write, it writes the dataset as seen by the child process into the log:
/* Write the current dataset to a new AOF file (excerpt: the declarations,
 * the per-key serialization, the `werr` error path and the end of the
 * function are elided at the "......" / "....." markers).
 *
 * Visible behavior: opens a temporary file named after the pid, and for
 * every non-empty database writes a SELECT command followed by the
 * database's entries, then flushes and fsyncs the file before closing it.
 *
 * Returns REDIS_ERR if the temporary file cannot be opened or an iterator
 * cannot be obtained. The `filename` parameter is not used in the visible
 * part of the excerpt. */
int rewriteAppendOnlyFile(char *filename) {
    ......
    //Create the temporary file, named after the current pid
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
        return REDIS_ERR;
    }
    
    //Iterate over all databases
    for (j = 0; j < server.dbnum; j++) {
        //Prefix of a SELECT command; the database index is appended below
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue; //Skip empty databases
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        //Write the SELECT command for this database
        //(sizeof-1 leaves out the string's terminating NUL)
        if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
        if (fwriteBulkLongLong(fp,j) == 0) goto werr;

        //Write every entry of the database
        while((de = dictNext(di)) != NULL) {
            .....
        }
    }

    //Flush the stdio buffer and fsync the data to disk
    //NOTE(review): the return values of fflush/aof_fsync/fclose are not
    //checked in the visible code.
    fflush(fp);
    aof_fsync(fileno(fp));
    fclose(fp);
    ......








   

                                                    

Reproduced in: https://my.oschina.net/zipu888/blog/549702

Keywords: Redis Database Linux

Added by mwkemo on Sun, 06 Oct 2019 23:43:42 +0300