Java多线程CountDownLatch与线程池ThreadPool的使用
一. CountDownLatch是什么
CountDownLatch是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch是通过一个计数器实现的,计数器的初始值为线程的数量,这是一个一次性现象,计数不能重置。如果需要重置计数,可以考虑使用cyclicbarrier。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
二. CountDownLatch中的主要方法
1. public void CountDownLatch(int count) {…} 构造器中的计数值(count)实际上就是需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重置这个计数值。与CountDownLatch第一次交互式主线程等待其他线程,主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
2. await() :导致当前线程等待,直到计数器下降到零,除非线程被中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,则当前线程将禁用线程调度,并且处于休眠状态,直到出现两种情况之一:计数达到零的countdown()方法调用;或其他线程中断当前线程。
3. countDown():每调用一次这个方法,在构造函数中初始化的count值就减1。直到count的值为0,主线程就能通过await()方法,恢复执行自己的任务。
三. 线程池
Java通过Executors提供四种线程池,分别是:
1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
四. 使用场景
利用CountDownLatch和newFixedThreadPool实现excel批量导入需求。关键代码:
/**
* Description:
* 线程池工具类
* 保证全局使用一个相同的线程池,方便控制线程的创建和销毁
*/
public class ThreadPoolUtil {
private static final ExecutorService pool = Executors.newFixedThreadPool(1000);
public static void submit(Runnable runnable){
pool.submit(runnable);
}
}
/**
* 后台挂起执行列传商品以及插入数据库
*/
Map<String, Object> runAdd(final Map<Integer,String> errorRowMap,final HSSFSheet hssfSheet, final WmsCompany company){
final Map<String, Object> result = new HashMap<>();
result.put("repeatCount", 0);
//保存商品数据的集合
final Map<String, WmsGoods> goodsMap = new ConcurrentHashMap<>();
//通过检查的excel行数,即导入的商品数
final int rowCount = **;
//使添加线程等待转换线程执行后执行的工具
final CountDownLatch doWork = new CountDownLatch(rowCount);
try {
for (int i = 2; i <= rowCount; i++) {
final HSSFRow hssfRow = hssfSheet.getRow(i);
if (hssfRow == null){
doWork.countDown();
continue;
}
if( errorRowMap.containsKey(i)){
doWork.countDown();
continue;
}
ThreadPoolUtil.submit(new Runnable() {
@Override
public void run() {
WmsGoods wmsGoods = cellToWmsGoods(hssfRow);
if (wmsGoods != null) {
try{
wmsGoods.setCompid(company.getCompid());
wmsGoods.setStoreid(company.getStoreId().intValue());
wmsGoods.setStorename(company.getStoreName());
wmsGoods.setCreaemp(company.getMobile());
wmsGoods.setCrearq(DateUtil.getStrTime());
wmsGoods.setGoodType("0");//默认普通商品
......
logger.info("公司信息:"+new Gson().toJson(company).toString());
}catch(Exception e){
e.printStackTrace();
}
}
//递减锁存器的计数
doWork.countDown();
}
});
}
//添加线程等待转换线执行
doWork.await();
//执行添加操作
List<WmsGoods> goodsList = new CopyOnWriteArrayList<>();
goodsList.addAll(goodsMap.values());
result.putAll(wmsGoodsService.addGoodsTimer(goodsList));
} catch (Exception e) {
e.printStackTrace();
logger.error("添加商品失败!");
result.put("state", 1);
result.put("msg", e.getMessage());
}finally {
logger.info("添加商品完成---------------!");
}
return result;
}
五. 遇到的问题
1. 在使用线程池批量导入,几次遇到了服务器多次挂了的情况,排查原因,主要是程序的线程池未及时回收,导致资源不足,所以采用了一个全局的线程池,固定开启1000个线程。
2. 底层采用的是mybatis,最开始是用的批量更新方法,将每次导入的数据一次入库,更新,后来遇到时间过长的问题,所以改成了单条执行插入,更新操作。
更多推荐
所有评论(0)