前言

在这里插入图片描述


一、理解五种IO模型

先搞懂核心,IO 的本质:IO=等+拷贝

  • :等待内核把数据准备好
  • 拷贝:把数据从内核拷贝到应用层

所有等待的时间决定了IO的效率,这五种 IO模型的区别在与:进程在「等待数据就绪」和「数据从内核复制到用户空间」这两个阶段,是否阻塞、由谁来完成。

阻塞I/O(Blocking IO)

  • 特点
    在内核没有把数据准备好之前,系统调用会一直在等待(阻塞),直到数据准备好,应用层才能拿到数据。这样简单,好写,易上手,但是一个连接占用一个现成,高并发不行。
  • 过程

进程调用 recv

【阻塞等待】内核:等网络数据来

数据到内核

【继续阻塞】内核 → 拷贝到用户空间

拷贝完成,进程返回

在这里插入图片描述

非阻塞I/O(Non-blocking IO)

  • 特点
    ⾮阻塞IO往往需要程序员循环的⽅式反复尝试读写⽂件描述符, 这个过程称为轮询. 这对CPU来说是较
    ⼤的浪费, ⼀般只有特定场景下才使⽤。
  • 过程

进程循环调用 recv

内核没数据 → 立刻返回 EAGAIN

过会儿再调 recv

内核没数据 → 又返回 EAGAIN

…(反复轮询)…

数据到内核

【阻塞一下】内核 → 拷贝到用户

返回成功

在这里插入图片描述

IO 多路转接(select /poll/epoll)

  • 特点
    虽然从流程图上看起来和阻塞IO类似. 实际上最核⼼在于IO多路转接能够同时等待多个⽂件描述符的就绪状态。是真正的高并发模型。
  • 过程

进程把一堆 socket 交给 select/epoll

【阻塞在 select】内核统一监视所有 socket

某个 socket 数据到了

select 返回:告诉你哪个就绪

进程调用 recv

【快速阻塞】内核 → 拷贝到用户

处理完成

在这里插入图片描述

信号驱动 IO(Signal Driven IO)

  • 特点
    内核将数据准备好的时候, 使⽤SIGIO信号通知应⽤程序进⾏IO操作。这样不用轮询,不用阻塞在 select,但信号机制复杂、不稳定、用得少。

  • 过程

进程注册 SIGIO 信号

进程继续干别的,不阻塞

内核等到数据 → 发 SIGIO 信号

进程收到信号,调用 recv

【阻塞】内核 → 拷贝到用户

完成

在这里插入图片描述

异步 I/O(真正的异步,POSIX AIO)

  • 特点:由内核在数据拷⻉完成时, 通知应⽤程序(⽽信号驱动是告诉应⽤程序何时可以开始拷⻉数
    据)。
  • 过程

进程调用 aio_read,告诉内核: 数据放哪里,完成通知我

进程立刻返回,继续干别的

内核自己等数据

内核自己拷贝数据到用户

全部搞定 → 通知进程

在这里插入图片描述

二、I/O多路转接select

select核心定位与作用

select 是Linux 系统中最早标准化的 I/O 多路复用系统调用,其核心设计目标是:允许单个进程/线程同时监听多个文件描述符(File Descriptor, fd),当其中任意一个文件描述符处于就绪状态(可读、可写或异常)时,系统会通知应用程序进行对应处理,从而避免进程因阻塞在单个 I/O 操作上而浪费资源,实现单线程处理多并发连接的需求。在网络编程场景中,select 常用于服务器端同时监听多个客户端连接,无需为每个连接创建独立的线程或进程,有效降低了系统的资源开销,是早期网络服务(如简单 HTTP 服务器)的核心依赖。

函数原型和头文件依赖

#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

int select(int nfds,
           fd_set *readfds,
           fd_set *writefds,
           fd_set *exceptfds,
           struct timeval *timeout
           );

:不同系统的头文件可能存在细微差异,上述为 Linux 系统下的标准引入方式,BSD 系统可能需额外引入相关头文件。

参数深度解析

  1. int nfds
    该参数表示本次监听的所有文件描述符中的最大值 + 1,其核心作用是告知内核:需要遍历检查的文件描述符范围(从 0 到 nfds-1)。

注意事项nfds 必须严格设置为“最大 fd + 1”,若设置过小,会导致大于等于 nfds 的文件描述符无法被监听;若设置过大,则会增加内核的遍历开销,降低效率。

  1. fd_set *readfds
    可读文件描述符集合,用于监听哪些文件描述符存在“可读事件”。所谓可读事件,通常包括:
  • socket 客户端发起连接(listen 状态的 socket 就绪);
  • socket 收到客户端发送的数据(普通fd上的读事件就绪);

若应用程序不关心可读事件,可传入 nullptr,表示不监听任何可读文件描述符。

  1. fd_set *writefds
    可写文件描述符集合,用于监听哪些文件描述符存在“可写事件”。可写事件主要指:
    文件描述符对应的输出缓冲区未满,允许应用程序写入数据(如 socket 可发送数据、管道写端可写入数据)。
    与 readfds 类似,不关心可写事件时可传入nullptr。

  2. fd_set *exceptfds
    异常文件描述符集合,用于监听文件描述符的异常事件,在大多数常规网络编程场景中,异常事件较少用到,因此通常传入 nullptr。

  3. struct timeval *timeout
    超时时间结构体,用于设置 select 的阻塞时长,其定义如下:

struct timeval {
  __time_t tv_sec;		/* Seconds.  */ 秒级超时时间
  __suseconds_t tv_usec;	/* Microseconds.  */  微秒级超时时间(1= 1000000 微秒)
  
   // long tv_sec;   //这是最初样子,上面只是被宏替换了
   // long tv_usec; 
};

