This article continues to explain the establishment of flow in the previous article. The most important function FlowGetNew is to obtain a flow. The acquisition process is also very tortuous. Please tell your grandparents to get a flow.
1. The function flowgetnew can also be understood in several aspects:
FlowHandlePacket->FlowGetFlowFromHash->FlowGetNew
a. Obtain the flow from the thread's own flow queue. If the flow is obtained successfully, return the flow. If not, obtain a flow queue from the global flow memory pool, and then obtain the flow from the flow queue.
b. If the thread's own flow queue and global flow memory pool have no available flow queue, and the flow memory exceeds the configured upper limit, it is set to enter the emergency mode.
c. Then, b, after setting the emergency mode, call the function FlowGetUsedFlow to get flow, which is from the global flow_ being used. Get one from the bucket linked list of hash. This function will be commented later.
d. If the thread's own flow queue and global flow memory pool do not have available flow queues, and the flow memory does not exceed the configured upper limit, allocate flow directly in memory, that is, call the function FlowAlloc.
static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, const Packet *p) { //Get emergency mode flag const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0); //Check whether flow can be generated. icmp error package does not generate flow //In case of emergency mode, tcp non sync packets do not generate flow. It can be seen that sync packets generate flow first if (FlowCreateCheck(p, emerg) == 0) { return NULL; } //Get the flow from the thread's own flow queue. If the flow is obtained successfully, the flow is returned /* get a flow from the spare queue */ Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue); if (f == NULL) { //If obtaining flow fails, obtain a flow queue from the global flow memory pool, and then obtain flow from the flow queue. //FlowSpareSync function is mainly used to obtain a flow queue from the free global flow memory pool for your own use f = FlowSpareSync(tv, fls, p, emerg); } //There are no flow queues available in the global flow memory pool if (f == NULL) { /* If we reached the max memcap, we get a used flow */ //Judge that if the flow memory exceeds the configured upper limit, it will enter the emergency mode. if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) { /* declare state of emergency */ if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) { //The emergency mode flag can be set only if it is not set. It is unnecessary to set it again after setting it SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY); //This function sets the normal timeout of the current stream aging to the emergency mode timeout FlowTimeoutsEmergency(); } //There is no idle flow, and the memory occupied by the flow exceeds the upper limit of the configuration. Try to start from the global flow in use_ hash //Get one from the bucket linked list of. Only if the reference count is 0, it will be taken out, regardless of whether it meets the timeout condition f = FlowGetUsedFlow(tv, fls->dtv, &p->ts); if (f == NULL) { return NULL; } #ifdef UNITTESTS if (tv != NULL && fls->dtv != NULL) { #endif StatsIncr(tv, fls->dtv->counter_flow_get_used); #ifdef UNITTESTS } #endif /* flow is still locked from FlowGetUsedFlow() */ FlowUpdateCounter(tv, fls->dtv, p->proto); return f; } //If there is no flow queue available for the thread's own flow queue and the global flow memory pool, //If the flow memory does not exceed the configured upper limit, the flow is directly allocated in the memory, that is, the function FlowAlloc is called /* now see if we can alloc a new flow */ f = FlowAlloc(); if (f == NULL) { #ifdef UNITTESTS if (tv != NULL && fls->dtv != NULL) { #endif StatsIncr(tv, fls->dtv->counter_flow_memcap); #ifdef UNITTESTS } #endif return NULL; } /* flow is initialized but *unlocked* */ //flow will be initialized in FlowAlloc. No one uses it or locks it. Whoever uses it will lock it } else { /* flow has been recycled before it went into the spare queue */ /* flow is initialized (recylced) but *unlocked* */ //This means that after the flow is obtained, the flow will be initialized FlowInit (f) when recycled, //Indeed, this is done during recycling and the lock is released } FLOWLOCK_WRLOCK(f); FlowUpdateCounter(tv, fls->dtv, p->proto); return f; }
2. FlowTimeoutsEmergency function
FlowHandlePacket->FlowGetFlowFromHash->FlowGetNew->FlowTimeoutsEmergency
This function has a single function and sets the aging timeout to the aging time of emergency mode.
void FlowTimeoutsEmergency(void) { //Both parameters are global variables, //flow_timeouts is the timeout in use //flow_timeouts_emerg is the emergency timeout set at initialization //Set the aging timeout to the aging time of emergency mode. After setting, in the stream aging thread, //The time obtained when checking timeout is the time of this emergency mode SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg); }
3. FlowGetUsedFlow function
FlowHandlePacket->FlowGetFlowFromHash->FlowGetNew->FlowGetUsedFlow
This function is called when no free flow is available and the flow memory reaches the configured memory limit when obtaining flow in FlowGetNew.
The function is mainly from the global variable flow_ Get a flow from the used flows in the hash. The reference count of this flow must be 0. Do not check whether it times out. Take it out as long as the reference count is 0.
When obtaining flow, there is a small algorithm when selecting bucket. If you use flow every time_ If you take the flow out of the bucket in front of the hash, the flow in front is bound to be taken out quickly. After taking it, you have to traverse from front to back every time. Each traverse is useless and time-consuming, because the qualified flow in the bucket in front is taken away first, and each bucket will be traversed in turn.
So, set an atomic variable flow_prune_idx, which is used to control the bucket from which the qualified flow is obtained each time. Here, 5 is added each time, that is, the index interval of the bucket from which the flow is obtained each time is 5. If there is no qualified flow on this bucket, the index is added by 1, and the next bucket is checked until the flow is found or the lookup count is 5 (fixed value of global variable). If the traversal reaches the last bucket, Then start the search from the first bucket.
The purpose is to reserve qualified flows for each bucket. You can quickly find the flow when fetching each time without traversing all buckets
/** \internal * \brief Get a flow from the hash directly. * * Called in conditions where the spare queue is empty and memcap is reached. * * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the * top each time since that would clear the top of the hash leading to longer * and longer search times under high pressure (observed). * * \param tv thread vars * \param dtv decode thread vars (for flow log api thread data) * * \retval f flow or NULL */ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const struct timeval *ts) { //This function adds flow to the global atomic variable that controls the bucket index_ GET_ NEW_ Tries, the value is 5, //The bucket index of each search is the last value plus 5, and the search will not start every time. In this way, each bucket can be searched with the greatest probability //Keep the qualified flow s and search for these buckets later. The probability of success is high. It is not necessary to traverse all buckets //This is my personal understanding. Why is the maximum probability? Because not all bucket s have qualified flow s. If you don't search it, it doesn't, //If you search for it, it doesn't, and it's impossible to reserve qualified flow s for that bucket, uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size; uint32_t tried = 0; while (1) { //The number of lookups is actually the number of bucket s searched. If it reaches 5 times, the counter will be updated and returned if (tried++ > FLOW_GET_NEW_TRIES) { STATSADDUI64(counter_flow_get_used_eval, tried); break; } //When the bucket index reaches the maximum, it starts from 0 if (++idx >= flow_config.hash_size) idx = 0; FlowBucket *fb = &flow_hash[idx]; //next_ts timeout variable set INT_MAX indicates that there is no flow on the bucket in the future_ Ts to time out //That is, the bucket is empty and the int set by the FlowManager aging thread_ MAX if (SC_ATOMIC_GET(fb->next_ts) == INT_MAX) continue; if (GetUsedTryLockBucket(fb) != 0) { STATSADDUI64(counter_flow_get_used_eval_busy, 1); continue; } Flow *f = fb->head; if (f == NULL) { FBLOCK_UNLOCK(fb); continue; } if (GetUsedTryLockFlow(f) != 0) { STATSADDUI64(counter_flow_get_used_eval_busy, 1); FBLOCK_UNLOCK(fb); continue; } /** never prune a flow that is used by a packet or stream msg * we are currently processing in one of the threads */ if (f->use_cnt > 0) { //If the counter is greater than 0, it means that a pack refers to this flow and cannot rob other people's flow STATSADDUI64(counter_flow_get_used_eval_busy, 1); FBLOCK_UNLOCK(fb); FLOWLOCK_UNLOCK(f); continue; } //This function determines whether this flow has been used recently, //0 is based on the time difference between the latest time and the flow. If it is less than a certain number of seconds, it has been used recently and cannot be taken away if (StillAlive(f, ts)) { STATSADDUI64(counter_flow_get_used_eval_reject, 1); FBLOCK_UNLOCK(fb); FLOWLOCK_UNLOCK(f); continue; } //Well, congratulations. The flow count is 0. It hasn't been updated recently. That's it //Take it off the bucket /* remove from the hash */ fb->head = f->next; f->next = NULL; f->fb = NULL; FBLOCK_UNLOCK(fb); //This is the termination flag of flow, and the flag flow is forcibly taken away_ END_ FLAG_ FORCED, //When flow terminates, it is in emergency mode, and the flag flow is set_ END_ FLAG_ EMERGENCY /* rest of the flags is updated on-demand in output */ f->flow_end_flags |= FLOW_END_FLAG_FORCED; if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY; /* invoke flow log api */ #ifdef UNITTESTS if (dtv) { #endif //Output flow log if (dtv->output_flow_thread_data) { (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); } #ifdef UNITTESTS } #endif //Clear some resources in the flow, mainly whether the memory of the tcp session associated with the flow is f - > protoctx //The storage space behind the flow structure, void * array, and reinitialize the flow FlowClearMemory(f, f->protomap); /* leave locked */ STATSADDUI64(counter_flow_get_used_eval, tried); return f; } STATSADDUI64(counter_flow_get_used_failed, 1); return NULL; }