ThreadPoolExecutor的分析与使用
ThreadPool
A simple C++11 Thread Pool implementation
项目地址:https://gitcode.com/gh_mirrors/th/ThreadPool
免费下载资源
·
关于ThreadPoolExecutor的分析已经有很多,比较著名的诸如支付宝 清英 的聊聊并发(三)Java线程池的分析和使用,饭饭泛的《深入浅出 Java Concurrency》等,对于ThreadPoolExecutor的分析和介绍已经非常的清楚,但是博主笨拙,对于ThreadPoolExecutor源码研读了一段时间之后才理解其中的奥妙所在,本博客主要从以下两个方面ThreadPoolExecutor进行介绍,①、ThreadPoolExecutor当中corePoolSize中线程的保持,即ThreadPoolExecutor源码当中介绍的 常驻线程池当中的线程(corePoolSize the number of threads to keep in the pool, even if they are idle.);②、ThreadPoolExecutor当中keepAliveTime线程保活时间的使用(keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.);③、参考《Java并发编程实战》和清英大神设计的ThreadPoolExecutor工具类。
1 ThreadPoolExecutor的状态流转
ThreadPoolExecutor共有四种状态,RUNNING、SHUTDOWN、STOP、TERMINATED,这四种状态下,ThreadPoolExecutor对任务的处理流程如下:
- RUNNING:接受新任务并且处理任务队列当中的任务
- SHUTDOWN:不接受新任务,但是处理任务队列当中的任务
- STOP :不接受新任务,并且不处理任务队列当中的任务,中断正在处理的线程
- TERMINATED:同STOP类似,并且所有的线程都已经被中断
四种状态的转换图如下:
2 ThreadPoolExecutor对任务的处理流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
//如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
} else if (!addIfUnderMaximumPoolSize(command))
//拒绝任务
reject(command); // is shutdown or saturated
}
}
ThreadPoolExecutor处理任务的具体流程图如下:
3 ThreadPoolExecutor当中核心线程池的保持及keepAliveTime线程保活时间的使用
有了以上两点后,我们来看一下ThreadPoolExecutor核心线程池的保持方法和keepAliveTime的应用方法。我们还是从ThreadPoolExecutor.execute(task);方法入手,首先我们建立线程池之后会调用ThreadPoolExecutor.execute(task)方法,如果当前线程数小于核心线程数,则调用
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
当中的addIfUnderCorePoolSize()方法,
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
//其他操作
Thread t = null;
t = addThread(firstTask);
t.start();//在这里开启线程执行任务,执行的其实是封装了任务的worker!
//其他操作
}
addIfUnderCorePoolSize方法会调用addThread方法得到一个线程,然后开启这个线程。addThread方法如下:
private Thread addThread(Runnable firstTask) {
//使用Worker对任务进行封装
Worker w = new Worker(firstTask);
//获得线程,并将此线程关联到Worker而非firstTask(这里是要注意的一点!)
Thread t = threadFactory.newThread(w);
if (t != null) {
//将线程赋值为Worker中的引用
w.thread = t;
//放入任务队列
workers.add(w);
//线程池当中线程的数量+1
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
通过addThread方法,我们知道,线程所关联的任务不是外部所定义的原生任务,而是包装了任务的Worker,所以在addIfUnderCorePoolSize中所执行的t.start()方法,开启的是Worker当中的run()方法,而非原生任务当中的run方法。我们来看一下Worker当中的run()方法。
public void run() {
try {
Runnable task = firstTask; //在核心线程数之下时,其不为空;
firstTask = null;
//无限循环获取工作队列里的任务来执行 getTask()是核心线程的保持和多余核心线程keepAliveTime时间的保活的关键点!!!
while (task != null || (task = getTask()) != null) {
//不断的执行任务
runTask(task);
task = null;
}
} finally {
workerDone(this);//线程完成工作之后,就会从线程队列当中删除。前提是退出上面的while循环!要看懂getTask()方法!!
}
}
通过Work的run()方法,我们可以知道,一个线程开启之后,就不断的循环,从任务队列当中取任务,做任务。而这里的关紧点就在while循环当中的getTask方法。
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)//如果当前状态值大于SHUTDOWN,即为STOP或TERMINATED,这是返回null,不执行任务!
return null;
Runnable r;
if (state == SHUTDOWN) // 如果线程池已经shutdown了,则从任务队列当中取任务。
r = workQueue.poll();
//当前线程数超出了核心线程池线程数时,才对超出核心线程的线程采用keepAliveTime保活策略
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//keepAliveTime保活的关键。最长等待keepAliveTime时间,在此期间,如果有任务则返回任务。
//如果在keepAliveTimes时间内没有元素返回,而超出了等待时间超出了keepAliveTime,则返回 null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//当前线程数没有超出了核心线程池线程数时,一直等待到有任务返回!
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
整个核心线程池的保持和多余线程的保活操作的
实现都基于阻塞队列workQueue实现,当前线程数小于核心线程数时,核心线程池当中的线程一直阻塞等待任务的到来,等任务来了之后,getTask()返回就不为null,这样,线程就可以开始做任务,而不用执行workerDone()方法,将自己从线程池当中移除出去。而线程数大于核心线程池数时,线程获得任务的时间是有限制的,在keepAliveTime时间内,如果线程得不到任务的话,就会返回,这样getTask()任务返回为null,执行workerDone()方法,将自己从线程池当中删除!过程图如下:
4 ThreadPoolExecutor在项目当中的使用实例
一般一个项目当中最好配置一个公用的线程池工具类,这样的话,项目中需要开启线程时,从线程池当中取即可,减少随意出现线程创建,线程的关闭,线程切换所带来的性能开销。参考《Java并发编程实战》和清英大神设计的ThreadPoolExecutor工具类如下:
package com.yang.thread;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
/**
* @author Yangzl2008
*/
public class ThreadPoolManager {
/**
* 定时执行线程个数*
*/
private final static int minSchedule = 1;
/**
* 核心线程个数 IO密集型设置为:2*Ncpu CPU密集型设置为:Ncpu+1 *
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
/**
* 最大线程个数 设置为50个线程 *
*/
private static int maximumPoolSize = 50;
/**
* 线程存活时间 尝试设置为30秒 因为newCachedThreadPool设置为60秒*
*/
private static int keepAliveTime = 30;
/**
* 线程池实例化*
*/
private static ThreadPoolManager threadPoolManage = new ThreadPoolManager();
/** **/
private ThreadPoolExecutor threadPool;
/**
* 设置一个有界队列以存放被拒绝的任务 *
*/
private ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(10000);
/**
* 延时执行线程*
*/
private ScheduledExecutorService appSchedule;
/**
* 线程池构造方法
*/
private ThreadPoolManager() {
RejectedExecutionHandler myHandler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
taskQueue.offer(r);
}
};
Runnable command = new
Runnable() {
public void run() {
Runnable task = null;
try {
task = taskQueue.take();//使用具备阻塞特性的方法
} catch (InterruptedException e) {
return;
}
threadPool.execute(task);
}
};
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(minSchedule);
scheduledPool.scheduleWithFixedDelay(command, 0L, 1L, TimeUnit.SECONDS);
//有界队列大小设置为2000
threadPool = new TimingThreadPool(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), myHandler);
appSchedule = Executors.newScheduledThreadPool(minSchedule);
}
/**
* 得到线程池的实例
*
* @return
*/
public static ThreadPoolManager getInstance() {
return threadPoolManage;
}
/**
* 任务添加到线程池中
*
* @param paramRunnable
*/
public void addExecuteTask(Runnable paramRunnable) {
if (paramRunnable == null)
return;
this.threadPool.execute(paramRunnable);
}
/**
* 延时任务添加到线程池中
*
* @param paramRunnable
*/
public void addDelayExecuteTask(Runnable paramRunnable, int delayTime) {
appSchedule.schedule(new DelayTask(paramRunnable), delayTime,
TimeUnit.MILLISECONDS);
}
/**
* 获取线程池的信息
* taskCount:线程池需要执行的任务数量。
* completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
* largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。
* 如等于线程池的最大大小,则表示线程池曾经满了。
* poolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
* activeCount:获取活动的线程数。
*
* @return
*/
public String getThreadPoolInfo() {
StringBuilder sb = new StringBuilder();
sb.append("taskCount is ").append(threadPool.getTaskCount())
.append("; completedTaskCount is ").append(threadPool.getCompletedTaskCount())
.append("; largestPoolSize is ").append(threadPool.getLargestPoolSize())
.append("; poolSize is ").append(threadPool.getPoolSize())
.append("; activeCount is ").append(threadPool.getActiveCount());
return sb.toString();
}
private final class TimingThreadPool extends ThreadPoolExecutor {
private final Logger _log = Logger.getLogger("TimingThreadPool");
//开始时间放到startTime为键值的ThreadLocalMap当中当中
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final AtomicLong numTask = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTime.set(System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.currentTimeMillis();
long taskTime = endTime - startTime.get();
_log.info(String.format("TimingThreadPool task cost time is %s", taskTime));
_log.info(getThreadPoolInfo());
totalTime.addAndGet(taskTime);
numTask.incrementAndGet();
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
_log.info(String.format("Terminated totalTime is %dms and numTask is %d avg time=%dms", totalTime.get(), numTask.get(), totalTime.get() / numTask.get()));
} finally {
super.terminated();
}
}
}
/**
* 延时任务*
*/
class DelayTask implements Runnable {
private Runnable task;
public DelayTask(Runnable paramTask) {
this.task = paramTask;
}
public void run() {
ThreadPoolManager.getInstance().addExecuteTask(task);
}
}
}
GitHub 加速计划 / th / ThreadPool
3
0
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:4 个月前 )
9a42ec13 - 10 年前
fcc91415 - 10 年前
更多推荐
已为社区贡献1条内容
所有评论(0)