昨天,收到一個需求,應該大晚上8點左右吧,突然被上級告知,你現在停下一個手頭的工作,我這邊有一個緊急的需求。
我什麼也沒有說,不管自己再忙什麼,先聽他講什麼,最後的操作,大致是 資料清洗,不知道這有什麼緊急的,上級說緊急可能就是緊急吧。
流程上:
讀取hdfs上的目錄,然後做資料清洗,寫入一張hive表中,當然中間是沒什麼問題。
就是需要一些注意點:
例如 讀取 /user/hive/warehouse/20210203 下面,有
xxx1_payment_20210203,
xxx2_payment_20210203
xxx3_payment_20210203
xxx4_payment_20210203
xxxx,
zzzz
各種檔案,需要讀取的檔案是 xxxx_payment_20210203這樣的檔案,那顯然需要在檔案讀取的時候進行過濾。
這個時候需要呼叫 Hadoop的 FileSystem了,需要透過_gateway獲取了,倒不是有多麼難,就是記錄下 python中呼叫 Java中的類。
payment_read_path=[]Path=sc._gateway.jvm.org.apache.hadoop.fs.PathURI =sc._gateway.jvm.java.net.URIfs =sc._gateway.jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())payment_paths=fs.listStatus(Path(" /user/hive/warehouse/20210203"))for t in payment_paths: pp=t.getPath().getName().split('_') if pp[0] 。。。。 and pp[1]=='payment' and pp[2]=='20210203': payment_read_path.append(t.getPath().toString())這樣就獲取了讀取的檔案路徑,然後spark.read.....就可以了
最新評論