C++ 服务端进阶(一)—— 从 Reactor 到 Connection:服务骨架设计(工程版)
一、为什么要从 handler lambda 升级到 Connection 对象?
在最开始的 mini epoll + Reactor demo 里,我们常常这样写:
reactor.add(client_fd, EPOLLIN, [&](int cfd) {
char buf[1024];
int n = read(cfd, buf, sizeof(buf));
if (n > 0) {
write(cfd, buf, n);
} else {
reactor.remove(cfd);
}
});
这种写法能跑,但只能算:
函数式 demo
一旦你要加:
- 连接状态
- 读缓冲区 / 写缓冲区
- 请求解析
- 日志
- 超时
- 关闭策略
代码就会迅速失控。
所以服务端真正应该做的第一步,是建立这个抽象:
fd
↓
Connection
↓
onRead()
onWrite()
onClose()
也就是:
连接不是一个数字,而是一个对象。
二、这一篇的核心目标
这一篇只做一件很关键的事:
把 Reactor 的处理单元,从“fd → lambda”升级成“fd → Connection 对象”
同时把结构彻底理顺:
正确结构应该是:
main
↓
创建 Reactor
↓
把监听 fd 注册给 Reactor
↓
reactor.loop() 统一事件循环
↓
server_fd 事件 → Acceptor 处理 accept
client_fd 事件 → Connection 处理 onRead / onWrite / onClose
也就是说:
main()不再自己写第二套 epoll 循环Reactor成为唯一事件循环中心Acceptor负责接连接Connection负责处理连接
这一版,控制流就是干净的。
三、最终工程结构(推荐目录)
.
├── EventHandler.h
├── Reactor.h
├── Reactor.cpp
├── Connection.h
├── Connection.cpp
├── Acceptor.h
├── Acceptor.cpp
├── main.cpp
职责划分
EventHandler.h
统一抽象事件处理接口。
Reactor.h / Reactor.cpp
负责:
- epoll 创建
- 注册 / 修改 / 删除 fd
- fd → EventHandler 映射
- 统一事件循环
Connection.h / Connection.cpp
负责:
- 持有连接 fd
- 读写缓冲区
- onRead / onWrite / onClose
Acceptor.h / Acceptor.cpp
负责:
- 监听 socket 的 accept
- 创建新 Connection
- 注册给 Reactor
main.cpp
只负责初始化:
- 创建监听 socket
- 创建 Reactor
- 创建 Acceptor
- 启动 loop
这就是一个标准的最小服务骨架。
四、先定义统一事件接口
EventHandler.h
#ifndef EVENT_HANDLER_H
#define EVENT_HANDLER_H
class EventHandler {
public:
virtual ~EventHandler() = default;
virtual int fd() const = 0;
virtual void onRead() = 0;
virtual void onWrite() = 0;
virtual void onClose() = 0;
};
#endif
这一层是干什么的?
Reactor 不应该知道:
- 你是监听 socket
- 还是普通连接
- 还是以后别的事件源
它只需要知道:
谁能处理事件
所以抽象成统一接口:
fd()onRead()onWrite()onClose()
这样:
Acceptor可以实现它Connection也可以实现它
这就是面向对象后的 Reactor。
五、定义 Reactor
Reactor.h
#ifndef REACTOR_H
#define REACTOR_H
#include <unordered_map>
#include <cstdint>
class EventHandler;
class Reactor {
public:
Reactor();
~Reactor();
bool addHandler(EventHandler* handler, uint32_t events);
bool modifyHandler(int fd, uint32_t events);
void removeHandler(int fd);
void loop();
private:
int epollFd_;
std::unordered_map<int, EventHandler*> handlers_;
};
#endif
Reactor.cpp
#include "Reactor.h"
#include "EventHandler.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <cstdlib>
Reactor::Reactor() {
epollFd_ = epoll_create1(0);
if (epollFd_ == -1) {
std::cerr << "epoll_create1 failed" << std::endl;
std::exit(1);
}
}
Reactor::~Reactor() {
for (auto& pair : handlers_) {
delete pair.second;
}
handlers_.clear();
if (epollFd_ >= 0) {
close(epollFd_);
epollFd_ = -1;
}
}
bool Reactor::addHandler(EventHandler* handler, uint32_t events) {
int fd = handler->fd();
epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev) == -1) {
std::cerr << "epoll_ctl add failed, fd=" << fd << std::endl;
return false;
}
handlers_[fd] = handler;
return true;
}
bool Reactor::modifyHandler(int fd, uint32_t events) {
epoll_event ev{};
ev.events = events;
ev.data.fd = fd;
if (epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
std::cerr << "epoll_ctl mod failed, fd=" << fd << std::endl;
return false;
}
return true;
}
void Reactor::removeHandler(int fd) {
epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
auto it = handlers_.find(fd);
if (it != handlers_.end()) {
delete it->second;
handlers_.erase(it);
}
}
void Reactor::loop() {
constexpr int MAX_EVENTS = 64;
std::vector<epoll_event> events(MAX_EVENTS);
while (true) {
int n = epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1);
if (n == -1) {
continue;
}
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
uint32_t ev = events[i].events;
auto it = handlers_.find(fd);
if (it == handlers_.end()) {
continue;
}
EventHandler* handler = it->second;
if (ev & (EPOLLERR | EPOLLHUP)) {
handler->onClose();
continue;
}
if (ev & EPOLLIN) {
handler->onRead();
}
if (ev & EPOLLOUT) {
handler->onWrite();
}
}
}
}
Reactor 这一版最关键的变化
从以前的:
fd → lambda
升级到:fd → EventHandler*
这意味着 Reactor 只负责:
- 等待事件
- 查找 handler
- 分发回调
它已经完全具备“调度器”的味道了。
六、定义 Connection
Connection.h
#ifndef CONNECTION_H
#define CONNECTION_H
#include "EventHandler.h"
#include <string>
class Reactor;
class Connection : public EventHandler {
public:
Connection(int fd, Reactor* reactor);
~Connection() override;
int fd() const override;
void onRead() override;
void onWrite() override;
void onClose() override;
private:
int fd_;
Reactor* reactor_;
std::string readBuffer_;
std::string writeBuffer_;
};
#endif
Connection.cpp
#include "Connection.h"
#include "Reactor.h"
#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <sys/epoll.h>
Connection::Connection(int fd, Reactor* reactor)
: fd_(fd), reactor_(reactor) {
}
Connection::~Connection() {
if (fd_ >= 0) {
close(fd_);
fd_ = -1;
}
}
int Connection::fd() const {
return fd_;
}
void Connection::onRead() {
char buffer[1024];
while (true) {
ssize_t n = read(fd_, buffer, sizeof(buffer));
if (n > 0) {
readBuffer_.append(buffer, n);
std::cout << "[Connection::onRead] fd=" << fd_
<< " recv: " << std::string(buffer, n) << std::endl;
// demo: echo 回写
writeBuffer_.append(buffer, n);
}
else if (n == 0) {
std::cout << "[Connection::onRead] client closed fd=" << fd_ << std::endl;
onClose();
return;
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
std::cerr << "[Connection::onRead] read error fd=" << fd_ << std::endl;
onClose();
return;
}
}
if (!writeBuffer_.empty()) {
reactor_->modifyHandler(fd_, EPOLLIN | EPOLLOUT);
}
}
void Connection::onWrite() {
while (!writeBuffer_.empty()) {
ssize_t n = write(fd_, writeBuffer_.data(), writeBuffer_.size());
if (n > 0) {
writeBuffer_.erase(0, n);
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
std::cerr << "[Connection::onWrite] write error fd=" << fd_ << std::endl;
onClose();
return;
}
}
if (writeBuffer_.empty()) {
reactor_->modifyHandler(fd_, EPOLLIN);
}
}
void Connection::onClose() {
if (fd_ >= 0) {
int oldFd = fd_;
fd_ = -1;
reactor_->removeHandler(oldFd);
}
}
Connection 这一层最该记住什么?
1)Connection 持有连接自己的状态
比如:
fd_readBuffer_writeBuffer_
2)Connection 自己处理自己的生命周期
谁负责连接,谁负责关闭。
3)Connection 只关心“这个连接”
不关心别的 fd,也不关心 Reactor 怎么 wait。
所以:
Reactor 是调度层
Connection 是连接层
七、定义 Acceptor
Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H
#include "EventHandler.h"
class Reactor;
class Acceptor : public EventHandler {
public:
Acceptor(int listenFd, Reactor* reactor);
~Acceptor() override;
int fd() const override;
void onRead() override;
void onWrite() override;
void onClose() override;
private:
int listenFd_;
Reactor* reactor_;
};
#endif
Acceptor.cpp
#include "Acceptor.h"
#include "Reactor.h"
#include "Connection.h"
#include <unistd.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>
namespace {
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}
Acceptor::Acceptor(int listenFd, Reactor* reactor)
: listenFd_(listenFd), reactor_(reactor) {
}
Acceptor::~Acceptor() {
if (listenFd_ >= 0) {
close(listenFd_);
listenFd_ = -1;
}
}
int Acceptor::fd() const {
return listenFd_;
}
void Acceptor::onRead() {
while (true) {
int clientFd = accept(listenFd_, nullptr, nullptr);
if (clientFd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
std::cerr << "[Acceptor::onRead] accept error" << std::endl;
break;
}
if (setNonBlocking(clientFd) == -1) {
close(clientFd);
continue;
}
std::cout << "[Acceptor] new connection fd=" << clientFd << std::endl;
Connection* conn = new Connection(clientFd, reactor_);
if (!reactor_->addHandler(conn, EPOLLIN)) {
delete conn;
}
}
}
void Acceptor::onWrite() {
// 监听 socket 一般不关心写事件
}
void Acceptor::onClose() {
// 监听 fd 一般不会像普通连接那样关闭,这里先留空
}
为什么还要单独有个 Acceptor?
因为监听 socket 和普通连接,本质不是一回事:
监听 fd(server_fd)
负责:
accept
客户端 fd(client_fd)
负责:
read/write/close
所以用两个对象分开,是很自然的:
AcceptorConnection
这也让后面扩展主从 Reactor 很容易。
八、main.cpp(最终合理版)
main.cpp
#include "Reactor.h"
#include "Acceptor.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <iostream>
namespace {
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}
int main() {
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
if (serverFd == -1) {
std::cerr << "socket failed" << std::endl;
return 1;
}
int opt = 1;
setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (setNonBlocking(serverFd) == -1) {
std::cerr << "setNonBlocking failed" << std::endl;
close(serverFd);
return 1;
}
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(8080);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(serverFd, (sockaddr*)&addr, sizeof(addr)) == -1) {
std::cerr << "bind failed" << std::endl;
close(serverFd);
return 1;
}
if (listen(serverFd, 128) == -1) {
std::cerr << "listen failed" << std::endl;
close(serverFd);
return 1;
}
Reactor reactor;
Acceptor* acceptor = new Acceptor(serverFd, &reactor);
if (!reactor.addHandler(acceptor, EPOLLIN)) {
delete acceptor;
close(serverFd);
return 1;
}
std::cout << "server running on :8080" << std::endl;
reactor.loop();
return 0;
}
九、这一版的控制流终于完全合理了
你现在可以把完整控制流理解成:
main
↓
创建 Reactor
↓
创建 Acceptor(监听 fd)
↓
Acceptor 注册到 Reactor
↓
reactor.loop()
↓
epoll_wait()
↓
fd ready
↓
找到对应 handler
↓
如果是 server_fd → Acceptor::onRead()
如果是 client_fd → Connection::onRead()/onWrite()/onClose()
这就是一套真正合理的最小 Reactor 骨架。
十、和之前版本相比,升级在哪?
之前的问题是:
1)main 自己也在循环
2)Reactor 也在循环
3)监听 fd 没纳入统一分发体系
所以结构是别扭的。
现在这一版:
1)只有一个事件循环
reactor.loop();
2)所有 fd 都归 Reactor 管
包括:
serverFdclientFd
3)Acceptor 和 Connection 分层清晰
这是最关键的工程升级。
十一、这一篇真正建立了什么能力?
这一篇最大的价值不是“代码更多了”,而是建立了三层结构:
1)Reactor:调度层
负责事件循环和分发。
2)Acceptor:接入层
负责接连接。
3)Connection:连接层
负责一个连接的读写和关闭。
你从这里开始,就不再是在写 socket 示例,而是在写:
服务端骨架
十二、这一版还有哪些不足?
这是最小合理版,但还远不是工业级。
还缺:
- 智能指针管理对象生命周期
- Connection 上下文对象
- 写事件更精细控制
- 半包/粘包处理
- 定时器 / 超时
- 多 Reactor / 多线程
- 协程接入
但没关系,这些正是后面的升级路线。
十三、本篇一句话总结
mini epoll demo 解决的是“事件怎么来”,
而这一篇解决的是“事件交给谁,以及连接如何成为对象”。
也就是从:fd → lambda
升级到:fd → Acceptor / Connection
这一步,就是从 demo 走向工程的关键分水岭。
十四、下一篇
这一篇做完,最顺的下一篇就是:
《C++ 服务端进阶(二)—— 多 Reactor + 线程模型:高并发架构》
因为你现在已经有了:
- 非阻塞 fd
- epoll
- Reactor
- Acceptor
- Connection
下一步自然就是:
从单 Reactor 升级到多 Reactor / 主从 Reactor
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)