programing

공유 메모리를 사용하여 프로세스 간 세마포어를 공유하는 방법

codeshow 2023. 9. 14. 23:41
반응형

공유 메모리를 사용하여 프로세스 간 세마포어를 공유하는 방법

하나의 서버에 N개의 클라이언트 프로세스를 동기화해야 합니다.이 과정들은 제가 3개의 세마포어를 선언했던 주요 함수에 의해 분기됩니다.POSIX semaphore를 사용하기로 했는데 이 과정들을 어떻게 공유해야 할지 모르겠습니다.공유 메모리가 올바르게 작동해야 한다고 생각했는데 몇 가지 질문이 있습니다.

  • 세그먼트에 적절한 메모리 공간을 할당하려면 어떻게 해야 합니까?
  • 사용해도 됩니까?sizeof(sem_t)인에size_t들녘의shmget내가 필요한 공간을 정확히 할당하기 위해서?
  • 이 상황과 비슷한 예를 가진 사람이 있습니까?

이름을 공유하기 쉽습니다.POSIX세마포어

  • 세마포어 이름 선택

    #define SNAME "/mysem"
    
  • 사용하다sem_open와 함께O_CREAT그들을 창조하는 과정에서

    sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */
    
  • 다른 프로세스에서 세마포어 열기

    sem_t *sem = sem_open(SEM_NAME, 0); /* Open a preexisting semaphore. */
    

공유 메모리 사용을 고집한다면 당연히 가능합니다.

int fd = shm_open("shmname", O_CREAT, O_RDWR);
ftruncate(fd, sizeof(sem_t));
sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
    MAP_SHARED, fd, 0);

sem_init(sem, 1, 1);

위의 내용을 테스트해 본 적이 없어서 완전히 엉망진창일 수도 있습니다.

저는 부모(프로듀서)가 N개의 자식(소비자) 스레드를 생성하고 이들 간의 데이터를 통신하기 위해 글로벌 어레이를 사용하는 샘플 애플리케이션을 작성했습니다.글로벌 어레이는 원형 메모리이며 어레이에 데이터를 쓸 때 보호됩니다.

머리글 파일:

#define TRUE 1
#define FALSE 0

#define SUCCESS 0
#define FAILURE -1

/*
 * Function Declaration.
 */
void Parent( void );
int CalcBitmap(unsigned int Num);
void InitSemaphore( void );
void DeInitSemaphore( void );
int ComputeComplexCalc( int op1, int op2);

/*
 * Thread functions.
 */
void *Child( void *arg );
void *Report( void *arg1 );

/*
 * Macro Definition.
 */
#define INPUT_FILE  "./input.txt"
#define NUM_OF_CHILDREN 10
#define SIZE_CIRCULAR_BUF 256
#define BUF_SIZE 128
//Set the bits corresponding to all threads.
#define SET_ALL_BITMAP( a ) a = uiBitmap;     
//Clears the bit corresponding to a thread after
//every read. 
#define CLEAR_BIT( a, b ) a &= ~( 1 << b );   

/*
 * Have a dedicated semaphore for each buffer
 * to synchronize the acess to the shared memory
 * between the producer (parent) and the consumers. 
 */
sem_t ReadCntrMutex;
sem_t semEmptyNode[SIZE_CIRCULAR_BUF];

/*
 * Global Variables.
 */
unsigned int uiBitmap = 0;
//Counter to track the number of processed buffers.
unsigned int Stats;                           
unsigned int uiParentCnt = 0;
char inputfile[BUF_SIZE];
int EndOfFile = FALSE;

/*
 * Data Structure Definition. ( Circular Buffer )
 */
struct Message
{
    int id;
    unsigned int BufNo1;
    unsigned int BufNo2;
    /* Counter to check if all threads read the buffer. */
    unsigned int uiBufReadCntr;
};
struct Message ShdBuf[SIZE_CIRCULAR_BUF];

소스 코드:

    /*
# 
# ipc_communicator is a IPC binary where a parent
# create ten child threads, read the input parameters
# from a file, sends that parameters to all the children.
# Each child computes the operations on those input parameters
# and writes them on to their own file.
#
#  Author: Ashok Vairavan              Date: 8/8/2014
*/

/*
 * Generic Header Files.
 */
