🌺The Begin🌺点点关注,收藏不迷路🌺

1. 引言:线程不是孤岛

在多线程编程中,线程之间并非各自为政、老死不相往来。相反,它们常常需要协作来完成复杂任务:一个线程生产数据,另一个线程消费数据;一个线程等待条件满足,另一个线程触发条件。

那么,Java 中的线程之间究竟如何进行通信?本文将系统性地介绍 Java 线程间通信的多种方式,从基础的 wait/notify 到高级的 Lock/Condition,再到更现代的 CompletableFuture 和响应式流。

2. 一图看懂:线程通信全景图

通信方式分类

线程通信

共享内存

消息传递

volatile

synchronized + wait/notify

Lock + Condition

原子类

BlockingQueue

管道流 PipedInputStream/OutputStream

CountDownLatch/CyclicBarrier

Exchanger

CompletableFuture

3. 共享内存方式:wait/notify 机制

3.1 核心原理

wait/notify 机制是 Java 最基础的线程通信方式,基于 Object 类中的三个方法:

生产者线程 共享对象监视器 消费者线程 生产者线程 共享对象监视器 消费者线程 进入等待队列 synchronized 获取锁 wait() 释放锁并等待 synchronized 获取锁 生产数据 notify()/notifyAll() 唤醒 重新竞争锁 继续执行

3.2 经典生产者-消费者示例

public class WaitNotifyDemo {
    private static final int CAPACITY = 5;
    private final Queue<Integer> queue = new LinkedList<>();
    private int value = 0;
    
    // 生产者
    public synchronized void produce() throws InterruptedException {
        while (queue.size() == CAPACITY) {
            System.out.println(Thread.currentThread().getName() + " 队列满,等待...");
            wait();  // 释放锁,进入等待
        }
        queue.offer(++value);
        System.out.println(Thread.currentThread().getName() + " 生产: " + value + ",队列大小: " + queue.size());
        notifyAll();  // 唤醒所有等待线程
    }
    
    // 消费者
    public synchronized void consume() throws InterruptedException {
        while (queue.isEmpty()) {
            System.out.println(Thread.currentThread().getName() + " 队列空,等待...");
            wait();
        }
        Integer val = queue.poll();
        System.out.println(Thread.currentThread().getName() + " 消费: " + val + ",队列大小: " + queue.size());
        notifyAll();
    }
    
    public static void main(String[] args) {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        
        // 启动多个生产者和消费者
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        demo.produce();
                        Thread.sleep(500);
                    }
                } catch (InterruptedException e) {}
            }, "Producer-" + i).start();
        }
        
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        demo.consume();
                        Thread.sleep(800);
                    }
                } catch (InterruptedException e) {}
            }, "Consumer-" + i).start();
        }
    }
}

3.3 wait/notify 的关键规则

wait

notify

notifyAll

调用 wait/notify 前

是否持有对象锁?

抛出 IllegalMonitorStateException

可以调用

调用哪个方法?

释放锁
进入等待队列

随机唤醒一个等待线程

唤醒所有等待线程

被唤醒后
重新竞争锁

从 wait 后继续执行

4. Lock + Condition:更灵活的通信

4.1 Condition 的优势

wait/notify 相比,Condition 提供了:

  • 多个等待队列:一个 Lock 可以创建多个 Condition
  • 可中断等待awaitInterruptibly()
  • 超时等待await(time, unit)
  • 公平性控制

Condition

一个Lock

多个Condition

notFull 队列

notEmpty 队列

精确唤醒

wait/notify

一个锁对象

一个等待队列

无法区分等待原因

4.2 使用 Condition 实现生产者-消费者

public class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // 队列不满条件
    private final Condition notEmpty = lock.newCondition();  // 队列不空条件
    private final Queue<Integer> queue = new LinkedList<>();
    private final int CAPACITY = 5;
    private int value = 0;
    
    public void produce() {
        lock.lock();
        try {
            while (queue.size() == CAPACITY) {
                System.out.println(Thread.currentThread().getName() + " 队列满,等待...");
                notFull.await();  // 等待队列不满
            }
            queue.offer(++value);
            System.out.println(Thread.currentThread().getName() + " 生产: " + value);
            notEmpty.signal();    // 唤醒等待队列不空的消费者
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    
    public void consume() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " 队列空,等待...");
                notEmpty.await();  // 等待队列不空
            }
            Integer val = queue.poll();
            System.out.println(Thread.currentThread().getName() + " 消费: " + val);
            notFull.signal();      // 唤醒等待队列不满的生产者
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}

5. 消息传递方式:BlockingQueue

5.1 阻塞队列的原理

阻塞队列封装了线程通信的细节,开发者只需要关注 puttake

阻塞队列

生产者1

BlockingQueue

生产者2

消费者1

消费者2

队列满: put阻塞

队列空: take阻塞

5.2 使用 BlockingQueue 简化通信

