Java修行实录:阻塞队列(实现生产者-消费者模型)
一、阻塞队列
阻塞队列和普通队列的区别在于:线程安全和队列边界状态的处理上

二、synchronized手写阻塞队列
属性:
private final Object[] items;
private int size;
private int putIndex;
private int takeIndex;
final修饰Object[]的含义:
- 表示数组引用 items 不可变,items不能指向另一个数组对象,但是数组的内容是可变的
- 防止在其他方法中,不小心将this.items = new Object[],
- 如果items指向了其他数组对象,那么原始数据就丢失了
循环队列实现:putIndex 和 takeIndex的初始都是 0 ,中间夹着的就是整个队列的数据,size控制队满/空的条件
2.1 put()函数
public void put(T e) throws InterruptedException {
synchronized (this){
while(size == items.length){
System.out.println("--------------当 前 队 列 已 满---------------");
wait();
}
items[putIndex] = e;
size++;
if(++putIndex == items.length) putIndex = 0;
notifyAll();
}
}
这种写法等同于:
public synchronized void put(T e) throws InterruptedException {
while(size == items.length){
System.out.println("--------------当 前 队 列 已 满---------------");
wait();
}
items[putIndex] = e;
size++;
if(++putIndex == items.length) putIndex = 0;
notifyAll();
}
synchronized锁住的是对象实例,如果是 static synchronized锁住的就是类
当队列已满时,释放锁wait(),notifyAll()唤醒所有等待池的线程,加入到锁池中竞争锁
2.2 take()函数
public synchronized T take() throws InterruptedException {
while(size == 0){
System.out.println("---------------队 列 为 空----------------");
wait();
}
T outNum = (T)items[takeIndex];
size--;
if(++takeIndex == items.length) takeIndex = 0;
notifyAll();
return outNum;
}
Object 类型转为 T 类型,向下转型需要强转(相当于条件更加苛刻)
2.3 Java中断机制
Java的中断机制是 “协作式”
线程对象上设置一个布尔标志位,iterrupt()不会强制停止CPU执行代码,而是修改标志位(true是发生中断,false是没有中断)
- wait(),sleep(),join()这些方法内部设置了检查逻辑,执行前都会检查中断标志位
- 如果是true,检查到发生了中断,将当前线程的中断标志位改为 false,再抛出 IterruptedException
- 如果没有catch()这个中断异常,并且没有重新设置中断标志位,也没有return或break,那么异常就消失了(异常被吞掉了),标志位没了,上层调用者不知道发生了中断异常
- 代码继续执行while循环,再次进入wait()
对于其他赋值操作 a = b,只是Java普通字节码,JVM不会检查中断逻辑
案例:
Thread t = new Thread(() -> {
// 1. 赋值操作(非阻塞)
int a = 1;
int b = 2;
int c = a + b;
// 即使此时调用 t.interrupt(),上面三行代码也会正常执行完,不会报错。
// 中断标志位变成了 true,但代码没检查,所以无感。
// 2. 阻塞操作
try {
synchronized(this) {
wait(); // <--- 这里会检查中断标志位!
}
} catch (InterruptedException e) {
// 如果之前调用了 interrupt(),这里会立刻捕获异常
System.out.println("被中断了!");
}
});
t.start();
// 主线程立刻中断它
t.interrupt();
2.3.1 try-catch语法讲解
处理错误的机制,try是尝试,catch是补救
try {
// 【尝试做某事】
// 比如:从队列取数据,如果队列空了且被中断,这里会出错
cq.take();
System.out.println("取数据成功"); // 如果上面出错,这行不会执行
} catch (InterruptedException e) {
// 【出错了怎么办】
// 只有当 try 里抛出 InterruptedException 时,才会进到这里
System.out.println("出错了:线程被中断了");
// 在这里决定是停止线程、记录日志还是重试
}
// 【后续】
// 无论 try 是否出错,程序都会继续执行这里的代码(除非 catch 里 return 或 throw 了)
System.out.println("程序继续运行...");
2.3.2 签名抛出异常 vs 手动恢复异常

public synchronized T take() throws InterruptedException

