需求是在一个大数据量的表中按条件查询出数据后做相应的业务。我是使用的java线程池ThreadPoolExecutor,实现分批次去查询,查询到数据后,又分多个线程去做业务。

线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,   long keepAliveTime,   TimeUnit unit,   BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime: 线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue: 线程池所使用的缓冲队列

handler: 线程池对拒绝任务的处理策略

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。

线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。当调用 execute() 方法添加一个任务时,线程池会做如下判断:
         a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
   b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
   c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
   d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
   这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

下面来看具体的代码(代码中会有部分代码以+++表示不方便各位查看的):

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

        int pageSize = 1000; // 每次查询交易条数
        int handleSize = 200; // 线程一次性处理的交易条数
        int handleCount = 0;
        int transCount = +++mapper.getTransCount(batchDate, +);//根据条件去查询需要做业务的数据条数,查询条数的sql语句快
        logger.info(MessageFormat.format("[+++日期:[{0}], 待+++记录条数为:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一个方法,推荐大家这么使用
        List<+++> tranList = null;
        while (handleCount < transCount)
        {
            tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
            if (tranList == null || tranList.size() == 0)
            {
                logger.info(MessageFormat.format(++++
                handleCount += pageSize;
                continue;
            }

            int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
            CountDownLatch latch = new CountDownLatch(splitCount);
            for (int i = 0; i < splitCount; i++)
            {
                int toIndex = (i + 1) * handleSize;
                if (i == splitCount - 1)
                {
                    toIndex = tranList.size();
                }
                List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
                threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到线程池,执行的方法是+++Thread类中的run方法
            }

            handleCount += pageSize;

            try
            {
                latch.await(5, TimeUnit.MINUTES);
            }
            catch (InterruptedException e)
            {
                logger.error(getClass().getName() + " doTask fail.", e);
            }
        }

下面是threadPool.execute(new +++Thread...中的thread类

public class +++Thread implements Runnable
{

    private Logger logger =

    private List<NpcTransaction> trans;

    private CountDownLatch latch;

    private String batchDate;

    private +++Manager

    public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
            CountDownLatch latch)
    {
        this.+++Manager = +++Manager;
        this.trans = trans;
        this.batchDate = batchDate;
        this.latch = latch;
    }

    /**
     * 重载方法
     */
    @Override
    public void run()
    {

        int saveCount = 0;
        try
        {
            saveCount = +++Manager.save+++Record(trans);//
        }
        catch (Exception e)
        {
            logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常:[{1}]", batchDate, e.getMessage()));
            e.printStackTrace();
        }
        if (saveCount != trans.size())
        {
            logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常,++成功条数:[{1}],预期条数:[{2}]", batchDate,
                    saveCount, trans.size()));
        }
        latch.countDown();

    }

    public List<++Transaction> getTrans()
    {
        return trans;
    }

    public void setTrans(List<++Transaction> trans)
    {
        this.trans = trans;
    }

    /**
     * 获取 latch
     *
     * @return 返回 latch
     */
    public CountDownLatch getLatch()
    {

        return latch;
    }

    /**
     * 设置 latch
     *
     * @param 对latch进行赋值
     */
    public void setLatch(CountDownLatch latch)
    {

        this.latch = latch;
    }

    /**
     * 获取 batchDate
     *
     * @return 返回 batchDate
     */
    public String getBatchDate()
    {

        return batchDate;
    }

    /**
     * 设置 batchDate
     *
     * @param 对batchDate进行赋值
     */
    public void setBatchDate(String batchDate)
    {

        this.batchDate = batchDate;
    }

}

都要写get,set方法,latch.countDown();这个最好写在finally中

关于CountDownLatch这个,我下面简单的说一下:

CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
主要方法
 public CountDownLatch(int count);
 public void countDown();
 public void await() throws InterruptedException
 构造方法参数指定了计数的次数
 countDown方法,当前线程调用此方法,则计数减一
 awaint方法,调用此方法会一直阻塞当前线程,直到计时器的值为0

此处用计数器,因为一组1000条数据通过5个线程去执行,这组执行完再进行第二组以及其后组的1000条数据操作。




GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:25 天前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