Go 1.26.4 超深度分析 — sync 同步原语 (Mutex/WaitGroup/Pool/Once/Map/atomic)

核心源码:internal/sync/mutex.go (234行) + sync/waitgroup.go (260行) + sync/pool.go (318行) + sync/once.go + sync/map.go + sync/rwmutex.go + sync/cond.go


一、Mutex — 互斥锁

1.1 模块定位

Mutex 是 Go 并发编程最基础的同步原语。Go 1.26 的 Mutex 实现双模式调度:

  • 正常模式:新来的 G 与唤醒的 G 竞争,新 G 有优势(已在CPU上)
  • 饥饿模式:等待超过 1ms 的 G 直接获得锁,新 G 排队

1.2 Mutex 结构

type Mutex struct {
    state int32  // 锁状态位图
    sema  uint32 // 信号量(用于 park/unpark)
}

const (
    mutexLocked      = 1 << iota  // bit0: 锁是否被持有
    mutexWoken                    // bit1: 有G被唤醒
    mutexStarving                 // bit2: 饥饿模式
    mutexWaiterShift = iota       // bit3+: 等待者数量
    starvationThresholdNs = 1e6   // 饥饿阈值: 1ms
)

Mutex

-state: int32

-sema: uint32

+Lock()

+TryLock() : bool

+Unlock()

state 位图:\nbit0: Locked (1=被锁)\nbit1: Woken (1=有唤醒)\nbit2: Starving (1=饥饿)\nbit3+: Waiter count

1.3 Lock — 加锁流程

func (m *Mutex) Lock() {
    // 快速路径: CAS 获取未锁定的 mutex
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // 慢速路径
    m.lockSlow()
}

lockSlow 完整流程

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state

    for {
        // ─── 自旋判断 ───
        // 饥饿模式不自旋(锁会直接转交)
        // 只有 locked + 非饥饿 + 可以自旋时才自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 设置 Woken 标志,通知 Unlock 不用唤醒其他G
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()  // CPU 空转 (procyield)
            iter++
            old = m.state
            continue
        }

        // ─── 尝试获取锁 ───
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked  // 正常模式: 尝试加锁
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift  // 等待者+1
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving  // 切换到饥饿模式
        }
        if awoke {
            new &^= mutexWoken  // 清除 Woken 标志
        }

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // CAS 成功
            if old&(mutexLocked|mutexStarving) == 0 {
                break  // ★ 获得锁!
            }
            // 排队等待
            queueLifo := waitStartTime != 0  // 非首次等待 → 排队头
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 2)

            // 被唤醒后检查是否饥饿
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state

            if old&mutexStarving != 0 {
                // 饥饿模式: 锁已直接转交给我
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving  // 退出饥饿模式
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

Mutex 加锁状态机

初始状态

CAS(0→mutexLocked) 快速路径

CAS失败 + canSpin

自旋次数超限

Semrelease唤醒

CAS成功(正常模式)

等待>1ms(饥饿模式)

Unlock()

Unlock()→直接转交(handoff)

queueLifo=true(排队头)

Unlocked

Locked

Spinning

Waiting

Woken

Locked_Normal

Locked_Starving

1.4 Unlock — 解锁流程

func (m *Mutex) Unlock() {
    // 快速路径: 清除 Locked 位
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)  // 有等待者
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if new&mutexStarving == 0 {
        // 正常模式: 唤醒一个等待者
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return  // 无等待者/已被其他人处理
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 2)  // 唤醒一个G
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式: 直接转交锁给下一个等待者
        runtime_Semrelease(&m.sema, true, 2)  // handoff=true
    }
}

二、WaitGroup — 等待组

2.1 结构

type WaitGroup struct {
    noCopy noCopy
    // state 位图 (64位):
    //   bits[0:32]  counter (Add/Done 计数)
    //   bits[32]    flag: synctest bubble 成员标志
    //   bits[33:64] wait count (Wait 等待者数量)
    state atomic.Uint64
    sema  uint32
}

2.2 Add/Done/Wait

