摘要:协程是 C++20 最具革命性的特性之一,但官方只提供了底层设施,如何将其落地为生产力工具?本文将带你手写一个基于协程的异步网络框架,掌握 co_awaitco_yieldpromise_type 的核心原理。


一、为什么需要协程?

在 C++20 之前,异步编程通常依赖回调函数或 Future/Promise 模式:

// 回调地狱示例
void fetchData(std::function<void(Data)> callback) {
    asyncConnect([callback](Connection conn) {
        conn.asyncRead([callback](Buffer buf) {
            parseData(buf, [callback](Data data) {
                callback(data);  // 层层嵌套,代码难以维护
            });
        });
    });
}

问题显而易见

  • 回调嵌套导致代码横向膨胀(Callback Hell)
  • 错误处理分散在各层回调中
  • 状态机逻辑与业务逻辑混杂
  • 难以实现复杂的异步流程控制

协程通过挂起/恢复机制,让我们可以用同步的写法实现异步逻辑

// 协程版本——代码顺序清晰,如同同步代码
Task<Data> fetchData() {
    auto conn = co_await asyncConnect();
    auto buf = co_await conn.asyncRead();
    co_return parseData(buf);
}

二、协程核心概念速览

2.1 协程三关键字

关键字 作用 说明
co_await 挂起等待 等待某个可等待对象完成,期间协程挂起
co_yield 产出值 生成器模式,产出值后挂起,下次恢复继续
co_return 返回值 协程结束并返回结果

2.2 编译器如何改造协程函数

当编译器遇到协程函数时,会生成一个状态机结构:

// 你写的代码
Task<int> demo() {
    int a = co_await fetchA();
    int b = co_await fetchB();
    co_return a + b;
}

// 编译器生成的近似伪代码
Task<int> demo() {
    struct __coro_state {
        int __suspend_point = 0;
        int a, b;
        // ... 其他状态
    };
    
    switch (state.__suspend_point) {
        case 0: goto start;
        case 1: goto after_fetchA;
        case 2: goto after_fetchB;
    }
    
    start:
    // ... 初始化代码
    state.__suspend_point = 1;
    co_await fetchA();  // 挂起
    
    after_fetchA:
    state.a = /* 结果 */;
    state.__suspend_point = 2;
    co_await fetchB();  // 挂起
    
    after_fetchB:
    state.b = /* 结果 */;
    co_return state.a + state.b;
}

2.3 Promise Type——协程的"控制器"

每个协程都有关联的 Promise Type,控制协程的行为:

template<typename T>
struct Task {
    struct promise_type {
        T value;
        std::coroutine_handle<> continuation;
        
        Task get_return_object() {
            return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        
        std::suspend_always initial_suspend() { return {}; }  // 创建即挂起
        std::suspend_always final_suspend() noexcept { return {}; }  // 结束前挂起
        
        void return_value(T v) { value = std::move(v); }
        void unhandled_exception() { std::terminate(); }
    };
    
    std::coroutine_handle<promise_type> handle;
};

三、实战:手写异步网络框架

3.1 设计目标

我们将实现一个轻量级协程网络框架,核心特性:

  • 基于 epoll/kqueue 的单线程事件循环
  • Task<T> 类型表示异步操作
  • co_await 支持超时、取消
  • 零依赖,仅使用标准库

3.2 核心组件实现

3.2.1 可等待对象(Awaitable)
// 基础可等待对象接口
template<typename T = void>
struct Awaiter {
    bool await_ready() const noexcept { return false; }
    
    void await_suspend(std::coroutine_handle<> h) {
        // 将 handle 注册到事件循环
        EventLoop::current().registerCallback(this, h);
    }
    
    T await_resume() {
        if (exception_) std::rethrow_exception(exception_);
        return std::move(result_);
    }
    
    void set_value(T value) { result_ = std::move(value); }
    void set_exception(std::exception_ptr e) { exception_ = e; }
    
private:
    T result_;
    std::exception_ptr exception_;
};
3.2.2 Task 类型
template<typename T = void>
class Task {
public:
    struct promise_type {
        T* value_ptr_ = nullptr;
        std::exception_ptr exception_;
        std::coroutine_handle<> continuation_;
        
