N26 mailbox
一、多线程数据接收架构
缓冲区是解决速率不匹配最常见的方法。

多线程数据处理流水线设计,用来解决「数据接收速度远大于存储速度」的性能瓶颈问题,核心思路是用 FIFO(环形缓冲区 / 队列) 做解耦,实现异步处理。
| 环节 | 速度 / 耗时 | 角色 | 设计关键 |
|---|---|---|---|
| 数据接收 | 最快(us 级) | 生产者 | 只入队,不阻塞 |
| FIFO 队列 | 几乎无耗时 | 缓冲区 | 线程安全、容量合理、防溢出 |
| 文件写入 | 中等(ms 级) | 消费者 1 | 批量写优化,减少系统调用 |
| 数据库存储 | 最慢(ms 级 / 批量) | 消费者 2 | 批量插入、事务优化 |
文件空洞作用:
1. 预分配文件空间,避免频繁扩容
2. 降低写入操作的系统开销
文件空洞:
1.不占物理磁盘空间空洞部分不会写磁盘,只有真正写入的数据才占空间。
2.读取空洞返回全 0你读空洞位置,系统直接返回 \0,不会报错。
3.复制文件会消除空洞cp 默认会把空洞变成真实 0 字节,占满空间
二、程序流程

mailbox = 线程之间的 “专用收件箱”
- 每个线程有自己的 mailbox(邮箱)
- 别的线程想给它发消息 / 数据,就投到它的邮箱里
- 它自己定时去邮箱里取
本质上:mailbox 就是一种 线程专属的 queue(队列)你可以理解成:mailbox = 带地址的、一对一通信的队列。
| 结构 | 特点 | 用途 |
|---|---|---|
| queue(队列) | 先进先出缓冲区 | 数据缓冲、异步处理 |
| mailbox(邮箱) | 每个线程一个,带 “收件地址” | 线程间通信、发消息、发指令 |
// collect1 -> save
send_data(recver,data)
{
1. copy data
2. find save by name from mailbox
3. find tid_self -> name
4. push node to save queue
}
// th save from collect1
recv_data(sender,data)
{
1. find tid_self -> name
2. find node by name
3. pop queue -> node
4. cmp recver name
5. case sender deal data
}
三、代码
定义线程函数指针类型th_fun:嵌入式多线程中统一线程函数原型,符合 POSIX 线程标准(pthread_create要求的函数格式)。
typedef void*(*th_fun)(void* arg);
消息载体结构体:嵌入式 IPC 中典型的 “消息头 + 消息体” 设计,同时保存线程 ID 和名称
typedef struct mail_data
{
pthread_t id_of_sender; // 发送线程ID(系统级标识)
char name_of_sender[256]; // 发送线程名(业务级标识,易读)
pthread_t id_of_recver; // 接收线程ID
char name_of_recver[256]; // 接收线程名
DATATYPE data; // 消息正文
}MAIL_DATA;
线程节点结构体:每个线程绑定一个独立的消息队列(邮箱),是 “线程 - 邮箱” 映射的核心载体。
typedef struct thread_node
{
pthread_t tid; // 线程ID(pthread_create返回的系统ID)
char name[256]; // 线程名(唯一标识,如"show"/"sock")
Que *mail_head, *mail_tail; // 该线程的消息队列头尾指针(邮箱核心)
th_fun th; // 线程执行函数指针
}LIST_DATA;
消息队列节点:嵌入式中链表队列是轻量级实现(无需连续内存,适配嵌入式小内存场景)。
typedef struct queue{
MAIL_DATA data; // 队列节点存储的消息
struct queue* next;// 链表式队列的下一个节点
}Que, *pQue;
线程链表节点:用于管理所有注册到邮箱系统的线程(嵌入式中链表是管理动态对象的常用方式)。
typedef struct Link{
LIST_DATA elem; // 链表节点存储的线程数据
struct Link *next; // 链表下一个节点
}LIST_LINK;
线程节点入队:头插法(嵌入式中无需排序场景下最快的插入方式),将新线程节点插入链表头部。
void list_add(LIST_LINK* head, LIST_LINK *info)
{
info->next = head->next;
head->next = info;
}
按线程名查找节点:遍历链表匹配线程名,strncmp避免字符串越界(嵌入式中内存越界是致命问题);返回 NULL 表示未找到。
LIST_LINK * list_for_each(LIST_LINK* head, char *name)
{
LIST_LINK * tmp = NULL;
tmp = head;
while(tmp->next != NULL)
{
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
tmp = tmp->next;
}
if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
return tmp;
else
return NULL;
}
消息入队:
- 动态分配队列节点(嵌入式中需注意内存泄漏,需配套销毁逻辑);
memcpy深拷贝消息:嵌入式中避免 “野指针” 和数据覆盖(若用浅拷贝,原数据释放后队列中数据失效);- 队尾插入:符合 “FIFO” 消息队列特性(嵌入式 IPC 中 FIFO 是最常用的消息顺序)。
int in_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
Que* tmp = malloc(sizeof(Que));
memcpy(&tmp->data, data, sizeof(MAIL_DATA)); // 拷贝消息(深拷贝,避免原数据覆盖)
tmp->next = NULL;
list_head->elem.mail_tail->next = tmp; // 新节点加入队尾
list_head->elem.mail_tail = list_head->elem.mail_tail->next; // 尾指针后移
return 0;
}
消息出队:
- 先判空:避免操作空队列导致崩溃(嵌入式中无 MMU,空指针访问直接宕机);
- 哑节点设计优势:无需单独处理 “头节点是有效节点” 的情况;
- 释放节点:嵌入式内存资源宝贵,必须即时释放。
int out_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
if(list_head->elem.mail_head == list_head->elem.mail_tail){
printf("queue is empty. \n");
return -1;
}
// 处理最后一个节点:尾指针回退到哑节点
if(list_head->elem.mail_head->next == list_head->elem.mail_tail){
list_head->elem.mail_tail = list_head->elem.mail_head;
}
Que* del = list_head->elem.mail_head->next; // 待删除的有效节点
list_head->elem.mail_head->next = del->next; // 头指针后移
memcpy(data, &del->data, sizeof(MAIL_DATA)); // 拷贝消息到输出参数
free(del); // 释放节点(嵌入式中必须释放,否则内存泄漏)
return 0;
}
队列销毁:遍历释放所有节点,嵌入式中退出线程前必须调用,避免内存泄漏。
void destroy(LIST_LINK *list_head)
{
Que* tmp = list_head->elem.mail_head;
while(list_head->elem.mail_head){
tmp = list_head->elem.mail_head;
list_head->elem.mail_head = list_head->elem.mail_head->next;
free(tmp);
}
}
MBS(邮箱系统):嵌入式中 “管理器” 模式,封装全局资源(互斥锁 + 线程链表),便于统一管理。
typedef struct mail_box_system
{
pthread_mutex_t mutex; // 全局互斥锁(保护邮箱系统所有共享资源)
LIST_LINK *thread_list; // 所有注册线程的链表头
}MBS;
int send_msg(MBS*msb,char*recvname,DATATYPE data); // 发送消息
int recv_msg(MBS*msb,char*sendname,DATATYPE data); // 接收消息
创建邮箱系统:
- 内存分配 + 判空:嵌入式中 malloc 失败是常见场景(内存不足),必须处理;
- 互斥锁初始化:POSIX 标准接口,NULL 表示默认属性(嵌入式中足够用);
- 线程链表头初始化:为后续注册线程做准备。
MBS* create_mail_box_system()
{
MBS *temp =(MBS*)malloc(sizeof(MBS));
if(NULL == temp)
{
perror("create_mail_box_system mutex malloc failure\n");
return NULL;
}
int ret = pthread_mutex_init(&temp->mutex,NULL); // 初始化互斥锁
if(0 != ret)
{
perror("create_mail_box_system mutex init failure\n");
return NULL;
}
temp->thread_list = malloc(sizeof(LIST_LINK)); // 初始化线程链表头
temp->thread_list->next = NULL;
printf("mail box create ok!! \n");
return temp;
}
销毁邮箱系统:
- 先销毁互斥锁(避免资源泄漏);
- 遍历释放链表:嵌入式中必须递归 / 遍历释放链表,否则内存泄漏。
int destroy_mail_box_system(MBS*mbs)
{
pthread_mutex_destroy(&mbs->mutex); // 销毁互斥锁
LIST_LINK *temp = NULL;
LIST_LINK *find = mbs->thread_list;
while(find != NULL) // 遍历释放线程链表
{
temp = find;
find = find->next;
free(temp);
}
free(find); // 注:此处冗余,find已为NULL,嵌入式中free(NULL)安全
return 0;
}
发送消息:
- 构造消息体:填充发送 / 接收线程信息、消息正文;
- 查找接收线程:确保消息发往正确的邮箱;
- 临界区保护:入队操作是共享资源访问(多线程同时发消息),必须加锁(嵌入式中互斥锁是最基础的同步手段)。
int send_msg(MBS*msb, char*recvname, DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
strcpy(temp->data, data); // 赋值消息正文
temp->id_of_sender = pthread_self(); // 发送线程ID
// 查找接收线程节点
LIST_LINK *find = list_for_each(msb->thread_list, recvname);
if (find == NULL)
{
printf("can,t find msg \n");
return -1;
}
// 获取发送线程名
char* name = get_th_name(msb);
strcpy(temp->name_of_sender,name);
strcpy(temp->name_of_recver,recvname);
// 临界区保护:入队操作
ENTER_CRITICAL_AREA(&msb->mutex);
in_queue(find, temp);
QUIT_CRITICAL_AREA(&msb->mutex);
return 0;
}
接收消息:
- 查找自己的邮箱:按线程 ID 匹配;
- 阻塞等待:嵌入式中 “忙等” 方式(无消息时循环),实际应优化为条件变量(减少 CPU 占用);
- 临界区保护:出队操作加锁,避免多线程同时取消息导致队列错乱。
int recv_msg(MBS*msb,char*sendname,DATATYPE data)
{
MAIL_DATA* temp = malloc(sizeof(MAIL_DATA));
pthread_t tid = pthread_self(); // 获取当前线程ID
// 查找当前线程的节点(自己的邮箱)
LIST_LINK *find = msb->thread_list;
while(find != NULL)
{
if( find->elem.tid == tid)
break;
find = find->next;
}
// 阻塞等待消息(无消息则死循环)
if( find->elem.tid == tid)
{
while (1)
{
if(find->elem.mail_head != find->elem.mail_tail) // 队列非空
{
ENTER_CRITICAL_AREA(&msb->mutex);
out_queue(find, temp); // 消息出队
QUIT_CRITICAL_AREA(&msb->mutex);
break;
}
}
}
// 拷贝消息到输出参数
strcpy(sendname, temp->name_of_sender);
strcpy(data, temp->data);
free(temp); // 释放临时消息体
return 0;
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)