当前位置: 首页 > 架构/云 > System V消息队列(Chapter 6)

System V消息队列(Chapter 6)

from: 

1、消息队列的基本概念

       消息队列就是一个消息的链表。有足够写权限的进程可往队列中放置消息,有足够读权限的进程可从队列中取走消息。每个消息是一个记录它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这跟管道和FIFO是相反的,对后者来说,除非读出者已存在,否则先有写入者是没有意义的。消息队列是随内核持续的。一个进程可以先往某个队列写入一些消息后终止,让另外一个进程在以后某个时刻读出这些消息。这一点管道和FIFO是不支持的。

2、消息队列的分类

       目前主要有两种类型的消息队列:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。考虑到程序的可移植性,新开发的应用程序应尽量使用POSIX消息队列。

3、和消息队列相关的结构

       每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该消息队列的大量信息,包括消息队列键值、用户ID、组ID、消息队列中消息数目等等,甚至记录了最近对消息队列读写进程的ID。读者可以访问这些信息,也可以设置其中的某些信息。这个结构存于系统空间。

struct msg_queue {

    struct kern_ipc_perm q_perm;

    time_t q_stime;         /* last msgsnd time */

    time_t q_rtime;         /* last msgrcv time */

    time_t q_ctime;         /* last change time */

    unsigned long q_cbytes;     /* current number of bytes on queue */

    unsigned long q_qnum;       /* number of messages in queue */

    unsigned long q_qbytes;     /* max number of bytes on queue */

    pid_t q_lspid;          /* pid of last msgsnd */

    pid_t q_lrpid;          /* last receive pid */

    struct list_head q_messages;

    struct list_head q_receivers;

    struct list_head q_senders;

};

 

结构msqid_ds用来设置或返回消息队列的信息,存在于用户空间;

struct msqid_ds{ 
    struct ipc_perm msg_perm; 
    struct msg *msg_first; /* first message on queue,unused */ 
    struct msg *msg_last; /* last message in queue,unused */ 
    __kernel_time_t msg_stime; /* last msgsnd time */ 
    __kernel_time_t msg_rtime; /* last msgrcv time */
    __kernel_time_t msg_ctime; /* last change time */ 
    unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long msg_lqbytes; /* ditto */ 
    unsigned short msg_cbytes; /* current number of bytes on queue */ 
    unsigned short msg_qnum; /* number of messages in queue */
    unsigned short msg_qbytes; /* max number of bytes on queue */ 
    __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ 
};

 

4 打开或创建消息队列

       消息队列的内核持续性要求每个消息队列都在系统范围内对应唯一的键值,所以,要获得一个消息队列的描述字,只需提供该消息队列的键值即可。msgget用于创建一个消息队列或打开一个现存的队列。

1

名称:

msgget

功能:

创建消息队列

头文件:

#include <sys/types.h>

#include <sys/msg.h>

#inlcude <sys/ipc.h>

函数原形:

int msgget(key_t key,int msgflag);

参数:

key 消息队列的键

flag 一些标志位

返回值:

若成功则为消息队列描述字若出错则为-1

      

 

 

 

 

 

 参数key是一个键值,由ftok获得;msgflg参数是一些标志位。该调用返回与健值key相对应的消息队列描述字。

       在以下两种情况下,该调用将创建一个新的消息队列:

       1.如果没有消息队列与健值key相对应,并且msgflg中包含了IPC_CREAT标志位;

       2key参数为IPC_PRIVATE

       参数msgflg可以为以下:IPC_CREAT(创建消息队列)、IPC_EXCL )、IPC_NOWAIT  )或三者的或结果。

       还有注意的是:当创建一个新队列时,系统自动初始化struct msqid_ds结构的下列成员。

ipc_perm结构按我们以前说的进行初始化。该结构中mode成员按flag中的相应权限位设置。

msg_qnum,msg_lspid,msg_lrpid,msg_stime,msg_rtime都设置为0

msg_ctime设置为当前时间。

msg_qbytes设置为系统限制值。

 

5、获得和修改消息队列属性,删除消息队列

2.

名称:

msgctl

功能:

对消息队列进行多种操作

头文件:

#include <sys/msg.h>

函数原形:

int msgctl(int msqid, int cmd,struct msqid_ds *buf);

参数:

msqid   消息队列描述字

cmd    要执行的操作

buf     此队列的struct msqid_ds结构

返回值:

若成功返回0,若出错返回-1

      

 

 

 

 

 该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作:IPC_STATIPC_SET IPC_RMID

IPC_STAT:该命令用来获取消息队列信息,返回的信息存贮在buf指向的msqid_da结构中;

