前置课程: HDFS开发环境搭建
简介
TextInputFormat可以将文本文件分块并逐行读入以便Map节点进行处理。
读入一行时,所产生的主键Key就是当前行在整个文本文件中的字符偏移量,而value就是该行的内容。
示例:单词个数统计
准备工作
在hdfs的根目录下创建input文件夹,然后在里面放置4个大小分别为1.5M、35M、5.5M、6.5M的小文件作为输入数据
具体代码
Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text mapOutputKey = new Text();
private IntWritable mapOutputValue = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String linevalue = value.toString(); //1.将读取的文件变成,偏移量+内容//读取一行数据
StringTokenizer st = new StringTokenizer(linevalue);//使用空格分隔
while (st.hasMoreTokens()) {
//判断是否还有分隔符,有的话代表还有单词
String word = st.nextToken();//返回从当前位置到下一个分隔符之间的字符串(单词)
mapOutputKey.set(word);
mapOutputValue.set(1);
context.write(mapOutputKey, mapOutputValue);
}
}
}
Reducer类
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0; //汇总
for (IntWritable value : values) {
sum += value.get();
}
outputValue.set(sum);
context.write(key, outputValue);
}
}
Driver类
public class WordCountDriver {
public static void main(String[] args) throws Exception {
//需要在resources下面提供core-site.xml文件
args = new String[]{
"/input/",
"/output/"
};
Configuration cfg = new Configuration(); //获取配置
Job job = Job.getInstance(cfg, WordCountDriver.class.getSimpleName());
job.setJarByClass(WordCountDriver.class);
//设置map与需要设置的内容类 + 输出key与value
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置input与output
FileInputFormat.addInputPath(job, new Path(args[0]));
Path op1 = new Path(args[1]);
FileOutputFormat.setOutputPath(job, op1);
FileSystem fs = FileSystem.get(cfg);
if (fs.exists(op1)) {
fs.delete(op1, true);
System.out.println("存在此输出路径,已删除!!!");
}
//将job交给Yarn
boolean issucess = job.waitForCompletion(true);
int status= issucess ? 0 : 1;
System.exit(status);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38422.html