引言

先校准一下预期

这一篇如果真的要做 “C++20 原生协程 + epoll + Reactor”,工业级会非常复杂,因为 C++ 原生协程只给你:

  • co_await
  • co_return
  • suspend / resume 机制

不给你 runtime,也就是说:

你得自己把 epoll、事件注册、协程恢复、任务生命周期 全部串起来。

所以这一篇做的是:

最小可理解、可运行、能体现本质的版本

目标不是上来就对标 Boost.Asio / libuv,而是真正打通:

epoll(事件)

Reactor(调度)

Coroutine(挂起 / 恢复)

正文

一、为什么要从 Reactor 再走到协程?

在前两篇里,我们已经完成了:

  • 第一篇:Connection 抽象,把连接对象化
  • 第二篇:多 Reactor + 线程模型,把架构做出来

这时候你其实已经能写出一个“像样的服务端”了。

但是还有一个经典问题:

代码写起来还是不够优雅

比如在 Reactor 模型里,常见写法会变成:

onRead()
↓
解析状态
↓
决定下次等读还是等写
↓
回调继续

一旦逻辑稍微复杂一点,就会出现:

  • 状态变量越来越多
  • 读写流程被拆碎
  • 回调 / handler 里到处是分支
  • 同步思维变成“事件状态机思维”,代码不直观

所以才需要协程。

一旦逻辑稍微复杂一点,就会出现:

  • 状态变量越来越多
  • 读写流程被拆碎
  • 回调 / handler 里到处是分支
  • 同步思维变成“事件状态机思维”,代码不直观

所以才需要协程。

二、协程到底解决什么问题?

协程解决的不是:

有没有事件

这个是 epoll 干的。

协程解决的是:

代码执行到一半,怎么暂停;事件到了之后,怎么从原地继续执行

换句话说:

Reactor 负责:

  • 什么时候可以读
  • 什么时候可以写

协程负责:

  • 现在先停一下
  • 等可读/可写了再继续往下跑

所以这三层关系一定要记住:

epoll:谁 ready 了
Reactor:把 ready 事件分发出去
协程:让处理逻辑可以挂起再恢复

三、这一篇要做什么?

这一篇我们做一个最小可运行版本,目标不是工业级,而是彻底打通模型。

我们要实现:

1)单线程 Reactor

  • epoll_wait() 统一监听事件

2)C++20 协程

  • co_await reactor.readable(fd)
  • co_await reactor.writable(fd)

3)最小 echo server

  • accept 新连接
  • 协程里读数据
  • 协程里写回数据
  • 连接关闭时回收

也就是说,最终代码形态会像这样:

co_await reactor.readable(fd);
ssize_t n = read(fd, buffer, sizeof(buffer));

co_await reactor.writable(fd);
write(fd, buffer, n);

这就是:

同步写法 + 异步执行

四、最终项目结构

这一篇我建议拆成下面这些文件:

.
├── Task.h
├── Reactor.h
├── Reactor.cpp
├── SocketUtil.h
├── SocketUtil.cpp
├── Server.h
├── Server.cpp
└── main.cpp

各文件职责

Task.h

定义最小协程返回类型 DetachedTask

Reactor.h / Reactor.cpp

定义 Reactor 和 awaiter:

  • readable(fd)
  • writable(fd)
  • waitReadable()
  • waitWritable()
  • loop()

SocketUtil.h / SocketUtil.cpp

负责:

  • setNonBlocking
  • createListenFd

Server.h / Server.cpp

负责:

  • acceptLoop
  • handleConnection

main.cpp

负责初始化和启动

五、代码总览之前,先讲清最核心的设计


1)为什么要有 DetachedTask

C++20 协程本身只是语言原语,它不会自动帮你调度。

我们这里实现一个最小的协程返回类型:

DetachedTask

含义就是:

  • 协程创建后立即开始执行
  • 执行到 co_await 时挂起
  • Reactor 事件到来后恢复
  • 执行结束后自动销毁

它非常适合这种:

顶层连接处理协程


2)为什么 Reactor 里要保存 coroutine handle?

因为当你写:

co_await reactor.readable(fd);

本质就是:

  • 当前协程先挂起
  • Reactor 把这个协程句柄记住
  • 等 fd 可读时,再把这个协程恢复

所以 Reactor 这一层不仅要管 epoll,还要管:

哪个 fd 对应哪个挂起的协程


3)为什么这是 Reactor + 协程,而不是纯协程?

因为协程自己不会知道:

fd 什么时候 ready

这件事只有内核 + epoll 知道。

所以必须是:

协程挂起

Reactor 把 handle 挂到 fd 上

epoll_wait 返回 ready fd

