来源:C++ 标准草案 N5032,第 31 章 Input/output library

目录

  1. 总体架构
  2. 类型体系与继承关系
  3. ios_base — 流的根基
  4. basic_ios — 流状态管理
  5. stream buffer — 缓冲区原理
  6. 标准 iostream 对象
  7. 格式化输入 basic_istream
  8. 格式化输出 basic_ostream
  9. 操纵符 Manipulators
  10. 字符串流
  11. Span 流(C++23)
  12. 文件流
  13. 同步输出流 syncstream
  14. 文件系统 filesystem
  15. 完整可运行示例

总体架构

C++ 的 I/O 库采用分层设计,每一层有明确的职责:

用户代码
    |
    | 使用 << 和 >> 运算符
    v
basic_istream / basic_ostream    <- 格式化层:负责数据的格式转换
    |
    | 通过 rdbuf() 调用
    v
basic_streambuf                  <- 缓冲层:管理字符缓冲区
    |
    | 派生出具体实现
    v
basic_stringbuf / basic_filebuf / basic_syncbuf  <- 数据源层:字符串/文件/同步

用生活比喻:

  • basic_streambuf 像一个仓库,存放待发送/待接收的字符
  • basic_ostream 像一个快递员,把数据打包(格式化)后送到仓库
  • basic_istream 像一个收货员,从仓库取出数据并解包(解析)

类型体系与继承关系

ios_base
格式标志/状态/回调

basic_ios<charT,traits>
继承 ios_base
流缓冲区/状态查询

basic_istream<charT,traits>
格式化/非格式化输入

basic_ostream<charT,traits>
格式化/非格式化输出

basic_iostream<charT,traits>
双向流

basic_istringstream
字符串输入流

basic_ostringstream
字符串输出流

basic_stringstream
字符串双向流

basic_ifstream
文件输入流

basic_ofstream
文件输出流

basic_fstream
文件双向流

basic_osyncstream
同步输出流

缓冲区的继承树单独列出:

basic_streambuf<charT,traits>
缓冲区基类

basic_stringbuf
字符串缓冲区

basic_filebuf
文件缓冲区

basic_syncbuf
同步缓冲区

basic_spanbuf
span缓冲区

ios_base — 流的根基

ios_base 是所有流类的根基类,它存储了:

  • 格式标志(fmtflags):控制数字进制、对齐方式、小数点等
  • 流状态(iostate):记录读写是否出错
  • 打开模式(openmode):文件是读还是写
  • 私有数据槽:供用户存储自定义数据

格式标志(fmtflags)

fmtflags 是一个位掩码,每个位控制一种格式行为:
boolalpha   - 用 "true"/"false" 显示布尔值(而不是 1/0)
dec         - 十进制整数(默认)
hex         - 十六进制整数
oct         - 八进制整数
fixed       - 定点小数(如 3.14)
scientific  - 科学计数法(如 3.14e+00)
hexfloat    - 十六进制浮点(0x1.91eb86p+1)
showbase    - 显示进制前缀(0x、0)
showpoint   - 始终显示小数点
showpos     - 正数显示 + 号
skipws      - 输入时跳过前导空白(默认开启)
unitbuf     - 每次输出后立即刷新缓冲区
uppercase   - 十六进制字母大写(A-F 而非 a-f)
left        - 左对齐
right       - 右对齐(默认)
internal    - 符号左对齐,数字右对齐

三个"组合标志"(只能选其中一个):
a d j u s t f i e l d = l e f t ∣ r i g h t ∣ i n t e r n a l adjustfield = left \mid right \mid internal adjustfield=leftrightinternal
b a s e f i e l d = d e c ∣ o c t ∣ h e x basefield = dec \mid oct \mid hex basefield=decocthex
f l o a t f i e l d = s c i e n t i f i c ∣ f i x e d floatfield = scientific \mid fixed floatfield=scientificfixed

流状态(iostate)

iostate 是一个位掩码,记录流是否出错:
goodbit = 0     - 一切正常(没有任何错误位被设置)
eofbit          - 到达文件末尾
failbit         - 操作失败(如格式解析错误)
badbit          - 流缓冲区损坏(严重错误)
判断方法:
  stream.good()  ←→  rdstate() == goodbit
  stream.eof()   ←→  eofbit 被设置
  stream.fail()  ←→  failbit 或 badbit 被设置
  stream.bad()   ←→  badbit 被设置

打开模式(openmode)

app      - 每次写入前先移到文件末尾(追加模式)
ate      - 打开后立即移到文件末尾(seek to end)
binary   - 二进制模式(不做换行符转换)
in       - 打开用于读
out      - 打开用于写
trunc    - 打开时清空现有内容
noreplace- 独占创建(文件已存在则失败)

定位方向(seekdir)

beg - 从文件开头计算偏移
cur - 从当前位置计算偏移
end - 从文件末尾计算偏移

basic_ios — 流状态管理

basic_ios 继承 ios_base,增加了与具体字符类型相关的功能:

basic_ios 的核心成员:
  rdbuf()  - 返回关联的 streambuf(缓冲区指针)
  tie()    - 返回"绑定流"(输出前先刷新绑定流)
  fill()   - 填充字符(宽度不足时用什么字符补齐,默认空格)
  imbue()  - 设置本地化 locale

初始化默认值(来自标准表格)

basic_ios::init(sb) 被调用时,各字段被设置为:

字段 初始值
rdbuf() sb(传入的缓冲区)
tie() nullptr(不绑定任何流)
rdstate() goodbit(如果sb不为空),否则badbit
flags() skipws | dec(跳过空白 + 十进制)
width() 0(不限制宽度)
precision() 6(6位有效数字)
fill() widen(' ')(空格字符)

bool 转换

if (stream) { ... }    // 等价于 if (!stream.fail())
if (!stream) { ... }   // 等价于 if (stream.fail())

tie() 的作用

cin.tie() 默认返回 &cout。这意味着:在从 cin 读数据之前,会自动刷新 cout,确保提示语先显示再等待输入。

stream buffer — 缓冲区原理

缓冲区是 C++ I/O 的核心机制,通过一段内存数组实现高效的批量读写。

三指针模型

每个序列(输入序列 / 输出序列)由三个指针控制:

内部字符数组示意:
输入序列(get area):
+---+---+---+---+---+---+---+---+
| a | b | c | d | e | f | g | h |
+---+---+---+---+---+---+---+---+
  ^           ^               ^
  |           |               |
eback()     gptr()          egptr()
(起始)   (当前读位置)  (结束)
输出序列(put area):
+---+---+---+---+---+---+---+---+
| X | Y | Z | _ | _ | _ | _ | _ |
+---+---+---+---+---+---+---+---+
  ^           ^               ^
  |           |               |
pbase()     pptr()          epptr()
(起始)   (当前写位置)  (结束)
  • 读操作:从 *gptr() 读取字符,然后 gptr() 向右移动
  • 写操作:将字符写入 *pptr(),然后 pptr() 向右移动
  • 当读指针到达 egptr() 时,调用 underflow() 从底层(文件/字符串)加载更多数据
  • 当写指针到达 epptr() 时,调用 overflow() 将缓冲区数据刷写到底层

关键虚函数


虚函数 何时调用 作用
underflow() 读区空了 从底层读入更多数据
uflow() 读区空了且需要移动指针 同上,但自动移动gptr
overflow(c) 写区满了 把缓冲数据刷写到底层,然后写入c
pbackfail(c) 需要退回字符时 将c放回输入序列
seekoff() 相对定位 移动读/写位置
seekpos() 绝对定位 移动到指定位置
sync() 同步 将缓冲数据写到底层

标准 iostream 对象

<iostream> 提供 8 个全局流对象:

对象 类型 连接到 说明
cin istream stdin 标准输入,默认绑定到 cout
cout ostream stdout 标准输出
cerr ostream stderr 标准错误,无缓冲(unitbuf),绑定到 cout
clog ostream stderr 标准错误日志,有缓冲
wcin wistream stdin 宽字符版 cin
wcout wostream stdout 宽字符版 cout
wcerr wostream stderr 宽字符版 cerr
wclog wostream stderr 宽字符版 clog

cerr vs clog 的区别

cerr:unitbuf 标志被设置
  → 每次写入后立即刷新
  → 适合调试信息(立即可见)
  → 性能较低
clog:正常缓冲模式
  → 批量刷新
  → 适合日志记录(性能更好)

sync_with_stdio

// 取消 C++ 流和 C stdio 流的同步
ios_base::sync_with_stdio(false);
// 效果:cin/cout 不再与 scanf/printf 同步,速度更快
// 代价:混用 cin 和 scanf 会产生未定义行为

格式化输入 basic_istream

sentry — 输入卫兵

每个格式化输入函数开始时都构造一个 sentry 对象,它负责:

  1. 检查流状态是否 good
  2. 如果有绑定流(tie()),先刷新它
  3. 如果 skipws 标志开启,跳过前导空白字符
sentry 的执行流程:
构造 sentry(is)
    |
    +-- is.good() 为 false?
    |       YES → 设置 failbit,sentry 为 false,函数提前返回
    |
    +-- is.tie() 不为 null?
    |       YES → 调用 is.tie()->flush()
    |
    +-- noskipws 为 false 且 skipws 标志开启?
            YES → 跳过空白字符
            读取字符失败? → 设置 failbit | eofbit

格式化提取(operator>>)

对数值类型(int、double 等),实际由 locale 的 num_get facet 完成解析。对于 shortint,标准先用 long 读取,再检查范围是否溢出:

// short 的读取逻辑(简化)
long lval;
// 先读为 long
use_facet<num_get<char>>(...).get(..., lval);
// 再检查范围
if (lval < numeric_limits<short>::min()) {
    state |= failbit;       // 设置失败标志
    val = numeric_limits<short>::min();
} else if (lval > numeric_limits<short>::max()) {
    state |= failbit;
    val = numeric_limits<short>::max();
} else {
    val = static_cast<short>(lval);
}

非格式化输入函数


函数 说明
get() 读取一个字符(包括空白)
get(s, n, delim) 读取最多 n-1 个字符,直到遇到 delim(不消耗 delim)
getline(s, n, delim) 读取最多 n-1 个字符,直到遇到 delim(消耗 delim)
read(s, n) 读取恰好 n 个字符
readsome(s, n) 只读取当前缓冲区中可用的字符(不阻塞)
ignore(n, delim) 丢弃最多 n 个字符,直到遇到 delim
peek() 查看下一个字符但不消耗
putback(c) 把字符 c 放回输入序列
unget() 把最后读取的字符放回
gcount() 返回上次非格式化操作读取的字符数

get 与 getline 的关键区别

输入流:  "hello\nworld\n"
用 get(buf, 100, '\n') 读取后:
  buf = "hello"
  流中剩余:"\nworld\n"   <- '\n' 还在流里!
  gcount() = 5
用 getline(buf, 100, '\n') 读取后:
  buf = "hello"
  流中剩余:"world\n"     <- '\n' 被消耗了!
  gcount() = 6           <- 计入了被消耗的 '\n'

格式化输出 basic_ostream

sentry — 输出卫兵

