注意: 翻译自JDK1.8的源码

基本描述

ExecutorService是使用一种可能的线程池来执行每个被提交的任务, 一般配置使用Executors工厂方法.
线程池设法解决2个问题: 通常提供改良的性能当执行大量异步任务时, 由于减少了每个任务的调用支出以及提供一种约束和管理资源还有容纳当执行任务集合时消费他们的线程的手段.
每个ThreadPoolExecutor 也维持一些基本的统计,例如,已完成任务数量.

对于跨越大范围的上下文很有用,该类提供了很多可适应的参数和有扩展能力的钩子.
然而,程序员被敦促使用更方便的Executors工厂方法Executors#newCachedThreadPool(无限的线程池,并自动重用线程),Executors#newFixedThreadPool(固定线程大小), Executors#newSingleThreadExecutor(单后台线程), 它们为绝大部分场景预先设置好了. 另外,当手动配置和调制该类时按照下面的向导:

核心及最大线程池的规模(Core and maximum pool sizes)

ThreadPoolExecutor将自动调整线程池的规模(查看getPoolSize方法的解释),由核心线程池的规模(查看getCorePoolSize的解释),以及最大线程池的规模(查看getMaximumPoolSize的解释)被约束的设置.
当有新任务通过execute(Runnable)方法被提交时,并且少于核心线程池的线程在运行,那么一个新线程被创建去处理该请求,即使其他work thread是闲置的. 如果有超过核心线程池数的线程在运行,一个新的线程仅当队列已满时才会被创建. 通过设置corePoolSizemaximumPoolSize为相同的值,你创建一个固定规模的线程池. 通过设置maximumPoolSize为一个本质上无限的值,例如Integer.MAX_VALUE,则允许线程池容纳任意数量的并发任务. 最典型的情况是,核心和最大线程池的规模是仅在构造时被设置,但是它们也能被动态地改变通过使用setCorePoolSizesetMaximumPoolSize.

按需构造(On-demand construction)

默认情况下,实际上核心线程仅当有新任务抵达时被创建初始化然后开始, 但是该行为能被动态地覆盖使用prestartCoreThread或者prestartAllCoreThreads方法. 此外,你或许想重启线程池如果你以一个非空队列构造线程池.

创建新线程(Creating new threads)

新的线程使用ThreadFactory被创建. 除非另有说明,否则Executors#defaultThreadFactory被使用, 创建线程全为相同的ThreadGroup和相同的NORM_PRIORITY优先级和非守护进程状态. 通过提供一个不同的ThreadFactory,你能选择线程的名子,线程组,优先级,守护进程状态,等等. 假如ThreadFactory创建线程失败当newThread返回null时,executor将继续,但是可能会不能执行任何任务. 线程应该处理"modifyThread"的RuntimePermission. 如果工作线程或者其他线程使用线程出没有处理这个许可,服务可能被降级: 配置改变可能不及时生效,还有一个shutdown的线程池可能保留在termination,但没有完成的状态.

保活时间(Keep-alive times)

如果线程池当前有超过核心线程数的线程数,超过的线程将被terminated如果它们已经被闲置超过keepAliveTime(可查看getKeepAliveTime(TimeUnit)). 这种被提供的兽兽能煎炒资源的消费当线程池没有被使用时. 假如线程池在之后被更活跃,新线程将被构造. 这个参数也能被动态地改变,使用setKeepAliveTime(long, TimeUnit)方法. 使用Long.MAX_VALUE 以 TimeUnit#NANOSECONDS 单位值能有效禁用闲置线程在shut downterminating. 默认,保活策略运用仅当有超过核心线程数的线程时. 但是方法allowCoreThreadTimeOut(boolean)同样能对非核心线程运用超时策略,只要keepAliveTime值是非零.

排队(Queuing)

任何BlockingQueue能被用于传输和持有已提交的任务. 使用这个队列与线程池扩容互相作用:

  1. 如果少于核心线程数(corePoolSize)的线程在运行,Executor总是偏爱增加新线程而不是放入队列.
  2. 如果有超过核心线程数的线程在运行,Executor总是偏爱将新请求放入队列(BlockingQueue)而不是增加新线程.
  3. 如果请求不能被加入队列,新线程会被创建除非这超过maximumPoolSize,如果超过,任务将被拒绝.

总结: 优先填充corePoolSize数; 其次填充BlockingQueue; 最后填充maximumPoolSize; 最后全满,使用拒绝策略,拒绝后续任务.

