🔥个人主页:Cx330🌸

❄️个人专栏:《C语言》《LeetCode刷题集》《数据结构-初阶》《C++知识分享》

《优选算法指南-必刷经典100题》《Linux操作系统》:从入门到入魔

《Git深度解析》:版本管理实战全解

🌟心向往之行必能至


🎥Cx330🌸的简介:


目录

前言:

一、先搞懂:进程池是什么?核心优势有哪些?

二、手搓进程池:分步实现(附完整代码)

步骤1:前期准备——定义任务类型与测试任务​

步骤2:实现子进程工作逻辑——任务执行的核心

步骤3:封装Channel类——管理主从进程通信与子进程

步骤4:封装ProcessPool类——进程池核心管理逻辑

步骤5:主函数测试

三、编译运行与结果分析(附Makefile)

四、完整代码展示

五、进阶优化:让进程池更实用

六、常见坑点与注意事项

七、总结


前言:

在Linux后台开发中,频繁创建和销毁进程会带来巨大的系统开销——进程创建需要分配资源、初始化PCB,销毁则需要回收资源,尤其在高并发场景(如Web服务器、任务调度)中,这种开销会严重影响程序性能。而进程池,就是解决这个问题的“利器”。

今天我们就来“手搓”一个简单但可复用的Linux进程池,从原理拆解到代码实现,带你搞懂进程池的核心逻辑,学会如何通过进程复用提升程序效率。


一、先搞懂:进程池是什么?核心优势有哪些?

进程池,顾名思义,就是提前创建一定数量的子进程,通过匿名管道统一管理复用这些子进程来处理任务,而不是每次有任务就创建新进程、任务结束就销毁进程。

核心优势

  • 降低系统开销:避免频繁创建/销毁进程的资源消耗,子进程可重复使用

  • 提升响应速度:任务到来时,无需等待进程创建,直接分配空闲子进程处理

  • 便于管理:统一管理子进程的创建、销毁、任务分配,避免子进程泄露


二、手搓进程池:分步实现(附完整代码)

步骤1:前期准备——定义任务类型与测试任务​

首先定义任务的统一类型,封装各类测试任务,为后续进程池执行任务奠定基础,这是进程池“可处理多类型任务”的核心前提。

#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include <sys/wait.h>

#define __MAIN__

// 1. 定义任务类型:用function封装,支持任意无参无返回值的任务
using task_t = std::function<void()>;

// 2. 实现4类测试任务(模拟实际业务场景)
void printlog()
{
  // sleep(1); // 可注释/打开,模拟任务执行耗时
  std::cout<<"我是一个打印日志的任务,pid: "<<getpid()<<std::endl;
}
void download()
{
  // sleep(1);
  std::cout<<"我是一个下载任务,pid: "<<getpid()<<std::endl;
}
void readmysql()
{
  // sleep(1);
  std::cout<<"我是一个访问数据库的操作,pid: "<<getpid()<<std::endl;
}
void writeredis()
{
  // sleep(1);
  std::cout<<"我是一个访问redis的任务,pid: "<<getpid()<<std::endl;
}

// 3. 任务容器:存储所有可执行任务,供后续根据任务码调用
std::vector<task_t> gtasks;

// 4. 加载任务:将所有测试任务存入容器
void LoadTask()
{
  gtasks.push_back(printlog);
  gtasks.push_back(download);
  gtasks.push_back(readmysql);
  gtasks.push_back(writeredis);
}

// 5. 生成随机任务:模拟高并发场景下的随机任务请求(输出型参数存储任务码)
void RandomTask(std::vector<int> *out)
{
  for(int i=0;i<50;i++)
  {
    int code=rand() % gtasks.size(); // 随机生成任务码(0-3,对应4类任务)
    usleep(23223); // 模拟任务请求间隔
    out->push_back(code);
  }
}

// 6. 任务码转字符串:便于打印日志,直观查看任务类型
#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3

