在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

如何在RocketMQ中合理使用重試機制

OSC開源社區 ? 來源:阿里云云原生 ? 作者:斜陽 ? 2022-11-23 10:15 ? 次閱讀

引言

Cloud Native

本文主要介紹在使用 RocketMQ 時為什么需要重試與兜底機制,生產者與消費者觸發重試的條件和具體行為,如何在 RocketMQ 中合理使用重試機制,幫助構建彈性,高可用系統的最佳實踐

RocketMQ 的重試機制包括三部分,分別是生產者重試,服務端內部數據復制遇到非預期問題時重試,消費者消費重試。本文中僅討論生產者重試和消費者消費重試兩種面向用戶側的實現。 6e5014b8-6a66-11ed-8abf-dac502259ad0.png

生產者發送重試

Cloud Native

RocketMQ 的生產者在發送消息到服務端時,可能會因為網絡問題,服務異常等原因導致調用失敗,這時候應該怎么辦?如何盡可能的保證消息不丟失呢?

1. 生產者重試次數

RocketMQ 在客戶端中內置了請求重試邏輯,支持在初始化時配置消息發送最大重試次數(默認為 2 次),失敗時會按照設置的重試次數重新發送。直到消息發送成功,或者達到最大重試次數時結束,并在最后一次失敗后返回調用錯誤的響應。對于同步發送和異步發送,均支持消息發送重試

同步發送:調用線程會一直阻塞,直到某次重試成功或最終重試失敗(返回錯誤碼或拋出異常)。

異步發送:調用線程不會阻塞,但調用結果會通過回調的形式,以異常事件或者成功事件返回。

2. 生產者重試間隔

在介紹生產者重試前,我們先來了解下流控的概念,流控一般是指服務端壓力過大,容量不足時服務端會限制客戶端收發消息的行為,是服務端自我保護的一種設計。RocketMQ 會根據當前是否觸發了流控而采用不同的重試策略:

非流控錯誤場景:其他觸發條件觸發重試后,均會立即進行重試,無等待間隔流控錯誤場景:系統會按照預設的指數退避策略進行延遲重試

為什么要引入退避和隨機抖動?

如果故障是由過載流控引起的,重試會增加服務端負載,導致情況進一步惡化,因此客戶端在遇到流控時會在兩次嘗試之間等待一段時間。每次嘗試后的等待時間都呈指數級延長。

指數回退可能導致很長的回退時間,因為指數函數增長很快。指數退避算法通過以下參數控制重試行為。

INITIAL_BACKOFF:第一次失敗重試前后需等待多久,默認值:1 秒;

MULTIPLIER :指數退避因子,即退避倍率,默認值:1.6;

JITTER :隨機抖動因子,默認值:0.2;

MAX_BACKOFF :等待間隔時間上限,默認值:120 秒;

MIN_CONNECT_TIMEOUT :最短重試間隔,默認值:20 秒。

ConnectWithBackoff()

  current_backoff = INITIAL_BACKOFF
  current_deadline = now() + INITIAL_BACKOFF
  while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)
    SleepUntil(current_deadline)
    current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)
    current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)
特別說明:對于事務消息,只會進行透明重試(transparent retries),網絡超時或異常等場景不會進行重試。

3. 重試帶來的副作用

不停的重試看起來很美好,但也是有副作用的,主要包括兩方面:消息重復,服務端壓力增大

遠程調用的不確定性,因請求超時觸發消息發送重試流程,此時客戶端無法感知服務端的處理結果;客戶端進行的消息發送重試可能會導致消費方重復消費,應該按照用戶ID、業務主鍵等信息冪等處理消息

較多的重試次數也會增大服務端的處理壓力

4. 用戶的最佳實踐是什么

1)合理設置發送超時時間,發送的最大次數

發送的最大次數在初始化客戶端時配置在 ClientConfiguration;對于某些實時調用類場景,可能會導致消息發送請求鏈路被阻塞導致業務請求整體耗時高或耗時;需要合理評估每次調用請求的超時時間以及最大重試次數,避免影響全鏈路的耗時。

2)如何保證發送消息不丟失

