Redis: del/unlink command source code analysis

Let's take a look at the deletion process in this article.

From the client's point of view, deleting a key does not depend on the value's data type: the same DEL command works for strings, lists, hashes, and everything else.

0, Definition of the delete commands

There are two main commands: del and unlink. Both remove the key from the keyspace, but unlink usually returns faster because it can hand the actual memory release of a large value to a background thread. They are defined in the command table as follows:

// The flag string contains only "w", which marks an ordinary write command
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}
// "wF" marks a fast write command. The expensive memory release can be deferred to a background thread, as explained later
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}
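For readers who have not met the command table before, here is a small sketch of how the two fields that matter in this article can be read: the arity (-2 means "at least 2 arguments", counting the command name itself) and the flag string. The names `cmd_entry`, `arity_ok`, `has_flag` and `lookup_cmd` are made up for the illustration and model only three fields of the entries above; they are not Redis's own definitions.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, trimmed-down command table entry: only the name, the arity
 * and the flag string of the real entries are modelled here. */
typedef struct cmd_entry {
    const char *name;
    int arity;          /* >0: exact argc, <0: at least -arity arguments */
    const char *sflags; /* one character per flag, e.g. "w" or "wF" */
} cmd_entry;

static const cmd_entry demo_table[] = {
    {"del",    -2, "w"},
    {"unlink", -2, "wF"},
};

/* Returns 1 if argc (command name included) satisfies the arity rule. */
int arity_ok(const cmd_entry *cmd, int argc) {
    assert(cmd != NULL);
    if (cmd->arity > 0) return argc == cmd->arity;
    return argc >= -cmd->arity;
}

/* Returns 1 if the flag character appears in the entry's flag string. */
int has_flag(const cmd_entry *cmd, char flag) {
    assert(cmd != NULL && flag != '\0');
    return strchr(cmd->sflags, flag) != NULL;
}

/* Linear lookup by name; returns NULL when the command is unknown. */
const cmd_entry *lookup_cmd(const char *name) {
    size_t n = sizeof(demo_table) / sizeof(demo_table[0]);
    for (size_t i = 0; i < n; i++)
        if (strcmp(demo_table[i].name, name) == 0) return &demo_table[i];
    return NULL;
}
```

With this table, `DEL k1 k2 k3` (argc 4) passes the arity check while a bare `DEL` (argc 1) does not, and only the unlink entry carries the `F` flag.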

1, delCommand

The function of delCommand is to delete the data of a key directly and release the memory.

//db.c, del command processing
void delCommand(client *c) {

// Synchronous deletion
delGenericCommand(c,0);

}

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {

int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
    // Lazily expire the key first if its TTL has already passed
    expireIfNeeded(c->db,c->argv[j]);
    // Synchronous and asynchronous deletion differ mainly for aggregate types such as hash, list, and set
    // For a plain string the two paths do exactly the same work
    int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                          dbSyncDelete(c->db,c->argv[j]);
    // On success: signal watchers, emit the keyspace event, and mark the dataset dirty so the write gets propagated
    if (deleted) {
        signalModifiedKey(c->db,c->argv[j]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,
            "del",c->argv[j],c->db->id);
        server.dirty++;
        numdel++;
    }
}
// Reply with the number of keys that were actually deleted
addReplyLongLong(c,numdel);

}

The overall framework is easy to follow, but it does more than a naive delete would: it handles expiration, keyspace notifications, and the choice between synchronous and lazy release. That extra work is the reason the design exists.

2, unlinkCommand

As shown below, unlink shares its implementation with del. The only difference is the value of the lazy flag.

//db.c, unlink delete processing
void unlinkCommand(client *c) {

// Same as del, except that the lazy flag is set
delGenericCommand(c,1);

}

3, Delete data process details

Data can be deleted in two ways, synchronously or asynchronously. The goal is the same in both cases; the difference is that one releases memory in the foreground (the main thread) and the other in a background thread. Let's look at them separately.

  1. Synchronous delete dbSyncDelete

Synchronous deletion is straightforward: remove the key from the dictionary and release its value. If the value holds inner elements, they are released one by one as well.

//db.c, delete data synchronously
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
// Remove from the expires dict first, then from db->dict
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}

}
//dict.c, dbSyncDelete only calls dictDelete(), so the real delete logic lives in dict.c
int dictDelete(dict *ht, const void *key) {

// nofree: 0, so the key and value memory is released
return dictGenericDelete(ht,key,0);

}
//dict.c, nofree 0: call the key and value destructors; 1: skip them and only unlink and free the entry itself
/* Search and remove an element */
static int dictGenericDelete(dict *d, const void *key, int nofree)
{

unsigned int h, idx;
dictEntry *he, *prevHe;
int table;

if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// Search ht[0], and ht[1] as well when a rehash is in progress
for (table = 0; table <= 1; table++) {
    idx = h & d->ht[table].sizemask;
    he = d->ht[table].table[idx];
    prevHe = NULL;
    while(he) {
        if (dictCompareKeys(d, key, he->key)) {
            /* Unlink the element from the list */
            if (prevHe)
                prevHe->next = he->next;
            else
                d->ht[table].table[idx] = he->next;
            // nofree is 0: release the key and value memory as well
            if (!nofree) {
                // Key and value are released separately, each through its own destructor
                dictFreeKey(d, he);
                dictFreeVal(d, he);
            }
            zfree(he);
            d->ht[table].used--;
            return DICT_OK;
        }
        prevHe = he;
        he = he->next;
    }
    // When no rehash is in progress, only ht[0] needs to be searched
    if (!dictIsRehashing(d)) break;
}
return DICT_ERR; /* not found */

}
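The unlink step in the loop above is the classic removal from a singly linked bucket chain: remember the previous entry, relink around the match, then free it. Here is a minimal sketch of just that step, assuming a fixed-size table with string keys and int values. All `toy_*` names and the simple multiplicative hash are invented for the illustration and have nothing to do with the real dict API (no rehashing, no destructors).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BUCKETS 8 /* power of two so that (hash & mask) selects a bucket */

typedef struct toy_entry {
    char *key;
    int val;
    struct toy_entry *next; /* chain of entries that landed in the same bucket */
} toy_entry;

typedef struct toy_table {
    toy_entry *buckets[TOY_BUCKETS];
    unsigned long used;
} toy_table;

static unsigned long toy_hash(const char *s) {
    unsigned long h = 5381;
    while (*s) h = h * 33 + (unsigned char)*s++;
    return h;
}

/* Insert at the head of the bucket chain. Returns 0 on success, -1 on
 * allocation failure. Duplicate keys are not checked in this sketch. */
int toy_add(toy_table *t, const char *key, int val) {
    assert(t != NULL && key != NULL);
    toy_entry *e = malloc(sizeof(*e));
    if (!e) return -1;
    e->key = malloc(strlen(key) + 1);
    if (!e->key) { free(e); return -1; }
    strcpy(e->key, key);
    e->val = val;
    unsigned long idx = toy_hash(key) & (TOY_BUCKETS - 1);
    e->next = t->buckets[idx];
    t->buckets[idx] = e;
    t->used++;
    return 0;
}

/* Remove a key: walk the chain, remember the previous entry, relink, free.
 * Returns 0 if the key was found and removed, -1 otherwise. */
int toy_delete(toy_table *t, const char *key) {
    assert(t != NULL && key != NULL);
    unsigned long idx = toy_hash(key) & (TOY_BUCKETS - 1);
    toy_entry *he = t->buckets[idx], *prev = NULL;
    while (he) {
        if (strcmp(he->key, key) == 0) {
            if (prev) prev->next = he->next;  /* middle or tail of the chain */
            else t->buckets[idx] = he->next;  /* head of the chain */
            free(he->key);
            free(he);
            t->used--;
            return 0;
        }
        prev = he;
        he = he->next;
    }
    return -1; /* not found */
}
```

Deleting the same key twice returns -1 the second time, which corresponds to the DICT_ERR "not found" path above.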

In a language with a garbage collector, none of this would need attention, because the runtime reclaims memory in the background. In a language at the level of C, the program has to manage memory itself, and that is the point of this article. Removing an element from a hash table is not hard by itself; releasing what the element owns is where the work is.

Next, let's see how Redis actually releases the memory.

//dict.h, the logic for releasing key and value is simple enough to be defined as macros
//The release is delegated to keyDestructor and valDestructor

#define dictFreeKey(d, entry) \
    if ((d)->type->keyDestructor) \
        (d)->type->keyDestructor((d)->privdata, (entry)->key)

#define dictFreeVal(d, entry) \
    if ((d)->type->valDestructor) \
        (d)->type->valDestructor((d)->privdata, (entry)->v.val)

//So we need to go back to how the key and value destructors are defined
//That in turn depends on the specific data type, that is, on which kind of command created the key
//Let's look at how keyDestructor and valDestructor are initialized
//server.c, the dictType that defines the key/value destructors
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {

dictSdsHash,                /* hash function */
NULL,                       /* key dup */
NULL,                       /* val dup */
dictSdsKeyCompare,          /* key compare */
dictSdsDestructor,          /* key destructor */
dictObjectDestructor   /* val destructor */

};
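The macros and the type table above amount to a small per-type dispatch: the container knows nothing about how to free a key or a value and simply calls whatever destructor the type descriptor supplies, if any. Below is a rough sketch of the same pattern. The names `toy_type`, `toy_pair` and `toy_free_entry` are hypothetical, and the global counter exists only to make the effect visible.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical type descriptor: optional destructors for key and value. */
typedef struct toy_type {
    void (*key_destructor)(void *key);
    void (*val_destructor)(void *val);
} toy_type;

typedef struct toy_pair {
    void *key;
    void *val;
} toy_pair;

/* Counter used only to make the effect of the destructors observable. */
int toy_freed = 0;

static void counting_free(void *p) {
    free(p);
    toy_freed++;
}

/* Keys are heap-allocated and owned by the container; values are borrowed,
 * so no value destructor is registered. */
const toy_type owned_key_type = { counting_free, NULL };

/* Keys and values are both owned by the container. */
const toy_type owned_both_type = { counting_free, counting_free };

/* Release a pair according to its type; a NULL destructor means "skip". */
void toy_free_entry(const toy_type *type, toy_pair *e) {
    assert(type != NULL && e != NULL);
    if (type->key_destructor) type->key_destructor(e->key);
    if (type->val_destructor) type->val_destructor(e->val);
    e->key = e->val = NULL;
}
```

The NULL slot plays the same role as a missing keyDestructor or valDestructor in the macros: the container leaves that pointer alone because it does not own it.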

//1. First, the key destructor
//server.c, it simply delegates to sds
void dictSdsDestructor(void *privdata, void *val)
{

DICT_NOTUSED(privdata);
// The key is an sds string, so sdsfree releases it directly
sdsfree(val);

}
//sds.c, where the sds memory is actually released
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {

if (s == NULL) return;
// s_free maps to zfree. An sds string is one contiguous allocation (header plus buffer), so a single free starting at the header releases it
s_free((char*)s-sdsHdrSize(s[-1]));

}

//2. The value destructor. Keys are always strings, but values may not be, because Redis provides several data types
// server.c
void dictObjectDestructor(void *privdata, void *val)
{

DICT_NOTUSED(privdata);

if (val == NULL) return; /* Lazy freeing will set value to NULL. */
decrRefCount(val);

}
//object.c, drop one reference to the value and free it when the last reference goes away
void decrRefCount(robj *o) {

if (o->refcount == 1) {
    switch(o->type) {
        // string type
        case OBJ_STRING: freeStringObject(o); break;
        // list type
        case OBJ_LIST: freeListObject(o); break;
        // set type
        case OBJ_SET: freeSetObject(o); break;
        // zset type
        case OBJ_ZSET: freeZsetObject(o); break;
        // hash type
        case OBJ_HASH: freeHashObject(o); break;
        default: serverPanic("Unknown object type"); break;
    }
    zfree(o);
} else {
    if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
    if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}

}

As we can see, releasing the key is simple, while the value is handled more carefully. On the surface the call only decrements the reference count. The memory is reclaimed only when exactly one reference is left, that is, when the current holder is the last one.
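The rule "the last holder pays for the release" can be sketched in a few lines. This is a toy model, not the robj implementation: `toy_obj`, `toy_incr`, `toy_decr` and the `TOY_SHARED_REFCOUNT` sentinel (standing in for the role of OBJ_SHARED_REFCOUNT, with an assumed value) are all invented for the example.

```c
#include <assert.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

/* Sentinel for objects that are shared and never freed. It mirrors the role
 * of OBJ_SHARED_REFCOUNT above; the value chosen here is an assumption. */
#define TOY_SHARED_REFCOUNT INT_MAX

typedef struct toy_obj {
    int refcount;
    char *payload;
} toy_obj;

/* Number of objects actually destroyed, for demonstration only. */
int toy_destroyed = 0;

/* Create an object holding a private copy of s, with one reference. */
toy_obj *toy_obj_new(const char *s) {
    toy_obj *o = malloc(sizeof(*o));
    if (!o) return NULL;
    o->payload = malloc(strlen(s) + 1);
    if (!o->payload) { free(o); return NULL; }
    strcpy(o->payload, s);
    o->refcount = 1;
    return o;
}

/* Take one more reference, unless the object is a shared constant. */
void toy_incr(toy_obj *o) {
    assert(o != NULL && o->refcount > 0);
    if (o->refcount != TOY_SHARED_REFCOUNT) o->refcount++;
}

/* Drop one reference; only the last holder triggers the actual release. */
void toy_decr(toy_obj *o) {
    assert(o != NULL && o->refcount > 0);
    if (o->refcount == 1) {
        free(o->payload);
        free(o);
        toy_destroyed++;
    } else if (o->refcount != TOY_SHARED_REFCOUNT) {
        o->refcount--;
    }
}
```

After one `toy_incr`, the first `toy_decr` only lowers the count; the second one frees the payload and the object.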

//Before looking at how each data type is released, let's look at the structure of a dict entry
// dict.h
typedef struct dictEntry {

// Store key field content
void *key;
// Store value with a union
union {
    // Use * val to store data
    void *val;
    uint64_t u64;
    // Use this field when storing expiration times
    int64_t s64;
    // Use when storing score
    double d;
} v;
// When there is a hash conflict, it is used as a linked list
struct dictEntry *next;

} dictEntry;

//1. Release of string type
// object.c
void freeStringObject(robj *o) {

// Direct call to sds service release
if (o->encoding == OBJ_ENCODING_RAW) {
    sdsfree(o->ptr);
}

}

//2. Release of list type
// object.c
void freeListObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_QUICKLIST:
    quicklistRelease(o->ptr);
    break;
default:
    serverPanic("Unknown list encoding type");
}

}
// quicklist.c
/* Free entire quicklist. */
void quicklistRelease(quicklist *quicklist) {

unsigned long len;
quicklistNode *current, *next;

current = quicklist->head;
len = quicklist->len;
// Walk the quicklist and release the nodes one by one
while (len--) {
    next = current->next;
    // Free the node's ziplist, which holds the actual elements
    zfree(current->zl);
    quicklist->count -= current->count;
    // Free the quicklist node itself
    zfree(current);

    quicklist->len--;
    current = next;
}
zfree(quicklist);

}

//3. Release of set type
//object.c, a set has two encodings: HT (hash table) and intset
void freeSetObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // Hash table encoding: every entry in the dict has to be released
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_INTSET:
    // An intset is a single allocation, so it is freed directly
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown set encoding type");
}

}
// dict.c,
/* Clear & Release the hash table */
void dictRelease(dict *d)
{

// Clear ht[0] and ht[1] in turn
_dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1],NULL);
zfree(d);

}
// dict.c,
/* Destroy an entire dictionary */
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {

unsigned long i;

/* Free all the elements */
for (i = 0; i < ht->size && ht->used > 0; i++) {
    dictEntry *he, *nextHe;

    if (callback && (i & 65535) == 0) callback(d->privdata);
    // An empty bucket is skipped; the scan goes on as long as used > 0, i.e. entries remain
    // When only a few entries are left, the used > 0 check lets the loop stop early
    if ((he = ht->table[i]) == NULL) continue;
    while(he) {
        nextHe = he->next;
        // Key and value are released the same way as in dictGenericDelete
        // This looks recursive, but Redis data types do not nest (a hash cannot hold another hash), so the release never recurses deeper
        // The concrete structures are covered in later articles
        dictFreeKey(d, he);
        dictFreeVal(d, he);
        zfree(he);
        ht->used--;
        he = nextHe;
    }
}
/* Free the table and the allocated cache structure */
zfree(ht->table);
/* Re-initialize the table */
_dictReset(ht);
return DICT_OK; /* never fails */

}

//4. Release of hash type
//object.c, hash and set are very similar, and the code is reused a lot
void freeHashObject(robj *o) {

switch (o->encoding) {
case OBJ_ENCODING_HT:
    // ht form is consistent with set
    dictRelease((dict*) o->ptr);
    break;
case OBJ_ENCODING_ZIPLIST:
    // ziplist direct release
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown hash encoding type");
    break;
}

}

//5. Release of zset type
//object.c, a zset is encoded either as skiplist plus dict, or as a ziplist
void freeZsetObject(robj *o) {

zset *zs;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST:
    zs = o->ptr;
    // Release the dict, which clears ht[0] and ht[1]
    dictRelease(zs->dict);
    // Release the skiplist; this is the part worth reading
    zslFree(zs->zsl);
    zfree(zs);
    break;
case OBJ_ENCODING_ZIPLIST:
    zfree(o->ptr);
    break;
default:
    serverPanic("Unknown sorted set encoding");
}

}
//t_zset.c, release the skiplist data
/* Free a whole skiplist. */
void zslFree(zskiplist *zsl) {

zskiplistNode *node = zsl->header->level[0].forward, *next;

zfree(zsl->header);
while(node) {
    // Walk level 0 only and free each node along the way
    // Higher levels are just extra pointers to the same nodes, so they need no separate release
    next = node->level[0].forward;
    zslFreeNode(node);
    node = next;
}
zfree(zsl);

}
//t_zset.c, also very simple: release node->ele, then release the node itself
//This way of deleting follows from the storage structure, which is covered later
/* Free the specified skiplist node. The referenced SDS string representation
 * of the element is freed too, unless node->ele is set to NULL before calling
 * this function. */
void zslFreeNode(zskiplistNode *node) {
    sdsfree(node->ele);
    zfree(node);
}
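Why walking level 0 is enough becomes clearer with a toy skiplist: every node appears on level 0 exactly once, and the higher levels only add shortcuts to nodes that level 0 already reaches. The sketch below is an illustration under simplifying assumptions (node heights are chosen by the caller instead of randomly, and nodes are only appended in ascending order); none of the `toy_*` names come from t_zset.c.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical skiplist node: `levels` forward pointers in a flexible array. */
typedef struct toy_node {
    int score;
    int levels;
    struct toy_node *forward[];
} toy_node;

typedef struct toy_skiplist {
    toy_node *header; /* sentinel node with the maximum number of levels */
    int max_levels;
} toy_skiplist;

static toy_node *toy_node_new(int levels, int score) {
    toy_node *n = malloc(sizeof(*n) + (size_t)levels * sizeof(toy_node *));
    if (!n) return NULL;
    n->score = score;
    n->levels = levels;
    for (int i = 0; i < levels; i++) n->forward[i] = NULL;
    return n;
}

toy_skiplist *toy_sl_new(int max_levels) {
    toy_skiplist *sl = malloc(sizeof(*sl));
    if (!sl) return NULL;
    sl->header = toy_node_new(max_levels, 0);
    if (!sl->header) { free(sl); return NULL; }
    sl->max_levels = max_levels;
    return sl;
}

/* Append a node with a caller-chosen height; scores must arrive in
 * ascending order. A real skiplist picks the height randomly. */
int toy_sl_append(toy_skiplist *sl, int levels, int score) {
    assert(sl != NULL && levels >= 1 && levels <= sl->max_levels);
    toy_node *n = toy_node_new(levels, score);
    if (!n) return -1;
    for (int i = 0; i < levels; i++) {
        toy_node *x = sl->header;
        while (x->forward[i]) x = x->forward[i]; /* current tail of level i */
        x->forward[i] = n;
    }
    return 0;
}

/* Free every node by following level 0 only. Returns the number of data
 * nodes released (the header is not counted). */
int toy_sl_free(toy_skiplist *sl) {
    assert(sl != NULL);
    int freed = 0;
    toy_node *node = sl->header->forward[0], *next;
    free(sl->header);
    while (node) {
        next = node->forward[0]; /* read the link before freeing the node */
        free(node);
        freed++;
        node = next;
    }
    free(sl);
    return freed;
}
```

Appending three nodes of heights 1, 3 and 2 and then calling `toy_sl_free` releases exactly three data nodes, even though the taller nodes are also linked on levels 1 and 2.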
  2. Asynchronous deletion process

Asynchronous deletion is more complex and more interesting. Most of the core pieces were already covered above, though, so there is not much left to add.

// lazyfree.c,
int dbAsyncDelete(redisDb *db, robj *key) {

/* Deleting an entry from the expires dict will not free the sds of
 * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

/* If the value is composed of a few allocations, to free in a lazy way
 * is actually just slower... So under a certain limit we just free
 * the object synchronously. */
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
    robj *val = dictGetVal(de);
    size_t free_effort = lazyfreeGetFreeEffort(val);

    /* If releasing the object is too much work, let's put it into the
     * lazy free list. */
    // The asynchronous path is taken only when the estimated free effort exceeds a threshold (64)
    // Below that, the value is freed synchronously, because that is cheaper than handing it off
    if (free_effort > LAZYFREE_THRESHOLD) {
        // Increment the count of objects pending lazy free (atomic operation)
        atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
        // Queue the value for the background thread; the job type is lazy memory release
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        // Set val to NULL so that the dictDelete below does not free the value again
        dictSetVal(db->dict,de,NULL);
    }
}

/* Release the key-val pair, or just the key if we set the val
 * field to NULL in order to lazy free it later. */
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
} else {
    return 0;
}

}
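The decision inside dbAsyncDelete boils down to "estimate the cost, compare it with a threshold". Here is a sketch of that decision alone. The effort estimate is deliberately simplified (one unit per element of an aggregate, one unit for a string), and `toy_value`, `toy_free_effort` and `toy_should_free_async` are illustrative names, not the lazyfree.c functions.

```c
#include <assert.h>
#include <stddef.h>

/* Same value as the threshold mentioned above (64). */
#define TOY_LAZYFREE_THRESHOLD 64

typedef enum { TOY_STRING, TOY_LIST, TOY_HASH } toy_kind;

typedef struct toy_value {
    toy_kind kind;
    size_t elements; /* number of items held by an aggregate value */
} toy_value;

/* Rough cost estimate: a string counts as one allocation, an aggregate
 * costs about one unit per element. This is a simplification made for
 * the sketch. */
size_t toy_free_effort(const toy_value *v) {
    assert(v != NULL);
    if (v->kind == TOY_STRING) return 1;
    return v->elements;
}

/* Returns 1 when the release should be queued for a background thread,
 * 0 when freeing inline is cheaper than the hand-off. */
int toy_should_free_async(const toy_value *v) {
    return toy_free_effort(v) > TOY_LAZYFREE_THRESHOLD;
}
```

Note that the comparison is strict: a hash with exactly 64 elements is still freed inline, and only from 65 elements on would this sketch queue the release.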
//bio.c, queue a background job. The job type selects the queue, and the mutex makes it safe to add jobs from another thread
//The background thread keeps picking up jobs from that queue
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {

struct bio_job *job = zmalloc(sizeof(*job));

job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// Locking operation
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
// Wake up task thread
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);

}
//bio.c, the main loop of a background thread; it handles whatever jobs are queued for its type
void *bioProcessBackgroundJobs(void *arg) {

struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;

/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
    serverLog(LL_WARNING,
        "Warning: bio thread started with wrong type %lu",type);
    return NULL;
}

/* Make the thread killable at any time, so that bioKillThreads()
 * can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
 * receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
    serverLog(LL_WARNING,
        "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
// The worker loops forever
while(1) {
    listNode *ln;

    /* The loop always starts with the lock hold. */
    if (listLength(bio_jobs[type]) == 0) {
        // pthread_cond_wait releases the lock while waiting, so other threads can add jobs
        pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
        continue;
    }
    /* Pop the job from the queue. */
    ln = listFirst(bio_jobs[type]);
    job = ln->value;
    /* It is now possible to unlock the background system as we know have
     * a stand alone job structure to process.*/
    pthread_mutex_unlock(&bio_mutex[type]);

    /* Process the job accordingly to its type. */
    if (type == BIO_CLOSE_FILE) {
        close((long)job->arg1);
    } else if (type == BIO_AOF_FSYNC) {
        aof_fsync((long)job->arg1);
    } 
    // This is our case: handle the lazy free job submitted above
    else if (type == BIO_LAZY_FREE) {
        /* What we free changes depending on what arguments are set:
         * arg1 -> free the object at pointer.
         * arg2 & arg3 -> free two dictionaries (a Redis DB).
         * only arg3 -> free the skiplist. */
        // Deleting a value, as in this article, is the first case (arg1)
        if (job->arg1)
            lazyfreeFreeObjectFromBioThread(job->arg1);
        else if (job->arg2 && job->arg3)
            lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
        else if (job->arg3)
            lazyfreeFreeSlotsMapFromBioThread(job->arg3);
    } else {
        serverPanic("Wrong job type in bioProcessBackgroundJobs().");
    }
    zfree(job);

    /* Unblock threads blocked on bioWaitStepOfType() if any. */
    // Wake up all related waiting threads
    pthread_cond_broadcast(&bio_step_cond[type]);

    /* Lock again before reiterating the loop, if there are no longer
     * jobs to process we'll block again in pthread_cond_wait(). */
    pthread_mutex_lock(&bio_mutex[type]);
    listDelNode(bio_jobs[type],ln);
    bio_pending[type]--;
}

}

//lazyfree.c, the release itself is the same as in synchronous deletion
/* Release objects from the lazyfree thread. It's just decrRefCount()
 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}
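The producer/worker pair formed by bioCreateBackgroundJob and bioProcessBackgroundJobs is a standard mutex plus condition variable queue. Below is a reduced sketch of that pattern with a single worker whose only job is to free a pointer. Everything named `toy_bio_*` is hypothetical. Unlike the code above, this sketch pops the job from the queue before processing it, and it adds a stop flag so the worker can be joined; both are simplifications chosen for the example.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct toy_job {
    void *arg;               /* memory to release in the background */
    struct toy_job *next;
} toy_job;

typedef struct toy_bio {
    pthread_t thread;
    pthread_mutex_t mutex;
    pthread_cond_t newjob;   /* signalled when a job is queued or on stop */
    pthread_cond_t step;     /* signalled after each completed job */
    toy_job *head, *tail;
    unsigned long pending;   /* queued plus in-flight jobs */
    unsigned long done;
    int stop;
} toy_bio;

static void *toy_bio_worker(void *p) {
    toy_bio *b = p;
    pthread_mutex_lock(&b->mutex);
    for (;;) {                          /* each iteration starts with the lock held */
        if (b->head == NULL) {
            if (b->stop) break;
            pthread_cond_wait(&b->newjob, &b->mutex); /* unlocks while waiting */
            continue;
        }
        toy_job *job = b->head;
        b->head = job->next;
        if (b->head == NULL) b->tail = NULL;
        pthread_mutex_unlock(&b->mutex);

        free(job->arg);                 /* the slow part runs without the lock */
        free(job);

        pthread_mutex_lock(&b->mutex);
        b->pending--;
        b->done++;
        pthread_cond_broadcast(&b->step);
    }
    pthread_mutex_unlock(&b->mutex);
    return NULL;
}

/* Initialise the queue and start the worker. Returns 0 on success. */
int toy_bio_start(toy_bio *b) {
    assert(b != NULL);
    b->head = b->tail = NULL;
    b->pending = b->done = 0;
    b->stop = 0;
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->newjob, NULL);
    pthread_cond_init(&b->step, NULL);
    return pthread_create(&b->thread, NULL, toy_bio_worker, b);
}

/* Queue a heap pointer for background release. Returns 0 on success. */
int toy_bio_submit(toy_bio *b, void *arg) {
    toy_job *job = malloc(sizeof(*job));
    if (!job) return -1;
    job->arg = arg;
    job->next = NULL;
    pthread_mutex_lock(&b->mutex);
    if (b->tail) b->tail->next = job; else b->head = job;
    b->tail = job;
    b->pending++;
    pthread_cond_signal(&b->newjob);    /* wake the worker */
    pthread_mutex_unlock(&b->mutex);
    return 0;
}

/* Drain the queue, stop the worker and return how many jobs it completed. */
unsigned long toy_bio_stop(toy_bio *b) {
    pthread_mutex_lock(&b->mutex);
    while (b->pending > 0) pthread_cond_wait(&b->step, &b->mutex);
    b->stop = 1;
    pthread_cond_signal(&b->newjob);
    pthread_mutex_unlock(&b->mutex);
    pthread_join(b->thread, NULL);
    pthread_mutex_destroy(&b->mutex);
    pthread_cond_destroy(&b->newjob);
    pthread_cond_destroy(&b->step);
    return b->done;
}
```

The caller's cost for each submitted pointer is one small allocation and a short critical section, which is why the hand-off only pays off for values that are expensive to free. On older toolchains the program may need to be linked with `-pthread`.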

From this we can see that Redis does not go asynchronous every time the asynchronous path is requested, only when it pays off. It is a reminder not to make things asynchronous for their own sake, but to weigh the costs and benefits first.

That completes the del/unlink process. On the whole, removal is a plain hash table delete, and the only real difficulty is reclaiming memory. This is essentially what a simple garbage collector based on reference counting has to do. The specifics depend on how each data type is stored, and the main goal is to release every nested allocation so that no memory leaks.

Original address https://www.cnblogs.com/yougewe/p/12231419.html

Added by Yamakazi on Sun, 02 Feb 2020 13:43:58 +0200