首頁>技術>

我寫這篇文章,是總結我幾年前學習Spark時候做過的一些練習,一般Spark程式設計的小套路。

架構圖

模擬業務場景離線業務和實時業務。

離線資料流

Hive -> Spark -> MySQL

實時業務

Kafka ->Spark Streaming ->MySQL

相關表

表2 user_info 使用者基本資訊表,存放註冊使用者資訊

表3 product_info 商品基本資訊表,存放所有商品的基本資訊

需求使用者訪問 session 統計

在電商網站上,使用者 session被定義為會話,指使用者第一次進入網站頁面,做了成千上百次操作後,到最後一次離開網站,或者長時間不響應的這一段過程。

統計的問題包括

session 聚合指標計算按時間比例隨機抽取 session獲取每天點選、下單和購買排名前 10 的品類獲 取 top10 品類中排名前 10 的 session

2. 頁面單跳轉化率統計

計算關鍵頁面之間的單步跳轉轉化率,涉及到頁面切片演算法以及 頁面流匹配演算法。

3. 區域熱門商品統計

統計出各個區域的 top3 熱門商品

4. 廣告流量實時統計

實現動態黑名單機制,以及黑名單過濾;實現滑動視窗內的各城市的廣告展現流量和 廣告點選流量的統計;實現每個區域每個廣告的點選流量實時統計;實現每個區域 top3 點選量的廣告的統計;Spark特性共享變數

如果在一個運算元的函式中使用到了某個外部的變數,那麼這個變 量的值會被複製到每個 task 中。為了便於重複利用,Spark提供了兩種共享變數,一種是 Broadcast Variable(廣播變數),另一 種是 Accumulator(累加變數)。

2. DataFrame與DataSet

DataFrame 只知道欄位,但是不知道欄位的型別,而 DataSet時強型別 不僅僅知道欄位,而且知道欄位 型別,所以有更為嚴格的錯誤檢查。

3. DataSet 與 RDD 互操作

透過反射獲取 Schema:使用 case class 的方式;透過程式設計獲取 Schema

4. 使用者自定義聚合函式(UDAF)

繼承 UserDefinedAggregateFunction

5. 開窗函式

第一大類:聚合開窗函式  聚合函式(列) OVER (選項),這裡的選項可以是 PARTITION BY 子句,但不可是 ORDER BY 子句。

COUNT(*) OVER()

排序開窗函式  排序函式(列) OVER(選項),這裡的選項可以是 ORDER BY 子句,也可以是 OVER(PARTITION BY 子句 ORDER BY 子句), 但不可以是 PARTITION BY 子句。

row_number() over(order by score) as rownum

rank() over(order by score) as rank

6. 流處理運算元

updataStateByKey,須開啟 Checkpoint 機制

為每一個 key 維護一根 state,並持續不斷 地更新該 state。

步驟如下

1. 首先,要定義一個 state,可以是任意的資料型別; 2. 其次,要定義 state 更新函式——指定一個函式如何使用之前的 state 和新值 來更新 state;

對於每個 batch,Spark 都會為每個之前已經存在的 key 去應用一次 state 更新函 數,無論這個 key 在 batch 中是否有新的資料。如果 state 更新函式返回 none,那麼 key 對應的 state 就會被刪除。 當然,對於每一個新出現的 key,也會執行 state 更新函式。

需求解題思路1. Session 各範圍訪問步長、訪問時長佔比統計

資料來源

問題分析

統計出符合篩選條件的 session 中,訪問時長在 1s~3s、4s~6s、7s~9s、 10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 以上各個範圍內的 session 佔比;訪問步長在 1~3、4~6、7~9、10~30、30~60、60 以上各個範圍內的 session 佔比,並將結果儲存到 MySQL 資料庫中。

session 訪問時長,是指session 對應的開始的 action,到結束的 action, 之間的時間範圍;

舉例說明

