C++草案N5032学习:C++ 输入/输出库(`<iostream>` 体系)详细中文解析
来源:C++ 标准草案 N5032,第 31 章 Input/output library
目录
- 总体架构
- 类型体系与继承关系
- ios_base — 流的根基
- basic_ios — 流状态管理
- stream buffer — 缓冲区原理
- 标准 iostream 对象
- 格式化输入 basic_istream
- 格式化输出 basic_ostream
- 操纵符 Manipulators
- 字符串流
- Span 流(C++23)
- 文件流
- 同步输出流 syncstream
- 文件系统 filesystem
- 完整可运行示例
总体架构
C++ 的 I/O 库采用分层设计,每一层有明确的职责:
用户代码
|
| 使用 << 和 >> 运算符
v
basic_istream / basic_ostream <- 格式化层:负责数据的格式转换
|
| 通过 rdbuf() 调用
v
basic_streambuf <- 缓冲层:管理字符缓冲区
|
| 派生出具体实现
v
basic_stringbuf / basic_filebuf / basic_syncbuf <- 数据源层:字符串/文件/同步
用生活比喻:
basic_streambuf像一个仓库,存放待发送/待接收的字符basic_ostream像一个快递员,把数据打包(格式化)后送到仓库basic_istream像一个收货员,从仓库取出数据并解包(解析)
类型体系与继承关系
缓冲区的继承树单独列出:
ios_base — 流的根基
ios_base 是所有流类的根基类,它存储了:
- 格式标志(fmtflags):控制数字进制、对齐方式、小数点等
- 流状态(iostate):记录读写是否出错
- 打开模式(openmode):文件是读还是写
- 私有数据槽:供用户存储自定义数据
格式标志(fmtflags)
fmtflags 是一个位掩码,每个位控制一种格式行为:
boolalpha - 用 "true"/"false" 显示布尔值(而不是 1/0)
dec - 十进制整数(默认)
hex - 十六进制整数
oct - 八进制整数
fixed - 定点小数(如 3.14)
scientific - 科学计数法(如 3.14e+00)
hexfloat - 十六进制浮点(0x1.91eb86p+1)
showbase - 显示进制前缀(0x、0)
showpoint - 始终显示小数点
showpos - 正数显示 + 号
skipws - 输入时跳过前导空白(默认开启)
unitbuf - 每次输出后立即刷新缓冲区
uppercase - 十六进制字母大写(A-F 而非 a-f)
left - 左对齐
right - 右对齐(默认)
internal - 符号左对齐,数字右对齐
三个"组合标志"(只能选其中一个):
a d j u s t f i e l d = l e f t ∣ r i g h t ∣ i n t e r n a l adjustfield = left \mid right \mid internal adjustfield=left∣right∣internal
b a s e f i e l d = d e c ∣ o c t ∣ h e x basefield = dec \mid oct \mid hex basefield=dec∣oct∣hex
f l o a t f i e l d = s c i e n t i f i c ∣ f i x e d floatfield = scientific \mid fixed floatfield=scientific∣fixed
流状态(iostate)
iostate 是一个位掩码,记录流是否出错:
goodbit = 0 - 一切正常(没有任何错误位被设置)
eofbit - 到达文件末尾
failbit - 操作失败(如格式解析错误)
badbit - 流缓冲区损坏(严重错误)
判断方法:
stream.good() ←→ rdstate() == goodbit
stream.eof() ←→ eofbit 被设置
stream.fail() ←→ failbit 或 badbit 被设置
stream.bad() ←→ badbit 被设置
打开模式(openmode)
app - 每次写入前先移到文件末尾(追加模式)
ate - 打开后立即移到文件末尾(seek to end)
binary - 二进制模式(不做换行符转换)
in - 打开用于读
out - 打开用于写
trunc - 打开时清空现有内容
noreplace- 独占创建(文件已存在则失败)
定位方向(seekdir)
beg - 从文件开头计算偏移
cur - 从当前位置计算偏移
end - 从文件末尾计算偏移
basic_ios — 流状态管理
basic_ios 继承 ios_base,增加了与具体字符类型相关的功能:
basic_ios 的核心成员:
rdbuf() - 返回关联的 streambuf(缓冲区指针)
tie() - 返回"绑定流"(输出前先刷新绑定流)
fill() - 填充字符(宽度不足时用什么字符补齐,默认空格)
imbue() - 设置本地化 locale
初始化默认值(来自标准表格)
当 basic_ios::init(sb) 被调用时,各字段被设置为:
| 字段 | 初始值 |
|---|---|
rdbuf() |
sb(传入的缓冲区) |
tie() |
nullptr(不绑定任何流) |
rdstate() |
goodbit(如果sb不为空),否则badbit |
flags() |
skipws | dec(跳过空白 + 十进制) |
width() |
0(不限制宽度) |
precision() |
6(6位有效数字) |
fill() |
widen(' ')(空格字符) |
bool 转换
if (stream) { ... } // 等价于 if (!stream.fail())
if (!stream) { ... } // 等价于 if (stream.fail())
tie() 的作用
cin.tie() 默认返回 &cout。这意味着:在从 cin 读数据之前,会自动刷新 cout,确保提示语先显示再等待输入。
stream buffer — 缓冲区原理
缓冲区是 C++ I/O 的核心机制,通过一段内存数组实现高效的批量读写。
三指针模型
每个序列(输入序列 / 输出序列)由三个指针控制:
内部字符数组示意:
输入序列(get area):
+---+---+---+---+---+---+---+---+
| a | b | c | d | e | f | g | h |
+---+---+---+---+---+---+---+---+
^ ^ ^
| | |
eback() gptr() egptr()
(起始) (当前读位置) (结束)
输出序列(put area):
+---+---+---+---+---+---+---+---+
| X | Y | Z | _ | _ | _ | _ | _ |
+---+---+---+---+---+---+---+---+
^ ^ ^
| | |
pbase() pptr() epptr()
(起始) (当前写位置) (结束)
- 读操作:从
*gptr()读取字符,然后gptr()向右移动 - 写操作:将字符写入
*pptr(),然后pptr()向右移动 - 当读指针到达
egptr()时,调用underflow()从底层(文件/字符串)加载更多数据 - 当写指针到达
epptr()时,调用overflow()将缓冲区数据刷写到底层
关键虚函数
| 虚函数 | 何时调用 | 作用 |
|---|---|---|
underflow() |
读区空了 | 从底层读入更多数据 |
uflow() |
读区空了且需要移动指针 | 同上,但自动移动gptr |
overflow(c) |
写区满了 | 把缓冲数据刷写到底层,然后写入c |
pbackfail(c) |
需要退回字符时 | 将c放回输入序列 |
seekoff() |
相对定位 | 移动读/写位置 |
seekpos() |
绝对定位 | 移动到指定位置 |
sync() |
同步 | 将缓冲数据写到底层 |
标准 iostream 对象
<iostream> 提供 8 个全局流对象:
| 对象 | 类型 | 连接到 | 说明 |
|---|---|---|---|
cin |
istream |
stdin |
标准输入,默认绑定到 cout |
cout |
ostream |
stdout |
标准输出 |
cerr |
ostream |
stderr |
标准错误,无缓冲(unitbuf),绑定到 cout |
clog |
ostream |
stderr |
标准错误日志,有缓冲 |
wcin |
wistream |
stdin |
宽字符版 cin |
wcout |
wostream |
stdout |
宽字符版 cout |
wcerr |
wostream |
stderr |
宽字符版 cerr |
wclog |
wostream |
stderr |
宽字符版 clog |
cerr vs clog 的区别
cerr:unitbuf 标志被设置
→ 每次写入后立即刷新
→ 适合调试信息(立即可见)
→ 性能较低
clog:正常缓冲模式
→ 批量刷新
→ 适合日志记录(性能更好)
sync_with_stdio
// 取消 C++ 流和 C stdio 流的同步
ios_base::sync_with_stdio(false);
// 效果:cin/cout 不再与 scanf/printf 同步,速度更快
// 代价:混用 cin 和 scanf 会产生未定义行为
格式化输入 basic_istream
sentry — 输入卫兵
每个格式化输入函数开始时都构造一个 sentry 对象,它负责:
- 检查流状态是否 good
- 如果有绑定流(
tie()),先刷新它 - 如果
skipws标志开启,跳过前导空白字符
sentry 的执行流程:
构造 sentry(is)
|
+-- is.good() 为 false?
| YES → 设置 failbit,sentry 为 false,函数提前返回
|
+-- is.tie() 不为 null?
| YES → 调用 is.tie()->flush()
|
+-- noskipws 为 false 且 skipws 标志开启?
YES → 跳过空白字符
读取字符失败? → 设置 failbit | eofbit
格式化提取(operator>>)
对数值类型(int、double 等),实际由 locale 的 num_get facet 完成解析。对于 short 和 int,标准先用 long 读取,再检查范围是否溢出:
// short 的读取逻辑(简化)
long lval;
// 先读为 long
use_facet<num_get<char>>(...).get(..., lval);
// 再检查范围
if (lval < numeric_limits<short>::min()) {
state |= failbit; // 设置失败标志
val = numeric_limits<short>::min();
} else if (lval > numeric_limits<short>::max()) {
state |= failbit;
val = numeric_limits<short>::max();
} else {
val = static_cast<short>(lval);
}
非格式化输入函数
| 函数 | 说明 |
|---|---|
get() |
读取一个字符(包括空白) |
get(s, n, delim) |
读取最多 n-1 个字符,直到遇到 delim(不消耗 delim) |
getline(s, n, delim) |
读取最多 n-1 个字符,直到遇到 delim(消耗 delim) |
read(s, n) |
读取恰好 n 个字符 |
readsome(s, n) |
只读取当前缓冲区中可用的字符(不阻塞) |
ignore(n, delim) |
丢弃最多 n 个字符,直到遇到 delim |
peek() |
查看下一个字符但不消耗 |
putback(c) |
把字符 c 放回输入序列 |
unget() |
把最后读取的字符放回 |
gcount() |
返回上次非格式化操作读取的字符数 |
get 与 getline 的关键区别
输入流: "hello\nworld\n"
用 get(buf, 100, '\n') 读取后:
buf = "hello"
流中剩余:"\nworld\n" <- '\n' 还在流里!
gcount() = 5
用 getline(buf, 100, '\n') 读取后:
buf = "hello"
流中剩余:"world\n" <- '\n' 被消耗了!
gcount() = 6 <- 计入了被消耗的 '\n'
格式化输出 basic_ostream
sentry — 输出卫兵
输出 sentry 负责:
- 检查流状态(
os.good()) - 调用
os.tie()->flush()(如果有绑定流) - 析构时,如果
unitbuf标志开启,自动调用rdbuf()->pubsync()
填充(Padding)规则
当输出宽度(width())大于内容长度时,自动补充填充字符(fill()):
os.width(10);
os.fill('*');
os << "hi";
根据对齐标志:
right(默认): "********hi"
left: "hi********"
internal: 符号左,数字右(如 "+*******42")
宽度 = max ( o s . w i d t h ( ) , 内容长度 ) 宽度 = \max(os.width(), \text{内容长度}) 宽度=max(os.width(),内容长度)
算术类型的输出
实际由 locale 的 num_put facet 完成格式化:
short 和 int 的输出有特殊处理:
- 如果是十六进制或八进制,先转为 unsigned long 避免负号问题
- 否则转为 long 进行格式化
float 输出先转为 double(避免 num_put 没有 float 的特化)
print 函数(C++23)
// 等价于 cout << format("Hello, {}!\n", name);
print("Hello, {}!\n", name);
// 输出到指定流
print(cerr, "Error: {}\n", msg);
// 自动加换行
println("Result: {}", value);
print 和 println 在 UTF-8 环境下使用 vprint_unicode,会调用平台原生 Unicode API(Windows 上是 WriteConsoleW),确保正确显示 Unicode 字符。
操纵符 Manipulators
操纵符本质上是函数指针,通过 operator<< 或 operator>> 传递给流,从而改变流的状态。
cout << hex; // 相当于:hex(cout); 内部调用 cout.setf(ios_base::hex, ios_base::basefield)
cin >> skipws; // 相当于:skipws(cin);
基础操纵符
| 操纵符 | 效果 |
|---|---|
boolalpha / noboolalpha |
布尔值用字母/数字表示 |
showbase / noshowbase |
显示/隐藏进制前缀 |
showpos / noshowpos |
显示/隐藏正数 + 号 |
skipws / noskipws |
跳过/不跳过前导空白 |
uppercase / nouppercase |
大写/小写十六进制 |
unitbuf / nounitbuf |
每次写后刷新/不刷新 |
dec / oct / hex |
十进制/八进制/十六进制 |
fixed / scientific / hexfloat / defaultfloat |
浮点格式 |
left / right / internal |
对齐方式 |
endl |
输出 '\n' 并刷新 |
ends |
输出空字符 '\0' |
flush |
刷新缓冲区 |
ws |
跳过输入流中的空白 |
<iomanip> 操纵符
| 操纵符 | 效果 |
|---|---|
setw(n) |
设置下次输出的字段宽度(只影响下一次操作!) |
setprecision(n) |
设置浮点精度 |
setfill(c) |
设置填充字符 |
setbase(8/10/16) |
设置整数进制 |
setiosflags(mask) |
设置指定标志位 |
resetiosflags(mask) |
清除指定标志位 |
quoted 操纵符
quoted 用于处理含空格的字符串,自动加引号和转义:
string s = "hello world";
cout << quoted(s); // 输出:"hello world"
string result;
istringstream iss("\"hello world\"");
iss >> quoted(result); // result = "hello world"(引号被去掉)
常见应用场景:CSV、XML 文件处理,或通过 >> 读取含空格的路径。
字符串流
字符串流将 std::string 作为数据源/目标,无需文件 I/O:
stringbuf 的初始化规则
openmode 决定读写区的初始化:
有 out 标志:
pbase() → buf 起始
epptr() >= pbase() + buf.size()
有 ate 标志:pptr() = pbase() + buf.size()(从末尾开始写)
没有 ate: pptr() = pbase()(从开头开始写)
有 in 标志:
eback() → buf 起始
gptr() = eback()
egptr() = eback() + buf.size()
str() 方法的行为
ostringstream oss;
oss << "hello " << 42;
// const & 版本:返回副本
string s1 = oss.str(); // s1 = "hello 42"
// && 版本(C++20):移动,消耗内部缓冲区
string s2 = std::move(oss).str(); // s2 = "hello 42",oss 的缓冲区变空
// view() 版本(C++20):不复制,返回视图
string_view sv = oss.view(); // 零拷贝,但注意生命周期!
Span 流(C++23)
<spanstream> 提供基于 std::span<char> 的流,适用于在固定大小的缓冲区上进行流操作,不涉及内存分配:
char buf[100];
ospanstream oss(span<char>(buf, sizeof(buf)));
oss << "hello " << 42;
// 数据写入 buf,不分配堆内存
与 stringstream 的关键区别:
| 特性 | stringstream | spanstream |
|---|---|---|
| 底层存储 | std::string(动态增长) |
std::span(固定大小) |
| 内存分配 | 是 | 否 |
| 溢出 overflow | 自动扩容 | 不可用(固定大小) |
| 适用场景 | 通用字符串构建 | 嵌入式/性能敏感代码 |
文件流
文件打开模式映射表
| ios_base 标志组合 | stdio 等价 | 说明 |
|---|---|---|
out |
"w" |
写(覆盖) |
out | noreplace |
"wx" |
写(文件已存在则失败) |
out | app |
"a" |
追加写 |
in |
"r" |
只读 |
in | out |
"r+" |
读写(文件必须存在) |
in | out | trunc |
"w+" |
读写(覆盖) |
in | out | app |
"a+" |
读+追加写 |
以上 + binary |
加 b |
如 "wb" "rb" |
filebuf 的字符转换
文件缓冲区在宽字符(wchar_t)模式下,通过 codecvt facet 进行多字节/宽字符转换:
磁盘文件(多字节字节流)
|
| codecvt::in() 转换
v
内部缓冲区(宽字符序列 wchar_t[])
|
| 提供给 wistream/wostream
v
用户代码
转换公式:extern_chars → c o d e c v t : : i n intern_chars (wchar_t) \text{转换公式:extern\_chars} \xrightarrow{codecvt::in} \text{intern\_chars (wchar\_t)} 转换公式:extern_charscodecvt::inintern_chars (wchar_t)
native_handle(C++26)
ifstream ifs("data.txt");
auto handle = ifs.native_handle();
// POSIX 上:handle 是 int(文件描述符)
// Windows 上:handle 是 HANDLE
同步输出流 syncstream
<syncstream> 解决多线程环境下流输出交错的问题。
问题演示
无同步时(多线程输出):
线程A: "Hello"
线程B: "World"
实际输出可能是:"HWelolrold" ← 字符交错!
有 osyncstream 时:
线程A: osyncstream(cout) << "Hello\n";
线程B: osyncstream(cout) << "World\n";
保证输出是:"Hello\nWorld\n" 或 "World\nHello\n" ← 不会交错
syncbuf 的工作原理
osyncstream 的输出流程:
写入字符 → 存入 syncbuf 的内部缓冲区
|
| (不立即输出到 cout)
v
调用 emit() 或析构时
|
| 原子性地获取 wrapped 流的锁
| 将缓冲区全部写入 wrapped 流
| 释放锁
v
cout(wrapped 流)收到完整的字符块
关键性质:emit() 对同一个 wrapped 流的调用是全序的(total order),不会有字符交错。
// 标准用法
{
osyncstream bout(cout);
bout << "Hello, ";
bout << "World!\n";
} // 析构时自动调用 emit(),原子性写入 cout
emit() 与 flush() 的区别:在 osyncstream 上调用 flush() 只是记录"需要刷新",并不立即刷新。真正的刷新发生在 emit() 时。
文件系统 filesystem
<filesystem> 提供跨平台的文件系统操作。
path 类
namespace fs = std::filesystem;
// 路径构造(/ 运算符连接路径)
fs::path p = "/home/user" / "docs" / "file.txt";
// 路径分解
p.root_name() // ""(POSIX)或 "C:"(Windows)
p.root_directory() // "/"
p.parent_path() // "/home/user/docs"
p.filename() // "file.txt"
p.stem() // "file"
p.extension() // ".txt"
// 路径修改
p.replace_extension(".bak"); // "/home/user/docs/file.bak"
p.remove_filename(); // "/home/user/docs/"
路径规范化(normalization)
标准定义了一个 8 步规范化过程,将路径中的 .、.. 和多余的分隔符消除:
规范化步骤(简化):
1. 空路径直接返回
2. 根名中的斜杠替换为首选分隔符
3. 目录分隔符替换为首选分隔符
4. 移除 "." 文件名
5. 移除 "非.." + "/" + ".." 的组合
6. 如果有根目录,移除所有 ".."
7. 如果末尾是 "..",移除末尾分隔符
8. 如果路径为空,置为 "."
示例:
"foo/./bar/.." → "foo/"
"foo/.///bar/../" → "foo/"
错误报告两种模式
所有文件系统函数提供两个版本:
// 抛异常版本(适合错误是"真正异常"情况)
try {
fs::create_directory("/no/permission");
} catch (const fs::filesystem_error& e) {
cerr << e.what() << '\n'; // 错误信息
cerr << e.path1() << '\n'; // 相关路径
}
// error_code 版本(适合错误是"正常情况")
error_code ec;
fs::create_directory("/maybe/exists", ec);
if (ec) {
// 处理错误,不抛异常
}
文件状态与类型
目录迭代
// 非递归迭代
for (const auto& entry : fs::directory_iterator(".")) {
cout << entry.path() << "\n";
cout << " 大小: " << entry.file_size() << "\n"; // 可能使用缓存值!
}
// 递归迭代
for (const auto& entry : fs::recursive_directory_iterator(".")) {
cout << entry.path() << "\n";
}
directory_entry 可能缓存文件属性(大小、最后修改时间等),避免重复访问磁盘。若需要最新数据,调用 refresh()。
常用操作函数速查
| 函数 | 功能 |
|---|---|
exists(p) |
文件是否存在 |
is_regular_file(p) |
是否是普通文件 |
is_directory(p) |
是否是目录 |
is_symlink(p) |
是否是符号链接 |
file_size(p) |
文件大小(字节) |
last_write_time(p) |
最后修改时间 |
create_directory(p) |
创建目录 |
create_directories(p) |
递归创建目录 |
copy(from, to) |
复制文件 |
copy_file(from, to) |
复制普通文件 |
rename(old, new) |
重命名/移动 |
remove(p) |
删除文件或空目录 |
remove_all(p) |
递归删除 |
canonical(p) |
解析符号链接,返回绝对路径 |
relative(p, base) |
计算相对路径 |
space(p) |
查询磁盘空间 |
permissions(p, prms) |
设置文件权限 |
权限枚举
文件权限使用 POSIX 风格的八进制位掩码:
all = 0700 ⏟ owner ∣ 070 ⏟ group ∣ 07 ⏟ others = 0777 \text{all} = \underbrace{0700}_{\text{owner}} \mid \underbrace{070}_{\text{group}} \mid \underbrace{07}_{\text{others}} = 0777 all=owner
0700∣group
070∣others
07=0777
完整可运行示例
// cpp_io_demo.cpp
// 编译命令:
// g++ -std=c++20 -o cpp_io_demo cpp_io_demo.cpp
// clang++ -std=c++20 -o cpp_io_demo cpp_io_demo.cpp
//
// 注意:filesystem 部分在某些编译器上需要链接 -lstdc++fs(旧版GCC)
#include <iostream> // cin, cout, cerr, clog
#include <iomanip> // setw, setprecision, quoted, hex 等
#include <sstream> // ostringstream, istringstream, stringstream
#include <fstream> // ofstream, ifstream, fstream
#include <filesystem> // path, directory_iterator 等
#include <string>
#include <string_view>
#include <format> // std::format (C++20)
#include <thread> // std::thread (用于 syncstream 演示)
#include <syncstream> // osyncstream (C++20)
namespace fs = std::filesystem;
// ============================================================
// 工具:打印章节标题
// ============================================================
void section(const char* title) {
std::cout << "\n===== " << title << " =====\n";
}
// ============================================================
// 1. 基础格式标志演示
// ============================================================
void demo_fmtflags() {
section("格式标志演示");
// --- 整数进制 ---
int n = 255;
std::cout << std::dec << "十进制: " << n << "\n"; // 255
std::cout << std::hex << "十六进制: " << n << "\n"; // ff
std::cout << std::oct << "八进制: " << n << "\n"; // 377
// showbase:显示进制前缀
std::cout << std::hex << std::showbase << n << "\n"; // 0xff
std::cout << std::uppercase << n << "\n"; // 0XFF
// 恢复默认
std::cout << std::dec << std::noshowbase << std::nouppercase;
// --- 浮点数格式 ---
double pi = 3.14159265358979;
std::cout << std::defaultfloat << "默认: " << pi << "\n"; // 3.14159
std::cout << std::fixed << "定点: " << pi << "\n"; // 3.141593
std::cout << std::scientific << "科学计数: " << pi << "\n"; // 3.141593e+00
std::cout << std::hexfloat << "十六进制浮点: " << pi << "\n"; // 0x1.921fb54442d18p+1
// 精度设置
std::cout << std::defaultfloat;
std::cout << std::setprecision(10) << pi << "\n"; // 3.141592654
// --- 对齐与填充 ---
std::cout << std::setw(10) << std::left << "LEFT" << "|\n"; // "LEFT |"
std::cout << std::setw(10) << std::right << "RIGHT" << "|\n"; // " RIGHT|"
std::cout << std::setfill('*') << std::setw(10) << 42 << "\n"; // "********42"
std::cout << std::setfill(' '); // 恢复默认填充字符
// --- 布尔值 ---
std::cout << std::boolalpha << true << " " << false << "\n"; // true false
std::cout << std::noboolalpha << true << " " << false << "\n"; // 1 0
// --- showpos:显示正数符号 ---
std::cout << std::showpos << 42 << " " << -42 << "\n"; // +42 -42
std::cout << std::noshowpos;
}
// ============================================================
// 2. 流状态演示
// ============================================================
void demo_stream_state() {
section("流状态演示");
// 使用 istringstream 模拟带错误的输入
std::istringstream iss("123 abc 456");
int x;
iss >> x;
std::cout << "读取成功: x=" << x
<< " good=" << iss.good() << "\n"; // x=123, good=1
iss >> x; // 尝试读取 "abc" 为整数 → 失败
std::cout << "读取 'abc' 为整数后:"
<< " fail=" << iss.fail()
<< " bad=" << iss.bad() << "\n"; // fail=1, bad=0
// 清除错误标志,继续读取
iss.clear();
std::string s;
iss >> s; // 读取 "abc"
std::cout << "清除后读取: s=" << s << "\n"; // s=abc
iss >> x;
std::cout << "继续读取: x=" << x << "\n"; // x=456
// 到达文件末尾
iss >> x; // 流已空
std::cout << "到达末尾: eof=" << iss.eof()
<< " fail=" << iss.fail() << "\n"; // eof=1, fail=1
}
// ============================================================
// 3. 字符串流演示
// ============================================================
void demo_string_streams() {
section("字符串流演示");
// ostringstream:将多个值拼接成字符串
std::ostringstream oss;
oss << "姓名: " << "Alice"
<< ", 年龄: " << 30
<< ", 分数: " << std::fixed << std::setprecision(2) << 98.5;
std::string result = oss.str();
std::cout << result << "\n";
// istringstream:从字符串解析值
std::string data = "3.14 42 hello";
std::istringstream iss(data);
double d;
int i;
std::string word;
iss >> d >> i >> word;
std::cout << "解析结果: d=" << d << " i=" << i << " word=" << word << "\n";
// stringstream:既可读又可写
std::stringstream ss;
ss << "100 200 300";
int a, b, c;
ss >> a >> b >> c;
std::cout << "a=" << a << " b=" << b << " c=" << c << "\n";
// quoted:处理含空格的字符串
std::string path_with_spaces = "/home/user/my documents/file.txt";
std::ostringstream oss2;
oss2 << std::quoted(path_with_spaces);
std::cout << "quoted 输出: " << oss2.str() << "\n";
// 从 quoted 输入中解析
std::istringstream iss2("\"/home/user/my documents/file.txt\"");
std::string parsed_path;
iss2 >> std::quoted(parsed_path);
std::cout << "解析后路径: " << parsed_path << "\n";
// view():零拷贝访问缓冲区(C++20)
std::ostringstream oss3;
oss3 << "hello world";
std::string_view sv = oss3.view();
std::cout << "view() 视图: " << sv << "\n";
}
// ============================================================
// 4. 文件流演示
// ============================================================
void demo_file_streams() {
section("文件流演示");
const std::string filename = "/tmp/cpp_io_test.txt";
// --- 写入文件 ---
{
std::ofstream ofs(filename); // 默认模式:out | trunc
if (!ofs) {
std::cerr << "无法创建文件: " << filename << "\n";
return;
}
ofs << "第一行\n"
<< "第二行,数字=" << 42 << "\n"
<< "第三行,浮点=" << 3.14 << "\n";
} // ofs 析构,自动关闭并刷新
// --- 读取文件 ---
{
std::ifstream ifs(filename);
if (!ifs) {
std::cerr << "无法打开文件\n";
return;
}
std::string line;
int linenum = 0;
while (std::getline(ifs, line)) {
std::cout << "[" << ++linenum << "] " << line << "\n";
}
std::cout << "读到文件末尾: eof=" << ifs.eof() << "\n";
}
// --- 追加模式 ---
{
std::ofstream ofs(filename, std::ios::app);
ofs << "追加的第四行\n";
}
// --- 二进制读写 ---
const std::string binfile = "/tmp/cpp_io_test.bin";
{
std::ofstream ofs(binfile, std::ios::binary);
int data[] = {1, 2, 3, 4, 5};
// write:写入原始字节
ofs.write(reinterpret_cast<const char*>(data), sizeof(data));
}
{
std::ifstream ifs(binfile, std::ios::binary);
int data[5] = {};
ifs.read(reinterpret_cast<char*>(data), sizeof(data));
std::cout << "二进制读取: ";
for (int v : data) std::cout << v << " ";
std::cout << "\n";
}
// --- seekg / tellg ---
{
std::fstream fs(filename, std::ios::in | std::ios::out);
// 移到文件末尾,获取文件大小
fs.seekg(0, std::ios::end);
std::streampos size = fs.tellg();
std::cout << "文件大小: " << size << " 字节\n";
// 回到文件开头
fs.seekg(0, std::ios::beg);
std::string firstLine;
std::getline(fs, firstLine);
std::cout << "第一行: " << firstLine << "\n";
}
// 清理临时文件
std::remove(filename.c_str());
std::remove(binfile.c_str());
}
// ============================================================
// 5. 操纵符演示
// ============================================================
void demo_manipulators() {
section("操纵符演示");
// setw 只影响下一次输出
std::cout << std::setw(8) << 42 // 宽度8
<< std::setw(8) << "hello" // 宽度8
<< "\n";
// 输出: 42 hello
// get_money / put_money(需要 locale 支持)
// 此处略去,因各平台 locale 配置不同
// resetiosflags:精确清除标志
std::cout << std::hex << std::showbase << 255 << "\n"; // 0xff
std::cout << std::resetiosflags(std::ios::showbase | std::ios::hex);
std::cout << 255 << "\n"; // 255(恢复十进制,无前缀)
}
// ============================================================
// 6. 同步流演示(多线程安全输出)
// ============================================================
void demo_syncstream() {
section("同步输出流演示");
// 多线程输出,每个线程使用 osyncstream 包装 cout
auto thread_func = [](int id) {
// 每个 osyncstream 析构时原子性写入 cout
std::osyncstream(std::cout)
<< "线程 " << id << " 的输出\n";
};
std::thread t1(thread_func, 1);
std::thread t2(thread_func, 2);
std::thread t3(thread_func, 3);
t1.join();
t2.join();
t3.join();
std::cout << "(以上三行顺序不定,但每行都完整)\n";
// emit() 的手动控制
{
std::osyncstream bout(std::cout);
bout << "Part 1, ";
bout.emit(); // 原子性写入 "Part 1, "
bout << "Part 2\n";
// 析构时自动 emit() "Part 2\n"
}
}
// ============================================================
// 7. 文件系统演示
// ============================================================
void demo_filesystem() {
section("文件系统演示");
// --- 路径操作 ---
fs::path p = "/home/user/docs/report.pdf";
std::cout << "完整路径: " << p << "\n";
std::cout << "父目录: " << p.parent_path() << "\n";
std::cout << "文件名: " << p.filename() << "\n";
std::cout << "主干名: " << p.stem() << "\n";
std::cout << "扩展名: " << p.extension() << "\n";
// 路径连接
fs::path base = "/tmp";
fs::path full = base / "subdir" / "file.txt";
std::cout << "连接路径: " << full << "\n";
// 路径规范化
fs::path messy = "/tmp/./foo/../bar";
std::cout << "规范化: " << messy.lexically_normal() << "\n";
// --- 临时目录 ---
try {
fs::path tmp = fs::temp_directory_path();
std::cout << "临时目录: " << tmp << "\n";
} catch (const fs::filesystem_error& e) {
std::cerr << "无法获取临时目录: " << e.what() << "\n";
}
// --- 文件操作 ---
fs::path testfile = "/tmp/fs_test_12345.txt";
try {
// 创建测试文件
{
std::ofstream ofs(testfile);
ofs << "test content";
}
// 查询文件信息
std::cout << "文件是否存在: " << fs::exists(testfile) << "\n";
std::cout << "是普通文件: " << fs::is_regular_file(testfile) << "\n";
std::cout << "文件大小: " << fs::file_size(testfile) << " 字节\n";
// 复制文件
fs::path copyfile = "/tmp/fs_test_copy.txt";
fs::copy_file(testfile, copyfile, fs::copy_options::overwrite_existing);
std::cout << "复制后存在: " << fs::exists(copyfile) << "\n";
// 重命名
fs::path renamed = "/tmp/fs_test_renamed.txt";
fs::rename(copyfile, renamed);
std::cout << "重命名后: original=" << fs::exists(copyfile)
<< " renamed=" << fs::exists(renamed) << "\n";
// 清理
fs::remove(testfile);
fs::remove(renamed);
} catch (const fs::filesystem_error& e) {
std::cerr << "文件系统操作失败: " << e.what() << "\n";
}
// --- 目录迭代 ---
try {
std::cout << "\n/tmp 目录下的前5个条目:\n";
int count = 0;
for (const auto& entry : fs::directory_iterator("/tmp")) {
if (count++ >= 5) break;
std::cout << " " << entry.path().filename()
<< " [" << (entry.is_directory() ? "目录" : "文件") << "]\n";
}
} catch (const fs::filesystem_error& e) {
std::cerr << "迭代失败: " << e.what() << "\n";
}
// --- 磁盘空间 ---
try {
auto info = fs::space("/tmp");
std::cout << "\n磁盘空间信息:\n";
std::cout << " 总容量: " << info.capacity / (1024*1024) << " MB\n";
std::cout << " 可用空间: " << info.available / (1024*1024) << " MB\n";
} catch (const fs::filesystem_error& e) {
std::cerr << "查询磁盘空间失败: " << e.what() << "\n";
}
// --- error_code 版本(不抛异常)---
std::error_code ec;
bool exists = fs::exists("/nonexistent_path_xyz", ec);
std::cout << "\n不存在的路径: exists=" << exists
<< " error=" << ec.message() << "\n";
}
// ============================================================
// 主函数
// ============================================================
int main() {
std::cout << "C++ 输入/输出库演示\n";
std::cout << "===================\n";
demo_fmtflags();
demo_stream_state();
demo_string_streams();
demo_file_streams();
demo_manipulators();
demo_syncstream();
demo_filesystem();
return 0;
}
https://godbolt.org/z/vPE5M4r88
关键设计思想总结
分层设计
ios_base → 格式标志、流状态、openmode(与字符类型无关)
basic_ios → 缓冲区指针、fill字符、locale(依赖字符类型)
basic_istream/basic_ostream → 格式化层
basic_streambuf → 缓冲区层
具体实现(filebuf/stringbuf) → 数据源层
虚函数扩展点
underflow() → 自定义"读区空了时的行为"
overflow() → 自定义"写区满了时的行为"
seekoff/seekpos → 自定义定位行为
sync() → 自定义同步行为
错误处理双轨制
抛异常版本 → 适合"错误是真正异常情况"的代码
error_code版本 → 适合"错误是正常情况"的高性能代码
常见陷阱和最佳实践
| 陷阱 | 说明 | 解决方案 |
|---|---|---|
setw() 只生效一次 |
每次输出后宽度自动重置为0 | 每次输出前重新设置 |
get() 不消耗分隔符 |
读取后分隔符仍在流中 | 改用 getline() 或手动 ignore() |
混用 cin/scanf |
两者的缓冲区可能不同步 | 只使用一种,或调用 sync_with_stdio(false) |
string_view 从 view() 悬空 |
oss.view() 在修改 oss 后失效 |
及时使用,或用 str() 复制 |
osyncstream 的 flush() 不立即刷新 |
只记录意图,不真正刷新 | 用 emit() 或 flush_emit() |
filesystem::path 的比较 |
== 是词法比较,不是文件等价 |
文件等价用 equivalent() |
C++26 并发支持库 完全解读
基于 N5032 标准文档第32章,面向 C++ 开发者的中文详解
目录
- 总览
- 基础要求
- 停止令牌 Stop Tokens
- 线程 Threads
- 原子操作 Atomic Operations
- 互斥锁 Mutual Exclusion
- 条件变量 Condition Variables
- 信号量 Semaphores
- 协调类型 Latch 和 Barrier
- 期货 Futures
- 安全回收 Safe Reclamation
- 综合示例:线程池
- 关键知识点总结
总览
C++ 并发支持库提供了一整套用于创建和管理线程、执行互斥、线程间通信的组件。
+------------------------------------------------------------+
| C++ 并发支持库(第32章) |
+--------------------+---------------------------------------+
| 子章节 | 头文件 |
+--------------------+---------------------------------------+
| 32.3 停止令牌 | <stop_token> |
| 32.4 线程 | <thread> |
| 32.5 原子操作 | <atomic>, <stdatomic.h> |
| 32.6 互斥锁 | <mutex>, <shared_mutex> |
| 32.7 条件变量 | <condition_variable> |
| 32.8 信号量 | <semaphore> |
| 32.9 协调类型 | <latch>, <barrier> |
| 32.10 期货 | <future> |
| 32.11 安全回收 | <rcu>, <hazard_pointer> |
+--------------------+---------------------------------------+
基础要求
锁类型层级
C++ 标准定义了一组可锁类型的接口要求,形成层级关系:
超时规范
函数命名规则:
- 以
_for结尾:接受相对时间(duration),推荐使用 steady_clock 计时 - 以
_until结尾:接受绝对时间点(time_point),使用指定 clock 计时
超时的实际耗时公式:
t r e a l = D t + D i + D m t_{real} = D_t + D_i + D_m treal=Dt+Di+Dm
其中: - D t D_t Dt 是用户指定的等待时长
- D i D_i Di 是中断响应、函数返回、调度带来的"实现质量"延迟(Implementation quality delay)
- D m D_m Dm 是处理器和内存资源争用带来的"管理质量"延迟(Management quality delay)
停止令牌
停止令牌(Stop Token)是 C++20 引入的机制,允许一个线程礼貌地请求另一个线程停止执行。
核心概念
工作机制
线程A(持有 stop_source) 线程B(持有 stop_token)
| |
| source.request_stop() | token.stop_requested()
| 返回 true | --> 返回 true
| |
|-------- 触发所有已注册回调 -------->|
stop_source 与 stop_token 完整示例
#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>
int main() {
// 创建停止源(持有共享停止状态,类似 shared_ptr 引用计数)
std::stop_source source;
// 从停止源获取令牌(用于传递给需要被停止的线程)
std::stop_token token = source.get_token();
// 注册停止回调:当 request_stop() 被调用时,此 lambda 自动执行
// 回调在 request_stop() 的调用线程中同步执行
std::stop_callback cb(token, [] {
std::cout << "收到停止请求,正在清理资源...\n";
});
// 模拟工作线程:接收 token 的副本(token 是可复制的)
std::thread worker([tok = token] {
// 循环检查是否收到停止请求
while (!tok.stop_requested()) {
std::cout << "工作线程运行中...\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "工作线程收到停止信号,退出。\n";
});
// 主线程等待一段时间后请求停止
std::this_thread::sleep_for(std::chrono::milliseconds(350));
// request_stop() 只有第一次调用有效,之后调用无效返回 false
bool first_request = source.request_stop(); // true
bool second_request = source.request_stop(); // false(已经请求过)
worker.join();
return 0;
}
https://godbolt.org/z/3zh4hh19T
never_stop_token
#include <stop_token>
// never_stop_token:永不停止的特殊令牌
// stop_possible() 和 stop_requested() 均为 constexpr 返回 false
// 适合用于不需要停止功能的通用模板代码中作为默认参数
std::never_stop_token nst;
// nst.stop_possible() == false (编译期常量,零开销)
// nst.stop_requested() == false (编译期常量,零开销)
stop_source 与 inplace_stop_source 对比
| 特性 | stop_source | inplace_stop_source |
|---|---|---|
| 所有权 | 共享所有权(引用计数,类似 shared_ptr) | 唯一所有权(停止状态直接嵌入对象内部) |
| 可复制 | 是(多个 source 指向同一状态) | 否(不可移动也不可复制) |
| 令牌类型 | stop_token | inplace_stop_token |
| 适用场景 | 跨线程共享停止信号 | 生命周期明确、使用方全部在 source 作用域内的场景 |
#include <stop_token>
#include <iostream>
int main() {
std::inplace_stop_source src;
// inplace_stop_token 只持有指向 src 的指针,不参与所有权
std::inplace_stop_token tok = src.get_token();
// 注意:回调对象的生命周期必须嵌套在 src 的生命周期内
std::inplace_stop_callback cb(tok, [] {
std::cout << "inplace 停止回调触发\n";
});
src.request_stop(); // 触发回调
// src 析构时,其内部停止状态随之销毁
return 0;
}
线程
std::thread 基础
#include <iostream>
#include <thread>
#include <string>
// 普通函数作为线程入口
void task(int id, const std::string& msg) {
std::cout << "线程 " << id << ": " << msg << "\n";
}
int main() {
// 构造函数:新线程立即开始执行
// f 和 args 会在构造线程中被 decay-copy,然后在新线程中调用
std::thread t1(task, 1, "Hello");
// joinable() 返回 true 表示线程代表了一个活跃的执行
// 默认构造、被 move 走、join/detach 后均为 not joinable
if (t1.joinable()) {
t1.join(); // 阻塞等待线程 t1 结束
}
// 移动语义:线程所有权可以转移,t2 不再代表线程
std::thread t2(task, 2, "World");
std::thread t3 = std::move(t2); // t2 变为 not joinable
t3.join();
// hardware_concurrency() 返回建议的并发线程数量
// 只是提示,返回 0 表示无法确定
unsigned int n = std::thread::hardware_concurrency();
std::cout << "建议线程数: " << n << "\n";
return 0;
}
https://godbolt.org/z/T8azra631
thread 生命周期状态机
std::thread 状态转换:
[默认构造] 或 [被 move 走] 或 [join/detach 后]
|
| 不可加入(not joinable),get_id() == id()
|
| thread(f, args...) --> 创建新线程,立即开始执行
v
[可加入 joinable]
|
|--- join() ----> 阻塞等待线程完成 ----> [不可加入]
|
|--- detach() ---> 线程独立运行 ---------> [不可加入]
|
|--- 析构时若仍 joinable() --> 调用 terminate()!
警告: thread 对象在 joinable() 为 true 时被析构,程序会调用 std::terminate() 终止!
std::jthread:自动 join 的线程(C++20)
jthread 是改进版线程,析构时自动发出停止请求并 join,避免了忘记 join 的问题:
#include <iostream>
#include <thread>
#include <stop_token>
#include <chrono>
int main() {
// jthread 构造时,若 f 的第一个参数是 stop_token,自动传入内部的 stop_token
// 否则不传 stop_token(和 std::thread 一样的用法)
std::jthread jt([](std::stop_token stoken) {
int count = 0;
while (!stoken.stop_requested()) {
std::cout << "jthread 工作中,计数: " << count++ << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "jthread 优雅退出\n";
});
std::this_thread::sleep_for(std::chrono::milliseconds(350));
// jthread 析构时自动执行:
// 1. jt.request_stop() --> 向内部 stop_source 发出停止请求
// 2. jt.join() --> 等待线程结束
// 无需手动操作!
return 0;
}
https://godbolt.org/z/nYWEj4nYo
jthread vs thread 对比
| 特性 | std::thread | std::jthread |
|---|---|---|
| 析构时行为 | 若 joinable 则 terminate | 自动 request_stop + join |
| 停止支持 | 无(需要手动实现) | 内置 stop_source/stop_token |
| 引入版本 | C++11 | C++20 |
| 是否可 detach | 是 | 是(但通常不需要) |
this_thread 命名空间
#include <thread>
#include <chrono>
#include <iostream>
void demo_this_thread() {
// 获取当前线程的唯一标识符
// 每次调用返回相同值;不等于默认构造的 thread::id
std::thread::id my_id = std::this_thread::get_id();
std::cout << "当前线程 ID: " << my_id << "\n";
// yield:向实现提示可以重新调度
// 不做任何同步保证,仅是提示
std::this_thread::yield();
// sleep_for:相对睡眠(阻塞当前线程至少 rel_time)
// 实现应使用 steady_clock 计时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// sleep_until:绝对睡眠(阻塞当前线程直到 abs_time)
auto wake_time = std::chrono::steady_clock::now()
+ std::chrono::seconds(1);
std::this_thread::sleep_until(wake_time);
}
原子操作
原子操作是不可分割的操作,在执行过程中不会被其他线程的操作打断。
内存顺序(memory_order)
这是并发编程中最重要的概念,控制原子操作与周围内存操作的可见性和顺序:
严格程度(从严到宽):
memory_order::seq_cst (顺序一致,默认值)
所有 seq_cst 操作在所有线程中有统一的全局顺序
最安全,但性能最低
memory_order::acq_rel (获取-释放,用于读改写操作如 CAS)
同时具有 acquire 和 release 语义
memory_order::release (释放,用于写操作/store)
本操作之前的所有写操作,对执行了 acquire 读取此值的线程可见
memory_order::acquire (获取,用于读操作/load)
读到 release 写入的值后,该值之前的所有写对本线程可见
memory_order::relaxed (宽松)
只保证操作本身的原子性,不提供任何跨线程同步保证
性能最高,但最难正确使用
release-acquire 同步关系示意:
s t o r e ( r e l e a s e ) → 同步 l o a d ( a c q u i r e ) 读到了该值 store(release) \xrightarrow{同步} load(acquire)\ 读到了该值 store(release)同步load(acquire) 读到了该值
线程1 (producer) 线程2 (consumer)
data = 42; (1)
ready.store(true, (2) --> [读到true] ready.load(acquire) (3)
release); |
assert(data == 42) (4) 保证成立
(2) release-写 同步于 (3) acquire-读,因此 (1) 的写 先序可见于 (4)。
release-acquire 同步完整示例
#include <atomic>
#include <thread>
#include <cassert>
#include <iostream>
std::atomic<int> data{0};
std::atomic<bool> ready{false};
void producer() {
// relaxed 写:此写操作本身不提供同步
data.store(42, std::memory_order::relaxed); // (1)
// release 写:确保 (1) 对所有 acquire 读到 true 的线程可见
ready.store(true, std::memory_order::release); // (2)
}
void consumer() {
// acquire 读:看到 ready==true 后,保证能看到 (2) 之前的所有写操作
while (!ready.load(std::memory_order::acquire)); // (3)
// 到这里,data 必定等于 42
assert(data.load(std::memory_order::relaxed) == 42); // (4)
std::cout << "data = " << data.load() << "\n";
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
https://godbolt.org/z/Y8eGbz611
atomic 基本操作
#include <atomic>
#include <iostream>
int main() {
std::atomic<int> counter{0};
// store:原子写(只允许 relaxed/release/seq_cst)
counter.store(10);
// load:原子读(只允许 relaxed/acquire/seq_cst)
int val = counter.load(); // val == 10
// exchange:原子地写入新值,返回旧值(读改写操作)
int old = counter.exchange(20); // old == 10, counter == 20
// compare_exchange_strong(CAS 操作):
// 若 counter 当前值 == expected,则将 counter 改为 desired,返回 true
// 否则将 counter 当前值写入 expected,返回 false(不修改 counter)
int expected = 20;
bool success = counter.compare_exchange_strong(expected, 30);
// success == true, counter == 30
// compare_exchange_strong 失败的情况:
expected = 999; // 故意设置错误的期望值
success = counter.compare_exchange_strong(expected, 100);
// success == false, counter 不变(仍为30), expected 被更新为 30
// fetch_add:原子加,返回操作之前的旧值
int prev = counter.fetch_add(5); // prev == 30, counter == 35
// 运算符形式(等价于 fetch_add/fetch_sub 但返回新值)
counter += 1; // counter == 36,等价于 fetch_add(1)(忽略返回值)
++counter; // counter == 37,等价于 fetch_add(1) + 1
int v = counter++; // v == 37(旧值), counter == 38,等价于 fetch_add(1)
std::cout << "最终值: " << counter.load() << "\n"; // 38
return 0;
}
https://godbolt.org/z/eeWW4G7GK
compare_exchange_weak 与 strong 的区别
#include <atomic>
std::atomic<int> val{0};
// weak 版本:可能发生"虚假失败"(spurious failure)
// 即使 val == expected,也可能返回 false
// 在 ARM 等 LL/SC 架构上性能优于 strong
// 适合在循环中使用
void update_with_weak(int new_val) {
int expected = val.load(std::memory_order::relaxed);
// 循环重试:weak 的虚假失败会导致多循环一次,但不影响正确性
while (!val.compare_exchange_weak(
expected, // 失败时被更新为 val 的当前值
new_val,
std::memory_order::release, // 成功时的内存顺序
std::memory_order::relaxed // 失败时的内存顺序
)) {
// expected 已被更新,下次循环用新值继续尝试
}
}
// strong 版本:只有值真正不匹配才失败(不会虚假失败)
// 适合只需要尝试一次的场景(不在循环中)
bool try_update_once(int expected_val, int new_val) {
// 注意:expected_val 是按值传入,失败时会被修改
return val.compare_exchange_strong(expected_val, new_val);
}
无锁栈(atomic 高级用法)
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
// 无锁栈(演示 CAS 循环的典型用法)
template<typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
explicit Node(T val) : data(std::move(val)), next(nullptr) {}
};
std::atomic<Node*> head{nullptr};
public:
void push(T val) {
Node* new_node = new Node(std::move(val));
// 先设置 next 为当前 head
new_node->next = head.load(std::memory_order::relaxed);
// CAS 循环:尝试将 head 从旧值替换为 new_node
// 若中途 head 被其他线程修改,next 被更新后重试
while (!head.compare_exchange_weak(
new_node->next, // expected(失败时自动更新)
new_node, // desired
std::memory_order::release, // 成功:新节点对所有 acquire 可见
std::memory_order::relaxed // 失败:不需要同步
));
}
bool pop(T& result) {
Node* old_head = head.load(std::memory_order::acquire);
// 尝试将 head 向前移动一步
while (old_head && !head.compare_exchange_weak(
old_head,
old_head->next,
std::memory_order::acquire,
std::memory_order::relaxed
));
if (!old_head) return false; // 栈为空
result = std::move(old_head->data);
delete old_head; // 注意:真实无锁代码这里需要危险指针或 RCU 保护
return true;
}
~LockFreeStack() {
T dummy;
while (pop(dummy));
}
};
int main() {
LockFreeStack<int> stack;
stack.push(1);
stack.push(2);
stack.push(3);
int val;
while (stack.pop(val)) {
std::cout << val << "\n"; // 输出 3, 2, 1(后进先出)
}
return 0;
}
https://godbolt.org/z/Mq9xffGcb
atomic_flag 与自旋锁
atomic_flag 是唯一保证无锁的原子类型,可用于实现自旋锁:
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
// 使用 atomic_flag 实现自旋锁
class SpinLock {
// atomic_flag 默认初始化为 clear(false)状态
std::atomic_flag flag{};
public:
void lock() {
// test_and_set:原子地设为 true,返回旧值
// 若旧值为 false,说明成功获得锁
// 若旧值为 true,说明锁被占用,继续自旋
while (flag.test_and_set(std::memory_order::acquire)) {
// 空循环等待(自旋)
// 在 x86 上可插入 _mm_pause() 减少功耗和提高性能
}
}
void unlock() {
// clear:原子地设为 false,释放锁
// release 顺序保证临界区内的写操作对下一个 acquire 可见
flag.clear(std::memory_order::release);
}
};
SpinLock spin;
int shared_counter = 0;
void increment(int times) {
for (int i = 0; i < times; ++i) {
spin.lock();
++shared_counter; // 临界区
spin.unlock();
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(increment, 10000);
}
for (auto& t : threads) t.join();
std::cout << "计数器最终值: " << shared_counter
<< "(期望 40000)\n";
return 0;
}
https://godbolt.org/z/fvdf5zeK7
内存屏障(Fence)
内存屏障比 release/acquire 更灵活,但也更难正确使用:
#include <atomic>
#include <thread>
#include <iostream>
int data_value = 0;
std::atomic<bool> data_ready{false};
void fence_producer() {
data_value = 42; // 普通(非原子)写
// release fence:保证 fence 之前的所有写操作(包括非原子写)
// 对执行了对应 acquire fence 的线程可见
std::atomic_thread_fence(std::memory_order::release);
// relaxed store:只需保证指针更新可见,不需要本身是 release
data_ready.store(true, std::memory_order::relaxed);
}
void fence_consumer() {
// 等待数据就绪
while (!data_ready.load(std::memory_order::relaxed));
// acquire fence:与 release fence 配对,确保 fence 之后的读
// 能看到对应 release fence 之前的所有写
std::atomic_thread_fence(std::memory_order::acquire);
// 现在可以安全读取 data_value,保证能看到 42
std::cout << "data_value = " << data_value << "\n"; // 42
}
C++26 新增:atomic modify-write 操作
C++26 新增了 store_add、store_sub、store_and 等操作,与 fetch_add 的区别在于:
fetch_add(n):读改写操作(RMW),返回旧值,使用 acquire/release 语义store_add(n):仅写操作(modify-write),不返回旧值,只允许 relaxed/release/seq_cst,允许更激进的硬件优化(如树形规约)
#include <atomic>
std::atomic<int> counter{0};
// fetch_add:RMW,保证严格的修改顺序读取
int old = counter.fetch_add(1); // 返回旧值
// store_add:modify-write,不需要读取旧值,可能更高效
counter.store_add(1, std::memory_order::relaxed); // 不返回值
互斥锁
mutex 类型一览
互斥锁类型体系:
std::mutex
非递归,独占,最基础的互斥锁
std::recursive_mutex
可递归(同线程可多次加锁),独占
每次 lock() 对应一次 unlock()
std::timed_mutex
非递归,独占,额外支持 try_lock_for / try_lock_until
std::recursive_timed_mutex
可递归,独占,带超时
std::shared_mutex
非递归,支持共享(读)锁 + 独占(写)锁
多线程可同时持有共享锁,但独占锁与其他锁互斥
std::shared_timed_mutex
非递归,共享/独占,带超时
RAII 锁包装器速查
| 包装器 | 特点 | 适用场景 |
|---|---|---|
lock_guard<M> |
构造加锁,析构解锁,不可移动不可复制 | 简单作用域保护 |
scoped_lock<M...> |
同时锁多个互斥量,内置死锁避免算法 | 需要同时持有多个锁 |
unique_lock<M> |
可延迟加锁、可手动加解锁、可转移所有权 | 配合条件变量;需要灵活控制锁时机 |
shared_lock<M> |
获取共享(读)锁 | shared_mutex 的读端 |
读写锁示例
#include <shared_mutex>
#include <mutex>
#include <iostream>
#include <thread>
#include <vector>
#include <string>
// 线程安全的读写缓存
class ThreadSafeCache {
mutable std::shared_mutex rw_mutex; // mutable 允许 const 方法加锁
std::string data{"初始数据"};
public:
// 读操作:获取共享锁(多线程可同时读,不影响其他读者)
std::string read() const {
std::shared_lock lock(rw_mutex); // CTAD 推导出 shared_lock<shared_mutex>
return data;
}
// 写操作:获取独占锁(阻止所有其他读写操作)
void write(const std::string& new_data) {
std::unique_lock lock(rw_mutex); // 独占锁
data = new_data;
}
};
// 防止死锁:同时锁两个互斥量
void safe_lock_both_mutex_demo() {
std::mutex m1, m2;
// scoped_lock 内部使用死锁避免算法(类似 std::lock)
// 无论 m1 m2 的顺序,都不会死锁
std::scoped_lock lock(m1, m2);
// 临界区:同时持有 m1 和 m2
}
int main() {
ThreadSafeCache cache;
// 多个读线程并发读
std::vector<std::thread> readers;
for (int i = 0; i < 3; ++i) {
readers.emplace_back([&cache, i] {
std::cout << "读线程" << i << ": " << cache.read() << "\n";
});
}
// 写线程
std::thread writer([&cache] {
cache.write("更新后的数据");
std::cout << "写完成\n";
});
for (auto& t : readers) t.join();
writer.join();
return 0;
}
https://godbolt.org/z/fq1531Kae
unique_lock 高级用法
#include <mutex>
#include <chrono>
#include <iostream>
// 演示 unique_lock 的灵活性
void unique_lock_demo() {
std::mutex mtx;
// 1. defer_lock:构造时不加锁
{
std::unique_lock lock(mtx, std::defer_lock); // 未加锁
// ... 做一些准备工作 ...
lock.lock(); // 手动加锁
// 临界区
lock.unlock(); // 手动解锁(析构时若 owns_lock() 会再次解锁)
}
// 2. adopt_lock:已经持有锁,让 unique_lock 接管
{
mtx.lock(); // 手动加锁
std::unique_lock lock(mtx, std::adopt_lock); // 接管,不再加锁
// 析构时自动解锁
}
// 3. try_to_lock:尝试加锁,不阻塞
{
std::unique_lock lock(mtx, std::try_to_lock);
if (lock.owns_lock()) {
std::cout << "成功获取锁\n";
} else {
std::cout << "锁已被占用\n";
}
}
// 4. 带超时的加锁
{
std::timed_mutex timed_mtx;
std::unique_lock timed_lock(
timed_mtx,
std::chrono::milliseconds(100) // 最多等 100ms
);
if (timed_lock.owns_lock()) {
std::cout << "在超时前获取了锁\n";
}
}
}
call_once:线程安全的一次性初始化
#include <mutex>
#include <iostream>
#include <thread>
#include <vector>
class Singleton {
static std::once_flag init_flag;
static Singleton* instance;
Singleton() {
std::cout << "Singleton 初始化(只执行一次)\n";
}
public:
static Singleton& get() {
// call_once 保证 lambda 只被调用一次
// 即使多个线程同时进入,也只有一个线程执行初始化
// 其他线程被阻塞,直到初始化完成
std::call_once(init_flag, [] {
instance = new Singleton();
});
return *instance;
}
void do_work() {
std::cout << "工作中(来自线程 "
<< std::this_thread::get_id() << ")\n";
}
};
std::once_flag Singleton::init_flag;
Singleton* Singleton::instance = nullptr;
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back([] { Singleton::get().do_work(); });
}
for (auto& t : threads) t.join();
// "Singleton 初始化" 只打印一次
return 0;
}
https://godbolt.org/z/T54fMTWTE
条件变量
条件变量用于线程间的等待/通知机制。
基本工作模式
生产者-消费者信号流:
生产者线程 消费者线程
| |
| 1. 获取互斥锁 | 1. 获取互斥锁
| 2. 修改共享状态 | 2. while (!条件满足) {
| 3. cv.notify_one() | cv.wait(lock)
| 4. 释放锁 | // 原子释放锁 + 进入等待
| | // 被唤醒后重新获取锁
| | }
| | 3. 使用共享状态
| | 4. 释放锁
为什么必须用 while 而不是 if?
因为存在虚假唤醒(spurious wakeup):线程可能在没有 notify 的情况下被唤醒。
用 while 或谓词版本的 wait 可以防止这个问题。
生产者消费者完整示例
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <string>
// 线程安全队列
template<typename T>
class SafeQueue {
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
bool done = false;
public:
// 生产者:推入元素
void push(T item) {
{
std::lock_guard lock(mtx);
queue.push(std::move(item));
}
cv.notify_one(); // 通知至少一个等待的消费者
}
// 通知消费者不再有新数据
void finish() {
{
std::lock_guard lock(mtx);
done = true;
}
cv.notify_all(); // 唤醒所有等待的消费者
}
// 消费者:取出元素,队列空时阻塞等待
bool pop(T& item) {
std::unique_lock lock(mtx); // 条件变量需要 unique_lock
// wait(lock, pred) 等价于:
// while (!pred()) { cv.wait(lock); }
// wait 会原子地 unlock + 进入等待,被唤醒时重新 lock
cv.wait(lock, [this] {
return !queue.empty() || done;
});
if (queue.empty()) return false; // done==true 且队列为空
item = std::move(queue.front());
queue.pop();
return true;
}
};
int main() {
SafeQueue<std::string> sq;
std::thread producer([&sq] {
for (int i = 0; i < 5; ++i) {
sq.push("任务 " + std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
sq.finish(); // 通知消费者:没有更多任务了
});
std::thread consumer([&sq] {
std::string item;
while (sq.pop(item)) {
std::cout << "消费: " << item << "\n";
}
std::cout << "消费者完成\n";
});
producer.join();
consumer.join();
return 0;
}
https://godbolt.org/z/o6GT1Mo9T
condition_variable_any 与可中断等待
condition_variable_any 支持任意 BasicLockable 类型,并支持配合 stop_token 的可中断等待:
#include <condition_variable>
#include <mutex>
#include <stop_token>
#include <thread>
#include <iostream>
std::condition_variable_any cv;
std::mutex mtx;
bool data_ready = false;
// 可中断的等待函数:stop 请求到来时立即退出等待
void interruptible_worker(std::stop_token stoken) {
std::unique_lock lock(mtx);
// 可中断等待:
// - 正常情况:条件满足时返回 true
// - 中断情况:stop_requested() 为 true 时返回 false
bool result = cv.wait(lock, stoken, [] { return data_ready; });
if (result) {
std::cout << "条件满足,正常退出等待\n";
} else {
// stoken.stop_requested() 一定为 true
std::cout << "收到停止请求,退出等待(条件可能未满足)\n";
}
}
int main() {
std::stop_source ss;
std::thread worker(interruptible_worker, ss.get_token());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 发出停止请求,worker 从 wait 中退出(无需 notify)
ss.request_stop();
worker.join();
return 0;
}
https://godbolt.org/z/h6ffj81hM
信号量
信号量比互斥锁更轻量,维护一个内部计数器,用于控制对资源的并发访问数量。
计数器始终满足: c o u n t e r ≥ 0 counter \geq 0 counter≥0
操作语义:
release(n): c o u n t e r + = n counter \mathrel{+}= n counter+=n,唤醒等待的线程(n 默认为 1)acquire():若 c o u n t e r > 0 counter > 0 counter>0 则 c o u n t e r − = 1 counter \mathrel{-}= 1 counter−=1 立即返回;否则阻塞
信号量与互斥锁的区别
| 特性 | mutex | binary_semaphore(初始值1) |
|---|---|---|
| 所有权 | 有(只有加锁的线程可以解锁) | 无(任意线程可以 release) |
| 用途 | 保护临界区 | 线程间信号传递、事件通知 |
| 初始状态 | 解锁 | 可自定义(0=等待,1=可用) |
#include <semaphore>
#include <thread>
#include <iostream>
#include <vector>
// 示例1:限流器(同时最多允许 N 个线程访问资源)
template<int MaxConcurrency>
class RateLimiter {
// counting_semaphore<N>:内部计数最大值至少为 N
std::counting_semaphore<MaxConcurrency> sem{MaxConcurrency};
public:
void access(int thread_id) {
sem.acquire(); // 计数-1;若为0则阻塞等待
std::cout << "线程 " << thread_id << " 开始访问\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "线程 " << thread_id << " 释放资源\n";
sem.release(); // 计数+1,唤醒等待的线程
}
};
// 示例2:线程间单向信号传递(binary_semaphore)
void signal_demo() {
// 初始值为 0:消费者会立即阻塞等待信号
std::binary_semaphore signal{0};
std::thread waiter([&signal] {
std::cout << "等待信号...\n";
signal.acquire(); // 阻塞,直到计数 > 0
std::cout << "收到信号,继续执行\n";
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
signal.release(); // 发出信号(计数从0变为1)
waiter.join();
}
int main() {
RateLimiter<3> limiter; // 同时最多3个线程
std::vector<std::thread> threads;
for (int i = 0; i < 8; ++i) {
threads.emplace_back([&limiter, i] { limiter.access(i); });
}
for (auto& t : threads) t.join();
signal_demo();
return 0;
}
协调类型
Latch(闩锁)
Latch 是一次性的倒数计数器,一旦计数到零就永远保持在"打开"状态,不能重置。
latch 工作原理(expected = 3):
线程1 线程2 线程3
| | |
| | |-- count_down(1) --> counter: 3 -> 2
| | |
| |-- count_down(1) -------> counter: 2 -> 1
| |
|-- count_down(1) -------------> counter: 1 -> 0
|
解除所有 wait() 阻塞!
#include <latch>
#include <thread>
#include <iostream>
#include <vector>
int main() {
const int N = 5;
// 创建 latch,初始计数为 N
// 当 count_down 被调用 N 次后(计数降为0),所有 wait() 返回
std::latch sync_point{N};
std::vector<std::thread> workers;
for (int i = 0; i < N; ++i) {
workers.emplace_back([&sync_point, i] {
// 模拟每个线程的初始化工作(耗时不同)
std::this_thread::sleep_for(
std::chrono::milliseconds(50 * (i + 1)));
std::cout << "线程 " << i << " 完成初始化\n";
// 通知:我已就绪,同时等待所有线程就绪
// arrive_and_wait = count_down(1) + wait()
sync_point.arrive_and_wait();
// 所有线程都到达后,同时开始实际工作
std::cout << "线程 " << i << " 开始工作\n";
});
}
for (auto& t : workers) t.join();
return 0;
}
Barrier(屏障)
Barrier 是可复用的多阶段同步点,每个阶段结束时可执行一个完成函数:
barrier 阶段循环(expected = 3):
第1阶段 第2阶段 第3阶段
T1 --|arrive|[等待] ---|arrive|[等待] ---|arrive|...
T2 --|arrive| |arrive| |arrive|
T3 --|arrive| |arrive| |arrive|
| |
完成函数() 完成函数()
重置计数器 重置计数器
进入下一阶段 进入下一阶段
#include <barrier>
#include <thread>
#include <iostream>
#include <vector>
int main() {
const int N = 4;
int phase_count = 0;
std::vector<int> results(N, 0);
// 完成函数:每个阶段所有线程到达后执行
// 必须 noexcept!
auto on_completion = [&]() noexcept {
++phase_count;
std::cout << "=== 阶段 " << phase_count << " 完成 ===\n";
};
// barrier 创建时指定期望到达的线程数
std::barrier sync{N, on_completion};
std::vector<std::thread> workers;
for (int i = 0; i < N; ++i) {
workers.emplace_back([&, i] {
// ---- 第1阶段:各自计算 ----
results[i] = i * i;
std::cout << "线程" << i << " 第1阶段: " << results[i] << "\n";
// arrive_and_wait:到达屏障并等待所有线程
// 完成函数在所有线程 arrive 后、wait 返回前执行
sync.arrive_and_wait();
// ---- 第2阶段:使用所有结果 ----
int sum = 0;
for (int r : results) sum += r;
std::cout << "线程" << i << " 第2阶段,总和=" << sum << "\n";
sync.arrive_and_wait();
});
}
for (auto& t : workers) t.join();
return 0;
}
arrive() 与 arrive_and_wait() 的区别
arrive() 的用途(分离到达和等待):
线程A:
token = barrier.arrive(); // 立即返回,不阻塞
do_some_other_work(); // 可以在等待期间做其他事
barrier.wait(std::move(token)); // 然后再等待
arrive_and_wait():
等价于 wait(arrive())
一步完成:到达并立即等待
arrive_and_drop():
从后续所有阶段中退出
本阶段计数减1 + 后续阶段期望数减1
适合中途退出不再参与的线程
期货
future 提供了在一个线程中获取另一个线程的计算结果的机制。
共享状态生命周期
创建共享状态
|
v
+----------+ +----------+
| promise |-------->| future |
| 或 async | 共享状态 | 或 shared |
| 或 task | | _future |
+----------+ +----------+
| |
set_value(v) get() 阻塞等待,
或 set_exception(e) 直到 set_value/set_exception 被调用
| |
状态变为"就绪" 返回值或重新抛出异常
promise 与 future 基础
#include <future>
#include <thread>
#include <iostream>
#include <stdexcept>
int main() {
// promise:异步结果的"写端"
std::promise<int> prom;
// get_future():获取 future(只能调用一次)
std::future<int> fut = prom.get_future();
std::thread worker([p = std::move(prom)]() mutable {
try {
int result = 6 * 7; // 模拟计算
// set_value:将值存入共享状态,future 变为"就绪"
p.set_value(result);
} catch (...) {
// set_exception:将异常存入共享状态
p.set_exception(std::current_exception());
}
// 如果 promise 析构时共享状态还未就绪
// 会自动存入 broken_promise 异常
});
// get():阻塞等待 future 就绪,返回值或重新抛出异常
try {
int result = fut.get(); // 只能调用一次!
std::cout << "结果: " << result << "\n"; // 42
} catch (const std::exception& e) {
std::cout << "异常: " << e.what() << "\n";
}
worker.join();
return 0;
}
async:最简单的异步执行方式
#include <future>
#include <iostream>
#include <vector>
#include <numeric>
// 并行求和示例
long long sum_range(const std::vector<int>& v, size_t start, size_t end) {
long long s = 0;
for (size_t i = start; i < end; ++i) s += v[i];
return s;
}
int main() {
std::vector<int> data(1000000);
std::iota(data.begin(), data.end(), 1); // 填入 1, 2, ..., 1000000
size_t mid = data.size() / 2;
// launch::async:立即在新线程中开始执行
std::future<long long> f1 = std::async(
std::launch::async,
sum_range,
std::cref(data), 0, mid
);
// launch::deferred:延迟执行,直到 get()/wait() 被调用时才在调用线程执行
std::future<long long> f2 = std::async(
std::launch::deferred,
sum_range,
std::cref(data), mid, data.size()
);
// 默认策略(async | deferred):由实现自行选择
// 不保证并行!可能在调用线程中延迟执行
auto f3 = std::async([] { return 100LL; });
// get() 等待并取回结果
// f2 的 get() 调用触发延迟函数在当前线程执行
long long total = f1.get() + f2.get() + f3.get();
std::cout << "总和: " << total << "\n";
return 0;
}
shared_future:可多线程共享的 future
#include <future>
#include <thread>
#include <iostream>
#include <vector>
int main() {
std::promise<std::string> prom;
// future<T>:只能被一个线程 get()(get() 后 valid() 变 false)
// shared_future<T>:可以被多个线程共享,每个线程都可以 get()
std::shared_future<std::string> shared = prom.get_future().share();
std::vector<std::thread> readers;
for (int i = 0; i < 3; ++i) {
readers.emplace_back([shared, i] {
// 多个线程同时调用 get() 是安全的(但结果共享,需要只读访问)
const std::string& msg = shared.get();
std::cout << "读线程 " << i << " 收到: " << msg << "\n";
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
prom.set_value("广播消息:所有线程都能读到");
for (auto& t : readers) t.join();
return 0;
}
packaged_task:包装可调用对象
#include <future>
#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
// 简单任务队列(线程池的简化版)
class TaskQueue {
std::queue<std::packaged_task<int()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop_flag = false;
std::thread worker_thread;
public:
TaskQueue() {
worker_thread = std::thread([this] {
while (true) {
std::packaged_task<int()> task;
{
std::unique_lock lock(mtx);
cv.wait(lock, [this] {
return !tasks.empty() || stop_flag;
});
if (stop_flag && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
// 执行任务:task() 会自动将返回值存入共享状态
task();
}
});
}
// 提交一个可调用对象,返回 future 以获取结果
std::future<int> submit(std::function<int()> f) {
// packaged_task 将 f 包装成:执行 f 并将结果存入 promise
std::packaged_task<int()> task(std::move(f));
std::future<int> result = task.get_future();
{
std::lock_guard lock(mtx);
tasks.push(std::move(task));
}
cv.notify_one();
return result;
}
~TaskQueue() {
{
std::lock_guard lock(mtx);
stop_flag = true;
}
cv.notify_all();
worker_thread.join();
}
};
int main() {
TaskQueue tq;
auto f1 = tq.submit([] { return 1 + 1; });
auto f2 = tq.submit([] { return 10 * 5; });
auto f3 = tq.submit([] { return 100 / 4; });
std::cout << "1+1 = " << f1.get() << "\n"; // 2
std::cout << "10*5 = " << f2.get() << "\n"; // 50
std::cout << "100/4 = " << f3.get() << "\n"; // 25
return 0;
}
安全回收
RCU(读-复制-更新)
RCU 是专为频繁读、极少写场景优化的同步机制,读者完全无锁。
核心思想:
- 读者:进入"保护区域"后安全读取,无需加锁
- 写者:复制旧数据,修改副本,原子替换指针,然后"退休"旧数据
- 回收:等所有保护区域中正在读旧数据的读者退出后,才真正删除旧数据
RCU 工作时序:
读者1: [--保护区域--读取旧数据-----------退出--]
读者2: [--保护区域--读取旧数据--退出--]
写者: |复制+修改|替换指针|retire旧数据|
| |
等待现有读者 | |
全部退出区域 | |
+--删除旧数据--+
#include <rcu>
#include <atomic>
#include <thread>
#include <iostream>
#include <string>
#include <memory>
// 继承 rcu_obj_base<T> 使 T 支持 RCU 回收
struct Config : public std::rcu_obj_base<Config> {
std::string name;
int value;
Config(std::string n, int v) : name(std::move(n)), value(v) {}
};
// 全局配置指针(原子操作保证指针更新的可见性)
std::atomic<Config*> g_config{new Config("初始配置", 0)};
// 读者:高频调用,使用 RCU 保护区域
void reader(int id) {
for (int i = 0; i < 3; ++i) {
{
// 进入 RCU 保护区域(scoped_lock 析构时退出)
// 区域内的旧对象不会被 retire 机制删除
std::scoped_lock rcu_lock(std::rcu_default_domain());
Config* cfg = g_config.load(std::memory_order::acquire);
std::cout << "读者" << id << ": ["
<< cfg->name << ", " << cfg->value << "]\n";
} // 退出 RCU 保护区域
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
// 写者:低频调用,更新配置
void writer() {
for (int i = 1; i <= 2; ++i) {
Config* new_cfg = new Config("更新" + std::to_string(i), i * 10);
// 原子替换指针(所有之后的读者都会读到新配置)
Config* old_cfg = g_config.exchange(
new_cfg, std::memory_order::acq_rel);
// retire:将旧对象标记为"待回收"
// 等所有已进入保护区域的读者退出后,自动调用 delete old_cfg
old_cfg->retire();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main() {
std::thread w(writer);
std::thread r1(reader, 1);
std::thread r2(reader, 2);
w.join(); r1.join(); r2.join();
// 手动清理最后一个存活的 config(它没有被 retire)
delete g_config.load();
// rcu_barrier:等待所有已 retire 的对象完成回收
std::rcu_barrier();
return 0;
}
危险指针(Hazard Pointers)
危险指针是另一种无锁内存回收机制,适合细粒度的对象保护。
危险指针机制:
读者:
1. 声明"我正在访问对象 X"(设置危险指针指向 X)
2. 重新检查原子指针(验证 X 还没有被删除)
3. 安全使用 X
4. 清除危险指针
删除者:
1. 将对象从数据结构中移除(retire)
2. 扫描所有危险指针,若有指向该对象的则推迟删除
3. 当没有危险指针保护该对象时,真正删除
#include <hazard_pointer>
#include <atomic>
#include <thread>
#include <iostream>
#include <string>
// 继承 hazard_pointer_obj_base<T> 使 T 支持危险指针保护
struct Node : public std::hazard_pointer_obj_base<Node> {
std::string value;
explicit Node(std::string v) : value(std::move(v)) {}
};
std::atomic<Node*> g_node{new Node("初始值")};
void safe_reader(int id) {
// make_hazard_pointer:获取一个危险指针槽位
// 每个 hazard_pointer 对象独占一个槽位
std::hazard_pointer hp = std::make_hazard_pointer();
for (int i = 0; i < 3; ++i) {
// protect(src):循环执行:
// 1. 加载 src 的值为 ptr
// 2. 将危险指针设为 ptr
// 3. 重新加载 src,若值变了则重试(防止 TOCTOU)
// 返回受保护的指针
Node* node = hp.protect(g_node);
if (node) {
std::cout << "读者" << id << ": " << node->value << "\n";
}
// hp 清除保护(或 hp.reset_protection())前,node 不会被删除
std::this_thread::sleep_for(std::chrono::milliseconds(30));
}
// hp 析构 --> 危险指针槽位被释放
}
void updater() {
for (int i = 1; i <= 2; ++i) {
Node* new_node = new Node("新值" + std::to_string(i));
Node* old_node = g_node.exchange(new_node, std::memory_order::acq_rel);
// retire:当所有保护 old_node 的危险指针消失后,自动 delete
if (old_node) old_node->retire();
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
std::thread u(updater);
std::thread r1(safe_reader, 1);
std::thread r2(safe_reader, 2);
u.join(); r1.join(); r2.join();
// 清理最后一个节点
delete g_node.load();
return 0;
}
综合示例:线程池
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future>
#include <functional>
#include <vector>
#include <iostream>
#include <stdexcept>
#include <type_traits>
#include <memory>
// 基于 C++ 并发原语的通用线程池
class ThreadPool {
std::vector<std::thread> workers; // 工作线程
std::queue<std::function<void()>> task_queue; // 任务队列
std::mutex queue_mutex; // 保护任务队列
std::condition_variable condition; // 任务到来的通知
bool stopped = false; // 停止标志
public:
explicit ThreadPool(size_t thread_count) {
for (size_t i = 0; i < thread_count; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock lock(queue_mutex);
// 等待:有任务 或 停止标志被置位
condition.wait(lock, [this] {
return stopped || !task_queue.empty();
});
// 若停止且队列空,线程退出
if (stopped && task_queue.empty()) return;
// 取出一个任务
task = std::move(task_queue.front());
task_queue.pop();
}
// 在锁外执行任务,避免长时间持锁
task();
}
});
}
}
// 提交任务,返回 future 以获取结果
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>>
{
using ReturnType = std::invoke_result_t<F, Args...>;
// 用 packaged_task 包装,捕获所有参数
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
[func = std::forward<F>(f),
// 捕获参数副本
targs = std::tuple<std::decay_t<Args>...>(
std::forward<Args>(args)...)]() mutable {
return std::apply(func, std::move(targs));
}
);
std::future<ReturnType> result = task->get_future();
{
std::lock_guard lock(queue_mutex);
if (stopped) {
throw std::runtime_error("线程池已停止,不接受新任务");
}
// 将 packaged_task 包装进 function<void()>
task_queue.emplace([task] { (*task)(); });
}
condition.notify_one(); // 唤醒一个等待的工作线程
return result;
}
// 析构:等待所有任务完成
~ThreadPool() {
{
std::lock_guard lock(queue_mutex);
stopped = true;
}
condition.notify_all(); // 唤醒所有线程,让它们检查 stopped
for (auto& w : workers) {
if (w.joinable()) w.join();
}
}
};
int main() {
ThreadPool pool(4); // 4个工作线程
// 提交8个任务,每个任务计算平方
std::vector<std::future<int>> futures;
for (int i = 0; i < 8; ++i) {
futures.push_back(pool.submit([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return i * i;
}));
}
// 收集所有结果
int total = 0;
for (int i = 0; i < 8; ++i) {
int result = futures[i].get();
std::cout << i << "^2 = " << result << "\n";
total += result;
}
// 0+1+4+9+16+25+36+49 = 140
std::cout << "总和: " << total << "\n";
return 0;
}
关键知识点总结
同步原语选择指南
遇到并发问题,如何选择合适的原语?
需要保护共享数据?
是 --> 读多写少?
是 --> shared_mutex + shared_lock/unique_lock
否 --> mutex + lock_guard/unique_lock
否 --> 需要等待某个条件成立?
是 --> condition_variable(配合 unique_lock)
或 condition_variable_any(配合 stop_token 可中断)
否 --> 需要控制并发访问数量?
是 --> counting_semaphore
否 --> 需要多线程同步启动/完成?
一次性 --> latch
多阶段可重复 --> barrier
需要跨线程传递计算结果?
--> promise/future/async/packaged_task
需要请求停止另一线程?
--> stop_source/stop_token (jthread)
高频读极少写,需要无锁?
--> RCU (rcu_obj_base)
或 hazard_pointer
内存顺序使用速查
| 操作类型 | 可用内存顺序 | 推荐默认 |
|---|---|---|
| store(写) | relaxed, release, seq_cst | seq_cst |
| load(读) | relaxed, acquire, seq_cst | seq_cst |
| fetch_add/exchange 等 RMW | relaxed, acquire, release, acq_rel, seq_cst | seq_cst |
| store_add 等 modify-write(C++26) | relaxed, release, seq_cst | seq_cst |
常见错误与防范方法
- 数据竞争(Data Race)
- 问题:多线程同时读写同一非原子变量,且至少一个是写
- 防范:使用
mutex保护,或改用atomic<T>
- 死锁(Deadlock)
- 问题:线程A持有锁1等待锁2,线程B持有锁2等待锁1
- 防范:用
scoped_lock同时加多把锁;或固定加锁顺序
- 虚假唤醒(Spurious Wakeup)
- 问题:
condition_variable::wait()在没有notify的情况下返回 - 防范:总在循环中使用
wait,或使用带谓词的wait(lock, pred)
- 问题:
- thread 析构时 joinable
- 问题:
thread析构时若joinable()为 true,调用terminate() - 防范:使用
jthread;或确保每个thread都调用join()或detach()
- 问题:
- future 重复 get()
- 问题:
future::get()调用后valid()变 false,再次调用是未定义行为 - 防范:每个
future只调用一次get();多处共享用shared_future
- 问题:
- promise 析构时共享状态未就绪(broken_promise)
- 问题:
promise析构时若未调用set_value/set_exception,
关联的future::get()会抛出future_error(broken_promise) - 防范:确保所有代码路径(包括异常路径)都设置了结果
- 问题:
- stop_callback 生命周期超过 stop_source
- 问题:
inplace_stop_callback的生命周期必须嵌套在inplace_stop_source内 - 防范:使用带引用计数的
stop_source/stop_callback;
或严格控制对象析构顺序
- 问题:
C++26 执行控制库(Execution Control Library)完全解析
文档来源:N5032 标准草案 第33章
目录
- 总览:这个库是干什么的
- 核心概念:四大构件
- 可查询对象(Queryable)
- 异步操作的生命周期
- 调度器(Scheduler)
- 接收器(Receiver)
- 操作状态(Operation State)
- 发送器(Sender)
- 发送器工厂
- 发送器适配器
- 发送器消费者
- 完成签名(Completion Signatures)
- 协程工具
- 执行作用域
- 并行调度器
- 完整示例代码
1. 总览
C++26 的执行控制库(<execution>)是一套用来描述、组合、执行异步工作的框架。
想象一下餐厅的流程:
顾客下单(创建工作)
-> 厨师做菜(执行工作)
-> 服务员上菜(完成通知)
这个库做的事情类似:
发送器(Sender) 描述工作
-> 调度器(Scheduler) 决定在哪里执行
-> 接收器(Receiver) 处理结果
-> 操作状态(Operation State) 管理整个过程
四大组件关系图
三类定制点对象(表158)
| 类型 | 作用 | 举例 |
|---|---|---|
| 核心(core) | 连接各组件 | connect, start |
| 完成函数(completion functions) | 通知异步操作完成 | set_value, set_error, set_stopped |
| 查询(queries) | 读取对象属性 | get_allocator, get_scheduler |
2. 核心概念
2.1 movable-value 概念
// 这个概念检查一个类型是否可以被"移动存储"
// 用于确保发送器能安全地存储它接收到的参数
template<class T>
concept movable-value =
move_constructible<decay_t<T>> && // 衰减后的类型可以移动构造
constructible_from<decay_t<T>, T> && // 可以从 T 构造衰减类型
(!is_array_v<remove_reference_t<T>>); // 不是数组类型(数组不能直接移动)
2.2 MATCHING-SIG — 签名匹配
对于函数类型 F 1 = R 1 ( A r g s 1 . . . ) F_1 = R_1(Args_1...) F1=R1(Args1...) 和 F 2 = R 2 ( A r g s 2 . . . ) F_2 = R_2(Args_2...) F2=R2(Args2...):
MATCHING-SIG ( F 1 , F 2 ) = true ⟺ same_as ⟨ R 1 ( A r g s 1 & & . . . ) , R 2 ( A r g s 2 & & . . . ) ⟩ \text{MATCHING-SIG}(F_1, F_2) = \text{true} \iff \text{same\_as}\langle R_1(Args_1\&\&...), R_2(Args_2\&\&...)\rangle MATCHING-SIG(F1,F2)=true⟺same_as⟨R1(Args1&&...),R2(Args2&&...)⟩
简单说:把所有参数都加上 &&(右值引用),然后看两个函数类型是否完全相同。
3. 可查询对象
3.1 什么是 Queryable?
"可查询对象"就像一个键值对字典,用"查询对象"作为键,用对应的值作为值。
queryable 对象
├── key: get_allocator -> value: 某个分配器
├── key: get_scheduler -> value: 某个调度器
└── key: get_stop_token -> value: 某个停止令牌
// queryable 概念的定义极其简单:只要能被销毁就行
// 真正的约束通过语义要求来表达
template<class T>
concept queryable = destructible<T>;
3.2 查询的语法
对于查询对象 q、可查询对象 env、额外参数 args...:
- 调用方式:
q(env, args...)等价于env.query(q, args...) - 常量性:对
env的查询等价于对const env的查询(查询不修改对象) - 等价保持:相同输入总产生相同输出
3.3 常见查询对象
forwarding_query —— 询问某查询是否应该被转发
get_allocator —— 获取关联的分配器
get_stop_token —— 获取关联的停止令牌
get_scheduler —— 获取关联的调度器
get_domain —— 获取关联的执行域
get_env —— 获取对象的环境
4. 异步操作的生命周期
4.1 异步操作的定义
一个"异步操作"具有以下特征:
- 被显式创建
- 最多被显式启动一次
- 启动后,最终以恰好一种方式完成:
- 值完成(value completion / set_value):成功,可有任意数量结果
- 错误完成(error completion / set_error):失败,有单个错误结果
- 停止完成(stopped completion / set_stopped):取消,无结果
- 完成时可在不同执行资源上
- 可以创建子操作
4.2 生命周期图
[创建阶段]
Sender + Receiver --connect()--> Operation State(操作状态诞生)
[执行阶段]
Operation State --start()--> 异步操作开始执行
(生命周期开始)
[完成阶段]
异步操作 --set_value/set_error/set_stopped--> 完成操作执行
(生命周期结束)
重要规则:
- 操作状态的生命周期不能在异步操作完成前结束,否则是未定义行为
- 完成操作只能执行一次
4.3 父子操作关系
父操作
├── 子操作1(必须在父操作完成前完成)
├── 子操作2
└── 子操作3
5. 调度器
5.1 调度器是什么?
调度器是执行资源的抽象,提供统一的接口来把工作提交到某个执行上下文(线程池、GPU、当前线程等)。
// scheduler 概念要求:
template<class Sch>
concept scheduler =
// 1. 标记自己是调度器
derived_from<typename remove_cvref_t<Sch>::scheduler_concept, scheduler_t> &&
// 2. 是可查询的
queryable<Sch> &&
// 3. 支持 schedule() 调用,返回一个 sender
requires(Sch&& sch) {
{ schedule(std::forward<Sch>(sch)) } -> sender;
// 4. schedule 返回的 sender 的值完成调度器 == 自己
{ auto(get_completion_scheduler<set_value_t>(
get_env(schedule(std::forward<Sch>(sch))))) }
-> same_as<remove_cvref_t<Sch>>;
} &&
// 5. 可以判断是否相等(同一执行资源)
equality_comparable<remove_cvref_t<Sch>> &&
// 6. 可以复制
copyable<remove_cvref_t<Sch>>;
5.2 调度器语义规则
sch1 == sch2为true,当且仅当它们共享同一执行资源- 调度器的析构函数不能阻塞等待已连接的接收器完成
- 所有复制/移动/比较操作不能抛异常,不能引入数据竞争
5.3 调度器查询
get_forward_progress_guarantee(sch)
-> concurrent // 并发前进保证(最强)
-> parallel // 并行前进保证
-> weakly_parallel // 弱并行(默认)
6. 接收器
6.1 接收器的角色
接收器是异步操作的延续(continuation),它聚合了三种处理函数:
接收器 Receiver
├── set_value(rcvr, vals...) —— 值完成时调用
├── set_error(rcvr, err) —— 错误完成时调用
└── set_stopped(rcvr) —— 停止完成时调用
6.2 receiver 概念
template<class Rcvr>
concept receiver =
// 1. 标记自己是接收器
derived_from<typename remove_cvref_t<Rcvr>::receiver_concept, receiver_t> &&
// 2. get_env 返回可查询对象
requires(const remove_cvref_t<Rcvr>& rcvr) {
{ get_env(rcvr) } -> queryable;
} &&
// 3. 右值可移动
move_constructible<remove_cvref_t<Rcvr>> &&
// 4. 左值可复制构造
constructible_from<remove_cvref_t<Rcvr>, Rcvr> &&
// 5. 移动不抛异常(极其重要!完成函数必须 noexcept)
is_nothrow_move_constructible_v<remove_cvref_t<Rcvr>>;
6.3 三个完成函数的规则
所有完成函数都:
- 要求 rcvr 是右值(不能是左值或 const 右值)
- 必须 noexcept(用
MANDATE-NOTHROW宏强制)
// set_value: 值完成
// rcvr.set_value(vs...) 必须 noexcept
set_value(rcvr, vs...);
// set_error: 错误完成
// rcvr.set_error(err) 必须 noexcept
set_error(rcvr, err);
// set_stopped: 停止完成
// rcvr.set_stopped() 必须 noexcept
set_stopped(rcvr);
7. 操作状态
7.1 operation_state 概念
template<class O>
concept operation_state =
// 1. 标记自己是操作状态
derived_from<typename O::operation_state_concept, operation_state_t> &&
// 2. 支持 start(o) 调用(o 必须是左值)
requires (O& o) {
start(o); // 注意:o 是左值引用
};
7.2 重要规则
start(op)必须 noexceptstart接受左值,不接受右值(操作状态不能被移动后启动)- 对库提供的 sender 创建的操作状态,禁止复制或移动
- 操作状态在异步操作生命周期内不能被销毁
7.3 数据流图
connect(sender, receiver)
|
v
operation_state(拥有 receiver 的所有权)
|
v
start(op) <-- 异步操作生命周期开始
|
v
某个线程/执行代理上执行...
|
v
set_value/set_error/set_stopped <-- 异步操作生命周期结束
(此后 op 变为无效,访问是 UB)
8. 发送器
8.1 sender 概念
template<class Sndr>
concept sender =
// 1. 被标记为 sender(或者是可等待体)
enable_sender<remove_cvref_t<Sndr>> &&
// 2. get_env 返回可查询对象
requires (const remove_cvref_t<Sndr>& sndr) {
{ get_env(sndr) } -> queryable;
} &&
// 3. 可移动构造
move_constructible<remove_cvref_t<Sndr>> &&
// 4. 从自身构造(支持从左值构造)
constructible_from<remove_cvref_t<Sndr>, Sndr>;
8.2 sender_in 概念
// sender_in<Sndr, Env> 表示:
// 在环境类型 Env 中,发送器 Sndr 的完成签名是已知且有效的
template<class Sndr, class... Env>
concept sender_in =
sender<Sndr> &&
(sizeof...(Env) <= 1) && // 最多一个环境
(queryable<Env> && ...) && // 环境必须可查询
is-constant<get_completion_signatures<Sndr, Env...>()>; // 完成签名是编译期常量
8.3 非依赖发送器 vs 依赖发送器
非依赖发送器(non-dependent sender):
完成签名与执行环境无关,编译期就能确定
例:just(42) 永远完成为 set_value(int)
依赖发送器(dependent sender):
完成签名取决于执行环境
例:read_env(get_scheduler) 的结果类型取决于环境中的调度器类型
8.4 basic-sender 框架(最核心的内部机制)
库内部用 basic-sender 来实现所有标准算法,它是一个模板聚合类:
basic-sender<Tag, Data, Child0, Child1, ...>
├── get<0>() → Tag(标识算法种类,如 then_t)
├── get<1>() → Data(算法参数,如 then 的函数对象)
├── get<2>() → Child0(第一个子发送器)
└── get<3>() → Child1(第二个子发送器)
连接流程(内部实现):
basic-sender::connect(rcvr)
|
v
创建 basic-operation(继承自 basic-state)
|
├── basic-state
│ ├── rcvr(接收器的副本)
│ └── state(算法特定状态,由 impls-for::get-state 创建)
│
└── inner-ops(子发送器的操作状态数组)
通过 connect-all 把每个子发送器连接到对应的 basic-receiver
8.5 impls-for 定制机制
每个算法通过特化 impls-for 来定制行为:
impls-for<Tag>
├── get-attrs 定制发送器的属性(环境查询)
├── get-env 定制子接收器的环境
├── get-state 创建算法特定状态
├── start 启动逻辑
├── complete 完成逻辑
└── check-types 编译期类型检查
9. 发送器工厂
9.1 just / just_error / just_stopped
这三个是最简单的发送器工厂,同步完成:
// just(vals...) → 立即以值完成
auto s1 = just(42, 3.14f); // 完成为 set_value(42, 3.14f)
// just_error(err) → 立即以错误完成
auto s2 = just_error(std::make_exception_ptr(std::runtime_error("oops")));
// just_stopped() → 立即以停止完成
auto s3 = just_stopped();
内部实现核心(start 逻辑):
// impls-for<just_t>::start 等价于:
[](auto& state, auto& rcvr) noexcept -> void {
auto& [...ts] = state; // 解包存储的值
set_value(std::move(rcvr), std::move(ts)...); // 立即触发值完成
};
9.2 schedule
// 从调度器获取一个"调度发送器"
// 当启动时,把工作提交到调度器的执行资源
auto sndr = schedule(my_scheduler);
// 完成签名:set_value()(无参数值完成)
9.3 read_env
// 从接收器的环境中读取一个查询结果作为值完成
auto sndr = read_env(get_scheduler);
// 完成签名:set_value(<当前调度器类型>)
10. 发送器适配器
10.1 管道语法
所有适配器支持管道操作符 |:
// 函数调用形式
then(sndr, f)
// 管道形式(等价)
sndr | then(f)
// 链式管道
sndr | then(f) | upon_error(handle_err) | continues_on(sch)
10.2 then / upon_error / upon_stopped
附加回调到发送器的某种完成上:
// then: 对值完成的结果调用 f,f 的返回值作为新的值完成
auto s = just(42) | then([](int x) { return x * 2; });
// 最终完成为 set_value(84)
// upon_error: 对错误完成调用 f
auto s2 = just_error(err) | upon_error([](auto e) { return -1; });
// 最终完成为 set_value(-1)
// upon_stopped: 对停止完成调用 f
auto s3 = just_stopped() | upon_stopped([] { return 0; });
// 最终完成为 set_value(0)
内部 complete 逻辑:
// 如果完成标签匹配,调用 f 并把结果作为新的值完成
// 否则,直接转发原完成
[]<class Tag, class... Args>(auto, auto& fn, auto& rcvr, Tag, Args&&... args) noexcept -> void {
if constexpr (same_as<Tag, decayed-typeof<set-cpo>>) {
TRY-SET-VALUE(rcvr, invoke(std::move(fn), std::forward<Args>(args)...));
} else {
Tag()(std::move(rcvr), std::forward<Args>(args)...); // 转发
}
};
10.3 let_value / let_error / let_stopped
比 then 更强大:回调可以返回一个新的发送器,新发送器的完成作为整体完成:
// let_value: 值完成时,用结果调用 f,f 返回新发送器
auto s = just(42)
| let_value([](int x) {
return just(x, x * 2); // 返回新发送器
});
// 最终完成为 set_value(42, 84)
流程:
父发送器完成(值完成)
-> 调用 f(结果参数...)
-> f 返回子发送器
-> 连接并启动子发送器
-> 子发送器的完成作为整体完成
10.4 starts_on — 在指定调度器上启动
// 在 sch 的执行资源上启动 sndr
auto s = starts_on(sch, sndr);
变换过程(transform_sender):
// starts_on 最终变换为:
let_value(
schedule(sch), // 先跳到 sch 的执行资源
[sndr]() mutable {
return std::move(sndr); // 然后在那里执行 sndr
}
);
10.5 continues_on — 完成时转移到指定调度器
// sndr 在当前执行代理上运行,完成时转移到 sch
auto s = sndr | continues_on(sch);
内部变换为 schedule_from(sch, sndr):
sndr 执行
-> 完成信号发生
-> 把完成信号暂存
-> 在 sch 的执行资源上触发最终完成
10.6 on — 双向执行转移
on 有两种形式:
形式一:on(sch, sndr) — 在 sch 上运行,完成后回到原调度器
原调度器(env 中的 scheduler)
-> starts_on(sch, sndr) 在 sch 上运行
-> continues_on(原调度器) 回到原调度器
形式二:on(sndr, sch, closure) — sndr 完成后转到 sch,执行 closure,再回来
sndr 完成
-> 转移到 sch
-> closure(sndr的结果)
-> 回到原来的调度器
10.7 bulk / bulk_chunked / bulk_unchunked — 并行批量执行
// bulk: 对 0..shape-1 中每个 i 调用 f(i, vals...)
auto s = just(data) | bulk(std::execution::par, 100, [](int i, auto& data) {
data[i] = process(i);
});
// bulk_chunked: f(begin, end, vals...),实现可以把迭代分块
// bulk_unchunked: f(i, vals...),每次一个元素
三者关系:
bulk(sndr, policy, shape, f)
| transform_sender
v
bulk_chunked(sndr, policy, shape, new_f)
(new_f 循环调用原始 f)
10.8 when_all — 等待所有发送器完成
// 等待所有子发送器完成,把所有值结果拼接为一个值完成
auto s = when_all(
just(1),
just(2.0),
just("hello")
);
// 完成为 set_value(1, 2.0, "hello")
工作原理(内部状态):
state-type
├── count(原子计数器,初始 = 子发送器数量)
├── stop_src(内部停止源,任一失败则全部取消)
├── disp(disposition:started/error/stopped)
├── errors(错误存储)
└── values(各子发送器的值存储)
每个子发送器完成时:
-> 调用 arrive()
-> count--
-> 如果 count == 0,调用 complete()
-> 按 disp 触发最终完成
10.9 write_env — 注入环境属性
// 给 sndr 添加自定义环境属性
auto s = write_env(sndr, prop(get_allocator, my_alloc{}));
// 当 sndr 的子操作查询分配器时,得到 my_alloc{}
10.10 stopped_as_optional — 把停止变为 optional
// 停止完成 -> set_value(optional<T>{})(disengaged)
// 值完成(x) -> set_value(optional<T>{x})(engaged)
// 再也不会有 stopped 完成
auto s = some_sender | stopped_as_optional;
10.11 stopped_as_error — 把停止变为错误
// 停止完成 -> set_error(err)
auto s = some_sender | stopped_as_error(my_error{});
10.12 associate — 关联到执行作用域
// 把 sender 与一个 scope token 关联
// 作用域可以追踪所有关联发送器的生命周期
auto s = sndr | associate(scope.get_token());
11. 发送器消费者
11.1 sync_wait — 同步等待
// 阻塞当前线程,等待发送器完成
// 返回 optional<tuple<结果类型...>>
auto result = this_thread::sync_wait(some_sender);
if (result) {
auto [val] = *result; // 值完成
// 使用 val
}
// result 为空:stopped 完成
// 抛出异常:error 完成
内部实现:
// sync_wait 内部使用 run_loop 驱动事件循环
sync-wait-state<Sndr> state; // 包含 run_loop 和结果存储
auto op = connect(sndr, sync-wait-receiver<Sndr>{&state});
start(op); // 启动异步操作
state.loop.run(); // 当前线程运行事件循环,直到 loop.finish() 被调用
// 接收器的完成函数会调用 loop.finish()
if (state.error) rethrow_exception(state.error);
return state.result;
11.2 sync_wait_with_variant — 返回变体类型
// 对于有多种值完成签名的发送器
// 返回 optional<variant<tuple<...>, tuple<...>, ...>>
auto result = this_thread::sync_wait_with_variant(sndr);
11.3 spawn — 即发即忘
// 关联到作用域,立即启动,不关心结果
// 发送器必须只有 set_value() 和 set_stopped() 两种完成
execution::spawn(sndr, scope.get_token());
11.4 spawn_future — 即发,但可以等待结果
// 立即启动,返回一个可等待结果的 sender
auto future_sndr = execution::spawn_future(sndr, scope.get_token());
// 之后可以 co_await future_sndr 或 sync_wait(future_sndr)
12. 完成签名
12.1 什么是完成签名?
完成签名是描述异步操作如何完成的函数类型:
// 可能的完成签名:
set_value_t() // 空值完成
set_value_t(int, float) // 携带 int 和 float 的值完成
set_error_t(exception_ptr) // 携带 exception_ptr 的错误完成
set_stopped_t() // 停止完成
12.2 completion_signatures 类模板
// 封装一组完成签名
struct my_sender {
using sender_concept = execution::sender_t;
using completion_signatures = execution::completion_signatures<
set_value_t(), // 可以空值完成
set_value_t(int, float), // 可以带值完成
set_error_t(exception_ptr), // 可以带异常完成
set_stopped_t() // 可以停止完成
>;
};
12.3 gather-signatures — 提取特定标签的签名
对于发送器的完成签名集合 C o m p l e t i o n s Completions Completions 和标签 T a g Tag Tag:
gather-signatures ⟨ T a g , C o m p l e t i o n s , T u p l e , V a r i a n t ⟩ \text{gather-signatures}\langle Tag, Completions, Tuple, Variant\rangle gather-signatures⟨Tag,Completions,Tuple,Variant⟩
= 把所有 T a g ( A r g s . . . ) Tag(Args...) Tag(Args...) 形式的签名变为 T u p l e ⟨ A r g s . . . ⟩ Tuple\langle Args...\rangle Tuple⟨Args...⟩,再用 V a r i a n t Variant Variant 包装所有这些 Tuple
completion_signatures<
set_value_t(),
set_value_t(int, float),
set_error_t(exception_ptr)
>
gather-signatures<set_value_t, ..., decayed-tuple, variant>
= variant<
tuple<>, // 对应 set_value_t()
tuple<int, float> // 对应 set_value_t(int, float)
>
12.4 实用类型别名
// 提取所有值完成的类型(组成 variant of tuples)
value_types_of_t<Sndr, Env>
// 提取所有错误完成的类型(组成 variant)
error_types_of_t<Sndr, Env>
// 发送器是否有停止完成
sends_stopped<Sndr, Env> // bool 常量
13. 协程工具
13.1 as_awaitable — 让发送器可被 co_await
// 在协程中,可以 co_await 一个 sender
task<int> my_coroutine() {
int result = co_await just(42); // just(42) 被当作 awaitable
co_return result;
}
as_awaitable 的选择逻辑(优先级从高到低):
1. expr.as_awaitable(p) 如果存在
2. expr 本身就是 awaiter 如果是
3. sender + 有 await 完成适配器 如果满足
4. sender(awaitable-sender) 如果满足
5. 直接返回 expr 兜底
13.2 with_awaitable_senders — 让协程支持 sender
// 在协程 Promise 类型中继承此类,使 sender 自动可等待
struct my_promise : with_awaitable_senders<my_promise> {
// ...
};
功能:
- 提供
await_transform将 sender 转为 awaitable - 提供
unhandled_stopped默认实现(调用terminate(),除非设置了 continuation) - 支持通过
set_continuation传播停止
13.3 task<T, Environment> — 协程任务类型
// 最完整的协程 sender 包装
task<int> compute() {
int x = co_await just(21);
co_return x * 2; // 完成为 set_value(42)
}
// 带自定义环境
task<void, my_env> custom_task() {
co_await schedule(get_current_scheduler());
}
task 的完成签名:
c o m p l e t i o n _ s i g n a t u r e s = { s e t _ v a l u e _ t ( ) if T = v o i d s e t _ v a l u e _ t ( T ) otherwise ∪ e r r o r _ t y p e s ∪ { s e t _ s t o p p e d _ t ( ) } completion\_signatures = \begin{cases} set\_value\_t() & \text{if } T = void \\ set\_value\_t(T) & \text{otherwise} \end{cases} \cup \; error\_types \cup \{set\_stopped\_t()\} completion_signatures={set_value_t()set_value_t(T)if T=voidotherwise∪error_types∪{set_stopped_t()}
13.4 task 内部架构
task<T, Env>
|
+-- promise_type(协程承诺类型)
| ├── alloc(分配器)
| ├── source(停止源)
| ├── token(停止令牌)
| ├── result(T 类型的结果)
| └── errors(错误变体)
|
+-- state<Rcvr>(操作状态)
├── handle(协程句柄)
├── rcvr(外部接收器)
├── own-env(自有环境)
└── environment(环境对象)
关键机制:
co_await sender时,自动调用affine_on(sender, SCHED(*this)),确保在同一个调度器上恢复yield_value(with_error<E>{...})用于发出错误完成change_coroutine_scheduler{sch}用于切换协程的关联调度器
14. 执行作用域
14.1 作用域的作用
执行作用域(Scope)用来追踪异步操作的生命周期,确保在析构前所有关联的操作都已完成。
类似于有界线程池或 structured concurrency(结构化并发)。
14.2 scope_token 概念
// scope_token 是创建关联的句柄
template<class Token>
concept scope_token =
copyable<Token> &&
requires(const Token token) {
// try_associate(): 尝试获取关联(失败时返回未 engaged 的 Assoc)
{ token.try_associate() } -> scope_association;
// wrap(sndr): 包装发送器(可能添加停止令牌传播等)
{ token.wrap(declval<test-sender>()) } -> sender_in<test-env>;
};
14.3 作用域状态机
14.4 simple_counting_scope
最简单的作用域,只追踪关联数量,不支持取消:
execution::simple_counting_scope scope;
auto token = scope.get_token();
// 关联发送器
auto s = some_sender | associate(token);
// 关闭作用域(不再允许新关联)
scope.close();
// 等待所有已关联操作完成
auto join_sndr = scope.join();
sync_wait(starts_on(sch, join_sndr));
14.5 counting_scope
在 simple_counting_scope 基础上增加取消支持:
execution::counting_scope scope;
auto token = scope.get_token();
// token.wrap(sndr) 会把作用域的停止令牌注入到 sndr
// 当 scope.request_stop() 时,所有关联操作收到停止请求
scope.request_stop(); // 请求所有关联操作停止
token.wrap(sndr) 等价于:
stop-when(sndr, scope.s_source.get_token())
15. 并行调度器
15.1 parallel_scheduler
系统级的并行执行资源调度器:
auto sch = execution::get_parallel_scheduler();
// sch.get_forward_progress_guarantee() == forward_progress_guarantee::parallel
15.2 可替换后端
通过 system_context_replaceability 命名空间,用户可以替换默认实现:
parallel_scheduler
|
v(通过 query_parallel_scheduler_backend() 获取)
parallel_scheduler_backend(可替换!)
├── schedule(receiver_proxy&, span<byte>)
├── schedule_bulk_chunked(size_t n, bulk_item_receiver_proxy&, span<byte>)
└── schedule_bulk_unchunked(size_t n, bulk_item_receiver_proxy&, span<byte>)
15.3 receiver_proxy — 接收器代理
后端通过 receiver_proxy 与前端通信,无需知道具体接收器类型:
struct receiver_proxy {
virtual void set_value() noexcept = 0;
virtual void set_error(exception_ptr) noexcept = 0;
virtual void set_stopped() noexcept = 0;
// 查询接收器环境中的某个属性
template<class P, class Query>
optional<P> try_query(Query q) noexcept;
};
16. 完整示例代码
16.1 基础用法:just + then + sync_wait
#include <execution>
#include <print>
#include <optional>
#include <tuple>
int main() {
using namespace std::execution;
// 创建一个发送器链:just(10) -> then(乘2) -> then(加1)
auto sndr = just(10)
| then([](int x) { return x * 2; })
| then([](int x) { return x + 1; });
// 同步等待结果
auto result = std::this_thread::sync_wait(std::move(sndr));
if (result) {
auto [val] = *result;
std::println("结果: {}", val); // 输出: 结果: 21
}
return 0;
}
https://godbolt.org/z/Yssf1ezbv
16.2 错误处理示例
#include <execution>
#include <print>
#include <exception>
#include <stdexcept>
int main() {
using namespace std::execution;
// 错误处理链
auto sndr = just_error(std::make_exception_ptr(
std::runtime_error("出错了")))
| upon_error([](std::exception_ptr ep) -> int {
try {
std::rethrow_exception(ep);
} catch (const std::exception& e) {
std::println("捕获错误: {}", e.what());
return -1; // 把错误转换为值完成
}
});
auto result = std::this_thread::sync_wait(std::move(sndr));
if (result) {
auto [val] = *result;
std::println("恢复后的值: {}", val); // 输出: 恢复后的值: -1
}
return 0;
}
https://godbolt.org/z/6jGq9bGW7
16.3 run_loop 手动事件循环示例
#include <execution>
#include <thread>
#include <print>
int main() {
using namespace std::execution;
run_loop loop;
// 在另一个线程上驱动事件循环
std::thread loop_thread([&loop] {
loop.run(); // 阻塞直到 loop.finish() 被调用
});
auto sch = loop.get_scheduler();
// 在 loop 的执行资源上调度工作
auto sndr = schedule(sch)
| then([] {
std::println("在 loop 线程上执行!");
return 42;
});
// 同步等待(会委托给 loop 的调度器)
auto result = std::this_thread::sync_wait(std::move(sndr));
loop.finish(); // 通知事件循环结束
loop_thread.join();
if (result) {
auto [val] = *result;
std::println("结果: {}", val);
}
return 0;
}
https://godbolt.org/z/haM9erM3G
16.4 when_all 并行等待示例
#include <execution>
#include <print>
int main() {
using namespace std::execution;
run_loop loop;
auto sch = loop.get_scheduler();
std::thread t([&] { loop.run(); });
// 并行执行两个计算,等待都完成
auto sndr = when_all(
just(1) | then([](int x) { return x * 10; }),
just(2) | then([](int x) { return x * 20; })
);
auto result = std::this_thread::sync_wait(std::move(sndr));
loop.finish();
t.join();
if (result) {
auto [a, b] = *result;
std::println("结果: {} 和 {}", a, b); // 10 和 40
}
return 0;
}
https://godbolt.org/z/Gq7crn56c
16.5 自定义发送器示例
#include <execution>
#include <print>
// 自定义发送器:立即完成,值为固定常量
struct constant_sender {
using sender_concept = std::execution::sender_t;
// 声明完成签名
using completion_signatures =
std::execution::completion_signatures<
std::execution::set_value_t(int)
>;
int value;
// connect 返回操作状态
template<std::execution::receiver_of<completion_signatures> Rcvr>
struct operation_state {
using operation_state_concept = std::execution::operation_state_t;
Rcvr rcvr;
int value;
// start: 立即触发值完成
void start() & noexcept {
std::execution::set_value(std::move(rcvr), value);
}
};
template<std::execution::receiver_of<completion_signatures> Rcvr>
operation_state<std::remove_cvref_t<Rcvr>>
connect(Rcvr&& rcvr) && {
return {std::forward<Rcvr>(rcvr), value};
}
};
int main() {
using namespace std::execution;
constant_sender s{42};
auto result = std::this_thread::sync_wait(std::move(s));
if (result) {
auto [val] = *result;
std::println("常量发送器的值: {}", val); // 42
}
return 0;
}
https://godbolt.org/z/drnEqMbWx
总结:各组件职责一览
+------------------+--------------------------------+---------------------------+
| 组件 | 职责 | 关键操作 |
+------------------+--------------------------------+---------------------------+
| Sender | 描述异步工作的"食谱" | connect(), get_env() |
| Receiver | 处理异步工作的"结果" | set_value/error/stopped() |
| Operation State | 管理执行中的异步操作 | start() |
| Scheduler | 决定在哪里执行 | schedule() |
| Environment | 存储执行上下文的属性 | query() |
| Scope | 追踪异步操作生命周期 | get_token(), join() |
+------------------+--------------------------------+---------------------------+
数据流总结
Scheduler → schedule Sender → connect OperationState → start 执行 → set_* Receiver \text{Scheduler} \xrightarrow{\text{schedule}} \text{Sender} \xrightarrow{\text{connect}} \text{OperationState} \xrightarrow{\text{start}} \text{执行} \xrightarrow{\text{set\_*}} \text{Receiver} SchedulerscheduleSenderconnectOperationStatestart执行set_*Receiver
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)