public class BlockingQueueDemo {
    // 使用 LinkedBlockingQueue,默认容量 Integer.MAX_VALUE
    private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
    
    static class Producer implements Runnable {
        @Override
        public void run() {
            int value = 0;
            try {
                while (true) {
                    queue.put(++value);  // 队列满时自动阻塞
                    System.out.println(Thread.currentThread().getName() + " 生产: " + value);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer val = queue.take();  // 队列空时自动阻塞
                    System.out.println(Thread.currentThread().getName() + " 消费: " + val);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.submit(new Producer());
        executor.submit(new Producer());
        executor.submit(new Consumer());
        executor.submit(new Consumer());
    }
}

5.3 常用阻塞队列对比

阻塞队列 特点 适用场景
ArrayBlockingQueue 有界数组结构,公平/非公平 固定大小的缓冲池
LinkedBlockingQueue 可选有界,链表结构 生产者-消费者模式
PriorityBlockingQueue 无界,支持优先级 任务优先级调度
SynchronousQueue 容量为0,直接传递 直接交接,如线程池
DelayQueue 延迟出队 定时任务调度
LinkedTransferQueue 支持 transfer 操作 高性能生产者-消费者

6. 同步辅助类:线程间的握手

6.1 CountDownLatch:等待一组线程完成

线程3 线程2 线程1 CountDownLatch(3) 主线程 线程3 线程2 线程1 CountDownLatch(3) 主线程 继续执行 await() 等待 countDown() countDown() countDown() 唤醒
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("线程 " + id + " 开始工作");
                try {
                    Thread.sleep((long) (Math.random() * 2000));
                } catch (InterruptedException e) {}
                System.out.println("线程 " + id + " 完成工作");
                latch.countDown();  // 计数器减1
            }).start();
        }
        
        System.out.println("主线程等待所有子线程完成...");
        latch.await();  // 等待计数器变为0
        System.out.println("所有线程完成,主线程继续");
    }
}

6.2 CyclicBarrier:等待一组线程到达屏障点

CyclicBarrier 线程3 线程2 线程1 CyclicBarrier 线程3 线程2 线程1 等待中... await() await() await() 同时唤醒 同时唤醒 同时唤醒
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int partyCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
            System.out.println("所有线程已到达屏障点,执行汇总任务");
        });
        
        for (int i = 0; i < partyCount; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + id + " 执行阶段1");
                    Thread.sleep((long) (Math.random() * 1000));
                    barrier.await();  // 等待其他线程
                    
                    System.out.println("线程 " + id + " 执行阶段2");
                    Thread.sleep((long) (Math.random() * 1000));
                    barrier.await();
                    
                    System.out.println("线程 " + id + " 执行完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

6.3 CountDownLatch vs CyclicBarrier

特性 CountDownLatch CyclicBarrier
计数器 递减 递增到阈值
可重用 否(一次性) 是(可重置)
等待方式 等待计数归零 等待达到阈值
触发动作 可设置 barrierAction
典型场景 等待所有任务完成 多阶段并行计算

6.4 Exchanger:两个线程交换数据

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        Thread producer = new Thread(() -> {
            try {
                String data = "生产者数据";
                System.out.println("生产者准备交换: " + data);
                String received = exchanger.exchange(data);
                System.out.println("生产者收到: " + received);
            } catch (InterruptedException e) {}
        });
        
        Thread consumer = new Thread(() -> {
            try {
                String data = "消费者数据";
                System.out.println("消费者准备交换: " + data);
                String received = exchanger.exchange(data);
                System.out.println("消费者收到: " + received);
            } catch (InterruptedException e) {}
        });
        
        producer.start();
        consumer.start();
    }
}

7. 现代异步通信:CompletableFuture

7.1 链式异步通信

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 线程间通过 CompletableFuture 链式传递结果
        CompletableFuture.supplyAsync(() -> {
            System.out.println("线程1: 获取用户ID");
            return 1001;
        }).thenApplyAsync(userId -> {
            System.out.println("线程2: 根据用户ID获取用户信息");
            return "User_" + userId;
        }).thenAcceptAsync(userInfo -> {
            System.out.println("线程3: 处理用户信息: " + userInfo);
        }).thenRun(() -> {
            System.out.println("所有任务完成");
        });
        
        // 等待完成
        Thread.sleep(2000);
    }
}

7.2 组合多个异步任务

public class CompletableFutureCombineDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return 10;
        });
        
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return 20;
        });
        
        // 等待两个任务都完成,然后组合结果
        CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);
        
        System.out.println("结果: " + combined.get());  // 30
        
        // 任意一个完成即触发
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
        System.out.println("最先完成的任务结果: " + anyOf.get());
    }
    
    private static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) {}
    }
}

8. 管道流:PipedInputStream/PipedOutputStream

