Quartz SimpleThreadPool source code: how a simple thread pool is implemented

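The pool has a fixed number of Threads, and does not grow or shrink based on demand.
Because this pool is so simple, it is a good way to understand thread pools in general. A thread pool exists to avoid the cost of creating and destroying a thread for every task, which saves resources.
The main resource in a thread pool is its worker threads: threads that have already been created are reused to execute a given task, so the pool has to manage these threads centrally.
That management tracks three things: the busy threads, the available threads, and the full set of threads.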

  • How SimpleThreadPool represents its resources
    private List<WorkerThread> workers;// all threads
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();// available threads
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();// threads currently working
  • A plain thread is normally written in one of two ways: by extending Thread or by implementing Runnable
import lombok.extern.slf4j.Slf4j;

/**
 * description: a simple thread example
 * author: wangji
 * date: 2017-08-23 14:22
 */
@Slf4j
public class MyThread extends Thread {
    @Override
    public void run() {
        log.info("thread run.....");
    }

    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
    }
}

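Once run() returns here, the thread terminates and its resources are reclaimed. A pool thread is different: it stays running and is not reclaimed while the pool is alive. WorkerThread shows how this is done.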
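
The difference described above can be checked directly. The following is a small sketch, not part of Quartz; the class name ThreadLifecycleDemo and its helper methods are made up for illustration. It uses the Runnable style (the second of the two approaches) and shows that a plain thread is dead after run() returns and cannot be started a second time, which is why a pool has to keep its threads looping instead.

```java
// Hypothetical demo class, not from Quartz.
class ThreadLifecycleDemo {

    /** Starts a one-shot thread, waits for it, and reports whether it is still alive. */
    static boolean aliveAfterRun() {
        // Runnable style: the task is passed to the Thread instead of subclassing it
        Thread t = new Thread(() ->
                System.out.println("task ran on " + Thread.currentThread().getName()));
        t.start();
        joinQuietly(t);
        return t.isAlive();
    }

    /** Tries to start a finished thread again; returns true if the JVM refuses. */
    static boolean restartIsRejected() {
        Thread t = new Thread(() -> { });
        t.start();
        joinQuietly(t);
        try {
            t.start(); // a terminated thread cannot be reused
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }

    public static void main(String[] args) {
        System.out.println("alive after run: " + aliveAfterRun());
        System.out.println("restart rejected: " + restartIsRejected());
    }
}
```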

Implementation of the worker thread: WorkerThread (WorkerThread extends Thread)

  • WorkerThread member variables
    private final Object lock = new Object(); // lock that guards runnable
    // A flag that signals the WorkerThread to terminate.
    private AtomicBoolean run = new AtomicBoolean(true);// running/stopped flag

    private SimpleThreadPool tp;// reference to the owning pool, used to return this worker to availWorkers

    private Runnable runnable = null;// the Runnable whose run() this thread is currently executing

    private boolean runOnce = false;// whether to execute only once
  • Constructors (set the daemon flag, the run-once mode, and the thread priority)
   /**
     * Create a worker thread and start it. Waiting for the next Runnable,
     * executing it, and waiting for the next Runnable, until the shutdown
     * flag is set.
     */
    WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                 int prio, boolean isDaemon) {

        this(tp, threadGroup, name, prio, isDaemon, null);
    }

    /**
     * Create a worker thread, start it, execute the runnable and terminate
     * the thread (one time execution).
     */
    WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                 int prio, boolean isDaemon, Runnable runnable) {

        super(threadGroup, name);
        this.tp = tp;
        this.runnable = runnable;
        if(runnable != null)
            runOnce = true;
        setPriority(prio);
        setDaemon(isDaemon);
    }
  • run() keeps looping so that the thread never finishes (which is why the thread is not reclaimed); a flag controls when the thread terminates
    /**
     * Loop, executing targets as they are received.
     */
    @Override
    public void run() {
        boolean ran = false;

        while (run.get()) {// the flag controls termination
            try {
                synchronized(lock) {
                    while (runnable == null && run.get()) {
                        lock.wait(500);
                        // the thread waits until it is notified or the timeout elapses;
                        // with nothing to execute, it simply rests
                    }

                    if (runnable != null) {
                        ran = true;
                        runnable.run();
                    }
                }
            } catch (InterruptedException unblock) {// the interrupt is only logged; the loop then re-checks the flag
                // do nothing (loop will terminate if shutdown() was called
                try {
                    getLog().error("Worker thread was interrupt()'ed.", unblock);
                } catch(Exception e) {
                    // ignore to help with a tomcat glitch
                }
            } catch (Throwable exceptionInRunnable) {
                try {
                    getLog().error("Error while executing the Runnable: ",
                        exceptionInRunnable);
                } catch(Exception e) {
                    // ignore to help with a tomcat glitch
                }
            } finally {
                synchronized(lock) {// the task is finished, so clear the current runnable
                    runnable = null;
                }
                // repair the thread in case the runnable mucked it up...
                if(getPriority() != tp.getThreadPriority()) {
                    setPriority(tp.getThreadPriority());
                }

                if (runOnce) {// run-once worker: set the loop flag to false
                    run.set(false);
                    clearFromBusyWorkersList(this);// remove the run-once worker from the busy list
                } else if(ran) {
                    ran = false;
                    makeAvailable(this);// the busy worker becomes available again
                }

            }
        }

    }
}
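// After a worker finishes its task, these two methods update the worker lists and then notify the threads waiting on nextRunnableLock.
// A thread that is waiting for a free worker keeps waiting until a worker notifies it here; all of this runs under the same lock.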
 protected void makeAvailable(WorkerThread wt) {
        synchronized(nextRunnableLock) {
            if(!isShutdown) {
                availWorkers.add(wt);
            }
            busyWorkers.remove(wt);
            nextRunnableLock.notifyAll();
        }
    }

    protected void clearFromBusyWorkersList(WorkerThread wt) {
        synchronized(nextRunnableLock) {
            busyWorkers.remove(wt);
            nextRunnableLock.notifyAll();
        }
    }
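
The bookkeeping behind these two methods can be reduced to two lists guarded by one lock. Below is a minimal sketch under simplifying assumptions: workers are represented by plain strings, there is no shutdown flag, and the names WorkerBookkeeping, acquire and release are hypothetical rather than Quartz API. acquire plays the role of the pool taking a worker, release plays the role of makeAvailable.

```java
import java.util.LinkedList;

// Hypothetical sketch: two lists and one lock, in the spirit of availWorkers/busyWorkers.
class WorkerBookkeeping {
    private final Object lock = new Object(); // stands in for nextRunnableLock
    private final LinkedList<String> avail = new LinkedList<>();
    private final LinkedList<String> busy = new LinkedList<>();

    WorkerBookkeeping(int count) {
        for (int i = 1; i <= count; i++) {
            avail.add("worker-" + i);
        }
    }

    /** Blocks until a worker is free, then moves it from avail to busy. */
    String acquire() {
        boolean interrupted = false;
        try {
            synchronized (lock) {
                while (avail.isEmpty()) {
                    try {
                        lock.wait(500); // re-check periodically, as the Quartz code does
                    } catch (InterruptedException e) {
                        interrupted = true; // remember it and keep waiting
                    }
                }
                String worker = avail.removeFirst();
                busy.add(worker);
                return worker;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Moves a worker back from busy to avail and wakes anyone blocked in acquire(). */
    void release(String worker) {
        synchronized (lock) {
            busy.remove(worker);
            avail.add(worker);
            lock.notifyAll();
        }
    }

    int availableCount() {
        synchronized (lock) { return avail.size(); }
    }

    int busyCount() {
        synchronized (lock) { return busy.size(); }
    }
}
```

Because every move between the lists happens while holding the same lock, the sum of both list sizes always equals the number of workers created.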
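  • How does SimpleThreadPool tell a worker thread to run a task?
    WorkerThread has a method that sets runnable and then notifies the blocked worker thread, which goes on to execute it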
 public void run(Runnable newRunnable) {
    synchronized(lock) {
        if(runnable != null) {
            throw new IllegalStateException("Already running a Runnable!");
        }

        runnable = newRunnable;
        lock.notifyAll();// wakes the worker blocked in run() so that its loop can continue
    }
}
  • Terminating a WorkerThread just sets the flag: the loop stops, and the thread ends once the current run completes
    /**
     * <p>
     * Signal the thread that it should terminate.
     * </p>
     */
    void shutdown() {
        run.set(false);
    }
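
Put together, the hand-off, the loop and the stop flag form a very small state machine. The sketch below is a stripped-down illustration of that idea and not the Quartz class: the name SketchWorker, the submit method and the demo helper are all hypothetical, and the pool bookkeeping is left out entirely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of loop + wait + notify; not the Quartz WorkerThread.
class SketchWorker extends Thread {
    private final Object lock = new Object();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private Runnable task; // guarded by lock

    /** Hands a task to the parked worker and wakes it up. */
    void submit(Runnable newTask) {
        synchronized (lock) {
            if (task != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }
            task = newTask;
            lock.notifyAll();
        }
    }

    /** Only flips the flag; the loop notices it at its next check. */
    void shutdown() {
        running.set(false);
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                synchronized (lock) {
                    while (task == null && running.get()) {
                        lock.wait(500); // park until notified or 500 ms pass
                    }
                    if (task != null) {
                        task.run();
                    }
                }
            } catch (InterruptedException e) {
                // nothing to do: the outer loop re-checks the flag
            } catch (RuntimeException e) {
                System.err.println("task failed: " + e);
            } finally {
                synchronized (lock) {
                    task = null; // ready for the next hand-off
                }
            }
        }
    }

    /** Demo: one worker runs one task, then is shut down and joined. */
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        SketchWorker worker = new SketchWorker();
        worker.start();
        worker.submit(() -> {
            counter.incrementAndGet();
            done.countDown();
        });
        try {
            done.await();      // wait until the task has run
            worker.shutdown(); // the worker exits after its next wait(500) at the latest
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + demo());
    }
}
```

Note that shutdown() never interrupts the thread. The timed wait(500) is what lets a parked worker notice the flag without any notification.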
  • The complete WorkerThread class
 /**
     * <p>
     * A Worker loops, waiting to execute tasks.
     * </p>
     */
    class WorkerThread extends Thread {

        private final Object lock = new Object();

        // A flag that signals the WorkerThread to terminate.
        private AtomicBoolean run = new AtomicBoolean(true);

        private SimpleThreadPool tp;

        private Runnable runnable = null;

        private boolean runOnce = false;

        /**
         * <p>
         * Create a worker thread and start it. Waiting for the next Runnable,
         * executing it, and waiting for the next Runnable, until the shutdown
         * flag is set.
         * </p>
         */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                     int prio, boolean isDaemon) {

            this(tp, threadGroup, name, prio, isDaemon, null);
        }

        /**
         * <p>
         * Create a worker thread, start it, execute the runnable and terminate
         * the thread (one time execution).
         * </p>
         */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                     int prio, boolean isDaemon, Runnable runnable) {

            super(threadGroup, name);
            this.tp = tp;
            this.runnable = runnable;
            if(runnable != null)
                runOnce = true;
            setPriority(prio);
            setDaemon(isDaemon);
        }

        /**
         * <p>
         * Signal the thread that it should terminate.
         * </p>
         */
        void shutdown() {
            run.set(false);
        }

        public void run(Runnable newRunnable) {
            synchronized(lock) {
                if(runnable != null) {
                    throw new IllegalStateException("Already running a Runnable!");
                }

                runnable = newRunnable;
                lock.notifyAll();
            }
        }

        /**
         * <p>
         * Loop, executing targets as they are received.
         * </p>
         */
        @Override
        public void run() {
            boolean ran = false;

            while (run.get()) {
                try {
                    synchronized(lock) {
                        while (runnable == null && run.get()) {
                            lock.wait(500);
                        }

                        if (runnable != null) {
                            ran = true;
                            runnable.run();
                        }
                    }
                } catch (InterruptedException unblock) {
                    // do nothing (loop will terminate if shutdown() was called
                    try {
                        getLog().error("Worker thread was interrupt()'ed.", unblock);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } catch (Throwable exceptionInRunnable) {
                    try {
                        getLog().error("Error while executing the Runnable: ",
                            exceptionInRunnable);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } finally {
                    synchronized(lock) {
                        runnable = null;
                    }
                    // repair the thread in case the runnable mucked it up...
                    if(getPriority() != tp.getThreadPriority()) {
                        setPriority(tp.getThreadPriority());
                    }

                    if (runOnce) {
                           run.set(false);
                        clearFromBusyWorkersList(this);
                    } else if(ran) {
                        ran = false;
                        makeAvailable(this);
                    }

                }
            }

            //if (log.isDebugEnabled())
            try {
                getLog().debug("WorkerThread is shut down.");
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        }
    }
}

SimpleThreadPool's main initialization method: the threads are created and stored in a List<WorkerThread>

  • Initializing the shared threads
 public void initialize() throws SchedulerConfigException {

    if(workers != null && workers.size() > 0) // already initialized...
        return;    

    if (count <= 0) {
        throw new SchedulerConfigException(
                "Thread count must be > 0");
    }
    if (prio <= 0 || prio > 9) {
        throw new SchedulerConfigException(
                "Thread priority must be > 0 and <= 9");
    }
    // set the thread group
    if(isThreadsInheritGroupOfInitializingThread()) {
        threadGroup = Thread.currentThread().getThreadGroup();
    } else {
        // follow the threadGroup tree to the root thread group.
        threadGroup = Thread.currentThread().getThreadGroup();
        ThreadGroup parent = threadGroup;
        while ( !parent.getName().equals("main") ) {
            threadGroup = parent;
            parent = threadGroup.getParent();
        }
        threadGroup = new ThreadGroup(parent, schedulerInstanceName + "-SimpleThreadPool");
        if (isMakeThreadsDaemons()) {
            threadGroup.setDaemon(true);
        }
    }
    // create the worker threads and start them; no Runnable is passed in here,
    // so these workers are not run-once threads
    Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
    while(workerThreads.hasNext()) {
        WorkerThread wt = workerThreads.next();
        wt.start();
        availWorkers.add(wt);
    }
}

protected List<WorkerThread> createWorkerThreads(int createCount) {
    workers = new LinkedList<WorkerThread>();
    for (int i = 1; i<= createCount; ++i) {
        String threadPrefix = getThreadNamePrefix();
        if (threadPrefix == null) {
            threadPrefix = schedulerInstanceName + "_Worker";
        }
        WorkerThread wt = new WorkerThread(this, threadGroup,
            threadPrefix + "-" + i,
            getThreadPriority(),
            isMakeThreadsDaemons());
        if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
            wt.setContextClassLoader(Thread.currentThread()
                    .getContextClassLoader());
        }
        workers.add(wt);
    }

    return workers;
}
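
The thread-group branch of initialize() walks up the group tree until it reaches a group named "main" and then creates the pool's own group under it. As far as I can tell, the loop in the listing calls parent.getName() without a null check, so it appears to assume that the initializing thread lives somewhere below "main". The sketch below shows the same walk with a guard for the top of the tree; ThreadGroupWalk and findMainOrRoot are hypothetical names, and "MyScheduler" is a placeholder for the scheduler instance name.

```java
// Hypothetical helper, not part of Quartz.
class ThreadGroupWalk {

    /**
     * Walks up from the given group until a group named "main" is found.
     * If no ancestor has that name, the topmost group is returned instead.
     */
    static ThreadGroup findMainOrRoot(ThreadGroup start) {
        ThreadGroup group = start;
        while (!"main".equals(group.getName()) && group.getParent() != null) {
            group = group.getParent();
        }
        return group;
    }

    public static void main(String[] args) {
        // a nested group, to make the walk do some work
        ThreadGroup child = new ThreadGroup(Thread.currentThread().getThreadGroup(), "child");
        ThreadGroup anchor = findMainOrRoot(child);

        // the pool's own group is created directly under the anchor
        ThreadGroup poolGroup = new ThreadGroup(anchor, "MyScheduler-SimpleThreadPool");
        System.out.println(poolGroup.getName() + " lives under " + poolGroup.getParent().getName());
    }
}
```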
  • Running a task in the pool; all communication between threads goes through blocking on locks.
 public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }
    synchronized (nextRunnableLock) {

        handoffPending = true;

        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
            busyWorkers.add(wt);
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the pool).
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}
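
To see the pieces cooperate, here is a compact fixed-size pool written from scratch in the same style. It is a sketch under stated assumptions, not the Quartz implementation: MiniPool, Worker, assign, stopLoop, shutdownAndWait and runTasks are hypothetical names, runInThread simply returns false after shutdown instead of starting a one-off thread, and the worker runs its task outside its own lock and finishes an already assigned task before it stops.

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Quartz code: a fixed-size pool built on wait/notifyAll.
class MiniPool {
    private final Object nextRunnableLock = new Object();
    private final LinkedList<Worker> availWorkers = new LinkedList<>();
    private final LinkedList<Worker> busyWorkers = new LinkedList<>();
    private boolean isShutdown; // guarded by nextRunnableLock

    MiniPool(int count) {
        synchronized (nextRunnableLock) {
            for (int i = 1; i <= count; i++) {
                Worker w = new Worker("mini-worker-" + i);
                w.setDaemon(true);
                w.start();
                availWorkers.add(w);
            }
        }
    }

    /** Blocks until a worker is free, then hands the task to it. */
    boolean runInThread(Runnable task) {
        if (task == null) return false;
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty() && !isShutdown) {
                try { nextRunnableLock.wait(500); } catch (InterruptedException e) { /* re-check */ }
            }
            if (isShutdown) return false; // simplification: Quartz starts a one-off thread here
            Worker w = availWorkers.removeFirst();
            busyWorkers.add(w);
            w.assign(task);
            return true;
        }
    }

    /** Stops the workers and waits until every busy worker has finished. */
    void shutdownAndWait() {
        synchronized (nextRunnableLock) {
            isShutdown = true;
            for (Worker w : availWorkers) w.stopLoop();
            for (Worker w : busyWorkers) w.stopLoop();
            availWorkers.clear();
            while (!busyWorkers.isEmpty()) {
                try { nextRunnableLock.wait(2000); } catch (InterruptedException e) { /* re-check */ }
            }
        }
    }

    private void makeAvailable(Worker w) {
        synchronized (nextRunnableLock) {
            busyWorkers.remove(w);
            if (!isShutdown) availWorkers.add(w);
            nextRunnableLock.notifyAll(); // wake runInThread and shutdownAndWait
        }
    }

    private final class Worker extends Thread {
        private final Object lock = new Object();
        private volatile boolean running = true;
        private Runnable task; // guarded by lock

        Worker(String name) { super(name); }
        void stopLoop() { running = false; }
        void assign(Runnable r) {
            synchronized (lock) { task = r; lock.notifyAll(); }
        }

        @Override
        public void run() {
            while (true) {
                Runnable current;
                synchronized (lock) {
                    while (task == null && running) {
                        try { lock.wait(500); } catch (InterruptedException e) { /* re-check */ }
                    }
                    if (task == null) return; // stopped with nothing pending
                    current = task;
                }
                try {
                    current.run();
                } catch (Throwable t) {
                    System.err.println("task failed: " + t);
                } finally {
                    synchronized (lock) { task = null; }
                    makeAvailable(this);
                }
            }
        }
    }

    static int runTasks(int workers, int tasks) {
        AtomicInteger done = new AtomicInteger();
        MiniPool pool = new MiniPool(workers);
        for (int i = 0; i < tasks; i++) pool.runInThread(done::incrementAndGet);
        pool.shutdownAndWait();
        return done.get();
    }
}
```

Calling MiniPool.runTasks(2, 5) pushes five counter increments through two workers. The third call to runInThread has to wait until one of the two workers reports itself available again, which is exactly the blocking behaviour of the listing above.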
  • Shutting down all threads gracefully
 public void shutdown(boolean waitForJobsToComplete) {

        synchronized (nextRunnableLock) {
            getLog().debug("Shutting down threadpool...");

            isShutdown = true;

            if(workers == null) // case where the pool wasn't even initialize()ed
                return;

            // signal each worker thread to shut down
            // and remove it from the available list
            Iterator<WorkerThread> workerThreads = workers.iterator();
            while(workerThreads.hasNext()) {
                WorkerThread wt = workerThreads.next();
                wt.shutdown();
                availWorkers.remove(wt);
            }

            // Give waiting (wait(1000)) worker threads a chance to shut down.
            // Active worker threads will shut down after finishing their
            // current job.
            nextRunnableLock.notifyAll();// wake the other waiting threads so they can finish

            if (waitForJobsToComplete == true) {

                boolean interrupted = false;
                try {
                    // wait for hand-off in runInThread to complete...
                    while(handoffPending) {
                        try {
                            nextRunnableLock.wait(100);
                        } catch(InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    // Wait until all worker threads are shut down
                    while (busyWorkers.size() > 0) {// keep waiting until the busy workers finish
                        WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
                        try {
                            getLog().debug(
                                    "Waiting for thread " + wt.getName()
                                            + " to shut down");

                            // note: with waiting infinite time the
                            // application may appear to 'hang'.
                            nextRunnableLock.wait(2000);
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    workerThreads = workers.iterator();
                    while(workerThreads.hasNext()) {
                        WorkerThread wt = (WorkerThread) workerThreads.next();
                        try {
                            wt.join();
                            workerThreads.remove();
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }

                getLog().debug("No executing jobs remaining, all threads stopped.");
            }
            getLog().debug("Shutdown of threadpool complete.");
        }
    }
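
One detail of shutdown() is easy to miss: every InterruptedException is caught and recorded in a local boolean, and the interrupt is re-asserted in the finally block once the waiting is over. The caller therefore still sees the interrupt even though the shutdown ran to completion. A minimal sketch of that pattern follows, using a hypothetical helper named sleepUninterruptibly instead of the pool's lock.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the "remember the interrupt, restore it later" pattern.
class InterruptPreservingWait {

    /** Sleeps for the full duration even if interrupted, then restores the interrupt flag. */
    static void sleepUninterruptibly(long millis) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return;
                }
                try {
                    TimeUnit.NANOSECONDS.sleep(remaining);
                    return;
                } catch (InterruptedException e) {
                    interrupted = true; // remember it, but finish the wait first
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the interrupt back to the caller
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        sleepUninterruptibly(50);
        // Thread.interrupted() reads and clears the flag
        System.out.println("interrupt preserved: " + Thread.interrupted());
    }
}
```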

Summary

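After reading the source, the whole life cycle that the pool maintains becomes clear. Because this pool never has to grow, all of its code relies on wait and notify to coordinate the threads.
Threads are shared by combining a loop with wait and notify: a worker sleeps until a task arrives and is then woken up to execute it.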

Source: SimpleThreadPool

 <dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.2</version>
</dependency>
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.quartz.simpl;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.quartz.SchedulerConfigException;
import org.quartz.spi.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleThreadPool implements ThreadPool {
    private int count = -1;
    private int prio = 5;
    private boolean isShutdown = false;
    private boolean handoffPending = false;
    private boolean inheritLoader = false;
    private boolean inheritGroup = true;
    private boolean makeThreadsDaemons = false;
    private ThreadGroup threadGroup;
    private final Object nextRunnableLock = new Object();
    private List<SimpleThreadPool.WorkerThread> workers;
    private LinkedList<SimpleThreadPool.WorkerThread> availWorkers = new LinkedList();
    private LinkedList<SimpleThreadPool.WorkerThread> busyWorkers = new LinkedList();
    private String threadNamePrefix;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private String schedulerInstanceName;

    public SimpleThreadPool() {
    }

    public SimpleThreadPool(int threadCount, int threadPriority) {
        this.setThreadCount(threadCount);
        this.setThreadPriority(threadPriority);
    }

    public Logger getLog() {
        return this.log;
    }

    public int getPoolSize() {
        return this.getThreadCount();
    }

    public void setThreadCount(int count) {
        this.count = count;
    }

    public int getThreadCount() {
        return this.count;
    }

    public void setThreadPriority(int prio) {
        this.prio = prio;
    }

    public int getThreadPriority() {
        return this.prio;
    }

    public void setThreadNamePrefix(String prfx) {
        this.threadNamePrefix = prfx;
    }

    public String getThreadNamePrefix() {
        return this.threadNamePrefix;
    }

    public boolean isThreadsInheritContextClassLoaderOfInitializingThread() {
        return this.inheritLoader;
    }

    public void setThreadsInheritContextClassLoaderOfInitializingThread(boolean inheritLoader) {
        this.inheritLoader = inheritLoader;
    }

    public boolean isThreadsInheritGroupOfInitializingThread() {
        return this.inheritGroup;
    }

    public void setThreadsInheritGroupOfInitializingThread(boolean inheritGroup) {
        this.inheritGroup = inheritGroup;
    }

    public boolean isMakeThreadsDaemons() {
        return this.makeThreadsDaemons;
    }

    public void setMakeThreadsDaemons(boolean makeThreadsDaemons) {
        this.makeThreadsDaemons = makeThreadsDaemons;
    }

    public void setInstanceId(String schedInstId) {
    }

    public void setInstanceName(String schedName) {
        this.schedulerInstanceName = schedName;
    }

    public void initialize() throws SchedulerConfigException {
        if(this.workers == null || this.workers.size() <= 0) {
            if(this.count <= 0) {
                throw new SchedulerConfigException("Thread count must be > 0");
            } else if(this.prio > 0 && this.prio <= 9) {
                if(this.isThreadsInheritGroupOfInitializingThread()) {
                    this.threadGroup = Thread.currentThread().getThreadGroup();
                } else {
                    this.threadGroup = Thread.currentThread().getThreadGroup();

                    ThreadGroup workerThreads;
                    for(workerThreads = this.threadGroup; !workerThreads.getName().equals("main"); workerThreads = this.threadGroup.getParent()) {
                        this.threadGroup = workerThreads;
                    }

                    this.threadGroup = new ThreadGroup(workerThreads, this.schedulerInstanceName + "-SimpleThreadPool");
                    if(this.isMakeThreadsDaemons()) {
                        this.threadGroup.setDaemon(true);
                    }
                }

                if(this.isThreadsInheritContextClassLoaderOfInitializingThread()) {
                    this.getLog().info("Job execution threads will use class loader of thread: " + Thread.currentThread().getName());
                }

                Iterator workerThreads1 = this.createWorkerThreads(this.count).iterator();

                while(workerThreads1.hasNext()) {
                    SimpleThreadPool.WorkerThread wt = (SimpleThreadPool.WorkerThread)workerThreads1.next();
                    wt.start();
                    this.availWorkers.add(wt);
                }

            } else {
                throw new SchedulerConfigException("Thread priority must be > 0 and <= 9");
            }
        }
    }

    protected List<SimpleThreadPool.WorkerThread> createWorkerThreads(int createCount) {
        this.workers = new LinkedList();

        for(int i = 1; i <= createCount; ++i) {
            String threadPrefix = this.getThreadNamePrefix();
            if(threadPrefix == null) {
                threadPrefix = this.schedulerInstanceName + "_Worker";
            }

            SimpleThreadPool.WorkerThread wt = new SimpleThreadPool.WorkerThread(this, this.threadGroup, threadPrefix + "-" + i, this.getThreadPriority(), this.isMakeThreadsDaemons());
            if(this.isThreadsInheritContextClassLoaderOfInitializingThread()) {
                wt.setContextClassLoader(Thread.currentThread().getContextClassLoader());
            }

            this.workers.add(wt);
        }

        return this.workers;
    }

    public void shutdown() {
        this.shutdown(true);
    }

    public void shutdown(boolean waitForJobsToComplete) {
        Object var2 = this.nextRunnableLock;
        synchronized(this.nextRunnableLock) {
            this.getLog().debug("Shutting down threadpool...");
            this.isShutdown = true;
            if(this.workers != null) {
                Iterator workerThreads = this.workers.iterator();

                while(workerThreads.hasNext()) {
                    SimpleThreadPool.WorkerThread interrupted = (SimpleThreadPool.WorkerThread)workerThreads.next();
                    interrupted.shutdown();
                    this.availWorkers.remove(interrupted);
                }

                this.nextRunnableLock.notifyAll();
                if(waitForJobsToComplete) {
                    boolean interrupted1 = false;

                    try {
                        while(this.handoffPending) {
                            try {
                                this.nextRunnableLock.wait(100L);
                            } catch (InterruptedException var16) {
                                interrupted1 = true;
                            }
                        }

                        SimpleThreadPool.WorkerThread wt;
                        while(this.busyWorkers.size() > 0) {
                            wt = (SimpleThreadPool.WorkerThread)this.busyWorkers.getFirst();

                            try {
                                this.getLog().debug("Waiting for thread " + wt.getName() + " to shut down");
                                this.nextRunnableLock.wait(2000L);
                            } catch (InterruptedException var15) {
                                interrupted1 = true;
                            }
                        }

                        workerThreads = this.workers.iterator();

                        while(workerThreads.hasNext()) {
                            wt = (SimpleThreadPool.WorkerThread)workerThreads.next();

                            try {
                                wt.join();
                                workerThreads.remove();
                            } catch (InterruptedException var14) {
                                interrupted1 = true;
                            }
                        }
                    } finally {
                        if(interrupted1) {
                            Thread.currentThread().interrupt();
                        }

                    }

                    this.getLog().debug("No executing jobs remaining, all threads stopped.");
                }

                this.getLog().debug("Shutdown of threadpool complete.");
            }
        }
    }

    public boolean runInThread(Runnable runnable) {
        if(runnable == null) {
            return false;
        } else {
            Object var2 = this.nextRunnableLock;
            synchronized(this.nextRunnableLock) {
                this.handoffPending = true;

                while(this.availWorkers.size() < 1 && !this.isShutdown) {
                    try {
                        this.nextRunnableLock.wait(500L);
                    } catch (InterruptedException var5) {
                        ;
                    }
                }

                SimpleThreadPool.WorkerThread wt;
                if(!this.isShutdown) {
                    wt = (SimpleThreadPool.WorkerThread)this.availWorkers.removeFirst();
                    this.busyWorkers.add(wt);
                    wt.run(runnable);
                } else {
                    wt = new SimpleThreadPool.WorkerThread(this, this.threadGroup, "WorkerThread-LastJob", this.prio, this.isMakeThreadsDaemons(), runnable);
                    this.busyWorkers.add(wt);
                    this.workers.add(wt);
                    wt.start();
                }

                this.nextRunnableLock.notifyAll();
                this.handoffPending = false;
                return true;
            }
        }
    }

    public int blockForAvailableThreads() {
        Object var1 = this.nextRunnableLock;
        synchronized(this.nextRunnableLock) {
            while((this.availWorkers.size() < 1 || this.handoffPending) && !this.isShutdown) {
                try {
                    this.nextRunnableLock.wait(500L);
                } catch (InterruptedException var4) {
                    ;
                }
            }

            return this.availWorkers.size();
        }
    }

    protected void makeAvailable(SimpleThreadPool.WorkerThread wt) {
        Object var2 = this.nextRunnableLock;
        synchronized(this.nextRunnableLock) {
            if(!this.isShutdown) {
                this.availWorkers.add(wt);
            }

            this.busyWorkers.remove(wt);
            this.nextRunnableLock.notifyAll();
        }
    }

    protected void clearFromBusyWorkersList(SimpleThreadPool.WorkerThread wt) {
        Object var2 = this.nextRunnableLock;
        synchronized(this.nextRunnableLock) {
            this.busyWorkers.remove(wt);
            this.nextRunnableLock.notifyAll();
        }
    }

    class WorkerThread extends Thread {
        private final Object lock;
        private AtomicBoolean run;
        private SimpleThreadPool tp;
        private Runnable runnable;
        private boolean runOnce;

        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name, int prio, boolean isDaemon) {
            this(tp, threadGroup, name, prio, isDaemon, (Runnable)null);
        }

        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name, int prio, boolean isDaemon, Runnable runnable) {
            super(threadGroup, name);
            this.lock = new Object();
            this.run = new AtomicBoolean(true);
            this.runnable = null;
            this.runOnce = false;
            this.tp = tp;
            this.runnable = runnable;
            if(runnable != null) {
                this.runOnce = true;
            }

            this.setPriority(prio);
            this.setDaemon(isDaemon);
        }

        void shutdown() {
            this.run.set(false);
        }

        public void run(Runnable newRunnable) {
            Object var2 = this.lock;
            synchronized(this.lock) {
                if(this.runnable != null) {
                    throw new IllegalStateException("Already running a Runnable!");
                } else {
                    this.runnable = newRunnable;
                    this.lock.notifyAll();
                }
            }
        }

        public void run() {
            boolean ran = false;

            while(this.run.get()) {
                boolean var21 = false;

                Object e2;
                label288: {
                    label289: {
                        try {
                            var21 = true;
                            e2 = this.lock;
                            synchronized(this.lock) {
                                while(this.runnable == null && this.run.get()) {
                                    this.lock.wait(500L);
                                }

                                if(this.runnable != null) {
                                    ran = true;
                                    this.runnable.run();
                                }

                                var21 = false;
                                break label288;
                            }
                        } catch (InterruptedException var30) {
                            InterruptedException e1 = var30;

                            try {
                                SimpleThreadPool.this.getLog().error("Worker thread was interrupt()\'ed.", e1);
                                var21 = false;
                            } catch (Exception var27) {
                                var21 = false;
                            }
                            break label289;
                        } catch (Throwable var31) {
                            Throwable e = var31;

                            try {
                                SimpleThreadPool.this.getLog().error("Error while executing the Runnable: ", e);
                                var21 = false;
                            } catch (Exception var25) {
                                var21 = false;
                            }
                        } finally {
                            if(var21) {
                                Object var8 = this.lock;
                                synchronized(this.lock) {
                                    this.runnable = null;
                                }

                                if(this.getPriority() != this.tp.getThreadPriority()) {
                                    this.setPriority(this.tp.getThreadPriority());
                                }

                                if(this.runOnce) {
                                    this.run.set(false);
                                    SimpleThreadPool.this.clearFromBusyWorkersList(this);
                                } else if(ran) {
                                    ran = false;
                                    SimpleThreadPool.this.makeAvailable(this);
                                }

                            }
                        }

                        e2 = this.lock;
                        synchronized(this.lock) {
                            this.runnable = null;
                        }

                        if(this.getPriority() != this.tp.getThreadPriority()) {
                            this.setPriority(this.tp.getThreadPriority());
                        }

                        if(this.runOnce) {
                            this.run.set(false);
                            SimpleThreadPool.this.clearFromBusyWorkersList(this);
                        } else if(ran) {
                            ran = false;
                            SimpleThreadPool.this.makeAvailable(this);
                        }
                        continue;
                    }

                    e2 = this.lock;
                    synchronized(this.lock) {
                        this.runnable = null;
                    }

                    if(this.getPriority() != this.tp.getThreadPriority()) {
                        this.setPriority(this.tp.getThreadPriority());
                    }

                    if(this.runOnce) {
                        this.run.set(false);
                        SimpleThreadPool.this.clearFromBusyWorkersList(this);
                    } else if(ran) {
                        ran = false;
                        SimpleThreadPool.this.makeAvailable(this);
                    }
                    continue;
                }

                e2 = this.lock;
                synchronized(this.lock) {
                    this.runnable = null;
                }

                if(this.getPriority() != this.tp.getThreadPriority()) {
                    this.setPriority(this.tp.getThreadPriority());
                }

                if(this.runOnce) {
                    this.run.set(false);
                    SimpleThreadPool.this.clearFromBusyWorkersList(this);
                } else if(ran) {
                    ran = false;
                    SimpleThreadPool.this.makeAvailable(this);
                }
            }

            try {
                SimpleThreadPool.this.getLog().debug("WorkerThread is shut down.");
            } catch (Exception var23) {
                ;
            }

        }
    }
}