C++ 服务端进阶(二)—— 多 Reactor + 线程模型:高并发服务端架构(工程版)
一、为什么需要多 Reactor?
在第一篇中,我们已经有了:
Reactor(单线程)
↓
Acceptor(accept)
↓
Connection(处理读写)
结构已经正确了,但还有一个关键问题:
问题:只能用一个 CPU 核心
单线程 Reactor
↓
一个线程处理所有连接
结果:
- CPU 利用率低 ❌
- 多核机器浪费 ❌
- handler 一旦稍重就拖慢整体 ❌
所以必须升级
从“单 Reactor” → “多 Reactor + 多线程”
二、多 Reactor 模型是什么?
核心思想
主线程:只负责 accept
↓
把连接分发给多个 Reactor 线程
↓
每个线程独立 epoll + 处理连接
架构图(必须记住)
Main Reactor(主线程)
↓ accept
┌────────┬────────┬────────┐
↓ ↓ ↓ ↓
Sub Reactor1 Sub Reactor2 Sub Reactor3 ...
(线程1) (线程2) (线程3)
每个 Sub Reactor:
epoll + Connection
三、这一模型解决什么问题?
1️⃣ 多核利用
每个 Reactor 一个线程:
CPU 核心1 → Reactor1
CPU 核心2 → Reactor2
CPU 核心3 → Reactor3
2️⃣ 避免锁竞争
👉 一个连接只属于一个 Reactor:
fd → 固定 Reactor
所以:
- 不需要跨线程锁 ✔
- 不需要共享 Connection ✔
3️⃣ 高并发核心模型(工业级)
👉 Nginx / Netty 都是类似思路:
accept 分发 + 多事件循环
四、这一篇要做什么?
我们要把第一篇的结构升级成:
Reactor(主线程)
↓
Acceptor
↓
分发连接
↓
Sub Reactor(多个线程)
↓
Connection
五、工程结构(升级版)
├── EventHandler.h
├── Reactor.h / Reactor.cpp
├── Connection.h / Connection.cpp
├── Acceptor.h / Acceptor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── main.cpp
六、新增组件讲解
1️⃣ ReactorThread(一个线程 + 一个 Reactor)
ReactorThread.h
#ifndef REACTOR_THREAD_H
#define REACTOR_THREAD_H
#include "Reactor.h"
#include <thread>
#include <memory>
class ReactorThread {
public:
ReactorThread();
~ReactorThread();
Reactor* getReactor();
void start();
private:
std::unique_ptr<Reactor> reactor_;
std::thread thread_;
};
#endif
ReactorThread.cpp
#include "ReactorThread.h"
ReactorThread::ReactorThread() {
reactor_ = std::make_unique<Reactor>();
}
ReactorThread::~ReactorThread() {
if (thread_.joinable()) {
thread_.join();
}
}
Reactor* ReactorThread::getReactor() {
return reactor_.get();
}
void ReactorThread::start() {
thread_ = std::thread([this]() {
reactor_->loop();
});
}
这一层干嘛?
👉 封装:
线程 + Reactor
👉 每个线程干一件事:
运行一个 epoll 事件循环
2️⃣ ReactorThreadPool(线程池)
ReactorThreadPool.h
#ifndef REACTOR_THREAD_POOL_H
#define REACTOR_THREAD_POOL_H
#include "ReactorThread.h"
#include <vector>
#include <memory>
class ReactorThreadPool {
public:
explicit ReactorThreadPool(int size);
void start();
Reactor* getNextReactor();
private:
std::vector<std::unique_ptr<ReactorThread>> threads_;
int index_;
};
#endif
ReactorThreadPool.cpp
#include "ReactorThreadPool.h"
ReactorThreadPool::ReactorThreadPool(int size)
: index_(0) {
for (int i = 0; i < size; ++i) {
threads_.emplace_back(std::make_unique<ReactorThread>());
}
}
void ReactorThreadPool::start() {
for (auto& t : threads_) {
t->start();
}
}
Reactor* ReactorThreadPool::getNextReactor() {
Reactor* reactor = threads_[index_]->getReactor();
index_ = (index_ + 1) % threads_.size();
return reactor;
}
这一层干嘛?
👉 做一件核心事情:
负载均衡分发连接
分发策略(当前)
轮询(Round Robin)
👉 每次 accept:
client_fd → 下一个 Reactor
3️⃣ 修改 Acceptor(关键)
新版 Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H
#include "EventHandler.h"
class Reactor;
class ReactorThreadPool;
class Acceptor : public EventHandler {
public:
Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool);
~Acceptor() override;
int fd() const override;
void onRead() override;
void onWrite() override;
void onClose() override;
private:
int listenFd_;
Reactor* mainReactor_;
ReactorThreadPool* threadPool_;
};
#endif
新版 Acceptor.cpp
#include "Acceptor.h"
#include "Reactor.h"
#include "Connection.h"
#include "ReactorThreadPool.h"
#include <unistd.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>
namespace {
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}
Acceptor::Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool)
: listenFd_(listenFd),
mainReactor_(mainReactor),
threadPool_(pool) {}
Acceptor::~Acceptor() {
if (listenFd_ >= 0) {
close(listenFd_);
}
}
int Acceptor::fd() const {
return listenFd_;
}
void Acceptor::onRead() {
while (true) {
int clientFd = accept(listenFd_, nullptr, nullptr);
if (clientFd == -1) {
if (errno == EAGAIN) break;
break;
}
setNonBlocking(clientFd);
// 🎯 核心:选择一个子 Reactor
Reactor* subReactor = threadPool_->getNextReactor();
Connection* conn = new Connection(clientFd, subReactor);
subReactor->addHandler(conn, EPOLLIN);
std::cout << "[Acceptor] new connection fd=" << clientFd << std::endl;
}
}
void Acceptor::onWrite() {}
void Acceptor::onClose() {}
七、main.cpp(最终结构)
#include "Reactor.h"
#include "Acceptor.h"
#include "ReactorThreadPool.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <iostream>
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
int main() {
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
setNonBlocking(serverFd);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(8080);
addr.sin_addr.s_addr = INADDR_ANY;
bind(serverFd, (sockaddr*)&addr, sizeof(addr));
listen(serverFd, 128);
// 主 Reactor(只 accept)
Reactor mainReactor;
// 子 Reactor 线程池
ReactorThreadPool pool(4);
pool.start();
// Acceptor 注册到主 Reactor
Acceptor* acceptor = new Acceptor(serverFd, &mainReactor, &pool);
mainReactor.addHandler(acceptor, EPOLLIN);
std::cout << "server running on :8080" << std::endl;
mainReactor.loop();
return 0;
}
八、这一版最关键变化
从:
一个 Reactor
到:
Main Reactor(accept)
↓
Sub Reactor(处理连接)
关键一行(必须记住)
Reactor* subReactor = threadPool_->getNextReactor();
👉 这就是:
连接分发
九、最终理解(核心)
epoll + Reactor(单线程)
一个线程处理所有 fd
多 Reactor 模型
主线程:接连接
子线程:处理连接
十、这一篇本质提升
第一篇
✔ 建立 Connection
✔ 建立服务骨架
第二篇
✔ 多核并发
✔ 真正高并发架构
总结
单 Reactor 是模型
多 Reactor 才是架构
下一篇(第三篇)
👉 《C++ 服务端进阶(三)—— Reactor + 协程:现代异步模型》
你会进入:
- 回调 → 协程
- epoll → coroutine
- 写出类似 Go / Netty 的模型
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)