目录

一.阻塞队列

1.定义

2.阻塞的特性

3.Java中的阻塞队列实现

1.ArrayBlockingQueue

2.LinkedBlockingQueue

3. PriorityBlockingQueue

4. SynchronousQueue

5. DelayQueue

4.阻塞队列的常用方法

5.使用场景

6.模拟实现阻塞队列

二.线程池

1.定义

2.常见的线程池类型

3.线程池参数

1.corePollSize

2.maximumPoolSize

3.keepAliveTime

4.TimeUnit unit

5.BlockingQueue workQueue

6.threadFActory

7.rejectedExecutionHandler(重点)


一.阻塞队列

1.定义

阻塞队列是一种线程安全的队列,它可以在队列满的时候,阻塞插入操作,也可以在队列空闲时,阻塞获取操作。常用于生产者-消费者模型,用于协调多线程的数据交换。

2.阻塞的特性

1.确保线程安全。

2.当队列为空的时候,如果尝试出队列,此时出队列的操作就会被阻塞,知道有其他线程添加元素时,才会停止阻塞。

2.当队列为满的时候,如果尝试入队列,此时入队列的操作就会被阻塞,知道有其他线程取走元素时,才会停止阻塞。

3.Java中的阻塞队列实现

1.ArrayBlockingQueue

这个是基于数组的有界队列,在初始化时要指定容量。

BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
queue.put(1); // 阻塞插入
int value = queue.take(); // 阻塞获取
 

2.LinkedBlockingQueue

基于链表,可选有界或无界(默认Integer.MAX_VALUE)。

吞吐量高:两把锁分离(插入和获取操作可并行)。

3. PriorityBlockingQueue

无界优先级队列,元素按自然顺序或Comparator排序。

注意:插入操作永不阻塞(无界),但获取可能阻塞。

4. SynchronousQueue

不存储元素,每个插入操作必须等待对应的移除操作。

直接传递数据,适用于高吞吐场景。

5. DelayQueue

存储Delayed元素,只有到期时才能被取出(如定时任务)。

4.阻塞队列的常用方法

方法 说明
put(E e) 阻塞插入,直到队列有空位。
take() 阻塞获取,直到队列非空。
offer(E e, long timeout, TimeUnit unit) 插入元素,超时后返回false
poll(long timeout, TimeUnit unit) 获取元素,超时后返回null

5.使用场景

1.生产者-消费者模型

// 生产者
public void produce() throws InterruptedException {
    queue.put(data);
}

// 消费者
public void consume() throws InterruptedException {
    Data data = queue.take();
    process(data);
}
 

2.线程池任务调度

ThreadPoolExecutor使用阻塞队列管理待执行任务。

6.模拟实现阻塞队列

模拟实现阻塞队列,并且实现消费者模型

class MyBlockingQueue{
    private Object object = new Object();
    private String[] data = null;
    //队首
    private int head = 0;
    //队尾
    private int tail= 0;
    //元素个数
    private int size = 0;


    public MyBlockingQueue(int a) {
       data = new String[a];
    }

    public void put(String elme) throws InterruptedException {
        synchronized (this) {
            while (size >= data.length) {
                //队列满了等待
              this.wait();
            }
            data[tail] = elme;
            tail++;
            if (tail >= data.length) {
                tail = 0;
            }
            size++;
            this.notify();//唤醒take
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while (size == 0) {
                //队列没满
                this.wait();
            }
            String ret = data[head];
            head++;
            if (head >= data.length) {
                head = 0;
            }
            size--;
            this.notify();//唤醒put
            return ret;
        }
    }
}
public class Demo28 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(1000);

        Thread producer = new Thread(() -> {
           int n = 0;
           while (true){
               try {
                   queue.put(n + "");
                   System.out.println("生产者元素" + n);
                   n++;
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });

        Thread consumer = new Thread(() -> {
           while (true){
               try {
                 String n = queue.take();
                   System.out.println("消费者元素" + n);
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });
        producer.start();
        consumer.start();

    }
}

优点:解耦合,削峰填谷。缺点:服务器的集群结构更复杂,性能较差。

二.线程池

1.定义

线程池是一种多线程处理形式,通过预先创建一组线程并管理它们的生命周期,避免频繁创建和销毁线程的开销。线程池的核心目的是提高系统资源利用率,减少线程创建和销毁的性能损耗。

核心方法为:submit,通过Runnable 描述一段要执行的人物,通过submit任务放到线程池中,此时线程池里的线程就会执行这样的任务。1

2.常见的线程池类型

1.FixedThreadPool:固定线程数的线程池,队列无界。

ExecutorService executor = Executors.newFixedThreadPool(5);
 

2.CachedThreadPool:线程数可动态扩展,适合短时异步任务。

ExecutorService executor = Executors.newCachedThreadPool();
 

3.ScheduledThreadPool:支持定时或周期性任务。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
 

4.SingleThreadExecutor:单线程的线程池,确保任务顺序执行。

ExecutorService executor = Executors.newSingleThreadExecutor();
 

3.线程池参数

1.corePollSize

这个表示的是核心线程。如果线程池创建,这些线程也随之创建,直到整个线程池全部销毁,这些线程才会销毁。

2.maximumPoolSize

这个表示的是最大线程数。最大线程数 = 核心线程+ 非核心线程,这里的非核心线程在不繁忙的时候会直接销毁,当繁忙的时候在重新创建

3.keepAliveTime

这个表示非核心线程运行空闲的最大时间。

4.TimeUnit unit

一个枚举类,用于表示时间单位(如纳秒、毫秒、秒等)。

5.BlockingQueue<Runnable> workQueue

阻塞队列。常用的队列有:

ArrayBlockingQueue:基于数组的有界队列

LinkedBlockingQueue:基于链表的可选有界队列

SynchronousQueue:不存储元素的直接传递队列

6.threadFActory

线程工厂,可以用来弥补构造方法的缺陷,这个也是一种设计模式,和单例模式是并列关系。

核心是通过静态方法,把构造对象new的过程,各种属性初始化的过程,封装起来。

class PointFactory{
    public static Point makepointbyxy (double x , double y){
Point p = new Point();
        return p;
    }

    public static Point makepointbyra (double r , double e){
Point p = new Point();
return p;
    }
}
public class Demo30 {
    public static void main(String[] args) {
        Point p =PointFactory.makepointbyra(10,20);
    }
}

7.rejectedExecutionHandler(重点)

拒绝策略

当线程池和队列已满时的处理策略,常见策略包括:

AbortPolicy:默认策略,直接抛出RejectedExecutionException

CallerRunsPolicy:让调用submit的线程制动执行

DiscardPolicy:丢弃队列中最新的任务,当前submit这个任务

DiscardOldestPolicy:丢弃队列中最老的任务

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // corePoolSize
    10,                     // maximumPoolSize
    60,                     // keepAliveTime (seconds)
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),  // workQueue
    new CustomThreadFactory(),      // threadFactory
    new ThreadPoolExecutor.CallerRunsPolicy()  // rejectedPolicy
);
 

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