FlowManager, the entry function of the flow aging thread, decides how aging is carried out. In emergency mode it calls the function FlowTimeoutHash directly; in normal mode it calls the function FlowTimeoutHashInChunks.
1. FlowTimeoutHashInChunks function
FlowManager->FlowTimeoutHashInChunks
The function FlowTimeoutHashInChunks lets each thread work through its buckets one range at a time, driven by elapsed time. The thread's buckets are divided into ranges, and the number of ranges scanned depends on the number of seconds that have passed. If 0 seconds have passed, no range is scanned and no timeout check is performed; this is reasonable, because the time has not changed. The more seconds that have passed, the more ranges are scanned. chunks = MIN(secs_passed, pass_in_sec); chunks is the number of ranges scanned in this pass.
In other words, the idea is not to traverse all buckets on every pass, but to traverse one range of buckets for each second that passes. The variable hash_pass_iter controls which range the traversal starts from, and chunks controls how many ranges are traversed. hash_pass_iter runs from 0 up to pass_in_sec = 240 (30 * 8), so only one range of buckets is traversed per second. If the time has not changed, no bucket is traversed. If the time changes, for example from 50 seconds to 51 seconds, then hash_pass_iter++ is executed, so each traversal covers the range after the previous one; the range that was just traversed is not traversed again this time. The number of buckets in a range is calculated inside this function. For example, if the thread is responsible for 65536 buckets and hash_pass_iter loops up to pass_in_sec = 240, each range holds 65536 / 240 = 273 buckets (integer division), and the 16 leftover buckets fall into the last range.
const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
This is not easy to understand, and the explanation above is wordy. It still does not say exactly what I want to express, so treat it as a rough guide.
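To make the arithmetic above concrete, here is a minimal sketch of the range calculation using the numbers from the example. The helper chunk_range and the BucketRange struct are hypothetical names introduced only for illustration; they are not part of Suricata, where this logic lives inside FlowTimeoutHashInChunks.

```c
#include <stdint.h>

/* Hypothetical type for illustration only: the start and end bucket
 * index computed for one value of iter. */
typedef struct {
    uint32_t min;
    uint32_t max;
} BucketRange;

/* Hypothetical helper: same arithmetic as described in the text,
 * without the actual timeout check. */
static BucketRange chunk_range(uint32_t hash_min, uint32_t hash_max,
                               uint32_t iter, uint32_t chunks)
{
    const uint32_t rows = hash_max - hash_min;   /* buckets owned by the thread */
    const uint32_t chunk_size = rows / chunks;   /* integer division */
    BucketRange r;

    r.min = iter * chunk_size + hash_min;
    r.max = r.min + chunk_size;
    if (iter + 1 == chunks) {
        r.max = hash_max;   /* last range also takes the leftover buckets */
    }
    return r;
}
```

With 65536 buckets and 240 ranges, iter = 0 gives the indexes 0 and 273, while iter = 239 starts at 65247 and ends at 65536, so the last range holds 289 buckets instead of 273.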
struct timeval ts;
TimeGet(&ts);
const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000; // Convert to milliseconds
const uint32_t rt = (uint32_t)ts.tv_sec;
const uint32_t secs_passed = rt - flow_last_sec;
rt is the current time in seconds and is assigned on every loop iteration; flow_last_sec is the time of the previous pass. Subtracting the two gives the number of seconds elapsed.
In fact, only a few buckets are traversed at a time, not all of them. The polling interval is short, 667 milliseconds, so it is not necessary to traverse buckets on every wakeup. The secs_passed variable holds the number of seconds elapsed, and buckets are traversed only when the time has changed. The hash_pass_iter variable records the starting index of each traversal and is incremented by 1 for each second that passes, so each traversal covers the buckets after the ones that were just checked. When hash_pass_iter equals pass_in_sec it is reset to 0, that is, the traversal starts again from the first bucket.
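The time-driven stepping described above can be sketched as follows. This is a simplified model, not the Suricata source: the function scan_step and the ScanState struct are hypothetical, and the loop assumes that one range is scanned per elapsed second, capped at pass_in_sec, as the text describes.

```c
#include <stdint.h>

/* Hypothetical state for illustration; the field names mirror the
 * variables discussed in the text. */
typedef struct {
    uint32_t flow_last_sec;   /* second at which the previous pass ran */
    uint32_t hash_pass_iter;  /* index of the next range to scan */
} ScanState;

/* Model one wakeup at second rt.
 * Returns how many ranges would be scanned on this wakeup. */
static uint32_t scan_step(ScanState *st, uint32_t rt, uint32_t pass_in_sec)
{
    const uint32_t secs_passed = rt - st->flow_last_sec;
    /* chunks = MIN(secs_passed, pass_in_sec) */
    const uint32_t chunks =
        secs_passed < pass_in_sec ? secs_passed : pass_in_sec;

    for (uint32_t i = 0; i < chunks; i++) {
        /* a real caller would scan range st->hash_pass_iter here */
        st->hash_pass_iter++;
        if (st->hash_pass_iter == pass_in_sec) {
            st->hash_pass_iter = 0;   /* wrap around to the first range */
        }
    }
    st->flow_last_sec = rt;
    return chunks;
}
```

Starting from second 50 with hash_pass_iter = 238 and pass_in_sec = 240, a wakeup within the same second scans nothing, a wakeup at second 51 scans one range, and a wakeup at second 53 scans two ranges and wraps hash_pass_iter around to 1.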
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
        struct timeval *ts,
        const uint32_t hash_min, const uint32_t hash_max,
        FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
{
    // Number of buckets this thread is responsible for
    const uint32_t rows = hash_max - hash_min;
    // Number of buckets traversed per call; chunks is passed in by the caller
    const uint32_t chunk_size = rows / chunks;

    // ---------------------------------------------------------------
    // The chunks argument is the pass_in_sec variable passed in by the caller.
    // The two definitions below live outside this function and are repeated
    // here only to make that easier to follow:
    //
    //   // minimum of all protocol timeouts in normal mode
    //   const uint32_t min_timeout = FlowTimeoutsMin();
    //   // multiplied by 8; the minimum timeout is 30 seconds,
    //   // so pass_in_sec = 240 (why 8 is used is not clear to me for now)
    //   const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
    // ---------------------------------------------------------------

    // Starting bucket index for this traversal. Each time iter increases by 1,
    // the start moves forward by one chunk_size. iter is incremented by the
    // caller after every call, so successive calls traverse successive ranges:
    // the previous range was checked a second or a few seconds ago, so this
    // call checks the next one. This is the main purpose of this function.
    const uint32_t min = iter * chunk_size + hash_min;
    // End bucket index for this traversal
    uint32_t max = min + chunk_size;
    /* we start at beginning of hash at next iteration so let's check
     * hash till the end */
    if (iter + 1 == chunks) {
        // The leftover buckets that do not fill a whole chunk_size
        // are included in this last traversal
        max = hash_max;
    }
    // FlowTimeoutHash does the actual aging work; it was explained in the previous article
    const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
    return cnt; // Returns the number of flows aged out
}
2. FlowSparePoolUpdate function
FlowManager->FlowSparePoolUpdate
This function is called mainly from the first flow aging thread to adjust the number of free flows. If the total number of free flows is below 90% of the configured number, more flows are allocated; if it is above 110% of the configured number, flows are released.
void FlowSparePoolUpdate(uint32_t size)
{
    // --------------------------------------------------
    // This branch handles the case of more free flows than preallocated flows.
    // One tenth of the surplus is released, but only in whole flow queues, so
    // nothing is released unless the surplus is at least 10 times the size of
    // a single flow queue. For example, with a surplus of 1100,
    // 1100 / 10 = 110, and 100 flows are released. Why not 110? Because one
    // flow queue is released at a time, and a flow queue holds 100 flows.
    const int64_t todo = (int64_t)flow_config.prealloc - (int64_t)size;
    if (todo < 0) {
        // Number of flows that should be released
        uint32_t to_remove = (uint32_t)(todo * -1) / 10;
        while (to_remove) {
            // Fewer than one flow queue (100 flows) left: nothing to release
            if (to_remove < flow_spare_pool_block_size)
                return;

            // Detach the first pool from the flow memory pool list
            FlowSparePool *p = NULL;
            SCMutexLock(&flow_spare_pool_m);
            p = flow_spare_pool;
            if (p != NULL) {
                flow_spare_pool = p->next;
                flow_spare_pool_flow_cnt -= p->queue.len;
                to_remove -= p->queue.len;
            }
            SCMutexUnlock(&flow_spare_pool_m);

            // Free the flow queue of that pool
            if (p != NULL) {
                Flow *f;
                while ((f = FlowQueuePrivateGetFromTop(&p->queue))) {
                    FlowFree(f);
                }
                SCFree(p);
            }
        }
    } else if (todo > 0) {
        // -------------------------------------
        // Fewer free flows than preallocated flows: allocate more
        FlowSparePool *head = NULL, *tail = NULL;

        // Number of memory pools needed to cover the shortfall
        uint32_t blocks = ((uint32_t)todo / flow_spare_pool_block_size) + 1;

        uint32_t flow_cnt = 0;
        for (uint32_t cnt = 0; cnt < blocks; cnt++) {
            // Allocate a flow memory pool structure
            FlowSparePool *p = FlowSpareGetPool();
            if (p == NULL) {
                break;
            }
            // FlowSparePoolUpdateBlock allocates flows and adds them to the pool
            const bool ok = FlowSparePoolUpdateBlock(p);
            if (p->queue.len == 0) {
                SCFree(p);
                break;
            }
            flow_cnt += p->queue.len; // Running total of flows in the new queues

            /* prepend to list */
            p->next = head; // Put each new pool at the head of the temporary list
            head = p;
            if (tail == NULL)
                tail = p;
            if (!ok)
                break;
        }
        if (head) {
            // Allocation succeeded: update the global flow memory pool
            SCMutexLock(&flow_spare_pool_m);
            if (flow_spare_pool == NULL) {
                // The global list is empty: the new list becomes the global list
                flow_spare_pool = head;
            } else if (tail != NULL) {
                // The global list is not empty: link the new pools in at the
                // second position of the global list
                /* since these are 'full' buckets we don't put them
                 * at the top but right after as the top is likely not
                 * full. */
                tail->next = flow_spare_pool->next;
                flow_spare_pool->next = head;
            }

            flow_spare_pool_flow_cnt += flow_cnt; // Update the total number of idle flows
#ifdef FSP_VALIDATE
            Validate(flow_spare_pool, flow_spare_pool_flow_cnt);
#endif
            SCMutexUnlock(&flow_spare_pool_m);
        }
    }
}
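The sizing arithmetic in FlowSparePoolUpdate can be checked in isolation with a small sketch. The two function names below are hypothetical and exist only for this illustration. The release calculation also assumes that every pool block is full, that is, holds exactly block_size flows, which the real list does not guarantee.

```c
#include <stdint.h>

/* Hypothetical helper: number of whole blocks released when the spare
 * pool (size) is larger than the preallocated count (prealloc).
 * Assumes each block holds exactly block_size flows. */
static uint32_t blocks_to_release(uint32_t prealloc, uint32_t size,
                                  uint32_t block_size)
{
    const int64_t todo = (int64_t)prealloc - (int64_t)size;
    if (todo >= 0 || block_size == 0)
        return 0;

    uint32_t to_remove = (uint32_t)(-todo) / 10;   /* one tenth of the surplus */
    uint32_t released = 0;
    while (to_remove >= block_size) {              /* only whole blocks are freed */
        to_remove -= block_size;
        released++;
    }
    return released;
}

/* Hypothetical helper: number of blocks requested when the spare pool
 * is smaller than the preallocated count. */
static uint32_t blocks_to_allocate(uint32_t prealloc, uint32_t size,
                                   uint32_t block_size)
{
    const int64_t todo = (int64_t)prealloc - (int64_t)size;
    if (todo <= 0 || block_size == 0)
        return 0;
    return ((uint32_t)todo / block_size) + 1;
}
```

With prealloc = 10000, a block size of 100 and 11100 spare flows, the surplus is 1100, one tenth of it is 110, and one block of 100 flows is released. With 8900 spare flows, the shortfall of 1100 leads to 1100 / 100 + 1 = 12 blocks being requested.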