搭建动态线程池组件工程骨架

搭一个父子 Maven 工程:

dynamic-thread-pool
└── dynamic-thread-pool-spring-boot-starter

父工程 dynamic-thread-pool 负责统一管理版本、依赖、插件、Java 版本。

子工程 dynamic-thread-pool-spring-boot-starter 是真正要做的组件包,也就是以后别人引入这个 starter,就能使用你的动态线程池能力。

目前 starter 下面分了几个包:

config:自动配置入口
domain:核心业务能力,比如线程池管理、线程池刷新
registry:注册中心,比如把线程池信息上报到 Redis/Nacos
trigger:触发器,比如定时任务、监听器、配置变更监听

关键文件分析

pom.xml

父工程使用:

<packaging>pom</packaging>

说明不是直接打 jar,而是作为聚合工程管理子模块。

👉 聚合工程 = 只负责“管理多个子模块”的工程,不写业务代码、不打 jar

同时引入了 Spring Boot 2.7.12,并统一管理了:

guava
commons-lang
commons-codec
fastjson
junit

guava

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
</dependency>

Google 出的 Java增强工具库,可以理解为:

👉 “Java 标准库不够用,我帮你补齐”


常用功能

  • 集合增强(List / Map / Set)
  • 缓存(Local Cache)
  • 限流(RateLimiter)
  • 字符串工具
  • Optional(早期)

commons-lang

<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
</dependency>

作用

👉 Java 基础增强工具类(字符串 / 对象 / 数组)

常用类

StringUtils
ObjectUtils
ArrayUtils
NumberUtils

commons-codec

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
</dependency>

 作用

👉 编码 & 加密工具库


支持内容

  • Base64
  • MD5
  • SHA
  • URL 编码

fastjson

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
</dependency>

作用

👉 JSON 序列化 / 反序列化

String json = JSON.toJSONString(obj);

User user = JSON.parseObject(json, User.class);

junit

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
</dependency>

作用

👉 Java 单元测试框架

@Test
public void test() {
    assertEquals(2, 1 + 1);
}

dynamic-thread-pool-spring-boot-starter/pom.xml

这个是真正的 starter 模块。

引入了:

spring-boot-starter
spring-boot-autoconfigure
spring-boot-configuration-processor
spring-boot-starter-aop
redisson-spring-boot-starter

spring-boot-starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

作用

👉 Spring Boot 核心启动依赖(最基础的)

它会帮你引入:

Spring Core
Spring Context
日志系统(Logback)
自动配置基础能力

spring-boot-autoconfigure

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>

作用

👉 实现自动装配的核心依赖

@Configuration
@ConditionalOnMissingBean
public class DynamicThreadPoolAutoConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return new ThreadPoolExecutor(...);
    }
}

spring-boot-configuration-processor

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

作用

👉 生成配置提示(IDE 自动补全)

示例

你写一个配置类:

@ConfigurationProperties(prefix = "dynamic.thread.pool")
public class ThreadPoolProperties {
private int coreSize;
private int maxSize;
}

👉 在 application.yml 里:

dynamic:
thread:
pool:
core-size: 10

IDE 会自动提示!

spring-boot-starter-aop

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

作用

👉 面向切面编程(AOP)

redisson-spring-boot-starter

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

作用

👉 Redis 分布式客户端(高级版)


 能干什么?

分布式锁
分布式 Map / Set
发布订阅(Pub/Sub)
配置共享

DynamicThreadPoolAutoConfig

public class DynamicThreadPoolAutoConfig {
}

但它的定位很重要。

后面它应该会变成自动配置入口,例如:

@Configuration
@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
public class DynamicThreadPoolAutoConfig {

}

@EnableConfigurationProperties()👉 “把配置文件里的内容,自动绑定成一个 Java 对象,并交给 Spring 管理”

① 启用配置类

告诉 Spring:

这个类是一个配置类(Properties),你要帮我加载它


② 绑定配置文件

application.yml / application.properties 里的配置:

dynamic:
thread:
pool:
core-size: 10
max-size: 50