std::string Task2String(int code)
{
  switch(code)
  {
    case LOG_TASK: return "printlog";
    case DOWNLOAD_TASK: return "download";  
    case MYSQL_TASK: return "readmysql";  
    case REDIS_TASK: return "writeredis";
    default: return "unknown"; 
  }
}

关键说明:

  • std::function<void()>定义任务类型,实现“任务解耦”,后续可轻松添加新任务,无需修改进程池核心逻辑;

  • 任务码(0-3)对应不同任务,通过RandomTask模拟真实场景中随机到来的任务请求;

  • LoadTask函数统一加载任务,便于管理和扩展,符合“单一职责”原则。

步骤2:实现子进程工作逻辑——任务执行的核心

子进程的核心作用是“等待任务、执行任务”,通过读取管道中的任务码,调用对应的任务函数,这是进程复用的核心逻辑。

// 子进程工作函数:从管道读取任务码,执行对应任务
void Work(int rfd)
{
  while (true)
  {
    int code=0;
    // 从管道读任务码(阻塞等待,直到主进程发送任务)
    ssize_t n=read(rfd,&code,sizeof(code));
    if(n==sizeof(int)) // 读取到完整任务码
    {
      // 任务码合法,执行对应任务
      if(code>=0 && code<gtasks.size())
      {
        gtasks[code](); // 调用任务容器中的对应任务
      }
    }
    else if(n==0) // 读取到EOF(主进程关闭管道写端),子进程退出
    {
      break;
    }
    else // 读取失败,子进程退出
    {
      break;
    }
  }
}

关键说明:

  • 子进程通过read从管道读端(rfd)获取任务码,阻塞等待任务,实现“空闲时待命、有任务时执行”;

  • 只有读取到完整的任务码(4字节,int类型),才执行任务,避免任务错乱;

  • 当主进程关闭管道写端,子进程读取到n=0,主动退出,避免僵尸进程。

步骤3:封装Channel类——管理主从进程通信与子进程

Channel类是进程池的“通信桥梁”,封装了主进程与单个子进程的管道写端、子进程PID,统一管理任务发送、管道关闭、子进程回收,简化进程池的管理逻辑。

class Channel
{
public:
  // 构造函数:初始化管道写端和子进程PID
  Channel(int wfd,pid_t who)
      : _wfd(wfd),
      _sub_process_id(who)
  {
    _name="Channel-"+std::to_string(_sub_process_id)+"-"+std::to_string(_wfd);
  }
  // 获取管道写端(供主进程发送任务使用)
  int Fd() { return _wfd; }
  // 获取子进程PID(供回收子进程使用)
  pid_t SubId() { return _sub_process_id; }
  // 获取Channel名称(便于日志打印和调试)
  std::string Name() { return _name; }
  // 关闭管道写端(通知子进程退出)
  void Close()
  {
    if(_wfd>=0) close(_wfd);
  }
  // 回收子进程(避免僵尸进程)
  void Wait()
  {
    pid_t rid=waitpid(_sub_process_id,nullptr,0);
    (void)rid; // 消除未使用变量警告
  }
  // 向子进程发送任务(写入任务码到管道)
  void SendTask(int taskcode)
  {
    ssize_t n=write(_wfd,&taskcode,sizeof(taskcode));
    (void)n; // 简化处理,实际可增加错误判断
  }
  ~Channel() {} // 析构函数,无需额外操作(管道已在Close中关闭)

private:
  int _wfd;               // 管道写端(主进程用,向子进程发任务)
  pid_t _sub_process_id;  // 对应的子进程PID
  std::string _name;      // Channel名称(用于调试和日志)
};

关键说明:

  • 每个子进程对应一个Channel对象,主进程通过Channel的SendTask发送任务,通过CloseWait回收子进程;

  • 封装后,进程池无需直接操作管道和子进程PID,降低耦合度,后续修改通信逻辑(如替换为消息队列)只需修改Channel类;

  • _name属性便于调试,可直观查看任务发送给了哪个子进程。

步骤4:封装ProcessPool类——进程池核心管理逻辑

