OS: producer consumer problem (multi process + shared memory + semaphore)

I Introduction
When I used cout again after a year, I burst into tears. It was a touch of reunion after a long separation, although I basically forgot it. Take advantage of a lot of time to consolidate the problems of producers and consumers, and use pure C. Cherish the happy time when you can write code.

II analysis
Producer and consumer problem is an abstraction between multiple cooperative processes. Relationship between producers and consumers:
1. Access to the buffer is mutually exclusive. Since both will modify the buffer, when one party modifies the buffer, the other party cannot modify it. This is mutual exclusion.
2. The actions of one party affect the other party. Only when the buffer is not empty can it be consumed. When is it not empty? Production is not empty; If the buffer is full, it cannot be produced. When is it dissatisfied? Consumption is dissatisfied. This is a synchronous relationship.
To describe this relationship, on the one hand, shared memory is used to represent buffers; On the other hand, mutually exclusive semaphores are used to control the access to the buffer, and synchronous semaphores are used to describe the dependency between the two. (when multiple processes can access critical resources, it is necessary to add mutexes.)

III Shared storage
Shared storage is a means of inter process communication. Usually, semaphore synchronization or mutual exclusion are used to access shared storage. The principle of shared storage is to map the address space of a process to a shared storage segment. Under LINUX, create or obtain shared memory by using shmget function.
1. Create
1) Do not specify KEY
// IPC_PRIVATE indicates that memory needs to be created;  
//SHM_SIZE indicates the byte size;  
//SHM_MODE indicates that the access permission word, such as 0600, indicates that the user can read and write the memory
int shmget(key_t IPC_PRIVATE,size_t SHM_SIZE,int SHM_MODE);
2) Specify KEY
//If SHM_ If the shared storage pointed to by key already exists, the ID of the shared storage is returned;  
//Otherwise, create a shared store and return its ID
int  shmget(key_t SHM_KEY,size_t SHM_SIZE,int SHM_MODE);

2. Visit
Method 1
You only need the ID of the shared storage to obtain the actual address occupied by the shared storage through the shmat function. Therefore, you can use variables to store pointers to shared storage in the stack of the parent process. After fork ing, the child process can easily access shared storage through this pointer.
Method 2
If there is no parent-child relationship between processes, but the KEY of shared storage is negotiated, in each process, the I D of shared storage can be obtained through KEY and shmget function, and then the actual address of shared storage can be obtained through shmat function, and finally accessed.

In my implementation, I implement the producer as the parent process and the consumer as the child process, and share memory between processes through method 1.
IV Semaphore set
Semaphores have two primitives P and V. P locks resources and V releases resources. The interface using semaphore set under LINUX is particularly complex. The functions I use are as follows:
1. Create or obtain semaphore set
// IPC_PRIVATE means to create a semaphore set, NUM_OF_SEM indicates how many semaphores are in the set; FLAGS complex not investigated
semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
// SEM_KEY is key_t type
//If SEM_ If the semaphore set represented by key exists, the ID of the semaphore set is returned
//If it does not exist, a semaphore set is created and the ID is returned
semget(SEM_KEY, NUM_OF_SEM,FLAGS);

2. Initialize semaphore
The created procedure does not specify the initial value of the semaphore, which needs to be specified using the semctl function.
semctl(int semSetId , int semIdx , int cmd, union semun su);

Where semSetId refers to the ID of the semaphore set, and semidx refers to the index of a semaphore in the semaphore set (starting from zero). If you want to set the value of the semaphore, you can fill in SETVAL. In order to set the value of the semaphore, you can specify su Val is the value to be set.
When I compile using union semun under UBUNTU, I always report an error:
invalid use of undefined type 'union semun'
It is said that the definition of semun was deleted under Linux. You can customize semun to solve the following problems:

