分布式基礎學習【二】 —— 分布式計算系統(Map/Reduce)

作者: duguguiyu  來源: 博客園  發布時間: 2010-11-03 16:38  閱讀: 1259 次  推薦: 0   原文鏈接   [收藏]  

  二. 分布式計算(Map/Reduce)

  分布式式計算,同樣是一個寬泛的概念,在這里,它狹義的指代,按Google Map/Reduce框架所設計的分布式框架。在Hadoop中,分布式文件系統,很大程度上,是為各種分布式計算需求所服務的。我們說分布式文件系統就是加了分布式的文件系統,類似的定義推廣到分布式計算上,我們可以將其視為增加了分布式支持的計算函數。從計算的角度上看,Map/Reduce框架接受各種格式的鍵值對文件作為輸入,讀取計算后,最終生成自定義格式的輸出文件。而從分布式的角度上看,分布式計算的輸入文件往往規模巨大,且分布在多個機器上,單機計算完全不可支撐且效率低下,因此Map/Reduce框架需要提供一套機制,將此計算擴展到無限規模的機器集群上進行。依照這樣的定義,我們對整個Map/Reduce的理解,也可以分別沿著這兩個流程去看。。。
  在Map/Reduce框架中,每一次計算請求,被稱為作業。在分布式計算Map/Reduce框架中,為了完成這個作業,它進行兩步走的戰略,首先是將其拆分成若干個Map任務,分配到不同的機器上去執行,每一個Map任務拿輸入文件的一部分作為自己的輸入,經過一些計算,生成某種格式的中間文件,這種格式,與最終所需的文件格式完全一致,但是僅僅包含一部分數據。因此,等到所有Map任務完成后,它會進入下一個步驟,用以合并這些中間文件獲得最后的輸出文件。此時,系統會生成若干個Reduce任務,同樣也是分配到不同的機器去執行,它的目標,就是將若干個Map任務生成的中間文件為匯總到最后的輸出文件中去。當然,這個匯總不總會像1 + 1 = 2那么直接了當,這也就是Reduce任務的價值所在。經過如上步驟,最終,作業完成,所需的目標文件生成。整個算法的關鍵,就在于增加了一個中間文件生成的流程,大大提高了靈活性,使其分布式擴展性得到了保證。。。

  I. 術語對照

  和分布式文件系統一樣,Google、Hadoop和....我,各執一種方式表述統一概念,為了保證其統一性,特有下表。。。
文中翻譯 Hadoop術語 Google術語 相關解釋
作業 Job Job 用戶的每一個計算請求,就稱為一個作業。
作業服務器 JobTracker Master 用戶提交作業的服務器,同時,它還負責各個作業任務的分配,管理所有的任務服務器。
任務服務器 TaskTracker Worker 任勞任怨的工蜂,負責執行具體的任務。
任務 Task Task 每一個作業,都需要拆分開了,交由多個服務器來完成,拆分出來的執行單位,就稱為任務。
備份任務 Speculative Task Buckup Task 每一個任務,都有可能執行失敗或者緩慢,為了降低為此付出的代價,系統會未雨綢繆的實現在另外的任務服務器上執行同樣一個任務,這就是備份任務。

  II. 基本架構

  與分布式文件系統類似,Map/Reduce的集群,也由三類服務器構成。其中作業服務器,在Hadoop中稱為Job Tracker,在Google論文中稱為Master。前者告訴我們,作業服務器是負責管理運行在此框架下所有作業的,后者告訴我們,它也是為各個作業分配任務的核心。與HDFS的主控服務器類似,它也是作為單點存在的,簡化了負責的同步流程。具體的負責執行用戶定義操作的,是任務服務器,每一個作業被拆分成很多的任務,包括Map任務Reduce任務等,任務是具體執行的基本單元,它們都需要分配到合適任務服務器上去執行,任務服務器一邊執行一邊向作業服務器匯報各個任務的狀態,以此來幫助作業服務器了解作業執行的整體情況,分配新的任務等等。。。