Reactor 恢复对应协程

这就是现代异步系统的核心链路。

六、完整代码

1)Task.h

#ifndef TASK_H
#define TASK_H

#include <coroutine>
#include <exception>
#include <utility>

struct DetachedTask {
    struct promise_type {
        DetachedTask get_return_object() noexcept {
            return {};
        }

        std::suspend_never initial_suspend() noexcept {
            return {};
        }

        struct FinalAwaiter {
            bool await_ready() noexcept { return false; }

            template <typename Promise>
            void await_suspend(std::coroutine_handle<Promise> h) noexcept {
                h.destroy();
            }

            void await_resume() noexcept {}
        };

        FinalAwaiter final_suspend() noexcept {
            return {};
        }

        void return_void() noexcept {}

        void unhandled_exception() {
            std::terminate();
        }
    };
};

#endif

2)Reactor.h

#ifndef REACTOR_H
#define REACTOR_H

#include <coroutine>
#include <cstdint>
#include <unordered_map>

class Reactor {
public:
    Reactor();
    ~Reactor();

    Reactor(const Reactor&) = delete;
    Reactor& operator=(const Reactor&) = delete;

    void waitReadable(int fd, std::coroutine_handle<> handle);
    void waitWritable(int fd, std::coroutine_handle<> handle);
    void removeFd(int fd);

    void loop();

    class ReadableAwaiter {
    public:
        ReadableAwaiter(Reactor& reactor, int fd)
            : reactor_(reactor), fd_(fd) {}

        bool await_ready() const noexcept {
            return false;
        }

        void await_suspend(std::coroutine_handle<> handle) {
            reactor_.waitReadable(fd_, handle);
        }

        void await_resume() const noexcept {}

    private:
        Reactor& reactor_;
        int fd_;
    };

    class WritableAwaiter {
    public:
        WritableAwaiter(Reactor& reactor, int fd)
            : reactor_(reactor), fd_(fd) {}

        bool await_ready() const noexcept {
            return false;
        }

        void await_suspend(std::coroutine_handle<> handle) {
            reactor_.waitWritable(fd_, handle);
        }

        void await_resume() const noexcept {}

    private:
        Reactor& reactor_;
        int fd_;
    };

    ReadableAwaiter readable(int fd) {
        return ReadableAwaiter(*this, fd);
    }

    WritableAwaiter writable(int fd) {
        return WritableAwaiter(*this, fd);
    }

private:
    struct Entry {
        std::coroutine_handle<> readHandle{};
        std::coroutine_handle<> writeHandle{};
        uint32_t interests{0};
        bool registered{false};
    };

    void updateInterest(int fd);

private:
    int epollFd_;
    std::unordered_map<int, Entry> entries_;
};

#endif

3)Reactor.cpp

#include "Reactor.h"

#include <sys/epoll.h>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <cstdlib>

Reactor::Reactor() {
    epollFd_ = epoll_create1(0);
    if (epollFd_ == -1) {
        std::cerr << "epoll_create1 failed" << std::endl;
        std::exit(1);
    }
}

Reactor::~Reactor() {
    if (epollFd_ >= 0) {
        close(epollFd_);
        epollFd_ = -1;
    }
}

void Reactor::waitReadable(int fd, std::coroutine_handle<> handle) {
    Entry& entry = entries_[fd];
    entry.readHandle = handle;
    entry.interests |= EPOLLIN;
    updateInterest(fd);
}

void Reactor::waitWritable(int fd, std::coroutine_handle<> handle) {
    Entry& entry = entries_[fd];
    entry.writeHandle = handle;
    entry.interests |= EPOLLOUT;
    updateInterest(fd);
}

void Reactor::removeFd(int fd) {
    auto it = entries_.find(fd);
    if (it == entries_.end()) {
        return;
    }

    if (it->second.registered) {
        epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
    }

    entries_.erase(it);
}

void Reactor::updateInterest(int fd) {
    auto it = entries_.find(fd);
    if (it == entries_.end()) {
        return;
    }

    Entry& entry = it->second;

    if (entry.interests == 0) {
        if (entry.registered) {
            epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr);
            entry.registered = false;
        }
        return;
    }

    epoll_event ev{};
    ev.events = entry.interests | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
    ev.data.fd = fd;

    if (!entry.registered) {
        if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev) == 0) {
            entry.registered = true;
        }
    } else {
        epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);
    }
}

