文章目录
FileInputFormat切片源码解析
在提交任务过程中,我们需要对数据进行逻辑上的切片,对应代码为input.getSplits(job)
1、MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
- 一个Job的Map阶段你并行度由客户端在提交Job时的切片数决定;
- 每个Split切片分配一个MapTask并行实例处理;
- 默认情况下,切片大小=块大小;
- 切片时不考虑数据集整体,而是逐个针对每个文件单独切片
注意: 在任务提交过程中,先按文件大小排序,然后先切大文件后切小文件。
2、源码步骤
-
程序先找到数据存储的目录;
-
开始遍历处理目录下的每个文件;
-
获取文件大小:fs.sizeOf(ss.txt);
-
计算切片大小:computeSplitSize(minSize,Math.min(maxSize,blockSize))
blockSize=128M
-
每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分为一个切片
-
将切片信息写到一个切片规划文件中
-
整个切片的核心过程在getSplit()方法中完成
-
InputSplit只记录切片的元数据信息,如起始位置、长度以及节点列表等。
-
-
提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算MapTask个数。
3、FileInputFormat切片机制
- 按照文件内容长度进行切片
- 默认切片大小等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每个文件单独切片
案例:
3.1 源代码中计算切片大小的公式
computeSplitSize(minSize,Math.min(maxSize,blockSize))
minSize默认为1
maxSize默认为Long.MAXValue
因此,默认情况切片大小=Block大小=128M
maxSize可以调整切片的最大值:如果比blockSize小,则会让切片变小;
minSize可以调整切片的最小值:如果比blockSize大,则会让切片变大;
3.2 获取切片信息API
//获取切片的文件名称
String name = inputSplit.getPath().getName();
//根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit)context.getInputSplit();
4、FileInputFormat的实现类
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
4.1 TextInputFormat
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
4.2 CombineTextInputFormat切片机制
-
缘由:框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
-
应用场景:小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
-
虚拟存储切片最大值设置:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
-
切片机制:虚拟存储过程+切片过程
-
虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
-
切片过程:
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片;
如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片;
-
5、代码验证
在wordcount的基础上准备四个小文件进行测试
-
未改动前:切片为4;
-
Driver中添加代码:
//如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
切片为3;
- Driver中添加代码:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
切片为1;
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38643.html