心有灵犀:全面解析 Java 线程间通信机制
心有灵犀:全面解析 Java 线程间通信机制
|
🌺The Begin🌺点点关注,收藏不迷路🌺
|
1. 引言:线程不是孤岛
在多线程编程中,线程之间并非各自为政、老死不相往来。相反,它们常常需要协作来完成复杂任务:一个线程生产数据,另一个线程消费数据;一个线程等待条件满足,另一个线程触发条件。
那么,Java 中的线程之间究竟如何进行通信?本文将系统性地介绍 Java 线程间通信的多种方式,从基础的 wait/notify 到高级的 Lock/Condition,再到更现代的 CompletableFuture 和响应式流。
2. 一图看懂:线程通信全景图
3. 共享内存方式:wait/notify 机制
3.1 核心原理
wait/notify 机制是 Java 最基础的线程通信方式,基于 Object 类中的三个方法:
3.2 经典生产者-消费者示例
public class WaitNotifyDemo {
private static final int CAPACITY = 5;
private final Queue<Integer> queue = new LinkedList<>();
private int value = 0;
// 生产者
public synchronized void produce() throws InterruptedException {
while (queue.size() == CAPACITY) {
System.out.println(Thread.currentThread().getName() + " 队列满,等待...");
wait(); // 释放锁,进入等待
}
queue.offer(++value);
System.out.println(Thread.currentThread().getName() + " 生产: " + value + ",队列大小: " + queue.size());
notifyAll(); // 唤醒所有等待线程
}
// 消费者
public synchronized void consume() throws InterruptedException {
while (queue.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " 队列空,等待...");
wait();
}
Integer val = queue.poll();
System.out.println(Thread.currentThread().getName() + " 消费: " + val + ",队列大小: " + queue.size());
notifyAll();
}
public static void main(String[] args) {
WaitNotifyDemo demo = new WaitNotifyDemo();
// 启动多个生产者和消费者
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
while (true) {
demo.produce();
Thread.sleep(500);
}
} catch (InterruptedException e) {}
}, "Producer-" + i).start();
}
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
while (true) {
demo.consume();
Thread.sleep(800);
}
} catch (InterruptedException e) {}
}, "Consumer-" + i).start();
}
}
}
3.3 wait/notify 的关键规则
4. Lock + Condition:更灵活的通信
4.1 Condition 的优势
与 wait/notify 相比,Condition 提供了:
- 多个等待队列:一个 Lock 可以创建多个 Condition
- 可中断等待:
awaitInterruptibly() - 超时等待:
await(time, unit) - 公平性控制
4.2 使用 Condition 实现生产者-消费者
public class ConditionDemo {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 队列不满条件
private final Condition notEmpty = lock.newCondition(); // 队列不空条件
private final Queue<Integer> queue = new LinkedList<>();
private final int CAPACITY = 5;
private int value = 0;
public void produce() {
lock.lock();
try {
while (queue.size() == CAPACITY) {
System.out.println(Thread.currentThread().getName() + " 队列满,等待...");
notFull.await(); // 等待队列不满
}
queue.offer(++value);
System.out.println(Thread.currentThread().getName() + " 生产: " + value);
notEmpty.signal(); // 唤醒等待队列不空的消费者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " 队列空,等待...");
notEmpty.await(); // 等待队列不空
}
Integer val = queue.poll();
System.out.println(Thread.currentThread().getName() + " 消费: " + val);
notFull.signal(); // 唤醒等待队列不满的生产者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
5. 消息传递方式:BlockingQueue
5.1 阻塞队列的原理
阻塞队列封装了线程通信的细节,开发者只需要关注 put 和 take:
5.2 使用 BlockingQueue 简化通信
public class BlockingQueueDemo {
// 使用 LinkedBlockingQueue,默认容量 Integer.MAX_VALUE
private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
static class Producer implements Runnable {
@Override
public void run() {
int value = 0;
try {
while (true) {
queue.put(++value); // 队列满时自动阻塞
System.out.println(Thread.currentThread().getName() + " 生产: " + value);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer val = queue.take(); // 队列空时自动阻塞
System.out.println(Thread.currentThread().getName() + " 消费: " + val);
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.submit(new Producer());
executor.submit(new Producer());
executor.submit(new Consumer());
executor.submit(new Consumer());
}
}
5.3 常用阻塞队列对比
| 阻塞队列 | 特点 | 适用场景 |
|---|---|---|
| ArrayBlockingQueue | 有界数组结构,公平/非公平 | 固定大小的缓冲池 |
| LinkedBlockingQueue | 可选有界,链表结构 | 生产者-消费者模式 |
| PriorityBlockingQueue | 无界,支持优先级 | 任务优先级调度 |
| SynchronousQueue | 容量为0,直接传递 | 直接交接,如线程池 |
| DelayQueue | 延迟出队 | 定时任务调度 |
| LinkedTransferQueue | 支持 transfer 操作 | 高性能生产者-消费者 |
6. 同步辅助类:线程间的握手
6.1 CountDownLatch:等待一组线程完成
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int id = i;
new Thread(() -> {
System.out.println("线程 " + id + " 开始工作");
try {
Thread.sleep((long) (Math.random() * 2000));
} catch (InterruptedException e) {}
System.out.println("线程 " + id + " 完成工作");
latch.countDown(); // 计数器减1
}).start();
}
System.out.println("主线程等待所有子线程完成...");
latch.await(); // 等待计数器变为0
System.out.println("所有线程完成,主线程继续");
}
}
6.2 CyclicBarrier:等待一组线程到达屏障点
public class CyclicBarrierDemo {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
System.out.println("所有线程已到达屏障点,执行汇总任务");
});
for (int i = 0; i < partyCount; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("线程 " + id + " 执行阶段1");
Thread.sleep((long) (Math.random() * 1000));
barrier.await(); // 等待其他线程
System.out.println("线程 " + id + " 执行阶段2");
Thread.sleep((long) (Math.random() * 1000));
barrier.await();
System.out.println("线程 " + id + " 执行完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
6.3 CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数器 | 递减 | 递增到阈值 |
| 可重用 | 否(一次性) | 是(可重置) |
| 等待方式 | 等待计数归零 | 等待达到阈值 |
| 触发动作 | 无 | 可设置 barrierAction |
| 典型场景 | 等待所有任务完成 | 多阶段并行计算 |
6.4 Exchanger:两个线程交换数据
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread producer = new Thread(() -> {
try {
String data = "生产者数据";
System.out.println("生产者准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("生产者收到: " + received);
} catch (InterruptedException e) {}
});
Thread consumer = new Thread(() -> {
try {
String data = "消费者数据";
System.out.println("消费者准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("消费者收到: " + received);
} catch (InterruptedException e) {}
});
producer.start();
consumer.start();
}
}
7. 现代异步通信:CompletableFuture
7.1 链式异步通信
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程间通过 CompletableFuture 链式传递结果
CompletableFuture.supplyAsync(() -> {
System.out.println("线程1: 获取用户ID");
return 1001;
}).thenApplyAsync(userId -> {
System.out.println("线程2: 根据用户ID获取用户信息");
return "User_" + userId;
}).thenAcceptAsync(userInfo -> {
System.out.println("线程3: 处理用户信息: " + userInfo);
}).thenRun(() -> {
System.out.println("所有任务完成");
});
// 等待完成
Thread.sleep(2000);
}
}
7.2 组合多个异步任务
public class CompletableFutureCombineDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return 20;
});
// 等待两个任务都完成,然后组合结果
CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);
System.out.println("结果: " + combined.get()); // 30
// 任意一个完成即触发
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2);
System.out.println("最先完成的任务结果: " + anyOf.get());
}
private static void sleep(long millis) {
try { Thread.sleep(millis); } catch (InterruptedException e) {}
}
}
8. 管道流:PipedInputStream/PipedOutputStream
public class PipedStreamDemo {
public static void main(String[] args) throws IOException {
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos); // 连接两个管道
Thread sender = new Thread(() -> {
try {
String message = "Hello from sender!";
pos.write(message.getBytes());
System.out.println("发送: " + message);
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
});
Thread receiver = new Thread(() -> {
try {
byte[] buffer = new byte[1024];
int len = pis.read(buffer);
System.out.println("接收: " + new String(buffer, 0, len));
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
});
sender.start();
receiver.start();
}
}
9. volatile 和原子类:轻量级通信
9.1 volatile 实现线程间可见性
public class VolatileCommunicationDemo {
private static volatile boolean flag = false;
private static int data = 0;
public static void main(String[] args) {
Thread producer = new Thread(() -> {
// 生产数据
data = 42; // 普通写
flag = true; // volatile 写,触发内存屏障
System.out.println("生产者:数据已生产");
});
Thread consumer = new Thread(() -> {
while (!flag) { // volatile 读
// 自旋等待
Thread.yield();
}
System.out.println("消费者:读到数据 " + data); // 一定能看到 42
});
producer.start();
consumer.start();
}
}
9.2 原子类的 CAS 通信
public class AtomicCommunicationDemo {
private static final AtomicInteger counter = new AtomicInteger(0);
private static final int TARGET = 100;
public static void main(String[] args) {
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (counter.get() < TARGET) {
int current = counter.get();
if (counter.compareAndSet(current, current + 1)) {
System.out.println(Thread.currentThread().getName() +
" 将计数从 " + current + " 增加到 " + (current + 1));
}
}
});
threads[i].start();
}
}
}
10. 通信方式对比与选择
10.1 全面对比表
| 通信方式 | 易用性 | 灵活性 | 性能 | 适用场景 |
|---|---|---|---|---|
| wait/notify | ⭐⭐ | ⭐⭐ | ⭐⭐⭐ | 简单的一对一等待通知 |
| Lock/Condition | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 多个等待队列的复杂场景 |
| BlockingQueue | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 生产者-消费者模式 |
| CountDownLatch | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 等待一组任务完成 |
| CyclicBarrier | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 多阶段并行计算 |
| CompletableFuture | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 异步任务编排 |
| volatile | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 简单的状态标志 |
| 管道流 | ⭐⭐ | ⭐⭐ | ⭐⭐ | 线程间字节流传输 |
10.2 选择决策图
11. 常见问题与最佳实践
11.1 虚假唤醒(Spurious Wakeup)
// ❌ 错误:可能因虚假唤醒导致条件不满足就执行
synchronized (lock) {
if (!condition) {
lock.wait();
}
// 执行操作
}
// ✅ 正确:总是使用 while 循环检查条件
synchronized (lock) {
while (!condition) {
lock.wait();
}
// 执行操作
}
11.2 避免死锁
// 注意:wait 会释放锁,但 Condition.await() 也会释放锁
// 但 notify 不会释放锁,被唤醒的线程需要等待 notify 线程释放锁
11.3 最佳实践清单
12. 总结
12.1 核心要点
| 要点 | 说明 |
|---|---|
| 共享内存 | wait/notify、Lock/Condition、volatile |
| 消息传递 | BlockingQueue、管道流 |
| 同步辅助 | CountDownLatch、CyclicBarrier、Exchanger |
| 异步编程 | CompletableFuture |
| 核心原则 | 避免忙等待、处理虚假唤醒、注意死锁 |
一句话总结:
Java 线程间通信主要分为共享内存(wait/notify、Lock/Condition、volatile)和消息传递(BlockingQueue、管道流)两大类,辅以 CountDownLatch、CyclicBarrier 等同步辅助工具。选择哪种方式取决于具体场景:简单的状态通知用 volatile,数据传递用 BlockingQueue,复杂的等待通知用 Condition,异步任务编排用 CompletableFuture。
📌 记住:线程通信是并发编程的核心,正确使用通信机制能让多线程协作如臂使指,错误使用则可能导致死锁、活锁、性能下降等问题。
如果觉得本文对你有帮助,欢迎点赞、收藏、转发~

|
🌺The End🌺点点关注,收藏不迷路🌺
|
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)