Linux系统编程(十三)--- 邮箱通信(Mailbox)模式
目录
一、邮箱通信
它是一种经典的线程间通信(IPC,Inter-Process Communication)机制,常用于嵌入式系统或并发编程中。每个线程拥有自己的消息队列,通过发送和接收消息来解耦数据生产与处理逻辑。其中 send_data 将数据推送到目标线程的队列,recv_data 则从本线程队列中取出消息并根据发送者进行相应处理。

二、系统设计与数据结构
2.1 整体架构
系统由以下核心组件构成:
-
邮件系统(MBS):全局单例,管理所有线程的注册信息,提供互斥保护。
-
线程节点(
LIST_LINK中的elem):存储线程的 tid、名称、消息队列头尾、线程函数入口。 -
邮件数据(
MAIL_DATA):包含消息正文、发送者信息、接收者信息。 -
队列模块(
queue.h/c):为每个线程提供消息队列(带哨兵头节点的单向链表)。 -
链表模块(
list.h/c):管理所有已注册线程的链表。
2.2 关键数据结构
#ifndef __LIST_H__
#define __LIST_H__
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
typedef void*(*th_fun)(void* arg); // 线程函数类型
typedef char DATATYPE[256]; // 消息数据最大长度256字节
// 邮件数据结构(包含发送/接收方标识和实际数据)
typedef struct mail_data
{
pthread_t id_of_sender;
char name_of_sender[256];
pthread_t id_of_recver;
char name_of_recver[256];
DATATYPE data;
} MAIL_DATA;
// 队列节点结构(单向链表,每个节点携带一个 MAIL_DATA)
typedef struct queue{
MAIL_DATA data;
struct queue* next;
// int pro; // 注释掉的优先级字段(预留)
} Que, *pQue;
// 线程节点数据(存储线程信息及消息队列头尾)
typedef struct thread_node
{
pthread_t tid; // 线程id号
char name[256]; // 线程名字 ,必须唯一
Que *mail_head, *mail_tail; // 消息队列头尾指针
th_fun th; // 线程函数
} LIST_DATA;
// 链表节点(包含线程数据及next指针)
typedef struct Link{
LIST_DATA elem;
struct Link *next;
} LIST_LINK;
// 外部函数声明
extern LIST_LINK * list_init();
extern LIST_LINK * list_for_each(LIST_LINK* head, char *name);
extern void list_add(LIST_LINK *head, LIST_LINK *info);
#endif
#ifndef __QUEUE_H__
#define __QUEUE_H__
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "list.h"
// 队列操作函数声明
extern int init_que(LIST_LINK *list_head);
extern int in_queue(LIST_LINK *list_head, MAIL_DATA *data);
extern int out_queue(LIST_LINK *list_head, MAIL_DATA *data);
extern void destroy(LIST_LINK *list_head);
#endif
2.3 工作流程
-
调用
create_mail_box_system()创建全局邮件系统。 -
各线程通过
register_to_mail_system()向系统注册,系统自动启动线程。 -
发送方调用
send_msg(mbs, "target_name", data),将消息拷贝后推入目标线程队列。 -
接收方在自己的线程函数中循环调用
recv_msg(mbs, sender_name_buf, data_buf),忙等队列非空,取出消息后处理。
三、核心功能实现解析(含队列与链表模块)
3.1 队列模块实现(queue.c):
#include "queue.h"
/*
typedef int* pi;
#define p int*
pi a, b;
p c, d;
Que* head, tail;
*/
//pQue head, tail;
Que *head, *tail;
// 初始化队列:为指定线程节点创建哨兵头节点,队列头尾均指向该哨兵
int init_que(LIST_LINK *list_head)
{
Que* tmp = malloc(sizeof(Que));
tmp->next = NULL;
list_head->elem.mail_head = list_head->elem.mail_tail = tmp;
return 0;
}
// 入队:在队尾插入新节点,拷贝邮件数据
int in_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
Que* tmp = malloc(sizeof(Que));
memcpy(&tmp->data, data, sizeof(MAIL_DATA));
tmp->next = NULL;
list_head->elem.mail_tail->next = tmp;
list_head->elem.mail_tail = list_head->elem.mail_tail->next;
return 0;
}
// 出队:从队首(哨兵之后)取出一个节点,拷贝数据并释放节点
int out_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
if(list_head->elem.mail_head == list_head->elem.mail_tail){
printf("queue is empty. \n");
return -1;
}
// 如果只有一个有效节点,出队后需要将队尾指针重新指向哨兵
if(list_head->elem.mail_head->next == list_head->elem.mail_tail){
list_head->elem.mail_tail = list_head->elem.mail_head;
}
Que* del = list_head->elem.mail_head->next;
list_head->elem.mail_head->next = del->next;
memcpy(data, &del->data, sizeof(MAIL_DATA));
free(del);
return 0;
}
// 销毁队列:释放所有节点(包括哨兵)
void destroy(LIST_LINK *list_head)
{
Que* tmp = list_head->elem.mail_head;
while(list_head->elem.mail_head){
tmp = list_head->elem.mail_head;
list_head->elem.mail_head = list_head->elem.mail_head->next;
free(tmp);
}
}
说明:该队列实现使用带哨兵头节点的单向链表,mail_head 指向哨兵,mail_tail 指向最后一个有效节点(或哨兵当队列空)。入队 O(1),出队 O(1)。
3.2 链表模块实现(list.c)
#include "list.h"
// 创建新链表头(未在主体代码中使用,但模块提供)
LIST_LINK * list_init()
{
LIST_LINK *temp = malloc(sizeof(LIST_LINK));
temp->next = NULL;
return temp;
}
// 头插法:将 info 节点插入 head 之后
void list_add(LIST_LINK* head, LIST_LINK *info)
{
info->next = head->next;
head->next = info;
}
// 根据线程名称查找节点(注意:遍历时 tmp 从 head 开始,会检查哨兵节点)
LIST_LINK * list_for_each(LIST_LINK* head, char *name)
{
LIST_LINK * tmp = NULL;
tmp = head;
while(tmp->next != NULL)
{
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
tmp = tmp->next;
}
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
else
return NULL;
}
3.3 主程序(mail_system.c):
3.3.1 创建与销毁邮件系统:
MBS* create_mail_box_system()
{
MBS *temp =(MBS*)malloc(sizeof(MBS));
if(NULL == temp) { ... }
pthread_mutex_init(&temp->mutex, NULL);
temp->thread_list = malloc(sizeof(LIST_LINK));
temp->thread_list->next = NULL;
printf("mail box create ok!! \n");
return temp;
}
int destroy_mail_box_system(MBS*mbs)
{
pthread_mutex_destroy(&mbs->mutex);
LIST_LINK *temp = NULL;
LIST_LINK *find = mbs->thread_list;
while(find != NULL) { ... free ... }
free(find);
return 0;
}
3.3.2 线程注册
int register_to_mail_system(MBS *mbs, char name[], th_fun th)
{
LIST_LINK* temp = malloc(sizeof(LIST_LINK));
strcpy(temp->elem.name, name);
temp->elem.th = th;
temp->next = NULL;
init_que(temp); // 初始化该线程的消息队列
pthread_create(&(temp->elem.tid), NULL, th, NULL);
list_add(mbs->thread_list, temp); // 头插法加入全局链表
printf("register mail system |%s| ok \n", temp->elem.name);
return 0;
}
3.3.3 发送消息
int send_msg(MBS*msb, char*recvname, DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
strcpy(temp->data, data);
temp->id_of_sender = pthread_self();
LIST_LINK *find = list_for_each(msb->thread_list, recvname);
if (find == NULL) { ... return -1; }
char* name = get_th_name(msb);
strcpy(temp->name_of_sender, name);
strcpy(temp->name_of_recver, recvname);
ENTER_CRITICAL_AREA(&msb->mutex);
in_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
return 0;
}
3.3.4 接收消息
int recv_msg(MBS*msb, char*sendname, DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
pthread_t tid = pthread_self();
LIST_LINK *find = msb->thread_list;
while(find != NULL) {
if(find->elem.tid == tid) break;
find = find->next;
}
if(find->elem.tid == tid) {
while (1) { // 忙等队列非空
if(find->elem.mail_head != find->elem.mail_tail) {
ENTER_CRITICAL_AREA(&msb->mutex);
out_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
break;
}
}
}
strcpy(sendname, temp->name_of_sender);
strcpy(data, temp->data);
free(temp);
return 0;
}
3.3.5 辅助函数
char *get_th_name(MBS*msb)//根据当前线程的 ID(pthread_self()),在全局线程链表中查找并返回该线程注册时使用的名称(name)。
{
pthread_t tid = pthread_self();
LIST_LINK *find = msb->thread_list;
LIST_LINK *end = end_list; // 注意:end_list 为全局变量,需外部设置
while(find != end) { ... }
if(find->elem.tid == tid) return find->elem.name;
else return NULL;
}
int wait_all_end(MBS*msb)//等待所有已注册的线程正常结束,即主线程阻塞直到每个子线程执行完 return NULL 或 pthread_exit。
{
LIST_LINK *find = msb->thread_list->next;
LIST_LINK *end = end_list;
while(find != end) {
pthread_join(find->elem.tid, NULL);
find = find->next;
}
pthread_join(find->elem.tid, NULL);
return 0;
}
四、完整代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "list.h"
#include "queue.h"
#define ENTER_CRITICAL_AREA(mutex) do{pthread_mutex_lock(mutex);}while(0)
#define QUIT_CRITICAL_AREA(mutex) do{pthread_mutex_unlock(mutex);}while(0)
unsigned int pthread_index;
LIST_LINK *end_list = NULL;
//typedef struct thread_node
//{
// pthread_t tid; //线程id号
// char name[256]; //线程名字 ,必须唯一
// Que *amil_head, *mail_tail;
// th_fun th; //线程函数
//}TH_NODE;
typedef struct mail_box_system
{
pthread_mutex_t mutex; //保护邮件系统
LIST_LINK *thread_list;
}MBS;
MBS* mbs;
int send_msg(MBS*msb,char*recvname,DATATYPE data);
int recv_msg(MBS*msb,char*sendname,DATATYPE data);
MBS* create_mail_box_system()
{
MBS *temp =(MBS*)malloc(sizeof(MBS));
if(NULL == temp)
{
perror("create_mail_box_system mutex malloc failure\n");
return NULL;
}
int ret = pthread_mutex_init(&temp->mutex,NULL);
if(0 != ret)
{
perror("create_mail_box_system mutex init failure\n");
return NULL;
}
temp->thread_list = malloc(sizeof(LIST_LINK));
// memset(temp->thread_list, 0, sizeof(LIST_LINK));
temp->thread_list->next = NULL;
printf("mail box create ok!! \n");
return temp;
}
int destroy_mail_box_system(MBS*mbs)
{
pthread_mutex_destroy(&mbs->mutex);
LIST_LINK *temp = NULL;
LIST_LINK *find = mbs->thread_list;
while(find != NULL)
{
temp = find;
find = find->next;
free(temp);
}
free(find);
return 0;
}
void* data_collect_th(void* arg)
{
int i = 5;
while(i--)
{
// printf("this is the data th\n");
sleep(1);
send_msg(mbs,"show","aabb");
send_msg(mbs,"show","ccddee");
send_msg(mbs,"show","nihao");
send_msg(mbs,"sock","1122");
send_msg(mbs,"sock","3344");
send_msg(mbs,"sock","88182172");
}
printf("data_collect_th end\n");
return NULL;
}
void* show_th(void* arg)
{
while(1)
{
// printf("this is the show th\n");
char sendname[256];
DATATYPE data;
recv_msg(mbs,sendname,data);
printf("show recv msg from %s msg is %s\n", sendname, data);
sleep(2);
}
return NULL;
}
void* sock_th(void* arg)
{
while(1)
{
// printf("this is the sock th\n");
DATATYPE data;
char sendname[256];
recv_msg(mbs,sendname,data);
send_msg(mbs, "show", "I'm sock");
printf("scok recv msg from %s msg is %s\n", sendname, data);
sleep(3);
}
return NULL;
}
char *get_th_name(MBS*msb)
{
pthread_t tid = pthread_self();
LIST_LINK *find = msb->thread_list;
LIST_LINK *end = end_list;
while(find != end)
{
if(find->elem.tid == tid)
break;
find = find->next;
}
if(find->elem.tid == tid)
{
//printf("cant find the recv th\n");
return find->elem.name;
}
else
return NULL;
}
int register_to_mail_system(MBS *mbs,char name[],th_fun th)
{
LIST_LINK* temp = malloc(sizeof(LIST_LINK));
if(NULL == temp)
{
perror("register to mail malloc \n");
return -1;
}
strcpy(temp->elem.name ,name);
temp->elem.th = th;
temp->next = NULL;
init_que(temp);
pthread_t ret = pthread_create(&(temp->elem.tid),NULL,th,NULL);
if(0!=ret)
{
perror("register to mail thread create\n");
return -1;
}
list_add(mbs->thread_list, temp);
printf("register mail system |%s| ok \n", temp->elem.name);
return 0;
}
int unregister_from_mailbox(MBS*msb,char*name)
{
LIST_LINK* find=msb->thread_list->next;
LIST_LINK *temp = NULL;
while(find != NULL)
{
temp = find;
find = find->next;
if(0 == strcmp(temp->elem.name ,name))
{
destroy(find);
free(temp);
return 0;
}
}
if(0 == strcmp(find->elem.name ,name))
{
destroy(find);
free(find);
return 0;
}
return -1;
}
int wait_all_end(MBS*msb)
{
LIST_LINK *find=msb->thread_list->next;
LIST_LINK *end=end_list;
while(find != end)
{
// pthread_join(find,NULL);
pthread_join(find->elem.tid,NULL);
find = find->next;
}
pthread_join(find->elem.tid,NULL);
return 0;
}
int send_msg(MBS*msb, char*recvname, DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
strcpy(temp->data, data);
temp->id_of_sender = pthread_self();
LIST_LINK *find = list_for_each(msb->thread_list, recvname);
if (find == NULL)
{
printf("can,t find msg \n");
return -1;
}
char* name = get_th_name(msb);
strcpy(temp->name_of_sender,name);
strcpy(temp->name_of_recver,recvname);
ENTER_CRITICAL_AREA(&msb->mutex);
in_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
// printf("send msg is ok |%s| msg is %s\n", temp->name_of_recver, temp->data);
return 0;
}
int recv_msg(MBS*msb,char*sendname,DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
pthread_t tid = pthread_self();
LIST_LINK *find = msb->thread_list;
while(find != NULL)
{
if( find->elem.tid == tid)
break;
find = find->next;
}
if( find->elem.tid == tid)
{
while (1)
{
if(find->elem.mail_head != find->elem.mail_tail)
{
ENTER_CRITICAL_AREA(&msb->mutex);
out_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
break;
}
}
}
strcpy(sendname, temp->name_of_sender);
strcpy(data, temp->data);
free(temp);
return 0;
}
int main()
{
mbs = create_mail_box_system();
register_to_mail_system(mbs,"show",show_th);
register_to_mail_system(mbs,"sock",sock_th);
register_to_mail_system(mbs,"collect",data_collect_th);
wait_all_end(mbs);
destroy_mail_box_system(mbs);
printf("Hello World!");
return 0;
}
五、总结与补充:
补充1:

