1 如何高效的定时群发所有的数据

课题内容

  1. 腾讯课堂、微信公众号定时消息推送实现原理
  2. 随着用户的增长,如何高效定时群发完所有数据
  3. 定时任务执行器如何实现动态的扩容与缩容
  4. 设计一套千万级定时消息推送平台

腾讯课堂公开课定时群发推送消息
假设第一天每特教育公开课订阅人数2万人,5分钟全部推送完毕,第二天每特教育公开课订阅人数增加到3万人,请问5分钟能够全部推送完毕吗?
思考:如何随着用户数的不断增加,消息推送要在规定的时间内群发完毕。

2 定时任务与业务逻辑一定要分离

定时任务监听最大的缺陷:
非常占用服务器的内存,底层一定是通过死循环实现。

定时任务代码是否能够和业务代码放入同一个jar包中部署?
稍微比较大的互联网公司业务代码和定时任务代码都是分开部署的。

定时任务模块为了提高效率集群部署,要注意保证幂等性。
定时任务集群执行的时候,如何保证数据的幂等性?
任务调度分片集群。

3 XXL-Job任务调度平台设计原理

在这里插入图片描述
分布式任务调度平台的原理:
将定时任务项目(执行器)服务ip和端口号手动注册到分布式任务调度平台的注册中心中,触发定时任务的时候先走分布式任务调度中心,先去注册中心获取执行器集群列表,采用负载均衡算法得到一个地址,再采用rpc通知执行器执行定时任务。
注意任务调度中心给每个执行器发rpc通知的时候都会带不同的参数。

4 SpringBoot整合XXL-Job

maven依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.11.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.1.2</version>
    </dependency>
</dependencies>

配置文件(resources目录下)
application.yml

# web port
server.port=8081

# log config
logging.config=classpath:logback.xml


### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### xxl-job executor address
xxl.job.executor.appname=mayikt-xxl-job
xxl.job.executor.ip=
xxl.job.executor.port=9999

### xxl-job, access token
xxl.job.accessToken=

### xxl-job log path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job log retention days
xxl.job.executor.logretentiondays=30

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>
    <property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
            </pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>

配置类/任务类/启动类

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appName;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */
}
@Component
public class MyJob {

    @Value("${xxl.job.executor.port}")
    private String executorPort;

    @XxlJob("myJobHandler")
    public ReturnT<String> myJobHandler(String param) {
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        System.out.println
                (">>>myJobHandler触发<<<param:" + param +
                        ",executorPort:" + executorPort +
                        ",间隔时间:" + new Date() +
                        ",分片序号:" + shardingVO.getIndex());
        return ReturnT.SUCCESS;
    }
}
@SpringBootApplication
public class AppJob {
    public static void main(String[] args) {
        SpringApplication.run(AppJob.class);
    }
}

5 XXL-Job轮询机制底层执行方式

在这里插入图片描述

6 XXL-Job底层分片集群模式

任务选择路由策略为分片广播,每个执行器固定拿到一个分片序号,可以根据不同序号查询不同的数据库范围。支持实现动态扩容,直接新启动独立的服务即可。(注意修改两个端口号server.port-节点服务端口号、xxl.job.executor.port-job注册到任务调度平台端口号)
在这里插入图片描述

7 执行器如果宕机了,任务会丢失吗

执行器如果宕机,admin平台会报错连接执行器失败,但是任务不会丢失,底层会把每次失败的任务记录下来,等执行器启动的时候再一次性执行。

GitHub 加速计划 / xx / xxl-job
27.15 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 3 个月前
977ad87b - 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