一、为什么需要多 Reactor?

在第一篇中,我们已经有了:

Reactor(单线程)
↓
Acceptor(accept)
↓
Connection(处理读写)

结构已经正确了,但还有一个关键问题:

问题:只能用一个 CPU 核心

单线程 Reactor

一个线程处理所有连接

结果:

  • CPU 利用率低 ❌
  • 多核机器浪费 ❌
  • handler 一旦稍重就拖慢整体 ❌

所以必须升级

从“单 Reactor” → “多 Reactor + 多线程”

二、多 Reactor 模型是什么?

核心思想

主线程:只负责 accept
↓
把连接分发给多个 Reactor 线程
↓
每个线程独立 epoll + 处理连接

架构图(必须记住)

Main Reactor(主线程)
↓ accept
┌────────┬────────┬────────┐
↓ ↓ ↓ ↓
Sub Reactor1 Sub Reactor2 Sub Reactor3 ...
(线程1) (线程2) (线程3)

每个 Sub Reactor:
epoll + Connection

三、这一模型解决什么问题?

1️⃣ 多核利用

每个 Reactor 一个线程:

CPU 核心1 → Reactor1
CPU 核心2 → Reactor2
CPU 核心3 → Reactor3

2️⃣ 避免锁竞争

👉 一个连接只属于一个 Reactor:

fd → 固定 Reactor

所以:

  • 不需要跨线程锁 ✔
  • 不需要共享 Connection ✔

3️⃣ 高并发核心模型(工业级)

👉 Nginx / Netty 都是类似思路:

accept 分发 + 多事件循环

四、这一篇要做什么?

我们要把第一篇的结构升级成:

Reactor(主线程)
↓
Acceptor
↓
分发连接
↓
Sub Reactor(多个线程)
↓
Connection

五、工程结构(升级版)

├── EventHandler.h
├── Reactor.h / Reactor.cpp
├── Connection.h / Connection.cpp
├── Acceptor.h / Acceptor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── main.cpp

六、新增组件讲解

1️⃣ ReactorThread(一个线程 + 一个 Reactor)


ReactorThread.h

#ifndef REACTOR_THREAD_H
#define REACTOR_THREAD_H

#include "Reactor.h"
#include <thread>
#include <memory>

class ReactorThread {
public:
ReactorThread();
~ReactorThread();

Reactor* getReactor();

void start();

private:
std::unique_ptr<Reactor> reactor_;
std::thread thread_;
};

#endif

ReactorThread.cpp

#include "ReactorThread.h"

ReactorThread::ReactorThread() {
reactor_ = std::make_unique<Reactor>();
}

ReactorThread::~ReactorThread() {
if (thread_.joinable()) {
thread_.join();
}
}

Reactor* ReactorThread::getReactor() {
return reactor_.get();
}

void ReactorThread::start() {
thread_ = std::thread([this]() {
reactor_->loop();
});
}

这一层干嘛?

👉 封装:

线程 + Reactor

👉 每个线程干一件事:

运行一个 epoll 事件循环

2️⃣ ReactorThreadPool(线程池)


ReactorThreadPool.h

#ifndef REACTOR_THREAD_POOL_H
#define REACTOR_THREAD_POOL_H

#include "ReactorThread.h"
#include <vector>
#include <memory>

class ReactorThreadPool {
public:
explicit ReactorThreadPool(int size);

void start();

Reactor* getNextReactor();

private:
std::vector<std::unique_ptr<ReactorThread>> threads_;
int index_;
};

#endif

ReactorThreadPool.cpp

#include "ReactorThreadPool.h"

ReactorThreadPool::ReactorThreadPool(int size)
: index_(0) {
for (int i = 0; i < size; ++i) {
threads_.emplace_back(std::make_unique<ReactorThread>());
}
}

void ReactorThreadPool::start() {
for (auto& t : threads_) {
t->start();
}
}

