一、这一篇到底在做什么?

到目前为止,你已经分别完成了:

  • ✔ 第一篇:Connection(结构抽象)
  • ✔ 第二篇:多 Reactor(并发模型)
  • ✔ 第三篇:单 Reactor + 协程(执行模型)

但这三者是分开的能力

结构 ✔
并发 ✔
执行 ✔

👉 真正的高并发服务端,还差最后一步:

把并发模型(多 Reactor)和执行模型(协程)融合

本篇目标

构建这样一套模型:

Main Reactor(accept)
↓
Sub Reactor(线程) + 协程(执行)

一句话总结本篇:

多线程负责“并发”,协程负责“执行”

二、最终架构图(必须理解)

                 Main Reactor(主线程)
                        ↓ accept
        ┌──────────────┬──────────────┬──────────────┐
        ↓              ↓              ↓
   Sub Reactor1   Sub Reactor2   Sub Reactor3
     (线程1)         (线程2)         (线程3)
        ↓              ↓              ↓
    coroutine       coroutine      coroutine

三、和第三篇的本质区别


第三篇(单线程)

一个 Reactor
+ 多协程

👉 特点:

  • ✔ 无锁
  • ❌ 只能用一个 CPU

第四篇(本篇)

多 Reactor(多线程)
+ 每个 Reactor 内部用协程

👉 特点:

  • ✔ 多核利用
  • ✔ 无锁(连接不跨线程)
  • ✔ 高并发

四、核心设计原则(非常重要)

原则 1:连接只属于一个 Reactor

fd → 固定 Reactor

👉 不跨线程 → 不需要锁


原则 2:每个线程独立事件循环

线程1 → epoll1
线程2 → epoll2


原则 3:协程只在所属 Reactor 线程执行

👉 不跨线程恢复


原则 4:Main Reactor 只做 accept

👉 不处理业务

五、项目结构(最终版)

.
├── Task.h
├── Reactor.h / Reactor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── SocketUtil.h / SocketUtil.cpp
├── Server.h / Server.cpp
└── main.cpp

六、核心代码实现

1️⃣ ReactorThread(线程 + Reactor)

ReactorThread.h

#ifndef REACTOR_THREAD_H
#define REACTOR_THREAD_H

#include "Reactor.h"
#include <thread>
#include <memory>

class ReactorThread {
public:
    ReactorThread();
    ~ReactorThread();

    Reactor* getReactor();
    void start();

private:
    std::unique_ptr<Reactor> reactor_;
    std::thread thread_;
};

#endif

ReactorThread.cpp

#include "ReactorThread.h"

ReactorThread::ReactorThread() {
    reactor_ = std::make_unique<Reactor>();
}

ReactorThread::~ReactorThread() {
    if (thread_.joinable()) {
        thread_.join();
    }
}

Reactor* ReactorThread::getReactor() {
    return reactor_.get();
}

void ReactorThread::start() {
    thread_ = std::thread([this]() {
        reactor_->loop();
    });
}

2️⃣ ReactorThreadPool

ReactorThreadPool.h

#ifndef REACTOR_THREAD_POOL_H
#define REACTOR_THREAD_POOL_H

#include "ReactorThread.h"
#include <vector>
#include <memory>

class ReactorThreadPool {
public:
    explicit ReactorThreadPool(int size);

    void start();

    Reactor* getNextReactor();

private:
    std::vector<std::unique_ptr<ReactorThread>> threads_;
    int index_;
};

#endif

ReactorThreadPool.cpp

#include "ReactorThreadPool.h"

ReactorThreadPool::ReactorThreadPool(int size)
    : index_(0) {
    for (int i = 0; i < size; ++i) {
        threads_.emplace_back(std::make_unique<ReactorThread>());
    }
}

void ReactorThreadPool::start() {
    for (auto& t : threads_) {
        t->start();
    }
}

Reactor* ReactorThreadPool::getNextReactor() {
    Reactor* reactor = threads_[index_]->getReactor();
    index_ = (index_ + 1) % threads_.size();
    return reactor;
}

3️⃣ Server(协程版本)

Server.h

#ifndef SERVER_H
#define SERVER_H

#include "Task.h"

class Reactor;

DetachedTask handleConnection(int fd, Reactor& reactor);

#endif

Server.cpp

#include "Server.h"
#include "Reactor.h"

#include <unistd.h>
#include <errno.h>
#include <iostream>
#include <string>

DetachedTask handleConnection(int fd, Reactor& reactor) {
    std::string writeBuffer;
    char buffer[1024];

    while (true) {
        co_await reactor.readable(fd);

        while (true) {
            ssize_t n = read(fd, buffer, sizeof(buffer));

            if (n > 0) {
                writeBuffer.append(buffer, n);
            }
            else if (n == 0) {
                reactor.removeFd(fd);
                close(fd);
                co_return;
            }
            else {
                if (errno == EAGAIN) break;
                reactor.removeFd(fd);
                close(fd);
                co_return;
            }
        }

        while (!writeBuffer.empty()) {
            co_await reactor.writable(fd);

            ssize_t wn = write(fd, writeBuffer.data(), writeBuffer.size());
            if (wn > 0) {
                writeBuffer.erase(0, wn);
            }
            else if (errno != EAGAIN) {
                reactor.removeFd(fd);
                close(fd);
                co_return;
            }
        }
    }
}

4️⃣ 主线程 accept 分发

main.cpp

#include "Reactor.h"
#include "ReactorThreadPool.h"
#include "SocketUtil.h"
#include "Server.h"

#include <sys/socket.h>
#include <iostream>

int main() {
    int listenFd = createListenFd(8080);

    ReactorThreadPool pool(4);
    pool.start();

    Reactor mainReactor;

    std::cout << "server running :8080" << std::endl;

    while (true) {
        int clientFd = accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);

        if (clientFd >= 0) {
            Reactor* subReactor = pool.getNextReactor();

            handleConnection(clientFd, *subReactor);
        }
    }

    return 0;
}

七、最核心执行流程(必须理解)

客户端连接
    ↓
Main Reactor accept
    ↓
选择 Sub Reactor
    ↓
启动协程 handleConnection
    ↓
co_await readable → 挂起
    ↓
epoll_wait
    ↓
fd ready
    ↓
Reactor 恢复协程
    ↓
继续执行 read/write

八、这一篇真正的价值

你现在掌握的是:

epoll(事件)
Reactor(调度)
多线程(并发)
协程(执行)

👉 组合起来就是:

现代高并发服务端模型

九、总结

如果说前三篇是在分别建立结构、并发和执行模型,那么本篇完成的,是这三者的真正融合。

在这个模型中:

  • 多 Reactor 提供并发能力
  • Reactor 提供事件调度能力
  • 协程提供执行模型

三者结合,构成了现代高性能服务端的核心运行方式。

十、下一篇(第五篇)

👉 《Connection + 协程:面向对象的异步模型》

👉 Connection + 协程(工程级写法)

你会把:Connection + coroutine

彻底融合成:

class Connection {
coroutine run();
};

单 Reactor + 协程,是“执行模型”
多 Reactor + 协程,才是“架构模型”

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