有3种一般的排队(queuing)策略

  1. 直接传递(Direct Handoffs). 好的默认选择,对于工作队列是SynchronousQueue类的没有其他的持有直接传递任务给线程. 在这里,如果没有立即可用的线程来运行任务,则对任务进行排队的尝试将失败,因此将构造一个新线程. 该策略避免加锁当处理的请求集合有内部依赖时. 直接传递工厂需要无限制的最大线程池大小以此避免拒绝新提交的任务. 这打开了无限线程增长的许可当命令持续抵达的平均速度快速他们能处理的速度.
  2. 无限队列(Unbounded queues). 使用一个无限队列(例如LinkedBlockingQueue没有预定义的容量)将导致新任务在队列中等待当所有核心线程处于忙绿时. 因此,没有超过核心线程数的线程将被一直创建. (同时最大线程数将因此没有任何影响) 这是合适的当每个任务独立于其他任务时, 这样的任务不能影响其他的执行; 例如,在web页面服务. 这种风格的排队方式对于平滑的传输突发请求是很有用的, 当然也存在有这种可能性,当命令持续抵达的平均速度快于处理速度时工作队列会无限增长.
  3. 有限队列(Bounded queues).一个有限队列(例如,ArrayBlockingQueue)会帮助阻止资源耗尽,当使用无线最大线程池时,但是调制和控制更加困难. 队列的大小和最大线程池大小可能被相互权衡: 使用大的队列和小的线程池会产生最小的cpu,os资源,以及上下文切换支出的使用率,但是会导致人为的低吞吐量. 如果任务频繁的堵塞(例如,如果它们是因i/o堵塞的), 系统可能有能力调度超过你允许的更多线程. 使用小队列通常需要更大的线程池,这样能保持CPUs忙绿,但也许会遇到不可接受的调度开销,这样也会降低吞吐量.

被拒绝的任务(Rejected tasks)

execute(Runnable)提交新任务时,如果Executor已经被shutdown或者使用无限制的最大线程数和工作队列且容量已满,则会被拒绝. 对于上面的情况,将调用对应的RejectedExecutionHandler的RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor) 方法,该方法提供4种策略:

拒绝策略

  1. 默认策略ThreadPoolExecutor.AbortPolicy,在拒绝之后将抛出一个运行时异常RejectedExecutionException.
  2. ThreadPoolExecutor.CallerRunsPolicy策略时,调用execute()方法的线程自身. 提供一种简单的反馈控制机制,减缓新任务的提交速率.
  3. 对于ThreadPoolExecutor.DiscardPolicy策略,不能被执行的任务会简单地被丢弃.
  4. 对于ThreadPoolExecutor.DiscardOldestPolicy策略,如果executor还未被shutdown,那么在work queue前面的任务将被丢弃,然后重试(可能再次失败,导致重复该操作).

定义及使用其他RejectedExecutionHandler类是可能的.当策略被设计仅工作在特别的容量或者队列策略时,这样做必须特别小心.

钩子方法(Hook methods)

提供protected可覆盖方法beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable) 它们将分别在任务执行前后被调用.
这些操作能被用于操纵执行环境. 例如: 重新初始化线程本地,收集统计信息,或增加日志条目.
此外,terminated方法能被覆盖,然后执行任何当Executor被完全终止时,仅需要被执行一次的专门的处理过程.

如果钩子或者回调方法抛出异常,内部的工作线程可能转为失败并突然终止.

队列维持(Queue maintenance)

getQueue()方法允许访问工作队列,用于监控和debugging.
使用该方法用于任何其他目的都是被强烈阻止的.
两个被提供的方法,remove(Runnable)purge,当有大量的任务被取消时,可用于协助空间的再利用.

销毁(Finalization)

一个不再被程序引用的线程池,同时也没有保留任何线程时将被自动shutdown.
假如你想要确保未引用的线程池被再利用,即便用户忘记调用shutdown方法,之后你必须安排那些未被使用的线程最终是死亡的, 通过设置适当的保活时间,以及使用一个小于0的核心线程数同时设置allowCoreThreadTimeOut(boolean).

扩展例子(Extension example)

绝大多数该类的扩展覆盖了一个或多个protected钩子方法.例如,此处有个添加了简单的暂停/恢复特性的子类:

static class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(int corePoolSize,
                                      int maximumPoolSize,
                                      long keepAliveTime,
                                      TimeUnit unit,
                                      BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

源码解析

ThreadPoolExecutor的类图

ThreadPoolExecutor的类图

状态及计数器记录

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 计数器部分的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 计数器部分的掩码
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor通过仅仅使用一个AtomicInteger类型就记录了5种线程状态,以及每种状态下线程的数量.
这种设计应该是考虑到线程的数量短时间内不会超过2^29,且减少了需要维护的属性数量,此外还减少了需要并发控制的变量数量.
因为当前存在5种状态,如果每个状态维护,就需要5个,冗长且没必要,而且这些都是存在竞态条件的变量,变量多控制就麻烦.
当然将状态和计数器放在一个变量中,略微增加了该变量的操作步骤会略微损失性能,稍微降低并发能力.

其实现方式很简单,就是通过bit操作,将部分区域标记为特定的用途. ThreadPoolExecutor 就是将最高位的3个bit (1-3位)作为状态描述的部分,剩下的29 bit (4-32位)作为计数器计数部分.

// RUNNING
11100000000000000000000000000000
// SHUTDOWN
0
// STOP
00100000000000000000000000000000
// TIDING
01000000000000000000000000000000
// TERMINATED
01100000000000000000000000000000

未完待续…

GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