首頁>技術>

引言:KubeEdge 是一個開源的邊緣計算平臺,它在Kubernetes原生的容器編排和排程能力之上,擴充套件實現了 雲邊協同、計算下沉、海量邊緣裝置管理、邊緣自治等能力。KubeEdge還將透過外掛的形式支援5G MEC、AI雲邊協同等場景,目前在很多領域都已落地應用。

本篇文章主要分享基於KubeEdge和Kuiper實現邊緣流式資料處理的實踐經驗。

Kuiper : 在邊緣的流失處理產品

Kuiper是從2019年初開始研發,在2019年10月份,釋出了第一個版本,一直持續迭代到現在,它的整體架構是一個比較經典的流失處理架構。

產品設計目標:在雲端執行的流式處理,像Spark與Flink可以執行在邊緣端

Kuiper架構圖

右側為Sinks,代表資料處理完成後所要儲存的位置,也就是目標系統,目標可以是MQTT,可以將其存到檔案、資料庫裡面,也可以呼叫HTTP service;

中間部分包括這幾層,最上層為資料業務邏輯處理,這個層面提供了SQL statement、Rule Parser,SQL processors進行處理後並將其轉化成SQL plan;下面層為Streaming runtime和SQL runtime, 執行最終執行出來的 plan;最底層為storage,用來儲存部分訊息流出。

Kuiper使用場景

流失處理:實現在邊緣端的實時流式處理

資料格式與協議轉換:實現邊緣與雲端不同型別的資料格式與異構協議之間靈活轉換,實現IT&OT融合

KubeEdge與Kuiper整合

部分架構圖

Kuiper是裝在 KubeEdge MQTT Broker後面,整個都執行在邊緣端,底下為不同的Mapper,也就是接入各種各樣不同的協議。邊緣MQTT Broker用來交換訊息。

資料處理的型別:從裝置模型檔案定義中獲取型別定義將資料轉換為Kuiper的資料型別建立流時,可使用schema-less流定義

支援的資料型別有int、string、bool、float

KubeEdge模型檔案和配置

下圖為部分配置檔案,包括裝置的名稱、屬性、name、data type、Description等。

部分配置檔案

1)儲存裝置模型檔案

2)在ect/mqtt_source.yaml中配置模型檔案資訊

KubeEdgeVersion:目前未使用,為適配將來不同的版本模型檔案預留KubeEdgeModelFile:模型檔案路徑

3)透過config-map下發配置,儲存到相關目錄下

Kuiper使用過程

1) 定義流:類似餘數據庫中表格的定義

DATASOURCE=”$hw/events/device/+/twin/update”為 KubeEdge 裡定義好的topic。

2)定義並提交規則用SQL實現業務邏輯,並將執行結果傳送到指定目標支援的SQL

SELECT/FROM/WHERE/ORDER

JOIN/GROUP/HAVING

4類時間視窗+1個計數視窗

60+SQL函式

3)執行

KubeEdge中部署Kuiper規則

方法一:

1) 運用Kuiper-Kubernetes-tool

2)該程式為一個工具類,單獨執行在容器中,執行透過config-map下發的命令配置檔案

配置檔案中用於指定kuiper服務所在的地址和埠等資訊命令檔案所在的目錄

3) 透過config-map下發命令執行檔案,該工具定期自動掃描檔案,然後執行命令

方法二:Kuiper manager-雲邊協同管理控制檯

另外一種方式是透過管理控制檯來管理很多Kuiper節點,因為Kuiper可以執行在很多節點上。

比如Kuiper可以執行在車聯網的盒子裡面,車聯網有很多車,可以透過Kuiper-manager把所有的例項都接入進來,統一對其進行規則更新。

第一步是安裝外掛,我們提供了一些外掛的知識,比如要接入不同的源,如果我們這邊的源不支援,則可以自己寫個外掛,將外掛進行安裝,安裝上去之後我們提供安卓外掛介面,就可以使用了。

接下來為建立流定義:

下圖為資料儲存的位置,下圖所示為將資料儲存到檔案系統,進行路徑的指定。

下圖為視覺化的編輯介面,可以進行規則的編寫。

應用案例

案例一:國家工業網際網路大資料中心

注:完整案例介紹見文末 推薦閱讀

該案例是一個非常典型的使用場景。K8s+CloudCore部署在雲端,將規則透過管理通道下放到Kuiper,Kuiper的位置是放在MQTT broker,會將資料定義,實現資料的清洗。

目前通道有兩條,第一條是將處理完的訊息發往Cloud MQTT broker,第二條通道比如本地要做資料持久化,可將其存到Influxdb這個持續資料庫,我們在邊緣發生的一些第三方應用可以直接去調Influxdb裡面的資料,做一些展示視覺化等。底層是透過Mapper把不同的資料給接上來。

Kuiper裡規則引擎的使用場景

LF EdgeX Foundry內建規則引擎,於2020年4月Geneva版本中已經正式釋出。

案例二:異構系統對接資料格式轉換

實現與ERP、MES等IT系統資料交換,我們提供了一個非常靈活的擴充套件能力,包括異構資料透過擴充套件外掛採集後,可以利用SQL內建函式或者擴充套件函式進行快速、靈活處理;

第二點是拿到資料處理結果後,透過sink的資料模板可以對分析結果進行轉換,靈活適配各類目標系統所需的資料格式和協議,比如同樣一條溫度大於30度的規則,如果要去傳送控制裝置的指令,並且要發到微信上。這兩個不同的目標系統,它所需要的介面和資料是不一樣的,但對於這個規則是一樣的,那麼可以在 data裡面,根據同一條規則觸發兩個不同的操作,你可以指定不同的 topic,資料即可傳送,不需再進行復雜的程式設計;

第三點是利用SAP NetWeaver RFC SDK,實現從SAP中讀取資料,處理並轉換後傳送到別的異構系統。

效能資料

1)Kuiper 支援併發執行數千條規則8000規則*0.1訊息/秒/規則,共計的TPS為800條/秒規則定義

源:MQTT

SQL:select temperature from source where temperature>20(90%資料被過濾)

目標:日誌

配置

AWS:2core*4GB

Ubuntu

資源使用

Memory:89%~72%;0.4MB/rule

GPU:25%

2)AWS t2.micro 配置10k+/s訊息吞吐

出處:https://mp.weixin.qq.com/s?__biz=MzIzNzU5NTYzMA==&mid=2247491492&idx=1&sn=a4028c3fe18b3ebbc32549fc2aa8dd81

10
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • ElasticSearch基本dsl