前言

在基于 Reactor 模型的 C++ 高性能网络库中,套接字管理与连接接收是服务端的核心基石。本文基于完整实现的 socket.h/ccacceptor.h/cc 四段代码,深度讲解网络套接字工具函数、IP 地址封装类、Socket 资源管理类、Acceptor 连接接收器四大模块。


一、代码整体结构说明

本文基于四段完整代码展开:

  1. socket.h:套接字工具函数声明、InetAddress 地址类、Socket 类声明
  2. socket.cc:套接字工具函数、地址类、Socket 类实现
  3. acceptor.h:Acceptor 连接接收器类声明
  4. acceptor.cc:Acceptor 类实现

这四段代码构成了网络库的底层 IO 与连接接入层,是 TcpServer、EventLoop、Channel 等上层组件的基础。


二、代码精讲

第一部分:socket.h + socket.cc 

socket.h:

#pragma once
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string>

namespace net
{
    namespace sockets
    {
        static const int LISTEN_COUNT =1024;
        // 创建⾮阻塞套接字,失败则抛异常
        int createNonblockSocket();
        // 创建普通默认套接字,失败则抛异常
        int createBlockSocket();
        // 为套接字绑定地址,失败则抛异常
        void bind(int sockfd, const struct sockaddr *addr);
        // 直接返回系统调⽤即可,失败由外部处理
        int connect(int sockfd, const struct sockaddr *addr);
        // 开始监听套接字,失败抛异常
        void listen(int sockfd);
        // 获取新连接,accept4可以获取的同时设置套接字选项
        // 错误情况: 可接受:EAGAIN,ECONNABORTED,EINTR,EPROTO,EPERM,EMFILE
        // 不可接收抛异常:EBADF,EFAULT,EINVAL,ENFILE,ENOBUFS,ENOMEM,ENOTSOCK,
        // EOPNOTSUPP
        int accept(int sockfd, struct sockaddr_in *addr);
        // 直接返回系统调⽤
        ssize_t read(int fd, void *buf, size_t size);
        // 直接返回系统调⽤
        ssize_t readv(int fd, struct iovec *vec, int count);
        // 直接返回系统调⽤
        ssize_t write(int fd, const void *buf, size_t size);
        // 直接返回系统调⽤
        void close(int fd);
        // 转换为:192.168.1.1:8080 inet_ntop
        void toIpPort(char *buf, size_t size, const struct sockaddr_in *addr);
        // 转为⽹络字节序地址结构数据 inet_pton
        void fromIpPort(const char *ip, uint16_t port, struct sockaddr_in *addr);
    }
    class InetAddress
    {
    public:
        // 初始化数据,INADDR_LOOPBACK / INADDR_ANY
        explicit InetAddress(uint16_t port = 0);
        // 初始化数据
        InetAddress(const std::string ip, uint16_t port);
        // 地址转字符串
        std::string toIpPort() const;
        // 获取地址数据
        const struct sockaddr *getSockAddr() const;
        // 设置地址数据
        void setSockAddr(struct sockaddr_in addr);

    private:
        struct sockaddr_in _addr;
    };
    
    class Socket
    {
    public:
        explicit Socket(int sockfd):_sockfd(sockfd){}
        ~Socket(){sockets::close(_sockfd);}
        int fd(){return _sockfd;}

        void bind(const InetAddress &localaddr);
        void listen();
        int accept(InetAddress *peeraddr);
        // IPPROTO_TCP:TCP 层(设置 TCP 协议相关属性), TCP_NODELAY
        void setTcpNoDelay(bool on);
        // SOL_SOCKET:套接字层(设置 socket 本身的属性),SO_REUSEADDR
        void setReuseAddr(bool on);
        // SOL_SOCKET, SO_REUSEPORT
        void setReusePort(bool on);
        // SOL_SOCKET, SO_KEEPALIVE
        void setKeepAlive(bool on);

    private:
        const int _sockfd;
    };
}

socket.cc:

