CompletableFuture是jdk8的新特性。CompletableFuture實現了CompletionStage接口和Future接口,前者是對后者的一個擴展,增加了異步會點、流式處理、多個Future組合處理的能力,使Java在處理多任務的協同工作時更加順暢便利。
一、創建異步任務
1. supplyAsync
supplyAsync是創建帶有返回值的異步任務。它有如下兩個方法,一個是使用默認線程池(ForkJoinPool.commonPool())的方法,一個是帶有自定義線程池的重載方法
//帶返回值異步請求,默認線程池
publicstaticCompletableFuturesupplyAsync(Suppliersupplier)
//帶返回值的異步請求,可以自定義線程池
publicstaticCompletableFuturesupplyAsync(Suppliersupplier,Executorexecutor)
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf=CompletableFuture.supplyAsync(()->{
System.out.println("dosomething....");
return"result";
});
//等待任務執行完成
System.out.println("結果->"+cf.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
//自定義線程池
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();
CompletableFuturecf=CompletableFuture.supplyAsync(()->{
System.out.println("dosomething....");
return"result";
},executorService);
//等待子任務執行完成
System.out.println("結果->"+cf.get());
}
測試結果:
2. runAsync
runAsync是創建沒有返回值的異步任務。它有如下兩個方法,一個是使用默認線程池(ForkJoinPool.commonPool())的方法,一個是帶有自定義線程池的重載方法
//不帶返回值的異步請求,默認線程池
publicstaticCompletableFuturerunAsync(Runnablerunnable)
//不帶返回值的異步請求,可以自定義線程池
publicstaticCompletableFuturerunAsync(Runnablerunnable,Executorexecutor)
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf=CompletableFuture.runAsync(()->{
System.out.println("dosomething....");
});
//等待任務執行完成
System.out.println("結果->"+cf.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
//自定義線程池
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();
CompletableFuturecf=CompletableFuture.runAsync(()->{
System.out.println("dosomething....");
},executorService);
//等待任務執行完成
System.out.println("結果->"+cf.get());
}
測試結果:
3.獲取任務結果的方法
//如果完成則返回結果,否則就拋出具體的異常
publicTget()throwsInterruptedException,ExecutionException
//最大時間等待返回結果,否則就拋出具體異常
publicTget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException
//完成時返回結果值,否則拋出unchecked異常。為了更好地符合通用函數形式的使用,如果完成此CompletableFuture所涉及的計算引發異常,則此方法將引發unchecked異常并將底層異常作為其原因
publicTjoin()
//如果完成則返回結果值(或拋出任何遇到的異常),否則返回給定的valueIfAbsent。
publicTgetNow(TvalueIfAbsent)
//如果任務沒有完成,返回的值設置為給定值
publicbooleancomplete(Tvalue)
//如果任務沒有完成,就拋出給定異常
publicbooleancompleteExceptionally(Throwableex)
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
- 項目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 視頻教程:https://doc.iocoder.cn/video/
二、異步回調處理
1. thenApply和thenApplyAsync
thenApply 表示某個任務執行完成后執行的動作,即回調方法,會將該任務的執行結果即方法返回值作為入參傳遞到回調方法中,帶有返回值。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenApplyAsync((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
result+=2;
returnresult;
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenApply((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
result+=2;
returnresult;
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
測試結果:
從上面代碼和測試結果我們發現thenApply和thenApplyAsync區別在于,使用thenApply方法時子任務與父任務使用的是同一個線程,而thenApplyAsync在子任務中是另起一個線程執行任務,并且thenApplyAsync可以自定義線程池,默認的使用ForkJoinPool.commonPool()線程池。
2. thenAccept和thenAcceptAsync
thenAccep表示某個任務執行完成后執行的動作,即回調方法,會將該任務的執行結果即方法返回值作為入參傳遞到回調方法中,無返回值。
測試代碼
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenAccept((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenAcceptAsync((result)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
測試結果:
從上面代碼和測試結果我們發現thenAccep和thenAccepAsync區別在于,使用thenAccep方法時子任務與父任務使用的是同一個線程,而thenAccepAsync在子任務中可能是另起一個線程執行任務,并且thenAccepAsync可以自定義線程池,默認的使用ForkJoinPool.commonPool()線程池。
3.thenRun和thenRunAsync
thenRun表示某個任務執行完成后執行的動作,即回調方法,無入參,無返回值。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenRun(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=cf1.thenRunAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
//等待任務1執行完成
System.out.println("cf1結果->"+cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
測試結果:
從上面代碼和測試結果我們發現thenRun和thenRunAsync區別在于,使用thenRun方法時子任務與父任務使用的是同一個線程,而thenRunAsync在子任務中可能是另起一個線程執行任務,并且thenRunAsync可以自定義線程池,默認的使用ForkJoinPool.commonPool()線程池。
4.whenComplete和whenCompleteAsync
whenComplete是當某個任務執行完成后執行的回調方法,會將執行結果或者執行期間拋出的異常傳遞給回調方法,如果是正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一致,如果該任務正常執行,則get方法返回執行結果,如果是執行異常,則get方法拋出異常。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
inta=1/0;
return1;
});
CompletableFuturecf2=cf1.whenComplete((result,e)->{
System.out.println("上個任務結果:"+result);
System.out.println("上個任務拋出異常:"+e);
System.out.println(Thread.currentThread()+"cf2dosomething....");
});
////等待任務1執行完成
//System.out.println("cf1結果->"+cf1.get());
////等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
測試結果:
whenCompleteAsync和whenComplete區別也是whenCompleteAsync可能會另起一個線程執行任務,并且thenRunAsync可以自定義線程池,默認的使用ForkJoinPool.commonPool()線程池。
5.handle和handleAsync
跟whenComplete基本一致,區別在于handle的回調方法有返回值。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
//inta=1/0;
return1;
});
CompletableFuturecf2=cf1.handle((result,e)->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
System.out.println("上個任務結果:"+result);
System.out.println("上個任務拋出異常:"+e);
returnresult+2;
});
//等待任務2執行完成
System.out.println("cf2結果->"+cf2.get());
}
測試結果 :
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
三、多任務組合處理
1. thenCombine、thenAcceptBoth 和runAfterBoth
這三個方法都是將兩個CompletableFuture組合起來處理,只有兩個任務都正常完成時,才進行下階段任務。
區別:thenCombine會將兩個任務的執行結果作為所提供函數的參數,且該方法有返回值;thenAcceptBoth同樣將兩個任務的執行結果作為方法入參,但是無返回值;runAfterBoth沒有入參,也沒有返回值。注意兩個任務中只要有一個執行異常,則將該異常信息作為指定任務的執行結果。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.thenCombine(cf2,(a,b)->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
returna+b;
});
System.out.println("cf3結果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.thenAcceptBoth(cf2,(a,b)->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
System.out.println(a+b);
});
System.out.println("cf3結果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf1dosomething....");
return1;
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread()+"cf2dosomething....");
return2;
});
CompletableFuturecf3=cf1.runAfterBoth(cf2,()->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
});
System.out.println("cf3結果->"+cf3.get());
}
測試結果:
2.applyToEither、acceptEither和runAfterEither
這三個方法和上面一樣也是將兩個CompletableFuture組合起來處理,當有一個任務正常完成時,就會進行下階段任務。
區別:applyToEither會將已經完成任務的執行結果作為所提供函數的參數,且該方法有返回值;acceptEither同樣將已經完成任務的執行結果作為方法入參,但是無返回值;runAfterEither沒有入參,也沒有返回值。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf1任務完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf2任務完成";
});
CompletableFuturecf3=cf1.applyToEither(cf2,(result)->{
System.out.println("接收到"+result);
System.out.println(Thread.currentThread()+"cf3dosomething....");
return"cf3任務完成";
});
System.out.println("cf3結果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf1任務完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return"cf2任務完成";
});
CompletableFuturecf3=cf1.acceptEither(cf2,(result)->{
System.out.println("接收到"+result);
System.out.println(Thread.currentThread()+"cf3dosomething....");
});
System.out.println("cf3結果->"+cf3.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任務完成");
return"cf1任務完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任務完成");
return"cf2任務完成";
});
CompletableFuturecf3=cf1.runAfterEither(cf2,()->{
System.out.println(Thread.currentThread()+"cf3dosomething....");
System.out.println("cf3任務完成");
});
System.out.println("cf3結果->"+cf3.get());
}
測試結果:
從上面可以看出cf1任務完成需要2秒,cf2任務完成需要5秒,使用applyToEither組合兩個任務時,只要有其中一個任務完成時,就會執行cf3任務,顯然cf1任務先完成了并且將自己任務的結果傳值給了cf3任務,cf3任務中打印了接收到cf1任務完成,接著完成自己的任務,并返回cf3任務完成;acceptEither和runAfterEither類似,acceptEither會將cf1任務的結果作為cf3任務的入參,但cf3任務完成時并無返回值;runAfterEither不會將cf1任務的結果作為cf3任務的入參,它是沒有任務入參,執行完自己的任務后也并無返回值。
2. allOf / anyOf
allOf:CompletableFuture是多個任務都執行完成后才會執行,只有有一個任務執行異常,則返回的CompletableFuture執行get方法時會拋出異常,如果都是正常執行,則get返回null。
anyOf :CompletableFuture是多個任務只要有一個任務執行完成,則返回的CompletableFuture執行get方法時會拋出異常,如果都是正常執行,則get返回執行完成任務的結果。
測試代碼:
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任務完成");
return"cf1任務完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
inta=1/0;
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任務完成");
return"cf2任務完成";
});
CompletableFuturecf3=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf3任務完成");
return"cf3任務完成";
});
CompletableFuturecfAll=CompletableFuture.allOf(cf1,cf2,cf3);
System.out.println("cfAll結果->"+cfAll.get());
}
publicstaticvoidmain(String[]args)throwsExecutionException,InterruptedException{
CompletableFuturecf1=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf1dosomething....");
Thread.sleep(2000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf1任務完成");
return"cf1任務完成";
});
CompletableFuturecf2=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(5000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf2任務完成");
return"cf2任務完成";
});
CompletableFuturecf3=CompletableFuture.supplyAsync(()->{
try{
System.out.println(Thread.currentThread()+"cf2dosomething....");
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println("cf3任務完成");
return"cf3任務完成";
});
CompletableFuture
測試結果:
審核編輯 :李倩
-
接口
+關注
關注
33文章
8667瀏覽量
151524 -
線程
+關注
關注
0文章
505瀏覽量
19715
原文標題:一網打盡:異步神器 CompletableFuture 萬字詳解!
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論