根据 timeout 的不同取值,select 有三种工作模式:

  • timeout = nullptr:永久阻塞模式,select 会一直阻塞,直到至少有一个文件描述符就绪。
  • timeout = {0, 0}:非阻塞模式,select 不会阻塞,立即返回当前就绪的文件描述符数量(可能为 0);
  • timeout 为具体时间(如 {5, 0}):定时阻塞模式,select 阻塞指定的时间,若超时前有文件描述符就绪,则立即返回;若超时仍无就绪 fd,则返回 0。

fd_set 集合操作接口(必用工具)

fd_set 本质是一个位图结构(固定大小的整数数组),用于存储需要监听的文件描述符。由于其底层实现依赖系统内核,应用程序无法直接操作位图,因此操作系统提供了一组标准宏接口,用于对 fd_set 进行初始化、添加、删除和判断操作,具体如下:

// 清空 fd_set 集合,将所有位设为 0(初始化必用)
void FD_ZERO(fd_set *set);

// 将指定的文件描述符 fd 加入到 fd_set 集合中
void FD_SET(int fd, fd_set *set);

// 将指定的文件描述符 fd 从 fd_set 集合中移除
void FD_CLR(int fd, fd_set *set);

// 判断 fd 是否在 fd_set 集合中(用于 select 返回后,判断 fd 是否就绪)
int FD_ISSET(int fd, fd_set *set);
  • 使用规范流程
    每次调用 select 前,必须先通过 FD_ZERO 清空集合,再通过 FD_SET 添加需要监听的 fd;select 返回后,通过 FD_ISSET 遍历判断哪些 fd 处于就绪状态。

返回值含义

select 的返回值直接反映了调用结果,应用程序需根据返回值进行不同的逻辑处理,具体含义如下:

  • 返回值 > 0:调用成功,返回值表示本次就绪的文件描述符总数(注意:不是就绪的 fd 列表,需通过 FD_ISSET 遍历判断)。
  • 返回值 = 0:超时,在指定的 timeout 时间内,没有任何文件描述符就绪。
  • 返回值 = -1:调用失败,此时系统会设置 errno 来标识具体错误原因。
    #define __FD_SETSIZE 1024

select 核心缺陷(面试高频 + 实践痛点)

作为早期的 I/O 多路复用方案,select 存在诸多设计缺陷,这些缺陷导致其无法适应高并发场景,也是后续 poll、epoll 诞生的核心原因。下面逐一拆解这些缺陷的本质的影响。

1.最大文件描述符限制(FD_SETSIZE = 1024)

fd_set 是固定大小的位图,其大小由系统宏 FD_SETSIZE 定义,默认值为 1024。这意味着 select最多只能监听 1024 个文件描述符,超过该数量的 fd 无法被监听。

虽然可以通过修改内核参数重新定义 FD_SETSIZE,但会导致 fd_set 占用的内存增大,进一步增加内核拷贝和遍历的开销,且修改后需重新编译应用程序,扩展性极差,无法满足高并发(如万级、百万级连接)场景的需求。

2.每次调用需全量拷贝 fd 集合(用户态 ↔ 内核态)

每次调用 select 时,应用程序都需要将 readfds、writefds、exceptfds 三个集合全量拷贝到内核空间;内核处理完成后,又需要将标记后的集合拷贝回用户态。

这种拷贝操作的开销与 fd 数量正相关,当 fd 数量达到上千个时,拷贝开销会成为系统性能的瓶颈,严重影响程序的响应速度。

3. 内核遍历效率低下(O(n) 时间复杂度)

内核每次处理 select 时,都需要线性遍历从 0 到 nfds-1 的所有 fd,无论这些 fd 是否就绪。这种遍历方式的时间复杂度为 O(n),当 fd 数量较多(如上千个)时,即使只有少数几个 fd 就绪,内核也需要遍历所有 fd,效率极低。

例如,当监听 1000 个 fd,仅 1 个 fd 就绪时,内核仍需遍历 1000 个 fd,大部分遍历操作都是无效的,浪费大量 CPU 资源。

4. fd_set 集合被内核覆盖,需重复初始化

内核在处理 select 时,会将标记后的就绪 fd 覆盖原有的 fd_set 集合,导致原集合中的未就绪 fd 被清空。因此,每次调用 select前,应用程序都必须重新通过 FD_ZERO 和 FD_SET 初始化 fd_set 集合,增加了代码的复杂度,也容易出现遗漏监听 fd 的错误。

5. 仅支持水平触发(LT),无高效事件通知机制

select 仅支持水平触发(Level Trigger, LT)模式,即只要 fd 对应的缓冲区有数据(可读)或有空闲空间(可写),select 就会持续返回该 fd 就绪。这种模式虽然安全,但在高并发场景下,会导致 select 频繁返回同一个就绪 fd,增加不必要的系统调用开销。
此外,select 采用“轮询式”检测 fd 就绪状态,内核无法主动通知应用程序哪个 fd 就绪,只能通过遍历的方式查找,缺乏高效的事件通知机制。

select 实践示例(完整代码)

下面给出一个基于 select 的简单 TCP 服务器示例和客服端,一共有七个文件:Common.hpp(禁止拷贝和错误码设置),InetAddr.hpp(端口和ip的转换),SelectServer.hpp(select服务器开源),SelectServer.cc(服务器),Socket.hpp(套接字的封装),SelectClient.cc(客户端)。makefile(自动编译文件)

SelectServer.hpp(select服务器开源)