#include "socket.h"
#include "details.h"
#include "sys/uio.h"
#include "linux/tcp.h"
#include <cstring>
namespace net
{
    // 创建⾮阻塞套接字,失败则抛异常
    //SOCK_NONBLOCK:非阻塞 IO
    int sockets::createNonblockSocket()
    {
        int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
        if (fd < 0)
        {
            LOG_FATAL("创建套接字失败");
        }
        return fd;
    }
    // 创建普通默认套接字,失败则抛异常
    int sockets::createBlockSocket()
    {
        int fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
        if (fd < 0)
        {
            LOG_FATAL("创建套接字失败");
        }
        return fd;
    }
    // 为套接字绑定地址,失败则抛异常
    void sockets::bind(int sockfd, const struct sockaddr *addr)
    {
        int ret = ::bind(sockfd, addr, sizeof(struct sockaddr_in));
        if (ret < 0)
        {
            LOG_FATAL("套接字绑定失败");
        }
    }
    // 直接返回系统调⽤即可,失败由外部处理
    int sockets::connect(int sockfd, const struct sockaddr *addr)
    {
        //让当前 sockfd 去连接 addr 这个 IP:Port
        int ret = ::connect(sockfd, addr, sizeof(struct sockaddr_in));
        if (ret < 0)
        {
            LOG_ERROR("连接服务器失败");
        }
        return ret;
    }
    // 开始监听套接字,失败抛异常
    void sockets::listen(int sockfd)
    {
        int ret = ::listen(sockfd, LISTEN_COUNT);
        if (ret < 0)
        {
            LOG_FATAL("监听失败");
        }
    }
    // 获取新连接,accept4可以获取的同时设置套接字选项
    // 错误情况: 可接受:EAGAIN,ECONNABORTED,EINTR,EPROTO,EPERM,EMFILE
    // 不可接收抛异常:EBADF,EFAULT,EINVAL,ENFILE,ENOBUFS,ENOMEM,ENOTSOCK,EOPNOTSUPP
    int sockets::accept(int sockfd, struct sockaddr_in *addr)
    {
        socklen_t len = sizeof(struct sockaddr_in);
        int ret = ::accept4(sockfd, (struct sockaddr *)addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
        if (ret < 0)
        {
            switch (errno)
            {
            case EAGAIN:       // 非阻塞场景下,没有新连接
            case ECONNABORTED: // 新连接异常
            case EINTR:        // 当前的阻塞操作被信号中断了
            case EPROTO:       // 协议错误
            case EPERM:        // 防火墙拦截
            case EMFILE:       // 文件描述符达到进程的限制上限
                break;
            case EBADF:      // 坏的文件描述符
            case EFAULT:     // 地址参数无效
            case EINVAL:     // 参数无效
            case ENFILE:     // 系统描述符数量达到上限
            case ENOBUFS:    // 内存不足
            case ENOMEM:     // 内存不足
            case ENOTSOCK:   // 描述符不是一个套接字描述符
            case EOPNOTSUPP: // 操作错误,描述符不是一个流式套接字
                LOG_FATAL("获取新连接失败");
            default:
                LOG_FATAL("获取新连接失败");
            }
        }
        return ret;
    }
    // 直接返回系统调⽤
    ssize_t sockets::read(int fd, void *buf, size_t size)
    {
        return ::read(fd, buf, size); // 等价于recv(fd,buf,size,0)
    }
    // 直接返回系统调⽤
    ssize_t sockets::readv(int fd, struct iovec *vec, int count)
    {
        // 实现分块接受,将接收到的数据,放到不连续的内存空间中
        return ::readv(fd, vec, count);
    }
    // 直接返回系统调⽤
    ssize_t sockets::write(int fd, const void *buf, size_t size)
    {
        return ::write(fd, buf, size); // 等价于send(fd,buf,size,0)
    }
    // 直接返回系统调⽤
    void sockets::close(int fd)
    {
        if (fd > 0)
        {
            ::close(fd);
        }
    }
    // 转换为:192.168.1.1:8080 inet_ntop
    void sockets::toIpPort(char *buf, size_t size, const struct sockaddr_in *addr)
    {
        // 转换IP地址:inet_ntop   转换终端:ntohs,sprintf
        memset(buf, 0x00, size);
        inet_ntop(AF_INET, &addr->sin_addr, buf, size);
        snprintf(buf + strlen(buf), size - strlen(buf), "%u", ntohs(addr->sin_port));
    }
    // 转为⽹络字节序地址结构数据 inet_pton
    void sockets::fromIpPort(const char *ip, uint16_t port, struct sockaddr_in *addr)
    {
        inet_pton(AF_INET, ip, &addr->sin_addr);
        addr->sin_port = htons(port);
    }

    // 初始化数据,INADDR_LOOPBACK / INADDR_ANY
    InetAddress::InetAddress(uint16_t port)
    {
        _addr.sin_family = AF_INET;
        _addr.sin_addr.s_addr = INADDR_ANY;
        _addr.sin_port = htons(port);
    }
    // 初始化数据
    InetAddress::InetAddress(const std::string ip, uint16_t port)
    {
        _addr.sin_family = AF_INET;
        sockets::fromIpPort(ip.c_str(), port, &_addr);
    }
    // 地址转字符串
    std::string InetAddress::toIpPort() const
    {
        char buf[64] = {0};
        sockets::toIpPort(buf, 64, &_addr);
        return buf;
    }
    // 获取地址数据
    const struct sockaddr *InetAddress::getSockAddr() const
    {
        return (struct sockaddr *)&_addr;
    }
    // 设置地址数据
    void InetAddress::setSockAddr(struct sockaddr_in addr)
    {
        _addr = addr;
    }

    void Socket::bind(const InetAddress &localaddr)
    {
        return sockets::bind(_sockfd, localaddr.getSockAddr());
    }
    void Socket::listen()
    {
        return sockets::listen(_sockfd);
    }
    int Socket::accept(InetAddress *peeraddr)
    {
        struct sockaddr_in addr;
        int fd = sockets::accept(_sockfd, &addr);
        peeraddr->setSockAddr(addr);
        return fd;
    }
    // IPPROTO_TCP, TCP_NODELAY
    void Socket::setTcpNoDelay(bool on)
    {
        // int setsockopt(int sockfd, int level, int optname,const void *optval, size_t size);
        int opt = on ? 1 : 0;
        setsockopt(_sockfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
    }
    // SOL_SOCKET, SO_REUSEADDR
    void Socket::setReuseAddr(bool on)
    {
        int opt = on ? 1 : 0;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
    // SOL_SOCKET, SO_REUSEPORT
    void Socket::setReusePort(bool on)
    {
        int opt = on ? 1 : 0;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    }
    // SOL_SOCKET, SO_KEEPALIVE
    void Socket::setKeepAlive(bool on)
    {
        int opt = on ? 1 : 0;
        setsockopt(_sockfd, SOL_SOCKET, SO_KEEPALIVE, &opt, sizeof(opt));
    }
}

核心知识点:

1. Linux 系统头文件与网络编程基础
  • 标准网络头文件:<fcntl.h> <unistd.h> <sys/socket.h> <netinet/in.h> <arpa/inet.h>
  • 内存操作头文件:<cstring>
  • 散读 / 散写头文件:<sys/uio.h>
  • TCP 协议头文件:<linux/tcp.h>
2. 命名空间设计
  • 使用net命名空间隔离网络库代码
  • 内部嵌套sockets工具命名空间,封装纯函数式套接字操作
3. Linux 套接字系统调用封装
  • socket():创建 TCP 流式套接字,支持 IPv4 协议
  • bind():将套接字与 IP / 端口绑定
  • listen():将套接字设置为监听状态
  • accept4():高级接受连接函数,支持原子设置套接字属性
  • connect():客户端发起连接
  • read()/write()/readv()/close():文件描述符基础 IO 操作
  • setsockopt():设置套接字选项
4. 套接字创建高级特性
  • SOCK_NONBLOCK:创建非阻塞套接字(Reactor 模型必备)
  • SOCK_CLOEXEC:fork 子进程时自动关闭文件描述符,防止资源泄漏
  • 区分阻塞套接字非阻塞套接字创建接口
5. 网络地址转换函数
  • inet_pton:字符串 IP → 网络字节序二进制 IP
  • inet_ntop:网络字节序二进制 IP → 字符串 IP
  • htons/ntohs:主机字节序与网络字节序端口转换
6. 错误处理机制
  • 系统调用失败判断 + 日志输出(LOG_FATAL/LOG_ERROR)
  • accept函数精细化错误码处理:
    • 可忽略错误:EAGAIN、ECONNABORTED、EINTR、EPROTO、EPERM、EMFILE
    • 致命错误:EBADF、EFAULT、ENFILE、ENOMEM 等,直接终止程序
7. InetAddress 地址封装类
  • 封装sockaddr_in结构体,简化网络地址操作
  • 双构造函数:端口初始化、IP + 端口初始化
  • 提供 IP:Port 字符串转换、地址结构体获取 / 设置接口
  • 支持INADDR_ANY(监听所有网卡)
8. Socket 资源管理类(RAII 机制)
  • RAII 设计模式:构造函数绑定文件描述符,析构函数自动关闭套接字
  • 封装bind/listen/accept核心操作
  • 四大核心套接字选项设置:
    1. TCP_NODELAY:禁用 Nagle 算法,降低延迟
    2. SO_REUSEADDR:地址复用,解决 TIME_WAIT 端口占用问题
    3. SO_REUSEPORT:端口复用,支持多进程 / 多线程负载均衡
    4. SO_KEEPALIVE:TCP 保活机制,检测死连接
  • 私有成员_sockfd,对外仅提供只读接口,保证安全性
9. 高性能 IO 特性
  • readv散读函数:支持将数据写入不连续内存,减少系统调用次数
  • 监听队列长度LISTEN_COUNT = 1024,适配高并发场景

第二部分:acceptor.h + acceptor.cc 

acceptor.h:

#pragma once // 防止头文件重复包含
#include "details.h"
#include "channel.h"
#include "eventloop.h"
#include "socket.h"

namespace net
{
    // 新连接到来时的回调函数类型
    using NewConnectionCallback = std::function<void(int, InetAddress)>;
    class Acceptor
    {
    public:
        // 初始化成员,设置channel回调函数,设置套接字的选项标志
        Acceptor(EventLoop *loop, const InetAddress &addr);
        // 关闭套接字,移除套接字的事件监控
        ~Acceptor();
        void setNewConnectionCallback(NewConnectionCallback cb);
        void listen();

    private:
        void handleRead(Timestamp recvTime); // 有新连接时被调用

    private:
        EventLoop *_loop;                       // 所属事件循环
        Socket _acceptSocket;                   // 监听套接字(server socket)
        Channel _acceptChannel;                 // 封装监听 fd,交给 EventLoop 监控
        NewConnectionCallback _newConnCallback; // 新连接回调
        int _idleFd;                            // 文件描述符占位符(处理 fd 耗尽问题)
    };
}

acceptor.cc:

#include "acceptor.h"
#include "cassert"
namespace net
{
    // 初始化成员,设置channel回调函数,设置套接字的选项标志
    Acceptor::Acceptor(EventLoop *loop, const InetAddress &addr)
        : _loop(loop),
          _acceptSocket(sockets::createNonblockSocket()),
          _acceptChannel(_loop, _acceptSocket.fd()),
          _idleFd(::open("/dev/null", O_CLOEXEC | O_CREAT))
    {
        //_odleFd占位府的作用:
        // 应用于accept的时候,进程描述符数量达到上限,内核里边有就绪的新连接
        // 但是受限于进程描述符数量,无法获取出来,就会一直触发可读事件
        assert(_idleFd >= 0);
        // 设置套接字选项
        _acceptSocket.setKeepAlive(true);
        _acceptSocket.bind(addr);
        // 为Channel设置事件处理回调函数
        _acceptChannel.setReadCallback(std::bind(&Acceptor::handleRead, this, std::placeholders::_1));
    }
    // 关闭描述符,移除套接字的事件监控
    Acceptor::~Acceptor(){
        ::close(_idleFd);
        _acceptChannel.disableAll();
        _acceptChannel.remove();
    }
    void Acceptor::listen(){
        _loop->assertInLoopThread();
        //对监听套接字开始监听
        _acceptSocket.listen();
        //对channel启动读事件监控
        _acceptChannel.enableReading();
    }
    void Acceptor::setNewConnectionCallback(NewConnectionCallback cb){
        _newConnCallback=std::move(cb);
    }
    void Acceptor::handleRead(Timestamp recvTime){
        _loop->assertInLoopThread();
        InetAddress addr;
        int fd=_acceptSocket.accept(&addr);
        if(fd>0)
        {
            if(_newConnCallback){
                _newConnCallback(fd,addr);
            }else{
                ::close(fd);
            }
        }
        //这样做是因为epoll的EPOLLIN会持续触发,但无法处理新连接,
        //通过占位符机制强制接受一个连接并关闭,让系统恢复。
        else{
            //出错了,就绪的连接就没取出来,关闭占位符,取出新连接,
            //重新关闭新连接,重新进行占位
            ::close(_idleFd);
            _idleFd=_acceptSocket.accept(&addr);
            ::close(_idleFd);
            _idleFd=open("/dev/null",O_CLOEXEC|O_CREAT);
        }
    }
}

核心知识点:

1. Reactor 模型核心组件融合
  • 依赖EventLoop:绑定 IO 线程,实现事件驱动
  • 依赖Channel:封装监听套接字,交由 epoll 监控读写事件
  • 依赖Socket:管理监听套接字生命周期
2. C++ 函数对象与回调机制
  • using NewConnectionCallback:定义新连接回调函数类型
  • std::function + std::bind:实现事件回调解耦
  • std::move:优化回调函数的转移语义
3. Acceptor 核心设计
  • 单例式监听套接字管理,专门处理服务端连接接收
  • 线程安全校验:assertInLoopThread保证所有操作在 IO 线程执行
  • 封装listen接口:启动监听并注册读事件
4. 事件处理逻辑
  • handleRead:监听套接字可读事件回调(新连接到达)
  • 调用Socket::accept获取新连接文件描述符与对端地址
  • 执行用户注册的新连接回调,将连接交付上层处理
5. 高并发工程级优化:idleFd 机制
  • 解决EMFILE 错误(进程文件描述符耗尽)
  • 原理:预先占用/dev/null文件描述符,异常时释放→接受连接→关闭→重新占位
  • 避免 epoll 持续触发可读事件导致 CPU 100% 死循环
6. 资源安全管理
  • 析构函数:关闭 idleFd、注销 Channel 事件、移除 epoll 监控
  • 无用户回调时自动关闭新连接,防止文件描述符泄漏
  • assert断言:强制校验关键资源合法性
7. 占位符工具
  • std::placeholders::_1:绑定回调函数参数
  • /dev/null:空设备文件,用于创建占位文件描述符

三、核心类设计思想

1. Socket 类:面向对象的系统调用封装

设计目标:屏蔽 Linux 原生 socket API 的繁琐与不安全,提供面向对象、自动资源管理的套接字接口。

核心价值

  • RAII 自动关闭文件描述符,杜绝资源泄漏
  • 统一错误处理,简化上层调用
  • 封装所有高并发必备套接字选项
  • 与 InetAddress 解耦,代码简洁易用

2. InetAddress 类:网络地址工具

设计目标:封装sockaddr_in,统一处理 IP / 端口、字节序转换、字符串格式化。

核心价值:消除原生结构体的冗余代码,提升代码可读性与可维护性。

3. Acceptor 类:Reactor 连接入口

设计目标:作为服务端唯一连接接收器,基于事件驱动接收新连接。

核心价值

  • 与 EventLoop 深度融合,纯异步非阻塞设计
  • 精细化异常处理,支持高并发场景
  • 回调机制解耦连接接收与业务处理
  • idleFd 机制解决工程中最棘手的文件描述符耗尽问题

四、整体工作流程

  1. 初始化:Acceptor 创建非阻塞监听 Socket,绑定 IP / 端口,设置套接字选项
  2. 事件注册:将监听 Socket 封装为 Channel,注册到 EventLoop,监听 EPOLLIN 事件
  3. 启动监听:调用 listen 函数,内核开始接收连接请求
  4. 连接到达:epoll 触发可读事件,Channel 调用 handleRead
  5. 接受连接:accept 获取新连接,执行用户回调
  6. 异常处理:文件描述符耗尽时,idleFd 机制保证程序稳定运行
  7. 资源释放:析构函数自动关闭所有文件描述符,注销事件

五、总结

这套代码的核心亮点:

  1. 安全:CLOEXEC、RAII、私有成员封装,杜绝资源泄漏与非法操作
  2. 高效:非阻塞 IO、事件驱动、端口复用、禁用 Nagle 算法
  3. 健壮:精细化错误处理、idleFd 解决文件描述符耗尽
  4. 解耦:回调机制、分层设计,适配大型网络库架构
  5. 标准:完全遵循 Reactor 模型设计,可直接集成到高性能服务端框架
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