回覆列表
  • 1 # IT大資料科技

    1.RDD是彈性分散式資料集,是一個分散式物件的集合。對個RDD可分為多個片,分片可以在叢集環境下的不同節點上計算

    2.可以透過兩種方式建立RDD:

    a.載入外部資料集

    b.在驅動程式中部署物件集合。

    c.建立RDD最簡單的方法就是採用現有的記憶體集合並把它傳遞給sc的並行化方法。適合測試,不適合生產

    優勢在於可以快速建立自己的RDD並對其執行相關的操作。

    val line = sc.parallelize(List("pandas","i like pandas"))

    d.可以載入外部儲存資料用sc.textFile("file:///home/ubuntu/simple.txt")來載入一個將文字檔案作為字串的RDD.

    val r = sc.textFile("file:///home/ubuntu/simple.txt")

    r: org.apache.spark.rdd.RDD[String] = file:///home/ubuntu/simple.txt MapPartitionsRDD[5] at textFile at <console>:24

    4.RDD兩種型別的操作:裝換和動作

    a.轉換就是將原來的RDD構建成新的RDD,例如:map,filter

    val r1 = r.filter(line => line.contains("20"))

    r1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:27

    val r2 = r.filter(line => line.contains("10"))

    val r3 = r1.union(r2)

    r3: org.apache.spark.rdd.RDD[String] = UnionRDD[11] at union at <console>:32

    a_1.當相互轉換得到並使用新的RDD時,Spark跟蹤記錄且設定不同的RDD之間的依賴關係,這種關係稱為血統圖(圖1-7)

    它使用這個資訊來按照需求計算每個RDD,以及恢復持續化的RDD丟失的那一部分資料。我們每次呼叫一個新的動作,

    整個RDD必須從頭開始計算,可以使用持久化來提高效率。

    b.動作是透過RDD來計算的結果,並且將結果返回給驅動程式或者儲存到外部儲存系統(HDFS),

    如:count()它返回計數,

    first()

    take(n)包含了前n行記錄。

    collect()用來獲取整個RDD,不應該用在大型資料集上。

    大多數情況下,RDD不能僅僅被collect()到驅動,原因是資料量太大,一般是把資料寫到HDFS或S3

    RDD的內容可以使用saveAsTextFile()或者savaAsSequenceFile()以及其他動作來儲存。

    scala> r1.first

    res12: String = 1201 wang 20

    c.惰性評估(Lazy Evaluation)

    c1.它意味著,當我們呼叫RDD的轉換時,不立即執行該操作,相反,Spark在內部記錄元資料以表明該操作已被請求,而不是考慮RDD包含的具體的資料。

    c2.Spark透過使用惰性評估,以減少其在各種轉換操作中所需要儲存的中間資料。

    5.RDD只有在第一次使用它們中的動作時才計算,可以避免浪費大量的儲存空間。因為我們隨後會立即過濾掉一部分不需要的行

    一旦Spark看到整個變換鏈,他可以計算僅需其結果的資料,對於first()動作,Spark只掃描檔案,直到他找到第一個匹配的行,不讀整個檔案。

    6.RDDS在預設的情況下每次執行它們都要進行重新計算。如果重用多個動作,可以使用持久化的方法:RDD.persist(),計算第一次後,Spark將RDD

    內容儲存在記憶體中(整個機器的叢集分割槽),預設不適用持久化的意義在於:如果不重用大資料集,可以避免浪費空間。

    7.一般會經常使用持久化去載入資料集到記憶體中,方面重複的查詢和使用

  • 2 # 小生經驗談

    rdd是spark的靈魂,中文翻譯彈性分散式資料集,一個rdd代表一個可以被分割槽的只讀資料集。rdd內部可以有許多分割槽(partitions),每個分割槽又擁有大量的記錄(records)。

    RDD本身就是一個Berkeley的博士們在寫論文時,抽象出的概念,其本質與Hadoop MapReduce處理時輸入輸出的key-value,Flink的dataset沒有本質區別。處理時,任然使用iterator一邊載入部分資料,一邊執行運算(每個partition的實現內部實際就是一個iterator)。

    我個人認為,如果要通俗的解釋RDD,第一步可以簡單的把它想象成一個數組/列表,我們可以用迭代器類的東東遍歷它,可以分片,可以打散;第二步,可以找一些例子,看些介面文件,畢竟直接以列表來理解還是優點偏差的;再之後,當用例子瞭解了功能後,可以更進一步的閱讀下它的原理。rdd的運算元主要分成2類,action和transformation。也就是變換以及一些操作。

    關於rdd的特徵,摘錄下知乎上答友的一部分:

    rdd的五個特徵:

    dependencies:建立RDD的依賴關係,主要rdd之間是寬窄依賴的關係,具有窄依賴關係的rdd可以在同一個stage中進行計算。

    partition:一個rdd會有若干個分割槽,分割槽的大小決定了對這個rdd計算的粒度,每個rdd的分割槽的計算都在一個單獨的任務中進行。

    prefered locations:按照“移動資料不如移動計算”原則,在spark進行任務排程的時候,優先將任務分配到資料塊儲存的位置。

    compute:spark中的計算都是以分割槽為基本單位的,compute函式只是對迭代器進行復合,並不儲存單次計算的結果。

    partitioner:只存在於(K,V)型別的rdd中,非(K,V)型別的partitioner的值就是None。

  • 3 # 程式技術員

    RDD概念

    RDD就是一個分散式物件集合,本質上是一個只讀的分割槽記錄集合。

    每個RDD可以分成多個分割槽,每個分割槽就是一個數據集片段(HDFS上的塊),並且一個RDD的不同分割槽可以被儲存到叢集中不同的節點上,從而可以在叢集中的不同節點上進行平行計算。

    RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分割槽的集合,不能直接修改,只能基於穩定的物理儲存中的資料集來建立RDD,或者透過在其他RDD上執行確定的轉換操作(如map、join和groupBy)而建立得到新的RDD。

    RDD提供了一組豐富的操作以支援常見的資料運算,分為“行動”(Action)和“轉換”(Transformation)兩種型別,前者用於執行計算並指定輸出的形式,後者指定RDD之間的相互依賴關係。

    兩類操作的主要區別是,轉換操作(比如map、filter、groupBy、join等)接受RDD並返回RDD,而行動操作(比如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結果)。

    直白點就是spark RDD 跟java中的API很像,每種不同的API實現的功能不一樣。RDD也是這樣,不過RDD是一種函數語言程式設計,有轉換和行動兩種RDD。

    寫個虛擬碼:

    val conf: SparkConf = new SparkConf().setAppName("sparkhello").setMaster("local")

    val sc = new SparkContext(conf)

    val text: RDD[String] = sc.textFile("/Users/leohe/Data/input/txt")

    val value: RDD[(String, Int)] = text.filter(_.contains("h")).map((_, 1))

    val l1: Long = value.count()

    println(l1)

    }

    }

    像count和filter,都是RDD

  • 中秋節和大豐收的關聯?
  • 30億美元替伊朗背鍋?卡達對伊朗的雪中送炭到底打的什麼算盤?