ProcessPool类是整个进程池的“管理者”,负责创建子进程、管理所有Channel、实现负载均衡分配任务、终止进程池,是面向对象设计的核心。

class ProcessPool
{
private:
  // 负载均衡:轮询选择子进程(Next函数实现轮询逻辑)
  int Next()
  {
    int choice=_next_choice;
    _next_choice++;
    _next_choice %= _channels.size(); // 取模实现循环轮询
    return choice;
  }
public:
  // 构造函数:初始化进程池大小(子进程数量)
  ProcessPool(int number)
    :_number(number),
    _next_choice(0) // 轮询起始索引,初始为0
  {}

  // 启动进程池:创建指定数量的子进程和对应Channel
  void Start()
  {
    for (int i = 0; i < _number; i++)
    {
      // 1. 创建管道(主写子读)
      int pipefd[2];
      int n = pipe(pipefd);
      if (n < 0) { perror("pipe:"); exit(2); }

      // 2. 创建子进程
      pid_t id = fork();
      if (id < 0) { perror("fork:"); exit(3); }
      else if (id == 0) // 子进程逻辑
      {
        close(pipefd[1]); // 子进程关闭写端,只保留读端
        Work(pipefd[0]);  // 子进程进入工作循环,等待任务
        close(pipefd[0]); // 执行完任务,关闭读端
        exit(0);          // 子进程退出
      }
      else // 主进程逻辑
      {
        close(pipefd[0]); // 主进程关闭读端,只保留写端
        // 创建Channel对象,管理当前子进程和管道写端,存入容器
        _channels.emplace_back(pipefd[1],id);
      }
    }
  }

  // 推送任务:根据轮询策略,将任务发送给子进程
  void PushTask(int taskcode)
  {
    int who=Next(); // 轮询选择一个子进程(负载均衡)
    _channels[who].SendTask(taskcode); // 发送任务码

    // 打印日志,直观查看任务分配情况
    std::cout<<"发送任务:"<<Task2String(taskcode)<<"["<< taskcode <<"]"<<"给:"<<_channels[who].Name()<<std::endl;
  }

  // 终止进程池:关闭所有管道写端,回收所有子进程
  void Stop()
  {
    // 从后往前关闭管道、回收子进程(避免容器迭代时出现异常)
    int end=_channels.size()-1;
    while(end>=0)
    {
      _channels[end].Close();  // 关闭管道写端,通知子进程退出
      _channels[end].Wait();   // 回收子进程
      std::cout<<_channels[end].Name()<<" close and wait success!"<<std::endl;
      end--;
    }
  }

  // 调试打印:输出所有Channel的信息(管道写端、子进程PID、名称)
  void DebugPrint()
  {
    std::cout<<"--------------------------------"<<std::endl;
    for(auto &channel:_channels)
    {
      std::cout<<channel.Fd()<<std::endl;
      std::cout<<channel.SubId()<<std::endl;
      std::cout<<channel.Name()<<std::endl;
    }
    std::cout<<"--------------------------------"<<std::endl;
  }

  ~ProcessPool() {} // 析构函数,无需额外操作(资源已在Stop中释放)

private:
  std::vector<Channel> _channels; // 存储所有Channel,管理子进程和通信
  int _number;                    // 进程池大小(子进程数量)
  int _next_choice;               // 轮询索引,实现负载均衡
};

关键说明(核心重点):

  • Start()函数:批量创建子进程和管道,主进程关闭管道读端、保存写端到Channel,子进程关闭写端、进入Work循环等待任务;

  • 负载均衡:Next()函数通过轮询(_next_choice自增取模)分配任务,确保所有子进程负载均匀,避免单个子进程忙碌、其他子进程空闲;

  • Stop()函数:从后往前关闭管道写端、回收子进程,避免迭代容器时因元素删除导致的异常,确保所有资源被正确回收;

  • std::vector<Channel>管理所有子进程通信,结构清晰,便于扩展(如动态调整进程池大小)。

步骤5:主函数测试

