在线观看www成人影院-在线观看www日本免费网站-在线观看www视频-在线观看操-欧美18在线-欧美1级

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
电子发烧友
开通电子发烧友VIP会员 尊享10大特权
海量资料免费下载
精品直播免费看
优质内容免费畅学
课程9折专享价
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

關于Spark的從0實現30s內實時監控指標計算

佳佳 ? 來源:jf_36786605 ? 作者:jf_36786605 ? 2024-06-14 15:52 ? 次閱讀

前言

說起Spark,大家就會自然而然地想到Flink,而且會不自覺地將這兩種主流的大數據實時處理技術進行比較。然后最終得出結論:Flink實時性大于Spark。

的確,Flink中的數據計算是以事件為驅動的,所以來一條數據就會觸發一次計算,而Spark基于數據集RDD計算,RDD最小生成間隔就是50毫秒,所以Spark就被定義為亞實時計算。

窗口Window

這里的RDD就是“天然的窗口”,將RDD生成的時間間隔設置成1min,那么這個RDD就可以理解為“1min窗口”。所以如果想要窗口計算,首選Spark。

但當需要對即臨近時間窗口進行計算時,必須借助滑動窗口的算子來實現。

臨近時間如何理解

例如“3分鐘內”這種時間范圍描述。這種時間范圍的計算,需要計算歷史的數據。例如1 ~ 3是3min,2 ~ 4也是3min,這里就重復使用了2和3的數據,依次類推,3 ~ 5也是3min,同樣也重復使用了3和4。

如果使用普通窗口,就無法滿足“最近3分鐘內”這種時間概念。

很多窗口都丟失了臨近時間,例如第3個RDD的臨近時間其實是第二個RDD,但是他們就沒法在一起計算,這就是為什么不用普通窗口的原因。

滑動窗口

滑動窗口三要素:RDD的生成時間、窗口的長度、滑動的步長。

我在本次實踐中,將RDD的時間間隔設置為10s,窗口長度為30s、滑動步長為10s。也就是說每10s就會生成一個窗口,計算最近30s內的數據,每個窗口由3個RDD組成。

數據源構建

1. 數據規范

假設我們采集了設備的指標信息,這里我們只關注吞吐量和響應時間,在采集之前定義數據字段和規范[throughput, response_time],這里都定義成int類型,響應時間單位這里定義成毫秒ms。

實際情況中,我們不可能只采集一臺設備,如果我們想要得出每臺或者每個種類設備的指標監控,就要在采集數據的時候對每個設備加上唯一ID或者TypeID。

我這里的想法是對每臺設備的指標進行分析,所以我給每個設備都增加了一個唯一ID,最終字段[id, throughput, response_time],所以我們就按照這個數據格式,在SparkStreaming中構建數據源讀取部分。

2. 讀取kafka

代碼語言:scala

復制

val conf = new SparkConf().setAppName("aqi").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "121.91.168.193:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "aqi",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val topics = Array("evt_monitor")
val stream: DStream[String] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(_.value)

這里我們將一個RDD時間間隔設置為10S,因為使用的是筆記本跑,所以這里要將Master設置為local,表示本地運行模式,1代表使用1個線程。

我們使用Kafka作為數據源,在讀取時就要構建Consumer的config,像bootstrap.servers這些基本配置沒有什么好說的,關鍵的是auto.offset.reset和enable.auto.commit,

這兩個參數分表控制讀取topic消費策略和是否提交offset。這里的earliest會從topic中現存最早的數據開始消費,latest是最新的位置開始消費。

當重啟程序時,這兩種消費模式又被enable.auto.commit控制,設置true提交offset時,earliest和latest不再生效,都是從消費組記錄的offset進行消費。設置為false不提交offset,offset不被提交記錄earliest還是從topic中現存最早的數據開始消費,latest還是從最新的數據消費。

最后就是設置要讀取的topic和創建Kafka的DStream數據流。至此,整個數據源的讀取就已經完成了,下面就是對數據處理邏輯的開發。

3. 指標聚合計算

代碼語言:scala

復制

stream.map(x => {
      val s = x.split(",")
      (s(0), (s(2).toInt, 1))
    }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10))
    .foreachRDD(rdd => {
      rdd.foreach(x => {
        val id = x._1
        val responseTimes = x._2._1
        val num = x._2._2
        val responseTime_avg = responseTimes / num
        println(id, responseTime_avg)
      })
    })

我們從自身需求出發,來構思程序邏輯的開發。從需求看,關鍵字無非是最近一段時間內、平均值。想要取一段時間內的數據,就要使用滑動窗口,以當前時間為基準,向前圈定時間范圍。

