1. 什么是阻塞队列?

阻塞队列(BlockingQueue)是一种特殊的队列,它在支持队列基本操作(入队、出队)的同时,增加了阻塞特性:

  • 当队列满时,生产者线程试图入队会被阻塞,直到队列有空闲位置。

  • 当队列空时,消费者线程试图出队会被阻塞,直到队列有元素可用。

这种机制完美解决了生产者-消费者模型中的协调问题,避免了开发者手动编写 wait/notify 的复杂逻辑。

应用场景

  • 生产者-消费者模型

  • 线程池的任务队列

  • 消息中间件中的消息缓冲

  • 数据批处理流水线


2. BlockingQueue 接口详解

BlockingQueue 是 java.util.concurrent 包下的接口,继承自 Queue 接口。它定义了以下核心方法:

操作类型 抛出异常 返回特殊值 阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

方法说明

  • 抛出异常:当操作失败时(如队列满时 add),抛出 IllegalStateException

  • 返回特殊值:操作失败时返回 false 或 null(如 offer 返回 falsepoll 返回 null)。

  • 阻塞:操作会阻塞线程,直到成功或被中断。

  • 超时退出:在指定时间内尝试操作,超时后返回特殊值。

注意事项

  • 阻塞队列不接受 null 元素,因为 null 常被用作特殊返回值(如 poll 返回 null 表示空队列)。

  • 所有实现类都是线程安全的。

  • 容量可以是无界的(Integer.MAX_VALUE),但无界队列可能导致内存溢出。


3. 核心实现类介绍

3.1 ArrayBlockingQueue

特点

  • 有界队列:必须指定容量,容量不可变。

  • 底层数组:基于数组实现,内部维护一个循环数组。

  • 公平性:构造时可指定是否使用公平锁(公平锁保证等待最久的线程优先获得锁,但会降低吞吐量)。(吞吐量 是指系统在单位时间内能够处理的任务数量。在并发编程中,它衡量的是一个组件(如线程池、队列、数据库连接池)在单位时间内完成操作(如入队、出队、请求处理)的能力。)

代码示例

// 创建容量为 10 的有界阻塞队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10, true); // 公平锁

// 生产者
queue.put("item");   // 阻塞直到有空位
boolean offered = queue.offer("item", 1, TimeUnit.SECONDS); // 超时

// 消费者
String item = queue.take(); // 阻塞直到有元素
String polled = queue.poll(1, TimeUnit.SECONDS); // 超时

适用场景:需要严格控制容量的场景,如固定大小的线程池任务队列。

3.2 LinkedBlockingQueue

特点

  • 可选有界/无界:默认容量为 Integer.MAX_VALUE(无界),也可指定容量。

  • 底层链表:基于链表实现,内部维护 head 和 last 节点。

  • 两把锁:使用 putLock 和 takeLock 分离,提高吞吐量(入队和出队可以并行)。

代码示例

// 无界队列(默认容量 Integer.MAX_VALUE)
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 有界队列
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);

性能对比LinkedBlockingQueue 的吞吐量通常高于 ArrayBlockingQueue,但受 GC 影响较大(链表节点频繁创建回收)。

适用场景:吞吐量优先、容量无限制或可预知的场景。

3.3 PriorityBlockingQueue

特点

  • 无界队列:但容量受内存限制。

  • 优先级队列:元素按照自然顺序或提供的 Comparator 排序。

  • 不保证公平性:优先级相同的元素,顺序不确定。

代码示例

// 存储整数,按自然顺序(升序)
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(5);
queue.put(1);
queue.put(3);
System.out.println(queue.take()); // 输出 1(最小)

注意PriorityBlockingQueue 的迭代器不保证顺序,take() 总是返回优先级最高的元素。

适用场景:需要按优先级处理任务(如任务调度)。

3.4 SynchronousQueue

特点

  • 容量为 0:不存储任何元素,每个 put 必须等待一个 take,反之亦然。

  • 直接传递:类似接力棒,生产者线程直接传递元素给消费者线程。

  • 公平性:支持公平模式(队列)和非公平模式(栈)。

代码示例

BlockingQueue<String> queue = new SynchronousQueue<>();

// 生产者
new Thread(() -> {
    try {
        queue.put("data"); // 阻塞直到有消费者接收
        System.out.println("produced");
    } catch (InterruptedException e) {}
}).start();

// 消费者
new Thread(() -> {
    try {
        String data = queue.take(); // 阻塞直到有生产者提供
        System.out.println("consumed: " + data);
    } catch (InterruptedException e) {}
}).start();

适用场景:线程池中 Executors.newCachedThreadPool() 使用的就是 SynchronousQueue,用于直接传递任务。

3.5 DelayQueue

特点

  • 无界队列,元素必须实现 Delayed 接口。

  • 延迟出队:只有延迟时间到期的元素才能被取出。

  • 基于优先级队列:内部使用 PriorityQueue,按剩余延迟排序。