#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
#else 
/*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
union semun{
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};
#endif

V Code decomposition

1. Header file

#include "stdio.h" / / support printf
#include <sys/shm. h> / / support shmget, shmat, etc
#include <sys/sem. h> / / support semget 
#include <stdlib. h> / / support exit

2. Semaphore
Three semaphores are required:
The first semaphore is used to restrict the producer from producing when the buffer is insufficient. It is a synchronous semaphore
The second semaphore is used to restrict consumers to consume only when there are products in the buffer. It is a synchronous semaphore
The third semaphore is used to restrict that producers and consumers must be mutually exclusive when accessing the buffer. It is a mutually exclusive semaphore

Create semaphore set, semget

if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
{
	perror("create semaphore failed");
	exit(1);
}

To initialize three semaphores, semctl, union semun is required

union semun su;
 
//Semaphore initialization, where su represents union semun 
su.val = N_BUFFER;//How many products can the current warehouse receive
if(semctl(semSetId,0,SETVAL, su) < 0){
	perror("semctl failed");
	exit(1);
}
su.val = 0;//There are currently no products
if(semctl(semSetId,1,SETVAL,su) < 0){
	perror("semctl failed");
	exit(1);
}
su.val = 1;//When it is 1, the buffer can be entered
if(semctl(semSetId,2,SETVAL,su) < 0){
	perror("semctl failed");
	exit(1);
}

Encapsulates the + 1 or - 1 operation on the value of a semaphore in the semaphore set

//semSetId represents the id of the semaphore set
//semNum represents the index of the semaphore to be processed in the semaphore set
void waitSem(int semSetId,int semNum)
{
	struct sembuf sb;
	sb.sem_num = semNum;
	sb.sem_op = -1;//It means to reduce the semaphore by one
	sb.sem_flg = SEM_UNDO;//
	//The second parameter is of type sembuf [] and represents an array
	//The third parameter represents the size of the array represented by the second parameter
	if(semop(semSetId,&sb,1) < 0){
		perror("waitSem failed");
		exit(1);
	}
}
void sigSem(int semSetId,int semNum)
{
	struct sembuf sb;
	sb.sem_num = semNum;
	sb.sem_op = 1;
	sb.sem_flg = SEM_UNDO;
	//The second parameter is of type sembuf [] and represents an array
	//The third parameter represents the size of the array represented by the second parameter
	if(semop(semSetId,&sb,1) < 0){
		perror("waitSem failed");
		exit(1);
	}
}

3. Use shared memory

//Encapsulate the content in the shared storage area with a structure
struct ShM{
	int start;
	int end;
}* pSM;
 
 
//Buffer allocation and initialization
if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
{
	perror("create shared memory failed");
	exit(1);
}
//shmat returns a void * pointer, which needs to be forcibly converted
pSM = (struct ShM *)shmat(shmId,0,0);
//Initialization work
pSM->start = 0;
pSM->end = 0;

4. Production process

while(1)
{
	waitSem(semSetId,0);//Get a space to store products
	waitSem(semSetId,2);//Occupy product buffer
	produce();
	sigSem(semSetId,2);//Release product buffer
	sleep(1);//Produce one every two seconds
	sigSem(semSetId,1);//Inform the consumer that there is a product
}

5. Consumption process

while(1)
{
	waitSem(semSetId,1);//There must be products to consume
	waitSem(semSetId,2);//Lock buffer
	consume();//To get the product, you need to modify the buffer
	sigSem(semSetId,2);//Free buffer
	sigSem(semSetId,0);//Tell the producer that there is room
	sleep(2);//Consumption frequency
}

Vi Code full text

#include "stdio.h"
#include <sys/shm.h>
#include <sys/sem.h>
#include <stdlib.h>
#define SHM_SIZE (1024*1024)
#define SHM_MODE 0600
#define SEM_MODE 0600
 
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/*   union   semun   is   defined   by   including   <sys/sem.h>   */ 
#else 
/*   according   to   X/OPEN   we   have   to   define   it   ourselves   */ 
union semun{
	int val;
	struct semid_ds *buf;
	unsigned short *array;
};
#endif
 
struct ShM{
	int start;
	int end;
}* pSM;
 
const int N_CONSUMER = 3;//Number of consumers
const int N_BUFFER = 5;//Buffer capacity
int shmId = -1,semSetId=-1;
union semun su;//sem union, used to initialize semaphores
 