#pragma once
#include <iostream>
#include <memory>
#include <sys/select.h>
#include <unistd.h>
#include "Socket.hpp"
class SelectServer
{
    const static int size = sizeof(fd_set) * 8;
    const static int defaultfd = -1;

public:
    SelectServer(uint16_t port) : listensockfd(std::make_unique<TcpSocket>()), _port(port), isrunning(false)
    {
        listensockfd->TcpSocketBulid(port);

        for (int i = 0; i < size; i++) // 初始化数组
        {
            fd_arr[i] = defaultfd;
        }
        fd_arr[0] = listensockfd->Socketfd(); // 默认0号下标是listen监听套接字
    }
    void Accepter() // 连接管理器
    {
        InetAddr client;
        int fd = listensockfd->Accept(&client); // 这里accept不会阻塞,相当于把等和拷贝分开处理的
        if (fd > 0)
        {
            std::cout << "获取一个新链接" << fd << std::endl;
        }
        // 不可以直接使用fd,因为不知道fd上面的读事件是否已经就绪了,需要重新交给select关心fd上的读事件,那如何托管呢?这里需要一个辅助数组。添加到辅助数组里面
        int pos = 0;
        for (; pos < size; pos++)
        {
            if (fd_arr[pos] == defaultfd)
                break;
        }
        if (pos == size)
        {
            std::cout << "select listen full" << std::endl;
            close(fd);
        }
        else
        {
            fd_arr[pos] = fd;
        }
    }
    void Recver(int fd, int pos) // IO处理器
    {
        char buffer[1024];
        int s = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (s > 0)
        {
            buffer[s] = 0;
            std::string server = "server echo ";
            std::cout << buffer << std::endl;

            server += buffer;
            int n = send(fd_arr[pos], server.c_str(), server.size(), 0);
            (void)n;
        }
        else if (s == 0)
        {

            std::cout << "client quit " << std::endl;
            fd_arr[pos] = defaultfd;
            close(fd);
        }
        else
        {
            std::cout << "Recver errror " << std::endl;
            fd_arr[pos] = defaultfd;
            close(fd);
        }
    }
    void HanderEvent(fd_set &rfds) // 事件派发
    {
        //
        for (int i = 0; i < size; i++) // 判断是listensockfd上有新链接就绪,还是普通的fd上的读事件就绪
        {
            if (fd_arr[i] == defaultfd)
                continue;
            if (FD_ISSET(fd_arr[i], &rfds)) // 判断fd在不在rfds这个集合里面
            {
                if (fd_arr[i] == listensockfd->Socketfd())
                {
                    // listensockfd新链接到来
                    std::cout << "listencockfd reading" << std::endl;
                    Accepter();
                }
                else
                {
                    // 普通fd读事件就绪
                    std::cout << "fd reading" << std::endl;
                    Recver(fd_arr[i], i);
                }
            }
        }
    }
    void fdprint()
    {
        std::cout << "fd_arr[]:";
        for (int i = 0; i < size; i++)
        {
            if (fd_arr[i] == defaultfd)
                continue;
            std::cout << fd_arr[i] << " ";
        }
        std::cout << std::endl;
    }
    void Start()
    {
        isrunning = true;
        while (isrunning)
        {
            // 这里不能直接accept是因为accept是阻塞io,不符合select多路转接的理念,因为listensockfd也是一个fd,那将来系统是怎样知道listensockfd上有新链接到来呢?
            // 将listensockfd添加到select内部,让OS帮我关心listensockfd上面的读事件(默认有链接到来读事件就绪)
            fd_set rfds;
            FD_ZERO(&rfds);
            int maxfd = defaultfd;
            for (int i = 0; i < size; i++)
            {
                if (fd_arr[i] == defaultfd)
                    continue;
                FD_SET(fd_arr[i], &rfds); // 把fd添加到fd_set数据集里面

                // 最大fd
                fd_arr[i] > maxfd ? maxfd = fd_arr[i] : maxfd = maxfd;
            }
            fdprint();
            struct timeval timeout = {2, 0};
            //  最大fd:一定是变化的
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr); // select返回之后,rfds要被重置,那么你怎么找到那些fd要添加到rfds中呢?需要辅助数组
            switch (n)
            {
            case -1:
                std::cout << "select error" << std::endl;

                break;
            case 0:
                std::cout << "timeout ....." << std::endl;
                break;
            default:
                std::cout << "有事件就绪" << n << std::endl;
                HanderEvent(rfds); // 处理就绪事件,有两种情况,一是监听套接字有链接到来,二是普通fd上的读是事件就绪
                break;
            }
        }
    }
    ~SelectServer() {}

private:
    std::unique_ptr<Socket> listensockfd;
    uint16_t _port;
    bool isrunning;
    int fd_arr[size];
};

SelectServer.cc(服务器)

#include <iostream>
#include "Socket.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
#include "SelectServer.hpp"
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cout << argv[0] << " port" << std::endl;
        exit(USAGE_ERR);
    }

    uint16_t port = std::stoi(argv[1]);

    std::unique_ptr<SelectServer> strv = std::make_unique<SelectServer>(port);
    strv->Start();

    return 0;
}

SelectClient.cc(客户端)

#include <iostream>
#include "InetAddr.hpp"
#include "Common.hpp"
#include "SelectServer.hpp"
#include "Socket.hpp"
int main(int argc, char *argv[])
{
    uint16_t port = std::stoi(argv[2]);
    std::string ip = argv[1];

    std::unique_ptr<Socket> sock = std::make_unique<TcpSocket>();
    sock->SocketBuild();

    int n = sock->Connect(ip, port);
    if (n < 0)
    {
        std::cout << "connect error" << std::endl;
        exit(CONNECT_ERR);
    }
    while (true)
    {
        std::cout << "please Enter: " << std::endl;
        std::string line;
        std::getline(std::cin, line);
        int w = send(sock->Socketfd(), line.c_str(), line.size(), 0);
        (void)w;

        char buffer[1024];
        memset(&buffer, 0, sizeof(buffer));
        int s = recv(sock->Socketfd(), buffer, sizeof(buffer) - 1, 0);
        if (s > 0)
        {
            buffer[s] = 0;

            std::cout << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "server quit " << std::endl;
            break;
        }
        else
        {
            std::cout << "recv error " << std::endl;
            break;
        }
    }
    return 0;
}

Common.hpp(禁止拷贝和错误码设置)

#include <iostream>
#pragma once
#include <iostream>
#include <iostream>
#include <functional>
#include <unistd.h>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>

