对于线程池,我仅限于简单的使用,对其原理和参数并不是很理解。以前只是在网上找个线程池的例子,然后模仿写一下而已,最近项目中又再次用的线程池,做了一些研究,现记录下,以备以后使用。

    我以前利用线程池只会这样用:

ExecutorService pool = Executors.newFixedThreadPool(5);
pool.execute(new Runnable() {
    @Override
    public void run() {
        //具体线程要干的活
    }
});

这个只是创建一个固定线程数为5的线程池(当然还有其他3种

这种写法有一定的弊端,当系统并发的增长量远远高于线程池的消费量(比如系统是每秒中增加20个并发,而线程池是每秒钟执行5个线程)。这就有可能造成服务器内存溢出的风险。原因是newFixedThreadPool采用了无边界队列。可以从源码中一窥究竟。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这里LinkedBlockingQueue是无边界队列。ThreadPoolExecutor构造中的参数文章下面会逐一讲解。总之上面的写法不灵活,对于一些并发高的系统是不满足的。

下面我们开始步入正题,开始研究下ThreadPoolExecutor线程池。

第一步,研究ThreadPoolExecutor线程池有几种构造方法

第一个构造:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

第二个构造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

第三个构造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

第四个构造:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

总结:从上面四个构造可以看出,其实前三个构造都是调用的第四个构造方法。所以我们研究第四个构造即可。 

第二步,研究第四个构造的每个参数

corePoolSize: 线程池核心线程数量   int类型

maximumPoolSize:线程池允许线程的最大数量      int类型

keepAliveTime: 线程池中线程所允许的空闲时间。long类型

      JDK解释:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

       也就是说啊,线程池中当前的空闲线程服务完某任务后的存活时间。如果时间足够长,那么可能会服务其它任务。

       这些空闲线程指的是新建线程还是核心线程,还是两者都包含。这个不是很清楚。(哪位大神知道可以告诉我下)

unit: 线程池维护线程所允许的空闲时间的单位 

   MICROSECONDS    微秒   一百万分之一秒(就是毫秒/1000)
   MILLISECONDS    毫秒   千分之一秒    
   NANOSECONDS   毫微秒  十亿分之一秒(就是微秒/1000)
   SECONDS          秒
   MINUTES     分钟
   HOURS      小时
   DAYS      天

 

workQueue: 线程池所使用的缓冲队列(关于队列的详解在后面第四步)

        直接提交 SynchronousQueue

        无界队列 如LinkedBlockingQueue

        有界队列 如ArrayBlockingQueue

handler: 线程池对拒绝任务的处理策略

       ThreadPoolExecutor.AbortPolicy()  抛出java.util.concurrent.RejectedExecutionException异常。 

        ThreadPoolExecutor.CallerRunsPolicy()  重试添加当前的任务,他会自动重复调用execute()方法。 

        ThreadPoolExecutor.DiscardOldestPolicy()  抛弃旧的任务  

         ThreadPoolExecutor.DiscardPolicy()  抛弃当前的任务。 

threadFactory:线程工厂,主要用来创建线程:默认值 DefaultThreadFactory

第三步,有一个例子,对线程池描述的非常贴切。

   有一个工厂,核定工人数量为10,每个工人同时只做一件事,

   因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

 当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待

   如果队列workQueue的容量为1,且现在10个工人都在干活,

              又来一个任务A,则把A放到到队列中等待有空闲工人来从队列中拿任务。

              但是有这么一种情况,来的不是一个任务,是5个任务,且10个工人都没有空闲,且队列也放满了(1个任务就满了哦)

               这时候,老板一看来的活多,干不过来了,怎么办,老板就想招了,再招5个临时工吧,这时候工厂就是15个工人

总结:corePoolSize=10;maximumPoolSize=15。当任务数大于corePoolSize时,是先向队列里放,当队列里也满了,还有任务不断进来,才新建线程哦。且最大不会超过maximumPoolSize。

问题来了,如果队列满了,且线程数到最大线程数了,还进来任务怎么办呢。这就用到了handler异常策略(或者叫饱和策略)。

第四步,缓冲队列详解。

4.1、直接提交 SynchronousQueue

       该队列是将任务直接提交给线程而不保存它们。在此,如果不存在空闲的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。           SynchronousQueue线程安全的Queue,可以存放若干任务(但当前只允许有且只有一个任务在等待),其中每个插入操作必须等待另一个线程的对应移除操作,也就是说A任务进入队列,B任务必须等A任务被移除之后才能进入队列,否则执行异常策略。你来一个我扔一个,所以说SynchronousQueue没有任何内部容量。

       比如:核心线程数为2,最大线程数为3;使用SynchronousQueue。

       当前有2个核心线程在运行,又来了个A任务,两个核心线程没有执行完当前任务,根据如果运行的线程等于或多于 corePoolSize,

      则 Executor 始终首选将请求加入队列,而不添加新的线程。所以A任务被添加到队列,此时的队列是SynchronousQueue,

      当前不存在可用于立即运行任务的线程,因此会构造一个新的线程,此时又来了个B任务,两个核心线程还没有执行完。

       新创建的线程正在执行A任务,所以B任务进入Queue后,最大线程数为3,发现没地方仍了。就只能执行异常策略(RejectedExecutionException)。

4.2、无界队列  LinkedBlockingQueue

        使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有核心线程都在忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就没意义了。)也就不会有新线程被创建,都在那等着排队呢。如果未指定容量,则它等于 Integer.MAX_VALUE。如果设置了Queue预定义容量,则当核心线程忙碌时,新任务会在队列中等待,直到超过预定义容量(新任务没地方放了),才会执行异常策略。你来一个我接一个,直到我容不下你了。FIFO,先进先出。

        比如:核心线程数为2,最大线程数为3;使用LinkedBlockingQueue(1),设置容量为1。

        当前有2个核心线程在运行,又来了个A任务,两个核心线程没有执行完当前任务,根据如果运行的线程等于或多于 corePoolSize,

        则 Executor 始终首选将请求加入队列,而不添加新的线程。所以A任务被添加到队列,此时的队列是LinkedBlockingQueue,

        此时又来了个B任务,两个核心线程没有执行完当前任务,A任务在队列中等待,队列已满。所以根据如果无法将请求加入队列,则创建新的线程,

         B任务被新创建的线程所执行,此时又来个C任务,此时maximumPoolSize已满,队列已满,只能执行异常策略(RejectedExecutionException)。