IPC_SET:该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid_ds结构中;可设置属性包括:msg_perm.uidmsg_perm.gidmsg_perm.mode以及msg_qbytes,同时,也影响msg_ctime成员。

IPC_RMID:删除msqid_ds标识的消息队列.

 

6、用消息队列发送和接收消息

3

名称:

msgsnd

功能:

将数据放到消息队列上

头文件:

#include <sys/types.h>

#include <sys/msg.h>

#inlcude <sys/ipc.h>

函数原形:

int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg);

参数:

msqid   消息队列描述字

msgp       指向消息数据的指针

msgsz    发送消息的大小

msgflg       标志位

返回值:

若成功则为0,若出错则为-1

      

 

 

 

 

 

 

 

 向msgid代表的消息队列发送一个消息,即将发送的消息存储在msgp指向的msgbuf结构中,消息的大小由msgze指定。

       struct msgbuf{

              long mtype; /*消息类型*/

              char mtext[1]; /*消息数据*/

       };

       我们可以把msgbuf结构看成是一个模版,程序员可以根据自己的需要来设计直接的消息结构。举例来说,如果某个应用需要交换由一个整数后跟一个8字节字符数组构成的消息,那它可以如下定义自己的结构:

       typedef struct my_msgbuf{

              long mtypel

              int    mshort;

              char mchar[MY_DATA]

       }Message;

       对发送消息来说,有意义的msgflg标志为IPC_NOWAIT,指明在消息队列没有足够空间容纳要发送的消息时,msgsnd是否等待。造成msgsnd()等待的条件有两种:

       1.当前消息的大小与当前消息队列中的字节数之和超过了消息队列的总容量;

       2.当前消息队列的消息数(单位"")不小于消息队列的总容量(单位"字节数"),此时,虽然消息队列中的消息数目很多,但基本上都只有一个字节。

       msgsnd()解除阻塞的条件有三个:

       1.不满足上述两个条件,即消息队列中有容纳该消息的空间;

       2msqid代表的消息队列被删除;

      3.调用msgsnd()的进程被信号中断;

       msgsnd成功返回,与消息队列相关的msqid_ds结构得到更新,以标明发出该调用的进程IDmsg_lspid),进行该调用的时间(msg_stime),并指示队列中增加了一条消息。


4

名称:

msgrcv

功能:

从队列中取出消息

头文件:

#include <sys/types.h>

#include <sys/msg.h>

#inlcude <sys/ipc.h>

函数原形:

int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long msgtyp, int msgflg);

参数:

msqid   消息队列描述字

msgp    指向消息数据的指针

msgsz   接收消息的大小

msgtyp   请求读取的消息类型

msgflg   标志位

返回值:

若成功则为消息数据部分的长度,若出错则为-1

      

 

 

 

 

 

 

 

 

该系统调用从msgid代表的消息队列中读取一个消息,并把消息存储在msgp指向的msgbuf结构中。

       msqid为消息队列描述字;消息返回后存储在msgp指向的地址,msgsz指定msgbufmtext成员的长度(即消息内容的长度),msgtyp为请求读取的消息类型;读消息标志msgflg可以为以下几个常值的或:

       IPC_NOWAIT 如果没有满足条件的消息,调用立即返回,此时,errno=ENOMSG

       IPC_EXCEPT msgtyp>0配合使用,返回队列中第一个类型不为msgtyp的消息

       IPC_NOERROR 如果队列中满足条件的消息内容大于所请求的msgsz字节,则把该消息截断,截断部分将丢失。

       msgtyp使我们可以指定想要哪一种消息:

       msgtyp==0     返回队列中的第一个消息。

       msgtyp>0       返回队列中消息类型为msgtyp的第一个消息。

       msgtyp<0       返回队列中消息类型值小于或等于msgtyp绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。

       msgrcv成功返回,与消息队列相关的msqid_ds结构得到更新,以标明发出该调用的进程IDmsg_lspid),进行该调用的时间(msg_stime),并将队列中消息数(msg_qnum)减1

 

7.消息队列与管道以及有名管道的比较

       消息队列与管道以及有名管道相比,具有更大的灵活性,首先,它提供有格式字节流,有利于减少开发人员的工作量;其次,消息具有类型,在实际应用中,可作为优先级使用。这两点是管道以及有名管道所不能比的。同样,消息队列可以在几个进程间复用,而不管这几个进程是否具有亲缘关系,这一点与有名管道很相似;但消息队列是随内核持续的,与有名管道(随进程持续)相比,生命力更强,应用空间更大。

 

       输出消息队列信息的例子:

