FileInputFormat分析

FileInputFormat分析一.程序简介        在mapreduce程序运行的开始阶段,hadoop需要将待处理的文件进行切分,按定义格式读取等操作,这些操作都在InputFormat中进行。         InputFormat是一个抽象类,他含有getSplits()和createRecordReader()抽象方法,在子类中必须被实现。这两个就是InputFormat的基本方法。getSplit

一. 程序简介

        mapreduce程序运行的开始阶段,hadoop需要将待处理的文件进行切分,按定义格式读取等操作,这些操作都在InputFormat中进行。

         InputFormat是一个抽象类,他含有getSplits()createRecordReader()抽象方法,在子类中必须被实现。这两个就是InputFormat的基本方法。getSplits()确定输入对象的切分原则,而createRecordReader()则可以按一定格式读取相应数据。

二.程序详细分析

FileInputFormat中实现的getSplits()方法如下:

public List<InputSplit> getSplits(JobContext job

                                    ) throws IOException {

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    long maxSize = getMaxSplitSize(job);

 

    // generate splits

    List<InputSplit> splits = new ArrayList<InputSplit>();

    for (FileStatus file: listStatus(job)) {

      Path path = file.getPath();

      FileSystem fs = path.getFileSystem(job.getConfiguration());

      long length = file.getLen();

      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

      if ((length != 0) && isSplitable(job, path)) { 

        long blockSize = file.getBlockSize();

        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

 

        long bytesRemaining = length;

        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 

                                   blkLocations[blkIndex].getHosts()));

          bytesRemaining -= splitSize;

        }

        

        if (bytesRemaining != 0) {

          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 

                     blkLocations[blkLocations.length-1].getHosts()));

        }

      } else if (length != 0) {

        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));

      } else { 

        //Create empty hosts array for zero length files

        splits.add(new FileSplit(path, 0, length, new String[0]));

      }

    }

    LOG.debug("Total # of splits: " + splits.size());

    return splits;

  }

根据对代码的分析,可以看到它是对一个目标文件进行切分操作。如何拆分文件依据以下几个参数: maxsize
BlockSize
minsize


 long minSize = Math.max(getFormatMinSplitSize(),getMinSplitSize(job));

long maxSize = getMaxSplitSize(job);

long blockSize = file.getBlockSize();

 

根据这3个参数来确定切分文件块的size大小

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

         minSize: mapred.min.split.size

         maxSize : mapred.max.split.size

 
简单来讲就是切分的原则是 splitSize
不会小于 minSize
,不会大于 maxSize
,如果 blockSize
能够满足以上要求就取 blockSize
,如果不能的话就在 maxSize
minSize
中取值。

    确定了文件的切分块大小后就能进行切分操作,如下图:

    

    这里的RecordReader就是对拆分后的文件如何读取进行了封装,例如以回车符分隔行,按逗号分隔列的典型文件就在RecordReader中定义。

    当文件被split以后,会和ReacordReader一起被传递到Mapper类中,Mapper就根据这两个参数将文件转化成可被MapReduce读取的源数据结构。之后再完成mapreduce的操作。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38481.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注