下面我们来学习juc中很重要的一个模块,线程池
线程池的作用:

  • 使用线程池通过线程的重用,降低线程创建的开销,降低资源消耗
  • 可以根据系统的承受能力,调整线程池中工作线程的数量
  • 线程池还增加了一些线程执行的管理功能,方便对线程执行状态的监控

    下面咱们看看Executor–ExecutorService–AbstractExecutorService–ThreadPoolExecutor这条线的源码

首先看Executor(执行器),

/** 主要是解耦线程的提交和执行 */
public interface Executor {
    /**
     * 提交一个comman,执行由实现类自己定义,可以直接执行,也可以新开线程执行等等
     */
    void execute(Runnable command);
}

ExecutorService

public interface ExecutorService extends Executor {
    //关闭线程池,等待正在运行的线程运行完后再关
    void shutdown();
    //立刻关闭线程池,尝试停止正在运行的任务(但不能保证一定成功),返回待执行任务列表
    List<Runnable> shutdownNow();
    //执行器是否关闭
    boolean isShutdown();
    //是否所有的任务已关闭
    boolean isTerminated();
    //阻塞等待所有线程执行完毕,或时间耗尽,或当前线程被中断
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    //提交任务返回Future
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    //执行一组任务,当所有任务执行完成后返回一组Future的List
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    //执行一组任务,其中任何一个任务完成,则返回其结果,其他任务讲被取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

    //将Runnable、value转化为FutrueTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    //将Callable转化为FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    //这三个方法都是把任务转化为RunnableFuture然后执行
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    //invokeAny()实际上调用的是doInvokeAny()
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    //实际进行处理的方法
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        //任务为空抛出异常
        if (tasks == null)
            throw new NullPointerException();
        //获取任务数量
        int ntasks = tasks.size();
        //任务数量为0抛出异常
        if (ntasks == 0)
            throw new IllegalArgumentException();

        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        //使用ExecutorCompletionService再封装一层
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
            //获取一个任务来执行
            futures.add(ecs.submit(it.next()));
            //任务数减一
            --ntasks;
            int active = 1;

