并发工具类详解

一、知识概述

Java并发包(JUC)提供了一系列并发工具类,用于解决多线程编程中的协调、同步和资源控制问题。这些工具类位于 java.util.concurrent 包下,主要包括:

  • CountDownLatch:倒计时计数器,用于等待一组线程完成
  • CyclicBarrier:循环栅栏,用于让一组线程互相等待到达某个屏障点
  • Semaphore:信号量,用于控制同时访问资源的线程数量
  • Exchanger:交换器,用于两个线程之间交换数据
  • Phaser:阶段器,支持动态注册的同步屏障

这些工具类基于 AQS(AbstractQueuedSynchronizer) 或类似机制实现,提供了比传统 wait/notify 更强大、更灵活的线程协调能力。

二、知识点详细讲解

2.1 CountDownLatch - 倒计时计数器

核心概念

CountDownLatch 是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。它通过一个计数器实现,初始值为线程数量,每个线程完成时计数减1,当计数为0时,等待的线程被释放。

关键特点
  1. 单向性:计数器只能递减,不能重置
  2. 一次性:计数到0后无法重用
  3. 非公平性:不保证线程获取锁的顺序
核心方法
// 构造方法,指定初始计数值
public CountDownLatch(int count)

// 等待计数器归零,可被中断
public void await() throws InterruptedException

// 带超时的等待
public boolean await(long timeout, TimeUnit unit)

// 计数减1
public void countDown()

// 获取当前计数值
public long getCount()
应用场景
  1. 主线程等待所有子线程完成:并行计算、批量任务处理
  2. 服务启动检查:等待所有必要服务就绪
  3. 资源初始化:等待所有资源加载完成

2.2 CyclicBarrier - 循环栅栏

核心概念

CyclicBarrier 让一组线程互相等待,直到所有线程都到达某个屏障点后才继续执行。与 CountDownLatch 不同,它是可循环使用的。

关键特点
  1. 可重用:计数器可重置,支持重复使用
  2. 双向等待:所有线程互相等待
  3. 支持回调:可设置栅栏动作
核心方法
// 构造方法,指定参与线程数
public CyclicBarrier(int parties)

// 带栅栏动作的构造方法
public CyclicBarrier(int parties, Runnable barrierAction)

// 到达栅栏并等待其他线程
public int await() throws InterruptedException, BrokenBarrierException

// 带超时的等待
public int await(long timeout, TimeUnit unit)

// 获取参与线程数
public int getParties()

// 检查栅栏是否破损
public boolean isBroken()

// 重置栅栏
public void reset()
应用场景
  1. 多阶段任务:多个线程分阶段执行
  2. 并行计算合并:等待所有线程完成某阶段后合并结果
  3. 批量数据处理:分批处理、批量提交
BrokenBarrierException

当栅栏被打破时(线程被中断、超时或重置),等待的线程会抛出此异常:

// 栅栏被打破的情况:
// 1. 某个等待线程被中断
// 2. 某个等待线程超时
// 3. 栅栏被显式 reset()
// 4. 栅栏动作抛出异常

2.3 Semaphore - 信号量

核心概念

Semaphore 通过许可证机制控制同时访问某个资源的线程数量。线程在访问资源前获取许可证,使用完毕后释放许可证。

关键特点
  1. 资源限制:精确控制并发访问数量
  2. 公平性可选:支持公平和非公平模式
  3. 可重入性:同一线程可多次获取许可证
核心方法
// 构造方法,指定许可数量
public Semaphore(int permits)

// 指定公平性
public Semaphore(int permits, boolean fair)

// 获取一个许可(阻塞)
public void acquire() throws InterruptedException

// 获取多个许可
public void acquire(int permits) throws InterruptedException

// 尝试获取许可(非阻塞)
public boolean tryAcquire()

// 释放许可
public void release()

// 释放多个许可
public void release(int permits)

// 获取可用许可数
public int availablePermits()
应用场景
  1. 资源池:数据库连接池、线程池控制
  2. 限流:控制并发请求量
  3. 互斥锁:许可数为1时实现互斥

