一、简介 

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, 
RejectedExecutionHandler handler) 

  • corePoolSize: 线程池维护线程的最少数量 
  • maximumPoolSize:线程池维护线程的最大数量 
  • keepAliveTime: 线程池维护线程所允许的空闲时间 
  • unit: 线程池维护线程所允许的空闲时间的单位 
  • workQueue: 线程池所使用的缓冲队列 
  • handler: 线程池对拒绝任务的处理策略 


        一个任务( Runnable类型的对象)通过 execute(Runnable)方法被添加到线程池,任务的执行方法就是Runnable类型对象的run()方法。 当一个任务通过execute(Runnable)方法欲添加到线程池时: 
        如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 
        如果此时线程池中的线程数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 
        如果此时线程池中的线程数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的线程数量小于maximumPoolSize,建新的线程来处理被添加的任务。

        如果此时线程池中的线程数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的线程数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。 

也就是:处理任务的优先级为: 
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 

unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性: 
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 

workQueue可用的队列类是:java.util.concurrent.ArrayBlockingQueue 

handler有四个选择: 

  • (1)ThreadPoolExecutor.AbortPolicy()   抛出java.util.concurrent.RejectedExecutionException异常 
  • (2)ThreadPoolExecutor.CallerRunsPolicy()  重试添加当前的任务,他会自动重复调用execute()方法 
  • (3)ThreadPoolExecutor.DiscardOldestPolicy() 抛弃旧的任务 (等待队列里面最早进入的)
  • (4)ThreadPoolExecutor.DiscardPolicy()     抛弃当前的任务 

二、例子

1.使用ThreadPoolExecutor.AbortPolicy() 策略

import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPool2 {
    private static int produceTaskSleepTime = 2;
    private static int produceTaskMaxNumber = 8;

    public static void main( String[] args ) {
        // 构造一个线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 4, 3, TimeUnit.SECONDS,
                        new ArrayBlockingQueue < Runnable >( 3 ), new ThreadPoolExecutor.AbortPolicy() );

        for ( int i = 1; i <= produceTaskMaxNumber; i++ ) {
            try {
                // 产生一个任务,并将其加入到线程池
                String task = "task@ " + i;
                System.out.println( "put " + task );
                threadPool.execute( new ThreadPoolTask( task ) );

                // 便于观察,等待一段时间
                Thread.sleep( produceTaskSleepTime );
            }
            catch ( Exception e ) {
                e.printStackTrace();
            }

           
        }
    }
}


/**
 * 线程池执行的任务
 */
class ThreadPoolTask implements Runnable, Serializable {
    private static final long serialVersionUID = 0;
    // 保存任务所需要的数据
    private Object threadPoolTaskData;

    ThreadPoolTask( Object tasks ) {
        this.threadPoolTaskData = tasks;
    }

    public void run() {
        // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
        System.out.println( Thread.currentThread().getName() );
        System.out.println( "start .." + threadPoolTaskData );

        try {
            // //便于观察,等待一段时间
            Thread.sleep( 2000 );
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        threadPoolTaskData = null;
    }

    public Object getTask() {
        return this.threadPoolTaskData;
    }

}

程序的执行结果:

put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
java.util.concurrent.RejectedExecutionException: Task outputMml2.ThreadPoolTask@5aaa6d82 rejected from java.util.concurrent.ThreadPoolExecutor@69222c14[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
	at outputMml2.TestThreadPool2.main(TestThreadPool2.java:22)
pool-1-thread-1
start ..task@ 3
pool-1-thread-2
start ..task@ 4
pool-1-thread-3
start ..task@ 5


2.使用ThreadPoolExecutor.DiscardOldestPolicy() 策略

把上述代码的例子的策略改为:new ThreadPoolExecutor.DiscardOldestPolicy() ,程序的执行结果是:

put task@ 1
pool-1-thread-1
start ..task@ 1
put task@ 2
pool-1-thread-2
start ..task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
pool-1-thread-3
start ..task@ 6
put task@ 7
pool-1-thread-4
start ..task@ 7
put task@ 8
pool-1-thread-1
start ..task@ 4
pool-1-thread-2
start ..task@ 5
pool-1-thread-3
start ..task@ 8


可以看出,最早进入队列的任务3被丢弃了。

GitHub 加速计划 / th / ThreadPool
3
0
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:3 个月前 )
9a42ec13 - 10 年前
fcc91415 - 10 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