一、这一篇到底解决什么问题?

在第四篇中,我们已经完成了:

多 Reactor(并发) + 协程(执行)

架构已经是对的了:

Main Reactor(accept)

Sub Reactor(线程) + coroutine


但是代码形态仍然是:

handleConnection(fd, reactor);

👉 问题:

  • ❌ 连接没有归属对象
  • ❌ 状态分散在函数中
  • ❌ 协程逻辑不属于“连接实体”
  • ❌ 不符合真实服务端设计

本篇目标

把模型升级为:

fd
↓
Connection 对象
↓
run() 协程
↓
read / process / write

👉 一句话总结:

连接不再是“被处理的 fd”,而是“自带异步执行能力的对象”

二、第四篇 vs 第五篇(关键对比)

第四篇(函数式)

handleConnection(fd, reactor);

第五篇(对象化)

Connection* conn = new Connection(fd, reactor);
conn->start();

👉 本质变化:

函数式异步 → 面向对象异步

三、最终项目结构(第五篇)

.
├── Task.h
├── Reactor.h / Reactor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── SocketUtil.h / SocketUtil.cpp
├── Connection.h / Connection.cpp
└── main.cpp

变化说明

👉 相比第四篇:

删除:
Server.h / Server.cpp

新增:
Connection.h / Connection.cpp

👉 原因:

连接处理逻辑从“外部函数” → 收敛到 Connection 内部

四、核心设计(必须理解)


1️⃣ Connection 是“活的对象”

它不只是数据:

fd
buffer

它还包含:

协程执行逻辑(run)

2️⃣ start() vs run()

void start() {
run();
}

👉 start = 启动入口
👉 run = 真正协程执行体

3️⃣ 生命周期归 Connection 自己管理

closeSelf()

👉 谁拥有连接,谁负责关闭

五、完整代码实现

1️⃣ Task.h

#ifndef TASK_H
#define TASK_H

#include <coroutine>
#include <exception>

struct DetachedTask {
struct promise_type {
DetachedTask get_return_object() noexcept {
return {};
}

std::suspend_never initial_suspend() noexcept {
return {};
}

struct FinalAwaiter {
bool await_ready() noexcept { return false; }

template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> h) noexcept {
h.destroy();
}

void await_resume() noexcept {}
};

FinalAwaiter final_suspend() noexcept {
return {};
}

void return_void() noexcept {}

void unhandled_exception() {
std::terminate();
}
};
};

#endif

2️⃣ Reactor.h

#ifndef REACTOR_H
#define REACTOR_H

#include <coroutine>
#include <unordered_map>
#include <cstdint>

class Reactor {
public:
Reactor();
~Reactor();

void waitReadable(int fd, std::coroutine_handle<> handle);
void waitWritable(int fd, std::coroutine_handle<> handle);
void removeFd(int fd);

void loop();

class ReadableAwaiter {
public:
ReadableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}

bool await_ready() const noexcept { return false; }

void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitReadable(fd_, handle);
}

void await_resume() const noexcept {}

private:
Reactor& reactor_;
int fd_;
};

class WritableAwaiter {
public:
WritableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}

bool await_ready() const noexcept { return false; }

void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitWritable(fd_, handle);
}

void await_resume() const noexcept {}

private:
Reactor& reactor_;
int fd_;
};

ReadableAwaiter readable(int fd) { return ReadableAwaiter(*this, fd); }
WritableAwaiter writable(int fd) { return WritableAwaiter(*this, fd); }

private:
struct Entry {
std::coroutine_handle<> readHandle{};
std::coroutine_handle<> writeHandle{};
uint32_t interests{0};
bool registered{false};
};

void updateInterest(int fd);

private:
int epollFd_;
std::unordered_map<int, Entry> entries_;
};

#endif

3️⃣ Reactor.cpp

(保持和第三/四篇一致,这里不再删减)

👉 👉(这一段你可以直接复用前一篇代码,保持一致性)

4️⃣ Connection.h

#ifndef CONNECTION_H
#define CONNECTION_H

#include "Task.h"
#include <string>

class Reactor;

class Connection {
public:
Connection(int fd, Reactor& reactor);
~Connection();

void start();

private:
DetachedTask run();
void closeSelf();

private:
int fd_;
Reactor& reactor_;

std::string readBuffer_;
std::string writeBuffer_;
};

#endif

5️⃣ Connection.cpp(核心)

#include "Connection.h"
#include "Reactor.h"

#include <unistd.h>
#include <errno.h>
#include <iostream>

Connection::Connection(int fd, Reactor& reactor)
: fd_(fd), reactor_(reactor) {}

Connection::~Connection() {
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
}

void Connection::start() {
run();
}

DetachedTask Connection::run() {
char buffer[1024];

while (true) {
co_await reactor_.readable(fd_);

while (true) {
ssize_t n = ::read(fd_, buffer, sizeof(buffer));

if (n > 0) {
readBuffer_.append(buffer, n);

std::cout << "[recv] fd=" << fd_
<< " msg=" << std::string(buffer, n) << std::endl;

// demo:echo
writeBuffer_.append(buffer, n);
}
else if (n == 0) {
std::cout << "[close] peer closed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}

std::cerr << "[error] read failed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
}

while (!writeBuffer_.empty()) {
co_await reactor_.writable(fd_);

ssize_t wn = ::write(fd_, writeBuffer_.data(), writeBuffer_.size());
if (wn > 0) {
writeBuffer_.erase(0, wn);
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}

std::cerr << "[error] write failed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
}
}
}

void Connection::closeSelf() {
if (fd_ >= 0) {
reactor_.removeFd(fd_);
::close(fd_);
fd_ = -1;
}
}

6️⃣ main.cpp

#include "ReactorThreadPool.h"
#include "SocketUtil.h"
#include "Connection.h"

#include <sys/socket.h>
#include <iostream>

int main() {
int listenFd = createListenFd(8080);

ReactorThreadPool pool(4);
pool.start();

std::cout << "server running on :8080" << std::endl;

while (true) {
int clientFd = accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);

if (clientFd >= 0) {
Reactor* subReactor = pool.getNextReactor();

Connection* conn = new Connection(clientFd, *subReactor);
conn->start();
}
}

return 0;
}

六、执行流程(必须理解)

accept 新连接

new Connection(fd)

conn.start()

run() 协程开始

co_await readable(fd)

挂起

epoll_wait

fd ready

Reactor 恢复协程

继续执行 read/write

七、本篇核心提升(最重要)

第四篇解决:

多 Reactor + 协程(架构)

第五篇解决:

Connection + 协程(模型)


👉 一句话:

第四篇让你会“搭架构”,第五篇让你会“写框架代码”

八、总结

如果说第四篇完成的是“多 Reactor + 协程”的架构融合,那么第五篇完成的,就是“Connection + 协程”的模型融合。

从这一篇开始,连接不再只是一个 fd,也不只是一个状态容器,而是一个能够承载完整异步执行流程的服务端实体。

九、下一篇(最终篇)

👉 《C++ 服务端进阶(六)—— 工程化落地:协议、缓冲区与超时机制》


👉 你将补齐:

  • 半包 / 粘包
  • buffer 管理
  • 超时 / 心跳
  • 生命周期

函数式异步 → 对象化异步 → 工程化系统

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