线程池

ExecrtorServise接口
ThreadPoolExecutor实现类(最基本线程池)

ScheduledExecutorService 接口
ScheduledThreadPoolExecutor实现类(同时实现ExecrtorServise接口 ),带有任务调度的线程池

ThreadPoolExecutor

使用int的高3位表示线程池状态,低29位表示线程数量
线程池状态:

状态 含义 是否接收新任务 是否处理队列任务 说明
RUNNING 运行中 默认状态,可以接收新任务并处理阻塞队列中的任务
SHUTDOWN 关闭中 不再接收新任务,但会处理队列中已有任务
STOP 停止 不接收新任务,不处理队列任务,还会中断正在执行的线程
TIDYING 整理 所有任务执行完,线程池即将结束
TERMINATED 已终止 线程池完全关闭

为什么存在一个int中:
将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值

构造方法

corePoolSize:核心线程数目(最多保留的线程数)
maximumPoolSize:最大线程数目
keepAliveTime:生存时间-针对救急线程
unit:时间单位-针对救急线程
workQueue:阻塞队列
threadFactory:线程工厂-可以为线程创建时起个好名字
handler:拒绝策略

拒绝策略:

策略 类名 行为
AbortPolicy 默认策略 直接抛异常
CallerRunsPolicy 调用者运行 由提交任务的线程执行
DiscardPolicy 丢弃任务 静默丢弃
DiscardOldestPolicy 丢弃最旧任务 丢弃队列头任务,然后重试提交

dubbo,抛异常前记录日志,方便问题排查
Netty:创建新线程执行任务
ActiveMQ:等待时间内尝试放入消息队列
PinPoint:使用拒绝策略链,尝试策略链中每种拒绝策略

根据这个构造方法,Executors类中提供了众多工厂方法创建各种用途线程池

newFixedThreadPool

Executors.newFixedThreadPool(int nThreads)
没有救急线程
阻塞队列是无界的
适用于任务量已知,相对耗时的任务

newCachedThreadPool

Executors.newCachedThreadPool()
核心线程数:0
最大线程数:Integer.MAX_VALUE
生存时间:60s
队列:SynchronizedQueue(没有容量,没有线程来取是放不进去的)

适合任务数比较密集,但每个任务执行时间较短

newSingleThreadExecutor

Executors.newSingleThreadExecutor()
核心线程:1
最大线程:1
队列:阻塞无界队列

希望多个任务排队执行

与自己主线程执行的区别:当出现错误,线程池会创建线程继续执行
与参数为1的fixed线程池区别:
single不能修改线程数

  • 返回FinalizableDelegateExecutorService对象,应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有方法

fixed可以强转修改线程数

  • 返回ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改

提交任务

执行任务
void execute(Runnable command)

有返回值的Callable任务
<T> Future<T>submit(Callable<T> task)

提交tasks中所有任务,带超时时间
invokeAll(Collection <? extends Callable<T>> task,long timeout,TimeUnit unit)

提交tasks中所有任务,那个任务先完成,返回此项任务执行结果,其他任务取消
invokeAny(Collection <? extends Callable<T>> task,long timeout,TimeUnit unit)

关闭线程池

shutdow()

  • 修改线程池状态为shutdown
  • 不会接收新任务
  • 单已提交任务会执行完
  • 此方法不会阻塞调用线程的执行

shutdownNow()

  • 修改线程池状态为stop
  • 会将队列中的任务返回
  • 并用interrupt的方式中断正在执行的任务

isShutdown():是否是Running状态
isTerminated():是否是终结状态
awaitTermination():等待线程关闭

异步模式之工作线程

让有限的工作线程来轮流异步处理无限多的任务
注意:不同的任务类型使用不同的线程池,避免饥饿

饥饿

线程池中线程数不足导致的饥饿问题(任务一等待任务二,线程都执行任务一)

解决:不同任务,使用不同线程池

创建多少线程数合适

cpu密集型运算

  • 通常采用核数+1能够实现最优cpu利用率,+1是保障当线程由于页缺失或其他原因导致暂停时,额外的这个线程能顶上去,保证cpu时钟周期不被浪费

I/O密集型运算(cpu不总是处于繁忙状态)

  • 线程数=核数期望cpu利用率总时间(cpu计算时间+等待时间)/cpu计算时间

任务调度线程池

在任务调度线程池功能加入前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是由同一线程来调度,因此所有任务都是串行执行,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响之后的任务

newScheduledThreadPool

Executors.newScheduledThreadPool(long nThread)
固定大小线程池

延时执行:pool.schedule(Runnale/Callable,int delay,TimeUnit)

固定频率执行(任务执行时间超过,会直接下一次任务):
scheduleAtFixedRate(Runnale,long initialDelay,long period,TimeUnit)