public class PipedStreamDemo {
    public static void main(String[] args) throws IOException {
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream(pos);  // 连接两个管道
        
        Thread sender = new Thread(() -> {
            try {
                String message = "Hello from sender!";
                pos.write(message.getBytes());
                System.out.println("发送: " + message);
                pos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        Thread receiver = new Thread(() -> {
            try {
                byte[] buffer = new byte[1024];
                int len = pis.read(buffer);
                System.out.println("接收: " + new String(buffer, 0, len));
                pis.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        sender.start();
        receiver.start();
    }
}

9. volatile 和原子类:轻量级通信

9.1 volatile 实现线程间可见性

public class VolatileCommunicationDemo {
    private static volatile boolean flag = false;
    private static int data = 0;
    
    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            // 生产数据
            data = 42;           // 普通写
            flag = true;         // volatile 写,触发内存屏障
            System.out.println("生产者:数据已生产");
        });
        
        Thread consumer = new Thread(() -> {
            while (!flag) {      // volatile 读
                // 自旋等待
                Thread.yield();
            }
            System.out.println("消费者:读到数据 " + data);  // 一定能看到 42
        });
        
        producer.start();
        consumer.start();
    }
}

9.2 原子类的 CAS 通信

public class AtomicCommunicationDemo {
    private static final AtomicInteger counter = new AtomicInteger(0);
    private static final int TARGET = 100;
    
    public static void main(String[] args) {
        Thread[] threads = new Thread[10];
        
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                while (counter.get() < TARGET) {
                    int current = counter.get();
                    if (counter.compareAndSet(current, current + 1)) {
                        System.out.println(Thread.currentThread().getName() + 
                            " 将计数从 " + current + " 增加到 " + (current + 1));
                    }
                }
            });
            threads[i].start();
        }
    }
}

10. 通信方式对比与选择

10.1 全面对比表

通信方式 易用性 灵活性 性能 适用场景
wait/notify ⭐⭐ ⭐⭐ ⭐⭐⭐ 简单的一对一等待通知
Lock/Condition ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ 多个等待队列的复杂场景
BlockingQueue ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐ 生产者-消费者模式
CountDownLatch ⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐⭐ 等待一组任务完成
CyclicBarrier ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐ 多阶段并行计算
CompletableFuture ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ 异步任务编排
volatile ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐⭐ 简单的状态标志
管道流 ⭐⭐ ⭐⭐ ⭐⭐ 线程间字节流传输

10.2 选择决策图

简单等待通知

数据传递

任务协调

异步结果

少量数据

大量数据

等待完成

阶段同步

数据交换

需要线程间通信

通信方式?

wait/notify 或 Condition

数据量?

协调模式?

CompletableFuture

volatile/原子类

BlockingQueue/管道流

CountDownLatch

CyclicBarrier

Exchanger

11. 常见问题与最佳实践

11.1 虚假唤醒(Spurious Wakeup)

// ❌ 错误:可能因虚假唤醒导致条件不满足就执行
synchronized (lock) {
    if (!condition) {
        lock.wait();
    }
    // 执行操作
}

// ✅ 正确:总是使用 while 循环检查条件
synchronized (lock) {
    while (!condition) {
        lock.wait();
    }
    // 执行操作
}

11.2 避免死锁

// 注意:wait 会释放锁,但 Condition.await() 也会释放锁
// 但 notify 不会释放锁,被唤醒的线程需要等待 notify 线程释放锁

11.3 最佳实践清单

线程通信最佳实践

优先使用高级工具

BlockingQueue

CompletableFuture

CountDownLatch

正确使用wait/notify

必须在synchronized内

使用while循环检查

notifyAll优先于notify

避免忙等待

使用wait/await替代循环检查

减少CPU浪费

处理中断

正确处理InterruptedException

恢复中断状态

文档化通信协议

说明线程间的协作方式

明确共享状态

12. 总结

12.1 核心要点

要点 说明
共享内存 wait/notify、Lock/Condition、volatile
消息传递 BlockingQueue、管道流
同步辅助 CountDownLatch、CyclicBarrier、Exchanger
异步编程 CompletableFuture
核心原则 避免忙等待、处理虚假唤醒、注意死锁

一句话总结:

Java 线程间通信主要分为共享内存(wait/notify、Lock/Condition、volatile)和消息传递(BlockingQueue、管道流)两大类,辅以 CountDownLatch、CyclicBarrier 等同步辅助工具。选择哪种方式取决于具体场景:简单的状态通知用 volatile,数据传递用 BlockingQueue,复杂的等待通知用 Condition,异步任务编排用 CompletableFuture。


📌 记住:线程通信是并发编程的核心,正确使用通信机制能让多线程协作如臂使指,错误使用则可能导致死锁、活锁、性能下降等问题。

如果觉得本文对你有帮助,欢迎点赞、收藏、转发~

在这里插入图片描述


🌺The End🌺点点关注,收藏不迷路🌺
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