自动绑定到这个类:

@ConfigurationProperties(prefix = "dynamic.thread.pool")
public class DynamicThreadPoolProperties {

private int coreSize;
private int maxSize;

}

也就是说,用户项目只要引入 starter,Spring Boot 就能自动加载你的动态线程池组件。

DynamicThreadPoolAutoConfig

现在还是空类:

public class DynamicThreadPoolAutoConfig {
}

但它的定位很重要。

后面它应该会变成自动配置入口,例如:

@Configuration
@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
public class DynamicThreadPoolAutoConfig {

}

也就是说,用户项目只要引入 starter,Spring Boot 就能自动加载你的动态线程池组件。


META-INF/spring.factories

现在内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=

这个目前还不完整。

后面应该写成类似:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.bustack.middleware.dynamic.thread.pool.sdk.config.DynamicThreadPoolAutoConfig

否则你的 DynamicThreadPoolAutoConfig 不会被 Spring Boot 自动加载。

动态线程池实现

项目结构

starter(核心能力)
├── config        自动装配 + 配置
├── domain        线程池核心逻辑
├── registry      注册中心(Redis)
├── trigger       定时任务 + 监听器

admin(控制台)
├── controller    接口层

test(测试应用)
├── config        线程池定义
├── main          模拟任务运行

之前 DynamicThreadPoolAutoConfig 是空的

现在它做了几件关键事:

1. 创建 RedissonClient
2. 创建 RedisRegistry
3. 扫描 ThreadPoolExecutor
4. 初始化线程池配置(从 Redis 拉)
5. 创建 DynamicThreadPoolService
6. 启动定时任务
7. 注册 Redis Topic 监听器
@Configuration
@EnableConfigurationProperties(DynamicThreadPoolAutoProperties.class)
@EnableScheduling
public class DynamicThreadPoolAutoConfig {

    private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);

    private String applicationName;

    @Bean("redissonClient")
    public RedissonClient redissonClient(DynamicThreadPoolAutoProperties properties) {
        Config config = new Config();
        // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
        config.setCodec(JsonJacksonCodec.INSTANCE);

        config.useSingleServer()
                .setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
                .setPassword(properties.getPassword())
                .setConnectionPoolSize(properties.getPoolSize())
                .setConnectionMinimumIdleSize(properties.getMinIdleSize())
                .setIdleConnectionTimeout(properties.getIdleTimeout())
                .setConnectTimeout(properties.getConnectTimeout())
                .setRetryAttempts(properties.getRetryAttempts())
                .setRetryInterval(properties.getRetryInterval())
                .setPingConnectionInterval(properties.getPingInterval())
                .setKeepAlive(properties.isKeepAlive())
        ;

        RedissonClient redissonClient = Redisson.create(config);

        logger.info("动态线程池,注册器(redis)链接初始化完成。{} {} {}", properties.getHost(), properties.getPoolSize(), !redissonClient.isShutdown());

        return redissonClient;
    }

    @Bean
    public IRegistry redisRegistry(RedissonClient redissonClient) {
        return new RedisRegistry(redissonClient);
    }

    @Bean("dynamicThreadPollService")
    public DynamicThreadPoolService dynamicThreadPollService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutorMap, RedissonClient redissonClient) {
        applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");

        if (StringUtils.isBlank(applicationName)) {
            applicationName = "缺省的";
            logger.warn("动态线程池,启动提示。SpringBoot 应用未配置 spring.application.name 无法获取到应用名称!");
        }

        // 获取缓存数据,设置本地线程池配置
        Set<String> threadPoolKeys = threadPoolExecutorMap.keySet();
        for (String threadPoolKey : threadPoolKeys) {
            ThreadPoolConfigEntity threadPoolConfigEntity = redissonClient.<ThreadPoolConfigEntity>getBucket(RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey() + "_" + applicationName + "_" + threadPoolKey).get();
            if (null == threadPoolConfigEntity) continue;
            ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolKey);
            threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
            threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
        }

        return new DynamicThreadPoolService(applicationName, threadPoolExecutorMap);
    }

    @Bean
    public ThreadPoolDataReportJob threadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        return new ThreadPoolDataReportJob(dynamicThreadPoolService, registry);
    }

    @Bean
    public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        return new ThreadPoolConfigAdjustListener(dynamicThreadPoolService, registry);
    }

    @Bean(name = "dynamicThreadPoolRedisTopic")
    public RTopic threadPoolConfigAdjustListener(RedissonClient redissonClient, ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener) {
        RTopic topic = redissonClient.getTopic(RegistryEnumVO.DYNAMIC_THREAD_POOL_REDIS_TOPIC.getKey() + "_" + applicationName);
        topic.addListener(ThreadPoolConfigEntity.class, threadPoolConfigAdjustListener);
        return topic;
    }

}