任务执行成功后,等待delay时间下次执行
scheduleWithFixedDelay(Runnale,long initialDelay,long Delay,TimeUnit)

异常处理

  • try-catch线程捕获并处理
  • submit方法,get方法会返回异常

案例(定时任务)

每周四18点整执行任务
scheduleAtFixedRate(任务对象,现在到周四18点时间,时间间隔,单位)

Fork/join线程池

分治思想、能够进行任务拆分的cpu密集型运算,常用与递归,例如(归并排序)

使用:

  • 继承RecursiveTask<T>(有返回值)
  • 继承RecursiveAcitve(无返回值)
对比 普通线程池 Fork/Join
等待 阻塞 可执行等待
CPU利用率
线程状态 挂起 继续工作

简单说就是fork交给一个线程去干(可能是在对应的阻塞队列里面)
join时,如果没结果我去干其他事,有结果我直接执行放回

JUC

AQS原理

abstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:

  • 用state属性来表示资源状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState:获取state状态
    • setState:设置state状态
    • compareAndSetState:乐观锁机制设置state状态
    • 独占模式是只用一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于FIFO的等待队列,类似Monitor的EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似Monitor的waitSet

子类主要实现的方法(默认抛出UnsupportedOperationExeception)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

ReentrantLock原理

默认非公平锁(NonfairSync())

  1. 先尝试 CAS 抢锁(非公平直接抢)

  2. 失败 → 进入 AQS 队列(addWaiter)

  3. 进入 acquireQueued 自旋:

    循环:

    • 如果前驱是 head:
      尝试获取锁(CAS)
      成功 → 设置自己为 head,返回
    • 否则:
      判断是否应该 park:
      - 如果前驱状态是 SIGNAL(-1) → park 挂起
      - 否则 → 将前驱状态改为 SIGNAL(-1),继续下一轮

在非公平锁中,线程首先通过 CAS 直接尝试获取锁,体现非公平性;若失败则进入 AQS 队列,一旦进入队列,线程将按照 FIFO 顺序竞争锁。队列中只有 head 的后继节点才有资格尝试获取锁,从而避免无效竞争。节点入队需要通过 CAS 修改 tail 指针以保证并发安全,而 head 的更新由于只会由成功获取锁的线程执行,因此不需要 CAS。

可重入原理

重入时state++(当前线程是owner)
释放时state–(state为0时释放成功)

lock()
→ CAS 抢锁(非公平特性)
→ 失败 → acquire(1)
→ tryAcquire()
→ 失败 → addWaiter()
→ acquireQueued()

addWaiter(Node.EXCLUSIVE)

👉 把当前线程包装成 Node,加到 tail(CAS)

核心:acquireQueued(灵魂循环)
acquireQueued(node, arg)

本质是一个死循环:

循环:
如果前驱是 head:
tryAcquire()
成功 → setHead → 返回
否则:
shouldParkAfterFailedAcquire()
→ 决定是否 park

可打断原理

默认不可打断,即使被打断,仍会驻留在AQS队列中,等获得锁后方可继续运行(是继续运行!只是打断标记设置为true)

可打断模式调用acquireInterruptibly,当被打断之后抛出异常

公平锁

其实就是少了上来直接cas,如果state为0我需要判断队列是否为空,如果不为0需要判断是否重入

条件变量

await:每个条件变量对应一个等待列表,实现类是ConditionObject

signal:把条件队列中的线程节点转移到AQS同步队列尾部,并建立前驱SIGNAL关系,等待重新竞争锁

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能
类似数据库中的select … from … lock in share mode
提供了一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法

读锁不支持条件变量,写锁支持

重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

原理

用的同一个Sycn同步器,因此等待队列,state等也是一个

写锁状态占state的低16位,而读锁使用的是state的高16位

原理类似:只是需要判断锁类型,然后进行之后操作,同步队列里面有读写两种节点类型

StampedLock

jdk8引入,进一步优化读性能,特点是在使用读锁、写锁时都必须配合戳使用

乐观读:stampedLock支持tryOptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验如果校验通过,表示期间确实没有写操作,数据可以安全使用,如果校验不通过,需要重新获取读锁,保证数据安全

long stamp=lock.tryOptimisticRead();
验证戳:lock.validate(stamp)

stampedLock不支持条件变量
StampedLock不支持可重入

Semaphore(信号量)

用来限制能同时访问共享资源的线程上限

使用semaphore可以做简单的限流在访问高峰期,让请求阻塞,访问高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数

原理

许可通过构造方法传入state

CountdownLatch(倒计时锁)

用来进行线程间同步协作,等待所有线程完成倒计时
其中构造参数用来初始化等待计数值,await()用来等待计数归零countDown()用来让计数减一

Cyclicbarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数,构造时设置计数个数,每个线程执行到某个需要同步的时刻调用await()方法进行等待,当等待的线程数满足计数个数时,继续执行

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