ARM-11-Mailbox 线程间通信机制
目录
4.1 队列模块(queue.c)——带哨兵头节点的单向链表
register_to_mail_system()——注册并启动线程
一、什么是 Mailbox 通信
Mailbox(邮箱通信)是一种经典的线程间通信(IPC,Inter-Process Communication)机制,广泛用于嵌入式系统和并发编程。
核心思想:每个线程拥有自己的消息队列,线程之间通过「发消息」和「收消息」来解耦数据生产与处理逻辑,不需要直接调用对方函数,也不需要共享内存。
生产者线程(collect) 目标线程(save / display)
│ │
│ send_data("save", data) │
├─────────────────────────────►│ save 的消息队列
│ [消息1][消息2]...
│ │
│ recv_data("collect1", buf)
│ │ 从队列取出,处理
|
💡 为什么用 Mailbox 而不是直接调用函数? 解耦:发送方不关心接收方什么时候处理,只管投递消息。 异步:收发双方运行在不同线程,互不阻塞对方主逻辑。 扩展性好:新增线程只需注册到信箱,不需要修改其他线程的代码。 |
二、系统设计与数据结构
2.1 整体架构
系统由五个核心组件构成:
|
组件 |
对应结构体 |
职责 |
|
邮件系统(MBS) |
struct mail_box_system |
全局单例,管理所有线程注册信息,持有互斥锁保护 |
|
线程节点 |
LIST_LINK → elem |
存储 tid、名称、消息队列头尾指针、线程函数入口 |
|
邮件数据 |
MAIL_DATA |
消息正文 + 发送方信息 + 接收方信息 |
|
队列模块 |
queue.h / queue.c |
为每个线程提供独立消息队列(带哨兵头节点的单向链表) |
|
链表模块 |
list.h / list.c |
管理所有已注册线程节点的全局链表 |
2.2 关键数据结构
① MAIL_DATA——一条消息(一封信)
typedef struct mail_data {
pthread_t id_of_sender; // 发送方线程 ID
char name_of_sender[256]; // 发送方线程名
pthread_t id_of_recver; // 接收方线程 ID(预留)
char name_of_recver[256]; // 接收方线程名
char data[256]; // 消息内容(DATATYPE = char[256])
} MAIL_DATA;
每条消息带着「我是谁发的、发给谁、内容是什么」三份信息,接收方收到后能知道来源,据此分发处理。
② Que——消息队列节点(信封)
typedef struct queue {
MAIL_DATA data; // 这个节点携带的一条消息
struct queue *next; // 指向下一个队列节点
} Que, *pQue;
③ LIST_DATA——线程节点(线程在信箱里的档案)
typedef struct thread_node {
pthread_t tid; // 线程 ID
char name[256]; // 线程名,全局唯一
Que *mail_head; // 消息队列头(哨兵节点)
Que *mail_tail; // 消息队列尾
th_fun th; // 线程函数指针
} LIST_DATA;
④ LIST_LINK——信箱链表节点
typedef struct Link {
LIST_DATA elem; // 线程档案(上面的 LIST_DATA)
struct Link *next; // 指向链表下一个节点
} LIST_LINK;
⑤ MBS——邮件系统总控
typedef struct mail_box_system {
pthread_mutex_t mutex; // 保护整个邮件系统的互斥锁
LIST_LINK *thread_list; // 已注册线程的链表头(哑节点)
} MBS;
// 宏定义,让加锁/解锁语义更清晰
#define ENTER_CRITICAL_AREA(mutex) do{ pthread_mutex_lock(mutex); }while(0)
#define QUIT_CRITICAL_AREA(mutex) do{ pthread_mutex_unlock(mutex); }while(0)
|
💡 为什么宏要写成 do{...}while(0)? 如果直接写 #define LOCK(m) pthread_mutex_lock(m),在 if 语句里用会出问题: if(cond) LOCK(m); → 展开成 if(cond) pthread_mutex_lock(m);; 多了分号 do{...}while(0) 是 C 语言宏的标准写法,展开后是一个完整语句,无论放在哪里都安全。 |
2.3 内存结构示意图
MBS
├─ mutex(互斥锁)
└─ thread_list
│
[哑头节点] ──► [show节点] ──► [sock节点] ──► [collect节点] ──► NULL
elem.name="show" elem.name="sock" elem.name="collect"
elem.tid=... elem.tid=... elem.tid=...
mail_head─►[哑] mail_head─►[哑] mail_head─►[哑]
mail_tail─►[哑] mail_tail─►[msg1] mail_tail─►[哑]
│
[msg2]
│
NULL
三、工作流程
|
1 |
create_mail_box_system() 创建 MBS,初始化互斥锁,创建链表哑头节点 |
|
2 |
register_to_mail_system() 为线程分配链表节点,初始化消息队列(哨兵节点),pthread_create 启动线程,头插法加入全局链表 |
|
3 |
send_msg(mbs, "target", data) 构造 MAIL_DATA,按名字从链表找目标节点,加锁后 in_queue 入队 |
|
4 |
recv_msg(mbs, sendname_buf, data_buf) 用 TID 找到自己的节点,忙等队列非空,加锁后 out_queue 出队,返回发送方名和数据 |
|
5 |
wait_all_end(mbs) 遍历链表对每个线程 pthread_join,等待所有线程结束 |
|
6 |
destroy_mail_box_system(mbs) 销毁互斥锁,释放所有链表节点内存 |
四、核心模块实现详解
4.1 队列模块(queue.c)——带哨兵头节点的单向链表
每个线程的消息队列使用「带哨兵头节点」的单向链表实现,mail_head 永远指向哨兵,mail_tail 指向最后一个有效节点(队列空时也指向哨兵)。
空队列: mail_head == mail_tail
[哨兵] → NULL
↑ ↑
mail_head mail_tail
插入两条消息后:
[哨兵] → [消息1] → [消息2] → NULL
↑ ↑
mail_head mail_tail
init_que()——初始化队列
int init_que(LIST_LINK *list_head)
{
Que *tmp = malloc(sizeof(Que)); // 申请哨兵节点
tmp->next = NULL;
// 头尾都指向哨兵,表示队列为空
list_head->elem.mail_head = list_head->elem.mail_tail = tmp;
return 0;
}
in_queue()——入队(尾插,保证先进先出)
int in_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
Que *tmp = malloc(sizeof(Que));
memcpy(&tmp->data, data, sizeof(MAIL_DATA)); // 深拷贝消息数据
tmp->next = NULL;
// 挂到当前尾节点后面
list_head->elem.mail_tail->next = tmp;
// 尾指针后移
list_head->elem.mail_tail = list_head->elem.mail_tail->next;
return 0;
}
|
📌 为什么用 memcpy 而不是直接赋值? MAIL_DATA 是一个结构体,直接赋值(=)也是值拷贝,效果相同。 memcpy 更明确表达「深拷贝」意图,避免将来改成指针类型时漏掉拷贝。 拷贝完之后,调用方 free 掉临时的 temp 不影响队列里的副本。 |
out_queue()——出队(从哨兵后取第一个节点)
int out_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
// 判断空队列:头 == 尾 表示只剩哨兵
if(list_head->elem.mail_head == list_head->elem.mail_tail) {
printf("queue is empty.\n");
return -1;
}
// 只剩最后一个有效节点时,出队后尾指针要归位到哨兵
if(list_head->elem.mail_head->next == list_head->elem.mail_tail) {
list_head->elem.mail_tail = list_head->elem.mail_head;
}
Que *del = list_head->elem.mail_head->next; // 哨兵的 next 才是真数据
list_head->elem.mail_head->next = del->next; // 哨兵跳过 del
memcpy(data, &del->data, sizeof(MAIL_DATA)); // 把数据拷给调用方
free(del);
return 0;
}
4.2 链表模块(list.c)
list_add()——头插法
void list_add(LIST_LINK *head, LIST_LINK *info)
{
info->next = head->next; // 新节点指向原第一个有效节点
head->next = info; // 哑头节点指向新节点
}
// 插入前:[哑头] → [show] → [sock] → NULL
// 插入后:[哑头] → [collect] → [show] → [sock] → NULL
// ⚠ 最后注册的线程排在链表最前面(头插法特性)
list_for_each()——按名字查找线程节点
LIST_LINK * list_for_each(LIST_LINK *head, char *name)
{
LIST_LINK *tmp = head; // 从哑头节点开始遍历
while(tmp->next != NULL)
{
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
tmp = tmp->next;
}
// 单独判断最后一个节点(循环结束条件是 next==NULL,最后一个没进循环体)
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
return NULL;
}
|
⚠️ 注意:从哑头节点开始比较 tmp 初始值是 head(哑头节点),其 elem.name 是未初始化的内存(通常是空字符串)。 如果传入空字符串 name,strncmp 可能误匹配哑头节点,返回错误结果。 实际使用中线程名不会为空,所以正常运行没问题,但这是一个边界隐患。 |
4.3 发送消息——send_msg()
对应图片里的四步逻辑:
// send_data( recver, data )
// {
// 1. copy data ← 构造消息节点,复制数据
// 2. find save by name ← 按名字从信箱找目标线程
// 3. find tid_self -> name ← 用自身TID查发送方名字
// 4. push node to queue ← 加锁,入队
// }
int send_msg(MBS *msb, char *recvname, DATATYPE data)
{
MAIL_DATA *temp = malloc(sizeof(MAIL_DATA));
// 1. copy data
strcpy(temp->data, data);
temp->id_of_sender = pthread_self();
// 2. find target by name from mailbox
LIST_LINK *find = list_for_each(msb->thread_list, recvname);
if(find == NULL) { printf("can't find msg\n"); return -1; }
// 3. find tid_self -> name
char *name = get_th_name(msb); // 用 TID 反查自己的名字
strcpy(temp->name_of_sender, name);
strcpy(temp->name_of_recver, recvname);
// 4. push node to queue(加锁保护)
ENTER_CRITICAL_AREA(&msb->mutex);
in_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
return 0;
}
4.4 接收消息——recv_msg()
对应图片里的五步逻辑:
// recv_data( sender, data )
// {
// 1. find tid_self -> name ← 用TID找到自己的节点
// 2. find node by tid ← 定位自己的消息队列
// 3. pop queue -> node ← 出队(忙等非空)
// 4. cmp recver name ← 确认消息是发给自己的
// 5. case sender deal data ← 按发送方分发处理
// }
int recv_msg(MBS *msb, char *sendname, DATATYPE data)
{
MAIL_DATA *temp = malloc(sizeof(MAIL_DATA));
pthread_t tid = pthread_self();
// 1+2: 用 TID 找到自己的链表节点
LIST_LINK *find = msb->thread_list;
while(find != NULL) {
if(find->elem.tid == tid) break;
find = find->next;
}
// 3: 忙等 + 出队
if(find->elem.tid == tid) {
while(1) { // ← 忙等待
if(find->elem.mail_head !=
find->elem.mail_tail) { // 队列非空
ENTER_CRITICAL_AREA(&msb->mutex);
out_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
break;
}
}
}
// 4+5: 拷出数据,返回给调用方
strcpy(sendname, temp->name_of_sender);
strcpy(data, temp->data);
free(temp);
return 0;
}
|
🔴 重点:忙等待(Busy Wait) while(1) 一直循环检查队列是否有消息,没消息就一直转,这叫「忙等待」或「自旋等待」。 优点:消息一来立刻响应,延迟极低。 缺点:没消息时线程持续占用 CPU,白白消耗资源。 工程优化:用 pthread_cond_wait() 条件变量,没消息时线程「睡觉」,有消息时再「唤醒」,CPU 不空转。 |
4.5 辅助函数
get_th_name()——用 TID 查线程名
send_msg 需要填写 name_of_sender,但发送方只能通过 pthread_self() 拿到 TID(数字),需要反查名字。
char *get_th_name(MBS *msb)
{
pthread_t tid = pthread_self(); // 当前线程的 TID
LIST_LINK *find = msb->thread_list;
LIST_LINK *end = end_list; // end_list = NULL(全局变量)
while(find != end) {
if(find->elem.tid == tid) break;
find = find->next;
}
if(find->elem.tid == tid) return find->elem.name;
else return NULL;
}
register_to_mail_system()——注册并启动线程
int register_to_mail_system(MBS *mbs, char name[], th_fun th)
{
LIST_LINK *temp = malloc(sizeof(LIST_LINK));
strcpy(temp->elem.name, name);
temp->elem.th = th;
temp->next = NULL;
init_que(temp); // 先初始化队列
pthread_create(&(temp->elem.tid), NULL, th, NULL); // 再启动线程
list_add(mbs->thread_list, temp); // 加入链表
return 0;
}
|
📌 注意顺序:init_que 必须在 pthread_create 之前 线程一旦启动,可能立刻调用 recv_msg,此时必须保证队列已经初始化。 如果先 pthread_create 再 init_que,线程里访问未初始化的 mail_head 会崩溃。 |
五、两个版本对比
笔记中包含两个实现版本,核心思路相同,但设计细节有差异:
|
老师版(MBS 系统) |
简化版(mailbox 版) |
|
消息队列:带哨兵头节点 head == tail 表示空 |
消息队列:无哨兵 head == NULL 表示空,简单直接 |
|
锁粒度:一把大锁 整个 MBS 共用 mutex |
锁粒度:每线程一把锁 每个 MAIL 节点有自己的 lock,并发效率更高 |
|
recv 行为:忙等待 while(1) 一直转到有消息 |
recv 行为:非阻塞 没消息直接返回,靠 sleep 控制时序 |
|
线程注册:main 统一管理 register 时内部调 pthread_create |
线程注册:线程自己注册 线程函数第一行调 register_th("name") |
|
send_data 步骤: 1.copy → 2.find target → 3.get sender name → 4.push |
send_data 步骤: 同上,字段名更简短(sender/recver 各32字节) |
|
队列操作:头插 |
队列操作:老师版头插(会导致顺序颠倒) 简化版改为尾插(先进先出) |
|
📌 简化版解决了顺序问题 老师版和简化版最初都用头插法(新节点插在队列最前面)。 头插法导致先发的消息反而排在后面(后进先出),recv 时顺序颠倒,sender 对不上。 简化版修复:改用尾插法,保证先发先到(先进先出),recv 顺序和 send 顺序一致。 |
六、完整执行时序
main()
│
├─ create_mail_box_system() → 建 MBS,初始化互斥锁 + 链表
│
├─ register("show", show_th) → 建节点,init_que,pthread_create,头插链表
├─ register("sock", sock_th) → 同上
└─ register("collect", collect_th) → 同上,collect 开始 while(i--) 循环
│
│ sleep(1) 后...
│
├─ send_msg(mbs, "show", "aabb") → 找 show 节点 → 加锁 in_queue
├─ send_msg(mbs, "show", "ccddee") → 同上
├─ send_msg(mbs, "show", "nihao") → 同上
├─ send_msg(mbs, "sock", "1122") → 找 sock 节点 → 加锁 in_queue
└─ ...
show_th()
└─ while(1) { recv_msg → 忙等 → 出队 → printf → sleep(2) }
当 collect 的消息入队后,show 的忙等退出,打印消息
sock_th()
└─ while(1) { recv_msg → 忙等 → 出队 → send_msg("show","I'm sock") → sleep(3) }
sock 收到消息后,再转发一条给 show
七、补充说明
7.1 do{...}while(0) 宏的必要性
// 假设宏定义为(不加 do while):
#define LOCK(m) pthread_mutex_lock(m)
#define UNLOCK(m) pthread_mutex_unlock(m); some_other_call()
// 使用时:
if(cond)
UNLOCK(m); // 展开后变成:
// if(cond) pthread_mutex_unlock(m); some_other_call();
// ↑只有这行受if控制 ↑这行总是执行!
// 正确写法(do while):
#define UNLOCK(m) do{ pthread_mutex_unlock(m); some_other_call(); }while(0)
// 无论放在 if/else/for 里,展开后都是一个完整的语句块
结论:多条语句的宏,务必用 do{...}while(0) 包裹,这是 C 语言宏的固定安全写法。
7.2 生产环境优化建议
|
现有方案(学习版) |
生产优化方向 |
|
忙等待(while 1 空转) |
用 pthread_cond_wait() 条件变量,没消息时线程挂起,有消息时 signal 唤醒,避免 CPU 空转 |
|
一把大锁(全局 mutex) |
拆分为每个线程一把锁,send 时只锁目标队列,不影响其他线程并发 |
|
链表查找 O(n) |
改用有序二叉树或哈希表,按 TID/name 查找降到 O(logn) 或 O(1) |
|
消息数组固定 256 字节 |
改用动态分配 + 消息池,减少频繁 malloc/free 的开销 |
|
无消息优先级 |
给 MAIL_DATA 添加 priority 字段(代码中注释的 pro),出队时按优先级排序 |
|
总结——Mailbox 通信的核心要点 1. 本质:每个线程有私有消息队列,send 往目标队列投消息,recv 从自己队列取消息 2. 关键设计:哨兵头节点让空队判断统一(head==tail),入出队边界处理更简洁 3. 线程安全:send/recv 操作共用一把互斥锁,或每队列一把锁,防止并发竞争 4. 查找机制:send 按线程名查找,recv 按 TID 查找自己的节点 5. 学习版 vs 生产版:学习版用忙等待便于理解,生产版应改为条件变量避免 CPU 空转 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)