在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

調度線程池ScheduledThreadPoolExecutor源碼解析

jf_78858299 ? 來源:JAVA旭陽 ? 作者:JAVA旭陽 ? 2023-05-11 10:45 ? 次閱讀

前言

ScheduledThreadPoolExecutor可以用來很方便實現我們的調度任務,具體使用可以參考調度線程池ScheduledThreadPoolExecutor的正確使用姿勢這篇文章,那大家知道它是怎么實現的嗎,本文就帶大家來揭曉謎底。

實現機制分析

我們先思考下,如果讓大家去實現ScheduledThreadPoolExecutor可以周期性執行任務的功能,需要考慮哪些方面呢?

  1. ScheduledThreadPoolExecutor的整體實現思路是什么呢?

答:我們是不是可以繼承線程池類,按照線程池的思路,將任務先丟到阻塞隊列中,等到時間到了,工作線程就從阻塞隊列獲取任務執行。

  1. 如何實現等到了未來的時間點就開始執行呢?

答:我們可以根據參數獲取這個任務還要多少時間執行,那么我們是不是可以從阻塞隊列中獲取任務的時候,通過條件隊列的的awaitNanos(delay)方法,阻塞一定時間。

  1. 如何實現 任務的重復性執行呢?

答:這就更加簡單了,任務執行完成后,把它再次加入到隊列不就行了嗎。

圖片

源碼解析

類結構圖

圖片

ScheduledThreadPoolExecutor的類結構圖如上圖所示,很明顯它是在我們的線程池ThreadPoolExecutor框架基礎上擴展的。

  • ScheduledExecutorService:實現了該接口,封裝了調度相關的API
  • ThreadPoolExecutor:繼承了該類,保留了線程池的能力和整個實現的框架
  • DelayedWorkQueue:內部類,延遲阻塞隊列。
  • ScheduledFutureTask:延遲任務對象,包含了任務、任務狀態、剩余的時間、結果等信息。

重要屬性

通過ScheduledThreadPoolExecutor類的成員屬性,我們可以了解它的數據結構。

  1. shutdown 后是否繼續執行周期任務(重復執行)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  1. shutdown 后是否繼續執行延遲任務(只執行一次)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  1. 調用cancel()方法后,是否將該任務從隊列中移除,默認false
private volatile boolean removeOnCancel = false;
  1. 任務的序列號,保證FIFO隊列的順序,用來比較優先級
private static final AtomicLong sequencer = new AtomicLong()
  1. ScheduledFutureTask延遲任務類
  • ScheduledFutureTask 繼承 FutureTask,實現 RunnableScheduledFuture 接口,無論是 runnable 還是 callable,無論是否需要延遲和定時,所有的任務都會被封裝成 ScheduledFutureTask
  • 該類具有延遲執行的特點, 覆蓋 FutureTaskrun 方法來實現對延時執行、周期執行的支持。
  • 對于延時任務調用 FutureTask#run,而對于周期性任務則調用FutureTask#runAndReset 并且在成功之后根據 fixed-delay/fixed-rate模式來設置下次執行時間并重新將任務塞到工作隊列。
  • 成員屬性如下:
// 任務序列號
private final long sequenceNumber;
// 任務可以被執行的時間,交付時間,以納秒表示
private long time;	
// 0 表示非周期任務
// 正數表示 fixed-rate(兩次開始啟動的間隔)模式的周期,
// 負數表示 fixed-delay(一次執行結束到下一次開始啟動) 模式
private final long period;	
// 執行的任務對象
RunnableScheduledFuture
  1. DelayedWorkQueue延遲隊列
  • DelayedWorkQueue 是支持延時獲取元素的阻塞隊列, 內部采用優先隊列 PriorityQueue(小根堆、滿二叉樹)存儲元素。
  • 內部數據結構是數組,所以延遲隊列出隊頭元素后需要讓其他元素(尾)替換到頭節點,防止空指針異常。
  • 成員屬性如下:
