一. CountDownLatch是什么
    CountDownLatch是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
    CountDownLatch是通过一个计数器实现的,计数器的初始值为线程的数量,这是一个一次性现象,计数不能重置。如果需要重置计数,可以考虑使用cyclicbarrier。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

二. CountDownLatch中的主要方法
    1. public void CountDownLatch(int count) {…} 构造器中的计数值(count)实际上就是需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重置这个计数值。与CountDownLatch第一次交互式主线程等待其他线程,主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
    2. await() :导致当前线程等待,直到计数器下降到零,除非线程被中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,则当前线程将禁用线程调度,并且处于休眠状态,直到出现两种情况之一:计数达到零的countdown()方法调用;或其他线程中断当前线程。
    3. countDown():每调用一次这个方法,在构造函数中初始化的count值就减1。直到count的值为0,主线程就能通过await()方法,恢复执行自己的任务。

三. 线程池
    Java通过Executors提供四种线程池,分别是:
    1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
    4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

四. 使用场景
    利用CountDownLatch和newFixedThreadPool实现excel批量导入需求。关键代码:

/**
 * Description:
 * 线程池工具类
 * 保证全局使用一个相同的线程池,方便控制线程的创建和销毁
 */
public class ThreadPoolUtil {

    private static final ExecutorService pool = Executors.newFixedThreadPool(1000);

    public static void submit(Runnable runnable){
        pool.submit(runnable);
    }
}
/**
* 后台挂起执行列传商品以及插入数据库
*/
Map<String, Object> runAdd(final Map<Integer,String> errorRowMap,final HSSFSheet hssfSheet, final WmsCompany company){
   final Map<String, Object> result = new HashMap<>();
   result.put("repeatCount", 0);

   //保存商品数据的集合
   final Map<String, WmsGoods> goodsMap = new ConcurrentHashMap<>();

   //通过检查的excel行数,即导入的商品数
   final int rowCount = **;

   //使添加线程等待转换线程执行后执行的工具
   final CountDownLatch doWork = new CountDownLatch(rowCount);

   try {
       for (int i = 2; i <= rowCount; i++) {
           final HSSFRow hssfRow = hssfSheet.getRow(i);

           if (hssfRow == null){
               doWork.countDown();
               continue;
           }

           if( errorRowMap.containsKey(i)){
               doWork.countDown();
               continue;
           }

ThreadPoolUtil.submit(new Runnable() {
     @Override
     public void run() {
        WmsGoods wmsGoods = cellToWmsGoods(hssfRow);

          if (wmsGoods != null) {

            try{
               wmsGoods.setCompid(company.getCompid());
               wmsGoods.setStoreid(company.getStoreId().intValue());
               wmsGoods.setStorename(company.getStoreName());
               wmsGoods.setCreaemp(company.getMobile());
               wmsGoods.setCrearq(DateUtil.getStrTime());
               wmsGoods.setGoodType("0");//默认普通商品
            ......
 logger.info("公司信息:"+new Gson().toJson(company).toString());

               }catch(Exception e){
                  e.printStackTrace();
               }
         }
             //递减锁存器的计数
             doWork.countDown();
       }
    });

 }

      //添加线程等待转换线执行
      doWork.await();
       //执行添加操作
       List<WmsGoods> goodsList = new CopyOnWriteArrayList<>();
       goodsList.addAll(goodsMap.values());

       result.putAll(wmsGoodsService.addGoodsTimer(goodsList));
   } catch (Exception e) {
       e.printStackTrace();

       logger.error("添加商品失败!");
       result.put("state", 1);
       result.put("msg", e.getMessage());
   }finally {
       logger.info("添加商品完成---------------!");
   }
   return result;
}

五. 遇到的问题

    1. 在使用线程池批量导入,几次遇到了服务器多次挂了的情况,排查原因,主要是程序的线程池未及时回收,导致资源不足,所以采用了一个全局的线程池,固定开启1000个线程。

    2. 底层采用的是mybatis,最开始是用的批量更新方法,将每次导入的数据一次入库,更新,后来遇到时间过长的问题,所以改成了单条执行插入,更新操作。

GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