18-Java语言核心-并发编程-并发模型详解
·
并发模型详解
一、知识概述
并发模型是指多线程程序中线程之间协作和通信的模式。不同的并发模型适用于不同的应用场景,选择合适的模型可以显著提高程序的性能和可维护性。Java 中常见的并发模型包括:
- 生产者-消费者模型:解耦数据生产和消费
- 读写分离模型:提高读多写少场景的并发性能
- 工作窃取模型:提高多核 CPU 利用率
- Actor 模型:基于消息传递的并发模型
- Fork/Join 模型:分治任务的并行处理
这些模型是构建高并发系统的理论基础,理解它们有助于设计出高效、可扩展的并发应用。
二、知识点详细讲解
2.1 生产者-消费者模型
核心概念
生产者-消费者模型是最经典的并发模型之一,它将数据的产生和处理分离到不同的线程中执行:
- 生产者:负责生成数据并放入缓冲区
- 消费者:从缓冲区取出数据并处理
- 缓冲区:解耦生产者和消费者,平衡速度差异
关键特点
- 解耦:生产者和消费者互不直接依赖
- 缓冲:平衡生产和消费速度差异
- 异步:生产和消费可以并行执行
实现方式
- 阻塞队列:使用 BlockingQueue 作为缓冲区
- wait/notify:使用 Object 的等待/通知机制
- Condition:使用 Lock 的条件变量
2.2 读写分离模型
核心概念
读写分离模型针对读多写少的场景,允许多个读操作同时进行,但写操作必须独占访问。
关键特点
- 读读并发:多个读操作可以同时执行
- 写写互斥:写操作之间互斥
- 读写互斥:读写操作之间互斥
实现方式
- ReadWriteLock:标准的读写锁实现
- StampedLock:支持乐观读的高性能读写锁
- CopyOnWrite:写时复制,适用于读远多于写的场景
2.3 工作窃取模型
核心概念
工作窃取模型是一种任务调度策略:
- 每个工作线程维护自己的任务队列
- 空闲线程从其他线程的队列末尾"窃取"任务
- 实现负载均衡,提高 CPU 利用率
关键特点
- 负载均衡:自动平衡各线程的工作量
- 减少竞争:每个线程主要操作自己的队列
- 高效利用:避免线程空闲等待
实现方式
- ForkJoinPool:JDK 7 引入的工作窃取线程池
- 双端队列:使用 Deque 作为任务队列
2.4 Actor 模型
核心概念
Actor 模型是一种基于消息传递的并发模型:
- Actor:独立的并发实体,封装状态和行为
- 消息传递:Actor 之间通过消息通信
- 无共享状态:避免共享内存带来的并发问题
关键特点
- 无锁编程:不使用锁进行同步
- 消息驱动:通过消息触发行为
- 位置透明:Actor 可以分布在不同节点
实现框架
- Akka:Scala/Java 平台著名的 Actor 框架
- Quasar:Java 平台的轻量级 Actor 实现
2.5 Fork/Join 模型
核心概念
Fork/Join 模型是分治思想的并行实现:
- Fork:将大任务分解为小任务
- Join:合并小任务的结果
关键特点
- 递归分解:任务可以递归分解
- 工作窃取:使用工作窃取调度
- 高效并行:充分利用多核 CPU
实现方式
- ForkJoinPool:专用线程池
- RecursiveTask:有返回值的递归任务
- RecursiveAction:无返回值的递归任务
三、代码示例
3.1 生产者-消费者模型
示例1:使用阻塞队列
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
String item = "Item-" + i;
queue.put(item); // 队列满时阻塞
System.out.println("生产: " + item + ", 队列大小: " + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("生产者结束");
});
// 消费者
Thread consumer = new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
String item = queue.take(); // 队列空时阻塞
System.out.println("消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("消费者结束");
});
producer.start();
consumer.start();
}
}
示例2:多生产者多消费者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiProducerConsumer {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
AtomicInteger counter = new AtomicInteger(0);
// 3个生产者
for (int i = 0; i < 3; i++) {
final int producerId = i;
new Thread(() -> {
try {
while (true) {
int item = counter.incrementAndGet();
queue.put(item);
System.out.println("生产者-" + producerId + " 生产: " + item);
Thread.sleep((long) (Math.random() * 500));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
// 2个消费者
for (int i = 0; i < 2; i++) {
final int consumerId = i;
new Thread(() -> {
try {
while (true) {
int item = queue.take();
System.out.println("消费者-" + consumerId + " 消费: " + item);
Thread.sleep((long) (Math.random() * 800));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
示例3:使用 wait/notify 实现
import java.util.LinkedList;
import java.util.Queue;
public class WaitNotifyBuffer {
private final Queue<String> queue = new LinkedList<>();
private final int capacity;
public WaitNotifyBuffer(int capacity) {
this.capacity = capacity;
}
public synchronized void put(String item) throws InterruptedException {
// 缓冲区满时等待
while (queue.size() == capacity) {
System.out.println("缓冲区满,生产者等待");
wait();
}
queue.offer(item);
System.out.println("生产: " + item + ", 大小: " + queue.size());
notifyAll(); // 通知消费者
}
public synchronized String take() throws InterruptedException {
// 缓冲区空时等待
while (queue.isEmpty()) {
System.out.println("缓冲区空,消费者等待");
wait();
}
String item = queue.poll();
System.out.println("消费: " + item + ", 大小: " + queue.size());
notifyAll(); // 通知生产者
return item;
}
public static void main(String[] args) {
WaitNotifyBuffer buffer = new WaitNotifyBuffer(5);
// 生产者
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
buffer.put("Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 消费者
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
buffer.take();
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
示例4:使用 Condition 实现
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionBuffer<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public ConditionBuffer(int capacity) {
this.capacity = capacity;
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("缓冲区满,等待...");
notFull.await();
}
queue.offer(item);
System.out.println("生产: " + item);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("缓冲区空,等待...");
notEmpty.await();
}
T item = queue.poll();
System.out.println("消费: " + item);
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ConditionBuffer<String> buffer = new ConditionBuffer<>(5);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
buffer.put("Item-" + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
buffer.take();
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
3.2 读写分离模型
示例1:使用 ReadWriteLock
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public V get(K key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取: " + key);
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入: " + key + " = " + value);
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public void remove(K key) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 删除: " + key);
cache.remove(key);
} finally {
lock.writeLock().unlock();
}
}
public static void main(String[] args) {
ReadWriteCache<String, String> cache = new ReadWriteCache<>();
// 写线程
new Thread(() -> {
cache.put("key1", "value1");
cache.put("key2", "value2");
cache.put("key3", "value3");
}, "Writer").start();
// 读线程
for (int i = 0; i < 5; i++) {
new Thread(() -> {
cache.get("key1");
cache.get("key2");
cache.get("key3");
}, "Reader-" + i).start();
}
}
}
示例2:使用 StampedLock 乐观读
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
public class StampedLockCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final StampedLock lock = new StampedLock();
public V get(K key) {
// 乐观读
long stamp = lock.tryOptimisticRead();
V value = cache.get(key);
if (!lock.validate(stamp)) {
// 乐观读失败,获取读锁
stamp = lock.readLock();
try {
value = cache.get(key);
} finally {
lock.unlockRead(stamp);
}
}
return value;
}
public V getWithReadLock(K key) {
long stamp = lock.readLock();
try {
return cache.get(key);
} finally {
lock.unlockRead(stamp);
}
}
public void put(K key, V value) {
long stamp = lock.writeLock();
try {
cache.put(key, value);
} finally {
lock.unlockWrite(stamp);
}
}
public V computeIfAbsent(K key, java.util.function.Function<K, V> function) {
// 先乐观读
long stamp = lock.tryOptimisticRead();
V value = cache.get(key);
if (!lock.validate(stamp)) {
// 转换为读锁
stamp = lock.readLock();
try {
value = cache.get(key);
if (value == null) {
// 升级为写锁(需要先释放读锁)
long writeStamp = lock.tryConvertToWriteLock(stamp);
if (writeStamp == 0) {
lock.unlockRead(stamp);
writeStamp = lock.writeLock();
}
stamp = writeStamp;
try {
// 双重检查
value = cache.get(key);
if (value == null) {
value = function.apply(key);
cache.put(key, value);
}
} finally {
lock.unlockWrite(stamp);
return value;
}
}
} finally {
if (lock.isReadLocked()) {
lock.unlockRead(stamp);
}
}
}
return value;
}
public static void main(String[] args) {
StampedLockCache<String, Integer> cache = new StampedLockCache<>();
// 并发读取
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
Integer value = cache.get("key");
System.out.println(Thread.currentThread().getName() + " 读取: " + value);
}
}, "Reader-" + i).start();
}
// 写入
new Thread(() -> {
for (int i = 0; i < 3; i++) {
cache.put("key", i);
System.out.println("写入: key = " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Writer").start();
}
}
示例3:CopyOnWrite 写时复制
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteDemo {
public static void main(String[] args) throws InterruptedException {
List<String> list = new CopyOnWriteArrayList<>();
// 初始化数据
for (int i = 0; i < 5; i++) {
list.add("Item-" + i);
}
ExecutorService executor = Executors.newFixedThreadPool(10);
// 读线程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
// 遍历不会受写操作影响
for (String item : list) {
System.out.println(Thread.currentThread().getName() + " 读取: " + item);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
// 写线程
for (int i = 5; i < 10; i++) {
final String item = "Item-" + i;
executor.submit(() -> {
list.add(item);
System.out.println(Thread.currentThread().getName() + " 添加: " + item);
});
}
executor.shutdown();
Thread.sleep(2000);
System.out.println("\n最终列表: " + list);
}
}
3.3 工作窃取模型
示例1:ForkJoinPool 基本使用
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ThreadLocalRandom;
public class ForkJoinDemo {
// 递归任务:计算数组元素和
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start;
private final int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 大任务分解
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// 左任务异步执行(放入队列)
leftTask.fork();
// 右任务在当前线程执行
long rightResult = rightTask.compute();
// 等待左任务完成
long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
public static void main(String[] args) {
// 准备数据
int[] array = new int[10000];
for (int i = 0; i < array.length; i++) {
array[i] = ThreadLocalRandom.current().nextInt(100);
}
// 使用 ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
Long result = pool.invoke(new SumTask(array, 0, array.length));
long endTime = System.currentTimeMillis();
System.out.println("Fork/Join 结果: " + result);
System.out.println("耗时: " + (endTime - startTime) + " ms");
// 验证结果
long expected = 0;
for (int num : array) {
expected += num;
}
System.out.println("验证结果: " + expected);
pool.shutdown();
}
}
示例2:并行归并排序
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ParallelMergeSort {
static class MergeSortTask extends RecursiveAction {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int[] temp;
private final int start;
private final int end;
public MergeSortTask(int[] array, int[] temp, int start, int end) {
this.array = array;
this.temp = temp;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 小任务使用普通排序
Arrays.sort(array, start, end);
return;
}
int mid = start + (end - start) / 2;
// 并行分解
MergeSortTask leftTask = new MergeSortTask(array, temp, start, mid);
MergeSortTask rightTask = new MergeSortTask(array, temp, mid, end);
invokeAll(leftTask, rightTask);
// 合并
merge(start, mid, end);
}
private void merge(int start, int mid, int end) {
System.arraycopy(array, start, temp, start, end - start);
int i = start, j = mid, k = start;
while (i < mid && j < end) {
if (temp[i] <= temp[j]) {
array[k++] = temp[i++];
} else {
array[k++] = temp[j++];
}
}
while (i < mid) {
array[k++] = temp[i++];
}
while (j < end) {
array[k++] = temp[j++];
}
}
}
public static void main(String[] args) {
int[] array = new int[100000];
for (int i = 0; i < array.length; i++) {
array[i] = (int) (Math.random() * 100000);
}
int[] temp = new int[array.length];
System.out.println("开始并行归并排序...");
long startTime = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new MergeSortTask(array, temp, 0, array.length));
long endTime = System.currentTimeMillis();
System.out.println("排序完成,耗时: " + (endTime - startTime) + " ms");
// 验证排序结果
boolean sorted = true;
for (int i = 1; i < array.length; i++) {
if (array[i] < array[i - 1]) {
sorted = false;
break;
}
}
System.out.println("排序正确: " + sorted);
pool.shutdown();
}
}
示例3:工作窃取可视化
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class WorkStealingDemo {
private static final AtomicInteger taskId = new AtomicInteger(0);
static class WorkTask extends RecursiveTask<Integer> {
private final int id;
private final int workload;
public WorkTask(int workload) {
this.id = taskId.incrementAndGet();
this.workload = workload;
}
@Override
protected Integer compute() {
String threadName = Thread.currentThread().getName();
System.out.println("任务 " + id + " 在 " + threadName + " 开始执行");
if (workload <= 1) {
// 小任务直接执行
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务 " + id + " 在 " + threadName + " 完成");
return workload;
}
// 分解任务
WorkTask task1 = new WorkTask(workload / 2);
WorkTask task2 = new WorkTask(workload - workload / 2);
task1.fork();
int result2 = task2.compute();
int result1 = task1.join();
return result1 + result2;
}
}
public static void main(String[] args) {
// 使用4个工作线程
ForkJoinPool pool = new ForkJoinPool(4);
// 创建不平衡的任务
WorkTask mainTask = new WorkTask(16);
System.out.println("=== 工作窃取演示 ===");
pool.invoke(mainTask);
System.out.println("\n=== 线程池统计 ===");
System.out.println("并行度: " + pool.getParallelism());
System.out.println("活跃线程数: " + pool.getActiveThreadCount());
System.out.println("已窃取任务数: " + pool.getStealCount());
pool.shutdown();
}
}
3.4 Actor 模型
示例1:简单 Actor 实现
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class SimpleActorDemo {
// Actor 基类
static abstract class Actor implements Runnable {
protected final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
protected final AtomicBoolean running = new AtomicBoolean(true);
protected final String name;
public Actor(String name) {
this.name = name;
}
public void send(Object message) {
mailbox.offer(message);
}
public void stop() {
running.set(false);
}
@Override
public void run() {
System.out.println(name + " 启动");
while (running.get() || !mailbox.isEmpty()) {
try {
Object message = mailbox.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
if (message != null) {
onReceive(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(name + " 停止");
}
protected abstract void onReceive(Object message);
}
// 打印 Actor
static class PrintActor extends Actor {
public PrintActor() {
super("PrintActor");
}
@Override
protected void onReceive(Object message) {
System.out.println("打印: " + message);
}
}
// 计数 Actor
static class CounterActor extends Actor {
private int count = 0;
public CounterActor() {
super("CounterActor");
}
@Override
protected void onReceive(Object message) {
if (message instanceof String) {
String msg = (String) message;
if ("increment".equals(msg)) {
count++;
System.out.println(name + " 计数: " + count);
} else if ("reset".equals(msg)) {
count = 0;
System.out.println(name + " 重置");
} else if ("get".equals(msg)) {
System.out.println(name + " 当前计数: " + count);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
PrintActor printActor = new PrintActor();
CounterActor counterActor = new CounterActor();
Thread printThread = new Thread(printActor);
Thread counterThread = new Thread(counterActor);
printThread.start();
counterThread.start();
// 发送消息
for (int i = 0; i < 5; i++) {
printActor.send("消息-" + i);
counterActor.send("increment");
Thread.sleep(100);
}
counterActor.send("get");
Thread.sleep(100);
counterActor.send("reset");
Thread.sleep(500);
printActor.stop();
counterActor.stop();
printThread.join();
counterThread.join();
}
}
示例2:Actor 之间通信
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class ActorCommunicationDemo {
// 消息类
static class Message {
final Object sender;
final Object content;
public Message(Object sender, Object content) {
this.sender = sender;
this.content = content;
}
}
// Actor 接口
interface Actor {
void send(Message message);
void start();
void stop();
}
// 基础 Actor 实现
static abstract class BaseActor implements Actor, Runnable {
protected final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>();
protected final AtomicBoolean running = new AtomicBoolean(false);
protected final String name;
protected Thread thread;
public BaseActor(String name) {
this.name = name;
}
@Override
public void send(Message message) {
mailbox.offer(message);
}
@Override
public void start() {
if (running.compareAndSet(false, true)) {
thread = new Thread(this, name);
thread.start();
}
}
@Override
public void stop() {
running.set(false);
if (thread != null) {
thread.interrupt();
}
}
@Override
public void run() {
while (running.get()) {
try {
Message message = mailbox.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
if (message != null) {
handleMessage(message);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
protected abstract void handleMessage(Message message);
}
// Ping Actor
static class PingActor extends BaseActor {
private int count = 0;
private Actor pong;
public PingActor() {
super("Ping");
}
public void setPong(Actor pong) {
this.pong = pong;
}
@Override
public void start() {
super.start();
// 开始 ping-pong
if (pong != null) {
pong.send(new Message(this, "ping"));
}
}
@Override
protected void handleMessage(Message message) {
if ("pong".equals(message.content)) {
count++;
System.out.println(name + " 收到 pong (" + count + ")");
if (count < 5) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
pong.send(new Message(this, "ping"));
} else {
System.out.println("Ping-Pong 完成");
stop();
pong.stop();
}
}
}
}
// Pong Actor
static class PongActor extends BaseActor {
public PongActor() {
super("Pong");
}
@Override
protected void handleMessage(Message message) {
if ("ping".equals(message.content)) {
System.out.println(name + " 收到 ping");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
((Actor) message.sender).send(new Message(this, "pong"));
}
}
}
public static void main(String[] args) throws InterruptedException {
PingActor ping = new PingActor();
PongActor pong = new PongActor();
ping.setPong(pong);
ping.start();
pong.start();
// 等待完成
while (ping.running.get() || pong.running.get()) {
Thread.sleep(100);
}
System.out.println("演示结束");
}
}
3.5 Fork/Join 实战
示例1:并行文件搜索
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ParallelFileSearch {
static class SearchTask extends RecursiveTask<List<File>> {
private final File directory;
private final String extension;
private static final int THRESHOLD = 10;
public SearchTask(File directory, String extension) {
this.directory = directory;
this.extension = extension;
}
@Override
protected List<File> compute() {
List<File> result = new ArrayList<>();
File[] files = directory.listFiles();
if (files == null) {
return result;
}
List<SearchTask> subtasks = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
// 创建子任务
SearchTask task = new SearchTask(file, extension);
subtasks.add(task);
} else if (file.getName().endsWith(extension)) {
result.add(file);
}
}
if (subtasks.size() > THRESHOLD) {
// 任务较多时并行执行
for (SearchTask task : subtasks) {
task.fork();
}
for (SearchTask task : subtasks) {
result.addAll(task.join());
}
} else {
// 任务较少时直接计算
for (SearchTask task : subtasks) {
result.addAll(task.compute());
}
}
return result;
}
}
public static void main(String[] args) {
String searchPath = "/usr"; // Linux 示例
String extension = ".txt";
File root = new File(searchPath);
if (!root.exists()) {
searchPath = System.getProperty("user.home");
root = new File(searchPath);
}
System.out.println("搜索路径: " + searchPath);
System.out.println("文件类型: " + extension);
System.out.println("---");
ForkJoinPool pool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
List<File> result = pool.invoke(new SearchTask(root, extension));
long endTime = System.currentTimeMillis();
System.out.println("找到 " + result.size() + " 个文件");
System.out.println("耗时: " + (endTime - startTime) + " ms");
// 显示前10个结果
result.stream().limit(10).forEach(f -> System.out.println(f.getAbsolutePath()));
pool.shutdown();
}
}
示例2:并行矩阵乘法
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ParallelMatrixMultiplication {
static class MatrixMultiplyTask extends RecursiveAction {
private static final int THRESHOLD = 64;
private final int[][] A;
private final int[][] B;
private final int[][] C;
private final int rowStart, rowEnd;
private final int colStart, colEnd;
private final int n;
public MatrixMultiplyTask(int[][] A, int[][] B, int[][] C,
int rowStart, int rowEnd,
int colStart, int colEnd, int n) {
this.A = A;
this.B = B;
this.C = C;
this.rowStart = rowStart;
this.rowEnd = rowEnd;
this.colStart = colStart;
this.colEnd = colEnd;
this.n = n;
}
@Override
protected void compute() {
int rowSize = rowEnd - rowStart;
int colSize = colEnd - colStart;
if (rowSize <= THRESHOLD && colSize <= THRESHOLD) {
// 小任务直接计算
for (int i = rowStart; i < rowEnd; i++) {
for (int j = colStart; j < colEnd; j++) {
C[i][j] = 0;
for (int k = 0; k < n; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
return;
}
// 分解任务
int rowMid = rowStart + rowSize / 2;
int colMid = colStart + colSize / 2;
invokeAll(
new MatrixMultiplyTask(A, B, C, rowStart, rowMid, colStart, colMid, n),
new MatrixMultiplyTask(A, B, C, rowStart, rowMid, colMid, colEnd, n),
new MatrixMultiplyTask(A, B, C, rowMid, rowEnd, colStart, colMid, n),
new MatrixMultiplyTask(A, B, C, rowMid, rowEnd, colMid, colEnd, n)
);
}
}
public static void main(String[] args) {
int size = 512;
// 初始化矩阵
int[][] A = new int[size][size];
int[][] B = new int[size][size];
int[][] C = new int[size][size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
A[i][j] = (int) (Math.random() * 10);
B[i][j] = (int) (Math.random() * 10);
}
}
System.out.println("矩阵大小: " + size + "x" + size);
// 串行计算
System.out.println("\n串行计算...");
long startTime = System.currentTimeMillis();
multiplySerial(A, B, C, size);
long endTime = System.currentTimeMillis();
System.out.println("串行耗时: " + (endTime - startTime) + " ms");
// 并行计算
System.out.println("\n并行计算...");
startTime = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new MatrixMultiplyTask(A, B, C, 0, size, 0, size, size));
endTime = System.currentTimeMillis();
System.out.println("并行耗时: " + (endTime - startTime) + " ms");
pool.shutdown();
}
private static void multiplySerial(int[][] A, int[][] B, int[][] C, int n) {
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
C[i][j] = 0;
for (int k = 0; k < n; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
}
}
四、实战应用场景
4.1 日志处理系统
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class LogProcessingSystem {
// 日志实体
static class LogEntry {
final String level;
final String message;
final long timestamp;
public LogEntry(String level, String message) {
this.level = level;
this.message = message;
this.timestamp = System.currentTimeMillis();
}
@Override
public String toString() {
return "[" + level + "] " + message;
}
}
// 日志生产者
static class LogProducer implements Runnable {
private final BlockingQueue<LogEntry> queue;
private final String name;
private final AtomicLong counter;
public LogProducer(BlockingQueue<LogEntry> queue, String name, AtomicLong counter) {
this.queue = queue;
this.name = name;
this.counter = counter;
}
@Override
public void run() {
String[] levels = {"INFO", "WARN", "ERROR", "DEBUG"};
while (!Thread.currentThread().isInterrupted()) {
try {
String level = levels[(int) (Math.random() * levels.length)];
String message = name + " - Message " + counter.incrementAndGet();
LogEntry entry = new LogEntry(level, message);
queue.offer(entry, 1, TimeUnit.SECONDS);
System.out.println("生产: " + entry);
Thread.sleep((long) (Math.random() * 500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 日志消费者
static class LogConsumer implements Runnable {
private final BlockingQueue<LogEntry> queue;
private final String name;
public LogConsumer(BlockingQueue<LogEntry> queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
LogEntry entry = queue.poll(1, TimeUnit.SECONDS);
if (entry != null) {
// 处理日志(这里简化为打印)
System.out.println(name + " 处理: " + entry);
Thread.sleep(200); // 模拟处理时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<LogEntry> queue = new ArrayBlockingQueue<>(100);
AtomicLong counter = new AtomicLong(0);
ExecutorService executor = Executors.newCachedThreadPool();
// 启动多个生产者
for (int i = 0; i < 3; i++) {
executor.submit(new LogProducer(queue, "Producer-" + i, counter));
}
// 启动多个消费者
for (int i = 0; i < 2; i++) {
executor.submit(new LogConsumer(queue, "Consumer-" + i));
}
// 运行一段时间后停止
Thread.sleep(5000);
executor.shutdownNow();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("\n日志系统停止,剩余日志: " + queue.size());
}
}
4.2 缓存系统
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ConcurrentCache<K, V> {
private final Map<K, V> cache;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<K, Semaphore> keyLocks = new ConcurrentHashMap<>();
// 统计信息
private final AtomicLong hitCount = new AtomicLong();
private final AtomicLong missCount = new AtomicLong();
public ConcurrentCache() {
this.cache = new HashMap<>();
}
public V get(K key) {
lock.readLock().lock();
try {
V value = cache.get(key);
if (value != null) {
hitCount.incrementAndGet();
return value;
}
} finally {
lock.readLock().unlock();
}
missCount.incrementAndGet();
return null;
}
public V getOrCompute(K key, Callable<V> loader) throws Exception {
V value = get(key);
if (value != null) {
return value;
}
// 获取键级别的锁,防止缓存击穿
Semaphore keyLock = keyLocks.computeIfAbsent(key, k -> new Semaphore(1));
keyLock.acquire();
try {
// 双重检查
value = get(key);
if (value != null) {
return value;
}
// 加载值
value = loader.call();
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
return value;
} finally {
keyLock.release();
keyLocks.remove(key);
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public void remove(K key) {
lock.writeLock().lock();
try {
cache.remove(key);
} finally {
lock.writeLock().unlock();
}
}
public void clear() {
lock.writeLock().lock();
try {
cache.clear();
} finally {
lock.writeLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try {
return cache.size();
} finally {
lock.readLock().unlock();
}
}
public double getHitRate() {
long hits = hitCount.get();
long total = hits + missCount.get();
return total == 0 ? 0 : (double) hits / total;
}
public static void main(String[] args) throws Exception {
ConcurrentCache<String, String> cache = new ConcurrentCache<>();
ExecutorService executor = Executors.newFixedThreadPool(10);
// 并发读取
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
try {
String value = cache.getOrCompute("key-" + (taskId % 5), () -> {
System.out.println("加载 key-" + (taskId % 5));
Thread.sleep(100);
return "Value-" + (taskId % 5);
});
System.out.println("任务 " + taskId + " 获取: " + value);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("\n缓存统计:");
System.out.println("缓存大小: " + cache.size());
System.out.println("命中率: " + String.format("%.2f%%", cache.getHitRate() * 100));
}
}
五、模型对比
| 模型 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 生产者-消费者 | 数据处理流水线 | 解耦、缓冲、异步 | 缓冲区大小需要调优 |
| 读写分离 | 读多写少 | 读并发高 | 写操作阻塞所有读 |
| 工作窃取 | 计算密集型任务 | 负载均衡 | 任务窃取有开销 |
| Actor | 分布式系统 | 无锁、易扩展 | 消息传递有延迟 |
| Fork/Join | 分治任务 | 并行高效 | 任务分解需要设计 |
选择建议
- IO 密集型:生产者-消费者、读写分离
- CPU 密集型:工作窃取、Fork/Join
- 分布式系统:Actor 模型
- 读多写少:读写分离、CopyOnWrite
- 数据流水线:生产者-消费者
六、总结与最佳实践
最佳实践
-
生产者-消费者
- 合理设置缓冲区大小
- 处理好生产者和消费者的速度差异
- 注意任务拒绝策略
-
读写分离
- 读远多于写时效果明显
- 注意锁降级的正确使用
- StampedLock 的乐观读适合读多写少
-
工作窃取
- 任务分解粒度要适中
- 避免 IO 操作占用工作线程
- 使用适当的并行度
-
Actor
- 避免阻塞 Actor
- 消息不可变
- 合理设计 Actor 层次结构
-
Fork/Join
- 任务粒度适中(太小有调度开销)
- 避免在任务中执行 IO
- 合理使用 fork 和 invokeAll
常见陷阱
- 死锁:多个资源以不同顺序加锁
- 活锁:线程持续重试但无法前进
- 饥饿:某些线程长期无法获取资源
- 性能退化:同步开销超过并行收益
性能优化
- 减少锁竞争:缩小锁范围、使用读写锁
- 减少上下文切换:减少线程数、使用协程
- 减少内存竞争:使用 ThreadLocal、避免伪共享
- 批量处理:合并小任务、批量提交
并发模型是构建高并发系统的理论基础。选择合适的模型,遵循最佳实践,能够设计出高效、可扩展的并发应用。在实际开发中,往往需要结合多种模型,针对具体场景进行优化。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)