一、簡介#
1.1 多資料來源支援#
Spark 支援以下六個核心資料來源,同時 Spark 社群還提供了多達上百種資料來源的讀取方式,能夠滿足絕大部分使用場景。
CSV
JSON
Parquet
ORC
JDBC/ODBC connections
Plain-text files
注:以下所有測試檔案均可從本倉庫的resources 目錄進行下載
1.2 讀資料格式#
所有讀取 API 遵循以下呼叫格式:
Copy
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 讀取模式
.option("inferSchema", "true") // 是否自動推斷 schema
.option("path", "path/to/file(s)") // 檔案路徑
.schema(someSchema) // 使用預定義的 schema
.load()
讀取模式有以下三種可選項:
讀模式 描述
permissive 當遇到損壞的記錄時,將其所有欄位設定為 null,並將所有損壞的記錄放在名為 _corruption t_record 的字串列中
failFast 遇到格式不正確的資料時立即失敗
1.3 寫資料格式#
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //寫模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
寫資料模式有以下四種可選項:
Scala/Java 描述
SaveMode.ErrorIfExists 如果給定的路徑已經存在檔案,則丟擲異常,這是寫資料預設的模式
SaveMode.Append 資料以追加的方式寫入
SaveMode.Overwrite 資料以覆蓋的方式寫入
SaveMode.Ignore 如果給定的路徑已經存在檔案,則不做任何操作
二、CSV#
CSV 是一種常見的文字檔案格式,其中每一行表示一條記錄,記錄中的每個欄位用逗號分隔。
2.1 讀取CSV檔案#
自動推斷型別讀取讀取示例:
.option("header", "false") // 檔案中的第一行是否為列的名稱
.option("mode", "FAILFAST") // 是否快速失敗
.load("/usr/file/csv/dept.csv")
.show()
使用預定義型別:
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義資料格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
.option("mode", "FAILFAST")
.schema(myManualSchema)
2.2 寫入CSV檔案#
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也可以指定具體的分隔符:
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
2.3 可選配置#
為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。三、JSON#
3.1 讀取JSON檔案#
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
需要注意的是:預設不支援一條資料記錄跨越多行 (如下),可以透過配置 multiLine 為 true 來進行更改,其預設值為 false。
// 預設支援單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//預設不支援多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
3.2 寫入JSON檔案#
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
3.3 可選配置#
為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。
四、Parquet#
Parquet 是一個開源的面向列的資料儲存,它提供了多種儲存最佳化,允許讀取單獨的列非整個檔案,這不僅節省了儲存空間而且提升了讀取效率,它是 Spark 是預設的檔案格式。
4.1 讀取Parquet檔案#
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
2.2 寫入Parquet檔案#
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
Parquet 檔案有著自己的儲存規則,因此其可選配置項比較少,常用的有如下兩個:
讀寫操作 配置項 可選值 預設值 描述
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy None 壓縮檔案格式
Read mergeSchema true, false 取決於配置項 spark.sql.parquet.mergeSchema
五、ORC#
ORC 是一種自描述的、型別感知的列檔案格式,它針對大型資料的讀寫進行了最佳化,也是大資料中常用的檔案格式。
5.1 讀取ORC檔案#
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
4.2 寫入ORC檔案#
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
六、SQL Databases#
Spark 同樣支援與傳統的關係型資料庫進行資料讀寫。但是 Spark 程式預設是沒有提供資料庫驅動的,所以在使用前需要將對應的資料庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 Mysql 資料庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。
6.1 讀取資料#
讀取全表資料示例如下,這裡的 help_keyword 是 mysql 內建的字典表,只有 help_keyword_id 和 name 兩個欄位。
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //資料庫地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
從查詢結果讀取資料:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//輸出
+---------------+-----------+
|help_keyword_id| name|
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 1
七、Text#
Text 檔案在讀寫效能方面並沒有任何優勢,且不能表達明確的資料結構,所以其使用的比較少,讀寫操作如下:
7.1 讀取Text資料#
spark.read.textFile("/usr/file/txt/dept.txt").show()
7.2 寫入Text資料#
df.write.text("/tmp/spark/txt/dept")
八、資料讀寫高階特性#
8.1 並行讀#
多個 Executors 不能同時讀取同一個檔案,但它們可以同時讀取不同的檔案。這意味著當您從一個包含多個檔案的資料夾中讀取資料時,這些檔案中的每一個都將成為 DataFrame 中的一個分割槽,並由可用的 Executors 並行讀取。
8.2 並行寫#
寫入的檔案或資料的數量取決於寫入資料時 DataFrame 擁有的分割槽數量。預設情況下,每個資料分割槽寫一個檔案。
8.3 分割槽寫入#
分割槽和分桶這兩個概念和 Hive 中分割槽表和分桶表是一致的。都是將資料按照一定規則進行拆分儲存。需要注意的是 partitionBy 指定的分割槽和 RDD 中分割槽不是一個概念:這裡的分割槽表現為輸出目錄的子目錄,資料分別儲存在對應的子目錄中。
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出檔案。
8.3 分桶寫入#
分桶寫入就是將資料按照指定的列和桶數進行雜湊,目前分桶寫入只支援儲存為表,實際上這就是 Hive 的分桶表。
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
.......
一、簡介#
1.1 多資料來源支援#
Spark 支援以下六個核心資料來源,同時 Spark 社群還提供了多達上百種資料來源的讀取方式,能夠滿足絕大部分使用場景。
CSV
JSON
Parquet
ORC
JDBC/ODBC connections
Plain-text files
注:以下所有測試檔案均可從本倉庫的resources 目錄進行下載
1.2 讀資料格式#
所有讀取 API 遵循以下呼叫格式:
Copy
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 讀取模式
.option("inferSchema", "true") // 是否自動推斷 schema
.option("path", "path/to/file(s)") // 檔案路徑
.schema(someSchema) // 使用預定義的 schema
.load()
讀取模式有以下三種可選項:
讀模式 描述
permissive 當遇到損壞的記錄時,將其所有欄位設定為 null,並將所有損壞的記錄放在名為 _corruption t_record 的字串列中
failFast 遇到格式不正確的資料時立即失敗
1.3 寫資料格式#
Copy
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //寫模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
寫資料模式有以下四種可選項:
Scala/Java 描述
SaveMode.ErrorIfExists 如果給定的路徑已經存在檔案,則丟擲異常,這是寫資料預設的模式
SaveMode.Append 資料以追加的方式寫入
SaveMode.Overwrite 資料以覆蓋的方式寫入
SaveMode.Ignore 如果給定的路徑已經存在檔案,則不做任何操作
二、CSV#
CSV 是一種常見的文字檔案格式,其中每一行表示一條記錄,記錄中的每個欄位用逗號分隔。
2.1 讀取CSV檔案#
自動推斷型別讀取讀取示例:
Copy
spark.read.format("csv")
.option("header", "false") // 檔案中的第一行是否為列的名稱
.option("mode", "FAILFAST") // 是否快速失敗
.option("inferSchema", "true") // 是否自動推斷 schema
.load("/usr/file/csv/dept.csv")
.show()
使用預定義型別:
Copy
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//預定義資料格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
2.2 寫入CSV檔案#
Copy
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也可以指定具體的分隔符:
Copy
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
2.3 可選配置#
為節省主文篇幅,所有讀寫配置項見文末 9.1 小節。三、JSON#
3.1 讀取JSON檔案#
Copy
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
需要注意的是:預設不支援一條資料記錄跨越多行 (如下),可以透過配置 multiLine 為 true 來進行更改,其預設值為 false。
Copy
// 預設支援單行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//預設不支援多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
3.2 寫入JSON檔案#
Copy
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
3.3 可選配置#
為節省主文篇幅,所有讀寫配置項見文末 9.2 小節。
四、Parquet#
Parquet 是一個開源的面向列的資料儲存,它提供了多種儲存最佳化,允許讀取單獨的列非整個檔案,這不僅節省了儲存空間而且提升了讀取效率,它是 Spark 是預設的檔案格式。
4.1 讀取Parquet檔案#
Copy
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
2.2 寫入Parquet檔案#
Copy
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
2.3 可選配置#
Parquet 檔案有著自己的儲存規則,因此其可選配置項比較少,常用的有如下兩個:
讀寫操作 配置項 可選值 預設值 描述
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy None 壓縮檔案格式
Read mergeSchema true, false 取決於配置項 spark.sql.parquet.mergeSchema
五、ORC#
ORC 是一種自描述的、型別感知的列檔案格式,它針對大型資料的讀寫進行了最佳化,也是大資料中常用的檔案格式。
5.1 讀取ORC檔案#
Copy
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
4.2 寫入ORC檔案#
Copy
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
六、SQL Databases#
Spark 同樣支援與傳統的關係型資料庫進行資料讀寫。但是 Spark 程式預設是沒有提供資料庫驅動的,所以在使用前需要將對應的資料庫驅動上傳到安裝目錄下的 jars 目錄中。下面示例使用的是 Mysql 資料庫,使用前需要將對應的 mysql-connector-java-x.x.x.jar 上傳到 jars 目錄下。
6.1 讀取資料#
讀取全表資料示例如下,這裡的 help_keyword 是 mysql 內建的字典表,只有 help_keyword_id 和 name 兩個欄位。
Copy
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驅動
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //資料庫地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
從查詢結果讀取資料:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//輸出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 1
七、Text#
Text 檔案在讀寫效能方面並沒有任何優勢,且不能表達明確的資料結構,所以其使用的比較少,讀寫操作如下:
7.1 讀取Text資料#
Copy
spark.read.textFile("/usr/file/txt/dept.txt").show()
7.2 寫入Text資料#
Copy
df.write.text("/tmp/spark/txt/dept")
八、資料讀寫高階特性#
8.1 並行讀#
多個 Executors 不能同時讀取同一個檔案,但它們可以同時讀取不同的檔案。這意味著當您從一個包含多個檔案的資料夾中讀取資料時,這些檔案中的每一個都將成為 DataFrame 中的一個分割槽,並由可用的 Executors 並行讀取。
8.2 並行寫#
寫入的檔案或資料的數量取決於寫入資料時 DataFrame 擁有的分割槽數量。預設情況下,每個資料分割槽寫一個檔案。
8.3 分割槽寫入#
分割槽和分桶這兩個概念和 Hive 中分割槽表和分桶表是一致的。都是將資料按照一定規則進行拆分儲存。需要注意的是 partitionBy 指定的分割槽和 RDD 中分割槽不是一個概念:這裡的分割槽表現為輸出目錄的子目錄,資料分別儲存在對應的子目錄中。
Copy
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
輸出結果如下:可以看到輸出被按照部門編號分為三個子目錄,子目錄中才是對應的輸出檔案。
8.3 分桶寫入#
分桶寫入就是將資料按照指定的列和桶數進行雜湊,目前分桶寫入只支援儲存為表,實際上這就是 Hive 的分桶表。
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
.......