Making ThreadPoolExecutor block task submission when its bounded queue is full
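Original: http://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size/4522411#4522411
Problem:
A single producer that performs I/O generates tasks that should be processed in parallel. The data for every pending task is held in memory, so the number of tasks waiting for a consumer has to be capped. The solutions below use Java's ThreadPoolExecutor class.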
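Solution 1: implement a queue whose offer method also blocks
Drawback: this departs from the design of ThreadPoolExecutor. execute() expects offer() to return false when the queue is full, and that failure is what triggers the creation of threads beyond corePoolSize, up to maximumPoolSize. With a blocking offer() the call does not fail (unless interrupted), so the pool never grows past corePoolSize. The approach is therefore only suitable when corePoolSize == maximumPoolSize.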
    import java.util.concurrent.LinkedBlockingQueue;

    public class LimitedQueue<E> extends LinkedBlockingQueue<E> {
        public LimitedQueue(int maxSize) {
            super(maxSize);
        }

        @Override
        public boolean offer(E e) {
            // Turn offer() and add() into blocking calls (unless interrupted).
            try {
                put(e);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }
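As a usage illustration, the sketch below plugs a queue of this kind into a pool whose corePoolSize equals maximumPoolSize, which is the configuration the drawback above allows. The class names BlockingOfferDemo and BlockingOfferQueue, the run helper, and all sizes are assumptions made for this example, not part of the original answer.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingOfferDemo {

    /** Same idea as LimitedQueue: offer() delegates to the blocking put(). */
    static class BlockingOfferQueue<E> extends LinkedBlockingQueue<E> {
        BlockingOfferQueue(int maxSize) {
            super(maxSize);
        }

        @Override
        public boolean offer(E e) {
            try {
                put(e);
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Runs taskCount short tasks; returns {completed tasks, largest pool size}. */
    static int[] run(int threads, int queueCapacity, int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // corePoolSize == maximumPoolSize, so the pool never needs offer() to fail.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new BlockingOfferQueue<Runnable>(queueCapacity));
        for (int i = 0; i < taskCount; i++) {
            // Blocks the submitting thread while the queue is full.
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        int[] r = run(2, 3, 20);
        System.out.println("completed=" + r[0] + ", largestPoolSize=" + r[1]);
    }
}
```

The submitting thread is throttled by the queue capacity, and the pool size stays at the configured number of threads.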
Solution 2: use a RejectedExecutionHandler
The relevant part of ThreadPoolExecutor.execute(Runnable) looks like this:
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
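A blocking offer() means the else if (!addWorker(command, false)) branch is never reached. Keeping an ordinary non-blocking queue and blocking inside a RejectedExecutionHandler avoids this: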
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            if (!executor.isShutdown()) {
                executor.getQueue().put(r);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(
                    "Executor was interrupted while the task was waiting to put on work queue", e);
        }
    }
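Notes: corePoolSize == 0 causes problems. Calling put() on the queue directly bypasses the bookkeeping that execute() performs around enqueuing (for example, the recheck that starts a worker when none is running), so the task is not handled exactly like one the executor queued itself, which can matter. The approach generally works well when corePoolSize < maxPoolSize.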
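To show how such a handler is wired into a pool, here is a minimal sketch under stated assumptions: the names BlockOnRejectDemo and BlockingPolicy, the run helper, and the pool and queue sizes are all hypothetical. corePoolSize is kept at 1 rather than 0 because of the caveat in the notes above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockOnRejectDemo {

    /** Hypothetical handler name; blocks the submitter until the queue has room. */
    static class BlockingPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                if (!executor.isShutdown()) {
                    executor.getQueue().put(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    /** Runs taskCount short tasks; returns {completed tasks, largest pool size}. */
    static int[] run(int core, int max, int queueCapacity, int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new BlockingPolicy());
        for (int i = 0; i < taskCount; i++) {
            // offer() fails when the queue is full, the pool grows up to max,
            // and only then does the handler block this thread in put().
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), pool.getLargestPoolSize() };
    }

    public static void main(String[] args) {
        int[] r = run(1, 3, 2, 20);
        System.out.println("completed=" + r[0] + ", largestPoolSize=" + r[1]);
    }
}
```

Unlike the blocking-offer queue, the pool can still grow from corePoolSize towards maximumPoolSize here, because offer() keeps its normal non-blocking behaviour.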
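Solution 3: subclass ThreadPoolExecutor and use a semaphore, acquired before submission and released after execution, to limit the number of tasks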
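    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BoundedExecutor extends ThreadPoolExecutor {
        private final Semaphore semaphore;

        public BoundedExecutor(int bound) {
            super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
            semaphore = new Semaphore(bound);
        }

        /**
         * Submits the task to the execution pool, but blocks while the number
         * of running tasks has reached the bound.
         */
        public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException {
            semaphore.acquire();
            try {
                return submit(task);
            } catch (RejectedExecutionException e) {
                // The task will never run, so afterExecute() cannot release its permit.
                semaphore.release();
                throw e;
            }
        }

        // Tasks must go through submitButBlockIfFull(); a task submitted any other
        // way would release a permit it never acquired.
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            semaphore.release();
        }
    }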
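The same semaphore idea can also be applied without subclassing, by wrapping any existing executor. The following is a variant sketch, not the code from the original answer; BoundedSubmitter, submitBlocking, and the run helper are hypothetical names, and the thread and bound values are only illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreBoundDemo {

    /** Hypothetical wrapper: at most `bound` tasks are queued or running at once. */
    static class BoundedSubmitter {
        private final ExecutorService delegate;
        private final Semaphore permits;

        BoundedSubmitter(ExecutorService delegate, int bound) {
            this.delegate = delegate;
            this.permits = new Semaphore(bound);
        }

        void submitBlocking(Runnable task) throws InterruptedException {
            permits.acquire(); // blocks the submitter once `bound` tasks are in flight
            try {
                delegate.execute(() -> {
                    try {
                        task.run();
                    } finally {
                        permits.release(); // released even if the task throws
                    }
                });
            } catch (RejectedExecutionException e) {
                permits.release(); // the task never started
                throw e;
            }
        }
    }

    /** Returns {completed tasks, maximum number of tasks observed running at once}. */
    static int[] run(int threads, int bound, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        BoundedSubmitter submitter = new BoundedSubmitter(pool, bound);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                submitter.submitBlocking(() -> {
                    maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                    completed.incrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return new int[] { completed.get(), maxRunning.get() };
    }

    public static void main(String[] args) {
        int[] r = run(4, 2, 20);
        System.out.println("completed=" + r[0] + ", maxRunning=" + r[1]);
    }
}
```

Releasing the permit in a finally block inside the wrapped task means the bound holds for any executor, at the cost of one extra lambda per task.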
Solution 4: use a lock and a condition variable in ThreadPoolExecutor's beforeExecute/afterExecute hooks
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    /**
     * Blocks the current task's execution if there are not enough resources for it.
     * The maximum number of concurrently running tasks is controlled by maxTaskCount.
     */
    public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
        private final ReentrantLock taskLock = new ReentrantLock();
        private final Condition unpaused = taskLock.newCondition();
        private final int maxTaskCount;
        private int currentTaskCount; // guarded by taskLock

        public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, int maxTaskCount) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            this.maxTaskCount = maxTaskCount;
        }

        /**
         * Executes the task if there are enough system resources for it.
         * Otherwise waits.
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            taskLock.lock();
            try {
                // Wait until there is capacity for this job. The original condition
                // (maxTaskCount < currentTaskCount) let maxTaskCount + 1 tasks through.
                while (currentTaskCount >= maxTaskCount) {
                    // Re-interrupting and calling await() again would spin, so wait
                    // uninterruptibly; the interrupt status is restored on return.
                    unpaused.awaitUninterruptibly();
                }
                currentTaskCount++;
            } finally {
                taskLock.unlock();
            }
        }

        /**
         * Signals that one more task is welcome.
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            taskLock.lock();
            try {
                currentTaskCount--;
                unpaused.signalAll();
            } finally {
                taskLock.unlock();
            }
        }
    }
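Solution 5: this approach is not general-purpose. A fixed-capacity blocking queue holds the futures. The consumer takes futures from the queue, and the producer puts the future of each submitted task into the queue, blocking while the queue is full.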
    final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
    final LinkedBlockingQueue<Future<?>> futures = new LinkedBlockingQueue<>(maxQueueSize);
    try {
        Thread taskGenerator = new Thread() {
            @Override
            public void run() {
                while (reader.hasNext()) {
                    Callable<?> task = generateTask(reader.next());
                    Future<?> future = executor.submit(task);
                    try {
                        // If the queue is full, this blocks until a task
                        // completes, so no further tasks are submitted.
                        futures.put(future);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        break; // stop generating once interrupted
                    }
                }
                executor.shutdown();
            }
        };
        taskGenerator.start();

        // Read from the queue as long as tasks are being generated
        // or the queue still has elements in it.
        // Caveat: take() can block forever if the generator finishes
        // between the loop check and the call while the queue is empty.
        while (taskGenerator.isAlive() || !futures.isEmpty()) {
            Future<?> compoundFuture = futures.take();
            compoundFuture.get(); // wait for the result, then do something with it
        }
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
        throw new MyException(ex);
    } finally {
        executor.shutdownNow();
    }
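The fragment above depends on names defined elsewhere (reader, generateTask, MyException). A self-contained sketch of the same pattern might look like the following. It swaps the isAlive() check for an end-of-stream marker so the consumer knows when to stop, which is a change from the original. The class name FutureQueueDemo, the sumOfSquares task, and the marker are all assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

final class FutureQueueDemo {

    /** Computes 1^2 + ... + n^2 with at most maxPending results waiting to be consumed. */
    static int sumOfSquares(int n, int workers, int maxPending) {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        BlockingQueue<Future<Integer>> futures = new LinkedBlockingQueue<>(maxPending);
        // Hypothetical end-of-stream marker; compared by identity, never read.
        Future<Integer> done = CompletableFuture.completedFuture(null);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    final int x = i;
                    // put() blocks while the queue is full, throttling submission.
                    futures.put(executor.submit(() -> x * x));
                }
                futures.put(done);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            // Consume results in submission order until the marker arrives.
            for (Future<Integer> f = futures.take(); f != done; f = futures.take()) {
                sum += f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            producer.interrupt();
            executor.shutdownNow();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 3, 4)); // prints 385
    }
}
```

Because the producer submits a task before calling put(), the executor can hold one task more than the queue capacity at any moment.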
Further reading
http://howtodoinjava.com/core-java/multi-threading/how-to-use-blockingqueue-and-threadpoolexecutor-in-java/
http://stackoverflow.com/questions/7556465/why-threadpoolexecutor-has-blockingqueue-as-its-argument
http://stackoverflow.com/questions/3446011/threadpoolexecutor-block-when-queue-is-full
http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated/2001205#2001205
http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated