一、为什么要从 handler lambda 升级到 Connection 对象?

在最开始的 mini epoll + Reactor demo 里,我们常常这样写:

reactor.add(client_fd, EPOLLIN, [&](int cfd) {
    char buf[1024];
    int n = read(cfd, buf, sizeof(buf));
    if (n > 0) {
        write(cfd, buf, n);
    } else {
        reactor.remove(cfd);
    }
});

这种写法能跑,但只能算:

函数式 demo

一旦你要加:

  • 连接状态
  • 读缓冲区 / 写缓冲区
  • 请求解析
  • 日志
  • 超时
  • 关闭策略

代码就会迅速失控。

所以服务端真正应该做的第一步,是建立这个抽象:

fd
 ↓
Connection
 ↓
onRead()
onWrite()
onClose()

也就是:

连接不是一个数字,而是一个对象。

二、这一篇的核心目标

这一篇只做一件很关键的事:

把 Reactor 的处理单元,从“fd → lambda”升级成“fd → Connection 对象”

同时把结构彻底理顺:

正确结构应该是:

main
  ↓
创建 Reactor
  ↓
把监听 fd 注册给 Reactor
  ↓
reactor.loop() 统一事件循环
        ↓
server_fd 事件 → Acceptor 处理 accept
client_fd 事件 → Connection 处理 onRead / onWrite / onClose

也就是说:

  • main() 不再自己写第二套 epoll 循环
  • Reactor 成为唯一事件循环中心
  • Acceptor 负责接连接
  • Connection 负责处理连接

这一版,控制流就是干净的。

三、最终工程结构(推荐目录)

.
├── EventHandler.h
├── Reactor.h
├── Reactor.cpp
├── Connection.h
├── Connection.cpp
├── Acceptor.h
├── Acceptor.cpp
├── main.cpp

职责划分

EventHandler.h

统一抽象事件处理接口。

Reactor.h / Reactor.cpp

负责:

  • epoll 创建
  • 注册 / 修改 / 删除 fd
  • fd → EventHandler 映射
  • 统一事件循环
Connection.h / Connection.cpp

负责:

  • 持有连接 fd
  • 读写缓冲区
  • onRead / onWrite / onClose
Acceptor.h / Acceptor.cpp

负责:

  • 监听 socket 的 accept
  • 创建新 Connection
  • 注册给 Reactor
main.cpp

只负责初始化:

  • 创建监听 socket
  • 创建 Reactor
  • 创建 Acceptor
  • 启动 loop

这就是一个标准的最小服务骨架。

四、先定义统一事件接口

EventHandler.h

#ifndef EVENT_HANDLER_H
#define EVENT_HANDLER_H

class EventHandler {
public:
    virtual ~EventHandler() = default;

    virtual int fd() const = 0;
    virtual void onRead() = 0;
    virtual void onWrite() = 0;
    virtual void onClose() = 0;
};

#endif

这一层是干什么的?

Reactor 不应该知道:

  • 你是监听 socket
  • 还是普通连接
  • 还是以后别的事件源

它只需要知道:

谁能处理事件

所以抽象成统一接口:

  • fd()
  • onRead()
  • onWrite()
  • onClose()

这样:

  • Acceptor 可以实现它
  • Connection 也可以实现它

这就是面向对象后的 Reactor。

五、定义 Reactor

Reactor.h

#ifndef REACTOR_H
#define REACTOR_H

#include <unordered_map>
#include <cstdint>

class EventHandler;

class Reactor {
public:
    Reactor();
    ~Reactor();

    bool addHandler(EventHandler* handler, uint32_t events);
    bool modifyHandler(int fd, uint32_t events);
    void removeHandler(int fd);

    void loop();

private:
    int epollFd_;
    std::unordered_map<int, EventHandler*> handlers_;
};

#endif

Reactor.cpp

#include "Reactor.h"
#include "EventHandler.h"

#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <cstdlib>

Reactor::Reactor() {
    epollFd_ = epoll_create1(0);
    if (epollFd_ == -1) {
        std::cerr << "epoll_create1 failed" << std::endl;
        std::exit(1);
    }
}

Reactor::~Reactor() {
    for (auto& pair : handlers_) {
        delete pair.second;
    }
    handlers_.clear();

    if (epollFd_ >= 0) {
        close(epollFd_);
        epollFd_ = -1;
    }
}

bool Reactor::addHandler(EventHandler* handler, uint32_t events) {
    int fd = handler->fd();

    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;

    if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev) == -1) {
        std::cerr << "epoll_ctl add failed, fd=" << fd << std::endl;
        return false;
    }

    handlers_[fd] = handler;
    return true;
}

bool Reactor::modifyHandler(int fd, uint32_t events) {
    epoll_event ev{};
    ev.events = events;
    ev.data.fd = fd;

    if (epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
        std::cerr << "epoll_ctl mod failed, fd=" << fd << std::endl;
        return false;
    }

    return true;
}

