`
jsh0401
  • 浏览: 10951 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

mapred代码示例--map阶段使用combiner(归约)

 
阅读更多
package combiner;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
* 在mapper端reduce,map产生的数据量减少
* reducer是reduce所有mapper
* 不是所有算法都适合,允许多次reduce的算法适合
*
* @author Administrator
*
*/
public class WordCountApp {

private static String INPUT_PATH = "hdfs://hadoop:9000/in/hello";
private static String OUT_PATH = "hdfs://hadoop:9000/out";

/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
fileSystem.delete(new Path(OUT_PATH), true);

Job job = new Job(conf);
job.setJarByClass(WordCountApp.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
//归约
job.setCombinerClass(MyCombiner.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.waitForCompletion(true);
}

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {

final String[] splited = value.toString().split("\t");
for (String word : splited) {
final Text key2 = new Text(word);
final LongWritable value2 = new LongWritable(1L);
context.write(key2, value2);
}
};
}

public static class MyReducer extends Reducer<Text,LongWritable, Text,LongWritable>{
protected void reduce(Text key2, java.lang.Iterable<LongWritable> value2s, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
Long sum = 0L;
for (LongWritable value2 : value2s) {
sum += value2.get();
}
context.write(key2, new LongWritable(sum));
};
}

public static class MyCombiner extends Reducer<Text,LongWritable, Text,LongWritable>{ protected void reduce(Text key2, java.lang.Iterable<LongWritable> value2s, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
Long sum = 0L;
for (LongWritable value2 : value2s) {
sum += value2.get();
}
context.write(key2, new LongWritable(sum));
};
}
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics