一、阻塞队列

阻塞队列和普通队列的区别在于:线程安全和队列边界状态的处理上

二、synchronized手写阻塞队列

属性:

private final Object[] items;
private int size;
private int putIndex;
private int takeIndex;

final修饰Object[]的含义:

  • 表示数组引用 items 不可变,items不能指向另一个数组对象,但是数组的内容是可变的
  • 防止在其他方法中,不小心将this.items = new Object[],
  • 如果items指向了其他数组对象,那么原始数据就丢失了

循环队列实现:putIndex 和 takeIndex的初始都是 0 ,中间夹着的就是整个队列的数据,size控制队满/空的条件

2.1 put()函数

    public void put(T e) throws InterruptedException {

        synchronized (this){
            while(size == items.length){
                System.out.println("--------------当 前 队 列 已 满---------------");
                wait();
            }
            items[putIndex] = e;
            size++;
            if(++putIndex == items.length) putIndex = 0;
            notifyAll();
        }
    }

这种写法等同于:

    public synchronized void put(T e) throws InterruptedException {

        while(size == items.length){
            System.out.println("--------------当 前 队 列 已 满---------------");
            wait();
        }
        items[putIndex] = e;
        size++;
        if(++putIndex == items.length) putIndex = 0;
        notifyAll();
    }

synchronized锁住的是对象实例,如果是 static synchronized锁住的就是类

当队列已满时,释放锁wait(),notifyAll()唤醒所有等待池的线程,加入到锁池中竞争锁

2.2 take()函数

    public synchronized T take() throws InterruptedException {
        while(size == 0){
            System.out.println("---------------队 列 为 空----------------");
            wait();
        }
        T outNum = (T)items[takeIndex];
        size--;
        if(++takeIndex == items.length) takeIndex = 0;
        notifyAll();
        return outNum;
    }

Object 类型转为 T 类型,向下转型需要强转(相当于条件更加苛刻)

2.3 Java中断机制

Java的中断机制是 “协作式”

线程对象上设置一个布尔标志位,iterrupt()不会强制停止CPU执行代码,而是修改标志位(true是发生中断,false是没有中断)

  • wait(),sleep(),join()这些方法内部设置了检查逻辑,执行前都会检查中断标志位
  • 如果是true,检查到发生了中断,将当前线程的中断标志位改为 false,再抛出 IterruptedException
  • 如果没有catch()这个中断异常,并且没有重新设置中断标志位,也没有return或break,那么异常就消失了(异常被吞掉了),标志位没了,上层调用者不知道发生了中断异常
  • 代码继续执行while循环,再次进入wait()

对于其他赋值操作 a = b,只是Java普通字节码,JVM不会检查中断逻辑

案例:

Thread t = new Thread(() -> {
    // 1. 赋值操作(非阻塞)
    int a = 1; 
    int b = 2;
    int c = a + b; 
    // 即使此时调用 t.interrupt(),上面三行代码也会正常执行完,不会报错。
    // 中断标志位变成了 true,但代码没检查,所以无感。

    // 2. 阻塞操作
    try {
        synchronized(this) {
            wait(); // <--- 这里会检查中断标志位!
        }
    } catch (InterruptedException e) {
        // 如果之前调用了 interrupt(),这里会立刻捕获异常
        System.out.println("被中断了!");
    }
});
t.start();

// 主线程立刻中断它
t.interrupt(); 

2.3.1 try-catch语法讲解

处理错误的机制,try是尝试,catch是补救

try {
    // 【尝试做某事】
    // 比如:从队列取数据,如果队列空了且被中断,这里会出错
    cq.take(); 
    System.out.println("取数据成功"); // 如果上面出错,这行不会执行
} catch (InterruptedException e) {
    // 【出错了怎么办】
    // 只有当 try 里抛出 InterruptedException 时,才会进到这里
    System.out.println("出错了:线程被中断了");
    // 在这里决定是停止线程、记录日志还是重试
}
// 【后续】
// 无论 try 是否出错,程序都会继续执行这里的代码(除非 catch 里 return 或 throw 了)
System.out.println("程序继续运行...");

2.3.2 签名抛出异常 vs 手动恢复异常

public synchronized T take() throws InterruptedException

 take()方法带上签名throw,会将异常传递给上层,如果不带throw,会在catch中被吞掉

