通過之前三篇關于Spring Boot異步任務實現的博文,,我們分別學會了用@Async創(chuàng)建異步任務、為異步任務配置線程池,、使用多個線程池隔離不同的異步任務,。今天這篇,我們繼續(xù)對上面的知識進行完善和優(yōu)化,! 如果你已經看過上面幾篇內容并已經掌握之后,,一起來思考下面這個問題:
場景重現我們先來把上面的假設用代碼實現一下: 第一步:創(chuàng)建Spring Boot應用,,根據上面的假設寫好線程池配置,。 @EnableAsync @SpringBootApplication public class Chapter78Application { public static void main(String[] args) { SpringApplication.run(Chapter78Application.class, args); } @EnableAsync @Configuration class TaskPoolConfig { @Bean public Executor taskExecutor1() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(2); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("executor-1-"); return executor; } } } 第二步:用 @Slf4j @Component public class AsyncTasks { public static Random random = new Random(); @Async("taskExecutor1") public CompletableFuturedoTaskOne(String taskNo) throws Exception { log.info("開始任務:{}", taskNo); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任務:{},耗時:{} 毫秒", taskNo, end - start); return CompletableFuture.completedFuture("任務完成"); } } 第三步:編寫測試用例 @Slf4j @SpringBootTest public class Chapter78ApplicationTests { @Autowired private AsyncTasks asyncTasks; @Test public void test2() throws Exception { // 線程池配置:core-2,max-2,queue=2,,同時有5個任務,,出現下面異常: // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2, // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9 long start = System.currentTimeMillis(); // 線程池1 CompletableFuturetask1 = asyncTasks.doTaskOne("1"); CompletableFuturetask2 = asyncTasks.doTaskOne("2"); CompletableFuturetask3 = asyncTasks.doTaskOne("3"); CompletableFuturetask4 = asyncTasks.doTaskOne("4"); CompletableFuturetask5 = asyncTasks.doTaskOne("5"); // 一起執(zhí)行 CompletableFuture.allOf(task1, task2, task3, task4, task5).join(); long end = System.currentTimeMillis(); log.info("任務全部完成,總耗時:" + (end - start) + "毫秒"); } } 執(zhí)行一下,,可以類似下面這樣的日志信息: 2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-2] com.didispace.chapter78.AsyncTasks : 開始任務:2 2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-1] com.didispace.chapter78.AsyncTasks : 開始任務:1 org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324) at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604) at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830) at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274) at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne() at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) at java.util.ArrayList.forEach(ArrayList.java:1255) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321) ... 74 more 從異常信息 所有,默認情況下,,線程池的拒絕策略是:當線程池隊列滿了,,會丟棄這個任務,,并拋出異常。 配置拒絕策略雖然線程池有默認的拒絕策略,,但實際開發(fā)過程中,,有些業(yè)務場景,直接拒絕的策略往往并不適用,,有時候我們可能會選擇舍棄最早開始執(zhí)行而未完成的任務,、也可能會選擇舍棄剛開始執(zhí)行而未完成的任務等更貼近業(yè)務需要的策略。所以,,為線程池配置其他拒絕策略或自定義拒絕策略是很常見的需求,,那么這個要怎么實現呢? 下面就來具體說說今天的正題,,如何為線程池配置拒絕策略,、如何自定義拒絕策略。 看下面這段代碼的最后一行,, ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //...其他線程池配置 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 在 // AbortPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // DiscardPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // DiscardOldestPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // CallerRunsPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 這四個策略對應的含義分別是:
而如果你要自定義一個拒絕策略,,那么可以這樣寫: executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 拒絕策略的邏輯 } }); 當然如果你喜歡用Lamba表達式,,也可以這樣寫: executor.setRejectedExecutionHandler((r, executor1) -> { // 拒絕策略的邏輯 }); 好了,今天的學習就到這里,! 如果您學習過程中如遇困難,?可以加入我們超高質量的Spring技術交流群,參與交流與討論,,更好的學習與進步,!更多Spring Boot教程可以點擊直達!,,歡迎收藏與轉發(fā)支持,! 代碼示例本文的完整工程可以查看下面?zhèn)}庫中 |
|