代码示例

class DelayedElement implements Delayed {
    private long delayTime;
    private long expireTime;
    
    public DelayedElement(long delay, TimeUnit unit) {
        this.delayTime = delay;
        this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.expireTime, ((DelayedElement) o).expireTime);
    }
}

// 使用
DelayQueue<DelayedElement> queue = new DelayQueue<>();
queue.put(new DelayedElement(5, TimeUnit.SECONDS));
DelayedElement e = queue.take(); // 5 秒后返回

适用场景:缓存过期、定时任务、重试机制等。

3.6 LinkedTransferQueue

特点

  • 无界队列,基于链表。

  • 增加 transfer 方法:生产者等待消费者接收元素,否则阻塞。

  • 吞吐量高:内部使用 CAS 操作,减少锁竞争。

代码示例

LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

// 生产者
queue.transfer("data"); // 阻塞直到被消费者接收
// 消费者
String data = queue.take(); // 接收元素

适用场景:需要确认消费者已接收消息的场景(类似 SynchronousQueue 但可存储元素)。


4. 阻塞队列的实现原理

4.1 基于 ReentrantLock 和 Condition

大多数阻塞队列(如 ArrayBlockingQueue)内部使用 ReentrantLock 和 Condition 实现阻塞。

原理

  • 维护一个锁 lock 和两个条件变量:notFull(队列不满)和 notEmpty(队列不空)。

  • 生产者执行 put 时:

    1. 获取锁。

    2. 如果队列满,则在 notFull 上等待(await())。

    3. 否则插入元素,并唤醒 notEmpty 上的等待线程。

    4. 释放锁。

  • 消费者执行 take 时:

    1. 获取锁。

    2. 如果队列空,则在 notEmpty 上等待。

    3. 否则取出元素,并唤醒 notFull 上的等待线程。

    4. 释放锁。

简化代码示例(理解原理):

public class SimpleBlockingQueue<T> {
    private final Object[] items;
    private int putIndex = 0, takeIndex = 0, count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public SimpleBlockingQueue(int capacity) {
        items = new Object[capacity];
    }

    public void put(T t) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putIndex] = t;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            @SuppressWarnings("unchecked")
            T t = (T) items[takeIndex];
            items[takeIndex] = null; // help GC
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
}

注意:真实实现中还有超时、中断处理等细节,但核心就是 lock + Condition

4.2 LinkedBlockingQueue 的两把锁设计

LinkedBlockingQueue 使用两把锁(putLock 和 takeLock),允许入队和出队并行执行,提高吞吐量。但需要维护一个 AtomicInteger 作为计数器,保证线程安全。

伪代码

private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final AtomicInteger count = new AtomicInteger();

public void put(E e) throws InterruptedException {
    int c = -1;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(e);
        c = count.getAndIncrement();
        if (c + 1 < capacity) {
            notFull.signal(); // 还有空位,唤醒其他生产者
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0) { // 之前队列为空,通知消费者
        signalNotEmpty();
    }
}

5. 生产者-消费者模型实战

