spring boot 项目引入xxl-job

1.确定版本

<dependency>
   <groupId>com.xuxueli</groupId>
   <artifactId>xxl-job-core</artifactId>
   <version>2.3.1</version>
</dependency>

我这边使用的版本是2.3.1

2.添加xxlJob配置:

以下代码支持容器化服务部署的服务,调用xxl-job

@Configuration
@Slf4j
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.enable}")
    private boolean enable;

    @Bean
  
@ConditionalOnProperty(name = "xxl.job.enable", havingValue = "true")
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        xxlJobSpringExecutor.setIp(getIpAddress());
        xxlJobSpringExecutor.setPort(port);
        return xxlJobSpringExecutor;
    }

    public String getIpAddress() {
        String ipAddress;
        if (isWindowsOS()) {
            ipAddress = getWindowsIP();
        } else {
            ipAddress = getLinuxLocalIp();
        }
        return ipAddress;
    }

    /**
     * 判断操作系统是否为windows系统
     */
    public boolean isWindowsOS() {
        boolean isWindowsOS = false;
        String osName = System.getProperty("os.name");
        if (osName.toLowerCase().contains("windows")) {
            isWindowsOS = true;
        }
        return isWindowsOS;
    }


    /**
     * 获取WindowsIP
     */
    private String getWindowsIP() {
        List<Inet4Address> addresses = new ArrayList<>(1);
        try {
            // 所有网络接口信息
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            if (ObjectUtils.isEmpty(networkInterfaces)) {
                return "127.0.0.1";
            }
            while (networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                //滤回环网卡、点对点网卡、非活动网卡、虚拟网卡并要求网卡名字是eth或ens开头
                if (!isValidInterface(networkInterface)) {
                    continue;
                }

                // 所有网络接口的IP地址信息
                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress inetAddress = inetAddresses.nextElement();
                    // 判断是否是IPv4,并且内网地址并过滤回环地址.
                    if (isValidAddress(inetAddress)) {
                        addresses.add((Inet4Address) inetAddress);
                    }
                }
            }
            if (addresses.size() != 1) {
                final Optional<Inet4Address> ipBySocketOpt = getIpBySocket();
                if (ipBySocketOpt.isPresent()) {
                    return ipBySocketOpt.get().getHostAddress();
                } else {
                    return "127.0.0.1";
                }
            }
            return addresses.get(0).getHostAddress();
        } catch (Exception ignored) {
        }
        return "127.0.0.1";
    }

    /**
     * 过滤回环网卡、点对点网卡、非活动网卡、虚拟网卡并要求网卡名字是eth或ens开头
     *
     * @param ni 网卡
     * @return 如果满足要求则true,否则false
     */
    private static boolean isValidInterface(NetworkInterface ni) throws SocketException {
        return !ni.isLoopback() && !ni.isPointToPoint() && ni.isUp() && !ni.isVirtual()
                && (ni.getName().startsWith("eth") || ni.getName().startsWith("ens"));
    }

    /**
     * 判断是否是IPv4,并且内网地址并过滤回环地址.
     */
    private boolean isValidAddress(InetAddress address) {
        return address instanceof Inet4Address && address.isSiteLocalAddress() && !address.isLoopbackAddress();
    }

    /*
     * 通过Socket 唯一确定一个IP
     * 当有多个网卡的时候,使用这种方式一般都可以得到想要的IP。甚至不要求外网地址8.8.8.8是可连通的
     * @return Inet4Address>
     */
    private Optional<Inet4Address> getIpBySocket() throws SocketException {
        try (final DatagramSocket socket = new DatagramSocket()) {
            socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
            if (socket.getLocalAddress() instanceof Inet4Address) {
                return Optional.of((Inet4Address) socket.getLocalAddress());
            }
        } catch (UnknownHostException networkInterfaces) {
            throw new RuntimeException(networkInterfaces);
        }
        return Optional.empty();
    }

    /**
     * 获取LinuxIp
     */
    private static String getLinuxLocalIp() {
        String ip = "";
        try {
            for (Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements(); ) {
                NetworkInterface intf = en.nextElement();
                String name = intf.getName();
                if (!name.contains("docker") && !name.contains("lo")) {
                    for (Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses(); enumIpAddr.hasMoreElements(); ) {
                        InetAddress inetAddress = enumIpAddr.nextElement();
                        if (!inetAddress.isLoopbackAddress()) {
                            String ipaddress = inetAddress.getHostAddress().toString();
                            if (!ipaddress.contains("::") && !ipaddress.contains("0:0:") && !ipaddress.contains("fe80")
                                    && !ipaddress.startsWith("10.1")
                                    && !ipaddress.startsWith("10.10")) {// 忽略容器Canal IP
                                ip = ipaddress;
                            }
                        }
                    }
                }
            }
        } catch (Exception ex) {
            ip = "127.0.0.1";
            log.error("get linux localip failure!", ex);
        }
        return ip;
    }
}

3.建立任务代码:

@Slf4j
@Component
public class ProssSyncData {

    @Resource
    private IProssAccountService prossAccountService;
  
    @XxlJob(value = "SyncAccountJob")
    public void syncAccount() {
        ProssGetAccountsReq oneReq = new ProssGetAccountsReq();
        oneReq.setCurrent(1);
        oneReq.setPageSize(10000);
        oneReq.setUpdateTime(0L);
        List<AccountInfo> resultObjsForOne = ProssUtil.getAccounts(oneReq);
        batchInsertDataForAccount(resultObjsForOne);
    }
}

xxl.job.admin.addresses:   xxl -job地址

xxl.job.accessToken  xxl-job 访问token,xxl-job的启动配置有,一般默认default-token

xxl.job.executor.appname 执行器名称

xxl.job.executor.logretentiondays  日志保存时间,单位天

xxl.job.executor.ip  执行器的ip地址,就是你的服务ip地址。如果是自动的可以不配置

xxl.job.executor.port 执行器的端口,就是你的服务端口。如果是自动的可以不配置

4.准备工作:

a.部署好xxl-job-admin

b.在xxl-job-admin页面的执行器菜单栏创建执行器,名称要跟xxl-job-admin的java配置中

xxl.job.executor.appname 一致

c.在xxl-job-admin页面的任务管理菜单栏创建任务。此任务跟@XxlJob(value = "SyncAccountJob")的值要一致。

5.注意:如果要手动配置注册执行器。那需要将配置中的xxl.job.executor.ip和xxl.job.executor.port 均配上

GitHub 加速计划 / xx / xxl-job
27.15 K
10.79 K
下载
xxl-job: 是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。
最近提交(Master分支:3 个月前 )
e5d26ba2 - 3 个月前
977ad87b - 3 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