🚀 Java 巩固进阶 · 第18天

主题:线程通信 wait/notify/notifyAll —— 生产者消费者模型实战

📅 进度概览:今天学习 多线程协作的核心机制:线程通信。掌握 wait/notify,你就能实现线程间的"精准配合",这是阻塞队列、消息队列、任务调度器的底层基石。

💡 核心价值

  • 协作基石:让线程从"各自为战"升级为"团队配合",实现高效的生产消费、任务分发。
  • 框架原理:理解 ArrayBlockingQueue、SpringBoot @Async、Kafka 消费者组的底层通信机制。
  • 面试高频:wait/notify 使用规则、虚假唤醒、生产者消费者模型,是并发面试的必考题。
  • 设计思维:学会用"条件等待 + 通知唤醒"替代"忙等待",大幅提升系统性能。

一、为什么需要线程通信?从"盲等"到"精准通知" 🔔

1. 经典场景:生产者 - 消费者模型

┌─────────────────────────────────────┐
│  🏭 生产者线程                        │
│  - 负责生成数据(订单/日志/消息)    │
│  - 数据放入共享缓冲区                │
│  - 缓冲区满时:暂停生产,等待消费   │
└─────────────────────────────────────┘
                    ↓ 共享缓冲区
┌─────────────────────────────────────┐
│  🛒 消费者线程                        │
│  - 从缓冲区取数据进行处理            │
│  - 缓冲区空时:暂停消费,等待生产   │
│  - 处理完成:通知生产者可继续生产    │
└─────────────────────────────────────┘

🎯 核心诉求:
✅ 缓冲区空 → 消费者等待,生产者生产后通知
✅ 缓冲区满 → 生产者等待,消费者消费后通知
❌ 禁止"忙等待":while(缓冲区空) {空转} → 浪费CPU!

2. 真实业务场景映射

业务场景 生产者 消费者 共享缓冲区
订单处理 用户下单接口 订单履约服务 订单队列(Redis/DB)
日志收集 业务日志打印 日志异步写入 内存缓冲队列
消息推送 事件触发服务 短信/邮件发送 消息队列(Kafka/RocketMQ)
数据同步 binlog 监听 ES/缓存同步 Canal 通道

💡 核心思想
“条件不满足时等待(释放锁),条件满足时通知(唤醒等待者)”
wait/notify 替代 sleep 轮询,让线程"聪明地等待",而非"傻等"。


二、三大核心方法:wait / notify / notifyAll 🎯

1. 方法签名 & 核心特性

// ⚠️ 重要:这三个方法定义在 Object 类,所有对象都能调用!
public final void wait() throws InterruptedException;      // 等待
public final void notify();                                 // 唤醒一个
public final void notifyAll();                              // 唤醒全部

2. 使用规则(❗ 违反必报错)

规则 说明 违反后果
必须在 synchronized 内调用 因为需要持有锁才能操作等待队列 IllegalMonitorStateException
wait() 会释放锁 线程进入等待池,其他线程可竞争锁 ✅ 核心特性,实现线程协作
被唤醒后需重新竞争锁 notify 后,被唤醒线程不会立即执行,需重新获取锁 ⚠️ 注意:唤醒≠执行
InterruptedException wait 可被中断,必须捕获或抛出 ✅ 支持优雅停机

3. 方法对比 & 选型建议

synchronized (lock) {
    // 🔴 wait():当前线程释放锁,进入等待池
    // - 直到被 notify/notifyAll 唤醒,或 interrupt,或 timeout
    // - 唤醒后:重新竞争锁,竞争成功才继续执行
    lock.wait();
    
    // 🟡 notify():随机唤醒一个在 lock 上 wait 的线程
    // - 不释放当前锁!当前线程执行完同步块后,被唤醒者才有机会竞争
    // - "随机":依赖 JVM 实现,不可控,慎用!
    lock.notify();
    
    // 🟢 notifyAll():唤醒所有在 lock 上 wait 的线程
    // - 所有被唤醒者竞争锁,只有一个能执行,其余继续等待
    // - ✅ 生产环境推荐:避免"唤醒错误线程"导致的死锁/饥饿
    lock.notifyAll();
}

💡 为什么生产环境推荐 notifyAll()?

