首頁>科技>

在得到的大資料產品體系中,使用者分群是資料化精準運營方向的一個重要功能。該專案的落地為資料產品驅動業務發展提供了新的思路。在架構設計中,大資料中心採用了目前業界應用較少的新型資料庫同步技術,盡最大可能的保障參與計算的資料素材的實時性,以此保障運營動作的精準性。

本文將圍繞資料驅動型專案的技術選型、架構設計、效能最佳化等方面展開。本文分為四個部分:

第一部分介紹一切事情的起源,以及需求理解後的分層設計;

第二部分是本次分享的重點,介紹資料層的選型方案和整體架構;

第三部分為產品層和應用層的設計;

第四部分為踩坑回顧和未來規劃。

一、故事的開始是這樣的

有一天,運營老師提出這樣一個需求:

“我想要做這樣一個運營動作,篩選出‘目前’買過X課程的但沒有買過Y課程的,有Z勳章的且昨天看過Y課程詳情頁的人群。給該人群中的每一個使用者傳送一個push,以及一張Y課程的優惠券,促使該使用者群體買Y課程。如果使用者滿足以上條件,說明使用者已經有很大的傾向發生購買行為了,這時候推波助瀾一下,deal。”

以上需求中,有一個關鍵字:“目前”。因為該運營動作天生要求實時性,想象一下,如果使用者昨天還沒有買Y課程,今天早上剛下單,如果基於延遲計算的資料結果(如T+1)執行該運營動作,結果就是push和優惠券恰好姍姍來遲,這對使用者來說已經沒有促進,而是傷害了。故這種運營動作,資料一定要儘可能的保證實時。

從技術角度來理解下這個需求:從表面上來看,該需求主要是需要實時的計算資料,最後應用計算出的人群在各類運營平臺上進行聯動操作,比如訊息中心、優惠券系統。實際上,這只是表層的,隱藏的技術難點有很多:

需要將已購表、勳章表等不同結構資料庫與使用者行為資料進行關聯計算,這就要求我們具備異構資料實時聚合以及計算的能力;產品層面上,需要提供給運營老師易用的人群定義系統,以及給上層系統提供高吞吐人群資料服務。

如此,我們按職能可以將架構從下到上分為四層:

最下面一層是負責將各種各樣的的資料來源實時同步並存儲的資料構造層。第二層是在第一層準備好的資料素材基礎上,進行各種計算的資料計算層。第三層為交付給需求方使用的產品層,落地為資料門戶中的易用系統,透過點選即可完成複雜的人群生產。最上層為使用者分群系統與各上層應用分群的業務聯動的資料服務層,在約定好的邊界下,應用產品層生產好的人群做各類運營動作。

以故事起點的需求為例:資料構造層將已購表、勳章表彙集到一處儲存,運營老師在產品層定義人群,資料計算層在資料構造層的基礎上做交集並集運算計算出人群uid列表,最後訊息中心、優惠劵系統拉走人群檔案,遍歷傳送push併發出優惠券。

目前該分層架構圖還很空,隨著章節的推進,我們會一點一點將該分層架構圖填滿。

另外需要劃重點的是,在四層中,資料構造層和資料計算層是緊密繫結在一起的。因為在大資料領域中,各類計算引擎的執行都是建立在檔案的基礎上的。比如得到資料門戶中的行為分析系統應用的impala計算引擎,其高效的運算就依賴於 parquet列式檔案儲存。所以,下面我們會先結合這兩層一起來做架構選型。

二、資料構造層和資料計算層

綜合第一部分的需求分析,我們需要在這兩層解決的問題有三個:

異構資料的實時同步;計算引擎;承上啟下的儲存。

在大資料中心已有的架構中是否有可以複用的呢?

穩定的資料同步場景,目前大資料中心離線資料倉庫採用的是阿里巴巴開源的DataxT+1同步。DataX是阿里巴巴集團內被廣泛作為離線大資料倉庫和各類儲存系統的正向和逆向的傳輸工具。而T+1同步,是指延遲一日進行資料同步的方式,這種方式相當在凌晨將資料表做一個此時此刻的映象,每日的映象按分割槽儲存在資料倉庫中,使用時指定最新的分割槽即可。那我們是否可以透過細化DataX執行週期,來做到準實時呢。理論上是可行的,但是每次的全量同步都會消耗大量的頻寬資源和時間。從各種維度來看,都過於粗暴並且不合理。