        Task get_return_object() {
            return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        
        std::suspend_always initial_suspend() { return {}; }
        
        struct FinalAwaiter {
            bool await_ready() noexcept { return false; }
            
            void await_suspend(std::coroutine_handle<promise_type> h) noexcept {
                auto& p = h.promise();
                if (p.continuation_) {
                    p.continuation_.resume();  // 恢复父协程
                }
            }
            
            void await_resume() noexcept {}
        };
        
        FinalAwaiter final_suspend() noexcept { return {}; }
        
        void return_value(T value) {
            if (value_ptr_) *value_ptr_ = std::move(value);
        }
        
        void unhandled_exception() {
            exception_ = std::current_exception();
        }
    };
    
    using Handle = std::coroutine_handle<promise_type>;
    
    explicit Task(Handle h) : handle_(h) {}
    ~Task() { if (handle_) handle_.destroy(); }
    
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }
    
    // 同步等待(仅用于主线程)
    T get() {
        T result;
        handle_.promise().value_ptr_ = &result;
        handle_.resume();
        
        while (!handle_.done()) {
            EventLoop::current().runOnce();
        }
        
        if (handle_.promise().exception_) {
            std::rethrow_exception(handle_.promise().exception_);
        }
        return result;
    }
    
    // 支持 co_await
    auto operator co_await() {
        struct Awaiter {
            Handle handle_;
            
            bool await_ready() const { return handle_.done(); }
            
            std::coroutine_handle<> await_suspend(std::coroutine_handle<> cont) {
                handle_.promise().continuation_ = cont;
                return handle_;
            }
            
            T await_resume() {
                if (handle_.promise().exception_) {
                    std::rethrow_exception(handle_.promise().exception_);
                }
                return std::move(*handle_.promise().value_ptr_);
            }
        };
        return Awaiter{handle_};
    }
    
    void start() { handle_.resume(); }
    bool done() const { return handle_.done(); }
    
private:
    Handle handle_;
};

// void 特化版本
template<>
class Task<void> {
    // 类似实现,省略... 
};
3.2.3 异步 I/O 操作
// 异步读取
task<std::vector<char>> async_read(int fd, size_t n) {
    std::vector<char> buffer(n);
    size_t total_read = 0;
    
    while (total_read < n) {
        ssize_t nread = co_await AsyncReadAwaiter{fd, buffer.data() + total_read, n - total_read};
        if (nread <= 0) {
            if (nread < 0 && errno == EAGAIN) {
                co_await WaitForReadable{fd};  // 等待可读
                continue;
            }
            if (nread == 0) break;  // EOF
            throw std::system_error(errno, std::generic_category());
        }
        total_read += nread;
    }
    
    buffer.resize(total_read);
    co_return buffer;
}

// 带超时的异步操作
template<typename Awaitable>
auto with_timeout(Awaitable awaitable, std::chrono::milliseconds timeout) {
    struct TimeoutAwaiter {
        Awaitable awaitable_;
        std::chrono::milliseconds timeout_;
        
        bool await_ready() { return awaitable_.await_ready(); }
        
        void await_suspend(std::coroutine_handle<> h) {
            // 注册超时定时器
            auto timeout_handle = EventLoop::current().setTimeout(timeout_, [h]() mutable {
                h.destroy();  // 超时销毁协程
            });
            
            awaitable_.await_suspend(std::coroutine_handle<TimeoutPromise>::from_address(h.address()));
        }
        
        decltype(auto) await_resume() {
            return awaitable_.await_resume();
        }
    };
    return TimeoutAwaiter{std::move(awaitable), timeout};
}

3.3 实战示例:HTTP 客户端

#include <iostream>
#include "coro_net.hpp"

using namespace coro;

// 协程风格的 HTTP GET
task<std::string> http_get(const std::string& host, const std::string& path) {
    // 1. 异步解析 DNS
    auto addresses = co_await async_resolve(host);
    
    // 2. 异步连接(带 5 秒超时)
    auto conn = co_await with_timeout(
        async_connect(addresses, 80),
        5s
    );
    
    // 3. 构造 HTTP 请求
    std::string request = fmt::format(
        "GET {} HTTP/1.1\r\n"
        "Host: {}\r\n"
        "Connection: close\r\n"
        "\r\n",
        path, host
    );
    
    // 4. 异步发送请求
    co_await async_write_all(conn.fd(), request);
    
    // 5. 异步读取响应
    std::string response;
    char buffer[4096];
    while (true) {
        auto n = co_await async_read(conn.fd(), buffer, sizeof(buffer));
        if (n == 0) break;
        response.append(buffer, n);
    }
    
    co_return response;
}

// 并发请求多个 URL
task<std::vector<std::string>> fetch_all(const std::vector<std::string>& urls) {
    std::vector<task<std::string>> tasks;
    
    for (const auto& url : urls) {
        tasks.push_back(http_get(url, "/"));
    }
    
    // 等待所有任务完成
    std::vector<std::string> results;
    for (auto& t : tasks) {
        results.push_back(co_await t);
    }
    
    co_return results;
}

int main() {
    EventLoop loop;
    
    // 发起并发请求
    auto task = fetch_all({
        "example.com",
        "httpbin.org",
        "github.com"
    });
    
    // 运行事件循环直到完成
    auto results = task.get();
    
    for (size_t i = 0; i < results.size(); ++i) {
        std::cout << "URL " << i << " response size: " 
                  << results[i].size() << " bytes\n";
    }
    
    return 0;
}

四、性能分析

4.1 协程 vs 线程

特性 线程 协程
内存占用 1-8 MB 栈空间 ~200-500 bytes(堆分配)
切换开销 系统调用,~1-10 μs 用户态,~10-100 ns
调度方式 抢占式 协作式
并发数量 数千级别 百万级别
适用场景 CPU 密集型 I/O 密集型

4.2 编译器优化

GCC/Clang 对协程有专门的优化:

// -O3 优化后,简单协程会被内联展开
inline int compute() {
    return 42;
}

task<int> optimized() {
    // 编译器可能直接优化为:co_return 42;
    auto x = co_await async_compute();
    co_return x * 2;
}

4.3 内存分配优化

默认情况下,协程状态在堆上分配。可通过自定义 operator new 优化:

struct promise_type {
    // 使用内存池分配
    void* operator new(size_t size) {
        return CoroMemoryPool::allocate(size);
    }
    
