如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用
需求是在一个大数据量的表中按条件查询出数据后做相应的业务。我是使用的java线程池ThreadPoolExecutor,实现分批次去查询,查询到数据后,又分多个线程去做业务。
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。
线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
下面来看具体的代码(代码中会有部分代码以+++表示不方便各位查看的):
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
int pageSize = 1000; // 每次查询交易条数
int handleSize = 200; // 线程一次性处理的交易条数
int handleCount = 0;
int transCount = +++mapper.getTransCount(batchDate, +);//根据条件去查询需要做业务的数据条数,查询条数的sql语句快
logger.info(MessageFormat.format("[+++日期:[{0}], 待+++记录条数为:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一个方法,推荐大家这么使用
List<+++> tranList = null;
while (handleCount < transCount)
{
tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
if (tranList == null || tranList.size() == 0)
{
logger.info(MessageFormat.format(++++
handleCount += pageSize;
continue;
}
int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
CountDownLatch latch = new CountDownLatch(splitCount);
for (int i = 0; i < splitCount; i++)
{
int toIndex = (i + 1) * handleSize;
if (i == splitCount - 1)
{
toIndex = tranList.size();
}
List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到线程池,执行的方法是+++Thread类中的run方法
}
handleCount += pageSize;
try
{
latch.await(5, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
logger.error(getClass().getName() + " doTask fail.", e);
}
}
下面是threadPool.execute(new +++Thread...中的thread类
public class +++Thread implements Runnable
{
private Logger logger =
private List<NpcTransaction> trans;
private CountDownLatch latch;
private String batchDate;
private +++Manager
public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
CountDownLatch latch)
{
this.+++Manager = +++Manager;
this.trans = trans;
this.batchDate = batchDate;
this.latch = latch;
}
/**
* 重载方法
*/
@Override
public void run()
{
int saveCount = 0;
try
{
saveCount = +++Manager.save+++Record(trans);//
}
catch (Exception e)
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常:[{1}]", batchDate, e.getMessage()));
e.printStackTrace();
}
if (saveCount != trans.size())
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常,++成功条数:[{1}],预期条数:[{2}]", batchDate,
saveCount, trans.size()));
}
latch.countDown();
}
public List<++Transaction> getTrans()
{
return trans;
}
public void setTrans(List<++Transaction> trans)
{
this.trans = trans;
}
/**
* 获取 latch
*
* @return 返回 latch
*/
public CountDownLatch getLatch()
{
return latch;
}
/**
* 设置 latch
*
* @param 对latch进行赋值
*/
public void setLatch(CountDownLatch latch)
{
this.latch = latch;
}
/**
* 获取 batchDate
*
* @return 返回 batchDate
*/
public String getBatchDate()
{
return batchDate;
}
/**
* 设置 batchDate
*
* @param 对batchDate进行赋值
*/
public void setBatchDate(String batchDate)
{
this.batchDate = batchDate;
}
}
都要写get,set方法,latch.countDown();这个最好写在finally中
关于CountDownLatch这个,我下面简单的说一下:
CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
主要方法
public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException
构造方法参数指定了计数的次数
countDown方法,当前线程调用此方法,则计数减一
awaint方法,调用此方法会一直阻塞当前线程,直到计时器的值为0
此处用计数器,因为一组1000条数据通过5个线程去执行,这组执行完再进行第二组以及其后组的1000条数据操作。
更多推荐
所有评论(0)