一、msgsnd 和 msgrcv 函数

  #include <sys/types.h>
  #include <sys/ipc.h>
  #include <sys/msg.h>

功能:把一条消息添加到消息队列中
原型 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数
msgid: 由msgget函数返回的消息队列标识码
msgp:是一个指针,指针指向准备发送的消息结构体
msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型
msgflg:控制着当前消息队列满或到达系统上限时将要发生的事情
返回值:成功返回0;失败返回-1


msgflg=IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。为0表示阻塞等待
消息结构在两方面受到制约。首先,它的具体数据必须小于系统规定的上限值MSGMAX;其次,它必须以一个long int长整数开始,接收者函数将利用这个长整数确定消息的类型。


消息结构参考形式如下:
struct msgbuf {
long  mtype;
char mtext[1];
};

The  mtext  field  is an array (or other structure) whose size is specified by msgsz, a nonnegative integer value.Messages of zero length (i.e., no mtext field) are permitted. 

即mtex 这块区域可以是个数组或者结构体,大小由参数msgsz 指明。



功能:是从一个消息队列接收消息
原型 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数
msgid: 由msgget函数返回的消息队列标识码
msgp:是一个指针,指针指向准备接收的消息结构体
msgsz:是msgp指向的最大消息长度,这个长度不含保存消息类型的那个long int长整型
msgtype:它可以实现接收优先级的简单形式
msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事
返回值:成功返回实际放到接收缓冲区里去的字符个数,失败返回-1


msgtype=0返回队列第一条信息
msgtype>0返回队列第一条类型等于msgtype的消息 
msgtype<0返回队列第一条类型小于等于msgtype绝对值的消息,并且是满足条件的消息类型最小的消息
msgflg=IPC_NOWAIT,队列没有可读消息不等待,返回ENOMSG错误。
msgflg=MSG_NOERROR,消息大小超过msgsz时被截断
msgtype>0且msgflg=MSG_EXCEPT,接收类型不等于msgtype的第一条消息。


二、消息队列实现回射客户/服务器

在前面的系列文章中,我们都是使用socket 套接字来实现回射客户/服务器程序,现在尝试使用消息队列来实现,主要就是利用上面介绍的两个函数msgsnd,msgrcv 。


对于服务器端来说,接收到一个消息结构体的类型如果为1,表示是客户请求,而mtex 字段的前4个字节存放着不同进程的pid ,后续字节才是真正的数据,服务器回射客户端时,将pid 作为类型,mtex 为实际数据,客户端只接收对应类型的数据,故可以区分不同客户端。


程序如下:

echoser.c

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
     do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    }  while( 0)

#define MSGMAX  8192
struct msgbuf
{
     long mtype;
     char mtext[MSGMAX];
};


void echo_ser( int msgid)
{
     struct msgbuf msg;
    memset(&msg,  0sizeof(msg));
     int nrcv =  0;
     while ( 1)
    {

         if ((nrcv = msgrcv(msgid, &msg, MSGMAX,  10)) <  0);
         int pid = *(( int *)msg.mtext);
        fputs(msg.mtext +  4, stdout);
        msg.mtype = pid;
        msgsnd(msgid, &msg, nrcv,  0);
        memset(&msg,  0sizeof(msg));

    }
}

int main( int argc,  char *argv[])
{
     int msgid;
    msgid = msgget( 1234, IPC_CREAT |  0666);
     if (msgid == - 1)
        ERR_EXIT( "msgget");

    echo_ser(msgid);


     return  0;
}

echocli.c

 C++ Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>

#define ERR_EXIT(m) \
     do { \
        perror(m); \
        exit(EXIT_FAILURE); \
    }  while( 0)

#define MSGMAX  8192

struct msgbuf
{
     long mtype;
     char mtext[MSGMAX];
};

void echo_cli( int msgid)
{
     int nrcv;
     int pid = getpid();
     struct msgbuf msg;
    memset(&msg,  0sizeof(msg));
    msg.mtype =  1;
    *(( int *)msg.mtext) = pid;
     while (fgets(msg.mtext +  4, MSGMAX, stdin) !=  NULL)
    {

         if (msgsnd(msgid, &msg,  4 + strlen(msg.mtext +  4), IPC_NOWAIT) <  0)
            ERR_EXIT( "msgsnd");

        memset(msg.mtext +  40, MSGMAX -  4);
         if ((nrcv = msgrcv(msgid, &msg, MSGMAX, pid,  0)) <  0)
            ERR_EXIT( "msgsnd");
        fputs(msg.mtext +  4, stdout);
        memset(msg.mtext +  40, MSGMAX -  4);

    }
}

int main( int argc,  char *argv[])
{

     int msgid;
    msgid = msgget( 12340);
     if (msgid == - 1)
        ERR_EXIT( "msgget");

    echo_cli(msgid);

     return  0;
}

程序逻辑不复杂,就不多说了,编译运行服务器端,再开两个客户端,可以看到正常回射输出。

但上述程序是存在死锁的风险的,当开了多个客户端,将队列写满了,此时服务器端想要写入就会阻塞,而因为客户端一旦发送了数据就阻塞等待服务器端回射类型为pid的消息,即队列的消息不会减少,此时就会形成死锁,即使服务器端是非阻塞地写入,此时会返回EAGAIN 的错误,程序逻辑来说我们也会使其不断地尝试去写入,而不是粗暴地将其退出进程,这样还是会死锁。

对此问题可以多开几个私有的队列进行服务,如下:


即某个客户端先创建一个私有消息队列,然后将私有消息队列标识符和具体数据发到共享的队列,服务器fork 出一个子进程,此时根据私有队列标识符就可以将数据回射到这个队列,这个客户端就可以从私有队列读取到回射的数据。


参考:

《UNP》


GitHub 加速计划 / li / linux-dash
6
1
下载
A beautiful web dashboard for Linux
最近提交(Master分支:4 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