2.4 Exchanger - 交换器

核心概念

Exchanger 提供一个同步点,两个线程在此交换数据。一个线程到达交换点后等待另一个线程,两者都到达后交换数据并继续执行。

核心方法
// 交换数据(阻塞)
public V exchange(V x) throws InterruptedException

// 带超时的交换
public V exchange(V x, long timeout, TimeUnit unit)
应用场景
  1. 生产者-消费者:数据交换
  2. 遗传算法:配对交叉
  3. 缓冲区切换:双缓冲技术

2.5 Phaser - 阶段器

核心概念

Phaser 是 JDK 7 引入的更灵活的同步屏障,支持动态注册和注销参与者,适合分阶段任务。

核心方法
// 构造方法
public Phaser()
public Phaser(int parties)

// 注册参与者
public int register()
public int bulkRegister(int parties)

// 到达并等待其他参与者
public int arriveAndAwaitAdvance()

// 到达并注销
public int arriveAndDeregister()

// 仅到达,不等待
public int arrive()

// 等待下一阶段
public int awaitAdvance(int phase)

// 获取当前阶段
public int getPhase()

// 获取参与者数量
public int getRegisteredParties()

// 设置终止回调
protected boolean onAdvance(int phase, int registeredParties)
应用场景
  1. 动态任务:参与者数量可变的任务
  2. 多阶段处理:不确定阶段数的流程
  3. 分治算法:递归任务分解

三、代码示例

3.1 CountDownLatch 示例

示例1:主线程等待子线程完成
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {
    
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("任务 " + taskId + " 开始执行");
                    Thread.sleep((long) (Math.random() * 3000));
                    System.out.println("任务 " + taskId + " 执行完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 计数减1
                }
            });
        }
        
        System.out.println("主线程等待所有任务完成...");
        latch.await(); // 等待计数归零
        System.out.println("所有任务已完成,主线程继续执行");
        
        executor.shutdown();
    }
}

// 输出示例:
// 主线程等待所有任务完成...
// 任务 0 开始执行
// 任务 1 开始执行
// 任务 2 开始执行
// 任务 3 开始执行
// 任务 4 开始执行
// 任务 2 执行完成
// 任务 0 执行完成
// 任务 4 执行完成
// 任务 1 执行完成
// 任务 3 执行完成
// 所有任务已完成,主线程继续执行
示例2:服务启动检查
import java.util.concurrent.CountDownLatch;

public class ServiceStartupDemo {
    
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        
        // 启动数据库服务
        new Thread(() -> {
            System.out.println("正在启动数据库服务...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("数据库服务启动完成");
            latch.countDown();
        }).start();
        
        // 启动缓存服务
        new Thread(() -> {
            System.out.println("正在启动缓存服务...");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("缓存服务启动完成");
            latch.countDown();
        }).start();
        
        // 启动消息队列服务
        new Thread(() -> {
            System.out.println("正在启动消息队列服务...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("消息队列服务启动完成");
            latch.countDown();
        }).start();
        
        // 等待所有服务启动完成
        System.out.println("等待所有服务启动...");
        latch.await();
        System.out.println("所有服务启动完成,应用开始提供服务");
    }
}

3.2 CyclicBarrier 示例

示例1:多阶段任务处理
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    
    public static void main(String[] args) {
        int threadCount = 3;
        
        // 创建栅栏,所有线程到达后执行合并动作
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("=== 所有线程已完成本阶段,开始下一阶段 ===\n");
        });
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 第一阶段
                    System.out.println("线程 " + threadId + " 执行第一阶段");
                    Thread.sleep((long) (Math.random() * 1000));
                    barrier.await();
                    
                    // 第二阶段
                    System.out.println("线程 " + threadId + " 执行第二阶段");
                    Thread.sleep((long) (Math.random() * 1000));
                    barrier.await();
                    
                    // 第三阶段
                    System.out.println("线程 " + threadId + " 执行第三阶段");
                    Thread.sleep((long) (Math.random() * 1000));
                    barrier.await();
                    
                    System.out.println("线程 " + threadId + " 全部完成");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

