部門:資料中臺
出處:https://mp.weixin.qq.com/s?__biz=MzAxOTY5MDMxNA==&mid=2455762177&idx=1&sn=e72732ed9026a9700a9cf79adda69452
一、前言隨著近幾年業務快速發展與迭代,大資料的成本也水漲船高,如何最佳化成本,建設低成本高效率的底層服務成為了有贊資料基礎平臺2020年的主旋律。本文主要介紹了隨著雲原生時代的到來,經歷7年發展的有贊離線計算平臺如何擁抱雲原生,透過容器化改造、彈性伸縮、大資料元件的錯峰混部,做到業務成倍增長的情況下成本負增長。
首先介紹一下目前有贊離線計算的一些現狀。
萬兆網絡卡的新叢集,機器頻寬不再是瓶頸。之前我們完成了一次跨雲運營商(UCloud -> Qcloud)的叢集遷移。而且新叢集機型全部都是萬兆的網絡卡(之前老叢集還存在部分千兆機型),所以頻寬不再是瓶頸。同時 Qcloud 的機型選擇更加靈活,機器伸縮也更加方便。90%以上的離線計算任務使用 Spark 引擎。我們 19 年完成離線任務從 Hive 到 Spark 的遷移,因此在考慮 K8s 容器化時,只針對 Spark 處理。儲存計算混部下的木桶效應。在 YARN 模式下,計算和儲存是混部的,當一種資源不足而叢集擴容時,勢必造成了另一個資源的浪費。我們這邊定期擴容的原因總是計算資源先達到瓶頸(雖然一直在對離線任務進行治理和最佳化),而儲存資源相對比較浪費(即使儲存之前一直沒怎麼去最佳化和治理,比如我們現在資料還是 3 副本的模式,而沒有去使用 Erasure Coding 去最佳化儲存效率)針對現在離線計算的侷限性,我們提出了新的方向:
本文的主要內容包括:
技術方案Spark 改造部署最佳化踩坑和經驗二、技術方案從 YARN 環境遷移到 K8s 環境有兩個明顯需要解決的問題,一是 executor dynamic allocation 能力缺失,二是在儲存計算分離之後的 shuffle 和 sort 資料儲存問題。針對這兩個問題我們做了技術方案,這兩個方案也分享給大家。
2.1 Dynamic Allocationexecutor 動態分配是一個相對比較有用的功能,它能讓各個任務按需的伸縮資源,使叢集的資源利用率更高。在 YARN 環境下需要基於 NodeManager 的 external shuffle service 才能開啟。而在 K8s 環境下因為沒有 external shuffle service 的方案而無法使用,所以我們引入了 SPARK-27963(Allow dynamic allocation without an external shuffle service)來緩解。這個方案是基於跟蹤 shuffle 引用,只有在 executor 產生的 shuffle 沒有被引用的 executor 才可以被釋放,同時 shuffle 引用清理又是基於 spark cleaner 的弱引用清理機制(清理時機依賴於 GC),這種方案釋放 executor 的效果相比原來會差很多。
為支援 K8s 上 executor 動態分配的 shuffle service 方案也有下面兩種方式,大家可以考慮一下( 據我們調研這兩種方式已有一些公司在生產上實踐):
enabling shuffle service as a DaemonSet早期 Spark 2.2 版本的實驗功能,將 shuffle service 作為 DaemonSet 部署到 K8s 叢集。缺陷就是需要用到 hostPath volume 來對接宿主機讀寫 shuffle 資料,跟宿主機綁死了(只能執行在能滿足磁碟需求的宿主機上)。remote storage for persisting shuffle data(SPARK-25299)目前社群正在實現的方案,但進展比較緩慢,至今還沒有全部完成。最近 Uber 開源一個比較完整的 RemoteShuffle 實現。我們這邊也在小規模嘗試和驗證。2.2 儲存卷 (PV) 選擇我們在 YARN 模式下叢集使用的是儲存能力比較大的大資料機型,有多個大容量的本地機械盤,而計算儲存是共用這些本地機械盤。儲存計算分離之後就引入了新問題:怎麼解決 Spark 計算所需的磁碟需求?Spark 計算過程中的磁碟需求主要有 shuffle 和 sort 構成。shuffle 部分大家比較清楚,但 sort 往往被忽略,比如在 Spark 內部 sort 場景經常被用到的 UnsafeExternalSorter 。在 executor 記憶體不足的情況下, UnsafeExternalSorter 就會發生大量 spill 。spill 過程主要是對 IO 讀寫吞吐需求比較大,對儲存容量要求不高。
hostPath
出於降低成本的目的,我們評估了騰訊雲提供的各種機型和儲存產品,最終選擇的方案是:計算節點掛載多塊雲硬碟(塊儲存 CBS),然後在 Spark Pod 中使用 hostPath 方式來引用宿主機雲硬碟的掛載目錄。
目前這個方案也有很多不足:
hostPath 方式的缺點很明顯,使用了 hostPath 的 Pod 綁定了特定的宿主機。你沒辦法將 Pod 排程到沒有云硬碟掛載目錄的宿主機上,這對於同其他應用的混部會有很大的制約。效能受到單盤 IO 吞吐限制,而又無法充分利用雲硬碟的總吞吐。雲商提供的雲硬碟會有一些 IO 吞吐上限設定,比如單塊高效能雲硬碟限制最大吞吐 150MB/s。因此我們會經常碰到由於 IO 不均勻導致某個雲硬碟達到限制瓶頸,而其他盤空閒的情況。Ceph
為了解決上述問題,我們調研了分散式儲存 Ceph。Ceph 有下列優勢:
有比較高的讀寫效能。一方面它支援資料條帶化(striping),讀寫能並行利用多塊磁碟;另一方面新的 BuleStore 儲存引擎是基於快閃記憶體儲存介質設計,並在裸盤上構建(繞過了本地檔案系統),效能上更加突出。基於 RADOS 架構核心之上構建了三種儲存方案,分別是塊儲存、檔案儲存和物件儲存,可以滿足不同的儲存需求。作為雲計算的儲存方案有比較多的實踐(如 OpenStack),同時也是 K8s 官網支援的 volume plugin 之一。Ceph 作為 K8s 儲存卷 (PV) 時選擇哪種儲存方案呢 ?
對於一般普通的應用的 Pod,使用 Ceph RBD 是第一選擇,無中心化的架構在穩定性和效能上更具優勢。對於 Spark Pod,Pod 之間又有大量的資料讀寫交換(shuffle 和 sort),我覺得可以去嘗試 Ceph FS。因為它的訪問方式支援ReadWriteMany(可被多個節點讀寫掛載),那麼在上面提到的 shuffle service 方案中,就可以有第三種嘗試方案:經過儲存卷 (PV)掛載之後,Spark Pod 的讀寫 shuffle 跟本地操作一樣,下游 stage 的 task 需要上游 task 產生的 shuffle 資料檔案,只需要知道對應的 shuffle 資料檔案路徑就可以直接轉為本地讀;當然這裡需要基於 Spark ShuffleManager 擴展出新的一種“Local” ShuffleManager。這種方式不需要再引入額外的 shuffle service 模組,總體上算是一個比較有吸引力的解決方向。Ceph 的優勢很多,但複雜性、學習成本、運維難度也比較高。目前 Ceph 方案在有贊大資料還處於測試驗證的階段,我們也在摸索的過程去驗證它的可行性。
三、Spark 改造Apache Spark 從 2.2 版本開始支援 K8s 環境,到 3.0 版本正式支援 K8s。在有贊,我們使用的 Spark 版本是 2.3.3 版本,Spark 還沒有正式支援 K8s 環境。為了實現對 K8s 環境的支援,需要對 Spark 做一些修改。
3.1 新增小檔案合併功能資料在流轉過程中經歷 filter/shuffle 等過程後,開發人員難以評估作業寫出的資料量。即使使用了 Spark 提供的 AE 功能,目前也只能控制 shuffle read 階段的資料量,寫出資料的大小實際還會受壓縮演算法及格式的影響,因此在任務執行時,對分割槽的資料評估非常困難。
shuffle 分割槽過多過碎,寫入效能會較差且生成的小檔案會非常多。shuffle 分割槽過少過大,則寫入併發度可能會不夠,影響任務執行時間。在產生大量碎片檔案後,任務資料讀取的速度會變慢(需要尋找讀入大量的檔案,如果是機械盤更是需要大量的定址操作),同時會對 HDFS NameNode 記憶體造成很大的壓力。因此我們添加了合併小檔案的操作。小檔案校驗以及合併流程如下:
透過新增新的語法,替換掉了現有的 MR 版本的小檔案合併。達到緩解 NameNode 的記憶體壓力,提高了下游任務效能的效果。改進點在於,現在小檔案合併過程是同步合併的,為了更好的靈活性可以修改成為非同步合併的模式。
3.2 日誌收集服務Spark 整體遷移到 K8s 之後,日誌會隨著 K8s Pod 的釋放而被清除掉。會導致在出現任務異常的情況下,日誌會隨著 executor 的釋放而丟失。會給排查線上問題帶來不便。
因此我們自己添加了一個新的元件,如上圖所示。在 Spark executor container 裡面加入新的 Filebeat,透過 Filebeat 將日誌資料輸出到 Kafka。透過注入環境變數的方式,將 app type , app id , executor Id 三個資訊作為 Key,日誌行作為內容,發給 Kafka。
吞吐量控制是透過 Kafka topic partition 的數量和 Flink job 的併發度來實現。Flink job 將 executor 級別日誌聚合,儲存到儲存中,實現了實時,可拓展的日誌收集查詢服務,解決了 Spark 在 K8s 環境下日誌丟失和不能方便的查詢日誌的問題。同時這個服務也能夠提供給公司內部其它在 K8s 環境上執行的元件使用,比如說 Flink 和 Flume 。
3.3 Remote Shuffle Service我們這裡使用的是 Uber 開源的 remote shuffle service ,同時修改了 Spark 的 executor shuffle 資料記錄機制,實現了在使用 remote shuffle service 的情況下,不標記 executor 是否有活躍的 shuffle 資料,實現了在 K8s 環境下 executor 在任務執行完成後迅速釋放掉。而不會因為shuffle 資料由 full gc 回收不及時而導致 executor 沒有任務的情況下不回收。
同時由於 remote shuffle service 的存在,shuffle 資料的儲存離開了 executor pod ,即使在 executor 出現異常的情況下,shuffle 資料還是能夠獲取到,提高了 Spark SQL 任務的穩定性。
下圖是 remote shuffle service 原理圖:
3.4 Spark K8s Driver Pod 構建順序修改Spark app 啟動需要先構建 Driver Pod,如果你不是透過構建映象,而是透過 configmap 的形式注入配置或者掛載 volume 的形式來啟動。對於 spark 系統來說,會先建立 Spark Driver Pod,後建立 configmaps 和 volumes,這會導致 Driver Pod 無法啟動,因為 Pod 在建立時需要依賴的 configmaps 和 volumes 必須存在才能夠正常建立。所以需要針對這種情況修改為先建立需要的資源,再建立 Driver Pod,最後再將兩者關聯起來。
3.5 新增對本地資源的分發Spark app 啟動過程中,executor 和 driver 都是需要能夠訪問到資源的。如果使用 K8s 的話,會因為 executor 不能訪問到使用者程式碼或者資原始檔而任務失敗。有兩個解決方案可以處理。
方案一:對每一個新的任務把相關的資原始檔放到 ${SPARK_HOME}/jars 目錄中,優點是處理依賴問題容易,缺點是每次需要打包新的映象,如果任務很多,需要很多個映象,會導致 Docker host 磁碟消耗很大。
方案二:修改 spark-submit 程式碼,將資原始檔和各種資料都上傳到 HDFS 上,根據特定規則生成目錄,然後在 executor 執行中,下載被上傳的資原始檔,新增到 classpath 裡面。
綜合考量之後,我們這裡採用了方案二,透過 HDFS 系統暫存資源,然後在 executor 中下載資源。
3.6 web ui 暴露Spark 任務在使用過程中,會有檢視 web ui 來檢視任務執行狀態的需求,在生產環境中,K8s executor Pod 是不能和辦公網路環境聯通的,所以要使用 ingress 來轉發請求。要使用 ingress,需要新建對應的 service,配置需要暴露的埠,就可以實現對辦公網路的 web ui 訪問。
在 K8s 系統中,service 的訪問資訊是在叢集內部才能生效的,不能在叢集外部直接訪問。ingress 是 K8s 系統中為不同的 service 設定的負載均衡服務,是 service 的 “service”, 使用 K8s 統一的 ingress 服務可以透過域名的方式將不同的 service 暴露出去。ingress 的優勢在於可以遮蔽掉 driver Pod ip 的變化,服務重啟或者任務重新排程都會導致 Pod ip 發生變化,ingress 和 service 結合使用,可以實現透過域名訪問,而外部使用者對具體 Pod ip 的變化無感知。
3.7 Spark Pod label 擴充套件預設情況 Spark driver Pod 和 executor Pod 的 nodeSelector 是相同的。如果想實現 driver Pod 被排程到特定的 K8s node 上,executor Pod 排程到其它的 node 上,需要對 Pod 建立過程做修改,使得 executor 和 driver pod 的 nodeSelector 不相同。
這個修改的主要目的是為了適應叢集動態擴縮容,driver Pod 如果被驅逐任務會整體重算,計算成本太大,所以 driver Pod 需要排程在不會因縮容而驅逐 Pod 的機器上,executor 可以排程在多種機器上。如下圖所示:
3.8 Spark app 狀態管理當用戶提交了 Spark app 任務到 K8s 環境時,spark-submit 程序會在申請建立 driver Pod 後立即退出,不會監控driver Pod 的狀態,只要 driver Pod 建立成功,spark-submit 程序就會直接返回 0 。Airflow 在排程的時候,是根據命令執行的返回碼來判斷任務執行是否成功,這樣即使任務失敗,但是 spark-submit 程序的返回碼還是會保持為 0 , Airflow 系統會認為任務執行成功。
為了解決 spark-submit 程式返回值和 driver Pod 執行結果無關問題,需要在 spark-submit 中監聽 driver Pod 執行結果,將 driver Pod 的返回值作為 Spark submit 程序的返回值。sssss
當 Airflow 任務需要殺掉一個 spark app 程序時,Airflow 會向 spark-submit 程序傳送SIGKILL 命令,能夠成功的殺掉 spark-submit 程序,但是不會影響到 K8s 環境中對應的 driver Pod 執行狀態,會導致Airflow 停止任務功能失效。這裡需要新增新的 shutdown hook ,確保 spark-submit 程序在收到
3.9 動態修改 dynamicAllocation.maxExecutorsSpark thriftserver 修改 dynamicAllocation.maxExecutors 引數啟動後,這個引數在執行過程中是不支援修改的。但是偶爾會遇到業務資料突然增大,或者臨時插入了新任務的情況。這時候為了加速離線任務的產出,我們會擴容裝置,新增更多的計算資源,在添加了更多的計算資源後,因為是處於業務高峰期內,不能重啟服務,就需要能夠動態配置 dynamicAllocation.maxExecutors 引數。
四、部署最佳化為了節省資源,提高對現有叢集的利用率,我們在引入了 K8s 之後,對系統的部署模式也做了較大的最佳化。
4.1 錯峰混部不同的業務系統會有不同的業務高峰時間,像離線業務系統典型任務高峰期間會是凌晨的 0 點到 9 點鐘。而像是 HBase 或者 Druid 提供 BI 展示和查詢的系統,常見的業務高峰期是工作日時間,在這個時間以外的其它時間中,可以將其它業務系統的 node 加入到 Spark 所使用的 K8s namespace 中。這樣,Spark on K8s 就可以使用其它業務系統的資源。
需要預先設定可使用資源,在特定時間範圍內將可使用資源的排程開啟,結合上文中不同的 Pod label,就可以實現在特定時間內,executor 能夠使用混部伺服器的資源。在這種情況下,不需要修改作業系統 CPU 優先順序排程策略,在其它業務的低峰期間佔用伺服器資源不會影響到 RT。
下面會有一個業務系統的例子,混部後線上系統的資源利用率得到了明顯的提高。下圖中描述的是一個在03:00~23:00混部的線上業務系統。能夠看到在混部開啟時間內,叢集無論是CPU還是記憶體的使用率有了明顯的上升。
4.2 彈性擴縮容我們使用的是騰訊雲,能夠提供 K8s 叢集對動態擴容的能力。離線任務在排程上會顯示出週期性,如下圖展示了離線任務 K8s 佇列在高峰期的任務堆積現象。
可以很容易的發現,在任務高峰期,0:00~09:00 期間,任務會有堆積的現象。這意味著叢集需要更多的計算資源,高峰期過後,叢集就不會在有任務堆積等待執行的現象了。這裡可以利用 k8s 快速變更叢集節點數量的能力,在 00:00~09:00 時間範圍內,申請全量的資源來保證離線任務的產出,在 09:00~24:00 之間,釋放掉離線叢集一半的資源完成日常工作負載。這樣可以節省在離線叢集低負載時間內的雲服務資源的費用,也可以在遇到業務高峰時動態擴容來應對業務高峰。
五、踩坑和經驗在使用 Spark 過程中,我們踩過一些坑,也積累了一些經驗。
5.1 K8s 誤殺 executorDocker 的 containerd 存在一個 bug ,現象是 container 裡的程序退出後,containerd-shim 不退出,在發生這個 bug 後,Docker 系統會認為 Docker 容器中的程序還在執行中。這導致在某些情況下,Docker 容器會嘗試不停的殺掉具有特定 PID 號的程序,在這個過程中,Docker 服務會向特定 PID 傳送 KILL 訊息。
在同一個節點上,會有其它的 executor 啟動,當發生了上文中的異常後,Docker 系統會持續的傳送 KILL 給特定的 PID 。新的 Java 程序啟動後,工作過程中,可能新建立的 Thread ID 會和上文中的 PID 相同,會接收到 KILL 訊息,導致執行緒異常退出,執行緒的異常退出會導致 Java 程序也異常退出,引起穩定性問題。
針對這個問題,需要升級到 containerd-shim 沒有異常的版本。
5.2 linux 核心引數調優在 K8s 環境上執行時,executor 需要和 driver 保持網路連線來維持心跳訊息,executor 之間在獲取 shuffle 資料的情況下,也會需要新的網路連線。這種情況下,會導致某些 executor 的連線數維持在一個比較高的狀態。在業務高峰期,偶現如下異常:
...... Successfully created connection to /x.x.x.x:38363 after 0 ms (0 ms spent in bootstraps)21/01/22 14:25:12,980 WARN [shuffle-client-4-3] TransportChannelHandler: Exception in connection from /10.109.14.86:38363java.io.IOException: Connection reset by peerat sun.nio.ch.FileDispatcherImpl.read0(Native Method)at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)at sun.nio.ch.IOUtil.read(IOUtil.java:192)at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
這個異常和 Linux 作業系統引數有關係。TCP 連線建立後,三次握手的最後一次握手後,連線會加入到 accept queue 中,這個佇列的計算公式是min(somaxconn,backlog),如果這個佇列打滿的話,會丟掉連線導致出現上文中的異常。在業務高峰期間,一個 executor 的 shuffle 可能會被數千個 executor 獲取,很容易導致部分 executor 的 accept queue被打滿。
針對這種情況,需要針對 Linux 核心引數進行調優。作業系統的全連線佇列引數儲存在 /proc/sys/net/core/somaxconn 中,Spark 中使用的 netty 的全連線佇列引數是透過 spark.shuffle.io.backLog 引數配置的,程式執行中實際的佇列大小是這兩個值中的最小值。可以根據具體情況配置的更大一些。
修改了這些配置後,上文中的網路異常幾乎沒有出現過了。
5.3 executor 丟失,導致任務持續等待Spark thriftserver 系統在執行過程中,會啟動大量的 executor,每個 executor 有各自獨立的生命週期。
當一個 executor 失聯之後,Spark 系統內會發送一條 executorLost 訊息。當系統收到 executorLost 訊息之後,KubernetesClusterSchedulerBackend 會開始走 executorDisable 邏輯,這個邏輯會檢查 executorsPendingLossReason 佇列和 addressToExecutorId 這兩個佇列。當這兩個佇列資料有異常的情況,會導致丟失後的 executor 持續存在 ,一直不會被 remove,會進一步導致在這些已經丟失了的 executor 上的 task 不會結束。
所以需要改良這個邏輯,當 executorLost 之後,將邏輯從 disableExecutor 修改為 removeExecutor ,這樣就能解決 executor 失聯後,任務會直接卡住的問題。
5.4 同一個 executor 多個 task 持續等記憶體如果一個 executor 配置多個 cores,就會有多個 task 分配到同一個 executor 上。在 Spark 系統 中,記憶體的分配最後是透過 ExecutionMemoryPool 來實現的。
while (true) { val numActiveTasks = memoryForTask.keys.size val curMem = memoryForTask(taskAttemptId) maybeGrowPool(numBytes - memoryFree) val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks) val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) val toGrant = math.min(maxToGrant, memoryFree) if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait() //cause dead lock } else { memoryForTask(taskAttemptId) += toGrant return toGrant }}
具體分配記憶體的程式碼邏輯如上。可以看到分配記憶體過程中,會有一個迴圈,迴圈過程中,會 wait 直到任務執行完成釋放記憶體才會 notify,這裡會導致 Spark 任務在執行過程可能會等待數小時,在任務高峰期會導致任務執行時間不可控。所以需要最佳化這塊邏輯,新增任務分配超時機制,控制任務分配超時時間,當任務超時後,返回獲取到的記憶體數量為 0,讓 task 在當前 executor 上失敗,從而在其它的 executor 節點上執行。
5.5 shuffle 資料壞塊19/12/10 01:53:53 ERROR executor.Executor: Exception in task 6.0 in stage 0.0 (TID 6)java.io.IOException:Stream is corrupted atnet.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:228)at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Spark 在大量資料做 shuffle 的過程中,偶然會看到如上圖所示日誌,這個錯誤是因為 shuffle 資料寫出過程中損壞了導致的。
透過引入 SPARK-30225 和 SPARK-26089 ,一定程度上緩解了這個問題。30225 這個 issue 修改的核心在於,只有在資料需要重新讀取的情況下,才會重置 bytebuffer 位置指標。26089 這個 issue 的核心在於,讀取 shuffle 資料塊的 1/3,然後解壓檢查是否有錯誤,如果有錯誤直接丟擲 fetchfailed exception,如果沒有錯誤,繼續解壓後續資料。如果解壓資料錯誤的地點已經超過了檔案的 1/3,會丟擲異常讓整個 task 失敗,透過新增新的訊息,新增一種新的 TaskEndReason,重新計算 shuffle 資料。而不是直接丟擲IOException,導致任務失敗。
5.6 spark 配置檔案載入順序問題app 任務需要打包才能執行,少量使用者會將一些資原始檔打包到 fat jar 裡面。這種情況下,再使用 --files 提交相同的資原始檔,會導致 Spark 系統只能讀取到 fat jar 裡面的資原始檔,引發程式執行異常。例如 hive-site.xml 檔案,如果打包進入 fat jar 會導致程式異常。這個解決方案也很簡單,需要將 Spark executor 的 user-dir 加入到 executor classpath 中就可以解決問題。
5.7 新增對 k8s 資源不足情況的處理Message: Forbidden! User xxxx doesn't have permission. pods "thrift-jdbcodbc-server-xxxxx-exec-xxxx" is forbidden: exceeded quota: cpu-memory-quota, requested: requests.cpu=..., used: requests.cpu=..., limited: requests.cpu=....
Spark 程式啟動過程中,偶爾會遇到如上所示的錯誤資訊。這個錯誤資訊的含義當前 K8s namespace 資源用盡,超出了 resource quota 。
Spark app 任務在啟動時,會申請新的 Pod 作為執行 driver 的載體。在這個過程中,社群版本會在 driver Pod 申請過程中有一次超時等待,如果分配超時,spark-submit 程序會返回非 0 的數值,這會導致在沒有資源的情況下任務直接失敗,但是在批次任務排程過程中,任務因為資源情況或者優先順序情況等待是一個很常見的現象,對於這種情況需要的是 Spark app 任務等待資源,當資源就緒後直接執行即可。
我們添加了資源不足情況下的重試等待機制。一個簡單的策略如重試 50 次,每次重試之間等待 10s。
五、結語有贊大資料離線計算 Spark 任務從 YARN 上轉移到了 K8s 環境上,擁抱了雲原生,透過實現儲存計算分離,容器化和混部,具有了小時級別資源擴充套件能力,在面對業務高峰時,能夠更加遊刃有餘。經過各種改造,最佳化,踩坑,也補齊了開源版本 Spark 的問題,能夠更好的支撐業務。
部門:資料中臺
出處:https://mp.weixin.qq.com/s?__biz=MzAxOTY5MDMxNA==&mid=2455762177&idx=1&sn=e72732ed9026a9700a9cf79adda69452