An Analysis of Hadoop's FileInputFormat

The code for this article is available on GitHub: https://github.com/zuodaoyong/Hadoop

When running a MapReduce program, the input can come in many formats: log files, binary files, database tables, and so on. For these different data types, MapReduce provides corresponding input-reading implementation classes:

TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormat implementations.

1. TextInputFormat (Text type)

TextInputFormat is the default FileInputFormat implementation. It reads one record per line.

The key is the byte offset of the start of the line within the file, of type LongWritable; the value is the content of the line as Text, excluding any line terminators (newline and carriage return).
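
For example, given an input file with the following two lines (hypothetical content):

hello world
hadoop mapreduce

the mapper receives the (key, value) pairs below; the second offset is 12 because "hello world" plus its newline occupies 12 bytes:

(0, "hello world")
(12, "hadoop mapreduce")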

2. KeyValueTextInputFormat

Each line is one record, split by a separator into a key and a value. The separator is configured in the driver class; the default is tab (\t):

// Create the job configuration and set the key/value separator
// (the misspelling SEPERATOR is in Hadoop's own API)
Configuration configuration = new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
// Tell the job to use KeyValueTextInputFormat
Job job = Job.getInstance(configuration);
job.setInputFormatClass(KeyValueTextInputFormat.class);
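
With the tab separator, a line such as the following (hypothetical content):

line1\tHello Hadoop

is split at the first tab into key = "line1" and value = "Hello Hadoop", both of type Text. If a line contains no separator, the entire line becomes the key and the value is empty.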

3. NLineInputFormat

Each map task's InputSplit is no longer determined by HDFS block boundaries; instead it is defined by the line count N configured for NLineInputFormat: number of splits = total lines in the input file / N. If the division leaves a remainder, number of splits = quotient + 1. For example, an 11-line file with N = 3 yields 4 splits.
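
The line count is set in the driver through NLineInputFormat's static helper; a minimal sketch, assuming N = 3 (an arbitrary value chosen for illustration):

job.setInputFormatClass(NLineInputFormat.class);
// Each InputSplit will cover at most 3 lines of the input file
NLineInputFormat.setNumLinesPerSplit(job, 3);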

4. Custom InputFormat

When Hadoop's built-in InputFormat types cannot cover an application's needs, a custom InputFormat is required. The example below reads each input file whole and emits it as a single key/value pair.

Steps to define a custom InputFormat:

(1) Define a class that extends FileInputFormat

        i) Override isSplitable to return false so files are never split

        ii) Override createRecordReader to create and initialize a custom RecordReader

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// The key type is Text (the file path) to match WholeRecordReader below;
// the original declared NullWritable, which does not compile against that reader.
public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> {

   @Override
   protected boolean isSplitable(JobContext context, Path filename) {
      // Never split: each file must be read as a single record
      return false;
   }

   @Override
   public RecordReader<Text, BytesWritable> createRecordReader(
         InputSplit inputSplit, TaskAttemptContext context)
         throws IOException, InterruptedException {
      WholeRecordReader recordReader = new WholeRecordReader();
      // Pass the real split; the framework will also call initialize itself
      recordReader.initialize(inputSplit, context);
      return recordReader;
   }
}

(2) Implement the RecordReader so that it reads one complete file at a time and packages it as a key/value pair

i) Read the entire file into the value in one pass through an input stream; because splitting is disabled, the whole file ends up in a single value

ii) Use the file's full path (including the name) as the key

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

   private Text key = new Text();
   private BytesWritable value = new BytesWritable();
   private boolean isProcessed = false;
   private FileSplit fileSplit;
   private Configuration configuration;

   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context)
         throws IOException, InterruptedException {
      fileSplit = (FileSplit) inputSplit;
      configuration = context.getConfiguration();
   }

   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!isProcessed) {
         FSDataInputStream inputStream = null;
         try {
            // The split covers the whole file, so its length is the file size
            byte[] bs = new byte[(int) fileSplit.getLength()];
            // Get the file system for the split's path
            Path path = fileSplit.getPath();
            FileSystem fileSystem = path.getFileSystem(configuration);
            // Open the file and read it entirely into the buffer
            inputStream = fileSystem.open(path);
            IOUtils.readFully(inputStream, bs, 0, bs.length);
            value.set(bs, 0, bs.length);
            key.set(path.toString());
         } finally {
            // Close only the stream; FileSystem instances are cached and
            // shared, so closing them here could break other readers
            IOUtils.closeStream(inputStream);
         }
         isProcessed = true;
         return true;
      }
      return false;
   }

   @Override
   public Text getCurrentKey() throws IOException, InterruptedException {
      return key;
   }

   @Override
   public BytesWritable getCurrentValue() throws IOException, InterruptedException {
      return value;
   }

   @Override
   public float getProgress() throws IOException, InterruptedException {
      // The single record has either been produced (1.0) or not (0.0)
      return isProcessed ? 1 : 0;
   }

   @Override
   public void close() throws IOException {
   }
}

(3) In the driver, use SequenceFileOutputFormat to write the merged files out:

job.setInputFormatClass(WholeInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

The full driver:

public static void main(String[] args) throws Exception {
   System.setProperty("HADOOP_USER_NAME", "root");
   Configuration configuration = new Configuration();
   Job job = Job.getInstance(configuration);
   // Set the input format
   job.setInputFormatClass(WholeInputFormat.class);
   // Set the output format
   job.setOutputFormatClass(SequenceFileOutputFormat.class);
   job.setMapperClass(SequenceFileMapper.class);
   job.setMapOutputKeyClass(Text.class);
   job.setMapOutputValueClass(BytesWritable.class);
   job.setReducerClass(SequenceFileReduce.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(BytesWritable.class);
   FileInputFormat.setInputPaths(job, new Path("/mapreduce/inputformat/sequencefiles"));
   FileOutputFormat.setOutputPath(job, new Path("/mapreduce/inputformat/output"));
   boolean completed = job.waitForCompletion(true);
   System.exit(completed ? 0 : 1);
}
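
The driver references SequenceFileMapper and SequenceFileReduce, which this post does not show; a minimal sketch, assuming they simply forward each (file path, file bytes) pair unchanged:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
   @Override
   protected void map(Text key, BytesWritable value, Context context)
         throws IOException, InterruptedException {
      // Emit the (file path, file bytes) pair as-is
      context.write(key, value);
   }
}

public class SequenceFileReduce extends Reducer<Text, BytesWritable, Text, BytesWritable> {
   @Override
   protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
         throws IOException, InterruptedException {
      // Each file path is unique, so this normally writes a single value per key
      for (BytesWritable value : values) {
         context.write(key, value);
      }
   }
}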
