C++ 服务端进阶(三)—— Reactor + 协程:现代异步模型(附完整项目结构与代码)
引言
先校准一下预期
这一篇如果真的要做 “C++20 原生协程 + epoll + Reactor”,工业级会非常复杂,因为 C++ 原生协程只给你:
co_awaitco_return- suspend / resume 机制
它不给你 runtime,也就是说:
你得自己把
epoll、事件注册、协程恢复、任务生命周期 全部串起来。
所以这一篇做的是:
最小可理解、可运行、能体现本质的版本
目标不是上来就对标 Boost.Asio / libuv,而是真正打通:
epoll(事件)
↓
Reactor(调度)
↓
Coroutine(挂起 / 恢复)
正文
一、为什么要从 Reactor 再走到协程?
在前两篇里,我们已经完成了:
- 第一篇:
Connection抽象,把连接对象化 - 第二篇:多 Reactor + 线程模型,把架构做出来
这时候你其实已经能写出一个“像样的服务端”了。
但是还有一个经典问题:
代码写起来还是不够优雅
比如在 Reactor 模型里,常见写法会变成:
onRead()
↓
解析状态
↓
决定下次等读还是等写
↓
回调继续
一旦逻辑稍微复杂一点,就会出现:
- 状态变量越来越多
- 读写流程被拆碎
- 回调 / handler 里到处是分支
- 同步思维变成“事件状态机思维”,代码不直观
所以才需要协程。
一旦逻辑稍微复杂一点,就会出现:
- 状态变量越来越多
- 读写流程被拆碎
- 回调 / handler 里到处是分支
- 同步思维变成“事件状态机思维”,代码不直观
所以才需要协程。
二、协程到底解决什么问题?
协程解决的不是:
有没有事件
这个是 epoll 干的。
协程解决的是:
代码执行到一半,怎么暂停;事件到了之后,怎么从原地继续执行
换句话说:
Reactor 负责:
- 什么时候可以读
- 什么时候可以写
协程负责:
- 现在先停一下
- 等可读/可写了再继续往下跑
所以这三层关系一定要记住:
epoll:谁 ready 了
Reactor:把 ready 事件分发出去
协程:让处理逻辑可以挂起再恢复
三、这一篇要做什么?
这一篇我们做一个最小可运行版本,目标不是工业级,而是彻底打通模型。
我们要实现:
1)单线程 Reactor
epoll_wait()统一监听事件
2)C++20 协程
co_await reactor.readable(fd)co_await reactor.writable(fd)
3)最小 echo server
- accept 新连接
- 协程里读数据
- 协程里写回数据
- 连接关闭时回收
也就是说,最终代码形态会像这样:
co_await reactor.readable(fd);
ssize_t n = read(fd, buffer, sizeof(buffer));
co_await reactor.writable(fd);
write(fd, buffer, n);
这就是:
同步写法 + 异步执行
四、最终项目结构
这一篇我建议拆成下面这些文件:
.
├── Task.h
├── Reactor.h
├── Reactor.cpp
├── SocketUtil.h
├── SocketUtil.cpp
├── Server.h
├── Server.cpp
└── main.cpp
各文件职责
Task.h
定义最小协程返回类型 DetachedTask
Reactor.h / Reactor.cpp
定义 Reactor 和 awaiter:
readable(fd)writable(fd)waitReadable()waitWritable()loop()
SocketUtil.h / SocketUtil.cpp
负责:
setNonBlockingcreateListenFd
Server.h / Server.cpp
负责:
acceptLoophandleConnection
main.cpp
负责初始化和启动
五、代码总览之前,先讲清最核心的设计
1)为什么要有 DetachedTask?
C++20 协程本身只是语言原语,它不会自动帮你调度。
我们这里实现一个最小的协程返回类型:
DetachedTask
含义就是:
- 协程创建后立即开始执行
- 执行到
co_await时挂起 - Reactor 事件到来后恢复
- 执行结束后自动销毁
它非常适合这种:
顶层连接处理协程
2)为什么 Reactor 里要保存 coroutine handle?
因为当你写:
co_await reactor.readable(fd);
本质就是:
- 当前协程先挂起
- Reactor 把这个协程句柄记住
- 等
fd可读时,再把这个协程恢复
所以 Reactor 这一层不仅要管 epoll,还要管:
哪个 fd 对应哪个挂起的协程
3)为什么这是 Reactor + 协程,而不是纯协程?
因为协程自己不会知道:
fd 什么时候 ready
这件事只有内核 + epoll 知道。
所以必须是:
协程挂起
↓
Reactor 把 handle 挂到 fd 上
↓
epoll_wait 返回 ready fd
↓
Reactor 恢复对应协程
这就是现代异步系统的核心链路。
六、完整代码
1)Task.h
#ifndef TASK_H
#define TASK_H
#include <coroutine>
#include <exception>
#include <utility>
struct DetachedTask {
struct promise_type {
DetachedTask get_return_object() noexcept {
return {};
}
std::suspend_never initial_suspend() noexcept {
return {};
}
struct FinalAwaiter {
bool await_ready() noexcept { return false; }
template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> h) noexcept {
h.destroy();
}
void await_resume() noexcept {}
};
FinalAwaiter final_suspend() noexcept {
return {};
}
void return_void() noexcept {}
void unhandled_exception() {
std::terminate();
}
};
};
#endif
2)Reactor.h
#ifndef REACTOR_H
#define REACTOR_H
#include <coroutine>
#include <cstdint>
#include <unordered_map>
class Reactor {
public:
Reactor();
~Reactor();
Reactor(const Reactor&) = delete;
Reactor& operator=(const Reactor&) = delete;
void waitReadable(int fd, std::coroutine_handle<> handle);
void waitWritable(int fd, std::coroutine_handle<> handle);
void removeFd(int fd);
void loop();
class ReadableAwaiter {
public:
ReadableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}
bool await_ready() const noexcept {
return false;
}
void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitReadable(fd_, handle);
}
void await_resume() const noexcept {}
private:
Reactor& reactor_;
int fd_;
};
class WritableAwaiter {
public:
WritableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}
bool await_ready() const noexcept {
return false;
}
void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitWritable(fd_, handle);
}
void await_resume() const noexcept {}
private:
Reactor& reactor_;
int fd_;
};
ReadableAwaiter readable(int fd) {
return ReadableAwaiter(*this, fd);
}
WritableAwaiter writable(int fd) {
return WritableAwaiter(*this, fd);
}
private:
struct Entry {
std::coroutine_handle<> readHandle{};
std::coroutine_handle<> writeHandle{};
uint32_t interests{0};
bool registered{false};
};
void updateInterest(int fd);
private:
int epollFd_;
std::unordered_map<int, Entry> entries_;
};
#endif
3)Reactor.cpp
#include "Reactor.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <cstdlib>
Reactor::Reactor() {
epollFd_ = epoll_create1(0);
if (epollFd_ == -1) {
std::cerr << "epoll_create1 failed" << std::endl;
std::exit(1);
}
}
Reactor::~Reactor() {
if (epollFd_ >= 0) {
close(epollFd_);
epollFd_ = -1;
}
}
void Reactor::waitReadable(int fd, std::coroutine_handle<> handle) {
Entry& entry = entries_[fd];
entry.readHandle = handle;
entry.interests |= EPOLLIN;
updateInterest(fd);
}
void Reactor::waitWritable(int fd, std::coroutine_handle<> handle) {
Entry& entry = entries_[fd];
entry.writeHandle = handle;
entry.interests |= EPOLLOUT;
updateInterest(fd);
}
void Reactor::removeFd(int fd) {
auto it = entries_.find(fd);
if (it == entries_.end()) {
return;
}
if (it->second.registered) {
epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
}
entries_.erase(it);
}
void Reactor::updateInterest(int fd) {
auto it = entries_.find(fd);
if (it == entries_.end()) {
return;
}
Entry& entry = it->second;
if (entry.interests == 0) {
if (entry.registered) {
epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
entry.registered = false;
}
return;
}
epoll_event ev{};
ev.events = entry.interests | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.fd = fd;
if (!entry.registered) {
if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev) == 0) {
entry.registered = true;
}
} else {
epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);
}
}
void Reactor::loop() {
constexpr int MAX_EVENTS = 64;
std::vector<epoll_event> events(MAX_EVENTS);
while (true) {
int n = epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1);
if (n == -1) {
continue;
}
for (int i = 0; i < n; ++i) {
int fd = events[i].data.fd;
uint32_t ev = events[i].events;
auto it = entries_.find(fd);
if (it == entries_.end()) {
continue;
}
Entry& entry = it->second;
std::coroutine_handle<> readHandle{};
std::coroutine_handle<> writeHandle{};
if (ev & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
readHandle = entry.readHandle;
entry.readHandle = {};
entry.interests &= ~EPOLLIN;
}
if (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
writeHandle = entry.writeHandle;
entry.writeHandle = {};
entry.interests &= ~EPOLLOUT;
}
updateInterest(fd);
if (readHandle) {
readHandle.resume();
}
if (writeHandle && (!readHandle || writeHandle.address() != readHandle.address())) {
writeHandle.resume();
}
}
}
}
4)SocketUtil.h
#ifndef SOCKET_UTIL_H
#define SOCKET_UTIL_H
int setNonBlocking(int fd);
int createListenFd(int port);
#endif
5)SocketUtil.cpp
#include "SocketUtil.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int createListenFd(int port) {
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
if (serverFd == -1) {
return -1;
}
int opt = 1;
setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (setNonBlocking(serverFd) == -1) {
close(serverFd);
return -1;
}
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(serverFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
close(serverFd);
return -1;
}
if (listen(serverFd, 128) == -1) {
close(serverFd);
return -1;
}
return serverFd;
}
6)Server.h
#ifndef SERVER_H
#define SERVER_H
#include "Task.h"
class Reactor;
DetachedTask acceptLoop(int listenFd, Reactor& reactor);
DetachedTask handleConnection(int clientFd, Reactor& reactor);
#endif
7)Server.cpp
#include "Server.h"
#include "Reactor.h"
#include "SocketUtil.h"
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <string>
DetachedTask handleConnection(int clientFd, Reactor& reactor) {
std::string writeBuffer;
char buffer[1024];
while (true) {
co_await reactor.readable(clientFd);
while (true) {
ssize_t n = ::read(clientFd, buffer, sizeof(buffer));
if (n > 0) {
std::string msg(buffer, n);
std::cout << "[recv] fd=" << clientFd
<< " msg=" << msg << std::endl;
// demo: echo
writeBuffer.append(buffer, n);
}
else if (n == 0) {
std::cout << "[close] peer closed fd=" << clientFd << std::endl;
reactor.removeFd(clientFd);
::close(clientFd);
co_return;
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
std::cerr << "[error] read failed fd=" << clientFd << std::endl;
reactor.removeFd(clientFd);
::close(clientFd);
co_return;
}
}
while (!writeBuffer.empty()) {
co_await reactor.writable(clientFd);
ssize_t wn = ::write(clientFd, writeBuffer.data(), writeBuffer.size());
if (wn > 0) {
writeBuffer.erase(0, wn);
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
std::cerr << "[error] write failed fd=" << clientFd << std::endl;
reactor.removeFd(clientFd);
::close(clientFd);
co_return;
}
}
}
}
DetachedTask acceptLoop(int listenFd, Reactor& reactor) {
while (true) {
co_await reactor.readable(listenFd);
while (true) {
int clientFd = ::accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);
if (clientFd >= 0) {
std::cout << "[accept] new client fd=" << clientFd << std::endl;
handleConnection(clientFd, reactor);
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
if (errno == EINTR) {
continue;
}
std::cerr << "[error] accept4 failed" << std::endl;
break;
}
}
}
}
8)main.cpp
#include "Reactor.h"
#include "SocketUtil.h"
#include "Server.h"
#include <iostream>
#include <unistd.h>
int main() {
int listenFd = createListenFd(8080);
if (listenFd == -1) {
std::cerr << "createListenFd failed" << std::endl;
return 1;
}
Reactor reactor;
// 启动 accept 协程
acceptLoop(listenFd, reactor);
std::cout << "server running on :8080" << std::endl;
reactor.loop();
close(listenFd);
return 0;
}
七、怎么编译运行?
因为用了 C++20 协程,所以编译命令建议这样:
g++ -std=c++20 -O2 Reactor.cpp SocketUtil.cpp Server.cpp main.cpp -o server
运行:
./server
测试:
nc 127.0.0.1 8080
输入:hello
你会看到服务端输出日志,并且客户端收到回显。
八、这套代码的核心控制流,一定要看懂
这套代码真正的运行方式是:
main()
↓
acceptLoop(listenFd, reactor) // 启动 accept 协程
↓
协程第一次执行到:
co_await reactor.readable(listenFd)
↓
挂起
↓
reactor.loop()
↓
epoll_wait()
↓
listenFd ready
↓
Reactor 恢复 acceptLoop 协程
↓
accept 新连接
↓
为每个 clientFd 启动 handleConnection 协程
↓
handleConnection 执行到:
co_await reactor.readable(clientFd)
↓
挂起
↓
epoll_wait()
↓
clientFd ready
↓
Reactor 恢复 handleConnection 协程
↓
read / write / close
这就是:
事件驱动 + 协程恢复
九、这一版到底比纯 Reactor handler 好在哪?
如果你不用协程,逻辑通常会散在:
onRead()onWrite()- 状态机切换
- 回调链
而现在你可以写成:
co_await reactor.readable(fd);
read(...);
co_await reactor.writable(fd);
write(...);
也就是说:
表面上像同步代码
底层实际上还是异步调度
这就是协程最值钱的地方。
十、这一篇你最该记住的 5 句话
epoll 负责“谁 ready 了”
Reactor 负责“恢复哪个协程”
协程负责“从挂起点继续执行”
协程不是替代 Reactor,而是运行在 Reactor 之上
Reactor + 协程 = 现代异步系统的核心组合
十一、这一版还不是工业级,差在哪?
必须实话实说:
这是一版最小打通模型的代码,还不是工业级。
还缺:
Connection对象化的协程版本- 写缓冲区更细致的管理
- 半包 / 粘包处理
- timer / timeout
- 多 Reactor + 协程
- 任务派发到业务线程池
- 智能指针管理协程相关对象生命周期
- 更安全的 close / remove 时机控制
但是,这一版已经把最关键的一层打通了:
epoll / Reactor / 协程 到底怎么协同工作
这比单纯会背概念强太多了。
十二、本篇一句话总结
前面的 Reactor 模型解决的是“事件来了怎么分发”,
而这一篇进一步解决的是“分发之后,代码怎么优雅地继续执行”。也就是说:
epoll 负责发现事件,Reactor 负责调度事件,协程负责优雅执行事件。从这一篇开始,我们已经不再只是写 socket 代码,而是在逐步搭建一个现代异步服务端运行时。
十三、完整项目结构(最终版)
.
├── Task.h
├── Reactor.h
├── Reactor.cpp
├── SocketUtil.h
├── SocketUtil.cpp
├── Server.h
├── Server.cpp
└── main.cpp
十四、下一步最顺的方向(核心升级)
到这一篇为止,我们已经分别完成了三件事情:
- 第一篇:Connection(结构抽象)
- 第二篇:多 Reactor(并发模型)
- 第三篇:单 Reactor + 协程(执行模型)
也就是说,到目前为止,这三篇分别解决了:
结构 ✔
并发 ✔
执行 ✔
但这里要特别强调一点:
第三篇的协程模型,仍然是单 Reactor 版本。
它的价值在于先把最关键的执行流问题讲清楚:
- 协程怎么挂起
- Reactor 怎么恢复协程
- epoll 怎么驱动协程继续执行
这一篇解决的是:
“Reactor + 协程”这套执行模型到底是怎么工作的
而不是最终的高并发架构形态。
下一步(主线核心):把并发模型和执行模型融合起来
真正的高并发服务端,不会停在:
单 Reactor + 协程
因为这仍然只能使用一个线程、一个事件循环。
所以本系列真正的主线下一步,就是:
第四篇要解决什么?
第四篇要做的事情非常明确:
把第二篇和第三篇真正合起来:
多 Reactor(第二篇)
+
协程(第三篇)
也就是把:
- 第二篇解决的“多线程并发架构”
- 第三篇解决的“协程执行模型”
融合成一套完整运行方式。
最终架构形态
第四篇的目标架构会变成:
Main Reactor(accept)
↓
Sub Reactor(线程1)+ 协程
Sub Reactor(线程2)+ 协程
Sub Reactor(线程3)+ 协程
...
也就是说:
- 主 Reactor 负责 accept
- 子 Reactor 负责事件循环
- 每个子 Reactor 内部通过协程处理连接逻辑
这才是:
真正接近现代高并发服务端的核心模型
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)