#include "stdio.h"
#include "unistd.h"
#include "string.h"
#include "pthread.h"
#include "errno.h"
#include "malloc.h"
#include "fcntl.h"
#include "getopt.h"
#include "stdlib.h"
#include "math.h"
#include "semaphore.h"

/*
 * Private Header Files.
 */
#include "ipc*.h"

/*
 * Function to calculate the bitmap based on the 
 * number of threads. This bitmap is used to 
 * track which thread read the buffer and the
 * pending read threads.
 */
int CalcBitmap(unsigned int Num)
{
   unsigned int uiIndex;

   for( uiIndex = 0; uiIndex < Num; uiIndex++ )
   {
      uiBitmap |= 1 << uiIndex;
   }
   return uiBitmap;
}
/*
 * Function that performs complex computation on
 * numbers.
 */
int ComputeComplexCalc( int op1, int op2)
{
    return sqrt(op1) + log(op2);
}

/*
 * Function to initialise the semaphores. 
 * semEmptyNode indicates if the buffer is available
 * for write. semFilledNode indicates that the buffer
 * is available for read.
 */
void InitSemaphore( void )
{
   unsigned int uiIndex=0;

   CalcBitmap(NUM_OF_CHILDREN);

   sem_init( &ReadCntrMutex, 0, 1);
   while( uiIndex<SIZE_CIRCULAR_BUF )
   {
    sem_init( &semEmptyNode[uiIndex], 0, 1 );
        uiIndex++;
   }
   return;
}

/* 
 * Function to de-initilaise semaphore.
 */
void DeInitSemaphore( void )
{
   unsigned int uiIndex=0;

   sem_destroy( &ReadCntrMutex);
   while( uiIndex<SIZE_CIRCULAR_BUF )
   {
    sem_destroy( &semEmptyNode[uiIndex] );
        uiIndex++;
   }
   return;
}

/* Returns TRUE if the parent had reached end of file */
int isEOF( void )
{
   return __sync_fetch_and_add(&EndOfFile, 0);
}

/*
 * Thread functions that prints the number of buffers
 * processed per second.
 */ 
void *Report( void *arg1 )
{
   int CumStats = 0;

   while( TRUE )
   {
      sleep( 1 );
      CumStats += __sync_fetch_and_sub(&Stats, 0);
      printf("Processed Per Second: %d Cumulative Stats: %d\n", 
               __sync_fetch_and_sub(&Stats, Stats), CumStats);
   }
   return NULL;
}

/*
 * Function that reads the data from the input file
 * fills the shared buffer and signals the children to
 * read the data.
 */
void Parent( void )
{
   unsigned int op1, op2;
   unsigned int uiCnt = 0;

   /* Read the input parameters and process it in
    * while loop till the end of file.
    */

   FILE *fp = fopen( inputfile, "r" );
   while( fscanf( fp, "%d %d", &op1, &op2 ) == 2 )
   {
     /* Wait for the buffer to become empty */
     sem_wait(&semEmptyNode[uiCnt]);

     /* Access the shared buffer */
     ShdBuf[uiCnt].BufNo1 = op1;
     ShdBuf[uiCnt].BufNo2 = op2;

     /* Bitmap to track which thread read the buffer */
     sem_wait( &ReadCntrMutex );
     SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
     sem_post( &ReadCntrMutex );

     __sync_fetch_and_add( &uiParentCnt, 1);
     uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
   }

   /* If it is end of file, indicate it to other
    * children using global variable.
    */ 
   if( feof( fp ) )
   {
      __sync_add_and_fetch(&EndOfFile, TRUE);
      fclose( fp ); 
      return;
   }

   return;
}

/* 
 * Child thread function which takes threadID as an
 * argument, creates a file using threadID, fetches the
 * parameters from the shared memory, computes the calculation,
 * writes the output to their own file and will release the 
 * semaphore when all the threads read the buffer.
 */