主函数实现进程池的完整调用流程:解析命令行参数、加载任务、生成随机任务、启动进程池、推送任务、终止进程池,测试进程池的正常工作。

// 主进程入口(仅在定义__MAIN__时生效,避免重复编译)
#ifdef __MAIN__

// 用法提示:输入进程池大小(如./process_pool 5)
static void Usage(const std::string &proc)
{
  std::cout << "Usage:\n\t" << proc << " process_number" << std::endl;
}

// 程序入口:./process_pool 进程池大小
int main(int argc, char *argv[])
{
  // 1. 解析命令行参数(必须输入进程池大小)
  if (argc != 2) { Usage(argv[0]); exit(1); }
  int number = std::stoi(argv[1]); // 进程池大小(子进程数量)

  // 2. 初始化任务:加载所有测试任务,生成50个随机任务
  srand(time(nullptr)^getpid()); // 设置随机种子(结合时间和PID,确保随机性)
  LoadTask();
  std::vector<int> task_codes;
  RandomTask(&task_codes);

  // 3. 创建进程池(智能指针管理,自动释放,避免内存泄漏)
  std::unique_ptr<ProcessPool> pp=std::make_unique<ProcessPool>(number);
  // 4. 启动进程池,创建子进程
  pp->Start();
  sleep(2); // 等待子进程初始化完成

  // 5. 推送所有随机任务(注释的代码可用于手动输入任务码测试)
  for(auto task:task_codes)
  {
    pp->PushTask(task);
    usleep(500000); // 模拟任务推送间隔,避免任务堆积
  }

  // 6. 终止进程池,回收所有资源
  pp->Stop();

  return 0;
}

#endif

关键说明:

  • std::unique_ptr管理ProcessPool对象,自动释放内存,避免手动管理内存导致的泄漏;

  • 命令行参数解析:输入进程池大小(如./process_pool 5,表示创建5个子进程),符合Linux程序的使用习惯;

  • 注释的代码可用于手动输入任务码测试,灵活切换“随机任务”和“手动任务”,便于调试;

  • sleep(2)等待子进程初始化完成,避免主进程推送任务时,子进程尚未进入工作循环。


三、编译运行与结果分析(附Makefile)

// Makefile
process_pool:process_pool.cc
	g++ -o $@ $^ -std=c++14

.PHONY:clean
clean:
	rm -f process_pool
  • 编译:直接 make;
  • 运行./process_pool 5(5 为子进程数量,可自定义);

结果分析:

  • 5个子进程循环复用,处理15个任务,无需频繁创建/销毁子进程;

  • 当任务数超过队列最大容量(10)时,主进程阻塞等待,避免任务堆积;

  • 销毁进程池时,所有子进程正常退出,资源被回收,无僵尸进程。


四、完整代码展示

#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include <sys/wait.h>

#define __MAIN__

//////////////////////////////任务测试代码/////////////////////////////

using task_t =std::function<void()>;

void printlog()
{
  // sleep(1);
  std::cout<<"我是一个打印日志的任务,pid: "<<getpid()<<std::endl;
}
void download()
{
  // sleep(1);
  std::cout<<"我是一个下载任务,pid: "<<getpid()<<std::endl;
}

void readmysql()
{
  // sleep(1);
  std::cout<<"我是一个访问数据库的操作,pid: "<<getpid()<<std::endl;
}

void writeredis()
{
  // sleep(1);
  std::cout<<"我是一个访问redis的任务,pid: "<<getpid()<<std::endl;
}

std::vector<task_t> gtasks;

void LoadTask()
{
  gtasks.push_back(printlog);
  gtasks.push_back(download);
  gtasks.push_back(readmysql);
  gtasks.push_back(writeredis);
}

// *:输出型参数
// const &:输入型参数
// &:输入输出型
void RandomTask(std::vector<int> *out)
{
  for(int i=0;i<50;i++)
  {
    int code=rand() % gtasks.size();
    usleep(23223);
    out->push_back(code);
  }
}

