Hadoop Basics - Common MapReduce File Formats
Views: 6312
Published: 2019-06-22

This article is about 14,304 characters long; estimated reading time is roughly 47 minutes.


Author: 尹正杰

Copyright notice: This is an original work. Reposting is not permitted; violations will be pursued legally.

 

 

I. MR File Format: SequenceFile

1>. Generating a SequenceFile (SequenceFileOutputFormat)

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.
word.txt contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.output;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SeqMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass the byte offset and the line of text straight through.
        context.write(key, value);
    }
}

SeqMapper.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * Convert word.txt into a SequenceFile.
 * key   - byte offset      - LongWritable
 * value - one line of text - Text
 */
public class SeqApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJobName("Seq-Out");
        job.setJarByClass(SeqApp.class);

        // Set the output key/value classes; they must match the types emitted by our Mapper.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SeqMapper.class);

        FileInputFormat.addInputPath(job, new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\word.txt"));

        Path outPath = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\seqout");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Use SequenceFile as the output file format.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Use block compression for the SequenceFile.
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        // With all parameters set, this line submits the job and waits for completion.
        job.waitForCompletion(true);
    }
}

  After running the code above, we can go to the output directory and inspect the contents of the generated SequenceFile (for example with the hdfs command).
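  If you prefer to verify the result in code, the following is a minimal sketch (not part of the original job; it assumes the Hadoop 2.x reader API and the default single-reducer output file name part-r-00000) that opens the generated SequenceFile with SequenceFile.Reader and prints every key/value pair:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        // Assumed output file name; with the default single reducer the job writes part-r-00000.
        Path part = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\seqout\\part-r-00000");
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(part));
        LongWritable key = new LongWritable();
        Text value = new Text();
        // Iterate over all records; the key is the original byte offset, the value is the line of text.
        while (reader.next(key, value)) {
            System.out.println(key.get() + "\t" + value);
        }
        reader.close();
    }
}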

2>. Running a word-count test on the SequenceFile (SequenceFileInputFormat)

  There is no need to hunt for a separate SequenceFile; we can test directly against the one generated above. The code is as follows:

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SeqMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each line of the SequenceFile value into words and emit (word, 1).
        String line = value.toString();
        String[] arr = line.split(" ");
        for (String word : arr) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

SeqMapper.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SeqReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts for each word and emit the total.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

SeqReducer.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeqApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJobName("Seq-in");
        job.setJarByClass(SeqApp.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(SeqMapper.class);
        job.setReducerClass(SeqReducer.class);
        // Use the SequenceFile generated above as the job input.
        FileInputFormat.addInputPath(job, new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\seqout"));
        Path outPath = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\out");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        // Set the input format to SequenceFileInputFormat.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // With all parameters set, this line submits the job and waits for completion.
        job.waitForCompletion(true);
    }
}

  After running the code above, we can examine the word-count output.
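  Since this job writes plain text to the local file system, a quick check is simply to print the output file. Below is a minimal sketch, again assuming the default part-r-00000 file name under the out directory:

import java.nio.file.Files;
import java.nio.file.Paths;

public class PrintWordCount {
    public static void main(String[] args) throws Exception {
        // Each line of the reducer output has the form "word<TAB>count".
        Files.lines(Paths.get("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\out\\part-r-00000"))
             .forEach(System.out::println);
    }
}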

 

II. MR File Format: DB

1>. Creating the database and tables

create database yinzhengjie;
use yinzhengjie;
create table wordcount(id int, line varchar(100));
insert into wordcount values(1, 'hello my name is yinzhengjie');
insert into wordcount values(2, 'I am a good boy');
create table wordcount2(word varchar(100), count int);

 

2>. Writing the code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * The record class describing the database data format; it must implement both
 * interfaces: Writable and DBWritable.
 */
public class MyDBWritable implements Writable, DBWritable {

    // Note: these two private fields map to the columns of the input table, id and line.
    private int id;
    private String line;

    // Writable serialization.
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(line);
    }

    // Writable deserialization; the field order must match the serialization order.
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        line = in.readUTF();
    }

    // DB serialization: set the statement parameters.
    public void write(PreparedStatement st) throws SQLException {
        // Column 1 of the table is id.
        st.setInt(1, id);
        // Column 2 of the table is line.
        st.setString(2, line);
    }

    // DB deserialization: assign values from the result set.
    public void readFields(ResultSet rs) throws SQLException {
        // Read column 1 of the result set into id.
        id = rs.getInt(1);
        // Read column 2 of the result set into line.
        line = rs.getString(2);
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}

MyDBWritable.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MyDBWritable2 implements Writable, DBWritable {
    // These two fields map to the columns of the output table, word and count.
    private String word;
    private int count;

    // Writable serialization.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Writable deserialization.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    // DB serialization.
    public void write(PreparedStatement st) throws SQLException {
        st.setString(1, word);
        st.setInt(2, count);
    }

    // DB deserialization.
    public void readFields(ResultSet rs) throws SQLException {
        word = rs.getString(1);
        count = rs.getInt(2);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

MyDBWritable2.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Note that MyDBWritable is the value type delivered by the database input format.
 */
public class DBMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
        // Split the line column of each database row into words and emit (word, 1).
        String line = value.getLine();
        String[] arr = line.split(" ");
        for (String word : arr) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

DBMapper.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DBReducer extends Reducer<Text, IntWritable, MyDBWritable2, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        MyDBWritable2 db = new MyDBWritable2();
        // Set the values that will be written to the output table.
        db.setWord(key.toString());
        db.setCount(sum);
        // Write the record to the database.
        context.write(db, NullWritable.get());
    }
}

DBReducer.java contents
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class DBApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);

        job.setJobName("DB");
        job.setJarByClass(DBApp.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(DBMapper.class);
        job.setReducerClass(DBReducer.class);

        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://192.168.0.254:5200/yinzhengjie";
        String name = "root";
        String pass = "yinzhengjie";

        DBConfiguration.configureDB(job.getConfiguration(), driver, url, name, pass);

        // The first query reads the input rows; the second counts them so the framework can split the input.
        DBInputFormat.setInput(job, MyDBWritable.class, "select * from wordcount", "select count(*) from wordcount");

        // Write to the table named "wordcount2", which has 2 fields.
        DBOutputFormat.setOutput(job, "wordcount2", 2);

        // Set the input and output formats.
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        job.waitForCompletion(true);
    }
}

  After running the code above, we can check whether new rows have been written to the wordcount2 table in the database.
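  For a quick check from code, here is a minimal JDBC sketch (not part of the MapReduce job; it reuses the same driver, URL, and credentials configured in DBApp) that prints whatever the job wrote into wordcount2:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckWordCount2 {
    public static void main(String[] args) throws Exception {
        // Same MySQL connection settings as in DBApp.
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://192.168.0.254:5200/yinzhengjie", "root", "yinzhengjie");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select word, count from wordcount2")) {
            while (rs.next()) {
                System.out.println(rs.getString("word") + "\t" + rs.getInt("count"));
            }
        }
    }
}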

 

Reposted from: https://www.cnblogs.com/yinzhengjie/p/9297573.html