enum Exitcode
{
    OK = 0,
    USAGE_ERR,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    CONNECT_ERR,
    FORK_ERR

};
class NoCopy
{
public:
    NoCopy() {}
    NoCopy(const NoCopy &) = delete;
    const NoCopy &operator=(const NoCopy &) = delete;
    ~NoCopy() {}
};
#define CONV(addr) ((struct sockaddr *)&addr)

InetAddr.hpp(端口和ip的转换)

#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include "Common.hpp"
class InetAddr
{
public:
    InetAddr() {}
    InetAddr(const struct sockaddr_in &addr) // 网络格式转换成本地格式
        : _addr(addr)
    {
        SetAddr(_addr);
    }

    InetAddr(uint16_t port)
        : _port(port)
    {

        bzero(&_addr, sizeof(_addr)); // 清空sockaddr_in
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port); // 本地格式转化成网络格式
        _addr.sin_addr.s_addr = INADDR_ANY;
    }
    InetAddr(std::string ip, uint16_t port)
        : _ip(ip), _port(port)
    {
        memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port);
        _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
    }
    std::string StringAddr() const
    {
        return _ip + ":" + std::to_string(_port);
    }
    bool operator==(const InetAddr &addr)
    {
        return addr._ip == _ip && addr._port == _port;
    }

    // 返回port和ip
    uint16_t port() { return _port; }
    std::string ip() { return _ip; }

    const struct sockaddr_in &NetAddr() const { return _addr; }

    const struct sockaddr *NetAddrPtr() const { return CONV(_addr); }

    socklen_t InetAddrLen() { return sizeof(_addr); }
    void SetAddr(struct sockaddr_in &addr)
    {
        _port = ntohs(addr.sin_port);
        //_ip = inet_ntoa(_addr.sin_addr); // 四字节网络ip风格转换成点分十进制
        char ipbuffer[64];
        inet_ntop(AF_INET, &addr.sin_addr, ipbuffer, sizeof(ipbuffer));
        _ip = ipbuffer;
    }

    ~InetAddr() {}

private:
    struct sockaddr_in _addr;
    uint16_t _port;
    std::string _ip;
};

Socket.hpp(套接字的封装)

#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include "Common.hpp"
#include "InetAddr.hpp"

class Socket : public NoCopy
{
public:
    Socket() {}
    virtual void SocketBuild() = 0;
    virtual void SocketBind(uint16_t poer) = 0;
    virtual void SocketListen() = 0;
    virtual int Accept(InetAddr *client) = 0;
    virtual int Socketfd() = 0;
    virtual void Close() = 0;
    virtual int Recv(std::string *out) = 0;
    virtual ssize_t Write(std::string &line) = 0;
    virtual int Connect(std::string &server_ip, uint16_t server_port) = 0;

    void TcpSocketBulid(uint16_t port)
    {
        SocketBuild();
        SocketBind(port);
        SocketListen();
    }
    void UdpSocketBuild(uint16_t port)
    {
        SocketBuild();
        SocketBind(port);
    }
    ~Socket() {}

private:
};
const static int sockfd = -1;
class TcpSocket : public Socket
{
public:
    TcpSocket() : _socket(sockfd) {}
    // TcpSocket(int fd) : _socket(fd) {}
    void SocketBuild() override
    {
        _socket = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_socket < 0)
        {
            std::cout << "socket error" << std::endl;
            exit(SOCKET_ERR);
        }
        std::cout << "socket success:" << _socket << std::endl;
    }
    void SocketBind(uint16_t port) override
    {
        InetAddr peer(port);
        int b = ::bind(_socket, peer.NetAddrPtr(), peer.InetAddrLen());
        if (b < 0)
        {
            std::cout << "BIND error" << strerror(errno) << std::endl;
            exit(BIND_ERR);
        }
        std::cout << "bind success:" << _socket << std::endl;
    }
    void SocketListen() override
    {
        int n = ::listen(_socket, backlog);
        if (n < 0)
        {
            std::cout << "listen error" << std::endl;
            exit(LISTEN_ERR);
        }
        std::cout << "listen success:" << _socket << std::endl;
    }
    int Accept(InetAddr *client) override
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int fd = ::accept(_socket, CONV(peer), &len);
        if (fd < 0)
        {
            std::cout << "accept error" << std::endl;
            return -1;
        }
        else
            client->SetAddr(peer);
        return fd;
    }
    int Connect(std::string &server_ip, uint16_t server_port) override
    {
        InetAddr client(server_ip, server_port);
        return connect(_socket, client.NetAddrPtr(), client.InetAddrLen());
    }
    int Recv(std::string *out) override
    {
        char buffer[1024];
        ssize_t s = read(_socket, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            *out += buffer;
        }
        return s;
    }
    ssize_t Write(std::string &line) override
    {
        return write(_socket, line.c_str(), line.size());
    }
    int Socketfd() override { return _socket; }
    void Close() override
    {
        if (_socket >= 0)
        {
            ::close(_socket);
        }
    }
    ~TcpSocket() {}

private:
    int _socket;
    int backlog = 253;
};

makefile(自动编译文件)

.PHONY:all
all: selectserver selectclient

selectserver:SelectServer.cc
	g++ -o $@ $^ -std=c++17
selectclient:SelectClient.cc
	g++ -o $@ $^ -std=c++17
.PYONY:clean
clean:
	rm -f selectserver selectclient

select 作为 I/O 多路复用技术的鼻祖,其核心价值在于实现了“单线程监听多 fd”的需求,为早期网络编程提供了高效的解决方案。但受限于时代背景,其设计存在诸多缺陷,尤其是最大 fd 限制、O(n) 遍历效率、全量拷贝等问题,导致其无法适应现代高并发场景。

三、多路转接pool

poll 核心定位与作用

poll 是 Linux 系统中一款经典的 I/O 多路复用系统调用,其核心设计目标与 select 一致:允许单个进程/线程同时监听多个文件描述符(File Descriptor, fd),当任意一个文件描述符处于就绪状态(可读、可写或异常)时,系统通知应用程序进行对应处理,从而避免进程阻塞在单个 I/O 操作上,实现单线程处理多并发连接的需求。

