C++20 协程实战:从零手写高性能异步网络框架
·
摘要:协程是 C++20 最具革命性的特性之一,但官方只提供了底层设施,如何将其落地为生产力工具?本文将带你手写一个基于协程的异步网络框架,掌握
co_await、co_yield、promise_type的核心原理。
一、为什么需要协程?
在 C++20 之前,异步编程通常依赖回调函数或 Future/Promise 模式:
// 回调地狱示例
void fetchData(std::function<void(Data)> callback) {
asyncConnect([callback](Connection conn) {
conn.asyncRead([callback](Buffer buf) {
parseData(buf, [callback](Data data) {
callback(data); // 层层嵌套,代码难以维护
});
});
});
}
问题显而易见:
- 回调嵌套导致代码横向膨胀(Callback Hell)
- 错误处理分散在各层回调中
- 状态机逻辑与业务逻辑混杂
- 难以实现复杂的异步流程控制
协程通过挂起/恢复机制,让我们可以用同步的写法实现异步逻辑:
// 协程版本——代码顺序清晰,如同同步代码
Task<Data> fetchData() {
auto conn = co_await asyncConnect();
auto buf = co_await conn.asyncRead();
co_return parseData(buf);
}
二、协程核心概念速览
2.1 协程三关键字
| 关键字 | 作用 | 说明 |
|---|---|---|
co_await |
挂起等待 | 等待某个可等待对象完成,期间协程挂起 |
co_yield |
产出值 | 生成器模式,产出值后挂起,下次恢复继续 |
co_return |
返回值 | 协程结束并返回结果 |
2.2 编译器如何改造协程函数
当编译器遇到协程函数时,会生成一个状态机结构:
// 你写的代码
Task<int> demo() {
int a = co_await fetchA();
int b = co_await fetchB();
co_return a + b;
}
// 编译器生成的近似伪代码
Task<int> demo() {
struct __coro_state {
int __suspend_point = 0;
int a, b;
// ... 其他状态
};
switch (state.__suspend_point) {
case 0: goto start;
case 1: goto after_fetchA;
case 2: goto after_fetchB;
}
start:
// ... 初始化代码
state.__suspend_point = 1;
co_await fetchA(); // 挂起
after_fetchA:
state.a = /* 结果 */;
state.__suspend_point = 2;
co_await fetchB(); // 挂起
after_fetchB:
state.b = /* 结果 */;
co_return state.a + state.b;
}
2.3 Promise Type——协程的"控制器"
每个协程都有关联的 Promise Type,控制协程的行为:
template<typename T>
struct Task {
struct promise_type {
T value;
std::coroutine_handle<> continuation;
Task get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; } // 创建即挂起
std::suspend_always final_suspend() noexcept { return {}; } // 结束前挂起
void return_value(T v) { value = std::move(v); }
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
};
三、实战:手写异步网络框架
3.1 设计目标
我们将实现一个轻量级协程网络框架,核心特性:
- 基于 epoll/kqueue 的单线程事件循环
Task<T>类型表示异步操作co_await支持超时、取消- 零依赖,仅使用标准库
3.2 核心组件实现
3.2.1 可等待对象(Awaitable)
// 基础可等待对象接口
template<typename T = void>
struct Awaiter {
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> h) {
// 将 handle 注册到事件循环
EventLoop::current().registerCallback(this, h);
}
T await_resume() {
if (exception_) std::rethrow_exception(exception_);
return std::move(result_);
}
void set_value(T value) { result_ = std::move(value); }
void set_exception(std::exception_ptr e) { exception_ = e; }
private:
T result_;
std::exception_ptr exception_;
};
3.2.2 Task 类型
template<typename T = void>
class Task {
public:
struct promise_type {
T* value_ptr_ = nullptr;
std::exception_ptr exception_;
std::coroutine_handle<> continuation_;
Task get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
struct FinalAwaiter {
bool await_ready() noexcept { return false; }
void await_suspend(std::coroutine_handle<promise_type> h) noexcept {
auto& p = h.promise();
if (p.continuation_) {
p.continuation_.resume(); // 恢复父协程
}
}
void await_resume() noexcept {}
};
FinalAwaiter final_suspend() noexcept { return {}; }
void return_value(T value) {
if (value_ptr_) *value_ptr_ = std::move(value);
}
void unhandled_exception() {
exception_ = std::current_exception();
}
};
using Handle = std::coroutine_handle<promise_type>;
explicit Task(Handle h) : handle_(h) {}
~Task() { if (handle_) handle_.destroy(); }
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
// 同步等待(仅用于主线程)
T get() {
T result;
handle_.promise().value_ptr_ = &result;
handle_.resume();
while (!handle_.done()) {
EventLoop::current().runOnce();
}
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
return result;
}
// 支持 co_await
auto operator co_await() {
struct Awaiter {
Handle handle_;
bool await_ready() const { return handle_.done(); }
std::coroutine_handle<> await_suspend(std::coroutine_handle<> cont) {
handle_.promise().continuation_ = cont;
return handle_;
}
T await_resume() {
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
return std::move(*handle_.promise().value_ptr_);
}
};
return Awaiter{handle_};
}
void start() { handle_.resume(); }
bool done() const { return handle_.done(); }
private:
Handle handle_;
};
// void 特化版本
template<>
class Task<void> {
// 类似实现,省略...
};
3.2.3 异步 I/O 操作
// 异步读取
task<std::vector<char>> async_read(int fd, size_t n) {
std::vector<char> buffer(n);
size_t total_read = 0;
while (total_read < n) {
ssize_t nread = co_await AsyncReadAwaiter{fd, buffer.data() + total_read, n - total_read};
if (nread <= 0) {
if (nread < 0 && errno == EAGAIN) {
co_await WaitForReadable{fd}; // 等待可读
continue;
}
if (nread == 0) break; // EOF
throw std::system_error(errno, std::generic_category());
}
total_read += nread;
}
buffer.resize(total_read);
co_return buffer;
}
// 带超时的异步操作
template<typename Awaitable>
auto with_timeout(Awaitable awaitable, std::chrono::milliseconds timeout) {
struct TimeoutAwaiter {
Awaitable awaitable_;
std::chrono::milliseconds timeout_;
bool await_ready() { return awaitable_.await_ready(); }
void await_suspend(std::coroutine_handle<> h) {
// 注册超时定时器
auto timeout_handle = EventLoop::current().setTimeout(timeout_, [h]() mutable {
h.destroy(); // 超时销毁协程
});
awaitable_.await_suspend(std::coroutine_handle<TimeoutPromise>::from_address(h.address()));
}
decltype(auto) await_resume() {
return awaitable_.await_resume();
}
};
return TimeoutAwaiter{std::move(awaitable), timeout};
}
3.3 实战示例:HTTP 客户端
#include <iostream>
#include "coro_net.hpp"
using namespace coro;
// 协程风格的 HTTP GET
task<std::string> http_get(const std::string& host, const std::string& path) {
// 1. 异步解析 DNS
auto addresses = co_await async_resolve(host);
// 2. 异步连接(带 5 秒超时)
auto conn = co_await with_timeout(
async_connect(addresses, 80),
5s
);
// 3. 构造 HTTP 请求
std::string request = fmt::format(
"GET {} HTTP/1.1\r\n"
"Host: {}\r\n"
"Connection: close\r\n"
"\r\n",
path, host
);
// 4. 异步发送请求
co_await async_write_all(conn.fd(), request);
// 5. 异步读取响应
std::string response;
char buffer[4096];
while (true) {
auto n = co_await async_read(conn.fd(), buffer, sizeof(buffer));
if (n == 0) break;
response.append(buffer, n);
}
co_return response;
}
// 并发请求多个 URL
task<std::vector<std::string>> fetch_all(const std::vector<std::string>& urls) {
std::vector<task<std::string>> tasks;
for (const auto& url : urls) {
tasks.push_back(http_get(url, "/"));
}
// 等待所有任务完成
std::vector<std::string> results;
for (auto& t : tasks) {
results.push_back(co_await t);
}
co_return results;
}
int main() {
EventLoop loop;
// 发起并发请求
auto task = fetch_all({
"example.com",
"httpbin.org",
"github.com"
});
// 运行事件循环直到完成
auto results = task.get();
for (size_t i = 0; i < results.size(); ++i) {
std::cout << "URL " << i << " response size: "
<< results[i].size() << " bytes\n";
}
return 0;
}
四、性能分析
4.1 协程 vs 线程
| 特性 | 线程 | 协程 |
|---|---|---|
| 内存占用 | 1-8 MB 栈空间 | ~200-500 bytes(堆分配) |
| 切换开销 | 系统调用,~1-10 μs | 用户态,~10-100 ns |
| 调度方式 | 抢占式 | 协作式 |
| 并发数量 | 数千级别 | 百万级别 |
| 适用场景 | CPU 密集型 | I/O 密集型 |
4.2 编译器优化
GCC/Clang 对协程有专门的优化:
// -O3 优化后,简单协程会被内联展开
inline int compute() {
return 42;
}
task<int> optimized() {
// 编译器可能直接优化为:co_return 42;
auto x = co_await async_compute();
co_return x * 2;
}
4.3 内存分配优化
默认情况下,协程状态在堆上分配。可通过自定义 operator new 优化:
struct promise_type {
// 使用内存池分配
void* operator new(size_t size) {
return CoroMemoryPool::allocate(size);
}
void operator delete(void* ptr, size_t size) {
CoroMemoryPool::deallocate(ptr, size);
}
// ...
};
五、C++23 协程新特性展望
5.1 std::generator<T>(C++23)
#include <generator>
std::generator<int> fibonacci(int n) {
int a = 0, b = 1;
for (int i = 0; i < n; ++i) {
co_yield a;
auto next = a + b;
a = b;
b = next;
}
}
// 使用
for (auto n : fibonacci(10)) {
std::cout << n << " "; // 0 1 1 2 3 5 8 13 21 34
}
5.2 基于 Senders/Receivers 的新异步模型(C++26 方向)
C++23 引入了 std::execution(Sender/Receiver),将与协程深度整合:
// 未来写法(概念演示)
task<void> new_style() {
auto result = co_await std::execution::schedule(pool_scheduler)
| std::execution::then([]{ return fetch_data(); })
| std::execution::then([](auto data){ return process(data); });
}
六、总结与最佳实践
6.1 何时使用协程?
✅ 推荐使用场景:
- 高并发 I/O 服务(网络、数据库)
- 需要复杂异步流程控制的场景
- 生成器模式(无限序列、流处理)
- 状态机替代方案
❌ 避免使用场景:
- 简单的回调即可完成的操作
- 需要与 C ABI 交互的代码
- 极端延迟敏感的实时系统
6.2 调试技巧
// 1. 添加协程 ID 方便调试
struct promise_type {
static inline std::atomic<uint64_t> next_id{0};
uint64_t id = next_id++;
promise_type() {
std::cerr << "Coro " << id << " created\n";
}
~promise_type() {
std::cerr << "Coro " << id << " destroyed\n";
}
};
// 2. 使用 LLDB 调试协程
// (lldb) frame variable -L __coro_frame
// 查看编译器生成的协程状态机
6.3 关键要点
- 协程是协作式的——只能在
co_await、co_yield、co_return处挂起 - 生命周期管理至关重要——确保协程在使用的对象销毁前完成
- 异常会跨挂点传播——使用
co_await的try-catch包裹可能抛异常的操作 - 编译器支持已成熟——GCC 11+、Clang 13+、MSVC 2019 均完整支持
参考文献
- C++ Reference - Coroutines
- Lewis Baker’s Blog - C++ Coroutines
- cppcoro 库实现
- C++20 标准草案 N4861
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)