比如說,符合第一步篩選出來的 session 的數量大概是有 1000 萬個。 我們要計算出,訪問時長在 1s~3s 內的 session 的數量,併除以符合條件的總 session 數量(比如 1000 萬),比如是 100 萬/1000 萬,那麼 1s~3s 內的 session 佔比就是 10%。

解題

2. 使用Spark累加器將相同的每一個訪問時長,相同的每個訪問步長,總訪問時長個數數量和總訪問個數數量統計出來,然後再進行邏輯計算。

3. 將統計結果寫入Mysql,結束

2. Session 隨機抽取

按照時間比例隨機抽取 1000 個 session

比如說,這一天總共有三天 1000 萬得 session。那麼我現在總共要從 這 1000 萬 session 中,隨機抽取出來。 1000 個 session。要求如下

如果這一天的 12:00~13:00 的 session 數量是 100 萬,那麼這個小時的 session 佔比就是 1/10,那麼這個小時中的 100 萬得 session,我們就要抽取 1/10 * 1000 = 100 個。以此類推。

資料來源

將原始資料 (sessionId,使用者資料)的格式轉換成 [dateHour,使用者資料]CountByKey計算出每個小時的Session 總數

原始資料結構 [dateHour,使用者資料] ——> countMap : Map[dataHour, count ]

——> 轉化成 <yyyy-MM-dd,<HH,count>>的格式 dateCountMap:Map[date,Map[Hour,count]],將抽取的數量根據天數,平均分配到每一天,按照每個小時的比例,產生隨機的Index,如下資料結構

Map[date,Map[Hour,List]],這個List裡面包含count數里的隨機取數,比如count=10 ,我可以隨機生成[1,3,4,9]索引,索引個數按照每個小時分配到的數量比例。

最後將這個資料結果進行廣播。broadcast。

3. 將步驟1的資料格式進行按小時分組(GroupByKey),再結合第2步驟的每個小時索引列表,出去對應的索引下標的sessionId。

4. 根據第三步的資料格式,輸出sessionId的聚合資料,然後將這些SessionId紛紛抽取出來,和原來的原始資料進行Join關聯操作。

資料來源

來自需求1中獲取的 Session 用 戶 訪 問 數 據 (UserVisitAction)。

解題

原始資料結構[sessionId, 使用者資料資訊]轉化成以各個主題的資料結構

比如sessionId2detailRDD:RDD[(sessionId,使用者資料)] ——>

訂單主題 orderCategoryIdCountRdd: RDD[CategoryId,Count]

支付主題 payCategoryId2CountRdd: RDD[(CategoryId,Count)]

這樣我們可以獲取到每一個分類目錄主題下面的總數量。

2. 原始資料結構[sessionId, 使用者資料資訊]透過FlatMap運算元壓平出所有的分類Id,

最後轉化成的格式是categoryidRDD[CategoryId,CategoryId,CategoryId]

3.在第2步驟的輸出結果集上向第1步驟的結果集發出LeftJoin得出如下結果

categoryid2countRDD:RDD[(CategoryId,Value)]

其中Value是包含第2步驟中有關該分類目錄ID下的所有總和計數的結果。

4. 在第3步輸出的資料結構結果集進行排序 (Map運算元)

sortByKey2countRDD:RDD[(CategorySortKey(clickCount,orderCount,payCount),Value)]

最終的資料結構是

top10Category:RDD[Top10Category(taskId,categoryid,clickCount,orderCount,payCount)]

最終將結果存入MySQL。

clickCount排名前10的使用者session取出來,這樣有益於對關鍵客戶進行分析。

資料來源

需求1中Session使用者訪問資料

解題

根據需求3的輸出鍵Map運算元抽取出前10的品牌分類目錄

top10CategoryIdRDD:RDD[(categoryid,categoryid)]

2. 需求一的輸出資料集進行包裝

原始資料集 結構明細集合 RDD[(sessionId,使用者資料資訊)] 經過GroupByKey變成分類集合

sessionid2detailsRDD:RDD[(sessionid,使用者資料資訊)]

然後經歷一道FlatMap運算元,抽出categoryid,最終結果集資料結構如下

