【Go 1.26.4】(Part 7) Go 1.26.4 超深度分析 — sync 同步原语 (Mutex/WaitGroup/Pool/Once/Map/atomic)
·
Go 1.26.4 超深度分析 — sync 同步原语 (Mutex/WaitGroup/Pool/Once/Map/atomic)
核心源码:
internal/sync/mutex.go(234行) +sync/waitgroup.go(260行) +sync/pool.go(318行) +sync/once.go+sync/map.go+sync/rwmutex.go+sync/cond.go
一、Mutex — 互斥锁
1.1 模块定位
Mutex 是 Go 并发编程最基础的同步原语。Go 1.26 的 Mutex 实现双模式调度:
- 正常模式:新来的 G 与唤醒的 G 竞争,新 G 有优势(已在CPU上)
- 饥饿模式:等待超过 1ms 的 G 直接获得锁,新 G 排队
1.2 Mutex 结构
type Mutex struct {
state int32 // 锁状态位图
sema uint32 // 信号量(用于 park/unpark)
}
const (
mutexLocked = 1 << iota // bit0: 锁是否被持有
mutexWoken // bit1: 有G被唤醒
mutexStarving // bit2: 饥饿模式
mutexWaiterShift = iota // bit3+: 等待者数量
starvationThresholdNs = 1e6 // 饥饿阈值: 1ms
)
1.3 Lock — 加锁流程
func (m *Mutex) Lock() {
// 快速路径: CAS 获取未锁定的 mutex
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 慢速路径
m.lockSlow()
}
lockSlow 完整流程
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// ─── 自旋判断 ───
// 饥饿模式不自旋(锁会直接转交)
// 只有 locked + 非饥饿 + 可以自旋时才自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置 Woken 标志,通知 Unlock 不用唤醒其他G
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // CPU 空转 (procyield)
iter++
old = m.state
continue
}
// ─── 尝试获取锁 ───
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 正常模式: 尝试加锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待者+1
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 切换到饥饿模式
}
if awoke {
new &^= mutexWoken // 清除 Woken 标志
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// CAS 成功
if old&(mutexLocked|mutexStarving) == 0 {
break // ★ 获得锁!
}
// 排队等待
queueLifo := waitStartTime != 0 // 非首次等待 → 排队头
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
// 被唤醒后检查是否饥饿
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 饥饿模式: 锁已直接转交给我
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 退出饥饿模式
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
Mutex 加锁状态机
1.4 Unlock — 解锁流程
func (m *Mutex) Unlock() {
// 快速路径: 清除 Locked 位
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new) // 有等待者
}
}
func (m *Mutex) unlockSlow(new int32) {
if new&mutexStarving == 0 {
// 正常模式: 唤醒一个等待者
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return // 无等待者/已被其他人处理
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 2) // 唤醒一个G
return
}
old = m.state
}
} else {
// 饥饿模式: 直接转交锁给下一个等待者
runtime_Semrelease(&m.sema, true, 2) // handoff=true
}
}
二、WaitGroup — 等待组
2.1 结构
type WaitGroup struct {
noCopy noCopy
// state 位图 (64位):
// bits[0:32] counter (Add/Done 计数)
// bits[32] flag: synctest bubble 成员标志
// bits[33:64] wait count (Wait 等待者数量)
state atomic.Uint64
sema uint32
}
2.2 Add/Done/Wait
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
c := int32(state >> 32) // counter
w := uint32(state) // wait count (低32位)
if c < 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if w != 0 && delta == 1 && c > 0 {
// 有等待者 + Add(1) → panic (Wait期间不应Add)
// 但 Add > 1 是允许的 (并发Add)
}
if c == 0 && w != 0 {
// counter 归零 → 唤醒所有等待者
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1) // counter -1
}
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
c := int32(state >> 32)
if c == 0 {
return // counter 已为零
}
// CAS 增加 wait count
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema) // 阻塞等待
return
}
}
}
WaitGroup 流程
三、sync.Once — 单次执行
3.1 结构
type Once struct {
_ noCopy
done atomic.Bool // 是否已执行(放第一字段:热路径优化)
m Mutex
}
3.2 Do 实现
func (o *Once) Do(f func()) {
if o.done.Load() {
return // 快速路径: 已执行过
}
o.doSlow(f)
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() {
return // Double-check: 获取锁后再检查
}
o.done.Store(true) // 标记已完成(在f执行前!)
f() // 执行f
}
关键设计:done 在 f() 执行前就设为 true。这保证了:
- 后续 Do 调用不会阻塞等待
- 如果 f panic,done 也是 true,不会重复执行
四、sync.Pool — 临时对象池
4.1 结构
type Pool struct {
noCopy noCopy
local unsafe.Pointer // per-P 池 [P]poolLocal
localSize uintptr // local 数组大小
victim unsafe.Pointer // 上一轮的 local (GC survivor)
victimSize uintptr
New func() any // 创建新对象的函数
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 防止伪共享
}
type poolLocalInternal struct {
private any // 每P私有对象(无需竞争)
shared poolChain // 双向链表(可被其他P偷)
}
4.2 Get — 获取对象
func (p *Pool) Get() any {
l := p.pin() // 绑定到当前P, 禁止抢占
// 1. 先检查 private
x := l.private
l.private = nil
if x == nil {
// 2. 从 shared 链表头取
x, _ = l.shared.popHead()
if x == nil {
// 3. 从 victim 取
x = p.getSlow(pid)
}
}
p.unpin()
if x == nil && p.New != nil {
x = p.New() // 4. 创建新对象
}
return x
}
4.3 Put — 放回对象
func (p *Pool) Put(x any) {
l := p.pin()
if l.private == nil {
l.private = x // 优先放 private
} else {
l.shared.pushHead(x) // 放 shared 链表头
}
p.unpin()
}
Pool GC 机制
设计:Pool 的对象在 GC 时可能被回收(victim 机制),不适合做持久缓存。
五、sync.Map — 并发安全 Map
5.1 结构 (Go 1.26 使用 HashTrieMap)
type Map struct {
_ noCopy
m isync.HashTrieMap[any, any] // 内部委托给 HashTrieMap
}
Go 1.26 的 sync.Map 内部使用 HashTrieMap(新增的锁-free 哈希树实现),替代了旧版的 read+dirty 双map设计。
HashTrieMap 核心结构
type HashTrieMap[K comparable, V any] struct {
root atomic.Pointer[node] // 根节点(原子指针)
sig note // 用于等待的信号
init sync.Once // 延迟初始化
}
哈希树:使用前缀树结构,按哈希值的位分组,每个节点是一个桶数组,支持无锁读取。
六、RWMutex — 读写锁
6.1 结构
type RWMutex struct {
w Mutex // 写锁 (也防止读者饿死写者)
writerSem uint32 // 写者等待信号量
readerSem uint32 // 读者等待信号量
readerCount atomic.Int32 // 读者计数 (正=读者数, 负=有写者)
readerWait atomic.Int32 // 写者等待的读者数
}
RWMutex 工作原理
七、Cond — 条件变量
type Cond struct {
noCopy noCopy
L sync.Locker // 关联的 Locker
notify notifyList // 等待者列表
checker copyChecker // 复制检查
}
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // 获取 ticket
c.L.Unlock() // 释放锁
runtime_notifyListWait(&c.notify, t) // 等待通知
c.L.Lock() // 重新获取锁
}
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify) // 唤醒一个
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify) // 唤醒所有
}
八、sync/atomic — 原子操作
8.1 atomic.Value — 原子值
type Value struct {
v any
}
func (v *Value) Load() any {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || typ == unsafe.Pointer((*any)(nil)) {
return nil
}
return LoadPointer(&vp.data)
}
func (v *Value) Store(val any) {
vp := (*ifaceWords)(unsafe.Pointer(v))
np := (*ifaceWords)(unsafe.Pointer(&val))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// 首次 Store: 原子设置类型和数据
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(np.typ)) {
runtime_procUnpin()
continue
}
StorePointer(&vp.data, np.data)
runtime_procUnpin()
return
}
// 类型检查: 必须与首次一致
if typ != unsafe.Pointer(np.typ) {
panic("sync/atomic: store of inconsistently typed value")
}
StorePointer(&vp.data, np.data)
return
}
}
九、设计模式总结
| # | 模式 | 体现 |
|---|---|---|
| 1 | 双模式锁 | Mutex 正常/饥饿模式 |
| 2 | 自旋+让步 | lockSlow 自旋后 Semacquire |
| 3 | 快速路径+慢速路径 | Lock CAS / lockSlow; Unlock Add / unlockSlow |
| 4 | 位图状态 | Mutex state 3位标志+waiter计数 |
| 5 | Handoff | 饥饿模式直接转交锁 |
| 6 | 原子计数 | WaitGroup 64位 state |
| 7 | Double-Check | Once: atomic.Bool + Mutex 内再检查 |
| 8 | Per-P 缓存 | Pool.local per-P 无锁 |
| 9 | Victim 机制 | Pool GC 时 local→victim→清除 |
| 10 | 读者优先→写者优先 | RWMutex: 新读者等写者完成 |
| 11 | Ticket 通知 | Cond: notifyList ticket 系统 |
| 12 | Lock-Free 读 | HashTrieMap 原子指针读 |
| 13 | 类型一致性 | atomic.Value 首次 Store 固定类型 |
| 14 | procPin/procUnpin | Pool 绑定P禁止抢占 |
| 15 | 伪共享防护 | poolLocal 128字节 padding |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)