与 select 相比,poll 最大的改进的是突破了文件描述符数量的固定限制,同时优化了事件管理的接口设计,降低了编程复杂度,适用于中等并发场景(如几千级连接),是 select 到 epoll 演进过程中承上启下的关键方案。

函数原型与头文件依赖

使用 poll 需引入对应的系统头文件,其标准函数原型如下,接口设计相比 select 更简洁,无需区分三个独立的文件描述符集合:

#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数深度解析

poll 的参数数量少于 select,核心是通过结构体数组管理所有待监听的文件描述符及事件,每个参数的作用与使用注意事项如下:

  1. struct pollfd *fds
    指向 struct pollfd 结构体数组的指针,该数组用于存储所有需要监听的文件描述符及其对应的监听事件,是 poll 的核心数据结构。与 select 的三个 fd_set 位图不同,poll 用单个结构体数组整合了所有监听信息,接口更简洁、灵活。

核心结构体 struct pollfd 的定义及各字段含义如下(内核标准定义):

struct pollfd {
    int   fd;      // 要监听的文件描述符
    short events;  // 输入参数:需要监听的事件(位掩码组合)
    short revents; // 输出参数:实际发生的事件(由内核填充)
};

简单总结:你通过 “位掩码组合” 告诉内核要监听什么,内核通过 “填充 revents” 告诉你实际发生了什么。

  • fd:待监听的文件描述符,若该值为负数(如 -1),则对应的 events 字段会被内核忽略,revents 字段返回 0,可用于标记数组中的空闲槽位,无需频繁调整数组大小;
  • events:输入参数,用于指定需要监听的事件,通过位掩码组合实现多事件监听,常见事件宏见下文详解;
  • revents:输出参数,由内核填充,用于返回该文件描述符实际发生的事件,仅在poll 调用成功后有效,其值可能包含 events 中指定的事件,也可能包含内核自动检测的错误、挂起等事件(如 POLLERR)。
  1. nfds_t nfds
    该参数表示 fds 数组中有效结构体的数量(即实际需要监听的文件描述符个数),类型为 nfds_t(无符号整型)。
    注意事项:与 select 的 nfds(最大 fd + 1)不同,poll 的 nfds 直接指定数组中有效元素的个数,无需计算最大 fd,内核仅遍历前 nfds 个结构体,减少了无效遍历开销。
  2. int timeout
    超时时间(单位:毫秒),用于设置 poll 的阻塞时长,其取值规则与 select 的 timeout 基本一致,但无需借助 struct timeval 结构体,使用更简洁,具体分为三种模式:
  • timeout = -1:永久阻塞模式,poll 会一直阻塞,直到至少有一个文件描述符就绪,或被信号中断(如收到 SIGINT 信号);
  • timeout = 0:非阻塞模式,poll 不会阻塞,立即返回当前就绪的文件描述符数量(可能为 0);
  • timeout > 0:定时阻塞模式,poll 阻塞指定的毫秒数,若超时前有文件描述符就绪,则立即返回;若超时仍无就绪 fd,则返回 0。
    注意:若 timeout 取值为负数(除 -1 外),则属于无效参数,poll 调用失败,errno 会被设置为 EINVAL。

常用事件宏(events / revents 核心)

poll 的事件通过位掩码宏定义实现,可在 events 中组合多个事件(用 | 连接),内核会在 revents 中返回实际发生的事件。与 select 相比,poll 的事件宏更丰富,且无需区分读、写、异常三个集合,统一通过 events 字段指定,以下是最常用的事件宏:

事件宏 含义 使用场景
POLLIN 有普通或优先级带数据可读 客户端发数据、新连接到达、标准输入有数据
POLLOUT 有普通数据可写(输出缓冲区未满) socket 可发送数据、管道可写入数据
POLLERR 文件描述符发生错误 连接异常、写入失败等
POLLHUP 连接挂起(对端关闭连接) 客户端主动断开 TCP 连接
POLLNVAL 文件描述符无效(未打开或非法) fd 已关闭仍被监听
POLLPRI 有紧急(高优先级)数据可读 TCP 带外数据(OOB 数据)到达

使用示例:监听 fd 的可读和紧急数据事件,可设置 events = POLLIN | POLLPRI;内核会在 revents 中标记实际发生的事件,应用程序通过位与(&)判断具体事件类型。

返回值的含义

poll 的返回值与 select 逻辑一致,直接反映调用结果,应用程序需根据返回值进行对应处理,具体含义及错误场景如下:

  • 返回值 > 0:调用成功,返回值表示本次就绪的文件描述符总数(即 revents 字段非 0 的 pollfd 结构体个数),注意:返回值是“就绪 fd 总数”,不是具体的 fd 列表,需遍历数组判断哪些 fd 就绪;
  • 返回值 = 0:超时,在指定的 timeout 时间内,没有任何文件描述符就绪;
  • 返回值 = -1:调用失败,此时系统会设置errno 来标识具体错误原因,常见错误如下:
    • EINTR:调用被信号中断,可重新调用 poll;
    • EINVAL:nfds 超过系统限制,或 timeout 取值无效(负数且非 -1);
    • EBADF:fds 数组中包含无效的文件描述符(如已关闭的 fd);
    • EFAULT:fds 指针指向进程不可访问的地址空间;
    • ENOMEM:内核无法分配足够的内存用于存储相关数据结构。

poll 核心特性与缺陷(对比 select)

poll 是 select 的改良版,解决了 select 的部分核心缺陷,但仍存在未解决的性能瓶颈,以下从“优势”和“缺陷”两方面详细说明,同时对比 select,帮助理解其定位。