由于分布式環境的復雜性,例如網絡不可達時 RocketMQ 客戶端發送請求重試機制并不能保證消息發送一定成功。業務方需要捕獲異常,并做好冗余保護處理,常見的解決方案有兩種:

向調用方返回業務處理失敗;

嘗試將失敗的消息存儲到數據庫,然后由后臺線程定時重試,保證業務邏輯的最終一致性。

3)關注流控異常導致無法重試

觸發流控的根本原因是系統容量不足,如果因為突發原因觸發消息流控,且客戶端內置的重試流程執行失敗,

則建議執行服務端擴容,將請求調用臨時替換到其他系統進行應急處理。

4)早期版本客戶端如何使用故障延遲機制進行發送重試?

對于 RocketMQ 4.x 和 3.x以下客戶端開啟故障延遲機制可以用:

producer.setSendLatencyFaultEnable(true)

配置重試次數使用:

producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed()

消費者消費重試

Cloud Native

消息中間件做異步解耦時的一個典型問題是如果下游服務處理消息事件失敗,那應該怎么做呢? RocketMQ 的消息確認機制以及消費重試策略可以幫助分析如下問題:

如何保證業務完整處理消息?

消費重試策略可以在設計實現消費者邏輯時保證每條消息處理的完整性,避免部分消息消費異常導致業務狀態不一致。

業務應用異常時處理中的消息狀態如何恢復?

當系統出現異常(宕機故障)等場景時,處理中的消息狀態如何恢復,消費重試具體行為是什么。

1. 什么是消費重試?

什么時候認為消費失敗?

消費者在接收到消息后將調用用戶的消費函數執行業務邏輯。如果客戶端返回消費失敗 ReconsumeLater,拋出非預期異常,或消息處理超時(包括在 PushConsumer 中排隊超時),只要服務端服務端一定時間內沒收到響應,將認為消費失敗

消費重試是什么?

消費者在消費某條消息失敗后,服務端會根據重試策略重新向客戶端投遞該消息。超過一次定數后若還未消費成功,則該消息將不再繼續重試,直接被發送到死信隊列中;

重試過程狀態機:消息在重試流程中的狀態和變化邏輯;

重試間隔:上一次消費失敗或超時后,下次重新嘗試消費的間隔時間;

最大重試次數:消息可被重試消費的最大次數。

2. 消息重試的場景

需要注意重試是應對異常情況,給予程序再次消費失敗消息的機會,不應該被用作常態化的鏈路。

推薦使用場景:

業務處理失敗,失敗原因跟當前的消息內容相關,預期一段時間后可執行成功;

是一個小概率事件,對于大批的消息只有很少量的失敗,后面的消息大概率會消費成功,是非常態化的。

正例:消費邏輯是扣減庫存,極少量商品因為樂觀鎖版本沖突導致扣減失敗,重試一般立刻成功。 錯誤使用場景:

消費處理邏輯中使用消費失敗來做條件判斷的結果分流,是不合理的。

反例:訂單在數據庫中狀態已經是已取消,此時如果收到發貨的消息,處理時不應返回消費失敗,而應該返回成功并標記不用發貨。

消費處理中使用消費失敗來做處理速率限流,是不合理的。

限流的目的是將超出流量的消息暫時堆積在隊列中達到削峰的作用,而不是讓消息進入重試鏈路。

這種做法會讓消息反復在服務端和客戶端之間傳遞,增大了系統的開銷,主要包括以下方面:

RocketMQ 內部重試涉及寫放大,每一次重試將生成新的重試消息,大量重試將帶來嚴重的 IO 壓力;

重試有復雜的退避邏輯,內部實現為梯度定時器,該定時器本身不具備高吞吐的特性,大量重試將導致重試消息無法及時出隊。重試的間隔將不穩定,將導致大量重試消息延后消費,即削峰的周期被大幅度延長。

3. 不要以重試替代限流

上述誤用的場景實際上是組合了限流和重試能力來進行削峰,RocketMQ 推薦的削峰最佳手段為組合限流和堆積,業務以保護自身為前提,需要對消費流量進行限流,并利用 RocketMQ 提供的堆積能力將超出業務當前處理的消息滯后消費,以達到削峰的目的。下圖中超過處理能力的消息都應該被堆積在服務端,而不是通過消費失敗進行重試。

