Java--多线程--阻塞队列
1. 什么是阻塞队列?
阻塞队列(BlockingQueue)是一种特殊的队列,它在支持队列基本操作(入队、出队)的同时,增加了阻塞特性:
-
当队列满时,生产者线程试图入队会被阻塞,直到队列有空闲位置。
-
当队列空时,消费者线程试图出队会被阻塞,直到队列有元素可用。
这种机制完美解决了生产者-消费者模型中的协调问题,避免了开发者手动编写 wait/notify 的复杂逻辑。
应用场景:
-
生产者-消费者模型
-
线程池的任务队列
-
消息中间件中的消息缓冲
-
数据批处理流水线
2. BlockingQueue 接口详解
BlockingQueue 是 java.util.concurrent 包下的接口,继承自 Queue 接口。它定义了以下核心方法:
| 操作类型 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
| 移除 | remove() |
poll() |
take() |
poll(time, unit) |
| 检查 | element() |
peek() |
- | - |
方法说明:
-
抛出异常:当操作失败时(如队列满时
add),抛出IllegalStateException。 -
返回特殊值:操作失败时返回
false或null(如offer返回false,poll返回null)。 -
阻塞:操作会阻塞线程,直到成功或被中断。
-
超时退出:在指定时间内尝试操作,超时后返回特殊值。
注意事项:
-
阻塞队列不接受
null元素,因为null常被用作特殊返回值(如poll返回null表示空队列)。 -
所有实现类都是线程安全的。
-
容量可以是无界的(
Integer.MAX_VALUE),但无界队列可能导致内存溢出。
3. 核心实现类介绍
3.1 ArrayBlockingQueue
特点:
-
有界队列:必须指定容量,容量不可变。
-
底层数组:基于数组实现,内部维护一个循环数组。
-
公平性:构造时可指定是否使用公平锁(公平锁保证等待最久的线程优先获得锁,但会降低吞吐量)。(吞吐量 是指系统在单位时间内能够处理的任务数量。在并发编程中,它衡量的是一个组件(如线程池、队列、数据库连接池)在单位时间内完成操作(如入队、出队、请求处理)的能力。)
代码示例:
// 创建容量为 10 的有界阻塞队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10, true); // 公平锁
// 生产者
queue.put("item"); // 阻塞直到有空位
boolean offered = queue.offer("item", 1, TimeUnit.SECONDS); // 超时
// 消费者
String item = queue.take(); // 阻塞直到有元素
String polled = queue.poll(1, TimeUnit.SECONDS); // 超时
适用场景:需要严格控制容量的场景,如固定大小的线程池任务队列。
3.2 LinkedBlockingQueue
特点:
-
可选有界/无界:默认容量为
Integer.MAX_VALUE(无界),也可指定容量。 -
底层链表:基于链表实现,内部维护
head和last节点。 -
两把锁:使用
putLock和takeLock分离,提高吞吐量(入队和出队可以并行)。
代码示例:
// 无界队列(默认容量 Integer.MAX_VALUE)
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 有界队列
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
性能对比:LinkedBlockingQueue 的吞吐量通常高于 ArrayBlockingQueue,但受 GC 影响较大(链表节点频繁创建回收)。
适用场景:吞吐量优先、容量无限制或可预知的场景。
3.3 PriorityBlockingQueue
特点:
-
无界队列:但容量受内存限制。
-
优先级队列:元素按照自然顺序或提供的
Comparator排序。 -
不保证公平性:优先级相同的元素,顺序不确定。
代码示例:
// 存储整数,按自然顺序(升序)
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(5);
queue.put(1);
queue.put(3);
System.out.println(queue.take()); // 输出 1(最小)
注意:PriorityBlockingQueue 的迭代器不保证顺序,take() 总是返回优先级最高的元素。
适用场景:需要按优先级处理任务(如任务调度)。
3.4 SynchronousQueue
特点:
-
容量为 0:不存储任何元素,每个
put必须等待一个take,反之亦然。 -
直接传递:类似接力棒,生产者线程直接传递元素给消费者线程。
-
公平性:支持公平模式(队列)和非公平模式(栈)。
代码示例:
BlockingQueue<String> queue = new SynchronousQueue<>();
// 生产者
new Thread(() -> {
try {
queue.put("data"); // 阻塞直到有消费者接收
System.out.println("produced");
} catch (InterruptedException e) {}
}).start();
// 消费者
new Thread(() -> {
try {
String data = queue.take(); // 阻塞直到有生产者提供
System.out.println("consumed: " + data);
} catch (InterruptedException e) {}
}).start();
适用场景:线程池中 Executors.newCachedThreadPool() 使用的就是 SynchronousQueue,用于直接传递任务。
3.5 DelayQueue
特点:
-
无界队列,元素必须实现
Delayed接口。 -
延迟出队:只有延迟时间到期的元素才能被取出。
-
基于优先级队列:内部使用
PriorityQueue,按剩余延迟排序。
代码示例:
class DelayedElement implements Delayed {
private long delayTime;
private long expireTime;
public DelayedElement(long delay, TimeUnit unit) {
this.delayTime = delay;
this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedElement) o).expireTime);
}
}
// 使用
DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement(5, TimeUnit.SECONDS));
DelayedElement e = queue.take(); // 5 秒后返回
适用场景:缓存过期、定时任务、重试机制等。
3.6 LinkedTransferQueue
特点:
-
无界队列,基于链表。
-
增加
transfer方法:生产者等待消费者接收元素,否则阻塞。 -
吞吐量高:内部使用 CAS 操作,减少锁竞争。
代码示例:
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 生产者
queue.transfer("data"); // 阻塞直到被消费者接收
// 消费者
String data = queue.take(); // 接收元素
适用场景:需要确认消费者已接收消息的场景(类似 SynchronousQueue 但可存储元素)。
4. 阻塞队列的实现原理
4.1 基于 ReentrantLock 和 Condition
大多数阻塞队列(如 ArrayBlockingQueue)内部使用 ReentrantLock 和 Condition 实现阻塞。
原理:
-
维护一个锁
lock和两个条件变量:notFull(队列不满)和notEmpty(队列不空)。 -
生产者执行
put时:-
获取锁。
-
如果队列满,则在
notFull上等待(await())。 -
否则插入元素,并唤醒
notEmpty上的等待线程。 -
释放锁。
-
-
消费者执行
take时:-
获取锁。
-
如果队列空,则在
notEmpty上等待。 -
否则取出元素,并唤醒
notFull上的等待线程。 -
释放锁。
-
简化代码示例(理解原理):
public class SimpleBlockingQueue<T> {
private final Object[] items;
private int putIndex = 0, takeIndex = 0, count = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public SimpleBlockingQueue(int capacity) {
items = new Object[capacity];
}
public void put(T t) throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == items.length) {
notFull.await();
}
items[putIndex] = t;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (count == 0) {
notEmpty.await();
}
@SuppressWarnings("unchecked")
T t = (T) items[takeIndex];
items[takeIndex] = null; // help GC
if (++takeIndex == items.length) takeIndex = 0;
count--;
notFull.signal();
return t;
} finally {
lock.unlock();
}
}
}
注意:真实实现中还有超时、中断处理等细节,但核心就是 lock + Condition。
4.2 LinkedBlockingQueue 的两把锁设计
LinkedBlockingQueue 使用两把锁(putLock 和 takeLock),允许入队和出队并行执行,提高吞吐量。但需要维护一个 AtomicInteger 作为计数器,保证线程安全。
伪代码:
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final AtomicInteger count = new AtomicInteger();
public void put(E e) throws InterruptedException {
int c = -1;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal(); // 还有空位,唤醒其他生产者
}
} finally {
putLock.unlock();
}
if (c == 0) { // 之前队列为空,通知消费者
signalNotEmpty();
}
}
5. 生产者-消费者模型实战
下面使用 LinkedBlockingQueue 实现一个简单的生产者-消费者。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducerConsumerDemo {
public static void main(String[] args) {
// 创建容量为 5 的阻塞队列
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 生产者
Runnable producer = () -> {
AtomicInteger value = new AtomicInteger(0);
while (true) {
try {
int val = value.getAndIncrement();
queue.put(val);
System.out.println("生产者生产:" + val);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
// 消费者
Runnable consumer = () -> {
while (true) {
try {
Integer val = queue.take();
System.out.println("消费者消费:" + val);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
new Thread(producer).start();
new Thread(consumer).start();
}
}
优点:无需手动管理锁和条件,代码简洁,可靠性高。
6. 使用场景与最佳实践
| 场景 | 推荐队列 | 理由 |
|---|---|---|
| 固定容量、严格控制内存 | ArrayBlockingQueue |
有界、数组结构,内存稳定 |
| 高吞吐量、无界或大容量 | LinkedBlockingQueue |
两把锁,入队出队并行 |
| 按优先级处理任务 | PriorityBlockingQueue |
无界,支持优先级 |
| 直接传递,不缓存 | SynchronousQueue |
容量0,强制对接 |
| 延迟任务/缓存过期 | DelayQueue |
只有到期元素才能取出 |
| 需要等待消费者确认 | LinkedTransferQueue |
支持 transfer 等待 |
最佳实践:
-
尽量使用有界队列,防止内存溢出。
-
使用
offer/poll带超时版本,避免无限阻塞。 -
处理中断:
put/take会抛出InterruptedException,应正确恢复中断或退出。 -
线程池任务队列通常选择
LinkedBlockingQueue或ArrayBlockingQueue。
7. 面试考点详解
7.1 阻塞队列与普通队列的区别?
-
阻塞特性:阻塞队列提供阻塞的
put/take操作,普通队列没有。 -
线程安全:阻塞队列所有实现都是线程安全的;普通队列(如
LinkedList)非线程安全,需外部同步。 -
等待通知机制:阻塞队列内部封装了条件变量,无需手动
wait/notify。 -
用途:阻塞队列适用于生产者-消费者模型,普通队列用于单线程或非阻塞场景。
7.2 ArrayBlockingQueue 与 LinkedBlockingQueue 的区别?
| 特性 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 底层结构 | 数组 | 链表 |
| 容量 | 有界,固定 | 有界或无界 |
| 锁 | 一把锁 | 两把锁(putLock/takeLock) |
| 吞吐量 | 较低(单锁) | 较高(双锁并行) |
| 内存占用 | 固定,预分配 | 动态,每个节点有额外开销 |
| 公平性 | 可配置 | 不支持公平性 |
面试回答:ArrayBlockingQueue 有界、数组实现、单锁,内存稳定;LinkedBlockingQueue 可选有界、链表实现、双锁,吞吐量高,但节点开销大。
7.3 如何实现一个阻塞队列?
见上文 4.1 节的自定义实现。重点考察对 ReentrantLock 和 Condition 的理解。
7.4 线程池中为什么常用 LinkedBlockingQueue 或 SynchronousQueue?
-
LinkedBlockingQueue:用于固定大小线程池(如Executors.newFixedThreadPool),任务队列可以缓冲任务,避免创建过多线程。 -
SynchronousQueue:用于缓存线程池(Executors.newCachedThreadPool),直接传递任务,不缓存,需要时立即创建线程,适合处理大量短时任务。
7.5 阻塞队列如何保证线程安全?
内部使用 ReentrantLock 或 CAS(如 LinkedTransferQueue)保护共享状态,保证并发下的正确性。同时通过 Condition 实现阻塞与唤醒。
7.6 阻塞队列的 drainTo 方法有什么作用?
drainTo 可以一次性将队列中的全部或部分元素转移到集合中,减少锁竞争次数,提高批量处理效率。
List<String> list = new ArrayList<>();
queue.drainTo(list, 100); // 最多转移100个元素
7.7 无界队列的风险?
无界队列(如默认的 LinkedBlockingQueue)在生产者生产速度持续高于消费者时,会导致队列无限增长,最终内存溢出(OOM)。因此,在无法控制生产速率时,应使用有界队列。
8. 例题演练
例题1:选择题
以下关于阻塞队列的说法,正确的是?
A. ArrayBlockingQueue 是线程安全的,且容量可变。
B. LinkedBlockingQueue 默认是无界的,使用两把锁。
C. SynchronousQueue 容量为0,每次 put 必须等待 take。
D. PriorityBlockingQueue 是 FIFO 队列。
答案:B、C
解析:A 错误,容量不可变;D 错误,是优先级队列,不是 FIFO。
例题2:代码填空题
使用阻塞队列实现生产者-消费者,将代码补充完整:
BlockingQueue<Integer> queue = new __________(5); // 使用有界数组队列
// 生产者
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
queue.________(i); // 阻塞式入队
System.out.println("生产:" + i);
}
} catch (InterruptedException e) {}
}).start();
// 消费者
new Thread(() -> {
try {
while (true) {
Integer val = queue.________(); // 阻塞式出队
System.out.println("消费:" + val);
}
} catch (InterruptedException e) {}
}).start();
答案:ArrayBlockingQueue、put、take
例题3:分析题
给定以下代码,分析可能存在的问题:
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // 无界队列
ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, queue);
while (true) {
executor.submit(() -> {
// 耗时任务
Thread.sleep(1000);
});
}
答案:生产者(循环提交任务)速度极快,而队列无界,会导致队列无限增长,最终内存溢出。应使用有界队列,并设置拒绝策略。
例题4:设计题
请用阻塞队列实现一个简单的“日志收集器”,多个线程并发写日志,一个线程负责将日志写入文件。
参考答案:
public class LogCollector {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
private final Thread writerThread;
private volatile boolean running = true;
public LogCollector() {
writerThread = new Thread(() -> {
try (FileWriter fw = new FileWriter("app.log")) {
while (running || !queue.isEmpty()) {
String log = queue.poll(1, TimeUnit.SECONDS);
if (log != null) {
fw.write(log + "\n");
fw.flush();
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
writerThread.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
public void shutdown() throws InterruptedException {
running = false;
writerThread.join();
}
}
9. 总结
阻塞队列是 Java 并发包中用于线程间协作的核心组件,它封装了锁和条件变量的复杂逻辑,使开发者可以轻松实现生产者-消费者模型。理解不同实现类的特点、底层原理以及适用场景,是掌握并发编程的关键。
推荐实践:
-
优先使用
LinkedBlockingQueue或ArrayBlockingQueue,根据是否需要固定容量选择。 -
警惕无界队列的内存风险,尽量使用有界队列。
-
利用
drainTo等方法提高批量处理性能。 -
处理中断时,正确恢复中断状态或退出循环。
以上就是本篇的全部内容啦~~~咱们下期再见
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)