队列可以有两种实现方式:size变量 vs 预留空间


一、什么是阻塞队列?

1、手动实现

相较于传统队列,阻塞队列线程安全,因为其带有阻塞效果

package javaee;

import java.util.concurrent.ArrayBlockingQueue;

public class MyBlockingQueue {

    public int[] arr;
    public int front = 0; //队头下标
    public int rear = 0; //队尾下标
    public int size = 0; //当前元素个数
    Object locker = new Object();

    public MyBlockingQueue(int num) {
        this.arr = new int[num];
    }

    public void put(int elem) throws InterruptedException {
        synchronized (locker) {
            //判满
            //这里是while不是if
            while (size == arr.length) {
                //满则加锁等待
                locker.wait();
            }
            arr[rear] = elem;
            rear++;
            size++;
            if (rear >= arr.length) {
                rear = 0;
            }
            //放完唤醒别人
            locker.notify();
        }
    }

    public int take() throws InterruptedException {
        synchronized (locker) {
            //判空
            //替换if为while
            while (size == 0) {
                locker.wait();
            }
            int ret = arr[front];
            front++;
            size--;
            if (front >= arr.length) {
                front = 0;
            }
            //拿完唤醒别人
            locker.notify();
            return ret;
        }
    }

    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(1000);

        Thread product = new Thread(()->{
            int n = 1;
            while (true){
                try {
                    queue.put(n);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("放入的元素:" + n);
                n++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        Thread consumer = new Thread(()->{
            while (true) {
                try {
                    int t = queue.take();
                    System.out.println("拿出的元素:" + t);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        product.start();
        consumer.start();
    }

}

2、阻塞队列的size实现要点

①引入新变量 size 代替了 (rear+1)%array.length == front 判满

(这样做可以不用多创建一个空间,但代价是多维护一个变量)

②队头队尾移动使用 越界判断 重置,代替取模

③通过加锁实现线程安全

④注意这里不要使用if,否则会导致刚被唤醒,没检查就继续往下执行了


3、生产者消费者模型

✅优点:

①降低锁冲突

(生产者和消费者不再直接互斥访问共享资源,通过队列缓冲,减少锁竞争)

②解耦合

(生产者和消费者不直接依赖,一方修改不影响另一方)

③削峰填谷

(队列缓冲能平滑突发流量,避免下游被瞬间冲垮)

❌缺点:

①通信效率

(引入中间队列,数据需要先存再取,比直接调用多了一次传递开销)

②增加系统架构复杂程度,提高运维成本

(多了一个队列组件,需要考虑队列容量、持久化、监控、运维等问题)


4、标准库中的使用

package javaee;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

//阻塞队列标准库的使用
public class Demo19 {

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);

        Thread product = new Thread(()->{
            int count = 0;
            while (true){
                try {
                    queue1.put(count);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                count++;
                System.out.println("存入的元素:" + count );
            }
        });

        Thread consumer = new Thread(()->{
            while (true){
                int x = 0;
                try {
                    x = queue1.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("取出的元素:" + x );
            }
        });

        product.start();
        consumer.start();
    }
}

拿来就能用,使用上非常简单


二、线程池

1、作用

①降低资源消耗

(避免频繁创建/销毁线程,线程创建需要分配栈内存(~1MB)、进行系统调用,开销大)

②提高响应速度

(线程预先创建好,任务来了直接执行,不用等线程创建)

③便于统一管理

(控制线程数量、管理任务队列、提供拒绝策略,防止资源耗尽)


2、构造方法的参数(重点)

①线程池 (ThreadPoolExecutor)有七大核心参数:

  1. corePoolSize 核心线程数

  2. maximumPoolSize 最大线程数(核心+临时)

  3. keepAliveTime 临时线程空闲存活时间

  4. unit 存活时间的单位

  5. workQueue 阻塞队列(存放等待的任务)

  6. threadFactory 创建线程的工厂

  7. handler 拒绝策略(队列满且线程满时)

②其中有四种拒绝策略:

策略 类名 行为 比喻
① 抛异常
AbortPolicy(默认)
直接抛出 RejectedExecutionException
公司说"不招了,滚"
② 调用者执行
CallerRunsPolicy
谁提交的任务谁自己执行
老板说"你自己干吧"
③ 丢弃最老的
DiscardOldestPolicy
丢弃队列里等待最久的任务
把排最久的人赶走,新人进来
④ 丢弃最新的
DiscardPolicy
悄悄丢弃当前提交的任务
假装没看见新人

3、手动实现的简单线程池

package javaee;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

//自定义的简易线程池
//实现了核心功能:创建固定数量的工作线程,通过阻塞队列接收任务并执行
class MyThreadPoolExecutor{

    // 线程安全的队列,当队列为空时take()会阻塞,当队列满时put()会阻塞
    private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    //构造方法,参数是线程个数
    public MyThreadPoolExecutor(int nThread){
        for (int i = 0; i < nThread; i++) {
            Thread a = new Thread(()->{
                while (true){
                    try {
                        Runnable runnable = queue.take();
                        runnable.run();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            a.start();
        }
    }

    public void submit(Runnable runnable) throws InterruptedException {
        queue.put(runnable);
    }

}

public class Demo21 {

    public static void main(String[] args) {
        //创建10个线程并在构造方法内启动
        MyThreadPoolExecutor m = new MyThreadPoolExecutor(10);

        //提交1000个任务
        for (int i = 0; i < 1000; i++) {
            int id = i;
            try {
                m.submit(()->{
                    System.out.println("第" + id + "个任务");
                });
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

}

4、标准库中的使用

package javaee;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo20 {
    public static void main(String[] args) throws InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(10);
        //i不是事实final,有报错
//        for (int i = 1; i < 1000; i++) {
//            service.submit(()->{
//                System.out.println("第"+ i + "个元素");
//            });
//            Thread.sleep(1000);
//        }

        for (int i = 1; i < 1000; i++) {
            //加入新变量
            int id = i;
            service.submit(()->{
                System.out.println("第"+ id + "个元素");
            });
            Thread.sleep(1000);
        }
    }
}

Executors 是对 ThreadPoolExecutor 的封装,提供了几种预设的线程池创建方法,使用起来很方便。
但在生产环境中,更推荐直接使用 ThreadPoolExecutor 的构造方法,手动指定所有参数,避免 Executors 预设参数可能带来的 OOM 风险(《阿里巴巴Java开发手册》规定:线程池不允许用 Executors 创建,要用 ThreadPoolExecutor 手动指定参数)
这样可以根据业务场景灵活设置队列类型、拒绝策略等,做到线程池参数可控


三、定时器

1、简单使用

package javaee;

import java.util.Timer;
import java.util.TimerTask;

public class Demo22 {
    public static void main(String[] args) {
        Timer timer = new Timer();

        //添加任务
        //第一个参数,表示要执行的任务,不是简单的Runnable而是TimerTask
        //第二个参数,表示需要等待多久后执行
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello 1");
            }
        },1000);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello 2");
            }
        },2000);

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello 3");
            }
        },5000);

        System.out.println("hello main");
    }
}

使用上非常简单,重点在于下面的手动实现一个定时器


2、手动实现(重点)

package javaee;

//手动实现定时器
public class Demo26{
    private volatile boolean running = true;
    private Thread worker;

    // 延迟执行一次
    public void schedule(Runnable task, long delay) {
        worker = new Thread(() -> {
            try {
                Thread.sleep(delay);
                if (running) task.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // 延迟 + 周期执行(核心区别在这里)
    public void scheduleAtFixedRate(Runnable task, long delay, long period) {
        worker = new Thread(() -> {
            try {
                Thread.sleep(delay);
                while (running) {
                    task.run();
                    Thread.sleep(period);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    public void stop() {
        running = false;
        if (worker != null) {
            worker.interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Demo26 timer = new Demo26();

        // 测试1:延迟执行一次
        System.out.println("开始执行,等待1秒...");
        timer.schedule(() -> {
            System.out.println("hello 1(延迟1秒执行)");
        }, 1000);

        // 测试2:延迟+周期执行
        timer.scheduleAtFixedRate(() -> {
            System.out.println("hello 2(每2秒执行一次)");
        }, 3000, 2000);

        // 运行10秒后停止
        Thread.sleep(10000);
        timer.stop();
        System.out.println("定时器已停止");
    }
}


多线程完

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