关于ThreadPoolExecutor的分析已经有很多,比较著名的诸如支付宝 清英 的聊聊并发(三)Java线程池的分析和使用,饭饭泛的《深入浅出 Java Concurrency》等,对于ThreadPoolExecutor的分析和介绍已经非常的清楚,但是博主笨拙,对于ThreadPoolExecutor源码研读了一段时间之后才理解其中的奥妙所在,本博客主要从以下两个方面ThreadPoolExecutor进行介绍,①、ThreadPoolExecutor当中corePoolSize中线程的保持,即ThreadPoolExecutor源码当中介绍的 常驻线程池当中的线程(corePoolSize the number of threads to keep in the pool, even if they are idle.);②、ThreadPoolExecutor当中keepAliveTime线程保活时间的使用(keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.);③、参考《Java并发编程实战》和清英大神设计的ThreadPoolExecutor工具类。

1 ThreadPoolExecutor的状态流转

        ThreadPoolExecutor共有四种状态,RUNNING、SHUTDOWN、STOP、TERMINATED,这四种状态下,ThreadPoolExecutor对任务的处理流程如下:
  • RUNNING:接受新任务并且处理任务队列当中的任务
  • SHUTDOWN:不接受新任务,但是处理任务队列当中的任务
  • STOP :不接受新任务,并且不处理任务队列当中的任务,中断正在处理的线程     
  • TERMINATED:同STOP类似,并且所有的线程都已经被中断

        四种状态的转换图如下:


2 ThreadPoolExecutor对任务的处理流程

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //如果线程数小于基本线程数,则创建线程并执行当前任务
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            //如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
                //如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
            } else if (!addIfUnderMaximumPoolSize(command))
                //拒绝任务
                reject(command); // is shutdown or saturated
        }
    }
        ThreadPoolExecutor处理任务的具体流程图如下:

3 ThreadPoolExecutor当中核心线程池的保持及keepAliveTime线程保活时间的使用

        有了以上两点后,我们来看一下ThreadPoolExecutor核心线程池的保持方法和keepAliveTime的应用方法。我们还是从ThreadPoolExecutor.execute(task);方法入手,首先我们建立线程池之后会调用ThreadPoolExecutor.execute(task)方法,如果当前线程数小于核心线程数,则调用
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
当中的addIfUnderCorePoolSize()方法,
 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        //其他操作
        Thread t = null;
        t = addThread(firstTask);
        t.start();//在这里开启线程执行任务,执行的其实是封装了任务的worker!
        //其他操作
    }
        addIfUnderCorePoolSize方法会调用addThread方法得到一个线程,然后开启这个线程。addThread方法如下:
 private Thread addThread(Runnable firstTask) {
        //使用Worker对任务进行封装
        Worker w = new Worker(firstTask);
        //获得线程,并将此线程关联到Worker而非firstTask(这里是要注意的一点!)
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            //将线程赋值为Worker中的引用
            w.thread = t;
            //放入任务队列
            workers.add(w);
            //线程池当中线程的数量+1
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }
        通过addThread方法,我们知道,线程所关联的任务不是外部所定义的原生任务,而是包装了任务的Worker,所以在addIfUnderCorePoolSize中所执行的t.start()方法,开启的是Worker当中的run()方法,而非原生任务当中的run方法。我们来看一下Worker当中的run()方法。
 public void run() {
            try {
                Runnable task = firstTask; //在核心线程数之下时,其不为空;
                firstTask = null;
		//无限循环获取工作队列里的任务来执行  getTask()是核心线程的保持和多余核心线程keepAliveTime时间的保活的关键点!!!
                while (task != null || (task = getTask()) != null) {
		     //不断的执行任务
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);//线程完成工作之后,就会从线程队列当中删除。前提是退出上面的while循环!要看懂getTask()方法!!
            }
        }
        通过Work的run()方法,我们可以知道,一个线程开启之后,就不断的循环,从任务队列当中取任务,做任务。而这里的关紧点就在while循环当中的getTask方法。
Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)//如果当前状态值大于SHUTDOWN,即为STOP或TERMINATED,这是返回null,不执行任务!
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // 如果线程池已经shutdown了,则从任务队列当中取任务。
                    r = workQueue.poll();
		   //当前线程数超出了核心线程池线程数时,才对超出核心线程的线程采用keepAliveTime保活策略
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
		   //keepAliveTime保活的关键。最长等待keepAliveTime时间,在此期间,如果有任务则返回任务。
		   //如果在keepAliveTimes时间内没有元素返回,而超出了等待时间超出了keepAliveTime,则返回 null
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
		   //当前线程数没有超出了核心线程池线程数时,一直等待到有任务返回! 
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
        整个核心线程池的保持和多余线程的保活操作的 实现都基于阻塞队列workQueue实现,当前线程数小于核心线程数时,核心线程池当中的线程一直阻塞等待任务的到来,等任务来了之后,getTask()返回就不为null,这样,线程就可以开始做任务,而不用执行workerDone()方法,将自己从线程池当中移除出去。而线程数大于核心线程池数时,线程获得任务的时间是有限制的,在keepAliveTime时间内,如果线程得不到任务的话,就会返回,这样getTask()任务返回为null,执行workerDone()方法,将自己从线程池当中删除!过程图如下:

