package old;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
* 旧的api写法
*
* @author Administrator
*
*/
public class WordCount {
private static final String INPUT_PATH = "hdfs://hadoop:9000/in/hello";
private static final String OUT_PATH = "hdfs://hadoop:9000/oldout";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
JobConf job = new JobConf(conf);
job.setJarByClass(WordCount.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//在定义一个FileSystem变量的时候伪分布式和单机版的方法是不一样的,单机版使用的是FileSystem类的静态函数
//FileSystem hdfs = FileSystem.get(conf)
//不然的话会出现如下错误
//Wrong FS: hdfs://localhost:9000/home/hadoop/hadoop, expected: file:///
//伪分布式下需要使用Path来获得
//Path dstDir = new Path("hdfs://hadoop:9000/");
//FileSystem fs = dstDir.getFileSystem(conf);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(new Path(OUT_PATH))){
fs.delete(new Path(OUT_PATH), true);
}
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.setOutputFormat(TextOutputFormat.class);
JobClient.runJob(job);
}
public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
Text k2 = new Text();
LongWritable v2 = new LongWritable();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
String[] splited = value.toString().split("\t");
for (String split : splited) {
k2.set(split);
v2.set(1L);
output.collect(k2, v2);
}
}
}
public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable>{
Text k3 = new Text();
LongWritable v3 = new LongWritable();
@Override
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
long times = 0;
while (values.hasNext()) {
LongWritable v2 = (LongWritable) values.next();
times += v2.get();
}
v3.set(times);
output.collect(key, v3);
}
}
}
分享到:
相关推荐
avro-mapred-1.7.7-hadoop2.jar
在MapReduce任务中读取Avro文件,会使用到avro-mapred.jar。 然而目前的avro-mapred.jar是基于较老的版本的,使用时会报错: org.apache.hadoop.mapred.YarnChild: Error running child : java.lang....
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用