目录

一、什么是 Mailbox 通信

二、系统设计与数据结构

2.1  整体架构

2.2  关键数据结构

① MAIL_DATA——一条消息(一封信)

② Que——消息队列节点(信封)

③ LIST_DATA——线程节点(线程在信箱里的档案)

④ LIST_LINK——信箱链表节点

⑤ MBS——邮件系统总控

2.3  内存结构示意图

三、工作流程

四、核心模块实现详解

4.1  队列模块(queue.c)——带哨兵头节点的单向链表

init_que()——初始化队列

in_queue()——入队(尾插,保证先进先出)

out_queue()——出队(从哨兵后取第一个节点)

4.2  链表模块(list.c)

list_add()——头插法

list_for_each()——按名字查找线程节点

4.3  发送消息——send_msg()

4.4  接收消息——recv_msg()

4.5  辅助函数

get_th_name()——用 TID 查线程名

register_to_mail_system()——注册并启动线程

五、两个版本对比

六、完整执行时序

七、补充说明

7.1  do{...}while(0) 宏的必要性

7.2  生产环境优化建议


一、什么是 Mailbox 通信

Mailbox(邮箱通信)是一种经典的线程间通信(IPC,Inter-Process Communication)机制,广泛用于嵌入式系统和并发编程。

核心思想:每个线程拥有自己的消息队列,线程之间通过「发消息」和「收消息」来解耦数据生产与处理逻辑,不需要直接调用对方函数,也不需要共享内存。

生产者线程(collect)         目标线程(save / display)

      │                              │

      │  send_data("save", data)     │

      ├─────────────────────────────►│  save 的消息队列

      │                         [消息1][消息2]...

      │                              │

      │                         recv_data("collect1", buf)

      │                              │ 从队列取出,处理

💡  为什么用 Mailbox 而不是直接调用函数?

解耦:发送方不关心接收方什么时候处理,只管投递消息。

异步:收发双方运行在不同线程,互不阻塞对方主逻辑。

扩展性好:新增线程只需注册到信箱,不需要修改其他线程的代码。

二、系统设计与数据结构

2.1  整体架构

系统由五个核心组件构成:

组件

对应结构体

职责

邮件系统(MBS)

struct mail_box_system

全局单例,管理所有线程注册信息,持有互斥锁保护

线程节点

LIST_LINK → elem

存储 tid、名称、消息队列头尾指针、线程函数入口

邮件数据

MAIL_DATA

消息正文 + 发送方信息 + 接收方信息

队列模块

queue.h / queue.c

为每个线程提供独立消息队列(带哨兵头节点的单向链表)

链表模块

list.h / list.c

管理所有已注册线程节点的全局链表

2.2  关键数据结构

① MAIL_DATA——一条消息(一封信)

typedef struct mail_data {

    pthread_t  id_of_sender;         // 发送方线程 ID

    char       name_of_sender[256];  // 发送方线程名

    pthread_t  id_of_recver;         // 接收方线程 ID(预留)

    char       name_of_recver[256];  // 接收方线程名

    char       data[256];            // 消息内容(DATATYPE = char[256])

} MAIL_DATA;

每条消息带着「我是谁发的、发给谁、内容是什么」三份信息,接收方收到后能知道来源,据此分发处理。

② Que——消息队列节点(信封)

typedef struct queue {

    MAIL_DATA   data;        // 这个节点携带的一条消息

    struct queue *next;      // 指向下一个队列节点

} Que, *pQue;

③ LIST_DATA——线程节点(线程在信箱里的档案)

typedef struct thread_node {

    pthread_t   tid;              // 线程 ID

    char        name[256];        // 线程名,全局唯一

    Que        *mail_head;        // 消息队列头(哨兵节点)

    Que        *mail_tail;        // 消息队列尾

    th_fun      th;               // 线程函数指针

} LIST_DATA;

④ LIST_LINK——信箱链表节点

typedef struct Link {

    LIST_DATA   elem;        // 线程档案(上面的 LIST_DATA)

    struct Link *next;       // 指向链表下一个节点

} LIST_LINK;

⑤ MBS——邮件系统总控

typedef struct mail_box_system {

    pthread_mutex_t  mutex;        // 保护整个邮件系统的互斥锁

    LIST_LINK       *thread_list;  // 已注册线程的链表头(哑节点)

} MBS;

// 宏定义,让加锁/解锁语义更清晰

#define ENTER_CRITICAL_AREA(mutex)  do{ pthread_mutex_lock(mutex); }while(0)

#define QUIT_CRITICAL_AREA(mutex)   do{ pthread_mutex_unlock(mutex); }while(0)

💡  为什么宏要写成 do{...}while(0)?

如果直接写 #define LOCK(m) pthread_mutex_lock(m),在 if 语句里用会出问题:

    if(cond)  LOCK(m);  →  展开成  if(cond) pthread_mutex_lock(m);;  多了分号

do{...}while(0) 是 C 语言宏的标准写法,展开后是一个完整语句,无论放在哪里都安全。

2.3  内存结构示意图

MBS

 ├─ mutex(互斥锁)

 └─ thread_list

       │

      [哑头节点] ──► [show节点]    ──► [sock节点]    ──► [collect节点] ──► NULL

                      elem.name="show"  elem.name="sock"   elem.name="collect"

                      elem.tid=...      elem.tid=...        elem.tid=...

                      mail_head─►[哑]  mail_head─►[哑]     mail_head─►[哑]

                      mail_tail─►[哑]  mail_tail─►[msg1]   mail_tail─►[哑]

                                                    │

                                                   [msg2]

                                                    │

                                                   NULL

三、工作流程

1

create_mail_box_system()

创建 MBS,初始化互斥锁,创建链表哑头节点

2

register_to_mail_system()

为线程分配链表节点,初始化消息队列(哨兵节点),pthread_create 启动线程,头插法加入全局链表

3

send_msg(mbs, "target", data)

构造 MAIL_DATA,按名字从链表找目标节点,加锁后 in_queue 入队

4

recv_msg(mbs, sendname_buf, data_buf)

用 TID 找到自己的节点,忙等队列非空,加锁后 out_queue 出队,返回发送方名和数据

5

wait_all_end(mbs)

遍历链表对每个线程 pthread_join,等待所有线程结束

6

destroy_mail_box_system(mbs)

销毁互斥锁,释放所有链表节点内存

四、核心模块实现详解

4.1  队列模块(queue.c)——带哨兵头节点的单向链表

每个线程的消息队列使用「带哨兵头节点」的单向链表实现,mail_head 永远指向哨兵,mail_tail 指向最后一个有效节点(队列空时也指向哨兵)。

空队列:  mail_head == mail_tail

  [哨兵] → NULL

     ↑         ↑

  mail_head  mail_tail

插入两条消息后:

  [哨兵] → [消息1] → [消息2] → NULL

     ↑                            ↑

  mail_head                    mail_tail

init_que()——初始化队列

int init_que(LIST_LINK *list_head)

{

    Que *tmp = malloc(sizeof(Que));  // 申请哨兵节点

    tmp->next = NULL;

    // 头尾都指向哨兵,表示队列为空

    list_head->elem.mail_head = list_head->elem.mail_tail = tmp;

    return 0;

}

in_queue()——入队(尾插,保证先进先出)

int in_queue(LIST_LINK *list_head, MAIL_DATA *data)

{

    Que *tmp = malloc(sizeof(Que));

    memcpy(&tmp->data, data, sizeof(MAIL_DATA));  // 深拷贝消息数据

    tmp->next = NULL;

    // 挂到当前尾节点后面

    list_head->elem.mail_tail->next = tmp;

    // 尾指针后移

    list_head->elem.mail_tail = list_head->elem.mail_tail->next;

    return 0;

}