输出 sentry 负责:

  1. 检查流状态(os.good()
  2. 调用 os.tie()->flush()(如果有绑定流)
  3. 析构时,如果 unitbuf 标志开启,自动调用 rdbuf()->pubsync()

填充(Padding)规则

当输出宽度(width())大于内容长度时,自动补充填充字符(fill()):

os.width(10);
os.fill('*');
os << "hi";
根据对齐标志:
  right(默认):  "********hi"
  left:          "hi********"
  internal:      符号左,数字右(如 "+*******42")

宽度 = max ⁡ ( o s . w i d t h ( ) , 内容长度 ) 宽度 = \max(os.width(), \text{内容长度}) 宽度=max(os.width(),内容长度)

算术类型的输出

实际由 locale 的 num_put facet 完成格式化:

short 和 int 的输出有特殊处理:
  - 如果是十六进制或八进制,先转为 unsigned long 避免负号问题
  - 否则转为 long 进行格式化
float 输出先转为 double(避免 num_put 没有 float 的特化)

print 函数(C++23)

// 等价于 cout << format("Hello, {}!\n", name);
print("Hello, {}!\n", name);
// 输出到指定流
print(cerr, "Error: {}\n", msg);
// 自动加换行
println("Result: {}", value);

printprintln 在 UTF-8 环境下使用 vprint_unicode,会调用平台原生 Unicode API(Windows 上是 WriteConsoleW),确保正确显示 Unicode 字符。

操纵符 Manipulators

操纵符本质上是函数指针,通过 operator<<operator>> 传递给流,从而改变流的状态。

cout << hex;   // 相当于:hex(cout);  内部调用 cout.setf(ios_base::hex, ios_base::basefield)
cin >> skipws; // 相当于:skipws(cin);

基础操纵符


操纵符 效果
boolalpha / noboolalpha 布尔值用字母/数字表示
showbase / noshowbase 显示/隐藏进制前缀
showpos / noshowpos 显示/隐藏正数 + 号
skipws / noskipws 跳过/不跳过前导空白
uppercase / nouppercase 大写/小写十六进制
unitbuf / nounitbuf 每次写后刷新/不刷新
dec / oct / hex 十进制/八进制/十六进制
fixed / scientific / hexfloat / defaultfloat 浮点格式
left / right / internal 对齐方式
endl 输出 '\n' 并刷新
ends 输出空字符 '\0'
flush 刷新缓冲区
ws 跳过输入流中的空白

<iomanip> 操纵符


操纵符 效果
setw(n) 设置下次输出的字段宽度(只影响下一次操作!)
setprecision(n) 设置浮点精度
setfill(c) 设置填充字符
setbase(8/10/16) 设置整数进制
setiosflags(mask) 设置指定标志位
resetiosflags(mask) 清除指定标志位

quoted 操纵符

quoted 用于处理含空格的字符串,自动加引号和转义:

string s = "hello world";
cout << quoted(s);           // 输出:"hello world"
string result;
istringstream iss("\"hello world\"");
iss >> quoted(result);       // result = "hello world"(引号被去掉)

常见应用场景:CSV、XML 文件处理,或通过 >> 读取含空格的路径。

字符串流

字符串流将 std::string 作为数据源/目标,无需文件 I/O:

istringstream
从字符串读取

ostringstream
写入字符串

stringstream
双向读写字符串

stringbuf
字符串缓冲区

stringbuf 的初始化规则

openmode 决定读写区的初始化:
有 out 标志:
  pbase() → buf 起始
  epptr() >= pbase() + buf.size()
  有 ate 标志:pptr() = pbase() + buf.size()(从末尾开始写)
  没有 ate:  pptr() = pbase()(从开头开始写)
有 in 标志:
  eback() → buf 起始
  gptr()  = eback()
  egptr() = eback() + buf.size()

str() 方法的行为

ostringstream oss;
oss << "hello " << 42;
// const & 版本:返回副本
string s1 = oss.str();    // s1 = "hello 42"
// && 版本(C++20):移动,消耗内部缓冲区
string s2 = std::move(oss).str(); // s2 = "hello 42",oss 的缓冲区变空
// view() 版本(C++20):不复制,返回视图
string_view sv = oss.view();  // 零拷贝,但注意生命周期!

Span 流(C++23)

<spanstream> 提供基于 std::span<char> 的流,适用于在固定大小的缓冲区上进行流操作,不涉及内存分配:

char buf[100];
ospanstream oss(span<char>(buf, sizeof(buf)));
oss << "hello " << 42;
// 数据写入 buf,不分配堆内存

与 stringstream 的关键区别:

特性 stringstream spanstream
底层存储 std::string(动态增长) std::span(固定大小)
内存分配
溢出 overflow 自动扩容 不可用(固定大小)
适用场景 通用字符串构建 嵌入式/性能敏感代码

文件流

文件打开模式映射表


ios_base 标志组合 stdio 等价 说明
out "w" 写(覆盖)
out | noreplace "wx" 写(文件已存在则失败)
out | app "a" 追加写
in "r" 只读
in | out "r+" 读写(文件必须存在)
in | out | trunc "w+" 读写(覆盖)
in | out | app "a+" 读+追加写
以上 + binary b "wb" "rb"

filebuf 的字符转换

文件缓冲区在宽字符(wchar_t)模式下,通过 codecvt facet 进行多字节/宽字符转换:

磁盘文件(多字节字节流)
         |
         | codecvt::in() 转换
         v
内部缓冲区(宽字符序列 wchar_t[])
         |
         | 提供给 wistream/wostream
         v
用户代码

转换公式:extern_chars → c o d e c v t : : i n intern_chars (wchar_t) \text{转换公式:extern\_chars} \xrightarrow{codecvt::in} \text{intern\_chars (wchar\_t)} 转换公式:extern_charscodecvt::in intern_chars (wchar_t)

native_handle(C++26)

ifstream ifs("data.txt");
auto handle = ifs.native_handle();
// POSIX 上:handle 是 int(文件描述符)
// Windows 上:handle 是 HANDLE

同步输出流 syncstream

<syncstream> 解决多线程环境下流输出交错的问题。

问题演示

无同步时(多线程输出):
线程A: "Hello"
线程B: "World"
实际输出可能是:"HWelolrold"  ← 字符交错!
有 osyncstream 时:
线程A: osyncstream(cout) << "Hello\n";
线程B: osyncstream(cout) << "World\n";
保证输出是:"Hello\nWorld\n" 或 "World\nHello\n"  ← 不会交错

syncbuf 的工作原理

osyncstream 的输出流程:
写入字符 → 存入 syncbuf 的内部缓冲区
                    |
                    | (不立即输出到 cout)
                    v
调用 emit() 或析构时
                    |
                    | 原子性地获取 wrapped 流的锁
                    | 将缓冲区全部写入 wrapped 流
                    | 释放锁
                    v
cout(wrapped 流)收到完整的字符块

关键性质:emit() 对同一个 wrapped 流的调用是全序的(total order),不会有字符交错。

// 标准用法
{
    osyncstream bout(cout);
    bout << "Hello, ";
    bout << "World!\n";
}  // 析构时自动调用 emit(),原子性写入 cout

emit()flush() 的区别:在 osyncstream 上调用 flush() 只是记录"需要刷新",并不立即刷新。真正的刷新发生在 emit() 时。

文件系统 filesystem

<filesystem> 提供跨平台的文件系统操作。

path 类

namespace fs = std::filesystem;
// 路径构造(/ 运算符连接路径)
fs::path p = "/home/user" / "docs" / "file.txt";
// 路径分解
p.root_name()       // ""(POSIX)或 "C:"(Windows)
p.root_directory()  // "/"
p.parent_path()     // "/home/user/docs"
p.filename()        // "file.txt"
p.stem()            // "file"
p.extension()       // ".txt"
// 路径修改
p.replace_extension(".bak"); // "/home/user/docs/file.bak"
p.remove_filename();         // "/home/user/docs/"

路径规范化(normalization)

标准定义了一个 8 步规范化过程,将路径中的 ... 和多余的分隔符消除:

规范化步骤(简化):
1. 空路径直接返回
2. 根名中的斜杠替换为首选分隔符
3. 目录分隔符替换为首选分隔符
4. 移除 "." 文件名
5. 移除 "非.." + "/" + ".." 的组合
6. 如果有根目录,移除所有 ".."
7. 如果末尾是 "..",移除末尾分隔符
8. 如果路径为空,置为 "."
示例:
"foo/./bar/.." → "foo/"
"foo/.///bar/../" → "foo/"

错误报告两种模式

所有文件系统函数提供两个版本:

// 抛异常版本(适合错误是"真正异常"情况)
try {
    fs::create_directory("/no/permission");
} catch (const fs::filesystem_error& e) {
    cerr << e.what() << '\n';    // 错误信息
    cerr << e.path1() << '\n';   // 相关路径
}
// error_code 版本(适合错误是"正常情况")
error_code ec;
fs::create_directory("/maybe/exists", ec);
if (ec) {
    // 处理错误,不抛异常
}

文件状态与类型

file_type 枚举值

regular - 普通文件

directory - 目录

symlink - 符号链接

block - 块设备

character - 字符设备

fifo - 管道文件

socket - Socket文件

not_found - 文件不存在

unknown - 类型未知

none - 尚未确定或出错

目录迭代

// 非递归迭代
for (const auto& entry : fs::directory_iterator(".")) {
    cout << entry.path() << "\n";
    cout << "  大小: " << entry.file_size() << "\n";  // 可能使用缓存值!
}
// 递归迭代
for (const auto& entry : fs::recursive_directory_iterator(".")) {
    cout << entry.path() << "\n";
}

directory_entry 可能缓存文件属性(大小、最后修改时间等),避免重复访问磁盘。若需要最新数据,调用 refresh()

常用操作函数速查


函数 功能
exists(p) 文件是否存在
is_regular_file(p) 是否是普通文件
is_directory(p) 是否是目录
is_symlink(p) 是否是符号链接
file_size(p) 文件大小(字节)
last_write_time(p) 最后修改时间
create_directory(p) 创建目录
create_directories(p) 递归创建目录
copy(from, to) 复制文件
copy_file(from, to) 复制普通文件
rename(old, new) 重命名/移动
remove(p) 删除文件或空目录
remove_all(p) 递归删除
canonical(p) 解析符号链接,返回绝对路径
relative(p, base) 计算相对路径
space(p) 查询磁盘空间
permissions(p, prms) 设置文件权限

权限枚举

文件权限使用 POSIX 风格的八进制位掩码:
all = 0700 ⏟ owner ∣ 070 ⏟ group ∣ 07 ⏟ others = 0777 \text{all} = \underbrace{0700}_{\text{owner}} \mid \underbrace{070}_{\text{group}} \mid \underbrace{07}_{\text{others}} = 0777 all=owner 0700group 070others 07=0777

完整可运行示例

// cpp_io_demo.cpp
// 编译命令:
//   g++ -std=c++20 -o cpp_io_demo cpp_io_demo.cpp
//   clang++ -std=c++20 -o cpp_io_demo cpp_io_demo.cpp
//
// 注意:filesystem 部分在某些编译器上需要链接 -lstdc++fs(旧版GCC)
#include <iostream>    // cin, cout, cerr, clog
#include <iomanip>     // setw, setprecision, quoted, hex 等
#include <sstream>     // ostringstream, istringstream, stringstream
#include <fstream>     // ofstream, ifstream, fstream
#include <filesystem>  // path, directory_iterator 等
#include <string>
#include <string_view>
#include <format>      // std::format (C++20)
#include <thread>      // std::thread (用于 syncstream 演示)
#include <syncstream>  // osyncstream (C++20)
namespace fs = std::filesystem;
// ============================================================
// 工具:打印章节标题
// ============================================================
void section(const char* title) {
    std::cout << "\n===== " << title << " =====\n";
}
// ============================================================
// 1. 基础格式标志演示
// ============================================================
void demo_fmtflags() {
    section("格式标志演示");
    // --- 整数进制 ---
    int n = 255;
    std::cout << std::dec << "十进制: " << n << "\n";   // 255
    std::cout << std::hex << "十六进制: " << n << "\n"; // ff
    std::cout << std::oct << "八进制: " << n << "\n";   // 377
    // showbase:显示进制前缀
    std::cout << std::hex << std::showbase << n << "\n"; // 0xff
    std::cout << std::uppercase << n << "\n";             // 0XFF
    // 恢复默认
    std::cout << std::dec << std::noshowbase << std::nouppercase;
    // --- 浮点数格式 ---
    double pi = 3.14159265358979;
    std::cout << std::defaultfloat << "默认: "     << pi << "\n"; // 3.14159
    std::cout << std::fixed       << "定点: "     << pi << "\n"; // 3.141593
    std::cout << std::scientific  << "科学计数: " << pi << "\n"; // 3.141593e+00
    std::cout << std::hexfloat    << "十六进制浮点: " << pi << "\n"; // 0x1.921fb54442d18p+1
    // 精度设置
    std::cout << std::defaultfloat;
    std::cout << std::setprecision(10) << pi << "\n"; // 3.141592654
    // --- 对齐与填充 ---
    std::cout << std::setw(10) << std::left  << "LEFT"  << "|\n"; // "LEFT      |"
    std::cout << std::setw(10) << std::right << "RIGHT" << "|\n"; // "     RIGHT|"
    std::cout << std::setfill('*') << std::setw(10) << 42 << "\n"; // "********42"
    std::cout << std::setfill(' '); // 恢复默认填充字符
    // --- 布尔值 ---
    std::cout << std::boolalpha   << true << " " << false << "\n"; // true false
    std::cout << std::noboolalpha << true << " " << false << "\n"; // 1 0
    // --- showpos:显示正数符号 ---
    std::cout << std::showpos << 42 << " " << -42 << "\n"; // +42 -42
    std::cout << std::noshowpos;
}
// ============================================================
// 2. 流状态演示
// ============================================================
void demo_stream_state() {
    section("流状态演示");
    // 使用 istringstream 模拟带错误的输入
    std::istringstream iss("123 abc 456");
    int x;
    iss >> x;
    std::cout << "读取成功: x=" << x
              << " good=" << iss.good() << "\n"; // x=123, good=1
    iss >> x; // 尝试读取 "abc" 为整数 → 失败
    std::cout << "读取 'abc' 为整数后:"
              << " fail=" << iss.fail()
              << " bad=" << iss.bad() << "\n"; // fail=1, bad=0
    // 清除错误标志,继续读取
    iss.clear();
    std::string s;
    iss >> s; // 读取 "abc"
    std::cout << "清除后读取: s=" << s << "\n"; // s=abc
    iss >> x;
    std::cout << "继续读取: x=" << x << "\n"; // x=456
    // 到达文件末尾
    iss >> x; // 流已空
    std::cout << "到达末尾: eof=" << iss.eof()
              << " fail=" << iss.fail() << "\n"; // eof=1, fail=1
}
// ============================================================
// 3. 字符串流演示
// ============================================================
void demo_string_streams() {
    section("字符串流演示");
    // ostringstream:将多个值拼接成字符串
    std::ostringstream oss;
    oss << "姓名: " << "Alice"
        << ", 年龄: " << 30
        << ", 分数: " << std::fixed << std::setprecision(2) << 98.5;
    std::string result = oss.str();
    std::cout << result << "\n";
    // istringstream:从字符串解析值
    std::string data = "3.14 42 hello";
    std::istringstream iss(data);
    double d;
    int i;
    std::string word;
    iss >> d >> i >> word;
    std::cout << "解析结果: d=" << d << " i=" << i << " word=" << word << "\n";
    // stringstream:既可读又可写
    std::stringstream ss;
    ss << "100 200 300";
    int a, b, c;
    ss >> a >> b >> c;
    std::cout << "a=" << a << " b=" << b << " c=" << c << "\n";
    // quoted:处理含空格的字符串
    std::string path_with_spaces = "/home/user/my documents/file.txt";
    std::ostringstream oss2;
    oss2 << std::quoted(path_with_spaces);
    std::cout << "quoted 输出: " << oss2.str() << "\n";
    // 从 quoted 输入中解析
    std::istringstream iss2("\"/home/user/my documents/file.txt\"");
    std::string parsed_path;
    iss2 >> std::quoted(parsed_path);
    std::cout << "解析后路径: " << parsed_path << "\n";
    // view():零拷贝访问缓冲区(C++20)
    std::ostringstream oss3;
    oss3 << "hello world";
    std::string_view sv = oss3.view();
    std::cout << "view() 视图: " << sv << "\n";
}
// ============================================================
// 4. 文件流演示
// ============================================================
void demo_file_streams() {
    section("文件流演示");
    const std::string filename = "/tmp/cpp_io_test.txt";
    // --- 写入文件 ---
    {
        std::ofstream ofs(filename); // 默认模式:out | trunc
        if (!ofs) {
            std::cerr << "无法创建文件: " << filename << "\n";
            return;
        }
        ofs << "第一行\n"
            << "第二行,数字=" << 42 << "\n"
            << "第三行,浮点=" << 3.14 << "\n";
    } // ofs 析构,自动关闭并刷新
    // --- 读取文件 ---
    {
        std::ifstream ifs(filename);
        if (!ifs) {
            std::cerr << "无法打开文件\n";
            return;
        }
        std::string line;
        int linenum = 0;
        while (std::getline(ifs, line)) {
            std::cout << "[" << ++linenum << "] " << line << "\n";
        }
        std::cout << "读到文件末尾: eof=" << ifs.eof() << "\n";
    }
    // --- 追加模式 ---
    {
        std::ofstream ofs(filename, std::ios::app);
        ofs << "追加的第四行\n";
    }
    // --- 二进制读写 ---
    const std::string binfile = "/tmp/cpp_io_test.bin";
    {
        std::ofstream ofs(binfile, std::ios::binary);
        int data[] = {1, 2, 3, 4, 5};
        // write:写入原始字节
        ofs.write(reinterpret_cast<const char*>(data), sizeof(data));
    }
    {
        std::ifstream ifs(binfile, std::ios::binary);
        int data[5] = {};
        ifs.read(reinterpret_cast<char*>(data), sizeof(data));
        std::cout << "二进制读取: ";
        for (int v : data) std::cout << v << " ";
        std::cout << "\n";
    }
    // --- seekg / tellg ---
    {
        std::fstream fs(filename, std::ios::in | std::ios::out);
        // 移到文件末尾,获取文件大小
        fs.seekg(0, std::ios::end);
        std::streampos size = fs.tellg();
        std::cout << "文件大小: " << size << " 字节\n";
        // 回到文件开头
        fs.seekg(0, std::ios::beg);
        std::string firstLine;
        std::getline(fs, firstLine);
        std::cout << "第一行: " << firstLine << "\n";
    }
    // 清理临时文件
    std::remove(filename.c_str());
    std::remove(binfile.c_str());
}
// ============================================================
// 5. 操纵符演示
// ============================================================
void demo_manipulators() {
    section("操纵符演示");
    // setw 只影响下一次输出
    std::cout << std::setw(8) << 42        // 宽度8
              << std::setw(8) << "hello"   // 宽度8
              << "\n";
    // 输出:      42   hello
    // get_money / put_money(需要 locale 支持)
    // 此处略去,因各平台 locale 配置不同
    // resetiosflags:精确清除标志
    std::cout << std::hex << std::showbase << 255 << "\n"; // 0xff
    std::cout << std::resetiosflags(std::ios::showbase | std::ios::hex);
    std::cout << 255 << "\n"; // 255(恢复十进制,无前缀)
}
// ============================================================
// 6. 同步流演示(多线程安全输出)
// ============================================================
void demo_syncstream() {
    section("同步输出流演示");
    // 多线程输出,每个线程使用 osyncstream 包装 cout
    auto thread_func = [](int id) {
        // 每个 osyncstream 析构时原子性写入 cout
        std::osyncstream(std::cout)
            << "线程 " << id << " 的输出\n";
    };
    std::thread t1(thread_func, 1);
    std::thread t2(thread_func, 2);
    std::thread t3(thread_func, 3);
    t1.join();
    t2.join();
    t3.join();
    std::cout << "(以上三行顺序不定,但每行都完整)\n";
    // emit() 的手动控制
    {
        std::osyncstream bout(std::cout);
        bout << "Part 1, ";
        bout.emit();                    // 原子性写入 "Part 1, "
        bout << "Part 2\n";
        // 析构时自动 emit() "Part 2\n"
    }
}
// ============================================================
// 7. 文件系统演示
// ============================================================
void demo_filesystem() {
    section("文件系统演示");
    // --- 路径操作 ---
    fs::path p = "/home/user/docs/report.pdf";
    std::cout << "完整路径:   " << p << "\n";
    std::cout << "父目录:     " << p.parent_path() << "\n";
    std::cout << "文件名:     " << p.filename() << "\n";
    std::cout << "主干名:     " << p.stem() << "\n";
    std::cout << "扩展名:     " << p.extension() << "\n";
    // 路径连接
    fs::path base = "/tmp";
    fs::path full = base / "subdir" / "file.txt";
    std::cout << "连接路径: " << full << "\n";
    // 路径规范化
    fs::path messy = "/tmp/./foo/../bar";
    std::cout << "规范化: " << messy.lexically_normal() << "\n";
    // --- 临时目录 ---
    try {
        fs::path tmp = fs::temp_directory_path();
        std::cout << "临时目录: " << tmp << "\n";
    } catch (const fs::filesystem_error& e) {
        std::cerr << "无法获取临时目录: " << e.what() << "\n";
    }
    // --- 文件操作 ---
    fs::path testfile = "/tmp/fs_test_12345.txt";
    try {
        // 创建测试文件
        {
            std::ofstream ofs(testfile);
            ofs << "test content";
        }
        // 查询文件信息
        std::cout << "文件是否存在: " << fs::exists(testfile) << "\n";
        std::cout << "是普通文件:   " << fs::is_regular_file(testfile) << "\n";
        std::cout << "文件大小:     " << fs::file_size(testfile) << " 字节\n";
        // 复制文件
        fs::path copyfile = "/tmp/fs_test_copy.txt";
        fs::copy_file(testfile, copyfile, fs::copy_options::overwrite_existing);
        std::cout << "复制后存在: " << fs::exists(copyfile) << "\n";
        // 重命名
        fs::path renamed = "/tmp/fs_test_renamed.txt";
        fs::rename(copyfile, renamed);
        std::cout << "重命名后: original=" << fs::exists(copyfile)
                  << " renamed=" << fs::exists(renamed) << "\n";
        // 清理
        fs::remove(testfile);
        fs::remove(renamed);
    } catch (const fs::filesystem_error& e) {
        std::cerr << "文件系统操作失败: " << e.what() << "\n";
    }
    // --- 目录迭代 ---
    try {
        std::cout << "\n/tmp 目录下的前5个条目:\n";
        int count = 0;
        for (const auto& entry : fs::directory_iterator("/tmp")) {
            if (count++ >= 5) break;
            std::cout << "  " << entry.path().filename()
                      << " [" << (entry.is_directory() ? "目录" : "文件") << "]\n";
        }
    } catch (const fs::filesystem_error& e) {
        std::cerr << "迭代失败: " << e.what() << "\n";
    }
    // --- 磁盘空间 ---
    try {
        auto info = fs::space("/tmp");
        std::cout << "\n磁盘空间信息:\n";
        std::cout << "  总容量: " << info.capacity / (1024*1024) << " MB\n";
        std::cout << "  可用空间: " << info.available / (1024*1024) << " MB\n";
    } catch (const fs::filesystem_error& e) {
        std::cerr << "查询磁盘空间失败: " << e.what() << "\n";
    }
    // --- error_code 版本(不抛异常)---
    std::error_code ec;
    bool exists = fs::exists("/nonexistent_path_xyz", ec);
    std::cout << "\n不存在的路径: exists=" << exists
              << " error=" << ec.message() << "\n";
}
// ============================================================
// 主函数
// ============================================================
int main() {
    std::cout << "C++ 输入/输出库演示\n";
    std::cout << "===================\n";
    demo_fmtflags();
    demo_stream_state();
    demo_string_streams();
    demo_file_streams();
    demo_manipulators();
    demo_syncstream();
    demo_filesystem();
    return 0;
}

https://godbolt.org/z/vPE5M4r88

关键设计思想总结

分层设计
  ios_base     → 格式标志、流状态、openmode(与字符类型无关)
  basic_ios    → 缓冲区指针、fill字符、locale(依赖字符类型)
  basic_istream/basic_ostream → 格式化层
  basic_streambuf → 缓冲区层
  具体实现(filebuf/stringbuf) → 数据源层
虚函数扩展点
  underflow()  → 自定义"读区空了时的行为"
  overflow()   → 自定义"写区满了时的行为"
  seekoff/seekpos → 自定义定位行为
  sync()       → 自定义同步行为
错误处理双轨制
  抛异常版本   → 适合"错误是真正异常情况"的代码
  error_code版本 → 适合"错误是正常情况"的高性能代码

常见陷阱和最佳实践


陷阱 说明 解决方案
setw() 只生效一次 每次输出后宽度自动重置为0 每次输出前重新设置
get() 不消耗分隔符 读取后分隔符仍在流中 改用 getline() 或手动 ignore()
混用 cin/scanf 两者的缓冲区可能不同步 只使用一种,或调用 sync_with_stdio(false)
string_viewview() 悬空 oss.view() 在修改 oss 后失效 及时使用,或用 str() 复制
osyncstreamflush() 不立即刷新 只记录意图,不真正刷新 emit()flush_emit()
filesystem::path 的比较 == 是词法比较,不是文件等价 文件等价用 equivalent()

C++26 并发支持库 完全解读

基于 N5032 标准文档第32章,面向 C++ 开发者的中文详解

目录

  1. 总览
  2. 基础要求
  3. 停止令牌 Stop Tokens
  4. 线程 Threads
  5. 原子操作 Atomic Operations
  6. 互斥锁 Mutual Exclusion
  7. 条件变量 Condition Variables
  8. 信号量 Semaphores
  9. 协调类型 Latch 和 Barrier
  10. 期货 Futures
  11. 安全回收 Safe Reclamation
  12. 综合示例:线程池
  13. 关键知识点总结

总览

C++ 并发支持库提供了一整套用于创建和管理线程、执行互斥、线程间通信的组件。

+------------------------------------------------------------+
|                   C++ 并发支持库(第32章)                  |
+--------------------+---------------------------------------+
| 子章节              | 头文件                                |
+--------------------+---------------------------------------+
| 32.3 停止令牌       | <stop_token>                          |
| 32.4 线程           | <thread>                              |
| 32.5 原子操作       | <atomic>, <stdatomic.h>               |
| 32.6 互斥锁         | <mutex>, <shared_mutex>               |
| 32.7 条件变量       | <condition_variable>                  |
| 32.8 信号量         | <semaphore>                           |
| 32.9 协调类型       | <latch>, <barrier>                    |
| 32.10 期货          | <future>                              |
| 32.11 安全回收      | <rcu>, <hazard_pointer>               |
+--------------------+---------------------------------------+

基础要求

锁类型层级

C++ 标准定义了一组可锁类型的接口要求,形成层级关系:

Cpp17BasicLockable
lock 阻塞直到获得锁
unlock 释放锁

Cpp17Lockable
继承 BasicLockable
try_lock 尝试加锁不
阻塞返回bool

Cpp17TimedLockable
继承 Lockable
try_lock_for
相对时间内尝试
try_lock_until 绝对时间点前尝试

Cpp17SharedLockable
lock_shared 共享锁
try_lock_shared
unlock_shared

Cpp17SharedTimedLockable
继承 SharedLockable
try_lock_shared_for
try_lock_shared_until

超时规范

函数命名规则:

  • _for 结尾:接受相对时间(duration),推荐使用 steady_clock 计时
  • _until 结尾:接受绝对时间点(time_point),使用指定 clock 计时
    超时的实际耗时公式:
    t r e a l = D t + D i + D m t_{real} = D_t + D_i + D_m treal=Dt+Di+Dm
    其中:
  • D t D_t Dt 是用户指定的等待时长
  • D i D_i Di 是中断响应、函数返回、调度带来的"实现质量"延迟(Implementation quality delay)
  • D m D_m Dm 是处理器和内存资源争用带来的"管理质量"延迟(Management quality delay)

停止令牌

停止令牌(Stop Token)是 C++20 引入的机制,允许一个线程礼貌地请求另一个线程停止执行。

核心概念

get_token

注册回调

request_stop

stop_source
停止源
可发出停止请求

stop_token
停止令牌
可检测停止请求

stop_callback
停止回调
停止时自动调用

工作机制

线程A(持有 stop_source)            线程B(持有 stop_token)
        |                                    |
        |  source.request_stop()            |  token.stop_requested()
        |  返回 true                         |  --> 返回 true
        |                                    |
        |-------- 触发所有已注册回调 -------->|

stop_source 与 stop_token 完整示例

#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>
int main() {
    // 创建停止源(持有共享停止状态,类似 shared_ptr 引用计数)
    std::stop_source source;
    // 从停止源获取令牌(用于传递给需要被停止的线程)
    std::stop_token token = source.get_token();
    // 注册停止回调:当 request_stop() 被调用时,此 lambda 自动执行
    // 回调在 request_stop() 的调用线程中同步执行
    std::stop_callback cb(token, [] {
        std::cout << "收到停止请求,正在清理资源...\n";
    });
    // 模拟工作线程:接收 token 的副本(token 是可复制的)
    std::thread worker([tok = token] {
        // 循环检查是否收到停止请求
        while (!tok.stop_requested()) {
            std::cout << "工作线程运行中...\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        std::cout << "工作线程收到停止信号,退出。\n";
    });
    // 主线程等待一段时间后请求停止
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    // request_stop() 只有第一次调用有效,之后调用无效返回 false
    bool first_request = source.request_stop(); // true
    bool second_request = source.request_stop(); // false(已经请求过)
    worker.join();
    return 0;
}

https://godbolt.org/z/3zh4hh19T

never_stop_token

#include <stop_token>
// never_stop_token:永不停止的特殊令牌
// stop_possible() 和 stop_requested() 均为 constexpr 返回 false
// 适合用于不需要停止功能的通用模板代码中作为默认参数
std::never_stop_token nst;
// nst.stop_possible() == false  (编译期常量,零开销)
// nst.stop_requested() == false  (编译期常量,零开销)

stop_source 与 inplace_stop_source 对比


特性 stop_source inplace_stop_source
所有权 共享所有权(引用计数,类似 shared_ptr) 唯一所有权(停止状态直接嵌入对象内部)
可复制 是(多个 source 指向同一状态) 否(不可移动也不可复制)
令牌类型 stop_token inplace_stop_token
适用场景 跨线程共享停止信号 生命周期明确、使用方全部在 source 作用域内的场景

#include <stop_token>
#include <iostream>
int main() {
    std::inplace_stop_source src;
    // inplace_stop_token 只持有指向 src 的指针,不参与所有权
    std::inplace_stop_token tok = src.get_token();
    // 注意:回调对象的生命周期必须嵌套在 src 的生命周期内
    std::inplace_stop_callback cb(tok, [] {
        std::cout << "inplace 停止回调触发\n";
    });
    src.request_stop(); // 触发回调
    // src 析构时,其内部停止状态随之销毁
    return 0;
}

线程

std::thread 基础

#include <iostream>
#include <thread>
#include <string>
// 普通函数作为线程入口
void task(int id, const std::string& msg) {
    std::cout << "线程 " << id << ": " << msg << "\n";
}
int main() {
    // 构造函数:新线程立即开始执行
    // f 和 args 会在构造线程中被 decay-copy,然后在新线程中调用
    std::thread t1(task, 1, "Hello");
    // joinable() 返回 true 表示线程代表了一个活跃的执行
    // 默认构造、被 move 走、join/detach 后均为 not joinable
    if (t1.joinable()) {
        t1.join(); // 阻塞等待线程 t1 结束
    }
    // 移动语义:线程所有权可以转移,t2 不再代表线程
    std::thread t2(task, 2, "World");
    std::thread t3 = std::move(t2); // t2 变为 not joinable
    t3.join();
    // hardware_concurrency() 返回建议的并发线程数量
    // 只是提示,返回 0 表示无法确定
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << "建议线程数: " << n << "\n";
    return 0;
}

https://godbolt.org/z/T8azra631

thread 生命周期状态机

std::thread 状态转换:
  [默认构造] 或 [被 move 走] 或 [join/detach 后]
      |
      |  不可加入(not joinable),get_id() == id()
      |
      | thread(f, args...)  --> 创建新线程,立即开始执行
      v
  [可加入 joinable]
      |
      |--- join()  ----> 阻塞等待线程完成 ----> [不可加入]
      |
      |--- detach() ---> 线程独立运行 ---------> [不可加入]
      |
      |--- 析构时若仍 joinable() --> 调用 terminate()!

警告: thread 对象在 joinable()true 时被析构,程序会调用 std::terminate() 终止!

std::jthread:自动 join 的线程(C++20)

jthread 是改进版线程,析构时自动发出停止请求并 join,避免了忘记 join 的问题:

#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>
int main() {
    // jthread 构造时,若 f 的第一个参数是 stop_token,自动传入内部的 stop_token
    // 否则不传 stop_token(和 std::thread 一样的用法)
    std::jthread jt([](std::stop_token stoken) {
        int count = 0;
        while (!stoken.stop_requested()) {
            std::cout << "jthread 工作中,计数: " << count++ << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        std::cout << "jthread 优雅退出\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    // jthread 析构时自动执行:
    //   1. jt.request_stop()  --> 向内部 stop_source 发出停止请求
    //   2. jt.join()          --> 等待线程结束
    // 无需手动操作!
    return 0;
}

https://godbolt.org/z/nYWEj4nYo

jthread vs thread 对比


特性 std::thread std::jthread
析构时行为 若 joinable 则 terminate 自动 request_stop + join
停止支持 无(需要手动实现) 内置 stop_source/stop_token
引入版本 C++11 C++20
是否可 detach 是(但通常不需要)

this_thread 命名空间

#include <thread>
#include <chrono>
#include <iostream>
void demo_this_thread() {
    // 获取当前线程的唯一标识符
    // 每次调用返回相同值;不等于默认构造的 thread::id
    std::thread::id my_id = std::this_thread::get_id();
    std::cout << "当前线程 ID: " << my_id << "\n";
    // yield:向实现提示可以重新调度
    // 不做任何同步保证,仅是提示
    std::this_thread::yield();
    // sleep_for:相对睡眠(阻塞当前线程至少 rel_time)
    // 实现应使用 steady_clock 计时
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    // sleep_until:绝对睡眠(阻塞当前线程直到 abs_time)
    auto wake_time = std::chrono::steady_clock::now()
                   + std::chrono::seconds(1);
    std::this_thread::sleep_until(wake_time);
}

原子操作

原子操作是不可分割的操作,在执行过程中不会被其他线程的操作打断。

内存顺序(memory_order)

这是并发编程中最重要的概念,控制原子操作与周围内存操作的可见性和顺序:

严格程度(从严到宽):
memory_order::seq_cst   (顺序一致,默认值)
  所有 seq_cst 操作在所有线程中有统一的全局顺序
  最安全,但性能最低
memory_order::acq_rel   (获取-释放,用于读改写操作如 CAS)
  同时具有 acquire 和 release 语义
memory_order::release   (释放,用于写操作/store)
  本操作之前的所有写操作,对执行了 acquire 读取此值的线程可见
memory_order::acquire   (获取,用于读操作/load)
  读到 release 写入的值后,该值之前的所有写对本线程可见
memory_order::relaxed   (宽松)
  只保证操作本身的原子性,不提供任何跨线程同步保证
  性能最高,但最难正确使用

release-acquire 同步关系示意:
s t o r e ( r e l e a s e ) → 同步 l o a d ( a c q u i r e )  读到了该值 store(release) \xrightarrow{同步} load(acquire)\ 读到了该值 store(release)同步 load(acquire) 读到了该值

线程1 (producer)                    线程2 (consumer)
data = 42;          (1)
ready.store(true,   (2) --> [读到true] ready.load(acquire) (3)
   release);                                  |
                                        assert(data == 42) (4) 保证成立

(2) release-写 同步于 (3) acquire-读,因此 (1) 的写 先序可见于 (4)。

release-acquire 同步完整示例

#include <atomic>
#include <thread>
#include <cassert>
#include <iostream>
std::atomic<int>  data{0};
std::atomic<bool> ready{false};
void producer() {
    // relaxed 写:此写操作本身不提供同步
    data.store(42, std::memory_order::relaxed);        // (1)
    // release 写:确保 (1) 对所有 acquire 读到 true 的线程可见
    ready.store(true, std::memory_order::release);     // (2)
}
void consumer() {
    // acquire 读:看到 ready==true 后,保证能看到 (2) 之前的所有写操作
    while (!ready.load(std::memory_order::acquire));   // (3)
    // 到这里,data 必定等于 42
    assert(data.load(std::memory_order::relaxed) == 42); // (4)
    std::cout << "data = " << data.load() << "\n";
}
int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}

https://godbolt.org/z/Y8eGbz611

atomic 基本操作

#include <atomic>
#include <iostream>
int main() {
    std::atomic<int> counter{0};
    // store:原子写(只允许 relaxed/release/seq_cst)
    counter.store(10);
    // load:原子读(只允许 relaxed/acquire/seq_cst)
    int val = counter.load(); // val == 10
    // exchange:原子地写入新值,返回旧值(读改写操作)
    int old = counter.exchange(20); // old == 10, counter == 20
    // compare_exchange_strong(CAS 操作):
    // 若 counter 当前值 == expected,则将 counter 改为 desired,返回 true
    // 否则将 counter 当前值写入 expected,返回 false(不修改 counter)
    int expected = 20;
    bool success = counter.compare_exchange_strong(expected, 30);
    // success == true, counter == 30
    // compare_exchange_strong 失败的情况:
    expected = 999; // 故意设置错误的期望值
    success = counter.compare_exchange_strong(expected, 100);
    // success == false, counter 不变(仍为30), expected 被更新为 30
    // fetch_add:原子加,返回操作之前的旧值
    int prev = counter.fetch_add(5); // prev == 30, counter == 35
    // 运算符形式(等价于 fetch_add/fetch_sub 但返回新值)
    counter += 1;       // counter == 36,等价于 fetch_add(1)(忽略返回值)
    ++counter;          // counter == 37,等价于 fetch_add(1) + 1
    int v = counter++;  // v == 37(旧值), counter == 38,等价于 fetch_add(1)
    std::cout << "最终值: " << counter.load() << "\n"; // 38
    return 0;
}

https://godbolt.org/z/eeWW4G7GK

compare_exchange_weak 与 strong 的区别

#include <atomic>
std::atomic<int> val{0};
// weak 版本:可能发生"虚假失败"(spurious failure)
// 即使 val == expected,也可能返回 false
// 在 ARM 等 LL/SC 架构上性能优于 strong
// 适合在循环中使用
void update_with_weak(int new_val) {
    int expected = val.load(std::memory_order::relaxed);
    // 循环重试:weak 的虚假失败会导致多循环一次,但不影响正确性
    while (!val.compare_exchange_weak(
        expected,                      // 失败时被更新为 val 的当前值
        new_val,
        std::memory_order::release,    // 成功时的内存顺序
        std::memory_order::relaxed     // 失败时的内存顺序
    )) {
        // expected 已被更新,下次循环用新值继续尝试
    }
}
// strong 版本:只有值真正不匹配才失败(不会虚假失败)
// 适合只需要尝试一次的场景(不在循环中)
bool try_update_once(int expected_val, int new_val) {
    // 注意:expected_val 是按值传入,失败时会被修改
    return val.compare_exchange_strong(expected_val, new_val);
}

无锁栈(atomic 高级用法)

#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
// 无锁栈(演示 CAS 循环的典型用法)
template<typename T>
class LockFreeStack {
    struct Node {
        T data;
        Node* next;
        explicit Node(T val) : data(std::move(val)), next(nullptr) {}
    };
    std::atomic<Node*> head{nullptr};
public:
    void push(T val) {
        Node* new_node = new Node(std::move(val));
        // 先设置 next 为当前 head
        new_node->next = head.load(std::memory_order::relaxed);
        // CAS 循环:尝试将 head 从旧值替换为 new_node
        // 若中途 head 被其他线程修改,next 被更新后重试
        while (!head.compare_exchange_weak(
            new_node->next,             // expected(失败时自动更新)
            new_node,                   // desired
            std::memory_order::release, // 成功:新节点对所有 acquire 可见
            std::memory_order::relaxed  // 失败:不需要同步
        ));
    }
    bool pop(T& result) {
        Node* old_head = head.load(std::memory_order::acquire);
        // 尝试将 head 向前移动一步
        while (old_head && !head.compare_exchange_weak(
            old_head,
            old_head->next,
            std::memory_order::acquire,
            std::memory_order::relaxed
        ));
        if (!old_head) return false; // 栈为空
        result = std::move(old_head->data);
        delete old_head; // 注意:真实无锁代码这里需要危险指针或 RCU 保护
        return true;
    }
    ~LockFreeStack() {
        T dummy;
        while (pop(dummy));
    }
};
int main() {
    LockFreeStack<int> stack;
    stack.push(1);
    stack.push(2);
    stack.push(3);
    int val;
    while (stack.pop(val)) {
        std::cout << val << "\n"; // 输出 3, 2, 1(后进先出)
    }
    return 0;
}

https://godbolt.org/z/Mq9xffGcb

atomic_flag 与自旋锁

atomic_flag 是唯一保证无锁的原子类型,可用于实现自旋锁:

#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
// 使用 atomic_flag 实现自旋锁
class SpinLock {
    // atomic_flag 默认初始化为 clear(false)状态
    std::atomic_flag flag{};
public:
    void lock() {
        // test_and_set:原子地设为 true,返回旧值
        // 若旧值为 false,说明成功获得锁
        // 若旧值为 true,说明锁被占用,继续自旋
        while (flag.test_and_set(std::memory_order::acquire)) {
            // 空循环等待(自旋)
            // 在 x86 上可插入 _mm_pause() 减少功耗和提高性能
        }
    }
    void unlock() {
        // clear:原子地设为 false,释放锁
        // release 顺序保证临界区内的写操作对下一个 acquire 可见
        flag.clear(std::memory_order::release);
    }
};
SpinLock spin;
int shared_counter = 0;
void increment(int times) {
    for (int i = 0; i < times; ++i) {
        spin.lock();
        ++shared_counter; // 临界区
        spin.unlock();
    }
}
int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment, 10000);
    }
    for (auto& t : threads) t.join();
    std::cout << "计数器最终值: " << shared_counter
              << "(期望 40000)\n";
    return 0;
}

https://godbolt.org/z/fvdf5zeK7

内存屏障(Fence)

内存屏障比 release/acquire 更灵活,但也更难正确使用:

#include <atomic>
#include <thread>
#include <iostream>
int data_value = 0;
std::atomic<bool> data_ready{false};
void fence_producer() {
    data_value = 42; // 普通(非原子)写
    // release fence:保证 fence 之前的所有写操作(包括非原子写)
    // 对执行了对应 acquire fence 的线程可见
    std::atomic_thread_fence(std::memory_order::release);
    // relaxed store:只需保证指针更新可见,不需要本身是 release
    data_ready.store(true, std::memory_order::relaxed);
}
void fence_consumer() {
    // 等待数据就绪
    while (!data_ready.load(std::memory_order::relaxed));
    // acquire fence:与 release fence 配对,确保 fence 之后的读
    // 能看到对应 release fence 之前的所有写
    std::atomic_thread_fence(std::memory_order::acquire);
    // 现在可以安全读取 data_value,保证能看到 42
    std::cout << "data_value = " << data_value << "\n"; // 42
}

C++26 新增:atomic modify-write 操作

C++26 新增了 store_addstore_substore_and 等操作,与 fetch_add 的区别在于:

  • fetch_add(n):读改写操作(RMW),返回旧值,使用 acquire/release 语义
  • store_add(n)仅写操作(modify-write),不返回旧值,只允许 relaxed/release/seq_cst,允许更激进的硬件优化(如树形规约)
#include <atomic>
std::atomic<int> counter{0};
// fetch_add:RMW,保证严格的修改顺序读取
int old = counter.fetch_add(1); // 返回旧值
// store_add:modify-write,不需要读取旧值,可能更高效
counter.store_add(1, std::memory_order::relaxed); // 不返回值

互斥锁

mutex 类型一览

互斥锁类型体系:
std::mutex
  非递归,独占,最基础的互斥锁
std::recursive_mutex
  可递归(同线程可多次加锁),独占
  每次 lock() 对应一次 unlock()
std::timed_mutex
  非递归,独占,额外支持 try_lock_for / try_lock_until
std::recursive_timed_mutex
  可递归,独占,带超时
std::shared_mutex
  非递归,支持共享(读)锁 + 独占(写)锁
  多线程可同时持有共享锁,但独占锁与其他锁互斥
std::shared_timed_mutex
  非递归,共享/独占,带超时

RAII 锁包装器速查


包装器 特点 适用场景
lock_guard<M> 构造加锁,析构解锁,不可移动不可复制 简单作用域保护
scoped_lock<M...> 同时锁多个互斥量,内置死锁避免算法 需要同时持有多个锁
unique_lock<M> 可延迟加锁、可手动加解锁、可转移所有权 配合条件变量;需要灵活控制锁时机
shared_lock<M> 获取共享(读)锁 shared_mutex 的读端

读写锁示例

#include <shared_mutex>
#include <mutex>
#include <iostream>
#include <thread>
#include <vector>
#include <string>
// 线程安全的读写缓存
class ThreadSafeCache {
    mutable std::shared_mutex rw_mutex; // mutable 允许 const 方法加锁
    std::string data{"初始数据"};
public:
    // 读操作:获取共享锁(多线程可同时读,不影响其他读者)
    std::string read() const {
        std::shared_lock lock(rw_mutex); // CTAD 推导出 shared_lock<shared_mutex>
        return data;
    }
    // 写操作:获取独占锁(阻止所有其他读写操作)
    void write(const std::string& new_data) {
        std::unique_lock lock(rw_mutex); // 独占锁
        data = new_data;
    }
};
// 防止死锁:同时锁两个互斥量
void safe_lock_both_mutex_demo() {
    std::mutex m1, m2;
    // scoped_lock 内部使用死锁避免算法(类似 std::lock)
    // 无论 m1 m2 的顺序,都不会死锁
    std::scoped_lock lock(m1, m2);
    // 临界区:同时持有 m1 和 m2
}
int main() {
    ThreadSafeCache cache;
    // 多个读线程并发读
    std::vector<std::thread> readers;
    for (int i = 0; i < 3; ++i) {
        readers.emplace_back([&cache, i] {
            std::cout << "读线程" << i << ": " << cache.read() << "\n";
        });
    }
    // 写线程
    std::thread writer([&cache] {
        cache.write("更新后的数据");
        std::cout << "写完成\n";
    });
    for (auto& t : readers) t.join();
    writer.join();
    return 0;
}

https://godbolt.org/z/fq1531Kae

unique_lock 高级用法

#include <mutex>
#include <chrono>
#include <iostream>
// 演示 unique_lock 的灵活性
void unique_lock_demo() {
    std::mutex mtx;
    // 1. defer_lock:构造时不加锁
    {
        std::unique_lock lock(mtx, std::defer_lock); // 未加锁
        // ... 做一些准备工作 ...
        lock.lock();   // 手动加锁
        // 临界区
        lock.unlock(); // 手动解锁(析构时若 owns_lock() 会再次解锁)
    }
    // 2. adopt_lock:已经持有锁,让 unique_lock 接管
    {
        mtx.lock(); // 手动加锁
        std::unique_lock lock(mtx, std::adopt_lock); // 接管,不再加锁
        // 析构时自动解锁
    }
    // 3. try_to_lock:尝试加锁,不阻塞
    {
        std::unique_lock lock(mtx, std::try_to_lock);
        if (lock.owns_lock()) {
            std::cout << "成功获取锁\n";
        } else {
            std::cout << "锁已被占用\n";
        }
    }
    // 4. 带超时的加锁
    {
        std::timed_mutex timed_mtx;
        std::unique_lock timed_lock(
            timed_mtx,
            std::chrono::milliseconds(100) // 最多等 100ms
        );
        if (timed_lock.owns_lock()) {
            std::cout << "在超时前获取了锁\n";
        }
    }
}

call_once:线程安全的一次性初始化

#include <mutex>
#include <iostream>
#include <thread>
#include <vector>
class Singleton {
    static std::once_flag init_flag;
    static Singleton* instance;
    Singleton() {
        std::cout << "Singleton 初始化(只执行一次)\n";
    }
public:
    static Singleton& get() {
        // call_once 保证 lambda 只被调用一次
        // 即使多个线程同时进入,也只有一个线程执行初始化
        // 其他线程被阻塞,直到初始化完成
        std::call_once(init_flag, [] {
            instance = new Singleton();
        });
        return *instance;
    }
    void do_work() {
        std::cout << "工作中(来自线程 "
                  << std::this_thread::get_id() << ")\n";
    }
};
std::once_flag Singleton::init_flag;
Singleton* Singleton::instance = nullptr;
int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([] { Singleton::get().do_work(); });
    }
    for (auto& t : threads) t.join();
    // "Singleton 初始化" 只打印一次
    return 0;
}