#define LOG_TASK 0
#define DOWNLOAD_TASK 1
#define MYSQL_TASK 2
#define REDIS_TASK 3

std::string Task2String(int code)
{
  switch(code)
  {
    case LOG_TASK:
      return "printlog";
    case DOWNLOAD_TASK:
      return "download";  
    case MYSQL_TASK:
      return "readmysql";  
    case REDIS_TASK:
      return "writeredis";
    default:
      return "unknown"; 
  }
}


//////////////////////////////进程池代码/////////////////////////////


void Work(int rfd)
{
  while (true)
  {
    int code=0;
    ssize_t n=read(rfd,&code,sizeof(code));
    if(n==sizeof(int))
    {
      if(code>=0 && code<gtasks.size())
      {
        gtasks[code]();
      }
    }
    else if(n==0)
    {
      break; // 子进程只要读到返回值为0,表明父进程让我退出
    }
    else
    {
      break;
    }
  }
}

class Channel
{
public:
  Channel(int wfd,pid_t who)
      : _wfd(wfd),
      _sub_process_id(who)
  {
    _name="Channel-"+std::to_string(_sub_process_id)+'-'+std::to_string(_wfd);
  }
  int Fd()
  {
    return _wfd;
  }
  pid_t SubId()
  {
    return _sub_process_id;
  }
  std::string Name()
  {
    return _name;
  }
  void Close()
  {
    if(_wfd>=0)
      close(_wfd);
  }
  void Wait()
  {
    pid_t rid=waitpid(_sub_process_id,nullptr,0);
    (void)rid;
  }
  void SendTask(int taskcode)
  {
    ssize_t n=write(_wfd,&taskcode,sizeof(taskcode));
    (void)n;
  }
  ~Channel()
  {}

private:
  int _wfd;
  pid_t _sub_process_id;
  std::string _name;
};

class ProcessPool
{
private:
  int Next()
  {
    int choice=_next_choice;
    _next_choice++;
    _next_choice %= _channels.size();
    return choice;
  }
public:
  ProcessPool(int number)
    :_number(number),
    _next_choice(0) 
  {}
  // 父进程
  void Start()
  {
    for (int i = 0; i < _number; i++)
    {
      // 创建管道
      int pipefd[2];
      int n = pipe(pipefd);
      if (n < 0)
      {
        perror("pipe:");
        exit(2);
      }
      // 创建子进程
      pid_t id = fork();
      if (id < 0)
      {
        perror("fork:");
        exit(3);
      }
      else if (id == 0) // 子进程
      {
        close(pipefd[1]);
        Work(pipefd[0]);
        close(pipefd[0]);
        exit(0);
      }
      else // 父进程
      {
        close(pipefd[0]);
        // pipefd[1]; // ??

        // Channel c(pipefd[1]);
        // channels.push_back(c);

        _channels.emplace_back(pipefd[1],id);
      }
    }
  }
  // 1.什么任务——任务码决定
  // 2.任务给谁——属于进程池内部操作,负载均衡
  void PushTask(int taskcode)
  {
    //选择一个子进程
    int who=Next();
    _channels[who].SendTask(taskcode);

    std::cout<<"发送任务:"<<Task2String(taskcode)<<"["<< taskcode <<"]"<<"给:"<<_channels[who].Name()<<std::endl;
  }
  void Stop()
  {
    // version2 ???
    // for(auto &channel:_channels)
    // {
    //   channel.Close();
    //   channel.Wait();
    //   std::cout<<channel.Name()<<"close and wait success!"<<std::endl;
    // }

    // version3
    int end=_channels.size()-1;
    while(end>=0)
    {
      _channels[end].Close();
      _channels[end].Wait();
      std::cout<<_channels[end].Name()<<"close and wait success!"<<std::endl;
      end--;
    }

    // // 内部bug!
    // // 1.关闭wfd ————version1
    // for(auto &channel:_channels)
    // {
    //   channel.Close();
    //   std::cout<<channel.Name()<<"close success!"<<std::endl;
    // }
    // sleep(3);
    // // 2.回收子进程
    // for(auto &channel:_channels)
    // {
    //   channel.Wait();
    //   std::cout<<channel.Name()<<"wait success!"<<std::endl;
    // }
  }
  void DebugPrint()
  {
    std::cout<<"--------------------------------"<<std::endl;
    for(auto &channel:_channels)
    {
      std::cout<<channel.Fd()<<std::endl;
      std::cout<<channel.SubId()<<std::endl;
      std::cout<<channel.Name()<<std::endl;
    }
    std::cout<<"--------------------------------"<<std::endl;
  }
  ~ProcessPool() {}

private:
    std::vector<Channel> _channels;
    int _number;
    int _next_choice;
};