1.场景A:签名throw(通过异常传递)

// 底层方法
public T take() throws InterruptedException {
    wait(); // 中断时:1.清除标志位 2.抛出异常
}

// 调用者
try {
    cq.take();
} catch (InterruptedException e) {
    // 异常已经传递上来了,调用者已经知道被中断了
    // 不需要再恢复标志位,因为异常本身就是信号
    break; 
}

执行take()方法时,wait()方法被中断,take()方法会抛出异常,从而在调用者的catch中捕获到异常信号,进入catch块中处理,catch块中也不需要恢复标志位,因为中断信号已经传给了调用者

2.场景B:手动恢复异常(通过标志位传递)

// 底层方法(不能 throws,比如实现 Runnable)
public T take() {
    try {
        wait();
    } catch (InterruptedException e) {
        // ✅ 吃掉异常,但必须恢复标志位!
        Thread.currentThread().interrupt();
        return null; // 或抛出运行时异常
    }
}

// 调用者
T result = cq.take();  // ❌ 这里捕获不到 InterruptedException!
// 如果想感知中断,必须主动检查标志位:
if (Thread.currentThread().isInterrupted()) {
    // 哦,原来被中断过
    return;
}

wait发生了异常时,take中的catch块会吞掉异常,所以必须将中断位恢复位true,才能让调用者知道

2.4 生产者消费者模型

public static void main(String[] args) {

        CBlockingQueue<Integer> cq = new CBlockingQueue(100);


        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                while(true){
                    int inNum = (int)(Math.random() * 100);
                    try {
                        cq.put(inNum);
                        System.out.println("生产数字" + inNum);
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();;
                    }
                }

            }).start();
        }

        for (int i = 0; i < 3 ; i++) {
            new Thread(()->{
                while(true){
                    try {
                        int num = cq.take();
                        System.out.println("将数字" + num + "消费");
                        Thread.sleep(400);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();;
                    }
                }

            }).start();
        }

        new Thread(()->{
            while(true){
                System.out.println("当前任务总量为:" + cq.size);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();;
                }
            }

        }).start();

    }

        创建三个生产者线程、三个消费者线程和一个监听任务总量的线程

三、ReentrantLock和Condition实现阻塞队列

属性

    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int capacity;
    private int size;
    private final ReentrantLock lock  = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

(capacity不是必要的,可以用items.length代替)

  • ReentranLock lock:一个lock一个锁池,锁池中争抢的是lock锁,synchronized争抢的是对象的Monitor监视器
  • Condition:条件变量,基于lock锁划分等待池,每一个条件变量都有自己的等待池,synchronized只有一个等待池和一个锁池

3.1 put()方法

    public void put(T e) throws InterruptedException {
        if(e == null){
            throw new NullPointerException("不能为null");
        }
        lock.lock();
        try{
            while(size == capacity){
                System.out.println(Thread.currentThread().getName()+
                        "队列已满...");
                notFull.await(); //释放锁,当前队列已满,可以取出
            }
            items[putIndex] = e; // T 向上转型为 Object
            if(++putIndex == capacity) putIndex = 0;
            size++;
            notEmpty.signal();
        }finally{
            lock.unlock();
        }
    }

手动释放锁:lock.lock() 和 lock.unlock()要成对出现,且用try-finally包裹起来

notFull.await():释放lock锁,将当前线程加入notFull的等待池中

notEmpty.signal():随机唤醒条件变量notEmpty下的等待池的一个线程,进入锁池争抢lcok

3.1.1 为什么要加上try-finally

如果没有加上try-fianlly,程序如果在执行业务逻辑时(或被await()中断)发生意外,unlock()将永远不会执行!那这把锁就永远释放不了,发生死锁!

加上try-finally后,即使抛出异常,JVM在跳出方法前,会强制执行finally块,确保锁会正常释放,其他线程会继续工作

特殊情况:遇到return

public int test() {
    try {
        System.out.println("A: 准备返回");
        return 1; // 遇到 return
    } finally {
        System.out.println("B: 但是我要先执行 finally!");
        // 注意:如果 finally 里也有 return,会覆盖 try 里的返回值(这是坏味道)
    }
}
// 输出:A -> B -> (方法真正返回 1)