而平均值,無非就是將時間范圍內,即窗口所有的響應時間加起來,然后除以數據條數即可。想要把所有的響應時間加起來,這里使用reduceByKey() 將窗口內相同ID的設備時間相加,將數據條數進行相加。

所以我在第一步切分數據的時候,就將數據切分成KV的元組形式,V有兩個字段,第一個是響應時間,第二個1表示一條數據。reduceByKey一共分為兩步,第一是RDD內的reduceByKey,這也算是數據的預處理,RDD的數據只會計算一次,當這個RDD被多個窗口使用,就不會重復計算了。第二步是基于窗口的reduceByKey,將窗口所有RDD的數據再一次聚合,最后在foreachRDD中獲取輸出

4. 驗證結果

我們向kafka的evt_monitor這個topic中寫入數據。

備注:(最后11那個id是終端顯示問題,其實是1),然后可以輸出平均值。

驗證結果是沒有問題的,換個角度,我們也可以從DAG來看。

這個窗口一共計算了3個RDD,其中左側的兩個是灰色的,上面是skipped標識,代表著這兩個RDD在上一個窗口已經計算完成了,在這個窗口只需要計算當前的RDD,然后再一起對RDD的結果數據進行窗口計算。

結語

本篇文章主要是利用Spark的滑動窗口,做了一個計算平均響應時長的應用場景,以Kafka作為數據源、通過滑動窗口和reduceByKey算子得以實現。同時,開發Spark還是強烈推薦scala,整個程序看起來沒有任何多余的部分。

最后對于Spark和Flink的選型看法,Spark的確是在實時性上比Flink差一些,但是Spark對于窗口計算還是有優勢的。所以對于每種技術,也不用人云亦云,適合自己的才是最好的。

審核編輯 黃宇

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • RDD
    RDD
    +關注

    關注

    0

    文章

    7

    瀏覽量

    8050
  • 實時監控
    +關注

    關注

    1

    文章

    102

    瀏覽量

    13857
  • SPARK
    +關注

    關注

    1

    文章

    105

    瀏覽量

    20271