take()方法带上签名throw,会将异常传递给上层,如果不带throw,会在catch中被吞掉
1.场景A:签名throw(通过异常传递)
// 底层方法
public T take() throws InterruptedException {
wait(); // 中断时:1.清除标志位 2.抛出异常
}
// 调用者
try {
cq.take();
} catch (InterruptedException e) {
// 异常已经传递上来了,调用者已经知道被中断了
// 不需要再恢复标志位,因为异常本身就是信号
break;
}
执行take()方法时,wait()方法被中断,take()方法会抛出异常,从而在调用者的catch中捕获到异常信号,进入catch块中处理,catch块中也不需要恢复标志位,因为中断信号已经传给了调用者
2.场景B:手动恢复异常(通过标志位传递)
// 底层方法(不能 throws,比如实现 Runnable)
public T take() {
try {
wait();
} catch (InterruptedException e) {
// ✅ 吃掉异常,但必须恢复标志位!
Thread.currentThread().interrupt();
return null; // 或抛出运行时异常
}
}
// 调用者
T result = cq.take(); // ❌ 这里捕获不到 InterruptedException!
// 如果想感知中断,必须主动检查标志位:
if (Thread.currentThread().isInterrupted()) {
// 哦,原来被中断过
return;
}
wait发生了异常时,take中的catch块会吞掉异常,所以必须将中断位恢复位true,才能让调用者知道