void Reactor::removeHandler(int fd) {
    epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);

    auto it = handlers_.find(fd);
    if (it != handlers_.end()) {
        delete it->second;
        handlers_.erase(it);
    }
}

void Reactor::loop() {
    constexpr int MAX_EVENTS = 64;
    std::vector<epoll_event> events(MAX_EVENTS);

    while (true) {
        int n = epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1);
        if (n == -1) {
            continue;
        }

        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;
            uint32_t ev = events[i].events;

            auto it = handlers_.find(fd);
            if (it == handlers_.end()) {
                continue;
            }

            EventHandler* handler = it->second;

            if (ev & (EPOLLERR | EPOLLHUP)) {
                handler->onClose();
                continue;
            }

            if (ev & EPOLLIN) {
                handler->onRead();
            }

            if (ev & EPOLLOUT) {
                handler->onWrite();
            }
        }
    }
}

Reactor 这一版最关键的变化

从以前的:

fd → lambda

升级到:fd → EventHandler*

这意味着 Reactor 只负责:

  • 等待事件
  • 查找 handler
  • 分发回调

它已经完全具备“调度器”的味道了。

六、定义 Connection

Connection.h

#ifndef CONNECTION_H
#define CONNECTION_H

#include "EventHandler.h"
#include <string>

class Reactor;

class Connection : public EventHandler {
public:
    Connection(int fd, Reactor* reactor);
    ~Connection() override;

    int fd() const override;

    void onRead() override;
    void onWrite() override;
    void onClose() override;

private:
    int fd_;
    Reactor* reactor_;
    std::string readBuffer_;
    std::string writeBuffer_;
};

#endif

Connection.cpp

#include "Connection.h"
#include "Reactor.h"

#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <sys/epoll.h>

Connection::Connection(int fd, Reactor* reactor)
    : fd_(fd), reactor_(reactor) {
}

Connection::~Connection() {
    if (fd_ >= 0) {
        close(fd_);
        fd_ = -1;
    }
}

int Connection::fd() const {
    return fd_;
}

void Connection::onRead() {
    char buffer[1024];

    while (true) {
        ssize_t n = read(fd_, buffer, sizeof(buffer));

        if (n > 0) {
            readBuffer_.append(buffer, n);

            std::cout << "[Connection::onRead] fd=" << fd_
                      << " recv: " << std::string(buffer, n) << std::endl;

            // demo: echo 回写
            writeBuffer_.append(buffer, n);
        }
        else if (n == 0) {
            std::cout << "[Connection::onRead] client closed fd=" << fd_ << std::endl;
            onClose();
            return;
        }
        else {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            std::cerr << "[Connection::onRead] read error fd=" << fd_ << std::endl;
            onClose();
            return;
        }
    }

    if (!writeBuffer_.empty()) {
        reactor_->modifyHandler(fd_, EPOLLIN | EPOLLOUT);
    }
}

void Connection::onWrite() {
    while (!writeBuffer_.empty()) {
        ssize_t n = write(fd_, writeBuffer_.data(), writeBuffer_.size());

        if (n > 0) {
            writeBuffer_.erase(0, n);
        }
        else {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            std::cerr << "[Connection::onWrite] write error fd=" << fd_ << std::endl;
            onClose();
            return;
        }
    }

    if (writeBuffer_.empty()) {
        reactor_->modifyHandler(fd_, EPOLLIN);
    }
}

void Connection::onClose() {
    if (fd_ >= 0) {
        int oldFd = fd_;
        fd_ = -1;
        reactor_->removeHandler(oldFd);
    }
}

Connection 这一层最该记住什么?

1)Connection 持有连接自己的状态

比如:

  • fd_
  • readBuffer_
  • writeBuffer_

2)Connection 自己处理自己的生命周期

谁负责连接,谁负责关闭。

3)Connection 只关心“这个连接”

不关心别的 fd,也不关心 Reactor 怎么 wait。

所以:

Reactor 是调度层
Connection 是连接层

七、定义 Acceptor

Acceptor.h

#ifndef ACCEPTOR_H
#define ACCEPTOR_H

#include "EventHandler.h"

class Reactor;

class Acceptor : public EventHandler {
public:
    Acceptor(int listenFd, Reactor* reactor);
    ~Acceptor() override;

    int fd() const override;

    void onRead() override;
    void onWrite() override;
    void onClose() override;

private:
    int listenFd_;
    Reactor* reactor_;
};

#endif

Acceptor.cpp

#include "Acceptor.h"
#include "Reactor.h"
#include "Connection.h"

#include <unistd.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>

namespace {
int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}

Acceptor::Acceptor(int listenFd, Reactor* reactor)
    : listenFd_(listenFd), reactor_(reactor) {
}