// 父进程

#ifdef __MAIN__

static void Usage(const std::string &proc)
{
  std::cout << "Usage:\n\t" << proc << "process_number" << std::endl;
}

// ./process_pool 5
int main(int argc, char *argv[])
{
  if (argc != 2)
  {
    Usage(argv[0]);
    exit(1);
  }
  int number = std::stoi(argv[1]);
  // 0.加载任务
  srand(time(nullptr)^getpid()); 
  LoadTask();
  std::vector<int> task_codes;
  RandomTask(&task_codes);

  // 1.创建进程池对象
  std::unique_ptr<ProcessPool> pp=std::make_unique<ProcessPool>(number);
  // 2.启动进程池
  pp->Start();
  sleep(2);

  // for(auto task:task_codes)
  // {
  //   pp->PushTask(task);
  //   usleep(500000);
  // }

  // while(true)
  // {
  //   // int code=0;
  //   // std::cout<<"Please Enter Your Task# ";
  //   // std::cin>>code;
  //   // if(code<0 || code>gtasks.size())
  //   // {
  //   //   std::cout<<"任务码错误,请重新输入"<<std::endl;
  //   //   continue;
  //   // }


  //   pp->PushTask(code);
  // }


  pp->Stop();

  return 0;
}

#endif

五、进阶优化:让进程池更实用

我们手搓的是基础版进程池,实际生产中可根据需求优化以下几点:

  1. 动态调整进程池大小:根据任务量自动增加/减少子进程(避免空闲子进程浪费资源);

  2. 任务类型扩展:支持传入函数指针和参数,让进程池能处理不同类型的任务(而非固定打印);

  3. 非阻塞添加任务:任务队列满时,返回错误而非阻塞,提高主进程灵活性;

  4. 信号处理优化(后面会给大家讲解):处理SIGCHLD信号,及时回收异常退出的子进程,并重新创建子进程,保证进程池稳定性;

  5. 使用更高效的IPC:将管道替换为消息队列(支持消息优先级)或共享内存(更高吞吐量)。


六、常见坑点与注意事项

  • 管道读写阻塞:子进程read()会阻塞,主进程write()在管道满时也会阻塞,需根据需求调整为非阻塞;

  • 僵尸进程:必须用waitpid()回收子进程,尤其是子进程异常退出时,避免僵尸进程占用资源;

  • 管道关闭顺序:主进程需先关闭写端,子进程才能读取到EOF并正常退出;

  • 内存泄漏:进程池销毁时,必须释放所有分配的内存(child_pids、task_queue、pool本身);

  • 任务队列溢出:需设置合理的max_task,避免任务堆积导致内存溢出。


七、总结

通过手搓这个基础版Linux进程池,我们搞懂了进程池的核心逻辑——提前创建子进程、复用子进程、统一管理任务,本质上是用空间换时间,减少进程创建/销毁的开销。

本文的代码虽然简单,但涵盖了进程池的核心流程,你可以在此基础上进行扩展,适配实际业务场景(如Web服务器的请求处理、后台任务调度等)。

如果运行过程中遇到问题,可重点检查管道操作、子进程回收和任务队列的逻辑,这些都是实现进程池的关键。动手敲一遍代码,你会对Linux进程控制和IPC有更深刻的理解~

最后,附上完整代码链接,方便大家直接测试和修改:thread pool

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