一、多线程数据接收架构

缓冲区是解决速率不匹配最常见的方法。

多线程数据处理流水线设计,用来解决「数据接收速度远大于存储速度」的性能瓶颈问题,核心思路是用 FIFO(环形缓冲区 / 队列) 做解耦,实现异步处理。

环节 速度 / 耗时 角色 设计关键
数据接收 最快(us 级) 生产者 只入队,不阻塞
FIFO 队列 几乎无耗时 缓冲区 线程安全、容量合理、防溢出
文件写入 中等(ms 级) 消费者 1 批量写优化,减少系统调用
数据库存储 最慢(ms 级 / 批量) 消费者 2 批量插入、事务优化

文件空洞作用:

1. 预分配文件空间,避免频繁扩容

2. 降低写入操作的系统开销

文件空洞:

1.不占物理磁盘空间空洞部分不会写磁盘,只有真正写入的数据才占空间。

2.读取空洞返回全 0你读空洞位置,系统直接返回 \0,不会报错。

3.复制文件会消除空洞cp 默认会把空洞变成真实 0 字节,占满空间

二、程序流程

mailbox = 线程之间的 “专用收件箱”

  • 每个线程有自己的 mailbox(邮箱)
  • 别的线程想给它发消息 / 数据,就投到它的邮箱里
  • 它自己定时去邮箱里取

本质上:mailbox 就是一种 线程专属的 queue(队列)你可以理解成:mailbox = 带地址的、一对一通信的队列。

结构 特点 用途
queue(队列) 先进先出缓冲区 数据缓冲、异步处理
mailbox(邮箱) 每个线程一个,带 “收件地址” 线程间通信、发消息、发指令
// collect1 -> save
send_data(recver,data)
{
    1. copy data
    2. find save by name from mailbox
    3. find tid_self -> name
    4. push node to save queue
}
// th save from collect1
recv_data(sender,data)
{
    1. find tid_self -> name
    2. find node by name
    3. pop queue -> node
    4. cmp recver name
    5. case sender deal data
}

三、代码

定义线程函数指针类型th_fun:嵌入式多线程中统一线程函数原型,符合 POSIX 线程标准(pthread_create要求的函数格式)。

typedef void*(*th_fun)(void* arg);

消息载体结构体:嵌入式 IPC 中典型的 “消息头 + 消息体” 设计,同时保存线程 ID 和名称

typedef struct mail_data
{
    pthread_t   id_of_sender;    // 发送线程ID(系统级标识)
    char       name_of_sender[256]; // 发送线程名(业务级标识,易读)
    pthread_t   id_of_recver;    // 接收线程ID
    char       name_of_recver[256]; // 接收线程名
    DATATYPE data;               // 消息正文
}MAIL_DATA;

线程节点结构体:每个线程绑定一个独立的消息队列(邮箱),是 “线程 - 邮箱” 映射的核心载体。

typedef struct thread_node
{
    pthread_t tid;         // 线程ID(pthread_create返回的系统ID)
    char name[256];        // 线程名(唯一标识,如"show"/"sock")
    Que *mail_head, *mail_tail; // 该线程的消息队列头尾指针(邮箱核心)
    th_fun th;             // 线程执行函数指针
}LIST_DATA;

消息队列节点:嵌入式中链表队列是轻量级实现(无需连续内存,适配嵌入式小内存场景)。

typedef struct queue{
    MAIL_DATA data;    // 队列节点存储的消息
    struct queue* next;// 链表式队列的下一个节点
}Que, *pQue;

线程链表节点:用于管理所有注册到邮箱系统的线程(嵌入式中链表是管理动态对象的常用方式)。

typedef struct Link{
    LIST_DATA elem;    // 链表节点存储的线程数据
    struct Link *next; // 链表下一个节点
}LIST_LINK;

线程节点入队:头插法(嵌入式中无需排序场景下最快的插入方式),将新线程节点插入链表头部。

void list_add(LIST_LINK* head, LIST_LINK *info)
{
    info->next = head->next;
    head->next = info;
}

按线程名查找节点:遍历链表匹配线程名,strncmp避免字符串越界(嵌入式中内存越界是致命问题);返回 NULL 表示未找到。

LIST_LINK * list_for_each(LIST_LINK* head, char *name)
{
    LIST_LINK * tmp = NULL;
    tmp = head;
    while(tmp->next != NULL)
    {
        if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
           return tmp;
        tmp = tmp->next;
    }
    if(strncmp(tmp->elem.name, name, strlen(name)) == 0)
        return tmp;
    else
        return NULL;
}

消息入队:

  • 动态分配队列节点(嵌入式中需注意内存泄漏,需配套销毁逻辑);
  • memcpy深拷贝消息:嵌入式中避免 “野指针” 和数据覆盖(若用浅拷贝,原数据释放后队列中数据失效);
  • 队尾插入:符合 “FIFO” 消息队列特性(嵌入式 IPC 中 FIFO 是最常用的消息顺序)。
int in_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
    Que* tmp = malloc(sizeof(Que));
    memcpy(&tmp->data, data, sizeof(MAIL_DATA)); // 拷贝消息(深拷贝,避免原数据覆盖)
    tmp->next = NULL;
    list_head->elem.mail_tail->next = tmp; // 新节点加入队尾
    list_head->elem.mail_tail = list_head->elem.mail_tail->next; // 尾指针后移
    return 0;
}

消息出队:

  • 先判空:避免操作空队列导致崩溃(嵌入式中无 MMU,空指针访问直接宕机);
  • 哑节点设计优势:无需单独处理 “头节点是有效节点” 的情况;
  • 释放节点:嵌入式内存资源宝贵,必须即时释放。
