最近做了一個小的mapreduce程序,主要目的是計算環比值最高的前5名,本來打算使用spark計算,可是本人目前spark還只是簡單看了下,因此就先改用mapreduce計算了,今天和大家分享下這個例子,也算是對自己寫的程序的總結了。
首先解釋下環比,例如我們要算本周的環比,那么計算方式就是本周的數據和上周數字的差值除以上周數值就是環比了,如果是月的環比就是本月和上月數據的差值除以上月數字就是本月環比了。不過本mapreduce實例不會直接算出比值,只是簡單求出不同時間段數值的差值,最終環比結果由業務系統進行運算了。
下面看看本人構造的測試數據了,測試數據分成兩個文件,文件一的內容如下所示:
guanggu,1;90 hongshan,1;80 xinzhou,1;70 wuchang,1;95 hankou,1;85 hanyang,1;75
第二個文件的測試數據如下:
guanggu,2;66 hongshan,2;68 xinzhou,2;88 wuchang,2;59 hankou,2;56 hanyang,2;38
這里每行第一列的字段就是key了,key和value使用逗號分割,1;90是value值,value值包含兩個內容,1為時間段標記,90就是數值,大家可以看到同一個key會有兩個不同的時間段(使用1和2來標記)。
Mapreduce的運算邏輯如下:首先第一步我們要求出環比數值,第二步就是排序了,做這個算法我曾考慮許久就是想把求環比值和排序兩個過程合并,但是最后發現很難做到,只好將整個運算過程拆分成兩個不同mapreduce,第一個mapreduce計算環比,第二個進行排序,二者是迭代關系。這里解釋下分成兩個mapreduce原因吧,主要原因就是最原始數據很難把兩個不同時間段的數據按照key合并在一起變成一行數據,因此mapreduce計算時候必須有一個過程就是執行相同key合并操作,因此不得不分成兩個步驟完成計算。
接下來就是具體代碼了,首先是第一個mapreduce,用來計算環比值的mapreduce了,它的map實現代碼如下:
import java.io.IOException; import org.apache.hadoop.io.Text; // 使用輸入為object,text,輸出為Text,Text的數據結構,Object其實是行號,在本計算里意義不大,Text就是每行的內容 public class MrByAreaHBMap extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, Text>{ private static String firstSeparator = ",";//每行的key和value值使用逗號分割 @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { /* 本map的邏輯非常簡單,就是從行里拆分key和value,對于有些初學者可能疑惑,我們到底如何讓相同的key合并在一起了?這個就要看reduce計算了*/ Text areaKey = new Text();// reduce輸入是Text類型 Text areaVal = new Text();// reduce輸入是Text類型 String line = value.toString(); if (line != null && !line.equals("")){ String[] arr = line.split(firstSeparator); areaKey.set(arr[0]); areaVal.set(arr[1]); context.write(areaKey, areaVal); } } }
下面是reduce代碼了,具體如下:
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MrByAreaHBReduce extends Reducer<Text, Text, Text, Text>{ private static String firstSeparator = ";"; private static String preFlag = "1"; private static String nextFlag = "2"; /*reduce的輸入也是key,value的形式,不過這個輸入是會將map里相同的key的值進行合并,合并形式就是一個數組形式,不過reduce方法里是通過迭代器進行數值處理*/ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int num1 = 0,num2 = 0,hbNum = 0; for(Text value : values){ String inVal = value.toString(); String[] arr = inVal.split(firstSeparator); // 下面的邏輯是通過不同時間段標記獲取不同時間段數值 if (arr[0].equals(preFlag)){ num1 = Integer.valueOf(arr[1]); } if (arr[0].equals(nextFlag)){ num2 = Integer.valueOf(arr[1]); } } hbNum = num1 - num2;// 這里計算環比 Text valueText = new Text(); valueText.set(hbNum + ""); Text retKey = new Text(); /* 對reduce的key進行了修改,將原來key和各個時間段的數值合并在一起,這樣讀取計算結果時候就可以讀取到原始計算數據了,這是key,value計算模式可以簡陋的無奈之舉*/ retKey.set(key.toString() + firstSeparator + num1 + firstSeparator + num2); context.write(valueText,retKey); } }
求環比的mapredue代碼介紹結束了,下面是排序的算法,排序算法更加簡單,在計算環比的mapreduce輸出里我將環比值和原始key進行了互換,然后輸出到結果文件里,這個結果文件就是第二個mapreduce的輸入了,下面我們就要對這個新key進行排序,mapredcue計算模型里從map到reduce有默認的排序機制,如果map輸出的key是字符類型那么排序規則就是按照字典進行排序,如果key是數字,那么就會按照數字由小到大進行排序,下面就是排序mapreduce的具體算法,map代碼如下:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MrByAreaSortMap extends Mapper<LongWritable, Text, IntWritable, Text> { /* 我們需要的排序是按照key的數值排序,不過這個排序是map的輸出才做的,因此代碼里輸出的key使用了IntWritable類型 其實排序的map邏輯非常簡單就是保證map的輸出key是數字類型即可 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); /*reduce的輸出結果文件格式是按照空格分隔的,不過也搞不清有幾個空格,或者是tab分割了,這里使用正則表達式s+就不怕多少空格和tab了*/ String[] arr = line.split("\\s+"); IntWritable outputKey = new IntWritable(Integer.valueOf(arr[0])); Text outputValue = new Text(); outputValue.set(arr[1]); context.write(outputKey, outputValue); } }
reduce代碼如下:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* reduce代碼很讓人吃驚吧,就是把map結果原樣輸出即可 */ public class MrByAreaSortReduce extends Reducer<IntWritable, Text, IntWritable, Text> { @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text text : values){ context.write(key, text); } } }
代碼里的注釋對代碼邏輯進行了詳細的解釋,這里就不累述了。
下面就是調用兩個mapreduce的main函數了,也就是我們該如何執行mapreduce的方式,這個main函數還是非常有特點的,特點一就是兩個mapreduce有迭代關系,具體就是第一個mapredcue執行完畢后第二個mapredcue才能執行,或者說第一個mapredcue的輸出就是第二個mapredcue的輸入,特點二就是排序計算里我們使用了map到reduce過程及shuffle過程里的默認排序機制,那么該機制運用可不是像mapreduce代碼那么簡單了,其實背后需要我們更加深入理解mapreduce的原理,這里我們直接看代碼了,代碼如下:
mport java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MrByAreaJob { public static void main(String[] args) throws IOException { // 一個mapreduce就是一個job 一個job需要一個單獨的Configuration,我開始讓兩個job公用Configuration,最后mr報錯 Configuration conf01 = new Configuration(); ControlledJob conJobHB = new ControlledJob(conf01); // 下面代碼很多文章里都會提到這里就不多說了 Job jobHB = new Job(conf01,"hb"); jobHB.setJarByClass(MrByAreaJob.class); jobHB.setMapperClass(MrByAreaHBMap.class); jobHB.setReducerClass(MrByAreaHBReduce.class); jobHB.setMapOutputKeyClass(Text.class); jobHB.setMapOutputValueClass(Text.class); jobHB.setOutputKeyClass(Text.class); jobHB.setOutputValueClass(Text.class); conJobHB.setJob(jobHB); FileInputFormat.addInputPath(jobHB, new Path(args[0])); FileOutputFormat.setOutputPath(jobHB, new Path(args[1])); Configuration conf02 = new Configuration(); Job jobSort = new Job(conf02,"sort"); jobSort.setJarByClass(MrByAreaJob.class); jobSort.setMapperClass(MrByAreaSortMap.class); jobSort.setReducerClass(MrByAreaSortReduce.class); // Partitioner是shuffle的一個步驟,一個Partitioner對應一個reduce // 假如這個mapredue有多個reduce,我們如何保證排序的全局一致性,因此這里需要進行處理 jobSort.setPartitionerClass(PartitionByArea.class); // map對數值排序默認是由小到大,但是需求是由大到小,因此需要我們改變這種排序 jobSort.setSortComparatorClass(IntKeyComparator.class); jobSort.setMapOutputKeyClass(IntWritable.class); jobSort.setMapOutputValueClass(Text.class); jobSort.setOutputKeyClass(IntWritable.class); jobSort.setOutputValueClass(Text.class); ControlledJob conJobSort = new ControlledJob(conf02); conJobSort.setJob(jobSort); // 這里添加job的依賴關系 conJobSort.addDependingJob(conJobHB); // 可以看到第一個mapreduce的輸出就是第二個的輸入 FileInputFormat.addInputPath(jobSort, new Path(args[1])); FileOutputFormat.setOutputPath(jobSort, new Path(args[2])); // 主控制job JobControl mainJobControl = new JobControl("mainHBSort"); mainJobControl.addJob(conJobHB); mainJobControl.addJob(conJobSort); Thread t = new Thread(mainJobControl); t.start(); while(true){ if (mainJobControl.allFinished()){ System.out.println(mainJobControl.getSuccessfulJobList()); mainJobControl.stop(); break; } } } }
這里有兩個類還沒有介紹,一個是IntKeyComparator,這是為了保證排序的mapreduce結果是按數字由大到小排序,代碼如下:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class IntKeyComparator extends WritableComparator { protected IntKeyComparator() { super(IntWritable.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } }
另一個類就是PartitionByArea,這個是保證排序不會因為reduce設置的個數而不能保證排序的全局一致性,代碼具體如下:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PartitionByArea<IntWritable, Text> extends Partitioner<IntWritable, Text> { @Override public int getPartition(IntWritable key, Text value, int numReduceTasks) { int maxValue = 50; int keySection = 0; // numReduceTasks就是默認的reduce任務個數 if (numReduceTasks > 1 && key.hashCode() < maxValue){ int sectionValue = maxValue / (numReduceTasks - 1); int count = 0; while((key.hashCode() - sectionValue * count) > sectionValue){ count++; } keySection = numReduceTasks - 1 - count; } return keySection; } }
這里特別要講解的是PartitionByArea,這個原理我花了好一段時間才理解過來,partition是map輸出為reduce對應做的分區,一般一個partition對應一個reduce,如果我們將reduce任務槽設置為一個那么就不用更改Partition類,但是實際生產情況下reduce往往會配置多個,這個時候保證數據的整體排序就十分重要了,那么我們如何保證其數據的整體有序了,這個時候我們要找到輸入數據的最大值,然后讓最大值除以partition的數量的商值作為分割數據的邊界,這樣等分就可以保證數據的整體排序的有效性了。
現在所有的代碼都介紹完畢了,下面就是我們該如何讓這個代碼運行了,我在寫本代碼時候使用的是ide是eclipse,不過我沒有使用mapreduce插件,而是直接放在服務器上運行,下面我來描述下運行該mr的方式,具體如下:
首先我在裝有hadoop服務的服務器上使用root用戶創建一個屬于我自己的文件夾,這里文件夾的名字叫做xiajun,我通過ftp將源文件傳遞到xiajun目錄下的javafile文件夾,執行如下命令:
mkdir /xiajun/javafile javac –classpath /home/hadoop/hadoop/hadoop-core-0.20.2-cdh3u4.jar –d /xiajun/javaclass /xiajun/ javafile/*.java
以上命令是編譯源文件,將javafile文件夾的java代碼編譯到javaclass目錄下。
Jar –cvf /xiajun/mymr.jar –C /xiajun/javaclass/ .
這里將javaclass目錄下class文件打成jar包,放在xiajun目錄下。
接下來我們使用hadoop用戶登錄:
su – hadoop
之所以使用root用戶編譯,打jar包原因是我的hadoop用戶沒有權限上傳文件不得已而為之了。
我們首先將測試數據上傳到HDFS上,接下來執行如下命令:
cd /hadoop/bin
切換目錄到bin目錄下,然后執行:
hadoop jar mymr.jar cn.com.TestMain 輸入目錄 輸出目錄
這里輸入可以是具體文件也可以是目錄,輸出目錄在HDFS上要不存在,如果存在hadoop會無法確認任務是否已經執行完畢,就會強制終止任務。
兩個mapreduce迭代執行日志非常讓人失望,因此如果我們發現任務無法正常執行,我現在都是一個個mapredcue執行查看錯誤日志。
最后我們看看應用服務應該如何調用這個mapreduce程序,這里我使用遠程調用shell 的方式,代碼如下:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.Session; public class TestMain { /** * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) { String hostname = "192.168.1.200"; String username = "hadoop"; String pwd = "hadoop"; Connection conn = new Connection(hostname); Session sess = null; long begin = System.currentTimeMillis(); try { conn.connect(); boolean isAuthenticated = conn.authenticateWithPassword(username, pwd); sess = conn.openSession(); sess.execCommand("cd hadoop/bin && hadoop jar /xiajun/mymr.jar com.test.mr.MrByAreaJob /xiajun/areaHBinput /xiajun/areaHBoutput58 /xiajun/areaHBoutput68"); InputStream stdout = sess.getStdout(); BufferedReader br = new BufferedReader(new InputStreamReader(stdout)); StringBuilder sb = new StringBuilder(); while(true){ String line = br.readLine(); if (line == null) break; sb.append(line); } System.out.println(sb.toString()); long end = System.currentTimeMillis(); System.out.println("耗時:" + (begin - end)/1000 + "秒"); } catch (IOException e) { e.printStackTrace(); }finally{ sess.close(); conn.close(); } } }
好了,本文就此結束了。
文章列表