线程池ThreadPoolExecutor配置、参数详解及例子
对于线程池,我仅限于简单的使用,对其原理和参数并不是很理解。以前只是在网上找个线程池的例子,然后模仿写一下而已,最近项目中又再次用的线程池,做了一些研究,现记录下,以备以后使用。
我以前利用线程池只会这样用:
ExecutorService pool = Executors.newFixedThreadPool(5); pool.execute(new Runnable() { @Override public void run() { //具体线程要干的活 } });
这个只是创建一个固定线程数为5的线程池(当然还有其他3种)。
这种写法有一定的弊端,当系统并发的增长量远远高于线程池的消费量(比如系统是每秒中增加20个并发,而线程池是每秒钟执行5个线程)。这就有可能造成服务器内存溢出的风险。原因是newFixedThreadPool采用了无边界队列。可以从源码中一窥究竟。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
这里LinkedBlockingQueue是无边界队列。ThreadPoolExecutor构造中的参数文章下面会逐一讲解。总之上面的写法不灵活,对于一些并发高的系统是不满足的。
下面我们开始步入正题,开始研究下ThreadPoolExecutor线程池。
第一步,研究ThreadPoolExecutor线程池有几种构造方法
第一个构造: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
第二个构造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
第三个构造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
第四个构造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
总结:从上面四个构造可以看出,其实前三个构造都是调用的第四个构造方法。所以我们研究第四个构造即可。
第二步,研究第四个构造的每个参数
corePoolSize: 线程池核心线程数量 int类型
maximumPoolSize:线程池允许线程的最大数量 int类型
keepAliveTime: 线程池中线程所允许的空闲时间。long类型
JDK解释:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
也就是说啊,线程池中当前的空闲线程服务完某任务后的存活时间。如果时间足够长,那么可能会服务其它任务。
这些空闲线程指的是新建线程还是核心线程,还是两者都包含。这个不是很清楚。(哪位大神知道可以告诉我下)
unit: 线程池维护线程所允许的空闲时间的单位
MICROSECONDS 微秒 一百万分之一秒(就是毫秒/1000)
MILLISECONDS 毫秒 千分之一秒
NANOSECONDS 毫微秒 十亿分之一秒(就是微秒/1000)
SECONDS 秒
MINUTES 分钟
HOURS 小时
DAYS 天
workQueue: 线程池所使用的缓冲队列(关于队列的详解在后面第四步)
直接提交 SynchronousQueue
无界队列 如LinkedBlockingQueue
有界队列 如ArrayBlockingQueue
handler: 线程池对拒绝任务的处理策略
ThreadPoolExecutor.AbortPolicy() 抛出java.util.concurrent.RejectedExecutionException异常。
ThreadPoolExecutor.CallerRunsPolicy() 重试添加当前的任务,他会自动重复调用execute()方法。
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy() 抛弃当前的任务。
threadFactory:线程工厂,主要用来创建线程:默认值 DefaultThreadFactory
第三步,有一个例子,对线程池描述的非常贴切。
有一个工厂,核定工人数量为10,每个工人同时只做一件事,
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待
如果队列workQueue的容量为1,且现在10个工人都在干活,
又来一个任务A,则把A放到到队列中等待有空闲工人来从队列中拿任务。
但是有这么一种情况,来的不是一个任务,是5个任务,且10个工人都没有空闲,且队列也放满了(1个任务就满了哦)
这时候,老板一看来的活多,干不过来了,怎么办,老板就想招了,再招5个临时工吧,这时候工厂就是15个工人。
总结:corePoolSize=10;maximumPoolSize=15。当任务数大于corePoolSize时,是先向队列里放,当队列里也满了,还有任务不断进来,才新建线程哦。且最大不会超过maximumPoolSize。
问题来了,如果队列满了,且线程数到最大线程数了,还进来任务怎么办呢。这就用到了handler异常策略(或者叫饱和策略)。
第四步,缓冲队列详解。
4.1、直接提交 SynchronousQueue
该队列是将任务直接提交给线程而不保存它们。在此,如果不存在空闲的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。 SynchronousQueue线程安全的Queue,可以存放若干任务(但当前只允许有且只有一个任务在等待),其中每个插入操作必须等待另一个线程的对应移除操作,也就是说A任务进入队列,B任务必须等A任务被移除之后才能进入队列,否则执行异常策略。你来一个我扔一个,所以说SynchronousQueue没有任何内部容量。
比如:核心线程数为2,最大线程数为3;使用SynchronousQueue。
当前有2个核心线程在运行,又来了个A任务,两个核心线程没有执行完当前任务,根据如果运行的线程等于或多于 corePoolSize,
则 Executor 始终首选将请求加入队列,而不添加新的线程。所以A任务被添加到队列,此时的队列是SynchronousQueue,
当前不存在可用于立即运行任务的线程,因此会构造一个新的线程,此时又来了个B任务,两个核心线程还没有执行完。
新创建的线程正在执行A任务,所以B任务进入Queue后,最大线程数为3,发现没地方仍了。就只能执行异常策略(RejectedExecutionException)。
4.2、无界队列 LinkedBlockingQueue
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有核心线程都在忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就没意义了。)也就不会有新线程被创建,都在那等着排队呢。如果未指定容量,则它等于 Integer.MAX_VALUE。如果设置了Queue预定义容量,则当核心线程忙碌时,新任务会在队列中等待,直到超过预定义容量(新任务没地方放了),才会执行异常策略。你来一个我接一个,直到我容不下你了。FIFO,先进先出。
比如:核心线程数为2,最大线程数为3;使用LinkedBlockingQueue(1),设置容量为1。
当前有2个核心线程在运行,又来了个A任务,两个核心线程没有执行完当前任务,根据如果运行的线程等于或多于 corePoolSize,
则 Executor 始终首选将请求加入队列,而不添加新的线程。所以A任务被添加到队列,此时的队列是LinkedBlockingQueue,
此时又来了个B任务,两个核心线程没有执行完当前任务,A任务在队列中等待,队列已满。所以根据如果无法将请求加入队列,则创建新的线程,
B任务被新创建的线程所执行,此时又来个C任务,此时maximumPoolSize已满,队列已满,只能执行异常策略(RejectedExecutionException)。
4.3 有界队列 ArrayBlockingQueue
操作模式跟LinkedBlockingQueue查不多,只不过必须为其设置容量。所以叫有界队列。new ArrayBlockingQueue<Runnable>(Integer.MAX_VALUE) 跟 new LinkedBlockingQueue(Integer.MAX_VALUE)效果一样。LinkedBlockingQueue 底层是链表结构,ArrayBlockingQueue 底层是数组结构。你来一个我接一个,直到我容不下你了。FIFO,先进先出。
总结下:
使用无界队列,要防止任务增长的速度远远超过处理任务的速度,控制不好可能导致的结果就是内存溢出。
使用有界队列,关键在于调节线程数和Queue大小 ,线程数多,队列容量少,资源浪费。线程数少,队列容量多,性能低,还可能导致内存溢出。
第五步,我的一个实际应用例子代码如下(我采用的是xml配置式,更加灵活)
5.1、xml中配置线程池ThreadPoolExecutor
<!--线程池配置开始 --> <bean id="linkQueue" class="java.util.concurrent.LinkedBlockingQueue"> <constructor-arg name="capacity" value="${thread.pool.linkQueue.size}"/> </bean> <bean id="pool" class="java.util.concurrent.ThreadPoolExecutor"> <constructor-arg name="corePoolSize" value="${thread.pool.core.size}"/> <constructor-arg name="maximumPoolSize" value="${thread.pool.max.size}"/> <constructor-arg name="keepAliveTime" value="${thread.pool.keep.alive.time}"/> <constructor-arg name="unit" value="${thread.pool.time.unit}"/> <constructor-arg name="workQueue" ref="linkQueue"/> </bean> <!--线程池配置结束 -->
5.2、properties文件中配置如下:
#线程池相关配置 #LinkedBlockingQueue队列容量 thread.pool.linkQueue.size=10 #线程池核心线程数 thread.pool.core.size=5 #线程池最大线程数 thread.pool.max.size=10 #空闲线程最大存活时间 thread.pool.keep.alive.time=2 /#时间单位 SECONDS(秒) MILLISECONDS(毫秒) MICROSECONDS(微秒) thread.pool.time.unit=SECONDS
5.3、后台java代码
public class SaveEventAdviceHolder { @Autowired private ThreadPoolExecutor pool; public void saveSerializeObjEvent(Object[] objects,SaveEventBO bo){ //设置线程池的异常策略为CallerRunsPolicy() RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); pool.setRejectedExecutionHandler(handler); pool.execute(new Runnable() { public void run() { //线程具体执行任务..... }); } }
更多推荐
所有评论(0)