int out_queue(LIST_LINK *list_head, MAIL_DATA *data)
{
    if(list_head->elem.mail_head == list_head->elem.mail_tail){
        printf("queue is empty. \n");
        return -1;
    }
    // 处理最后一个节点:尾指针回退到哑节点
    if(list_head->elem.mail_head->next == list_head->elem.mail_tail){
        list_head->elem.mail_tail = list_head->elem.mail_head;
    }
    Que* del = list_head->elem.mail_head->next; // 待删除的有效节点
    list_head->elem.mail_head->next = del->next; // 头指针后移
    memcpy(data, &del->data, sizeof(MAIL_DATA)); // 拷贝消息到输出参数
    free(del); // 释放节点(嵌入式中必须释放,否则内存泄漏)
    return 0;
}

队列销毁:遍历释放所有节点,嵌入式中退出线程前必须调用,避免内存泄漏。

void destroy(LIST_LINK *list_head)
{
    Que* tmp = list_head->elem.mail_head;
    while(list_head->elem.mail_head){
        tmp = list_head->elem.mail_head;
        list_head->elem.mail_head = list_head->elem.mail_head->next;
        free(tmp);		
    }
}

MBS(邮箱系统):嵌入式中 “管理器” 模式,封装全局资源(互斥锁 + 线程链表),便于统一管理。

typedef struct mail_box_system
{
    pthread_mutex_t mutex;  // 全局互斥锁(保护邮箱系统所有共享资源)
    LIST_LINK *thread_list; // 所有注册线程的链表头
}MBS;

int send_msg(MBS*msb,char*recvname,DATATYPE data); // 发送消息
int recv_msg(MBS*msb,char*sendname,DATATYPE data); // 接收消息

创建邮箱系统:

  • 内存分配 + 判空:嵌入式中 malloc 失败是常见场景(内存不足),必须处理;
  • 互斥锁初始化:POSIX 标准接口,NULL 表示默认属性(嵌入式中足够用);
  • 线程链表头初始化:为后续注册线程做准备。
MBS* create_mail_box_system()
{
    MBS *temp =(MBS*)malloc(sizeof(MBS));
    if(NULL ==  temp)
    {
        perror("create_mail_box_system mutex malloc failure\n");
        return NULL;
    }
    int ret = pthread_mutex_init(&temp->mutex,NULL); // 初始化互斥锁
    if(0 != ret)
    {
        perror("create_mail_box_system mutex init failure\n");
        return NULL;
    }
    temp->thread_list = malloc(sizeof(LIST_LINK)); // 初始化线程链表头
    temp->thread_list->next = NULL;
    printf("mail box create ok!! \n");
    return temp;
}

销毁邮箱系统:

  • 先销毁互斥锁(避免资源泄漏);
  • 遍历释放链表:嵌入式中必须递归 / 遍历释放链表,否则内存泄漏。
int destroy_mail_box_system(MBS*mbs)
{
    pthread_mutex_destroy(&mbs->mutex); // 销毁互斥锁
    LIST_LINK *temp = NULL;
    LIST_LINK *find = mbs->thread_list;
    while(find !=  NULL) // 遍历释放线程链表
    {
        temp = find;
        find = find->next;
        free(temp);
    }
    free(find); // 注:此处冗余,find已为NULL,嵌入式中free(NULL)安全
    return 0;
}

发送消息:

  • 构造消息体:填充发送 / 接收线程信息、消息正文;
  • 查找接收线程:确保消息发往正确的邮箱;
  • 临界区保护:入队操作是共享资源访问(多线程同时发消息),必须加锁(嵌入式中互斥锁是最基础的同步手段)。
int send_msg(MBS*msb, char*recvname, DATATYPE data)
{
    MAIL_DATA* temp =  malloc(sizeof(MAIL_DATA));
    strcpy(temp->data, data); // 赋值消息正文
    temp->id_of_sender = pthread_self(); // 发送线程ID
    // 查找接收线程节点
    LIST_LINK *find = list_for_each(msb->thread_list, recvname);
    if (find == NULL)
    {
        printf("can,t find msg \n");
        return -1;
    }
    // 获取发送线程名
    char* name = get_th_name(msb);
    strcpy(temp->name_of_sender,name);
    strcpy(temp->name_of_recver,recvname);
    // 临界区保护:入队操作
    ENTER_CRITICAL_AREA(&msb->mutex);
    in_queue(find, temp);
    QUIT_CRITICAL_AREA(&msb->mutex);
    return 0;
}

接收消息:

  • 查找自己的邮箱:按线程 ID 匹配;
  • 阻塞等待:嵌入式中 “忙等” 方式(无消息时循环),实际应优化为条件变量(减少 CPU 占用);
  • 临界区保护:出队操作加锁,避免多线程同时取消息导致队列错乱。
int recv_msg(MBS*msb,char*sendname,DATATYPE data)
{
    MAIL_DATA* temp =  malloc(sizeof(MAIL_DATA));
    pthread_t tid =  pthread_self(); // 获取当前线程ID
    // 查找当前线程的节点(自己的邮箱)
    LIST_LINK *find = msb->thread_list;
    while(find != NULL)
    {
        if( find->elem.tid == tid)
            break;
        find = find->next;
    }
    // 阻塞等待消息(无消息则死循环)
    if( find->elem.tid == tid)
    {
        while (1)
        {
            if(find->elem.mail_head != find->elem.mail_tail) // 队列非空
            {
                ENTER_CRITICAL_AREA(&msb->mutex);
                out_queue(find, temp); // 消息出队
                QUIT_CRITICAL_AREA(&msb->mutex);
                break;
            }
        }
    }
    // 拷贝消息到输出参数
    strcpy(sendname, temp->name_of_sender);
    strcpy(data, temp->data);
    free(temp); // 释放临时消息体
    return 0;
}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