消息队列可认为是一个消息链表,它允许进程之间以消息的形式交换数据。有足够写权限的进程或线程可往队列中放置消息,有足够读权限的进程或线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。与管道不同,管道是字节流模型,没有消息边界。
本文介绍的是POSIX消息队列。POSIX消息队列与System V消息队列的相似之处在于数据的交换单位是整个消息,但它们之间仍然存在一些显著的差异。
- POSIX消息队列是引用计数的。只有当所有当前使用队列的进程都关闭了队列之后才会对队列进行标记以便删除。
- 每个System V消息都有一个整数类型,并且通过msgrcv()可以以各种方式类选择消息。与之形成鲜明对比的是,POSIX消息有一个关联的优先级,并且消息之间是严格按照优先级顺序排队的(以及接收)。
- POSIX消息队列提供了一个特性允许在队列中的一条消息可用时异步地通知进程。
概述
POSIX消息队列API中的主要函数如下。
- mq_open()函数创建一个新消息队列或打开一个既有队列,返回后续调用中会用到的消息队列描述符。
- mq_send()函数向队列写入一条消息。
- mq_recevie()函数从队列中读取一条消息。
- mq_close()函数关闭进程之前打开的一个消息队列。
- mq_unlink()函数删除一个消息队列名并当所有进程关闭该队列时对队列进行标记以便删除。
上面的函数所完成的功能是相当明显的。此外,POSIX消息队列API还具备一些特别的特性。
- 每个消息队列都有一组关联的特性,其中一些特性可以在使用mq_open()创建或打开队列时进行设置。获取和修改队列特性的工作则是由两个函数来完成的:mq_getattr()和mq_setattr()。
- mq_notify()函数允许一个进程向一个队列注册接收消息通知。在注册完之后,当一条消息可用时会通过发送一个信号或在一个单独的线程中调用一个函数来通知进程。
打开、关闭和断开链接消息队列
打开一个消息队列
mq_open()函数创建一个新消息队列或打开一个既有队列。
#include<fcntl.h>
#include<sys/stat.h>
#include<mqueue.h>
mqd_t mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr);
name参数标识出了消息队列,消息队列对象的名字的最大长度为NAME_MAX(255)个字符。
oflag参数是一个位掩码,它控制着mq_open()操作的各个方面。
oflag参数的位值:
标记 | 描述 |
---|---|
O_CREAT | 队列不存在时创建队列 |
O_EXCL | 与O_CREAT一起使用,若消息队列已存在,则错误返回 |
O_RDONLY | 只读打开 |
O_WRONLY | 只写打开 |
O_RDWR | 读写打开 |
O_NONBLOCK | 以非阻塞模式打开 |
oflag参数的其中一个用途是,确定是打开一个既有队列还是创建和打开一个新队列。如果在oflag中不包含O_CREAT,那么将会打开一个既有队列。如果在oflag中包含了O_CREAT,并且与给定的name对应的队列不存在,那么就会创建一个新的空队列。
如果在oflag中同时包含O_CREAT和O_EXCL,并且与给定的name对应的队列已经存在,那么mq_open()就会失败。
mq_open()通常用来打开一个既有消息队列,这种调用只需要两个参数,但如果在flags中指定了O_CREAT,那么就还需要另外两个参数:mode和attr。这些参数用法如下:
- mode参数是一个位掩码,它指定了施加于新消息队列之上的权限。这个参数可取的位置与文件上的掩码值是一样的,并且与open()一样,mode中的值会与进程的umask取掩码。要从一个队列中读取消息就必须要将读权限赋予相应的用户,要向队列写入消息就需要写权限。
- attr参数是一个mq_attr结构,它指定了新消息队列的特性。如果attr为NULL,那么将使用实现定义的默认特性创建队列。mq_attr结构后在后面进行介绍。
mq_open()在成功结束时会返回一个消息队列描述符,它是一个类型为mqd_t的值,在后续的调用中将会使用它来引用这个打开着的消息队列。
fork()、exec()以及进程终止对消息队列描述符的影响
- 在fork()中子进程会接收其父进程的消息队列描述符的副本,并且这些描述符会引用同样 的打开着的消息队列描述符。子进程不会继承其父进程的任何消息通知注册。
- 当一个进程执行了一个exec()或终止时,所有其打开的消息队列描述符会被关闭。
关闭一个消息队列
mq_close()函数关闭消息队列描述符mqdes。
#include<mqueue.h>
int mq_close(mqd_t mqdes);
与文件上的close()一样,关闭一个消息队列并不会删除该队列。要删除队列则需要使用mq_unlinl(),它是unlink()在消息队列上的版本。
删除一个消息队列
mq_unlink()函数删除通过name标识的消息队列,并将队列标记为在所有进程使用完该队列之后销毁该队列。
#include<mqueue.h>
int mq_unlink(const char *name);
消息队列的特性
mq_open()、mq_getattr()以及mq_setattr()函数都会接收一个参数,它是一个指向mq_attr结构的指针。这个结构是在<mqueue.h>中进行定义的,其形式如下:
struct mq_attr
{
long mq_flags;//阻塞标志, 0或O_NONBLOCK
long mq_maxmsg;//最大消息数
long mq_msgsize;//每个消息最大大小
long mq_curmsgs;//当前消息数
};
在开始深入介绍mq_attr的细节之前有必要指出以下几点。
- 这三个函数中的每个函数都只用到了其中几个字段。上面给出的结构定义中的注释指出了各个函数所用到的字段。
- 这个结构包含了与一个消息描述符相关联的打开的消息队列描述的相关信息以及该描述符所引用的队列的相关信息。
- 其中一些字段中包含的信息在使用mq_open()创建队列时就已经确定下来了(mq_maxmsg和mq_msgsize);其他字段则会返回消息队列描述或消息队列的当前状态相关信息。
在创建队列时设置消息队列特性
在使用mq_open()创建消息队列时可以通过下列mq_attr字段来确定队列的特性。
- mq_maxmsg字段定义了使用mq_send()向消息队列添加消息的数量上限,其取值必须大于0。
- mq_msgsize字段定义了加入消息队列的每条消息的大小的上限,其取值必须大于0。
mq_maxmsg和mq_msgsize特性是在消息队列被创建时就确定下来的,并且之后也无法修改这两个特性。
获取消息队列特性
mq_getattr()函数返回一个包含与描述符mqdes相关联的消息队列描述和消息队列的相关信息的mq_attr结构。
#include<mqueue.h>
int mq_getattr(mqd_t mqdes,struct mq_attr *attr);
除了上面已经介绍的mq_maxmsg和mq_msgsize字段之外,attr指向的返回结构中还包含下列字段。
mq_flags
这些是与描述符mqdes相关联的打开的消息队列描述的标记,其取值只有一个:O_NONBLOCK。这个标记是根据mq_open()的oflag参数来初始化的,并且使用mq_setattr()可以修改这个标记。
mq_curmsgs
当前位于队列中的消息数。这个信息在mq_getattr()返回时可能已经发生了改变,前提是存在其他进程从队列中读取消息或写入消息。
修改消息队列特性
mq_setattr()函数设置与消息队列描述符mqdes相关联的消息队列描述的特性,并可选地返回与消息队列有关的消息。
#include<mqueue.h>
int mq_setattr(mq_t mqdes,const struct mq_attr *newattr,struct mq_attr *oldattr);
mq_setattr()函数执行下列任务。
- 它使用newattr指向的mq_attr结构中的mq_flags字段来修改与描述符mqdes相关联的消息队列描述的标记。
- 如果oldattr不为NULL,那么就返回一个包含之前的消息队列描述标记和消息队列特性的mq_attr结构。
交换消息
发送消息
mq_send()函数将位于msg_ptr指向的缓冲区中的消息添加到描述符mqdes所引用的消息队列中。
#include<mqueue.h>
int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned int msg_prio);
msg_len参数指定了msg_ptr指向的消息的长度,其值必须小于或等于队列的mq_msgsize特性,否则mq_send()就会返回EMSGSIZE错误。长度为零的消息是允许的。
每条消息都拥有一个用非负整数表示的优先级,它通过msg_prio参数指定。消息在队列中是按照优先级倒序排列的(即0表示优先级最低)。当一条消息被添加到队列中时,它会被放置在队列中具有相同优先级的所有消息之后。如果一个应用程序无需使用消息优先级,那么只需要将msg_prio指定为0即可。
接收消息
mq_receive()函数从mqdes引用的消息队列中删除一条优先级最高、存在时间最长的消息并将删除的消息放置在msg_ptr指向的缓冲区。
#include<mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *mdg_ptr,size_t msg_len,unsigned int *msg_prio);
调用着使用msg_len参数来指定msg_ptr指向的缓冲区中的可用字节数。
不管消息的实际大小是什么,msg_len必须要大于或等于队列的mq_msgsize特性,否则mq_receive()就会失败并返回EMSGSIZE错误。如果不清楚一个队列的mq_msgsize特性的值,那么可以使用mq_getattr()来获取这个值。
如果msg_prio不为NULL,那么接收到的消息的优先级会被复制到msg_prio指向的位置处。
消息通知
POSIX消息队列能够接收之前为空的队列上有可用消息的异步通知(即队列从空变成了非空)。这个特性意味着已经无需执行一个阻塞的调用或将消息队列描述符标记为非阻塞并在队列上定期执行mq_receive()调用。
进程可以选择通过信号的形式或者通过一个单独的线程中调用一个函数的形式来接收通知。
mq_notify()函数注册调用进程在一条消息进入描述符mqdes引用的空队列时接收通知。
#include<mqueue.h>
int mq_notify(mqd_t mqdes,const struct sigevent *notification);
notification参数指定了进程接收通知的机制。
有关消息的通知需要注意以下几点:
- 在任何一个时刻都只有一个进程能够向一个特定的消息队列注册接收通知。如果一个消息队列上已经存在注册进程了,那么后续在该队列上的注册请求将会失败。
- 只有当一条新消息进入之前为空的队列时注册进程才会收到通知。如果在注册的时候队列中已经包含消息,那么只有当队列被清空之后有一条新消息达到之时才会发出通知。
- 当向注册进程发送了一个通知之后就会删除注册信息,之后任何进程就能够向队列注册接收通知了。当通知被发送给注册进程时,注册即被撤销,该进程若要重新注册,则必须重新调用mq_notify()。
- 注册进程只有在当前不存在其他在该队列上调用mq_receive()而发生阻塞的进程时才会收到通知。如果其他进程在mq_receive()调用中被阻塞了,那么该进程会读取消息,注册进程会保持注册状态。
- 一个进程可以通过在调用mq_notify()时传入一个值为NULL的notification参数来撤销自己在消息通知上的注册信息。
struct sigevent
{
int sigev_notify;
int sigev_signo;
union sigval sigev_value;
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
sigev_notify取值:
SIGEV_NONE:事件发生时,什么也不做;
SIGEV_SIGNAL:事件发生时,将sigev_signo指定的信号发送给指定的进程;SIGEV_THREAD:事件发生时,内核会(在此进程内)以sigev_notify_attributes为线程属性创建一个线程,并让其执行sigev_notify_function,并以sigev_value为其参数
sigev_signo:在sigev_notify=SIGEV_SIGNAL时使用,指定信号类别
sigev_value:sigev_notify=SIGEV_SIGEV_THREAD时使用,作为sigev_notify_function的参数
union sigval
{
int sival_int;
void *sival_ptr;
};
sigev_notify_function:在sigev_notify=SIGEV_THREAD时使用,其他情况下置NULL
sigev_notify_attributes:在sigev_notify=SIGEV_THREAD时使用,指定创建线程的属性,其他情况下置NULL