📌  为什么用 memcpy 而不是直接赋值?

MAIL_DATA 是一个结构体,直接赋值(=)也是值拷贝,效果相同。

memcpy 更明确表达「深拷贝」意图,避免将来改成指针类型时漏掉拷贝。

拷贝完之后,调用方 free 掉临时的 temp 不影响队列里的副本。

out_queue()——出队(从哨兵后取第一个节点)

int out_queue(LIST_LINK *list_head, MAIL_DATA *data)

{

    // 判断空队列:头 == 尾 表示只剩哨兵

    if(list_head->elem.mail_head == list_head->elem.mail_tail) {

        printf("queue is empty.\n");

        return -1;

    }

    // 只剩最后一个有效节点时,出队后尾指针要归位到哨兵

    if(list_head->elem.mail_head->next == list_head->elem.mail_tail) {

        list_head->elem.mail_tail = list_head->elem.mail_head;

    }

    Que *del = list_head->elem.mail_head->next; // 哨兵的 next 才是真数据

    list_head->elem.mail_head->next = del->next; // 哨兵跳过 del

    memcpy(data, &del->data, sizeof(MAIL_DATA)); // 把数据拷给调用方

    free(del);

    return 0;

}

4.2  链表模块(list.c)

list_add()——头插法

void list_add(LIST_LINK *head, LIST_LINK *info)

{

    info->next = head->next;  // 新节点指向原第一个有效节点

    head->next = info;        // 哑头节点指向新节点

}

// 插入前:[哑头] → [show] → [sock] → NULL

// 插入后:[哑头] → [collect] → [show] → [sock] → NULL

// ⚠ 最后注册的线程排在链表最前面(头插法特性)

list_for_each()——按名字查找线程节点

LIST_LINK * list_for_each(LIST_LINK *head, char *name)

{

    LIST_LINK *tmp = head;       // 从哑头节点开始遍历

    while(tmp->next != NULL)

    {

        if(strncmp(tmp->elem.name, name, strlen(name)) == 0)

            return tmp;

        tmp = tmp->next;

    }

    // 单独判断最后一个节点(循环结束条件是 next==NULL,最后一个没进循环体)

    if(strncmp(tmp->elem.name, name, strlen(name)) == 0)

        return tmp;

    return NULL;

}

⚠️  注意:从哑头节点开始比较

tmp 初始值是 head(哑头节点),其 elem.name 是未初始化的内存(通常是空字符串)。

如果传入空字符串 name,strncmp 可能误匹配哑头节点,返回错误结果。

实际使用中线程名不会为空,所以正常运行没问题,但这是一个边界隐患。

4.3  发送消息——send_msg()

对应图片里的四步逻辑:

// send_data( recver, data )

// {

//   1. copy data              ← 构造消息节点,复制数据

//   2. find save by name      ← 按名字从信箱找目标线程

//   3. find tid_self -> name  ← 用自身TID查发送方名字

//   4. push node to queue     ← 加锁,入队

// }

int send_msg(MBS *msb, char *recvname, DATATYPE data)

{

    MAIL_DATA *temp = malloc(sizeof(MAIL_DATA));

    // 1. copy data

    strcpy(temp->data, data);

    temp->id_of_sender = pthread_self();

    // 2. find target by name from mailbox

    LIST_LINK *find = list_for_each(msb->thread_list, recvname);

    if(find == NULL) { printf("can't find msg\n"); return -1; }

    // 3. find tid_self -> name

    char *name = get_th_name(msb);   // 用 TID 反查自己的名字

    strcpy(temp->name_of_sender, name);

    strcpy(temp->name_of_recver, recvname);

    // 4. push node to queue(加锁保护)

    ENTER_CRITICAL_AREA(&msb->mutex);

    in_queue(find, temp);

    QUIT_CRITICAL_AREA(&msb->mutex);

    return 0;

}