categoryid2sessionCountRDD:RDD[(categoryid,sessionid + "," +count)],這裡的每一個count都是1。僅僅只是打標籤而已

3. 將需求一和需求二的輸出進行JOIN集合,統計每個品牌分類下的session值的總計數。

資料結構結果集合: top10CategorySessionCountRDD:RDD[(categoryid,sessionid + ","+count)]。這裡的count不再是1,而是每個categoryid下每個sessionid訪問計數總和。

4. 排序取出每個品牌分類下最活躍的前10個session,並且和需求1的輸出資料集進行join獲取到詳細的結果集合。

最終的資料結構應該如下

top10SessionObjectRDD:RDD[TOP10Session(taskid,categoryid,sessionid,count)]

5. 頁面轉化率統計5.1 計算頁面單跳轉化率

什麼是頁面單跳轉換率?

比如一個使用者在一次 Session 過程中訪問的頁面路徑 3,5,7,9,10,21,那麼頁面 3 跳到頁面 5 叫一次單跳,7-9 也叫 一次單跳,那麼單跳轉化率就是要統計頁面點選的機率,比如:計算 3-5 的單跳轉 化率,先獲取符合條件的 Session 對於頁面 3 的訪問次數(PV)為 A,然後獲取符 合條件的 Session 中訪問了頁面 3 又緊接著訪問了頁面 5 的次數為 B,那麼 B/A 就 是 3-5 的頁面單跳轉化率,我們記為 C;那麼頁面 5-7 的轉化率怎麼求呢?先需要求 出符合條件的 Session 中訪問頁面 5 又緊接著訪問了頁面 7 的次數為 D,那麼 D/B 即為 5-7 的單跳轉化率。

頁面單跳轉化率的關鍵條件在於,頁面的訪問時有先後的。

資料來源

解題

將需求一中輸出的資料集經歷過GroupByKey

原始資料集 RDD[(SESSIONID,使用者資料)] ——GroupByKey——> RDD[(sessionid,使用者資料)]

2. 分頁抽取單跳標籤

PageSplitRDD:RDD[(flag,1)]

根據物件,過濾所有符合業務物件的單跳標籤,如3_5,返回

如果路徑為 : 1,2,3,4,5,6,7

則單跳標籤是(1_2,2_3,3_4,4_5,56,6_7)

然後再GroupByKey,統計出PageSplitRDD:RDD[(flag,Count)]的count值。

3. 計算每個首頁個數

startPagePV: Long

4. 根據任務物件的頁面流路徑,計算所有單跳頁面的轉化率

//3,5,2,4,6

//3_5

//3_5pv/3pv

//5_2 rate=5_2 pv /3_5pv

得到最終結果Map[flag,Double] 這個Double值是轉化率結果。

最後包裝一下類,輸出到MySQL

資料來源

還有城市表,資料結構如下

Array((0L, "北京", "華北"), (1L, "上海", "華東"), (2L, "南京", "華東"), (3L, "廣州", "華南"), (4L, "三亞", "華南"), (5L, "武漢", "華中"), (6L, "長沙", "華中"), (7L, "西安", "西北"), (8L, "成都", "西南"), (9L, "哈爾濱", "東北"))

解題

無論資料來源有幾張表,都要匯聚成公共鍵的方式

比如以city為公共鍵為key,然後後期再轉到area的區域為key去求TopN

所以此處輸出資料來源公共鍵的結果資料集

joinedRDD: RDD[(city_id,(Row,Row))] ——>map

mappedRDD:RDD[(cityid,cityName,area,productid)]

2. 建立臨時表進行SQL處理

createOrReplaceTmpView("tmp_click_productid)")

DataFrame:RDD[Row(area,product_id,count,cityids)]

DataFrame:createOrReplaceTmpView("tmp_area_product_click_count")

最後將tmp_area_product_click_count和tmp_click_productid進行綜合。

得到新臨時表tmparea_full_prod_click_count

