C++ 服务端进阶(五)—— Connection + 协程:面向对象的异步模型(工程版完整实现)
一、这一篇到底解决什么问题?
在第四篇中,我们已经完成了:
多 Reactor(并发) + 协程(执行)
架构已经是对的了:
Main Reactor(accept)
↓
Sub Reactor(线程) + coroutine
但是代码形态仍然是:
handleConnection(fd, reactor);
👉 问题:
- ❌ 连接没有归属对象
- ❌ 状态分散在函数中
- ❌ 协程逻辑不属于“连接实体”
- ❌ 不符合真实服务端设计
本篇目标
把模型升级为:
fd
↓
Connection 对象
↓
run() 协程
↓
read / process / write
👉 一句话总结:
连接不再是“被处理的 fd”,而是“自带异步执行能力的对象”
二、第四篇 vs 第五篇(关键对比)
第四篇(函数式)
handleConnection(fd, reactor);
第五篇(对象化)
Connection* conn = new Connection(fd, reactor);
conn->start();
👉 本质变化:
函数式异步 → 面向对象异步
三、最终项目结构(第五篇)
.
├── Task.h
├── Reactor.h / Reactor.cpp
├── ReactorThread.h / ReactorThread.cpp
├── ReactorThreadPool.h / ReactorThreadPool.cpp
├── SocketUtil.h / SocketUtil.cpp
├── Connection.h / Connection.cpp
└── main.cpp
变化说明
👉 相比第四篇:
删除:
Server.h / Server.cpp
新增:
Connection.h / Connection.cpp
👉 原因:
连接处理逻辑从“外部函数” → 收敛到 Connection 内部
四、核心设计(必须理解)
1️⃣ Connection 是“活的对象”
它不只是数据:
fd
buffer
它还包含:
协程执行逻辑(run)
2️⃣ start() vs run()
void start() {
run();
}
👉 start = 启动入口
👉 run = 真正协程执行体
3️⃣ 生命周期归 Connection 自己管理
closeSelf()
👉 谁拥有连接,谁负责关闭
五、完整代码实现
1️⃣ Task.h
#ifndef TASK_H
#define TASK_H
#include <coroutine>
#include <exception>
struct DetachedTask {
struct promise_type {
DetachedTask get_return_object() noexcept {
return {};
}
std::suspend_never initial_suspend() noexcept {
return {};
}
struct FinalAwaiter {
bool await_ready() noexcept { return false; }
template <typename Promise>
void await_suspend(std::coroutine_handle<Promise> h) noexcept {
h.destroy();
}
void await_resume() noexcept {}
};
FinalAwaiter final_suspend() noexcept {
return {};
}
void return_void() noexcept {}
void unhandled_exception() {
std::terminate();
}
};
};
#endif
2️⃣ Reactor.h
#ifndef REACTOR_H
#define REACTOR_H
#include <coroutine>
#include <unordered_map>
#include <cstdint>
class Reactor {
public:
Reactor();
~Reactor();
void waitReadable(int fd, std::coroutine_handle<> handle);
void waitWritable(int fd, std::coroutine_handle<> handle);
void removeFd(int fd);
void loop();
class ReadableAwaiter {
public:
ReadableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitReadable(fd_, handle);
}
void await_resume() const noexcept {}
private:
Reactor& reactor_;
int fd_;
};
class WritableAwaiter {
public:
WritableAwaiter(Reactor& reactor, int fd)
: reactor_(reactor), fd_(fd) {}
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> handle) {
reactor_.waitWritable(fd_, handle);
}
void await_resume() const noexcept {}
private:
Reactor& reactor_;
int fd_;
};
ReadableAwaiter readable(int fd) { return ReadableAwaiter(*this, fd); }
WritableAwaiter writable(int fd) { return WritableAwaiter(*this, fd); }
private:
struct Entry {
std::coroutine_handle<> readHandle{};
std::coroutine_handle<> writeHandle{};
uint32_t interests{0};
bool registered{false};
};
void updateInterest(int fd);
private:
int epollFd_;
std::unordered_map<int, Entry> entries_;
};
#endif
3️⃣ Reactor.cpp
(保持和第三/四篇一致,这里不再删减)
👉 👉(这一段你可以直接复用前一篇代码,保持一致性)
4️⃣ Connection.h
#ifndef CONNECTION_H
#define CONNECTION_H
#include "Task.h"
#include <string>
class Reactor;
class Connection {
public:
Connection(int fd, Reactor& reactor);
~Connection();
void start();
private:
DetachedTask run();
void closeSelf();
private:
int fd_;
Reactor& reactor_;
std::string readBuffer_;
std::string writeBuffer_;
};
#endif
5️⃣ Connection.cpp(核心)
#include "Connection.h"
#include "Reactor.h"
#include <unistd.h>
#include <errno.h>
#include <iostream>
Connection::Connection(int fd, Reactor& reactor)
: fd_(fd), reactor_(reactor) {}
Connection::~Connection() {
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
}
void Connection::start() {
run();
}
DetachedTask Connection::run() {
char buffer[1024];
while (true) {
co_await reactor_.readable(fd_);
while (true) {
ssize_t n = ::read(fd_, buffer, sizeof(buffer));
if (n > 0) {
readBuffer_.append(buffer, n);
std::cout << "[recv] fd=" << fd_
<< " msg=" << std::string(buffer, n) << std::endl;
// demo:echo
writeBuffer_.append(buffer, n);
}
else if (n == 0) {
std::cout << "[close] peer closed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
std::cerr << "[error] read failed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
}
while (!writeBuffer_.empty()) {
co_await reactor_.writable(fd_);
ssize_t wn = ::write(fd_, writeBuffer_.data(), writeBuffer_.size());
if (wn > 0) {
writeBuffer_.erase(0, wn);
}
else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
std::cerr << "[error] write failed fd=" << fd_ << std::endl;
closeSelf();
co_return;
}
}
}
}
void Connection::closeSelf() {
if (fd_ >= 0) {
reactor_.removeFd(fd_);
::close(fd_);
fd_ = -1;
}
}
6️⃣ main.cpp
#include "ReactorThreadPool.h"
#include "SocketUtil.h"
#include "Connection.h"
#include <sys/socket.h>
#include <iostream>
int main() {
int listenFd = createListenFd(8080);
ReactorThreadPool pool(4);
pool.start();
std::cout << "server running on :8080" << std::endl;
while (true) {
int clientFd = accept4(listenFd, nullptr, nullptr, SOCK_NONBLOCK);
if (clientFd >= 0) {
Reactor* subReactor = pool.getNextReactor();
Connection* conn = new Connection(clientFd, *subReactor);
conn->start();
}
}
return 0;
}
六、执行流程(必须理解)
accept 新连接
↓
new Connection(fd)
↓
conn.start()
↓
run() 协程开始
↓
co_await readable(fd)
↓
挂起
↓
epoll_wait
↓
fd ready
↓
Reactor 恢复协程
↓
继续执行 read/write
七、本篇核心提升(最重要)
第四篇解决:
多 Reactor + 协程(架构)
第五篇解决:
Connection + 协程(模型)
👉 一句话:
第四篇让你会“搭架构”,第五篇让你会“写框架代码”
八、总结
如果说第四篇完成的是“多 Reactor + 协程”的架构融合,那么第五篇完成的,就是“Connection + 协程”的模型融合。
从这一篇开始,连接不再只是一个 fd,也不只是一个状态容器,而是一个能够承载完整异步执行流程的服务端实体。
九、下一篇(最终篇)
👉 《C++ 服务端进阶(六)—— 工程化落地:协议、缓冲区与超时机制》
👉 你将补齐:
- 半包 / 粘包
- buffer 管理
- 超时 / 心跳
- 生命周期
函数式异步 → 对象化异步 → 工程化系统
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)