DynamicThreadPoolAutoProperties

绑定配置文件(application.yml)

@ConfigurationProperties(prefix = "dynamic.thread.pool.config", ignoreInvalidFields = true)
public class DynamicThreadPoolAutoProperties {

    /** 状态;open = 开启、close 关闭 */
    private boolean enable;
    /** redis host */
    private String host;
    /** redis port */
    private int port;
    /** 账密 */
    private String password;
    /** 设置连接池的大小,默认为64 */
    private int poolSize = 64;
    /** 设置连接池的最小空闲连接数,默认为10 */
    private int minIdleSize = 10;
    /** 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000 */
    private int idleTimeout = 10000;
    /** 设置连接超时时间(单位:毫秒),默认为10000 */
    private int connectTimeout = 10000;
    /** 设置连接重试次数,默认为3 */
    private int retryAttempts = 3;
    /** 设置连接重试的间隔时间(单位:毫秒),默认为1000 */
    private int retryInterval = 1000;
    /** 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查 */
    private int pingInterval = 0;
    /** 设置是否保持长连接,默认为true */
    private boolean keepAlive = true;

    public boolean isEnable() {
        return enable;
    }

    public void setEnable(boolean enable) {
        this.enable = enable;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public int getMinIdleSize() {
        return minIdleSize;
    }

    public void setMinIdleSize(int minIdleSize) {
        this.minIdleSize = minIdleSize;
    }

    public int getIdleTimeout() {
        return idleTimeout;
    }

    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    public int getConnectTimeout() {
        return connectTimeout;
    }

    public void setConnectTimeout(int connectTimeout) {
        this.connectTimeout = connectTimeout;
    }

    public int getRetryAttempts() {
        return retryAttempts;
    }

    public void setRetryAttempts(int retryAttempts) {
        this.retryAttempts = retryAttempts;
    }

    public int getRetryInterval() {
        return retryInterval;
    }

    public void setRetryInterval(int retryInterval) {
        this.retryInterval = retryInterval;
    }

    public int getPingInterval() {
        return pingInterval;
    }

    public void setPingInterval(int pingInterval) {
        this.pingInterval = pingInterval;
    }

    public boolean isKeepAlive() {
        return keepAlive;
    }

    public void setKeepAlive(boolean keepAlive) {
        this.keepAlive = keepAlive;
    }

}

DynamicThreadPoolService(🔥核心业务)

管理所有线程池

public class DynamicThreadPoolService implements IDynamicThreadPoolService {

    private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolService.class);

    private final String applicationName;
    private final Map<String, ThreadPoolExecutor> threadPoolExecutorMap;

    public DynamicThreadPoolService(String applicationName, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
        this.applicationName = applicationName;
        this.threadPoolExecutorMap = threadPoolExecutorMap;
    }

