回覆列表
  • 1 # 愛可生雲資料庫

    可以只用一行程式碼來執行MapReduce作業:JobClient.runJon(conf),Job作業執行時參與的四個實體:

    1.JobClient 寫程式碼,配置作業,提交作業。

    2.JobTracker:初始化作業,分配作業,協調作業執行。這是一個java程式,主類是JobTracker。

    3.TaskTracker:執行作業劃分後的任務,即分配資料分配上執行Map或Reduce任務。

    4.HDFS:儲存作業資料、配置資訊等,儲存作業結果。

    Map/Reduce 作業總體執行流程:

    程式碼編寫 ----> 作業配置 ----> 作業提交 ----> Map任務分配和執行 ----> 處理中間結果 ----> Reduce任務分配與執行 ----> 輸出結果

    而對於每個作業的執行,又包含:

    輸入準備 ----> 任務執行 ----> 輸出結果

    作業提交JobClient:

    JobClient的runJob方法產生一個Jobclient例項並呼叫其submitJob方法,然後runJob開始迴圈嗎,並在迴圈中呼叫getTaskCompetionEvents方法,獲得TaskCompletionEvent例項,每秒輪詢作業進度(後面有介紹進度和狀態更新),把進度寫到控制檯,作業完成後顯示作業計數器,若失敗,則把錯誤記錄到控制檯。

    submitJob方法作業提交的過程:

    1.向JobTracker請求一個新的JobId。

    2.檢查作業相關路徑,如果路徑不正確就會返回錯誤。

    3.計算作業輸入分片及其劃分資訊。

    4.將作業執行需要的資源(jar檔案、配置檔案等)複製到Shared HDFS,並

    複製多個副本(引數控制,預設值為10)供tasktracker訪問,也會將計算的分片複製到HDFS。

    5.呼叫JobTracker物件的submitJob()方法來真正提交作業,告訴JobTracker作業準備執行。

    作業的初始化JobTracker:

    JobTracker收到submitJob方法呼叫後,會把呼叫放入到一個內部佇列,由作業排程器(Job scheduler)進行排程並對其初始化。Job初始化即建立一個作業物件。

    當作業被排程後,JobTracker會建立一個代表這個作業的JobInProgress物件,並將任務和記錄資訊封裝在這個物件中,以便跟蹤任務狀態和程序。

    初始化過程就是JobInProgress物件的initTasks方法進行初始化的。

    初始化步驟:

    1.從HDFS中讀取作業對應的job.split資訊,為後面的初始化做好準備。

    2.建立並初始化map和reduce任務。根據資料分片資訊中的個數確定map task的個數,然後為每個map task生成一個TaskInProgress物件來處理資料分片,先將其放入nonRunningMapCache,以便JobTracker分配任務的時候使用。接下來根據JobConf中的mapred.reduce.tasks屬性利用setNumReduceTasks()方法設定reduce task的數量,然後同map task建立方式。

    3.最後就是建立兩個初始化task,進行map和reduce的初始化。

    任務的分配JobTracker:

    訊息傳遞HeartBeat: tasktracker執行一個簡單迴圈定期傳送心跳(heartbeat)給JobTracker。由心跳告知JobTracker自己是否存活,同時作為訊息通道傳遞其它資訊(請求新task)。作為心跳的一部分,tasktracker會指明自己是否已準備好執行新的任務,如果是,jobtracker會分配它一個任務。

    分配任務所屬於的作業:在Jobtracker分配任務前需先確定任務所在的作業。後面會介紹到各種作業排程演算法,預設是一個FIFO的作業排程。

    分配Map和Reduce任務:tasktracker有固定數量的任務槽,一個tasktracker可以同時執行多個Map和Reduce任務,但其準確的數量由tasktracker的核的數量和記憶體大小決定。預設排程器會先填滿Map任務槽,再填Reduce任務槽。jobtracker會選擇距離離分片檔案最近的tasktracker,最理想情況下,任務是資料本地化(data-local)的,當然也可以是機架本地化(rack-local),如果不是本地化的,那麼他們就需要從其他機架上檢索資料。Reduce任務分配很簡單,jobtracker會簡單的從待執行的reduce任務列表中選取下一個來執行,不用考慮資料本地化。

    任務的執行TaskTracker:

    TaskTracker收到新任務後,就要在本地執行任務了,執行任務的第一步就是透過localizedJob將任務本地化所需要的注入配置、資料、程式等資訊進行本地化。

    1.本地化資料:從共享檔案系統將job.split 、job.jar (在分散式快取中)複製本地,將job配置資訊寫入job.xml。

    2.新建本地工作目錄:tasktracker會加壓job.jar檔案到本工作目錄。

    3.呼叫launchTaskForJob方法釋出任務(其中會新建TaskRunner例項執行任務),如果是Map任務就啟用MapTaskRunner,對於Reduce就是ReduceTaskRunner。

    在這之後,TaskRunner會啟用一個新的JVM來執行每個Map/Reduce任務,防止程式原因而導致tasktracker崩潰,但不同任務間重用JVM還是可以的,後續會講到任務JVM重用。

    對於單個Map,任務執行的簡單流程是:

    1.分配任務執行引數

    2.在Child臨時檔案中新增map任務資訊(Child是執行Map和Reduce任務的主程序)

    3.配置log資料夾,配置map任務的通訊和輸出引數

    4.讀取input split,生成RecordReader讀取資料

    5.為Map生成MapRunnable,依次從RecordReader中接收資料,並呼叫Map函式進行處理。

    6.最後將map函式的輸出呼叫collect收集到MapOutputBuffer(引數控制其大小)中。

    Streaming和Pipes:

    Streaming和Pipes都執行特殊的Map和Reduce任務,目的是執行使用者提供的可執行程式並與之通訊。

    Streaming:使用標準輸入輸出Streaming與程序進行通訊。

    Pipes:用來監聽套接字,會發送一個埠號給C++程式,兩者便可建立連結。

    進度和狀態更新:

    一個作業和它的任務都有狀態(status),其中包括:執行成功失敗狀態、Map/Reduce進度、作業計數器值、狀態訊息。

    狀態訊息與客戶端的通訊:

    1.對於Map任務Progress的追蹤:progress是已經處理完的輸入所佔的比例。

    2.對於Reduce:稍複雜,reduce任務分三個階段(每個階段佔1/3),複製、排序和Reduce處理,若reduce已執行一半的輸入的話,那麼任務進度便是1/3+1/3+1/6=5/6。

    3.任務計數器:任務有一組計數器,負責對任務執行各個事件進行計數。

    4.任務進度報告:如果任務報告了進度,便會設定一個標記以表明狀態將被髮送到tasktracker。有一個獨立執行緒每隔三秒檢查一次此標記,如果已設定,則告知tasktracker當前狀態。

    5.tasktracker進度報告:tasktracker會每隔5秒(這個心跳是由叢集大小決定,叢集越大時間會越長)傳送heartbeat到jobtracker,並且tasktracker執行的所有狀態都會在呼叫中被髮送到jobtracker。

    6.jobtracker合併各任務報告:產生一個表明所有執行作業機器所含任務狀態的全域性檢視。

    前面提到的JobClient就是透過每秒查詢JobTracker來接收最新狀態,而且客戶端JobClient的getJob方法可以得到一個RunningJob的例項,其包含了作業的所以狀態資訊。

    作業的完成:

    當jobtracker收到作業最後一個任務已完成的通知後,便把作業狀態設定成成功。JobClient查詢狀態時,便知道任務已成功完成,於是JobClient列印一條訊息告知使用者,然後從runJob方法返回。

    如果jobtracker有相應設定,也會發送一個Http作業通知給客戶端,希望收到回撥指令的客戶端可以透過job.end.notification.url屬性來進行設定。

    失敗

    實際情況下,使用者的程式碼存在軟體錯誤程序會崩潰,機器也會產生故障,但Hadoop能很好的應對這些故障並完成作業。

    1.任務失敗

    子任務異常:如Map/Reduce任務中的使用者程式碼丟擲異常,子任務JVM程序會在退出前向父程序tasktracker傳送錯誤報告,錯誤被記錄使用者日誌。tasktracker會將此次task attempt標記為tailed,並釋放這個任務槽執行另外一個任務。

    子程序JVM突然退出:可能由於JVM bug導致使用者程式碼造成的某些特殊原因導致JVM退出,這種情況下,tasktracker會注意到程序已經退出,並將此次嘗試標記為failed。

    任務掛起:一旦tasktracker注意一段時間沒有收到進度更新,便會將任務標記為failed,JVM子程序將被自動殺死。任務失敗間隔時間通常為10分鐘,可以以作業或者叢集為基礎設定過期時間,引數為mapred.task.timeout。注意:如果引數值設定為0,則掛起的任務永遠不會釋放掉它的任務槽,隨著時間的推移會降低整個叢集的效率。

    任務失敗嘗試次數:jobtracker得知一個tasktracker失敗後,它會重新排程該任務執行,當然,jobtracker會嘗試避免重新排程失敗過的tasktracker任務。如果一個任務嘗試次數超過4次,它將不再被重試。這個值是可以設定的,對於Map任務,引數是mapred.map.max.attempts,對於reduce任務,則由mapred.reduce.max.attempts屬性控制。如果次數超過限制,整個作業都會失敗。當然,有時我們不希望少數幾個任務失敗就終止執行的整個作業,因為即使有些任務失敗,作業的一些結果可能還是有用的,這種情況下,可以為作業設定在不觸發作業失敗情況下的允許任務失敗的最大百分比,Map任務和Reduce任務可以獨立控制,引數為mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。

    任務嘗試中止(kill):任務終止和任務失敗不同,task attempt可以中止是因為他是一個推測副本或因為它所處的tasktracker失敗,導致jobtracker將它上面的所有task attempt標記為killed。被終止的task attempt不會被計入任務執行嘗試次數,因為嘗試中止並不是任務的錯。

    2.tasktracker失敗

    tasktracker由於崩潰或者執行過慢而失敗,他將停止向jobtracker傳送心跳(或很少傳送心跳)。jobtracker注意已停止傳送心跳的tasktracker(過期時間由引數mapred.tasktracker.expiry.interval設定,單位毫秒),並將它從等待排程的tasktracker池中移除。如果是未完成的作業,jobtracker會安排次tasktracker上已經執行成功的Map任務重新執行,因為此時reduce任務已無法訪問(中間輸出存放在失敗的tasktracker的本地檔案系統上)。

    即使tasktracker沒有失敗,也有可能被jobtracker列入黑名單。如果tasktracker上面的失敗任務數量遠遠高於叢集的平均失敗任務次數,他就會被列入黑名單,被列入黑名單的tasktracker可以透過重啟從jobtracker黑名單中移除。

    3.jobtracker失敗

    老版本的JobTracker失敗屬於單點故障,這種情況下作業註定失敗。

    作業排程:

    早期作業排程FIFO:按作業提交順序先進先出。可以設定優先順序,透過設定mapred.job.priority屬性或者JobClient的setJobPriority()方法制定優先順序(優先級別:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO排程演算法不支援搶佔(preemption),所以高優先順序作業仍然會被那些已經開始的長時間執行的低優先順序作業所阻塞。

    Fair Scheduler:目標是讓每個使用者公平地共享叢集能力。當叢集存在很多作業時,空閒的任務槽會以”讓每個使用者共享叢集“的方式進行分配。預設每個使用者都有自己的作業池。FairScheduler支援搶佔,所以,如果一個池在特定的一段時間未得到公平地資源共享,它會終止池中得到過多的資源任務,以便把任務槽讓給資源不足的池。FairScheduler是一個後續模組,使用它需要將其jar檔案放在Hadoop的類路徑下。可以透過引數map.red.jobtracker.taskScheduler屬性配置(值為org.apache.hadoop.mapred.FairScheduler)

    Capacity Scheduler:

    叢集由很多佇列組成,每個佇列都有一個分配能力,這一點與FairScheduler類似,只不過在每個佇列內部,作業根據FIFO方式進行排程。本質上說,Capacity Scheduler允許使用者或組織為每個使用者模擬一個獨立使用FIFO的叢集。

    shuffle和排序:

    MapReduce確保每個Reducer的輸入都是按鍵排序的。系統執行排序的過程-將map輸出作為輸入傳給reducer的過程稱為shuffle。shuffle屬於不斷被最佳化和改進的程式碼庫的一部分,從許多方面來看,shuffle是MapReduce的心臟。

    整個shuffle的流程應該是這樣:

    map結果劃分partition 排序sort 分割spill 合併同一劃分 合併同一劃分 合併結果排序 reduce處理 輸出

    Map端:

    寫入緩衝區:Map函式的輸出,是由collector處理的,它並不是簡單的將結果寫到磁碟。它利用緩衝的方式寫到記憶體,並處於效率的考慮進行預排序。每個map都有一個環形的記憶體緩衝區,用於任務輸出,預設緩衝區大小為100MB(由引數io.sort.mb調整),一旦緩衝區內容達到閾值(預設0.8),後臺程序邊開始把內容寫到磁碟(spill),在寫磁碟過程中,map輸出繼續被寫到緩衝區,但如果緩衝區被填滿,map會阻塞知道寫磁碟過程完成。寫磁碟將按照輪詢方式寫到mapred.local.dir屬性制定的作業特定子目錄中。

    寫出緩衝區:collect將緩衝區的內容寫出時,會呼叫sortAndSpill函式,這個函式作用主要是建立spill檔案,按照key值對資料進行排序,按照劃分將資料寫入檔案,如果配置了combiner類,會先呼叫combineAndSpill函式再寫檔案。sortAndSpill每被呼叫一次,就會寫一個spill檔案。

    合併所有Map的spill檔案:TaskTracker會在每個map任務結束後對所有map產生的spill檔案進行merge,merge規則是根據分割槽將各個spill檔案中資料同一分割槽中的資料合併在一起,並寫入到一個已分割槽且排序的map輸出檔案中。待唯一的已分割槽且已排序的map輸出檔案寫入最後一條記錄後,map端的shuffle階段就結束了。

    在寫磁碟前,執行緒首先根據資料最終要傳遞到的reducer把資料劃分成響應的分割槽(partition),在每個分割槽中,後臺執行緒按鍵進行內排序,如果有一個combiner,它會在排序後的輸出上執行。

    記憶體達到溢位寫的閾值時,就會新建一個溢位寫檔案,因為map任務完成其最後一個輸出記錄之後,會有幾個溢位寫檔案。在任務完成前,溢位寫檔案會被合併成一個已分割槽且已排序的輸出檔案。配置屬性io.sort.facor控制一次最多能合併多少流,預設值是10。

    如果已經指定combiner,並且寫次數至少為3(透過min.mum.spills.for.combine設定)時,則combiner就會在輸出檔案寫到磁碟之前執行。執行combiner的意義在於使map輸出更緊湊,捨得寫到本地磁碟和傳給reducer的資料更少。

    寫磁碟時壓縮:寫磁碟時壓縮會讓寫的速度更快,節約磁碟空間,並且減少傳給reducer的資料量。預設情況下,輸出是不壓縮的,但可以透過設定mapred.compress.map.output值為true,就可以啟用壓縮。使用的壓縮庫是由mapred.map.output.compression.codec制定。

    reducer獲得檔案分割槽的工作執行緒:reducer透過http方式得到輸出檔案的分割槽,用於檔案分割槽的工作執行緒數量由tracker.http.threads屬性指定,此設定針對的是每個tasktracker,而不是每個map任務槽。預設值為40,在大型叢集上此值可以根據需要而增加。

    Reduce端:

    複製階段:reduce會定期向JobTracker獲取map的輸出位置,一旦拿到輸出位置,reduce就會從對應的TaskTracker上覆制map輸出到本地(如果map輸出很小,則會被複制到TaskTracker節點的記憶體中,否則會被讓如磁碟),而不會等到所有map任務結束(當然這個也有引數控制)。

    合併階段:從各個TaskTracker上覆制的map輸出檔案(無論在磁碟還是記憶體)進行整合,並維持資料原來的順序。

    Reduce階段:從合併的檔案中順序拿出一條資料進行reduce函式處理,然後將結果輸出到本地HDFS。

    Map的輸出檔案位於執行map任務的tasktracker的本地磁碟,現在,tasktracker要為分割槽檔案執行reduce任務。每個任務完成時間可能不同,但是隻要有一個任務完成,reduce任務就開始複製其輸出,這就是reduce任務的複製階段(copy phase)。reduce任務有少量複製執行緒,因此能夠並行取得map輸出。預設值是5個執行緒,可以透過mapred.reduce.parallel.copies屬性設定。

    Reducer如何得知從哪個tasktracker獲得map輸出:map任務完成後會通知其父tasktracker狀態已更新,tasktracker進而通知(透過heart beat)jobtracker。因此,JobTracker就知道map輸出和tasktracker之間的對映關係,reducer中的一個執行緒定期詢問jobtracker以便獲知map輸出位置。由於reducer有可能失敗,因此tasktracker並沒有在第一個reducer檢索到map輸出時就立即從磁碟上刪除它們,相反他會等待jobtracker告示它可以刪除map輸出時才刪除,這是作業完成後最後執行的。

    如果map輸出很小,則會被直接複製到reduce tasktracker的記憶體緩衝區(大小由mapred.job.shuffle.input.buffer.percent控制,佔堆空間的百分比),否則,map輸出被複制到磁碟。一旦記憶體緩衝區達到閾值大小(由mapred.iob.shuffle.merge.percent)

    或達到map輸出閾值大小(mapred.inmem.threadhold),則合併後溢位寫到磁碟中。

    隨著磁碟上副本增多,後臺執行緒會將他們合併為更大的、排好序的檔案。注意:為了合併,壓縮的map輸出必須在記憶體中被解壓縮。

    排序階段:複製階段完成後,reduce任務會進入排序階段,更確切的說是合併階段,這個階段將合併map輸出,維持其順序排列。合併是迴圈進行的,由合併因子決定每次合併的輸出檔案數量。但讓有可能會產生中間檔案。

    reduce階段:在最後reduce階段,會直接把排序好的檔案輸入reduce函式,不會對中間檔案進行再合併,最後的合併即可來自記憶體,也可來自磁碟。此階段的輸出會直接寫到檔案系統,一般為hdfs。

    細節:這裡合併是並非平均合併,比如有40個檔案,合併因子為10,我們並不是每趟合併10個,合併四趟。而是第一趟合併4個,後三趟合併10,在最後一趟中4個已合併的檔案和餘下6個未合併會直接併入reduce。

  • 中秋節和大豐收的關聯?
  • 網戀見面靠譜嗎?