前言

ConditionObject维护条件队列,AQS维护同步队列。互动体现在await()释放锁并进入条件队列等待,signal()将节点从条件队列转移到同步队列。我们分大白话版学术代码版来详细解释 ConditionObject 与 AQS 队列之间的互动。


🗣️ 大白话版:银行VIP室与普通排队区的互动

把 AQS 的同步队列想象成银行大厅的普通排队区(所有人按顺序排队办业务),而 ConditionObject 就像一个VIP 休息室(条件等待区)。

情景: 你正在柜台办业务,但柜员说:“你要取的钱暂时不够,需要等一会儿。你先去旁边的VIP休息室等着,等钱到了我再叫你。”

  1. await() 做的事(去休息室等待)

    • 释放锁:你先把手头的业务放下(释放持有的锁),因为休息室的人不能占着柜台。
    • 进入休息室:你走进 VIP 休息室(进入条件队列),安静地坐着。
    • 阻塞:此时你不再参与大厅的排队竞争,完全挂起等待通知。
  2. signal() 做的事(叫号)

    • 从休息室出来:柜员说“钱到了”,把你从 VIP 休息室叫出来(从条件队列中移除)。
    • 回到排队区:你重新进入大厅的普通排队区(转移到 AQS 同步队列),就像新来的客户一样排队等待。
    • 等待竞争:等排到你的时候,你再次获得锁,然后继续之前没办完的业务。

关键点:条件队列和同步队列是两个独立的队列await 让你从同步队列“转移”到条件队列并阻塞;signal 让你从条件队列“转移”回同步队列,重新竞争锁。


📖 学术代码版

我们从为什么需要调用流程两个方面,结合 ReentrantLock 来解释 ConditionObject 的作用。


1. 为什么需要 ConditionObject

ReentrantLock 提供了基本的互斥锁功能:同一时刻只有一个线程能执行临界区代码。但在很多场景中,光有互斥是不够的,还需要线程之间的协调

比如经典的生产者-消费者模式:

  • 当缓冲区时,消费者线程应该等待,直到生产者放入数据。
  • 当缓冲区时,生产者线程应该等待,直到消费者取出数据。

用单纯的锁(ReentrantLock)只能保证存取操作互斥,无法实现“条件不满足时主动阻塞并在条件满足时被唤醒”。如果让线程自己循环检查条件并调用 Thread.sleep(),效率极低且无法精确唤醒。

ConditionObject 正是为了解决这个问题:它允许线程在某个条件不成立时释放锁并进入等待状态,等其他线程改变了条件后精确地唤醒那些等待的线程

一句话:Condition 是锁上的“等待 - 通知”机制,是 Object.wait/notify 的更强大、更灵活的替代品。


2. ReentrantLock 中使用 ConditionObject 的完整调用流程

我们用一个有界队列的实例来演示整个流程。

代码示例

public class BoundedQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    // 两个条件对象:队列非空、队列未满
    private final Condition notEmpty = lock.newCondition(); // 内部是 ConditionObject
    private final Condition notFull  = lock.newCondition();

    private final Object[] items;
    private int putIndex, takeIndex, count;

    public BoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    // 生产者调用:添加元素
    public void put(T t) throws InterruptedException {
        lock.lock();                    // 1. 获取锁
        try {
            while (count == items.length) { // 2. 条件不满足:队列已满
                notFull.await();        // 3. 等待“未满”条件
            }
            items[putIndex] = t;        // 4. 执行操作
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();          // 5. 唤醒等待“非空”条件的消费者
        } finally {
            lock.unlock();              // 6. 释放锁
        }
    }

    // 消费者调用:取出元素
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {        // 条件不满足:队列为空
                notEmpty.await();       // 等待“非空”条件
            }
            T t = (T) items[takeIndex];
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();           // 唤醒等待“未满”条件的生产者
            return t;
        } finally {
            lock.unlock();
        }
    }
}

调用时序图(以消费者线程等待为例)

消费者线程                        生产者线程
    |                                 |
    | lock.lock() (获取锁)             |
    | while (count == 0)               |
    | notEmpty.await()  ------------>  (释放锁,进入条件队列阻塞)
    |                                 |
    |                                 | lock.lock() (获取锁)
    |                                 | 放入元素,count 变为 1
    |                                 | notEmpty.signal() --+
    |                                 | lock.unlock()       |
    |                                 |                     |
    | <-- 被唤醒,从 await() 返回 --+                     |
    | 重新竞争锁 (在 AQS 同步队列中排队)   |
    | 获得锁后继续执行 (取出元素)         |
    | ...                              |

