简单演示,主要是TableMapper和TableReducer的使用
读取hdfs数据到hbase中,测试数据文件见附件
package hbase;
import java.sql.Date;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class HDFSToHBase {
private static final String INPUT_PATH = "hdfs://hadoop:9000/in/kpi";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//设置zookeeper
conf.set("hbase.zookeeper.quorum", "hadoop");
//设置hbase表名称
conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
//将该值改大,防止hbase超时退出
conf.set("dfs.socket.timeout", "180000");
Job job = new Job(conf);
//1.读取文件
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
//2.设置自己的map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
protected void map(LongWritable k1, Text v1, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable,Text>.Context context) throws java.io.IOException ,InterruptedException {
String[] splited = v1.toString().split("\t");
try{
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
String datestr = df.format(new Date(Long.parseLong(splited[0].trim())));
String rowkey = splited[1]+":"+datestr;
context.write(k1, new Text(rowkey+"\t"+v1));
}catch(NumberFormatException e){
final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
counter.increment(1L);
System.out.println("出错了"+splited[0]+" "+e.getMessage());
}
}
}
static class MyReducer extends TableReducer<LongWritable, Text, LongWritable>{
protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<LongWritable,Text,LongWritable,org.apache.hadoop.io.Writable>.Context context) throws java.io.IOException ,InterruptedException {
for(Text v2:v2s){
String[] splited = v2.toString().split("\t");
Put put = new Put(Bytes.toBytes(splited[0]));
//导出各列
put.add(Bytes.toBytes("f1"), Bytes.toBytes("time"), Bytes.toBytes(splited[1]));
put.add(Bytes.toBytes("f1"), Bytes.toBytes("mobile"), Bytes.toBytes(splited[2]));
context.write(k2, put);
}
};
}
}
------------------------------------------------------------------
从hbase读取数据到hdfs
package hbase;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HBaseToHDFS {
private static final String OUT_PATH = "hdfs://hadoop:9000/out/kpi";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//设置zookeeper
conf.set("hbase.zookeeper.quorum", "hadoop");
//设置hbase表名称
conf.set(TableInputFormat.INPUT_TABLE, "wlan_log");
//将该值改大,防止hbase超时退出
conf.set("dfs.socket.timeout", "180000");
Job job = new Job(conf);
job.setInputFormatClass(TableInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//job.setReducerClass(MyReducer.class);
//job.setOutputFormatClass(TableOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
FileSystem fs = FileSystem.get(new URI(OUT_PATH),conf);
if(fs.exists(new Path(OUT_PATH))){
fs.delete(new Path(OUT_PATH), true);
}
job.waitForCompletion(true);
}
static class MyMapper extends TableMapper<Text, Text>{
protected void map(org.apache.hadoop.hbase.io.ImmutableBytesWritable key,
org.apache.hadoop.hbase.client.Result result,
org.apache.hadoop.mapreduce.Mapper<org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result,Text,Text>.Context context) throws IOException ,InterruptedException {
String mobile = new String(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("mobile")));
System.out.println(key +"----------"+mobile);
context.write(new Text(mobile), new Text(mobile));
};
}
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
protected void reduce(Text arg0, java.lang.Iterable<LongWritable> arg1, org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,LongWritable>.Context arg2) throws IOException ,InterruptedException {
};
}
}
分享到:
相关推荐
18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28.Flume 29.Kafka 30.Kafka开发 31.Strom 32.Spark入门之...
技术点56 通过MapReduce 对Bloom filter 进行semi-join 7.3 本章小结 8 结合R 和Hadoop 进行数据统计 8.1 比较R 和MapReduce 集成的几种方法 8.2 R 基础知识 8.3 R 和Streaming 8.3.1 Streaming 和...
技术点5 使用Sqoop 从MySQL 导入数据 2.2.4 HBase 技术点6 HBase 导入HDFS 技术点7 将HBase 作为MapReduce 的数据源2.3 将数据导出Hadoop 2.3.1 将数据导入本地文件系统技术点8 自动复制HDFS 中的文件...
sqoop : 将 hdfs 文件系统的文件 ,导出到 linux 文件系统的文件中 ,就像 "豌豆荚" 应用程序 ,实现 android 系统和 windows 系统之间的导入和导出 7. ooize/azkaban : 一个完整的业务 ( work ) 是由多个任务 ( task...
Sqoop是SQL-to-Hadoop的缩写,是Hadoop的周边工具,它的主要作用是在结构化数据存储...Sqoop充分利用了Hadoop的优点,整个数据导入导出过程都是用MapReduce实现并行化,同时,该过程中的大部分步骤自动执行,非常方便。
18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28.Flume 29.Kafka 30.Kafka开发 31.Strom 32.Spark入门之...
18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28.Flume 29.Kafka 30.Kafka开发 31.Strom 32.Spark入门之...
18.HBase 19.Pig 20.Pig Latin 21.Pig模式与函数 22.Zookeeper 23.Zookeeper服务 24.使用Zookeeper构建应用 25.Sqoop 26.深入Sqoop的导入 27.深入Sqoop导出 28.Flume 29.Kafka 30.Kafka开发 31.Strom 32.Spark入门之...
当大数据存储和Hadoop生态系统的MapReduce,Hive,HBase等分析器出现时,他们需要一种工具来与关系数据库服务器进行交互,以导入和导出驻留在其中的大数据。Sqoop在Hadoop生态系统中占据一席之地,为关系数据库...
第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 ...
MapReduce简介;Hadoop分布式文件系统;Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何构建Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;Hive简介;...
此外,Hadoop广义上指的是一个更广泛的概念,即Hadoop生态系统,其中还包括了Hive数据仓库工具、HBase非关系型数据库、Zookeeper分布式协调服务、Kafka消息队列、Sqoop数据导入导出等其他组件。 Hadoop的创始人是...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...
大数据与云计算教程课件 优质大数据课程 18.HBase(共43页).pptx 大数据与云计算教程课件 优质大数据课程 19.Pig(共33页).pptx 大数据与云计算教程课件 优质大数据课程 20.Pig Latin(共36页).pptx 大数据与...