下面使用 LinkedBlockingQueue 实现一个简单的生产者-消费者。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        // 创建容量为 5 的阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);

        // 生产者
        Runnable producer = () -> {
            AtomicInteger value = new AtomicInteger(0);
            while (true) {
                try {
                    int val = value.getAndIncrement();
                    queue.put(val);
                    System.out.println("生产者生产:" + val);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        };

        // 消费者
        Runnable consumer = () -> {
            while (true) {
                try {
                    Integer val = queue.take();
                    System.out.println("消费者消费:" + val);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

优点:无需手动管理锁和条件,代码简洁,可靠性高。


6. 使用场景与最佳实践

场景 推荐队列 理由
固定容量、严格控制内存 ArrayBlockingQueue 有界、数组结构,内存稳定
高吞吐量、无界或大容量 LinkedBlockingQueue 两把锁,入队出队并行
按优先级处理任务 PriorityBlockingQueue 无界,支持优先级
直接传递,不缓存 SynchronousQueue 容量0,强制对接
延迟任务/缓存过期 DelayQueue 只有到期元素才能取出
需要等待消费者确认 LinkedTransferQueue 支持 transfer 等待

最佳实践

  • 尽量使用有界队列,防止内存溢出。

  • 使用 offer/poll 带超时版本,避免无限阻塞。

  • 处理中断:put/take 会抛出 InterruptedException,应正确恢复中断或退出。

  • 线程池任务队列通常选择 LinkedBlockingQueue 或 ArrayBlockingQueue


7. 面试考点详解

7.1 阻塞队列与普通队列的区别?

  • 阻塞特性:阻塞队列提供阻塞的 put/take 操作,普通队列没有。

  • 线程安全:阻塞队列所有实现都是线程安全的;普通队列(如 LinkedList)非线程安全,需外部同步。

  • 等待通知机制:阻塞队列内部封装了条件变量,无需手动 wait/notify

  • 用途:阻塞队列适用于生产者-消费者模型,普通队列用于单线程或非阻塞场景。

7.2 ArrayBlockingQueue 与 LinkedBlockingQueue 的区别?

特性 ArrayBlockingQueue LinkedBlockingQueue
底层结构 数组 链表
容量 有界,固定 有界或无界
一把锁 两把锁(putLock/takeLock)
吞吐量 较低(单锁) 较高(双锁并行)
内存占用 固定,预分配 动态,每个节点有额外开销
公平性 可配置 不支持公平性

面试回答ArrayBlockingQueue 有界、数组实现、单锁,内存稳定;LinkedBlockingQueue 可选有界、链表实现、双锁,吞吐量高,但节点开销大。

7.3 如何实现一个阻塞队列?

见上文 4.1 节的自定义实现。重点考察对 ReentrantLock 和 Condition 的理解。

7.4 线程池中为什么常用 LinkedBlockingQueue 或 SynchronousQueue?

  • LinkedBlockingQueue:用于固定大小线程池(如 Executors.newFixedThreadPool),任务队列可以缓冲任务,避免创建过多线程。

  • SynchronousQueue:用于缓存线程池(Executors.newCachedThreadPool),直接传递任务,不缓存,需要时立即创建线程,适合处理大量短时任务。

7.5 阻塞队列如何保证线程安全?

内部使用 ReentrantLock 或 CAS(如 LinkedTransferQueue)保护共享状态,保证并发下的正确性。同时通过 Condition 实现阻塞与唤醒。

7.6 阻塞队列的 drainTo 方法有什么作用?

drainTo 可以一次性将队列中的全部或部分元素转移到集合中,减少锁竞争次数,提高批量处理效率。

List<String> list = new ArrayList<>();
queue.drainTo(list, 100); // 最多转移100个元素

7.7 无界队列的风险?

无界队列(如默认的 LinkedBlockingQueue)在生产者生产速度持续高于消费者时,会导致队列无限增长,最终内存溢出(OOM)。因此,在无法控制生产速率时,应使用有界队列。


8. 例题演练

例题1:选择题

以下关于阻塞队列的说法,正确的是?
A. ArrayBlockingQueue 是线程安全的,且容量可变。
B. LinkedBlockingQueue 默认是无界的,使用两把锁。
C. SynchronousQueue 容量为0,每次 put 必须等待 take。
D. PriorityBlockingQueue 是 FIFO 队列。

答案:B、C
解析:A 错误,容量不可变;D 错误,是优先级队列,不是 FIFO。

例题2:代码填空题

使用阻塞队列实现生产者-消费者,将代码补充完整:

BlockingQueue<Integer> queue = new __________(5); // 使用有界数组队列

// 生产者
new Thread(() -> {
    try {
        for (int i = 0; i < 10; i++) {
            queue.________(i); // 阻塞式入队
            System.out.println("生产:" + i);
        }
    } catch (InterruptedException e) {}
}).start();

// 消费者
new Thread(() -> {
    try {
        while (true) {
            Integer val = queue.________(); // 阻塞式出队
            System.out.println("消费:" + val);
        }
    } catch (InterruptedException e) {}
}).start();

答案ArrayBlockingQueueputtake

例题3:分析题

给定以下代码,分析可能存在的问题:

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // 无界队列
ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, queue);
while (true) {
    executor.submit(() -> {
        // 耗时任务
        Thread.sleep(1000);
    });
}

答案:生产者(循环提交任务)速度极快,而队列无界,会导致队列无限增长,最终内存溢出。应使用有界队列,并设置拒绝策略。

例题4:设计题

请用阻塞队列实现一个简单的“日志收集器”,多个线程并发写日志,一个线程负责将日志写入文件。

参考答案

public class LogCollector {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
    private final Thread writerThread;
    private volatile boolean running = true;

    public LogCollector() {
        writerThread = new Thread(() -> {
            try (FileWriter fw = new FileWriter("app.log")) {
                while (running || !queue.isEmpty()) {
                    String log = queue.poll(1, TimeUnit.SECONDS);
                    if (log != null) {
                        fw.write(log + "\n");
                        fw.flush();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        writerThread.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    public void shutdown() throws InterruptedException {
        running = false;
        writerThread.join();
    }
}

9. 总结

阻塞队列是 Java 并发包中用于线程间协作的核心组件,它封装了锁和条件变量的复杂逻辑,使开发者可以轻松实现生产者-消费者模型。理解不同实现类的特点、底层原理以及适用场景,是掌握并发编程的关键。

推荐实践

  • 优先使用 LinkedBlockingQueue 或 ArrayBlockingQueue,根据是否需要固定容量选择。

  • 警惕无界队列的内存风险,尽量使用有界队列。

  • 利用 drainTo 等方法提高批量处理性能。

  • 处理中断时,正确恢复中断状态或退出循环。

以上就是本篇的全部内容啦~~~咱们下期再见

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