適用面很廣的Storm
與之前提到的Actor面向單條訊息的分散式計算模型不同,Apache Storm(後簡稱Storm)提供的是面向連續的訊息流(Stream)的一種通用的分散式計算解決框架。但兩者都擁有簡單、唯美的程式設計模型,提供的程式設計模型很簡單也足夠靈活,具備很強的適用性,可以在多個領域發揮重要作用。
Storm是一個免費、開源的分散式實時計算系統,它的前身是Twitter Storm,後來被捐獻給Apache併成功孵化。大家在討論Apache Spark(後簡稱Spark)與Storm之間的流資料處理能力時,往往會給出共識性的結論:Storm確實擁有更好的規模化能力與速度表現,但使用難度較大;另外,Storm逐漸被Spark 取代,因此選擇更新且更熱門的Spark 往往成為主流。
為了扭轉頹勢,Storm於 2016年4月釋出了重要的1.0版本,處理速度大幅增加,延時減少60%,在實際應用中至少提升3倍以上的效能,在某些場合下甚至可以提升10倍以上的速度。另外,新版本中的大部分改動都使得Storm更易於使用,比如增加了支援流式處理的滑動視窗API的支援,這與Spark Stream類似。從 Storm 1.0版本的革新性變化,我們看到 Storm正在嘗試重新打破Spark一家獨大的局面,希望在實時流領域重新贏回更多的話語權。
Storm的流式程式設計模型簡單、靈活,同時支援多種程式語言,包括科學計算中常見的Python,處理速度非常快,每節點每秒可以處理百萬級的元組(Tuples),因此,Storm有很多應用場景,例如實時資料分析、機器學習、持續計算、分散式RPC、ETL等。採用了Storm的一些知名企業有百度、阿里巴巴、雅虎、愛奇藝、Twitter、Spotify、Yelp、Rubicon、OOYALA、PARC、Cerner、KLOUT等。
下圖顯示了Storm的流式計算模型。執行在 Storm叢集上的是Topology(拓撲),Topology與Hadoop 上的 MapReduce Job之間的最大區別是後者最終會結束,而前者會永遠執行,除非被手動關掉。
一個Topology是由多個Spout和 Bolt節點組成的有向無環圖,節點之間透過StreamGrouping 進行連線,在Topology 裡流動的資料是一種被稱為Tuple序列的特殊資料結構——Stream。由Tuple序列構成的Stream沒有邊界,有始無終,源源不斷地從一個或多個Spout節點發出,流向有向圖裡的後繼Blot節點並被層層加工和轉換,最終產生業務所需要的處理結果。
作為訊息源的Spout節點分為兩類:可靠的和不可靠的。對於一個可靠的Spout來說,如果它發出的某個Tuple沒有被成功處理,則Spout可以重新發送一次;但對於不可靠的Spout,Tuple一旦發出就不能重發了。所有訊息處理邏輯都被封裝在Bolt裡,一個 Bolt可以做很多事情,例如過濾、聚合、查詢資料庫等。按照軟體設計中的單一職責原則,每種 Bolt都應該只承擔一項職責,多種 Bolt 相互配合,從而實現複雜的訊息流處理邏輯。
Topology 中的最後一個重要概念是Stream grouping,它用來定義每個Bolt接收什麼樣的流作為輸入,比如Shuffle Grouping(隨機分組〉隨機派發Stream裡面的Tuple,保證每個Bolt接收到的Tuple數量都大致相同;Fields Grouping (按欄位分組)則可以按照Tuple裡的某個屬性欄位的值來分組,類似於分片方式,具有同樣欄位值的Tuple會被分到同一個Bolt 裡;DirectGrouping (直接分組)則由使用者透過程式設計來控制如何分發Tuple。使用者開發的Topology最終會被打包為一個JAR檔案並透過工具上傳到Storm叢集中,最終觸發Topology 的執行。
如下所示是Storm叢集架構圖。
從上圖可以看到,Storm叢集由3部分組成,其中 ZooKeeper 主要用來實現服務發現機制及任務和系統狀態資料的儲存;Nimbus 則是Storm叢集的 Master,它其實是一個Thrift RPC(又叫RPC)協議的服務端,處理客戶端發起的RPC呼叫請求,例如提交一個計算拓撲作業的請求,Nimbus在啟動時會連線ZooKeeper,並在ZooKeeper中建立節點以儲存作業執行過程中的所有狀態資訊。同時,任務分配是 Nimbus透過ZooKeeper實現的,Nimbus將任務分配資訊(TaskAssignment)寫到ZooKeeper 中, Supervisor隨後會從ZooKeeper中讀取這些資訊,並啟動Worker來執行任務。
Supervisor是Slave節點,工作節點對於叢集而言就是計算資源,屬於“工人”。總體來看,Supervisor其實是一個包括多程序的複雜子系統,如下圖所示給出了Supervisor 的架構細節。每個Supervisor節點都會啟動多個Worker程序,具體啟動幾個Worker,取決於Slot配置引數列表,每個Slot 都代表一個Worker程序的監聽埠號。在一個Worker程序裡會執行多個Task。Task指的是執行某個具體的Spout或Bolt 例項的程式碼邏輯,每個Task 都會在Worker 的一個執行緒中被排程執行,在任務分配過程中,Nimbus根據Topology 設定的Spout、Bolt 數量進行排程,儘量把Sprout與 Bolt平均分配到每個Worker 上。
在執行任務之前,每個節點上的Supervisor都會從Nimbus下載Topology程式碼到本地目錄,在執行期間,Supervisor與其上的Task也會定期傳送心跳資訊到ZooKeeper,因此Nimbus可以監控整個Storm叢集的狀態,從而重啟一些掛掉的Task。
Nimbus程序和Supervisor程序都是快速失敗(fail-fast)和無狀態的,所有狀態要麼被儲存在ZooKeeper中,要麼被儲存在本地磁碟中。這也就意味著你可以用kill-9來刪除 Nimbus和Supervisor程序,然後重啟它們,就好像什麼都沒有發生,使得Storm叢集異常穩定。如果一些機器意外宕機,那麼它上面的所有任務就會被轉移到其他機器上,Storm 會自動重新分配失敗的任務,並且保證不會有資料丟失。但如果 Nimbus程序掛掉,無法管理現有的拓撲作業,如果此刻某個Supervisor節點宕機,則已有的拓撲作業無法完成故障轉移和恢復,新的拓撲作業也就無法被提交到Storm叢集中了。我們知道Nimbus是有狀態的,其中最重要的狀態資料是Client提交的Topology的二進位制程式碼(JAR檔案),這些資料被存放在 Nimbus 所在機器的本地磁碟中,所以Nimbus 作為叢集的Master,有必要保證 Nimbus 的HA。為此,Storm 實現了基於ZooKeeper的 Nimbus Master選舉和切換機制。假設我們的Nimbus由3個節點組成,並且配置的拓撲副本數為2,目前在集群裡運行了4個拓撲作業,以此來舉例說明Nimbus 的選舉切換過程。
首先,在當前的Nimbus Leader節點上儲存了4個Topology 的所有狀態資料,為了滿足拓撲副本數為2個的要求,在 nonleader-1節點上儲存了兩個Topology的狀態資料,在nonleader-2節點上儲存了另外兩個Topology的狀態資料,假如某一刻Leader節點宕機(而且磁碟損壞),則nonLeader-1節點從ZooKeeper 處立即得到這個事件通知,準備競選新一任Leader,在準備接受Leader職位之前,它需要確保所有Topology的狀態資料都在本地。若在對比本地的Topology狀態資料與ZooKeeper 上的記錄(路徑為/storm/storms/)後發現自己還缺乏其他兩個Topology狀態資料,就開始嘗試從其他節點上獲取這些資料。首先,它會獲取ZooKeeper 上的分散式鎖(路徑為/storm/code-distributor/topologyld),然後去對應的節點下載這些資料;與此同時,nonLeader-2節點也會嘗試競選Leader並獲取它缺失的Topology狀態資料,最後至少會有一個節點擁有全部的Topology 狀態資料,併成功競選為新一任Leader。
一個Topology其實指開發一系列Spout與 Bolt類,並且用合適的Stream grouping將其串聯起來,組成一個有向無環圖。下面是Spout的Java介面定義:
public interface ISpout extends Serializable {void open (Map conf,TopologyContext context,Spoutoutputcollector collector) ;void close();void nextTuple(;void ack(Object msgId);void fail (0bject msgId);}
其中,open方法是Spout的初始化方法,這裡傳入了Storm的上下文物件及用於傳送Tuple的SpoutOutputCollector物件;nextTuple方法是Spout的關鍵方法,這個方法用來建立源源不斷的Tuple資料併發送出去; ack方法是Storm成功處理Tuple時的回撥方法,在通常情況下,此方法的實現從佇列中移除對應的Tuple,防止訊息重發;而 fail方法是處理Tuple 失敗時的回撥的方法,在通常情況下,此方法的實現是將該Tuple 放回訊息佇列中,稍後重新發送。為了方便開發,Storm提供了一個實現了ISpout 介面的BaseRichSpout,這樣我們就不用實現 close、activate、 deactivate、 ack、 fail等介面方法了。
類似地,Bolt的介面 IBolt提供了以下方法。
prepare方法:此方法與Spout中的open方法類似,在叢集的一個worker中的 task初始化時呼叫,它提供了Bolt 執行的環境。cleanup方法:同ISpout的close方法,在關閉前呼叫。execute方法:這是 Bolt中最關鍵的一個方法,對Tuple的處理都可以放到此方法中進行。Execute方法接收一個Tuple進行處理,並用OutputCollector 的 ack方法(表示成功)或fail方法(表示失敗)來反饋Tuple的處理結果。Storm提供了BaseRichBolt 抽象類,其目的就是實現IBolt介面的Bolt不用在程式碼中提供反饋結果了,在Storm內部會自動反饋成功。為了指導Storm 上的應用開發,Storm提供了一系列的Storm starter例子,這些例子都很實用,有些例子甚至可以直接拿來應用到實際的業務場景中。Storm starter的原始碼在GitHub 上也可以找到。
本節最後,我們一起分析Storm starter中的經典Topology作業 WordCountTopology 的程式碼,看看一個Topology是如何定義和實現的,如下所示是WordCountTopology的拓撲圖。
WordCountTopology的邏輯過程大致為:首先,在 spout節點 (RandomSentenceSpout)中定義了一個字串陣列來模擬一個Stream,隨機選擇這個字串陣列中的一句話作為一個Tuple傳送出去;隨後,split節點 (SplitSentence)接收到這些Tuple後再將一句話分割成多個單詞,並將每個單詞作為一組Tuple傳送出去;最後,這些Tuple到了count節點(WordCount),count節點將接收到的每個單詞的出現次數進行累加,並將<單詞:出現次數>作為新Tuple傳送出去。
下面,我們看看具體的程式碼實現,首先是spout 節點對應的程式碼:
public class RandomSentenceSpout extends BaseRichSpout{private static final Logger LOG =LoggerFactory.getLogger(RandomSentenceSpout.class);SpoutoutputCollector_collector;Random rand;eoverridepublic void open (Map conf,TopologyContext context,SpoutoutputCollectorcollector) {collector -collector;rand - new Random;}coverridepublic void declareOutputFields (0utputFieldsDeclarer declarer)(declarer.declare(new Fields ( "word"));Coverridepublic void nextTuple() {Utils.sleep (100);String[] sentences = new String[]{sentence ("the cow jumped cver the moon"),sentence("an apple a day keeps the doctor away"),sentence ("four score and seven years ago"),sentence ( "snow white andthe seven dwarfs "),sentence("i am at two with nature"));final string sentence = sentences[_rand.nextInt(sentences.length)];LOG.debug( ""Emitting tuple: {]",sentence);collector.emit (new values(sentence));}protected String sentence (string input) {return input;}}
RandomSentenceSpout的declareOutputFields方法表明這裡的Tuple會輸出一個名稱為word的欄位:
declarer.declare(new Eields ( "word"));
nextTuple方法實際上隨機選擇了下面某句話對應的字串作為Tuple傳送出去:
"the cow jumped over the moon""an apple a day keeps the doctor away""four score and seven years ago""snow white and the seven dwarfs""i am at two with nature"接下來,我們看看split節點對應的程式碼:
public static class SplitSentence extends ShellBolt implements IRichBolt (public SplitSentence() {super("python","splitsentence.py");}@overridepublic void declareoutputFields (outputFieldsDeclarer declarer){declarer.declare(new Fields ( "word"));}}
在SplitSentence 的程式碼中呼叫了一個 Python指令碼來實現將字串 Tuple 分割為單詞Tuple的目標,splitsentence.py的程式碼如下:
import stormclass SplitsentenceBolt(storm.BasicBolt):def process(self, tup):words =tup .values[0].split("")for word in words:storm.emit([word])SplitSentenceBolt( .run ()
接下來,我們看看count節點對應的程式碼:
public static class WordCount extends BaseBasicBolt(Map<String, Integer> counts = new HashMap<String, Integer>();@overridepublic void execute (Tuple tuple,Basicoutputcollector collector){String word - tuple.getString(0);Integer count -counts.get(word);if(count ==null)count =0;count++;counts.put(word, count) ;collector .emit (new values(word, count));)coverridepublic void declareOutputFields (OutputFieldsDeclarer declarer){declarer.declare(new Fields ( "word", "count"));}}
我們看到,在 WordCount 內部儲存了一個HashMap <String, Integer>,在收到一個單詞後,就用這個HashMap去完成count分組統計功能,隨後作為<String, Integer>的Tuple 傳送出去。
最後,我們看看如何組裝上述Spout和 Bolt,使之成為一個完整的Topology作業。這段程式碼被存放在 org.apache.storm.starter.WordCountTopology類中,下面是它的主要程式碼片段:
public class wordCountTopology extends ConfigurableTopologypublic static void main (String[] args) throws Exception {ConfigurableTopology .start(new WordCountTopology(),args);}protected int run (String[]args){TopologyBuilder builder = new TopologyBuilder( ;builder.setSpout("spout", new RandomSentenceSpout(),5);builder.setBolt ("split",new Splitsentence(),8) .shuffleGrouping ("spout");builder.setBolt ("count",new WordCount(),12).fieldsGrouping ("split",newFields( "word"));conf.setDebug(true);String topologyName ="word-count";if(isLocaly {conf.setMaxTaskParallelism(3);ttl =10;lelse {conf.setNumworkers(3);}if (args !=- null & & args.length >0){topologyName = args[O];}return submit(topologyName,conf,builder) ;}}
首先,WordCountTopology 繼承了ConfigurableTopology,後者透過命令列引數來決定此Topology是在本地執行(為了方便測試)還是被提交到Storm叢集中執行,如果在本地執行,就會啟動一個LocalCluster Storm環境來提交拓撲作業。
其次,位於run方法中的如下程式碼是Topology定義的關鍵:
TopologyBuilder builder = new TopologyBuilder();builder.setSpout ("spout",new RandomSentenceSpout(),5);builder.setBolt ("split",new SplitSentence(),8) .shuffleGrouping ("spout");builder.setBolt ("count", new Wordcount (),12).fieldsGrouping ("split",newEields( "word"));上述程式碼首先定義了一個名為spout 的 Spout,對應的類是RandomSentenceSpout。在執行過程中,Storm會啟動5個對應的 Task 來併發執行它,spout產生的Tuple會被隨機派發( shuffleGrouping)到名為split的 Bolt上進行處理。在處理完成後產生的新的Tuple(單詞)又會被按照欄位word分組並派發(分片路由)到名為count的 Bolt上匯聚。
在後面的章節中,我們會繼續學習Storm,用Kubernetes部署一個 Storm叢集,提交上述拓撲作業並觀察執行情況。