func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    c := int32(state >> 32)     // counter
    w := uint32(state)          // wait count (低32位)

    if c < 0 {
        panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    if w != 0 && delta == 1 && c > 0 {
        // 有等待者 + Add(1) → panic (Wait期间不应Add)
        // 但 Add > 1 是允许的 (并发Add)
    }
    if c == 0 && w != 0 {
        // counter 归零 → 唤醒所有等待者
        wg.state.Store(0)
        for ; w != 0; w-- {
            runtime_Semrelease(&wg.sema, false, 0)
        }
    }
}

func (wg *WaitGroup) Done() {
    wg.Add(-1)  // counter -1
}

func (wg *WaitGroup) Wait() {
    for {
        state := wg.state.Load()
        c := int32(state >> 32)
        if c == 0 {
            return  // counter 已为零
        }
        // CAS 增加 wait count
        if wg.state.CompareAndSwap(state, state+1) {
            runtime_Semacquire(&wg.sema)  // 阻塞等待
            return
        }
    }
}

WaitGroup 流程

Goroutine 2 Goroutine 1 WaitGroup Main Goroutine Goroutine 2 Goroutine 1 WaitGroup Main Goroutine counter==0 → 唤醒所有waiter Add(2) → counter=2 go task1() go task2() Wait() → wait++ → Semacquire (阻塞) Done() → Add(-1) → counter=1 Done() → Add(-1) → counter=0 Semrelease → Wait()返回

三、sync.Once — 单次执行

3.1 结构

type Once struct {
    _    noCopy
    done atomic.Bool  // 是否已执行(放第一字段:热路径优化)
    m    Mutex
}

3.2 Do 实现

func (o *Once) Do(f func()) {
    if o.done.Load() {
        return  // 快速路径: 已执行过
    }
    o.doSlow(f)
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done.Load() {
        return  // Double-check: 获取锁后再检查
    }
    o.done.Store(true)  // 标记已完成(在f执行前!)
    f()                  // 执行f
}

关键设计donef() 执行前就设为 true。这保证了:

  • 后续 Do 调用不会阻塞等待
  • 如果 f panic,done 也是 true,不会重复执行

四、sync.Pool — 临时对象池

4.1 结构

type Pool struct {
    noCopy noCopy
    local     unsafe.Pointer  // per-P 池 [P]poolLocal
    localSize uintptr         // local 数组大小
    victim     unsafe.Pointer // 上一轮的 local (GC survivor)
    victimSize uintptr
    New        func() any     // 创建新对象的函数
}

type poolLocal struct {
    poolLocalInternal
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte  // 防止伪共享
}

type poolLocalInternal struct {
    private any       // 每P私有对象(无需竞争)
    shared  poolChain // 双向链表(可被其他P偷)
}

4.2 Get — 获取对象

func (p *Pool) Get() any {
    l := p.pin()  // 绑定到当前P, 禁止抢占
    // 1. 先检查 private
    x := l.private
    l.private = nil
    if x == nil {
        // 2. 从 shared 链表头取
        x, _ = l.shared.popHead()
        if x == nil {
            // 3. 从 victim 取
            x = p.getSlow(pid)
        }
    }
    p.unpin()
    if x == nil && p.New != nil {
        x = p.New()  // 4. 创建新对象
    }
    return x
}

4.3 Put — 放回对象

func (p *Pool) Put(x any) {
    l := p.pin()
    if l.private == nil {
        l.private = x  // 优先放 private
    } else {
        l.shared.pushHead(x)  // 放 shared 链表头
    }
    p.unpin()
}

Pool GC 机制

GC 周期 N+1

GC 周期 N

GC触发

GC清除

local (当前周期)

victim (上周期)

local (新周期)

victim (N周期的local)

对象被回收

设计:Pool 的对象在 GC 时可能被回收(victim 机制),不适合做持久缓存。


五、sync.Map — 并发安全 Map

5.1 结构 (Go 1.26 使用 HashTrieMap)

type Map struct {
    _ noCopy
    m isync.HashTrieMap[any, any]  // 内部委托给 HashTrieMap
}

Go 1.26 的 sync.Map 内部使用 HashTrieMap(新增的锁-free 哈希树实现),替代了旧版的 read+dirty 双map设计。