poll 相比 select 的核心优势

  • 突破文件描述符数量限制:poll 用动态数组(struct pollfd 数组)管理 fd,无固定数量限制(仅受系统内存和进程文件描述符限制),彻底解决了 select 中 FD_SETSIZE(默认 1024)的限制,可支持几千级并发连接;
  • 接口设计更简洁、友好:无需区分读、写、异常三个独立的集合,用单个 pollfd 结构体整合 fd 和事件信息,减少了代码冗余;同时 events(输入)和 revents(输出)分离,无需每次调用前重新初始化监听事件,仅需清零 revents 即可;
  • 遍历效率更高:poll 仅遍历数组中前 nfds 个有效元素,而 select 需遍历从 0 到 maxfd 的所有 fd,减少了无效遍历开销,尤其当 fd 分布不连续时,优势更明显;
  • 事件表达更丰富:poll 支持更多的事件类型(如 POLLPRI 紧急数据事件),且能自动检测错误、挂起等事件,无需像 select 那样通过单独的异常集合监听;
  • 兼容性更强:遵循 POSIX 标准,在不同 Linux、UNIX 系统中实现统一,无需适配不同的头文件和参数格式。

poll 仍存在的核心缺陷(未解决 select 的本质问题)

  • 内核遍历效率仍为 O(n):与 select 一样,poll 每次调用时,内核仍需线性遍历所有有效 fd,时间复杂度为 O(n),当 fd 数量达到上万级时,遍历开销会显著增加,性能急剧下降;
  • 每次调用需全量拷贝数据:与 select 类似,poll 每次调用时,需将整个 pollfd 数组从用户态拷贝到内核态,拷贝开销与 fd 数量正相关,当 fd 数量较多时,拷贝开销会成为性能瓶颈;
  • 用户态仍需遍历所有 fd:poll 仅返回就绪 fd 的总数,不返回具体的就绪 fd 列表,应用程序仍需遍历整个 pollfd 数组,判断每个 fd 是否就绪,进一步增加了用户态的开销;
  • 仅支持水平触发(LT):与 select 一致,poll 仅支持水平触发模式,即只要 fd 对应的缓冲区有数据(可读)或有空闲空间(可写),poll 就会持续返回该 fd 就绪,在高并发场景下,会导致频繁的无效系统调用;
  • 数组管理繁琐:虽然 pollfd 数组可动态扩展,但当 fd 频繁连接和断开时,需要手动管理数组中的空闲槽位(如用 fd = -1 标记),增加了编程复杂度。

poll实例示例(实现服务端和客户端)

Common.hpp,InetAddr.hpp,PollServer.cc,PollServer.hpp,PollClient.cc,Socket.hpp,Makefile

Common.hpp

#include <iostream>
#pragma once
#include <iostream>
#include <iostream>
#include <functional>
#include <unistd.h>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>

enum Exitcode
{
    OK = 0,
    USAGE_ERR,
    SOCKET_ERR,
    BIND_ERR,
    LISTEN_ERR,
    CONNECT_ERR,
    FORK_ERR

};
class NoCopy
{
public:
    NoCopy() {}
    NoCopy(const NoCopy &) = delete;
    const NoCopy &operator=(const NoCopy &) = delete;
    ~NoCopy() {}
};
#define CONV(addr) ((struct sockaddr *)&addr)

InetAddr.hpp

#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstring>
#include "Common.hpp"
class InetAddr
{
public:
    InetAddr() {}
    InetAddr(const struct sockaddr_in &addr) // 网络格式转换成本地格式
        : _addr(addr)
    {
        SetAddr(_addr);
    }

    InetAddr(uint16_t port)
        : _port(port)
    {

        bzero(&_addr, sizeof(_addr)); // 清空sockaddr_in
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port); // 本地格式转化成网络格式
        _addr.sin_addr.s_addr = INADDR_ANY;
    }
    InetAddr(std::string ip, uint16_t port)
        : _ip(ip), _port(port)
    {
        memset(&_addr, 0, sizeof(_addr));
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port);
        _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
    }
    std::string StringAddr() const
    {
        return _ip + ":" + std::to_string(_port);
    }
    bool operator==(const InetAddr &addr)
    {
        return addr._ip == _ip && addr._port == _port;
    }

    // 返回port和ip
    uint16_t port() { return _port; }
    std::string ip() { return _ip; }

    const struct sockaddr_in &NetAddr() const { return _addr; }

    const struct sockaddr *NetAddrPtr() const { return CONV(_addr); }

    socklen_t InetAddrLen() { return sizeof(_addr); }
    void SetAddr(struct sockaddr_in &addr)
    {
        _port = ntohs(addr.sin_port);
        //_ip = inet_ntoa(_addr.sin_addr); // 四字节网络ip风格转换成点分十进制
        char ipbuffer[64];
        inet_ntop(AF_INET, &addr.sin_addr, ipbuffer, sizeof(ipbuffer));
        _ip = ipbuffer;
    }

    ~InetAddr() {}

private:
    struct sockaddr_in _addr;
    uint16_t _port;
    std::string _ip;
};

PollServer.cc

#include <iostream>
#include "InetAddr.hpp"
#include "Common.hpp"
#include "PollServer.hpp"
int main(int argc, char *argv[])
{
    if(argc!=2)
    {
        std::cout << argv[0] << "port" << std::endl;
        exit(USAGE_ERR);
    }
    uint16_t port = std::stoi(argv[1]);
    std::unique_ptr<PollServer> strv = std::make_unique<PollServer>(port);
    strv->Start();
    return 0;
}

PollServer.hpp(动态扩容版本)

#include <iostream>
#include <sys/poll.h>
#include "InetAddr.hpp"
#include "Socket.hpp"
#include "Common.hpp"
class PollServer
{
    const static int defaultfd = -1;

public:
    PollServer(uint16_t port) : _listensocket(std::make_unique<TcpSocket>()), _port(port), isrunning(false), old_cap(1)
    {
        _listensocket->TcpSocketBulid(port);
        arr = (struct pollfd *)malloc(old_cap * sizeof(struct pollfd));
        if (arr == nullptr)
            return;

        for (int i = 0; i < old_cap; i++)
        {
            arr[i].fd = defaultfd;
            arr[i].events = 0;
            arr[i].revents = 0;
        }
        arr[0].fd = _listensocket->Socketfd();
        arr[0].events = POLLIN; // 用户告诉内核需要帮我注意可读事件
    }
    struct pollfd *pollfd_arr_malloc(struct pollfd *arr, int &old_cap)
    {
        // 扩容
        int capacity = 2 * old_cap;
        struct pollfd *tmp = (struct pollfd *)malloc(capacity * sizeof(struct pollfd));
        if (tmp == nullptr)
            return nullptr;