除了作業的管理者執行者,還需要有一個任務的提交者,這就是客戶端。與分布式文件系統一樣,客戶端也不是一個單獨的進程,而是一組API,用戶需要自定義好自己需要的內容,經由客戶端相關的代碼,將作業及其相關內容和配置,提交到作業服務器去,并時刻監控執行的狀況。。。
  同作為Hadoop的實現,與HDFS的通信機制相同,Hadoop Map/Reduce也是用了協議接口來進行服務器間的交流。實現者作為RPC服務器,調用者經由RPC的代理進行調用,如此,完成大部分的通信,具體服務器的架構,和其中運行的各個協議狀況,參見下圖。從圖中可以看到,與HDFS相比,相關的協議少了幾個,客戶端與任務服務器,任務服務器之間,都不再有直接通信關系。這并不意味著客戶端就不需要了解具體任務的執行狀況,也不意味著,任務服務器之間不需要了解別家任務執行的情形,只不過,由于整個集群各機器的聯系比HDFS復雜的多,直接通信過于的難以維系,所以,都統一由作業服務器整理轉發。另外,從這幅圖可以看到,任務服務器不是一個人在戰斗,它會像孫悟空一樣招出一群寶寶幫助其具體執行任務。這樣做的好處,個人覺得,應該有安全性方面的考慮,畢竟,任務的代碼是用戶提交的,數據也是用戶指定的,這質量自然良莠不齊,萬一碰上個搞破壞的,把整個任務服務器進程搞死了,就因小失大了。因此,放在單獨的地盤進行,愛咋咋地,也算是權責明確了。。。
  與分布式文件系統相比,Map/Reduce框架的還有一個特點,就是可定制性強。文件系統中很多的算法,都是很固定和直觀的,不會由于所存儲的內容不同而有太多的變化。而作為通用的計算框架,需要面對的問題則要復雜很多,在各種不同的問題、不同的輸入、不同的需求之間,很難有一種包治百病的藥能夠一招鮮吃遍天。作為Map/Reduce框架而言,一方面要盡可能的抽取出公共的一些需求,實現出來。更重要的,是需要提供良好的可擴展機制,滿足用戶自定義各種算法的需求。Hadoop是由Java來實現的,因此通過反射來實現自定義的擴展,顯得比較小菜一碟了。在JobConf類中,定義了大量的接口,這基本上是Hadoop Map/Reduce框架所有可定制內容的一次集中展示。在JobConf中,有大量set接口接受一個Class<? extends xxx>的參數,通常它都有一個默認實現的類,用戶如果不滿意,則可自定義實現。。。

  III. 計算流程

  如果一切都按部就班的進行,那么整個作業的計算流程,應該是作業的提交 -> Map任務的分配和執行 -> Reduce任務的分配和執行 -> 作業的完成。而在每個任務的執行中,又包含輸入的準備 -> 算法的執行 -> 輸出的生成,三個子步驟。沿著這個流程,我們可以很快的整理清晰整個Map/Reduce框架下作業的執行。。。

  1、作業的提交

  一個作業,在提交之前,需要把所有應該配置的東西都配置好,因為一旦提交到了作業服務器上,就陷入了完全自動化的流程,用戶除了觀望,最多也就能起一個監督作用,懲治一些不好好工作的任務。。。
