最近在開發 延保服務 頻道頁時,為了提高查詢效率,使用到了多線程技術。為了對多線程方案設計有更加充分的了解,在業余時間讀完了《圖解 Java 多線程設計模式》這本書,覺得收獲良多。本篇文章將介紹其中提到的 Future 模式,以及在實際業務開發中對該模式的應用,而這些內容對于本書來說只是冰山一角,還是推薦大家有時間去閱讀原書。
1. Future 模式:“先給您提貨單”
我們先來看一個場景:假如我們去蛋糕店買蛋糕,下單后,店員會遞給我們提貨單并告知“請您傍晚來取蛋糕”。到了傍晚我們拿著提貨單去取蛋糕,店員會先和我們說“您的蛋糕已經做好了”,然后將蛋糕拿給我們。
如果將下單蛋糕到取蛋糕的過程抽象成一個方法的話,那么意味著這個方法需要花很長的時間才能獲取執行結果,與其一直等待結果,不如先拿著一張“提貨單”,到我們需要取貨的時候,再通過它去取,而獲取“提貨單”的過程是幾乎不耗時的,而這個提貨單對象就被稱為 Future
,后續便可以通過它來獲取方法的返回值。用 Java 來表示這個過程的話,需要使用到 FutureTask
和 Callable
兩個類,如下:
public class Example {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 預定蛋糕,并定義“提貨單”
System.out.println("我:預定蛋糕");
FutureTask future = new FutureTask?>(() -> {
System.out.println("店員:請您傍晚來取蛋糕");
Thread.sleep(2000);
System.out.println("店員:您的蛋糕已經做好了");
return "Holiland";
});
// 開始做蛋糕
new Thread(future).start();
// 去做其他事情
Thread.sleep(1000);
System.out.println("我:忙碌中...");
// 取蛋糕
System.out.println("我:取蛋糕 " + future.get());
}
}
// 運行結果:
// 我:預定蛋糕
// 店員:請您傍晚來取蛋糕
// 我:忙碌中...
// 店員:您的蛋糕已經做好了
// 我:取蛋糕 Holiland
方法的調用者可以將任務交給其他線程去處理,無需阻塞等待方法的執行,這樣調用者便可以繼續執行其他任務,并能通過 Future
對象獲取執行結果。
它的運行原理如下:創建 FutureTask
實例時,Callable
對象會被傳遞給構造函數,當線程調用 FutureTask
的 run
方法時,Callable
對象的 call
方法也會被執行。調用 call
方法的線程會同步地獲取結果,并通過 FutureTask
的 set
方法來記錄結果對象,如果 call
方法執行期間發生了異常,則會調用 setException
方法記錄異常。最后,通過調用 get
方法獲取方法的結果,注意這里可能會拋出方法執行時產生的異常。
public void run() {
// ...
try {
// “提貨任務”
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調用 callable 的 call 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 捕獲并設置異常
setException(ex);
}
if (ran)
// 為結果賦值
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將結果賦值給 outcome 全局變量,供 get 時獲取
outcome = v;
// 修改狀態為 NORMAL
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 將異常賦值給 outcome 變量,供 get 時拋出
outcome = t;
// 修改狀態為 EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 未完成時阻塞等一等
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結束的話能正常獲取到結果
if (s == NORMAL)
return (V)x;
// 否則會拋出異常,注意如果執行中出現異常,調用 get 時會被拋出
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
現在對 Future 模式 已經有了基本的了解:它通過 Future
接口來表示未來的結果,實現 調用者與執行者之間的解耦,提高系統的吞吐量和響應速度,那在實踐中對該模式是如何使用的呢?
2. 對 Future 模式的實踐
因為 延保服務 頻道頁訪問量大且對接口性能要求較高,單線程處理并不能滿足性能要求,所以應用了 Future 模式 來提高查詢效率,但是并沒有借助上文所述的 FutureTask
來實現,而是使用了 CompletableFuture
工具類,它們的實現原理基本一致,但是后者提供的方法和對 鏈式編程 的支持使代碼更加簡潔,實現更加容易(相關 API 參考見文末)。
如下是使用 CompletableFuture
異步多線程查詢訂單列表的邏輯,根據配置的 pageNo
分多條線程查詢各頁的訂單數據:
List result = new ArrayList?>();
// 并發查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁數 pageNo,并發查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture.supplyAsync(
() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor);
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發查詢用戶訂單信息異常", e);
}
這段代碼中對異常的處理能進行優化:第 15 行代碼,如果某條線程查詢訂單列表時發生異常,那么在調用 get
方法時會拋出該異常,被 catch
后返回空結果,即使有其他線程查詢成功,這些訂單結果值也會被忽略掉,可以針對這一點進行優化,如下:
List result = new ArrayList?>();
// 并發查詢訂單列表
List>> futureList = new ArrayList?>();
try {
// 配置需要查詢的頁數 pageNo,并發查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futureList.add(future);
}
// 等待所有線程處理完畢,并封裝結果值
for (CompletableFuture> future : futureList) {
result.addAll(future.get());
}
} catch (Exception e) {
log.error("并發查詢用戶訂單信息異常", e);
}
優化后針對查詢發生異常的任務打印異常日志,并返回空集合,這樣即使單線程查詢失敗,也不會影響到其他線程查詢成功的結果。
CompletableFuture
還提供了 allOf
方法,它返回的 CompletableFuture
對象在所有 CompletableFuture
執行完成時完成,相比于對每個任務都調用 get
阻塞等待任務完成的實現可讀性更好,改造后代碼如下:
List result = new ArrayList?>();
// 并發查詢訂單列表
CompletableFuture>[] futures = new CompletableFuture[pageNo];
// 配置需要查詢的頁數 pageNo,并發查詢不同頁碼的訂單
for (int i = 1; i <= pageNo; i++) {
int curPageNo = i;
CompletableFuture> future = CompletableFuture
.supplyAsync(() -> getOrderInfoList(userNo, curPageNo), threadPoolExecutor)
// 添加異常處理
.exceptionally(e -> {
log.error("查詢用戶訂單信息異常", e);
return Collections.emptyList();
});
futures[i - 1] = future;
}
try {
// 等待所有線程處理完畢
CompletableFuture.allOf(futures).get();
for (CompletableFuture> future : futures) {
List orderInfoList = future.get();
if (CollectionUtils.isEmpty(orderInfoList)) {
result.addAll(orderInfoList);
}
}
} catch (Exception e) {
log.error("處理用戶訂單結果信息異常", e);
}
Tips:
CompletableFuture
的設計初衷是支持異步編程,所以應盡量避免在CompletableFuture
鏈中使用get()/join()
方法,因為這些方法會阻塞當前線程直到CompletableFuture
完成,應該在必須使用該結果值時才調用它們。
相關的模式:命令模式
命令模式能將操作的調用者和執行者解耦,它能很容易的與 Future 模式 結合,以查詢訂單的任務為例,我們可以將該任務封裝為“命令”對象的形式,執行時為每個線程提交一個命令,實現解耦并提高擴展性。在命令模式中,命令對象需要 支持撤銷和重做,那么這便在查詢出現異常時,提供了補償處理的可能,命令模式類圖關系如下:
3.《圖解Java多線程設計模式》書籍推薦
我覺得本書算得上是一本老書:05 年出版的基于 JDK1.5 的Java多線程書籍,相比于目前我們常用的 JDK1.8 和時髦的 JDK21,在讀之前總會讓人覺得有一種過時的感覺。但是當我讀完時,發現其中的模式能對應上代碼中的處理邏輯:對 CompletableFuture
的使用正對應了其中的 Future 模式(異步獲取其他線程的執行結果)等等,所以我覺得模式的應用不會局限于技術的新老,它是在某種情況下,研發人員共識或通用的解決方案,在知曉某種模式,采用已有的技術實現它是容易的,而反過來在只掌握技術去探索模式是困難且沒有方向的。
同時,我也在考慮一個問題:對于新人學習多線程技術來說,究竟適不適合直接從模式入門呢?因為我對設計模式有了比較多的實踐經驗,所以對“模式”相關的內容足夠敏感,如果新人沒有這些經驗的話,這對他們來說會不會更像是一個個知識點的堆砌呢?好在的是,本書除了模式相關的內容,對基礎知識也做足了鋪墊,而且提出的關于多線程編程的思考點也是非常值得參考和學習的,以線程互斥和協同為例,書中談到:在對線程進行互斥處理時需要考慮 “要保護的東西是什么”,這樣便能夠 清晰的確定鎖的粒度;對于線程的協同,書中提到的是需要考慮 “放在中間的東西是什么”,直接的拋出這個觀點是不容易理解的,“中間的東西”是在多線程的 生產者和消費者模式 中提出的,部分線程負責生產,生產完成后將對象放在“中間”,部分線程負責消費,消費時取的便是“中間”的對象,而合理規劃這些中間的東西便能 消除生產者和消費者之間的速度差異,提高系統的吞吐量和響應速度。而再深入考慮這兩個角度時,線程的互斥和協同其實是內外統一的:為了讓線程協調運行,必須執行互斥處理,以防止共享的內容被破壞,而線程的互斥是為了線程的協調運行才進行的必要操作。
附:CompletableFuture 常用 API
使用 supplyAsync 方法異步執行任務,并返回 CompletableFuture 對象
如下代碼所示,調用 CompletableFuture.supplyAsync
靜態方法異步執行查詢邏輯,并返回一個新的 CompletableFuture
對象
CompletableFuture> future = CompletableFuture.supplyAsync(() -> doQuery(), executor);
使用 join 方法阻塞獲取完成結果
如下代碼所示,在封裝結果前,調用 join
方法阻塞等待獲取結果
futureList.forEach(CompletableFuture::join);
它與 get
方法的主要區別在于,join
方法拋出的是未經檢查的異常 CompletionException
,并將原始異常作為其原因,這意味著我們可以不需要在方法簽名中聲明它或在調用 join
方法的地方進行異常處理,而 get
方法會拋出 InterruptedException
和 ExecutionException
異常,我們必須對它進行處理,get
方法源碼如下:
public T get() throws InterruptedException, ExecutionException {
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
}
用 thenApply(Function) 和 thenAccept(Consumer) 等回調函數處理結果
如下是使用 thenApply()
方法對 CompletableFuture
的結果進行轉換的操作:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(greeting -> greeting + " World");
使用 exceptionally() 處理 CompletableFuture 中的異常
CompletableFuture
提供了exceptionally()
方法來處理異常,這是一個非常重要的步驟。如果在 CompletableFuture
的運行過程中拋出異常,那么這個異常會被傳遞到最終的結果中。如果沒有適當的異常處理,那么在調用 get()
或 join()
方法時可能會拋出異常。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Exception occurred");
}
return "Hello, World!";
}).exceptionally(e -> "An error occurred");
使用 allOf() 和 anyOf() 處理多個 CompletableFuture
如果有多個 CompletableFuture
需要處理,可以使用 CompletableFuture.allOf()
或者 CompletableFuture.anyOf()
。allOf()
在所有的 CompletableFuture
完成時完成,而 anyOf()
則會在任意一個 CompletableFuture
完成時完成。
complete()、completeExceptionally()、cancel() 方法
CompletableFuture
的運行是在調用了 complete()
、completeExceptionally()
、cancel()
等方法后才會被標記為完成。如果沒有正確地完成 CompletableFuture
,那么在調用 get()
方法時可能會永久阻塞。這三個方法在 Java 并發編程中有著重要的應用。以下是這三個方法的常見使用場景:
complete(T value)
: 此方法用于顯式地完成一個 CompletableFuture
,并設置它的結果值。這在你需要在某個計算完成時,手動設置 CompletableFuture
的結果值的場景中非常有用。例如,你可能在一個異步操作完成時,需要設置 CompletableFuture
的結果值。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.complete("Operation Result");
completeExceptionally(Throwable ex)
: 此方法用于顯式地以異常完成一個 CompletableFuture
。這在你需要在某個計算失敗時,手動設置 CompletableFuture
的異常的場景中非常有用。例如,你可能在一個異步操作失敗時,需要設置 CompletableFuture
的異常。
CompletableFuture future = new CompletableFuture?>();
// Some asynchronous operation
future.completeExceptionally(new RuntimeException("Operation Failed"));
cancel(boolean mayInterruptIfRunning)
: 此方法用于取消與 CompletableFuture
關聯的計算。這在你需要取消一個長時間運行的或者不再需要的計算的場景中非常有用。例如,你可能在用戶取消操作或者超時的情況下,需要取消 CompletableFuture
的計算。
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Long running operation
});
// Some condition
future.cancel(true);
這些方法都是線程安全的,可以從任何線程中調用。
使用 thenCompose() 處理嵌套的 CompletableFuture
如果在處理 CompletableFuture
的結果時又創建了新的CompletableFuture
,那么就會產生嵌套的 CompletableFuture
。這時可以使用 thenCompose()
方法來避免 CompletableFuture
的嵌套,如下代碼所示:
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
使用 thenCombine() 處理兩個 CompletableFuture 的結果
CompletableFuture completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2);
審核編輯 黃宇
-
JAVA
+關注
關注
19文章
2973瀏覽量
104901 -
API
+關注
關注
2文章
1507瀏覽量
62214 -
多線程
+關注
關注
0文章
278瀏覽量
20032
發布評論請先 登錄
相關推薦
評論