Suricata: what each thread does -- FlowManagerThread

Contents

Thread initialization

Flow management logic processing function

Main loop

Getting timed-out flows

Processing timed-out flows

Other timeouts

Thread exit

For how the flow manager thread is created and how its slot and TmModule are registered, see the section on creating the non-worker child threads in my previous article:

Suricata: what each thread does -- the main thread (CSDN blog): https://blog.csdn.net/xuwaiwai/article/details/120086508?spm=1001.2014.3001.5501

Thread function: tv->tm_func(), which is TmThreadsManagement().
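
As a roadmap before the details, the sketch below shows the overall shape of that wrapper, based only on the steps covered in this article (slot init, the single Management() call, slot deinit). It is a simplified illustration, not the actual TmThreadsManagement() source.

/* Simplified sketch of the management-thread wrapper (illustration only,
 * not the actual TmThreadsManagement() code). */
static void *ManagementThreadSketch(void *td)
{
    ThreadVars *tv = (ThreadVars *)td;
    TmSlot *s = (TmSlot *)tv->tm_slots;          /* single slot: FlowManager */

    /* thread init: run the slot's init function (FlowManagerThreadInit) */
    void *slot_data = NULL;
    if (s->SlotThreadInit != NULL) {
        if (s->SlotThreadInit(tv, s->slot_initdata, &slot_data) != TM_ECODE_OK)
            return NULL;
        SC_ATOMIC_SET(s->slot_data, slot_data);
    }

    /* tell the main thread we are ready; it will clear the pause flag */
    TmThreadsSetFlag(tv, THV_INIT_DONE);

    /* the module's Management() function (here: FlowManager) runs its own
     * loop and only returns when the thread is asked to shut down */
    s->Management(tv, SC_ATOMIC_GET(s->slot_data));

    /* thread exit: run the slot's deinit function, then leave */
    if (s->SlotThreadDeinit != NULL)
        s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
    return NULL;
}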

Thread initialization

Sets the thread name and CPU affinity.

For module initialization, the management thread registers only a single slot (the FlowManager module). The slot's init function, s->SlotThreadInit, corresponds here to FlowManagerThreadInit(), which does the following:

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking
     * each thread has it's own section of the flow hash */
    uint32_t range = flow_config.hash_size / flowmgr_number;
    if (ftd->instance == 0)
        ftd->max = range;
    else if ((ftd->instance + 1) == flowmgr_number) {
        ftd->min = (range * ftd->instance) + 1;
        ftd->max = flow_config.hash_size;
    } else {
        ftd->min = (range * ftd->instance) + 1;
        ftd->max = (range * (ftd->instance + 1));
    }
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);

    PacketPoolInit();
    return TM_ECODE_OK;
}

There may be multiple flow manager threads, so flow_config.hash_size is divided evenly by the number of managers and each manager's ftd->min / ftd->max are set to its share. This effectively splits the FlowBucket *flow_hash bucket array into equal slices, so that in its main loop each manager thread only walks the flows in its own slice of flow_hash.
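
As a concrete illustration of that split, the standalone snippet below (not Suricata code; the hash_size and flowmgr_number values are made up for the example) prints the [min, max] interval each instance would get:

#include <stdio.h>
#include <stdint.h>

/* Standalone illustration of how the flow hash rows are split between
 * flow manager instances; the values below are made up for the example. */
int main(void)
{
    const uint32_t hash_size = 65536;      /* stands in for flow_config.hash_size */
    const uint32_t flowmgr_number = 3;     /* number of flow manager threads */
    const uint32_t range = hash_size / flowmgr_number;

    for (uint32_t instance = 0; instance < flowmgr_number; instance++) {
        uint32_t min = 0, max = 0;
        if (instance == 0) {
            max = range;
        } else if (instance + 1 == flowmgr_number) {
            min = range * instance + 1;
            max = hash_size;               /* last instance takes the remainder */
        } else {
            min = range * instance + 1;
            max = range * (instance + 1);
        }
        printf("instance %u -> rows [%u, %u]\n", instance, min, max);
    }
    return 0;
}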

PacketPoolInit() initializes the packet pool of this management thread.

FlowCountersInit() sets up the thread's statistics counters.

Finally the thread sets THV_INIT_DONE in tv->flags, so that the main thread can clear this thread's pause flag.

Flow management logic processing function

r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));

Since the slot corresponds to the FlowManager module, this function is

static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)

This function manages the flow table: it handles flow timeouts and emergency mode. The data it works on are the Flow *evicted and Flow *head lists in each FlowBucket, and the work is done inside the thread's main loop.

Flow timeout processing:

  • In emergency mode, the instance's whole hash range is traversed
  • In non-emergency mode, only part of the hash table is scanned per pass, further narrowing the traversal range

Both paths end up in static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, struct timeval *ts, const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters), which checks for timed-out flows in the [hash_min, hash_max) slice of the FlowBucket *flow_hash array.
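
To make the two modes concrete, here is a standalone sketch of how a flow manager instance could drive the scan over its slice (illustration only: the chunk size, the scan_rows()/scan_pass() names and the way passes are paced are assumptions, not the real Suricata code; scan_rows() stands in for FlowTimeoutHash()):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* stands in for FlowTimeoutHash(td, ts, from, to, counters) */
static void scan_rows(uint32_t from, uint32_t to)
{
    printf("scanning rows [%u, %u)\n", from, to);
}

/* one wake-up of the manager; returns where the next pass should resume */
static uint32_t scan_pass(uint32_t inst_min, uint32_t inst_max,
        bool emergency, uint32_t next_chunk_start)
{
    if (emergency) {
        /* emergency mode: walk the instance's whole slice in one pass */
        scan_rows(inst_min, inst_max);
        return inst_min;
    }
    /* normal mode: walk only a small chunk per wake-up */
    const uint32_t chunk = 1024;                   /* assumed chunk size */
    uint32_t start = next_chunk_start;
    if (start < inst_min || start >= inst_max)
        start = inst_min;
    const uint32_t end = MIN(start + chunk, inst_max);
    scan_rows(start, end);
    return (end >= inst_max) ? inst_min : end;
}

int main(void)
{
    uint32_t next = 0;
    for (int pass = 0; pass < 4; pass++)           /* four normal-mode passes */
        next = scan_pass(0, 4096, false, next);
    scan_pass(0, 4096, true, next);                /* one emergency pass */
    return 0;
}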

Main loop

Getting timed-out flows

The very first step looks puzzling at first sight: the detection range above is processed in groups of 32 or 64 buckets, matching the word size of the platform; 64 bits is assumed here.

#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

1. First a uint64_t check_bits bitmap is built, where each bit indicates whether the corresponding bucket may contain a timed-out flow. The inner loop runs up to 64 times, comparing each bucket's next_ts against the current time; if the bucket has timed out, bit i of check_bits is set to 1.
2. If check_bits is non-zero, the same buckets are walked again: for every set bit, next_ts is re-checked, and if the timeout condition still holds, the flows in that bucket are processed and reclaimed.

    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i;
        }
        if (check_bits == 0)
            continue;

        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) {
                if (FMTryLockBucket(fb) == 0) {
                    Flow *evicted = NULL;
                    if (fb->evicted != NULL || fb->head != NULL) {
                        /* if evicted is set, we only process that list right now.
                         * Since its set we've had traffic that touched this row
                         * very recently, and there is a good chance more of it will
                         * come in in the near future. So unlock the row asap and leave
                         * the possible eviction of flows to the packet lookup path. */
                        if (fb->evicted != NULL) {
                            /* transfer out of bucket so we can do additional work outside
                             * of the bucket lock */
                            evicted = fb->evicted;
                            fb->evicted = NULL;
                        } else if (fb->head != NULL) {
                            int32_t next_ts = 0;
                            FlowManagerHashRowTimeout(td,
                                    fb->head, ts, emergency, counters, &next_ts);

                            if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                                SC_ATOMIC_SET(fb->next_ts, next_ts);
                        }
                        if (fb->evicted == NULL && fb->head == NULL) {
                            SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                        } else if (fb->evicted != NULL && fb->head == NULL) {
                            SC_ATOMIC_SET(fb->next_ts, 0);
                        }
                    } else {
                        SC_ATOMIC_SET(fb->next_ts, INT_MAX);
                        rows_empty++;
                    }
                    FBLOCK_UNLOCK(fb);
                    /* processed evicted list */
                    if (evicted) {
                        FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
                    }
                } else {
                    rows_busy++;
                }
            } else {
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }
  • If fb->evicted is not NULL, the whole evicted list is moved out of the bucket into the local Flow *evicted pointer, so that it can be handled outside the bucket lock. Afterwards static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f, struct timeval *ts, FlowTimeoutCounters *counters) is called, which appends every flow on that list to td->aside_queue:
    static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td,
            Flow *f, struct timeval *ts, FlowTimeoutCounters *counters)
    {
        do {
            FLOWLOCK_WRLOCK(f);
            Flow *next_flow = f->next;
            f->next = NULL;
            f->fb = NULL;
    
            DEBUG_VALIDATE_BUG_ON(f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters));
    
            FlowQueuePrivateAppendFlow(&td->aside_queue, f);
            /* flow is still locked in the queue */
    
            f = next_flow;
        } while (f != NULL);
    }
    
  • If fb->head is not NULL, FlowManagerHashRowTimeout() is called for timeout processing, and every timed-out flow is appended to td->aside_queue:
    static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td,
            Flow *f, struct timeval *ts,
            int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
    {
        uint32_t checked = 0;
        Flow *prev_f = NULL;
    
        do {
            checked++;
    
            /* check flow timeout based on lastts and state. Both can be
             * accessed w/o Flow lock as we do have the hash row lock (so flow
             * can't disappear) and flow_state is atomic. lastts can only
             * be modified when we have both the flow and hash row lock */
    
            /* timeout logic goes here */
            if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {
    
                counters->flows_notimeout++;
    
                prev_f = f;
                f = f->next;
                continue;
            }
    
            FMFlowLock(f); //FLOWLOCK_WRLOCK(f);
    
            Flow *next_flow = f->next;
    
            counters->flows_timeout++;
    
            /* never prune a flow that is used by a packet we
             * are currently processing in one of the threads */
            if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) {
                FLOWLOCK_UNLOCK(f);
                prev_f = f;
                counters->flows_timeout_inuse++;
                f = f->next;
                continue;
            }
    
            RemoveFromHash(f, prev_f);
    
            FlowQueuePrivateAppendFlow(&td->aside_queue, f);
            /* flow is still locked in the queue */
    
            f = next_flow;
        } while (f != NULL);
    
        counters->flows_checked += checked;
        if (checked > counters->rows_maxlen)
            counters->rows_maxlen = checked;
    }
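
The call to FlowManagerFlowTimeout() in the loop above is the actual per-flow decision. Conceptually it boils down to the standalone sketch below (illustration only: in the real function the timeout value comes from the per-protocol, per-state configuration, switching to the emergency values in emergency mode):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Standalone sketch of the per-flow timeout check (illustration only). */
static bool flow_timed_out(uint32_t lastts_sec,     /* last packet time of the flow */
        uint32_t state_timeout_sec,                 /* timeout for this proto/state */
        uint32_t now_sec,                           /* current time, ts->tv_sec */
        int32_t *next_ts)                           /* earliest expiry seen on this row */
{
    const int32_t times_out_at = (int32_t)(lastts_sec + state_timeout_sec);
    /* remember the earliest expiry; this is what feeds the bucket's next_ts */
    if (*next_ts == 0 || times_out_at < *next_ts)
        *next_ts = times_out_at;
    /* the flow is only timed out once 'now' has passed its expiry moment */
    return times_out_at < (int32_t)now_sec;
}

int main(void)
{
    int32_t next_ts = 0;
    /* a flow last seen at t=1000 with a 600 second timeout, checked at t=1700 */
    printf("timed out: %d, next_ts: %d\n",
            flow_timed_out(1000, 600, 1700, &next_ts), next_ts);
    return 0;
}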

Processing timed-out flows

The timed-out flows collected in td->aside_queue are then handled by the static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters) function:

static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t cnt = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        if (f->proto == IPPROTO_TCP &&
                !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
#ifdef CAPTURE_OFFLOAD
                f->flow_state != FLOW_STATE_CAPTURE_BYPASSED &&
#endif
                f->flow_state != FLOW_STATE_LOCAL_BYPASSED &&
                FlowForceReassemblyNeedReassembly(f) == 1)
        {
            FlowForceReassemblyForFlow(f);
            /* flow ownership is passed to the worker thread */

            /* flow remains locked */
            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

        FlowQueuePrivateAppendFlow(&recycle, f);
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        }
        cnt++;
    }
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
    }
    return cnt;
}

Essentially, the timed-out flows from td->aside_queue are moved into the recycler thread's queue, flow_recycle_q, in batches of 100 flows (any remainder is flushed at the end), where they wait for the flow recycler thread to process them.

If a timed-out flow still needs TCP reassembly, it is handed back to the thread that originally processed it: FlowForceReassemblyForFlow() puts the flow into that thread's tv->flow_queue and then signals the owning thread to wake it up; the FlowWorker of that thread then finishes the reassembly and recycling via FlowWorkerProcessInjectedFlows().
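
A minimal sketch of that hand-back idea is shown below. It is illustration only: LookupThreadByFlowOwner(), AppendToFlowQueue() and WakeupThread() are hypothetical helpers standing in for the flow-injection and signalling machinery in Suricata's tm-threads code; the real entry point is FlowForceReassemblyForFlow().

/* Illustration only, not the actual Suricata code; all three helpers below
 * are hypothetical stand-ins for the real flow-injection machinery. */
static void HandFlowBackSketch(Flow *f)
{
    /* the flow remembers which worker thread handled its packets */
    ThreadVars *owner = LookupThreadByFlowOwner(f);   /* hypothetical */

    /* append the (still locked) flow to that thread's flow_queue ... */
    AppendToFlowQueue(owner, f);                      /* hypothetical */

    /* ... and wake the thread, so its FlowWorkerProcessInjectedFlows() can
     * run, finish the forced reassembly and hand the flow to the recycler */
    WakeupThread(owner);                              /* hypothetical */
}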

Other timeouts

Only instance 0 additionally handles the defrag, host and IP-pair table timeouts, at most once per second:

        if (ftd->instance == 0 &&
                (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
            DefragTimeoutHash(&ts);
            //uint32_t hosts_pruned =
            HostTimeoutHash(&ts);
            IPPairTimeoutHash(&ts);
            other_last_sec = (uint32_t)ts.tv_sec;
        }

Thread exit

The thread runs its exit (deinit) function and then terminates.

What's past is prologue.
