在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內(nèi)不再提示

FutureTask是如何通過阻塞來獲取到異步線程執(zhí)行結果的呢?

OSC開源社區(qū) ? 來源:OSCHINA 社區(qū) ? 2023-08-12 14:37 ? 次閱讀

1、FutureTask 對象介紹

Future 對象大家都不陌生,是 JDK1.5 提供的接口,是用來以阻塞的方式獲取線程異步執(zhí)行完的結果。 在 Java 中想要通過線程執(zhí)行一個任務,離不開 Runnable 與 Callable 這兩個接口。 Runnable 與 Callable 的區(qū)別在于,Runnable 接口只有一個 run 方法,該方法用來執(zhí)行邏輯,但是并沒有返回值;而 Callable 的 call 方法,同樣用來執(zhí)行業(yè)務邏輯,但是是有一個返回值的。

Callable 執(zhí)行任務過程中可以通過 FutureTask 獲得任務的執(zhí)行狀態(tài),并且可以在執(zhí)行完成后通過 Future.get () 方式獲取執(zhí)行結果。 Future 是一個接口,而 FutureTask 就是 Future 的實現(xiàn)類。并且 FutureTask 實現(xiàn)了 RunnableFuture(Runnable + Future),說明我們可以創(chuàng)建一個 FutureTask 并直接把它放到線程池執(zhí)行,然后獲取 FutureTask 的執(zhí)行結果。

2、FutureTask 源碼解析

2.1 主要方法和屬性

那么 FutureTask 是如何通過阻塞的方式來獲取到異步線程執(zhí)行的結果的呢?我們看下 FutureTask 中的屬性。

// FutureTask的狀態(tài)及其常量
privatevolatileint state;
    privatestaticfinalint NEW          =0;
    privatestaticfinalint COMPLETING   =1;
    privatestaticfinalint NORMAL       =2;
    privatestaticfinalint EXCEPTIONAL  =3;
    privatestaticfinalint CANCELLED    =4;
    privatestaticfinalint INTERRUPTING =5;
    privatestaticfinalint INTERRUPTED  =6;
    
    // callable對象,執(zhí)行完后置空
    privateCallable callable;
    // 要返回的結果或要引發(fā)的異常來自 get() 方法
    privateObject outcome;// non-volatile, protected by state reads/writes
    // 執(zhí)行Callable的線程
    privatevolatileThread runner;
    // 等待線程的一個鏈表結構
    privatevolatileWaitNode waiters;

?FutureTask 中幾個比較重要的方法。

// 取消任務的執(zhí)行
booleancancel(boolean mayInterruptIfRunning);
// 返回任務是否已經(jīng)被取消
booleanisCancelled();
// 返回任務是否已經(jīng)完成,任務狀態(tài)不為NEW即為完成
booleanisDone();
// 通過get方法獲取任務的執(zhí)行結果
Vget()throwsInterruptedException,ExecutionException;
// 通過get方法獲取任務的執(zhí)行結果,帶有超時,如果超過給定時間則拋出異常
Vget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException;

?2.2 FutureTask 執(zhí)行

當我們在線程池中執(zhí)行一個 Callable 方法時,其實是將 Callable 任務封裝成一個 RunnableFuture 對象去執(zhí)行,同時將這個 RunnableFuture 對象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時獲取到任務執(zhí)行的狀態(tài),并且可以在任務執(zhí)行完成后通過該對象獲取執(zhí)行結果。 以下為 ThreadPoolExecutor 線程池提交一個 callable 方法的源碼。

public Future submit(Callable task){
        if(task ==null)thrownewNullPointerException();
        RunnableFuture ftask =newTaskFor(task);
        execute(ftask);
        return ftask;
    }

protected RunnableFuture newTaskFor(Callable callable){
        returnnewFutureTask(callable);
    }

?2.3 run 方法介紹

RunnableFuture 其實也是一個可以執(zhí)行的 runnable,我們看下他的 run 方法。其主要流程就是執(zhí)行 call 方法,正常執(zhí)行完畢后將 result 結果賦值到 outcome 屬性上。