    @Override
    public List<ThreadPoolConfigEntity> queryThreadPoolList() {
        Set<String> threadPoolBeanNames = threadPoolExecutorMap.keySet();
        List<ThreadPoolConfigEntity> threadPoolVOS = new ArrayList<>(threadPoolBeanNames.size());
        for (String beanName : threadPoolBeanNames) {
            ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(beanName);
            ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, beanName);
            threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
            threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
            threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
            threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
            threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
            threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
            threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());
            threadPoolVOS.add(threadPoolConfigVO);
        }
        return threadPoolVOS;
    }

    @Override
    public ThreadPoolConfigEntity queryThreadPoolConfigByName(String threadPoolName) {
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolName);
        if (null == threadPoolExecutor) return new ThreadPoolConfigEntity(applicationName, threadPoolName);

        // 线程池配置数据
        ThreadPoolConfigEntity threadPoolConfigVO = new ThreadPoolConfigEntity(applicationName, threadPoolName);
        threadPoolConfigVO.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
        threadPoolConfigVO.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
        threadPoolConfigVO.setActiveCount(threadPoolExecutor.getActiveCount());
        threadPoolConfigVO.setPoolSize(threadPoolExecutor.getPoolSize());
        threadPoolConfigVO.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
        threadPoolConfigVO.setQueueSize(threadPoolExecutor.getQueue().size());
        threadPoolConfigVO.setRemainingCapacity(threadPoolExecutor.getQueue().remainingCapacity());

        if (logger.isDebugEnabled()) {
            logger.info("动态线程池,配置查询 应用名:{} 线程名:{} 池化配置:{}", applicationName, threadPoolName, JSON.toJSONString(threadPoolConfigVO));
        }

        return threadPoolConfigVO;
    }

    @Override
    public void updateThreadPoolConfig(ThreadPoolConfigEntity threadPoolConfigEntity) {
        if (null == threadPoolConfigEntity || !applicationName.equals(threadPoolConfigEntity.getAppName())) return;
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
        if (null == threadPoolExecutor) return;

        // 设置参数 「调整核心线程数和最大线程数」
        threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
        threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
    }

}

ThreadPoolConfigEntity

线程池数据模型

public class ThreadPoolConfigEntity {

    /**
     * 应用名称
     */
    private String appName;

    /**
     * 线程池名称
     */
    private String threadPoolName;

    /**
     * 核心线程数
     */
    private int corePoolSize;

    /**
     * 最大线程数
     */
    private int maximumPoolSize;

    /**
     * 当前活跃线程数
     */
    private int activeCount;

    /**
     * 当前池中线程数
     */
    private int poolSize;

    /**
     * 队列类型
     */
    private String queueType;

    /**
     * 当前队列任务数
     */
    private int queueSize;

    /**
     * 队列剩余任务数
     */
    private int remainingCapacity;

    public ThreadPoolConfigEntity() {
    }

    public ThreadPoolConfigEntity(String appName, String threadPoolName) {
        this.appName = appName;
        this.threadPoolName = threadPoolName;
    }

    public String getAppName() {
        return appName;
    }

    public String getThreadPoolName() {
        return threadPoolName;
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
    }

    public int getActiveCount() {
        return activeCount;
    }

    public void setActiveCount(int activeCount) {
        this.activeCount = activeCount;
    }

    public int getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public String getQueueType() {
        return queueType;
    }

    public void setQueueType(String queueType) {
        this.queueType = queueType;
    }

    public int getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    public int getRemainingCapacity() {
        return remainingCapacity;
    }

    public void setRemainingCapacity(int remainingCapacity) {
        this.remainingCapacity = remainingCapacity;
    }

}

IRegistry + RedisRegistry

定义 & 实现 注册中心

Redis 存什么

线程池列表(List)
线程池配置(Bucket)

reportThreadPool()              // 上报列表
reportThreadPoolConfigParameter // 上报配置
/**
 * Redis 注册中心实现
 *
 * 作用:
 * 1. 负责线程池数据的上报(供 Admin 查询)
 * 2. 负责线程池配置的存储(供初始化和动态刷新使用)
 *
 * 设计说明:
 * - 使用 Redisson 操作 Redis
 * - 使用 List 存储线程池列表(整体数据)
 * - 使用 Bucket 存储单个线程池配置(精确查询)
 */
public class RedisRegistry implements IRegistry {

    /**
     * Redisson 客户端(操作 Redis 的核心工具)
     */
    private final RedissonClient redissonClient;

