回覆列表
  • 1 # 仙仙奇聞

    Spark提供了一個Python_Shell,即pyspark,從而可以以互動的方式使用Python編寫Spark程式。

    有關Spark的基本架構介紹參考http://blog.csdn.net/cymy001/article/details/78483614;

    有關Pyspark的環境配置參考http://blog.csdn.net/cymy001/article/details/78430892。

    pyspark裡最核心的模組是SparkContext(簡稱sc),最重要的資料載體是RDD。RDD就像一個NumPy array或者一個Pandas Series,可以視作一個有序的item集合。只不過這些item並不存在driver端的記憶體裡,而是被分割成很多個partitions,每個partition的資料存在叢集的executor的記憶體中。

    引入Python中pyspark工作模組

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #任何Spark程式都是SparkContext開始的,SparkContext的初始化需要一個SparkConf物件,SparkConf包含了Spark叢集配置的各種引數(比如主節點的URL)。初始化後,就可以使用SparkContext物件所包含的各種方法來建立和操作RDD和共享變數。Spark shell會自動初始化一個SparkContext(在Scala和Python下可以,但不支援Java)。

    #getOrCreate表明可以視情況新建session或利用已有的session

    1

    2

    3

    4

    5

    6

    7

    SparkSession是Spark 2.0引入的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們透過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。

    初始化RDD的方法

    (1)本地記憶體中已經有一份序列資料(比如python的list),可以透過sc.parallelize去初始化一個RDD。當執行這個操作以後,list中的元素將被自動分塊(partitioned),並且把每一塊送到叢集上的不同機器上。

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #(a)利用list建立一個RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。

    rdd = sc.parallelize([1,2,3,4,5])

    rdd

    #Output:ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

    #(b)getNumPartitions()方法檢視list被分成了幾部分

    rdd.getNumPartitions()

    #Output:4

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    #(c)glom().collect()檢視分割槽狀況

    rdd.glom().collect()

    #Output:[[1], [2], [3], [4, 5]]

    1

    2

    3

    在這個例子中,是一個4-core的CPU筆記本;Spark建立了4個executor,然後把資料分成4個塊。colloect()方法很危險,資料量上BT檔案讀入會爆掉記憶體……

    (2)建立RDD的另一個方法是直接把文字讀到RDD。文字的每一行都會被當做一個item,不過需要注意的一點是,Spark一般預設給定的路徑是指向HDFS的,如果要從本地讀取檔案的話,給一個file://開頭(windows下是以file:\\開頭)的全域性路徑。

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #(a)記錄當前pyspark工作環境位置

    import os

    cwd=os.getcwd()

    cwd

    #Output:"C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark"

    #(b)要讀入的檔案的全路徑

    rdd=sc.textFile("file:\\\\\\" + cwd + "\\names\yob1880.txt")

    rdd

    #Output:file:\\\C:\Users\Yu\0JulyLearn\5weekhadoopspark\names\yob1880.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

    #(c)first()方法取讀入的rdd資料第一個item

    rdd.first()

    #Output:"Mary,F,7065"

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    甚至可以sc.wholeTextFiles讀入整個資料夾的所有檔案。但是要特別注意,這種讀法,RDD中的每個item實際上是一個形如(檔名,檔案所有內容)的元組。讀入整個資料夾的所有檔案。

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #記錄當前pyspark工作環境位置

    import os

    cwd=os.getcwd()

    cwd

    #Output:"C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark"

    rdd = sc.wholeTextFiles("file:\\\\\\" + cwd + "\\names\yob1880.txt")

    rdd

    #Output:org.apache.spark.api.java.JavaPairRDD@12bcc15

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    rdd.first()

    Output:

    ("file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt",

    1

    2

    3

    4

    5

    其餘初始化RDD的方法,包括:HDFS上的檔案,Hive中的資料庫與表,Spark SQL得到的結果。這裡暫時不做介紹。

    RDD Transformation

    (1)RDDs可以進行一系列的變換得到新的RDD,有點類似列表推導式的操作,先給出一些RDD上最常用到的transformation:

    map() 對RDD的每一個item都執行同一個操作

    flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list

    filter() 篩選出來滿足條件的item

    distinct() 對RDD中的item去重

    sample() 從RDD中的item中取樣一部分出來,有放回或者無放回

    sortBy() 對RDD中的item進行排序

    1

    2

    3

    4

    5

    6

    如果想看操作後的結果,可以用一個叫做collect()的action把所有的item轉成一個Python list。資料量大時,collect()很危險……

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    numbersRDD = sc.parallelize(range(1,10+1))

    print(numbersRDD.collect())

    #map()對RDD的每一個item都執行同一個操作

    squaresRDD = numbersRDD.map(lambda x: x**2) # Square every number

    print(squaresRDD.collect())

    #filter()篩選出來滿足條件的item

    filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens

    print(filteredRDD.collect())

    #Output:

    #[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    #[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

    #[2, 4, 6, 8, 10]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list

    sentencesRDD=sc.parallelize(["Hello world","My name is Patrick"])

    wordsRDD=sentencesRDD.flatMap(lambda sentence: sentence.split(" "))

    print(wordsRDD.collect())

    print(wordsRDD.count())

    #Output:

    #["Hello", "world", "My", "name", "is", "Patrick"]

    #6

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    對比一下:

    這裡如果使用map的結果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]],

    使用flatmap的結果是全部展開[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]。

    flatmap即對應Python裡的如下操作:

    l = ["Hello world", "My name is Patrick"]

    ll = []

    for sentence in l:

    ll = ll + sentence.split(" ") #+號作用,two list拼接

    ll

    1

    2

    3

    4

    5

    (2)最開始列出的各個Transformation,可以一個接一個地串聯使用,比如:

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    def doubleIfOdd(x):

    if x % 2 == 1:

    return 2 * x

    else:

    return x

    numbersRDD = sc.parallelize(range(1,10+1))

    resultRDD = (numbersRDD

    .map(doubleIfOdd) #map,filter,distinct()

    .filter(lambda x: x > 6)

    .distinct()) #distinct()對RDD中的item去重

    resultRDD.collect()

    #Output:[8, 10, 18, 14]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    (3)當遇到更復雜的結構,比如被稱作“pair RDDs”的以元組形式組織的k-v對(key, value),Spark中針對這種item結構的資料,定義了一些transform和action:

    reduceByKey(): 對所有有著相同key的items執行reduce操作

    groupByKey(): 返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的

    sortByKey(): 按照key排序

    countByKey(): 按照key去對item個數進行統計

    collectAsMap(): 和collect有些類似,但是返回的是k-v的字典

    1

    2

    3

    4

    5

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    rdd=sc.parallelize(["Hello hello", "Hello New York", "York says hello"])

    resultRDD=(rdd

    .flatMap(lambda sentence:sentence.split(" "))

    .map(lambda word:word.lower())

    .map(lambda word:(word, 1)) #將word對映成(word,1)

    .reduceByKey(lambda x, y: x + y)) #reduceByKey對所有有著相同key的items執行reduce操作

    resultRDD.collect()

    #Output:[("hello", 4), ("york", 2), ("says", 1), ("new", 1)]

    result = resultRDD.collectAsMap() #collectAsMap類似collect,以k-v字典的形式返回

    result

    #Output:{"hello": 4, "new": 1, "says": 1, "york": 2}

    resultRDD.sortByKey(ascending=True).take(2) #sortByKey按鍵排序

    #Output:[("hello", 4), ("new", 1)]

    #取出現頻次最高的2個詞

    print(resultRDD

    .sortBy(lambda x: x[1], ascending=False)

    .take(2))

    #Output:[("hello", 4), ("york", 2)]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    RDD間的操作

    (1)如果有2個RDD,可以透過下面這些操作,對它們進行集合運算得到1個新的RDD

    rdd1.union(rdd2): 所有rdd1和rdd2中的item組合(並集)

    rdd1.intersection(rdd2): rdd1 和 rdd2的交集

    rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)

    rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積(正交和)

    1

    2

    3

    4

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    #初始化兩個RDD

    numbersRDD = sc.parallelize([1,2,3])

    moreNumbersRDD = sc.parallelize([2,3,4])

    1

    2

    3

    4

    5

    6

    7

    8

    9

    numbersRDD.union(moreNumbersRDD).collect() #union()取並集

    #Output:[1, 2, 3, 2, 3, 4]

    numbersRDD.intersection(moreNumbersRDD).collect() #intersection()取交集

    #Output:[2, 3]

    numbersRDD.subtract(moreNumbersRDD).collect() #substract()取差集

    #Output:[1]

    numbersRDD.cartesian(moreNumbersRDD).collect() #cartesian()取笛卡爾積

    #Output:[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    (2)在給定2個RDD後,可以透過一個類似SQL的方式去join它們

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    # Home of different people

    homesRDD = sc.parallelize([

    ("Brussels", "John"),

    ("Brussels", "Jack"),

    ("Leuven", "Jane"),

    ("Antwerp", "Jill"),

    ])

    # Quality of life index for various cities

    lifeQualityRDD = sc.parallelize([

    ("Brussels", 10),

    ("Antwerp", 7),

    ("RestOfFlanders", 5),

    ])

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    homesRDD.join(lifeQualityRDD).collect() #join

    #Output:

    #[("Antwerp", ("Jill", 7)),

    # ("Brussels", ("John", 10)),

    # ("Brussels", ("Jack", 10))]

    homesRDD.leftOuterJoin(lifeQualityRDD).collect() #leftOuterJoin

    #Output:

    #[("Antwerp", ("Jill", 7)),

    # ("Leuven", ("Jane", None)),

    # ("Brussels", ("John", 10)),

    # ("Brussels", ("Jack", 10))]

    homesRDD.rightOuterJoin(lifeQualityRDD).collect() #rightOuterJoin

    #Output:

    #[("Antwerp", ("Jill", 7)),

    # ("RestOfFlanders", (None, 5)),

    # ("Brussels", ("John", 10)),

    # ("Brussels", ("Jack", 10))]

    homesRDD.cogroup(lifeQualityRDD).collect() #cogroup

    #Output:

    #[("Antwerp",

    # (<pyspark.resultiterable.ResultIterable at 0x73d2d68>,

    # <pyspark.resultiterable.ResultIterable at 0x73d2940>)),

    # ("RestOfFlanders",

    # (<pyspark.resultiterable.ResultIterable at 0x73d2828>,

    # <pyspark.resultiterable.ResultIterable at 0x73d2b70>)),

    # ("Leuven",

    # (<pyspark.resultiterable.ResultIterable at 0x73d26a0>,

    # <pyspark.resultiterable.ResultIterable at 0x7410a58>)),

    # ("Brussels",

    # (<pyspark.resultiterable.ResultIterable at 0x73d2b38>,

    # <pyspark.resultiterable.ResultIterable at 0x74106a0>))]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    # Oops! Those <ResultIterable>s are Spark"s way of returning a list

    # that we can walk over, without materializing the list.

    # Let"s materialize the lists to make the above more readable:

    (homesRDD

    .cogroup(lifeQualityRDD)

    .map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))

    .collect())

    #Output:

    #[("Antwerp", (["Jill"], [7])),

    # ("RestOfFlanders", ([], [5])),

    # ("Leuven", (["Jane"], [])),

    # ("Brussels", (["John", "Jack"], [10]))]

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    惰性計算,actions方法

    特別注意:Spark的一個核心概念是惰性計算。當你把一個RDD轉換成另一個的時候,這個轉換不會立即生效執行!!!Spark會把它先記在心裡,等到真的有actions需要取轉換結果時,才會重新組織transformations(因為可能有一連串的變換)。這樣可以避免不必要的中間結果儲存和通訊。

    常見的action如下,當它們出現的時候,表明需要執行上面定義過的transform了:

    collect(): 計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果

    first(): 和上面是類似的,不過只返回第1個item

    take(n): 類似,但是返回n個item

    count(): 計算RDD中item的個數

    top(n): 返回頭n個items,按照自然結果排序

    reduce(): 對RDD中的items做聚合

    1

    2

    3

    4

    5

    6

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    rdd = sc.parallelize(range(1,10+1))

    rdd.reduce(lambda x, y: x + y) #reduce(): 對RDD中的items做聚合

    #Output:55

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    reduce的原理:先在每個分割槽(partition)裡完成reduce操作,然後再全域性地進行reduce。

    有時候需要重複用到某個transform序列得到的RDD結果。但是一遍遍重複計算顯然是要開銷的,所以我們可以透過一個叫做cache()的操作把它暫時地儲存在記憶體中。快取RDD結果對於重複迭代的操作非常有用,比如很多機器學習的演算法,訓練過程需要重複迭代。

    import pyspark

    from pyspark import SparkContext as sc

    from pyspark import SparkConf

    conf=SparkConf().setAppName("miniProject").setMaster("local[*]")

    sc=SparkContext.getOrCreate(conf)

    import numpy as np

    numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))

    squaresRDD = numbersRDD.map(lambda x: x**2)

    squaresRDD.cache() # Preserve the actual items of this RDD in memory

    avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()

    print(avg)

    #Output:38.5

  • 2 # 騰訊技術工程

    ok,接下來可以愉快的除錯了

    二、pyspark訪問TDW

    TDW介面資訊可參考http://git.code.oa.com/tdw/tdw-spark-common/wikis/api

    這裡附上一個讀取TDW表示例:

    三、在TESLA上部署任務

    這裡附上一個蟲洞完整示例:

    1、在tesla上執行任務需先配置資源

    2、在輸入元件裡選擇一個蟲洞依賴任務

    3、配置蟲洞依賴

    這裡需要注意的是依賴週期{YYYYMMDD}預設是T-1的,跟TDW的時間一致。

    4、配置pyspark元件

    在元件-機器學習里拉入一個PySpark元件

    1)配置元件引數

    上傳在idex上除錯透過的指令碼,並配置演算法引數

    2)配置資源引數

    3)配置特殊引數

    4)新增排程時間

    5、對接洛子配置蟲洞任務

    tesla任務配置ok了之後,那麼還需要一個蟲洞任務用於判斷該tesla任務是否完成。

    在洛子上新建一個蟲洞任務:

    這裡需要注意的是目標型別選擇節點,檢查ID即tesla任務id:

    開始時間和結束時間均需精確到秒(${YYYYMMDD}000000)

    ok,到這裡整個tesla蟲洞流程就搞定了。

  • 中秋節和大豐收的關聯?
  • 世界各國的步槍更新了幾代?步槍的發展歷史是怎樣的?