// 输出示例:
// 线程 0 执行第一阶段
// 线程 1 执行第一阶段
// 线程 2 执行第一阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 2 执行第二阶段
// 线程 0 执行第二阶段
// 线程 1 执行第二阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 1 执行第三阶段
// 线程 2 执行第三阶段
// 线程 0 执行第三阶段
// === 所有线程已完成本阶段,开始下一阶段 ===
//
// 线程 0 全部完成
// 线程 1 全部完成
// 线程 2 全部完成
示例2:并行计算合并
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelComputeDemo {
    
    private static final int THREAD_COUNT = 4;
    private static final int[] data = new int[100];
    private static final int[] partialSums = new int[THREAD_COUNT];
    private static final AtomicInteger totalSum = new AtomicInteger(0);
    
    public static void main(String[] args) {
        // 初始化数据
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
            // 合并各线程的部分和
            int sum = 0;
            for (int partialSum : partialSums) {
                sum += partialSum;
            }
            totalSum.set(sum);
            System.out.println("所有部分和已合并,总和: " + sum);
        });
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            new Thread(() -> {
                int start = threadId * 25;
                int end = start + 25;
                int sum = 0;
                
                // 计算部分和
                for (int j = start; j < end; j++) {
                    sum += data[j];
                }
                partialSums[threadId] = sum;
                System.out.println("线程 " + threadId + " 部分和: " + sum);
                
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        
        // 等待计算完成
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("最终结果: " + totalSum.get());
        // 最终结果应为 5050 (1+2+...+100)
    }
}

3.3 Semaphore 示例

示例1:资源池控制
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ResourcePoolDemo {
    
    public static void main(String[] args) {
        int poolSize = 3;  // 资源池大小
        int threadCount = 10;  // 请求线程数
        
        Semaphore semaphore = new Semaphore(poolSize);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    // 获取许可
                    semaphore.acquire();
                    System.out.println("线程 " + threadId + " 获取资源,剩余资源: " 
                        + semaphore.availablePermits());
                    
                    // 模拟资源使用
                    Thread.sleep(2000);
                    
                    System.out.println("线程 " + threadId + " 释放资源");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 释放许可
                    semaphore.release();
                }
            });
        }
        
        executor.shutdown();
    }
}

// 输出示例(资源限制生效,最多同时3个线程访问):
// 线程 0 获取资源,剩余资源: 2
// 线程 1 获取资源,剩余资源: 1
// 线程 2 获取资源,剩余资源: 0
// 线程 0 释放资源
// 线程 3 获取资源,剩余资源: 0
// 线程 1 释放资源
// 线程 4 获取资源,剩余资源: 0
// ...
示例2:公平与非公平信号量
import java.util.concurrent.Semaphore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FairSemaphoreDemo {
    
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 非公平信号量 ===");
        testSemaphore(false);
        
        Thread.sleep(1000);
        
        System.out.println("\n=== 公平信号量 ===");
        testSemaphore(true);
    }
    
    private static void testSemaphore(boolean fair) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1, fair);
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    semaphore.acquire();
                    System.out.println("线程 " + threadId + " 获取锁");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release();
                }
            });
        }
        
        executor.shutdown();
        Thread.sleep(2000);
    }
}
示例3:简单限流器
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class RateLimiter {
    
    private final Semaphore semaphore;
    private final AtomicInteger acquired = new AtomicInteger(0);
    
    public RateLimiter(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
    }
    
    public boolean tryAcquire() {
        boolean success = semaphore.tryAcquire();
        if (success) {
            acquired.incrementAndGet();
        }
        return success;
    }
    
    public void release() {
        semaphore.release();
    }
    
    public int getAcquired() {
        return acquired.get();
    }
    
    public static void main(String[] args) throws InterruptedException {
        RateLimiter limiter = new RateLimiter(3);  // 最大并发3
        
        // 模拟10个请求
        for (int i = 0; i < 10; i++) {
            final int requestId = i;
            new Thread(() -> {
                if (limiter.tryAcquire()) {
                    try {
                        System.out.println("请求 " + requestId + " 通过");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        limiter.release();
                        System.out.println("请求 " + requestId + " 完成");
                    }
                } else {
                    System.out.println("请求 " + requestId + " 被限流");
                }
            }).start();
            Thread.sleep(100);
        }
    }
}