4.3 有界队列  ArrayBlockingQueue

       操作模式跟LinkedBlockingQueue查不多,只不过必须为其设置容量。所以叫有界队列。new ArrayBlockingQueue<Runnable>(Integer.MAX_VALUE) 跟 new LinkedBlockingQueue(Integer.MAX_VALUE)效果一样。LinkedBlockingQueue 底层是链表结构,ArrayBlockingQueue  底层是数组结构。你来一个我接一个,直到我容不下你了。FIFO,先进先出。

总结下:

使用无界队列,要防止任务增长的速度远远超过处理任务的速度,控制不好可能导致的结果就是内存溢出。

使用有界队列,关键在于调节线程数和Queue大小 ,线程数多,队列容量少,资源浪费。线程数少,队列容量多,性能低,还可能导致内存溢出。

第五步,我的一个实际应用例子代码如下(我采用的是xml配置式,更加灵活)

5.1、xml中配置线程池ThreadPoolExecutor

<!--线程池配置开始 -->
   <bean id="linkQueue" class="java.util.concurrent.LinkedBlockingQueue">
       <constructor-arg name="capacity" value="${thread.pool.linkQueue.size}"/>
   </bean>
   <bean id="pool" class="java.util.concurrent.ThreadPoolExecutor">
       <constructor-arg name="corePoolSize" value="${thread.pool.core.size}"/>
       <constructor-arg name="maximumPoolSize" value="${thread.pool.max.size}"/>
       <constructor-arg name="keepAliveTime" value="${thread.pool.keep.alive.time}"/>
       <constructor-arg name="unit" value="${thread.pool.time.unit}"/>
       <constructor-arg name="workQueue" ref="linkQueue"/>
   </bean>
<!--线程池配置结束 -->

5.2、properties文件中配置如下:

#线程池相关配置
#LinkedBlockingQueue队列容量
thread.pool.linkQueue.size=10
#线程池核心线程数
thread.pool.core.size=5
#线程池最大线程数
thread.pool.max.size=10
#空闲线程最大存活时间
thread.pool.keep.alive.time=2
/#时间单位 SECONDS(秒) MILLISECONDS(毫秒) MICROSECONDS(微秒)
thread.pool.time.unit=SECONDS

5.3、后台java代码

public class SaveEventAdviceHolder {


    @Autowired
    private ThreadPoolExecutor pool;

    public void saveSerializeObjEvent(Object[] objects,SaveEventBO bo){
        //设置线程池的异常策略为CallerRunsPolicy()
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        pool.setRejectedExecutionHandler(handler);
        pool.execute(new Runnable() {
            public void run() {
               //线程具体执行任务.....

        });
    }
 }

 

GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