publicvoidrun(){
        if(state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null,Thread.currentThread()))
            return;
        try{
            // 將callable賦值到本地變量
            Callable c = callable;
            // 判斷callable不為空并且FutureTask的狀態(tài)必須為新創(chuàng)建
            if(c !=null&& state == NEW){
                V result;
                boolean ran;
                try{
                    // 執(zhí)行call方法(用戶自己實現(xiàn)的call邏輯),并獲取到result結果
                    result = c.call();
                    ran =true;
                }catch(Throwable ex){
                    result =null;
                    ran =false;
                    // 如果執(zhí)行過程出現(xiàn)異常,則將異常對象賦值到outcome上
                    setException(ex);
                }
                // 如果正常執(zhí)行完畢,則將result賦值到outcome屬性上
                if(ran)
                    set(result);
            }
        }finally{
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner =null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if(s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

?以下邏輯為正常執(zhí)行完成后賦值的邏輯。

// 如果任務沒有被取消,將future執(zhí)行完的返回值賦值給result結果
// FutureTask任務的執(zhí)行狀態(tài)是通過CAS的方式進行賦值的,并且由此可知,COMPLETING其實是一個瞬時狀態(tài)
// 當將線程執(zhí)行結果賦值給outcome后,狀態(tài)會修改為對應的NORMAL,即正常結束
protectedvoidset(V v){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
            finishCompletion();
        }
    }

?以下為執(zhí)行異常時賦值邏輯,直接將 Throwable 對象賦值到 outcome 屬性上。

protectedvoidsetException(Throwable t){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
            finishCompletion();
        }
    }

?無論是正常執(zhí)行還是異常執(zhí)行,最終都會調(diào)用一個 finishCompletion 方法,用來做工作的收尾工作。

2.4 get 方法介紹

Future 的 get 方法有兩個重載的方法,一個是 get () 獲取結果,一個是 get (long, TimeUnit) 帶有超時時間的獲取結果,我們看下 FutureTask 中的這兩個方法是如何實現(xiàn)的。

// 不帶有超時時間,一直阻塞直到獲取結果
publicVget()throwsInterruptedException,ExecutionException{
        int s = state;
        if(s <= COMPLETING)
            // 等待結果完成,帶有超時的get方法也是調(diào)用的awaitDone方法
            s =awaitDone(false,0L);
        // 返回結果
        returnreport(s);
    }