首先是效能瓶頸:隨著業務規模的增長,Select * From MySQL -> Save to LocalFile -> Load to Hive 這種資料流花費的時間將越來越長,流量也將越來越大。從MySQL中Select全量資料,對MySQL的資源消耗必然很大,可能產生連鎖反應,影響線上業務的正常服務,即便透過建立專用從庫解決,對資源的需求卻會大大增加。

為何不採用增量同步?這是因為業務資料庫通常是存在刪改的,在Hive倉庫資料不可更改的背景下,增量拉資料會導致歷史資料不一致的問題。

故聚合業務資料庫的資料,不應該面向資料,合理的技術選型當然是使用資料庫 CDC (Change Data Capture)方案,基於各類資料庫的變更日誌,可以逆流程的還原資料。比如MySQL的Binlog,MongoDB的OPlog,PostgreSQL WAL。CDC的本質是有序且源源不斷的資料流,為了便於程式處理,需要有一個能承載有序流的載體。這個載體是毫無爭議的,必然是天生為此誕生的Kafka。確認了有序流的載體,基於此,需求可以細化為兩個小點:

如何提取CDC進Kafka;如何從Kafka消費資料,逆流程還原資料並存儲?

在這條鏈路中,Kafka是核心樞紐,細化的需求1和2其實就是Kafka的進和出。對於這種場景,Kafka的原始團隊推出的Confluent套件中的核心元件Kafka Conncet專注於此。Kafka Connector,可以簡單理解為連結Kafka的管道,管道的兩端必然有一個是Kafka。

如何提取CDC進Kafka?

由於業界業務庫仍以MySql為主,故我們以MySql的重心進行選型。下表中,總結了市面上主流的可用工具的特點。其中,省略了很多共性:

抓取原理均為偽MySql的slave,實時抓取CDC資料;必要的功能均支援:支援DDL同步, 支撐基於GTID的 HA;均為大資料主語言Java開發,方便後續的原理探索和最佳實踐。

所以,我們需要考慮其他方面,包括:運維成本、易用性、與既定架構契合度、活躍度、架構設計理念等。

首先被pass的是領英的DataBus,其各方面都落後太多,且官方早已經不維護了。接著就是新生代Maxwell和Debezium,與老兵Canal之爭,由於Canal多個維度的模型均落下風,包括不支援Bootstrap,架構和應用的複雜度較複雜。最終我們選擇了新生代中更優秀的Debezium,這個架構非常新,由2016初開始迭代,選擇它在技術選型上可以稱得上比較大膽了。為何做出這個冒險呢?原因如下:

先進的 Snapshot + Binlog智慧切換模式;與Confluent整合度非常好,天生以Kafka Connector的模式執行,與我們已確定的Confluent架構非常契合;專注做CDC抓取,在官網的描述中,其誕生的初心就是建立一個聯結器庫,捕獲來自各種結構化資料庫的更改;與得到的儲存系統高度契合,包括穩定的Mysql,MongoDB,Postgre的聯結器且持續迭代最佳化;最後,紅帽出品,根紅苗正,熱度非常好。

重點介紹一下Debezimu 的核心特性:

首先就是Snapshot + Binlog 智慧切換模式,下面的流程圖即為一次dibezimu初始化的流程:

關鍵流程點:

當一個Debezium任務啟動時,首先根據Kafka Conncet狀態資訊判定自己是否是新的任務,如果是,初始化狀態資訊(由Kafka Conncet服務維護),開始Snapshot模式拉取資料表存量資料;獲取一個讀鎖(這將阻塞其他客戶端的寫),開啟一個可重複讀事務,以確保Snapshot過程中後續讀到的資料都是一致的;記錄Binlog當前offset。拉取關注表的Schema資訊至SchemaDDL Topic,釋放讀鎖,2和3的操作雖然產生了讀阻塞,但是由於很輕量,基本是無感知的;準備工作完成,Snapshot正式開始,在事務內分頁scan 資料至 CDC Topic,在scan完成後提交事務,從之前記錄的offset開始Binlog模式;在Binlog模式執行過程中持續維護狀態資訊以及SchemaDDL資訊,一旦發生異常,Debezium也可以透過狀態資訊自愈,如此,Debezium可以按預期穩定持續的跑下去。