Acceptor::~Acceptor() {
    if (listenFd_ >= 0) {
        close(listenFd_);
        listenFd_ = -1;
    }
}

int Acceptor::fd() const {
    return listenFd_;
}

void Acceptor::onRead() {
    while (true) {
        int clientFd = accept(listenFd_, nullptr, nullptr);
        if (clientFd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            std::cerr << "[Acceptor::onRead] accept error" << std::endl;
            break;
        }

        if (setNonBlocking(clientFd) == -1) {
            close(clientFd);
            continue;
        }

        std::cout << "[Acceptor] new connection fd=" << clientFd << std::endl;

        Connection* conn = new Connection(clientFd, reactor_);
        if (!reactor_->addHandler(conn, EPOLLIN)) {
            delete conn;
        }
    }
}

void Acceptor::onWrite() {
    // 监听 socket 一般不关心写事件
}

void Acceptor::onClose() {
    // 监听 fd 一般不会像普通连接那样关闭,这里先留空
}

为什么还要单独有个 Acceptor?

因为监听 socket 和普通连接,本质不是一回事:

监听 fd(server_fd)

负责:

  • accept

客户端 fd(client_fd)

负责:

  • read/write/close

所以用两个对象分开,是很自然的:

  • Acceptor
  • Connection

这也让后面扩展主从 Reactor 很容易。

八、main.cpp(最终合理版)

main.cpp

#include "Reactor.h"
#include "Acceptor.h"

#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>

namespace {
int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}

int main() {
    int serverFd = socket(AF_INET, SOCK_STREAM, 0);
    if (serverFd == -1) {
        std::cerr << "socket failed" << std::endl;
        return 1;
    }

    int opt = 1;
    setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    if (setNonBlocking(serverFd) == -1) {
        std::cerr << "setNonBlocking failed" << std::endl;
        close(serverFd);
        return 1;
    }

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serverFd, (sockaddr*)&addr, sizeof(addr)) == -1) {
        std::cerr << "bind failed" << std::endl;
        close(serverFd);
        return 1;
    }

    if (listen(serverFd, 128) == -1) {
        std::cerr << "listen failed" << std::endl;
        close(serverFd);
        return 1;
    }

    Reactor reactor;

    Acceptor* acceptor = new Acceptor(serverFd, &reactor);
    if (!reactor.addHandler(acceptor, EPOLLIN)) {
        delete acceptor;
        close(serverFd);
        return 1;
    }

    std::cout << "server running on :8080" << std::endl;
    reactor.loop();

    return 0;
}

九、这一版的控制流终于完全合理了

你现在可以把完整控制流理解成:

main
  ↓
创建 Reactor
  ↓
创建 Acceptor(监听 fd)
  ↓
Acceptor 注册到 Reactor
  ↓
reactor.loop()
    ↓
epoll_wait()
    ↓
fd ready
    ↓
找到对应 handler
        ↓
        如果是 server_fd → Acceptor::onRead()
        如果是 client_fd → Connection::onRead()/onWrite()/onClose()

这就是一套真正合理的最小 Reactor 骨架。

十、和之前版本相比,升级在哪?

之前的问题是:

1)main 自己也在循环

2)Reactor 也在循环

3)监听 fd 没纳入统一分发体系

所以结构是别扭的。

现在这一版:

1)只有一个事件循环

reactor.loop();

2)所有 fd 都归 Reactor 管

包括:

  • serverFd
  • clientFd

3)Acceptor 和 Connection 分层清晰

这是最关键的工程升级。

十一、这一篇真正建立了什么能力?

这一篇最大的价值不是“代码更多了”,而是建立了三层结构:

1)Reactor:调度层

负责事件循环和分发。

2)Acceptor:接入层

负责接连接。

3)Connection:连接层

负责一个连接的读写和关闭。

你从这里开始,就不再是在写 socket 示例,而是在写:

服务端骨架

十二、这一版还有哪些不足?

这是最小合理版,但还远不是工业级。

还缺:

  • 智能指针管理对象生命周期
  • Connection 上下文对象
  • 写事件更精细控制
  • 半包/粘包处理
  • 定时器 / 超时
  • 多 Reactor / 多线程
  • 协程接入

但没关系,这些正是后面的升级路线。

十三、本篇一句话总结

mini epoll demo 解决的是“事件怎么来”,
而这一篇解决的是“事件交给谁,以及连接如何成为对象”。

也就是从:fd → lambda

升级到:fd → Acceptor / Connection

这一步,就是从 demo 走向工程的关键分水岭。

十四、下一篇

这一篇做完,最顺的下一篇就是:

《C++ 服务端进阶(二)—— 多 Reactor + 线程模型:高并发架构》

因为你现在已经有了:

  • 非阻塞 fd
  • epoll
  • Reactor
  • Acceptor
  • Connection

下一步自然就是:

从单 Reactor 升级到多 Reactor / 主从 Reactor

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