現在大資料在各行業的應用越來越廣泛:運營基於資料關注運營效果,產品基於資料分析關注轉化率情況,開發基於資料衡量系統優化效果等。
美圖公司有美拍、美圖秀秀、美顏相機等十幾個 app,每個 app 都會基於資料做個性化推薦、搜尋、報表分析、反作弊、廣告等,整體對資料的業務需求比較多、應用也比較廣泛。
因此美圖資料技術團隊的業務背景主要體現在:業務線多以及應用比較廣泛。這也是促使我們搭建資料平臺的一個最主要的原因,由業務驅動。
美圖資料平臺整體架構如圖所示是我們資料平臺的整體架構。在資料收集這部分,我們構建一套採集服務端日誌系統 Arachnia,支援各 app 整合的客戶端 SDK,負責收集 app 客戶端資料;同時也有基於 DataX 實現的資料整合(匯入匯出);Mor 爬蟲平臺支援可配置的爬取公網資料的任務開發。
資料儲存層主要是根據業務特點來選擇不同的儲存方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在資料計算部分,當前離線計算主要還是基於 Hive&MR、實時流計算是 Storm 、 Flink 以及還有另外一個自研的 bitmap 系統 Naix。
在資料開發這塊我們構建了一套資料工坊、資料匯流排分發、任務排程等平臺。資料視覺化與應用部分主要是基於使用者需求構建一系列資料應用平臺,包括:A/B 實驗平臺、渠道推廣跟蹤平臺、資料視覺化平臺、使用者畫像等等。
右側展示的是一些各元件都可能依賴的基礎服務,包括地理位置、元資料管理、唯一裝置標識等。
如下圖所示是基本的資料架構流圖,典型的 lamda 架構,從左端資料來源收集開始,Arachnia、AppSDK 分別將服務端、客戶端資料上報到代理服務 collector,通過解析資料協議,把資料寫到 kafka,然後實時流會經過一層資料分發,最終業務消費 kafka 資料進行實時計算。
離線會由 ETL 服務負責從 Kafka dump 資料到 HDFS,然後異構資料來源(比如 MySQL、Hbase 等)主要基於 DataX 以及 Sqoop 進行資料的匯入匯出,最終通過 hive、kylin、spark 等計算把資料寫入到各類的儲存層,最後通過統一的對外 API 對接業務系統以及我們自己的視覺化平臺等。
資料平臺的階段性發展企業級資料平臺建設主要分三個階段:
剛開始是基本使用免費的第三方平臺,這個階段的特點是能快速整合並看到 app 的一些統計指標,但是缺點也很明顯,沒有原始資料除了那些第三方提供的基本指標其他分析、推薦等都無法實現。所以有從 0 到 1 的過程,讓我們自己有資料可以用;在有資料可用後,因為業務線、需求量的爆發,需要提高開發效率,讓更多的人蔘與資料開發、使用到資料,而不僅僅侷限於資料研發人員使用,所以就涉及到把資料、計算儲存能力開放給各個業務線,而不是握在自己手上;在當資料開放了以後,業務方會要求資料任務能否跑得更快,能否秒出,能否更實時;另外一方面,為了滿足業務需求叢集的規模越來越大,因此會開始考慮滿足業務的同時,如何實現更節省資源。美圖現在是處於第二與第三階段的過渡期,在不斷完善資料開放的同時,也逐步提升查詢分析效率,以及開始考慮如何進行優化成本。接下來會重點介紹 0 到 1 以及資料開放這兩個階段我們平臺的實踐以及優化思路。
從 0 到 1從 0 到 1 解決從資料採集到最終可以使用資料。如圖 4 所示是資料收集的演進過程,從剛開始使用類似 umeng、flurry 這類的免費第三方平臺,到後面快速使用 rsync 同步日誌到一臺伺服器上儲存、計算,再到後面快速開發了一個簡單的python指令碼支援業務伺服器上報日誌,最終我們開發了服務端日誌採集系統 Arachnia 以及客戶端 AppSDK。
資料採集是資料的源頭,在整個資料鏈路中是相對重要的環節,需要更多關注:資料是否完整、資料是否支援實時上報、資料埋點是否規範準確、以及維護管理成本。因此我們的日誌採集系統需要滿足以下需求:
能整合管理維護,包括 Agent 能自動化部署安裝升級解除安裝、配置熱更、延遲方面的監控;在可靠性方面至少需要保證 at least once;美圖現在有多 IDC 的情況,需要能支援多個 IDC 資料採集彙總到資料中心;在資源消耗方面儘量小,儘量做到不影響業務。基於以上需求我們沒有使用 flume、scribe、fluentd,最終選擇自己開發一套採集系統 Arachnia。
上圖是 Arachnia 的簡易架構圖,它通過系統大腦進行集中式管理。puppet 模組主要作為單個 IDC 內統一彙總 Agent 的 metrics,中轉轉發的 metrics 或者配置熱更命令。採集器 Agent 主要是運維平臺負責安裝、啟動後從 brain 拉取到配置,並開始採集上報資料到 collector。
接著看 Arachnia 的實踐優化,首先是 at least once 的可靠性保證。不少的系統都是採用把上報失敗的資料通過 WAL 的方式記錄下來,重試再上報,以免上報失敗丟失。我們的實踐是去掉 WAL,增加了 coordinator 來統一的分發管理 tx 狀態。
開始採集前會從 coordinator 發出 txid,source 接收到訊號後開始採集,並交由 sink 傳送資料,傳送後會ack tx,告訴 coordinator 已經 commit。coordinator 會進行校驗確認,然後再發送 commit 的訊號給 source、sink 更新狀態,最終 tx 完 source 會更新採集進度到持久層(預設是本地 file)。該方式如果在前面 3 步有問題,則資料沒有傳送成功,不會重複執行;如果後面 4 個步驟失敗,則資料會重複,該 tx 會被重放。
基於上文的 at least once 可靠性保證,有些業務方是需要唯一性的,我們這邊支援為每條日誌生成唯一 ID 標識。另外一個數據採集系統的主要實踐是:唯一定位一個檔案以及給每條日誌做唯一的 MsgID,方便業務方可以基於 MsgID 在發生日誌重複時能在後面做清洗。
我們一開始是使用 filename,後面發現 filename 很多業務方都會變更,所以改為 inode,但是 inode linux 會回收重複利用,最後是以 inode & 檔案頭部內容做 hash 來作為fileID。而 MsgID 是通過 agentID & fileID & offset 來唯一確認。
資料上報之後由 collector 負責解析協議推送資料到 Kafka,那麼 Kafka 如何落地到 HDFS 呢? 首先看美圖的訴求:
支援分散式處理;涉及到較多業務線因此有多種資料格式,所以需要支援多種資料格式的序列化,包括 json、avro、特殊分隔符等;支援因為機器故障、服務問題等導致的資料落地失敗重跑,而且需要能有比較快速的重跑能力,因為一旦這塊故障,會影響到後續各個業務線的資料使用;支援可配置的 HDFS 分割槽策略,能支援各個業務線相對靈活的、不一樣的分割槽配置;支援一些特殊的業務邏輯處理,包括:資料校驗、過期過濾、測試資料過濾、注入等;基於上述訴求痛點,美圖從 Kafka 落地到 HDFS 的資料服務實現方式如圖 7 所示。
基於 Kafka 和 MR 的特點,針對每個 kafka topic 的 partition,組裝 mapper 的 inputsplit,然後起一個 mapper 程序處理消費這個批次的 kafka 資料,經過資料解析、業務邏輯處理、校驗過濾、最終根據分割槽規則落地寫到目標 HDFS 檔案。
落地成功後會把這次處理的 meta 資訊(包括 topic、partition、開始的 offset、結束的offset)儲存到 MySQL。下次再處理的時候,會從上次處理的結束的 offset 開始讀取訊息,開始新一批的資料消費落地。
實現了基本功能後難免會遇到一些問題,比如不同的業務 topic 的資料量級是不一樣的,這樣會導致一次任務需要等待 partition 資料量最多以及處理時間最長的 mapper 結束,才能結束整個任務。那我們怎麼解決這個問題呢?系統設計中有個不成文原則是:分久必合、合久必分,針對資料傾斜的問題我們採用了類似的思路。
首先對資料量級較小的 partition 合併到一個 inputsplit,達到一個 mapper 可以處理多個業務的 partition 資料,最終落地寫多份檔案。
另外對資料量級較大的 partition 支援分段拆分,平分到多個 mapper 處理同一個 partition,這樣就實現了更均衡的 mapper 處理,能更好地應對業務量級的突增。
除了資料傾斜的問題,還出現各種原因導致資料 dump 到 HDFS 失敗的情況,比如因為 kafka 磁碟問題、hadoop 叢集節點宕機、網路故障、外部訪問許可權等導致該 ETL 程式出現異常,最終可能導致因為未 close HDFS 檔案導致檔案損壞等,需要重跑資料。那我們的資料時間分割槽基本都是以天為單位,用原來的方式可能會導致一個天粒度的檔案損壞,解析無法讀取。
我們採用了分兩階段處理的方式:mapper 1 先把資料寫到一個臨時目錄,mapper 2 把 Hdfs 的臨時目錄的資料 append 到目標檔案。這樣當 mapper1 失敗的時候可以直接重跑這個批次,而不用重跑整天的資料;當 mapper2 失敗的時候能直接從臨時目錄 merge 資料替換最終檔案,減少了重新 ETL 天粒度的過程。
在資料的實時分發訂閱寫入到 kafka1 的資料基本是每個業務的全量資料,但是針對需求方大部分業務都只關注某個事件、某小類別的資料,而不是任何業務都消費全量資料做處理,所以我們增加了一個實時分發 Databus 來解決這個問題。
Databus 支援業務方自定義分發 rules 往下游的 kafka 叢集寫資料,方便業務方訂閱處理自己想要的資料,並且支援更小粒度的資料重複利用。
上圖可以看出 Databus 的實現方式,它的主體基於 Storm 實現了 databus topology。Databus 有兩個 spout,一個支援拉取全量以及新增的 rules,然後更新到下游的分發 bolt 更新快取規則,另外一個是從 kafka 消費的 spout。而 distributionbolt 主要是負責解析資料、規則 match,以及把資料往下游的 kafka 叢集傳送。
資料開放有了原始資料並且能做離線、實時的資料開發以後,隨之而來的是資料開發需求的井噴,資料研發團隊應接不暇。所以我們通過資料平臺的方式開放資料計算、儲存能力,賦予業務方有資料開發的能力。
對實現元資料管理、任務排程、資料整合、DAG 任務編排、視覺化等不一一贅述,主要介紹資料開放後,美圖對穩定性方面的實踐心得。
資料開放和系統穩定性是相愛相殺的關係:一方面,開放了之後不再是有資料基礎的研發人員來做,經常會遇到提交非法、高資源消耗等問題的資料任務,給底層的計算、儲存叢集的穩定性造成了比較大的困擾;另外一方面,其實也是因為資料開放,才不斷推進我們必須提高系統穩定性。
針對不少的高資源、非法的任務,我們首先考慮能否在 HiveSQL 層面能做一些校驗、限制。如圖 13 所示是 HiveSQL 的整個解析編譯為可執行的 MR 的過程:
首先基於 Antlr 做語法的解析,生成 AST,接著做語義解析,基於AST 會生成 JAVA 物件 QueryBlock。基於 QueryBlock 生成邏輯計劃後做邏輯優化,最後生成物理計劃,進行物理優化後,最終轉換為一個可執行的 MR 任務。
我們主要在語義解析階段生成 QueryBlock 後,拿到它做了不少的語句校驗,包括:非法操作、查詢條件限制、高資源消耗校驗判斷等。
第二個在穩定性方面的實踐,主要是對叢集的優化,包括:
我們完整地對 Hive、Hadoop 叢集做了一次升級。主要是因為之前在低版本有 fix 一些問題以及合併一些社群的 patch,在後面新版本都有修復;另外一個原因是新版本的特性以及效能方面的優化。我們把 Hive 從 0.13 版本升級到 2.1 版本,Hadoop 從 2.4 升級到 2.7;對 Hive 做了 HA 的部署優化。我們把 HiveServer 和 MetaStoreServer 拆分開來分別部署了多個節點,避免合併在一個服務部署執行相互影響;之前執行引擎基本都是 On MapReduce 的,我們也在做 Hive On Spark 的遷移,逐步把線上任務從 Hive On MR 切換到 Hive On Spark;拉一個內部分支對平時遇到的一些問題做 bugfix 或合併社群 patch 的特性;在平臺穩定性方面的實踐最後一部分是提高許可權、安全性,防止對叢集、資料的非法訪問、攻擊等。提高許可權主要分兩塊:API 訪問與叢集。
API Server :上文提到我們有 OneDataAPI,提供給各個業務系統訪問資料的統一 API。這方面主要是額外實現了一個統一認證 CA 服務,業務系統必須接入 CA 拿到 token 後來訪問OneDataAPI,OneDataAPI 在 CA 驗證過後,合法的才允許真正訪問資料,從而防止業務系統可以任意訪問所有資料指標。叢集:目前主要是基於 Apache Ranger 來統一各類叢集,包括 Kafka、Hbase、Hadoop 等做叢集的授權管理和維護;以上就是美圖在搭建完資料平臺並開放給各個業務線使用後,對平臺穩定性做的一些實踐和優化。
總結首先在搭建資料平臺之前,一定要先了解業務,看業務的整體體量是否比較大、業務線是否比較廣、需求量是否多到嚴重影響我們的生產力。如果都是肯定答案,那可以考慮儘快搭建資料平臺,以更高效、快速提高資料的開發應用效率。如果本身的業務量級、需求不多,不一定非得套大資料或者搭建多麼完善的資料平臺,以快速滿足支撐業務優先。在平臺建設過程中,需要重點關注資料品質、平臺的穩定性,比如關注資料來源採集的完整性、時效性、裝置的唯一標識,多在平臺的穩定性方面做優化和實踐,為業務方提供一個穩定可靠的平臺。在提高分析決策效率以及規模逐漸擴大後需要對成本、資源做一些優化和思考。