    void operator delete(void* ptr, size_t size) {
        CoroMemoryPool::deallocate(ptr, size);
    }
    // ...
};

五、C++23 协程新特性展望

5.1 std::generator<T>(C++23)

#include <generator>

std::generator<int> fibonacci(int n) {
    int a = 0, b = 1;
    for (int i = 0; i < n; ++i) {
        co_yield a;
        auto next = a + b;
        a = b;
        b = next;
    }
}

// 使用
for (auto n : fibonacci(10)) {
    std::cout << n << " ";  // 0 1 1 2 3 5 8 13 21 34
}

5.2 基于 Senders/Receivers 的新异步模型(C++26 方向)

C++23 引入了 std::execution(Sender/Receiver),将与协程深度整合:

// 未来写法(概念演示)
task<void> new_style() {
    auto result = co_await std::execution::schedule(pool_scheduler)
        | std::execution::then([]{ return fetch_data(); })
        | std::execution::then([](auto data){ return process(data); });
}

六、总结与最佳实践

6.1 何时使用协程?

推荐使用场景

  • 高并发 I/O 服务(网络、数据库)
  • 需要复杂异步流程控制的场景
  • 生成器模式(无限序列、流处理)
  • 状态机替代方案

避免使用场景

  • 简单的回调即可完成的操作
  • 需要与 C ABI 交互的代码
  • 极端延迟敏感的实时系统

6.2 调试技巧

// 1. 添加协程 ID 方便调试
struct promise_type {
    static inline std::atomic<uint64_t> next_id{0};
    uint64_t id = next_id++;
    
    promise_type() {
        std::cerr << "Coro " << id << " created\n";
    }
    ~promise_type() {
        std::cerr << "Coro " << id << " destroyed\n";
    }
};

// 2. 使用 LLDB 调试协程
// (lldb) frame variable -L __coro_frame
// 查看编译器生成的协程状态机

6.3 关键要点

  1. 协程是协作式的——只能在 co_awaitco_yieldco_return 处挂起
  2. 生命周期管理至关重要——确保协程在使用的对象销毁前完成
  3. 异常会跨挂点传播——使用 co_awaittry-catch 包裹可能抛异常的操作
  4. 编译器支持已成熟——GCC 11+、Clang 13+、MSVC 2019 均完整支持

参考文献

  1. C++ Reference - Coroutines
  2. Lewis Baker’s Blog - C++ Coroutines
  3. cppcoro 库实现
  4. C++20 标准草案 N4861
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