Java 线程池 ThreadPoolExecutor.

 

JDK1.5 开始关于多线程加了很多特性。如:

ConcurrentHashMap: 放弃使用公用锁同步每一个方法,使用了更细化的锁机制,分离锁。对于大数据量的 HashMap 同步操作效率有了较大提升。

CopyOnWriteArrayList: 是同步 List 的一个并发替代品。其线程安全性来源于这样一个事实:只要有效的不可变对象被正确发布,那么访问它将不再需要更多的同步。在每次需要修改时它们会创建并重新发布一个信的容器拷贝,以此来实现可变性。

增加了 Callable Future Callable runnable 的一个可选替代。我们之前用的 Runnable 是不能返回状态的,而 Callable 是可以返回状态,返回的状态保存在泛型 Future<T> 里。

JDK1.5 里面还包含了一个重要的特性就是线程池。通过查看代码可以看出主要都是由大师 Doug Lea 来完成的。本文主要介绍线程池 ThreadPoolExecutor 的使用。

 

JDK1.5 的线程池由 Executor 框架提供。 Executor 框架将处理请求任务的提交和它的执行解耦。可以制定执行策略。在线程池中执行线程可以重用已经存在的线程,而不是创建新的线程,可以在处理多请求时抵消线程创建、消亡产生的开销。如果线程池过大,会导致内存的高使用量,还可能耗尽资源。如果过小,会由于存在很多的处理器资源未工作,对吞吐量造成损失。

由于内容较多没有一一研究,只看了较常用的 ThreadPoolExecutor ,所以在这里做个介绍。 ThreadPoolExecutor 的继承关系如下。

Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor

核心池大小 (core pool size) 、最大池的大小 (maximum pool size) 、存活时间 (keep-alive time) 共同管理着线程的创建和销毁。

线程池类为 java.util.concurrent.ThreadPoolExecutor ,常用构造方法为:

/**

* Creates a new <tt> ThreadPoolExecutor </tt> with the given initial

* parameters.

*

* @param corePoolSize the number of threads to keep in the

* pool, even if they are idle.

* @param maximumPoolSize the maximum number of threads to allow in the

* pool.

* @param keepAliveTime when the number of threads is greater than

* the core, this is the maximum time that excess idle threads

* will wait for new tasks before terminating.

* @param unit the time unit for the keepAliveTime

* argument.

* @param workQueue the queue to use for holding tasks before they

* are executed. This queue will hold only the <tt> Runnable </tt>

* tasks submitted by the <tt> execute </tt> method.

* @param handler the handler to use when execution is blocked

* because the thread bounds and queue capacities are reached.

* @throws IllegalArgumentException if corePoolSize, or

* keepAliveTime less than zero, or if maximumPoolSize less than or

* equal to zero, or if corePoolSize greater than maximumPoolSize.

* @throws NullPointerException if <tt> workQueue </tt>

* or <tt> handler </tt> are null.

*/

public ThreadPoolExecutor( int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler) {

this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors. defaultThreadFactory (), handler);

}

corePoolSize : 线程池维护线程的最少数量,哪怕是空闲的。

maximumPoolSize :线程池维护线程的最大数量。

keepAliveTime : 线程池维护线程所允许的空闲时间。

unit : 线程池维护线程所允许的空闲时间的单位。

workQueue : 线程池所使用的缓冲队列,改缓冲队列的长度决定了能够缓冲的最大数量。

拒绝任务:拒绝任务是指当线程池里面的线程数量达到 maximumPoolSize workQueue 队列已满的情况下被尝试添加进来的任务。

handler : 线程池对拒绝任务的处理策略。在 ThreadPoolExecutor 里面定义了 4 handler 策略,分别是

1. CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。

2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。

3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。

4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

 

一个任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。

当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

1. 如果此时线程池中的数量小于 corePoolSize ,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

2. 如果此时线程池中的数量等于 corePoolSize ,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。

3. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量小于 maximumPoolSize ,建新的线程来处理被添加的任务。

4. 如果此时线程池中的数量大于 corePoolSize ,缓冲队列 workQueue 满,并且线程池中的数量等于 maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。

处理任务的优先级为:

核心线程 corePoolSize 、任务队列 workQueue 、最大线程 maximumPoolSize ,如果三者都满了,使用 handler 处理被拒绝的任务。当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。

理解了上面关于 ThreadPoolExecutord 的介绍,应该就基本能了解它的一些使用,不过在 ThreadPoolExocutor 里面有个关键的 Worker 类,所有的线程都要经过 Worker 的包装。这样才能够做到线程可以复用而无需重新创建线程。

同时 Executors 类里面有 newFixedThreadPool(),newCachedThreadPool() 等几个方法,实际上也是间接调用了 ThreadPoolExocutor ,不过是传的不同的构造参数。

下面通过一个例子的执行结果来理解

 

代码:

 

上面代码定义了一个 corePoolSize 2 maximumPoolSize 3 workerQuene 容量为 3 的线程池,也就是说在饱和状态下,只能容纳 6 个线程, 3 个是运行状态, 3 个在队列里面。同时代码又往线程池里面添加了 9 个线程,每个线程会运行 2 秒,这样必然会到达饱和状态。而饱和状态就涉及到对拒绝任务的处理策略,本处采用了 ThreadPoolExecutor.DiscardOldestPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

put task@ 7

put task@ 8

put task@ 9

start ..task@ 8

start ..task@ 9

采用 ThreadPoolExecutor.DiscardOldestPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

start ..task@ 6

start ..task@ 4

start ..task@ 5

put task@ 7

start ..task@ 7

put task@ 8

put task@ 9

start ..task@ 8

start ..task@ 9

采用 ThreadPoolExecutor.AbortPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 7

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 8

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

put task@ 9

java.util.concurrent.RejectedExecutionException

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( ThreadPoolExecutor.java:1477 )

at java.util.concurrent.ThreadPoolExecutor.reject( ThreadPoolExecutor.java:384 )

at java.util.concurrent.ThreadPoolExecutor.execute( ThreadPoolExecutor.java:867 )

at TestThreadPool.main( TestThreadPool.java:22 )

start ..task@ 4

start ..task@ 5

采用 ThreadPoolExecutor.DiscardPolicy() 运行结果如下:

put task@ 1

start ..task@ 1

put task@ 2

start ..task@ 2

put task@ 3

put task@ 4

put task@ 5

start ..task@ 3

put task@ 6

put task@ 7

put task@ 8

put task@ 9

start ..task@ 4

start ..task@ 5

从运行结果可以看出不同的 Handler 策略对拒绝任务的处理方式。

目前还只偏重在使用层面的理解,底层细节的原理还有待日后学习,欢迎交流。

 

 

本文部分内容和例子参考了: http://blog.csdn.net/imicro/archive/2007/08/29/1763955.aspx

 

 

GitHub 加速计划 / th / ThreadPool
3
0
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:4 个月前 )
9a42ec13 - 10 年前
fcc91415 - 10 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