动态线程池组件

搭建动态线程池组件工程骨架
搭一个父子 Maven 工程:
dynamic-thread-pool
└── dynamic-thread-pool-spring-boot-starter
父工程 dynamic-thread-pool 负责统一管理版本、依赖、插件、Java 版本。
子工程 dynamic-thread-pool-spring-boot-starter 是真正要做的组件包,也就是以后别人引入这个 starter,就能使用你的动态线程池能力。
目前 starter 下面分了几个包:
config:自动配置入口
domain:核心业务能力,比如线程池管理、线程池刷新
registry:注册中心,比如把线程池信息上报到 Redis/Nacos
trigger:触发器,比如定时任务、监听器、配置变更监听
关键文件分析
pom.xml
父工程使用:
<packaging>pom</packaging>
说明不是直接打 jar,而是作为聚合工程管理子模块。
👉 聚合工程 = 只负责“管理多个子模块”的工程,不写业务代码、不打 jar
同时引入了 Spring Boot 2.7.12,并统一管理了:
guava
commons-lang
commons-codec
fastjson
junit
guava
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
Google 出的 Java增强工具库,可以理解为:
👉 “Java 标准库不够用,我帮你补齐”
常用功能
- 集合增强(List / Map / Set)
- 缓存(Local Cache)
- 限流(RateLimiter)
- 字符串工具
- Optional(早期)
commons-lang
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
作用
👉 Java 基础增强工具类(字符串 / 对象 / 数组)
常用类
StringUtils
ObjectUtils
ArrayUtils
NumberUtils
commons-codec
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
作用
👉 编码 & 加密工具库
支持内容
- Base64
- MD5
- SHA
- URL 编码
fastjson
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
作用
👉 JSON 序列化 / 反序列化
String json = JSON.toJSONString(obj);
User user = JSON.parseObject(json, User.class);
junit
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
作用
👉 Java 单元测试框架
@Test
public void test() {
assertEquals(2, 1 + 1);
}
dynamic-thread-pool-spring-boot-starter/pom.xml
这个是真正的 starter 模块。
引入了:
spring-boot-starter
spring-boot-autoconfigure
spring-boot-configuration-processor
spring-boot-starter-aop
redisson-spring-boot-starter
spring-boot-starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
作用
👉 Spring Boot 核心启动依赖(最基础的)
它会帮你引入:
Spring Core
Spring Context
日志系统(Logback)
自动配置基础能力
spring-boot-autoconfigure
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
作用
👉 实现自动装配的核心依赖
@Configuration
@ConditionalOnMissingBean
public class DynamicThreadPoolAutoConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return new ThreadPoolExecutor(...);
}
}
spring-boot-configuration-processor
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
作用
👉 生成配置提示(IDE 自动补全)
示例
你写一个配置类:
@ConfigurationProperties(prefix = "dynamic.thread.pool")
public class ThreadPoolProperties {
private int coreSize;
private int maxSize;
}
👉 在 application.yml 里:
dynamic:
thread:
pool:
core-size: 10
IDE 会自动提示!
spring-boot-starter-aop
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
作用
👉 面向切面编程(AOP)
redisson-spring-boot-starter
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
作用
👉 Redis 分布式客户端(高级版)
能干什么?
分布式锁
分布式 Map / Set
发布订阅(Pub/Sub)
配置共享
DynamicThreadPoolAutoConfig
public class DynamicThreadPoolAutoConfig {
}
但它的定位很重要。
后面它应该会变成自动配置入口,例如:
@Configuration
@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
public class DynamicThreadPoolAutoConfig {
}
@EnableConfigurationProperties()👉 “把配置文件里的内容,自动绑定成一个 Java 对象,并交给 Spring 管理”
① 启用配置类
告诉 Spring:
这个类是一个配置类(Properties),你要帮我加载它
② 绑定配置文件
把 application.yml / application.properties 里的配置:
dynamic:
thread:
pool:
core-size: 10
max-size: 50
自动绑定到这个类:
@ConfigurationProperties(prefix = "dynamic.thread.pool")
public class DynamicThreadPoolProperties {
private int coreSize;
private int maxSize;
}
也就是说,用户项目只要引入 starter,Spring Boot 就能自动加载你的动态线程池组件。
DynamicThreadPoolAutoConfig
现在还是空类:
public class DynamicThreadPoolAutoConfig {
}
但它的定位很重要。
后面它应该会变成自动配置入口,例如:
@Configuration
@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
public class DynamicThreadPoolAutoConfig {
}
也就是说,用户项目只要引入 starter,Spring Boot 就能自动加载你的动态线程池组件。
META-INF/spring.factories
现在内容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
这个目前还不完整。
后面应该写成类似:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.bustack.middleware.dynamic.thread.pool.sdk.config.DynamicThreadPoolAutoConfig
否则你的 DynamicThreadPoolAutoConfig 不会被 Spring Boot 自动加载。
动态线程池实现
项目结构
starter(核心能力)
├── config 自动装配 + 配置
├── domain 线程池核心逻辑
├── registry 注册中心(Redis)
├── trigger 定时任务 + 监听器admin(控制台)
├── controller 接口层test(测试应用)
├── config 线程池定义
├── main 模拟任务运行
之前 DynamicThreadPoolAutoConfig 是空的
现在它做了几件关键事:
1. 创建 RedissonClient
2. 创建 RedisRegistry
3. 扫描 ThreadPoolExecutor
4. 初始化线程池配置(从 Redis 拉)
5. 创建 DynamicThreadPoolService
6. 启动定时任务
7. 注册 Redis Topic 监听器
@Configuration
@EnableConfigurationProperties(DynamicThreadPoolAutoProperties.class)
@EnableScheduling
public class DynamicThreadPoolAutoConfig {
private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);
private String applicationName;
@Bean("redissonClient")
public RedissonClient redissonClient(DynamicThreadPoolAutoProperties properties) {
Config config = new Config();
// 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
config.setCodec(JsonJacksonCodec.INSTANCE);
config.useSingleServer()
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
.setPassword(properties.getPassword())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive())
;
RedissonClient redissonClient = Redisson.create(config);
logger.info("动态线程池,注册器(redis)链接初始化完成。{} {} {}", properties.getHost(), properties.getPoolSize(), !redissonClient.isShutdown());
return redissonClient;
}
@Bean
public IRegistry redisRegistry(RedissonClient redissonClient) {
return new RedisRegistry(redissonClient);
}
@Bean("dynamicThreadPollService")
public DynamicThreadPoolService dynamicThreadPollService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutorMap, RedissonClient redissonClient) {
applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");
if (StringUtils.isBlank(applicationName)) {
applicationName = "缺省的";
logger.warn("动态线程池,启动提示。SpringBoot 应用未配置 spring.application.name 无法获取到应用名称!");
}
// 获取缓存数据,设置本地线程池配置
Set<String> threadPoolKeys = threadPoolExecutorMap.keySet();
for (String threadPoolKey : threadPoolKeys) {
ThreadPoolConfigEntity threadPoolConfigEntity = redissonClient.<ThreadPoolConfigEntity>getBucket(RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey() + "_" + applicationName + "_" + threadPoolKey).get();
if (null == threadPoolConfigEntity) continue;
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolKey);
threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
}
return new DynamicThreadPoolService(applicationName, threadPoolExecutorMap);
}
@Bean
public ThreadPoolDataReportJob threadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
return new ThreadPoolDataReportJob(dynamicThreadPoolService, registry);
}
@Bean
public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
return new ThreadPoolConfigAdjustListener(dynamicThreadPoolService, registry);
}
@Bean(name = "dynamicThreadPoolRedisTopic")
public RTopic threadPoolConfigAdjustListener(RedissonClient redissonClient, ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener) {
RTopic topic = redissonClient.getTopic(RegistryEnumVO.DYNAMIC_THREAD_POOL_REDIS_TOPIC.getKey() + "_" + applicationName);
topic.addListener(ThreadPoolConfigEntity.class, threadPoolConfigAdjustListener);
return topic;
}
}
DynamicThreadPoolAutoProperties
绑定配置文件(application.yml)
@ConfigurationProperties(prefix = "dynamic.thread.pool.config", ignoreInvalidFields = true)
public class DynamicThreadPoolAutoProperties {
/** 状态;open = 开启、close 关闭 */
private boolean enable;
/** redis host */
private String host;
/** redis port */
private int port;
/** 账密 */
private String password;
/** 设置连接池的大小,默认为64 */
private int poolSize = 64;
/** 设置连接池的最小空闲连接数,默认为10 */
private int minIdleSize = 10;
/** 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000 */
private int idleTimeout = 10000;
/** 设置连接超时时间(单位:毫秒),默认为10000 */
private int connectTimeout = 10000;
/** 设置连接重试次数,默认为3 */
private int retryAttempts = 3;
/** 设置连接重试的间隔时间(单位:毫秒),默认为1000 */
private int retryInterval = 1000;
/** 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查 */
private int pingInterval = 0;
/** 设置是否保持长连接,默认为true */
private boolean keepAlive = true;
public boolean isEnable() {
return enable;
}
public void setEnable(boolean enable) {
this.enable = enable;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
public int getMinIdleSize() {
return minIdleSize;
}
public void setMinIdleSize(int minIdleSize) {
this.minIdleSize = minIdleSize;
}
public int getIdleTimeout() {
return idleTimeout;
}
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
public int getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getRetryAttempts() {
return retryAttempts;
}
public void setRetryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
}
public int getRetryInterval() {
return retryInterval;
}
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
}
public int getPingInterval() {
return pingInterval;
}
public void setPingInterval(int pingInterval) {
this.pingInterval = pingInterval;
}
public boolean isKeepAlive() {
return keepAlive;
}
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
}
DynamicThreadPoolService(🔥核心业务)
管理所有线程池
public class DynamicThreadPoolService implements IDynamicThreadPoolService {
private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolService.class);
private final String applicationName;
private final Map<String, ThreadPoolExecutor> threadPoolExecutorMap;
public DynamicThreadPoolService(String applicationName, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
this.applicationName = applicationName;
this.threadPoolExecutorMap = threadPoolExecutorMap;
}
@Override
public List<ThreadPoolConfigEntity> queryThreadPoolList() {
Set<String> threadPoolBeanNames = threadPoolExecutorMap.keySet();
List<ThreadPoolConfigEntity> threadPoolVOS = new ArrayList<>(threadPoolBeanNames.size());
for (String beanName : threadPoolBeanNames) {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(beanName);
ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, beanName);
threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());
threadPoolVOS.add(threadPoolConfigVO);
}
return threadPoolVOS;
}
@Override
public ThreadPoolConfigEntity queryThreadPoolConfigByName(String threadPoolName) {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolName);
if (null == threadPoolExecutor) return new ThreadPoolConfigEntity(applicationName, threadPoolName);
// 线程池配置数据
ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, threadPoolName);
threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());
if (logger.isDebugEnabled()) {
logger.info("动态线程池,配置查询 应用名:{} 线程名:{} 池化配置:{}", applicationName, threadPoolName, JSON.toJSONString(threadPoolConfigVO));
}
return threadPoolConfigVO;
}
@Override
public void updateThreadPoolConfig(ThreadPoolConfigEntity threadPoolConfigEntity) {
if (null == threadPoolConfigEntity || !applicationName.equals(threadPoolConfigEntity.getAppName())) return;
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
if (null == threadPoolExecutor) return;
// 设置参数 「调整核心线程数和最大线程数」
threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
}
}
ThreadPoolConfigEntity
线程池数据模型
public class ThreadPoolConfigEntity {
/**
* 应用名称
*/
private String appName;
/**
* 线程池名称
*/
private String threadPoolName;
/**
* 核心线程数
*/
private int corePoolSize;
/**
* 最大线程数
*/
private int maximumPoolSize;
/**
* 当前活跃线程数
*/
private int activeCount;
/**
* 当前池中线程数
*/
private int poolSize;
/**
* 队列类型
*/
private String queueType;
/**
* 当前队列任务数
*/
private int queueSize;
/**
* 队列剩余任务数
*/
private int remainingCapacity;
public ThreadPoolConfigEntity() {
}
public ThreadPoolConfigEntity(String appName, String threadPoolName) {
this.appName = appName;
this.threadPoolName = threadPoolName;
}
public String getAppName() {
return appName;
}
public String getThreadPoolName() {
return threadPoolName;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setMaximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}
public int getActiveCount() {
return activeCount;
}
public void setActiveCount(int activeCount) {
this.activeCount = activeCount;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
public String getQueueType() {
return queueType;
}
public void setQueueType(String queueType) {
this.queueType = queueType;
}
public int getQueueSize() {
return queueSize;
}
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public int getRemainingCapacity() {
return remainingCapacity;
}
public void setRemainingCapacity(int remainingCapacity) {
this.remainingCapacity = remainingCapacity;
}
}
IRegistry + RedisRegistry
定义 & 实现 注册中心
Redis 存什么
线程池列表(List)
线程池配置(Bucket)
reportThreadPool() // 上报列表
reportThreadPoolConfigParameter // 上报配置
/**
* Redis 注册中心实现
*
* 作用:
* 1. 负责线程池数据的上报(供 Admin 查询)
* 2. 负责线程池配置的存储(供初始化和动态刷新使用)
*
* 设计说明:
* - 使用 Redisson 操作 Redis
* - 使用 List 存储线程池列表(整体数据)
* - 使用 Bucket 存储单个线程池配置(精确查询)
*/
public class RedisRegistry implements IRegistry {
/**
* Redisson 客户端(操作 Redis 的核心工具)
*/
private final RedissonClient redissonClient;
public RedisRegistry(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* 上报线程池列表数据(用于 Admin 页面展示)
*
* 流程:
* 1. 获取 Redis 中的线程池列表(RList)
* 2. 删除旧数据(避免脏数据)
* 3. 写入最新线程池信息
*
* Redis 结构:
* key: THREAD_POOL_CONFIG_LIST_KEY
* value: List<ThreadPoolConfigEntity>
*
* @param threadPoolEntities 当前应用所有线程池信息
*/
@Override
public void reportThreadPool(List<ThreadPoolConfigEntity> threadPoolEntities) {
// 获取 Redis List
RList<ThreadPoolConfigEntity> list =
redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());
// 清空旧数据(全量覆盖)
list.delete();
// 写入最新线程池列表
list.addAll(threadPoolEntities);
}
/**
* 上报单个线程池配置(用于配置查询 + 初始化 + 动态刷新)
*
* 设计说明:
* - 每个线程池一个独立 key
* - key 由 应用名 + 线程池名 拼接,保证唯一性
* - 设置过期时间(防止脏数据长期存在)
*
* Redis 结构:
* key: THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_appName_threadPoolName
* value: ThreadPoolConfigEntity
*
* 示例:
* THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_order-service_threadPoolExecutor
*
* @param threadPoolConfigEntity 单个线程池配置
*/
@Override
public void reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity) {
// 构建唯一 key:配置前缀 + 应用名 + 线程池名
String cacheKey =
RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey()
+ "_"
+ threadPoolConfigEntity.getAppName()
+ "_"
+ threadPoolConfigEntity.getThreadPoolName();
// 获取 Redis Bucket(类似 key-value)
RBucket<ThreadPoolConfigEntity> bucket = redissonClient.getBucket(cacheKey);
// 写入数据,并设置过期时间(30天)
bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
}
}
RegistryEnumVO
public enum RegistryEnumVO {
THREAD_POOL_CONFIG_LIST_KEY("THREAD_POOL_CONFIG_LIST_KEY", "池化配置列表"),
THREAD_POOL_CONFIG_PARAMETER_LIST_KEY("THREAD_POOL_CONFIG_PARAMETER_LIST_KEY", "池化配置参数"),
DYNAMIC_THREAD_POOL_REDIS_TOPIC("DYNAMIC_THREAD_POOL_REDIS_TOPIC", "动态线程池监听主题配置");
private final String key;
private final String desc;
RegistryEnumVO(String key, String desc) {
this.key = key;
this.desc = desc;
}
public String getKey() {
return key;
}
public String getDesc() {
return desc;
}
}
定义 Redis Key
ThreadPoolDataReportJob(定时任务)
/**
* 线程池数据上报任务(定时任务)
*
* 核心作用:
* 1. 定时采集当前应用中的线程池运行状态
* 2. 将线程池信息上报到 Redis(注册中心)
* 3. 供 Admin 后台进行展示和管理
*
* 在整个系统中的位置:
* 【应用线程池】 → 【本任务】 → 【Redis】 → 【Admin展示】
*/
public class ThreadPoolDataReportJob {
// 日志记录器
private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);
// 线程池服务(负责获取线程池信息)
private final IDynamicThreadPoolService dynamicThreadPoolService;
// 注册中心(这里是 Redis 实现)
private final IRegistry registry;
/**
* 构造函数注入
*/
public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.registry = registry;
}
/**
* 定时任务:每 20 秒执行一次
*
* cron 表达式解释:
* 0/20 * * * * ?
* → 每 20 秒执行一次
*/
@Scheduled(cron = "0/20 * * * * ?")
public void execReportThreadPoolList() {
// 1. 获取当前应用中所有线程池的运行状态
List<ThreadPoolConfigEntity> threadPoolConfigEntities =
dynamicThreadPoolService.queryThreadPoolList();
// 2. 上报线程池列表(用于 Admin 展示整体列表)
registry.reportThreadPool(threadPoolConfigEntities);
logger.info("动态线程池,上报线程池信息:{}",
JSON.toJSONString(threadPoolConfigEntities));
// 3. 遍历每个线程池,单独上报详细配置
for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {
// 上报单个线程池配置(用于查询详情 / 修改配置)
registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
logger.info("动态线程池,上报线程池配置:{}",
JSON.toJSONString(threadPoolConfigEntity));
}
}
}
ThreadPoolConfigAdjustListener(🔥动态核心)
@Bean
public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
return new ThreadPoolConfigAdjustListener(dynamicThreadPoolService, registry);
}
/**
* 线程池配置变更监听器(核心组件🔥)
*
* 核心作用:
* 1. 监听 Redis Topic 中的线程池配置变更消息
* 2. 收到消息后,动态修改本地线程池参数
* 3. 修改完成后,将最新线程池状态重新上报到 Redis
*
* 在系统中的位置:
* Admin 修改配置
* ↓
* Redis Topic 发布消息
* ↓
* 当前 Listener 接收
* ↓
* 修改本地线程池(无需重启🔥)
* ↓
* 上报最新状态 → Admin 实时可见
*
* 本类 = 动态线程池的“执行者”
*/
public class ThreadPoolConfigAdjustListener implements MessageListener<ThreadPoolConfigEntity> {
// 日志组件
private Logger logger = LoggerFactory.getLogger(ThreadPoolConfigAdjustListener.class);
// 线程池服务(负责更新线程池)
private final IDynamicThreadPoolService dynamicThreadPoolService;
// 注册中心(Redis)
private final IRegistry registry;
/**
* 构造函数注入
*/
public ThreadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.registry = registry;
}
/**
* Redis Topic 消息监听方法
*
* @param charSequence topic 名称
* @param threadPoolConfigEntity 线程池配置变更数据
*/
@Override
public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
// 1. 打印接收到的配置变更信息
logger.info("动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}",
threadPoolConfigEntity.getThreadPoolName(),
threadPoolConfigEntity.getPoolSize(),
threadPoolConfigEntity.getMaximumPoolSize());
// 2. 更新本地线程池参数(核心操作🔥)
// 本质:调用 ThreadPoolExecutor.setXXX() 实现动态调整
dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);
// 3. 更新后,重新获取所有线程池状态
List<ThreadPoolConfigEntity> threadPoolConfigEntities =
dynamicThreadPoolService.queryThreadPoolList();
// 4. 上报最新线程池列表到 Redis(用于 Admin 展示)
registry.reportThreadPool(threadPoolConfigEntities);
// 5. 获取当前被修改的线程池的最新配置
ThreadPoolConfigEntity threadPoolConfigEntityCurrent =
dynamicThreadPoolService.queryThreadPoolConfigByName(
threadPoolConfigEntity.getThreadPoolName());
// 6. 上报该线程池最新配置(覆盖旧数据)
registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
// 7. 记录日志
logger.info("动态线程池,上报线程池配置:{}",
JSON.toJSONString(threadPoolConfigEntity));
}
}
DynamicThreadPoolController(Admin)
@Slf4j
@RestController
@CrossOrigin("*") // 允许跨域访问(前端页面调用接口)
@RequestMapping("/api/v1/dynamic/thread/pool/")
public class DynamicThreadPoolController {
// Redis 客户端(用于读取数据 + 发布消息)
@Resource
public RedissonClient redissonClient;
/**
* 查询线程池列表
*
* 作用:
* 从 Redis 中获取所有线程池的运行状态(由定时任务上报)
*
* 数据来源链路:
* 业务应用线程池
* ↓
* ThreadPoolDataReportJob 定时上报
* ↓
* Redis List(THREAD_POOL_CONFIG_LIST_KEY)
* ↓
* 当前接口读取
*
* curl 示例:
* GET http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_list
*/
@RequestMapping(value = "query_thread_pool_list", method = RequestMethod.GET)
public Response<List<ThreadPoolConfigEntity>> queryThreadPoolList() {
try {
// 从 Redis 获取线程池列表(List 结构)
RList<ThreadPoolConfigEntity> cacheList =
redissonClient.getList("THREAD_POOL_CONFIG_LIST_KEY");
return Response.<List<ThreadPoolConfigEntity>>builder()
.code(Response.Code.SUCCESS.getCode())
.info(Response.Code.SUCCESS.getInfo())
.data(cacheList.readAll()) // 返回所有线程池信息
.build();
} catch (Exception e) {
log.error("查询线程池数据异常", e);
return Response.<List<ThreadPoolConfigEntity>>builder()
.code(Response.Code.UN_ERROR.getCode())
.info(Response.Code.UN_ERROR.getInfo())
.build();
}
}
/**
* 查询单个线程池配置
*
* 作用:
* 查询指定应用 + 指定线程池的详细配置(核心线程数、最大线程数等)
*
* 数据来源:
* Redis Bucket(单个线程池配置)
*
* key 结构:
* THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_appName_threadPoolName
*
* curl 示例:
* GET http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_config?appName=xxx&threadPoolName=xxx
*/
@RequestMapping(value = "query_thread_pool_config", method = RequestMethod.GET)
public Response<ThreadPoolConfigEntity> queryThreadPoolConfig(
@RequestParam String appName,
@RequestParam String threadPoolName) {
try {
// 拼接 Redis Key
String cacheKey =
"THREAD_POOL_CONFIG_PARAMETER_LIST_KEY" + "_" + appName + "_" + threadPoolName;
// 从 Redis 获取线程池配置(Bucket 结构)
ThreadPoolConfigEntity threadPoolConfigEntity =
redissonClient.<ThreadPoolConfigEntity>getBucket(cacheKey).get();
return Response.<ThreadPoolConfigEntity>builder()
.code(Response.Code.SUCCESS.getCode())
.info(Response.Code.SUCCESS.getInfo())
.data(threadPoolConfigEntity)
.build();
} catch (Exception e) {
log.error("查询线程池配置异常", e);
return Response.<ThreadPoolConfigEntity>builder()
.code(Response.Code.UN_ERROR.getCode())
.info(Response.Code.UN_ERROR.getInfo())
.build();
}
}
/**
* 修改线程池配置(🔥核心入口)
*
* 作用:
* 接收前端修改请求,并通过 Redis 发布订阅机制通知各个服务实例更新线程池
*
* ⚠️ 注意:
* 这里“不会直接修改线程池”
* 而是 → 发布消息 → 由各服务的 Listener 处理
*
* 链路:
* 前端请求
* ↓
* Controller(这里)
* ↓
* Redis Topic 发布
* ↓
* 各应用 Listener 接收
* ↓
* 动态修改线程池(setCorePoolSize)
*
* curl 示例:
* POST http://localhost:8089/api/v1/dynamic/thread/pool/update_thread_pool_config
*/
@RequestMapping(value = "update_thread_pool_config", method = RequestMethod.POST)
public Response<Boolean> updateThreadPoolConfig(@RequestBody ThreadPoolConfigEntity request) {
try {
log.info("修改线程池配置开始 {} {} {}",
request.getAppName(),
request.getThreadPoolName(),
JSON.toJSONString(request));
// 获取 Redis Topic(按应用隔离)
RTopic topic = redissonClient.getTopic(
"DYNAMIC_THREAD_POOL_REDIS_TOPIC" + "_" + request.getAppName()
);
// 发布线程池配置变更消息(核心🔥)
topic.publish(request);
log.info("修改线程池配置完成 {} {}",
request.getAppName(),
request.getThreadPoolName());
return Response.<Boolean>builder()
.code(Response.Code.SUCCESS.getCode())
.info(Response.Code.SUCCESS.getInfo())
.data(true)
.build();
} catch (Exception e) {
log.error("修改线程池配置异常 {}", JSON.toJSONString(request), e);
return Response.<Boolean>builder()
.code(Response.Code.UN_ERROR.getCode())
.info(Response.Code.UN_ERROR.getInfo())
.data(false)
.build();
}
}
}
Application
创建 RedissonClient+启动入口
@SpringBootApplication
@Configurable
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
@Configuration
@EnableConfigurationProperties(RedisClientConfigProperties.class)
public static class RedisClientConfig {
@Bean("redissonClient")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
Config config = new Config();
// 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
config.setCodec(JsonJacksonCodec.INSTANCE);
config.useSingleServer()
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive())
;
return Redisson.create(config);
}
}
@Data
@ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true)
public static class RedisClientConfigProperties {
/**
* host:ip
*/
private String host;
/**
* 端口
*/
private int port;
/**
* 账密
*/
private String password;
/**
* 设置连接池的大小,默认为64
*/
private int poolSize = 64;
/**
* 设置连接池的最小空闲连接数,默认为10
*/
private int minIdleSize = 10;
/**
* 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000
*/
private int idleTimeout = 10000;
/**
* 设置连接超时时间(单位:毫秒),默认为10000
*/
private int connectTimeout = 10000;
/**
* 设置连接重试次数,默认为3
*/
private int retryAttempts = 3;
/**
* 设置连接重试的间隔时间(单位:毫秒),默认为1000
*/
private int retryInterval = 1000;
/**
* 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查
*/
private int pingInterval = 0;
/**
* 设置是否保持长连接,默认为true
*/
private boolean keepAlive = true;
}
}
dynamic-thread-pool-test(测试工程)
模拟真实线程池运行
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)