這是一個或許對你有用的開源項目
國產 Star 破 10w+ 的開源項目,前端包括管理后臺 + 微信小程序,后端支持單體和微服務架構。
來源:后端元宇宙
- 一個示例回顧Future
- 通過CompletableFuture實現上面示例
- CompletableFuture創建方式
- 異步回調方法
- 異常回調
- 多任務組合回調
- CompletableFuture使用有哪些注意點
一個示例回顧Future
一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。
JDK5新增了Future
接口,用于描述一個異步計算的結果。
雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對于結果的獲取卻是很不方便,我們必須使用Future.get()
的方式阻塞調用線程,或者使用輪詢方式判斷 Future.isDone
任務是否結束,再獲取結果。
這兩種處理方式都不是很優雅,相關代碼如下:
@Test
publicvoidtestFuture()throwsExecutionException,InterruptedException{
ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);
Futurefuture=executorService.submit(()->{
Thread.sleep(2000);
return"hello";
});
System.out.println(future.get());
System.out.println("end");
}
與此同時,Future無法解決多個異步任務需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務執行完畢之后在進行執行,這個時候你可能想到了CountDownLatch
,沒錯確實可以解決,代碼如下。
這里定義兩個Future,第一個通過用戶id獲取用戶信息,第二個通過商品id獲取商品信息。
@Test
publicvoidtestCountDownLatch()throwsInterruptedException,ExecutionException{
ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);
CountDownLatchdownLatch=newCountDownLatch(2);
longstartTime=System.currentTimeMillis();
FutureuserFuture=executorService.submit(()->{
//模擬查詢商品耗時500毫秒
Thread.sleep(500);
downLatch.countDown();
return"用戶A";
});
FuturegoodsFuture=executorService.submit(()->{
//模擬查詢商品耗時500毫秒
Thread.sleep(400);
downLatch.countDown();
return"商品A";
});
downLatch.await();
//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("獲取用戶信息:"+userFuture.get());
System.out.println("獲取商品信息:"+goodsFuture.get());
System.out.println("總共用時"+(System.currentTimeMillis()-startTime)+"ms");
}
「運行結果」
獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1110ms
從運行結果可以看出結果都已經獲取,而且如果我們不用異步操作,執行時間應該是:500+400+600 = 1500
,用異步操作后實際只用1110。
但是Java8以后我不在認為這是一種優雅的解決方式,接下來來了解下CompletableFuture
的使用。
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
通過CompletableFuture實現上面示例
@Test
publicvoidtestCompletableInfo()throwsInterruptedException,ExecutionException{
longstartTime=System.currentTimeMillis();
//調用用戶服務獲取用戶基本信息
CompletableFutureuserFuture=CompletableFuture.supplyAsync(()->
//模擬查詢商品耗時500毫秒
{
try{
Thread.sleep(500);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"用戶A";
});
//調用商品服務獲取商品基本信息
CompletableFuturegoodsFuture=CompletableFuture.supplyAsync(()->
//模擬查詢商品耗時500毫秒
{
try{
Thread.sleep(400);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"商品A";
});
System.out.println("獲取用戶信息:"+userFuture.get());
System.out.println("獲取商品信息:"+goodsFuture.get());
//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("總共用時"+(System.currentTimeMillis()-startTime)+"ms");
}
運行結果
獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1112ms
通過CompletableFuture
可以很輕松的實現CountDownLatch
的功能,你以為這就結束了,遠遠不止,CompletableFuture
比這要強多了。
比如可以實現 :任務1執行完了再執行任務2,甚至任務1執行的結果,作為任務2的入參數等等強大功能,下面就來學學CompletableFuture
的API。
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
- 項目地址:https://github.com/YunaiV/yudao-cloud
- 視頻教程:https://doc.iocoder.cn/video/
CompletableFuture創建方式
1、常用的4種創建方式
CompletableFuture
源碼中有四個靜態方法用來執行異步任務
publicstaticCompletableFuturesupplyAsync(Suppliersupplier){..}
publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor){..}
publicstaticCompletableFuturerunAsync(Runnablerunnable) {..}
publicstaticCompletableFuturerunAsync(Runnablerunnable,Executorexecutor) {..}
一般我們用上面的靜態方法來創建CompletableFuture
,這里也解釋下他們的區別:
- 「supplyAsync」 執行任務,支持返回值。
- 「runAsync」 執行任務,沒有返回值。
「supplyAsync方法」
//使用默認內置線程池ForkJoinPool.commonPool(),根據supplier構建執行任務
publicstaticCompletableFuturesupplyAsync(Suppliersupplier)
//自定義線程,根據supplier構建執行任務
publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor)
「runAsync方法」
//使用默認內置線程池ForkJoinPool.commonPool(),根據runnable構建執行任務
publicstaticCompletableFuturerunAsync(Runnablerunnable)
//自定義線程,根據runnable構建執行任務
publicstaticCompletableFuturerunAsync(Runnablerunnable,Executorexecutor)
2、結果獲取的4種方式
對于結果的獲取CompltableFuture
類提供了四種方式
//方式一
publicTget()
//方式二
publicTget(longtimeout,TimeUnitunit)
//方式三
publicTgetNow(TvalueIfAbsent)
//方式四
publicTjoin()
說明:
- 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已經提供了,后者提供超時處理,如果在指定時間內未獲取結果將拋出超時異常
- 「getNow」 => 立即獲取結果不阻塞,結果計算已完成將返回結果或計算過程中的異常,如果未計算完成將返回設定的valueIfAbsent值
- 「join」 => 方法里不會拋出異常
示例:
@Test
publicvoidtestCompletableGet()throwsInterruptedException,ExecutionException{
CompletableFuturecp1=CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"商品A";
});
//getNow方法測試
System.out.println(cp1.getNow("商品B"));
//join方法測試
CompletableFuturecp2=CompletableFuture.supplyAsync((()->1/0));
System.out.println(cp2.join());
System.out.println("-----------------------------------------------------");
//get方法測試
CompletableFuturecp3=CompletableFuture.supplyAsync((()->1/0));
System.out.println(cp3.get());
}
「運行結果」 :
- 第一個執行結果為 「商品B」 ,因為要先睡上1秒結果不能立即獲取
-
join方法獲取結果方法里不會拋異常,但是執行結果會拋異常,拋出的異常為
CompletionException
-
get方法獲取結果方法里將拋出異常,執行結果拋出的異常為
ExecutionException
異步回調方法
1、thenRun/thenRunAsync
通俗點講就是,「做完第一個任務后,再做第二個任務,第二個任務也沒有返回值」 。
示例
@Test
publicvoidtestCompletableThenRunAsync()throwsInterruptedException,ExecutionException{
longstartTime=System.currentTimeMillis();
CompletableFuturecp1=CompletableFuture.runAsync(()->{
try{
//執行任務A
Thread.sleep(600);
}catch(InterruptedExceptione){
e.printStackTrace();
}
});
CompletableFuturecp2=cp1.thenRun(()->{
try{
//執行任務B
Thread.sleep(400);
}catch(InterruptedExceptione){
e.printStackTrace();
}
});
//get方法測試
System.out.println(cp2.get());
//模擬主程序耗時時間
Thread.sleep(600);
System.out.println("總共用時"+(System.currentTimeMillis()-startTime)+"ms");
}
//運行結果
/**
*null
*總共用時1610ms
*/
「thenRun 和thenRunAsync有什么區別呢?」
如果你執行第一個任務的時候,傳入了一個自定義線程池:
- 調用thenRun方法執行第二個任務時,則第二個任務和第一個任務是共用同一個線程池。
-
調用
thenRunAsync
執行第二個任務時,則第一個任務使用的是你自己傳入的線程池,第二個任務使用的是ForkJoin線程池。
說明: 后面介紹的thenAccept
和thenAcceptAsync
,thenApply
和thenApplyAsync
等,它們之間的區別也是這個。
2、thenAccept/thenAcceptAsync
第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參 ,傳遞到回調方法中,但是回調方法是沒有返回值的。
示例
@Test
publicvoidtestCompletableThenAccept()throwsExecutionException,InterruptedException{
longstartTime=System.currentTimeMillis();
CompletableFuturecp1=CompletableFuture.supplyAsync(()->{
return"dev";
});
CompletableFuturecp2=cp1.thenAccept((a)->{
System.out.println("上一個任務的返回結果為:"+a);
});
cp2.get();
}
3、 thenApply/thenApplyAsync
表示第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參,傳遞到回調方法中,并且回調方法是有返回值的。
示例
@Test
publicvoidtestCompletableThenApply()throwsExecutionException,InterruptedException{
CompletableFuturecp1=CompletableFuture.supplyAsync(()->{
return"dev";
}).thenApply((a)->{
if(Objects.equals(a,"dev")){
return"dev";
}
return"prod";
});
System.out.println("當前環境為:"+cp1.get());
//輸出:當前環境為:dev
}
異常回調
當CompletableFuture
的任務不論是正常完成還是出現異常它都會調用 「whenComplete」 這回調函數。
- 「正常完成」 :whenComplete返回結果和上級任務一致,異常為null;
- 「出現異常」 :whenComplete返回結果為null,異常為上級任務的異常;
即調用get()
時,正常完成時就獲取到結果,出現異常時就會拋出異常,需要你處理該異常。
下面來看看示例
1、只用whenComplete
@Test
publicvoidtestCompletableWhenComplete()throwsExecutionException,InterruptedException{
CompletableFuturefuture=CompletableFuture.supplyAsync(()->{
if(Math.random()0.5){
thrownewRuntimeException("出錯了");
}
System.out.println("正常結束");
return0.11;
}).whenComplete((aDouble,throwable)->{
if(aDouble==null){
System.out.println("whenCompleteaDoubleisnull");
}else{
System.out.println("whenCompleteaDoubleis"+aDouble);
}
if(throwable==null){
System.out.println("whenCompletethrowableisnull");
}else{
System.out.println("whenCompletethrowableis"+throwable.getMessage());
}
});
System.out.println("最終返回的結果="+future.get());
}
正常完成,沒有異常時:
正常結束
whenCompleteaDoubleis0.11
whenCompletethrowableisnull
最終返回的結果=0.11
出現異常時:get()會拋出異常
whenCompleteaDoubleisnull
whenCompletethrowableisjava.lang.RuntimeException:出錯了
java.util.concurrent.ExecutionException:java.lang.RuntimeException:出錯了
atjava.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
atjava.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
2、whenComplete + exceptionally示例
@Test
publicvoidtestWhenCompleteExceptionally()throwsExecutionException,InterruptedException{
CompletableFuturefuture=CompletableFuture.supplyAsync(()->{
if(Math.random()0.5){
thrownewRuntimeException("出錯了");
}
System.out.println("正常結束");
return0.11;
}).whenComplete((aDouble,throwable)->{
if(aDouble==null){
System.out.println("whenCompleteaDoubleisnull");
}else{
System.out.println("whenCompleteaDoubleis"+aDouble);
}
if(throwable==null){
System.out.println("whenCompletethrowableisnull");
}else{
System.out.println("whenCompletethrowableis"+throwable.getMessage());
}
}).exceptionally((throwable)->{
System.out.println("exceptionally中異常:"+throwable.getMessage());
return0.0;
});
System.out.println("最終返回的結果="+future.get());
}
當出現異常時,exceptionally
中會捕獲該異常,給出默認返回值0.0。
whenCompleteaDoubleisnull
whenCompletethrowableisjava.lang.RuntimeException:出錯了
exceptionally中異常:java.lang.RuntimeException:出錯了
最終返回的結果=0.0
多任務組合回調
1、AND組合關系
thenCombine
/ thenAcceptBoth
/ runAfterBoth
都表示:「當任務一和任務二都完成再執行任務三」 。
區別在于:
- 「runAfterBoth」 不會把執行結果當做方法入參,且沒有返回值
- 「thenAcceptBoth」 : 會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且無返回值
- 「thenCombine」 :會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且有返回值
示例
@Test
publicvoidtestCompletableThenCombine()throwsExecutionException,InterruptedException{
//創建線程池
ExecutorServiceexecutorService=Executors.newFixedThreadPool(10);
//開啟異步任務1
CompletableFuturetask=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務1,當前線程是:"+Thread.currentThread().getId());
intresult=1+1;
System.out.println("異步任務1結束");
returnresult;
},executorService);
//開啟異步任務2
CompletableFuturetask2=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務2,當前線程是:"+Thread.currentThread().getId());
intresult=1+1;
System.out.println("異步任務2結束");
returnresult;
},executorService);
//任務組合
CompletableFuturetask3=task.thenCombineAsync(task2,(f1,f2)->{
System.out.println("執行任務3,當前線程是:"+Thread.currentThread().getId());
System.out.println("任務1返回值:"+f1);
System.out.println("任務2返回值:"+f2);
returnf1+f2;
},executorService);
Integerres=task3.get();
System.out.println("最終結果:"+res);
}
「運行結果」
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
異步任務2結束
執行任務3,當前線程是:19
任務1返回值:2
任務2返回值:2
最終結果:4
2、OR組合關系
applyToEither
/ acceptEither
/ runAfterEither
都表示:「兩個任務,只要有一個任務完成,就執行任務三」 。
區別在于:
- 「runAfterEither」 :不會把執行結果當做方法入參,且沒有返回值
- 「acceptEither」 : 會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且無返回值
- 「applyToEither」 :會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且有返回值
示例
@Test
publicvoidtestCompletableEitherAsync(){
//創建線程池
ExecutorServiceexecutorService=Executors.newFixedThreadPool(10);
//開啟異步任務1
CompletableFuturetask=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務1,當前線程是:"+Thread.currentThread().getId());
intresult=1+1;
System.out.println("異步任務1結束");
returnresult;
},executorService);
//開啟異步任務2
CompletableFuturetask2=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務2,當前線程是:"+Thread.currentThread().getId());
intresult=1+2;
try{
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("異步任務2結束");
returnresult;
},executorService);
//任務組合
task.acceptEitherAsync(task2,(res)->{
System.out.println("執行任務3,當前線程是:"+Thread.currentThread().getId());
System.out.println("上一個任務的結果為:"+res);
},executorService);
}
運行結果
//通過結果可以看出,異步任務2都沒有執行結束,任務3獲取的也是1的執行結果
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
執行任務3,當前線程是:19
上一個任務的結果為:2
注意
如果把上面的核心線程數改為1也就是
ExecutorServiceexecutorService=Executors.newFixedThreadPool(1);
運行結果就是下面的了,會發現根本沒有執行任務3,顯然是任務3直接被丟棄了。
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:17
3、多任務組合
- 「allOf」 :等待所有任務完成
- 「anyOf」 :只要有一個任務完成
示例
allOf:等待所有任務完成
@Test
publicvoidtestCompletableAallOf()throwsExecutionException,InterruptedException{
//創建線程池
ExecutorServiceexecutorService=Executors.newFixedThreadPool(10);
//開啟異步任務1
CompletableFuturetask=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務1,當前線程是:"+Thread.currentThread().getId());
intresult=1+1;
System.out.println("異步任務1結束");
returnresult;
},executorService);
//開啟異步任務2
CompletableFuturetask2=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務2,當前線程是:"+Thread.currentThread().getId());
intresult=1+2;
try{
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("異步任務2結束");
returnresult;
},executorService);
//開啟異步任務3
CompletableFuturetask3=CompletableFuture.supplyAsync(()->{
System.out.println("異步任務3,當前線程是:"+Thread.currentThread().getId());
intresult=1+3;
try{
Thread.sleep(4000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("異步任務3結束");
returnresult;
},executorService);
//任務組合
CompletableFutureallOf=CompletableFuture.allOf(task,task2,task3);
//等待所有任務完成
allOf.get();
//獲取任務的返回結果
System.out.println("task結果為:"+task.get());
System.out.println("task2結果為:"+task2.get());
System.out.println("task3結果為:"+task3.get());
}
anyOf: 只要有一個任務完成
@Test
publicvoidtestCompletableAnyOf()throwsExecutionException,InterruptedException{
//創建線程池
ExecutorServiceexecutorService=Executors.newFixedThreadPool(10);
//開啟異步任務1
CompletableFuturetask=CompletableFuture.supplyAsync(()->{
intresult=1+1;
returnresult;
},executorService);
//開啟異步任務2
CompletableFuturetask2=CompletableFuture.supplyAsync(()->{
intresult=1+2;
returnresult;
},executorService);
//開啟異步任務3
CompletableFuturetask3=CompletableFuture.supplyAsync(()->{
intresult=1+3;
returnresult;
},executorService);
//任務組合
CompletableFuture
CompletableFuture使用有哪些注意點
CompletableFuture
使我們的異步編程更加便利的、代碼更加優雅的同時,我們也要關注下它,使用的一些注意點。
1、Future需要獲取返回值,才能獲取異常信息
@Test
publicvoidtestWhenCompleteExceptionally(){
CompletableFuturefuture=CompletableFuture.supplyAsync(()->{
if(1==1){
thrownewRuntimeException("出錯了");
}
return0.11;
});
//如果不加get()方法這一行,看不到異常信息
//future.get();
}
Future需要獲取返回值,才能獲取到異常信息。如果不加 get()
/join()
方法,看不到異常信息。
小伙伴們使用的時候,注意一下哈,考慮是否加try...catch...
或者使用exceptionally
方法。
2、CompletableFuture的get()方法是阻塞的
CompletableFuture
的get()
方法是阻塞的,如果使用它來獲取異步調用的返回值,需要添加超時時間。
//反例
CompletableFuture.get();
//正例
CompletableFuture.get(5,TimeUnit.SECONDS);
3、不建議使用默認線程池
CompletableFuture
代碼中又使用了默認的 「ForkJoin線程池」 ,處理的線程個數是電腦 「CPU核數-1」 。在大量請求過來的時候,處理邏輯復雜的話,響應會很慢。一般建議使用自定義線程池,優化線程池配置參數。
4、自定義線程池時,注意飽和策略
CompletableFuture
的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)
。并且一般建議使用自定義線程池。
但是如果線程池拒絕策略是DiscardPolicy
或者DiscardOldestPolicy
,當線程池飽和時,會直接丟棄任務,不會拋棄異常。因此建議,CompletableFuture
線程池策略最好使用AbortPolicy
,然后耗時的異步線程,做好線程池隔離哈。
-
代碼
+關注
關注
30文章
4788瀏覽量
68612 -
異步
+關注
關注
0文章
62瀏覽量
18050 -
線程
+關注
關注
0文章
504瀏覽量
19683
原文標題:奇淫巧技,CompletableFuture 異步多線程是真的優雅
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論