2.4 生产者消费者模型
public static void main(String[] args) {
CBlockingQueue<Integer> cq = new CBlockingQueue(100);
for (int i = 0; i < 3; i++) {
new Thread(()->{
while(true){
int inNum = (int)(Math.random() * 100);
try {
cq.put(inNum);
System.out.println("生产数字" + inNum);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
}
}).start();
}
for (int i = 0; i < 3 ; i++) {
new Thread(()->{
while(true){
try {
int num = cq.take();
System.out.println("将数字" + num + "消费");
Thread.sleep(400);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
}
}).start();
}
new Thread(()->{
while(true){
System.out.println("当前任务总量为:" + cq.size);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();;
}
}
}).start();
}
创建三个生产者线程、三个消费者线程和一个监听任务总量的线程
三、ReentrantLock和Condition实现阻塞队列
属性
private final Object[] items;
private int putIndex;
private int takeIndex;
private int capacity;
private int size;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
(capacity不是必要的,可以用items.length代替)
- ReentranLock lock:一个lock一个锁池,锁池中争抢的是lock锁,synchronized争抢的是对象的Monitor监视器
- Condition:条件变量,基于lock锁划分等待池,每一个条件变量都有自己的等待池,synchronized只有一个等待池和一个锁池
3.1 put()方法
public void put(T e) throws InterruptedException {
if(e == null){
throw new NullPointerException("不能为null");
}
lock.lock();
try{
while(size == capacity){
System.out.println(Thread.currentThread().getName()+
"队列已满...");
notFull.await(); //释放锁,当前队列已满,可以取出
}
items[putIndex] = e; // T 向上转型为 Object
if(++putIndex == capacity) putIndex = 0;
size++;
notEmpty.signal();
}finally{
lock.unlock();
}
}
手动释放锁:lock.lock() 和 lock.unlock()要成对出现,且用try-finally包裹起来
notFull.await():释放lock锁,将当前线程加入notFull的等待池中
notEmpty.signal():随机唤醒条件变量notEmpty下的等待池的一个线程,进入锁池争抢lcok
3.1.1 为什么要加上try-finally

如果没有加上try-fianlly,程序如果在执行业务逻辑时(或被await()中断)发生意外,unlock()将永远不会执行!那这把锁就永远释放不了,发生死锁!
加上try-finally后,即使抛出异常,JVM在跳出方法前,会强制执行finally块,确保锁会正常释放,其他线程会继续工作
特殊情况:遇到return
public int test() {
try {
System.out.println("A: 准备返回");
return 1; // 遇到 return
} finally {
System.out.println("B: 但是我要先执行 finally!");
// 注意:如果 finally 里也有 return,会覆盖 try 里的返回值(这是坏味道)
}
}
// 输出:A -> B -> (方法真正返回 1)

3.1.2 使用synchronized(隐式的try-finally)
synchronized是JVM关键字,编译器在编译时,会自动包裹隐式的try-finally,所以不需要手动释放锁,JVM帮你做了,如果是关文件、关数据库连接,依然要手写try-finally
3.2 take()方法
public T take() throws InterruptedException{
lock.lock();
try{
while (size == 0){
System.out.println(Thread.currentThread().getName()+
"队列为空...");
notEmpty.await();
}
T element = (T)items[takeIndex]; // 将Object从上往下转为 T,需要强转
if(++takeIndex == capacity) takeIndex = 0;
size--;
notFull.signal();
return element;
} finally {
lock.unlock();
}
}
3.3 生产者消费者模型
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue<Integer> bq = new MyBlockingQueue<>(100);
// 启动两个生产者线程
for (int i = 0; i < 2; i++) {
new Thread(()->{
//每个生产者 每一秒生产一个num
while(true){
int num = (int) (Math.random() * 100);
try {
bq.put(num);
System.out.println(Thread.currentThread().getName()+
"生产" + num);
Thread.sleep(200); //模拟生产耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
//启动两个消费者
for (int i = 0; i < 2; i++) {
new Thread(()->{
try {
while(true){
Integer outNum = bq.take();
System.out.println(Thread.currentThread().getName() +
" 消费 " + outNum);
Thread.sleep(500); //模拟消费耗时
}
} catch (InterruptedException e) {
//throw new RuntimeException(e);
Thread.currentThread().interrupt();
}
}).start();
}
new Thread(()->{
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("现有的队列任务量为:" + bq.size);
}
}).start();
Thread.sleep(5000);
System.out.println("测试结束");
}
3.4 throw new RuntimeException(e)是什么?
- 当代码执行到throw new RuntimeException(e)时,将不再执行业务逻辑的代码(比如生产者的while循环(true)),后续所有代码全部跳过
- JVM会捕获这个未处理的异常,在控制台打一大串红色的堆栈错误信息
- 该线程的生命周期停止,也不会醒来,也不会执行任务

方案A:直接抛出异常(猝死)
catch (InterruptedException e) {
throw new RuntimeException(e);
// 结果:
// 1. 线程立即死亡。
// 2. 中断标志位丢失(false)。
// 3. 上层只知道出错了,不知道是被中断了。
// 4. break 根本不会执行。
}
throw之后不能break,throw语句一旦执行,后面的任何代码都不会执行
方案B:恢复标志位 + break
catch (InterruptedException e) {
// 1. 手动把中断标志位补回来(因为抛异常时 JVM 把它清空了)
Thread.currentThread().interrupt();
// 2. 正常跳出循环
break;
// 结果:
// 1. 循环结束,run() 方法自然执行完毕。
// 2. 线程状态变为 TERMINATED(正常死亡)。
// 3. 中断标志位保持为 true(如果上层检查的话能知道)。
// 4. 没有红色报错堆栈,世界和平。
}
虽然两个方案都会导致TERMINATED,但是两种死亡方式有着本质区别:


代码层面的对比:
方案A:异常死亡路径
public void run() {
try {
while(true) {
doWork(); // 假设这里被中断
}
} catch (InterruptedException e) {
// 💥 炸弹爆炸!
// 1. 这里的代码执行完,run() 方法直接非正常返回
// 2. 循环后面的 cleanup() 永远不会被执行!
throw new RuntimeException(e);
}
// ❌ 下面的清理代码永远到不了
cleanup();
}
// 结果:线程 TERMINATED,但资源可能泄漏,日志全是报错。
方案B:正常死亡路径
public void run() {
try {
while(true) {
doWork();
}
} catch (InterruptedException e) {
// 🛡️ 恢复现场
Thread.currentThread().interrupt();
// 🚪 优雅出门
break;
}
// ✅ 因为是从 break 出来的,这里可以正常执行!
cleanup();
}
// 结果:线程 TERMINATED,资源已清理,日志干净,中断信号保留。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)