void Reactor::loop() {
    constexpr int MAX_EVENTS = 64;
    std::vector<epoll_event> events(MAX_EVENTS);

    while (true) {
        int n = epoll_wait(epollFd_, events.data(), MAX_EVENTS, -1);
        if (n == -1) {
            continue;
        }

        for (int i = 0; i < n; ++i) {
            int fd = events[i].data.fd;
            uint32_t ev = events[i].events;

            auto it = entries_.find(fd);
            if (it == entries_.end()) {
                continue;
            }

            Entry& entry = it->second;

            std::coroutine_handle<> readHandle{};
            std::coroutine_handle<> writeHandle{};

            if (ev & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
                readHandle = entry.readHandle;
                entry.readHandle = {};
                entry.interests &= ~EPOLLIN;
            }

            if (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
                writeHandle = entry.writeHandle;
                entry.writeHandle = {};
                entry.interests &= ~EPOLLOUT;
            }

            updateInterest(fd);

            if (readHandle) {
                readHandle.resume();
            }

            if (writeHandle && (!readHandle || writeHandle.address() != readHandle.address())) {
                writeHandle.resume();
            }
        }
    }
}

4)SocketUtil.h

#ifndef SOCKET_UTIL_H
#define SOCKET_UTIL_H

int setNonBlocking(int fd);
int createListenFd(int port);

#endif

5)SocketUtil.cpp

#include "SocketUtil.h"

#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int createListenFd(int port) {
    int serverFd = socket(AF_INET, SOCK_STREAM, 0);
    if (serverFd == -1) {
        return -1;
    }

    int opt = 1;
    setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    if (setNonBlocking(serverFd) == -1) {
        close(serverFd);
        return -1;
    }

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serverFd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
        close(serverFd);
        return -1;
    }

    if (listen(serverFd, 128) == -1) {
        close(serverFd);
        return -1;
    }

    return serverFd;
}

6)Server.h

#ifndef SERVER_H
#define SERVER_H

#include "Task.h"

class Reactor;

DetachedTask acceptLoop(int listenFd, Reactor& reactor);
DetachedTask handleConnection(int clientFd, Reactor& reactor);

#endif

7)Server.cpp

#include "Server.h"
#include "Reactor.h"
#include "SocketUtil.h"

#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>

#include <iostream>
#include <string>

DetachedTask handleConnection(int clientFd, Reactor& reactor) {
    std::string writeBuffer;
    char buffer[1024];

    while (true) {
        co_await reactor.readable(clientFd);

        while (true) {
            ssize_t n = ::read(clientFd, buffer, sizeof(buffer));

            if (n > 0) {
                std::string msg(buffer, n);
                std::cout << "[recv] fd=" << clientFd
                          << " msg=" << msg << std::endl;

                // demo: echo
                writeBuffer.append(buffer, n);
            }
            else if (n == 0) {
                std::cout << "[close] peer closed fd=" << clientFd << std::endl;
                reactor.removeFd(clientFd);
                ::close(clientFd);
                co_return;
            }
            else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break;
                }

                std::cerr << "[error] read failed fd=" << clientFd << std::endl;
                reactor.removeFd(clientFd);
                ::close(clientFd);
                co_return;
            }
        }

        while (!writeBuffer.empty()) {
            co_await reactor.writable(clientFd);

            ssize_t wn = ::write(clientFd, writeBuffer.data(), writeBuffer.size());
            if (wn > 0) {
                writeBuffer.erase(0, wn);
            }
            else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    continue;
                }

                std::cerr << "[error] write failed fd=" << clientFd << std::endl;
                reactor.removeFd(clientFd);
                ::close(clientFd);
                co_return;
            }
        }
    }
}

DetachedTask acceptLoop(int listenFd, Reactor& reactor) {
    while (true) {
        co_await reactor.readable(listenFd);

        while (true) {
            int clientFd = ::accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);
            if (clientFd >= 0) {
                std::cout << "[accept] new client fd=" << clientFd << std::endl;
                handleConnection(clientFd, reactor);
            }
            else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break;
                }

                if (errno == EINTR) {
                    continue;
                }

                std::cerr << "[error] accept4 failed" << std::endl;
                break;
            }
        }
    }
}

8)main.cpp

#include "Reactor.h"
#include "SocketUtil.h"
#include "Server.h"

#include <iostream>
#include <unistd.h>

int main() {
    int listenFd = createListenFd(8080);
    if (listenFd == -1) {
        std::cerr << "createListenFd failed" << std::endl;
        return 1;
    }

    Reactor reactor;

    // 启动 accept 协程
    acceptLoop(listenFd, reactor);

    std::cout << "server running on :8080" << std::endl;
    reactor.loop();

    close(listenFd);
    return 0;
}

七、怎么编译运行?

因为用了 C++20 协程,所以编译命令建议这样:

g++ -std=c++20 -O2 Reactor.cpp SocketUtil.cpp Server.cpp main.cpp -o server

运行:

./server