3.1.2 使用synchronized(隐式的try-finally)

        synchronized是JVM关键字,编译器在编译时,会自动包裹隐式的try-finally,所以不需要手动释放锁,JVM帮你做了,如果是关文件、关数据库连接,依然要手写try-finally

3.2 take()方法

public T take() throws InterruptedException{
        lock.lock();

        try{
            while (size == 0){
                System.out.println(Thread.currentThread().getName()+
                        "队列为空...");
                notEmpty.await();
            }

            T element = (T)items[takeIndex];  // 将Object从上往下转为 T,需要强转
            if(++takeIndex == capacity) takeIndex = 0;
            size--;

            notFull.signal();
            return element;

        } finally {
            lock.unlock();
        }

    }

3.3 生产者消费者模型

public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue<Integer> bq = new MyBlockingQueue<>(100);

        // 启动两个生产者线程
        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                //每个生产者 每一秒生产一个num
                while(true){
                    int num = (int) (Math.random() * 100);
                    try {
                        bq.put(num);
                        System.out.println(Thread.currentThread().getName()+
                                "生产" + num);
                        Thread.sleep(200); //模拟生产耗时
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }



            }).start();
        }

        //启动两个消费者
        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                try {
                    while(true){
                        Integer outNum = bq.take();
                        System.out.println(Thread.currentThread().getName() +
                                " 消费 " + outNum);
                        Thread.sleep(500); //模拟消费耗时
                    }
                } catch (InterruptedException e) {
                    //throw new RuntimeException(e);
                    Thread.currentThread().interrupt();
                }

            }).start();

        }

        new Thread(()->{
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("现有的队列任务量为:" + bq.size);
            }


        }).start();

        Thread.sleep(5000);
        System.out.println("测试结束");

    }

3.4 throw new RuntimeException(e)是什么?

  • 当代码执行到throw new RuntimeException(e)时,将不再执行业务逻辑的代码(比如生产者的while循环(true)),后续所有代码全部跳过
  • JVM会捕获这个未处理的异常,在控制台打一大串红色的堆栈错误信息
  • 该线程的生命周期停止,也不会醒来,也不会执行任务

方案A:直接抛出异常(猝死)

catch (InterruptedException e) {
    throw new RuntimeException(e); 
    // 结果:
    // 1. 线程立即死亡。
    // 2. 中断标志位丢失(false)。
    // 3. 上层只知道出错了,不知道是被中断了。
    // 4. break 根本不会执行。
}

throw之后不能break,throw语句一旦执行,后面的任何代码都不会执行

方案B:恢复标志位 + break

catch (InterruptedException e) {
    // 1. 手动把中断标志位补回来(因为抛异常时 JVM 把它清空了)
    Thread.currentThread().interrupt(); 
    
    // 2. 正常跳出循环
    break; 
    
    // 结果:
    // 1. 循环结束,run() 方法自然执行完毕。
    // 2. 线程状态变为 TERMINATED(正常死亡)。
    // 3. 中断标志位保持为 true(如果上层检查的话能知道)。
    // 4. 没有红色报错堆栈,世界和平。
}

虽然两个方案都会导致TERMINATED,但是两种死亡方式有着本质区别:

代码层面的对比:

方案A:异常死亡路径

public void run() {
    try {
        while(true) {
            doWork(); // 假设这里被中断
        }
    } catch (InterruptedException e) {
        // 💥 炸弹爆炸!
        // 1. 这里的代码执行完,run() 方法直接非正常返回
        // 2. 循环后面的 cleanup() 永远不会被执行!
        throw new RuntimeException(e); 
    }
    // ❌ 下面的清理代码永远到不了
    cleanup(); 
}
// 结果:线程 TERMINATED,但资源可能泄漏,日志全是报错。

方案B:正常死亡路径

public void run() {
    try {
        while(true) {
            doWork(); 
        }
    } catch (InterruptedException e) {
        // 🛡️ 恢复现场
        Thread.currentThread().interrupt(); 
        // 🚪 优雅出门
        break; 
    }
    // ✅ 因为是从 break 出来的,这里可以正常执行!
    cleanup(); 
}
// 结果:线程 TERMINATED,资源已清理,日志干净,中断信号保留。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