基本上,用戶在提交代碼階段,需要做的工作主要是這樣的:
  首先,書寫好所有自定的代碼,最起碼,需要有Map和Reduce的執行代碼。在Hadoop中,Map需要派生自Mapper<K1, V1, K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3, V3>接口。這里都是用的泛型,用以支持不同的鍵值類型。這兩個接口都僅有一個方法,一個是map,一個是reduce,這兩個方法都直接受四個參數,前兩個是輸入的相關的數據結構,第三個是作為輸出相關的數據結構,最后一個,是一個Reporter類的實例,實現的時候可以利用它來統計一些計數。除了這兩個接口,還有大量可以派生的接口,比如分割的Partitioner<K2, V2>接口。。。
  然后,需要書寫好主函數的代碼,其中最主要的內容就是實例化一個JobConf類的對象,然后調用其豐富的setXXX接口,設定好所需的內容,包括輸入輸出的文件路徑,Map和Reduce的類,甚至包括讀取寫入文件所需的格式支持類,等等。。。
  最后,調用JobClientrunJob方法,提交此JobConf對象。runJob方法會先行調用到JobSubmissionProtocol接口所定義的submitJob方法,將此作業,提交給作業服務器。接著,runJob開始循環,不停的調用JobSubmissionProtocol的getTaskCompletionEvents方法,獲得TaskCompletionEvent類的對象實例,了解此作業各任務的執行狀況。。。

  2、Map任務的分配

  當一個作業提交到了作業服務器上,作業服務器會生成若干個Map任務,每一個Map任務,負責將一部分的輸入轉換成格式與最終格式相同的中間文件。通常一個作業的輸入都是基于分布式文件系統的文件(當然在單機環境下,文件系統單機的也可以...),因為,它可以很天然的和分布式的計算產生聯系。而對于一個Map任務而言,它的輸入往往是輸入文件的一個數據塊,或者是數據塊的一部分,但通常,不跨數據塊。因為,一旦跨了數據塊,就可能涉及到多個服務器,帶來了不必要的復雜性。。。
  當一個作業,從客戶端提交到了作業服務器上,作業服務器會生成一個JobInProgress對象,作為與之對應的標識,用于管理。作業被拆分成若干個Map任務后,會預先掛在作業服務器上的任務服務器拓撲樹。這是依照分布式文件數據塊的位置來劃分的,比如一個Map任務需要用某個數據塊,這個數據塊有三份備份,那么,在這三臺服務器上都會掛上此任務,可以視為是一個預分配。。。
  關于任務管理和分配的大部分的真實功能和邏輯的實現,JobInProgress則依托JobInProgressListenerTaskScheduler的子類。TaskScheduler,顧名思義是用于任務分配的策略類(為了簡化描述,用它代指所有TaskScheduler的子類...)。它會掌握好所有作業的任務信息,其assignTasks函數,接受一個TaskTrackerStatus作為參數,依照此任務服務器的狀態和現有的任務狀況,為其分配新的任務。而為了掌握所有作業相關任務的狀況,TaskScheduler會將若干個JobInProgressListener注冊到JobTracker中去,當有新的作業到達、移除或更新的時候,JobTracker會告知給所有的JobInProgressListener,以便它們做出相應的處理。。。
  任務分配是一個重要的環節,所謂任務分配,就是將合適作業的合適任務分配到合適的服務器上。不難看出,里面蘊含了兩個步驟,先是選擇作業,然后是在此作業中選擇任務。和所有分配工作一樣,任務分配也是一個復雜的活。不良好的任務分配,可能會導致網絡流量增加、某些任務服務器負載過重效率下降,等等。不僅如此,任務分配還是一個無一致模式的問題,不同的業務背景,可能需要不同的算法才能滿足需求。因此,在Hadoop中,有很多TaskScheduler的子類,像Facebook,Yahoo,都為其貢獻出了自家用的算法。在Hadoop中,默認的任務分配器,是JobQueueTaskScheduler類。它選擇作業的基本次序是:Map Clean Up Task(Map任務服務器的清理任務,用于清理相關的過期的文件和環境...) -> Map Setup Task(Map任務服務器的安裝任務,負責配置好相關的環境...) -> Map Tasks -> Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在這個前提下,具體到Map任務的分配上來。當一個任務服務器工作的游刃有余,期待獲得新的任務的時候,JobQueueTaskScheduler會按照各個作業的優先級,從最高優先級的作業開始分配。每分配一個,還會為其留出余量,已被不時之需。舉一個例子:系統目前有優先級3、2、1的三個作業,每個作業都有一個可分配的Map任務,一個任務服務器來申請新的任務,它還有能力承載3個任務的執行,JobQueueTaskScheduler會先從優先級3的作業上取一個任務分配給它,然后再留出一個1任務的余量。此時,系統只能在將優先級2作業的任務分配給此服務器,而不能分配優先級1的任務。這樣的策略,基本思路就是一切為高優先級的作業服務,優先分配不說,分配了好保留有余力以備不時之需,如此優待,足以讓高優先級的作業喜極而泣,讓低優先級的作業感慨既生瑜何生亮,甚至是活活餓死。。。
  確定了從哪個作業提取任務后,具體的分配算法,經過一系列的調用,最后實際是由JobInProgressfindNewMapTask函數完成的。它的算法很簡單,就是盡全力為此服務器非配且盡可能好的分配任務,也就是說,只要還有可分配的任務,就一定會分給它,而不考慮后來者。作業服務器會從離它最近的服務器開始,看上面是否還掛著未分配的任務(預分配上的),從近到遠,如果所有的任務都分配了,那么看有沒有開啟多次執行,如果開啟,考慮把未完成的任務再分配一次(后面有地方詳述...)。。。
  對于作業服務器來說,把一個任務分配出去了,并不意味著它就徹底解放,可以對此任務可以不管不顧了。因為任務可以在任務服務器上執行失敗,可能執行緩慢,這都需要作業服務器幫助它們再來一次。因此在Task中,記錄有一個TaskAttemptID,對于任務服務器而言,它們每次跑的,其實都只是一個Attempt而已,Reduce任務只需要采信一個的輸出,其他都算白忙乎了。。。

  3、Map任務的執行

  與HDFS類似,任務服務器是通過心跳消息,向作業服務器匯報此時此刻其上各個任務執行的狀況,并向作業服務器申請新的任務的。具體實現,是TaskTracker調用InterTrackerProtocol協議的heartbeat方法來做的。這個方法接受一個TaskTrackerStatus對象作為參數,它描述了此時此任務服務器的狀態。當其有余力接受新的任務的時候,它還會傳入acceptNewTasks為true的參數,表示希望作業服務器委以重任。JobTracker接收到相關的參數后,經過處理,會返回一個HeartbeatResponse對象。這個對象中,定義了一組TaskTrackerAction,用于指導任務服務器進行下一步的工作。系統中已定義的了一堆其TaskTrackerAction的子類,有的對攜帶的參數進行了擴充,有的只是標明了下ID,具體不詳寫了,一看便知。。。
  當TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它會開始執行所分配的新的任務。在TaskTracker中,有一個TaskTracker.TaskLauncher線程(確切的說是兩個,一個等Map任務,一個等Reduce任務),它們在癡癡的守候著新任務的來到。一旦等到了,會最終調用到Task的createRunner方法,構造出一個TaskRunner對象,新建一個線程來執行。對于一個Map任務,它對應的Runner是TaskRunner的子類MapTaskRunner,不過,核心部分都在TaskRunner的實現內。TaskRunner會先將所需的文件全部下載并拆包好,并記錄到一個全局緩存中,這是一個全局的目錄,可以供所有此作業的所有任務使用。它會用一些軟鏈接,將一些文件名鏈接到這個緩存中來。然后,根據不同的參數,配置出一個JVM執行的環境,這個環境與JvmEnv類的對象對應。
  接著,TaskRunner會調用JvmManagerlaunchJvm方法,提交給JvmManager處理。JvmManager用于管理該TaskTracker上所有運行的Task子進程。在目前的實現中,嘗試的是池化的方式。有若干個固定的槽,如果槽沒有滿,那么就啟動新的子進程,否則,就尋找idle的進程,如果是同Job的直接放進去,否則殺死這個進程,用一個新的進程代替。每一個進程都是由JvmRunner來管理的,它也是位于單獨線程中的。但是從實現上看,這個機制好像沒有部署開,子進程是死循環等待,而不會阻塞在父進程的相關線程上,父線程的變量一直都沒有個調整,一旦分配,始終都處在繁忙的狀況了。
