ThreadPoolTaskExecutor有两种提交方法execute和submit:

无返回值的任务使用public void execute(Runnable command) 方法提交;

有返回值的任务使用public <T> Future<T> submit(Callable) 方法提交

下面具体来看下两者的应用以及区别。

一、与主线程执行顺序的区别:

1、(1)public void execute(Runnable command) 方法提交,子线程可能在主线程结束之后结束;

举例:

 @RequestMapping("/execute")
    public String execute(){
        System.out.println("进入方法");
        threadPoolTaskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20000);
                    System.out.println("sleep后");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("执行提交后");
        return "aa";
    }

请求后打印:

进入方法
执行提交后
sleep后

可见,由于子线程比较耗时,主线程结束后子线程还没有执行完。 

(2)public <T> Future<T> submit(Callable) 方法提交,因为提交任务后有个取数据的过程,在从Future取数据的过程中,Callable自带的阻塞机制,这个机制保证主线程一定在子线程结束之后结束。反之如果没有取数据,子线程可能会在主线程结束之后才结束。

举例说明:

package exceldemo.task;

import exceldemo.dto.User;
import exceldemo.service.UserService;

import java.util.List;
import java.util.concurrent.Callable;

public class UserTaskTest implements Callable<List<User>> {

    private List<Integer> ids;
    private UserService userService;

    public UserTaskTest(List<Integer> childIds, UserService userService) {
        System.out.println("构造");
        this.ids = childIds;
        this.userService = userService;
    }

    @Override
    public List<User> call() throws Exception {
        Thread.sleep(4000);
        System.out.println("执行");
        return userService.getByIds(ids);
    }
}

例a:submit提交任务之后没有取数据:

package exceldemo.rest;

import exceldemo.dto.User;
import exceldemo.service.UserService;
import exceldemo.task.UserTaskTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

@RestController
@RequestMapping("/order")
public class OrderTest {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private UserService userService;

 
    @RequestMapping("/submit")
    public List<User> submit(){
        List<Integer> ids = new ArrayList<>();
        for(int i = 0;i<=500;i++){
            ids.add(i);
        }

        //异步获取所有用户

        List<User> users = new ArrayList<>();
        List<Future> futures = new ArrayList<>();

        for (int i = 0; i < ids.size(); i += 100) {
            int startIndex = i;
            int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
            UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
            Future<List<User>> future = threadPoolTaskExecutor.submit(task);
            System.out.println("加入futurn");
            futures.add(future);
        }
       
        System.out.println("返回结果"+users.size());
        return users;
    }
}

请求后后端打印:

构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
返回结果0
执行
执行
执行
执行
执行
执行

 可以看到子线程比较耗时,主线程结束之后,子线程还没有执行完;

例b:submit提交任务之后取数据:

@RequestMapping("/submit")
    public List<User> submit(){
        List<Integer> ids = new ArrayList<>();
        for(int i = 0;i<=500;i++){
            ids.add(i);
        }

        //异步获取所有用户

        List<User> users = new ArrayList<>();
        List<Future> futures = new ArrayList<>();

        for (int i = 0; i < ids.size(); i += 100) {
            int startIndex = i;
            int endIndex = startIndex + 100 > ids.size() ? ids.size() : startIndex + 100;
            UserTaskTest task = new UserTaskTest(ids.subList(startIndex, endIndex),userService);
            Future<List<User>> future = threadPoolTaskExecutor.submit(task);
            System.out.println("加入futurn");
            futures.add(future);
        }
        //取数据
        try{
            System.out.println("获取数据");
            for(Future future : futures){
                System.out.println("获取数据内部");
                users.addAll((List<User>) future.get());
            }
        }catch (Exception e){

        }
        System.out.println("返回结果"+users.size());
        return users;
    }

请求后后端打印:

构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
构造
加入futurn
获取数据
获取数据内部
执行
执行
执行
执行
执行
获取数据内部
获取数据内部
获取数据内部
获取数据内部
获取数据内部
执行
返回结果501

可以看到,即使子线程比主线程耗时,主线程也等子线程结束后才结束。

这两个例子证明了使用submit提交任务,提交后只要有从Future取数据的操作,就可以保证主线程在子线程结束后才结束。

************************************************************分割线****************************************************************************

