用 Hadoop 進行分布式并行編程, 第 1 部分

作者: 曹羽中  來源: IBM中國  發布時間: 2010-09-03 06:59  閱讀: 2797 次  推薦: 0   原文鏈接   [收藏]  
摘要:Hadoop 是一個實現了 MapReduce 計算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運行于計算機集群上,完成海量數據的計算。本文將介紹 MapReduce 計算模型,分布式并行計算等基本概念,以及 Hadoop 的安裝部署和基本運行方法。

Hadoop 簡介

  Hadoop 是一個開源的可運行于大規模集群上的分布式并行編程框架,由于分布式存儲對于分布式編程來說是必不可少的,這個框架中還包含了一個分布式文件系統 HDFS( Hadoop Distributed File System )。也許到目前為止,Hadoop 還不是那么廣為人知,其最新的版本號也僅僅是 0.16,距離 1.0 似乎都還有很長的一段距離,但提及 Hadoop 一脈相承的另外兩個開源項目 Nutch 和 Lucene (三者的創始人都是 Doug Cutting ),那絕對是大名鼎鼎。Lucene 是一個用 Java 開發的開源高性能全文檢索工具包,它不是一個完整的應用程序,而是一套簡單易用的 API 。在全世界范圍內,已有無數的軟件系統,Web 網站基于 Lucene 實現了全文檢索功能,后來 Doug Cutting 又開創了第一個開源的 Web 搜索引擎(http://www.nutch.org) Nutch,它在 Lucene 的基礎上增加了網絡爬蟲和一些和 Web 相關的功能,一些解析各類文檔格式的插件等,此外,Nutch 中還包含了一個分布式文件系統用于存儲數據。從 Nutch 0.8.0 版本之后,Doug Cutting 把 Nutch 中的分布式文件系統以及實現 MapReduce 算法的代碼獨立出來形成了一個新的開源項 Hadoop。Nutch 也演化為基于 Lucene 全文檢索以及 Hadoop 分布式計算平臺的一個開源搜索引擎。

  基于 Hadoop,你可以輕松地編寫可處理海量數據的分布式并行程序,并將其運行于由成百上千個結點組成的大規模計算機集群上。從目前的情況來看,Hadoop 注定會有一個輝煌的未來:"云計算"是目前灸手可熱的技術名詞,全球各大 IT 公司都在投資和推廣這種新一代的計算模式,而 Hadoop 又被其中幾家主要的公司用作其"云計算"環境中的重要基礎軟件,如:雅虎正在借助 Hadoop 開源平臺的力量對抗 Google,除了資助 Hadoop 開發團隊外,還在開發基于 Hadoop 的開源項目 Pig,這是一個專注于海量數據集分析的分布式計算程序。Amazon 公司基于 Hadoop 推出了 Amazon S3 ( Amazon Simple Storage Service ),提供可靠,快速,可擴展的網絡存儲服務,以及一個商用的云計算平臺 Amazon EC2 ( Amazon Elastic Compute Cloud )。在 IBM 公司的云計算項目--"藍云計劃"中,Hadoop 也是其中重要的基礎軟件。Google 正在跟IBM合作,共同推廣基于 Hadoop 的云計算。

 

迎接編程方式的變革

  在摩爾定律的作用下,以前程序員根本不用考慮計算機的性能會跟不上軟件的發展,因為約每隔 18 個月,CPU 的主頻就會增加一倍,性能也將提升一倍,軟件根本不用做任何改變,就可以享受免費的性能提升。然而,由于晶體管電路已經逐漸接近其物理上的性能極限,摩爾定律在 2005 年左右開始失效了,人類再也不能期待單個 CPU 的速度每隔 18 個月就翻一倍,為我們提供越來越快的計算性能。Intel, AMD, IBM 等芯片廠商開始從多核這個角度來挖掘 CPU 的性能潛力,多核時代以及互聯網時代的到來,將使軟件編程方式發生重大變革,基于多核的多線程并發編程以及基于大規模計算機集群的分布式并行編程是將來軟件性能提升的主要途徑。

  許多人認為這種編程方式的重大變化將帶來一次軟件的并發危機,因為我們傳統的軟件方式基本上是單指令單數據流的順序執行,這種順序執行十分符合人類的思考習慣,卻與并發并行編程格格不入。基于集群的分布式并行編程能夠讓軟件與數據同時運行在連成一個網絡的許多臺計算機上,這里的每一臺計算機均可以是一臺普通的 PC 機。這樣的分布式并行環境的最大優點是可以很容易的通過增加計算機來擴充新的計算結點,并由此獲得不可思議的海量計算能力,同時又具有相當強的容錯能力,一批計算結點失效也不會影響計算的正常進行以及結果的正確性。Google 就是這么做的,他們使用了叫做 MapReduce 的并行編程模型進行分布式并行編程,運行在叫做 GFS ( Google File System )的分布式文件系統上,為全球億萬用戶提供搜索服務。

  Hadoop 實現了 Google 的 MapReduce 編程模型,提供了簡單易用的編程接口,也提供了它自己的分布式文件系統 HDFS,與 Google 不同的是,Hadoop 是開源的,任何人都可以使用這個框架來進行并行編程。如果說分布式并行編程的難度足以讓普通程序員望而生畏的話,開源的 Hadoop 的出現極大的降低了它的門檻,讀完本文,你會發現基于 Hadoop 編程非常簡單,無須任何并行開發經驗,你也可以輕松的開發出分布式的并行程序,并讓其令人難以置信地同時運行在數百臺機器上,然后在短時間內完成海量數據的計算。你可能會覺得你不可能會擁有數百臺機器來運行你的并行程序,而事實上,隨著"云計算"的普及,任何人都可以輕松獲得這樣的海量計算能力。例如現在 Amazon 公司的云計算平臺 Amazon EC2 已經提供了這種按需計算的租用服務,有興趣的讀者可以去了解一下。

  掌握一點分布式并行編程的知識對將來的程序員是必不可少的,Hadoop 是如此的簡便好用,何不嘗試一下呢?也許你已經急不可耐的想試一下基于 Hadoop 的編程是怎么回事了,但畢竟這種編程模型與傳統的順序程序大不相同,掌握一點基礎知識才能更好地理解基于 Hadoop 的分布式并行程序是如何編寫和運行的。因此本文會先介紹一下 MapReduce 的計算模型,Hadoop 中的分布式文件系統 HDFS, Hadoop 是如何實現并行計算的,然后才介紹如何安裝和部署 Hadoop 框架,以及如何運行 Hadoop 程序。

 

MapReduce 計算模型

  MapReduce 是 Google 公司的核心計算模型,它將復雜的運行于大規模集群上的并行計算過程高度的抽象到了兩個函數,Map 和 Reduce,這是一個令人驚訝的簡單卻又威力巨大的模型。適合用 MapReduce 來處理的數據集(或任務)有一個基本要求: 待處理的數據集可以分解成許多小的數據集,而且每一個小數據集都可以完全并行地進行處理。
 

圖 1. MapReduce 計算流程

  圖一說明了用 MapReduce 來處理大數據集的過程,這個 MapReduce 的計算過程簡而言之,就是將大數據集分解為成百上千的小數據集,每個(或若干個)數據集分別由集群中的一個結點(一般就是一臺普通的計算機)進行處理并生成中間結果,然后這些中間結果又由大量的結點進行合并,形成最終結果。

  計算模型的核心是 Map 和 Reduce 兩個函數,這兩個函數由用戶負責實現,功能是按一定的映射規則將輸入的<key,value> 對轉換成另一個或一批<key,value> 對輸出。
 

表一 Map 和 Reduce 函數

函數 輸入 輸出 說明
Map <k1,v1> List(<k2,v2>) 1.將小數據集進一步解析成一批<key,value> 對,輸入 Map 函數中進行處理。
2.每一個輸入的<k1,v1> 會輸出一批<k2,v2>。<k2,v2> 是計算的中間結果。
Reduce <k2,List(v2)> <k3,v3> 輸入的中間結果<k2,List(v2)> 中的 List(v2)表示是一批屬于同一個 k2 的value

 

  以一個計算文本文件中每個單詞出現的次數的程序為例,<k1,v1> 可以是<行在文件中的偏移位置,文件中的一行>,經 Map 函數映射之后,形成一批中間結果<單詞,出現次數>,而 Reduce 函數則可以對中間結果進行處理,將相同單詞的出現次數進行累加,得到每個單詞的總的出現次數。

  基于 MapReduce 計算模型編寫分布式并行程序非常簡單,程序員的主要編碼工作就是實現 Map 和 Reduce 函數,其它的并行編程中的種種復雜問題,如分布式存儲,工作調度,負載平衡,容錯處理,網絡通信等,均由 MapReduce 框架(比如 Hadoop )負責處理,程序員完全不用操心。

 

集群上的并行計算

  MapReduce 計算模型非常適合在大量計算機組成的大規模集群上并行運行。圖一中的每一個 Map 任務和每一個 Reduce 任務均可以同時運行于一個單獨的計算結點上,可想而知其運算效率是很高的,那么這樣的并行計算是如何做到的呢?

數據分布存儲

  Hadoop 中的分布式文件系統 HDFS 由一個管理結點( NameNode )和N個數據結點( DataNode )組成,每個結點均是一臺普通的計算機。在使用上同我們熟悉的單機上的文件系統非常類似,一樣可以建目錄,創建,復制,刪除文件,查看文件內容等。但其底層實現上是把文件切割成 Block,然后這些 Block 分散地存儲于不同的 DataNode 上,每個 Block 還可以復制數份存儲于不同的 DataNode 上,達到容錯容災之目的。NameNode 則是整個 HDFS 的核心,它通過維護一些數據結構,記錄了每一個文件被切割成了多少個 Block,這些 Block 可以從哪些 DataNode 中獲得,各個 DataNode 的狀態等重要信息。如果你想了解更多的關于 HDFS 的信息,可進一步閱讀參考資料: The Hadoop Distributed File System:Architecture and Design

分布式并行計算

  Hadoop 中有一個作為主控的 JobTracker,用于調度和管理其它的 TaskTracker, JobTracker 可以運行于集群中任一臺計算機上。TaskTracker 負責執行任務,必須運行于 DataNode 上,即 DataNode 既是數據存儲結點,也是計算結點。 JobTracker 將 Map 任務和 Reduce 任務分發給空閑的 TaskTracker,讓這些任務并行運行,并負責監控任務的運行情況。如果某一個 TaskTracker 出故障了,JobTracker 會將其負責的任務轉交給另一個空閑的 TaskTracker 重新運行。

本地計算

  數據存儲在哪一臺計算機上,就由這臺計算機進行這部分數據的計算,這樣可以減少數據在網絡上的傳輸,降低對網絡帶寬的需求。在 Hadoop 這樣的基于集群的分布式并行系統中,計算結點可以很方便地擴充,而因它所能夠提供的計算能力近乎是無限的,但是由是數據需要在不同的計算機之間流動,故網絡帶寬變成了瓶頸,是非常寶貴的,“本地計算”是最有效的一種節約網絡帶寬的手段,業界把這形容為“移動計算比移動數據更經濟”。
 

圖 2.分布存儲與并行計算 

任務粒度

  把原始大數據集切割成小數據集時,通常讓小數據集小于或等于 HDFS 中一個 Block 的大小(缺省是 64M),這樣能夠保證一個小數據集位于一臺計算機上,便于本地計算。有 M 個小數據集待處理,就啟動 M 個 Map 任務,注意這 M 個 Map 任務分布于 N 臺計算機上并行運行,Reduce 任務的數量 R 則可由用戶指定。

Partition

  把 Map 任務輸出的中間結果按 key的范圍劃分成 R 份( R 是預先定義的 Reduce 任務的個數),劃分時通常使用 hash 函數如: hash(key) mod R,這樣可以保證某一段范圍內的 key,一定是由一個 Reduce 任務來處理,可以簡化 Reduce 的過程。

Combine

  在 partition 之前,還可以對中間結果先做 combine,即將中間結果中有相同 key的<key,value> 對合并成一對。combine 的過程與 Reduce 的過程類似,很多情況下就可以直接使用 Reduce 函數,但 combine 是作為 Map 任務的一部分,在執行完 Map 函數后緊接著執行的。Combine 能夠減少中間結果中<key,value> 對的數目,從而減少網絡流量。

Reduce 任務從 Map 任務結點取中間結果

  Map 任務的中間結果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盤。中間結果文件的位置會通知主控 JobTracker, JobTracker 再通知 Reduce 任務到哪一個 DataNode 上去取中間結果。注意所有的 Map 任務產生中間結果均按其 Key用同一個 Hash 函數劃分成了 R 份,R 個 Reduce 任務各自負責一段 Key區間。每個 Reduce 需要向許多個 Map 任務結點取得落在其負責的 Key區間內的中間結果,然后執行 Reduce 函數,形成一個最終的結果文件。

任務管道

有 R 個 Reduce 任務,就會有 R 個最終結果,很多情況下這 R 個最終結果并不需要合并成一個最終結果。因為這 R 個最終結果又可以做為另一個計算任務的輸入,開始另一個并行計算任務。

Hadoop 初體驗

  Hadoop 支持 Linux及 Windows 操作系統,但其官方網站聲明 Hadoop 的分布式操作在 Windows 上未做嚴格測試,建議只把 Windows 作為 Hadoop 的開發平臺。在 Windows 環境上的安裝步驟如下( Linux平臺類似,且更簡單一些):

  1. 在 Windows 下,需要先安裝 Cgywin,安裝 Cgywin 時注意一定要選擇安裝 openssh (在 Net category)。安裝完成之后,把 Cgywin 的安裝目錄如 c:\cygwin\bin 加到系統環境變量 PATH 中,這是因為運行 Hadoop 要執行一些 linux環境下的腳本和命令。
  2. 安裝 Java 1.5.x,并將 JAVA_HOME 環境變量設置為 Java 的安裝根目錄如 C:\Program Files\Java\jdk1.5.0_01。
  3. 到 Hadoop 官方網站 http://hadoop.apache.org下載Hadoop Core,最新的穩定版本是 0.16.0.將下載后的安裝包解壓到一個目錄,本文假定解壓到 c:\hadoop-0.16.0。
  4. 修改 conf/hadoop-env.sh 文件,在其中設置 JAVA_HOME 環境變量: export JAVA_HOME="C:\Program Files\Java\jdk1.5.0_01”(因為路徑中 Program Files 中間有空格,一定要用雙引號將路徑引起來)

  至此,一切就緒,可以運行 Hadoop 了。以下的運行過程,需要啟動 cygwin,進入模擬 Linux環境。在下載的 Hadoop Core 包中,帶有幾個示例程序并且已經打包成了 hadoop-0.16.0-examples.jar。其中有一個 WordCount 程序,功能是統計一批文本文件中各個單詞出現的次數,我們先來看看怎么運行這個程序。Hadoop 共有三種運行模式: 單機(非分布式)模式,偽分布式運行模式,分布式運行模式,其中前兩種運行模式體現不了 Hadoop 分布式計算的優勢,并沒有什么實際意義,但對程序的測試及調試很有幫助,我們先從這兩種模式入手,了解基于 Hadoop 的分布式并行程序是如何編寫和運行的。

單機(非分布式)模式

  這種模式在一臺單機上運行,沒有分布式文件系統,而是直接讀寫本地操作系統的文件系統。

代碼清單1          

代碼清單1
 
$ cd /cygdrive/c/hadoop-0.16.0
$ mkdir test-in
$ cd test-in
#在 test-in 目錄下創建兩個文本文件, WordCount 程序將統計其中各個單詞出現次數
$ echo "hello world bye world" >file1.txt
$ echo "hello hadoop goodbye hadoop" >file2.txt
$ cd ..
$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out
#執行完畢,下面查看執行結果:
$ cd test-out
$ cat part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

注意事項:運行 bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out 時,務必注意第一個參數是 jar,不是-jar,當你用-jar 時,不會告訴你是參數錯了,報告出來的錯誤信息是:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/util/ProgramDriver,筆者當時以為是 classpath 的設置問題,浪費了不少時間。通過分析 bin/hadoop 腳本可知,-jar 并不是 bin/hadoop 腳本定義的參數,此腳本會把-jar 作為 Java 的參數,Java 的-jar 參數表示執行一個 Jar 文件(這個 Jar 文件必須是一個可執行的 Jar,即在 MANIFEST 中定義了主類),此時外部定義的 classpath 是不起作用的,因而會拋出 java.lang.NoClassDefFoundError 異常。而 jar 是 bin/hadoop 腳本定義的參數,會調用 Hadoop 自己的一個工具類 RunJar,這個工具類也能夠執行一個 Jar 文件,并且外部定義的 classpath 有效。

偽分布式運行模式

  這種模式也是在一臺單機上運行,但用不同的 Java 進程模仿分布式運行中的各類結點( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),請注意分布式運行中的這幾個結點的區別:從分布式存儲的角度來說,集群中的結點由一個 NameNode 和若干個 DataNode 組成,另有一個 Secondary NameNode 作為 NameNode 的備份。從分布式應用的角度來說,集群中的結點由一個 JobTracker 和若干個 TaskTracker 組成,JobTracker 負責任務的調度,TaskTracker 負責并行執行任務。TaskTracker 必須運行在 DataNode 上,這樣便于數據的本地計算。JobTracker 和 NameNode 則無須在同一臺機器上。

(1) 按代碼清單2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的參數,你可以通過讀此文件了解 Hadoop 中有哪些參數可供配置,但不要修改此文件。可通過修改 conf/hadoop-site.xml 改變缺省參數值,此文件中設置的參數值會覆蓋 conf/hadoop-default.xml 的同名參數。

代碼清單 2             

代碼清單 2
 
<configuration>
<property>
<name>fs.default.name</name>
<value>localhost:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

參數 fs.default.name 指定 NameNode 的 IP 地址和端口號。缺省值是 file:///,表示使用本地文件系統,用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 NameNode。

  參數 mapred.job.tracker 指定 JobTracker 的 IP 地址和端口號。缺省值是 local,表示在本地同一 Java 進程內執行 JobTracker 和 TaskTracker,用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 JobTracker (用一個單獨的 Java 進程做 JobTracker )。

  參數 dfs.replication 指定 HDFS 中每個 Block 被復制的次數,起數據冗余備份的作用。在典型的生產系統中,這個數常常設置為3。

(2) 配置 SSH,如代碼清單3所示:

代碼清單3

 
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

  配置完后,執行一下 ssh localhost,確認你的機器可以用 SSH 連接,并且連接時不需要手工輸入密碼。

(3)格式化一個新的分布式文件系統,如代碼清單4所示:
代碼清單 4

 
$ cd /cygdrive/c/hadoop-0.16.0
$ bin/hadoop namenode –format

(4)啟動 hadoop 進程,如代碼清單5所示。控制臺上的輸出信息應該顯示啟動了 namenode, datanode, secondary namenode, jobtracker, tasktracker。啟動完成之后,通過 ps –ef 應該可以看到啟動了5個新的 java 進程。
代碼清單 5               

 

 
$ bin/start-all.sh
$ ps –ef

(5)運行wordcount 應用,如代碼清單6所示:
代碼清單 6                

代碼清單 6
 
$ bin/hadoop dfs -put ./test-in input
#將本地文件系統上的 ./test-in 目錄拷到 HDFS 的根目錄上,目錄名改為 input
#執行 bin/hadoop dfs –help 可以學習各種 HDFS 命令的使用。
$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount input output
#查看執行結果:
#將文件從 HDFS 拷到本地文件系統中再查看:
$ bin/hadoop dfs -get output output
$ cat output/*
#也可以直接查看
$ bin/hadoop dfs -cat output/*
$ bin/stop-all.sh #停止 hadoop 進程

 

故障診斷

  1. 執行$ bin/start-all.sh 啟動 Hadoop 進程后,會啟動5個 java 進程,同時會在/tmp 目錄下創建五個 pid 文件記錄這些進程 ID 號。通過這五個文件,可以得知 namenode, datanode, secondary namenode, jobtracker, tasktracker 分別對應于哪一個 Java 進程。當你覺得 Hadoop 工作不正常時,可以首先查看這5個 java 進程是否在正常運行。
  2. 使用web 接口。訪問 http://localhost:50030 可以查看 JobTracker 的運行狀態。訪問 http://localhost:50060 可以查看 TaskTracker 的運行狀態。訪問 http://localhost:50070 可以查看 NameNode 以及整個分布式文件系統的狀態,瀏覽分布式文件系統中的文件以及 log 等。
  3. 查看${HADOOP_HOME}/logs 目錄下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一個對應的 log 文件,每一次運行的計算任務也有對應用 log 文件。分析這些 log 文件有助于找到故障原因。

結束語

   現在,你已經了解了 MapReduce 計算模型,分布式文件系統 HDFS,分布式并行計算等的基本原理,并且有了一個可以運行的 Hadoop 環境,運行了一個基于 Hadoop 的并行程序。

用 Hadoop 進行分布式并行編程,第 1 部分

用 Hadoop 進行分布式并行編程,第2 部分
0
0
 
 
 

文章列表

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()