17-Java语言核心-并发编程-并发工具类详解
并发工具类详解
一、知识概述
Java并发包(JUC)提供了一系列并发工具类,用于解决多线程编程中的协调、同步和资源控制问题。这些工具类位于 java.util.concurrent 包下,主要包括:
- CountDownLatch:倒计时计数器,用于等待一组线程完成
- CyclicBarrier:循环栅栏,用于让一组线程互相等待到达某个屏障点
- Semaphore:信号量,用于控制同时访问资源的线程数量
- Exchanger:交换器,用于两个线程之间交换数据
- Phaser:阶段器,支持动态注册的同步屏障
这些工具类基于 AQS(AbstractQueuedSynchronizer) 或类似机制实现,提供了比传统 wait/notify 更强大、更灵活的线程协调能力。
二、知识点详细讲解
2.1 CountDownLatch - 倒计时计数器
核心概念
CountDownLatch 是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。它通过一个计数器实现,初始值为线程数量,每个线程完成时计数减1,当计数为0时,等待的线程被释放。
关键特点
- 单向性:计数器只能递减,不能重置
- 一次性:计数到0后无法重用
- 非公平性:不保证线程获取锁的顺序
核心方法
// 构造方法,指定初始计数值
public CountDownLatch(int count)
// 等待计数器归零,可被中断
public void await() throws InterruptedException
// 带超时的等待
public boolean await(long timeout, TimeUnit unit)
// 计数减1
public void countDown()
// 获取当前计数值
public long getCount()
应用场景
- 主线程等待所有子线程完成:并行计算、批量任务处理
- 服务启动检查:等待所有必要服务就绪
- 资源初始化:等待所有资源加载完成
2.2 CyclicBarrier - 循环栅栏
核心概念
CyclicBarrier 让一组线程互相等待,直到所有线程都到达某个屏障点后才继续执行。与 CountDownLatch 不同,它是可循环使用的。
关键特点
- 可重用:计数器可重置,支持重复使用
- 双向等待:所有线程互相等待
- 支持回调:可设置栅栏动作
核心方法
// 构造方法,指定参与线程数
public CyclicBarrier(int parties)
// 带栅栏动作的构造方法
public CyclicBarrier(int parties, Runnable barrierAction)
// 到达栅栏并等待其他线程
public int await() throws InterruptedException, BrokenBarrierException
// 带超时的等待
public int await(long timeout, TimeUnit unit)
// 获取参与线程数
public int getParties()
// 检查栅栏是否破损
public boolean isBroken()
// 重置栅栏
public void reset()
应用场景
- 多阶段任务:多个线程分阶段执行
- 并行计算合并:等待所有线程完成某阶段后合并结果
- 批量数据处理:分批处理、批量提交
BrokenBarrierException
当栅栏被打破时(线程被中断、超时或重置),等待的线程会抛出此异常:
// 栅栏被打破的情况:
// 1. 某个等待线程被中断
// 2. 某个等待线程超时
// 3. 栅栏被显式 reset()
// 4. 栅栏动作抛出异常
2.3 Semaphore - 信号量
核心概念
Semaphore 通过许可证机制控制同时访问某个资源的线程数量。线程在访问资源前获取许可证,使用完毕后释放许可证。
关键特点
- 资源限制:精确控制并发访问数量
- 公平性可选:支持公平和非公平模式
- 可重入性:同一线程可多次获取许可证
核心方法
// 构造方法,指定许可数量
public Semaphore(int permits)
// 指定公平性
public Semaphore(int permits, boolean fair)
// 获取一个许可(阻塞)
public void acquire() throws InterruptedException
// 获取多个许可
public void acquire(int permits) throws InterruptedException
// 尝试获取许可(非阻塞)
public boolean tryAcquire()
// 释放许可
public void release()
// 释放多个许可
public void release(int permits)
// 获取可用许可数
public int availablePermits()
应用场景
- 资源池:数据库连接池、线程池控制
- 限流:控制并发请求量
- 互斥锁:许可数为1时实现互斥
2.4 Exchanger - 交换器
核心概念
Exchanger 提供一个同步点,两个线程在此交换数据。一个线程到达交换点后等待另一个线程,两者都到达后交换数据并继续执行。
核心方法
// 交换数据(阻塞)
public V exchange(V x) throws InterruptedException
// 带超时的交换
public V exchange(V x, long timeout, TimeUnit unit)
应用场景
- 生产者-消费者:数据交换
- 遗传算法:配对交叉
- 缓冲区切换:双缓冲技术
2.5 Phaser - 阶段器
核心概念
Phaser 是 JDK 7 引入的更灵活的同步屏障,支持动态注册和注销参与者,适合分阶段任务。
核心方法
// 构造方法
public Phaser()
public Phaser(int parties)
// 注册参与者
public int register()
public int bulkRegister(int parties)
// 到达并等待其他参与者
public int arriveAndAwaitAdvance()
// 到达并注销
public int arriveAndDeregister()
// 仅到达,不等待
public int arrive()
// 等待下一阶段
public int awaitAdvance(int phase)
// 获取当前阶段
public int getPhase()
// 获取参与者数量
public int getRegisteredParties()
// 设置终止回调
protected boolean onAdvance(int phase, int registeredParties)
应用场景
- 动态任务:参与者数量可变的任务
- 多阶段处理:不确定阶段数的流程
- 分治算法:递归任务分解
三、代码示例
3.1 CountDownLatch 示例
示例1:主线程等待子线程完成
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep((long) (Math.random() * 3000));
System.out.println("任务 " + taskId + " 执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减1
}
});
}
System.out.println("主线程等待所有任务完成...");
latch.await(); // 等待计数归零
System.out.println("所有任务已完成,主线程继续执行");
executor.shutdown();
}
}
// 输出示例:
// 主线程等待所有任务完成...
// 任务 0 开始执行
// 任务 1 开始执行
// 任务 2 开始执行
// 任务 3 开始执行
// 任务 4 开始执行
// 任务 2 执行完成
// 任务 0 执行完成
// 任务 4 执行完成
// 任务 1 执行完成
// 任务 3 执行完成
// 所有任务已完成,主线程继续执行
示例2:服务启动检查
import java.util.concurrent.CountDownLatch;
public class ServiceStartupDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
// 启动数据库服务
new Thread(() -> {
System.out.println("正在启动数据库服务...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("数据库服务启动完成");
latch.countDown();
}).start();
// 启动缓存服务
new Thread(() -> {
System.out.println("正在启动缓存服务...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("缓存服务启动完成");
latch.countDown();
}).start();
// 启动消息队列服务
new Thread(() -> {
System.out.println("正在启动消息队列服务...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("消息队列服务启动完成");
latch.countDown();
}).start();
// 等待所有服务启动完成
System.out.println("等待所有服务启动...");
latch.await();
System.out.println("所有服务启动完成,应用开始提供服务");
}
}
3.2 CyclicBarrier 示例
示例1:多阶段任务处理
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
// 创建栅栏,所有线程到达后执行合并动作
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("=== 所有线程已完成本阶段,开始下一阶段 ===\n");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 第一阶段
System.out.println("线程 " + threadId + " 执行第一阶段");
Thread.sleep((long) (Math.random() * 1000));
barrier.await();
// 第二阶段
System.out.println("线程 " + threadId + " 执行第二阶段");
Thread.sleep((long) (Math.random() * 1000));
barrier.await();
// 第三阶段
System.out.println("线程 " + threadId + " 执行第三阶段");
Thread.sleep((long) (Math.random() * 1000));
barrier.await();
System.out.println("线程 " + threadId + " 全部完成");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
// 输出示例:
// 线程 0 执行第一阶段
// 线程 1 执行第一阶段
// 线程 2 执行第一阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 2 执行第二阶段
// 线程 0 执行第二阶段
// 线程 1 执行第二阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 1 执行第三阶段
// 线程 2 执行第三阶段
// 线程 0 执行第三阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 0 全部完成
// 线程 1 全部完成
// 线程 2 全部完成
示例2:并行计算合并
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
public class ParallelComputeDemo {
private static final int THREAD_COUNT = 4;
private static final int[] data = new int[100];
private static final int[] partialSums = new int[THREAD_COUNT];
private static final AtomicInteger totalSum = new AtomicInteger(0);
public static void main(String[] args) {
// 初始化数据
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
// 合并各线程的部分和
int sum = 0;
for (int partialSum : partialSums) {
sum += partialSum;
}
totalSum.set(sum);
System.out.println("所有部分和已合并,总和: " + sum);
});
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
new Thread(() -> {
int start = threadId * 25;
int end = start + 25;
int sum = 0;
// 计算部分和
for (int j = start; j < end; j++) {
sum += data[j];
}
partialSums[threadId] = sum;
System.out.println("线程 " + threadId + " 部分和: " + sum);
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
// 等待计算完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终结果: " + totalSum.get());
// 最终结果应为 5050 (1+2+...+100)
}
}
3.3 Semaphore 示例
示例1:资源池控制
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ResourcePoolDemo {
public static void main(String[] args) {
int poolSize = 3; // 资源池大小
int threadCount = 10; // 请求线程数
Semaphore semaphore = new Semaphore(poolSize);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executor.submit(() -> {
try {
// 获取许可
semaphore.acquire();
System.out.println("线程 " + threadId + " 获取资源,剩余资源: "
+ semaphore.availablePermits());
// 模拟资源使用
Thread.sleep(2000);
System.out.println("线程 " + threadId + " 释放资源");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可
semaphore.release();
}
});
}
executor.shutdown();
}
}
// 输出示例(资源限制生效,最多同时3个线程访问):
// 线程 0 获取资源,剩余资源: 2
// 线程 1 获取资源,剩余资源: 1
// 线程 2 获取资源,剩余资源: 0
// 线程 0 释放资源
// 线程 3 获取资源,剩余资源: 0
// 线程 1 释放资源
// 线程 4 获取资源,剩余资源: 0
// ...
示例2:公平与非公平信号量
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FairSemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 非公平信号量 ===");
testSemaphore(false);
Thread.sleep(1000);
System.out.println("\n=== 公平信号量 ===");
testSemaphore(true);
}
private static void testSemaphore(boolean fair) throws InterruptedException {
Semaphore semaphore = new Semaphore(1, fair);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int threadId = i;
executor.submit(() -> {
try {
semaphore.acquire();
System.out.println("线程 " + threadId + " 获取锁");
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
});
}
executor.shutdown();
Thread.sleep(2000);
}
}
示例3:简单限流器
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
public class RateLimiter {
private final Semaphore semaphore;
private final AtomicInteger acquired = new AtomicInteger(0);
public RateLimiter(int maxConcurrent) {
this.semaphore = new Semaphore(maxConcurrent);
}
public boolean tryAcquire() {
boolean success = semaphore.tryAcquire();
if (success) {
acquired.incrementAndGet();
}
return success;
}
public void release() {
semaphore.release();
}
public int getAcquired() {
return acquired.get();
}
public static void main(String[] args) throws InterruptedException {
RateLimiter limiter = new RateLimiter(3); // 最大并发3
// 模拟10个请求
for (int i = 0; i < 10; i++) {
final int requestId = i;
new Thread(() -> {
if (limiter.tryAcquire()) {
try {
System.out.println("请求 " + requestId + " 通过");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
limiter.release();
System.out.println("请求 " + requestId + " 完成");
}
} else {
System.out.println("请求 " + requestId + " 被限流");
}
}).start();
Thread.sleep(100);
}
}
}
3.4 Exchanger 示例
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// 线程A:生产数据
new Thread(() -> {
try {
String data = "来自A的数据";
System.out.println("线程A准备交换: " + data);
Thread.sleep(1000);
String received = exchanger.exchange(data);
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 线程B:处理数据
new Thread(() -> {
try {
String data = "来自B的数据";
System.out.println("线程B准备交换: " + data);
Thread.sleep(2000);
String received = exchanger.exchange(data);
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
// 输出:
// 线程A准备交换: 来自A的数据
// 线程B准备交换: 来自B的数据
// 线程A收到: 来自B的数据
// 线程B收到: 来自A的数据
双缓冲示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class DoubleBufferDemo {
public static void main(String[] args) {
Exchanger<List<Integer>> exchanger = new Exchanger<>();
// 生产者:填充缓冲区
new Thread(() -> {
List<Integer> buffer = new ArrayList<>();
for (int i = 0; i < 5; i++) {
buffer.add(i);
System.out.println("生产者添加: " + i);
if (buffer.size() == 3) {
try {
buffer = exchanger.exchange(buffer); // 交换缓冲区
System.out.println("生产者交换缓冲区");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 消费者:处理缓冲区
new Thread(() -> {
List<Integer> buffer = new ArrayList<>();
while (true) {
try {
buffer = exchanger.exchange(buffer); // 等待交换
System.out.println("消费者收到: " + buffer);
buffer.clear(); // 清空缓冲区
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
}
3.5 Phaser 示例
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 初始3个参与者
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
// 第一阶段
System.out.println("线程 " + threadId + " 完成第一阶段,阶段号: "
+ phaser.getPhase());
phaser.arriveAndAwaitAdvance();
// 第二阶段
System.out.println("线程 " + threadId + " 完成第二阶段,阶段号: "
+ phaser.getPhase());
phaser.arriveAndAwaitAdvance();
// 第三阶段
System.out.println("线程 " + threadId + " 完成第三阶段,阶段号: "
+ phaser.getPhase());
phaser.arriveAndDeregister(); // 注销参与者
}).start();
}
// 等待所有阶段完成
while (!phaser.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("所有阶段完成");
}
}
动态注册示例
import java.util.concurrent.Phaser;
public class DynamicPhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("阶段 " + phase + " 完成,参与者: " + registeredParties);
return phase >= 2 || registeredParties == 0; // 终止条件
}
};
// 主线程注册
phaser.register();
// 动态创建任务
for (int i = 0; i < 3; i++) {
phaser.register(); // 动态注册
final int taskId = i;
new Thread(() -> {
System.out.println("任务 " + taskId + " 开始阶段0");
phaser.arriveAndAwaitAdvance();
System.out.println("任务 " + taskId + " 开始阶段1");
phaser.arriveAndAwaitAdvance();
System.out.println("任务 " + taskId + " 开始阶段2");
phaser.arriveAndDeregister(); // 完成并注销
}).start();
}
// 主线程也参与
phaser.arriveAndDeregister();
// 等待完成
while (!phaser.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("所有任务完成");
}
}
四、实战应用场景
4.1 并行任务汇总
import java.util.concurrent.*;
import java.util.*;
public class ParallelAggregation {
public static void main(String[] args) throws InterruptedException {
// 模拟并行处理大数据集
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
data.add(i);
}
int threadCount = 4;
CountDownLatch latch = new CountDownLatch(threadCount);
List<Future<Map<String, Integer>>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
int chunkSize = data.size() / threadCount;
for (int i = 0; i < threadCount; i++) {
int start = i * chunkSize;
int end = (i == threadCount - 1) ? data.size() : start + chunkSize;
List<Integer> chunk = data.subList(start, end);
futures.add(executor.submit(() -> {
try {
Map<String, Integer> stats = new HashMap<>();
stats.put("sum", 0);
stats.put("max", Integer.MIN_VALUE);
stats.put("min", Integer.MAX_VALUE);
stats.put("count", 0);
for (int num : chunk) {
stats.merge("sum", num, Integer::sum);
stats.put("max", Math.max(stats.get("max"), num));
stats.put("min", Math.min(stats.get("min"), num));
stats.merge("count", 1, Integer::sum);
}
return stats;
} finally {
latch.countDown();
}
}));
}
latch.await();
// 合并结果
int totalSum = 0;
int globalMax = Integer.MIN_VALUE;
int globalMin = Integer.MAX_VALUE;
int totalCount = 0;
for (Future<Map<String, Integer>> future : futures) {
try {
Map<String, Integer> stats = future.get();
totalSum += stats.get("sum");
globalMax = Math.max(globalMax, stats.get("max"));
globalMin = Math.min(globalMin, stats.get("min"));
totalCount += stats.get("count");
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("统计结果:");
System.out.println("总和: " + totalSum);
System.out.println("最大值: " + globalMax);
System.out.println("最小值: " + globalMin);
System.out.println("数量: " + totalCount);
System.out.println("平均值: " + (double) totalSum / totalCount);
executor.shutdown();
}
}
4.2 数据库连接池模拟
import java.util.concurrent.Semaphore;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConnectionPool {
private final BlockingQueue<Connection> pool;
private final Semaphore semaphore;
public ConnectionPool(int poolSize) {
this.pool = new ArrayBlockingQueue<>(poolSize);
this.semaphore = new Semaphore(poolSize);
// 初始化连接
for (int i = 0; i < poolSize; i++) {
pool.offer(new Connection("Connection-" + i));
}
}
public Connection getConnection(long timeout, TimeUnit unit)
throws InterruptedException {
if (!semaphore.tryAcquire(timeout, unit)) {
throw new RuntimeException("获取连接超时");
}
Connection conn = pool.poll();
if (conn != null) {
conn.setInUse(true);
return conn;
}
semaphore.release();
throw new RuntimeException("连接池为空");
}
public void releaseConnection(Connection conn) {
if (conn != null) {
conn.setInUse(false);
pool.offer(conn);
semaphore.release();
}
}
public int getAvailableConnections() {
return semaphore.availablePermits();
}
// 简单连接对象
static class Connection {
private final String name;
private boolean inUse;
public Connection(String name) {
this.name = name;
this.inUse = false;
}
public String getName() { return name; }
public boolean isInUse() { return inUse; }
public void setInUse(boolean inUse) { this.inUse = inUse; }
}
public static void main(String[] args) throws InterruptedException {
ConnectionPool pool = new ConnectionPool(3);
// 模拟5个线程竞争3个连接
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
Connection conn = pool.getConnection(5, TimeUnit.SECONDS);
System.out.println("线程 " + threadId + " 获取: " + conn.getName()
+ ", 剩余: " + pool.getAvailableConnections());
Thread.sleep(2000);
pool.releaseConnection(conn);
System.out.println("线程 " + threadId + " 释放: " + conn.getName()
+ ", 剩余: " + pool.getAvailableConnections());
} catch (InterruptedException e) {
System.out.println("线程 " + threadId + " 获取连接超时");
Thread.currentThread().interrupt();
}
}).start();
}
}
}
4.3 多阶段数据处理管道
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.*;
public class DataProcessingPipeline {
private static final int STAGES = 4;
private static final int WORKERS = 3;
public static void main(String[] args) {
// 每个阶段的数据
List<List<String>> stageData = new ArrayList<>();
for (int i = 0; i < STAGES; i++) {
stageData.add(Collections.synchronizedList(new ArrayList<>()));
}
CyclicBarrier barrier = new CyclicBarrier(WORKERS, () -> {
int currentStage = (int) (System.currentTimeMillis() % STAGES);
System.out.println("\n=== 阶段转换,处理数据 ===");
});
for (int i = 0; i < WORKERS; i++) {
final int workerId = i;
new Thread(() -> {
try {
for (int stage = 0; stage < STAGES; stage++) {
// 模拟数据处理
String result = "Worker-" + workerId + "-Stage" + stage;
stageData.get(stage).add(result);
System.out.println(result + " 处理完成");
barrier.await();
}
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
4.4 柔性并发控制
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
public class FlexibleConcurrency {
public static void main(String[] args) {
Phaser phaser = new Phaser(1) { // 主线程注册
@Override
protected boolean onAdvance(int phase, int parties) {
System.out.println("阶段 " + phase + " 完成,参与人数: " + parties);
return phase >= 5; // 5个阶段后终止
}
};
// 动态创建任务
for (int i = 0; i < 10; i++) {
phaser.register(); // 动态注册
new Thread(() -> {
try {
while (!phaser.isTerminated()) {
// 模拟工作
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
// 随机决定是否继续
if (ThreadLocalRandom.current().nextDouble() < 0.2) {
System.out.println(Thread.currentThread().getName() + " 退出");
phaser.arriveAndDeregister();
break;
} else {
phaser.arriveAndAwaitAdvance();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Task-" + i).start();
}
// 主线程注销
phaser.arriveAndDeregister();
// 等待终止
while (!phaser.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("所有任务完成");
}
}
五、工具类对比
| 特性 | CountDownLatch | CyclicBarrier | Semaphore | Phaser |
|---|---|---|---|---|
| 可重用 | 否 | 是 | 是 | 是 |
| 等待方向 | 单向(等待者) | 双向(互相等待) | 资源获取 | 分阶段 |
| 参与者动态 | 否 | 否 | 否 | 是 |
| 阶段概念 | 无 | 有(隐式) | 无 | 有(显式) |
| 回调支持 | 否 | 是 | 否 | 是 |
| 适用场景 | 一次性等待 | 循环屏障 | 资源限制 | 动态任务 |
选择建议
- CountDownLatch:一次性事件,如初始化、批量任务完成
- CyclicBarrier:重复的固定参与者任务,如分阶段计算
- Semaphore:资源限制,如连接池、限流
- Phaser:动态参与者的多阶段任务
六、总结与最佳实践
最佳实践
-
CountDownLatch 使用要点
- 确保
countDown()在 finally 块中调用 - 注意计数器不能为负数
- 考虑使用
await(timeout, unit)避免无限等待
- 确保
-
CyclicBarrier 使用要点
- 处理
BrokenBarrierException - 栅栏动作不宜耗时过长
- 确保所有线程都能到达栅栏点
- 处理
-
Semaphore 使用要点
- 获取和释放必须配对
- 公平模式可能影响性能
- 注意许可证数量设置
-
Phaser 使用要点
- 合理设置终止条件
- 动态注册时注意线程安全
- 覆盖
onAdvance实现自定义逻辑
常见陷阱
- 忘记释放资源:使用 try-finally 确保释放
- 死锁:避免在持有锁时等待其他资源
- 计数错误:确保 countDown 和 acquire 次数匹配
- 中断处理:正确处理 InterruptedException
性能考虑
- 高竞争场景:考虑使用公平模式
- 大量线程:避免过度创建线程
- 超时设置:避免无限等待
- 资源泄漏:及时释放许可和注销参与者
并发工具类是 Java 并发编程的重要组成部分,正确使用这些工具可以大大简化多线程编程的复杂性。选择合适的工具类,遵循最佳实践,能够构建出高效、可靠的多线程应用。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)