2、下面再举个完整的例子,在子线程同样耗时以及主线程执行步骤一样的情况下比较execute和submit这两种方法:

线程池:

package exceldemo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolTaskExecutorConfig {
    private static int CORE_POOL_SIZE = 5;
    private static int MAX_POOL_SIZE = 1000;
    @Bean(name="threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor serviceJobTaskExecutor(){
        ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
        //线程池维护线程的最少数量
        poolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        //线程池维护线程的最大数量
        poolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        //线程池所使用的缓冲队列
        poolTaskExecutor.setQueueCapacity(200);
        //线程池维护线程所允许的空闲时间
        poolTaskExecutor.setKeepAliveSeconds(30000);
        poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        System.out.println(poolTaskExecutor);
        return poolTaskExecutor;
    }
}

controller接口: 

package exceldemo.rest;

import exceldemo.task.MyOrderCallableTask;
import exceldemo.task.MyOrderRunnableTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@RestController
@RequestMapping("/myOrderDemo")
public class MyOrderDemo {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @RequestMapping("/execute")
    public void execute(){
        String str = "execute方法";
        threadPoolTaskExecutor.execute(new MyOrderRunnableTask(str));
        System.out.println("主线程调用结束");

    }

    @RequestMapping("/submit")
    public String submit(){
        String str = "submit方法";
        Future<String> future = threadPoolTaskExecutor.submit(new MyOrderCallableTask(str));
        try {
            str = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("主线程调用结束");
        return str;
    }
}

Callable实现类: 

package exceldemo.task;

import java.util.concurrent.Callable;

public class MyOrderCallableTask implements Callable<String> {

    private String name;

    public MyOrderCallableTask(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        name = "MyOrderCallableTask";
        System.out.println("MyOrderCallableTask已执行");
        return name;
    }
}

Runnable实现类: 

package exceldemo.task;

public class MyOrderRunnableTask implements Runnable{

    private String name;

    public MyOrderRunnableTask(String name){
        this.name = name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        name = "MyOrderRunnableTask";
        System.out.println("MyOrderRunnableTask已执行");
    }
}

启动类:

package exceldemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;


@SpringBootApplication
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
 
	}
}

 启动项目后,

请求execute接口:

后端打印:

主线程调用结束
MyOrderRunnableTask已执行

请求submit方法:

后端打印:

MyOrderCallableTask已执行
主线程调用结束

验证结束。这也和他们的功能是保持一致的。不需要返回结果,主线程就不需要等待子线程执行;需要返回结果,主线程肯定需要等所有的子线程结束后汇总结果。所以在调用的时候也需要注意:

(1)如果主线程调用了ThreadPoolTaskExecutor的execute提交任务,且传递了参数给子线程,并且子线程在修改这个参数,调用后主线程就不应该再使用这个参数,因为这个参数的值已经无法确定了;

(2)如果主线程调用了ThreadPoolTaskExecutor的submit提交任务,记得要在调用的逻辑后面,从Future里面把返回值取出来(调用Future的get方法),否则就和execute的效果一样了。

二、处理异常的区别:Callable执行call时遇到异常会抛出,而Runnable执行run时遇到异常并不会抛出。

    举例:

package com.demo.rest;

import com.demo.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @RequestMapping("/submit")
    public String submit(String param){

        param = userService.submit(param);
        return param;
    }

    @RequestMapping("/execute")
    public String execute(String param){
        String res = userService.execute(param);
        return res;
    }
}
package com.demo.service.impl;

import com.demo.dto.UserDTO;
import com.demo.mapper.UserMapper;
import com.demo.service.UserService;
import com.demo.task.UserCallableTask;
import com.demo.task.UserRunnableTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service("userService")
public class UserServiceImpl implements UserService {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private UserMapper userMapper;

    @Override
    public String submit(String param) {


        Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param));
        try {
            param =  future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return "error";
        } catch (ExecutionException e) {
            e.printStackTrace();
            return "error";
        }
        UserDTO user = new UserDTO();
        user.setName(param);
        userMapper.insert(user);
        return param;
    }

    @Override
    public String execute(String param) {
        threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
        return "success";
    }
}

task:

package com.demo.task;

import com.demo.dto.UserDTO;
import com.demo.mapper.UserMapper;