4 ThreadPoolExecutor在项目当中的使用实例

        一般一个项目当中最好配置一个公用的线程池工具类,这样的话,项目中需要开启线程时,从线程池当中取即可,减少随意出现线程创建,线程的关闭,线程切换所带来的性能开销。参考《Java并发编程实战》和清英大神设计的ThreadPoolExecutor工具类如下:
package com.yang.thread;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

/**
 * @author Yangzl2008
 */
public class ThreadPoolManager {
    /**
     * 定时执行线程个数*
     */
    private final static int minSchedule = 1;
    /**
     * 核心线程个数 IO密集型设置为:2*Ncpu   CPU密集型设置为:Ncpu+1 *
     */
    private static int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
    /**
     * 最大线程个数 设置为50个线程 *
     */
    private static int maximumPoolSize = 50;
    /**
     * 线程存活时间 尝试设置为30秒 因为newCachedThreadPool设置为60秒*
     */
    private static int keepAliveTime = 30;
    /**
     * 线程池实例化*
     */
    private static ThreadPoolManager threadPoolManage = new ThreadPoolManager();
    /** **/
    private ThreadPoolExecutor threadPool;
    /**
     * 设置一个有界队列以存放被拒绝的任务 *
     */
    private ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(10000);
    /**
     * 延时执行线程*
     */
    private ScheduledExecutorService appSchedule;

    /**
     * 线程池构造方法
     */
    private ThreadPoolManager() {
        RejectedExecutionHandler myHandler = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r,
                                          ThreadPoolExecutor executor) {
                taskQueue.offer(r);
            }
        };
        Runnable command = new

                Runnable() {
                    public void run() {
                        Runnable task = null;
                        try {
                            task = taskQueue.take();//使用具备阻塞特性的方法
                        } catch (InterruptedException e) {
                            return;
                        }
                        threadPool.execute(task);
                    }
                };
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(minSchedule);
        scheduledPool.scheduleWithFixedDelay(command, 0L, 1L, TimeUnit.SECONDS);

        //有界队列大小设置为2000
        threadPool = new TimingThreadPool(corePoolSize, maximumPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), myHandler);

        appSchedule = Executors.newScheduledThreadPool(minSchedule);
    }

    /**
     * 得到线程池的实例
     *
     * @return
     */
    public static ThreadPoolManager getInstance() {
        return threadPoolManage;
    }

    /**
     * 任务添加到线程池中
     *
     * @param paramRunnable
     */
    public void addExecuteTask(Runnable paramRunnable) {
        if (paramRunnable == null)
            return;
        this.threadPool.execute(paramRunnable);
    }

    /**
     * 延时任务添加到线程池中
     *
     * @param paramRunnable
     */
    public void addDelayExecuteTask(Runnable paramRunnable, int delayTime) {
        appSchedule.schedule(new DelayTask(paramRunnable), delayTime,
                TimeUnit.MILLISECONDS);
    }

    /**
     * 获取线程池的信息
     * taskCount:线程池需要执行的任务数量。
     * completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
     * largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。
     * 如等于线程池的最大大小,则表示线程池曾经满了。
     * poolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
     * activeCount:获取活动的线程数。
     *
     * @return
     */
    public String getThreadPoolInfo() {
        StringBuilder sb = new StringBuilder();
        sb.append("taskCount is ").append(threadPool.getTaskCount())
                .append("; completedTaskCount is ").append(threadPool.getCompletedTaskCount())
                .append("; largestPoolSize is ").append(threadPool.getLargestPoolSize())
                .append("; poolSize is ").append(threadPool.getPoolSize())
                .append("; activeCount is ").append(threadPool.getActiveCount());
        return sb.toString();
    }

    private final class TimingThreadPool extends ThreadPoolExecutor {

        private final Logger _log = Logger.getLogger("TimingThreadPool");
        //开始时间放到startTime为键值的ThreadLocalMap当中当中
        private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
        private final AtomicLong numTask = new AtomicLong();
        private final AtomicLong totalTime = new AtomicLong();


        public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTime.set(System.currentTimeMillis());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long endTime = System.currentTimeMillis();
                long taskTime = endTime - startTime.get();
                _log.info(String.format("TimingThreadPool task cost time is %s", taskTime));
                _log.info(getThreadPoolInfo());
                totalTime.addAndGet(taskTime);
                numTask.incrementAndGet();
            } finally {
                super.afterExecute(r, t);
            }
        }

        @Override
        protected void terminated() {
            try {
                _log.info(String.format("Terminated totalTime is %dms and numTask is %d avg time=%dms", totalTime.get(), numTask.get(), totalTime.get() / numTask.get()));
            } finally {
                super.terminated();
            }
        }
    }

    /**
     * 延时任务*
     */
    class DelayTask implements Runnable {

        private Runnable task;

        public DelayTask(Runnable paramTask) {
            this.task = paramTask;
        }

        public void run() {
            ThreadPoolManager.getInstance().addExecuteTask(task);
        }
    }
}


GitHub 加速计划 / th / ThreadPool
3
0
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:4 个月前 )
9a42ec13 - 10 年前
fcc91415 - 10 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