OS,  Study

[OS] Message Queue

IPC를 위한 방법은 크게 6가지가 있다.

  • Pipe
  • FIFO
  • Message Queue
  • Shared Memory
  • Semaphore
  • Socket

Pipe나 FIFO는 file descriptor를 이용해서 통신하기 때문에 virtual file system에 속한다. Message queue, shared memory, semaphore는 key값을 통해서 통신한다.

기본적으로 message queue는 permission이 R/W는 존재하며 X가 없다.

기본적으로 내부는 singlely-linked list로 구성된다. Pipe와 큰 차이점은 타입을 줄 수 있다. Message를 전송할 때 반드시 타입을 명시해야한다. Message queue의 구조는 다음과 같다.

struct msqid_ds {
    struct ipc_perm msg_perm;     /* Ownership and permissions */
    time_t          msg_stime;    /* Time of last msgsnd(2) */
    time_t          msg_rtime;    /* Time of last msgrcv(2) */
    time_t          msg_ctime;    /* Time of last change */
    unsigned long   __msg_cbytes; /* Current number of bytes in
                                     queue (nonstandard) */
    msgqnum_t       msg_qnum;     /* Current number of messages
                                     in queue */
    msglen_t        msg_qbytes;   /* Maximum number of bytes
                                     allowed in queue */
    pid_t           msg_lspid;    /* PID of last msgsnd(2) */
    pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

Message queue에 전송되는 데이터 구조는 다음과 같다.

struct {
   long  data_type; // must have variable
   int   data_num;
   char  data_buff[BUFF_SIZE];
}

참고로 message queue를 사용하는데 필요한 header file들은 다음과 같다.

#include <sys/msg.h>
#include <sys/ipc.h>
#include <sys/types.h>

msgget

Message queue를 만들 때 사용하는 함수다.

int msgget(key_t key, int msgflg);
ArgumentDescription
key_t keyMessage queue를 얻어올 때 사용하는 key 값
int msgflgIPC_CREAT: key에 해당하는 큐가 있다면 큐의 식별자를 반환하며, 없으면 생성
IPC_EXCL: key에 해당하는 큐가 없다면 생성하지만 있다면 -1을 반환하고 복귀
ReturnDescription
-1이 아닌 값Message queue ID
-1실패

msgctl

Message queue를 관리할 때 쓰는 함수다. Message queue를 지울 때도 사용된다.

int msgctl ( int msqid, int cmd, struct msqid_ds *buf )
ArgumentDescription
int msqid목표하는 message queue ID
int cmdCommand 종류
IPC_STAT: message queue의 현재 상태를 buf에 저장
IPC_SET: message queue의 상태를 buf 값으로 변경합니다. 그러나 모든 정보는 안되고 msg_perm과 msg_qbytes 내용만 변경할 수 있음
IPC_RMID: message queue를 삭제합니다. 이럴 때에는 buf가 필요 없으므로 buf 를 0 으로 지정
struct msqid_ds *bufMessage queue 정보를 받을 버퍼
ReturnDescription
0성공
-1실패

msgsnd

Message를 전달할 때 쓰는 함수다.

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ArgumentDescription
int msqid목표하는 message queue ID
const void *msgp전송할 데이터
size_t msgsz전송할 데이터 크기
int msgflg동작 옵션
0: message queue에 공간이 생길 때 까지 대기
IPC_NOWAIT: message queue에 여유 공간이 없다면 바로 -1 로 복귀
ReturnDescription
0성공
-1실패

msgrcv

Message를 받는데 사용하는 함수다. 참고로 argument 중 첫 번째 인자인 msgid는 수신된 데이터 중 구조체의 첫 번째 멤버 변수와 비교하는데 사용된다.

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
ArgumentDescription
int msqid목표하는 message queue ID
void *msgp전송할 데이터
size_t msgsz전송할 데이터 크기
long msgtypMessage queue에 있는 데이터 중 어떤 값을 읽을 지에 대한 옵션
0: message queue에 자료가 있다면 첫 번째의 자료를 읽음
양수: 양수로 지정한 값과 같은 data_type의 자료 중 첫 번째를 읽어 들입니다.
음수: 음수 값을 절대 값으로 변경하고, 이 절대값과 같거나 보다 제일 작은 data_type의 자료를 구합니다.
Message queue에 data_type 이 1, 5, 15 이고 -10을 지정했다면 1의 데이터를 구함
int msgflg읽어 들이는 옵션
0: 옵션 사용하지 않음
IPC_NOWAIT: message queue에 메시지가 없다면 기다리지 않고 -1 로 복귀
MSG_NOERROR: message queue에 있는 자료가 준비된 데이터 크기보다 크다면 초과 부분을 잘라 내고 읽어 들일 수 있는 부분만 담아 전달. 이 옵션이 없다면 메시지 큐에 자료가 있다고 하더라도 -1 로 실패
ReturnDescription
0성공
-1실패

Example

5000번 Message queue를 생성해서 타입 번호 1번 message를 전송하는 프로그램이다. Client 코드는 echo client로 구현되어있다.

// msglib.c
#include "msglib.h"

int CreateMQ(long key) {
    return( msgget(key, IPC_CREAT | 0777) );
}

int OpenMQ(long key) {
    return( msgget(key, 0) );
}

long SendMQ(int qid, long mtype, MSG_t msg) {
    int st; 

    msg.to_mtype = mtype;
    msg.fm_mtype = getpid();

    /* msgsnd() : Return 0 if OK, -1 on error */        
    st = msgsnd(qid, &msg, (sizeof(MSG_t)-sizeof(long)), IPC_NOWAIT);

    if(st < 0)
        return -1L;

    return msg.fm_mtype;
}

long RecvMQ(int qid, long mtype, MSG_t *msg) {
    int st; 

    /* msgrcv() : Return recv bytes if OK, -1 on error */
    st = msgrcv(qid, msg,sizeof(MSG_t)- sizeof(long), 
            mtype, IPC_NOWAIT);
    if(st < 0)
        return -1L;
    return msg->fm_mtype;
}

int GetFreeSizeMQ(int qid, long *freesize) {
    int rtn;
    struct msqid_ds stat_q;

    if(qid<0)
        return -1; 

    /* msgctl() : Return  0 if OK, -1 on error */
    rtn = msgctl(qid,IPC_STAT,&stat_q);
    if(rtn < 0)
        return -1; 

    *freesize = stat_q.msg_qbytes - stat_q.msg_cbytes;

    return rtn;
}

int RemoveMQ(int qid) {
    return(msgctl(qid, IPC_RMID, 0));
}
// msglib.h
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

typedef struct msg_q {
    long to_mtype;
    long fm_mtype;
    char mtext[100];
} MSG_t;


int  CreateMQ( long key_t );
int  OpenMQ( long key_t );
long RecvMQ( int qid, long mtype, MSG_t *msgbuf );
long SendMQ( int qid, long mtype, MSG_t msgbuf );
int GetFreeSizeMQ(int qid, long *freesize);
int  RemoveMQ( int qid );
// msgserver.c
#include "msglib.h"
#include <stdio.h> 
#include <string.h> 
int main() {
    int qid;
    MSG_t msg;
    long mtype;

    qid=CreateMQ(5000);

    if(qid<0) {
        printf("q open fail: %d\n",errno);    
        return -1;
    }
    while(1) {
        mtype = RecvMQ(qid, 1L,&msg);

        if(mtype > 0)
        {
            if(!strcmp(msg.mtext, "exit"))
            {
                printf("Server Process Exit\n");    
                break;
            }
            printf("recv : %s\n",msg.mtext);    
            if(SendMQ(qid, mtype, msg)<0)
                break;
        }
    }
    RemoveMQ(qid);

    return 0;
}
// msgclient.c
#include "msglib.h"
#include <stdio.h>
#include <string.h>
int main() {
    int qid,st;
    MSG_t msg;

    qid = OpenMQ(5000);

    if(qid < 0) {
        printf("q open fail: %d\n",errno);    
        return -1;
    }
    while(1) {
        memset(msg.mtext,'\0',100); 
        printf("\ninput : ");
        fgets(msg.mtext, sizeof msg.mtext, stdin);
        msg.mtext[strlen(msg.mtext)-1] = 0; // '\n' => '\0'

        if(SendMQ(qid,1L,msg)<=0)   {
            printf("q send fail: %d\n",errno);    
            break;
        }

        if(!strcmp(msg.mtext, "exit"))    {
            printf("Client Process Exit\n");    
            break;
        }
        sleep(1);
        st = RecvMQ(qid, getpid(),&msg);
        if(st>0)
            printf("recv : %s\n", msg.mtext);
    }
    return 0;
}
$ gcc msglib.c -c msglib.o
$ gcc msgserver.c msglib.o -o server
$ gcc msgclient.c msglib.o -o client

위 상태로 세션을 1개 더 띄워서 server와 client를 각각 띄워서 통신이 되는 것을 확인할 수 있다.

./server
recv : hello // 2

----------------------------------------------------

./client 
input : hello // 1
recv : hello // 3

다수의 client가 존재 할 때 본인의 message만 받을 수 있도록 구분하는데 사용되는 ID는 각 client process의 PID를 이용한다.

Reference

  1. http://forum.falinux.com/zbxe/index.php?document_srl=420634&mid=C_LIB

Leave a Reply

Your email address will not be published. Required fields are marked *