在新的臨時表上進行SQL操作,得到每個區域的TOP3

最後輸出區域Top3產品表

資料來源

Kafka : timestamp province city userid adid

MySQL:歷史黑名單列表

blacklistRDD:RDD[(userid,True)]

解題

從Kafka接收訊息transform運算元過濾掉歷史黑名單使用者列表

filteredAdRealTimeLogDStream:DStream[(userid,log)]

2. 本質上是WordCount,所以reduceByKey進行聚合統計

blacklistDstream[(datekey + "_"+userid+"_"+adid,Count)]

資料來源

Kafka 資料,需求7中過濾掉黑名單列表後的使用者

解題

分離出符合題幹要求的資料結構型別->每天,各省,各城市

filteredAdRealTimeLogDStream:DStream[(userid,log)]經過Map運算元

mappedDStream[(dateKey+ "_" + province + ""+city+"_"+adid,1L)]

2.實時資料累加統計使用updateStateByKey

aggregatedDStream:Dstream[(dateKey+ "" +province + ""+city + "_"+adid,Count)]

存入MYSQL表

9. 各省熱門廣告實時統計

統計每天各省 top3 熱門廣告。

資料來源

解題

1.將原先按照每天各個省各個城市的key值劃分轉變成為按照每天各個省份的key值

aggregatedDStream:Dstream[(dateKey+ "" +province + ""+city + "_"+adid,Count)]

經過reduceByKey運算元變成

dailyAdClickCountByProvinceRDD:RDD[(date,province,adid,clickCount)]

2. 註冊成臨時表,經過SQL統計出TOP3的結果

createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")

輸出結果

provinceTop3AdDF:DataFrame

資料來源

Kafka:timestamp province city userid adid

解題

透過reduceByKeyAndWindow運算元對過去一小時的資料進行聚合統計

adRealTimeValueDStream:Dstream[String]

map操作

pairDStream:DStream[(yyyyMMddHHMM_adid,1L)]

經歷一道reduceByKeyAndWindow((a:Long,b:Long)=>(a+b),Minutes(60L),Second(10L))

aggrRDD:DStream[(yyyyMMddHHMM_adid,Count)]

輸出最後的結果

11. 每個區域下每條道路的車輛數和每個區域下透過車輛數最多的前3條道路

11.1 每個區域下每條道路的車輛數

row_number() over (partition by xxx order by xxx ) as rank ----從1開始

11.2 每個區域下透過車輛數最多的前3條道路

資料來源

第一張表(monitor_flow_action)元資料資訊

date

monitor_id

camera_id

car

action_time

speed

road_id

area_id

日期

監控id

攝像id

汽車

發生時間

速度

道路id

區域id

第二張表(tmp_car_flow_basic)資料資訊

area_id

area_name

road_id

monitor_id

car

區域id

區域名

道路id

監控id

汽車

解題

第一種

select

area_id,road_id,count(car) as carCount ,UDAF(monitor_id) as monitorids

from

t1

group by

area_id,road_id --- t2

第二種

select

area_id,road_id,carCount,monitorids

from

(

select

area_id,road_id ,carCount ,monitorids,row_number() over (partition by area_id order by carCount ) as rank

from

t2

)

where t2.rank <=3

12. 實時統計通路擁堵情況

每隔5秒計算過去5分鐘卡口的平均速度?

資料來源

DStream

日期

監控id

卡口id

車牌

時間

速度

區域

10

06

2020-06-01

0006

03857

魯A79690

2020-06-01 16:35:32

230

07

解題

DStream -- (monitor_id,speed) ———5分鐘資料 放到一個DStream,

(monitor_id,(speed,1))--reduceByKey()

13. 車速相對較高的topN卡口

解題思路

14. 車輛軌跡

解題思路

15. 卡口監控

解題思路

16.廣播機制+filter 代替join

解題思路

17. 隨機抽取車輛

解題思路

18.卡口流量轉換

解題思路

19.車速top10

解題思路

14
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 【量化乾貨】用python搭建量化交易策略