// 初始容量
private static final int INITIAL_CAPACITY = 16;	
// 節點數量
private int size = 0;
// 存放任務的數組
private RunnableScheduledFuture?[] queue = 
    new RunnableScheduledFuture?[INITIAL_CAPACITY];	
// 控制并發用的鎖
private final ReentrantLock lock = new ReentrantLock();	
// 條件隊列
private final Condition available = lock.newCondition();
//指定用于等待隊列頭節點任務的線程
private Thread leader = null;

提交延遲任務schedule()原理

延遲執行方法,并指定延遲執行的時間,只會執行一次。

  1. schedule()方法是延遲任務方法的入口。
public ScheduledFuture? schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    // 判空處理
    if (command == null || unit == null)
        throw new NullPointerException();
    // 將外部傳入的任務封裝成延遲任務對象ScheduledFutureTask
    RunnableScheduledFuture? t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 執行延遲任務
    delayedExecute(t);
    return t;
}
  1. decorateTask(...) 該方法是封裝延遲任務
  • 調用triggerTime(delay, unit)方法計算延遲的時間。
// 返回【當前時間 + 延遲時間】,就是觸發當前任務執行的時間
private long triggerTime(long delay, TimeUnit unit) {
    // 設置觸發的時間
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    // 如果 delay < Long.Max_VALUE/2,則下次執行時間為當前時間 +delay
    // 否則為了避免隊列中出現由于溢出導致的排序紊亂,需要調用overflowFree來修正一下delay
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

// 下面這種情況很少,大家看不懂可以不用強行理解
// 如果某個任務的 delay 為負數,說明當前可以執行(其實早該執行了)。
// 阻塞隊列中維護任務順序是基于 compareTo 比較的,比較兩個任務的順序會用 time 相減。
// 那么可能出現一個 delay 為正數減去另一個為負數的 delay,結果上溢為負數,則會導致 compareTo 產生錯誤的結果
private long overflowFree(long delay) {
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        long headDelay = head.getDelay(NANOSECONDS);
        // 判斷一下隊首的delay是不是負數,如果是正數就不用管,怎么減都不會溢出
        // 否則拿當前 delay 減去隊首的 delay 來比較看,如果不出現上溢,排序不會亂
		// 不然就把當前 delay 值給調整為 Long.MAX_VALUE + 隊首 delay
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}
  • 調用RunnableScheduledFuture的構造方法封裝為延遲任務
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
     // 任務的觸發時間
    this.time = ns;
     // 任務的周期, 延遲任務的為0,因為不需要重復執行
    this.period = 0;
    // 任務的序號 + 1
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • 調用decorateTask()方法裝飾延遲任務
// 沒有做任何操作,直接將 task 返回,該方法主要目的是用于子類擴展
protected

提交周期任務scheduleAtFixedRate()原理

按照固定的頻率周期性的執行任務,捕手renwu,一次任務的啟動到下一次任務的啟動的間隔

public ScheduledFuture? scheduleAtFixedRate(Runnable command, long initialDelay, long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 任務封裝,【指定初始的延遲時間和周期時間】
    ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(period));
    // 默認返回本身
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 開始執行這個任務
    delayedExecute(t);
    return t;
}

提交周期任務scheduleWithFixedDelay()原理

按照指定的延時周期性執行任務,上一個任務執行完畢后,延時一定時間,再次執行任務。

public ScheduledFuture? scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null) 
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 任務封裝,【指定初始的延遲時間和周期時間】,周期時間為 - 表示是 fixed-delay 模式
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(initialDelay, unit), unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
     // 開始執行這個任務
    delayedExecute(t);
    return t;
}

執行任務delayedExecute(t)原理

上面多種提交任務的方式,殊途同歸,最終都會調用delayedExecute()方法執行延遲或者周期任務。

  • delayedExecute()方法是執行延遲任務的入口