//semSetId represents the id of the semaphore set
//semNum represents the index of the semaphore to be processed in the semaphore set
void waitSem(int semSetId,int semNum)
{
	struct sembuf sb;
	sb.sem_num = semNum;
	sb.sem_op = -1;//It means to reduce the semaphore by one
	sb.sem_flg = SEM_UNDO;//
	//The second parameter is of type sembuf [] and represents an array
	//The third parameter represents the size of the array represented by the second parameter
	if(semop(semSetId,&sb,1) < 0){
		perror("waitSem failed");
		exit(1);
	}
}
void sigSem(int semSetId,int semNum)
{
	struct sembuf sb;
	sb.sem_num = semNum;
	sb.sem_op = 1;
	sb.sem_flg = SEM_UNDO;
	//The second parameter is of type sembuf [] and represents an array
	//The third parameter represents the size of the array represented by the second parameter
	if(semop(semSetId,&sb,1) < 0){
		perror("waitSem failed");
		exit(1);
	}
}
//It must be called with mutual exclusion and buffer dissatisfaction
void produce()
{
	int last = pSM->end;
	pSM->end = (pSM->end+1) % N_BUFFER;
	printf("production %d\n",last);
}
//It must be called with mutual exclusion and the buffer is not empty
void consume()
{
	int last = pSM->start;
	pSM->start = (pSM->start + 1)%N_BUFFER;
	printf("consume %d\n",last);
}
 
void init()
{
	//Buffer allocation and initialization
	if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
	{
		perror("create shared memory failed");
		exit(1);
	}
	pSM = (struct ShM *)shmat(shmId,0,0);
	pSM->start = 0;
	pSM->end = 0;
	
	//Semaphore creation
	//The first one: synchronization semaphore, which indicates the sequence. There must be space for production
	//The second one: synchronization semaphore, which indicates the sequence. Products must be available before consumption
	//Third: mutually exclusive semaphores. Producers and consumers cannot enter the buffer at the same time
 
	if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
	{
		perror("create semaphore failed");
		exit(1);
	}
	//Semaphore initialization, where su represents union semun 
	su.val = N_BUFFER;//How many products can the current warehouse receive
	if(semctl(semSetId,0,SETVAL, su) < 0){
		perror("semctl failed");
		exit(1);
	}
	su.val = 0;//There are currently no products
	if(semctl(semSetId,1,SETVAL,su) < 0){
		perror("semctl failed");
		exit(1);
	}
	su.val = 1;//When it is 1, the buffer can be entered
	if(semctl(semSetId,2,SETVAL,su) < 0){
		perror("semctl failed");
		exit(1);
	}
}
int main()
{
	int i = 0,child = -1;
	init();
	//Create multiple (N_CONSUMER) consumer subprocesses
	for(i = 0; i < N_CONSUMER; i++)
	{
		if((child = fork()) < 0)//Call to fork failed
		{
			perror("the fork failed");
			exit(1);
		}
		else if(child == 0)//Subprocess
		{
			printf("I'm the third %d Consumer subprocesses, PID = %d\n",i,getpid());
			while(1)
			{
				waitSem(semSetId,1);//There must be products to consume
				waitSem(semSetId,2);//Lock buffer
				consume();//To get the product, you need to modify the buffer
				sigSem(semSetId,2);//Free buffer
				sigSem(semSetId,0);//Tell the producer that there is room
				sleep(2);//Consumption frequency
			}
			break;//Must have
		}
	}
	
	
	//Parent process starts production
	if(child > 0)
	{
		while(1)
		{
			waitSem(semSetId,0);//Get a space to store products
			waitSem(semSetId,2);//Occupy product buffer
			produce();
			sigSem(semSetId,2);//Release product buffer
			sleep(1);//Produce one every two seconds
			sigSem(semSetId,1);//Inform the consumer that there is a product
		}
	}
	return 0;
}


--------
Copyright notice: This article is the original article of CSDN blogger "yaozhiyi", which follows the CC 4.0 BY-SA copyright agreement. Please attach the original source link and this notice for reprint.
Original link: https://blog.csdn.net/yaozhiyi/article/details/7561759

Keywords: Linux

Added by davidcriniti on Wed, 29 Dec 2021 12:17:46 +0200