@EnableAsync的使用、进阶、源码分析
@EnableAsync使用
基础使用
使用@EnableAsync开启异步切面,然后在异步调用的方法上加上@Asyc注解即可
@SpringBootApplication
@EnableAsync //开启异步切面
public class SpringdemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringdemoApplication.class, args);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async //异步
@Override
public void invokeAsyncTest01() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");
}
}
自定义异步注解
@Async注解是异步切面默认的异步注解,我们可以在@EnableAsync(annotation = AsyncCustom.class)开启异步切面时指定自定义的异步注解
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncCustom {
}
@SpringBootApplication
@EnableAsync(annotation = AsyncCustom.class)
public class SpringdemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringdemoApplication.class, args);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@AsyncCustom //异步
@Override
public void invokeAsyncTest01() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");
}
}
@EnableAsync进阶
线程池配置
配置默认线程池
当@Async注解的value没有指定线程池名称时,将会使用此线程池,不手动设置默认线程池,系统也会给你创建一个默认线程池(详细流程请看 线程池获取优先级)。
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
/**
* 设置默认线程池
**/
@Override
public Executor getAsyncExecutor() {
//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setThreadNamePrefix("CustomAsync-Test-");
return taskExecutor;
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public void invokeAsyncTest01() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");
}
}
指定线程池 (建议,根据业务进行线程池隔离)
当@Async注解的value有指定线程池名称时,将会使用容器中beanname=此value值的Executor线程池
@Configuration
public class TaskExecutorConfig {
@Bean
public Executor deleteFileExecutor() {
//此处最好使用new ThreadPoolExecutor显示创建,SimpleAsyncTaskExecutor没有复用线程
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setThreadNamePrefix("delete-file-");
return taskExecutor;
}
@Bean
public Executor sendEmailExecutor() {
//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setThreadNamePrefix("send-email-");
return taskExecutor;
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async("deleteFileExecutor")
@Override
public void deleteFile() {
System.out.println(Thread.currentThread() + "运行了deleteFile方法!");
}
@Async("sendEmailExecutor")
@Override
public void sendEmail() {
System.out.println(Thread.currentThread() + "运行了sendEmail方法!");
}
}
异步任务结果
只要是异步,一般都有可能用到需要返回结果的异步任务,当然@Async也支持异步结果返回,目前仅支持CompletableFuture、ListenableFuture、Future
CompletableFuture
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test02")
public void test02() {
CompletableFuture<String> completableFuture = asyncTestService.invokeAsyncTest02();
completableFuture.thenAccept(System.out::println);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public CompletableFuture<String> invokeAsyncTest02() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest02方法!");
return CompletableFuture.completedFuture("Hello world!");
}
}
ListenableFuture
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test03")
public void test03() {
ListenableFuture<String> stringListenableFuture = asyncTestService.invokeAsyncTest03();
stringListenableFuture.addCallback(System.out::println, System.out::println);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public ListenableFuture<String> invokeAsyncTest03() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest03方法!");
return new AsyncResult<String>("Hello World!");
}
}
Future
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test04")
public void test04() throws ExecutionException, InterruptedException {
Future<String> future = asyncTestService.invokeAsyncTest04();
String str = future.get();
System.out.println(str);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public Future<String> invokeAsyncTest04() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest04方法!");
return new AsyncResult<>("Hello World!");
}
}
Future、ListenableFuture、CompletableFuture区别
-
Future为异步任务调用的结果
-
ListenableFuture继承了Future,所以也为异步任务调用的结果,但是ListenableFuture还阔以添加两个回调函数,成功回调和异常回调
-
CompletableFuture也继承了Future,所以也为异步任务调用的结果,但是CompletableFuture阔以对异步任务进行编排
异常处理器
当返回值是Future及其子类
此时,如果异步任务在执行时抛出异常时,异常先会存储在Future中并记录状态,当正真调用future.get()等获取结果函数时才会抛出异常。
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test04")
public void test04() throws ExecutionException, InterruptedException {
Future<String> future = asyncTestService.invokeAsyncTest04();
//此时当当前线程获取结果时 才会抛出异常
String str = future.get();
System.out.println(str);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public Future<String> invokeAsyncTest04() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest04方法!");
if(true){
throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");
}
return new AsyncResult<>("Hello World!");
}
}
当返回值是非Future
返回类型非Future时,任务发生异常将会调用异常处理器处理异常。异常处理器阔以AsyncConfigurer 实现类的getAsyncUncaughtExceptionHandler方法手动设置,如果未设置异常处理器,系统将会给你创建一个默认的SimpleAsyncUncaughtExceptionHandler异常处理器,此默认异常处理器异常处理器只对异常进行了日志输出
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setThreadNamePrefix("CustomAsync-Test-");
return taskExecutor;
}
/**
* 当异步任务调用出现时将会调用此异常处理器 可在此记录日志,补偿机制等
**/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
System.err.println("Unexpected exception occurred invoking async method: " + method + ":" + ex.getMessage());
};
}
}
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test06")
public void test06() {
asyncTestService.invokeAsyncTest06();
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public void invokeAsyncTest06() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest06方法!");
throw new IllegalArgumentException("Hello Exception!");
}
}
扩展异常处理器
原因
博主通过源码发现,异常处理器只能设置一个,且后续所有@Async使用的线程池全都只有走我们设置的默认异常处理器,如果我们根据业务划分了线程池,不同线程池的异常想走不同的处理逻辑,就只有在我们手动设置的异常处理器中进行逻辑判断,非常的不优雅。
博主的解决方案
- 扩展@Async注解,添加exceptionHandler属性指定异常处理器AsyncUncaughtExceptionHandler 的容器名
- 在设置AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器
- 如果能在容器找到给定容器名称的异常处理器,就走此异常处理器
- 如果不能找到给定容器名称的处理器,就走默认异常处理器
- 如果没有设置@Async的exceptionHandler属性值,也走默认异常处理器
方案实现
扩展@Async注解,添加@JokerAsync继承@Async,添加exceptionHandler属性
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Async
public @interface JokerAsync {
@AliasFor(annotation = Async.class)
String value() default "";
String exceptionHandler() default "";
}
把AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器
@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {
@Autowired(required = false)
private Map<String, AsyncUncaughtExceptionHandler> exceptionHandlerMap = new HashMap<>();
private final AsyncUncaughtExceptionHandler defaultExceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
String qualifier = getExceptionHandlerQualifier(method);
AsyncUncaughtExceptionHandler exceptionHandler = null;
if (Objects.nonNull(qualifier) && qualifier.length() > 0) {
exceptionHandler = exceptionHandlerMap.get(qualifier);
}
if (Objects.isNull(exceptionHandler)) {
exceptionHandler = defaultExceptionHandler;
}
exceptionHandler.handleUncaughtException(ex, method, params);
};
}
protected String getExceptionHandlerQualifier(Method method) {
JokerAsync async = AnnotatedElementUtils.findMergedAnnotation(method, JokerAsync.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), JokerAsync.class);
}
return (async != null ? async.exceptionHandler() : null);
}
}
测试示例代码
@Slf4j
@Component
public class DeleteFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("DeleteFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);
}
}
@Slf4j
@Component
public class SendFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("SendFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@JokerAsync(exceptionHandler = "deleteFileAsyncUncaughtExceptionHandler")
@Override
public void deleteFile() {
System.out.println(Thread.currentThread() + "运行了deleteFile方法!");
throw new IllegalArgumentException("Hello deleteFileExecutor Exception!");
}
@JokerAsync(exceptionHandler = "sendFileAsyncUncaughtExceptionHandler")
@Override
public void sendEmail() {
System.out.println(Thread.currentThread() + "运行了sendEmail方法!");
throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");
}
}
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/sendEmail")
public void sendEmail() {
asyncTestService.sendEmail();
}
@GetMapping("/deleteFile")
public void deleteFile() {
asyncTestService.deleteFile();
}
}
结果如下:不同的业务走不同的异常处理器
源码分析
首先咱们从@EnableAsync入口开始看起
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
//使用@Import 导入AsyncConfigurationSelector类到容器中
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//自定义异步注解
Class<? extends Annotation> annotation() default Annotation.class;
//JDK代理 还是 CGLIB代理
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
注意使用@Import注解导入的一般会实现ImportSelector 接口,则ImportSelector 中的selectImports方法返回的类的完全限定名数组中的类会被加入到容器中;如果是实现了ImportBeanDefinitionRegistrar接口,则会调用registerBeanDefinitions的方法
public interface ImportSelector {
String[] selectImports(AnnotationMetadata importingClassMetadata);
@Nullable
default Predicate<String> getExclusionFilter() {
return null;
}
}
public interface ImportBeanDefinitionRegistrar {
default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,
BeanNameGenerator importBeanNameGenerator) {
registerBeanDefinitions(importingClassMetadata, registry);
}
default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
}
}
继续看@EnableAsync使用@Import导入的AsyncConfigurationSelector类
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
//@EnableAsync mode属性默认为AdviceMode.PROXY
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
看哈AsyncConfigurationSelector的父类AdviceModeImportSelector
/**
* 由于该类实现ImportSelector接口 所以会调用selectImports方法
**/
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";
protected String getAdviceModeAttributeName() {
return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
}
//importingClassMetadata 是加了@Import注解的类的元信息
@Override
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format(
"@%s is not present on importing class '%s' as expected",
annType.getSimpleName(), importingClassMetadata.getClassName()));
}
//得到加了@Import注解类上的mode属性值
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
//模板方法 调用子类实现的selectImports方法得到需要导入到Spring容器中的类的
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
}
return imports;
}
@Nullable
protected abstract String[] selectImports(AdviceMode adviceMode);
}
由于@EnableAsync mode属性默认为AdviceMode.PROXY ,所以ProxyAsyncConfiguration类将会导入容器继续点进去看
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
//把异步后置处理器放入容器中
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//异步后置处理器
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//把线程池和异常处理器放到后置处理器中
bpp.configure(this.executor, this.exceptionHandler);
//得到@EnableAsync中annotation的注解
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
//自定义注解不等于默认值时 把自定义异步注解放入后置处理器中
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//设置动态代理方式
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
看哈ProxyAsyncConfiguration 的父类AbstractAsyncConfiguration
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableAsync;
@Nullable
protected Supplier<Executor> executor;
@Nullable
protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
//importMetadata 是加了@Import注解的类的元信息
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
//@EnableAsync的注解属性设置给enableAsync属性
this.enableAsync = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
if (this.enableAsync == null) {
throw new IllegalArgumentException(
"@EnableAsync is not present on importing class " + importMetadata.getClassName());
}
}
/**
* 配置默认线程池 默认异常处理器
**/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
}
public interface AsyncConfigurer {
//配置异步线程池
@Nullable
default Executor getAsyncExecutor() {
return null;
}
//配置异步异常处理器
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
上述代码表明 把@EnableAsync注解的属性解析了设置到了AsyncAnnotationBeanPostProcessor后置处理器中,还有AsyncConfigurer配置的线程池和异常处理器也设置到了后置处理中,现在我们继续看AsyncAnnotationBeanPostProcessor后置处理器的代码
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
//默认线程池
@Nullable
private Supplier<Executor> executor;
//异常处理器
@Nullable
private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
//异步注解
@Nullable
private Class<? extends Annotation> asyncAnnotationType;
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
public void configure(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
this.executor = executor;
this.exceptionHandler = exceptionHandler;
}
public void setExecutor(Executor executor) {
this.executor = SingletonSupplier.of(executor);
}
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
}
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
this.asyncAnnotationType = asyncAnnotationType;
}
/**
* 由于父类实现了BeanFactoryAware接口 在实例初始化时会被调用
**/
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
/**
* Advice:通知,标识逻辑织入的位置(增强代码调用的地方)。
* PointCut:切入点,标识对什么方法进入代理(判断哪个方法能被增强);
* Advisor:通知器,是通知与切入点的集合(一般里面持有一个Advice和一个PointCut,用来标识一个切面增强)。
**/
//我们阔以看到此处创建了一个通知器 把线程池和异常处理器传进去
AsyncAnnotation advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
//把类工厂传入通知器中
advisor.setBeanFactory(beanFactory);
//把通知器赋给本类的成员变量
this.advisor = advisor;
}
}
上诉代码主要是把增强的advisor 类创建好并复制给了本类成员变量,
下面我们继续看此类的父类AbstractAdvisingBeanPostProcessor,应为此类实现了BeanPostProcessor 接口,所以初始化完后肯定会调用postProcessAfterInitialization方法
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Nullable
protected Advisor advisor;
protected boolean beforeExistingAdvisors = false;
private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);
public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
this.beforeExistingAdvisors = beforeExistingAdvisors;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
//如果被代理过 直接把Advisor加入到代理里中的Advisor列表中
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
//如果没被代理过但是需要被代理的类 创建代理并直接加入到增强Advisor加入的Advisor列表中,并返回代理类
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
return proxyFactory.getProxy(classLoader);
}
// No proxy needed.
return bean;
}
protected boolean isEligible(Object bean, String beanName) {
return isEligible(bean.getClass());
}
//判断此类是否需要代理
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
protected void customizeProxyFactory(ProxyFactory proxyFactory) {
}
}
上述代码可以知道,只是把增强的advisor 放入代理类中,所以我们只需要看advisor 中的增强方法就知道增强的代码逻辑。我们来看advisor 成员的实现类AsyncAnnotationAdvisor,而AsyncAnnotationAdvisor是Advisor的实现类。而Advisor实现类一般会包含一般里面持有一个Advice和一个PointCut类,而Advice的子类MethodInterceptor的invoke方法就是代理的主要增强代码实现的地方
* Advice:通知,标识逻辑织入的位置(增强代码调用的地方)。
* PointCut:切入点,标识对什么方法进入代理(判断哪个方法能被增强);
* Advisor:通知器,是通知与切入点的集合(一般里面持有一个Advice和一个PointCut,用来标识一个切面增强)。
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
public AsyncAnnotationAdvisor() {
this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
}
public AsyncAnnotationAdvisor(
@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
}
@SuppressWarnings("unchecked")
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知实现
this.advice = buildAdvice(executor, exceptionHandler);
//切入点实现
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
asyncAnnotationTypes.add(asyncAnnotationType);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
@Override
public Advice getAdvice() {
return this.advice;
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
/**
* 通知的实现类
**/
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//核心通知类
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
}
上面代码可以知道核心通知的实现类是AnnotationAsyncExecutionInterceptor,那就继续AnnotationAsyncExecutionInterceptor代码
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
super(defaultExecutor, exceptionHandler);
}
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
}
没有看到我们需要的invoke方法,继续看父类AsyncExecutionInterceptor
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}
public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
super(defaultExecutor, exceptionHandler);
}
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//通过方法上的@Async注解里的value参数 value参数就是线程池Executor放入Spring容器的名称 ********
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//把任务调用封装成callable方法 ****************
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
//如果出现了异常 走异常处理器
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//把callable 线程池 和 方法返回类型一同传到doSubmit方法 *************
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
@Nullable
protected String getExecutorQualifier(Method method) {
return null;
}
/**
* 如果从父类方法获取不到线程池 就返回一个默认线程池new SimpleAsyncTaskExecutor()
**/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
我们会发现获取线程池方法和正真调用方法的doSubmit方法都是在父类AsyncExecutionAspectSupport中,继续看AsyncExecutionAspectSupport代码
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
protected final Log logger = LogFactory.getLog(getClass());
private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
private SingletonSupplier<Executor> defaultExecutor;
private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;
@Nullable
private BeanFactory beanFactory;
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
//默认线程池
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//默认异常处理器
this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
}
public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
//默认线程池
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//默认异常处理器
this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
}
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认线程池
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//默认异常处理器
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
public void setExecutor(Executor defaultExecutor) {
this.defaultExecutor = SingletonSupplier.of(defaultExecutor);
}
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
/**
* 获取@Async注释方法使用的线程池
**/
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//先从缓存中取
AsyncTaskExecutor executor = this.executors.get(method);
//没有在从容器中找
if (executor == null) {
Executor targetExecutor;
//得到此方法中@Async属性value的值 即 容器中线程池的Bean名称
String qualifier = getExecutorQualifier(method);
//如果设置了value值 就从容器中获取
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
//如果没有设置value值 就获取AsyncConfigurer配置的默认线程池 如果没有就从容器中获取TaskExecutor的实现类,如果有多个TaskExecutor实现类,就取容器bean名称为“taskExecutor”的容Bean类
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
//放入缓存中
this.executors.put(method, executor);
}
return executor;
}
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
//先获取容器中TaskExecutor的实现类
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskExecutor bean. " +
"Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
//如果有多个就取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
//如果容器中没有TaskExecutor的实现类 取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskExecutor bean. " +
"Continuing search for an Executor bean named 'taskExecutor'", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
//走完所有都没取到 线程池 那么就返回null 子类中会判断如果返回null 将new出一个默认线程池
return null;
}
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//如果返回类型是CompletableFuture及其子类
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
//如果返回类型是ListenableFuture及其子类
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
//如果返回类型是Future及其子类
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
//如果返回类型是其他
else {
executor.submit(task);
return null;
}
}
/**
* 异常处理器
**/
protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
//如果返回类型是Future及其子类 发生异常 则直接丢出异常
if (Future.class.isAssignableFrom(method.getReturnType())) {
ReflectionUtils.rethrowException(ex);
}
//否则 则走异常处理器
else {
// Could not transmit the exception to the caller with default executor
try {
this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
}
catch (Throwable ex2) {
logger.warn("Exception handler for async method '" + method.toGenericString() +
"' threw unexpected exception itself", ex2);
}
}
}
}
到此为止,源码已经分析的差不多了,我们阔以得出几个重点:
- AsyncConfigurer实现类可以设置默认线程池和默认异常处理器
- @Async的value是支持指定线程池
- @Async是支持全局异常处理器
- @Async注解的方法是可以支持返回类型为CompletableFuture、ListenableFuture、Future
总结
线程池获取优先级
当@Async中value值没有指定线程池
- 首先使用 AsyncConfigurer类中配置的默认线程池
- 如果没有配置AsyncConfigurer类,那么将使用容器TaskExecutor的实现类
- 如果容器中有多个TaskExecutor个实现类,将会使用beanname="taskExecutor"的Executor实现类
- 如果容器中没有有TaskExecutor实现类,将会使用beanname="taskExecutor"的Executor实现类
- 如果容器中没有beanname="taskExecutor"的Executor实现类,将会new出一个SimpleAsyncTaskExecutor线程池
/最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程
当@Async中value值指定了线程池beanname,可以根据业务进行线程池级别隔离
- 取出容器中beanname=(@Async注解value值)的Executor实现类
如果没有取到相应的线程池,比如beanname写错导致取不到相应线程池将会抛出异常
异常处理器
返回类型为Future及其子类时
- 直接抛出异常
返回类型不是Future及其子类
- 当AsyncConfigurer设置了默认异常处理器,则走此异常处理器
- 如果没有设置AsyncConfigurer异常处理器,则走SimpleAsyncUncaughtExceptionHandler异常处理器,此处理器是直接打印日志
方法返回类型
CompletableFuture及其子类
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
Callable<Object> task = () -> {
try {
//@Async注释的方法调用
Object result = invocation.proceed();
//如果是Future类型 调用get获取结果值
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
if (CompletableFuture.class.isAssignableFrom(returnType)) {
//@Async注释的方法返回类型如果为CompletableFuture及其子类
//就使用线程池执行并封装成CompletableFuture返回
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
ListenableFuture及其子类
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
Callable<Object> task = () -> {
try {
//@Async注释的方法调用
Object result = invocation.proceed();
//如果是Future类型 调用get获取结果值
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
if (ListenableFuture.class.isAssignableFrom(returnType)) {
//@Async注释的方法返回类型如果为ListenableFuture及其子类
//就使用线程池执行并返回ListenableFuture
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
注意ListenableFuture.addCallback()添加回调函数时,如果异步任务还未执行完成,则回调函数由异步任务线程执行,如果异步任务已经执行完成,则是当前掉addCallback函数的线程调用回调函数
Future及其子类
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
Callable<Object> task = () -> {
try {
//@Async注释的方法调用
Object result = invocation.proceed();
//如果是Future类型 调用get获取结果值
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
if (Future.class.isAssignableFrom(returnType)) {
//@Async注释的方法返回类型如果为Future及其子类
//就使用线程池执行并返回Future
return executor.submit(task);
}
其他
源码分析
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
Callable<Object> task = () -> {
try {
//@Async注释的方法调用
Object result = invocation.proceed();
//如果是Future类型 调用get获取结果值
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit
//@Async注释的方法返回类型如果是非Future
//使用线程池执行后 直接返回null
executor.submit(task);
return null;
当返回值为void时无返回值示例
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test05")
public void test05() {
asyncTestService.invokeAsyncTest05();
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public void invokeAsyncTest05() {
System.out.println(Thread.currentThread() + "运行了invokeAsyncTest05方法!");
}
}
当返回值为非Futute类型示例
- 返回的结果为空,如果要异步结果,请用Future封装返回结果
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {
@Autowired
private AsyncTestService asyncTestService;
@GetMapping("/test07")
public void test07() {
//永远为null 如果要异步结果 请用Future封装返回结果
List<String> result = asyncTestService.invokeAsyncTest07();
System.out.println(result);
}
}
@Service
public class AsyncTestServiceImpl implements AsyncTestService {
@Async
@Override
public List<String> invokeAsyncTest07() {
System.out.println(Thread.currentThread() + "invokeAsyncTest07!");
List<String> result = Arrays.asList("Hello World1", "Hello World2");
return result;
}
}
思考
- 容器中beanname="taskExecutor"的默认线程池是何时注入容器中的
- ListenableFuture的子类ListenableFutureTask的addCallback()添加的回调函数是被哪个线程调用的
更多推荐
所有评论(0)