以上的操作,僅僅在Confluent Connetor服務中提交一個Debezium任務即可,相當的絲般順滑。另外,強大的Snapshot+Binlog智慧模式,以及其DDL語句的持續跟蹤的特性,帶來的高擴充套件性對於平臺化建設絕對是強有力的加成。Debezium Connector拉取到的CDC資料以及流程中提到的狀態資訊和Schema資訊,均儲存在Kafka獨立的Topic中。這是Kafka Connector的設計,不引用其他儲存介質,完全封閉在Confluent的生態內,降低了架構的複雜性,另外,儲存的完全隔離也對穩定性更有保障。綜上,是得到選擇Debezium的原因。當然,也承擔了這個選擇帶來的坑。

如此,我們已經確定了資料構造層的大部分選型,如該圖中,黃色的是已經確認的,Debezium Connector負責CDC資料抓取,抓取到的資料被打入Kafka。對於這兩個核心元件,我們還需要額外兩個基礎元件的支援。Kafka依賴的Zookeeper,以及 Connector依賴的Schema-Registry,其中Schema-Registry是Confluent的另外一個基礎元件,用於支撐各個Connector傳輸進Kafka資料的系列化和反系列化操作。

如何從Kafka消費資料,逆流程還原資料並存儲?

在整個資料構造層中,目前唯一不確認的是儲存。回到這一章節的開始梳理的,整個資料層需要解決的另外兩個問題,就是承上啟下的還原儲存,和計算引擎的確定。資料儲存的所謂啟下就是從Kafka中消費CDC資料還原真實資料,所謂呈上就是定製計算引擎使用的格式。由於這兩點是繫結的,確定了計算引擎,也就確定了還原儲存方案。

現今大資料計算引擎型別多種多樣,引擎的確認必須滿足真實計算需求,回到需求本身,運營老師發起的人群計算,是可預見的基於規則的自由組合。自由選擇是指,對於條件的定義且、或、非的關係,完全由使用者隨時思考且定義。這是典型的Ad Hoc即席查詢場景。故下表中即為我們可以考慮的可以滿足該場景的計算架構。

Hive:Hive是Apache開源的基於HDFS組織資料,基於sql語法提供計算能力的離線大資料倉庫計算架構。透過定義描述即可知:該方案的劣勢非常明顯,不支撐資料更新,需要排程系統驅動密集批計算任務構建還原分割槽完整資料,只能儘量透過縮小排程任務時間粒度保證儘量實時,但優點也非常明顯:計算穩定,開發維護成本低,團隊底蘊好。 HBase+ Hive: 這是對方案1的一種升級方案,使用HBase儲存全量資料,應用HBase的高吞吐能力處理CDC資料實時逆流程還原。另一面,由於HBase是將資料儲存在HDFS,Hive可以透過門面包裝HBase表的方式提供計算能力。該方案的最大優點是,可以保證資料實時儲存,需要挑戰的問題是:Hive分析時會影響HBase的讀能力,可能會造成實時寫程式背壓導致連鎖反應,另外,需要多維護兩套實時的架構,包括HBase和訊息驅動寫入,複雜性較高。Imapla+kudu: 這一對組合是Cloudera開源的MMP架構,在追加場景下(只有增,沒有刪改)是絕對的黃金組合,使用Imapla儲存無變更的歸檔冷資料,kudu儲存新增追加資料,在資料計算時,冷資料路由到Impala引擎,實時資料路由到kudu引擎,已檢視的形式提供服務,該組合實現的冷熱分離方案能達到相當好的實時分析能力。但在其他包括刪改的場景下必須排除不支援變更的Impala單獨使用kudu,即便如此,kudu在同時實時寫和分析的能力在AD HOC場景也是表現也是相當優異的。但是對比HBase,kudu自己組織資料儲存,相對使用HDFS的HBase,需要對資料檔案保障投入大量資源。最後,同方案2一樣,需要多維護兩套實時的架構,複雜性較高。目前得到的行為分析系統已經使用了Imapla,還沒有引入kudu,主要的困擾就是運維成本。

總結看來,只使用kudu是我們的最佳選擇。但是最終,我們在第一版本中選用了方案一,這個只能達到準實時有弊端的方案。為何要選看似最不好的選擇呢?

原因相信大家都經歷過,就是:時間緊任務重人手不足。當時投入專案的開發資源,在已經引入全新架構Debezium, Connect的情況下,很難在分配精力在另一個新架構。由於基礎架構的限制,故選擇了團隊最熟悉的Hive+排程系統的來實現30分鐘延遲的資料準備,當然,這是在需求方認可的情況下!共識是:資料驅動型產品落地的初心是驅動業務發展,在初期,相比上來就平臺化的思路,更應該選擇快速驗證推進。

