03、JAVAEE---多线程(三)
·
队列可以有两种实现方式:size变量 vs 预留空间
一、什么是阻塞队列?
1、手动实现
相较于传统队列,阻塞队列线程安全,因为其带有阻塞效果
package javaee;
import java.util.concurrent.ArrayBlockingQueue;
public class MyBlockingQueue {
public int[] arr;
public int front = 0; //队头下标
public int rear = 0; //队尾下标
public int size = 0; //当前元素个数
Object locker = new Object();
public MyBlockingQueue(int num) {
this.arr = new int[num];
}
public void put(int elem) throws InterruptedException {
synchronized (locker) {
//判满
//这里是while不是if
while (size == arr.length) {
//满则加锁等待
locker.wait();
}
arr[rear] = elem;
rear++;
size++;
if (rear >= arr.length) {
rear = 0;
}
//放完唤醒别人
locker.notify();
}
}
public int take() throws InterruptedException {
synchronized (locker) {
//判空
//替换if为while
while (size == 0) {
locker.wait();
}
int ret = arr[front];
front++;
size--;
if (front >= arr.length) {
front = 0;
}
//拿完唤醒别人
locker.notify();
return ret;
}
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
Thread product = new Thread(()->{
int n = 1;
while (true){
try {
queue.put(n);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("放入的元素:" + n);
n++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer = new Thread(()->{
while (true) {
try {
int t = queue.take();
System.out.println("拿出的元素:" + t);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
product.start();
consumer.start();
}
}
2、阻塞队列的size实现要点
①引入新变量 size 代替了 (rear+1)%array.length == front 判满
(这样做可以不用多创建一个空间,但代价是多维护一个变量)
②队头队尾移动使用 越界判断 重置,代替取模
③通过加锁实现线程安全
④注意这里不要使用if,否则会导致刚被唤醒,没检查就继续往下执行了
3、生产者消费者模型
✅优点:
①降低锁冲突
(生产者和消费者不再直接互斥访问共享资源,通过队列缓冲,减少锁竞争)
②解耦合
(生产者和消费者不直接依赖,一方修改不影响另一方)
③削峰填谷
(队列缓冲能平滑突发流量,避免下游被瞬间冲垮)
❌缺点:
①通信效率
(引入中间队列,数据需要先存再取,比直接调用多了一次传递开销)
②增加系统架构复杂程度,提高运维成本
(多了一个队列组件,需要考虑队列容量、持久化、监控、运维等问题)
4、标准库中的使用
package javaee;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//阻塞队列标准库的使用
public class Demo19 {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
Thread product = new Thread(()->{
int count = 0;
while (true){
try {
queue1.put(count);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
count++;
System.out.println("存入的元素:" + count );
}
});
Thread consumer = new Thread(()->{
while (true){
int x = 0;
try {
x = queue1.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("取出的元素:" + x );
}
});
product.start();
consumer.start();
}
}
拿来就能用,使用上非常简单
二、线程池
1、作用
①降低资源消耗
(避免频繁创建/销毁线程,线程创建需要分配栈内存(~1MB)、进行系统调用,开销大)
②提高响应速度
(线程预先创建好,任务来了直接执行,不用等线程创建)
③便于统一管理
(控制线程数量、管理任务队列、提供拒绝策略,防止资源耗尽)
2、构造方法的参数(重点)
①线程池 (ThreadPoolExecutor)有七大核心参数:
-
corePoolSize核心线程数 -
maximumPoolSize最大线程数(核心+临时) -
keepAliveTime临时线程空闲存活时间 -
unit存活时间的单位 -
workQueue阻塞队列(存放等待的任务) -
threadFactory创建线程的工厂 -
handler拒绝策略(队列满且线程满时)
②其中有四种拒绝策略:
| 策略 | 类名 | 行为 | 比喻 |
|---|---|---|---|
① 抛异常 |
|
直接抛出 |
公司说"不招了,滚" |
② 调用者执行 |
|
谁提交的任务谁自己执行 |
老板说"你自己干吧" |
③ 丢弃最老的 |
|
丢弃队列里等待最久的任务 |
把排最久的人赶走,新人进来 |
④ 丢弃最新的 |
|
悄悄丢弃当前提交的任务 |
假装没看见新人 |
3、手动实现的简单线程池
package javaee;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
//自定义的简易线程池
//实现了核心功能:创建固定数量的工作线程,通过阻塞队列接收任务并执行
class MyThreadPoolExecutor{
// 线程安全的队列,当队列为空时take()会阻塞,当队列满时put()会阻塞
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
//构造方法,参数是线程个数
public MyThreadPoolExecutor(int nThread){
for (int i = 0; i < nThread; i++) {
Thread a = new Thread(()->{
while (true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
a.start();
}
}
public void submit(Runnable runnable) throws InterruptedException {
queue.put(runnable);
}
}
public class Demo21 {
public static void main(String[] args) {
//创建10个线程并在构造方法内启动
MyThreadPoolExecutor m = new MyThreadPoolExecutor(10);
//提交1000个任务
for (int i = 0; i < 1000; i++) {
int id = i;
try {
m.submit(()->{
System.out.println("第" + id + "个任务");
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
4、标准库中的使用
package javaee;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo20 {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
//i不是事实final,有报错
// for (int i = 1; i < 1000; i++) {
// service.submit(()->{
// System.out.println("第"+ i + "个元素");
// });
// Thread.sleep(1000);
// }
for (int i = 1; i < 1000; i++) {
//加入新变量
int id = i;
service.submit(()->{
System.out.println("第"+ id + "个元素");
});
Thread.sleep(1000);
}
}
}
Executors 是对 ThreadPoolExecutor 的封装,提供了几种预设的线程池创建方法,使用起来很方便。
但在生产环境中,更推荐直接使用 ThreadPoolExecutor 的构造方法,手动指定所有参数,避免 Executors 预设参数可能带来的 OOM 风险(《阿里巴巴Java开发手册》规定:线程池不允许用 Executors 创建,要用 ThreadPoolExecutor 手动指定参数)
这样可以根据业务场景灵活设置队列类型、拒绝策略等,做到线程池参数可控
三、定时器
1、简单使用
package javaee;
import java.util.Timer;
import java.util.TimerTask;
public class Demo22 {
public static void main(String[] args) {
Timer timer = new Timer();
//添加任务
//第一个参数,表示要执行的任务,不是简单的Runnable而是TimerTask
//第二个参数,表示需要等待多久后执行
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 1");
}
},1000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 2");
}
},2000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("hello 3");
}
},5000);
System.out.println("hello main");
}
}
使用上非常简单,重点在于下面的手动实现一个定时器
2、手动实现(重点)
package javaee;
//手动实现定时器
public class Demo26{
private volatile boolean running = true;
private Thread worker;
// 延迟执行一次
public void schedule(Runnable task, long delay) {
worker = new Thread(() -> {
try {
Thread.sleep(delay);
if (running) task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
worker.start();
}
// 延迟 + 周期执行(核心区别在这里)
public void scheduleAtFixedRate(Runnable task, long delay, long period) {
worker = new Thread(() -> {
try {
Thread.sleep(delay);
while (running) {
task.run();
Thread.sleep(period);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
worker.start();
}
public void stop() {
running = false;
if (worker != null) {
worker.interrupt();
}
}
public static void main(String[] args) throws InterruptedException {
Demo26 timer = new Demo26();
// 测试1:延迟执行一次
System.out.println("开始执行,等待1秒...");
timer.schedule(() -> {
System.out.println("hello 1(延迟1秒执行)");
}, 1000);
// 测试2:延迟+周期执行
timer.scheduleAtFixedRate(() -> {
System.out.println("hello 2(每2秒执行一次)");
}, 3000, 2000);
// 运行10秒后停止
Thread.sleep(10000);
timer.stop();
System.out.println("定时器已停止");
}
}
多线程完

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)