hadoop mapreduce相关类 FileInputFormat
官方链接
http://hadoop.apache.org/docs/r2.9.1/api/
功能
InputFormat会生成一个RecordReader。
(inputFile)-> InputFormat->List<InputSplit>
(InputSplit对应的切片数据) ->RecordReader-> <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> ->ReadWriter -> (output)
类继承关系
abstract InputSplit
InputSplit
封装了一个切片信息,每一个mapper获得一个InputSplit。
abstract RecordReader
RecordReader
读取InputSplit
中的切片信息,并将切片中的数据分成key/value对提供给mapper输入端;mapper#run方法中的context.nextKeyValue(),context.getCurrentKey(),context.getCurrentValue()最终都会调用RecordReader对应的nextKeyValue(),getCurrentKey(),getCurrentValue()。
abstract InputFormat
来自官方api文档中关于InputFormat的描述。
InputFormat
describes the input-specification for a Map-Reduce job.
The Map-Reduce framework relies on the InputFormat of the job to:
- Validate the input-specification of the job.
- Split-up the input file(s) into logical
InputSplit
s, each of which is then assigned to an individualMapper
. - Provide the
RecordReader
implementation to be used to glean input records from the logicalInputSplit
for processing by theMapper
.
The default behavior of file-based InputFormat
s, typically sub-classes of FileInputFormat
, is to split the input into logical InputSplit
s based on the total size, in bytes, of the input files. However, the FileSystem
blocksize of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.
Clearly, logical splits based on input-size is insufficient for many applications since record boundaries are to respected. In such cases, the application has to also implement a RecordReader
on whom lies the responsibility to respect record-boundaries and present a record-oriented view of the logical InputSplit
to the individual task.
InputFormat实现类需要实现:
//Create a record reader for a given split.
RecordReader<k,v> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
//Logically split the set of input files for the job.
List getSplits(JobContext context)
abstract FileInputFormat
实现了InputFormat
的 getSplits(JobContext context)方法,没有实现 createRecordReader(InputSplit inputSplit, TaskAttemptContext context)方法。
FileInputFormat提供了一个函数isSplitable(JobContext, Path)用来判断输入文件是否可以Split,默认返回true(可以切片)。FileInputFormat的具体实现类需要实现createRecordReader(InputSplit inputSplit, TaskAttemptContext context);可以重写isSplitable(JobContext, Path)方法(如果不可以切片,或者自定义是否可以切片的逻辑);可以重写 getSplits(JobContext context)方法。
切片机制:
- 简单地按照文件的内容长度进行切片。
- 切片大小,默认等于BlockSize。
- 切片时逐个针对每一个文件单独切片。
具体代码:
public List<InputSplit> getSplits(JobContext job) throws IOException {
//部分代码
//----------------------------------------切片大小,单位是字节(B)----------------------------------------
long minSize = Math.max(getFormatMinSplitSize()/*long(1)*/, getMinSplitSize(job)/*job conf中可以设置最小切片大小,默认long(1)*/);
//job conf中可以设置最大切片大小,默认long型最大值
long maxSize = getMaxSplitSize(job);
//获取输入目录下的所有文件的信息。
List<FileStatus> files = listStatus(job);
//遍历文件
for (FileStatus file: files) {
//获取文件所在的块信息
if (file instanceof LocatedFileStatus) {
//从本地文件系统获取
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
//从hdfs获取
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//如果当前处理文件所需的减压器支持切片
if (isSplitable(job, path)) {
//获取块大小
long blockSize = file.getBlockSize();
//使用公式Math.max(minSize, Math.min(maxSize, blockSize))计算切片大小。
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
//写入切片信息
//写入时需要判断bytesRemaining/splitSize > SPLIT_SLOP
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP/*1.1*/) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
//写入切片信息
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
}
//如果当前环境配置不支持切片,直接将整个文件作为一个切片。
else {
// not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
}
abstract CombineFileInputFormat
CombineFileInputFormat
重写了FileInputFormat
的 getSplits(JobContext context)和isSplitable(JobContext, Path),没有实现 createRecordReader(InputSplit inputSplit, TaskAttemptContext context)。意味着CombineFileInputFormat有自己的切片和判断文件是否可以切片的逻辑。
该类的具体实现类需要实现createRecordReader(InputSplit inputSplit, TaskAttemptContext context)方法。
CombineFileInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
切片机制:
生成切片过程包括:虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:
- 将输入目录下所有文件的大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
是否可以切片判断
- 如果根据输入文件的后缀名判断该文件是压缩文件,判断该压缩文件对应的解压器是否支持切片;支持则可以切片,不支持则不可以切片。
- 如果根据输入文件的后缀名判断该文件不是压缩文件,则可以切片。
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
TextInputFormat
TextInputFormat
是默认的FileInputFormat
具体实现类。
实现了createRecordReader(InputSplit inputSplit, TaskAttemptContext context)返回一个LineRecordReader<LongWritable, Text>(按行读取文件内容)。
重写了isSplitable(JobContext, Path)。
没有重写 getSplits(JobContext context),因此该类的切片机制同其父类FileInputFormat。isSplitable方法逻辑同CombineFileInputFormat#isSplitable()。
key
为该行在文件中的起始字节偏移量。
value
该行的内容,不包括换行符和回车符。
例如输入文件为:
hello
word
则每条记录的(key,value)对:
(0,hello)
(6,word)
KeyValueTextInputFormat
KeyValueTextInputFormat
是FileInputFormat
的具体实现类。
实现了createRecordReader(InputSplit inputSplit, TaskAttemptContext context)返回一个KeyValueLineRecordReader<LongWritable, Text>(按行读取文件内容)。
重写了isSplitable(JobContext, Path);isSplitable方法逻辑同CombineFileInputFormat#isSplitable()。
没有重写 getSplits(JobContext context),因此该类的切片机制同其父类FileInputFormat。
每一行均为一条记录,被分隔符成Key,value;可以配置分隔符(默认为”Tab”)。
Key
该记录被分隔符分隔的前一部分。
Value
该记录被分隔符分隔的后一部分。
例如输入文件:
//两单词空格部分为“tab”符。
hello word
hadoop mapreduce
则每条记录的(key,value)对:
(hello,word)
(hadoop,mapreduce)
NlineInputFormat
是FileInputFormat
具体实现类。
实现了createRecordReader(InputSplit inputSplit, TaskAttemptContext context)返回一个LineRecordReader<LongWritable, Text>(按行读取文件内容),所以key和value的格式同TextInputFormat。
重写了getSplits(JobContext context),不再按BlockSize进行切片,而是按NlineInputFormat指定的行数N进行切片。每一个文件仍然和FileInputFormat一样单独处理。
例如输入文件:
hello
word
hadoop
mapreduce
yarn
如果N=2则将该文件切成3个逻辑切片。
InputSplit1:
hello
word
该InputSplit的(key,value)为:
(0,hello)
(6,word)
InputSplit2:
hadoop
mapreduce
InputSplit3
yarn
CombineTextInputFormat
CombineTextInputFormat
是CombineFileInputFormat
的具体实现类。
没有重写**getSplits**(JobContext context),切片机制同父类。
没有重写isSplitable(JobContext, Path),判断是否可切片逻辑同父类。
实现了createRecordReader(InputSplit inputSplit, TaskAttemptContext context)返回一个CombineFileRecordReader<LongWritable,Text>,key和value的格式同TextInputFormat。
参考
http://ercoppa.github.io/HadoopInternals/MapReduceInput.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38833.html