为什么写成do while,如果不要这个,这里面多一个函数的话,if里面执行这个宏,后续的函数执行不到。
补充2:
其他版本的mailbox:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
//数据结构
typedef struct NODE
{
char data[128];
char sender[32];
char recver[32];
struct NODE *next;
} NODE;
typedef struct MAIL
{
char name[32];
pthread_t tid;
NODE *queue; /* 消息队列头 */
pthread_mutex_t lock;
struct MAIL *next;
} MAIL;
//全局链表
static MAIL *mailbox = NULL;
static pthread_mutex_t mailbox_lock;
//创建链表注册链表
void create_mail(void)
{
mailbox = NULL;
pthread_mutex_init(&mailbox_lock, NULL); // 动态初始化
printf("[mailbox] created\n");
}
void register_th(const char *name)
{
MAIL *m = (MAIL *)malloc(sizeof(MAIL)); //开空间
memset(m, 0, sizeof(MAIL)); //初始化
//填充结构体
strncpy(m->name, name, sizeof(m->name) - 1);
m->tid = pthread_self();
m->queue = NULL;
pthread_mutex_init(&m->lock, NULL);
pthread_mutex_lock(&mailbox_lock); //加锁,操作链表
m->next = mailbox;
mailbox = m;
pthread_mutex_unlock(&mailbox_lock);
printf("[%s] register_th tid:%lu\n", name, (unsigned long)m->tid); //打印注册信息
}
//按名字/tid 查找
static MAIL *find_by_name(const char *name)
{
MAIL *m = mailbox;
while (m)
{
if (strcmp(m->name, name) == 0)
return m;
m = m->next;
}
return NULL;
}
static MAIL *find_by_tid(pthread_t tid)
{
MAIL *m = mailbox;
while (m)
{
if (pthread_equal(m->tid, tid))
return m;
m = m->next;
}
return NULL;
}
//发数据
void send_data(const char *recver, const char *data)
{
// 1. copy data
NODE *node = (NODE *)malloc(sizeof(NODE));
memset(node, 0, sizeof(NODE));
strncpy(node->data, data, sizeof(node->data) - 1);
strncpy(node->recver, recver, sizeof(node->recver) - 1);
node->next = NULL;
// 2. find save by name from mailbox
MAIL *recv_mail = find_by_name(recver);
if (!recv_mail)
{
printf("[send_data] can not find recver: %s\n", recver);
free(node);
return;
}
// 3. find tid_self -> name (sender)
MAIL *self_mail = find_by_tid(pthread_self());
if (!self_mail)
{
printf("[send_data] sender not registered\n");
free(node);
return;
}
strncpy(node->sender, self_mail->name, sizeof(node->sender) - 1);
// 4. push node to save queue
pthread_mutex_lock(&recv_mail->lock); //加锁
//头插法,入队
node->next = recv_mail->queue;
recv_mail->queue = node;
pthread_mutex_unlock(&recv_mail->lock); //解锁
printf("[%s] send_data -> [%s] : %s\n", node->sender, recver, data);
}
//收数据
void recv_data(const char *sender, char *data)
{
// 1. find tid_self -> name
MAIL *self_mail = find_by_tid(pthread_self());
if (!self_mail)
{
printf("[recv_data] not registered\n");
return;
}
// 2. find node by tid 加锁,遍历队列查找匹配 sender 的节点
// 3. pop queue -> node
pthread_mutex_lock(&self_mail->lock);
NODE *prev = NULL;
NODE *curr = self_mail->queue;
NODE *found = NULL;
// 遍历队列,寻找 sender 匹配的节点
while (curr)
{
if (strcmp(curr->sender, sender) == 0)
{
// 找到匹配节点,将其从链表中摘除
found = curr;
if (prev == NULL)
{
// 匹配的是队首
self_mail->queue = curr->next;
}
else
{
prev->next = curr->next;
}
break;
}
prev = curr;
curr = curr->next;
}
pthread_mutex_unlock(&self_mail->lock);
// 如果没有找到匹配节点,直接返回
if (!found)
{
printf("[%s] recv_data: no message from '%s'\n", self_mail->name, sender);
return;
}
// 4. cmp recver name 检查 recver 是否匹配
if (strcmp(found->recver, self_mail->name) != 0)
{
printf("[%s] recv_data: recver mismatch, drop\n", self_mail->name);
free(found);
return;
}
// 5. case sender deal data 处理匹配到的消息
printf("[%s] recv_data from [%s] : %s\n", self_mail->name, found->sender, found->data);
if (data)
{
strncpy(data, found->data, 128 - 1);
}
free(found);
}
//线程函数
void *th1_fun(void *arg) // collect1
{
register_th("collect1");
sleep(1);
send_data("save", "temp=36.5");
send_data("display", "temp=36.5");
return NULL;
}
void *th2_fun(void *arg) // collect2
{
register_th("collect2");
sleep(1);
send_data("save", "humi=60%");
send_data("display", "humi=60%");
return NULL;
}
void *th3_fun(void *arg) // save
{
register_th("save");
sleep(2); // 等 collect 发完
char data[128] = {0};
recv_data("collect1", data);
if (data[0])
printf("[save] saved: %s\n", data);
memset(data, 0, sizeof(data));
recv_data("collect2", data);
if (data[0])
printf("[save] saved: %s\n", data);
return NULL;
}
void *th4_fun(void *arg) // display
{
register_th("display");
sleep(2);
char data[128] = {0};
recv_data("collect1", data);
if (data[0])
printf("[display] show: %s\n", data);
memset(data, 0, sizeof(data));
recv_data("collect2", data);
if (data[0])
printf("[display] show: %s\n", data);
return NULL;
}
void *th5_fun(void *arg) // alarm
{
register_th("alarm");
sleep(3);
send_data("display", "ALARM: over limit!");
return NULL;
}
int main(int argc, char **argv)
{
create_mail();
pthread_t tid[5] = {0};
pthread_create(&tid[0], NULL, th1_fun, NULL); // collect1
pthread_create(&tid[1], NULL, th2_fun, NULL); // collect2
pthread_create(&tid[2], NULL, th3_fun, NULL); // save
pthread_create(&tid[3], NULL, th4_fun, NULL); // display
pthread_create(&tid[4], NULL, th5_fun, NULL); // alarm
int i;
for (i = 0; i < 5; i++)
pthread_join(tid[i], NULL); //回收线程
printf("[main] all done\n");
return 0;
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)