多线程(阻塞队列,线程池)4
目录
7.rejectedExecutionHandler(重点)
一.阻塞队列
1.定义
阻塞队列是一种线程安全的队列,它可以在队列满的时候,阻塞插入操作,也可以在队列空闲时,阻塞获取操作。常用于生产者-消费者模型,用于协调多线程的数据交换。
2.阻塞的特性
1.确保线程安全。
2.当队列为空的时候,如果尝试出队列,此时出队列的操作就会被阻塞,知道有其他线程添加元素时,才会停止阻塞。
2.当队列为满的时候,如果尝试入队列,此时入队列的操作就会被阻塞,知道有其他线程取走元素时,才会停止阻塞。
3.Java中的阻塞队列实现
1.ArrayBlockingQueue
这个是基于数组的有界队列,在初始化时要指定容量。
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
queue.put(1); // 阻塞插入
int value = queue.take(); // 阻塞获取
2.LinkedBlockingQueue
基于链表,可选有界或无界(默认Integer.MAX_VALUE)。
吞吐量高:两把锁分离(插入和获取操作可并行)。
3. PriorityBlockingQueue
无界优先级队列,元素按自然顺序或Comparator排序。
注意:插入操作永不阻塞(无界),但获取可能阻塞。
4. SynchronousQueue
不存储元素,每个插入操作必须等待对应的移除操作。
直接传递数据,适用于高吞吐场景。
5. DelayQueue
存储Delayed元素,只有到期时才能被取出(如定时任务)。
4.阻塞队列的常用方法
| 方法 | 说明 |
|---|---|
put(E e) |
阻塞插入,直到队列有空位。 |
take() |
阻塞获取,直到队列非空。 |
offer(E e, long timeout, TimeUnit unit) |
插入元素,超时后返回false。 |
poll(long timeout, TimeUnit unit) |
获取元素,超时后返回null。 |
5.使用场景
1.生产者-消费者模型
// 生产者
public void produce() throws InterruptedException {
queue.put(data);
}
// 消费者
public void consume() throws InterruptedException {
Data data = queue.take();
process(data);
}
2.线程池任务调度
如ThreadPoolExecutor使用阻塞队列管理待执行任务。
6.模拟实现阻塞队列
模拟实现阻塞队列,并且实现消费者模型
class MyBlockingQueue{
private Object object = new Object();
private String[] data = null;
//队首
private int head = 0;
//队尾
private int tail= 0;
//元素个数
private int size = 0;
public MyBlockingQueue(int a) {
data = new String[a];
}
public void put(String elme) throws InterruptedException {
synchronized (this) {
while (size >= data.length) {
//队列满了等待
this.wait();
}
data[tail] = elme;
tail++;
if (tail >= data.length) {
tail = 0;
}
size++;
this.notify();//唤醒take
}
}
public String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
//队列没满
this.wait();
}
String ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
this.notify();//唤醒put
return ret;
}
}
}
public class Demo28 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
Thread producer = new Thread(() -> {
int n = 0;
while (true){
try {
queue.put(n + "");
System.out.println("生产者元素" + n);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer = new Thread(() -> {
while (true){
try {
String n = queue.take();
System.out.println("消费者元素" + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
consumer.start();
}
}
优点:解耦合,削峰填谷。缺点:服务器的集群结构更复杂,性能较差。
二.线程池
1.定义
线程池是一种多线程处理形式,通过预先创建一组线程并管理它们的生命周期,避免频繁创建和销毁线程的开销。线程池的核心目的是提高系统资源利用率,减少线程创建和销毁的性能损耗。
核心方法为:submit,通过Runnable 描述一段要执行的人物,通过submit任务放到线程池中,此时线程池里的线程就会执行这样的任务。1
2.常见的线程池类型
1.FixedThreadPool:固定线程数的线程池,队列无界。
ExecutorService executor = Executors.newFixedThreadPool(5);
2.CachedThreadPool:线程数可动态扩展,适合短时异步任务。
ExecutorService executor = Executors.newCachedThreadPool();
3.ScheduledThreadPool:支持定时或周期性任务。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
4.SingleThreadExecutor:单线程的线程池,确保任务顺序执行。
ExecutorService executor = Executors.newSingleThreadExecutor();
3.线程池参数
1.corePollSize
这个表示的是核心线程。如果线程池创建,这些线程也随之创建,直到整个线程池全部销毁,这些线程才会销毁。
2.maximumPoolSize
这个表示的是最大线程数。最大线程数 = 核心线程+ 非核心线程,这里的非核心线程在不繁忙的时候会直接销毁,当繁忙的时候在重新创建
3.keepAliveTime
这个表示非核心线程运行空闲的最大时间。
4.TimeUnit unit
一个枚举类,用于表示时间单位(如纳秒、毫秒、秒等)。
5.BlockingQueue<Runnable> workQueue
阻塞队列。常用的队列有:
ArrayBlockingQueue:基于数组的有界队列
LinkedBlockingQueue:基于链表的可选有界队列
SynchronousQueue:不存储元素的直接传递队列
6.threadFActory
线程工厂,可以用来弥补构造方法的缺陷,这个也是一种设计模式,和单例模式是并列关系。
核心是通过静态方法,把构造对象new的过程,各种属性初始化的过程,封装起来。
class PointFactory{
public static Point makepointbyxy (double x , double y){
Point p = new Point();
return p;
}
public static Point makepointbyra (double r , double e){
Point p = new Point();
return p;
}
}
public class Demo30 {
public static void main(String[] args) {
Point p =PointFactory.makepointbyra(10,20);
}
}
7.rejectedExecutionHandler(重点)
拒绝策略
当线程池和队列已满时的处理策略,常见策略包括:
AbortPolicy:默认策略,直接抛出RejectedExecutionException
CallerRunsPolicy:让调用submit的线程制动执行
DiscardPolicy:丢弃队列中最新的任务,当前submit这个任务
DiscardOldestPolicy:丢弃队列中最老的任务
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60, // keepAliveTime (seconds)
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // workQueue
new CustomThreadFactory(), // threadFactory
new ThreadPoolExecutor.CallerRunsPolicy() // rejectedPolicy
);
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)