            for (;;) {
                //从esc中取出一个任务来,查看结果
                Future<T> f = ecs.poll();
                if (f == null) {//队列为空
                    //还有为提交的任务,则再获取一个然后提交
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        //执行中的任务+1
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    //如果设置过时间,则阻塞等待指定时间
                    else if (timed) {
                        //这里其实就是一个阻塞队列,如果在等待时间内未获取到Future,则抛出异常
                        //只有提交完所有任务,且无任务完成时,且超时timed为true

                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    //如果没有设置超时阻塞,则等待有任务完成
                    else
                        f = ecs.take();
                }
                //已有任务完成
                if (f != null) {
                    //活跃任务数量减1
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            //说明没有任务正常执行完成返回结果
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //取消剩余任务
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

FutureTask实现了RunnableFuture,而RunnableFuture接口又继承了Futrue接口和Runnable接口,因此FutrueTask即能被当做Future用能被当做Runnable

doInvokeAny中使用ExecutorCompletionService(此类下次做分析)来管理执行完成的任务,ExecutorCompletionService内部使用LinkedBlockingQueue作为完成队列,任务执行完后自动将结果Future加入到队列中。

再来看看invokAll()

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            //提交所有任务到执行器中
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                //保证每个任务都完成
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            //出现异常取消任务
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    //区别是加上了超时设置
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                //循环提交任务
                execute((Runnable)(it.next()));
                //计算剩余时间
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                //超时则直接返回
                if (nanos <= 0)
                    return futures;
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {//保证每个任务都执行完
                    //超时则直接返回
                    if (nanos <= 0)
                        return futures;
                    try {
                        //在指定时间内等待任务执行结果
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    //计算剩余时间
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }   

这两个方法的大体的流程都差不多基本是1.提交任务;2.等待任务完成.超时的方法就是在没步执行的时候都加了一个时间验证.


下面看看ThreadPoolExecutor
这个类比较复杂,咱们一点一点慢慢分析
首先来看状态相关的东西有个大概认识

/**ctl代表2个含义:高3位为线程池运行状态,低29位线程数量 */
//11100000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //00011111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    /**
     *   RUNNING:  接收新的任务并且也会处理已经提交等待的任务
     *   SHUTDOWN: 不会接收新的任务,但会处理已经提交等待的任务
     *   STOP:     不接受新任务,不处理已经提交等待的任务,而且还会中断处理中的任务
     *   TIDYING:  所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法
     *   TERMINATED: terminated()调用完成
     */

    //11100000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    //00000000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //00100000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    //01000000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    //01100000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

    //&操作比较高3位获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // &比较低29位获取线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //通过rs高3位运行状态|wc低29位线程数量计算最后的值
    private static int ctlOf(int rs, int wc) { return rs | wc; }




    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    //CAS增加线程数量
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    //CAS减少线程数量
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    //减少工作线程数量
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    //把当前线程池状态更改为targetState
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

以上的属性、方法都跟线程池的状态相关.ctl高低位分离,之前读写锁的时候也有这种处理,可以一起记忆加深理解

再来看看其主要的结构

    //线程池的等待队列;如果线程池大小未到的corePoolSize则线程直接执行,否则加入此阻塞队列等待执行
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    //所有的线程
    private final HashSet<Worker> workers = new HashSet<Worker>();
    /**
     * 锁的条件
     */
    private final Condition termination = mainLock.newCondition();
    //线程池最大线程数量
    private int largestPoolSize;
    //已结束的任务数量,只有在线程池关闭的时候才更新
    private long completedTaskCount;
    //线程工厂,用于生产线程
    private volatile ThreadFactory threadFactory;
    //线程池满了后的拒绝策略
    private volatile RejectedExecutionHandler handler;
    //线程池线程超过corePoolSize,且allowCoreThreadTimeOut为true时,则线程等待需要判断是否等待超时
    private volatile long keepAliveTime;
    //核心线程数量,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程
    private volatile int corePoolSize;
    //最大线程数量
    private volatile int maximumPoolSize;
    //默认拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    //线程池关闭许可
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

再来看看构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造器其实就是根据上面讲的参数来创建线程池,这些请务必记清楚,因为这是阿里推荐的使用线程池的方式
这里写图片描述

再来看看默认线程工厂是如何实现的

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        // 返回一个线程:name为pool-线程池序列-thread-线程序列,优先级为5
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

再看看线程池提供的默认拒绝策略

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        //直接抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    //不进行处理
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    //线程池没有关闭就把任务队列的第一个丢弃然后执行新的
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

这里提供了三种策略,也可以自己写

下面看看真正的执行方法execute()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        //如果工作线程小于核心线程
        if (workerCountOf(c) < corePoolSize) {
            //增加一个核心线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果核心线程满了,线程池还在运行,那么就入列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池关闭,移除任务,按照给定的策略reject
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果核心线程数为0,则新开一个核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果检查的时候线程池就关闭了或者线队列满了入队列失败,那就再开一个线程;失败就reject
        else if (!addWorker(command, false))
            reject(command);
    }


    //基于当前线程池的状态和大小(core:true使用corePoolSize,false使用maximumPoolSize),决定能否加入一个新的任务
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //线程池状态
            int rs = runStateOf(c);
            /** 1.如果为Running状态无所谓,不管新增线程还是接收新任务都可以
             * 2.如果为shutdown状态那就只能处理任务,不能接收任务。这时入参firstTask必须null且,队列不为空,可以增加一个线程来处理队列,但不能新增任务
             * 3.其他状态直接false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //工作线程数量
                int wc = workerCountOf(c);
                //根据core检验大小
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //线程数量+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // CAS+自旋更新变量c
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //线程池主锁
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 如果线程已启动(可能在自定义线程工厂内启动)
                            throw new IllegalThreadStateException();
                        //加入线程池
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //设置线程池大小
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //已经加入线程池就直接启动
                if (workerAdded) {
                    //worker线程开始运行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

当将一个任务添加到线程池中时会进行如下判断:

  1. 如果线程池中的线程数量少于 corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务
  2. 如果线程池中的线程数量大于等于 corePoolSize,但缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue中,按照 FIFO 的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行)
  3. 如果线程池中的线程数量大于等于 corePoolSize,且缓冲队列 workQueue 已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务
  4. 如果线程池中的线程数量等于了 maximumPoolSize,则使用拒绝策略(默认报错)


下面来看看一个核心类Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
        Runnable firstTask;
        //任务完成时间
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // AQS的state初始状态为-1 
            this.firstTask = firstTask;
            //线程工厂创建线程
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
        //继承AQS的实现
        //0表示未被锁,1表示加锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        //中断运行中的线程
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//线程的初始任务
        w.firstTask = null;
        w.unlock(); //这里是worker构造默认的aqs为-1,这里unlock下,容许中断
        boolean completedAbruptly = true;//线程退出的原因:true是任务导致(中断或异常等等),false是线程正常退出
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //空方法留给子类扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //空方法留给子类扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //线程正常退出,false
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

在runWorker()中有使用到getTask()方法用于获取任务

/**
 * 存在阻塞或超时获取任务或由于下列原因返回null:
 * 1. 超过设置的线程池大小maximumPoolSize;
 * 2. 线程池stop.
 * 3. 线程池shutdown,队列为空.
 * 4. 线程超时等待任务
 */
private Runnable getTask() {
        boolean timedOut = false; 

        retry:
        for (;;) {
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);
            //如果线程池被关闭,且等待队列为空,那就直接worker减1,返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;  

            for (;;) {
                int wc = workerCountOf(c);
                // 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
                // 2.allowCoreThreadTimeOut == false && wc >corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只有超过的线程才会
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                // 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
                // 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者poll出异常了重试
                // 2.timeOut == true && timed == false:看后面的代码workerQueue.poll超时时timeOut才为true,并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true),所以超时不会继续执行而是return null结束线程
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                // workerCount递减,结束当前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
            }

            try {
                // 1.以指定的超时时间从队列中取任务
                // 2.core thread没有超时
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超时
            } catch (InterruptedException retry) {
                timedOut = false;// 线程被中断重试
            }
        }
    }

再来看看退出线程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的话再runWorker的getTask方法workerCount已经被减一了
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加线程的completedTasks
            completedTaskCount += w.completedTasks;
            // 从线程池中移除超时或者出现异常的线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 尝试停止线程池
        tryTerminate();

        int c = ctl.get();
        // runState为RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.线程异常退出
            // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
            addWorker(null, false);
        }
    }

shutdownshutdownNow
shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
            // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
            // tryTerminate方法中会保证队列中剩余的任务得到执行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP状态:不再接受新任务且不再执行队列中的任务。
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 返回队列中还没有被执行的任务。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
            // 因此保证了中断的肯定是空闲的线程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化时state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

至此主要方法分析完了,这个真的非常难理解,我也是结合别人的分析才能勉强看懂,以后还要多看几遍加深理解

GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