FileInputFormat源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt);
b)计算切片大小
在drive类中 FileInputFormat 源码里面 。用ctrl+ l 搜索Math ,在源码的279行:
切片的计算:
Math.max(minSize(1M默认的), Math.min(maxSize(200M默认的), blockSize(128M)))=128M
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
c)默认情况下,切片大小=blocksize=128M
d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block是HDFS物理上存储的数据,切片是对数据逻辑上的划分。
( 4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
多个小文件(0-128M内)就切分成多少片。归档文件,只是节省namenode元数据内存。如果进行切片,依然切成对应文件个数
1)FileInputFormat中默认的切片机制:
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M file2.txt 10M |
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1– 0~128 file1.txt.split2– 128~256 file1.txt.split3– 256~320 file2.txt.split1– 0~10M |
FileInputFormat切片大小的参数配置
通过分析源码,在FileInputFormat的280行中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。
maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。
3)获取切片信息API
// 根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 获取切片的文件名称 String name = inputSplit.getPath().getName(); |
分区案例:
package com.itstar.mr.wc0908;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author 李庆
* 2019-09-10
* 星期一
*/
public class WCPartion extends Partitioner<Text, IntWritable> {
/**
* Partitioner的数据是来源到mapper,然后到环形缓冲区,之后进行切片
*
* 自定义分区规则:单词长度为奇数时放到0分区
* 单词长度为偶数时放到1分区
*
* */
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
//1获取单词
String word = text.toString();
//2计算单词的长度
int length = word.length();
//3按单词长度自定义分区
if (length % 2 == 0) {
return 1;
} else {
return 0;
}
}
}
在drive端在提交job信息:
job.setPartitionerClass(WCPartion.class);
job.setNumReduceTasks(2);
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38783.html