private void delayedExecute(RunnableScheduledFuture? task) {
    // 線程池是 SHUTDOWN 狀態,執行拒絕策略
    if (isShutdown())
        // 調用拒絕策略的方法
        reject(task);
    else {
        // 把當前任務放入阻塞隊列
        super.getQueue().add(task);
        // 線程池狀態為 SHUTDOWN 并且不允許執行任務了,就從隊列刪除該任務,并設置任務的狀態為取消狀態
        // 非主流程,可以跳過,不重點看了
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);
        else
            // 開始執行了哈
            ensurePrestart();
    }
}
  • ensurePrestart()方法開啟線程執行
// ThreadPoolExecutor#ensurePrestart
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // worker數目小于corePoolSize,則添加一個worker。
    if (wc < corePoolSize)
        // 第二個參數 true 表示采用核心線程數量限制,false 表示采用 maximumPoolSize
        addWorker(null, true);
    // corePoolSize = 0的情況,至少開啟一個線程,【擔保機制】
    else if (wc == 0)
        addWorker(null, false);
}

addWorker()方法實際上父類ThreadPoolExecutor的方法,這個方法在該文章 Java線程池源碼深度解析中詳細介紹過,這邊做個總結:

  • 如果線程池中工作線程數量小于最大線程數,創建工作線程,執行任務。
  • 如果線程池中工作線程數量大于最大線程數,直接返回。

獲取延遲任務take()原理

目前工作線程已經創建好了,工作線程開始工作了,它會從阻塞隊列中獲取延遲任務執行,這部分也是線程池里面的原理,不做展開,那我們看下它是如何實現延遲執行的? 主要關注如何從阻塞隊列中獲取任務。

  1. DelayedWorkQueue#take()方法獲取延遲任務
  • 該方法會在上面的addWoker()方法創建工作線程后,工作線程中循環持續調用workQueue.take()方法獲取延遲任務。
  • 該方法主要獲取延遲隊列中任務延遲時間小于等于0 的任務。
  • 如果延遲時間不小于0,那么調用條件隊列的awaitNanos(delay)阻塞方法等待一段時間,等時間到了,延遲時間自然小于等于0了。
  • 獲取到任務后,工作線程就可以開始執行調度任務了。
// DelayedWorkQueue#take()
public RunnableScheduledFuture? take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加可中斷鎖
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 獲取阻塞隊列中的頭結點
            RunnableScheduledFuture? first = queue[0];
            // 如果阻塞隊列沒有數據,為空
            if (first == null)
                // 等待隊列不空,直至有任務通過 offer 入隊并喚醒
                available.await();
            else {
                // 獲取頭節點的的任務還剩余多少時間才執行
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 到達觸發時間,獲取頭節點并調整堆,重新選擇延遲時間最小的節點放入頭部
                    return finishPoll(first);

                // 邏輯到這說明頭節點的延遲時間還沒到
                first = null;
                // 說明有 leader 線程在等待獲取頭節點,當前線程直接去阻塞等待
                if (leader != null)
                	// 當前線程阻塞
                    available.await();
                else {
                    // 沒有 leader 線程,【當前線程作為leader線程,并設置頭結點的延遲時間作為阻塞時間】
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 當前線程通過awaitNanos方法等待delay時間后,會自動喚醒,往后面繼續執行
                        available.awaitNanos(delay);
                        // 到達阻塞時間時,當前線程會從這里醒來,進入下一輪循環,就有可能執行了
                    } finally {
                        // t堆頂更新,leader 置為 null,offer 方法釋放鎖后,
                        // 有其它線程通過 take/poll 拿到鎖,讀到 leader == null,然后將自身更新為leader。
                        if (leader == thisThread)
                            // leader 置為 null 用以接下來判斷是否需要喚醒后繼線程
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 沒有 leader 線程并且頭結點不為 null,喚醒阻塞獲取頭節點的線程,
        // 【如果沒有這一步,就會出現有了需要執行的任務,但是沒有線程去執行】
        if (leader == null && queue[0] != null)
            available.signal();
        // 解鎖
        lock.unlock();
    }
}
  1. finishPoll()方法獲取到任務后執行

該方法主要做兩個事情, 獲取頭節點并調整堆,重新選擇延遲時間最小的節點放入頭部。