https://godbolt.org/z/T54fMTWTE

条件变量

条件变量用于线程间的等待/通知机制。

基本工作模式

生产者-消费者信号流:
生产者线程                          消费者线程
    |                                    |
    | 1. 获取互斥锁                      | 1. 获取互斥锁
    | 2. 修改共享状态                    | 2. while (!条件满足) {
    | 3. cv.notify_one()                |       cv.wait(lock)
    | 4. 释放锁                          |       // 原子释放锁 + 进入等待
    |                                    |       // 被唤醒后重新获取锁
    |                                    |    }
    |                                    | 3. 使用共享状态
    |                                    | 4. 释放锁

为什么必须用 while 而不是 if
因为存在虚假唤醒(spurious wakeup):线程可能在没有 notify 的情况下被唤醒。
while 或谓词版本的 wait 可以防止这个问题。

生产者消费者完整示例

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <string>
// 线程安全队列
template<typename T>
class SafeQueue {
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    bool done = false;
public:
    // 生产者:推入元素
    void push(T item) {
        {
            std::lock_guard lock(mtx);
            queue.push(std::move(item));
        }
        cv.notify_one(); // 通知至少一个等待的消费者
    }
    // 通知消费者不再有新数据
    void finish() {
        {
            std::lock_guard lock(mtx);
            done = true;
        }
        cv.notify_all(); // 唤醒所有等待的消费者
    }
    // 消费者:取出元素,队列空时阻塞等待
    bool pop(T& item) {
        std::unique_lock lock(mtx); // 条件变量需要 unique_lock
        // wait(lock, pred) 等价于:
        // while (!pred()) { cv.wait(lock); }
        // wait 会原子地 unlock + 进入等待,被唤醒时重新 lock
        cv.wait(lock, [this] {
            return !queue.empty() || done;
        });
        if (queue.empty()) return false; // done==true 且队列为空
        item = std::move(queue.front());
        queue.pop();
        return true;
    }
};
int main() {
    SafeQueue<std::string> sq;
    std::thread producer([&sq] {
        for (int i = 0; i < 5; ++i) {
            sq.push("任务 " + std::to_string(i));
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        sq.finish(); // 通知消费者:没有更多任务了
    });
    std::thread consumer([&sq] {
        std::string item;
        while (sq.pop(item)) {
            std::cout << "消费: " << item << "\n";
        }
        std::cout << "消费者完成\n";
    });
    producer.join();
    consumer.join();
    return 0;
}

https://godbolt.org/z/o6GT1Mo9T

condition_variable_any 与可中断等待

condition_variable_any 支持任意 BasicLockable 类型,并支持配合 stop_token可中断等待

#include <condition_variable>
#include <mutex>
#include <stop_token>
#include <thread>
#include <iostream>
std::condition_variable_any cv;
std::mutex mtx;
bool data_ready = false;
// 可中断的等待函数:stop 请求到来时立即退出等待
void interruptible_worker(std::stop_token stoken) {
    std::unique_lock lock(mtx);
    // 可中断等待:
    // - 正常情况:条件满足时返回 true
    // - 中断情况:stop_requested() 为 true 时返回 false
    bool result = cv.wait(lock, stoken, [] { return data_ready; });
    if (result) {
        std::cout << "条件满足,正常退出等待\n";
    } else {
        // stoken.stop_requested() 一定为 true
        std::cout << "收到停止请求,退出等待(条件可能未满足)\n";
    }
}
int main() {
    std::stop_source ss;
    std::thread worker(interruptible_worker, ss.get_token());
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    // 发出停止请求,worker 从 wait 中退出(无需 notify)
    ss.request_stop();
    worker.join();
    return 0;
}

https://godbolt.org/z/h6ffj81hM

信号量

信号量比互斥锁更轻量,维护一个内部计数器,用于控制对资源的并发访问数量。
计数器始终满足: c o u n t e r ≥ 0 counter \geq 0 counter0
操作语义:

  • release(n) c o u n t e r + = n counter \mathrel{+}= n counter+=n,唤醒等待的线程(n 默认为 1)
  • acquire():若 c o u n t e r > 0 counter > 0 counter>0 c o u n t e r − = 1 counter \mathrel{-}= 1 counter=1 立即返回;否则阻塞

信号量与互斥锁的区别


特性 mutex binary_semaphore(初始值1)
所有权 有(只有加锁的线程可以解锁) 无(任意线程可以 release)
用途 保护临界区 线程间信号传递、事件通知
初始状态 解锁 可自定义(0=等待,1=可用)

#include <semaphore>
#include <thread>
#include <iostream>
#include <vector>
// 示例1:限流器(同时最多允许 N 个线程访问资源)
template<int MaxConcurrency>
class RateLimiter {
    // counting_semaphore<N>:内部计数最大值至少为 N
    std::counting_semaphore<MaxConcurrency> sem{MaxConcurrency};
public:
    void access(int thread_id) {
        sem.acquire(); // 计数-1;若为0则阻塞等待
        std::cout << "线程 " << thread_id << " 开始访问\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "线程 " << thread_id << " 释放资源\n";
        sem.release(); // 计数+1,唤醒等待的线程
    }
};
// 示例2:线程间单向信号传递(binary_semaphore)
void signal_demo() {
    // 初始值为 0:消费者会立即阻塞等待信号
    std::binary_semaphore signal{0};
    std::thread waiter([&signal] {
        std::cout << "等待信号...\n";
        signal.acquire(); // 阻塞,直到计数 > 0
        std::cout << "收到信号,继续执行\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    signal.release(); // 发出信号(计数从0变为1)
    waiter.join();
}
int main() {
    RateLimiter<3> limiter; // 同时最多3个线程
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back([&limiter, i] { limiter.access(i); });
    }
    for (auto& t : threads) t.join();
    signal_demo();
    return 0;
}

协调类型

Latch(闩锁)

Latch 是一次性的倒数计数器,一旦计数到零就永远保持在"打开"状态,不能重置。

latch 工作原理(expected = 3):
线程1  线程2  线程3
  |      |      |
  |      |      |-- count_down(1) --> counter: 3 -> 2
  |      |      |
  |      |-- count_down(1) -------> counter: 2 -> 1
  |      |
  |-- count_down(1) -------------> counter: 1 -> 0
                                         |
                                  解除所有 wait() 阻塞!
#include <latch>
#include <thread>
#include <iostream>
#include <vector>
int main() {
    const int N = 5;
    // 创建 latch,初始计数为 N
    // 当 count_down 被调用 N 次后(计数降为0),所有 wait() 返回
    std::latch sync_point{N};
    std::vector<std::thread> workers;
    for (int i = 0; i < N; ++i) {
        workers.emplace_back([&sync_point, i] {
            // 模拟每个线程的初始化工作(耗时不同)
            std::this_thread::sleep_for(
                std::chrono::milliseconds(50 * (i + 1)));
            std::cout << "线程 " << i << " 完成初始化\n";
            // 通知:我已就绪,同时等待所有线程就绪
            // arrive_and_wait = count_down(1) + wait()
            sync_point.arrive_and_wait();
            // 所有线程都到达后,同时开始实际工作
            std::cout << "线程 " << i << " 开始工作\n";
        });
    }
    for (auto& t : workers) t.join();
    return 0;
}

Barrier(屏障)

Barrier 是可复用的多阶段同步点,每个阶段结束时可执行一个完成函数:

barrier 阶段循环(expected = 3):
     第1阶段           第2阶段            第3阶段
T1 --|arrive|[等待] ---|arrive|[等待] ---|arrive|...
T2 --|arrive|          |arrive|          |arrive|
T3 --|arrive|          |arrive|          |arrive|
            |                  |
        完成函数()          完成函数()
        重置计数器          重置计数器
        进入下一阶段        进入下一阶段
#include <barrier>
#include <thread>
#include <iostream>
#include <vector>
int main() {
    const int N = 4;
    int phase_count = 0;
    std::vector<int> results(N, 0);
    // 完成函数:每个阶段所有线程到达后执行
    // 必须 noexcept!
    auto on_completion = [&]() noexcept {
        ++phase_count;
        std::cout << "=== 阶段 " << phase_count << " 完成 ===\n";
    };
    // barrier 创建时指定期望到达的线程数
    std::barrier sync{N, on_completion};
    std::vector<std::thread> workers;
    for (int i = 0; i < N; ++i) {
        workers.emplace_back([&, i] {
            // ---- 第1阶段:各自计算 ----
            results[i] = i * i;
            std::cout << "线程" << i << " 第1阶段: " << results[i] << "\n";
            // arrive_and_wait:到达屏障并等待所有线程
            // 完成函数在所有线程 arrive 后、wait 返回前执行
            sync.arrive_and_wait();
            // ---- 第2阶段:使用所有结果 ----
            int sum = 0;
            for (int r : results) sum += r;
            std::cout << "线程" << i << " 第2阶段,总和=" << sum << "\n";
            sync.arrive_and_wait();
        });
    }
    for (auto& t : workers) t.join();
    return 0;
}

arrive() 与 arrive_and_wait() 的区别

arrive() 的用途(分离到达和等待):
线程A:
  token = barrier.arrive();    // 立即返回,不阻塞
  do_some_other_work();        // 可以在等待期间做其他事
  barrier.wait(std::move(token)); // 然后再等待
arrive_and_wait():
  等价于 wait(arrive())
  一步完成:到达并立即等待
arrive_and_drop():
  从后续所有阶段中退出
  本阶段计数减1 + 后续阶段期望数减1
  适合中途退出不再参与的线程

期货

future 提供了在一个线程中获取另一个线程的计算结果的机制。

共享状态生命周期

创建共享状态
      |
      v
+----------+         +----------+
| promise  |-------->|  future  |
| 或 async  |  共享状态 | 或 shared |
| 或 task  |         |  _future  |
+----------+         +----------+
      |                    |
 set_value(v)          get() 阻塞等待,
 或 set_exception(e)   直到 set_value/set_exception 被调用
      |                    |
 状态变为"就绪"        返回值或重新抛出异常

promise 与 future 基础

#include <future>
#include <thread>
#include <iostream>
#include <stdexcept>
int main() {
    // promise:异步结果的"写端"
    std::promise<int> prom;
    // get_future():获取 future(只能调用一次)
    std::future<int> fut = prom.get_future();
    std::thread worker([p = std::move(prom)]() mutable {
        try {
            int result = 6 * 7; // 模拟计算
            // set_value:将值存入共享状态,future 变为"就绪"
            p.set_value(result);
        } catch (...) {
            // set_exception:将异常存入共享状态
            p.set_exception(std::current_exception());
        }
        // 如果 promise 析构时共享状态还未就绪
        // 会自动存入 broken_promise 异常
    });
    // get():阻塞等待 future 就绪,返回值或重新抛出异常
    try {
        int result = fut.get(); // 只能调用一次!
        std::cout << "结果: " << result << "\n"; // 42
    } catch (const std::exception& e) {
        std::cout << "异常: " << e.what() << "\n";
    }
    worker.join();
    return 0;
}

async:最简单的异步执行方式

#include <future>
#include <iostream>
#include <vector>
#include <numeric>
// 并行求和示例
long long sum_range(const std::vector<int>& v, size_t start, size_t end) {
    long long s = 0;
    for (size_t i = start; i < end; ++i) s += v[i];
    return s;
}
int main() {
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 1); // 填入 1, 2, ..., 1000000
    size_t mid = data.size() / 2;
    // launch::async:立即在新线程中开始执行
    std::future<long long> f1 = std::async(
        std::launch::async,
        sum_range,
        std::cref(data), 0, mid
    );
    // launch::deferred:延迟执行,直到 get()/wait() 被调用时才在调用线程执行
    std::future<long long> f2 = std::async(
        std::launch::deferred,
        sum_range,
        std::cref(data), mid, data.size()
    );
    // 默认策略(async | deferred):由实现自行选择
    // 不保证并行!可能在调用线程中延迟执行
    auto f3 = std::async([] { return 100LL; });
    // get() 等待并取回结果
    // f2 的 get() 调用触发延迟函数在当前线程执行
    long long total = f1.get() + f2.get() + f3.get();
    std::cout << "总和: " << total << "\n";
    return 0;
}

shared_future:可多线程共享的 future

#include <future>
#include <thread>
#include <iostream>
#include <vector>
int main() {
    std::promise<std::string> prom;
    // future<T>:只能被一个线程 get()(get() 后 valid() 变 false)
    // shared_future<T>:可以被多个线程共享,每个线程都可以 get()
    std::shared_future<std::string> shared = prom.get_future().share();
    std::vector<std::thread> readers;
    for (int i = 0; i < 3; ++i) {
        readers.emplace_back([shared, i] {
            // 多个线程同时调用 get() 是安全的(但结果共享,需要只读访问)
            const std::string& msg = shared.get();
            std::cout << "读线程 " << i << " 收到: " << msg << "\n";
        });
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    prom.set_value("广播消息:所有线程都能读到");
    for (auto& t : readers) t.join();
    return 0;
}

packaged_task:包装可调用对象

#include <future>
#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
// 简单任务队列(线程池的简化版)
class TaskQueue {
    std::queue<std::packaged_task<int()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    bool stop_flag = false;
    std::thread worker_thread;
public:
    TaskQueue() {
        worker_thread = std::thread([this] {
            while (true) {
                std::packaged_task<int()> task;
                {
                    std::unique_lock lock(mtx);
                    cv.wait(lock, [this] {
                        return !tasks.empty() || stop_flag;
                    });
                    if (stop_flag && tasks.empty()) return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                // 执行任务:task() 会自动将返回值存入共享状态
                task();
            }
        });
    }
    // 提交一个可调用对象,返回 future 以获取结果
    std::future<int> submit(std::function<int()> f) {
        // packaged_task 将 f 包装成:执行 f 并将结果存入 promise
        std::packaged_task<int()> task(std::move(f));
        std::future<int> result = task.get_future();
        {
            std::lock_guard lock(mtx);
            tasks.push(std::move(task));
        }
        cv.notify_one();
        return result;
    }
    ~TaskQueue() {
        {
            std::lock_guard lock(mtx);
            stop_flag = true;
        }
        cv.notify_all();
        worker_thread.join();
    }
};
int main() {
    TaskQueue tq;
    auto f1 = tq.submit([] { return 1 + 1; });
    auto f2 = tq.submit([] { return 10 * 5; });
    auto f3 = tq.submit([] { return 100 / 4; });
    std::cout << "1+1 = " << f1.get() << "\n";   // 2
    std::cout << "10*5 = " << f2.get() << "\n";  // 50
    std::cout << "100/4 = " << f3.get() << "\n"; // 25
    return 0;
}

安全回收

RCU(读-复制-更新)

RCU 是专为频繁读、极少写场景优化的同步机制,读者完全无锁。
核心思想:

  • 读者:进入"保护区域"后安全读取,无需加锁
  • 写者:复制旧数据,修改副本,原子替换指针,然后"退休"旧数据
  • 回收:等所有保护区域中正在读旧数据的读者退出后,才真正删除旧数据
RCU 工作时序:
读者1: [--保护区域--读取旧数据-----------退出--]
读者2:     [--保护区域--读取旧数据--退出--]
写者:    |复制+修改|替换指针|retire旧数据|
                               |        |
                等待现有读者    |        |
                全部退出区域   |        |
                               +--删除旧数据--+
#include <rcu>
#include <atomic>
#include <thread>
#include <iostream>
#include <string>
#include <memory>
// 继承 rcu_obj_base<T> 使 T 支持 RCU 回收
struct Config : public std::rcu_obj_base<Config> {
    std::string name;
    int value;
    Config(std::string n, int v) : name(std::move(n)), value(v) {}
};
// 全局配置指针(原子操作保证指针更新的可见性)
std::atomic<Config*> g_config{new Config("初始配置", 0)};
// 读者:高频调用,使用 RCU 保护区域
void reader(int id) {
    for (int i = 0; i < 3; ++i) {
        {
            // 进入 RCU 保护区域(scoped_lock 析构时退出)
            // 区域内的旧对象不会被 retire 机制删除
            std::scoped_lock rcu_lock(std::rcu_default_domain());
            Config* cfg = g_config.load(std::memory_order::acquire);
            std::cout << "读者" << id << ": ["
                      << cfg->name << ", " << cfg->value << "]\n";
        } // 退出 RCU 保护区域
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}
// 写者:低频调用,更新配置
void writer() {
    for (int i = 1; i <= 2; ++i) {
        Config* new_cfg = new Config("更新" + std::to_string(i), i * 10);
        // 原子替换指针(所有之后的读者都会读到新配置)
        Config* old_cfg = g_config.exchange(
            new_cfg, std::memory_order::acq_rel);
        // retire:将旧对象标记为"待回收"
        // 等所有已进入保护区域的读者退出后,自动调用 delete old_cfg
        old_cfg->retire();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
int main() {
    std::thread w(writer);
    std::thread r1(reader, 1);
    std::thread r2(reader, 2);
    w.join(); r1.join(); r2.join();
    // 手动清理最后一个存活的 config(它没有被 retire)
    delete g_config.load();
    // rcu_barrier:等待所有已 retire 的对象完成回收
    std::rcu_barrier();
    return 0;
}

危险指针(Hazard Pointers)

危险指针是另一种无锁内存回收机制,适合细粒度的对象保护。

危险指针机制:
读者:
  1. 声明"我正在访问对象 X"(设置危险指针指向 X)
  2. 重新检查原子指针(验证 X 还没有被删除)
  3. 安全使用 X
  4. 清除危险指针
删除者:
  1. 将对象从数据结构中移除(retire)
  2. 扫描所有危险指针,若有指向该对象的则推迟删除
  3. 当没有危险指针保护该对象时,真正删除
#include <hazard_pointer>
#include <atomic>
#include <thread>
#include <iostream>
#include <string>
// 继承 hazard_pointer_obj_base<T> 使 T 支持危险指针保护
struct Node : public std::hazard_pointer_obj_base<Node> {
    std::string value;
    explicit Node(std::string v) : value(std::move(v)) {}
};
std::atomic<Node*> g_node{new Node("初始值")};
void safe_reader(int id) {
    // make_hazard_pointer:获取一个危险指针槽位
    // 每个 hazard_pointer 对象独占一个槽位
    std::hazard_pointer hp = std::make_hazard_pointer();
    for (int i = 0; i < 3; ++i) {
        // protect(src):循环执行:
        //   1. 加载 src 的值为 ptr
        //   2. 将危险指针设为 ptr
        //   3. 重新加载 src,若值变了则重试(防止 TOCTOU)
        // 返回受保护的指针
        Node* node = hp.protect(g_node);
        if (node) {
            std::cout << "读者" << id << ": " << node->value << "\n";
        }
        // hp 清除保护(或 hp.reset_protection())前,node 不会被删除
        std::this_thread::sleep_for(std::chrono::milliseconds(30));
    }
    // hp 析构 --> 危险指针槽位被释放
}
void updater() {
    for (int i = 1; i <= 2; ++i) {
        Node* new_node = new Node("新值" + std::to_string(i));
        Node* old_node = g_node.exchange(new_node, std::memory_order::acq_rel);
        // retire:当所有保护 old_node 的危险指针消失后,自动 delete
        if (old_node) old_node->retire();
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}
int main() {
    std::thread u(updater);
    std::thread r1(safe_reader, 1);
    std::thread r2(safe_reader, 2);
    u.join(); r1.join(); r2.join();
    // 清理最后一个节点
    delete g_node.load();
    return 0;
}

综合示例:线程池

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <functional>
#include <vector>
#include <iostream>
#include <stdexcept>
#include <type_traits>
#include <memory>
// 基于 C++ 并发原语的通用线程池
class ThreadPool {
    std::vector<std::thread> workers;           // 工作线程
    std::queue<std::function<void()>> task_queue; // 任务队列
    std::mutex queue_mutex;                     // 保护任务队列
    std::condition_variable condition;          // 任务到来的通知
    bool stopped = false;                       // 停止标志
public:
    explicit ThreadPool(size_t thread_count) {
        for (size_t i = 0; i < thread_count; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock lock(queue_mutex);
                        // 等待:有任务 或 停止标志被置位
                        condition.wait(lock, [this] {
                            return stopped || !task_queue.empty();
                        });
                        // 若停止且队列空,线程退出
                        if (stopped && task_queue.empty()) return;
                        // 取出一个任务
                        task = std::move(task_queue.front());
                        task_queue.pop();
                    }
                    // 在锁外执行任务,避免长时间持锁
                    task();
                }
            });
        }
    }
    // 提交任务,返回 future 以获取结果
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>>
    {
        using ReturnType = std::invoke_result_t<F, Args...>;
        // 用 packaged_task 包装,捕获所有参数
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            [func = std::forward<F>(f),
             // 捕获参数副本
             targs = std::tuple<std::decay_t<Args>...>(
                 std::forward<Args>(args)...)]() mutable {
                return std::apply(func, std::move(targs));
            }
        );
        std::future<ReturnType> result = task->get_future();
        {
            std::lock_guard lock(queue_mutex);
            if (stopped) {
                throw std::runtime_error("线程池已停止,不接受新任务");
            }
            // 将 packaged_task 包装进 function<void()>
            task_queue.emplace([task] { (*task)(); });
        }
        condition.notify_one(); // 唤醒一个等待的工作线程
        return result;
    }
    // 析构:等待所有任务完成
    ~ThreadPool() {
        {
            std::lock_guard lock(queue_mutex);
            stopped = true;
        }
        condition.notify_all(); // 唤醒所有线程,让它们检查 stopped
        for (auto& w : workers) {
            if (w.joinable()) w.join();
        }
    }
};
int main() {
    ThreadPool pool(4); // 4个工作线程
    // 提交8个任务,每个任务计算平方
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 8; ++i) {
        futures.push_back(pool.submit([i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            return i * i;
        }));
    }
    // 收集所有结果
    int total = 0;
    for (int i = 0; i < 8; ++i) {
        int result = futures[i].get();
        std::cout << i << "^2 = " << result << "\n";
        total += result;
    }
    // 0+1+4+9+16+25+36+49 = 140
    std::cout << "总和: " << total << "\n";
    return 0;
}

关键知识点总结

同步原语选择指南

遇到并发问题,如何选择合适的原语?
需要保护共享数据?
  是 --> 读多写少?
             是 --> shared_mutex + shared_lock/unique_lock
             否 --> mutex + lock_guard/unique_lock
  否 --> 需要等待某个条件成立?
             是 --> condition_variable(配合 unique_lock)
                    或 condition_variable_any(配合 stop_token 可中断)
             否 --> 需要控制并发访问数量?
                        是 --> counting_semaphore
                        否 --> 需要多线程同步启动/完成?
                                   一次性 --> latch
                                   多阶段可重复 --> barrier
                        需要跨线程传递计算结果?
                                --> promise/future/async/packaged_task
                        需要请求停止另一线程?
                                --> stop_source/stop_token (jthread)
                        高频读极少写,需要无锁?
                                --> RCU (rcu_obj_base)
                                或 hazard_pointer

内存顺序使用速查


操作类型 可用内存顺序 推荐默认
store(写) relaxed, release, seq_cst seq_cst
load(读) relaxed, acquire, seq_cst seq_cst
fetch_add/exchange 等 RMW relaxed, acquire, release, acq_rel, seq_cst seq_cst
store_add 等 modify-write(C++26) relaxed, release, seq_cst seq_cst

常见错误与防范方法

  1. 数据竞争(Data Race)
    • 问题:多线程同时读写同一非原子变量,且至少一个是写
    • 防范:使用 mutex 保护,或改用 atomic<T>
  2. 死锁(Deadlock)
    • 问题:线程A持有锁1等待锁2,线程B持有锁2等待锁1
    • 防范:用 scoped_lock 同时加多把锁;或固定加锁顺序
  3. 虚假唤醒(Spurious Wakeup)
    • 问题:condition_variable::wait() 在没有 notify 的情况下返回
    • 防范:总在循环中使用 wait,或使用带谓词的 wait(lock, pred)
  4. thread 析构时 joinable
    • 问题:thread 析构时若 joinable() 为 true,调用 terminate()
    • 防范:使用 jthread;或确保每个 thread 都调用 join()detach()
  5. future 重复 get()
    • 问题:future::get() 调用后 valid() 变 false,再次调用是未定义行为
    • 防范:每个 future 只调用一次 get();多处共享用 shared_future
  6. promise 析构时共享状态未就绪(broken_promise)
    • 问题:promise 析构时若未调用 set_value/set_exception
      关联的 future::get() 会抛出 future_error(broken_promise)
    • 防范:确保所有代码路径(包括异常路径)都设置了结果
  7. stop_callback 生命周期超过 stop_source
    • 问题:inplace_stop_callback 的生命周期必须嵌套在 inplace_stop_source
    • 防范:使用带引用计数的 stop_source/stop_callback
      或严格控制对象析构顺序

C++26 执行控制库(Execution Control Library)完全解析

文档来源:N5032 标准草案 第33章

目录

  1. 总览:这个库是干什么的
  2. 核心概念:四大构件
  3. 可查询对象(Queryable)
  4. 异步操作的生命周期
  5. 调度器(Scheduler)
  6. 接收器(Receiver)
  7. 操作状态(Operation State)
  8. 发送器(Sender)
  9. 发送器工厂
  10. 发送器适配器
  11. 发送器消费者
  12. 完成签名(Completion Signatures)
  13. 协程工具
  14. 执行作用域
  15. 并行调度器
  16. 完整示例代码

1. 总览

C++26 的执行控制库(<execution>)是一套用来描述、组合、执行异步工作的框架。
想象一下餐厅的流程:

顾客下单(创建工作)
  -> 厨师做菜(执行工作)
  -> 服务员上菜(完成通知)

这个库做的事情类似:

发送器(Sender) 描述工作
  -> 调度器(Scheduler) 决定在哪里执行
  -> 接收器(Receiver) 处理结果
  -> 操作状态(Operation State) 管理整个过程

四大组件关系图

connect 连接

connect 连接

schedule 获取

start 启动

成功

失败

取消

Sender 发送器
描述异步工作

Operation State
操作状态
管理执行过程

Receiver 接收器
处理完成结果

Scheduler 调度器
决定在哪个线程/资源执行

执行

set_value
值完成

set_error
错误完成

set_stopped
停止完成

三类定制点对象(表158)


类型 作用 举例
核心(core) 连接各组件 connect, start
完成函数(completion functions) 通知异步操作完成 set_value, set_error, set_stopped
查询(queries) 读取对象属性 get_allocator, get_scheduler

2. 核心概念

2.1 movable-value 概念

// 这个概念检查一个类型是否可以被"移动存储"
// 用于确保发送器能安全地存储它接收到的参数
template<class T>
concept movable-value =
    move_constructible<decay_t<T>> &&      // 衰减后的类型可以移动构造
    constructible_from<decay_t<T>, T> &&   // 可以从 T 构造衰减类型
    (!is_array_v<remove_reference_t<T>>);  // 不是数组类型(数组不能直接移动)

2.2 MATCHING-SIG — 签名匹配

对于函数类型 F 1 = R 1 ( A r g s 1 . . . ) F_1 = R_1(Args_1...) F1=R1(Args1...) F 2 = R 2 ( A r g s 2 . . . ) F_2 = R_2(Args_2...) F2=R2(Args2...)
MATCHING-SIG ( F 1 , F 2 ) = true    ⟺    same_as ⟨ R 1 ( A r g s 1 & & . . . ) , R 2 ( A r g s 2 & & . . . ) ⟩ \text{MATCHING-SIG}(F_1, F_2) = \text{true} \iff \text{same\_as}\langle R_1(Args_1\&\&...), R_2(Args_2\&\&...)\rangle MATCHING-SIG(F1,F2)=truesame_asR1(Args1&&...),R2(Args2&&...)⟩
简单说:把所有参数都加上 &&(右值引用),然后看两个函数类型是否完全相同。

3. 可查询对象

3.1 什么是 Queryable?

"可查询对象"就像一个键值对字典,用"查询对象"作为键,用对应的值作为值。

queryable 对象
  ├── key: get_allocator  -> value: 某个分配器
  ├── key: get_scheduler  -> value: 某个调度器
  └── key: get_stop_token -> value: 某个停止令牌
// queryable 概念的定义极其简单:只要能被销毁就行
// 真正的约束通过语义要求来表达
template<class T>
concept queryable = destructible<T>;

3.2 查询的语法

对于查询对象 q、可查询对象 env、额外参数 args...

  • 调用方式q(env, args...) 等价于 env.query(q, args...)
  • 常量性:对 env 的查询等价于对 const env 的查询(查询不修改对象)
  • 等价保持:相同输入总产生相同输出

3.3 常见查询对象

forwarding_query  —— 询问某查询是否应该被转发
get_allocator     —— 获取关联的分配器
get_stop_token    —— 获取关联的停止令牌
get_scheduler     —— 获取关联的调度器
get_domain        —— 获取关联的执行域
get_env           —— 获取对象的环境

4. 异步操作的生命周期

4.1 异步操作的定义

一个"异步操作"具有以下特征:

  1. 显式创建
  2. 最多被显式启动一次
  3. 启动后,最终以恰好一种方式完成
    • 值完成(value completion / set_value):成功,可有任意数量结果
    • 错误完成(error completion / set_error):失败,有单个错误结果
    • 停止完成(stopped completion / set_stopped):取消,无结果
  4. 完成时可在不同执行资源
  5. 可以创建子操作

4.2 生命周期图

[创建阶段]
Sender + Receiver  --connect()--> Operation State(操作状态诞生)
[执行阶段]
Operation State  --start()--> 异步操作开始执行
                              (生命周期开始)
[完成阶段]
异步操作  --set_value/set_error/set_stopped--> 完成操作执行
                                              (生命周期结束)

重要规则

  • 操作状态的生命周期不能在异步操作完成前结束,否则是未定义行为
  • 完成操作只能执行一次

4.3 父子操作关系

父操作
  ├── 子操作1(必须在父操作完成前完成)
  ├── 子操作2
  └── 子操作3

5. 调度器

5.1 调度器是什么?

调度器是执行资源的抽象,提供统一的接口来把工作提交到某个执行上下文(线程池、GPU、当前线程等)。

// scheduler 概念要求:
template<class Sch>
concept scheduler =
    // 1. 标记自己是调度器
    derived_from<typename remove_cvref_t<Sch>::scheduler_concept, scheduler_t> &&
    // 2. 是可查询的
    queryable<Sch> &&
    // 3. 支持 schedule() 调用,返回一个 sender
    requires(Sch&& sch) {
        { schedule(std::forward<Sch>(sch)) } -> sender;
        // 4. schedule 返回的 sender 的值完成调度器 == 自己
        { auto(get_completion_scheduler<set_value_t>(
            get_env(schedule(std::forward<Sch>(sch))))) }
            -> same_as<remove_cvref_t<Sch>>;
    } &&
    // 5. 可以判断是否相等(同一执行资源)
    equality_comparable<remove_cvref_t<Sch>> &&
    // 6. 可以复制
    copyable<remove_cvref_t<Sch>>;

5.2 调度器语义规则

  • sch1 == sch2true,当且仅当它们共享同一执行资源
  • 调度器的析构函数不能阻塞等待已连接的接收器完成
  • 所有复制/移动/比较操作不能抛异常,不能引入数据竞争

5.3 调度器查询

get_forward_progress_guarantee(sch)
  -> concurrent    // 并发前进保证(最强)
  -> parallel      // 并行前进保证
  -> weakly_parallel // 弱并行(默认)

6. 接收器

6.1 接收器的角色

接收器是异步操作的延续(continuation),它聚合了三种处理函数:

接收器 Receiver
  ├── set_value(rcvr, vals...)  —— 值完成时调用
  ├── set_error(rcvr, err)      —— 错误完成时调用
  └── set_stopped(rcvr)         —— 停止完成时调用

6.2 receiver 概念

template<class Rcvr>
concept receiver =
    // 1. 标记自己是接收器
    derived_from<typename remove_cvref_t<Rcvr>::receiver_concept, receiver_t> &&
    // 2. get_env 返回可查询对象
    requires(const remove_cvref_t<Rcvr>& rcvr) {
        { get_env(rcvr) } -> queryable;
    } &&
    // 3. 右值可移动
    move_constructible<remove_cvref_t<Rcvr>> &&
    // 4. 左值可复制构造
    constructible_from<remove_cvref_t<Rcvr>, Rcvr> &&
    // 5. 移动不抛异常(极其重要!完成函数必须 noexcept)
    is_nothrow_move_constructible_v<remove_cvref_t<Rcvr>>;

6.3 三个完成函数的规则

所有完成函数都:

  • 要求 rcvr 是右值(不能是左值或 const 右值)
  • 必须 noexcept(用 MANDATE-NOTHROW 宏强制)
// set_value: 值完成
// rcvr.set_value(vs...) 必须 noexcept
set_value(rcvr, vs...);
// set_error: 错误完成
// rcvr.set_error(err) 必须 noexcept
set_error(rcvr, err);
// set_stopped: 停止完成
// rcvr.set_stopped() 必须 noexcept
set_stopped(rcvr);

7. 操作状态

7.1 operation_state 概念

template<class O>
concept operation_state =
    // 1. 标记自己是操作状态
    derived_from<typename O::operation_state_concept, operation_state_t> &&
    // 2. 支持 start(o) 调用(o 必须是左值)
    requires (O& o) {
        start(o);  // 注意:o 是左值引用
    };

7.2 重要规则

  • start(op) 必须 noexcept
  • start 接受左值,不接受右值(操作状态不能被移动后启动)
  • 对库提供的 sender 创建的操作状态,禁止复制或移动
  • 操作状态在异步操作生命周期内不能被销毁

7.3 数据流图

connect(sender, receiver)
  |
  v
operation_state(拥有 receiver 的所有权)
  |
  v
start(op)  <-- 异步操作生命周期开始
  |
  v
某个线程/执行代理上执行...
  |
  v
set_value/set_error/set_stopped  <-- 异步操作生命周期结束
                                     (此后 op 变为无效,访问是 UB)

8. 发送器

8.1 sender 概念

template<class Sndr>
concept sender =
    // 1. 被标记为 sender(或者是可等待体)
    enable_sender<remove_cvref_t<Sndr>> &&
    // 2. get_env 返回可查询对象
    requires (const remove_cvref_t<Sndr>& sndr) {
        { get_env(sndr) } -> queryable;
    } &&
    // 3. 可移动构造
    move_constructible<remove_cvref_t<Sndr>> &&
    // 4. 从自身构造(支持从左值构造)
    constructible_from<remove_cvref_t<Sndr>, Sndr>;

8.2 sender_in 概念

// sender_in<Sndr, Env> 表示:
// 在环境类型 Env 中,发送器 Sndr 的完成签名是已知且有效的
template<class Sndr, class... Env>
concept sender_in =
    sender<Sndr> &&
    (sizeof...(Env) <= 1) &&           // 最多一个环境
    (queryable<Env> && ...) &&         // 环境必须可查询
    is-constant<get_completion_signatures<Sndr, Env...>()>;  // 完成签名是编译期常量

8.3 非依赖发送器 vs 依赖发送器

非依赖发送器(non-dependent sender):
  完成签名与执行环境无关,编译期就能确定
  例:just(42) 永远完成为 set_value(int)
依赖发送器(dependent sender):
  完成签名取决于执行环境
  例:read_env(get_scheduler) 的结果类型取决于环境中的调度器类型

8.4 basic-sender 框架(最核心的内部机制)

库内部用 basic-sender 来实现所有标准算法,它是一个模板聚合类:

basic-sender<Tag, Data, Child0, Child1, ...>
  ├── get<0>() → Tag(标识算法种类,如 then_t)
  ├── get<1>() → Data(算法参数,如 then 的函数对象)
  ├── get<2>() → Child0(第一个子发送器)
  └── get<3>() → Child1(第二个子发送器)

连接流程(内部实现):

basic-sender::connect(rcvr)
  |
  v
创建 basic-operation(继承自 basic-state)
  |
  ├── basic-state
  │     ├── rcvr(接收器的副本)
  │     └── state(算法特定状态,由 impls-for::get-state 创建)
  │
  └── inner-ops(子发送器的操作状态数组)
        通过 connect-all 把每个子发送器连接到对应的 basic-receiver

8.5 impls-for 定制机制

每个算法通过特化 impls-for 来定制行为:

impls-for<Tag>
  ├── get-attrs   定制发送器的属性(环境查询)
  ├── get-env     定制子接收器的环境
  ├── get-state   创建算法特定状态
  ├── start       启动逻辑
  ├── complete    完成逻辑
  └── check-types 编译期类型检查

9. 发送器工厂

9.1 just / just_error / just_stopped

这三个是最简单的发送器工厂,同步完成:

// just(vals...) → 立即以值完成
auto s1 = just(42, 3.14f);   // 完成为 set_value(42, 3.14f)
// just_error(err) → 立即以错误完成
auto s2 = just_error(std::make_exception_ptr(std::runtime_error("oops")));
// just_stopped() → 立即以停止完成
auto s3 = just_stopped();

内部实现核心(start 逻辑):

// impls-for<just_t>::start 等价于:
[](auto& state, auto& rcvr) noexcept -> void {
    auto& [...ts] = state;   // 解包存储的值
    set_value(std::move(rcvr), std::move(ts)...);  // 立即触发值完成
};

9.2 schedule

// 从调度器获取一个"调度发送器"
// 当启动时,把工作提交到调度器的执行资源
auto sndr = schedule(my_scheduler);
// 完成签名:set_value()(无参数值完成)

9.3 read_env

// 从接收器的环境中读取一个查询结果作为值完成
auto sndr = read_env(get_scheduler);
// 完成签名:set_value(<当前调度器类型>)

10. 发送器适配器

10.1 管道语法

所有适配器支持管道操作符 |

// 函数调用形式
then(sndr, f)
// 管道形式(等价)
sndr | then(f)
// 链式管道
sndr | then(f) | upon_error(handle_err) | continues_on(sch)

10.2 then / upon_error / upon_stopped

附加回调到发送器的某种完成上:

// then: 对值完成的结果调用 f,f 的返回值作为新的值完成
auto s = just(42) | then([](int x) { return x * 2; });
// 最终完成为 set_value(84)
// upon_error: 对错误完成调用 f
auto s2 = just_error(err) | upon_error([](auto e) { return -1; });
// 最终完成为 set_value(-1)
// upon_stopped: 对停止完成调用 f
auto s3 = just_stopped() | upon_stopped([] { return 0; });
// 最终完成为 set_value(0)

内部 complete 逻辑:

// 如果完成标签匹配,调用 f 并把结果作为新的值完成
// 否则,直接转发原完成
[]<class Tag, class... Args>(auto, auto& fn, auto& rcvr, Tag, Args&&... args) noexcept -> void {
    if constexpr (same_as<Tag, decayed-typeof<set-cpo>>) {
        TRY-SET-VALUE(rcvr, invoke(std::move(fn), std::forward<Args>(args)...));
    } else {
        Tag()(std::move(rcvr), std::forward<Args>(args)...);  // 转发
    }
};

10.3 let_value / let_error / let_stopped

then 更强大:回调可以返回一个新的发送器,新发送器的完成作为整体完成:

// let_value: 值完成时,用结果调用 f,f 返回新发送器
auto s = just(42)
    | let_value([](int x) {
        return just(x, x * 2);  // 返回新发送器
    });
// 最终完成为 set_value(42, 84)

流程:

父发送器完成(值完成)
  -> 调用 f(结果参数...)
  -> f 返回子发送器
  -> 连接并启动子发送器
  -> 子发送器的完成作为整体完成

10.4 starts_on — 在指定调度器上启动

// 在 sch 的执行资源上启动 sndr
auto s = starts_on(sch, sndr);

变换过程(transform_sender):

// starts_on 最终变换为:
let_value(
    schedule(sch),            // 先跳到 sch 的执行资源
    [sndr]() mutable {
        return std::move(sndr);  // 然后在那里执行 sndr
    }
);

10.5 continues_on — 完成时转移到指定调度器

// sndr 在当前执行代理上运行,完成时转移到 sch
auto s = sndr | continues_on(sch);

内部变换为 schedule_from(sch, sndr)

sndr 执行
  -> 完成信号发生
  -> 把完成信号暂存
  -> 在 sch 的执行资源上触发最终完成

10.6 on — 双向执行转移

on 有两种形式:
形式一on(sch, sndr) — 在 sch 上运行,完成后回到原调度器

原调度器(env 中的 scheduler)
  -> starts_on(sch, sndr)   在 sch 上运行
  -> continues_on(原调度器)  回到原调度器

形式二on(sndr, sch, closure) — sndr 完成后转到 sch,执行 closure,再回来

sndr 完成
  -> 转移到 sch
  -> closure(sndr的结果)
  -> 回到原来的调度器

10.7 bulk / bulk_chunked / bulk_unchunked — 并行批量执行

// bulk: 对 0..shape-1 中每个 i 调用 f(i, vals...)
auto s = just(data) | bulk(std::execution::par, 100, [](int i, auto& data) {
    data[i] = process(i);
});
// bulk_chunked: f(begin, end, vals...),实现可以把迭代分块
// bulk_unchunked: f(i, vals...),每次一个元素

三者关系:

bulk(sndr, policy, shape, f)
  | transform_sender
  v
bulk_chunked(sndr, policy, shape, new_f)
  (new_f 循环调用原始 f)

10.8 when_all — 等待所有发送器完成

// 等待所有子发送器完成,把所有值结果拼接为一个值完成
auto s = when_all(
    just(1),
    just(2.0),
    just("hello")
);
// 完成为 set_value(1, 2.0, "hello")

工作原理(内部状态):

state-type
  ├── count(原子计数器,初始 = 子发送器数量)
  ├── stop_src(内部停止源,任一失败则全部取消)
  ├── disp(disposition:started/error/stopped)
  ├── errors(错误存储)
  └── values(各子发送器的值存储)
每个子发送器完成时:
  -> 调用 arrive()
  -> count--
  -> 如果 count == 0,调用 complete()
       -> 按 disp 触发最终完成

10.9 write_env — 注入环境属性

// 给 sndr 添加自定义环境属性
auto s = write_env(sndr, prop(get_allocator, my_alloc{}));
// 当 sndr 的子操作查询分配器时,得到 my_alloc{}

10.10 stopped_as_optional — 把停止变为 optional

// 停止完成 -> set_value(optional<T>{})(disengaged)
// 值完成(x) -> set_value(optional<T>{x})(engaged)
// 再也不会有 stopped 完成
auto s = some_sender | stopped_as_optional;

10.11 stopped_as_error — 把停止变为错误

// 停止完成 -> set_error(err)
auto s = some_sender | stopped_as_error(my_error{});

10.12 associate — 关联到执行作用域

// 把 sender 与一个 scope token 关联
// 作用域可以追踪所有关联发送器的生命周期
auto s = sndr | associate(scope.get_token());

11. 发送器消费者

11.1 sync_wait — 同步等待

// 阻塞当前线程,等待发送器完成
// 返回 optional<tuple<结果类型...>>
auto result = this_thread::sync_wait(some_sender);
if (result) {
    auto [val] = *result;  // 值完成
    // 使用 val
}
// result 为空:stopped 完成
// 抛出异常:error 完成

内部实现:

// sync_wait 内部使用 run_loop 驱动事件循环
sync-wait-state<Sndr> state;   // 包含 run_loop 和结果存储
auto op = connect(sndr, sync-wait-receiver<Sndr>{&state});
start(op);           // 启动异步操作
state.loop.run();    // 当前线程运行事件循环,直到 loop.finish() 被调用
// 接收器的完成函数会调用 loop.finish()
if (state.error) rethrow_exception(state.error);
return state.result;

11.2 sync_wait_with_variant — 返回变体类型

// 对于有多种值完成签名的发送器
// 返回 optional<variant<tuple<...>, tuple<...>, ...>>
auto result = this_thread::sync_wait_with_variant(sndr);

11.3 spawn — 即发即忘

// 关联到作用域,立即启动,不关心结果
// 发送器必须只有 set_value() 和 set_stopped() 两种完成
execution::spawn(sndr, scope.get_token());

11.4 spawn_future — 即发,但可以等待结果

// 立即启动,返回一个可等待结果的 sender
auto future_sndr = execution::spawn_future(sndr, scope.get_token());
// 之后可以 co_await future_sndr 或 sync_wait(future_sndr)

12. 完成签名

12.1 什么是完成签名?

完成签名是描述异步操作如何完成的函数类型:

// 可能的完成签名:
set_value_t()            // 空值完成
set_value_t(int, float)  // 携带 int 和 float 的值完成
set_error_t(exception_ptr)  // 携带 exception_ptr 的错误完成
set_stopped_t()          // 停止完成

12.2 completion_signatures 类模板

// 封装一组完成签名
struct my_sender {
    using sender_concept = execution::sender_t;
    using completion_signatures = execution::completion_signatures<
        set_value_t(),              // 可以空值完成
        set_value_t(int, float),    // 可以带值完成
        set_error_t(exception_ptr), // 可以带异常完成
        set_stopped_t()             // 可以停止完成
    >;
};

12.3 gather-signatures — 提取特定标签的签名

对于发送器的完成签名集合 C o m p l e t i o n s Completions Completions 和标签 T a g Tag Tag
gather-signatures ⟨ T a g , C o m p l e t i o n s , T u p l e , V a r i a n t ⟩ \text{gather-signatures}\langle Tag, Completions, Tuple, Variant\rangle gather-signaturesTag,Completions,Tuple,Variant
= 把所有 T a g ( A r g s . . . ) Tag(Args...) Tag(Args...) 形式的签名变为 T u p l e ⟨ A r g s . . . ⟩ Tuple\langle Args...\rangle TupleArgs...,再用 V a r i a n t Variant Variant 包装所有这些 Tuple

completion_signatures<
    set_value_t(),
    set_value_t(int, float),
    set_error_t(exception_ptr)
>
gather-signatures<set_value_t, ..., decayed-tuple, variant>
  = variant<
        tuple<>,        // 对应 set_value_t()
        tuple<int, float>  // 对应 set_value_t(int, float)
    >

12.4 实用类型别名

// 提取所有值完成的类型(组成 variant of tuples)
value_types_of_t<Sndr, Env>
// 提取所有错误完成的类型(组成 variant)
error_types_of_t<Sndr, Env>
// 发送器是否有停止完成
sends_stopped<Sndr, Env>  // bool 常量

13. 协程工具

13.1 as_awaitable — 让发送器可被 co_await

// 在协程中,可以 co_await 一个 sender
task<int> my_coroutine() {
    int result = co_await just(42);  // just(42) 被当作 awaitable
    co_return result;
}

as_awaitable 的选择逻辑(优先级从高到低):

1. expr.as_awaitable(p)          如果存在
2. expr 本身就是 awaiter         如果是
3. sender + 有 await 完成适配器  如果满足
4. sender(awaitable-sender)   如果满足
5. 直接返回 expr                 兜底

13.2 with_awaitable_senders — 让协程支持 sender

// 在协程 Promise 类型中继承此类,使 sender 自动可等待
struct my_promise : with_awaitable_senders<my_promise> {
    // ...
};

功能:

  • 提供 await_transform 将 sender 转为 awaitable
  • 提供 unhandled_stopped 默认实现(调用 terminate(),除非设置了 continuation)
  • 支持通过 set_continuation 传播停止

13.3 task<T, Environment> — 协程任务类型

// 最完整的协程 sender 包装
task<int> compute() {
    int x = co_await just(21);
    co_return x * 2;  // 完成为 set_value(42)
}
// 带自定义环境
task<void, my_env> custom_task() {
    co_await schedule(get_current_scheduler());
}

task 的完成签名:
c o m p l e t i o n _ s i g n a t u r e s = { s e t _ v a l u e _ t ( ) if  T = v o i d s e t _ v a l u e _ t ( T ) otherwise ∪    e r r o r _ t y p e s ∪ { s e t _ s t o p p e d _ t ( ) } completion\_signatures = \begin{cases} set\_value\_t() & \text{if } T = void \\ set\_value\_t(T) & \text{otherwise} \end{cases} \cup \; error\_types \cup \{set\_stopped\_t()\} completion_signatures={set_value_t()set_value_t(T)if T=voidotherwiseerror_types{set_stopped_t()}

13.4 task 内部架构

task<T, Env>
  |
  +-- promise_type(协程承诺类型)
  |     ├── alloc(分配器)
  |     ├── source(停止源)
  |     ├── token(停止令牌)
  |     ├── result(T 类型的结果)
  |     └── errors(错误变体)
  |
  +-- state<Rcvr>(操作状态)
        ├── handle(协程句柄)
        ├── rcvr(外部接收器)
        ├── own-env(自有环境)
        └── environment(环境对象)

关键机制

  • co_await sender 时,自动调用 affine_on(sender, SCHED(*this)),确保在同一个调度器上恢复
  • yield_value(with_error<E>{...}) 用于发出错误完成
  • change_coroutine_scheduler{sch} 用于切换协程的关联调度器

14. 执行作用域

14.1 作用域的作用

执行作用域(Scope)用来追踪异步操作的生命周期,确保在析构前所有关联的操作都已完成。
类似于有界线程池或 structured concurrency(结构化并发)。

14.2 scope_token 概念

// scope_token 是创建关联的句柄
template<class Token>
concept scope_token =
    copyable<Token> &&
    requires(const Token token) {
        // try_associate(): 尝试获取关联(失败时返回未 engaged 的 Assoc)
        { token.try_associate() } -> scope_association;
        // wrap(sndr): 包装发送器(可能添加停止令牌传播等)
        { token.wrap(declval<test-sender>()) } -> sender_in<test-env>;
    };

14.3 作用域状态机

构造

try_associate() 调用

close()

close()

join() 启动

join() 启动

join() 启动

count 降为 0

count 降为 0

析构

unused

open

unused_and_closed

closed

open_and_joining

closed_and_joining

joined

14.4 simple_counting_scope

最简单的作用域,只追踪关联数量,不支持取消:

execution::simple_counting_scope scope;
auto token = scope.get_token();
// 关联发送器
auto s = some_sender | associate(token);
// 关闭作用域(不再允许新关联)
scope.close();
// 等待所有已关联操作完成
auto join_sndr = scope.join();
sync_wait(starts_on(sch, join_sndr));

14.5 counting_scope

simple_counting_scope 基础上增加取消支持

execution::counting_scope scope;
auto token = scope.get_token();
// token.wrap(sndr) 会把作用域的停止令牌注入到 sndr
// 当 scope.request_stop() 时,所有关联操作收到停止请求
scope.request_stop();  // 请求所有关联操作停止

token.wrap(sndr) 等价于:

stop-when(sndr, scope.s_source.get_token())

15. 并行调度器

15.1 parallel_scheduler

系统级的并行执行资源调度器:

auto sch = execution::get_parallel_scheduler();
// sch.get_forward_progress_guarantee() == forward_progress_guarantee::parallel

15.2 可替换后端

通过 system_context_replaceability 命名空间,用户可以替换默认实现:

parallel_scheduler
  |
  v(通过 query_parallel_scheduler_backend() 获取)
parallel_scheduler_backend(可替换!)
  ├── schedule(receiver_proxy&, span<byte>)
  ├── schedule_bulk_chunked(size_t n, bulk_item_receiver_proxy&, span<byte>)
  └── schedule_bulk_unchunked(size_t n, bulk_item_receiver_proxy&, span<byte>)

15.3 receiver_proxy — 接收器代理

后端通过 receiver_proxy 与前端通信,无需知道具体接收器类型:

struct receiver_proxy {
    virtual void set_value() noexcept = 0;
    virtual void set_error(exception_ptr) noexcept = 0;
    virtual void set_stopped() noexcept = 0;
    // 查询接收器环境中的某个属性
    template<class P, class Query>
    optional<P> try_query(Query q) noexcept;
};

16. 完整示例代码

16.1 基础用法:just + then + sync_wait

#include <execution>
#include <print>
#include <optional>
#include <tuple>
int main() {
    using namespace std::execution;
    // 创建一个发送器链:just(10) -> then(乘2) -> then(加1)
    auto sndr = just(10)
        | then([](int x) { return x * 2; })
        | then([](int x) { return x + 1; });
    // 同步等待结果
    auto result = std::this_thread::sync_wait(std::move(sndr));
    if (result) {
        auto [val] = *result;
        std::println("结果: {}", val);  // 输出: 结果: 21
    }
    return 0;
}

https://godbolt.org/z/Yssf1ezbv

16.2 错误处理示例

#include <execution>
#include <print>
#include <exception>
#include <stdexcept>
int main() {
    using namespace std::execution;
    // 错误处理链
    auto sndr = just_error(std::make_exception_ptr(
                    std::runtime_error("出错了")))
        | upon_error([](std::exception_ptr ep) -> int {
            try {
                std::rethrow_exception(ep);
            } catch (const std::exception& e) {
                std::println("捕获错误: {}", e.what());
                return -1;  // 把错误转换为值完成
            }
        });
    auto result = std::this_thread::sync_wait(std::move(sndr));
    if (result) {
        auto [val] = *result;
        std::println("恢复后的值: {}", val);  // 输出: 恢复后的值: -1
    }
    return 0;
}

https://godbolt.org/z/6jGq9bGW7

16.3 run_loop 手动事件循环示例

#include <execution>
#include <thread>
#include <print>
int main() {
    using namespace std::execution;
    run_loop loop;
    // 在另一个线程上驱动事件循环
    std::thread loop_thread([&loop] {
        loop.run();  // 阻塞直到 loop.finish() 被调用
    });
    auto sch = loop.get_scheduler();
    // 在 loop 的执行资源上调度工作
    auto sndr = schedule(sch)
        | then([] {
            std::println("在 loop 线程上执行!");
            return 42;
        });
    // 同步等待(会委托给 loop 的调度器)
    auto result = std::this_thread::sync_wait(std::move(sndr));
    loop.finish();    // 通知事件循环结束
    loop_thread.join();
    if (result) {
        auto [val] = *result;
        std::println("结果: {}", val);
    }
    return 0;
}

https://godbolt.org/z/haM9erM3G

16.4 when_all 并行等待示例

#include <execution>
#include <print>
int main() {
    using namespace std::execution;
    run_loop loop;
    auto sch = loop.get_scheduler();
    std::thread t([&] { loop.run(); });
    // 并行执行两个计算,等待都完成
    auto sndr = when_all(
        just(1) | then([](int x) { return x * 10; }),
        just(2) | then([](int x) { return x * 20; })
    );
    auto result = std::this_thread::sync_wait(std::move(sndr));
    loop.finish();
    t.join();
    if (result) {
        auto [a, b] = *result;
        std::println("结果: {} 和 {}", a, b);  // 10 和 40
    }
    return 0;
}

https://godbolt.org/z/Gq7crn56c

16.5 自定义发送器示例

#include <execution>
#include <print>
// 自定义发送器:立即完成,值为固定常量
struct constant_sender {
    using sender_concept = std::execution::sender_t;
    // 声明完成签名
    using completion_signatures =
        std::execution::completion_signatures<
            std::execution::set_value_t(int)
        >;
    int value;
    // connect 返回操作状态
    template<std::execution::receiver_of<completion_signatures> Rcvr>
    struct operation_state {
        using operation_state_concept = std::execution::operation_state_t;
        Rcvr rcvr;
        int value;
        // start: 立即触发值完成
        void start() & noexcept {
            std::execution::set_value(std::move(rcvr), value);
        }
    };
    template<std::execution::receiver_of<completion_signatures> Rcvr>
    operation_state<std::remove_cvref_t<Rcvr>>
    connect(Rcvr&& rcvr) && {
        return {std::forward<Rcvr>(rcvr), value};
    }
};
int main() {
    using namespace std::execution;
    constant_sender s{42};
    auto result = std::this_thread::sync_wait(std::move(s));
    if (result) {
        auto [val] = *result;
        std::println("常量发送器的值: {}", val);  // 42
    }
    return 0;
}

https://godbolt.org/z/drnEqMbWx

总结:各组件职责一览

+------------------+--------------------------------+---------------------------+
| 组件             | 职责                           | 关键操作                  |
+------------------+--------------------------------+---------------------------+
| Sender           | 描述异步工作的"食谱"           | connect(), get_env()      |
| Receiver         | 处理异步工作的"结果"           | set_value/error/stopped() |
| Operation State  | 管理执行中的异步操作           | start()                   |
| Scheduler        | 决定在哪里执行                 | schedule()                |
| Environment      | 存储执行上下文的属性           | query()                   |
| Scope            | 追踪异步操作生命周期           | get_token(), join()       |
+------------------+--------------------------------+---------------------------+

数据流总结

Scheduler → schedule Sender → connect OperationState → start 执行 → set_* Receiver \text{Scheduler} \xrightarrow{\text{schedule}} \text{Sender} \xrightarrow{\text{connect}} \text{OperationState} \xrightarrow{\text{start}} \text{执行} \xrightarrow{\text{set\_*}} \text{Receiver} Schedulerschedule Senderconnect OperationStatestart 执行set_* Receiver

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