并发编程(六)
·
一、前言
在前面的文章中,我们已经系统学习了Java内存模型(JMM)、volatile关键字的内存语义、synchronized的实现原理以及并发容器的使用。作为并发编程系列的重要延续,本文将深入探讨以下核心主题:
- 线程间通信机制:管道流实现的内存级线程通信
- 线程隔离机制:ThreadLocal的原理与实战应用
- 线程协同机制:Join方法的底层实现与经典应用场景
- 高级锁机制:包括ReentrantLock的AQS实现、读写锁优化、锁降级技术等
- 条件队列:Condition接口实现的精确线程唤醒机制
这些内容不仅是互联网大厂面试的高频考点(据统计,90%以上的Java高级工程师面试都会涉及这些知识点),更是构建高并发、高性能系统的必备技能。通过本文的学习,你将能够:
- 设计更高效的线程间通信方案
- 解决共享变量线程安全问题
- 实现复杂的线程调度逻辑
- 优化锁的使用提升系统吞吐量
二、管道输入输出流:线程间的内存通信
管道流(PipedInputStream/PipedOutputStream)是Java提供的一种基于内存的线程间通信机制。它通过在内核空间创建循环缓冲区(默认大小1024字节),实现两个线程之间的数据传输,避免了磁盘I/O或网络I/O的开销。
2.1 工作原理图解
线程A (生产者) → PipedOutputStream → 内核缓冲区 → PipedInputStream → 线程B (消费者)
2.2 完整示例代码解析
public class PipedCommunication {
public static void main(String[] args) throws IOException {
// 创建管道流对象
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output); // 推荐这种连接方式
// 生产者线程
Thread producer = new Thread(() -> {
try {
String[] messages = {"Java", "Concurrency", "Programming"};
for (String msg : messages) {
output.write(msg.getBytes(StandardCharsets.UTF_8));
System.out.println("[Producer] Sent: " + msg);
Thread.sleep(1000); // 模拟处理耗时
}
output.close(); // 必须关闭流
} catch (Exception e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int data;
while ((data = input.read()) != -1) { // 逐字节读取
buffer.write(data);
// 当遇到消息分隔符时处理完整消息
if (data == '\n') { // 假设消息以换行符分隔
String received = buffer.toString(StandardCharsets.UTF_8.name());
System.out.println("[Consumer] Received: " + received.trim());
buffer.reset();
}
}
input.close();
} catch (Exception e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
2.3 关键注意事项
- 连接建立时机:必须在开始数据传输前建立连接,否则会抛出IOException
- 线程安全:管道流本身不是线程安全的,需要确保读写在不同的线程
- 缓冲区大小:可以通过构造函数指定缓冲区大小(默认1024字节)
- 流关闭:使用完毕后必须关闭流,否则可能导致资源泄漏
- 死锁风险:如果缓冲区满时生产者继续写入,或缓冲区空时消费者继续读取,都会导致线程阻塞
2.4 应用场景
- 日志收集系统中日志生产与处理的解耦
- 多阶段处理流水线中的数据传递
- 测试代码中的模拟数据源
三、ThreadLocal:线程本地存储
ThreadLocal提供了线程级别的变量隔离,每个线程都拥有自己的变量副本,互不干扰。
3.1 核心应用场景
| 场景 | 说明 | 优势 |
|---|---|---|
| 数据库连接 | 每个线程维护独立Connection | 避免线程安全问题 |
| 用户会话 | 存储当前用户信息 | 无需显式传递参数 |
| 日期格式化 | SimpleDateFormat线程不安全问题的解决方案 | 避免创建大量实例 |
| 事务上下文 | 跨方法传递事务信息 | 代码解耦 |
3.2 深度原理解析
public class Thread implements Runnable {
// 每个线程持有自己的ThreadLocalMap
ThreadLocal.ThreadLocalMap threadLocals = null;
// 继承自父线程的ThreadLocalMap
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k); // 关键:对ThreadLocal对象是弱引用
value = v;
}
}
private Entry[] table;
// ...其他实现细节
}
内存泄漏问题分析:
- 当ThreadLocal对象失去强引用后,由于Entry是弱引用,可以被GC回收
- 但Entry中的value是强引用,如果线程长时间运行,会导致value无法回收
- 解决方案:使用后必须调用remove()方法清理
3.3 最佳实践示例
public class UserContextHolder {
private static final ThreadLocal<User> context = new ThreadLocal<>();
public static void set(User user) {
context.set(user);
}
public static User get() {
return context.get();
}
public static void clear() {
context.remove(); // 必须清理
}
}
// 使用示例
public class UserService {
public void processRequest() {
try {
User user = getUserFromRequest();
UserContextHolder.set(user);
doBusinessLogic();
} finally {
UserContextHolder.clear(); // 确保清理
}
}
private void doBusinessLogic() {
User currentUser = UserContextHolder.get();
// 使用当前用户
}
}
四、Thread.join():线程等待机制
4.1 源码级解析
public final synchronized void join() throws InterruptedException {
while (isAlive()) {
wait(0); // 调用Object.wait()
}
}
// 线程终止时的处理(在Thread.exit()中调用)
private void exit() {
synchronized (this) {
notifyAll(); // 唤醒所有等待线程
}
}
4.2 高级应用:多阶段任务处理
public class MultiStageTask {
public static void main(String[] args) throws InterruptedException {
Thread dataLoader = new Thread(() -> {
System.out.println("Loading initial data...");
// 模拟耗时
try { Thread.sleep(2000); } catch (InterruptedException e) {}
System.out.println("Data loading complete");
});
Thread processor = new Thread(() -> {
try {
dataLoader.join(); // 等待数据加载完成
System.out.println("Processing data...");
Thread.sleep(1000);
System.out.println("Processing complete");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread reporter = new Thread(() -> {
try {
processor.join(); // 等待处理完成
System.out.println("Generating report...");
Thread.sleep(500);
System.out.println("Report generated");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
dataLoader.start();
processor.start();
reporter.start();
reporter.join(); // 主线程等待最终完成
System.out.println("All tasks completed");
}
}
4.3 常见误区
- join()与start()顺序:必须先start()再join(),否则join()无效
- 中断处理:join()会响应中断,抛出InterruptedException
- 超时控制:可以使用join(millis)避免永久等待
- 性能影响:过度使用join()会导致并发度降低
五、ReentrantLock与AQS深度剖析
5.1 AQS(AbstractQueuedSynchronizer)核心结构
public abstract class AbstractQueuedSynchronizer {
// 等待队列头节点
private transient volatile Node head;
// 等待队列尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
}
5.2 公平锁与非公平锁对比
// 非公平锁尝试获取锁的逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 直接尝试CAS获取
setExclusiveOwnerThread(current);
return true;
}
}
// ...重入逻辑
}
// 公平锁尝试获取锁的逻辑
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 先检查队列是否有等待线程
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// ...重入逻辑
}
5.3 锁的可中断获取
public void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(1)) // 尝试非阻塞获取
doAcquireInterruptibly(1); // 可中断方式获取
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 响应中断
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
六、读写锁与锁降级实战
6.1 缓存系统实现示例
public class ThreadSafeCache<K, V> {
private final Map<K, V> map = new HashMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
public V get(K key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public void put(K key, V value) {
writeLock.lock();
try {
map.put(key, value);
} finally {
writeLock.unlock();
}
}
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
V value;
readLock.lock();
try {
value = map.get(key);
if (value != null) {
return value;
}
} finally {
readLock.unlock();
}
// 升级为写锁
writeLock.lock();
try {
// 再次检查,防止其他线程已经修改
value = map.get(key);
if (value == null) {
value = mappingFunction.apply(key);
map.put(key, value);
}
return value;
} finally {
writeLock.unlock();
}
}
}
6.2 锁降级流程详解
- 获取写锁
- 修改共享数据
- 获取读锁(此时仍持有写锁)
- 释放写锁(此时只持有读锁)
- 使用读锁保护下的数据
- 最终释放读锁
public void processCachedData() {
rwl.writeLock().lock();
try {
// 修改数据...
updateData();
// 降级开始
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // 写锁释放,降级完成
}
try {
// 使用只读方式访问数据
useData();
} finally {
rwl.readLock().unlock();
}
}
七、Condition精准线程调度
7.1 生产者-消费者模型实现
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 等待不满信号
items[putPtr] = x;
if (++putPtr == items.length) putPtr = 0;
++count;
notEmpty.signal(); // 发送非空信号
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 等待非空信号
Object x = items[takePtr];
if (++takePtr == items.length) takePtr = 0;
--count;
notFull.signal(); // 发送不满信号
return x;
} finally {
lock.unlock();
}
}
}
7.2 Condition与Object监视器方法对比
| 特性 | Condition | Object监视器 |
|---|---|---|
| 等待队列数量 | 多个 | 单个 |
| 精确唤醒 | 支持 | 不支持 |
| 超时等待 | 支持 | 支持 |
| 中断响应 | 支持 | 支持 |
| 条件谓词 | 显式检查 | 隐式检查 |
| 使用方式 | 必须配合Lock | 配合synchronized |
八、总结与最佳实践
-
线程通信选择:
- 轻量级数据交换:优先考虑管道流
- 复杂数据共享:使用并发容器+锁机制
- 线程隔离:ThreadLocal是首选方案
-
锁使用原则:
- 读多写少场景:读写锁可提升性能
- 锁持有时间:应尽可能缩短
- 锁粒度:尽可能细化
- 锁顺序:避免死锁
-
性能优化建议:
- 减少锁竞争:使用分段锁或无锁数据结构
- 避免过早优化:先保证正确性再优化性能
- 监控锁争用:通过JMX或Profiler工具
-
资源清理:
- 显式释放:finally块中释放锁和清理ThreadLocal
- 异常处理:确保异常情况下资源能被正确释放
通过掌握这些高级并发技术,开发者可以构建出更高效、更健壮的并发应用程序,从容应对高并发场景下的各种挑战。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)