4.4  接收消息——recv_msg()

对应图片里的五步逻辑:

// recv_data( sender, data )

// {

//   1. find tid_self -> name   ← 用TID找到自己的节点

//   2. find node by tid        ← 定位自己的消息队列

//   3. pop queue -> node       ← 出队(忙等非空)

//   4. cmp recver name         ← 确认消息是发给自己的

//   5. case sender deal data   ← 按发送方分发处理

// }

int recv_msg(MBS *msb, char *sendname, DATATYPE data)

{

    MAIL_DATA *temp = malloc(sizeof(MAIL_DATA));

    pthread_t tid = pthread_self();

    // 1+2: 用 TID 找到自己的链表节点

    LIST_LINK *find = msb->thread_list;

    while(find != NULL) {

        if(find->elem.tid == tid) break;

        find = find->next;

    }

    // 3: 忙等 + 出队

    if(find->elem.tid == tid) {

        while(1) {                          // ← 忙等待

            if(find->elem.mail_head !=

               find->elem.mail_tail) {      // 队列非空

                ENTER_CRITICAL_AREA(&msb->mutex);

                out_queue(find, temp);

                QUIT_CRITICAL_AREA(&msb->mutex);

                break;

            }

        }

    }

    // 4+5: 拷出数据,返回给调用方

    strcpy(sendname, temp->name_of_sender);

    strcpy(data, temp->data);

    free(temp);

    return 0;

}

🔴  重点:忙等待(Busy Wait)

while(1) 一直循环检查队列是否有消息,没消息就一直转,这叫「忙等待」或「自旋等待」。

优点:消息一来立刻响应,延迟极低。

缺点:没消息时线程持续占用 CPU,白白消耗资源。

工程优化:用 pthread_cond_wait() 条件变量,没消息时线程「睡觉」,有消息时再「唤醒」,CPU 不空转。

4.5  辅助函数

get_th_name()——用 TID 查线程名

send_msg 需要填写 name_of_sender,但发送方只能通过 pthread_self() 拿到 TID(数字),需要反查名字。

char *get_th_name(MBS *msb)

{

    pthread_t tid = pthread_self();   // 当前线程的 TID

    LIST_LINK *find = msb->thread_list;

    LIST_LINK *end  = end_list;       // end_list = NULL(全局变量)

    while(find != end) {

        if(find->elem.tid == tid) break;

        find = find->next;

    }

    if(find->elem.tid == tid)  return find->elem.name;

    else                       return NULL;

}

register_to_mail_system()——注册并启动线程

int register_to_mail_system(MBS *mbs, char name[], th_fun th)

{

    LIST_LINK *temp = malloc(sizeof(LIST_LINK));

    strcpy(temp->elem.name, name);

    temp->elem.th = th;

    temp->next = NULL;

    init_que(temp);                              // 先初始化队列

    pthread_create(&(temp->elem.tid), NULL, th, NULL);  // 再启动线程

    list_add(mbs->thread_list, temp);            // 加入链表

    return 0;

}

📌  注意顺序:init_que 必须在 pthread_create 之前

线程一旦启动,可能立刻调用 recv_msg,此时必须保证队列已经初始化。

如果先 pthread_create 再 init_que,线程里访问未初始化的 mail_head 会崩溃。

五、两个版本对比

笔记中包含两个实现版本,核心思路相同,但设计细节有差异:

老师版(MBS 系统)

简化版(mailbox 版)

消息队列:带哨兵头节点

head == tail 表示空

消息队列:无哨兵

head == NULL 表示空,简单直接

锁粒度:一把大锁

整个 MBS 共用 mutex

锁粒度:每线程一把锁

每个 MAIL 节点有自己的 lock,并发效率更高

recv 行为:忙等待

while(1) 一直转到有消息

recv 行为:非阻塞

没消息直接返回,靠 sleep 控制时序

线程注册:main 统一管理