6e695068-6a66-11ed-8abf-dac502259ad0.png

如果不想依賴額外的產品/組件來完成該功能,也可以利用一些本地工具類,比如 Guava 的 RateLimiter 來完成單機限流。

如下所示,聲明一個 50 QPS 的 RateLimiter,在消費前以阻塞的方式 acquire 一個令牌,獲取到即處理消息,未獲取到阻塞。


RateLimiter rateLimiter = RateLimiter.create(50);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 設置訂閱組名稱
    .setConsumerGroup(consumerGroup)
    // 設置訂閱的過濾器
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // 阻塞直到獲得一個令牌,也可以配置一個超時時間
        rateLimiter.acquire();
        LOGGER.info("Consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();

4. PushConsumer 消費重試策略

PushConsumer 消費消息時,消息的幾個主要狀態如下:

6e843504-6a66-11ed-8abf-dac502259ad0.png

Ready:已就緒狀態。消息在消息隊列RocketMQ版服務端已就緒,可以被消費者消費;

Inflight:處理中狀態。消息被消費者客戶端獲取,處于消費中還未返回消費結果的狀態;

Commit:提交狀態。消費成功的狀態,消費者返回成功響應即可結束消息的狀態機;

DLQ:死信狀態

消費邏輯的最終兜底機制,若消息一直處理失敗并不斷進行重試,直到超過最大重試次數還未成功,此時消息不會再重試。

該消息會被投遞至死信隊列。您可以通過消費死信隊列的消息進行業務恢復。

最大重試次數

PushConsumer 的最大重試次數由創建時決定。 例如,最大重試次數為 3 次,則該消息最多可被投遞 4 次,1 次為原始消息,3 次為重試投遞次數。

重試間隔時間

無序消息(非順序消息):重試間隔為階梯時間,具體時間如下:

說明:若重試次數超過 16 次,后面每次重試間隔都為 2 小時。

6ea49f74-6a66-11ed-8abf-dac502259ad0.png

順序消息:重試間隔為固定時間,默認為 3 秒。

5. SimpleConsumer 消費重試策略

和 PushConsumer 消費重試策略不同,SimpleConsumer 消費者的重試間隔是預分配的,每次獲取消息消費者會在調用 API 時設置一個不可見時間參數 InvisibleDuration,即消息的最大處理時長。

若消息消費失敗觸發重試,不需要設置下一次重試的時間間隔,直接復用不可見時間參數的取值。 6ec44284-6a66-11ed-8abf-dac502259ad0.png

由于不可見時間為預分配的,可能和實際業務中的消息處理時間差別較大,可以通過 API 接口修改不可見時間。

例如,預設消息處理耗時最多 20 ms,但實際業務中 20 ms內消息處理不完,可以修改消息不可見時間,延長消息處理時間,避免消息觸發重試機制。 修改消息不可見時間需要滿足以下條件:

消息處理未超時

消息處理未提交消費狀態

如下圖所示,消息不可見時間修改后立即生效,即從調用 API 時刻開始,重新計算消息不可見時間。

6ee3f124-6a66-11ed-8abf-dac502259ad0.png

最大重試次數

與 PushConsumer 相同。

消息重試間隔

消息重試間隔 = 不可見時間 - 消息實際處理時長 例如:消息不可見時間為 30 ms,實際消息處理用了 10 ms 就返回失敗響應,則距下次消息重試還需要 20 ms,此時的消息重試間隔即為 20 ms;若直到 30 ms 消息還未處理完成且未返回結果,則消息超時,立即重試,此時重試間隔即為 0 ms。 SimpleConsumer 的消費重試間隔通過消息的不可見時間控制。


//消費示例:使用SimpleConsumer消費普通消息,主動獲取消息處理并提交。
ClientServiceProvider provider1 = ClientServiceProvider.loadService();
String topic1 = "Your Topic";
FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);


SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
        //設置消費者分組。
        .setConsumerGroup("Your ConsumerGroup")
        //設置接入點。
        .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
        //設置預綁定的訂閱關系。
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .build();
List messageViewList = null;
try {
    //SimpleConsumer需要主動獲取消息,并處理。
    messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        //消費處理完成后,需要主動調用ACK提交消費結果。
        //沒有ack會被認為消費失敗
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            e.printStackTrace();
        }
    });
} catch (ClientException e) {
    //如果遇到系統流控等原因造成拉取失敗,需要重新發起獲取消息請求。
    e.printStackTrace();
}

修改消息的不可見時間

案例:某產品使用消息隊列來發送解耦“視頻渲染”的業務邏輯,發送方發送任務編號,消費方收到編號后處理任務。

由于消費方的業務邏輯耗時較長,消費者重新消費到同一個任務時,該任務未完成,只能返回消費失敗。

在這種全新的 API 下,用戶可以調用可以通過修改不可見時間給消息續期,實現對單條消息狀態的精確控制。


simpleConsumer.changeInvisibleDuration();
simpleConsumer.changeInvisibleDurationAsync();

6. 功能約束與最佳實踐

設置消費的最大超時時間和次數

盡快明確的向服務端返回成功或失敗,不要以超時(有時是異常拋出)代替消費失敗。

不要用重試機制來進行業務限流

錯誤示例:如果當前消費速度過高觸發限流,則返回消費失敗,等待下次重新消費。 正確示例:如果當前消費速度過高觸發限流,則延遲獲取消息,稍后再消費。

發送重試和消費重試會導致相同的消息重復消費,消費方應該有一個良好的冪等設計

正確示例:某系統中消費的邏輯是為某個用戶發送短信,該短信已經發送成功了,當消費者應用重復收到該消息,此時應該返回消費成功。

總結

Cloud Native

本文主要介紹重試的基本概念,生產者消費者收發消息時觸發重試的條件和具體行為,以及 RocketMQ 收發容錯的最佳實踐。

重試策略幫助我們從隨機的、短暫的瞬態故障中恢復,是在容忍錯誤時,提高可用性的一種強大機制。

但請謹記 “重試是對于分布式系統來說自私的”,因為客戶端認為其請求很重要,并要求服務端花費更多資源來處理,盲目的重試設計不可取,合理的使用重試可以幫助我們構建更加彈性且可靠的系統。






審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 狀態機
    +關注

    關注

    2

    文章

    492

    瀏覽量

    27571
  • QPS
    QPS
    +關注

    關注

    0

    文章

    24

    瀏覽量

    8813
  • API串口
    +關注

    關注

    0

    文章

    13

    瀏覽量

    4858
