作者:米哈游大數據開發
近年來,容器、微服務、Kubernetes 等各項云原生技術的日漸成熟,越來越多的公司開始選擇擁抱云原生,并開始將 AI、大數據等類型的企業應用部署運行在云原生之上。以 Spark 為例,在云上運行 Spark 可以充分享有公共云的彈性資源、運維管控和存儲服務等,并且業界也涌現了不少 Spark on Kubernetes 的優秀實踐。
在剛剛結束的 2023 云棲大會上,米哈游數據平臺組大數據技術專家杜安明分享了米哈游大數據架構向云原生化升級過程中的目標、探索和實踐,以及如何通過以阿里云容器服務 ACK 為底座的 Spark on K8s 架構,獲得在彈性計算、成本節約以及存算分離方面的價值。
背景簡介
隨著米哈游業務的高速發展,大數據離線數據存儲量和計算任務量增長迅速,早期的大數據離線架構已不再滿足新場景和需求。
為了解決原有架構缺乏彈性、運維復雜、資源利用率低等問題,2022 年下半年,我們著手調研將大數據基礎架構云原生化,并最終在阿里云上落地了 Spark on K8s + OSS-HDFS 方案,目前在生產環境上已穩定運行了一年左右的時間,并獲得了彈性計算、成本節約以及存算分離這三大收益。
1. 彈性計算
由于游戲業務會進行周期版本更新、開啟活動以及新游戲的上線等,對離線計算資源的需求與消耗波動巨大,可能是平時水位的幾十上百倍。利用 K8s 集群天然的彈性能力,將 Spark 計算任務調度到 K8s 上運行,可以比較輕松的解決這類場景下資源消耗洪峰問題。
2. 成本節約
依托阿里云容器服務 Kubernetes 版 ACK 集群自身強大的彈性能力,所有計算資源按量申請、用完釋放,再加上我們對 Spark 組件的定制改造,以及充分利用 ECI Spot 實例,在承載同等計算任務和資源消耗下,成本節約達 50%。
3. 存算分離
Spark 運行在 K8s 之上,完全使用 K8s 集群的計算資源,而訪問的則數據也由 HDFS、OSS 逐步切換到 OSS-HDFS 上,中間 Shuffle 數據的讀寫采用 Celeborn,整套架構實現了計算和存儲的解耦,易于維護和擴展。
Spark on K8s架構演進
眾所周知,Spark 引擎可以支持并運行在多種資源管理器之上,比如 Yarn、K8s、Mesos 等。在大數據場景下,目前國內大多公司的 Spark 任務還是運行在 Yarn 集群之上的,Spark 在 2.3 版本首次支持 K8s,并于 2021 年 3 月發布的 Spark3.1 版本才正式 GA。
相較于 Yarn,Spark 在 K8s 上起步較晚,盡管在成熟度、穩定性等方面還存在一定的欠缺,但是 Spark on K8s 能夠實現彈性計算以及成本節約等非常突出的收益,所以各大公司也都在不斷進行嘗試和探索,在此過程中,Spark on K8s 的運行架構也在不斷的向前迭代演進。
1. 在離線混部
目前,將 Spark 任務運行在 K8s 上,大多公司采用的方案依舊是在線與離線混合部署的方式。架構設計依據的原理是,不同的業務系統會有不同的業務高峰時間。大數據離線業務系統典型任務高峰期間會是凌晨的0點到 9 點鐘,而像是各種應用微服務、Web 提供的 BI 系統等,常見的業務高峰期是白天時間,在這個時間以外的其它時間中,可以將業務系統的機器 Node 加入到 Spark 所使用的 K8s NameSpace 中。如下圖所示,將 Spark 與其他在線應用服務等都部署在一套 K8s 集群之上。
該架構的優點是可以通過在離線業務的混合部署和錯峰運行,來提升機器資源利用率并降低成本,但是缺點也比較明顯,即架構實施起來復雜,維護成本比較高,而且難以做到嚴格的資源隔離,尤其是網絡層面的隔離,業務之間不可避免的會產生一定的相互影響,此外,我們認為該方式也不符合云原生的理念和未來發展趨勢。
2. SparkonK8s+OSS-HDFS
考慮到在離線混合部署的弊端,我們設計采用了一種新的、也更加符合云原生的實現架構:底層存儲采用 OSS-HDFS (JindoFs),計算集群采用阿里云的容器服務 ACK,Spark 選擇功能相對豐富且比較穩定的 3.2.3 版本。
OSS-HDFS 完全兼容了 HDFS 協議,除了具備 OSS 無限容量、支持數據冷熱存儲等優點以外,還支持了目錄原子性、毫秒級 rename 操作,非常適用于離線數倉,可以很好的平替現有 HDFS 和 OSS。
阿里云 ACK 集群提供了高性能、可伸縮的容器應用管理服務,可以支持企業級 Kubernetes 容器化應用的生命周期管理,ECS 是大家所熟知的阿里云服務器,而彈性容器實例 ECI 是一種 Serverless 容器運行服務,可以按量秒級申請與釋放。
該架構簡單易維護,底層利用 ECI 的彈性能力,Spark 任務可以較為輕松的應對高峰流量,將 Spark 的 Executor 調度在 ECI 節點上運行,可最大程度的實現計算任務彈性與最佳的降本效果,整體架構的示意圖如下所示。
云原生架構設計與實現
1. 基本原理
在闡述具體實現之前,先簡要介紹一下 Spark 在 K8s 上運行的基本原理。Pod 在 K8s 中是最小的調度單元,Spark 任務的 Driver 和 Executor 都是一個單獨 Pod,每個 Pod 都分配了唯一的 IP 地址,Pod 可以包含一個或多個 Container,無論是 Driver 還是 Executor 的 JVM 進程,都是在 Container 中進行啟動、運行與銷毀的。
一個 Spark 任務被提交到 K8s 集群之后,首先啟動的是 Driver Pod,而后 Driver 會向 Apiserver 按需申請 Executor,并由 Executor 去執行具體的 Task,作業完成之后由 Driver 負責清理所有的 Executor Pod,以下是這幾者關系的簡要示意圖。
2. 執行流程
下圖展示了完整的作業執行流程,用戶在完成 Spark 作業開發后,會將任務發布到調度系統上并進行相關運行參數的配置,調度系統定時將任務提交到自研的 Launcher 中間件,并由中間件來調用 spark-k8s-cli,最終由 Cli 將任務提交至 K8s 集群上。任務提交成功之后,Spark Driver Pod 最先啟動,并向集群申請分配 Executor Pod,Executor 在運行具體的 Task 時,會與外部 Hive、Iceberg、OLAP 數據庫、OSS-HDFS 等諸多大數據組件進行數據的訪問與交互,而 Spark Executor 之間的數據 Shuffle 則由 CeleBorn 來實現。
3. 任務提交
關于如何將 Spark 任務提交到 K8s 集群上,各個公司的做法不盡相同,下面先簡要描述下目前比較常規的做法,然后再介紹目前我們線上所使用的任務提交和管理方式。
3.1 使用原生 spark-submit
通過 spark-submit 命令直接提交,Spark 原生就支持這種方式,集成起來比較簡單,也符合用戶的習慣,但是不方便進行作業狀態跟蹤和管理,無法自動配置 Spark UI 的 Service 和 Ingress,任務結束后也無法自動清理資源等,在生產環境中并不適合。
3.2 使用 spark-on-k8s-operator
這是目前較常用的一種提交作業方式,K8s 集群需要事先安裝 spark-operator,客戶端通過 kubectl 提交 yaml 文件來運行 Spark 作業。本質上這是對原生方式的擴展,最終提交作業依然是使用 spark-submit 方式,擴展的功能包括:作業管理,Service/Ingress 創建與清理,任務監控,Pod 增強等。此種方式可在生產環境中使用,但與大數據調度平臺集成性不太好,對于不熟悉 K8s 的用戶來說,使用起來復雜度和上手門檻相對較高。
3.3 使用 spark-k8s-cli
在生產環境上,我們采用 spark-k8s-cli 的方式進行任務的提交。spark-k8s-cli 本質上是一個可執行的文件,基于阿里云 emr-spark-ack 提交工具我們進行了重構、功能增強和深度的定制。
spark-k8s-cli 融合 spark-submit 和 spark-operator 兩種作業提交方式的優點,使得所有作業都能通過 spark-operator 管理,支持運行交互式 spark-shell 和本地依賴的提交,并且在使用方式上與原生 spark-submit 語法完全一致。
在上線使用初期,我們所有任務的 Spark Submit JVM 進程都啟動在 Gateway Pod 中,在使用一段時間后,發現該方式穩定性不足,一旦 Gateway Pod 異常,其上的所有正在 Spark 任務都將失敗,另外 Spark 任務的日志輸出也不好管理。鑒于此種情況,我們將 spark-k8s-cli 改成了每個任務使用單獨一個 Submit Pod 的方式,由 Submit Pod 來申請啟動任務的 Driver,Submit Pod 和 Driver Pod 一樣都運行在固定的 ECS 節點之上,Submit Pod 之間完全獨立,任務結束后 Submit Pod 也會自動釋放。spark-k8s-cli 的提交和運行原理如下圖所示。
關于 spark-k8s-cli,除了上述基本的任務提交以外,我們還做了其他一些增強和定制化的功能。
支持提交任務到同地域多個不同的 K8s 集群上,實現集群之間的負載均衡和故障轉移切換
實現類似 Yarn 資源不足時的自動排隊等待功能(K8s 如果設置了資源 Quota,當 Quota 達到上限后,任務會直接失敗)
增加與 K8s 網絡通信等異常處理、創建或啟動失敗重試等,對偶發的集群抖動、網絡異常進行容錯
支持按照不同部門或業務線,對大規模補數任務進行限流和管控功能
內嵌任務提交失敗、容器創建或啟動失敗以及運行超時等告警功能
4. 日志采集與展示
K8s 集群本身并沒有像 Yarn 那樣提供日志自動聚合和展示的功能,Driver 和 Executor 的日志收集需要用戶自己來完成。目前比較常見的方案是在各個 K8s Node 上部署 Agent,通過 Agent 把日志采集并落在第三方存儲上,比如 ES、SLS 等,但這些方式對于習慣了在 Yarn 頁面上點擊查看日志的用戶和開發者來說,使用起來很不方便,用戶不得不跳轉到第三方系統上撈取查看日志。
為實現 K8s Spark 任務日志的便捷查看,我們對 Spark 代碼進行了改造,使 Driver 和 Executor 日志最終都輸出到 OSS 上,用戶可以在 Spark UI 和 Spark Jobhistory 上,直接點擊查看日志文件。
上圖所示為日志的收集和展示原理,Spark 任務在啟動時,Driver 和 Executor 都會首先注冊一個 Shutdown Hook,當任務結束 JVM 退出時,調用 Hook 方法把完整的日志上傳到 OSS 上。此外,想要完整查看日志,還需要對 Spark 的 Job History 相關代碼做下改造,需要在 History 頁面顯示 stdout 和 stderr,并在點擊日志時,從 OSS 上拉取對應 Driver 或 Executor 的日志文件,最終由瀏覽器渲染查看。另外,對于正在運行中的任務,我們會提供一個 Spark Running Web UI 給用戶,任務提交成功后,spark-operator 會自動生成的 Service 和 Ingress 供用戶查看運行詳情,此時日志的獲取通過訪問 K8s 的 api 拉取對應 Pod 的運行日志即可。
5. 彈性與降本
基于 ACK 集群提供的彈性伸縮能力,再加上對 ECI 的充分利用,同等規模量級下的 Spark 任務,運行在 K8s 的總成本要明顯低于在 Yarn 固定集群上,同時也大大提高了資源利用率。
彈性容器實例 ECI 是一種 Serverless 容器運行服務,ECI 和 ECS 最大的不同就在于 ECI 是按量秒級計費的,申請與釋放速度也是秒級的,所以 ECI 很適合 Spark 這一類負載峰谷明顯的計算場景。
上圖示意了 Spark 任務在 ACK 集群上如何申請和使用 ECI,使用前提是在集群中安裝 ack-virtual-node 組件,并配置好 Vswitch 等信息,在任務運行時,Executor 被調度到虛擬節點上,并由虛擬節點申請創建和管理 ECI。
ECI 分為普通實例和搶占式實例,搶占式實例是一種低成本競價型實例,默認有 1 小時的保護期,適用于大部分 Spark 批處理場景,超出保護期后,搶占式實例可能被強制回收。為進一步提升降本效果,充分利用搶占式實例的價格優勢,我們對 Spark 進行改造,實現了 ECI 實例類型自動轉換的功能。Spark 任務的 Executor Pod 都優先運行在搶占式 ECI 實例上,當發生庫存不足或其他原因無法申請創建搶占式實例,則自動切換為使用普通 ECI 實例,保證任務的正常運行。具體實現原理和轉換邏輯如下圖所示。
6. Celeborn
由于 K8s 節點的磁盤容量很小,而且節點都是用時申請、用完釋放的,無法保存大量的 Spark Shuffle 數據。如果對 Executor Pod 掛載云盤,掛載盤的大小難以確定,考慮到數據傾斜等因素,磁盤的使用率也會比較低,使用起來比較復雜。此外,雖然 Spark 社區在 3.2 提供了 Reuse PVC 等功能,但是調研下來覺得功能尚不完備且穩定性不足。
為解決 Spark 在 K8s 上數據 Shuffle 的問題,在充分調研和對比多家開源產品后,最終采用了阿里開源的 Celeborn 方案。Celeborn 是一個獨立的服務,專門用于保存 Spark 的中間 Shuffle 數據,讓 Executor 不再依賴本地盤,該服務 K8s 和 Yarn 均可以使用。Celeborn 采用了 Push Shuffle 的模式,Shuffle 過程為追加寫、順序讀,提升數據讀寫性能和效率。
基于開源的 Celeborn 項目,我們內部也做了一些數據網絡傳輸方面的功能增強、Metrics 豐富、監控告警完善、Bug 修復等工作,目前已形成了內部穩定版本。
7. KyuubionK8s
Kyuubi 是一個分布式和多租戶的網關,可以為 Spark、Flink 或 Trino 等提供 SQL 等查詢服務。在早期,我們的 Spark Adhoc 查詢是發送到 Kyuubi 上執行的。為了解決 Yarn 隊列資源不足,用戶的查詢 SQL 無法提交和運行的問題,在 K8s 上我們也支持了 Kyuubi Server 的部署運行,當 Yarn 資源不足時,Spark 查詢自動切換到 K8s 上運行。鑒于 Yarn 集群規模逐漸縮減,查詢資源無法保證,以及保障相同的用戶查詢體驗,目前我們已將所有的 SparkSQL Adhoc 查詢提交到 K8s 上執行。
為了讓用戶的 Adhoc 查詢也能在 K8s 上暢快運行,我們對 Kyuubi 也做了一些源碼改造,包括對 Kyuubi 項目中 docker-image-tool.sh、Deployment.yaml、Dockfile 文件的改寫,重定向 Log 到 OSS 上,Spark Operator 管理支持、權限控制、便捷查看任務運行 UI 等。
8. K8sManager
在 Spark on K8s 場景下,盡管 K8s 有集群層面的監控告警,但是還不能完全滿足我們的需求。在生產環境中,我們更加關注的是在集群上的 Spark 任務、Pod 狀態、資源消耗以及 ECI 等運行情況。利用 K8s 的 Watch 機制,我們實現了自己的監控告警服務 K8s Manager,下圖所示為該服務的示意圖。
K8sManager 是內部實現的一個比較輕量的 Spring Boot 服務,實現的功能就是對各個 K8s 集群上的 Pod、Quota、Service、ConfigMap、Ingress、Role 等各類資源信息監聽和匯總處理,從而生成自定義的 Metrics 指標,并對指標進行展示和異常告警,其中包括集群 CPU 與 Memory 總使用量、當前運行的 Spark 任務數、Spark 任務內存資源消耗與運行時長 Top 統計、單日 Spark 任務量匯總、集群 Pod 總數、Pod 狀態統計、ECI 機器型號與可用區分布統計、過期資源監控等等,這里就不一一列舉了。
9. 其他工作
9.1 調度任務自動切換
在我們的調度系統中,Spark 任務支持配置 Yarn、K8s、Auto 三種執行策略。如果用戶任務指明了需要運行使用的資源管理器,則任務只會在 Yarn 或 K8s 上運行,若用戶選擇了 Auto,則任務具體在哪里執行,取決于當前 Yarn 隊列的資源使用率,如下圖所示。由于總任務量較大,且 Hive 任務也在不斷遷移至 Spark,目前仍然有部分任務運行在 Yarn 集群上,但最終的形態所有任務將由 K8s 來托管。
9.2 多可用區、多交換機支持
Spark 任務運行過程中大量使用 ECI,ECI 創建成功有兩個前提條件: 1、能夠申請到 IP 地址;2、當前可用區有庫存。實際上,單個交換機提供的可用 IP 數量有限,單個可用區擁有的搶占式實例的總個數也是有限的,因此在實際生產環境中,無論是使用普通 ECI 還是 Spot 類型的 ECI,比較好的實踐方式是配置支持多可用區、多交換機。
9.3 成本計算
由于在 Spark 任務提交時,都已明確指定了每個 Executor 的 Cpu、Memory 等型號信息,在任務結束 SparkContxt 關閉之前,我們可以從任務的中拿到每個 Executor 的實際運行時長,再結合單價,即可計算出 Spark 任務的大致花費。由于 ECI Spot 實例是隨著市場和庫存量隨時變動的,該方式計算出來的單任務成本是一個上限值,主要用于反映趨勢。
9.4 優化 SparkOperator
在上線初期任務量較少時,Spark Operator 服務運行良好,但隨著任務不斷增多,Operator 處理各類 Event 事件的速度越來越慢,甚至集群出現大量的 ConfigMap、Ingress、Service 等任務運行過程中產生的資源無法及時清理導致堆積的情況,新提交 Spark 任務的 Web UI 也無法打開訪問。發現問題后,我們調整了 Operator 的協程數量,并實現對 Pod Event 的批量處理、無關事件的過濾、TTL 刪除等功能,解決了 Spark Operator 性能不足的問題。
9.5 升級 SparkK8sClient
Spark3.2.2 采用 fabric8 (Kubernetes Java Client) 來訪問和操作 K8s 集群中的資源,默認客戶端版本為 5.4.1,在此版本中,當任務結束 Executor 集中釋放時,Driver 會大量發送 Delete Pod 的 Api 請求到 K8s Apiserver 上,對集群 Apiserver 和 ETCD 造成較大的壓力,Apiserver 的 cpu 會瞬間飆高。
目前我們的內部 Spark 版本,已將 kubernetes-client 升級到 6.2.0,支持 pod 的批量刪除,解決 Spark 任務集中釋放時,由大量的刪除 Api 請求操作的集群抖動。
問題與解決方案
在整個 Spark on K8s 的方案設計以及實施過程中,我們也遇到了各種各樣的問題、瓶頸和挑戰,這里做下簡單的介紹,并給出我們的解決方案。
1.彈性網卡釋放慢
彈性網卡釋放速度慢的問題,屬于 ECI 大規模應用場景下的性能瓶頸,該問題會導致交換機上 IP 的劇烈消耗,最終導致 Spark 任務卡住或提交失敗,具體觸發原因如下圖所示。目前阿里云團隊已通過技術升級改造解決,并大幅提升了釋放速度和整體性能。
2.Watcher 失效
Spark 任務在啟動 Driver 時,會創建對 Executor 的事件監聽器,用于實時獲取所有 Executor 的運行狀態,對于一些長時運行的 Spark 任務,這個監聽器往往會由于資源過期、網絡異常等情況而失效,因此在此情況下,需要對 Watcher 進行重置,否則任務可能會跑飛。該問題屬于 Spark 的一個 Bug,當前我們內部版本已修復,并將 PR 提供到了 Spark 社區。
3.任務卡死
如上圖所示,Driver 通過 List 和 Watch 兩種方式來獲取 Executor 的運行狀況。Watch 采用被動監聽機制,但是由于網絡等問題可能會發生事件漏接收或漏處理,但這種概率比較低。List 采用主動請求的方式,比如每隔 3 分鐘,Driver 可向 Apiserver 請求一次自己任務當前全量 Executor 的信息。
由于 List 請求任務所有 Pod 信息,當任務較多時,頻繁 List 對 K8s 的 Apiserver 和 ETCD 造成較大壓力,早期我們關閉了定時 List,只使用 Watch。當 Spark 任務運行異常,比如有很多 Executor OOM 了,有一定概率會導致 Driver Watch 的信息錯誤,盡管 Task 還沒有運行完,但是 Driver 卻不再申請 Executor 去執行任務,發生任務卡死。對此我們的解決方案如下:
在開啟 Watch 機制的同時,也開啟 List 機制,并將 List 時間間隔拉長,設置每 5 分鐘請求一次
修改 ExecutorPodsPollingSnapshotSource 相關代碼,允許 Apiserver 服務端緩存,從緩存中獲取全量 Pod 信息,降低 List 對集群的壓力
4. Celeborn 讀寫超時、失敗
ApacheCeleborn 是阿里開源的一款產品,前身為 RSS (Remote Shuffle Service)。在早期成熟度上還略有欠缺,在對網絡延遲、丟包異常處理等方面處理的不夠完善,導致線上出現一些有大量 Shuffle 數據的 Spark 任務運行時間很長、甚至任務失敗,以下三點是我們針對此問題的解決辦法。
優化 Celeborn,形成內部版本,完善網絡包傳輸方面的代碼
調優 CelebornMaster 和 Worker 相關參數,提升 Shuffle 數據的讀寫性能
升級 ECI 底層鏡像版本,修復 ECILinux 內核 Bug
5. 批量提交任務時,Quota 鎖沖突
為了防止資源被無限使用,我們對每個 K8s 集群都設置了 Quota 上限。在 K8s 中,Quota 也是一種資源,每一個 Pod 的申請與釋放都會修改 Quota 的內容 (Cpu/Memory 值),當很多任務并發提交時,可能會發生 Quota 鎖沖突,從而影響任務 Driver 的創建,任務啟動失敗。
應對這種情況導致的任務啟動失敗,我們修改 Spark Driver Pod 的創建邏輯,增加可配置的重試參數,當檢測到 Driver Pod 創建是由于 Quota 鎖沖突引起時,進行重試創建。Executor Pod 的創建也可能會由于 Quota 鎖沖突而失敗,這種情況可以不用處理,Executor 創建失敗 Driver 會自動申請創建新的,相當于是自動重試了。
6.批量提交任務時,UnknownHost 報錯
當瞬時批量提交大量任務到集群時,多個 Submit Pod 會同時啟動,并向 Terway 組件申請 IP 同時綁定彈性網卡,存在一定概率出現以下情況,即 Pod 已經啟動了,彈性網卡也綁定成功但是實際并沒有完全就緒,此時該 Pod 的網絡通信功能實際還無法正常使用,任務訪問 Core DNS 時,請求無法發出去,Spark 任務報錯 UnknownHost 并運行失敗。該問題我們通過下面這兩個措施進行規避和解決:
為每臺 ECS 節點,都分配一個 TerwayPod
開啟 Terway 的緩存功能,提前分配好 IP 和彈性網卡,新 Pod 來的直接從緩存池中獲取,用完之后歸還到緩存池中
7. 可用區之間網絡丟包
為保障庫存的充足,各 K8s 集群都配置了多可用區,但跨可用區的網絡通信要比同可用區之間通信的穩定性略差,即可用區之間就存在一定概率的丟包,表現為任務運行時長不穩定。
對于跨可用區存在網絡丟包的現象,可嘗試將 ECI 的調度策略設定為 VSwitchOrdered,這樣一個任務的所有 Executor 基本都在一個可用區,避免了不同可以區 Executor 之間的通信異常,導致的任務運行時間不穩定的問題。
總結與展望
最后,非常感謝阿里云容器、ECI、EMR 等相關團隊的同學,在我們整個技術方案的落地與實際遷移過程中,給予了非常多的寶貴建議和專業的技術支持。
目前新的云原生架構已在生產環境上穩定運行了近一年左右的時間,在未來,我們將持續對整體架構進行優化和提升,主要圍繞以下幾個方面:
持續優化云原生的整體方案,進一步提升系統承載與容災能力
云原生架構升級,更多大數據組件容器化,讓整體架構更加徹底的云原生化
更加細粒度的資源管理和精準的成本控制
審核編輯:湯梓紅
-
容器
+關注
關注
0文章
496瀏覽量
22085 -
大數據
+關注
關注
64文章
8899瀏覽量
137572 -
云原生
+關注
關注
0文章
250瀏覽量
7958 -
kubernetes
+關注
關注
0文章
225瀏覽量
8729
原文標題:米哈游大數據云原生實踐
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論