        // 拷贝旧数组数据到新数组
        if (arr != NULL && old_cap > 0)
        {
            memcpy(tmp, arr, old_cap * sizeof(struct pollfd)); // 这里虽然是浅拷贝,但是不会有内存泄漏,因为struct pollfd中并没有指针,所以memcpy完全可以
        }
        for (int i = old_cap; i < capacity; i++)
        {
            tmp[i].fd = defaultfd;
            tmp[i].events = 0;
            tmp[i].revents = 0;
        }
        old_cap = capacity;
        free(arr);
        arr = nullptr;
        return tmp;
    }
    void Accept()
    {

        InetAddr client;
        int sockfd = _listensocket->Accept(&client);
        // 不能直接读,不知道fd上面的读事件是否就绪,要让内核帮我们关心一下
        int i = 0;
        for (; i < old_cap; i++)
        {
            if (arr[i].fd == defaultfd)
                break;
        }
        if (i == old_cap)
        {
            std::cout << "到这里已经扩容了" << std::endl;
            arr = pollfd_arr_malloc(arr, old_cap);
            std::cout << old_cap << std::endl;
        }
        arr[i].fd = sockfd;
        arr[i].events = POLLIN;
        arr[i].revents = 0;
    }
    void Recv(int pos)
    {
        char buffer[1024];
        memset(&buffer, 0, sizeof(buffer));
        int s = recv(arr[pos].fd, buffer, sizeof(buffer) - 1, 0);
        if (s > 0)
        {
            buffer[s] = 0;
            std::string server = "server echo ";
            std::cout << server << buffer << std::endl;

            server += buffer;
            int n = send(arr[pos].fd, server.c_str(), server.size(), 0);
            (void)n;
        }
        else if (s == 0)
        {
            std::cout << "client quit " << std::endl;
            close(arr[pos].fd);
            arr[pos].fd = defaultfd;
            arr[pos].events = 0;
            arr[pos].revents = 0;
        }
        else
        {
            std::cout << "client read error " << std::endl;
            close(arr[pos].fd);
            arr[pos].fd = defaultfd;
            arr[pos].events = 0;
            arr[pos].revents = 0;
        }
    }
    void Dispenser() // 任务派发器
    {
        // 是listen还是普通fd
        for (int i = 0; i < old_cap; i++)
        {
            if (arr[i].fd == defaultfd)
                continue;
            if (arr[i].revents & POLLIN)
            {
                if (arr[i].fd == _listensocket->Socketfd())
                {
                    Accept(); // listensockfd
                }
                else
                {
                    Recv(i);
                }
            }
        }
    }
    void Start()
    {
        isrunning = true;
        while (isrunning)
        {
            Printfd();
            int timeout = -1;
            int n = poll(arr, old_cap, timeout);
            switch (n)
            {
            case -1:
                std::cout << "poll error" << std::endl;
                break;
            case 0:
                std::cout << "poll timeout" << std::endl;
                break;
            default:
                std::cout << "get new file" << std::endl;
                Dispenser();
                break;
            }
        }
    }
    void Printfd()
    {
        std::cout << "pollfd[]: ";
        for (int i = 0; i < old_cap; i++)
        {
            if (arr[i].fd == defaultfd)
                continue;
            std::cout << arr[i].fd << " ";
        }
        std::cout << std::endl;
    }
    ~PollServer()
    {
        free(arr);
        arr = nullptr;
        if (_listensocket->Socketfd() > 0)
            _listensocket->Close();
    }

private:
    std::unique_ptr<Socket> _listensocket;
    bool isrunning;
    uint16_t _port;
    struct pollfd *arr; // 动态数组
    int old_cap;
};

PollClient.cc

#include <iostream>
#include "InetAddr.hpp"
#include "Common.hpp"
#include "PollServer.hpp"
int main(int argc, char *argv[])
{
    uint16_t port = std::stoi(argv[2]);
    std::string ip = argv[1];

    std::unique_ptr<Socket> sock = std::make_unique<TcpSocket>();
    sock->SocketBuild();

    int n = sock->Connect(ip, port);
    if (n < 0)
    {
        std::cout << "connect error" << std::endl;
        exit(CONNECT_ERR);
    }
    while (true)
    {
        std::cout << "please Enter: " << std::endl;
        std::string line;
        std::getline(std::cin, line);
        int w = send(sock->Socketfd(), line.c_str(), line.size(), 0);
        (void)w;

        char buffer[1024];
        memset(&buffer, 0, sizeof(buffer));
        int s = recv(sock->Socketfd(), buffer, sizeof(buffer) - 1, 0);
        if (s > 0)
        {
            buffer[s] = 0;

            std::cout << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "server quit " << std::endl;
            break;
        }
        else
        {
            std::cout << "recv error " << std::endl;
            break;
        }
    }
    return 0;
}

Socket.hpp

#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include <unistd.h>
#include <memory>
#include "Common.hpp"
#include "InetAddr.hpp"

class Socket : public NoCopy
{
public:
    Socket() {}
    virtual void SocketBuild() = 0;
    virtual void SocketBind(uint16_t poer) = 0;
    virtual void SocketListen() = 0;
    virtual int Accept(InetAddr *client) = 0;
    virtual int Socketfd() = 0;
    virtual void Close() = 0;
    virtual int Recv(std::string *out) = 0;
    virtual ssize_t Write(std::string &line) = 0;
    virtual int Connect(std::string &server_ip, uint16_t server_port) = 0;