    public RedisRegistry(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

    /**
     * 上报线程池列表数据(用于 Admin 页面展示)
     *
     * 流程:
     * 1. 获取 Redis 中的线程池列表(RList)
     * 2. 删除旧数据(避免脏数据)
     * 3. 写入最新线程池信息
     *
     * Redis 结构:
     * key: THREAD_POOL_CONFIG_LIST_KEY
     * value: List<ThreadPoolConfigEntity>
     *
     * @param threadPoolEntities 当前应用所有线程池信息
     */
    @Override
    public void reportThreadPool(List<ThreadPoolConfigEntity> threadPoolEntities) {
        // 获取 Redis List
        RList<ThreadPoolConfigEntity> list =
                redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());

        // 清空旧数据(全量覆盖)
        list.delete();

        // 写入最新线程池列表
        list.addAll(threadPoolEntities);
    }

    /**
     * 上报单个线程池配置(用于配置查询 + 初始化 + 动态刷新)
     *
     * 设计说明:
     * - 每个线程池一个独立 key
     * - key 由 应用名 + 线程池名 拼接,保证唯一性
     * - 设置过期时间(防止脏数据长期存在)
     *
     * Redis 结构:
     * key: THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_appName_threadPoolName
     * value: ThreadPoolConfigEntity
     *
     * 示例:
     * THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_order-service_threadPoolExecutor
     *
     * @param threadPoolConfigEntity 单个线程池配置
     */
    @Override
    public void reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity) {

        // 构建唯一 key:配置前缀 + 应用名 + 线程池名
        String cacheKey =
                RegistryEnumVO.THREAD_POOL_CONFIG_PARAMETER_LIST_KEY.getKey()
                        + "_"
                        + threadPoolConfigEntity.getAppName()
                        + "_"
                        + threadPoolConfigEntity.getThreadPoolName();

        // 获取 Redis Bucket(类似 key-value)
        RBucket<ThreadPoolConfigEntity> bucket = redissonClient.getBucket(cacheKey);

        // 写入数据,并设置过期时间(30天)
        bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
    }

}

RegistryEnumVO

public enum RegistryEnumVO {

    THREAD_POOL_CONFIG_LIST_KEY("THREAD_POOL_CONFIG_LIST_KEY", "池化配置列表"),
    THREAD_POOL_CONFIG_PARAMETER_LIST_KEY("THREAD_POOL_CONFIG_PARAMETER_LIST_KEY", "池化配置参数"),
    DYNAMIC_THREAD_POOL_REDIS_TOPIC("DYNAMIC_THREAD_POOL_REDIS_TOPIC", "动态线程池监听主题配置");

    private final String key;
    private final String desc;

    RegistryEnumVO(String key, String desc) {
        this.key = key;
        this.desc = desc;
    }

    public String getKey() {
        return key;
    }

    public String getDesc() {
        return desc;
    }


}

定义 Redis Key

ThreadPoolDataReportJob(定时任务)

/**
 * 线程池数据上报任务(定时任务)
 *
 * 核心作用:
 * 1. 定时采集当前应用中的线程池运行状态
 * 2. 将线程池信息上报到 Redis(注册中心)
 * 3. 供 Admin 后台进行展示和管理
 *
 * 在整个系统中的位置:
 * 【应用线程池】 → 【本任务】 → 【Redis】 → 【Admin展示】
 */
public class ThreadPoolDataReportJob {

