PostgreSQL source code analysis - storage management - memory management

2021SC@SDUSC

Summary

After analyzing the buffer pool in the previous post, this post looks at PostgreSQL's shared memory, which involves IPC (inter-process communication).

A process is the basic unit to which the operating system allocates resources: each process has its own set of system resources, isolated from those of other processes. To let different processes access common resources and work together, the operating system provides IPC mechanisms. IPC in PostgreSQL is built mainly on shared memory: a region of memory is set aside that every process can read and write, and the processes agree on when and how to access it, so that they can exchange data through it.
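To make the idea concrete, here is a minimal sketch (not PostgreSQL code) of two processes exchanging data through shared memory on a POSIX system. The helper name share_int_with_child is made up for this post, and the use of an anonymous shared mmap plus fork is my own assumption for the illustration. The agreed rule for access is simply that the parent reads only after the child has exited.

```c
#define _DEFAULT_SOURCE
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif

/*
 * Hypothetical helper: map one int of shared memory, fork a child that
 * writes (value + 1) into it, wait for the child, and return what the
 * parent then reads. Returns -1 if a system call fails.
 */
int
share_int_with_child(int value)
{
	int		   *shared;
	int			result;
	int			status;
	pid_t		pid;

	assert(value >= 0);			/* keep -1 free as the error value */

	shared = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE,
				  MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (shared == MAP_FAILED)
		return -1;
	*shared = value;

	pid = fork();
	if (pid < 0)
	{
		munmap(shared, sizeof(int));
		return -1;
	}
	if (pid == 0)
	{
		/* Child: the mapping is inherited, so the parent sees this write */
		*shared = value + 1;
		_exit(0);
	}

	/* Parent: reading only after the child has exited is the agreed rule */
	if (waitpid(pid, &status, 0) < 0)
	{
		munmap(shared, sizeof(int));
		return -1;
	}
	result = *shared;
	munmap(shared, sizeof(int));
	return result;
}
```

Calling share_int_with_child(41) returns 42 when the child's write is visible to the parent, which is exactly the property that isolated per-process memory would not give.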

Source code analysis

During PostgreSQL initialization, the system allocates a memory area that is visible to all PostgreSQL backend processes; this is the shared memory.
On top of shared memory, PostgreSQL also provides other facilities:

  1. Communication between postgres processes and the postmaster
  2. The SI Message mechanism, i.e. the delivery of shared invalidation messages
  3. Unified management of process-related variables and functions

This post mainly analyzes shared memory and the SI Message mechanism.

Shared memory and its management

Initialization of shared memory

The source code is located in src/backend/storage/ipc/ipci.c.

void
CreateSharedMemoryAndSemaphores(int port)
{
	PGShmemHeader *shim = NULL;
	//IsUnderPostmaster is true here only in the EXEC_BACKEND case, where the function is called by a backend forked from the postmaster (the shared memory already exists and does not need to be initialized, but the local pointers that reference the shared structures still have to be set up)
	if (!IsUnderPostmaster)
	{
		PGShmemHeader *seghdr;
		Size		size;//Store the calculated shared memory size
		int			numSemas;//Store the calculated semaphore quantity

		//Calculate the number of semaphores required
		numSemas = ProcGlobalSemas();
		numSemas += SpinlockSemas();

		
		 //Here we begin to calculate the size of shared memory (including some estimates)
		size = 100000;
		//The amount of memory required to add semaphores (the number of semaphores has been calculated above)
		size = add_size(size, PGSemaphoreShmemSize(numSemas));
		size = add_size(size, SpinlockSemaSize());
		//Calculation and estimation of the size of various locks, structures and indexes in shared memory
		size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE,
												 sizeof(ShmemIndexEnt)));
		size = add_size(size, BufferShmemSize());
		size = add_size(size, LockShmemSize());
		size = add_size(size, PredicateLockShmemSize());
		size = add_size(size, ProcGlobalShmemSize());
		size = add_size(size, XLOGShmemSize());
		size = add_size(size, CLOGShmemSize());
		size = add_size(size, CommitTsShmemSize());
		size = add_size(size, SUBTRANSShmemSize());
		size = add_size(size, TwoPhaseShmemSize());
		size = add_size(size, BackgroundWorkerShmemSize());
		size = add_size(size, MultiXactShmemSize());
		size = add_size(size, LWLockShmemSize());
		size = add_size(size, ProcArrayShmemSize());
		size = add_size(size, BackendStatusShmemSize());
		size = add_size(size, SInvalShmemSize());
		size = add_size(size, PMSignalShmemSize());
		size = add_size(size, ProcSignalShmemSize());
		size = add_size(size, CheckpointerShmemSize());
		size = add_size(size, AutoVacuumShmemSize());
		size = add_size(size, ReplicationSlotsShmemSize());
		size = add_size(size, ReplicationOriginShmemSize());
		size = add_size(size, WalSndShmemSize());
		size = add_size(size, WalRcvShmemSize());
		size = add_size(size, ApplyLauncherShmemSize());
		size = add_size(size, SnapMgrShmemSize());
		size = add_size(size, BTreeShmemSize());
		size = add_size(size, SyncScanShmemSize());
		size = add_size(size, AsyncShmemSize());
#ifdef EXEC_BACKEND
		size = add_size(size, ShmemBackendArraySize());
#endif

		/* freeze the addin request size and include it */
		addin_request_allowed = false;
		size = add_size(size, total_addin_request);

		/* might as well round it off to a multiple of a typical page size */
		size = add_size(size, 8192 - (size % 8192));

		elog(DEBUG3, "invoking IpcMemoryCreate(size=%zu)", size);

		//Initialize shared memory segment
		seghdr = PGSharedMemoryCreate(size, port, &shim);
		//Initializes the access method of shared memory
		InitShmemAccess(seghdr);
		 //Create semaphore
		PGReserveSemaphores(numSemas, port);

#ifndef HAVE_SPINLOCKS
		//Initialize spin lock semaphore
		SpinlockSemaInit();
#endif
	}
	else
	{
		 //This is the EXEC_BACKEND case mentioned above: the process has already attached to shared memory. Without EXEC_BACKEND this branch should never be reached, hence the PANIC.
#ifndef EXEC_BACKEND
		elog(PANIC, "should be attached to shared memory already");
#endif
	}

	if (!IsUnderPostmaster)
		//Initialize allocation of shared memory
		InitShmemAllocation();

	 //Initialize LWLocks. They are allocated in shared memory, and InitShmemIndex below needs them.
	CreateLWLocks();

	 //Initialize shared memory index hash table
	InitShmemIndex();

	 //Set xlog,clog and buffer pool
	XLOGShmemInit();
	CLOGShmemInit();
	CommitTsShmemInit();
	SUBTRANSShmemInit();
	MultiXactShmemInit();
	InitBufferPool();

	//Initialize lock manager
	InitLocks();
	 //Initialize predicate lock manager
	InitPredicateLocks();

	//Initialize process table
	if (!IsUnderPostmaster)
		InitProcGlobal();
	CreateSharedProcArray();
	CreateSharedBackendStatus();
	TwoPhaseShmemInit();
	BackgroundWorkerShmemInit();

	//Create the shared invalidation message state
	CreateSharedInvalidationState();

	//Set up the inter-process signaling mechanisms
	PMSignalShmemInit();
	ProcSignalShmemInit();
	CheckpointerShmemInit();
	AutoVacuumShmemInit();
	ReplicationSlotsShmemInit();
	ReplicationOriginShmemInit();
	WalSndShmemInit();
	WalRcvShmemInit();
	ApplyLauncherShmemInit();

	//Set up other modules that require shared memory space
	SnapMgrInit();
	BTreeShmemInit();
	SyncScanShmemInit();
	AsyncShmemInit();

	//Initialize dynamic shared memory
	if (!IsUnderPostmaster)
		dsm_postmaster_startup(shim);

	//Let loadable modules allocate or initialize their own shared memory
	if (shmem_startup_hook)
		shmem_startup_hook();
}

This function initializes shared memory and semaphores. It is called by the postmaster or by a standalone backend process. The general flow is as follows:

  1. Calculate the total size of shared memory required
  2. Allocate the shared memory segment
  3. Initialize the shared memory header pointer
  4. Create the semaphores
  5. Build the shared memory index table
  6. Initialize each module, allocating its shared memory space
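
Step 1 can be sketched in a few lines. This is only an illustration under assumptions: checked_add is a hypothetical stand-in for add_size (it returns false on overflow so the sketch stays self-contained, whereas the real function reports an error), and total_shmem_size is a made-up name. The base value 100000 and the 8192 rounding expression are taken from the listing above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define PAGE_ROUND 8192			/* same constant as in the listing */

/* Hypothetical stand-in for add_size(): addition with overflow detection */
static bool
checked_add(size_t a, size_t b, size_t *out)
{
	if (b > SIZE_MAX - a)
		return false;			/* the sum would wrap around */
	*out = a + b;
	return true;
}

/*
 * Sum the per-module requests on top of the fixed base, then pad the
 * total with the same expression the listing uses.
 */
bool
total_shmem_size(const size_t *requests, size_t n, size_t *total)
{
	size_t		size = 100000;	/* same starting estimate as the listing */
	size_t		i;

	assert(total != NULL);

	for (i = 0; i < n; i++)
	{
		if (!checked_add(size, requests[i], &size))
			return false;
	}

	/* Note: this adds a full 8192 bytes when size is already a multiple */
	if (!checked_add(size, PAGE_ROUND - (size % PAGE_ROUND), &size))
		return false;

	*total = size;
	return true;
}
```

With requests of 1000 and 2000 bytes the sum is 103000, which is padded by 3496 to 106496 (13 * 8192). Checking every addition matters because a silently wrapped size would make the server request a segment that is far too small.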

Shared memory hash table
During initialization, a hash table is created in shared memory to serve as an index. When a module needs space in shared memory, it calls the relevant functions, which give it a region of shared memory and record that region under an entry in the hash table.

CreateSharedMemoryAndSemaphores calls functions in src/backend/storage/ipc/shmem.c to initialize this memory. I won't analyze them in detail here.
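
As a rough picture of the "region plus index entry" idea, here is a toy sketch. Everything in it is hypothetical: toy_init_struct, ToyIndexEnt and the fixed-size arena are names I made up, and a linear table stands in for the real hash table to keep the code short. What it illustrates is the look-up-or-allocate pattern: the first caller of a name allocates the region, and later callers get the same region back.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define ARENA_SIZE	4096
#define MAX_ENTRIES 16
#define NAME_LEN	48

/* Hypothetical index entry: name -> location of the region in the arena */
typedef struct ToyIndexEnt
{
	char		name[NAME_LEN];
	size_t		offset;
	size_t		size;
} ToyIndexEnt;

/* The union only serves to give the byte array a generous alignment */
typedef union ToyArena
{
	unsigned char bytes[ARENA_SIZE];
	long double align_ld;
	long long	align_ll;
	void	   *align_p;
} ToyArena;

static ToyArena arena;			/* stands in for the shared segment */
static size_t arena_used = 0;
static ToyIndexEnt index_tab[MAX_ENTRIES];
static int	index_count = 0;

/*
 * Return the region registered under 'name', creating it on first use.
 * *found tells the caller whether the region already existed, so that
 * only the first caller initializes its contents.
 */
void *
toy_init_struct(const char *name, size_t size, bool *found)
{
	ToyIndexEnt *ent;
	int			i;

	for (i = 0; i < index_count; i++)
	{
		if (strcmp(index_tab[i].name, name) == 0)
		{
			assert(index_tab[i].size == size);	/* callers must agree */
			*found = true;
			return arena.bytes + index_tab[i].offset;
		}
	}

	*found = false;
	if (index_count == MAX_ENTRIES || strlen(name) >= NAME_LEN ||
		size > ARENA_SIZE - arena_used)
		return NULL;			/* index or arena exhausted */

	ent = &index_tab[index_count++];
	strcpy(ent->name, name);
	ent->offset = arena_used;
	ent->size = size;
	/* Bump allocation, rounded up to keep the next region aligned */
	arena_used += (size + 15) & ~(size_t) 15;
	return arena.bytes + ent->offset;
}
```

A second call with the same name returns the same pointer with *found set to true, which is what lets several processes agree on where a shared structure lives.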

SI Message mechanism

SI Messages (shared invalidation messages) are mainly used to keep the caches of different processes in sync. Here we mainly analyze how the SI Message mechanism is implemented on top of the shared memory described above.

shmInvalBuffer

This data structure is located in shared memory. It records all invalidation messages sent in the system and how far each process has got in processing them.

static SISeg *shmInvalBuffer;	

As the code shows, shmInvalBuffer is a static (file-scope) pointer to a structure of type SISeg.

SISeg structure

typedef struct SISeg
{
	//General status information
	int			minMsgNum;		//Number of the oldest message that at least one process has not yet processed, i.e. the oldest message still needed
	int			maxMsgNum;		//Number that will be assigned to the next new message; its slot in buffer is maxMsgNum % MAXNUMMESSAGES
	int			nextThreshold;	//Number of queued messages at which SICleanupQueue should be called
	int			lastBackend;	//Index of the last active procState entry, plus 1
	int			maxBackends;	//The size of the procState array

	slock_t		msgnumLock;		//Spinlock that protects maxMsgNum

	 //A fixed-length array that stores invalidation messages, one per element. The array is used as a ring queue.
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];
	//#define MAXNUMMESSAGES 4096 is the definition in the source code, so the buffer holds 4096 messages
	
	 //Stores the progress (state) of each backend process in processing invalidation messages
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
	//The size of this array depends on the maximum number of processes allowed by the system
} SISeg;

About insertion into the buffer ring queue:
While the array is still filling up, new invalidation messages are stored in its elements from front to back. Once the end of the array is reached, insertion wraps around to the head of the buffer array.
About minMsgNum and maxMsgNum:
The two values are the lower and upper bounds of the live invalidation messages. Messages with a number smaller than minMsgNum have been processed by all processes, messages with a number greater than or equal to maxMsgNum have not been generated yet, and each message in between has not been processed by at least one process. Therefore, the positions in the buffer ring outside the window formed by minMsgNum and maxMsgNum are free to hold newly added invalidation messages.
Even so, an insertion into the SI Message queue can still run out of space, because the queue may hold invalidation messages that not every process has read. Some messages then have to be cleaned up first. Cleanup is triggered when the current number of messages plus the number of messages to be inserted exceeds nextThreshold.
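
The window described above can be tried out on a tiny ring. The following is a sketch under assumptions, not the real insertion code: ToyQueue, toy_insert, toy_read and toy_free_slots are hypothetical names, the capacity is 8 instead of 4096, and locking is left out entirely. Message numbers keep growing, and a number is mapped to its slot with a modulo.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAXNUMMESSAGES 8	/* the real queue uses 4096 */

typedef struct ToyQueue
{
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* number the next message will get */
	int			buffer[TOY_MAXNUMMESSAGES];
} ToyQueue;

/* Free slots are the ring positions outside the [min, max) window */
int
toy_free_slots(const ToyQueue *q)
{
	return TOY_MAXNUMMESSAGES - (q->maxMsgNum - q->minMsgNum);
}

/*
 * Append one message. Returns false when the window fills the whole
 * ring, i.e. when a cleanup would be needed before inserting.
 */
bool
toy_insert(ToyQueue *q, int msg)
{
	if (toy_free_slots(q) == 0)
		return false;
	q->buffer[q->maxMsgNum % TOY_MAXNUMMESSAGES] = msg;
	q->maxMsgNum++;
	return true;
}

/* Read a message by number; only numbers inside the window are valid */
int
toy_read(const ToyQueue *q, int msgNum)
{
	assert(msgNum >= q->minMsgNum && msgNum < q->maxMsgNum);
	return q->buffer[msgNum % TOY_MAXNUMMESSAGES];
}
```

After eight inserts the ring is full and a ninth insert fails. Once minMsgNum advances to 3, three slots become free and message number 8 lands in slot 0, overwriting message 0, which nobody needs any more.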

SICleanupQueue

I recommend reading the ProcState structure further below before the function analysis here, because the function uses that structure.
This function is responsible for cleaning up the invalidation message queue.

void
SICleanupQueue(bool callerHasWriteLock, int minFree)
//callerHasWriteLock is set to true when the caller holds SInvalWriteLock
//minFree is the minimum number of messages to release
{
	//Pointer to buffer
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	//Lock out all writers and readers
	//If the caller does not already hold SInvalWriteLock, acquire it
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	//Acquire SInvalReadLock in exclusive mode
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	 //min will record the global minimum nextMsgNum; it starts from maxMsgNum
	min = segP->maxMsgNum;
	//minsig is the signaling threshold: a process whose nextMsgNum is below it (but not below lowbound) is a candidate for a catchup signal
	minsig = min - SIG_THRESHOLD;
	//minFree (input parameter) is the minimum number of queue slots to free. lowbound is the resulting cutoff: a process whose nextMsgNum is below it must be reset
	lowbound = min - MAXNUMMESSAGES + minFree;

lowbound and minsig
A process that is reset by the cleanup can no longer read the messages it missed, so it has to discard every entry in its caches and reload them. If many processes reload their caches, the amount of I/O rises. The two thresholds above exist to keep such reloads rare. Any process whose nextMsgNum is below lowbound has to be reset: its resetState variable is set to true, and the process then reloads its caches by itself. Processes whose nextMsgNum lies between lowbound and minsig do not stand in the way of this cleanup, but to avoid frequent cleanups they are asked to speed up their processing of invalidation messages. The cleanup finds the slowest of these processes and sends it PROCSIG_CATCHUP_INTERRUPT, which is delivered via SIGUSR1. On receiving it, the process works through all of its pending invalidation messages in one go and then passes PROCSIG_CATCHUP_INTERRUPT on to the next slowest process.

	//Traverse the ProcState structure of all processes to check, and set the resetState variable of the process whose nextMsgNum is lower than lowbound to true
	//And find the slowest progress in the process between lowbound and minsig in nextMsgNum
	for (i = 0; i < segP->lastBackend; i++)
	{
		//Fetch the ProcState structure of the process
		ProcState  *stateP = &segP->procState[i];
		//Fetch the nextMsgNum of the process for checking
		int			n = stateP->nextMsgNum;

		//If the procPid of the process is 0 (that is, the process state is not recorded, indicating an inactive process), or it is already in the state that needs to be reset, or sendOnly is true, it will be ignored directly
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		 //If the nextMsgNum of the process is lower than lowbound
		if (n < lowbound)
		{
			//Because lowbound is the lower bound of cleanup, the process must be reset
			stateP->resetState = true;//Set reset required to true
			//Proceed to the next process
			continue;
		}

		//Here, the global minimum nextMsgNum is obtained by comparison one by one
		if (n < min)
			min = n;

		//Look for the backend that is furthest behind among those that have not yet been sent a signal
		if (n < minsig && !stateP->signaled)
		{
			//Update minsig
			minsig = n;
			//Set the signal to be sent
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	 //When minMsgNum becomes very large (at least MSGNUMWRAPAROUND), decrement all message counters to prevent counter overflow.
	if (min >= MSGNUMWRAPAROUND)
	{
		//Counter decrease
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			//The nextMsgNum of the process should also be reduced
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	 //Determine how many messages are still in the queue, and set the threshold at which SICleanupQueue should be called again
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)//If fewer than CLEANUP_MIN messages remain
		segP->nextThreshold = CLEANUP_MIN;
	else//Otherwise use the next multiple of CLEANUP_QUANTUM above numMsgs
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	 //If necessary, signal the process to speed up
	if (needSig)
	{
		//Get pid
		pid_t		his_pid = needSig->procPid;
		//Get backendId
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;
		//Set sent signal to true
		needSig->signaled = true;
		//Release the read lock and write lock (because SendProcSignal executes slowly, release the lock before it executes)
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		//Call the SendProcSignal function to send a signal
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		//Restore the caller's lock state after sending. The read lock is not re-acquired because the function is about to return
		if (callerHasWriteLock)//If the caller held the write lock on entry
			//Re-acquire the write lock
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else//No signal needs to be sent
	{
		//Release read lock
		LWLockRelease(SInvalReadLock);
		//If the caller did not hold the write lock on entry
		if (!callerHasWriteLock)
			//Release write lock
			LWLockRelease(SInvalWriteLock);
	}
}
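
The threshold update near the end of SICleanupQueue is easy to try in isolation. In this sketch the function name next_threshold is hypothetical, and the values of CLEANUP_MIN and CLEANUP_QUANTUM are my assumptions about how they are derived from MAXNUMMESSAGES; they are not shown in the listing above, so check them against the source before relying on the numbers.

```c
#include <assert.h>

#define MAXNUMMESSAGES	4096
#define CLEANUP_MIN		(MAXNUMMESSAGES / 2)	/* assumed value: 2048 */
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)	/* assumed value: 256 */

/*
 * Given the number of messages left in the queue after a cleanup,
 * return the queue length at which the next cleanup should run.
 */
int
next_threshold(int numMsgs)
{
	assert(numMsgs >= 0 && numMsgs <= MAXNUMMESSAGES);

	/* A mostly empty queue does not need another cleanup soon */
	if (numMsgs < CLEANUP_MIN)
		return CLEANUP_MIN;

	/* Otherwise: the next multiple of CLEANUP_QUANTUM above numMsgs */
	return (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
}
```

Under these assumed constants, 100 remaining messages give a threshold of 2048, while 2500 remaining messages give 2560. The fuller the queue stays after a cleanup, the sooner the next cleanup is attempted.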

The main process is as follows:

  1. Calculate the initial values of min, lowbound, and minsig
  2. Check the ProcState structure of each process: set resetState to true for every process whose nextMsgNum is lower than lowbound, and among the unsignaled processes whose nextMsgNum lies between lowbound and minsig find the one furthest behind (the slowest)
  3. Update minMsgNum and recalculate nextThreshold
  4. Send a catchup signal to the slowest process found in step 2
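
The decision made for each process in step 2 can be written as a small pure function. This is a sketch, not the real loop: classify_backend and the ToyAction enum are hypothetical names, and the value of SIG_THRESHOLD is an assumption on my part because the listing does not show it. The real function also skips inactive, already reset and sendOnly entries, and signals only the single slowest candidate that has not been signaled yet.

```c
#include <assert.h>

#define MAXNUMMESSAGES 4096
#define SIG_THRESHOLD  (MAXNUMMESSAGES / 2)		/* assumed value: 2048 */

typedef enum ToyAction
{
	BACKEND_OK,					/* close enough to the head, leave alone */
	BACKEND_SIGNAL_CANDIDATE,	/* lagging: may be sent a catchup signal */
	BACKEND_RESET				/* too far behind: must reset its caches */
} ToyAction;

/* Classify one backend by comparing its nextMsgNum with both thresholds */
ToyAction
classify_backend(int nextMsgNum, int maxMsgNum, int minFree)
{
	int			minsig;
	int			lowbound;

	assert(minFree >= 0 && minFree <= MAXNUMMESSAGES);

	/* Same formulas as in the listing, with min still equal to maxMsgNum */
	minsig = maxMsgNum - SIG_THRESHOLD;
	lowbound = maxMsgNum - MAXNUMMESSAGES + minFree;

	if (nextMsgNum < lowbound)
		return BACKEND_RESET;
	if (nextMsgNum < minsig)
		return BACKEND_SIGNAL_CANDIDATE;
	return BACKEND_OK;
}
```

For example, with maxMsgNum = 10000 and minFree = 64, lowbound is 5968 and (under the assumed SIG_THRESHOLD) minsig is 7952. A backend at 5000 is reset, one at 6000 is a signal candidate, and one at 9000 is left alone.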

ProcState structure

typedef struct ProcState
{
	pid_t		procPid;		//PID of the backend process; 0 if the entry is not in use
	PGPROC	   *proc;			//PGPROC of backend process

	int			nextMsgNum;		//Number of the next invalidation message the process will read (its slot in buffer is nextMsgNum % MAXNUMMESSAGES). Meaningless when procPid above is 0 or resetState below is true
	bool		resetState;		//Records whether the process needs to reset its state
	bool		signaled;		//Records whether the process has been sent a catchup signal
	bool		hasMessages;	//Records whether the process has unread messages

	bool		sendOnly;		//Marks a process that only sends invalidation messages and never receives them

	 //Meaningless in an active ProcState entry (one whose procPid is not 0)
	LocalTransactionId nextLXID;//Next LocalTransactionId to use for this backend slot
} ProcState;

About sendOnly:
If true, the process only sends invalidation messages and never receives them. This only makes sense for the startup process during recovery, which sends invalidation messages so that other processes can see schema changes.
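
To show how nextMsgNum and resetState work together from the reader's side, here is a toy consumer. It is only a sketch of my understanding: ToyReader and toy_get_entries are hypothetical names, the ring holds 8 plain ints, and locking, signaled and hasMessages are left out. The return value -1 stands for "you were reset, discard your caches".

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAXNUMMESSAGES 8	/* the real queue uses 4096 */

typedef struct ToyReader
{
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* set by cleanup if we fell too far behind */
} ToyReader;

/*
 * Copy up to 'max' unread messages into out[] and advance nextMsgNum.
 * Returns the number of messages copied, or -1 if the reader had been
 * reset and must throw away everything it has cached.
 */
int
toy_get_entries(ToyReader *r, const int *buffer, int maxMsgNum,
				int *out, int max)
{
	int			n = 0;

	assert(max >= 0);

	if (r->resetState)
	{
		/* The missed messages may be overwritten, so skip to the head */
		r->nextMsgNum = maxMsgNum;
		r->resetState = false;
		return -1;
	}

	while (n < max && r->nextMsgNum < maxMsgNum)
	{
		out[n++] = buffer[r->nextMsgNum % TOY_MAXNUMMESSAGES];
		r->nextMsgNum++;
	}
	return n;
}
```

A reader at message 5 with maxMsgNum at 10 first reads slots 5, 6 and 7, then wraps around to slots 0 and 1. A reader flagged with resetState gets -1 once and afterwards starts from the current head of the queue.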

Summary

The lower and upper bounds minMsgNum and maxMsgNum remind me strongly of the sliding window I learned about in computer networks, and insertion into the buffer array resembles insertion into a sliding-window buffer. The buffer array is also the same kind of ring queue that a sliding window uses.
Through this analysis I came to understand the basics of how PostgreSQL implements IPC: it builds its own shared memory mechanism on top of the operating system's shared memory, and then implements communication mechanisms such as SI Message on top of that.

Keywords: Database PostgreSQL

Added by dad00 on Mon, 20 Dec 2021 15:52:23 +0200