// 帶有超時時間的獲取結果,如果超過時間還沒有獲取到結果則拋出異常
publicVget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException{
        if(unit ==null)
            thrownewNullPointerException();
        int s = state;
        // 如果任務未中斷,調(diào)用awaitDone方法等待任務結果
        if(s <= COMPLETING &&
            (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
            thrownewTimeoutException();
        // 返回結果
        returnreport(s);
    }

?我們主要看下 awaitDone 方法的執(zhí)行邏輯。此方法會通過 for 循環(huán)的方式一直阻塞等待任務執(zhí)行完成。如果帶有超時時間,則超過截止時間后會直接返回。

// timed:是否需要超時獲取
// nanos:超時時間單位納秒
privateintawaitDone(boolean timed,long nanos)
        throwsInterruptedException{
        finallong deadline = timed ?System.nanoTime()+ nanos :0L;
        WaitNode q =null;
        boolean queued =false;
        // 此方法會一直for循環(huán)判斷任務狀態(tài)是否已經(jīng)完成,是Future.get阻塞的原因
        for(;;){
            if(Thread.interrupted()){
                removeWaiter(q);
                thrownewInterruptedException();
            }

            int s = state;
            // 任務狀態(tài)大于COMPLETING,則表明任務結束,直接返回
            if(s > COMPLETING){
                if(q !=null)
                    q.thread =null;
                return s;
            }
            elseif(s == COMPLETING)// cannot time out yet
                // Thread.yield() 方法,使當前線程由執(zhí)行狀態(tài),變成為就緒狀態(tài),讓出cpu時間,在下一個線程執(zhí)行時候,此線程有可能被執(zhí)行,也有可能沒有被執(zhí)行。
                // COMPLETING狀態(tài)為瞬時狀態(tài),任務執(zhí)行完成,要么是正常結束,要么異常結束,后續(xù)會被置為NORMAL或者EXCEPTIONAL
                Thread.yield();
            elseif(q ==null)
                // 每調(diào)用一次get方法,都會創(chuàng)建一個WaitNode等待節(jié)點
                q =newWaitNode();
            elseif(!queued)
                // 將該等待節(jié)點添加到鏈表結構waiters中,q.next = waiters 即在waiters的頭部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法帶有超時判斷,則判斷當前時間是否已經(jīng)超過了截止時間,如果超過了及截止日期,則退出循環(huán)直接返回當前狀態(tài),此時任務狀態(tài)一定是NEW
            elseif(timed){
                nanos = deadline -System.nanoTime();
                if(nanos <=0L){
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

?我們在看下 report 方法,在調(diào)用 get 方法時是如何返回結果的。

這里首先獲取 outcome 的值,并判斷任務是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成,則將 outcome 對象強轉成泛型指定的類型;如果任務被取消了,則拋出一個 CancellationException 異常;如果都不是,則說明任務在執(zhí)行過程中發(fā)生了異常,此時任務狀態(tài)位 EXCEPTIONAL,此時的 outcome 即為 Throwable 對象,所以將 outcome 強轉為 Throwable 并拋出異常。

由此可以知道,我們將一個 FutureTask 任務 submit 到線程池中執(zhí)行的時候,如果發(fā)生了異常,是會在調(diào)用 get 方法的時候拋出的。
privateVreport(int s)throwsExecutionException{
        Object x = outcome;
        if(s == NORMAL)
            return(V)x;
        if(s >= CANCELLED)
            thrownewCancellationException();
        thrownewExecutionException((Throwable)x);
    }

?2.5 cancel 方法介紹

cancel 方法用于取消正在運行的任務,如果任務取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。

// mayInterruptIfRunning:允許中斷正在運行的任務
publicbooleancancel(boolean mayInterruptIfRunning){
        // mayInterruptIfRunning如果為true則將狀態(tài)置為INTERRUPTING,如果未false則將狀態(tài)置為CANCELLED
        if(!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            returnfalse;
        // 如果狀態(tài)修改成功后,判斷是否允許中斷線程,如果允許,則調(diào)用Thread的interrupt方法中斷
        try{// in case call to interrupt throws exception
            if(mayInterruptIfRunning){
                try{
                    Thread t = runner;
                    if(t !=null)
                        t.interrupt();
                }finally{// final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        }finally{
            // 取消后的收尾工作
            finishCompletion();
        }
        returntrue;
    }

?2.6 isDone/isCancelled 方法介紹

isDone 方法用于判斷 FutureTask 是否已經(jīng)完成;isCancelled 方法用來判斷 FutureTask 是否已經(jīng)取消,這兩個方法都是通過狀態(tài)位來判斷的。

publicbooleanisCancelled(){
        return state >= CANCELLED;
    }

    publicbooleanisDone(){
        return state != NEW;
    }

?2.7 finishCompletion 方法介紹

我們看下 finishCompletion 方法都做了哪些工作。

// 刪除所有等待線程并發(fā)出信號,最后執(zhí)行done方法
privatevoidfinishCompletion(){
        // assert state > COMPLETING;
        for(WaitNode q;(q = waiters)!=null;){
            if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
                for(;;){
                    Thread t = q.thread;
                    if(t !=null){
                        q.thread =null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if(next ==null)
                        break;
                    q.next =null;// unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable =null;// to reduce footprint
    }

?我們看到 done 方法是一個受保護的空方法,此處沒有任何邏輯,由其子類去根據(jù)自己的業(yè)務去實現(xiàn)相應的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}

3、總結

通過源碼解讀可以了解到 Future 的原理:

第一步:主線程將任務封裝成一個 Callable 對象,通過 submit 方法提交到線程池去執(zhí)行。

第二步:線程池執(zhí)行任務的 run 方法,主線程則可以繼續(xù)執(zhí)行其他邏輯。

第三步:線程池中方法執(zhí)行完成后將結果賦值到 outcome 屬性上,并修改任務狀態(tài)。

第四步:主線程在需要拿到異步任務結果的時候,主動調(diào)用 fugure.get () 方法來獲取結果。

第五步:如果異步線程在執(zhí)行過程中發(fā)生異常,則會在調(diào)用 future.get () 方法的時候拋出來。 以上就是對于 FutureTask 的分析,我們可以了解 FutureTask 任務執(zhí)行的方式以及 Future.get 已阻塞的方式獲取線程執(zhí)行的結果原理,并且從代碼中可以了解 FutureTask 的任務執(zhí)行狀態(tài)以及狀態(tài)的變化過程。





審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 狀態(tài)機
    +關注

    關注

    2

    文章

    492

    瀏覽量

    27542
  • 線程池
    +關注

    關注

    0

    文章

    57

    瀏覽量

    6851
  • for循環(huán)

    關注

    0

    文章

    61

    瀏覽量

    2503

原文標題:并發(fā)編程 - FutureTask 解析

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    鴻蒙原生應用開發(fā)-ArkTS語言基礎類庫多線程I/O密集型任務開發(fā)

    使用異步并發(fā)可以解決單次I/O任務阻塞的問題,但是如果遇到I/O密集型任務,同樣會阻塞線程中其它任務的執(zhí)行,這時需要使用多
    發(fā)表于 03-21 14:57

    Java線程阻塞方法大全

    如果線程是因為調(diào)用了wait()、sleep()或者join()方法而導致的阻塞,可以中斷線程,并且通過拋出InterruptedException
    發(fā)表于 04-02 15:42

    Java的線程喚醒與阻塞規(guī)則

    如果線程是因為調(diào)用了wait()、sleep()或者join()方法而導致的阻塞,可以中斷線程,并且通過拋出InterruptedException
    發(fā)表于 07-06 15:11

    同步與異步,阻塞與非阻塞的區(qū)別是什么

    同步與異步阻塞與非阻塞的區(qū)別
    發(fā)表于 01-26 06:12

    labview怎么終止‘執(zhí)行系統(tǒng)命令’并獲取所有結果

    現(xiàn)在有一個應用要一邊運動一邊通過執(zhí)行系統(tǒng)命令’獲取一系列數(shù)據(jù),獲取數(shù)據(jù)的指令是一直以0.1秒的速率讀取數(shù)據(jù),我可以在運動開始的時候開啟‘
    發(fā)表于 04-07 10:31

    如何使用多線程異步操作等并發(fā)設計方法最大化程序的性能

    很多朋友往往會使用線程執(zhí)行耗時較長的I/O操作。這樣在只有少數(shù)幾個并發(fā)操作的時候還無傷大雅,如果需要處理大量的并發(fā)操作時就不合適了。  異步調(diào)用與多
    發(fā)表于 08-23 16:31

    A線程如何在線程本身識別變量是否改變

    阻塞獲取可以解決但是這個B線程是別人代碼寫的。不好修改不想再增加一個線程去循環(huán)讀取變量X是否改變,再釋放信號量需求A線程如何在
    發(fā)表于 11-02 11:02

    異步調(diào)用子vi問題

    我試了異步調(diào)用子vi,現(xiàn)在的問題是子vi是一個循環(huán),但是我在主程序獲取子vi的結果時,只有子vi結束了才能獲取且只能獲取到循環(huán)最后一次的
    發(fā)表于 11-11 10:34

    是否有函數(shù)或者功能可以實現(xiàn)A線程阻塞變量的值?

    阻塞獲取可以解決但是這個B線程是別人代碼寫的。不好修改不想再增加一個線程去循環(huán)讀取變量X是否改變,再釋放信號量需求A線程如何在
    發(fā)表于 02-01 16:25

    Runloop是怎樣進行線程?;?/a>

    ,它會向一個 C語言程序那樣在運行完所有代碼后退出線程。 而網(wǎng)絡請求是異步的,這導致獲取到請求數(shù)據(jù)時,線程已經(jīng)退出,代理方法沒有機會執(zhí)行。
    發(fā)表于 09-26 10:34 ?0次下載

    沒有互聯(lián)網(wǎng),如何本地獲取到LoRaWAN的終端數(shù)據(jù)?

    一般情況下,我們可以通過連接TTN,獲取到LoRaWAN的終端數(shù)據(jù)。 但是,如果沒有互聯(lián)網(wǎng),那么,我們也就無法通過連接TTN
    發(fā)表于 04-10 16:38 ?980次閱讀

    詳解同步異步阻塞阻塞

    同步、異步分別指的是一種通訊方式,當 cpu 不需要執(zhí)行線程上下文切換就能完成任務,此時便認為這種通訊方式是同步的,相對的如果存在cpu 上下文切換,這種方式便是異步
    的頭像 發(fā)表于 05-03 17:53 ?4847次閱讀
    詳解同步<b class='flag-5'>異步</b>和<b class='flag-5'>阻塞</b>非<b class='flag-5'>阻塞</b>

    使用匿名管道技術獲取CMD命令的執(zhí)行結果

    遠程 CMD 是指惡意程序接收到控制端發(fā)送的 CMD 指令后,在本地執(zhí)行 CMD 命令,并將執(zhí)行結果回傳至控制端。本文將演示使用匿名管道技術獲取 CMD 命令的
    的頭像 發(fā)表于 04-03 18:04 ?4002次閱讀

    CompletableFuture異步線程是真的優(yōu)雅

    雖然 Future 以及相關使用方法提供了異步執(zhí)行任務的能力,但是對于結果獲取卻是很不方便,我們必須使用Future.get()的方式阻塞
    的頭像 發(fā)表于 08-07 15:40 ?683次閱讀
    CompletableFuture<b class='flag-5'>異步</b>多<b class='flag-5'>線程</b>是真的優(yōu)雅

    verilog同步和異步的區(qū)別 verilog阻塞賦值和非阻塞賦值的區(qū)別

    Verilog中同步和異步的區(qū)別,以及阻塞賦值和非阻塞賦值的區(qū)別。 一、Verilog中同步和異步的區(qū)別 同步傳輸和異步傳輸是指數(shù)據(jù)在電路中
    的頭像 發(fā)表于 02-22 15:33 ?1741次閱讀
    主站蜘蛛池模板: 天天噜噜日日噜噜久久综合网| 色吧综合网| 色在线免费| 四虎影院美女| 李老汉的性生生活1全部| 欧美一区高清| 黄视频国产| 国产午夜精品片一区二区三区| 午夜在线影院| 视频在线精品| 国产午夜精品理论片| 亚洲性夜| 波多野结衣久久精品| 奇米四色777亚洲图| 高颜值露脸极品在线播放| 5x性区m免费毛片视频看看| 日日操夜夜操天天操| 久久精品国产免费| 欧美又黄又嫩大片a级| 天天做夜夜爱| 6969精品视频在线观看| 天天舔天天爱| 国产精品夜夜春夜夜爽| 欧美啊片| 2021最新国产成人精品视频| 亚洲男人的天堂久久无| 一本二卡三卡四卡乱码二百| 色www| 视频在线你懂的| 欧美日韩a级a| 成人免费看毛片| 97蜜桃| a毛片免费观看完整| 天天操天天干天搞天天射| 在线观看黄的网站| 99在线热播精品免费| 在线成人aa在线看片| 亚洲综人网| 欧美大狠狠大臿蕉香蕉大视频| 成色视频| 国产干美女|