    void TcpSocketBulid(uint16_t port)
    {
        SocketBuild();
        SocketBind(port);
        SocketListen();
    }
    void UdpSocketBuild(uint16_t port)
    {
        SocketBuild();
        SocketBind(port);
    }
    ~Socket() {}

private:
};
const static int sockfd = -1;
class TcpSocket : public Socket
{
public:
    TcpSocket() : _socket(sockfd) {}
    // TcpSocket(int fd) : _socket(fd) {}
    void SocketBuild() override
    {
        _socket = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_socket < 0)
        {
            std::cout << "socket error" << std::endl;
            exit(SOCKET_ERR);
        }
        std::cout << "socket success:" << _socket << std::endl;
    }
    void SocketBind(uint16_t port) override
    {
        InetAddr peer(port);
        int b = ::bind(_socket, peer.NetAddrPtr(), peer.InetAddrLen());
        if (b < 0)
        {
            std::cout << "BIND error" << strerror(errno) << std::endl;
            exit(BIND_ERR);
        }
        std::cout << "bind success:" << _socket << std::endl;
    }
    void SocketListen() override
    {
        int n = ::listen(_socket, backlog);
        if (n < 0)
        {
            std::cout << "listen error" << std::endl;
            exit(LISTEN_ERR);
        }
        std::cout << "listen success:" << _socket << std::endl;
    }
    int Accept(InetAddr *client) override
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int fd = ::accept(_socket, CONV(peer), &len);
        if (fd < 0)
        {
            std::cout << "accept error" << std::endl;
            return -1;
        }
        else
            client->SetAddr(peer);
        return fd;
    }
    int Connect(std::string &server_ip, uint16_t server_port) override
    {
        InetAddr client(server_ip, server_port);
        return connect(_socket, client.NetAddrPtr(), client.InetAddrLen());
    }
    int Recv(std::string *out) override
    {
        char buffer[1024];
        ssize_t s = read(_socket, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            buffer[s] = 0;
            *out += buffer;
        }
        return s;
    }
    ssize_t Write(std::string &line) override
    {
        return write(_socket, line.c_str(), line.size());
    }
    int Socketfd() override { return _socket; }
    void Close() override
    {
        if (_socket >= 0)
        {
            ::close(_socket);
        }
    }
    ~TcpSocket() {}

private:
    int _socket;
    int backlog = 253;
};

Makefile

.PHONY:all
all: pollserver pollclient

pollserver:PollServer.cc
	g++ -o $@ $^ -std=c++17
pollclient:PollClient.cc
	g++ -o $@ $^ -std=c++17
.PYONY:clean
clean:
	rm -f pollserver pollclient

静态版本的poll服务器

#pragma once
#include <iostream>
#include <sys/poll.h>
#include "Socket.hpp"
#include "InetAddr.hpp"
class PollServer
{
    const static int size = 4096;
    const static int defaultfd = -1;

public:
    PollServer(uint16_t port) : _listensockfd(std::make_unique<TcpSocket>()), _port(port), isrunning(false)
    {
        _listensockfd->TcpSocketBulid(_port);

        for (int i = 0; i < size; i++)
        {
            _fds[i].fd = defaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        _fds[0].fd = _listensockfd->Socketfd();
        _fds[0].events = POLLIN; // 只关心读事件
    }
    void Start()
    {
        isrunning = true;
        while (isrunning)
        {

            Printfd();
            int timeout = -1;
            int n = poll(_fds, size, timeout);
            switch (n)
            {
            case -1:
                std::cout << "poll error" << std::endl;
                break;
            case 0:
                std::cout << "poll timeout..." << std::endl;
                break;
            default:
                std::cout << "get new reading file" << std::endl;
                Dispatcher();
                break;
            }
        }
    }
    void Dispatcher()
    {
        // 首先判断是谁的事件就绪了
        for (int i = 0; i < size; i++)
        {
            if (_fds[i].fd == defaultfd)
                continue;
            if (_fds[i].revents & POLLIN)
            {
                if (_fds[i].fd == _listensockfd->Socketfd())
                {
                    std::cout << "有线连接到来" << std::endl;
                    Accept();
                }
                else
                {
                    std::cout << _fds[i].fd << "上的读事件已经就绪" << std::endl;
                    Recver(i);
                }
            }
        }
    }
    void Accept()
    {

        InetAddr client;
        int sockfd = _listensockfd->Accept(&client);
        int i = 0;
        for (; i < size; i++)
        {
            if (_fds[i].fd == defaultfd)
                break;
        }
        if (i == size)
        {
            close(_fds[i].fd);
            std::cout << "pollfd full" << std::endl;
            _fds[i].fd = defaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        else
        {
            _fds[i].fd = sockfd;
            _fds[i].events = POLLIN;
            _fds[i].revents = 0;
        }
    }
    void Printfd()
    {
        std::cout << "_fds[].fd: ";
        for (int i = 0; i < size; i++)
        {
            if (_fds[i].fd == defaultfd)
                continue;
            std::cout << _fds[i].fd << " ";
        }
        std::cout << std::endl;
    }
    void Recver(int pos)
    {

        char buffer[1024];
        memset(&buffer, 0, sizeof(buffer));
        int s = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);
        if (s > 0)
        {
            buffer[s] = 0;
            std::cout << "server echo: " << buffer << std::endl;
        }
        else if (s == 0)
        {
            std::cout << "client quit " << std::endl;
            close(_fds[pos].fd);
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
        }
        else
        {
            std::cout << "client read error " << std::endl;
            close(_fds[pos].fd);
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
        }
    }
    ~PollServer() {}

private:
    std::unique_ptr<Socket> _listensockfd;
    uint16_t _port;
    bool isrunning;
    struct pollfd _fds[size];
};

总结

本章分享了IO的五种模型和多路转接select和poll的代码实现。希望能给大家带来点帮助,内容可提供参考,如果有疑问咋们评论区讨论吧。
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