Hadoop MapReduce Programming: Computing the Maximum

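Computing a maximum with MapReduce is not much different from the WordCount program that ships with Hadoop. The only difference is in the Reducer: one takes the maximum, the other accumulates a sum. The structure is the same and fairly simple. Below we implement it with an example.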
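To make the comparison concrete, here is a plain-Java sketch of the two reduce functions, with no Hadoop types involved. `ReduceComparison`, `sum` and `max` are hypothetical names used only for illustration: WordCount adds up the values of a key, while this job keeps the largest one.

```java
import java.util.Arrays;
import java.util.List;

// Contrasts the WordCount reduce step (sum) with the max reduce step.
// Plain Java, no Hadoop dependency; names are illustrative only.
class ReduceComparison {

    // WordCount-style reduce: accumulate all values of one key.
    static long sum(Iterable<Long> values) {
        long total = 0L;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Max-style reduce: keep the largest value of one key.
    // Starting from Long.MIN_VALUE keeps the result correct for negative inputs too.
    static long max(Iterable<Long> values) {
        long best = Long.MIN_VALUE;
        for (long v : values) {
            if (v > best) {
                best = v;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(3L, 9L, 4L);
        System.out.println("sum = " + sum(values)); // sum = 16
        System.out.println("max = " + max(values)); // max = 9
    }
}
```

Only the loop body differs, which is why the rest of the job (Mapper, Driver) looks almost identical to WordCount.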

Test Data

We generated a simple set of sample test data with our own simulation program. A fragment of the input looks like this:

SG 253654006139495 253654006164392 619850464
KG 253654006225166 253654006252433 743485698
UZ 253654006248058 253654006271941 570409379
TT 253654006282019 253654006286839 23236775
BE 253654006276984 253654006301435 597874033
BO 253654006293624 253654006315946 498265375
SR 253654006308428 253654006330442 484613339
SV 253654006320312 253654006345405 629640166
LV 253654006330384 253654006359891 870680704
FJ 253654006351709 253654006374468 517965666

The data is stored one record per line, and each line consists of 4 fields:

  1. Country code
  2. Start time
  3. End time
  4. Random cost/weight estimate

Fields are separated by spaces. The goal is to compute, for each country (identified by its country code), the maximum cost estimate.
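Before writing any MapReduce code, it can help to pin down the expected result with a single-machine reference. The sketch below is an illustration, not part of the original project: `LocalMaxCost` and `maxCostByCountry` are hypothetical names, and it assumes the 4-field, whitespace-separated format described above. The third sample line is a made-up record added so that one country appears twice. Unlike the Mapper shown later, it does not drop records whose cost is exactly 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-machine sketch of what the whole job computes:
// for each country code, the maximum of the 4th field.
class LocalMaxCost {

    static Map<String, Long> maxCostByCountry(Iterable<String> lines) {
        Map<String, Long> result = new TreeMap<>(); // sorted by key, like the job output
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length != 4) {
                continue; // skip lines that do not have exactly 4 fields
            }
            long cost;
            try {
                cost = Long.parseLong(fields[3]);
            } catch (NumberFormatException e) {
                continue; // skip lines whose cost is not a number
            }
            result.merge(fields[0], cost, Math::max); // keep the larger cost per country
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "SG 253654006139495 253654006164392 619850464",
                "KG 253654006225166 253654006252433 743485698",
                "SG 253654006400000 253654006420000 700000000"); // hypothetical extra record
        System.out.println(maxCostByCountry(sample)); // {KG=743485698, SG=700000000}
    }
}
```

Running such a reference over a small sample and comparing it with the job output is a cheap way to check the distributed version.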

Implementation

Since the problem is simple, let's go straight to the code. It has three parts: the Mapper, the Reducer, and the Driver. The Mapper is implemented by GlobalCostMapper:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Emits (country code, cost) for every well-formed input line.
 */
public class GlobalCostMapper extends
		Mapper<LongWritable, Text, Text, LongWritable> {

	// Output objects are reused across map() calls instead of allocating per record;
	// this is safe because context.write() serializes them immediately.
	private final LongWritable costValue = new LongWritable(0);
	private final Text code = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// a line, such as 'SG 253654006139495 253654006164392 619850464'
		String line = value.toString().trim();
		// "\\s+" treats any run of whitespace as one separator
		String[] array = line.split("\\s+");
		if (array.length == 4) {
			String countryCode = array[0];
			String strCost = array[3];
			long cost = 0L;
			try {
				cost = Long.parseLong(strCost);
			} catch (NumberFormatException e) {
				cost = 0L; // unparsable cost: treated as 0 and skipped below
			}
			// only records with a non-zero cost are emitted
			if (cost != 0) {
				code.set(countryCode);
				costValue.set(cost);
				context.write(code, costValue);
			}
		}
	}
}

The logic above is very simple: split each line on whitespace, extract the fields, and emit a (country code, cost) key-value pair.
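The parsing step can be tried outside Hadoop. The sketch below compares splitting on `\\s` (exactly one whitespace character) with `\\s+` (a run of whitespace) on a line that happens to contain two consecutive spaces. `CostLineParser` and `parse` are hypothetical names; `parse` returns the (country code, cost) pair, or `null` for a line that would be dropped.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Standalone sketch of the Mapper's parsing step, outside Hadoop.
class CostLineParser {

    static Map.Entry<String, Long> parse(String line, String separatorRegex) {
        String[] fields = line.trim().split(separatorRegex);
        if (fields.length != 4) {
            return null; // not the expected 4-field layout
        }
        try {
            return new SimpleEntry<>(fields[0], Long.parseLong(fields[3]));
        } catch (NumberFormatException e) {
            return null; // 4th field is not a number
        }
    }

    public static void main(String[] args) {
        String doubleSpaced = "SG  253654006139495 253654006164392 619850464";
        // "\\s" matches one whitespace char, so two spaces produce an empty field
        // and 5 fields in total: the record is silently dropped.
        System.out.println(parse(doubleSpaced, "\\s"));  // null
        // "\\s+" treats the double space as a single separator.
        System.out.println(parse(doubleSpaced, "\\s+")); // SG=619850464
    }
}
```

With generated data the separators are usually uniform, but `\\s+` costs nothing extra and avoids silently losing records from hand-edited or merged files.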
The key-value pairs emitted by the Mapper are then merged and reduced in the Reducer, which is implemented by GlobalCostReducer:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Keeps the largest cost seen for each country code. Because max is
 * associative and commutative, this class can also serve as the Combiner.
 */
public class GlobalCostReducer extends
		Reducer<Text, LongWritable, Text, LongWritable> {

	private final LongWritable result = new LongWritable();

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
		// Start from the smallest long so the result is correct even for negative costs
		long max = Long.MIN_VALUE;
		for (LongWritable current : values) {
			if (current.get() > max) {
				max = current.get();
			}
		}
		result.set(max);
		context.write(key, result);
	}
}

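The Reducer above computes the maximum cost estimate among all values of a key; the logic is straightforward. As an optimization, the same Reducer can also be applied to the Map output as a Combiner, which reduces the amount of data transferred from the Mappers to the Reducers. This is safe because max is associative and commutative, so taking partial maxima first does not change the final result. The Combiner is specified when configuring the Job.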
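The Combiner argument can be checked in plain Java. The sketch below applies the reduce logic to each of two hypothetical map tasks' values and then again to the partial results, and compares that with reducing everything at once. For contrast it repeats the experiment with an average, a function that cannot simply be reused as its own Combiner. `CombinerCheck` and the value lists are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Checks whether "reduce per map task, then reduce the partial results"
// matches "reduce all values at once", for max and for mean.
class CombinerCheck {

    static long max(List<Long> values) {
        long best = Long.MIN_VALUE;
        for (long v : values) {
            best = Math.max(best, v);
        }
        return best;
    }

    static double mean(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        // Values for one key, as emitted by two hypothetical map tasks.
        List<Long> task1 = Arrays.asList(1L, 2L, 3L);
        List<Long> task2 = Arrays.asList(10L);
        List<Long> all = new ArrayList<>(task1);
        all.addAll(task2);

        long direct = max(all);                                     // 10
        long combined = max(Arrays.asList(max(task1), max(task2))); // max(3, 10) = 10
        System.out.println(direct == combined);                     // true

        // The same trick with an average is wrong: mean of means != overall mean.
        List<Double> d1 = Arrays.asList(1.0, 2.0, 3.0);
        List<Double> d2 = Arrays.asList(10.0);
        List<Double> allD = new ArrayList<>(d1);
        allD.addAll(d2);
        System.out.println(mean(allD));                              // 4.0
        System.out.println(mean(Arrays.asList(mean(d1), mean(d2)))); // 6.0
    }
}
```

For an average, the usual workaround is to have the Combiner emit (sum, count) pairs and divide only in the final Reducer; for max no such change is needed.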
Next, let's see how to configure and run the Job. The implementation class is GlobalMaxCostDriver:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobalMaxCostDriver {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {

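		Configuration conf = new Configuration();
		// Consumes generic Hadoop options (-D, -fs, -jt, ...) and returns the rest
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: maxcost <in> <out>");
			System.exit(2);
		}

		// Hadoop 1.x API; on Hadoop 2.x and later, Job.getInstance(conf, "max cost")
		// is the non-deprecated equivalent
		Job job = new Job(conf, "max cost");

		job.setJarByClass(GlobalMaxCostDriver.class);
		job.setMapperClass(GlobalCostMapper.class);
		// Reuse the Reducer as a Combiner to shrink the data shuffled to the reducers
		job.setCombinerClass(GlobalCostReducer.class);
		job.setReducerClass(GlobalCostReducer.class);

		// Map and reduce output types are identical, so these settings cover both
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));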

		int exitFlag = job.waitForCompletion(true) ? 0 : 1;
		System.exit(exitFlag);
	}
}

Running the Program

First, make sure the Hadoop cluster is running normally; in my setup the NameNode is the host ubuntu3. The steps for running the program are as follows:

  • Compile the code (I used Maven directly) and package it into a jar file
shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org
  • Copy the generated jar file to the NameNode host
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./
global-max-cost.jar
  • Upload the data file to be processed to HDFS
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/
  • Run the MapReduce job we wrote to compute the maximum values
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost

The console output during the run looks roughly like this:

13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1
13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004
13/03/22 16:30:17 INFO mapred.JobClient:  map 0% reduce 0%
13/03/22 16:30:33 INFO mapred.JobClient:  map 22% reduce 0%
13/03/22 16:30:36 INFO mapred.JobClient:  map 28% reduce 0%
13/03/22 16:30:45 INFO mapred.JobClient:  map 52% reduce 9%
13/03/22 16:30:48 INFO mapred.JobClient:  map 57% reduce 9%
13/03/22 16:30:57 INFO mapred.JobClient:  map 80% reduce 9%
13/03/22 16:31:00 INFO mapred.JobClient:  map 85% reduce 19%
13/03/22 16:31:10 INFO mapred.JobClient:  map 100% reduce 28%
13/03/22 16:31:19 INFO mapred.JobClient:  map 100% reduce 100%
13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004
13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29
13/03/22 16:31:24 INFO mapred.JobClient:   Job Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Launched reduce tasks=1
13/03/22 16:31:24 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=76773
13/03/22 16:31:24 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient:     Launched map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient:     Data-local map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=40497
13/03/22 16:31:24 INFO mapred.JobClient:   File Output Format Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Bytes Written=3029
13/03/22 16:31:24 INFO mapred.JobClient:   FileSystemCounters
13/03/22 16:31:24 INFO mapred.JobClient:     FILE_BYTES_READ=142609
13/03/22 16:31:24 INFO mapred.JobClient:     HDFS_BYTES_READ=448913653
13/03/22 16:31:24 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=338151
13/03/22 16:31:24 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3029
13/03/22 16:31:24 INFO mapred.JobClient:   File Input Format Counters
13/03/22 16:31:24 INFO mapred.JobClient:     Bytes Read=448912799
13/03/22 16:31:24 INFO mapred.JobClient:   Map-Reduce Framework
13/03/22 16:31:24 INFO mapred.JobClient:     Map output materialized bytes=21245
13/03/22 16:31:24 INFO mapred.JobClient:     Map input records=10000000
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce shuffle bytes=18210
13/03/22 16:31:24 INFO mapred.JobClient:     Spilled Records=12582
13/03/22 16:31:24 INFO mapred.JobClient:     Map output bytes=110000000
13/03/22 16:31:24 INFO mapred.JobClient:     CPU time spent (ms)=80320
13/03/22 16:31:24 INFO mapred.JobClient:     Total committed heap usage (bytes)=1535639552
13/03/22 16:31:24 INFO mapred.JobClient:     Combine input records=10009320
13/03/22 16:31:24 INFO mapred.JobClient:     SPLIT_RAW_BYTES=854
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce input records=1631
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce input groups=233
13/03/22 16:31:24 INFO mapred.JobClient:     Combine output records=10951
13/03/22 16:31:24 INFO mapred.JobClient:     Physical memory (bytes) snapshot=1706708992
13/03/22 16:31:24 INFO mapred.JobClient:     Reduce output records=233
13/03/22 16:31:24 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=4316872704
13/03/22 16:31:24 INFO mapred.JobClient:     Map output records=10000000
  • Verify the job output
xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000
AD     999974516
AE     999938630
AF     999996180
AG     999991085
AI     999989595
AL     999998489
AM     999976746
AO     999989628
AQ     999995031
AR     999953989
AS     999935982
AT     999999909
AU     999937089
AW     999965784
AZ     999996557
BA     999949773
BB     999987345
BD     999992272
BE     999925057
BF     999999220
BG     999971528
BH     999994900
BI     999978516
BJ     999977886
BM     999991925
BN     999986630
BO     999995482
BR     999989947
BS     999980931
BT     999977488
BW     999935985
BY     999998496
BZ     999975972
CA     999978275
CC     999968311
CD     999978139
CF     999995342
CG     999788112
CH     999997524
CI     999998864
CK     999968719
CL     999967083
CM     999998369
CN     999975367
CO     999999167
CR     999971685
CU     999976352
CV     999990543
CW     999987713
CX     999987579
CY     999982925
CZ     999993908
DE     999985416
DJ     999997438
DK     999963312
DM     999941706
DO     999945597
DZ     999973610
EC     999920447
EE     999949534
EG     999980522
ER     999980425
ES     999949155
ET     999987033
FI     999966243
FJ     999990686
FK     999966573
FM     999972146
FO     999988472
FR     999988342
GA     999982099
GB     999970658
GD     999996318
GE     999991970
GF     999982024
GH     999941039
GI     999995295
GL     999948726
GM     999967823
GN     999951804
GP     999904645
GQ     999988635
GR     999999672
GT     999972984
GU     999919056
GW     999962551
GY     999999881
HK     999970084
HN     999972628
HR     999986688
HT     999970913
HU     999997568
ID     999994762
IE     999996686
IL     999982184
IM     999987831
IN     999914991
IO     999968575
IQ     999990126
IR     999986780
IS     999973585
IT     999997239
JM     999982209
JO     999977276
JP     999983684
KE     999996012
KG     999991556
KH     999975644
KI     999994328
KM     999989895
KN     999991068
KP     999967939
KR     999992162
KW     999924295
KY     999977105
KZ     999992835
LA     999989151
LB     999963014
LC     999962233
LI     999986863
LK     999989876
LR     999897202
LS     999957706
LT     999999688
LU     999999823
LV     999945411
LY     999992365
MA     999922726
MC     999978886
MD     999996042
MG     999996602
MH     999989668
MK     999968900
ML     999990079
MM     999987977
MN     999969051
MO     999977975
MP     999995234
MQ     999913110
MR     999982303
MS     999974690
MT     999982604
MU     999988632
MV     999961206
MW     999991903
MX     999978066
MY     999995010
MZ     999981189
NA     999961177
NC     999961053
NE     999990091
NF     999989399
NG     999985037
NI     999965733
NL     999949789
NO     999993122
NP     999972410
NR     999956464
NU     999987046
NZ     999998214
OM     999967428
PA     999924435
PE     999981176
PF     999959978
PG     999987347
PH     999981534
PK     999954268
PL     999996619
PM     999998975
PR     999906386
PT     999993404
PW     999991278
PY     999985509
QA     999995061
RE     999952291
RO     999994148
RS     999999923
RU     999894985
RW     999980184
SA     999973822
SB     999972832
SC     999973271
SD     999963744
SE     999972256
SG     999977637
SH     999983638
SI     999980580
SK     999998152
SL     999999269
SM     999941188
SN     999990278
SO     999973175
SR     999975964
ST     999980447
SV     999999945
SX     999903445
SY     999988858
SZ     999992537
TC     999969540
TD     999999303
TG     999977640
TH     999968746
TJ     999983666
TK     999971131
TM     999958998
TN     999963035
TO     999947915
TP     999986796
TR     999995112
TT     999984435
TV     999971989
TW     999975092
TZ     999992734
UA     999970993
UG     999976267
UM     999998377
US     999912229
UY     999989662
UZ     999982762
VA     999975548
VC     999991495
VE     999997971
VG     999949690
VI     999990063
VN     999974393
VU     999953162
WF     999947666
WS     999970242
YE     999984650
YT     999994707
ZA     999998692
ZM     999973392
ZW     999928087

As you can see, the results are what we expected.
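The job counters in the log above give a rough measure of how much the Combiner helped. The sketch below only does arithmetic on values copied from that log; `CombinerSavings` and `ratio` are hypothetical names. Comparing Map output records with Reduce input records shows roughly a 6,000-fold reduction in records reaching the Reducer. Combine input records exceeds Map output records by 9,320, which is consistent with the Combiner running again on already-combined records while spill files are merged; subtracting those 9,320 records from Combine output records gives exactly the 1,631 Reduce input records.

```java
// Arithmetic on counter values copied from the job log; no Hadoop calls.
class CombinerSavings {

    // How many times larger a is than b.
    static double ratio(long a, long b) {
        return (double) a / b;
    }

    public static void main(String[] args) {
        long mapOutputRecords = 10_000_000L;    // Map output records
        long mapOutputBytes = 110_000_000L;     // Map output bytes
        long combineInputRecords = 10_009_320L; // Combine input records
        long combineOutputRecords = 10_951L;    // Combine output records
        long reduceInputRecords = 1_631L;       // Reduce input records
        long reduceShuffleBytes = 18_210L;      // Reduce shuffle bytes

        System.out.printf("records into reduce: %.0fx fewer%n",
                ratio(mapOutputRecords, reduceInputRecords)); // 6131x
        System.out.printf("bytes shuffled: %.0fx fewer%n",
                ratio(mapOutputBytes, reduceShuffleBytes));   // 6041x

        // Combiner outputs that were fed back into the Combiner during merges
        long recombined = combineInputRecords - mapOutputRecords; // 9320
        System.out.println(combineOutputRecords - recombined == reduceInputRecords); // true
    }
}
```

Map output bytes are measured before combining and compression, so the byte ratio is only a rough indication of the network traffic saved.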

Creative Commons License

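This article is published under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must keep the author attribution 时延军 (Shi Yanjun), including the link http://shiyanjun.cn, must not use it for commercial purposes, and must release any work derived from it under the same license. If you have any questions, please contact me.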