void *Child( void *ThreadPtr )
{
   unsigned int uiCnt = 0, uiTotalCnt = 0;
   unsigned int op1, op2;
   char filename[BUF_SIZE];
   int ThreadID;

   ThreadID = *((int *) ThreadPtr);
   free( ThreadPtr );

   snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );

   FILE *fp = fopen( filename, "w" );
   if( fp == NULL )
   {
      perror( "fopen" );
      return NULL;
   }

   while (TRUE)
   {
      /* Wait till the parent fills the buffer */
      if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
      {
         /* Access the shared memory */
         op1 = ShdBuf[uiCnt].BufNo1;
         op2 = ShdBuf[uiCnt].BufNo2;

         fprintf(fp, "%d %d = %d\n", op1, op2, 
                               ComputeComplexCalc( op1, op2) ); 

         sem_wait( &ReadCntrMutex );
         ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
         if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
         {
            __sync_add_and_fetch(&Stats, 1);

            /* Release the semaphore lock if 
               all readers read the data */
            sem_post(&semEmptyNode[uiCnt]);
         }
         sem_post( &ReadCntrMutex );

         uiTotalCnt++;
         uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;

         /* If the parent reaches the end of file and 
            if the child thread read all the data then break out */
         if( isEOF() && ( uiTotalCnt ==  uiParentCnt ) )
         {
            //printf("Thid %d p %d c %d\n", ThreadID, uiParentCnt, uiCnt );
            break;
         }
      }
      else
      {
         /* Sleep for ten micro seconds before checking 
            the shared memory */
         usleep(10);
      }
   }
   fclose( fp );
   return NULL;
}

void usage( void )
{
    printf(" Usage:\n");
    printf("         -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt");
    printf("         -?  - Help Menu. \n" );
    exit(1);
}

int main( int argc, char *argv[])
{
   pthread_attr_t ThrAttr;
   pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
   unsigned int uiThdIndex = 0;
   unsigned int *pThreadID;
   int c;

   strncpy( inputfile, INPUT_FILE, BUF_SIZE );

   while( ( c = getopt( argc, argv, "f:" )) != -1 )
   {
       switch( c )
       {
           case 'f':
                strncpy( inputfile, optarg, BUF_SIZE );
                break;

           default:
                usage();
        }
   }

   if( access( inputfile, F_OK ) == -1 )
   {
      perror( "access" );
      return FAILURE;
   } 

   InitSemaphore();

   pthread_attr_init( &ThrAttr );
   while( uiThdIndex< NUM_OF_CHILDREN )
   {
      /* Allocate the memory from the heap and pass it as an argument
       * to each thread to avoid race condition and invalid reference
       * issues with the local variables.
       */
      pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
      if( pThreadID == NULL )
      {
     perror( "malloc" );
     /* Cancel pthread operation */
     return FAILURE;
      }
      *pThreadID = uiThdIndex;
      if( pthread_create( 
             &ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
      {
    printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno);
    perror( "pthread_create" );
    return FAILURE;
      }
      uiThdIndex++;
   }

   /* Report thread reports the statistics of IPC communication
    * between parent and childs threads.
    */
   if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
   {
    perror( "pthread_create" );
    return FAILURE;
   }

   Parent();

   uiThdIndex = 0;
   while( uiThdIndex < NUM_OF_CHILDREN )
   {
      pthread_join( ThrChild[uiThdIndex], NULL );
      uiThdIndex++;
   }

   /*
    * Cancel the report thread function when all the 
    * children exits.
    */
   pthread_cancel( ThrReport );

   DeInitSemaphore();

   return SUCCESS;
}

네, 두 공정 사이에 명명된 세마포를 사용할 수 있습니다.파일을 열 수 있습니다.

snprintf(sem_open_file, 128, "/sem_16");
ret_value = sem_open((const char *)sem_open_file, 0, 0, 0);

if (ret_value == SEM_FAILED) {
    /*
     * No such file or directory
     */
    if (errno == ENOENT) {
        ret_value = sem_open((const char *)sem_open_file,
                           O_CREAT | O_EXCL, 0777, 1);
     }
}

그리고 다른 공정에서도 이것을 사용할 수 있습니다.

공유 메모리 접근법이 과연 효과가 있을지 정말 의문입니다.

AFAIK 세마포어는 실제로 단지 손잡이일 뿐이며, 그것이 열리는 과정에서 유효합니다.진짜 세마포어 정보는 커널 메모리에 있습니다.

따라서 다른 공정에서 동일한 핸들 값을 사용하는 것은 불가능합니다.

언급URL : https://stackoverflow.com/questions/8359322/how-to-share-semaphores-between-processes-using-shared-memory

반응형