C++ 服务端进阶(四)—— 多 Reactor + 协程:真正的高并发模型(融合版)
·
一、这一篇到底在做什么?
到目前为止,你已经分别完成了:
- ✔ 第一篇:Connection(结构抽象)
- ✔ 第二篇:多 Reactor(并发模型)
- ✔ 第三篇:单 Reactor + 协程(执行模型)
但这三者是分开的能力:
结构 ✔
并发 ✔
执行 ✔
👉 真正的高并发服务端,还差最后一步:
把并发模型(多 Reactor)和执行模型(协程)融合
本篇目标
构建这样一套模型:
Main Reactor(accept)
↓
Sub Reactor(线程) + 协程(执行)
一句话总结本篇:
多线程负责“并发”,协程负责“执行”
二、最终架构图(必须理解)
Main Reactor(主线程)
↓ accept
┌──────────────┬──────────────┬──────────────┐
↓ ↓ ↓
Sub Reactor1 Sub Reactor2 Sub Reactor3
(线程1) (线程2) (线程3)
↓ ↓ ↓
coroutine coroutine coroutine
三、和第三篇的本质区别
第三篇(单线程)
一个 Reactor
+ 多协程
👉 特点:
- ✔ 无锁
- ❌ 只能用一个 CPU
第四篇(本篇)
多 Reactor(多线程)
+ 每个 Reactor 内部用协程
👉 特点:
- ✔ 多核利用
- ✔ 无锁(连接不跨线程)
- ✔ 高并发
四、核心设计原则(非常重要)
原则 1:连接只属于一个 Reactor
fd → 固定 Reactor
👉 不跨线程 → 不需要锁
原则 2:每个线程独立事件循环
线程1 → epoll1
线程2 → epoll2
原则 3:协程只在所属 Reactor 线程执行
👉 不跨线程恢复
原则 4:Main Reactor 只做 accept
👉 不处理业务
五、项目结构(最终版)
.
├── Task.h
├── Reactor.h / Reactor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── SocketUtil.h / SocketUtil.cpp
├── Server.h / Server.cpp
└── main.cpp
六、核心代码实现
1️⃣ ReactorThread(线程 + Reactor)
ReactorThread.h
#ifndef REACTOR_THREAD_H
#define REACTOR_THREAD_H
#include "Reactor.h"
#include <thread>
#include <memory>
class ReactorThread {
public:
ReactorThread();
~ReactorThread();
Reactor* getReactor();
void start();
private:
std::unique_ptr<Reactor> reactor_;
std::thread thread_;
};
#endif
ReactorThread.cpp
#include "ReactorThread.h"
ReactorThread::ReactorThread() {
reactor_ = std::make_unique<Reactor>();
}
ReactorThread::~ReactorThread() {
if (thread_.joinable()) {
thread_.join();
}
}
Reactor* ReactorThread::getReactor() {
return reactor_.get();
}
void ReactorThread::start() {
thread_ = std::thread([this]() {
reactor_->loop();
});
}
2️⃣ ReactorThreadPool
ReactorThreadPool.h
#ifndef REACTOR_THREAD_POOL_H
#define REACTOR_THREAD_POOL_H
#include "ReactorThread.h"
#include <vector>
#include <memory>
class ReactorThreadPool {
public:
explicit ReactorThreadPool(int size);
void start();
Reactor* getNextReactor();
private:
std::vector<std::unique_ptr<ReactorThread>> threads_;
int index_;
};
#endif
ReactorThreadPool.cpp
#include "ReactorThreadPool.h"
ReactorThreadPool::ReactorThreadPool(int size)
: index_(0) {
for (int i = 0; i < size; ++i) {
threads_.emplace_back(std::make_unique<ReactorThread>());
}
}
void ReactorThreadPool::start() {
for (auto& t : threads_) {
t->start();
}
}
Reactor* ReactorThreadPool::getNextReactor() {
Reactor* reactor = threads_[index_]->getReactor();
index_ = (index_ + 1) % threads_.size();
return reactor;
}
3️⃣ Server(协程版本)
Server.h
#ifndef SERVER_H
#define SERVER_H
#include "Task.h"
class Reactor;
DetachedTask handleConnection(int fd, Reactor& reactor);
#endif
Server.cpp
#include "Server.h"
#include "Reactor.h"
#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <string>
DetachedTask handleConnection(int fd, Reactor& reactor) {
std::string writeBuffer;
char buffer[1024];
while (true) {
co_await reactor.readable(fd);
while (true) {
ssize_t n = read(fd, buffer, sizeof(buffer));
if (n > 0) {
writeBuffer.append(buffer, n);
}
else if (n == 0) {
reactor.removeFd(fd);
close(fd);
co_return;
}
else {
if (errno == EAGAIN) break;
reactor.removeFd(fd);
close(fd);
co_return;
}
}
while (!writeBuffer.empty()) {
co_await reactor.writable(fd);
ssize_t wn = write(fd, writeBuffer.data(), writeBuffer.size());
if (wn > 0) {
writeBuffer.erase(0, wn);
}
else if (errno != EAGAIN) {
reactor.removeFd(fd);
close(fd);
co_return;
}
}
}
}
4️⃣ 主线程 accept 分发
main.cpp
#include "Reactor.h"
#include "ReactorThreadPool.h"
#include "SocketUtil.h"
#include "Server.h"
#include <sys/socket.h>
#include <iostream>
int main() {
int listenFd = createListenFd(8080);
ReactorThreadPool pool(4);
pool.start();
Reactor mainReactor;
std::cout << "server running :8080" << std::endl;
while (true) {
int clientFd = accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);
if (clientFd >= 0) {
Reactor* subReactor = pool.getNextReactor();
handleConnection(clientFd, *subReactor);
}
}
return 0;
}
七、最核心执行流程(必须理解)
客户端连接
↓
Main Reactor accept
↓
选择 Sub Reactor
↓
启动协程 handleConnection
↓
co_await readable → 挂起
↓
epoll_wait
↓
fd ready
↓
Reactor 恢复协程
↓
继续执行 read/write
八、这一篇真正的价值
你现在掌握的是:
epoll(事件)
Reactor(调度)
多线程(并发)
协程(执行)
👉 组合起来就是:
现代高并发服务端模型
九、总结
如果说前三篇是在分别建立结构、并发和执行模型,那么本篇完成的,是这三者的真正融合。
在这个模型中:
- 多 Reactor 提供并发能力
- Reactor 提供事件调度能力
- 协程提供执行模型
三者结合,构成了现代高性能服务端的核心运行方式。
十、下一篇(第五篇)
👉 Connection + 协程(工程级写法)
你会把:Connection + coroutine
彻底融合成:
class Connection {
coroutine run();
};
单 Reactor + 协程,是“执行模型”
多 Reactor + 协程,才是“架构模型”
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)