真實的執行載體,是Child,它包含一個main函數,進程執行,會將相關參數傳進來,它會拆解這些參數,并且構造出相關的Task實例,調用其run函數進行執行。每一個子進程,可以執行指定個數量的Task,這就是上面所說的池化的配置。但是,這套機制在我看來,并沒有運行起來,每個進程其實都沒有機會不死而執行新的任務,只是傻傻的等待進程池滿,而被一刀斃命。也許是我老眼昏花,沒看出其中實現的端倪。。。

  4、Reduce任務的分配與執行

  比之Map任務,Reduce的分配及其簡單,基本上是所有Map任務完成了,有空閑的任務服務器,來了就給分配一個Job任務。因為Map任務的結果星羅棋布,且變化多端,真要搞一個全局優化的算法,絕對是得不償失。而Reduce任務的執行進程的構造和分配流程,與Map基本完全的一致,沒有啥可說的了。。。
  但其實,Reduce任務與Map任務的最大不同,是Map任務的文件都在本地隔著,而Reduce任務需要到處采集。這個流程是作業服務器經由此Reduce任務所處的任務服務器,告訴Reduce任務正在執行的進程,它需要的Map任務執行過的服務器地址,此Reduce任務服務器會于原Map任務服務器聯系(當然本地就免了...),通過FTP服務,下載過來。這個隱含的直接數據聯系,就是執行Reduce任務與執行Map任務最大的不同了。。。

  5、作業的完成

  當所有Reduce任務都完成了,所需數據都寫到了分布式文件系統上,整個作業才正式完成了。此中,涉及到很多的類,很多的文件,很多的服務器,所以說起來很費勁,話說,一圖解千語,說了那么多,我還是畫兩幅圖,徹底表達一下吧。。。
  首先,是一個時序圖。它模擬了一個由3個Map任務和1個Reduce任務構成的作業執行流程。我們可以看到,在執行的過程中,只要有人太慢,或者失敗,就會增加一次嘗試,以此換取最快的執行總時間。一旦所有Map任務完成,Reduce開始運作(其實,不一定要這樣的...),對于每一個Map任務來說,只有執行到Reduce任務把它上面的數據下載完成,才算成功,否則,都是失敗,需要重新進行嘗試。。。
  而第二副圖,不是我畫的,就不轉載了,參見這里,它描述了整個Map/Reduce的服務器狀況圖,包括整體流程、所處服務器進程、輸入輸出等,看清楚這幅圖,對Map/Reduce的基本流程應該能完全跑通了。有這幾點,可能圖中描述的不夠清晰需要提及一下,一個是在HDFS中,其實還有日志文件,圖中沒有標明;另一個是步驟5,其實是由TaskTracker主動去拉取而不是JobTracker推送過來的;還有步驟8和步驟11,創建出來的MapTask和ReduceTask,在Hadoop中都是運行在獨立的進程上的。。。

  IV. Map任務詳請

  從上面,可以了解到整個Map和Reduce任務的整體流程,而后面要啰嗦的,是具體執行中的細節。Map任務的輸入,是分布式文件系統上的,包含鍵值對信息的文件。為了給每一個Map任務指定輸入,我們需要掌握文件格式把它分切成塊,并從每一塊中分離出鍵值信息。在HDFS中,輸入的文件格式,是由InputFormat<K, V>類來表示的,在JobConf中,它的默認值是TextInputFormat類(見getInputFormat),此類是特化的FileInputFormat<LongWritable, Text>子類,而FileInputFormat<K, V>正是InputFormat<K, V>的子類。通過這樣的關系我們可以很容易的理解,默認的文件格式是文本文件,且鍵是LongWritable類型(整形數),值是Text類型(字符串)。僅僅知道文件類型是不夠的,我們還需要將文件中的每一條數據,分離成鍵值對,這個工作,是RecordReader<K, V>來做的。在TextInputFormat的getRecordReader方法中我們可以看到,與TextInputFormat默認配套使用的,是LineRecordReader類,是特化的RecordReader<LongWritable, Text>的子類,它將每一行作為一個記錄,起始的位置作為鍵,整行的字符串作為值。有了格式,分出了鍵值,還需要切開分給每一個Map任務。每一個Map任務的輸入用InputSplit接口表示,對于一個文件輸入而言,其實現是FileSplit,它包含著文件名、起始位置、長度和存儲它的一組服務器地址。。。
  當Map任務拿到所屬的InputSplit后,就開始一條條讀取記錄,并調用用于定義的Mapper,進行計算(參見MapRunner<K1, V1, K2, V2>和MapTask的run方法),然后,輸出。MapTask會傳遞給Mapper一個OutputCollector<K, V>對象,作為輸出的數據結構。它定義了一個collect的函數,接受一個鍵值對。在MapTask中,定義了兩個OutputCollector的子類,一個是MapTask.DirectMapOutputCollector<K, V>,人如其名,它的實現確實很Direct,直截了當。它會利用一個RecordWriter<K, V>對象,collect一調用,就直接調用RecordWriter<K, V>的write方法,寫入本地的文件中去。如果覺著RecordWriter<K, V>出現的很突兀,那么看看上一段提到的RecordReader<K, V>,基本上,數據結構都是對應著的,一個是輸入一個是輸出。輸出很對稱也是由RecordWriter<K, V>和OutputFormat<K, V>來協同完成的,其默認實現是LineRecordWriter<K, V>和TextOutputFormat<K, V>,多么的眼熟啊。。。
