2021SC@SDUSC
Overview
Having analyzed the buffer pool in the previous post, let's now look at PostgreSQL's shared memory, which is the basis of its IPC (inter-process communication).
Because the process is the basic unit to which the operating system allocates resources, each process has its own independent set of system resources, and different processes are isolated from one another. To let processes share data and cooperate, the operating system provides IPC mechanisms. PostgreSQL implements IPC mainly through shared memory. It sets aside a region of memory that every process can read and write, and it establishes rules for when and how processes may access that region, so that processes can exchange data through it.
Source code analysis
During PostgreSQL initialization, the system allocates a memory region that is visible to all PostgreSQL backend processes. This region is shared memory.
On top of shared memory, PostgreSQL provides several other facilities:
- Communication between backend (postgres) processes and the postmaster
- The SI (shared invalidation) message mechanism, i.e., the delivery of invalidation messages
- Unified management of process-related variables and functions
This post focuses on shared memory and the SI message mechanism.
Shared memory and its management
Initialization of shared memory
The source code is located in src/backend/storage/ipc/ipci.c.
void CreateSharedMemoryAndSemaphores(int port)
{
    PGShmemHeader *shim = NULL;

    // In the EXEC_BACKEND case this function is also called by backends forked
    // from the postmaster. There the shared memory already exists and needs no
    // initialization, but the pointers that reference shared structures must
    // still be set up in local memory.
    if (!IsUnderPostmaster)
    {
        PGShmemHeader *seghdr;
        Size size;      // computed shared memory size
        int numSemas;   // computed number of semaphores

        // Calculate the number of semaphores required
        numSemas = ProcGlobalSemas();
        numSemas += SpinlockSemas();

        // Start estimating the shared memory size (some terms are estimates)
        size = 100000;
        // Add the memory needed by the semaphores counted above
        size = add_size(size, PGSemaphoreShmemSize(numSemas));
        size = add_size(size, SpinlockSemaSize());
        // Add the estimated sizes of the locks, structures and indexes kept in shared memory
        size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE, sizeof(ShmemIndexEnt)));
        size = add_size(size, BufferShmemSize());
        size = add_size(size, LockShmemSize());
        size = add_size(size, PredicateLockShmemSize());
        size = add_size(size, ProcGlobalShmemSize());
        size = add_size(size, XLOGShmemSize());
        size = add_size(size, CLOGShmemSize());
        size = add_size(size, CommitTsShmemSize());
        size = add_size(size, SUBTRANSShmemSize());
        size = add_size(size, TwoPhaseShmemSize());
        size = add_size(size, BackgroundWorkerShmemSize());
        size = add_size(size, MultiXactShmemSize());
        size = add_size(size, LWLockShmemSize());
        size = add_size(size, ProcArrayShmemSize());
        size = add_size(size, BackendStatusShmemSize());
        size = add_size(size, SInvalShmemSize());
        size = add_size(size, PMSignalShmemSize());
        size = add_size(size, ProcSignalShmemSize());
        size = add_size(size, CheckpointerShmemSize());
        size = add_size(size, AutoVacuumShmemSize());
        size = add_size(size, ReplicationSlotsShmemSize());
        size = add_size(size, ReplicationOriginShmemSize());
        size = add_size(size, WalSndShmemSize());
        size = add_size(size, WalRcvShmemSize());
        size = add_size(size, ApplyLauncherShmemSize());
        size = add_size(size, SnapMgrShmemSize());
        size = add_size(size, BTreeShmemSize());
        size = add_size(size, SyncScanShmemSize());
        size = add_size(size, AsyncShmemSize());
#ifdef EXEC_BACKEND
        size = add_size(size, ShmemBackendArraySize());
#endif

        /* freeze the addin request size and include it */
        addin_request_allowed = false;
        size = add_size(size, total_addin_request);

        /* might as well round it off to a multiple of a typical page size */
        size = add_size(size, 8192 - (size % 8192));

        elog(DEBUG3, "invoking IpcMemoryCreate(size=%zu)", size);

        // Create the shared memory segment
        seghdr = PGSharedMemoryCreate(size, port, &shim);
        // Set up access to shared memory (record the segment header pointer)
        InitShmemAccess(seghdr);
        // Create the semaphores
        PGReserveSemaphores(numSemas, port);
#ifndef HAVE_SPINLOCKS
        // Initialize the semaphores used to emulate spinlocks
        SpinlockSemaInit();
#endif
    }
    else
    {
        // The EXEC_BACKEND case mentioned above: we are already attached to shared memory
#ifndef EXEC_BACKEND
        elog(PANIC, "should be attached to shared memory already");
#endif
    }

    // Set up the shared memory allocation mechanism
    if (!IsUnderPostmaster)
        InitShmemAllocation();

    // Initialize LWLocks. Creating them allocates shared memory,
    // and InitShmemIndex() below needs them.
    CreateLWLocks();

    // Initialize the shared memory index hash table
    InitShmemIndex();

    // Set up xlog, clog and the buffer pool
    XLOGShmemInit();
    CLOGShmemInit();
    CommitTsShmemInit();
    SUBTRANSShmemInit();
    MultiXactShmemInit();
    InitBufferPool();

    // Initialize the lock manager
    InitLocks();

    // Initialize the predicate lock manager
    InitPredicateLocks();

    // Initialize the process table
    if (!IsUnderPostmaster)
        InitProcGlobal();
    CreateSharedProcArray();
    CreateSharedBackendStatus();
    TwoPhaseShmemInit();
    BackgroundWorkerShmemInit();

    // Create the shared invalidation (SI) message state
    CreateSharedInvalidationState();

    // Set up the interprocess signaling mechanisms
    PMSignalShmemInit();
    ProcSignalShmemInit();
    CheckpointerShmemInit();
    AutoVacuumShmemInit();
    ReplicationSlotsShmemInit();
    ReplicationOriginShmemInit();
    WalSndShmemInit();
    WalRcvShmemInit();
    ApplyLauncherShmemInit();

    // Set up other modules that need shared memory space
    SnapMgrInit();
    BTreeShmemInit();
    SyncScanShmemInit();
    AsyncShmemInit();

    // Initialize dynamic shared memory
    if (!IsUnderPostmaster)
        dsm_postmaster_startup(shim);

    // Let loadable modules allocate or attach their own shared memory
    if (shmem_startup_hook)
        shmem_startup_hook();
}
This function initializes shared memory and semaphores. It is called by the postmaster or by a standalone backend, and in EXEC_BACKEND builds it is also called by each backend forked from the postmaster. The general flow is:
- Calculate the total size of shared memory required
- Allocate the shared memory segment
- Initialize access to the segment through its header pointer
- Create the semaphores
- Build the shared memory index table
- Initialize each module and allocate its shared memory space
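The size-estimation step is easy to get subtly wrong, so here is a small standalone sketch of it. It relies only on what the listing shows: a 100000-byte base, a series of additions, and a final `8192 - (size % 8192)` round-up. `checked_add_size` and `estimate_shmem_size` are hypothetical names, and the overflow check is my own stand-in for `add_size`, not a copy of PostgreSQL's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical overflow-checked addition, standing in for add_size():
 * returns false instead of wrapping around when a + b exceeds SIZE_MAX. */
static bool checked_add_size(size_t a, size_t b, size_t *result)
{
    if (a > SIZE_MAX - b)
        return false;
    *result = a + b;
    return true;
}

/* Sum a 100000-byte base plus per-module estimates, then round up with the
 * same expression as the listing: size += 8192 - (size % 8192). */
static bool estimate_shmem_size(const size_t *module_sizes, size_t nmodules,
                                size_t *out)
{
    size_t size = 100000;   /* slack for structures too small to estimate */

    for (size_t i = 0; i < nmodules; i++)
        if (!checked_add_size(size, module_sizes[i], &size))
            return false;   /* estimate overflowed size_t */

    if (!checked_add_size(size, 8192 - (size % 8192), &size))
        return false;

    assert(size % 8192 == 0);   /* result is a whole number of 8 KB pages */
    *out = size;
    return true;
}
```

The round-up expression always adds something. A size that is already a multiple of 8192 gains one whole extra page, which is harmless slack.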
Shared memory hash table
During initialization, a hash table (the shared memory index) is created in shared memory. When another module needs space in shared memory, it calls the relevant function (ShmemInitStruct) with a name and a size. The function returns a region of shared memory and records an entry for that name in the hash table, so a later call with the same name finds the existing region instead of allocating a new one.
This is implemented in src/backend/storage/ipc/shmem.c. I won't analyze it in detail here.
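To illustrate the named-allocation idea, here is a toy version that uses a fixed array and a bump pointer instead of a real hash table and real shared memory. `toy_init_struct`, `SEGMENT_SIZE`, and the 8-byte alignment are assumptions made for illustration. Only the "look up by name, otherwise allocate and record" behavior is meant to mirror ShmemInitStruct.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define SEGMENT_SIZE 4096   /* toy segment size (assumption) */
#define MAX_ENTRIES  16     /* toy index capacity (assumption) */
#define MAX_NAME     48
#define ALIGN8(x)    (((x) + 7) & ~(size_t) 7)   /* round up to 8 bytes */

typedef struct {
    char   name[MAX_NAME];  /* lookup key, e.g. a module's structure name */
    size_t offset;          /* where the structure starts in the segment */
    size_t size;
} ToyIndexEntry;

/* In PostgreSQL these would live in shared memory; here they are static. */
static unsigned long long segment_words[SEGMENT_SIZE / 8]; /* aligned backing store */
static size_t        free_offset;                          /* bump-allocation mark */
static ToyIndexEntry index_table[MAX_ENTRIES];
static int           n_entries;

/* Hypothetical analogue of ShmemInitStruct(name, size, &found): return the
 * existing structure registered under `name`, or carve out a new one. */
static void *toy_init_struct(const char *name, size_t size, bool *found)
{
    unsigned char *base = (unsigned char *) segment_words;

    for (int i = 0; i < n_entries; i++) {
        if (strcmp(index_table[i].name, name) == 0) {
            *found = true;              /* already created: reuse it */
            assert(index_table[i].size == size);
            return base + index_table[i].offset;
        }
    }

    *found = false;
    size_t start = ALIGN8(free_offset);
    if (n_entries == MAX_ENTRIES || strlen(name) >= MAX_NAME ||
        size > SEGMENT_SIZE - start)
        return NULL;                    /* no index slot or no space left */

    ToyIndexEntry *e = &index_table[n_entries++];
    strcpy(e->name, name);
    e->offset = start;
    e->size = size;
    free_offset = start + size;
    return base + start;
}
```

Lookups go by name. As a result, a process that attaches later (for example, an EXEC_BACKEND child) finds the structures the postmaster already created instead of allocating them a second time.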
SI Message mechanism
SI messages are mainly used to keep the caches of different processes (such as the catalog and relation caches) consistent. Here we look at how the SI message mechanism is built on top of the shared memory described above.
shmInvalBuffer
This data structure lives in shared memory. It records all invalidation messages sent in the system, together with each process's progress in processing them.
static SISeg *shmInvalBuffer;
As the declaration shows, shmInvalBuffer is a static global pointer, and the structure it points to has type SISeg.
SISeg structure
typedef struct SISeg
{
    // General state information
    int minMsgNum;      // number of the oldest message still needed; every older message has been processed by all processes
    int maxMsgNum;      // number that will be assigned to the next new message
    int nextThreshold;  // message count at which SICleanupQueue should be called
    int lastBackend;    // index of the last active procState entry, plus 1
    int maxBackends;    // size of the procState array
    slock_t msgnumLock; // spinlock protecting maxMsgNum

    // Fixed-length array of invalidation messages, one message per element, used as a ring queue.
    // The source defines its size as 4096: #define MAXNUMMESSAGES 4096
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    // Each backend's progress (state) in processing invalidation messages;
    // the array size depends on the maximum number of backends the system allows
    ProcState procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
About insertion into the buffer ring queue:
Starting from an empty array, new invalidation messages are stored in the array elements from front to back. Once the end of the array is reached, insertion wraps around and continues from the head of the buffer array. Message number n goes into slot n % MAXNUMMESSAGES.
About minMsgNum and maxMsgNum:
The two act as the lower and upper bounds of the live messages. Messages numbered below minMsgNum have been processed by every process, messages numbered maxMsgNum or higher have not been generated yet, and every message in between has not yet been processed by at least one process. Therefore, in the buffer ring, the slots outside the window [minMsgNum, maxMsgNum) can hold newly added invalidation messages.
However, space can still run out when messages are inserted into the SI message queue, because the queue may be full of messages that some process has not finished reading. Some messages must then be cleaned up. The writer calls the cleanup function when the current number of messages plus the number to be inserted would exceed the capacity MAXNUMMESSAGES, or when the current number of messages has reached nextThreshold.
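The wrap-around behaves like a plain modulo. In this sketch, message numbers only grow, and the slot is the number modulo the ring size. A full ring is detected by the distance between the two counters, not by slot positions. `Ring`, `ring_insert`, and the ring size of 8 are hypothetical choices that keep the example small.

```c
#include <assert.h>

#define RING_SIZE 8   /* small stand-in for MAXNUMMESSAGES (4096) */

typedef struct {
    int msgs[RING_SIZE];  /* real entries are SharedInvalidationMessage */
    int minMsgNum;        /* oldest message still needed by someone */
    int maxMsgNum;        /* number the next new message will receive */
} Ring;

/* Message numbers only grow; the array slot is the number modulo the ring
 * size, so after the last element, insertion continues at slot 0. */
static int ring_slot(int msgnum)
{
    assert(msgnum >= 0);
    return msgnum % RING_SIZE;
}

/* Append one message. Returns 0 when all RING_SIZE slots still hold
 * messages someone needs (a cleanup would have to run first), 1 otherwise. */
static int ring_insert(Ring *r, int payload)
{
    if (r->maxMsgNum - r->minMsgNum >= RING_SIZE)
        return 0;
    r->msgs[ring_slot(r->maxMsgNum)] = payload;
    r->maxMsgNum++;
    return 1;
}
```

After eight inserts the ring is full. Once minMsgNum advances to 3, message number 8 goes into slot 0, overwriting a message that everyone has already read.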
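The trigger condition can be written as a small predicate. This is a sketch of my reading of the insertion path in sinvaladt.c, not a copy of it. `need_cleanup` and `free_slots` are hypothetical helpers. In the real code the writer re-checks in a loop after each cleanup, because one cleanup may not free enough space.

```c
#include <assert.h>
#include <stdbool.h>

#define MAXNUMMESSAGES 4096

/* Number of slots outside the live window [minMsgNum, maxMsgNum). */
static int free_slots(int minMsgNum, int maxMsgNum)
{
    int numMsgs = maxMsgNum - minMsgNum;
    assert(numMsgs >= 0 && numMsgs <= MAXNUMMESSAGES);
    return MAXNUMMESSAGES - numMsgs;
}

/* Should the writer run a cleanup before inserting nthistime messages? */
static bool need_cleanup(int minMsgNum, int maxMsgNum, int nextThreshold,
                         int nthistime)
{
    int numMsgs = MAXNUMMESSAGES - free_slots(minMsgNum, maxMsgNum);

    return numMsgs + nthistime > MAXNUMMESSAGES  /* would overwrite unread messages */
        || numMsgs >= nextThreshold;             /* routine housekeeping point */
}
```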
SICleanupQueue
I recommend reading the ProcState structure described below before this function analysis, because the function uses it.
This function cleans up the invalidation message queue.
void SICleanupQueue(bool callerHasWriteLock, int minFree)
// callerHasWriteLock: true if the caller already holds SInvalWriteLock
// minFree: minimum number of message slots this call must make free
{
    SISeg *segP = shmInvalBuffer;   // pointer to the shared SISeg
    int min, minsig, lowbound, numMsgs, i;
    ProcState *needSig = NULL;

    // Lock out all readers and writers:
    // take SInvalWriteLock unless the caller already holds it...
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    // ...and take SInvalReadLock in exclusive mode, which blocks all readers
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    // min will become the global minimum nextMsgNum; start from maxMsgNum
    min = segP->maxMsgNum;
    // Processes whose nextMsgNum is below minsig (more than SIG_THRESHOLD
    // messages behind) are candidates for a catch-up signal
    minsig = min - SIG_THRESHOLD;
    // Messages below lowbound must be freed so that at least minFree slots
    // become available during this cleanup
    lowbound = min - MAXNUMMESSAGES + minFree;
lowbound and minsig
A process that is reset must discard everything in its caches and reload them, which costs extra I/O. If every process had to reload its caches, the I/O load would rise considerably. To keep the number of cache reloads low, the function computes the two bounds above. Any process whose nextMsgNum is below lowbound occupies space that this cleanup must free, so that process is reset: its resetState variable is set to true, and it will reload its caches on its own. Processes whose nextMsgNum lies between lowbound and minsig are not in the way of this cleanup. However, to avoid frequent cleanups, they are asked to process their pending messages faster. The cleanup finds the slowest of these processes and sends it a PROCSIG_CATCHUP_INTERRUPT signal. After receiving the signal (delivered via SIGUSR1), that process handles all of its pending invalidation messages at once and then passes a PROCSIG_CATCHUP_INTERRUPT signal on to the next-slowest process.
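A worked example helps here. Assume MAXNUMMESSAGES = 4096 and SIG_THRESHOLD = MAXNUMMESSAGES / 2 = 2048 (the latter value is my recollection, not something shown above). Suppose maxMsgNum = 10000 and minFree = 10. Then lowbound = 10000 - 4096 + 10 = 5914 and minsig = 10000 - 2048 = 7952. A process at 5000 is reset, processes at 6000 and 7000 are catch-up candidates (6000 is the one signaled), and a process at 9000 is left alone. The standalone sketch below encodes that classification. `classify_backend` and `slowest_candidate` are hypothetical helpers, and they skip the procPid, resetState, sendOnly, and signaled checks of the real loop.

```c
#include <assert.h>

#define MAXNUMMESSAGES 4096
#define SIG_THRESHOLD  (MAXNUMMESSAGES / 2)   /* assumed value */

typedef enum {
    BACKEND_OK,                 /* close enough to the head: leave it alone */
    BACKEND_CATCHUP_CANDIDATE,  /* between lowbound and minsig: may be signaled */
    BACKEND_RESET               /* below lowbound: blocks the space being freed */
} BackendVerdict;

static BackendVerdict classify_backend(int maxMsgNum, int minFree, int nextMsgNum)
{
    int lowbound = maxMsgNum - MAXNUMMESSAGES + minFree;
    int minsig   = maxMsgNum - SIG_THRESHOLD;

    assert(nextMsgNum <= maxMsgNum);
    if (nextMsgNum < lowbound)
        return BACKEND_RESET;
    if (nextMsgNum < minsig)
        return BACKEND_CATCHUP_CANDIDATE;
    return BACKEND_OK;
}

/* Index of the furthest-behind catch-up candidate, or -1 if there is none.
 * Ties go to the earliest entry, as with the strict "<" in the real loop. */
static int slowest_candidate(int maxMsgNum, int minFree,
                             const int *nextMsgNums, int n)
{
    int best = -1;

    for (int i = 0; i < n; i++) {
        if (classify_backend(maxMsgNum, minFree, nextMsgNums[i]) != BACKEND_CATCHUP_CANDIDATE)
            continue;
        if (best < 0 || nextMsgNums[i] < nextMsgNums[best])
            best = i;
    }
    return best;
}
```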
    // Check every process's ProcState: set resetState to true for processes whose
    // nextMsgNum is below lowbound, and find the slowest process among those
    // whose nextMsgNum lies between lowbound and minsig
    for (i = 0; i < segP->lastBackend; i++)
    {
        // ProcState of this process
        ProcState *stateP = &segP->procState[i];
        // nextMsgNum of this process, to be checked
        int n = stateP->nextMsgNum;

        // Skip the entry if procPid is 0 (an inactive slot), if it is already
        // marked for reset, or if it is sendOnly
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        // Below lowbound: this process blocks space that must be freed
        if (n < lowbound)
        {
            // so it must be reset
            stateP->resetState = true;
            // no point in signaling it; move on to the next process
            continue;
        }

        // Track the global minimum nextMsgNum
        if (n < min)
            min = n;

        // Track the furthest-behind process that has not been signaled yet
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;         // update minsig
            needSig = stateP;   // this process will be signaled
        }
    }
    segP->minMsgNum = min;

    // When min gets very large, decrease all message counters by
    // MSGNUMWRAPAROUND to prevent counter overflow
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            // each process's nextMsgNum is decreased as well
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    // Count the messages still in the queue and set the threshold
    // for the next call of this function
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)  // few messages remain: use the minimum threshold
        segP->nextThreshold = CLEANUP_MIN;
    else                        // otherwise round up to the next multiple of CLEANUP_QUANTUM
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    // If needed, signal the slowest process to catch up
    if (needSig)
    {
        pid_t his_pid = needSig->procPid;                                 // its PID
        BackendId his_backendId = (needSig - &segP->procState[0]) + 1;    // its BackendId

        needSig->signaled = true;   // mark it as signaled

        // SendProcSignal may be slow, so release both locks before calling it
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);

        // Restore the caller's lock state: reacquire the write lock
        // only if the caller held it on entry
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else    // no process needs a signal
    {
        LWLockRelease(SInvalReadLock);      // release the read lock
        if (!callerHasWriteLock)            // release the write lock if we took it ourselves
            LWLockRelease(SInvalWriteLock);
    }
}
The overall flow is:
- Compute the values of min, lowbound and minsig
- Check each process's ProcState: set resetState to true for every process whose nextMsgNum is below lowbound, and find the furthest-behind (slowest) process among those whose nextMsgNum lies between lowbound and minsig
- Store the new minMsgNum (shifting all counters down if they have grown too large) and recompute nextThreshold
- Send a catch-up signal to the slowest process found in step 2
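Step 3 is plain integer arithmetic. The sketch below uses constant values as I recall them from sinvaladt.c: CLEANUP_MIN = MAXNUMMESSAGES / 2, CLEANUP_QUANTUM = MAXNUMMESSAGES / 16, and MSGNUMWRAPAROUND = MAXNUMMESSAGES * 262144. Treat these as assumptions and check them against your PostgreSQL version. `next_threshold` and `maybe_wrap` are hypothetical helper names.

```c
#include <assert.h>

#define MAXNUMMESSAGES   4096
#define CLEANUP_MIN      (MAXNUMMESSAGES / 2)        /* assumed value */
#define CLEANUP_QUANTUM  (MAXNUMMESSAGES / 16)       /* assumed value */
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)   /* assumed value: 2^30 */

/* Message count at which the next cleanup should run, given how many
 * messages remain in the queue after this cleanup. */
static int next_threshold(int numMsgs)
{
    assert(numMsgs >= 0);
    if (numMsgs < CLEANUP_MIN)
        return CLEANUP_MIN;
    /* smallest multiple of CLEANUP_QUANTUM strictly greater than numMsgs */
    return (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
}

/* Shift all counters down together once minMsgNum has grown large. */
static void maybe_wrap(int *minMsgNum, int *maxMsgNum, int *nextMsgNums, int n)
{
    if (*minMsgNum < MSGNUMWRAPAROUND)
        return;
    *minMsgNum -= MSGNUMWRAPAROUND;
    *maxMsgNum -= MSGNUMWRAPAROUND;
    for (int i = 0; i < n; i++)
        nextMsgNums[i] -= MSGNUMWRAPAROUND;
}
```

MSGNUMWRAPAROUND is a multiple of MAXNUMMESSAGES. Subtracting it from every counter therefore changes neither the distances between counters nor any message's ring slot, which is why the shift is safe.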
ProcState structure
typedef struct ProcState
{
    pid_t procPid;      // PID of the backend process (used for signaling); 0 in an inactive entry
    PGPROC *proc;       // PGPROC of the backend process
    int nextMsgNum;     // number of the next invalidation message this process will read;
                        // its buffer slot is nextMsgNum % MAXNUMMESSAGES.
                        // Meaningless when procPid is 0 or resetState is true
    bool resetState;    // whether the process must reset its state
    bool signaled;      // whether the process has been sent a catchup signal
    bool hasMessages;   // whether the process has unread messages
    bool sendOnly;      // whether the process only sends invalidation messages and never receives them
    LocalTransactionId nextLXID;    // next LocalTransactionId; meaningless in an active entry (procPid != 0)
} ProcState;
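The fields of ProcState are easiest to understand from the reader's side. The sketch below is modeled loosely on how a backend pulls its pending messages, as I understand SIGetDataEntries. `ToyProcState` and `toy_get_entries` are hypothetical, locking is omitted, and a return value of -1 stands for "you were reset, rebuild your caches instead".

```c
#include <assert.h>
#include <stdbool.h>

#define RING_SIZE 8   /* toy stand-in for MAXNUMMESSAGES */

typedef struct {
    int  nextMsgNum;   /* next message number this backend will read */
    bool resetState;   /* fell too far behind: must rebuild caches */
    bool signaled;     /* a catch-up signal has been sent */
    bool hasMessages;  /* unread messages may be waiting */
} ToyProcState;

/* Copy up to `cap` unread messages into `out`. Returns the count, or -1
 * if the backend was reset and must discard its caches instead. */
static int toy_get_entries(ToyProcState *st, const int *ring, int maxMsgNum,
                           int *out, int cap)
{
    assert(cap >= 0);
    st->hasMessages = false;

    if (st->resetState) {
        /* Skip everything published so far; caches get rebuilt instead. */
        st->nextMsgNum = maxMsgNum;
        st->resetState = false;
        st->signaled = false;
        return -1;
    }

    int n = 0;
    while (n < cap && st->nextMsgNum < maxMsgNum) {
        out[n++] = ring[st->nextMsgNum % RING_SIZE];
        st->nextMsgNum++;
    }

    if (st->nextMsgNum >= maxMsgNum)
        st->signaled = false;     /* caught up: may be signaled again later */
    else
        st->hasMessages = true;   /* more left for the next call */
    return n;
}
```

Clearing `signaled` only after a full catch-up means the cleanup code will not pick the same lagging backend twice while it is still working through its backlog.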
About sendOnly:
If true, the process only sends invalidation messages and never receives them. This is meaningful only for the startup process during recovery. That process sends invalidation messages so that other processes can see schema changes.
Summary
The window bounded by minMsgNum and maxMsgNum reminds me of the sliding window I learned about in computer networking. Inserting into the buffer array resembles writing into a sliding window's buffer, and like that buffer, the array is organized as a ring queue.
We have now covered the basics of how PostgreSQL implements IPC. It builds its own shared memory management on top of the operating system's shared memory, and then implements communication mechanisms such as SI messages on top of that shared memory.