RDD、DataFrame和DataSet是容易產生混淆的概念,必須對其相互之間對比,才可以知道其中異同。
RDD和DataFrame
RDD-DataFrame
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為型別引數,但Spark框架本身不瞭解
Person類的內部結構。而右側的DataFrame卻提供了詳細的結構資訊,使得Spark
SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和型別各是什麼。DataFrame多了資料的結構資訊,即schema。RDD是分散式的
Java物件的集合。DataFrame是分散式的Row物件的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效
率、減少資料讀取以及執行計劃的最佳化,比如filter下推、裁剪等。
提升執行效率
RDD
API是函式式的,強調不變性,在大部分場景下傾向於建立新物件而不是修改老物件。這一特點雖然帶來了乾淨整潔的API,卻也使得Spark應用程式在運
行期傾向於建立大量臨時物件,對GC造成壓力。在現有RDD
API的基礎之上,我們固然可以利用mapPartitions方法來過載RDD單個分片內的資料建立方式,用複用可變物件的方式來減小物件分配和GC的
開銷,但這犧牲了程式碼的可讀性,而且要求開發者對Spark執行時機制有一定的瞭解,門檻較高。另一方面,Spark
SQL在框架內部已經在各種可能的情況下儘量重用物件,這樣做雖然在內部會打破了不變性,但在將資料返回給使用者時,還會重新轉為不可變資料。利用
DataFrameAPI進行開發,可以免費地享受到這些最佳化效果。
減少資料讀取
分析大資料,最快的方法就是——忽略它。這裡的“忽略”並不是熟視無睹,而是根據查詢條件進行恰當的剪枝。
對於一些“智慧”資料格式,Spark
SQL還可以根據資料檔案中附帶的統計資訊來進行剪枝。簡單來說,在這類資料格式中,資料是分段儲存的,每段資料都帶有最大值、最小值、null值數量等
一些基本的統計資訊。當統計資訊表名某一資料段肯定不包括符合查詢條件的目標資料時,該資料段就可以直接跳過(例如某整數列a某段的最大值為100,而查
詢條件要求a>200)。
此外,SparkSQL也可以充分利用RCFile、ORC、Parquet等列式儲存格式的優勢,僅掃描查詢真正涉及的列,忽略其餘列的資料。
執行最佳化
人口資料分析示例
為了說明查詢最佳化,我們來看上圖展示的人口資料分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如
果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的資料集。如果我們能將filter
下推到join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark
SQL的查詢最佳化器正是這樣做的。簡而言之,邏輯查詢計劃最佳化就是一個利用基於關係代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的最佳化執行計劃在轉換成物理執行計劃的過程中,還可以根據具體的資料來源的特性將過濾條件下推至資料來源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。
對於普通開發者而言,查詢最佳化器的意義在於,即便是經驗並不豐富的程式設計師寫出的次優的查詢,也可以被儘量轉換為高效的形式予以執行。
RDD和DataSet
DataSet以Catalyst邏輯執行計劃表示,並且資料以編碼的二進位制形式被儲存,不需要反序列化就可以執行sorting、shuffle等操作。
DataSet創立需要一個顯式的Encoder,把物件序列化為二進位制,可以把物件的scheme對映為SparkSQl型別,然而RDD依賴於執行時反射機制。
透過上面兩點,DataSet的效能比RDD的要好很多。
DataFrame和DataSet
Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record儲存的是一個強型別值而不是一個Row。因此具有如下三個特點:
DataSet可以在編譯時檢查型別
並且是面向物件的程式設計介面。用wordcount舉例:
//DataFrame
//Loadatextfileandinterpreteachlineasajava.lang.String
valds=sqlContext.read.text("/home/spark/1.6/lines").as[String]
valresult=ds
.flatMap(_.split(""))//Splitonwhitespace
.filter(_!="")//Filteremptywords
.toDF()//ConverttoDataFrametoperformaggregation/sorting
.groupBy($"value")//Countnumberofoccurencesofeachword
.agg(count("*")as"numOccurances")
.orderBy($"numOccurances"desc)//Showmostcommonwordsfirst
後面版本DataFrame會繼承DataSet,DataFrame是面向SparkSQL的介面。
//DataSet,完全使用scala程式設計,不要切換到DataFrame
valwordCount=
ds.flatMap(_.split(""))
.filter(_!="")
.groupBy(_.toLowerCase())//Insteadofgroupingonacolumnexpression(i.e.$"value")wepassalambdafunction
.count()
DataFrame和DataSet可以相互轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。
RDD、DataFrame和DataSet是容易產生混淆的概念,必須對其相互之間對比,才可以知道其中異同。
RDD和DataFrame
RDD-DataFrame
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為型別引數,但Spark框架本身不瞭解
Person類的內部結構。而右側的DataFrame卻提供了詳細的結構資訊,使得Spark
SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和型別各是什麼。DataFrame多了資料的結構資訊,即schema。RDD是分散式的
Java物件的集合。DataFrame是分散式的Row物件的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效
率、減少資料讀取以及執行計劃的最佳化,比如filter下推、裁剪等。
提升執行效率
RDD
API是函式式的,強調不變性,在大部分場景下傾向於建立新物件而不是修改老物件。這一特點雖然帶來了乾淨整潔的API,卻也使得Spark應用程式在運
行期傾向於建立大量臨時物件,對GC造成壓力。在現有RDD
API的基礎之上,我們固然可以利用mapPartitions方法來過載RDD單個分片內的資料建立方式,用複用可變物件的方式來減小物件分配和GC的
開銷,但這犧牲了程式碼的可讀性,而且要求開發者對Spark執行時機制有一定的瞭解,門檻較高。另一方面,Spark
SQL在框架內部已經在各種可能的情況下儘量重用物件,這樣做雖然在內部會打破了不變性,但在將資料返回給使用者時,還會重新轉為不可變資料。利用
DataFrameAPI進行開發,可以免費地享受到這些最佳化效果。
減少資料讀取
分析大資料,最快的方法就是——忽略它。這裡的“忽略”並不是熟視無睹,而是根據查詢條件進行恰當的剪枝。
對於一些“智慧”資料格式,Spark
SQL還可以根據資料檔案中附帶的統計資訊來進行剪枝。簡單來說,在這類資料格式中,資料是分段儲存的,每段資料都帶有最大值、最小值、null值數量等
一些基本的統計資訊。當統計資訊表名某一資料段肯定不包括符合查詢條件的目標資料時,該資料段就可以直接跳過(例如某整數列a某段的最大值為100,而查
詢條件要求a>200)。
此外,SparkSQL也可以充分利用RCFile、ORC、Parquet等列式儲存格式的優勢,僅掃描查詢真正涉及的列,忽略其餘列的資料。
執行最佳化
人口資料分析示例
為了說明查詢最佳化,我們來看上圖展示的人口資料分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如
果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的資料集。如果我們能將filter
下推到join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark
SQL的查詢最佳化器正是這樣做的。簡而言之,邏輯查詢計劃最佳化就是一個利用基於關係代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的最佳化執行計劃在轉換成物理執行計劃的過程中,還可以根據具體的資料來源的特性將過濾條件下推至資料來源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。
對於普通開發者而言,查詢最佳化器的意義在於,即便是經驗並不豐富的程式設計師寫出的次優的查詢,也可以被儘量轉換為高效的形式予以執行。
RDD和DataSet
DataSet以Catalyst邏輯執行計劃表示,並且資料以編碼的二進位制形式被儲存,不需要反序列化就可以執行sorting、shuffle等操作。
DataSet創立需要一個顯式的Encoder,把物件序列化為二進位制,可以把物件的scheme對映為SparkSQl型別,然而RDD依賴於執行時反射機制。
透過上面兩點,DataSet的效能比RDD的要好很多。
DataFrame和DataSet
Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record儲存的是一個強型別值而不是一個Row。因此具有如下三個特點:
DataSet可以在編譯時檢查型別
並且是面向物件的程式設計介面。用wordcount舉例:
//DataFrame
//Loadatextfileandinterpreteachlineasajava.lang.String
valds=sqlContext.read.text("/home/spark/1.6/lines").as[String]
valresult=ds
.flatMap(_.split(""))//Splitonwhitespace
.filter(_!="")//Filteremptywords
.toDF()//ConverttoDataFrametoperformaggregation/sorting
.groupBy($"value")//Countnumberofoccurencesofeachword
.agg(count("*")as"numOccurances")
.orderBy($"numOccurances"desc)//Showmostcommonwordsfirst
後面版本DataFrame會繼承DataSet,DataFrame是面向SparkSQL的介面。
//DataSet,完全使用scala程式設計,不要切換到DataFrame
valwordCount=
ds.flatMap(_.split(""))
.filter(_!="")
.groupBy(_.toLowerCase())//Insteadofgroupingonacolumnexpression(i.e.$"value")wepassalambdafunction
.count()
DataFrame和DataSet可以相互轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。