The code is available on GitHub: https://github.com/zuodaoyong/Hadoop
When running MapReduce jobs, the input can take many forms: log files, binary files, database tables, and so on. For each kind of data, MapReduce provides a corresponding InputFormat implementation class for reading it:
TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormat implementations.
1. TextInputFormat
TextInputFormat is the default FileInputFormat implementation. It reads the input one line per record.
The key is the starting byte offset of the line within the file (LongWritable); the value is the content of the line (Text), excluding the line terminators (newline and carriage return).
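For example, a three-line input file produces one record per line, keyed by the byte offset at which the line starts (the offsets below assume single-byte characters and \n line endings):
hello hadoop    ->  key = 0,  value = "hello hadoop"
map reduce      ->  key = 13, value = "map reduce"
input format    ->  key = 24, value = "input format"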
2. KeyValueTextInputFormat
Each line is one record, split into a key and a value by a separator. The separator is configured in the driver class; the default is a tab (\t).
// Create the Configuration for the Job and set the key/value separator (a tab here)
Configuration configuration = new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
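A minimal driver sketch for KeyValueTextInputFormat follows; KVMapper and the input/output paths are placeholders, not part of the original post. The separator must be set on the Configuration before the Job is created:
Configuration configuration = new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
Job job = Job.getInstance(configuration);
// Each line is split at the first separator into a Text key and a Text value
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapperClass(KVMapper.class); // placeholder mapper that accepts (Text, Text) input
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("/mapreduce/inputformat/kv"));      // placeholder path
FileOutputFormat.setOutputPath(job, new Path("/mapreduce/inputformat/kvout"));  // placeholder path
System.exit(job.waitForCompletion(true) ? 0 : 1);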
3. NLineInputFormat
With NLineInputFormat, the InputSplit handled by each map task is no longer determined by HDFS block boundaries but by the number of lines N configured for NLineInputFormat. The number of splits is the total number of input lines divided by N; if the division leaves a remainder, the number of splits is the quotient plus one.
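For example, an input file with 11 lines and N = 3 gives 11 / 3 = 3 with remainder 2, so 4 splits and therefore 4 map tasks. N is set in the driver; a minimal sketch, assuming job is an existing Job instance:
// Each split (and therefore each map task) covers at most 3 input lines
NLineInputFormat.setNumLinesPerSplit(job, 3);
job.setInputFormatClass(NLineInputFormat.class);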
4. Custom InputFormat
When the InputFormat implementations shipped with Hadoop do not cover a particular use case, a custom InputFormat is needed.
Steps for writing a custom InputFormat:
(1) Define a class that extends FileInputFormat
i) Override isSplitable to return false so the file is not split
ii) Override createRecordReader to create and initialize the custom RecordReader (see the class below)
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Key/value types must match WholeRecordReader: Text (file path) and BytesWritable (file contents)
public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split: each file is read whole as a single record
        return false;
    }
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(inputSplit, context);
        return recordReader;
    }
}
(2) Write the RecordReader, which reads one complete file at a time and wraps it as a single key/value pair
i) Use an input stream to read the entire file into the value; because splitting is disabled, the whole file ends up in a single value
ii) Use the file path (including the file name) as the key
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Reads one entire file as a single record: key = file path, value = raw file bytes
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
    private BytesWritable value = new BytesWritable();
    private Text key = new Text();
    private boolean isProcess = false;
    private FileSplit fileSplit;
    private Configuration configuration;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Emit exactly one key/value pair per split (i.e. per file)
        if (!isProcess) {
            FSDataInputStream inputStream = null;
            try {
                byte[] bs = new byte[(int) fileSplit.getLength()];
                // Get the file system for the split's path
                Path path = fileSplit.getPath();
                FileSystem fileSystem = path.getFileSystem(configuration);
                // Open the file and read its full contents into the buffer
                inputStream = fileSystem.open(path);
                IOUtils.readFully(inputStream, bs, 0, bs.length);
                value.set(bs, 0, bs.length);
                key.set(path.toString());
            } finally {
                // Close only the stream; the FileSystem instance is cached and shared, so it is not closed here
                IOUtils.closeStream(inputStream);
            }
            isProcess = true;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isProcess ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
    }
}
(3) In the driver, set WholeInputFormat as the input format and SequenceFileOutputFormat as the output format, so the merged file contents are written out as a SequenceFile
job.setInputFormatClass(WholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
The driver:
public static void main(String[] args) throws Exception {
    System.setProperty("HADOOP_USER_NAME", "root");
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    // Use the custom InputFormat defined above
    job.setInputFormatClass(WholeInputFormat.class);
    // Write the merged output as a SequenceFile
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(SequenceFileMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(BytesWritable.class);
    job.setReducerClass(SequenceFileReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);
    FileInputFormat.setInputPaths(job, new Path("/mapreduce/inputformat/sequencefiles"));
    FileOutputFormat.setOutputPath(job, new Path("/mapreduce/inputformat/output"));
    boolean completed = job.waitForCompletion(true);
    System.exit(completed ? 0 : 1);
}
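The SequenceFileMapper and SequenceFileReduce classes referenced by the driver are not shown in this excerpt (see the GitHub repository for the actual versions). A minimal pass-through sketch, assuming they simply forward the (file path, file bytes) pairs produced by WholeRecordReader, could look like this (each class in its own source file):
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Pass-through mapper: emits each (file path, file bytes) record unchanged
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Pass-through reducer: writes each file's bytes under its path key
public class SequenceFileReduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}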