確定了使用Hive + 排程系統的方式後,檔案儲存也確定下來,必然是HDFS。此時資料層的邏輯基本上已經齊整了,此時只需要處理兩個關鍵點就可以將整個邏輯串起來:

如何將從Kafka取出的資料庫CDC資料放到HDFS供後續的還原,這個自然而然的就選擇Confluent官方的 HDFS Connector;HDFS中已存在的歷史資料,如何與從Kafka中取出的Binlog資料做逆流程還原為最新的原始資料?

答案是:根據資料的性質分層組織資料,再基於分層倉庫進行Hive merge還原計算。

首先,Hive資料倉庫使用了簡化的三層數倉分層模型:

第一層:ODS資料緩衝層,儲存的是HDFS Conector從Kafka拉取到的Binlog檔案。

第二層:DWS資料彙總層,這層承載的是完全還原回來的業務資料庫,由於Hive每次還原都是按時間點分割槽儲存的全量資料映象,故可以稱之為事實分割槽表。最新的映象依賴於上一次全量映象與Binlog merge計算生成。如此,在使用DWS表必須指定最新分割槽。

第三層:為了遮蔽DWS層必須指定分割槽的問題,增加了ADS應用層,ADS表是對DWS表建立的一個邏輯表,其永遠透過軟連線指向DWS表的最新分割槽。

DWS和ADS的建設是需要有排程支撐的,這裡使用團隊底蘊較好的Azkaban。

如何做最關鍵的 Merge計算?

Hive場景下的Merge計算,是在最近一次映象的資料集上合併新增Binlog的批計算。這種操作更像是階段性的執行雙流合併。下面的流程圖即為一個簡單的邏輯演示,左側是有序的Binlog,右側是作為基準資料集的存量資料。

首先對Binlog按 主鍵+事件型別 group by,取出delete 事件與存量資料進行left outer join,這樣就從存量資料中剔除了刪除資料;對剩餘Binlog資料進行二次篩選,按主鍵 group by 取最新的Binlog,無論是insert或是update;將步驟2中生成的資料與存量資料合併,生成最新資料。如圖中的黃色的1、2是受Binlog的update重放影響的,綠色的4是insert新增的。

至此,資料構造層和資料計算層都已經確定,各種素材都已經準備好了,可以透過Hive Server元件提供人群計算服務了。如何基於這些資料定義人群並應用起來呢?就是產品層和服務層這兩層要解決的。

三、產品層和應用層

產品層核心問題主要有兩個:

如何將人群構建流程抽象為易用系統;如何將複雜的人群定義條件轉為最佳化的Hive SQL語句?

先討論產品層的第一步問題:複雜的人群定義條件,如何向上抽象易用的系統,向下轉化為高效的Hive sql呼叫資料計算層?再次分析初始需求:(滿足X,滿足Y,滿足Z,不滿足M),進行計算機語言轉化後 -> (X且Y且Z且) 非 M ,其本質是明顯的集合運算。故計算流程可以抽象為下方的圖,共分三步:

第一步:計算所有滿足人群的交集;

第二步: 計算過濾的集合;

第三步:從滿足人群的交集中剔除過濾集合,最終的灰白色即為我們的目標人群。

集合條件的自由組合,適合的產品形態當然是下拉框點選,最終向需求方交付的資料門戶中的使用者分群系統就是這樣的形態。初始需求中的點選方案 “且或否”,只是一個簡單的例子,且或非集合操作能達成非常靈活的組合,這對轉化成的Hive sql的複雜性和執行效能也會有極大的挑戰。如果在Hive計算時發生傾斜情況,將拖慢整個查詢,計算時長將遠超出我們的可接受範圍。

而集合運算場景如果按照標準的join思路去解決,就容易發生傾斜問題。比如< 多且> 場景,如果按照多次inner join解決,如圖中的演示,藍色和黃色的join,結果與綠色的join,結果再與紫色的join,多個inner join將不斷重複這一流程,在Hive計算引擎將sql轉化為MapReduce時,stage的個數將於且條件個數成正比。同時,Hive在進行join計算時,當進行關聯的兩張表資料量差異過大,很容易產生嚴重的資料傾斜問題。

所以,必須在條件轉化為Hivesql時最佳化處理,消除stage過多和資料傾斜問題,這兩點也是Hive工程中必須解決的。考慮最終輸出結果是單列的uid列表,最終的解決方案是使用(uid+隨機數)Union all + Group By +HavingCount 。對於上面的例子,篩選各個滿足的子條件人群時,在各個子人群的使用者上增加隨機數標識。接著直接將各個子人群Union all合併為一個大資料集。最後對這個大資料集按(uid+隨機數)Group By並且Having Count = 子條件個數。這樣的操作,有效優化了計算資源消耗和計算時間。