除了這個非常直接的實現之外,MapTask中還有一個復雜的多的實現,是MapTask.MapOutputBuffer<K extends Object, V extends Object>。有道是簡單壓倒一切,那為什么有很簡單的實現,要琢磨一個復雜的呢。原因在于,看上去很美的往往帶著刺,簡單的輸出實現,每調用一次collect就寫一次文件,頻繁的硬盤操作很有可能導致此方案的低效。為了解決這個問題,這就有了這個復雜版本,它先開好一段內存做緩存,然后制定一個比例做閾值開一個線程監控此緩存。collect來的內容,先寫到緩存中,當監控線程發現緩存的內容比例超過閾值,掛起所有寫入操作,建一個新的文件,把緩存的內容批量刷到此文件中去,清空緩存,重新開放,接受繼續collect。。。
  為什么說是刷到文件中去呢。因為這不是一個簡單的照本宣科簡單復制的過程,在寫入之前,會先將緩存中的內存,經過排序、合并器(Combiner)統計之后,才會寫入。如果你覺得Combiner這個名詞聽著太陌生,那么考慮一下Reducer,Combiner也就是一個Reducer類,通過JobConf的setCombinerClass進行設置,在常用的配置中,Combiner往往就是用用戶為Reduce任務定義的那個Reducer子類。只不過,Combiner只是服務的范圍更小一些而已,它在Map任務執行的服務器本地,依照Map處理過的那一小部分數據,先做一次Reduce操作,這樣,可以壓縮需要傳輸內容的大小,提高速度。每一次刷緩存,都會開一個新的文件,等此任務所有的輸入都處理完成后,就有了若干個有序的、經過合并的輸出文件。系統會將這些文件搞在一起,再做一個多路的歸并外排,同時使用合并器進行合并,最終,得到了唯一的、有序的、經過合并的中間文件(注:文件數量等同于分類數量,在不考慮分類的時候,簡單的視為一個...)。它,就是Reduce任務夢寐以求的輸入文件。。。
  除了做合并,復雜版本的OutputCollector,還具有分類的功能。分類,是通過Partitioner<K2, V2>來定義的,默認實現是HashPartitioner<K2, V2>,作業提交者可以通過JobConf的setPartitionerClass來自定義。分類的含義是什么呢,簡單的說,就是將Map任務的輸出,劃分到若干個文件中(通常與Reduce任務數目相等),使得每一個Reduce任務,可以處理某一類文件。這樣的好處是大大的,舉一個例子說明一下。比如有一個作業是進行單詞統計的,其Map任務的中間結果應該是以單詞為鍵,以單詞數量為值的文件。如果這時候只有一個Reduce任務,那還好說,從全部的Map任務那里收集文件過來,分別統計得到最后的輸出文件就好。但是,如果單Reduce任務無法承載此負載或效率太低,就需要多個Reduce任務并行執行。此時,再沿用之前的模式就有了問題。每個Reduce任務從一部分Map任務那里獲得輸入文件,但最終的輸出結果并不正確,因為同一個單詞可能在不同的Reduce任務那里都有統計,需要想方法把它們統計在一起才能獲得最后結果,這樣就沒有將Map/Reduce的作用完全發揮出來。這時候,就需要用到分類。如果此時有兩個Reduce任務,那么將輸出分成兩類,一類存放字母表排序較高的單詞,一類存放字母表排序低的單詞,每一個Reduce任務從所有的Map任務那里獲取一類的中間文件,得到自己的輸出結果。最終的結果,只需要把各個Reduce任務輸出的,拼接在一起就可以了。本質上,這就是將Reduce任務的輸入,由垂直分割,變成了水平分割。Partitioner的作用,正是接受一個鍵值,返回一個分類的序號。它會在從緩存刷到文件之前做這個工作,其實只是多了一個文件名的選擇而已,別的邏輯都不需要變化。。。
  除了緩存、合并、分類等附加工作之外,復雜版本的OutputCollector還支持錯誤數據的跳過功能,在后面分布式將排錯的時候,還會提及,標記一下,按下不表。。。

  V. Reduce任務詳情

  理論上看,Reduce任務的整個執行流程要比Map任務更為的羅嗦一些,因為,它需要收集輸入文件,然后才能進行處理。Reduce任務,主要有這么三個步驟:CopySortReduce(參見ReduceTask的run方法)。所謂Copy,就是從執行各個Map任務的服務器那里,收羅到本地來。拷貝的任務,是由ReduceTask.ReduceCopier類來負責,它有一個內嵌類,叫MapOutputCopier,它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。整個拷貝過程是一個動態的過程,也就是說它不是一次給好所有輸入信息就不再變化了。它會不停的調用TaskUmbilicalProtocol協議的getMapCompletionEvents方法,向其父TaskTracker詢問此作業個Map任務的完成狀況(TaskTracker要向JobTracker詢問后再轉告給它...)。當獲取到相關Map任務執行服務器的信息后,都會有一個線程開啟,做具體的拷貝工作。同時,還有一個內存Merger線程和一個文件Merger線程在同步工作,它們將新鮮下載過來的文件(可能在內存中,簡單的統稱為文件...),做著歸并排序,以此,節約時間,降低輸入文件的數量,為后續的排序工作減負。。。
  Sort,排序工作,就相當于上述排序工作的一個延續。它會在所有的文件都拷貝完畢后進行,因為雖然同步有做著歸并的工作,但可能留著尾巴,沒做徹底。經過這一個流程,該徹底的都徹底了,一個嶄新的、合并了所有所需Map任務輸出文件的新文件,誕生了。而那些千行萬苦從其他各個服務器網羅過來的Map任務輸出文件,很快的結束了它們的歷史使命,被掃地出門一掃而光,全部刪除了。。。
  所謂好戲在后頭,Reduce任務的最后一個階段,正是Reduce本身。它也會準備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統,基本都是分布式文件系統,或者說是HDFS。而在輸入方面,ReduceTask會從JobConf那里調用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator等等之類的自定義類,構造出Reducer所需的鍵類型,和值的迭代類型Iterator(一個鍵到了這里一般是對應一組值)。具體實現頗為拐彎抹角,建議看一Merger.MergeQueueRawKeyValueIteratorReduceTask.ReduceValuesIterator等等之類的實現。有了輸入,有了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。。。

  VI. 分布式支持

  1、服務器正確性保證

  Hadoop Map/Reduce服務器狀況和HDFS很類似,由此可知,救死扶傷的方法也是大同小異。廢話不多說了,直接切正題。同作為客戶端,Map/Reduce的客戶端只是將作業提交,就開始搬個板凳看戲,沒有占茅坑的行動。因此,一旦它掛了,也就掛了,不傷大雅。而任務服務器,也需要隨時與作業服務器保持心跳聯系,一旦有了問題,作業服務器可以將其上運行的任務,移交給它人完成。作業服務器,作為一個單點,非常類似的是利用還原點(等同于HDFS的鏡像)和歷史記錄(等同于HDFS的日志),來進行恢復。其上,需要持久化用于恢復的內容,包含作業狀況、任務狀況、各個任務嘗試的工作狀況等。有了這些內容,再加上任務服務器的動態注冊,就算挪了個窩,還是很容易恢復的。JobHistory是歷史記錄相關的一個靜態類,本來,它也就是一個干寫日志活的,只是在Hadoop的實現中,對日志的寫入做了面向對象的封裝,同時又大量用到觀察者模式做了些嵌入,使得看起來不是那么直觀。本質上,它就是打開若干個日志文件,利用各類接口來往里面寫內容。只不過,這些日志,會放在分布式文件系統中,就不需要像HDFS那樣,來一個SecondXXX隨時候命,由此可見,有巨人在腳下踩著,真好。JobTracker.RecoveryManager類是作業服務器中用于進行恢復相關的事情,當作業服務器啟動的時候,會調用其recover方法,恢復日志文件中的內容。其中步驟,注釋中寫的很清楚,請自行查看。。。

  2、任務執行的正確和速度

  整個作業流程的執行,秉承著木桶原理。執行的最慢的Map任務和Reduce任務,決定了系統整體執行時間(當然,如果執行時間在整個流程中占比例很小的話,也許就微不足道了...)。因此,盡量加快最慢的任務執行速度,成為提高整體速度關鍵。所使用的策略,簡約而不簡單,就是一個任務多次執行。當所有未執行的任務都分配出去了,并且先富起來的那部分任務已經完成了,并還有任務服務器孜孜不倦的索取任務的時候,作業服務器會開始炒剩飯,把那些正在吭哧吭哧在某個服務器上慢慢執行的任務,再把此任務分配到一個新的任務服務器上,同時執行。兩個服務器各盡其力,成王敗寇,先結束者的結果將被采納。這樣的策略,隱含著一個假設,就是我們相信,輸入文件的分割算法是公平的,某個任務執行慢,并不是由于這個任務本身負擔太重,而是由于服務器不爭氣負擔太重能力有限或者是即將撒手西去,給它換個新環境,人挪死樹挪活事半功倍。。。

  當然,肯定有哽咽的任務,不論是在哪個服務器上,都無法順利完成。這就說明,此問題不在于服務器上,而是任務本身天資有缺憾。缺憾在何處?每個作業,功能代碼都是一樣的,別的任務成功了,就是這個任務不成功,很顯然,問題出在輸入那里。輸入中有非法的輸入條目,導致程序無法辨識,只能揮淚惜別。說到這里,解決策略也浮出水面了,三十六計走位上,惹不起,還是躲得起的。在MapTask中的MapTask.SkippingRecordReader<K, V>和ReduceTask里的ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都是用于干這個事情的。它們的原理很簡單,就是在讀一條記錄前,把當前的位置信息,封裝成SortedRanges.Range對象,經由Task的reportNextRecordRange方法提交到TaskTracker上去。TaskTracker會把這些內容,擱在TaskStatus對象中,隨著心跳消息,匯報到JobTracker上面。這樣,作業服務器就可以隨時隨刻了解清楚,每個任務正讀取在那個位置,一旦出錯,再次執行的時候,就在分配的任務信息里面添加一組SortedRanges信息。MapTask或ReduceTask讀取的時候,會看一下這些區域,如果當前區域正好處于上述雷區,跳過不讀。如此反復,正可謂,道路曲折,前途光明啊。。。

  VII. 總結

  對于Map/Reduce而言,真正的困難,在于提高其適應能力,打造一款能夠包治百病的執行框架。Hadoop已經做得很好了,但只有真正搞清楚了整個流程,你才能幫助它做的更好。。。
0
0
 
 
 

文章列表

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()