    // 日志记录器
    private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);

    // 线程池服务(负责获取线程池信息)
    private final IDynamicThreadPoolService dynamicThreadPoolService;

    // 注册中心(这里是 Redis 实现)
    private final IRegistry registry;

    /**
     * 构造函数注入
     */
    public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        this.dynamicThreadPoolService = dynamicThreadPoolService;
        this.registry = registry;
    }

    /**
     * 定时任务:每 20 秒执行一次
     *
     * cron 表达式解释:
     * 0/20 * * * * ?
     * → 每 20 秒执行一次
     */
    @Scheduled(cron = "0/20 * * * * ?")
    public void execReportThreadPoolList() {

        // 1. 获取当前应用中所有线程池的运行状态
        List<ThreadPoolConfigEntity> threadPoolConfigEntities =
                dynamicThreadPoolService.queryThreadPoolList();

        // 2. 上报线程池列表(用于 Admin 展示整体列表)
        registry.reportThreadPool(threadPoolConfigEntities);

        logger.info("动态线程池,上报线程池信息:{}",
                JSON.toJSONString(threadPoolConfigEntities));

        // 3. 遍历每个线程池,单独上报详细配置
        for (ThreadPoolConfigEntity threadPoolConfigEntity : threadPoolConfigEntities) {

            // 上报单个线程池配置(用于查询详情 / 修改配置)
            registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);

            logger.info("动态线程池,上报线程池配置:{}",
                    JSON.toJSONString(threadPoolConfigEntity));
        }

    }

}

ThreadPoolConfigAdjustListener(🔥动态核心)

 @Bean
    public ThreadPoolConfigAdjustListener threadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        return new ThreadPoolConfigAdjustListener(dynamicThreadPoolService, registry);
    }
/**
 * 线程池配置变更监听器(核心组件🔥)
 *
 * 核心作用:
 * 1. 监听 Redis Topic 中的线程池配置变更消息
 * 2. 收到消息后,动态修改本地线程池参数
 * 3. 修改完成后,将最新线程池状态重新上报到 Redis
 *
 * 在系统中的位置:
 * Admin 修改配置
 *        ↓
 * Redis Topic 发布消息
 *        ↓
 * 当前 Listener 接收
 *        ↓
 * 修改本地线程池(无需重启🔥)
 *        ↓
 * 上报最新状态 → Admin 实时可见
 *
 * 本类 = 动态线程池的“执行者”
 */
public class ThreadPoolConfigAdjustListener implements MessageListener<ThreadPoolConfigEntity> {

    // 日志组件
    private Logger logger = LoggerFactory.getLogger(ThreadPoolConfigAdjustListener.class);

    // 线程池服务(负责更新线程池)
    private final IDynamicThreadPoolService dynamicThreadPoolService;

    // 注册中心(Redis)
    private final IRegistry registry;

    /**
     * 构造函数注入
     */
    public ThreadPoolConfigAdjustListener(IDynamicThreadPoolService dynamicThreadPoolService, IRegistry registry) {
        this.dynamicThreadPoolService = dynamicThreadPoolService;
        this.registry = registry;
    }

    /**
     * Redis Topic 消息监听方法
     *
     * @param charSequence topic 名称
     * @param threadPoolConfigEntity 线程池配置变更数据
     */
    @Override
    public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {

        // 1. 打印接收到的配置变更信息
        logger.info("动态线程池,调整线程池配置。线程池名称:{} 核心线程数:{} 最大线程数:{}",
                threadPoolConfigEntity.getThreadPoolName(),
                threadPoolConfigEntity.getPoolSize(),
                threadPoolConfigEntity.getMaximumPoolSize());

        // 2. 更新本地线程池参数(核心操作🔥)
        // 本质:调用 ThreadPoolExecutor.setXXX() 实现动态调整
        dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);

        // 3. 更新后,重新获取所有线程池状态
        List<ThreadPoolConfigEntity> threadPoolConfigEntities =
                dynamicThreadPoolService.queryThreadPoolList();

        // 4. 上报最新线程池列表到 Redis(用于 Admin 展示)
        registry.reportThreadPool(threadPoolConfigEntities);

        // 5. 获取当前被修改的线程池的最新配置
        ThreadPoolConfigEntity threadPoolConfigEntityCurrent =
                dynamicThreadPoolService.queryThreadPoolConfigByName(
                        threadPoolConfigEntity.getThreadPoolName());

        // 6. 上报该线程池最新配置(覆盖旧数据)
        registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);

        // 7. 记录日志
        logger.info("动态线程池,上报线程池配置:{}",
                JSON.toJSONString(threadPoolConfigEntity));
    }

}