應用層要解決的事情是如何確定好邊界,為上層應用提供高效易用的分群資料服務。由於服務層是負責系統之間的互動的,所以確定好邊界是最先要做的事情。透過和需求方運營老師和上游系統討論後,確定最終的邊界是:運營老師在資料門戶的使用者分群系統建立人群,在其他的運營動作系統聯動使用者分群系統提供的Api服務,將運營動作與人群繫結。人群被產品層構建好後,會被同步到Ali 物件儲存和MongoDB兩種儲存,基於這兩種儲存引擎,又構建了相應的Api服務。

離線場景下,如訊息中心繫統的push,站內信,以及優惠券系統的發劵,透過構建在物件儲存上的Api獲取人群CDN地址,下載到系統內遍歷檔案進行相關處理。

線上場景下,如開機彈窗,首頁banner的策略判斷。呼叫基於MongoDB構建的Api,依賴於MongoDB高擴充套件性,構建合理的索引支撐高吞吐,服務線上的大量實時低RT請求。

至此,分層架構完全確認了。產品層的使用者分群系統,將分群條件轉化為高效的 Hive sql語句,透過HiveServer 服務調起分群計算任務。最終將結果,也就是uid列表,轉存至物件儲存OSS和MongoDB,在這兩個儲存介質之上構建對應的Api分別處理離線和實時兩類人群需求。

四、踩坑和未來規劃

正如前面所述,由於應用了新架構,在專案落地中踩了很多坑。

在Debezium方面,由於任務的本質是持續的流式任務,由於各種因素導致的Binlog丟失,是很難能找回來的。對於這種異常,採用對每張表每週定時一次的穩定 Snapshot 來儘量減小可能的損失,從原理和實際情況看,Debezium Snapshot是足夠穩定的。另外,緊急情況下出現存量資料不一致問題時,使用備用線路Datax拉取全量資料替代Snapshot,因為datax的全量拉取速度比Debezium Snapshot更快,當然,該操作都需要做更多的人工運維操作。

另外由於架構太新且持續迭代,無法避免官方bug的影響,目前已經累計觸發3次官方issue,原因均為對同步資料表的規範驗證過於嚴格,比如Date型別不允許預設值為公元起點,這類issue透過原始碼分析都可以修復掉,但再嘗試向官方提交時發現總是晚了一步,嘗試成為commit失敗。所以在長期運維的過程中,資料中心會保持各類元件與官網的版本一致。在升級過程中,也踩過版本匹配的坑導致平滑升級失敗,如Debezium和Connetor,以及HDFS Connect的版本均必須一致。

在Connetor應用方面,由於我們採用的是叢集模式,影響我們最大的是叢集的重平衡機制導致任務頻繁banlance,尤其在快速提交多個Debezium任務時,在Snapshot的階段下,任務重新banlance很大機率會導致資料丟失,甚至導致資料庫死鎖。目前初期的解決方案仍以運維規範為主,不會短時間提交大量connect任務。而長期的優質解決方案,是放棄Connetor叢集模式,採用standalone+ k8s的容器模式,完全實現任務隔離。

當然,在發生問題的第一步,及時向用戶同步問題並道歉,是我們從《程式設計師的職業素養》學到的,“要練習的第一件事就是道歉”。

在專案的落地過程中,得到還沒有做到真正實時的分群,止步在延遲30分鐘的準實時。在未來資料團隊的規模和能力能cover更復雜的架構時,必然會做一次升級,做到真正的精準運營。另一方面,在新型架構的探索和應用過程中,對異構資料同步場景的平臺化思路越來越清晰。異構資料同步的需求不僅僅是在使用者分群中,相信在網際網路開發中,都會有類似的使用資料庫CDC資料的場景。而Debezium+Connector的先進特性帶來的高擴充套件效能對平臺化是非常強有力的支援。

目前對這個平臺的設想是:開發人員只需點點點配置,就能自動化的啟動一個資源隔離的Debezium Connector任務,輸出一個實時的CDC日誌kafka topic,平臺在遮蔽底層複雜任務細節上,支撐任務的全生命週期管理和監控報警等。

15
最新評論
  • 整治雙十一購物亂象,國家再次出手!該跟這些套路說再見了
  • 「人臉表情識別」情緒識別相關會議、比賽(2018-2020)