For reference only:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD

def get_hdfs_dir(input: String, sc: SparkContext): Array[String] = {
  // Create an RDD of [LongWritable, Text] over the input using TextInputFormat
  val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input)
  // Cast to NewHadoopRDD so we can access each partition's InputSplit
  val hadoopRDD = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  // Each partition corresponds to a file split; emit that split's file path
  val fileAndLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
    val file = inputSplit.asInstanceOf[FileSplit]
    iterator.take(1).map(_ => file.getPath.toString) // path of the file backing the current split
  })
  val dirOut: Array[String] = fileAndLine
    .distinct()   // many splits may share the same path, so deduplicate
    .coalesce(1)
    .collect()
  dirOut // an Array holding the paths of the files under the input directory
}
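
A minimal usage sketch, assuming an already-created SparkContext; the HDFS URL, app name, and local master below are hypothetical placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ListHdfsDirs").setMaster("local[*]"))
// Pass the directory (or glob) whose file paths you want to list
val paths: Array[String] = get_hdfs_dir("hdfs://namenode:8020/user/data/input/*", sc)
paths.foreach(println) // each element is the path of one file that backed an input split
sc.stop()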