场景:缓冲区有"空"和"满"两个条件,生产者和消费者都在 wait

如果用 notify():
- 生产者生产完,调用 notify(),可能唤醒另一个生产者(而非消费者)
- 被唤醒的生产者发现缓冲区仍满,继续 wait → 消费者永远不被唤醒 → 死锁!

如果用 notifyAll():
- 所有等待者都被唤醒,各自检查条件
- 消费者发现缓冲区有数据,执行消费;生产者发现仍满,继续 wait
- ✅ 虽然多唤醒几个线程,但逻辑正确,避免死锁

三、经典范式:条件等待 + 通知唤醒(⭐ 背下来!)

1. 标准模板(直接复用)

synchronized (lock) {
    // 🔁 步骤1:while 循环检查条件(❗ 必须用 while,不能用 if)
    while (条件不满足) {  // 如:buffer.isEmpty() / buffer.isFull()
        try {
            lock.wait();  // 释放锁,进入等待
        } catch (InterruptedException e) {
            // ✅ 中断处理:恢复中断标志 + 优雅退出
            Thread.currentThread().interrupt();
            return;  // 或 throw new RuntimeException(e)
        }
    }
    
    // ✅ 步骤2:条件满足,执行核心业务
    doBusinessLogic();
    
    // 🔔 步骤3:状态改变,通知其他等待线程
    lock.notifyAll();  // ✅ 推荐用 notifyAll,避免唤醒错误线程
}

2. ❗ 为什么必须用 while 而不是 if

// ❌ 错误:用 if 判断,可能遭遇"虚假唤醒"
synchronized (lock) {
    if (buffer.isEmpty()) {  // 只判断一次!
        lock.wait();  // 被唤醒后,直接往下执行,不再检查条件!
    }
    // 🐛 风险:被唤醒时缓冲区可能仍为空(虚假唤醒/其他线程抢占)
    buffer.take();  // 可能抛出 NoSuchElementException!
}

// ✅ 正确:用 while 循环,唤醒后重新检查条件
synchronized (lock) {
    while (buffer.isEmpty()) {  // 每次唤醒都重新检查!
        lock.wait();
    }
    // ✅ 安全:能执行到这里,说明缓冲区一定非空
    buffer.take();
}

🔍 什么是"虚假唤醒"(Spurious Wakeup)?

⚠️ 现象:线程没有被 notify/notifyAll 唤醒,也没有被 interrupt,却从 wait() 返回了!

📚 原因:
- JVM/操作系统底层实现机制(如 Linux 的 futex)
- 为了性能优化,允许"无理由唤醒"

✅ 应对方案:
- 永远用 while 循环检查条件,而非 if
- 这是 Java 并发编程的"铁律",所有源码(如 ArrayBlockingQueue)都遵守

💡 记忆口诀:
"等待用 while,唤醒再检查,虚假唤醒也不怕"

四、实战案例:单缓冲区生产者消费者(完整可运行)

📦 共享资源:Box(容量=1)

/**
 * 单元素缓冲区:生产-消费经典模型
 * 状态:0=空,1=满
 */
class Box {
    private int product = 0;  // 0:空, 1:满
    private final Object lock = new Object();
    
    /**
     * 生产:缓冲区空才能生产,生产后通知消费者
     */
    public void produce(int value) throws InterruptedException {
        synchronized (lock) {
            // 🔁 缓冲区满则等待
            while (product == 1) {
                System.out.println("📦 缓冲区满,生产者等待...");
                lock.wait();  // 释放锁,进入等待
            }
            
            // ✅ 执行生产
            product = 1;
            System.out.println("🏭 生产者生产: " + value + ",当前状态: 满");
            
            // 🔔 通知消费者(可能正在等待)
            lock.notifyAll();
        }
    }
    
    /**
     * 消费:缓冲区满才能消费,消费后通知生产者
     */
    public int consume() throws InterruptedException {
        synchronized (lock) {
            // 🔁 缓冲区空则等待
            while (product == 0) {
                System.out.println("📦 缓冲区空,消费者等待...");
                lock.wait();
            }
            
            // ✅ 执行消费
            product = 0;
            System.out.println("🛒 消费者消费,当前状态: 空");
            
            // 🔔 通知生产者(可能正在等待)
            lock.notifyAll();
            
            return 1;  // 简化:返回固定值
        }
    }
}