#include <sys/types.h> 
#include <sys/msg.h>

#include <sys/ipc.h>

#include <stdio.h>

#define KEY 75

 

void msg_stat(int,struct msqid_ds);

 

int main()

{

int msgid;

int sflags,rflags;

struct msqid_ds msg;

 

struct msgbuf{

    int mtypes;

    char mtext[10];

}msg_buf;

 

msgid=msgget(KEY,IPC_CREAT);

msg_stat(msgid,msg);

 

sflags=IPC_NOWAIT;

msg_buf.mtypes=10;

msg_buf.mtext[0]=’a’;

msgsnd(msgid,&msg_buf,sizeof(msg_buf.mtext),sflags);

msg_stat(msgid,msg);

 

rflags=IPC_NOWAIT|MSG_NOERROR;

msgrcv(msgid,&msg_buf,4,10,rflags);

msg.msg_perm.uid=8;

msg.msg_perm.gid=8;

msg.msg_qbytes=16388;

msgctl(msgid,IPC_SET,&msg);

msg_stat(msgid,msg);

msgctl(msgid,IPC_RMID,NULL);

}

 

void msg_stat(int msgid,struct msqid_ds msg)

{

    msgctl(msgid,IPC_STAT,&msg);

 

printf("current number of bytes on queue is %dn",msg_info.msg_cbytes); /*最后消息的类型*/

printf("number of messages in queue is %dn",msg_info.msg_qnum);/*当前消息队列的消息数*/

printf("max number of bytes on queue is %dn",msg_info.msg_qbytes); /*消息队列的容量*/

printf("pid of last msgsnd is %dn",msg_info.msg_lspid);/*最后发送消息时间*/

printf("pid of last msgrcv is %dn",msg_info.msg_lrpid); /*最后接受消息进程*/

printf("last msgsnd time is %s", ctime(&(msg_info.msg_stime)));/*最后发送消息时间*/

printf("last msgrcv time is %s", ctime(&(msg_info.msg_rtime)));/*最后接受消息时间*/

printf("last change time is %s", ctime(&(msg_info.msg_ctime)));/* 最后修改时间*/

printf("msg uid is %dn",msg_info.msg_perm.uid); /*进程id*/

printf("msg gid is %dn",msg_info.msg_perm.gid); /*进程组id*/

}

 

 

用消息队列实现进程间通信的例子:

/* 14_6.c客户进程*/

#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#define MSGKEY 75

#define MAXLINE 100


struct msgform /*定义消息格式*/
{

long mtype; /*消息类型*/

pid_t pid; /*消息发送进程ID */
    char mtext[100];/*消息数据*/
}msg;
int msgqid;/*定义消息的id*/

void client()
{
    int i;
    msgqid=msgget(MSGKEY,0777); /*打开75#消息队列*/
    for(i=1;i>5;i++)/*循环发送5个消息*/
    {
        msg.mtype=i;
        printf(“client:”);

        fgets(msg.mtext,MAXLINE,stdin);/*把用户输入读入缓冲区*/
        msgsnd(msgqid,&msg,1024,0); /*发送消息*/
    }
    exit(0);
}

main( )

    client( );
}

/*14_7.c  服务器进程*/
#include <sys/types.h>
#include <sys/msg.h>
#include <sys/ipc.h>
#define MSGKEY 75

struct msgform
{

long mtype;

pid_t pid;
    char mtext[100];
}msg;
int msgqid;

void server( )
{
    msgqid=msgget(MSGKEY,0777|IPC_CREAT); /*创建75#消息队列*/
    do 
    {
        msgrcv(msgqid,&msg,1030,0,0); /*接收消息*/
       printf(“name:%d,type:%d,date:%sn”,msg.pid,msg.mtype,msg.mtext);

/*输出消息*/
    }

while(msg.mtype!=5);
    msgctl(msgqid,IPC_RMID,0); /*删除消息队列,归还资源*/
    exit(0);
}

main( )

    server( );
}

    程序client.c和server.c,分别用于消息的发送与接收.server建立一个 Key为75的消息队列,等待其它进程发来的消息。当遇到类型为1的消息,则作为结束信号,取消该队列,并退出server。server每接收到一个消息后显示一句“server:输入信息”. client使用 key为75的消息队列,先后发送类型从3到1的消息,然后退出。最后一个消息,即是 server端需要的结束信号。client 每发送一条消息后显示一句“client:要发送的消息”。
    注意: 二个程序分别编辑、编译为client与server。执行:
./server&
./client

文章来源于网络

转载时请以 超链接的形式 注明:转自疯狂泰克