private RunnableScheduledFuture?</span> finishPoll(RunnableScheduledFuture?span> f) {
    // 獲取尾索引
    int s = --size;
    // 獲取尾節點
    RunnableScheduledFuture? x = queue[s];
    // 將堆結構最后一個節點占用的 slot 設置為 null,因為該節點要嘗試升級成堆頂,會根據特性下調
    queue[s] = null;
    // s == 0 說明 當前堆結構只有堆頂一個節點,此時不需要做任何的事情
    if (s != 0)
        // 從索引處 0 開始向下調整
        siftDown(0, x);
    // 出隊的元素索引設置為 -1
    setIndex(f, -1);
    return f;
}

延遲任務運行的原理

從延遲隊列中獲取任務后,工作線程會調用延遲任務的run()方法執行任務。

  1. ScheduledFutureTask#run()方法運行任務
  • 調用isPeriodic()方法判斷任務是否是周期性任務還是非周期性任務
  • 如果任務是非周期任務,就調用父類的FutureTask#run()執行一次
  • 如果任務是非周期任務,就調用父類的FutureTask#runAndReset(), 返回true會設置下一次的執行時間,重新放入線程池的阻塞隊列中,等待下次獲取執行
public void run() {
    // 是否周期性,就是判斷 period 是否為 0
    boolean periodic = isPeriodic();
    // 根據是否是周期任務檢查當前狀態能否執行任務,不能執行就取消任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 非周期任務,直接調用 FutureTask#run 執行一次
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 周期任務的執行,返回 true 表示執行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 設置周期任務的下一次執行時間
        setNextRunTime();
        // 任務的下一次執行安排,如果當前線程池狀態可以執行周期任務,加入隊列,并開啟新線程
        reExecutePeriodic(outerTask);
    }
}
  1. FutureTask#runAndReset()執行周期性任務
  • 周期任務正常完成后任務的狀態不會變化,依舊是 NEW,不會設置 outcome 屬性。
  • 但是如果本次任務執行出現異常,會進入 setException 方法將任務狀態置為異常,把異常保存在 outcome 中。
  • 方法返回 false,后續的該任務將不會再周期的執行
protected boolean runAndReset() {
    // 任務不是新建的狀態了,或者被別的線程執行了,直接返回 false
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable
  1. ScheduledFutureTask#setNextRunTime()設置下次執行時間
  • 如果屬性period大于0,表示fixed-rate模式,直接加上period時間即可。
  • 如果屬性period小于等于0, 表示是fixed-delay模式, 調用triggerTime重新計算下次時間。
// 任務下一次的觸發時間
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        // fixed-rate 模式,【時間設置為上一次執行任務的時間 + p】,兩次任務執行的時間差
        time += p;
    else
        // fixed-delay 模式,下一次執行時間是【當前這次任務結束的時間(就是現在) + delay 值】
        time = triggerTime(-p);
}
  1. ScheduledFutureTask#reExecutePeriodic(),重新放入阻塞任務隊列,等待獲取,進行下一輪執行
// ScheduledThreadPoolExecutor#reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture? task) {
    if (canRunInCurrentRunState(true)) {
        // 【放入任務隊列】
        super.getQueue().add(task);
        // 如果提交完任務之后,線程池狀態變為了 shutdown 狀態,需要再次檢查是否可以執行,
        // 如果不能執行且任務還在隊列中未被取走,則取消任務
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 當前線程池狀態可以執行周期任務,加入隊列,并【根據線程數量是否大于核心線程數確定是否開啟新線程】
            ensurePrestart();
    }
}
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 線程池
    +關注

    關注

    0

    文章

    57

    瀏覽量

    6854