收藏 人收藏

    評論

    相關推薦

    RocketMQ入門手冊

    RocketMQ入門篇
    發表于 10-09 14:13

    Rocketmq怎么安裝

    Rocketmq 安裝步驟
    發表于 10-24 07:47

    HBase客戶端實踐重試機制

    現在,網易視頻云與大家分享HBase客戶端實踐重試機制。 在運維HBase的這段時間里,發現業務用戶一方面比較關注HBase本身服務的讀寫性能:吞吐量以及讀寫延遲,另一方面也會比較關注HBase
    發表于 10-10 10:15 ?0次下載
    HBase客戶端實踐<b class='flag-5'>重試機制</b>

    華為愿與美國政府共同探討和建立保障網絡安全的有效測試機制

    華為美國首席安全官Andy Purdy在11月12日的美國電視訪問公開表示,為了打消美國政府對于安全的顧慮,應該建立起一套保障網絡安全的有效測試機制。“我們很愿意和美國政府就如何實施這一測試機制進行探討。”Purdy稱。
    的頭像 發表于 11-13 14:09 ?2805次閱讀

    Apache RocketMQ MQTT協議架構模型

    rocketmq-mqtt.zip
    發表于 04-20 10:45 ?0次下載
    Apache <b class='flag-5'>RocketMQ</b> MQTT協議架構模型

    開源軟件-RocketMQ Externals Apache RocketMQ的擴展項目

    ./oschina_soft/rocketmq-externals.zip
    發表于 06-23 15:03 ?0次下載
    開源軟件-<b class='flag-5'>RocketMQ</b> Externals Apache <b class='flag-5'>RocketMQ</b>的擴展項目

    RISC-V CPU調試機制的設計原理

    本文將詳細介紹RISC-V CPU調試機制的設計原理。
    發表于 10-18 09:19 ?2284次閱讀

    何在RocketMQ合理使用重試機制構建彈性高可用系統

      RocketMQ 的生產者在發送消息到服務端時,可能會因為網絡問題,服務異常等原因導致調用失敗,這時候應該怎么辦?如何盡可能的保證消息不丟失呢?
    的頭像 發表于 11-23 10:18 ?793次閱讀

    聊聊RocketMQ的主從復制

    RocketMQ 主從復制是 RocketMQ 高可用機制之一,數據可以從主節點復制到一個或多個從節點。
    的頭像 發表于 07-04 09:42 ?636次閱讀
    聊聊<b class='flag-5'>RocketMQ</b>的主從復制

    RocketMQ和RabbitMQ的區別

    化:RocketMQ將消息存儲在磁盤上,保證消息的可靠性;RabbitMQ默認將消息保存在內存,可以通過插件進行持久化。 可用性:RocketMQ具備分布
    的頭像 發表于 07-24 13:39 ?1.4w次閱讀

    Python 在什么情況下才進行重試

    如何寫得優雅、易用,是我們要考慮的問題。 這里要給大家介紹的是一個第三方庫 - Tenacity (標題中的重試機制并并不準確,它不是 Python 的內置模塊,因此并不能稱之為機制),它實現了幾乎我們可以使用到的所有重試場景,
    的頭像 發表于 10-21 11:18 ?370次閱讀

    Tenacity重試模塊實踐

    為了避免由于一些網絡或等其他不可控因素,而引起的功能性問題。比如在發送請求時,會因為網絡不穩定,往往會有請求超時的問題。 這種情況下,我們通常會在代碼中加入重試的代碼。重試的代碼本身不難實現,但
    的頭像 發表于 11-02 11:33 ?419次閱讀

    Pythonretrying庫的有參數重試

    有參數重試 (1) stop_max_attempt_number 在retry傳入stop_max_attempt_number參數后可以指定失敗重試的次數 @retry
    的頭像 發表于 11-14 11:08 ?818次閱讀
    Python<b class='flag-5'>中</b>retrying庫的有參數<b class='flag-5'>重試</b>

    磁盤RocketMQ構建的索引結構

    RocketMQ 廣泛使用于各類業務場景,在實際生產場景,用戶通常會選擇消息 ID 或者特定的業務 Key(例如學號,訂單號)來查詢和定位特定的一批消息,進而定位分布式系統的復雜
    的頭像 發表于 12-22 10:43 ?418次閱讀
    磁盤<b class='flag-5'>中</b><b class='flag-5'>RocketMQ</b>構建的索引結構

    RocketMQ協議是什么?RocketMQ協議特點

    RocketMQ是由阿里巴巴開發的開源分布式消息和流處理平臺。它提供可靠、可擴展和高性能的消息傳輸和實時處理解決方案。 RocketMQ使用一種名為RocketMQ協議的通信協議。該協議旨在促進
    的頭像 發表于 01-03 16:11 ?836次閱讀
    主站蜘蛛池模板: 亚洲一区二区三区精品视频| 欧美成人一区亚洲一区| 手机看片91| 免费大片av手机看片| 久久夜夜操| 亚洲激情四射| 亚洲美女黄视频| 中文一级黄色片| 午夜三级a三点| 日韩美女三级| 久久久综合视频| 成年女人毛片免费观看97| 伊人久久影视| 久久午夜影院| 黄频网| 99精品在线| 色六月丁香| 黄色美女网站在线观看| 综合色影院| 精品久草| 丝袜美腿视频一区二区三区| 一级毛片不收费| 奇米狠狠干| www4虎| 国产日日干| 男女交性永久免费视频播放| 五月伊人婷婷| 美女骚网站| 婷婷日日夜夜| 欧美五月激情| 一级做a爰片久久毛片毛片| 日本动漫天堂| 7777色鬼xxxx欧美色夫| 加勒比一到三区| 欧美a欧美| 三级在线观看网站| 大色综合| 国产伦精品一区二区三区女| 末发育女一区二区三区| 日本三级人妇| 最近在线视频免费观看2019|