使用异步线程池ThreadPoolTaskExecutor进行并发处理批量操作
ThreadPool
A simple C++11 Thread Pool implementation
项目地址:https://gitcode.com/gh_mirrors/th/ThreadPool
免费下载资源
·
案例:用户在商品列表进行检索,结果集大约有100W商品,点击批量上架/下架。
一、配置异步线程池
1.springboot
配置类ThreadPoolConfig
package ***;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* 异步线程池
* @author yanxh
*
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
* 核心线程数
*/
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
/**
* 最大线程数
*/
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
/**
* 队列最大长度
*/
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
/**
* 线程池维护线程所允许的空闲时间
*/
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
/**
* 线程池对拒绝任务(无线程可用)的处理策略
*/
private CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
private String threadNamePrefix = "AsyncExecutorThread-";
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize();
return executor;
}
}
配置文件 application.yml
#异步线程池
async:
executor:
thread:
core_pool_size : 10
max_pool_size : 50
queue_capacity : 1000
keep_alive_seconds : 300
2.spring
application-context.xml
<!-- 异步线程池 -->
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="10" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="50" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="1000" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="300" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
二、业务层引入异步线程池
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
/**
* 批量更新商品
* @param params
*/
public void batchUpdatePrdByParams(Map<String, Object> params){
taskExecutor.execute(new BatchUpdatePrdByParamsExecutor(redisClient, txManager, prdBaseMapper));
}
其中,BatchUpdatePrdByParamsExecutor类为Runnable接口的一个实现类,其业务逻辑所需要的数据和对象,全部通过构造器的方式进行传递。需注意,这里进入后台线程后,请求会马上响应回用户,所以为了避免用户得到响应结果但数据还未完成处理的现象,最好在用户响应页面设置等候的效果(相信难不倒猿友)。
三、线程类处理
package ....;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.chongdong.data.entity.PrdBase;
import com.chongdong.data.mapper.PrdBaseMapper;;
import com.github.pagehelper.PageHelper;
/**
* @author yanxh
*
*/
public class BatchUpdatePrdByParamsExecutorimplements Runnable {
Logger log = LoggerFactory.getLogger(BatchUpdatePrdByParamsExecutor.class);
private DataSourceTransactionManager txManager;
private PrdBaseMapper prdBaseMapper;
public BatchUpdatePrdByParamsExecutor(DataSourceTransactionManager txManager, PrdBaseMapper prdBaseMapper) {
this.txManager = txManager;
this.prdBaseMapper = prdBaseMapper;
}
@Override
public void run() {
int pageSize = 1000; // 批次处理条目数
while (true) {
/**
* 查询符合条件的商品
*/
PageHelper.startPage(1, pageSize, false);
List<PrdBase> baseResult = 根据参数分页查询结果集;
if (CollectionUtils.isEmpty(baseResult)) {
break;
}
// spring无法处理thread的事务,声明式事务无效
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionStatus rollbackPoint = txManager.getTransaction(def);// 设置回滚点
try {
更新商品的逻辑
// 提交事务
txManager.commit(rollbackPoint);
/**
* 结果集小于批次条目数,停止查询
*/
if (baseResult.size() < pageSize) {
break;
}
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
// 回滚事务
txManager.rollback(rollbackPoint);
}
}
}
}
在这个线程类中,我们使用循环处理的方式,在一次循环中,获取第一页的数据,进行更新,并且开启事务,当本次处理成功后,提交事务,进行下一次循环,使用这种将数据分批处理的方式,会比一条一条处理节省数据库连接的开销,也比一次全部数量响应快速,至于每次循环开启分页的偏移量,需要根据自己的实际情况判断,取一个适中的值。
欢迎大家和帝都的雁积极互动,头脑交流会比个人埋头苦学更有效!共勉!
公众号:帝都的雁
GitHub 加速计划 / th / ThreadPool
7.74 K
2.22 K
下载
A simple C++11 Thread Pool implementation
最近提交(Master分支:2 个月前 )
9a42ec13 - 9 年前
fcc91415 - 9 年前
更多推荐
已为社区贡献1条内容
所有评论(0)