收藏 0人收藏

    評論

    相關推薦

    自媒體推廣實時監控服務器帶寬到用戶行為解決方法

    自媒體推廣的實時監控需要從底層基礎設施到前端用戶行為進行全鏈路覆蓋,確保推廣活動的穩定性和效果可追蹤。以下是系統性解決方案,主機推薦小編為您整理發布自媒體推廣實時監控
    的頭像 發表于 04-09 10:47 ?137次閱讀

    邊緣計算網關的實時監控與預測性維護都有哪些方面?適合哪些行業使用?

    邊緣計算網關的實時監控與預測性維護都有哪些方面?適合哪些行業使用? 有實施過得案例的介紹嗎? 深控技術的不需要點表的邊緣計算網關如何?
    發表于 04-01 09:44

    HarmonyOS NEXT 原生應用/元服務-DevEco Profiler性能問題定界實時監控

    不同的圖像形式(直方圖、柱狀圖、折線圖等)來更加清晰的展示某一項資源在一段時間范圍的變化趨勢,以幫助您快速判斷性能熱點區域。 整個實時監控頁面從上到下,依次展示了系統事件、異常事件、前臺應用、CPU
    發表于 02-21 14:35

    HarmonyOS NEXT 原生應用/元服務-DevEco Profiler性能問題定界實時監控

    不同的圖像形式(直方圖、柱狀圖、折線圖等)來更加清晰的展示某一項資源在一段時間范圍的變化趨勢,以幫助您快速判斷性能熱點區域。 整個實時監控頁面從上到下,依次展示了系統事件、異常事件、前臺應用、CPU
    發表于 02-20 10:14

    輸電線路防外破防異物實時監控預警裝置|場景模型真實還原|測距誤差在0.25米

    輸電線路防外破防異物實時監控預警裝置|場景模型真實還原|測距誤差在0.25米 輸電線路防外破防異物實時監控預警裝置針對輸電線路通道走廊
    的頭像 發表于 01-21 09:48 ?236次閱讀

    spark為什么比mapreduce快?

    spark為什么比mapreduce快? 首先澄清幾個誤區: 1:兩者都是基于內存計算的,任何計算框架都肯定是基于內存的,所以網上說的spark是基于內存
    的頭像 發表于 09-06 09:45 ?404次閱讀

    監控系統原理揭秘-數據運算篇

    、數據計算、數據存儲、數據可視化及監控預警等功能。本文主要介紹數據計算部分。 二、實時計算 流數據實時計算是一種處理和分析
    的頭像 發表于 08-06 10:30 ?947次閱讀
    <b class='flag-5'>監控</b>系統原理揭秘-數據運算篇

    30元如何實現車輛防后撞

    HLK-LD2451是海凌科最新推出的一款專用于檢測車輛狀態的24G雷達模塊,以不到30元的價格,實現實時感知周圍100米范圍的車輛靠近與遠離。
    的頭像 發表于 07-29 09:43 ?1342次閱讀
    <b class='flag-5'>30</b>元如何<b class='flag-5'>實現</b>車輛防后撞

    spark運行的基本流程

    記錄和分享下spark運行的基本流程。 一、spark的基礎組件及其概念 1. ClusterManager 在Standalone模式中即為Master,控制整個集群,監控Worker。在YARN
    的頭像 發表于 07-02 10:31 ?586次閱讀
    <b class='flag-5'>spark</b>運行的基本流程

    Spark基于DPU的Native引擎算子卸載方案

    1.背景介紹 Apache Spark(以下簡稱Spark)是一個開源的分布式計算框架,由UC Berkeley AMP Lab開發,可用于批處理、交互式查詢(Spark SQL)、
    的頭像 發表于 06-28 17:12 ?888次閱讀
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸載方案

    上位監控程序如何實現

    上位監控程序是一種用于實時監控和管理工業自動化、物聯網設備、網絡設備等的軟件系統。本文將詳細介紹上位監控程序的設計和實現,包括需求分析、系統
    的頭像 發表于 06-07 09:12 ?771次閱讀

    NB05-01啟動后查找有信號,持續30s后就再也沒有信號了是怎么回事?

    NB05-01 啟動后查找有信號,持續30s后就再也沒有信號
    發表于 06-04 06:50

    計算原理分類——數字存計算與模擬存計算

    數字存計算與模擬存計算各有優劣,都是存算一體發展進程中的重點發展路徑,數字存計算由于其高速
    的頭像 發表于 05-21 16:26 ?3419次閱讀
    存<b class='flag-5'>內</b><b class='flag-5'>計算</b>原理分類——數字存<b class='flag-5'>內</b><b class='flag-5'>計算</b>與模擬存<b class='flag-5'>內</b><b class='flag-5'>計算</b>

    MRAM的演進看存計算的發展

    : (1)基于存計算架構,可高效地實現神經網絡語音激活檢測和上百條語音命令詞識別。 (2)以超低功耗實現神經網絡環境降噪算法、健康監測與分析算法。 (3)典型應用場景下,工
    的頭像 發表于 05-17 14:25 ?1684次閱讀
    <b class='flag-5'>從</b>MRAM的演進看存<b class='flag-5'>內</b><b class='flag-5'>計算</b>的發展

    淺談存計算生態環境搭建以及軟件開發

    和靈活的平臺。這種環境的核心優勢在于其能夠提供極高的數據處理速度和效率,使得數據可以直接在內存中被快速訪問和處理,這對于需要實時數據處理和分析的應用來說至關重要。 在了解存計算開發環境的核心優勢和作用
    發表于 05-16 16:40

    電子發燒友

    中國電子工程師最喜歡的網站

    • 2931785位工程師會員交流學習
    • 獲取您個性化的科技前沿技術信息
    • 參加活動獲取豐厚的禮品
    主站蜘蛛池模板: 伊人小婷婷色香综合缴缴情 | 天天爽夜夜春 | 国产一区二区在线视频播放 | 欧美一级色 | 天天综合色天天综合色sb | 涩涩色中文综合亚洲 | 欧美一区二区三区男人的天堂 | 天天综合天天做天天综合 | 色欲麻豆国产福利精品 | 好爽毛片一区二区三区四区 | 让她爽的喷水叫爽乱 | 最刺激黄a大片免费观看下截 | 日本免费不卡视频一区二区三区 | 色老头影院 | 日韩亚洲人成在线综合 | 午夜性爽视频男人的天堂在线 | 最新毛片网 | 亚洲乱强| 日本大黄在线观看 | 三级视频网站在线观看 | 99精品国产高清自在线看超 | 淫性视频 | 乱色伦肉小说 | 丁香啪啪天堂激情婷婷 | 成年ssswww中国女人 | 99久久99久久精品国产 | 国产好深好硬好爽我还要视频 | 亚洲成年人影院 | 成人黄色网址 | 好爽~~~~嗯~~~再快点明星 | 亚洲三级网址 | 666精品国产精品亚洲 | 国产精品毛片久久久久久久 | 热九九精品 | 来啊mm影院亚洲mm影院 | 特黄特a级特别特级特毛片 特黄特色大片免费播放路01 | 国产ar高清视频+视频 | 五月婷婷激情五月 | 亚洲五月激情 | 天天伊人网 | 久久51|