收藏 人收藏

    評論

    相關推薦

    Java中的線程包括哪些

    線程是用來統一管理線程的,在 Java 中創建和銷毀線程都是一件消耗資源的事情,線程可以重復
    的頭像 發表于 10-11 15:33 ?823次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實現的

    線程的概念是什么?線程是如何實現的?
    發表于 02-28 06:20

    java自帶的線程方法

    二、原理分析 從上面使用線程的例子來看,最主要就是兩步,構造ThreadPoolExecutor對象,然后每來一個任務,就調用ThreadPoolExecutor對象的execute方法。 1
    發表于 09-27 11:06 ?0次下載

    原理解析線程池中多余的線程是如何回收的?

    最近閱讀了JDK線程ThreadPoolExecutor的源碼,對線程執行任務的流程有了大體了解,實際上這個流程也十分通俗易懂,就不再贅
    的頭像 發表于 11-11 09:57 ?962次閱讀

    線程線程

    線程通常用于服務器應用程序。 每個傳入請求都將分配給線程池中的一個線程,因此可以異步處理請求,而不會占用主線程,也不會延遲后續請求的處理
    的頭像 發表于 02-28 09:53 ?800次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    Java線程核心原理

    看過Java線程源碼的小伙伴都知道,在Java線程池中最核心的類就是ThreadPoolExecutor,
    的頭像 發表于 04-21 10:24 ?858次閱讀

    如何用C++實現一個線程呢?

    C++線程是一種多線程管理模型,把線程分成任務執行和線程調度兩部分。
    發表于 06-08 14:53 ?1788次閱讀
    如何用C++實現一個<b class='flag-5'>線程</b><b class='flag-5'>池</b>呢?

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態也是616條,這個點就非??梢闪?,我斷定就是這個pool開頭線程導致的問題。我們先排查為何這個
    發表于 07-31 10:49 ?2304次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    線程的兩個思考

    今天還是說一下線程的兩個思考。 池子 我們常用的線程, JDK的ThreadPoolExecutor. CompletableFutures 默認使用了
    的頭像 發表于 09-30 11:21 ?3106次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的兩個思考

    Spring 的線程應用

    我們在日常開發中,經常跟多線程打交道,Spring 為我們提供了一個線程方便我們開發,它就是 ThreadPoolTaskExecutor ,接下來我們就來聊聊 Spring 的線程
    的頭像 發表于 10-13 10:47 ?626次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優勢 C++線程簡介
    的頭像 發表于 11-10 10:24 ?541次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執行效
    的頭像 發表于 11-10 16:37 ?529次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    線程七大核心參數執行順序

    線程是一種用于管理和調度線程執行的技術,通過將任務分配到線程池中的線程進行處理,可以有效地控制
    的頭像 發表于 12-04 16:45 ?1075次閱讀

    線程的創建方式有幾種

    線程是一種用于管理和調度線程的技術,能夠有效地提高系統的性能和資源利用率。它通過預先創建一組線程并維護一個工作隊列,將任務提交給
    的頭像 發表于 12-04 16:52 ?878次閱讀

    什么是動態線程?動態線程的簡單實現思路

    因此,動態可監控線程一種針對以上痛點開發的線程管理工具。主要可實現功能有:提供對 Spring 應用內線程
    的頭像 發表于 02-28 10:42 ?650次閱讀
    主站蜘蛛池模板: jlzzjlzz亚洲大全| 免费任我爽橹视频在线观看| 五月激情婷婷网| 天天干天天玩天天操| 六月婷婷精品视频在线观看| 婷婷五月小说| 色天使久久综合网天天| 91深夜福利| 亚洲精品资源| 丁香综合网| 男人午夜影院| 国产香蕉视频在线| 手机看片午夜| 午夜96影视| 成年男人永久免费看片| 九色福利| 来吧成人综合网| 国产一区在线播放| 久久精品.com| 成人在线91| 国产激情在线观看| 国产小视频你懂的| 激情五月婷婷在线| 欧美三级手机在线| 日本最色视频| 婷婷色在线视频| 日本在线观看永久免费网站| 涩五月婷婷| 四虎永久精品视频在线| 亚洲午夜久久久久久噜噜噜| 不卡一区| 四虎影院com| 色妞网| 尻美女视频| 夜夜操伊人| 天天射天天舔| 人人看人人鲁狠狠高清| 美女被日出白浆| 手机免费在线视频| 五月激情丁香网| 插菊综合网|