Reactor* ReactorThreadPool::getNextReactor() {
Reactor* reactor = threads_[index_]->getReactor();
index_ = (index_ + 1) % threads_.size();
return reactor;
}

这一层干嘛?

👉 做一件核心事情:

负载均衡分发连接

分发策略(当前)

轮询(Round Robin)

👉 每次 accept:

client_fd → 下一个 Reactor

3️⃣ 修改 Acceptor(关键)


新版 Acceptor.h

#ifndef ACCEPTOR_H
#define ACCEPTOR_H

#include "EventHandler.h"

class Reactor;
class ReactorThreadPool;

class Acceptor : public EventHandler {
public:
Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool);
~Acceptor() override;

int fd() const override;

void onRead() override;
void onWrite() override;
void onClose() override;

private:
int listenFd_;
Reactor* mainReactor_;
ReactorThreadPool* threadPool_;
};

#endif

新版 Acceptor.cpp

#include "Acceptor.h"
#include "Reactor.h"
#include "Connection.h"
#include "ReactorThreadPool.h"

#include <unistd.h>
#include <iostream>
#include <fcntl.h>
#include <errno.h>

namespace {
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}

Acceptor::Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool)
: listenFd_(listenFd),
mainReactor_(mainReactor),
threadPool_(pool) {}

Acceptor::~Acceptor() {
if (listenFd_ >= 0) {
close(listenFd_);
}
}

int Acceptor::fd() const {
return listenFd_;
}

void Acceptor::onRead() {
while (true) {
int clientFd = accept(listenFd_, nullptr, nullptr);
if (clientFd == -1) {
if (errno == EAGAIN) break;
break;
}

setNonBlocking(clientFd);

// 🎯 核心:选择一个子 Reactor
Reactor* subReactor = threadPool_->getNextReactor();

Connection* conn = new Connection(clientFd, subReactor);

subReactor->addHandler(conn, EPOLLIN);

std::cout << "[Acceptor] new connection fd=" << clientFd << std::endl;
}
}

void Acceptor::onWrite() {}
void Acceptor::onClose() {}

七、main.cpp(最终结构)

#include "Reactor.h"
#include "Acceptor.h"
#include "ReactorThreadPool.h"

#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>

int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int serverFd = socket(AF_INET, SOCK_STREAM, 0);

    int opt = 1;
    setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    setNonBlocking(serverFd);

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8080);
    addr.sin_addr.s_addr = INADDR_ANY;

    bind(serverFd, (sockaddr*)&addr, sizeof(addr));
    listen(serverFd, 128);

    // 主 Reactor(只 accept)
    Reactor mainReactor;

    // 子 Reactor 线程池
    ReactorThreadPool pool(4);
    pool.start();

    // Acceptor 注册到主 Reactor
    Acceptor* acceptor = new Acceptor(serverFd, &mainReactor, &pool);
    mainReactor.addHandler(acceptor, EPOLLIN);

    std::cout << "server running on :8080" << std::endl;

    mainReactor.loop();

    return 0;
}

八、这一版最关键变化

从:

一个 Reactor

到:

Main Reactor(accept)

Sub Reactor(处理连接)


关键一行(必须记住)

Reactor* subReactor = threadPool_->getNextReactor();

👉 这就是:

连接分发

九、最终理解(核心)


epoll + Reactor(单线程)

一个线程处理所有 fd


多 Reactor 模型

主线程:接连接
子线程:处理连接

十、这一篇本质提升

第一篇

✔ 建立 Connection
✔ 建立服务骨架

第二篇

✔ 多核并发
✔ 真正高并发架构

总结

单 Reactor 是模型
多 Reactor 才是架构

下一篇(第三篇)

👉 《C++ 服务端进阶(三)—— Reactor + 协程:现代异步模型》

你会进入:

  • 回调 → 协程
  • epoll → coroutine
  • 写出类似 Go / Netty 的模型

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