Now, instead of computing a single maximum (for that, see Hadoop MapReduce Programming: Computing the Maximum Value), we compute both a maximum and a minimum. Implementing the Mapper and Reducer is still quite simple, but to emit the result we need to distinguish the maximum from the minimum and output both at once. That requires defining our own output value type, as well as an output format.
Test Data
The data format is as follows:
SG 253654006139495 253654006164392 619850464
KG 253654006225166 253654006252433 743485698
UZ 253654006248058 253654006271941 570409379
TT 253654006282019 253654006286839 23236775
BE 253654006276984 253654006301435 597874033
BO 253654006293624 253654006315946 498265375
SR 253654006308428 253654006330442 484613339
SV 253654006320312 253654006345405 629640166
LV 253654006330384 253654006359891 870680704
FJ 253654006351709 253654006374468 517965666
The text above is stored one record per line; each line has four fields:
- Country code
- Start time
- End time
- Random cost/weight estimate
Fields are separated by a single space. The result we want to compute is, for each country (identified by its country code), the extreme values (maximum and minimum) of the cost estimate.
Implementation
First, let us consider what needs to be implemented to meet the requirements.
The Mapper's input types are K1 => LongWritable and V1 => Text, where K1 is the byte offset of a line in the text file and V1 is the content of that line. Its output types are K2 => Text and V2 => LongWritable, where K2 is the parsed country code, a string, and V2 is the cost estimate, a number.
The Reducer's input is then <K2, list(V2)>. For its output we choose K3 => Text, the country code, and V3, a type of our own that holds both the maximum and the minimum.
Also, the final output is produced in the Reducer stage and written to HDFS. Besides designing the output value type, we therefore also need an output format class that describes how the final data should be written to HDFS.
For the Mapper, you can refer to Hadoop MapReduce Programming: Computing the Maximum Value. To compute both extremes here, we need to implement the following:
- The Reducer output type
- The definition of the Reducer output format
- The Mapper implementation
- The Reducer implementation
- Job configuration
Below, we go through each of them in detail:
- The Reducer output type
The Reducer's output value type should hold two fields, the maximum and the minimum, so that the final output shows both extremes for each country code. We define the Extremum class to represent such an extreme-value pair:
package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

public class Extremum implements Writable {

    private LongWritable maxValue;
    private LongWritable minValue;

    public Extremum() {
        super();
        maxValue = new LongWritable(0);
        minValue = new LongWritable(0);
    }

    public Extremum(long min, long max) {
        super();
        this.minValue = new LongWritable(min);
        this.maxValue = new LongWritable(max);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize in exactly the same order as write(): min first, then max.
        minValue.readFields(in);
        maxValue.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        minValue.write(out);
        maxValue.write(out);
    }

    public LongWritable getMaxValue() {
        return maxValue;
    }

    public void setMaxValue(LongWritable maxValue) {
        this.maxValue = maxValue;
    }

    public LongWritable getMinValue() {
        return minValue;
    }

    public void setMinValue(LongWritable minValue) {
        this.minValue = minValue;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append(minValue.get()).append("\t").append(maxValue.get());
        return builder.toString();
    }
}
A custom type must implement Hadoop's Writable interface in order to use Hadoop's serialization mechanism and eventually be written to HDFS. The interface defines two methods, corresponding to serialization and deserialization.
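As a quick sanity check, the serialization round trip can be exercised outside of Hadoop with plain Java streams. Here is a minimal sketch (the class name ExtremumRoundTrip is made up for illustration; it assumes the Hadoop jars are on the classpath, and uses the GT values from the final output below):

package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical smoke test: serialize an Extremum and read it back.
public class ExtremumRoundTrip {
    public static void main(String[] args) throws IOException {
        Extremum original = new Extremum(159, 999972984);

        // write() serializes min first, then max.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // readFields() must consume the bytes in the same order.
        Extremum copy = new Extremum();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // prints "159\t999972984"
    }
}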
This custom type simply encapsulates the two fields, the maximum and the minimum.
- The definition of the Reducer output format
The Reducer output is, in effect, the output of the whole Job. We define the ExtremumOutputFormat class to describe that output format. You can use Hadoop's built-in TextOutputFormat as a reference and override its getRecordWriter method to implement our own result-writing logic. The ExtremumOutputFormat class is implemented as follows:
package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ExtremumOutputFormat extends TextOutputFormat<Text, Extremum> {

    @Override
    public RecordWriter<Text, Extremum> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job); // whether to compress the output
        String fieldSeparator = conf.get("mapred.textoutputformat.separator", "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new ExtremumRecordWriter(fileOut, fieldSeparator);
        } else {
            DataOutputStream out = new DataOutputStream(codec.createOutputStream(fileOut));
            return new ExtremumRecordWriter(out, fieldSeparator);
        }
    }

    public static class ExtremumRecordWriter extends RecordWriter<Text, Extremum> {

        private static final String CHARSET = "UTF-8";
        protected DataOutputStream out;
        private final byte[] fieldSeparator;
        private static final byte[] NEWLINE;
        static {
            try {
                NEWLINE = "\n".getBytes(CHARSET);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + CHARSET + " encoding.");
            }
        }

        public ExtremumRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        public ExtremumRecordWriter(DataOutputStream out, String fieldSeparator) {
            super();
            this.out = out;
            try {
                this.fieldSeparator = fieldSeparator.getBytes(CHARSET);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("can't find " + CHARSET + " encoding.");
            }
        }

        @Override
        public synchronized void close(TaskAttemptContext context) throws IOException, InterruptedException {
            out.close();
        }

        @Override
        public synchronized void write(Text key, Extremum value) throws IOException, InterruptedException {
            // Each record is written as: countryCode <SEP> minValue <SEP> maxValue <NEWLINE>
            if (key != null) {
                out.write(key.getBytes(), 0, key.getLength());
                out.write(fieldSeparator);
            }
            if (value != null) {
                // Encode the numeric fields with the same charset as the separator.
                out.write(value.getMinValue().toString().getBytes(CHARSET));
                out.write(fieldSeparator);
                out.write(value.getMaxValue().toString().getBytes(CHARSET));
            }
            out.write(NEWLINE);
        }
    }
}
Our ExtremumOutputFormat returns an ExtremumRecordWriter instance from getRecordWriter, and that instance performs the actual writes. Each output line has the format "country code, minimum, maximum": three columns separated by TAB, as in the sample record below.
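For instance, taking one country from the verified output further down, a single record in a result file looks like this (TAB-separated):

SG	20304	999977637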
- The Mapper implementation
The Mapper just parses a line of text and extracts the country code and the cost estimate. The code of our ExtremunGlobalCostMapper class is listed directly below:
package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExtremunGlobalCostMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private LongWritable costValue = new LongWritable(1);
    private Text code = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // a line, such as 'SG 253654006139495 253654006164392 619850464'
        String line = value.toString();
        String[] array = line.split("\\s");
        if (array.length == 4) {
            String countryCode = array[0];
            String strCost = array[3];
            long cost = 0L;
            try {
                cost = Long.parseLong(strCost);
            } catch (NumberFormatException e) {
                cost = 0L;
            }
            // Records with an unparseable or zero cost are silently skipped.
            if (cost != 0) {
                code.set(countryCode);
                costValue.set(cost);
                context.write(code, costValue);
            }
        }
    }
}
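Note that line.split("\\s") splits on exactly one whitespace character, which matches the single-space-separated input above. A small stand-alone check of this parsing logic (the SplitCheck class exists only for illustration):

// Illustration only: verify how the Mapper's split("\\s") behaves.
public class SplitCheck {
    public static void main(String[] args) {
        String line = "SG 253654006139495 253654006164392 619850464";
        String[] array = line.split("\\s");
        System.out.println(array.length); // 4
        System.out.println(array[0]);     // SG
        System.out.println(array[3]);     // 619850464
        // A line with doubled spaces would yield empty fields and
        // array.length != 4, so the Mapper above would skip it.
    }
}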
- The Reducer implementation
The Reducer is not complicated either. The point to note is that, after computing the minimum and maximum, we wrap them in an instance of a type not defined by Hadoop, here our Extremum class. Our ExtremumGlobalCostReducer class is implemented as follows:
package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ExtremumGlobalCostReducer extends Reducer<Text, LongWritable, Text, Extremum> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Costs are positive (the Mapper filters out zero), so initializing
        // max to 0 and min to Long.MAX_VALUE is safe here.
        long max = 0L;
        long min = Long.MAX_VALUE;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable current = iter.next();
            if (current.get() > max) {
                max = current.get();
            }
            if (current.get() < min) {
                min = current.get();
            }
        }
        Extremum extremum = new Extremum(min, max);
        context.write(key, extremum);
    }
}
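Incidentally, this Reducer cannot double as a combiner: a combiner's output types must match the map output types (Text/LongWritable), not Text/Extremum, and the job counters below accordingly show Combine input records=0. If one wanted to cut shuffle traffic, a hypothetical combiner could forward only the local min and max per key, since min and max are both associative and commutative. A sketch (ExtremumCombiner is not part of the original job):

package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: emits only the local min and max per key,
// so its output types match the map output types.
public class ExtremumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = 0L;
        long min = Long.MAX_VALUE;
        for (LongWritable v : values) {
            max = Math.max(max, v.get());
            min = Math.min(min, v.get());
        }
        context.write(key, new LongWritable(min));
        context.write(key, new LongWritable(max));
    }
}

If used, it would be registered in the driver with job.setCombinerClass(ExtremumCombiner.class); the job as published does not do this.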
- Job configuration
Configuring the Job mainly means wiring up the output value types we defined and the output format class. The configuration logic of our MapReduce program lives in the ExtremumCostDriver class, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ExtremumCostDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: extremumcost <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "extremum cost");

        job.setJarByClass(ExtremumCostDriver.class);
        job.setMapperClass(ExtremunGlobalCostMapper.class);
        job.setReducerClass(ExtremumGlobalCostReducer.class);

        // Map and reduce output types differ, so both pairs must be set explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Extremum.class);
        job.setOutputFormatClass(ExtremumOutputFormat.class);

        job.setNumReduceTasks(2);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
Here it is essential to set the key and value output types of each stage correctly, along with our custom output format class. We also configured 2 reduce tasks, so the job ends up producing 2 result files; which of the two files a given country code lands in is decided by the default partitioner, sketched below.
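Since the driver sets no partitioner, Hadoop's default HashPartitioner routes each map output key to a reduce task. The following snippet mirrors that routing logic purely for illustration (the PartitionDemo class is made up for demonstration):

import org.apache.hadoop.io.Text;

// Illustration only: how Hadoop's default HashPartitioner assigns a
// map output key to one of the reduce tasks.
public class PartitionDemo {
    static int getPartition(Text key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(getPartition(new Text("SG"), 2)); // 0 or 1
        System.out.println(getPartition(new Text("KG"), 2));
    }
}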
Running the Program
The steps to run the program are as follows:
- Compile the code (I do it directly with Maven) and package it into a jar file
shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-extremum-cost.jar -C ./ org
- Copy the generated jar file to the NameNode host
scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-extremum-cost.jar ./
- Upload the data file to be processed
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/
- Run the MapReduce job we wrote, to compute the extreme values
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-extremum-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.ExtremumCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/extremum/cost
- Console output
The console output during the run is shown below:
13/03/22 21:38:46 INFO input.FileInputFormat: Total input paths to process : 1
13/03/22 21:38:46 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/22 21:38:46 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/22 21:38:46 INFO mapred.JobClient: Running job: job_201303111631_0012
13/03/22 21:38:47 INFO mapred.JobClient: map 0% reduce 0%
13/03/22 21:39:03 INFO mapred.JobClient: map 21% reduce 0%
13/03/22 21:39:06 INFO mapred.JobClient: map 28% reduce 0%
13/03/22 21:39:18 INFO mapred.JobClient: map 48% reduce 4%
13/03/22 21:39:21 INFO mapred.JobClient: map 57% reduce 9%
13/03/22 21:39:34 INFO mapred.JobClient: map 78% reduce 14%
13/03/22 21:39:37 INFO mapred.JobClient: map 85% reduce 19%
13/03/22 21:39:49 INFO mapred.JobClient: map 100% reduce 23%
13/03/22 21:39:52 INFO mapred.JobClient: map 100% reduce 28%
13/03/22 21:40:01 INFO mapred.JobClient: map 100% reduce 30%
13/03/22 21:40:04 INFO mapred.JobClient: map 100% reduce 33%
13/03/22 21:40:07 INFO mapred.JobClient: map 100% reduce 66%
13/03/22 21:40:10 INFO mapred.JobClient: map 100% reduce 100%
13/03/22 21:40:15 INFO mapred.JobClient: Job complete: job_201303111631_0012
13/03/22 21:40:15 INFO mapred.JobClient: Counters: 29
13/03/22 21:40:15 INFO mapred.JobClient:   Job Counters
13/03/22 21:40:15 INFO mapred.JobClient:     Launched reduce tasks=2
13/03/22 21:40:15 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=98141
13/03/22 21:40:15 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 21:40:15 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 21:40:15 INFO mapred.JobClient:     Launched map tasks=7
13/03/22 21:40:15 INFO mapred.JobClient:     Data-local map tasks=7
13/03/22 21:40:15 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=111222
13/03/22 21:40:15 INFO mapred.JobClient:   File Output Format Counters
13/03/22 21:40:15 INFO mapred.JobClient:     Bytes Written=4339
13/03/22 21:40:15 INFO mapred.JobClient:   FileSystemCounters
13/03/22 21:40:15 INFO mapred.JobClient:     FILE_BYTES_READ=260079964
13/03/22 21:40:15 INFO mapred.JobClient:     HDFS_BYTES_READ=448913653
13/03/22 21:40:15 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=390199096
13/03/22 21:40:15 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=4339
13/03/22 21:40:15 INFO mapred.JobClient:   File Input Format Counters
13/03/22 21:40:15 INFO mapred.JobClient:     Bytes Read=448912799
13/03/22 21:40:15 INFO mapred.JobClient:   Map-Reduce Framework
13/03/22 21:40:15 INFO mapred.JobClient:     Map output materialized bytes=130000084
13/03/22 21:40:15 INFO mapred.JobClient:     Map input records=10000000
13/03/22 21:40:15 INFO mapred.JobClient:     Reduce shuffle bytes=116610358
13/03/22 21:40:15 INFO mapred.JobClient:     Spilled Records=30000000
13/03/22 21:40:15 INFO mapred.JobClient:     Map output bytes=110000000
13/03/22 21:40:15 INFO mapred.JobClient:     CPU time spent (ms)=121520
13/03/22 21:40:15 INFO mapred.JobClient:     Total committed heap usage (bytes)=1763442688
13/03/22 21:40:15 INFO mapred.JobClient:     Combine input records=0
13/03/22 21:40:15 INFO mapred.JobClient:     SPLIT_RAW_BYTES=854
13/03/22 21:40:15 INFO mapred.JobClient:     Reduce input records=10000000
13/03/22 21:40:15 INFO mapred.JobClient:     Reduce input groups=233
13/03/22 21:40:15 INFO mapred.JobClient:     Combine output records=0
13/03/22 21:40:15 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1973850112
13/03/22 21:40:15 INFO mapred.JobClient:     Reduce output records=233
13/03/22 21:40:15 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=4880068608
13/03/22 21:40:15 INFO mapred.JobClient:     Map output records=10000000
- Verify the output
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/extremum/cost/part-r-00000 | wc -l
121
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/extremum/cost/part-r-00001 | wc -l
112
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/extremum/cost/part-r-00000
AD 43328 999974516
AF 11795 999996180
AL 11148 999998489
AR 33649 999953989
AT 5051 999999909
AZ 3726 999996557
BA 89066 999949773
BE 28187 999925057
BG 64672 999971528
BI 50687 999978516
BM 14127 999991925
BO 61786 999995482
BS 52428 999980931
BW 78739 999935985
BY 39216 999998496
CD 51328 999978139
CF 5084 999995342
CH 17996 999997524
CL 359 999967083
CN 17985 999975367
CR 17435 999971685
CV 16166 999990543
CX 74232 999987579
CZ 1400 999993908
DE 1676 999985416
DK 84072 999963312
DM 57727 999941706
DO 9162 999945597
ER 60594 999980425
ET 21273 999987033
FI 19268 999966243
FK 39074 999966573
FM 9493 999972146
FO 47655 999988472
GB 12629 999970658
GD 7713 999996318
GF 42623 999982024
GH 9904 999941039
GL 16512 999948726
GN 2779 999951804
GP 64525 999904645
GR 44233 999999672
GT 159 999972984
HK 24464 999970084
HU 23165 999997568
ID 19812 999994762
IL 14553 999982184
IN 25241 999914991
IR 5173 999986780
IT 19399 999997239
JM 30194 999982209
JO 5149 999977276
KH 5010 999975644
KN 597 999991068
KP 61560 999967939
KR 13295 999992162
KZ 5565 999992835
LA 4002 999989151
LC 8509 999962233
LI 54653 999986863
LK 12556 999989876
LS 6702 999957706
LU 17627 999999823
LY 41618 999992365
MD 8494 999996042
MH 1050 999989668
ML 30858 999990079
MN 4236 999969051
MP 8422 999995234
MR 14023 999982303
MT 91203 999982604
MV 15186 999961206
MX 15807 999978066
MZ 14800 999981189
NA 7673 999961177
NC 9467 999961053
NE 1802 999990091
NG 5189 999985037
NI 29440 999965733
NO 20466 999993122
NU 17175 999987046
PA 51054 999924435
PE 10286 999981176
PG 119327 999987347
PK 1041 999954268
PM 1435 999998975
PW 40353 999991278
PY 9586 999985509
RE 5640 999952291
RO 7139 999994148
RS 1342 999999923
RU 11319 999894985
RW 815 999980184
SB 15446 999972832
SD 14060 999963744
SH 1276 999983638
SL 35087 999999269
SN 117358 999990278
SR 12974 999975964
ST 3796 999980447
SV 20563 999999945
SX 41237 999903445
SZ 18993 999992537
TC 46396 999969540
TG 58484 999977640
TK 11880 999971131
TM 25103 999958998
TO 40829 999947915
TW 5437 999975092
UZ 2620 999982762
VA 774 999975548
VC 9514 999991495
VE 12156 999997971
VG 32832 999949690
VI 895 999990063
VU 1375 999953162
WF 62709 999947666
YT 29640 999994707
ZA 8399 999998692
ZM 25699 999973392
ZW 77984 999928087
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/extremum/cost/part-r-00001
AE 1870 999938630
AG 7701 999991085
AI 8609 999989595
AM 16681 999976746
AO 17126 999989628
AQ 40493 999995031
AS 3096 999935982
AU 13311 999937089
AW 1734 999965784
BB 27334 999987345
BD 27928 999992272
BF 20745 999999220
BH 4980 999994900
BJ 11385 999977886
BN 33440 999986630
BR 56601 999989947
BT 84033 999977488
BZ 27250 999975972
CA 54534 999978275
CC 753 999968311
CG 10644 999788112
CI 17263 999998864
CK 7710 999968719
CM 12402 999998369
CO 7616 999999167
CU 30200 999976352
CW 6597 999987713
CY 5686 999982925
DJ 30348 999997438
DZ 31633 999973610
EC 15342 999920447
EE 3834 999949534
EG 60885 999980522
ES 540 999949155
FJ 14016 999990686
FR 14682 999988342
GA 54051 999982099
GE 25641 999991970
GI 9387 999995295
GM 12637 999967823
GQ 13033 999988635
GU 68211 999919056
GW 34868 999962551
GY 4633 999999881
HN 18264 999972628
HR 18192 999986688
HT 2732 999970913
IE 23339 999996686
IM 47842 999987831
IO 2457 999968575
IQ 75 999990126
IS 24153 999973585
JP 8463 999983684
KE 45373 999996012
KG 5037 999991556
KI 1146 999994328
KM 41461 999989895
KW 20441 999924295
KY 23539 999977105
LB 15169 999963014
LR 76821 999897202
LT 346 999999688
LV 17147 999945411
MA 22225 999922726
MC 4131 999978886
MG 15255 999996602
MK 21709 999968900
MM 33329 999987977
MO 19139 999977975
MQ 57391 999913110
MS 38541 999974690
MU 51654 999988632
MW 16153 999991903
MY 1575 999995010
NF 9028 999989399
NL 22375 999949789
NP 15809 999972410
NR 11740 999956464
NZ 4921 999998214
OM 20128 999967428
PF 8544 999959978
PH 58869 999981534
PL 7105 999996619
PR 34529 999906386
PT 50257 999993404
QA 33588 999995061
SA 54320 999973822
SC 64674 999973271
SE 13044 999972256
SG 20304 999977637
SI 17186 999980580
SK 7472 999998152
SM 5636 999941188
SO 52133 999973175
SY 36761 999988858
TD 98111 999999303
TH 11173 999968746
TJ 19322 999983666
TN 40668 999963035
TP 40353 999986796
TR 4495 999995112
TT 32136 999984435
TV 40558 999971989
TZ 5852 999992734
UA 12571 999970993
UG 4142 999976267
UM 5465 999998377
US 52833 999912229
UY 12782 999989662
VN 45107 999974393
WS 19808 999970242
YE 3291 999984650
As we can see, the results match what we expected.
This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must keep the author attribution 时延军 (including the link: http://shiyanjun.cn), must not use it for commercial purposes, and must release any work based on this article under the same license. If you have any questions, please contact me.