In fact, using MapReduce to compute a maximum is not much different from Hadoop's bundled WordCount example: one Reducer takes a maximum while the other accumulates a sum, but the two are essentially the same and quite simple. Let's work through a concrete example.
Test Data
Using a simulation program of our own, we generated a set of simple test data. An excerpt of the input data looks like this:
SG 253654006139495 253654006164392 619850464
KG 253654006225166 253654006252433 743485698
UZ 253654006248058 253654006271941 570409379
TT 253654006282019 253654006286839 23236775
BE 253654006276984 253654006301435 597874033
BO 253654006293624 253654006315946 498265375
SR 253654006308428 253654006330442 484613339
SV 253654006320312 253654006345405 629640166
LV 253654006330384 253654006359891 870680704
FJ 253654006351709 253654006374468 517965666
The text data above is stored one record per line; each line contains four fields:
- country code
- start time
- end time
- a random cost/weight estimate
The fields are separated by spaces. The result we want to compute is the maximum cost estimate for each country (identified by its country code).
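For reference, test data in this format can be produced by a small standalone generator. The sketch below is a hypothetical reconstruction, not the author's original simulator: the class name, output file, record count, and value ranges are all illustrative assumptions.

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Locale;
import java.util.Random;

public class CostDataGenerator {

    public static void main(String[] args) throws IOException {
        Random random = new Random();
        // ISO 3166 two-letter country codes, e.g. "SG", "KG", "UZ"
        String[] codes = Locale.getISOCountries();
        try (PrintWriter out = new PrintWriter(new FileWriter("cost_data.txt"))) {
            long start = 253654006139495L; // illustrative starting timestamp
            for (int i = 0; i < 10000000; i++) { // 10M records, as in the post's data_10m file
                String code = codes[random.nextInt(codes.length)];
                long end = start + 10000 + random.nextInt(30000);
                long cost = 1 + random.nextInt(999999999); // positive cost in [1, 999999999]
                out.printf("%s %d %d %d%n", code, start, end, cost);
                start += random.nextInt(20000); // next record starts slightly later
            }
        }
    }
}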
Implementation
Since the problem is simple, let's go straight to the actual code. It consists of the usual three parts: a Mapper, a Reducer, and a Driver. The Mapper implementation class is GlobalCostMapper, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GlobalCostMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused output objects; instance fields (not static) since they are mutated per record
    private final LongWritable costValue = new LongWritable(0);
    private final Text code = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // A line looks like: 'SG 253654006139495 253654006164392 619850464'
        String line = value.toString();
        String[] array = line.split("\\s");
        if (array.length == 4) {
            String countryCode = array[0];
            String strCost = array[3];
            long cost = 0L;
            try {
                cost = Long.parseLong(strCost);
            } catch (NumberFormatException e) {
                cost = 0L;
            }
            // Skip records whose cost is missing or malformed
            if (cost != 0) {
                code.set(countryCode);
                costValue.set(cost);
                context.write(code, costValue);
            }
        }
    }
}
The logic above is very simple: split each line on the space separator, pull out the individual fields, and emit a (country code, cost) key-value pair.
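To sanity-check the Mapper in isolation, a unit test along the following lines can be used. This is a minimal sketch assuming MRUnit and JUnit are on the test classpath; it is not part of the original code.

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class GlobalCostMapperTest {

    @Test
    public void emitsCountryCodeAndCost() throws Exception {
        // Feed one sample line and assert the emitted (code, cost) pair
        MapDriver.newMapDriver(new GlobalCostMapper())
                .withInput(new LongWritable(0),
                        new Text("SG 253654006139495 253654006164392 619850464"))
                .withOutput(new Text("SG"), new LongWritable(619850464L))
                .runTest();
    }
}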
Next, the key-value pairs emitted by the Mapper need to be merged and reduced in the Reducer. The Reducer implementation class is GlobalCostReducer, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GlobalCostReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // Costs are always positive, so 0 is a safe initial value for the running maximum
        long max = 0L;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable current = iter.next();
            if (current.get() > max) {
                max = current.get();
            }
        }
        context.write(key, new LongWritable(max));
    }
}
The code above computes the maximum cost estimate among the values of each key group; the logic is straightforward. As an optimization, this same Reducer can be applied to the map output as a Combiner, reducing the volume of data transferred from the Mappers to the Reducers; this is specified when configuring the Job. Taking a maximum is commutative and associative, so computing partial maxima on the map side and then the maximum of those on the reduce side yields the same final result, which is exactly what makes this class safe to reuse as a Combiner.
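The reduce logic can be exercised the same way. Another MRUnit sketch under the same assumption (MRUnit on the test classpath; not part of the original post):

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class GlobalCostReducerTest {

    @Test
    public void keepsOnlyTheMaximum() throws Exception {
        // Two costs for the same country code; only the larger one should be emitted
        ReduceDriver.newReduceDriver(new GlobalCostReducer())
                .withInput(new Text("SG"),
                        Arrays.asList(new LongWritable(619850464L),
                                new LongWritable(517965666L)))
                .withOutput(new Text("SG"), new LongWritable(619850464L))
                .runTest();
    }
}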
Finally, let's look at how to configure and run the Job. The implementation class is GlobalMaxCostDriver, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobalMaxCostDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: maxcost <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "max cost");

        job.setJarByClass(GlobalMaxCostDriver.class);
        job.setMapperClass(GlobalCostMapper.class);
        // Reuse the Reducer as a Combiner to cut down shuffle traffic
        job.setCombinerClass(GlobalCostReducer.class);
        job.setReducerClass(GlobalCostReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
Running the Program
First, make sure the Hadoop cluster is running normally; in my setup the NameNode is on host ubuntu3. The steps to run the program are as follows:
- Compile the code (I build with Maven directly) and package it into a jar file
shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org
- Copy the generated jar file to the NameNode host
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./
global-max-cost.jar
- Upload the data file to be processed
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/
- Run the MapReduce job we wrote, computing the maxima
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost
The console output during the run looks roughly like the following:
13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1
13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004
13/03/22 16:30:17 INFO mapred.JobClient:  map 0% reduce 0%
13/03/22 16:30:33 INFO mapred.JobClient:  map 22% reduce 0%
13/03/22 16:30:36 INFO mapred.JobClient:  map 28% reduce 0%
13/03/22 16:30:45 INFO mapred.JobClient:  map 52% reduce 9%
13/03/22 16:30:48 INFO mapred.JobClient:  map 57% reduce 9%
13/03/22 16:30:57 INFO mapred.JobClient:  map 80% reduce 9%
13/03/22 16:31:00 INFO mapred.JobClient:  map 85% reduce 19%
13/03/22 16:31:10 INFO mapred.JobClient:  map 100% reduce 28%
13/03/22 16:31:19 INFO mapred.JobClient:  map 100% reduce 100%
13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004
13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29
13/03/22 16:31:24 INFO mapred.JobClient:   Job Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Launched reduce tasks=1
13/03/22 16:31:24 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=76773
13/03/22 16:31:24 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient:     Launched map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient:     Data-local map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=40497
13/03/22 16:31:24 INFO mapred.JobClient:   File Output Format Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Bytes Written=3029
13/03/22 16:31:24 INFO mapred.JobClient:   FileSystemCounters
13/03/22 16:31:24 INFO mapred.JobClient:     FILE_BYTES_READ=142609
13/03/22 16:31:24 INFO mapred.JobClient:     HDFS_BYTES_READ=448913653
13/03/22 16:31:24 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=338151
13/03/22 16:31:24 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3029
13/03/22 16:31:24 INFO mapred.JobClient:   File Input Format Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Bytes Read=448912799
13/03/22 16:31:24 INFO mapred.JobClient:   Map-Reduce Framework
13/03/22 16:31:24 INFO mapred.JobClient:     Map output materialized bytes=21245
13/03/22 16:31:24 INFO mapred.JobClient:     Map input records=10000000
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce shuffle bytes=18210
13/03/22 16:31:24 INFO mapred.JobClient:     Spilled Records=12582
13/03/22 16:31:24 INFO mapred.JobClient:     Map output bytes=110000000
13/03/22 16:31:24 INFO mapred.JobClient:     CPU time spent (ms)=80320
13/03/22 16:31:24 INFO mapred.JobClient:     Total committed heap usage (bytes)=1535639552
13/03/22 16:31:24 INFO mapred.JobClient:     Combine input records=10009320
13/03/22 16:31:24 INFO mapred.JobClient:     SPLIT_RAW_BYTES=854
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce input records=1631
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce input groups=233
13/03/22 16:31:24 INFO mapred.JobClient:     Combine output records=10951
13/03/22 16:31:24 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1706708992
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce output records=233
13/03/22 16:31:24 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=4316872704
13/03/22 16:31:24 INFO mapred.JobClient:     Map output records=10000000
- Verify the Job's output
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000
AD 999974516
AE 999938630
AF 999996180
AG 999991085
AI 999989595
AL 999998489
AM 999976746
AO 999989628
AQ 999995031
AR 999953989
AS 999935982
AT 999999909
AU 999937089
AW 999965784
AZ 999996557
BA 999949773
BB 999987345
BD 999992272
BE 999925057
BF 999999220
BG 999971528
BH 999994900
BI 999978516
BJ 999977886
BM 999991925
BN 999986630
BO 999995482
BR 999989947
BS 999980931
BT 999977488
BW 999935985
BY 999998496
BZ 999975972
CA 999978275
CC 999968311
CD 999978139
CF 999995342
CG 999788112
CH 999997524
CI 999998864
CK 999968719
CL 999967083
CM 999998369
CN 999975367
CO 999999167
CR 999971685
CU 999976352
CV 999990543
CW 999987713
CX 999987579
CY 999982925
CZ 999993908
DE 999985416
DJ 999997438
DK 999963312
DM 999941706
DO 999945597
DZ 999973610
EC 999920447
EE 999949534
EG 999980522
ER 999980425
ES 999949155
ET 999987033
FI 999966243
FJ 999990686
FK 999966573
FM 999972146
FO 999988472
FR 999988342
GA 999982099
GB 999970658
GD 999996318
GE 999991970
GF 999982024
GH 999941039
GI 999995295
GL 999948726
GM 999967823
GN 999951804
GP 999904645
GQ 999988635
GR 999999672
GT 999972984
GU 999919056
GW 999962551
GY 999999881
HK 999970084
HN 999972628
HR 999986688
HT 999970913
HU 999997568
ID 999994762
IE 999996686
IL 999982184
IM 999987831
IN 999914991
IO 999968575
IQ 999990126
IR 999986780
IS 999973585
IT 999997239
JM 999982209
JO 999977276
JP 999983684
KE 999996012
KG 999991556
KH 999975644
KI 999994328
KM 999989895
KN 999991068
KP 999967939
KR 999992162
KW 999924295
KY 999977105
KZ 999992835
LA 999989151
LB 999963014
LC 999962233
LI 999986863
LK 999989876
LR 999897202
LS 999957706
LT 999999688
LU 999999823
LV 999945411
LY 999992365
MA 999922726
MC 999978886
MD 999996042
MG 999996602
MH 999989668
MK 999968900
ML 999990079
MM 999987977
MN 999969051
MO 999977975
MP 999995234
MQ 999913110
MR 999982303
MS 999974690
MT 999982604
MU 999988632
MV 999961206
MW 999991903
MX 999978066
MY 999995010
MZ 999981189
NA 999961177
NC 999961053
NE 999990091
NF 999989399
NG 999985037
NI 999965733
NL 999949789
NO 999993122
NP 999972410
NR 999956464
NU 999987046
NZ 999998214
OM 999967428
PA 999924435
PE 999981176
PF 999959978
PG 999987347
PH 999981534
PK 999954268
PL 999996619
PM 999998975
PR 999906386
PT 999993404
PW 999991278
PY 999985509
QA 999995061
RE 999952291
RO 999994148
RS 999999923
RU 999894985
RW 999980184
SA 999973822
SB 999972832
SC 999973271
SD 999963744
SE 999972256
SG 999977637
SH 999983638
SI 999980580
SK 999998152
SL 999999269
SM 999941188
SN 999990278
SO 999973175
SR 999975964
ST 999980447
SV 999999945
SX 999903445
SY 999988858
SZ 999992537
TC 999969540
TD 999999303
TG 999977640
TH 999968746
TJ 999983666
TK 999971131
TM 999958998
TN 999963035
TO 999947915
TP 999986796
TR 999995112
TT 999984435
TV 999971989
TW 999975092
TZ 999992734
UA 999970993
UG 999976267
UM 999998377
US 999912229
UY 999989662
UZ 999982762
VA 999975548
VC 999991495
VE 999997971
VG 999949690
VI 999990063
VN 999974393
VU 999953162
WF 999947666
WS 999970242
YE 999984650
YT 999994707
ZA 999998692
ZM 999973392
ZW 999928087
As you can see, the results are what we expected.
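As a quick cross-check, the number of output lines should match the "Reduce output records=233" counter above, since TextOutputFormat writes one key-value pair per line:

xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000 | wc -l

This should print 233, one line per country code.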
This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, provided you retain the author attribution 时延军 (including the link: http://shiyanjun.cn) and do not use it for commercial purposes; works based on modifications of this article must be released under the same license. If you have any questions, please contact me.