register 时内部调 pthread_create

线程注册:线程自己注册

线程函数第一行调 register_th("name")

send_data 步骤:

1.copy → 2.find target → 3.get sender name → 4.push

send_data 步骤:

同上,字段名更简短(sender/recver 各32字节)

队列操作:头插

队列操作:老师版头插(会导致顺序颠倒) 简化版改为尾插(先进先出)

📌  简化版解决了顺序问题

老师版和简化版最初都用头插法(新节点插在队列最前面)。

头插法导致先发的消息反而排在后面(后进先出),recv 时顺序颠倒,sender 对不上。

简化版修复:改用尾插法,保证先发先到(先进先出),recv 顺序和 send 顺序一致。

六、完整执行时序

main()

  │

  ├─ create_mail_box_system()          → 建 MBS,初始化互斥锁 + 链表

  │

  ├─ register("show", show_th)         → 建节点,init_que,pthread_create,头插链表

  ├─ register("sock", sock_th)         → 同上

  └─ register("collect", collect_th)   → 同上,collect 开始 while(i--) 循环

        │

        │  sleep(1) 后...

        │

        ├─ send_msg(mbs, "show", "aabb")    → 找 show 节点 → 加锁 in_queue

        ├─ send_msg(mbs, "show", "ccddee")  → 同上

        ├─ send_msg(mbs, "show", "nihao")   → 同上

        ├─ send_msg(mbs, "sock", "1122")    → 找 sock 节点 → 加锁 in_queue

        └─ ...

show_th()

  └─ while(1) { recv_msg → 忙等 → 出队 → printf → sleep(2) }

       当 collect 的消息入队后,show 的忙等退出,打印消息

sock_th()

  └─ while(1) { recv_msg → 忙等 → 出队 → send_msg("show","I'm sock") → sleep(3) }

       sock 收到消息后,再转发一条给 show

七、补充说明

7.1  do{...}while(0) 宏的必要性

// 假设宏定义为(不加 do while):

#define LOCK(m)  pthread_mutex_lock(m)

#define UNLOCK(m) pthread_mutex_unlock(m); some_other_call()

// 使用时:

if(cond)

    UNLOCK(m);       // 展开后变成:

// if(cond) pthread_mutex_unlock(m); some_other_call();

//            ↑只有这行受if控制        ↑这行总是执行!

// 正确写法(do while):

#define UNLOCK(m)  do{ pthread_mutex_unlock(m); some_other_call(); }while(0)

// 无论放在 if/else/for 里,展开后都是一个完整的语句块

结论:多条语句的宏,务必用 do{...}while(0) 包裹,这是 C 语言宏的固定安全写法。

7.2  生产环境优化建议

现有方案(学习版)

生产优化方向

忙等待(while 1 空转)

用 pthread_cond_wait() 条件变量,没消息时线程挂起,有消息时 signal 唤醒,避免 CPU 空转

一把大锁(全局 mutex)

拆分为每个线程一把锁,send 时只锁目标队列,不影响其他线程并发

链表查找 O(n)

改用有序二叉树或哈希表,按 TID/name 查找降到 O(logn) 或 O(1)

消息数组固定 256 字节

改用动态分配 + 消息池,减少频繁 malloc/free 的开销

无消息优先级

给 MAIL_DATA 添加 priority 字段(代码中注释的 pro),出队时按优先级排序

总结——Mailbox 通信的核心要点

1. 本质:每个线程有私有消息队列,send 往目标队列投消息,recv 从自己队列取消息

2. 关键设计:哨兵头节点让空队判断统一(head==tail),入出队边界处理更简洁

3. 线程安全:send/recv 操作共用一把互斥锁,或每队列一把锁,防止并发竞争

4. 查找机制:send 按线程名查找,recv 按 TID 查找自己的节点

5. 学习版 vs 生产版:学习版用忙等待便于理解,生产版应改为条件变量避免 CPU 空转

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