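Hadoop Basics: An Introduction to Common MapReduce File Formats
Author: Yin Zhengjie
Copyright notice: this is an original work and reproduction is prohibited. Violations will be pursued under the law.
I. MR File Format: SequenceFile
1>. Generating a SequenceFile (SequenceFileOutputFormat)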
The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.output;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Identity mapper: passes each (byte offset, line) record through unchanged.
public class SeqMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * Converts a text file into a SequenceFile.
 * k - byte offset - LongWritable
 * v - one line of text - Text
 */
public class SeqApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        job.setJobName("Seq-Out");
        job.setJarByClass(SeqApp.class);

        // Set the output key/value types; they must match the types emitted by our Mapper.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SeqMapper.class);

        FileInputFormat.addInputPath(job, new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\word.txt"));

        Path outPath = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\seqout");
        if (fs.exists(outPath)) {
            // Recursive delete; the single-argument delete(Path) is deprecated.
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // Write the job output as a SequenceFile.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // Use block compression for the SequenceFile.
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

        // With all parameters set, this line submits the job and waits for it to finish.
        job.waitForCompletion(true);
    }
}
After running the job above, you can inspect the contents of the generated SequenceFile in the output directory with the hdfs command.
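The comment in SeqApp describes each record as a byte offset (key) paired with one line of text (value). As a rough illustration of what those keys look like, the plain-Java sketch below computes the starting byte offset of every line in a string. It does not use Hadoop; the class name `LineOffsets`, the sample text, and the assumptions of UTF-8 encoding and `\n` line endings are all illustrative and not taken from the original job.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Hadoop or of the original post.
class LineOffsets {

    // Maps each line's starting byte offset to the line text,
    // assuming UTF-8 input and '\n' line terminators.
    static Map<Long, String> offsets(String content) {
        Map<Long, String> records = new LinkedHashMap<>();
        long offset = 0;
        for (String line : content.split("\n")) {
            records.put(offset, line);
            // Advance past the line's bytes plus one byte for the '\n'.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        // Sample text chosen for illustration only.
        offsets("hello world\nhello hadoop\n")
                .forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

For the sample text, the two records carry the keys 0 and 12, since "hello world" occupies 11 bytes followed by a one-byte newline.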
2>. Running a word count over the SequenceFile (SequenceFileInputFormat)
There is no need to go looking for a separate SequenceFile; we simply test with the one generated above. The code is as follows:
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Splits each line on single spaces and emits (word, 1) for every token.
public class SeqMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] arr = line.split(" ");
        for (String word : arr) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Sums the counts emitted for each word.
public class SeqReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.sequencefile.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeqApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJobName("Seq-in");
        job.setJarByClass(SeqApp.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(SeqMapper.class);
        job.setReducerClass(SeqReducer.class);
        // Use the SequenceFile we generated earlier as the input.
        FileInputFormat.addInputPath(job, new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\seqout"));
        Path outPath = new Path("D:\\10.Java\\IDE\\yhinzhengjieData\\MyHadoop\\out");
        if (fs.exists(outPath)) {
            // Recursive delete; the single-argument delete(Path) is deprecated.
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        // Read the input as a SequenceFile.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        // With all parameters set, this line submits the job and waits for it to finish.
        job.waitForCompletion(true);
    }
}
After running the code above, you can inspect the word counts it produced.
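To sanity-check what the job should produce, the same split-and-sum logic can be reproduced locally without Hadoop. The sketch below is only an approximation of the mapper and reducer in this section: the class name `LocalWordCount` and the sample lines are assumptions (the contents of word.txt are not shown in this post), and a `TreeMap` stands in for the sorted key order a reducer would see.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the SeqMapper/SeqReducer pair.
class LocalWordCount {

    // Splits every line on single spaces (as the mapper does) and
    // sums one count per token (as the reducer does).
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample input chosen for illustration only.
        count("hello world", "hello hadoop")
                .forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

Because the split is on a single space, consecutive spaces in a line would yield empty tokens that are counted as words too; splitting on `"\\s+"` is one way to avoid that if the input is not strictly single-spaced.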
II. MR File Format: DB
1>. Creating the database and tables
create database yinzhengjie;
use yinzhengjie;
create table wordcount(id int, line varchar(100));
insert into wordcount values(1, 'hello my name is yinzhengjie');
insert into wordcount values(2, 'I am a good boy');
create table wordcount2(word varchar(100), count int);
2>. Writing the code
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Describes the record format; it must implement two interfaces, Writable and DBWritable.
 */
public class MyDBWritable implements Writable, DBWritable {

    // Note: these two private fields correspond to the id and line columns of the input table.
    private int id;
    private String line;

    // Writable serialization
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(line);
    }

    // Writable deserialization; fields must be read in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        line = in.readUTF();
    }

    // DB serialization: binds the field values to the statement parameters.
    public void write(PreparedStatement st) throws SQLException {
        // The first parameter is the id column.
        st.setInt(1, id);
        // The second parameter is the line column.
        st.setString(2, line);
    }

    // DB deserialization: assigns the fields from the result set.
    public void readFields(ResultSet rs) throws SQLException {
        // Read the first column into id.
        id = rs.getInt(1);
        // Read the second column into line.
        line = rs.getString(2);
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}
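The comment on readFields in MyDBWritable stresses that fields must be read back in the order they were written. The reason is that the byte stream carries no field names, only the raw values one after another. The JDK-only sketch below shows the same int-then-string round trip with `DataOutputStream` and `DataInputStream`; the class name `WritableOrderDemo` and its helper methods are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class; mirrors the write/readFields order of MyDBWritable.
class WritableOrderDemo {

    // Writes the int first, then the string, as MyDBWritable.write(DataOutput) does.
    static byte[] serialize(int id, String line) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(id);
            out.writeUTF(line);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Reads the int first, then the string: the same order as the writes.
    static String deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int id = in.readInt();
            String line = in.readUTF();
            return id + ":" + line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(1, "hello my name is yinzhengjie")));
    }
}
```

Swapping the two reads would make `readUTF` interpret the first bytes of the int as a string length, which typically ends in an exception or garbage data rather than a clear error message.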
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MyDBWritable2 implements Writable, DBWritable {
    // These two fields correspond to the word and count columns of the output table.
    private String word;
    private int count;

    // Writable serialization
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Writable deserialization
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    // DB serialization
    public void write(PreparedStatement st) throws SQLException {
        st.setString(1, word);
        st.setInt(2, count);
    }

    // DB deserialization
    public void readFields(ResultSet rs) throws SQLException {
        word = rs.getString(1);
        count = rs.getInt(2);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Note that MyDBWritable is the input value type read from the database.
 */
public class DBMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
        String line = value.getLine();
        String[] arr = line.split(" ");
        for (String word : arr) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DBReducer extends Reducer<Text, IntWritable, MyDBWritable2, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        MyDBWritable2 db = new MyDBWritable2();
        // Set the values to be written to the output table.
        db.setWord(key.toString());
        db.setCount(sum);
        // Write the record to the database.
        context.write(db, NullWritable.get());
    }
}
/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.dbformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class DBApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);

        job.setJobName("DB");
        job.setJarByClass(DBApp.class);

        // The mapper emits (Text, IntWritable); the reducer emits (MyDBWritable2, NullWritable).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(MyDBWritable2.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(DBMapper.class);
        job.setReducerClass(DBReducer.class);

        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://192.168.0.254:5200/yinzhengjie";
        String name = "root";
        String pass = "yinzhengjie";

        DBConfiguration.configureDB(job.getConfiguration(), driver, url, name, pass);

        // Input query, plus a count query that reports how many rows the input has.
        DBInputFormat.setInput(job, MyDBWritable.class, "select * from wordcount", "select count(*) from wordcount");

        // Write to the table "wordcount2", which has 2 fields.
        DBOutputFormat.setOutput(job, "wordcount2", 2);

        // Set the input and output formats.
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        job.waitForCompletion(true);
    }
}
After running the code above, you can check whether new rows have appeared in the wordcount2 table of the database.
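The call `DBOutputFormat.setOutput(job, "wordcount2", 2)` supplies only a table name and a field count, so each reducer record has to reach the table through a parameterized INSERT with one placeholder per field, which `MyDBWritable2.write(PreparedStatement)` then fills in by position. The sketch below builds a statement of that shape in plain Java. The class `InsertSketch` and its method are hypothetical and are not Hadoop's implementation; the exact SQL text Hadoop generates may differ.

```java
// Hypothetical helper for illustration; not Hadoop's own code.
class InsertSketch {

    // Builds "INSERT INTO <table> VALUES (?,?,...)" with fieldCount placeholders.
    static String insertStatement(String table, int fieldCount) {
        StringBuilder sql = new StringBuilder("INSERT INTO ")
                .append(table)
                .append(" VALUES (");
        for (int i = 0; i < fieldCount; i++) {
            sql.append(i == 0 ? "?" : ",?");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // Placeholder 1 receives word, placeholder 2 receives count.
        System.out.println(insertStatement("wordcount2", 2));
    }
}
```

Since no column names are given, the placeholders are matched to the table's columns by position, which is why the parameter order in `MyDBWritable2.write` (word first, count second) has to follow the column order of wordcount2.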