🧵 线程启动 & 运行

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        Box box = new Box();
        
        // 🏭 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    box.produce(i);
                    Thread.sleep(500);  // 模拟生产耗时(放锁外!)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("生产者被中断");
            }
        }, "Producer-Thread");
        
        // 🛒 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    box.consume();
                    Thread.sleep(800);  // 模拟消费耗时(放锁外!)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("消费者被中断");
            }
        }, "Consumer-Thread");
        
        // 🚀 启动线程
        producer.start();
        consumer.start();
        
        // ⏳ 主线程等待(实际项目可用 CountDownLatch)
        producer.join();
        consumer.join();
        System.out.println("✅ 生产消费完成");
    }
}

🔍 执行流程图解

时间线:
T0: Producer 启动,检查 product=0(空)→ 生产 product=1 → notifyAll → sleep
T1: Consumer 启动,检查 product=1(满)→ 消费 product=0 → notifyAll → sleep
T2: Producer 醒来,检查 product=0 → 生产 → notifyAll → sleep
T3: Consumer 醒来,检查 product=1 → 消费 → notifyAll → sleep
...
✅ 完美交替,无忙等待,无超卖/空取

💡 关键设计:
1. while 循环检查条件 → 防虚假唤醒
2. wait() 释放锁 → 允许对方线程执行
3. notifyAll() 通知 → 避免唤醒错误线程
4. sleep() 放锁外 → 提升并发度

五、进阶:多生产者多消费者 + 有界缓冲区 🚀

📦 升级版:ArrayBuffer(容量=N)

/**
 * 有界缓冲区:支持多生产者/多消费者
 * 使用循环数组 + 头尾指针实现
 */
class ArrayBuffer<T> {
    private final Object[] items;
    private int count;      // 当前元素个数
    private int putIndex;   // 下一个放入位置
    private int takeIndex;  // 下一个取出位置
    private final Object lock = new Object();
    
    public ArrayBuffer(int capacity) {
        items = new Object[capacity];
    }
    
    /**
     * 放入元素:缓冲区满则等待
     */
    @SuppressWarnings("unchecked")
    public void put(T item) throws InterruptedException {
        synchronized (lock) {
            // 🔁 缓冲区满则等待
            while (count == items.length) {
                System.out.println("📦 缓冲区满,生产者等待...");
                lock.wait();
            }
            
            // ✅ 放入元素
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;  // 循环指针
            count++;
            System.out.println("🏭 放入: " + item + ", 当前数量: " + count);
            
            // 🔔 通知消费者(可能有多个在等待)
            lock.notifyAll();
        }
    }
    
    /**
     * 取出元素:缓冲区空则等待
     */
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        synchronized (lock) {
            // 🔁 缓冲区空则等待
            while (count == 0) {
                System.out.println("📦 缓冲区空,消费者等待...");
                lock.wait();
            }
            
            // ✅ 取出元素
            T item = (T) items[takeIndex];
            items[takeIndex] = null;  // 帮助 GC
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            System.out.println("🛒 取出: " + item + ", 当前数量: " + count);
            
            // 🔔 通知生产者
            lock.notifyAll();
            
            return item;
        }
    }
}

🔍 为什么这个实现是"线程安全"的?

  1. 互斥:所有操作在 synchronized(lock) 内,同一时间只有一个线程执行
  2. 可见性wait/notify 机制保证工作内存与主内存同步
  3. 条件等待while 循环确保"满才能 put,空才能 take"
  4. 通知唤醒notifyAll 确保所有等待者有机会重新检查条件

💡 这就是 JDK ArrayBlockingQueue 的核心原理!
实际项目中,建议直接使用 BlockingQueue 系列,而非手写 wait/notify。


六、🎯 今日实战任务:构建简易任务调度器

任务1:复现单缓冲区生产消费

/**
 * 要求:
 * 1. 实现 Box 类(容量=1),支持 produce/consume
 * 2. 1 个生产者线程:生产 1~5,每次间隔 500ms
 * 3. 1 个消费者线程:消费并打印,每次间隔 800ms
 * 4. 观察输出:是否完美交替?有无死锁/空取?
 * 
 * 💡 调试技巧:
 * - 为每个线程设置语义化名称
 * - 在 wait/notify 处打印日志,追踪线程状态
 */