HashTrieMap 核心结构

type HashTrieMap[K comparable, V any] struct {
    root   atomic.Pointer[node]  // 根节点(原子指针)
    sig    note                  // 用于等待的信号
    init   sync.Once             // 延迟初始化
}

哈希树:使用前缀树结构,按哈希值的位分组,每个节点是一个桶数组,支持无锁读取。


六、RWMutex — 读写锁

6.1 结构

type RWMutex struct {
    w           Mutex       // 写锁 (也防止读者饿死写者)
    writerSem   uint32      // 写者等待信号量
    readerSem   uint32      // 读者等待信号量
    readerCount atomic.Int32  // 读者计数 (正=读者数, 负=有写者)
    readerWait  atomic.Int32  // 写者等待的读者数
}

RWMutex 工作原理

RWMutex Reader 2 Reader 1 Writer RWMutex Reader 2 Reader 1 Writer 2个读者同时持有 readerCount = 2-maxReaders (负数=有写者等待) 等待所有读者退出 readerWait-- (=1) readerWait==0 → 唤醒写者 readerCount += maxReaders 唤醒等待的读者 RLock() → readerCount++ (=1) RLock() → readerCount++ (=2) Lock() → 获取 w (Mutex) readerCount -= rwmutexMaxReaders readerWait = readerCount + maxReaders (=2) RUnlock() → readerCount-- RUnlock() → readerCount-- writerSem → 写者获得锁 Unlock() readerSem

七、Cond — 条件变量

type Cond struct {
    noCopy noCopy
    L       sync.Locker   // 关联的 Locker
    notify  notifyList    // 等待者列表
    checker copyChecker   // 复制检查
}

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)  // 获取 ticket
    c.L.Unlock()                            // 释放锁
    runtime_notifyListWait(&c.notify, t)    // 等待通知
    c.L.Lock()                              // 重新获取锁
}

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)  // 唤醒一个
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)  // 唤醒所有
}

八、sync/atomic — 原子操作

8.1 atomic.Value — 原子值

type Value struct {
    v any
}

func (v *Value) Load() any {
    vp := (*ifaceWords)(unsafe.Pointer(v))
    typ := LoadPointer(&vp.typ)
    if typ == nil || typ == unsafe.Pointer((*any)(nil)) {
        return nil
    }
    return LoadPointer(&vp.data)
}

func (v *Value) Store(val any) {
    vp := (*ifaceWords)(unsafe.Pointer(v))
    np := (*ifaceWords)(unsafe.Pointer(&val))
    for {
        typ := LoadPointer(&vp.typ)
        if typ == nil {
            // 首次 Store: 原子设置类型和数据
            runtime_procPin()
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(np.typ)) {
                runtime_procUnpin()
                continue
            }
            StorePointer(&vp.data, np.data)
            runtime_procUnpin()
            return
        }
        // 类型检查: 必须与首次一致
        if typ != unsafe.Pointer(np.typ) {
            panic("sync/atomic: store of inconsistently typed value")
        }
        StorePointer(&vp.data, np.data)
        return
    }
}

九、设计模式总结

# 模式 体现
1 双模式锁 Mutex 正常/饥饿模式
2 自旋+让步 lockSlow 自旋后 Semacquire
3 快速路径+慢速路径 Lock CAS / lockSlow; Unlock Add / unlockSlow
4 位图状态 Mutex state 3位标志+waiter计数
5 Handoff 饥饿模式直接转交锁
6 原子计数 WaitGroup 64位 state
7 Double-Check Once: atomic.Bool + Mutex 内再检查
8 Per-P 缓存 Pool.local per-P 无锁
9 Victim 机制 Pool GC 时 local→victim→清除
10 读者优先→写者优先 RWMutex: 新读者等写者完成
11 Ticket 通知 Cond: notifyList ticket 系统
12 Lock-Free 读 HashTrieMap 原子指针读
13 类型一致性 atomic.Value 首次 Store 固定类型
14 procPin/procUnpin Pool 绑定P禁止抢占
15 伪共享防护 poolLocal 128字节 padding
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