回覆列表
-
1 # 仙仙奇聞
-
2 # 騰訊技術工程
ok,接下來可以愉快的除錯了
二、pyspark訪問TDWTDW介面資訊可參考http://git.code.oa.com/tdw/tdw-spark-common/wikis/api
這裡附上一個讀取TDW表示例:
三、在TESLA上部署任務這裡附上一個蟲洞完整示例:
1、在tesla上執行任務需先配置資源
2、在輸入元件裡選擇一個蟲洞依賴任務
3、配置蟲洞依賴
這裡需要注意的是依賴週期{YYYYMMDD}預設是T-1的,跟TDW的時間一致。
4、配置pyspark元件
在元件-機器學習里拉入一個PySpark元件
1)配置元件引數
上傳在idex上除錯透過的指令碼,並配置演算法引數
2)配置資源引數
3)配置特殊引數
4)新增排程時間
5、對接洛子配置蟲洞任務
tesla任務配置ok了之後,那麼還需要一個蟲洞任務用於判斷該tesla任務是否完成。
在洛子上新建一個蟲洞任務:
這裡需要注意的是目標型別選擇節點,檢查ID即tesla任務id:
開始時間和結束時間均需精確到秒(${YYYYMMDD}000000)
ok,到這裡整個tesla蟲洞流程就搞定了。
Spark提供了一個Python_Shell,即pyspark,從而可以以互動的方式使用Python編寫Spark程式。
有關Spark的基本架構介紹參考http://blog.csdn.net/cymy001/article/details/78483614;
有關Pyspark的環境配置參考http://blog.csdn.net/cymy001/article/details/78430892。
pyspark裡最核心的模組是SparkContext(簡稱sc),最重要的資料載體是RDD。RDD就像一個NumPy array或者一個Pandas Series,可以視作一個有序的item集合。只不過這些item並不存在driver端的記憶體裡,而是被分割成很多個partitions,每個partition的資料存在叢集的executor的記憶體中。
引入Python中pyspark工作模組
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#任何Spark程式都是SparkContext開始的,SparkContext的初始化需要一個SparkConf物件,SparkConf包含了Spark叢集配置的各種引數(比如主節點的URL)。初始化後,就可以使用SparkContext物件所包含的各種方法來建立和操作RDD和共享變數。Spark shell會自動初始化一個SparkContext(在Scala和Python下可以,但不支援Java)。
#getOrCreate表明可以視情況新建session或利用已有的session
1
2
3
4
5
6
7
SparkSession是Spark 2.0引入的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們透過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。
初始化RDD的方法
(1)本地記憶體中已經有一份序列資料(比如python的list),可以透過sc.parallelize去初始化一個RDD。當執行這個操作以後,list中的元素將被自動分塊(partitioned),並且把每一塊送到叢集上的不同機器上。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#(a)利用list建立一個RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。
rdd = sc.parallelize([1,2,3,4,5])
rdd
#Output:ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480
#(b)getNumPartitions()方法檢視list被分成了幾部分
rdd.getNumPartitions()
#Output:4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#(c)glom().collect()檢視分割槽狀況
rdd.glom().collect()
#Output:[[1], [2], [3], [4, 5]]
1
2
3
在這個例子中,是一個4-core的CPU筆記本;Spark建立了4個executor,然後把資料分成4個塊。colloect()方法很危險,資料量上BT檔案讀入會爆掉記憶體……
(2)建立RDD的另一個方法是直接把文字讀到RDD。文字的每一行都會被當做一個item,不過需要注意的一點是,Spark一般預設給定的路徑是指向HDFS的,如果要從本地讀取檔案的話,給一個file://開頭(windows下是以file:\\開頭)的全域性路徑。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#(a)記錄當前pyspark工作環境位置
import os
cwd=os.getcwd()
cwd
#Output:"C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark"
#(b)要讀入的檔案的全路徑
rdd=sc.textFile("file:\\\\\\" + cwd + "\\names\yob1880.txt")
rdd
#Output:file:\\\C:\Users\Yu\0JulyLearn\5weekhadoopspark\names\yob1880.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
#(c)first()方法取讀入的rdd資料第一個item
rdd.first()
#Output:"Mary,F,7065"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
甚至可以sc.wholeTextFiles讀入整個資料夾的所有檔案。但是要特別注意,這種讀法,RDD中的每個item實際上是一個形如(檔名,檔案所有內容)的元組。讀入整個資料夾的所有檔案。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#記錄當前pyspark工作環境位置
import os
cwd=os.getcwd()
cwd
#Output:"C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark"
rdd = sc.wholeTextFiles("file:\\\\\\" + cwd + "\\names\yob1880.txt")
rdd
#Output:org.apache.spark.api.java.JavaPairRDD@12bcc15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rdd.first()
Output:
("file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt",
1
2
3
4
5
其餘初始化RDD的方法,包括:HDFS上的檔案,Hive中的資料庫與表,Spark SQL得到的結果。這裡暫時不做介紹。
RDD Transformation
(1)RDDs可以進行一系列的變換得到新的RDD,有點類似列表推導式的操作,先給出一些RDD上最常用到的transformation:
map() 對RDD的每一個item都執行同一個操作
flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list
filter() 篩選出來滿足條件的item
distinct() 對RDD中的item去重
sample() 從RDD中的item中取樣一部分出來,有放回或者無放回
sortBy() 對RDD中的item進行排序
1
2
3
4
5
6
如果想看操作後的結果,可以用一個叫做collect()的action把所有的item轉成一個Python list。資料量大時,collect()很危險……
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
numbersRDD = sc.parallelize(range(1,10+1))
print(numbersRDD.collect())
#map()對RDD的每一個item都執行同一個操作
squaresRDD = numbersRDD.map(lambda x: x**2) # Square every number
print(squaresRDD.collect())
#filter()篩選出來滿足條件的item
filteredRDD = numbersRDD.filter(lambda x: x % 2 == 0) # Only the evens
print(filteredRDD.collect())
#Output:
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
#[2, 4, 6, 8, 10]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#flatMap() 對RDD中的item執行同一個操作以後得到一個list,然後以平鋪的方式把這些list裡所有的結果組成新的list
sentencesRDD=sc.parallelize(["Hello world","My name is Patrick"])
wordsRDD=sentencesRDD.flatMap(lambda sentence: sentence.split(" "))
print(wordsRDD.collect())
print(wordsRDD.count())
#Output:
#["Hello", "world", "My", "name", "is", "Patrick"]
#6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
對比一下:
這裡如果使用map的結果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]],
使用flatmap的結果是全部展開[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]。
flatmap即對應Python裡的如下操作:
l = ["Hello world", "My name is Patrick"]
ll = []
for sentence in l:
ll = ll + sentence.split(" ") #+號作用,two list拼接
ll
1
2
3
4
5
(2)最開始列出的各個Transformation,可以一個接一個地串聯使用,比如:
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
def doubleIfOdd(x):
if x % 2 == 1:
return 2 * x
else:
return x
numbersRDD = sc.parallelize(range(1,10+1))
resultRDD = (numbersRDD
.map(doubleIfOdd) #map,filter,distinct()
.filter(lambda x: x > 6)
.distinct()) #distinct()對RDD中的item去重
resultRDD.collect()
#Output:[8, 10, 18, 14]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(3)當遇到更復雜的結構,比如被稱作“pair RDDs”的以元組形式組織的k-v對(key, value),Spark中針對這種item結構的資料,定義了一些transform和action:
reduceByKey(): 對所有有著相同key的items執行reduce操作
groupByKey(): 返回類似(key, listOfValues)元組的RDD,後面的value List 是同一個key下面的
sortByKey(): 按照key排序
countByKey(): 按照key去對item個數進行統計
collectAsMap(): 和collect有些類似,但是返回的是k-v的字典
1
2
3
4
5
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd=sc.parallelize(["Hello hello", "Hello New York", "York says hello"])
resultRDD=(rdd
.flatMap(lambda sentence:sentence.split(" "))
.map(lambda word:word.lower())
.map(lambda word:(word, 1)) #將word對映成(word,1)
.reduceByKey(lambda x, y: x + y)) #reduceByKey對所有有著相同key的items執行reduce操作
resultRDD.collect()
#Output:[("hello", 4), ("york", 2), ("says", 1), ("new", 1)]
result = resultRDD.collectAsMap() #collectAsMap類似collect,以k-v字典的形式返回
result
#Output:{"hello": 4, "new": 1, "says": 1, "york": 2}
resultRDD.sortByKey(ascending=True).take(2) #sortByKey按鍵排序
#Output:[("hello", 4), ("new", 1)]
#取出現頻次最高的2個詞
print(resultRDD
.sortBy(lambda x: x[1], ascending=False)
.take(2))
#Output:[("hello", 4), ("york", 2)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
RDD間的操作
(1)如果有2個RDD,可以透過下面這些操作,對它們進行集合運算得到1個新的RDD
rdd1.union(rdd2): 所有rdd1和rdd2中的item組合(並集)
rdd1.intersection(rdd2): rdd1 和 rdd2的交集
rdd1.substract(rdd2): 所有在rdd1中但不在rdd2中的item(差集)
rdd1.cartesian(rdd2): rdd1 和 rdd2中所有的元素笛卡爾乘積(正交和)
1
2
3
4
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#初始化兩個RDD
numbersRDD = sc.parallelize([1,2,3])
moreNumbersRDD = sc.parallelize([2,3,4])
1
2
3
4
5
6
7
8
9
numbersRDD.union(moreNumbersRDD).collect() #union()取並集
#Output:[1, 2, 3, 2, 3, 4]
numbersRDD.intersection(moreNumbersRDD).collect() #intersection()取交集
#Output:[2, 3]
numbersRDD.subtract(moreNumbersRDD).collect() #substract()取差集
#Output:[1]
numbersRDD.cartesian(moreNumbersRDD).collect() #cartesian()取笛卡爾積
#Output:[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(2)在給定2個RDD後,可以透過一個類似SQL的方式去join它們
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
# Home of different people
homesRDD = sc.parallelize([
("Brussels", "John"),
("Brussels", "Jack"),
("Leuven", "Jane"),
("Antwerp", "Jill"),
])
# Quality of life index for various cities
lifeQualityRDD = sc.parallelize([
("Brussels", 10),
("Antwerp", 7),
("RestOfFlanders", 5),
])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
homesRDD.join(lifeQualityRDD).collect() #join
#Output:
#[("Antwerp", ("Jill", 7)),
# ("Brussels", ("John", 10)),
# ("Brussels", ("Jack", 10))]
homesRDD.leftOuterJoin(lifeQualityRDD).collect() #leftOuterJoin
#Output:
#[("Antwerp", ("Jill", 7)),
# ("Leuven", ("Jane", None)),
# ("Brussels", ("John", 10)),
# ("Brussels", ("Jack", 10))]
homesRDD.rightOuterJoin(lifeQualityRDD).collect() #rightOuterJoin
#Output:
#[("Antwerp", ("Jill", 7)),
# ("RestOfFlanders", (None, 5)),
# ("Brussels", ("John", 10)),
# ("Brussels", ("Jack", 10))]
homesRDD.cogroup(lifeQualityRDD).collect() #cogroup
#Output:
#[("Antwerp",
# (<pyspark.resultiterable.ResultIterable at 0x73d2d68>,
# <pyspark.resultiterable.ResultIterable at 0x73d2940>)),
# ("RestOfFlanders",
# (<pyspark.resultiterable.ResultIterable at 0x73d2828>,
# <pyspark.resultiterable.ResultIterable at 0x73d2b70>)),
# ("Leuven",
# (<pyspark.resultiterable.ResultIterable at 0x73d26a0>,
# <pyspark.resultiterable.ResultIterable at 0x7410a58>)),
# ("Brussels",
# (<pyspark.resultiterable.ResultIterable at 0x73d2b38>,
# <pyspark.resultiterable.ResultIterable at 0x74106a0>))]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Oops! Those <ResultIterable>s are Spark"s way of returning a list
# that we can walk over, without materializing the list.
# Let"s materialize the lists to make the above more readable:
(homesRDD
.cogroup(lifeQualityRDD)
.map(lambda x:(x[0], (list(x[1][0]), list(x[1][1]))))
.collect())
#Output:
#[("Antwerp", (["Jill"], [7])),
# ("RestOfFlanders", ([], [5])),
# ("Leuven", (["Jane"], [])),
# ("Brussels", (["John", "Jack"], [10]))]
1
2
3
4
5
6
7
8
9
10
11
12
13
惰性計算,actions方法
特別注意:Spark的一個核心概念是惰性計算。當你把一個RDD轉換成另一個的時候,這個轉換不會立即生效執行!!!Spark會把它先記在心裡,等到真的有actions需要取轉換結果時,才會重新組織transformations(因為可能有一連串的變換)。這樣可以避免不必要的中間結果儲存和通訊。
常見的action如下,當它們出現的時候,表明需要執行上面定義過的transform了:
collect(): 計算所有的items並返回所有的結果到driver端,接著 collect()會以Python list的形式返回結果
first(): 和上面是類似的,不過只返回第1個item
take(n): 類似,但是返回n個item
count(): 計算RDD中item的個數
top(n): 返回頭n個items,按照自然結果排序
reduce(): 對RDD中的items做聚合
1
2
3
4
5
6
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
rdd = sc.parallelize(range(1,10+1))
rdd.reduce(lambda x, y: x + y) #reduce(): 對RDD中的items做聚合
#Output:55
1
2
3
4
5
6
7
8
9
10
reduce的原理:先在每個分割槽(partition)裡完成reduce操作,然後再全域性地進行reduce。
有時候需要重複用到某個transform序列得到的RDD結果。但是一遍遍重複計算顯然是要開銷的,所以我們可以透過一個叫做cache()的操作把它暫時地儲存在記憶體中。快取RDD結果對於重複迭代的操作非常有用,比如很多機器學習的演算法,訓練過程需要重複迭代。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
import numpy as np
numbersRDD = sc.parallelize(np.linspace(1.0, 10.0, 10))
squaresRDD = numbersRDD.map(lambda x: x**2)
squaresRDD.cache() # Preserve the actual items of this RDD in memory
avg = squaresRDD.reduce(lambda x, y: x + y) / squaresRDD.count()
print(avg)
#Output:38.5