Springboot使用ThreadPoolTaskExecutor线程池,多线程调用实例
ThreadPool
A simple C++11 Thread Pool implementation
项目地址:https://gitcode.com/gh_mirrors/th/ThreadPool
免费下载资源
·
项目场景:15万条数据需要做逻辑处理,然后存入数据库
硬件:windows 4核 i7 16G内存
问题描述:响应太慢,服务容易出现卡死
原因分析:
原先是JPA保存,JPA 中save()方法点进去可以看到,源码中还调用了isNew()方法,该方法里面又执行了一次findById(),所以每次JPA保存一条数据都会执行两条sql ,一次查询一次保存或更新
解决方案:
1.修改存库方式,通过mybatis批量insert,一般每批3000条,语句过长会报错,并不是每批量越大越好,需要根据实际情况分析,笔者这边不做过多解释,有兴趣的可以看下相关知识
2.通过多线程执行
多线程配置
新建一个class,把以下配置文件直接原封不动copy过去就行,了解一下参数含义
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ApplicationConfiguration {
/**
* 公共线程池
*/
@Bean
public ThreadPoolTaskExecutor commonThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor pool = new org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor();
int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
pool.setCorePoolSize(corePoolSize); // 核心池大小
pool.setMaxPoolSize(maxPoolSize); // 最大线程数
pool.setQueueCapacity(maxPoolSize * 1000); // 队列程度
pool.setThreadPriority(Thread.MAX_PRIORITY);
pool.setDaemon(false);
pool.setKeepAliveSeconds(300);// 线程空闲时间
return pool;
}
}
实例调用
这里模拟一个保存方法
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor; // 注入线程池
@Autowired
private AggregateAccountingMapper aggregateAccountingMapper;
@Autowired
private AggregateAccountingService aggregateAccountingService;
@GetMapping("/save")
public void save() {
List<AggregateAccounting> list = this.aggregateAccountingService.list(); // 这里模拟15万条数据源
// 注入线程池
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
threadPoolTaskExecutor);
/**
* partition()方法 大家可以看一下
* 例如 3001条数据 他会自动帮你分成两个数组 第一个数组3000条 第二个数组1条
* 不需要我们再像以前一样 通过for循环处理截取
*/
List<List<AggregateAccounting>> lists = Lists.partition(list, 3000);
lists.forEach(item -> {
// 这里做的事情就是 根据lists大小确认要多少个线程 给每个线程分配任务
completionService.submit(new Callable() {
@Override
public Object call() throws Exception {
// insertList()方法 就是批量insert语句
return aggregateAccountingMapper.insertList(item);
}
});
});
// 这里是让多线程开始执行
lists.forEach(item -> {
try {
completionService.take().get();
} catch (InterruptedException | ExecutionException e) {
System.out.println(e);
}
});
}
}
总结
通过mybatis批量insert 15万条数据(包含业务处理时间)
1.单线程:65S
2.多线程:50S
GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
更多推荐
已为社区贡献1条内容
所有评论(0)