详细步骤分解(重点看 await()signal() 的内部动作)

步骤 消费者线程(调用 await() 生产者线程(调用 signal()
1 调用 lock.lock(),成功获取锁 (初始时消费者持有锁并已调用 await
2 条件 count == 0 成立,调用 notEmpty.await() 调用 lock.lock(),此时锁被消费者释放了吗? 注意:消费者在 await 中已经释放了锁,所以生产者可以获取锁
3 await() 内部动作:
- 将当前线程包装成 Node,加入 ConditionObject条件队列(非 AQS 队列)
- 调用 fullyRelease 完全释放锁(包括重入次数)
- 阻塞线程 (LockSupport.park)
signal() 内部动作:
- 从 ConditionObject 的条件队列中取出第一个 Node
- 将该 Node 从条件队列中移除
- 调用 enq(node)Node 转移到 AQS 的同步队列(等待锁的队列)
- 修改 Node 的状态
4 线程阻塞,不再参与锁竞争 (可选)如果转移后前驱节点状态正常,不会立即唤醒;等待生产者释放锁
5 (等待被唤醒) 生产者调用 lock.unlock(),释放锁
6 - 释放锁时,AQS 会从同步队列中唤醒下一个节点(可能正是刚刚转移过来的消费者线程)
7 消费者线程被唤醒,从 park() 返回,继续执行 await() 的后半部分:进入 acquireQueued 在同步队列中竞争锁 -
8 消费者线程竞争到锁后,从 await() 返回,重新检查条件 while (count == 0),此时条件不成立(因为生产者已放入元素),退出循环,执行取元素操作 -

3. 为什么这个流程需要 ConditionObject 而不是只用 AQS 同步队列?

AQS 的同步队列(CLH 队列)只负责管理等待锁的线程。如果一个线程因为业务条件不满足而等待,它必须释放锁,否则会造成死锁(其他线程永远无法改变条件)。同时,它需要被精确唤醒(而不是所有等待锁的线程)。

ConditionObject 提供了独立的条件队列

  • 每个 Condition 对象有自己的队列,允许同一把锁上存在多个不同的等待条件(如 notFullnotEmpty)。
  • await() 让线程从同步队列转移到条件队列,并释放锁。
  • signal() / signalAll() 让线程从条件队列转移回同步队列,然后正常竞争锁。

这样就实现了锁的释放与等待条件的分离,而且比 Object.wait/notify 更灵活(支持多条件、公平性选择等)。


4. 总结

角色 作用
ReentrantLock 提供互斥锁和可重入性
ConditionObject 提供等待/通知机制,让线程在条件不满足时释放锁并阻塞,条件满足时被唤醒
调用流程核心 await(): 释放锁 → 入条件队列 → 阻塞
signal(): 出条件队列 → 入同步队列 → 等待锁 → 唤醒后重新竞争锁

最终效果:线程既保证了临界区的互斥,又能高效地等待特定条件,避免无效循环和忙等。这正是 ReentrantLock + Condition 组合的强大之处。


🤔 ConditionObject 是如何与 AQS 队列互动的?

ConditionObject是AQS内部实现等待/通知机制的关键,相当于为每个Condition对象维护了一个独立的条件队列。它与AQS主队列的互动,生动展示了线程等待与唤醒的完整过程:

  • await(): 等待条件发生

    • 步骤1:加入条件队列:当前线程(必须已持有锁)会通过addConditionWaiter()将自己封装成一个Node,并加入到ConditionObject的条件队列中。
    • 步骤2:释放锁资源:通过fullyRelease(node),将当前线程持有的锁全部释放(包括所有重入次数),并唤醒AQS队列中的后继线程。
    • 步骤3:阻塞等待唤醒:线程进入自旋,只要它还在条件队列中(!isOnSyncQueue(node)),就会通过LockSupport.park(this)进入阻塞状态,等待被唤醒。
  • signal(): 唤醒等待线程

    • 步骤1:取出节点:从条件队列的头部取出第一个等待节点(firstWaiter)。
    • 步骤2:节点转移:将此Node从条件队列中移除,并通过enq(node)方法,安全地将其重新加入到AQS的主等待队列尾部。
    • 步骤3:状态调整:将节点的waitStatusCONDITION重置为0,为后续在AQS队列中正常排队做准备。
  • signalAll(): 唤醒所有等待线程

    • 它会遍历整个条件队列,将所有节点都移动到AQS的主队列中。这些被转移的线程将在主队列中竞争锁,并最终一个接一个地获得执行机会。

从源码剖析互动细节

AbstractQueuedSynchronizer 内部,ConditionObject 维护了自己的条件队列(单向链表),而 AQS 本身维护同步队列(双向链表)。它们的互动主要通过 await()signal() 完成。源码以JDK8为例。

1️⃣ await() 做了什么 —— 释放锁 + 进入条件队列 + 阻塞

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 将当前线程封装成 Node,加入条件队列
    Node node = addConditionWaiter();
    // 2. 释放当前线程持有的锁(全部重入次数),并唤醒同步队列中的后继节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3. 判断当前 node 是否还在同步队列中
    while (!isOnSyncQueue(node)) {
        // 4. 不在同步队列中(说明还在条件队列中),就阻塞线程
        LockSupport.park(this);
        // 检查中断...
    }
    // 5. 被唤醒后,进入同步队列的竞争流程
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 后续处理中断...
}

关键互动点:

  • addConditionWaiter():将当前线程的 Node 加入 ConditionObject 的条件队列(firstWaiter/lastWaiter)。
  • fullyRelease(node):释放锁,让 AQS 同步队列中的下一个线程可以继续竞争。
  • isOnSyncQueue(node):检查节点是否已经从条件队列转移到了 AQS 同步队列。如果还在条件队列中,就 park 阻塞。只有当 signal 将其转移到同步队列后,才会退出循环。
  • 退出循环后,调用 acquireQueued,让线程在同步队列中正常排队竞争锁

2️⃣ signal() 做了什么 —— 将节点从条件队列移到同步队列

// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
public final void signal() {
    // 1. 检查当前线程是否持有锁(只有锁持有者才能调用 signal)
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 2. 将 first 从条件队列中移除(first = first.nextWaiter)
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 3. 将节点转移到 AQS 的同步队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// 该方法在 AQS 中
final boolean transferForSignal(Node node) {
    // 1. 将节点的 waitStatus 从 CONDITION 设为 0(表示不再是条件等待状态)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 2. 将该节点入队到 AQS 的同步队列尾部,并返回该节点的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 3. 如果前驱节点被取消或无法设为 SIGNAL,则直接唤醒节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

关键互动点:

  • 从条件队列头部取出节点。
  • 将节点的 waitStatusCONDITION 改为 0,表示它不再是条件等待状态。
  • 通过 enq(node) 将该节点安全地加入 AQS 同步队列的尾部
  • 如果有必要,直接唤醒该线程(让它从 await()park 处恢复,然后通过 acquireQueued 正常排队竞争锁)。

3️⃣ 完整流程图(条件队列 ↔ 同步队列)

获取锁的线程                                   锁被释放
     │                                            │
     ▼                                            ▼
┌─────────────┐   await()   ┌─────────────┐   signal()   ┌─────────────┐
│ AQS 同步队列 │ ──释放锁──▶ │ 条件队列     │ ──转移节点──▶ │ AQS 同步队列 │
│ (排队区)     │ ◀─竞争锁─── │ (休息室)     │              │ (排队区)     │
└─────────────┘             └─────────────┘              └─────────────┘
      │                            │                            │
      │                            │                            │
  持有锁的线程                    等待条件的线程                重新排队竞争锁
  调用 await()                   被唤醒后转移                   最终获得锁

📌 总结

队列 管理者 存储内容 节点转移方向
AQS 同步队列 AQS 本身 正在等待锁的线程 signal 时从条件队列 → 同步队列
条件队列 ConditionObject 等待某个条件成立的线程 await 时从同步队列 → 条件队列

互动核心:

  • await = 释放锁 + 进入条件队列 + 阻塞
  • signal = 从条件队列移除 + 加入同步队列(不自动解锁,等待锁释放后线程自然竞争)。

这就是 Condition 实现线程间精确通知的基础:将等待和锁的竞争完全分离,由 AQS 统一管理阻塞和唤醒的底层细节。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