3.4 Exchanger 示例

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        // 线程A:生产数据
        new Thread(() -> {
            try {
                String data = "来自A的数据";
                System.out.println("线程A准备交换: " + data);
                Thread.sleep(1000);
                
                String received = exchanger.exchange(data);
                System.out.println("线程A收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        // 线程B:处理数据
        new Thread(() -> {
            try {
                String data = "来自B的数据";
                System.out.println("线程B准备交换: " + data);
                Thread.sleep(2000);
                
                String received = exchanger.exchange(data);
                System.out.println("线程B收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

// 输出:
// 线程A准备交换: 来自A的数据
// 线程B准备交换: 来自B的数据
// 线程A收到: 来自B的数据
// 线程B收到: 来自A的数据
双缓冲示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class DoubleBufferDemo {
    
    public static void main(String[] args) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        
        // 生产者:填充缓冲区
        new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                buffer.add(i);
                System.out.println("生产者添加: " + i);
                
                if (buffer.size() == 3) {
                    try {
                        buffer = exchanger.exchange(buffer);  // 交换缓冲区
                        System.out.println("生产者交换缓冲区");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        
        // 消费者:处理缓冲区
        new Thread(() -> {
            List<Integer> buffer = new ArrayList<>();
            while (true) {
                try {
                    buffer = exchanger.exchange(buffer);  // 等待交换
                    System.out.println("消费者收到: " + buffer);
                    buffer.clear();  // 清空缓冲区
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
    }
}

3.5 Phaser 示例

import java.util.concurrent.Phaser;

public class PhaserDemo {
    
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);  // 初始3个参与者
        
        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                // 第一阶段
                System.out.println("线程 " + threadId + " 完成第一阶段,阶段号: " 
                    + phaser.getPhase());
                phaser.arriveAndAwaitAdvance();
                
                // 第二阶段
                System.out.println("线程 " + threadId + " 完成第二阶段,阶段号: " 
                    + phaser.getPhase());
                phaser.arriveAndAwaitAdvance();
                
                // 第三阶段
                System.out.println("线程 " + threadId + " 完成第三阶段,阶段号: " 
                    + phaser.getPhase());
                phaser.arriveAndDeregister();  // 注销参与者
            }).start();
        }
        
        // 等待所有阶段完成
        while (!phaser.isTerminated()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("所有阶段完成");
    }
}
动态注册示例
import java.util.concurrent.Phaser;

public class DynamicPhaserDemo {
    
    public static void main(String[] args) {
        Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("阶段 " + phase + " 完成,参与者: " + registeredParties);
                return phase >= 2 || registeredParties == 0;  // 终止条件
            }
        };
        
        // 主线程注册
        phaser.register();
        
        // 动态创建任务
        for (int i = 0; i < 3; i++) {
            phaser.register();  // 动态注册
            final int taskId = i;
            new Thread(() -> {
                System.out.println("任务 " + taskId + " 开始阶段0");
                phaser.arriveAndAwaitAdvance();
                
                System.out.println("任务 " + taskId + " 开始阶段1");
                phaser.arriveAndAwaitAdvance();
                
                System.out.println("任务 " + taskId + " 开始阶段2");
                phaser.arriveAndDeregister();  // 完成并注销
            }).start();
        }
        
        // 主线程也参与
        phaser.arriveAndDeregister();
        
        // 等待完成
        while (!phaser.isTerminated()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("所有任务完成");
    }
}

四、实战应用场景

4.1 并行任务汇总

import java.util.concurrent.*;
import java.util.*;

public class ParallelAggregation {
    
    public static void main(String[] args) throws InterruptedException {
        // 模拟并行处理大数据集
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            data.add(i);
        }
        
        int threadCount = 4;
        CountDownLatch latch = new CountDownLatch(threadCount);
        List<Future<Map<String, Integer>>> futures = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        int chunkSize = data.size() / threadCount;
        
        for (int i = 0; i < threadCount; i++) {
            int start = i * chunkSize;
            int end = (i == threadCount - 1) ? data.size() : start + chunkSize;
            List<Integer> chunk = data.subList(start, end);
            
            futures.add(executor.submit(() -> {
                try {
                    Map<String, Integer> stats = new HashMap<>();
                    stats.put("sum", 0);
                    stats.put("max", Integer.MIN_VALUE);
                    stats.put("min", Integer.MAX_VALUE);
                    stats.put("count", 0);
                    
                    for (int num : chunk) {
                        stats.merge("sum", num, Integer::sum);
                        stats.put("max", Math.max(stats.get("max"), num));
                        stats.put("min", Math.min(stats.get("min"), num));
                        stats.merge("count", 1, Integer::sum);
                    }
                    
                    return stats;
                } finally {
                    latch.countDown();
                }
            }));
        }
        
        latch.await();
        
        // 合并结果
        int totalSum = 0;
        int globalMax = Integer.MIN_VALUE;
        int globalMin = Integer.MAX_VALUE;
        int totalCount = 0;
        
        for (Future<Map<String, Integer>> future : futures) {
            try {
                Map<String, Integer> stats = future.get();
                totalSum += stats.get("sum");
                globalMax = Math.max(globalMax, stats.get("max"));
                globalMin = Math.min(globalMin, stats.get("min"));
                totalCount += stats.get("count");
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        
        System.out.println("统计结果:");
        System.out.println("总和: " + totalSum);
        System.out.println("最大值: " + globalMax);
        System.out.println("最小值: " + globalMin);
        System.out.println("数量: " + totalCount);
        System.out.println("平均值: " + (double) totalSum / totalCount);
        
        executor.shutdown();
    }
}

4.2 数据库连接池模拟

import java.util.concurrent.Semaphore;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConnectionPool {
    
    private final BlockingQueue<Connection> pool;
    private final Semaphore semaphore;
    
    public ConnectionPool(int poolSize) {
        this.pool = new ArrayBlockingQueue<>(poolSize);
        this.semaphore = new Semaphore(poolSize);
        
        // 初始化连接
        for (int i = 0; i < poolSize; i++) {
            pool.offer(new Connection("Connection-" + i));
        }
    }
    
    public Connection getConnection(long timeout, TimeUnit unit) 
            throws InterruptedException {
        if (!semaphore.tryAcquire(timeout, unit)) {
            throw new RuntimeException("获取连接超时");
        }
        
        Connection conn = pool.poll();
        if (conn != null) {
            conn.setInUse(true);
            return conn;
        }
        
        semaphore.release();
        throw new RuntimeException("连接池为空");
    }
    
    public void releaseConnection(Connection conn) {
        if (conn != null) {
            conn.setInUse(false);
            pool.offer(conn);
            semaphore.release();
        }
    }
    
    public int getAvailableConnections() {
        return semaphore.availablePermits();
    }
    
    // 简单连接对象
    static class Connection {
        private final String name;
        private boolean inUse;
        
        public Connection(String name) {
            this.name = name;
            this.inUse = false;
        }
        
        public String getName() { return name; }
        public boolean isInUse() { return inUse; }
        public void setInUse(boolean inUse) { this.inUse = inUse; }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ConnectionPool pool = new ConnectionPool(3);
        
        // 模拟5个线程竞争3个连接
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    Connection conn = pool.getConnection(5, TimeUnit.SECONDS);
                    System.out.println("线程 " + threadId + " 获取: " + conn.getName() 
                        + ", 剩余: " + pool.getAvailableConnections());
                    
                    Thread.sleep(2000);
                    
                    pool.releaseConnection(conn);
                    System.out.println("线程 " + threadId + " 释放: " + conn.getName() 
                        + ", 剩余: " + pool.getAvailableConnections());
                } catch (InterruptedException e) {
                    System.out.println("线程 " + threadId + " 获取连接超时");
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

4.3 多阶段数据处理管道

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.*;

public class DataProcessingPipeline {
    
    private static final int STAGES = 4;
    private static final int WORKERS = 3;
    
    public static void main(String[] args) {
        // 每个阶段的数据
        List<List<String>> stageData = new ArrayList<>();
        for (int i = 0; i < STAGES; i++) {
            stageData.add(Collections.synchronizedList(new ArrayList<>()));
        }
        
        CyclicBarrier barrier = new CyclicBarrier(WORKERS, () -> {
            int currentStage = (int) (System.currentTimeMillis() % STAGES);
            System.out.println("\n=== 阶段转换,处理数据 ===");
        });
        
        for (int i = 0; i < WORKERS; i++) {
            final int workerId = i;
            new Thread(() -> {
                try {
                    for (int stage = 0; stage < STAGES; stage++) {
                        // 模拟数据处理
                        String result = "Worker-" + workerId + "-Stage" + stage;
                        stageData.get(stage).add(result);
                        System.out.println(result + " 处理完成");
                        
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

4.4 柔性并发控制

import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;

public class FlexibleConcurrency {
    
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1) {  // 主线程注册
            @Override
            protected boolean onAdvance(int phase, int parties) {
                System.out.println("阶段 " + phase + " 完成,参与人数: " + parties);
                return phase >= 5;  // 5个阶段后终止
            }
        };
        
        // 动态创建任务
        for (int i = 0; i < 10; i++) {
            phaser.register();  // 动态注册
            
            new Thread(() -> {
                try {
                    while (!phaser.isTerminated()) {
                        // 模拟工作
                        Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                        
                        // 随机决定是否继续
                        if (ThreadLocalRandom.current().nextDouble() < 0.2) {
                            System.out.println(Thread.currentThread().getName() + " 退出");
                            phaser.arriveAndDeregister();
                            break;
                        } else {
                            phaser.arriveAndAwaitAdvance();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Task-" + i).start();
        }
        
        // 主线程注销
        phaser.arriveAndDeregister();
        
        // 等待终止
        while (!phaser.isTerminated()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("所有任务完成");
    }
}

五、工具类对比

特性 CountDownLatch CyclicBarrier Semaphore Phaser
可重用
等待方向 单向(等待者) 双向(互相等待) 资源获取 分阶段
参与者动态
阶段概念 有(隐式) 有(显式)
回调支持
适用场景 一次性等待 循环屏障 资源限制 动态任务

选择建议

  1. CountDownLatch:一次性事件,如初始化、批量任务完成
  2. CyclicBarrier:重复的固定参与者任务,如分阶段计算
  3. Semaphore:资源限制,如连接池、限流
  4. Phaser:动态参与者的多阶段任务

六、总结与最佳实践

最佳实践

  1. CountDownLatch 使用要点

    • 确保 countDown() 在 finally 块中调用
    • 注意计数器不能为负数
    • 考虑使用 await(timeout, unit) 避免无限等待
  2. CyclicBarrier 使用要点

    • 处理 BrokenBarrierException
    • 栅栏动作不宜耗时过长
    • 确保所有线程都能到达栅栏点
  3. Semaphore 使用要点

    • 获取和释放必须配对
    • 公平模式可能影响性能
    • 注意许可证数量设置
  4. Phaser 使用要点

    • 合理设置终止条件
    • 动态注册时注意线程安全
    • 覆盖 onAdvance 实现自定义逻辑

常见陷阱

  1. 忘记释放资源:使用 try-finally 确保释放
  2. 死锁:避免在持有锁时等待其他资源
  3. 计数错误:确保 countDown 和 acquire 次数匹配
  4. 中断处理:正确处理 InterruptedException

性能考虑

  1. 高竞争场景:考虑使用公平模式
  2. 大量线程:避免过度创建线程
  3. 超时设置:避免无限等待
  4. 资源泄漏:及时释放许可和注销参与者

并发工具类是 Java 并发编程的重要组成部分,正确使用这些工具可以大大简化多线程编程的复杂性。选择合适的工具类,遵循最佳实践,能够构建出高效、可靠的多线程应用。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