测试:

nc 127.0.0.1 8080

输入:hello

你会看到服务端输出日志,并且客户端收到回显。

八、这套代码的核心控制流,一定要看懂

这套代码真正的运行方式是:

main()
↓
acceptLoop(listenFd, reactor) // 启动 accept 协程
↓
协程第一次执行到:
co_await reactor.readable(listenFd)
↓
挂起
↓
reactor.loop()
↓
epoll_wait()
↓
listenFd ready
↓
Reactor 恢复 acceptLoop 协程
↓
accept 新连接
↓
为每个 clientFd 启动 handleConnection 协程
↓
handleConnection 执行到:
co_await reactor.readable(clientFd)
↓
挂起
↓
epoll_wait()
↓
clientFd ready
↓
Reactor 恢复 handleConnection 协程
↓
read / write / close

这就是:

事件驱动 + 协程恢复

九、这一版到底比纯 Reactor handler 好在哪?

如果你不用协程,逻辑通常会散在:

  • onRead()
  • onWrite()
  • 状态机切换
  • 回调链

而现在你可以写成:

co_await reactor.readable(fd);
read(...);

co_await reactor.writable(fd);
write(...);

也就是说:

表面上像同步代码

底层实际上还是异步调度

这就是协程最值钱的地方。

十、这一篇你最该记住的 5 句话

epoll 负责“谁 ready 了”

Reactor 负责“恢复哪个协程”

协程负责“从挂起点继续执行”

协程不是替代 Reactor,而是运行在 Reactor 之上

Reactor + 协程 = 现代异步系统的核心组合

十一、这一版还不是工业级,差在哪?

必须实话实说:

这是一版最小打通模型的代码,还不是工业级。

还缺:

  • Connection 对象化的协程版本
  • 写缓冲区更细致的管理
  • 半包 / 粘包处理
  • timer / timeout
  • 多 Reactor + 协程
  • 任务派发到业务线程池
  • 智能指针管理协程相关对象生命周期
  • 更安全的 close / remove 时机控制

但是,这一版已经把最关键的一层打通了:

epoll / Reactor / 协程 到底怎么协同工作

这比单纯会背概念强太多了。

十二、本篇一句话总结

前面的 Reactor 模型解决的是“事件来了怎么分发”,
而这一篇进一步解决的是“分发之后,代码怎么优雅地继续执行”。

也就是说:
epoll 负责发现事件,Reactor 负责调度事件,协程负责优雅执行事件。

从这一篇开始,我们已经不再只是写 socket 代码,而是在逐步搭建一个现代异步服务端运行时。

十三、完整项目结构(最终版)

.
├── Task.h
├── Reactor.h
├── Reactor.cpp
├── SocketUtil.h
├── SocketUtil.cpp
├── Server.h
├── Server.cpp
└── main.cpp

十四、下一步最顺的方向(核心升级)

到这一篇为止,我们已经分别完成了三件事情:

  • 第一篇:Connection(结构抽象)
  • 第二篇:多 Reactor(并发模型)
  • 第三篇:单 Reactor + 协程(执行模型)

也就是说,到目前为止,这三篇分别解决了:

结构 ✔
并发 ✔
执行 ✔

但这里要特别强调一点:

第三篇的协程模型,仍然是单 Reactor 版本。

它的价值在于先把最关键的执行流问题讲清楚:

  • 协程怎么挂起
  • Reactor 怎么恢复协程
  • epoll 怎么驱动协程继续执行

这一篇解决的是:

“Reactor + 协程”这套执行模型到底是怎么工作的

而不是最终的高并发架构形态。

下一步(主线核心):把并发模型和执行模型融合起来

真正的高并发服务端,不会停在:

单 Reactor + 协程

因为这仍然只能使用一个线程、一个事件循环。

所以本系列真正的主线下一步,就是:

《C++ 服务端进阶(四)—— 多 Reactor + 协程:真正的高并发模型》

第四篇要解决什么?

第四篇要做的事情非常明确:

把第二篇和第三篇真正合起来:

多 Reactor(第二篇)
+
协程(第三篇)

也就是把:

  • 第二篇解决的“多线程并发架构”
  • 第三篇解决的“协程执行模型”

融合成一套完整运行方式。

最终架构形态

第四篇的目标架构会变成:

Main Reactor(accept)
↓
Sub Reactor(线程1)+ 协程
Sub Reactor(线程2)+ 协程
Sub Reactor(线程3)+ 协程
...

也就是说:

  • 主 Reactor 负责 accept
  • 子 Reactor 负责事件循环
  • 每个子 Reactor 内部通过协程处理连接逻辑

这才是:

真正接近现代高并发服务端的核心模型

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