任务2:升级多生产者多消费者

/**
 * 要求:
 * 1. 使用 ArrayBuffer(容量=3)
 * 2. 启动 2 个生产者:分别生产 "P1-1"~"P1-5", "P2-1"~"P2-5"
 * 3. 启动 3 个消费者:竞争消费
 * 4. 观察:是否出现"生产者饥饿"或"消费者空转"?
 * 
 * 💡 挑战:
 * - 如果消费者处理速度远慢于生产者,缓冲区会怎样?
 * - 如何实现"优雅停机":生产完成后,通知消费者退出?
 */

任务3:模拟 SpringBoot 异步日志(实战高频)

/**
 * 异步日志写入器:业务线程生产日志,后台线程消费写入文件
 * 
 * 要求:
 * 1. LogBuffer:有界缓冲区(容量=100),存储日志字符串
 * 2. LoggerProducer:业务线程调用 log(msg) → 放入缓冲区(满则阻塞)
 * 3. LoggerConsumer:后台守护线程,循环取日志 → 写入文件(空则等待)
 * 4. 支持优雅关闭:调用 shutdown() 后,处理完剩余日志再退出
 * 
 * 💡 最佳实践:
 * - 消费者设为守护线程:setDaemon(true)
 * - shutdown 时:设置标志位 + notifyAll 唤醒消费者
 * - 日志写入用 BufferedWriter + 定期 flush
 */

任务4:对比 wait/notify vs BlockingQueue(理解框架设计)

/**
 * 用两种方式实现相同的生产消费逻辑,对比代码复杂度
 * 
 * 方式1:手写 wait/notify(今天学的)
 * 方式2:使用 ArrayBlockingQueue(JDK 原生)
 * 
 * 要求:
 * 1. 分别实现两种版本
 * 2. 对比:代码量、可读性、异常处理、扩展性
 * 3. 思考:为什么框架推荐用 BlockingQueue?
 * 
 * 💡 结论预告:
 * - BlockingQueue 封装了 wait/notify,API 更简洁
 * - 内置超时、中断、批量操作等高级特性
 * - 经过 JDK 团队充分测试,更可靠
 * ✅ 生产环境:优先用 BlockingQueue,理解原理即可
 */

📝 第18天 · 核心总结(极简背诵版)

  1. 线程通信三要素

    🎯 共享资源 + synchronized 锁 + wait/notify 机制
    ✅ 条件不满足 → wait() 释放锁等待
    ✅ 条件改变后 → notifyAll() 唤醒等待者
    
  2. 三大方法铁律

    方法 作用 关键特性 注意事项
    wait() 等待条件 释放锁,进入等待池 必须 catch InterruptedException
    notify() 唤醒一个 随机选择,不可控 ⚠️ 可能唤醒错误线程,慎用
    notifyAll() 唤醒全部 所有等待者竞争锁 ✅ 生产环境推荐,避免死锁
  3. 经典范式模板(直接复用):

    synchronized (lock) {
        while (条件不满足) {  // ❗ 必须 while,防虚假唤醒
            lock.wait();     // 释放锁,等待通知
        }
        // ✅ 执行业务逻辑
        doSomething();
        // 🔔 状态改变,通知他人
        lock.notifyAll();    // ✅ 推荐 notifyAll
    }
    
  4. 关键设计原则

    • 🔁 条件检查用 while:虚假唤醒是真实存在的!
    • 🔓 wait 释放锁:让其他线程有机会修改条件
    • 🔔 优先 notifyAll:避免"唤醒错误线程"导致的逻辑错误
    • ⏱️ 耗时操作放锁外sleep/IO/网络调用 不要放在 synchronized 内
  5. 生产环境建议

    • ✅ 理解 wait/notify 原理,但优先使用 java.util.concurrent 工具类
    • BlockingQueue / CountDownLatch / CyclicBarrier 更安全可靠
    • ✅ 日志记录 wait/notify 事件,便于排查"线程不唤醒"问题
    • ❌ 避免在锁内调用外部未知方法(可能死锁)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