import java.util.concurrent.Callable;

public class UserCallableTask implements Callable<String> {

    private String param;

    public UserCallableTask (String param){
        this.param = param;
    }
    @Override
    public String call() throws Exception {
        param += "UserCallableTask";
        int a = 1/0;
        return param;
    }
}
package com.demo.task;

import com.demo.dto.UserDTO;
import com.demo.mapper.UserMapper;

public class UserRunnableTask implements Runnable {

    private String param;

    private UserMapper userMapper;

    public UserRunnableTask (String param,UserMapper userMapper){
        this.param = param;
        this.userMapper = userMapper;
    }

    @Override
    public void run() {
        param += "UserRunnableTask";
        UserDTO user = new UserDTO();
        user.setName(param);
        int a = 1/0;
        userMapper.insert(user);
    }
}

请求submit:

请求execute:

因为在两个task里面都加了异常1/0,所以请求这两个方法都不会往数据库插入数据。call方法抛出异常,service层捕获到后return就不再插入数据了,run方法自己遇到异常就终止了也不再往下执行。区别就再于二者对于异常的处理,调用sumbit方法执行时可以捕获异常,这样就可以自定义处理如把异常抛出给调用处(controller层),而execute的run方法遇到异常就自己终止了,主线程无法感知其运行成功与否。

有的人可能会想在调用execute时加上try...catch....,这个肯定是不可以的,这个try...catch...捕获的只是

threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));这个任务提交有没有异常,而这个任务和主线程是异步的,它实际执行的run方法主线程是捕获不到的。可以验证一下:
@Override
    @Transactional
    public String execute(String param) {
        try{
            threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
        }catch (Exception e){
            System.out.println("有异常");
            return "error";
        }

        return "success";
    }

请求:

验证确实没有捕获到。 

三、多线程与事务回滚:

上述,如果在事务中调用了多线程,submit遇到异常会抛出且必须被捕获,不会触发回滚,execute遇到异常主线程无法感知,也不会触发回滚。那如果需要在多线程调用时实现事务回滚该怎么做呢?这就需要加入其它的操作了:

1、submit方法与事务回滚:我们知道sumbit方法提交线程在获取返回结果时是需要捕获异常的,那么我们就可以在捕获到异常时手动回滚当前事务。

(1)主线程正常,子线程发生异常,只回滚主线程:这种情况比较简单,主线程捕获异常后直接TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();回滚主线程就可以了:

package com.demo.service.impl;

import com.demo.dto.UserDTO;
import com.demo.mapper.UserMapper;
import com.demo.service.UserService;
import com.demo.task.UserCallableTask;
import com.demo.task.UserRunnableTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@Service("userService")
public class UserServiceImpl implements UserService {

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private UserMapper userMapper;

    @Override
    @Transactional
    public String submit(String param) {


        Future<String> future = threadPoolTaskExecutor.submit(new UserCallableTask(param,userMapper));
        UserDTO user = new UserDTO();
        user.setName("我是主线程");
        userMapper.insert(user);
        try {
            param =  future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return "error";
        } catch (ExecutionException e) {
            e.printStackTrace();
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return "error";
        }

        return param;
    }

    @Override
    public String execute(String param) {
       // threadPoolTaskExecutor.execute(new UserRunnableTask(param,userMapper));
        return "success";
    }
}

任务: 

package com.demo.task;

import com.demo.dto.UserDTO;
import com.demo.mapper.UserMapper;

import java.util.concurrent.Callable;

public class UserCallableTask implements Callable<String> {

    private String param;

    private UserMapper userMapper;

    public UserCallableTask(String param, UserMapper userMapper){
        this.param = param;
        this.userMapper = userMapper;
    }
    @Override
    public String call() throws Exception {
        param += "UserCallableTask";
        UserDTO user = new UserDTO();
        user.setName("我是子线程");
        userMapper.insert(user);
        int a = 1/0;
        return param;
    }
}

请求

数据库没有主线程的数据插入:

说明主线程回滚成功。 

(2)、主线程或子线程异常,主线程、子线程全部回滚:同时回滚主线程和子线程,就需要把主线程和子线程放到同一个事务中。

说明主线程、子线程全部回滚成功。

2、execute方法:

四、 

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