DynamicThreadPoolController(Admin)

@Slf4j
@RestController
@CrossOrigin("*") // 允许跨域访问(前端页面调用接口)
@RequestMapping("/api/v1/dynamic/thread/pool/")
public class DynamicThreadPoolController {

    // Redis 客户端(用于读取数据 + 发布消息)
    @Resource
    public RedissonClient redissonClient;

    /**
     * 查询线程池列表
     *
     * 作用:
     * 从 Redis 中获取所有线程池的运行状态(由定时任务上报)
     *
     * 数据来源链路:
     * 业务应用线程池
     *        ↓
     * ThreadPoolDataReportJob 定时上报
     *        ↓
     * Redis List(THREAD_POOL_CONFIG_LIST_KEY)
     *        ↓
     * 当前接口读取
     *
     * curl 示例:
     * GET http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_list
     */
    @RequestMapping(value = "query_thread_pool_list", method = RequestMethod.GET)
    public Response<List<ThreadPoolConfigEntity>> queryThreadPoolList() {
        try {
            // 从 Redis 获取线程池列表(List 结构)
            RList<ThreadPoolConfigEntity> cacheList =
                    redissonClient.getList("THREAD_POOL_CONFIG_LIST_KEY");

            return Response.<List<ThreadPoolConfigEntity>>builder()
                    .code(Response.Code.SUCCESS.getCode())
                    .info(Response.Code.SUCCESS.getInfo())
                    .data(cacheList.readAll()) // 返回所有线程池信息
                    .build();

        } catch (Exception e) {
            log.error("查询线程池数据异常", e);
            return Response.<List<ThreadPoolConfigEntity>>builder()
                    .code(Response.Code.UN_ERROR.getCode())
                    .info(Response.Code.UN_ERROR.getInfo())
                    .build();
        }
    }

    /**
     * 查询单个线程池配置
     *
     * 作用:
     * 查询指定应用 + 指定线程池的详细配置(核心线程数、最大线程数等)
     *
     * 数据来源:
     * Redis Bucket(单个线程池配置)
     *
     * key 结构:
     * THREAD_POOL_CONFIG_PARAMETER_LIST_KEY_appName_threadPoolName
     *
     * curl 示例:
     * GET http://localhost:8089/api/v1/dynamic/thread/pool/query_thread_pool_config?appName=xxx&threadPoolName=xxx
     */
    @RequestMapping(value = "query_thread_pool_config", method = RequestMethod.GET)
    public Response<ThreadPoolConfigEntity> queryThreadPoolConfig(
            @RequestParam String appName,
            @RequestParam String threadPoolName) {

        try {
            // 拼接 Redis Key
            String cacheKey =
                    "THREAD_POOL_CONFIG_PARAMETER_LIST_KEY" + "_" + appName + "_" + threadPoolName;

            // 从 Redis 获取线程池配置(Bucket 结构)
            ThreadPoolConfigEntity threadPoolConfigEntity =
                    redissonClient.<ThreadPoolConfigEntity>getBucket(cacheKey).get();

            return Response.<ThreadPoolConfigEntity>builder()
                    .code(Response.Code.SUCCESS.getCode())
                    .info(Response.Code.SUCCESS.getInfo())
                    .data(threadPoolConfigEntity)
                    .build();

        } catch (Exception e) {
            log.error("查询线程池配置异常", e);
            return Response.<ThreadPoolConfigEntity>builder()
                    .code(Response.Code.UN_ERROR.getCode())
                    .info(Response.Code.UN_ERROR.getInfo())
                    .build();
        }
    }

