JUC(线程池、并发锁工具)
线程池
ExecrtorServise接口
ThreadPoolExecutor实现类(最基本线程池)
ScheduledExecutorService 接口
ScheduledThreadPoolExecutor实现类(同时实现ExecrtorServise接口 ),带有任务调度的线程池
ThreadPoolExecutor
使用int的高3位表示线程池状态,低29位表示线程数量
线程池状态:
| 状态 | 含义 | 是否接收新任务 | 是否处理队列任务 | 说明 |
|---|---|---|---|---|
| RUNNING | 运行中 | ✔ | ✔ | 默认状态,可以接收新任务并处理阻塞队列中的任务 |
| SHUTDOWN | 关闭中 | ✘ | ✔ | 不再接收新任务,但会处理队列中已有任务 |
| STOP | 停止 | ✘ | ✘ | 不接收新任务,不处理队列任务,还会中断正在执行的线程 |
| TIDYING | 整理 | ✘ | ✘ | 所有任务执行完,线程池即将结束 |
| TERMINATED | 已终止 | ✘ | ✘ | 线程池完全关闭 |
为什么存在一个int中:
将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值
构造方法
corePoolSize:核心线程数目(最多保留的线程数)
maximumPoolSize:最大线程数目
keepAliveTime:生存时间-针对救急线程
unit:时间单位-针对救急线程
workQueue:阻塞队列
threadFactory:线程工厂-可以为线程创建时起个好名字
handler:拒绝策略
拒绝策略:
| 策略 | 类名 | 行为 |
|---|---|---|
| AbortPolicy | 默认策略 | 直接抛异常 |
| CallerRunsPolicy | 调用者运行 | 由提交任务的线程执行 |
| DiscardPolicy | 丢弃任务 | 静默丢弃 |
| DiscardOldestPolicy | 丢弃最旧任务 | 丢弃队列头任务,然后重试提交 |
dubbo,抛异常前记录日志,方便问题排查
Netty:创建新线程执行任务
ActiveMQ:等待时间内尝试放入消息队列
PinPoint:使用拒绝策略链,尝试策略链中每种拒绝策略
根据这个构造方法,Executors类中提供了众多工厂方法创建各种用途线程池
newFixedThreadPool
Executors.newFixedThreadPool(int nThreads)
没有救急线程
阻塞队列是无界的
适用于任务量已知,相对耗时的任务
newCachedThreadPool
Executors.newCachedThreadPool()
核心线程数:0
最大线程数:Integer.MAX_VALUE
生存时间:60s
队列:SynchronizedQueue(没有容量,没有线程来取是放不进去的)
适合任务数比较密集,但每个任务执行时间较短
newSingleThreadExecutor
Executors.newSingleThreadExecutor()
核心线程:1
最大线程:1
队列:阻塞无界队列
希望多个任务排队执行
与自己主线程执行的区别:当出现错误,线程池会创建线程继续执行
与参数为1的fixed线程池区别:
single不能修改线程数
- 返回FinalizableDelegateExecutorService对象,应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有方法
fixed可以强转修改线程数
- 返回ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
提交任务
执行任务
void execute(Runnable command)
有返回值的Callable任务
<T> Future<T>submit(Callable<T> task)
提交tasks中所有任务,带超时时间
invokeAll(Collection <? extends Callable<T>> task,long timeout,TimeUnit unit)
提交tasks中所有任务,那个任务先完成,返回此项任务执行结果,其他任务取消
invokeAny(Collection <? extends Callable<T>> task,long timeout,TimeUnit unit)
关闭线程池
shutdow()
- 修改线程池状态为shutdown
- 不会接收新任务
- 单已提交任务会执行完
- 此方法不会阻塞调用线程的执行
shutdownNow()
- 修改线程池状态为stop
- 会将队列中的任务返回
- 并用interrupt的方式中断正在执行的任务
isShutdown():是否是Running状态
isTerminated():是否是终结状态
awaitTermination():等待线程关闭
异步模式之工作线程
让有限的工作线程来轮流异步处理无限多的任务
注意:不同的任务类型使用不同的线程池,避免饥饿
饥饿
线程池中线程数不足导致的饥饿问题(任务一等待任务二,线程都执行任务一)
解决:不同任务,使用不同线程池
创建多少线程数合适
cpu密集型运算
- 通常采用核数+1能够实现最优cpu利用率,+1是保障当线程由于页缺失或其他原因导致暂停时,额外的这个线程能顶上去,保证cpu时钟周期不被浪费
I/O密集型运算(cpu不总是处于繁忙状态)
- 线程数=核数期望cpu利用率总时间(cpu计算时间+等待时间)/cpu计算时间
任务调度线程池
在任务调度线程池功能加入前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一线程来调度,因此所有任务都是串行执行,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响之后的任务
newScheduledThreadPool
Executors.newScheduledThreadPool(long nThread)
固定大小线程池
延时执行:pool.schedule(Runnale/Callable,int delay,TimeUnit)
固定频率执行(任务执行时间超过,会直接下一次任务):
scheduleAtFixedRate(Runnale,long initialDelay,long period,TimeUnit)
任务执行成功后,等待delay时间下次执行
scheduleWithFixedDelay(Runnale,long initialDelay,long Delay,TimeUnit)
异常处理
- try-catch线程捕获并处理
- submit方法,get方法会返回异常
案例(定时任务)
每周四18点整执行任务
scheduleAtFixedRate(任务对象,现在到周四18点时间,时间间隔,单位)
Fork/join线程池
分治思想、能够进行任务拆分的cpu密集型运算,常用与递归,例如(归并排序)
使用:
- 继承RecursiveTask<T>(有返回值)
- 继承RecursiveAcitve(无返回值)
| 对比 | 普通线程池 | Fork/Join |
|---|---|---|
| 等待 | 阻塞 | 可执行等待 |
| CPU利用率 | 低 | 高 |
| 线程状态 | 挂起 | 继续工作 |
简单说就是fork交给一个线程去干(可能是在对应的阻塞队列里面)
join时,如果没结果我去干其他事,有结果我直接执行放回
JUC
AQS原理
abstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
- 用state属性来表示资源状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState:获取state状态
- setState:设置state状态
- compareAndSetState:乐观锁机制设置state状态
- 独占模式是只用一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于FIFO的等待队列,类似Monitor的EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似Monitor的waitSet
子类主要实现的方法(默认抛出UnsupportedOperationExeception)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
ReentrantLock原理
默认非公平锁(NonfairSync())
-
先尝试 CAS 抢锁(非公平直接抢)
-
失败 → 进入 AQS 队列(addWaiter)
-
进入 acquireQueued 自旋:
循环:
- 如果前驱是 head:
尝试获取锁(CAS)
成功 → 设置自己为 head,返回 - 否则:
判断是否应该 park:
- 如果前驱状态是 SIGNAL(-1) → park 挂起
- 否则 → 将前驱状态改为 SIGNAL(-1),继续下一轮
- 如果前驱是 head:
在非公平锁中,线程首先通过 CAS 直接尝试获取锁,体现非公平性;若失败则进入 AQS 队列,一旦进入队列,线程将按照 FIFO 顺序竞争锁。队列中只有 head 的后继节点才有资格尝试获取锁,从而避免无效竞争。节点入队需要通过 CAS 修改 tail 指针以保证并发安全,而 head 的更新由于只会由成功获取锁的线程执行,因此不需要 CAS。
可重入原理
重入时state++(当前线程是owner)
释放时state–(state为0时释放成功)
lock()
→ CAS 抢锁(非公平特性)
→ 失败 → acquire(1)
→ tryAcquire()
→ 失败 → addWaiter()
→ acquireQueued()
addWaiter(Node.EXCLUSIVE)
👉 把当前线程包装成 Node,加到 tail(CAS)
核心:acquireQueued(灵魂循环)
acquireQueued(node, arg)
本质是一个死循环:
循环:
如果前驱是 head:
tryAcquire()
成功 → setHead → 返回
否则:
shouldParkAfterFailedAcquire()
→ 决定是否 park
可打断原理
默认不可打断,即使被打断,仍会驻留在AQS队列中,等获得锁后方可继续运行(是继续运行!只是打断标记设置为true)
可打断模式调用acquireInterruptibly,当被打断之后抛出异常
公平锁
其实就是少了上来直接cas,如果state为0我需要判断队列是否为空,如果不为0需要判断是否重入
条件变量
await:每个条件变量对应一个等待列表,实现类是ConditionObject
signal:把条件队列中的线程节点转移到AQS同步队列尾部,并建立前驱SIGNAL关系,等待重新竞争锁
ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能
类似数据库中的select … from … lock in share mode
提供了一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法
读锁不支持条件变量,写锁支持
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
原理
用的同一个Sycn同步器,因此等待队列,state等也是一个
写锁状态占state的低16位,而读锁使用的是state的高16位
原理类似:只是需要判断锁类型,然后进行之后操作,同步队列里面有读写两种节点类型
StampedLock
jdk8引入,进一步优化读性能,特点是在使用读锁、写锁时都必须配合戳使用
乐观读:stampedLock支持tryOptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验如果校验通过,表示期间确实没有写操作,数据可以安全使用,如果校验不通过,需要重新获取读锁,保证数据安全
long stamp=lock.tryOptimisticRead();
验证戳:lock.validate(stamp)
stampedLock不支持条件变量
StampedLock不支持可重入
Semaphore(信号量)
用来限制能同时访问共享资源的线程上限
使用semaphore可以做简单的限流在访问高峰期,让请求阻塞,访问高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数
原理
许可通过构造方法传入state
CountdownLatch(倒计时锁)
用来进行线程间同步协作,等待所有线程完成倒计时
其中构造参数用来初始化等待计数值,await()用来等待计数归零countDown()用来让计数减一
Cyclicbarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数,构造时设置计数个数,每个线程执行到某个需要同步的时刻调用await()方法进行等待,当等待的线程数满足计数个数时,继续执行
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)