Java — 慎用Executors类中newFixedThreadPool()和newCachedThreadPool()
在一些要求严格的公司,一般都明令禁止是使用Excutor提供的newFixedThreadPool()和newCachedThreadPool()直接创建线程池来操作线程,既然被禁止,那么就会有被禁止的道理,我们先来看一下之所以会被禁止的原因。
Executors.newCachedThreadPool()
源码
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
大致意思就是,通过该方法会创建一个线程池,当你执行一个任务,并且线程池中不存在可用的已构造好的线程时,它就会创建一个新线程,否则它会优先复用已有的线程,当线程未被使用时,默认 60 秒后被移除。
里面有一句话:
These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
这些线程池可以很明显的提升那些短期存活的异步任务的执行效率
很明显,官方标注他适合处理业务简单、耗时短的任务,这是为什么呢?
我们接着看 ThreadPoolExecutor 构造方法的描述:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
* 核心线程数
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* 最大线程数
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* 存活时长
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* 时间单位
* @param unit the time unit for the {@code keepAliveTime} argument
* 任务队列
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
分析
结合ThreadPoolExecutor 构造方法的描述,我们可以知道,当我们调用newCachedThreadPool()方法的时候,它会创建一个核心线程数为 0 ,最大线程数为Integer上限,无用线程存活时间为 6 秒的线程池。
这意味着当我们需要在多线程中执行复杂业务时,它会疯狂的创建线程,因为其他线程中的业务并未执行完。
例如下列代码:
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
//DO SOMETHING
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
模拟瞬间创建100000000十万个任务,且每个任务需要等待一小时,会发现电脑内存使用率迅速增加并一直持续到 OOM。
Executors.newFixedThreadPool()
我们再来看一下源码
源码
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
大致描述是:创建一个由固定数量线程并在共享无界队列上运行的线程池,在任何时候都最多只有nThreads个线程存在并执行任务。
如果在任务提交时,所有线程都在工作中,则会将该任务放入到队列中等待,直到有可用的线程。如果某个线程在执行过程中出现异常,那么这个线程会终止,并且会有一个新的线程代替它进行后续的工作,线程池中的线程会一直存在直到线程池被明确的停止掉。
//停止接收新的任务,并继续完成正在执行的任务和队列中的任务
ExecutorService#shutdown
//等所有已提交的任务(包括正在跑的和队列中等待的)执行完
//或者等超时时间到
//或者线程被中断,抛出InterruptedException
ExecutorService#awaitTermination(1, TimeUnit.HOURS);
//停止接受新的任务,忽略队列中的任务并尝试终止正在执行的任务
ExecutorService#shutdownNow
分析
通过源码我们可以看出,该方法创建一个固定核心线程数和线程池大小的线程池,并且核心数等于最大线程数。看起来好像没有类似newCachedThreadPool无限创建线程的情况,但是在他的描述中有一点很引人注意,
operating off a shared unbounded queue
操作一个共享无界的队列
通过查看newFixedThreadPool()在创建线程池时传入的队列 new LinkedBlockingQueue()
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
会发现,这个队列的最大长度时Integer.MAX_VALUE,这就意味着,未能及时执行的任务都将添加到这个队列里面
随着任务的增加,这个队列所占用的内存将越来越多。最终导致OOM也是迟早的事情。
避坑指南
对于线程池这种东西,其实让我们自己去控制是最好的,我们可以通过实现自定义的线程池提供线程,不仅可以定制化的获取线程执行过程中的状态等信息,还能根据不同的任务使用不同的线程池。
例如,一条简单的 查询操作 和 文件读取操作 就应该放在不同的线程池里面
因为如果两种任务在同一个线程池里面,文件操作本身就是耗时的,它占用了线程之后会导致查询操作等待或者直接被丢弃(取决于自定义线程池任务添加时的规则),这样严重影响了查询性能。
自定义线程池
//队列长度为100
BlockingQueue<Runnable> blockqueue = new LinkedBlockingQueue<Runnable>(100) {
/**
* 这里重写offer方法
* 在接收到新的任务时,会先加入到队列中,当队列满了之后,才会创建新的线程 直到达到线程池的最大线程数
* 我们现在需要接收到新任务时,优先将线程数扩容到最大数,后续任务再放入到队列中
* 加入队列会调用 offer方法 ,我们直接返回false,制造队列已满的假象
*/
@Override
public boolean offer(Runnable e) {
return false;
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4, 10,
10, TimeUnit.SECONDS,
blockqueue , new ThreadFactoryBuilder().setNameFormat("mypool-%d").get(), (r, executor) -> {
/**
* 这里拒绝策略,被拒绝的任务会走该方法 及没添加到队列中,且没有获取到线程的任务
* 因为我们设置的队列中 offer方法固定返回false
*/
try {
//如果允许该任务执行但是不阻塞,及如果进不了队列就放弃,我们可以调用 offer 的另一个多参的方法
if (!executor.getQueue().offer(r, 0, TimeUnit.SECONDS)) {
throw new RejectedExecutionException("ThreadPool queue full, failed to offer " + r.toString());
}
//如果我们需要让任务一定要执行,及足协而等待进入队列,可以使用put
executor.getQueue().put(r)
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
更多推荐
所有评论(0)