    /**
     * 修改线程池配置(🔥核心入口)
     *
     * 作用:
     * 接收前端修改请求,并通过 Redis 发布订阅机制通知各个服务实例更新线程池
     *
     * ⚠️ 注意:
     * 这里“不会直接修改线程池”
     * 而是 → 发布消息 → 由各服务的 Listener 处理
     *
     * 链路:
     * 前端请求
     *        ↓
     * Controller(这里)
     *        ↓
     * Redis Topic 发布
     *        ↓
     * 各应用 Listener 接收
     *        ↓
     * 动态修改线程池(setCorePoolSize)
     *
     * curl 示例:
     * POST http://localhost:8089/api/v1/dynamic/thread/pool/update_thread_pool_config
     */
    @RequestMapping(value = "update_thread_pool_config", method = RequestMethod.POST)
    public Response<Boolean> updateThreadPoolConfig(@RequestBody ThreadPoolConfigEntity request) {

        try {
            log.info("修改线程池配置开始 {} {} {}",
                    request.getAppName(),
                    request.getThreadPoolName(),
                    JSON.toJSONString(request));

            // 获取 Redis Topic(按应用隔离)
            RTopic topic = redissonClient.getTopic(
                    "DYNAMIC_THREAD_POOL_REDIS_TOPIC" + "_" + request.getAppName()
            );

            // 发布线程池配置变更消息(核心🔥)
            topic.publish(request);

            log.info("修改线程池配置完成 {} {}",
                    request.getAppName(),
                    request.getThreadPoolName());

            return Response.<Boolean>builder()
                    .code(Response.Code.SUCCESS.getCode())
                    .info(Response.Code.SUCCESS.getInfo())
                    .data(true)
                    .build();

        } catch (Exception e) {
            log.error("修改线程池配置异常 {}", JSON.toJSONString(request), e);

            return Response.<Boolean>builder()
                    .code(Response.Code.UN_ERROR.getCode())
                    .info(Response.Code.UN_ERROR.getInfo())
                    .data(false)
                    .build();
        }
    }

}

Application

创建 RedissonClient+启动入口

@SpringBootApplication
@Configurable
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }

    @Configuration
    @EnableConfigurationProperties(RedisClientConfigProperties.class)
    public static class RedisClientConfig {

        @Bean("redissonClient")
        public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
            Config config = new Config();
            // 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
            config.setCodec(JsonJacksonCodec.INSTANCE);

            config.useSingleServer()
                    .setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
                    .setConnectionPoolSize(properties.getPoolSize())
                    .setConnectionMinimumIdleSize(properties.getMinIdleSize())
                    .setIdleConnectionTimeout(properties.getIdleTimeout())
                    .setConnectTimeout(properties.getConnectTimeout())
                    .setRetryAttempts(properties.getRetryAttempts())
                    .setRetryInterval(properties.getRetryInterval())
                    .setPingConnectionInterval(properties.getPingInterval())
                    .setKeepAlive(properties.isKeepAlive())
            ;

            return Redisson.create(config);
        }

    }


    @Data
    @ConfigurationProperties(prefix = "redis.sdk.config", ignoreInvalidFields = true)
    public static class RedisClientConfigProperties {
        /**
         * host:ip
         */
        private String host;
        /**
         * 端口
         */
        private int port;
        /**
         * 账密
         */
        private String password;
        /**
         * 设置连接池的大小,默认为64
         */
        private int poolSize = 64;
        /**
         * 设置连接池的最小空闲连接数,默认为10
         */
        private int minIdleSize = 10;
        /**
         * 设置连接的最大空闲时间(单位:毫秒),超过该时间的空闲连接将被关闭,默认为10000
         */
        private int idleTimeout = 10000;
        /**
         * 设置连接超时时间(单位:毫秒),默认为10000
         */
        private int connectTimeout = 10000;
        /**
         * 设置连接重试次数,默认为3
         */
        private int retryAttempts = 3;
        /**
         * 设置连接重试的间隔时间(单位:毫秒),默认为1000
         */
        private int retryInterval = 1000;
        /**
         * 设置定期检查连接是否可用的时间间隔(单位:毫秒),默认为0,表示不进行定期检查
         */
        private int pingInterval = 0;
        /**
         * 设置是否保持长连接,默认为true
         */
        private boolean keepAlive = true;
    }

}

dynamic-thread-pool-test(测试工程)

模拟真实线程池运行

xfg项目地址:KnowledgePlanet / dynamic-thread-pool · GitCode

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