多种自定义文件格式的文件输入处理
MultipleInputs可以让MR支持多种输入格式
比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat
InputFormat(extends FileInputFormat)—>RecordReader(extends RecordReader)—>RecordClass(implements Writable)
MultipleInputs需要为每一种输入指定各自的InputFormat:一种InputFormat使用一种RecordReader来读取文件,并返回一种Record格式的值
这就是这三者之间的典型关系,它们也是map过程中依次涉及的三个步骤所用的工具和产物
数据准备
a文件
1\t80
2\t90
3\t100
4\t50
5\t73
b文件
1\tlili\t3
2\txiaoming\t3
3\tfeifei\t3
4\tzhangsan\t3
5\tlisi\t3
\t表示分隔符(制表符)
设计思路
将第一个分隔符(制表符\t)前面的文本作为map的输入key
分隔符后面的内容作为map的输入value
要求自定义实现InputFormat,输出key,value格式数据。以产生Map的输入的数据(key,value)
!!!三个文件步骤!!!
InputFormat(extends FileInputFormat)—>RecordReader(extends RecordReader)—>RecordClass(implements Writable)
本例是对两个文件操作
1.两个RecordClass类(实现Writable接口)
package test.mr.multiinputs2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/*
* 对map输入的value的预处理
* 对原始数据的预加工
*/
/*
* 第一张表数据
*/
/**
 * Record value for rows of the first input file: a single string field.
 *
 * <p>Serialized with {@link DataOutput#writeUTF(String)} and restored with
 * {@link DataInput#readUTF()}.
 */
public class FirstClass implements Writable {

    private String value;

    /** No-argument constructor; leaves {@code value} unset ({@code null}). */
    public FirstClass() {
    }

    /**
     * @param value the string field carried by this record
     */
    public FirstClass(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Writes the value as a modified-UTF-8 string.
     *
     * <p>An unset ({@code null}) value is written as the empty string, because
     * {@code writeUTF(null)} would throw a {@link NullPointerException}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(value == null ? "" : value);
    }

    /** Restores the value previously written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readUTF();
    }

    /**
     * Renders the record as {@code FirstClass<TAB>value}.
     *
     * <p>The separator is a real tab character; the previous literal "t"
     * fused the tag and the value into one token.
     */
    @Override
    public String toString() {
        return "FirstClass\t" + value;
    }
}
package test.mr.multiinputs2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/*
* 对map输入的value的预处理
* 对原始数据的预加工
*/
/*
* 第二张表数据
*/
/**
 * Record value for rows of the second input file: a user name and a class number.
 *
 * <p>Serialized as a modified-UTF-8 string followed by a 4-byte int, and
 * restored in the same order.
 */
public class SecondClass implements Writable {

    private String username;
    private int classNo;

    /** No-argument constructor; leaves {@code username} unset and {@code classNo} at 0. */
    public SecondClass() {
    }

    /**
     * @param username the user name field
     * @param classNo  the class number field
     */
    public SecondClass(String username, int classNo) {
        this.username = username;
        this.classNo = classNo;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getClassNo() {
        return classNo;
    }

    public void setClassNo(int classNo) {
        this.classNo = classNo;
    }

    /**
     * Writes {@code username} then {@code classNo}.
     *
     * <p>An unset ({@code null}) user name is written as the empty string,
     * because {@code writeUTF(null)} would throw a {@link NullPointerException}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(username == null ? "" : username);
        out.writeInt(classNo);
    }

    /** Reads the fields in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.username = in.readUTF();
        this.classNo = in.readInt();
    }

    /**
     * Renders the record as {@code SecondClass<TAB>username<TAB>classNo}.
     *
     * <p>The separators are real tab characters; the previous literal "t"
     * glued the fields together with the letter t.
     */
    @Override
    public String toString() {
        return "SecondClass\t" + username + "\t" + classNo;
    }
}
2.两个自定义RecordReader类(继承RecordReader类)
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * Record reader for the first input file.
 *
 * <p>Delegates line reading to a {@link LineRecordReader} and turns each line
 * of the form {@code key<TAB>value} into a {@code (Text, FirstClass)} pair.
 */
public class FirstRecordReader extends RecordReader<Text, FirstClass> {

    /** Fields are separated by a tab character (not the letter "t"). */
    private static final String FIELD_SEPARATOR = "\t";

    /** Minimum number of fields a line must have: key and value. */
    private static final int MIN_FIELDS = 2;

    // The reader that actually reads lines from the split.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private FirstClass value = null;

    /** Releases any previous delegate, then opens a new line reader on the split. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    /**
     * Advances to the next non-blank line and parses it.
     *
     * @return {@code true} if a record was read; {@code false} at end of split,
     *         in which case the current key and value are reset to {@code null}
     * @throws IOException if a non-blank line has fewer than two tab-separated fields
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (lineRecordReader.nextKeyValue()) {
            String line = lineRecordReader.getCurrentValue().toString();
            // Blank lines (e.g. a trailing newline at end of file) carry no record.
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.split(FIELD_SEPARATOR);
            if (fields.length < MIN_FIELDS) {
                throw new IOException(
                        "Malformed record, expected <key>\\t<value> but got: " + line);
            }
            key = new Text(fields[0]);
            // Pre-split the raw line so the mapper receives a typed value.
            value = new FirstClass(fields[1].trim());
            return true;
        }
        // Nothing left to read.
        key = null;
        value = null;
        return false;
    }

    /** Returns the key of the current record. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /** Returns the value of the current record. */
    @Override
    public FirstClass getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    /** Reports the progress of the underlying line reader. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    /** Closes the underlying line reader, if any, and clears the current record. */
    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        value = null;
    }
}
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * Record reader for the second input file.
 *
 * <p>Delegates line reading to a {@link LineRecordReader} and turns each line
 * of the form {@code key<TAB>username<TAB>classNo} into a
 * {@code (Text, SecondClass)} pair.
 */
public class SecondRecordReader extends RecordReader<Text, SecondClass> {

    /** Fields are separated by a tab character (not the letter "t"). */
    private static final String FIELD_SEPARATOR = "\t";

    /** Minimum number of fields a line must have: key, user name, class number. */
    private static final int MIN_FIELDS = 3;

    // The reader that actually reads lines from the split.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private SecondClass value = null;

    /** Releases any previous delegate, then opens a new line reader on the split. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    /**
     * Advances to the next non-blank line and parses it.
     *
     * @return {@code true} if a record was read; {@code false} at end of split,
     *         in which case the current key and value are reset to {@code null}
     * @throws IOException if a non-blank line has fewer than three tab-separated
     *         fields or its third field is not an integer
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        while (lineRecordReader.nextKeyValue()) {
            String line = lineRecordReader.getCurrentValue().toString();
            // Blank lines (e.g. a trailing newline at end of file) carry no record.
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.split(FIELD_SEPARATOR);
            if (fields.length < MIN_FIELDS) {
                throw new IOException(
                        "Malformed record, expected <key>\\t<username>\\t<classNo> but got: "
                                + line);
            }
            int classNo;
            try {
                // trim() tolerates stray whitespace such as a trailing '\r'.
                classNo = Integer.parseInt(fields[2].trim());
            } catch (NumberFormatException e) {
                throw new IOException("Class number is not an integer in record: " + line, e);
            }
            key = new Text(fields[0]);
            value = new SecondClass(fields[1], classNo);
            return true;
        }
        // Nothing left to read.
        key = null;
        value = null;
        return false;
    }

    /** Returns the key of the current record. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /** Returns the value of the current record. */
    @Override
    public SecondClass getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    /** Reports the progress of the underlying line reader. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    /** Closes the underlying line reader, if any, and clears the current record. */
    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        value = null;
    }
}
3.自定义两个FileInputFormat类(继承FileInputFormat类)
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * File input format for the first input file; produces
 * {@code (Text, FirstClass)} records through a {@link FirstRecordReader}.
 */
public class FirstInputFormat extends FileInputFormat<Text, FirstClass> {

    /**
     * Creates the reader for a split.
     *
     * @param split   the split to be read (not used here)
     * @param context the task attempt context (not used here)
     * @return a new {@link FirstRecordReader}
     */
    @Override
    public RecordReader<Text, FirstClass> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        final RecordReader<Text, FirstClass> reader = new FirstRecordReader();
        return reader;
    }
}
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * File input format for the second input file; produces
 * {@code (Text, SecondClass)} records through a {@link SecondRecordReader}.
 */
public class SecondInputFormat extends FileInputFormat<Text, SecondClass> {

    /**
     * Creates the reader for a split.
     *
     * @param split   the split to be read (not used here)
     * @param context the task attempt context (not used here)
     * @return a new {@link SecondRecordReader}
     */
    @Override
    public RecordReader<Text, SecondClass> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        final RecordReader<Text, SecondClass> reader = new SecondRecordReader();
        return reader;
    }
}
4.两个Map类
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the first input: forwards each key unchanged and emits the
 * record's {@code toString()} form as a {@link Text} value.
 */
public class FirstMap extends Mapper<Text, FirstClass, Text, Text> {

    /**
     * @param key     the record key supplied by the input format
     * @param value   the parsed record
     * @param context the sink receiving the {@code (key, text)} pair
     */
    @Override
    protected void map(Text key, FirstClass value, Context context)
            throws IOException, InterruptedException {
        final Text rendered = new Text(value.toString());
        context.write(key, rendered);
    }
}
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper for the second input: forwards each key unchanged and emits the
 * record's {@code toString()} form as a {@link Text} value.
 */
public class SecondMap extends Mapper<Text, SecondClass, Text, Text> {

    /**
     * @param key     the record key supplied by the input format
     * @param value   the parsed record
     * @param context the sink receiving the {@code (key, text)} pair
     */
    @Override
    protected void map(Text key, SecondClass value, Context context)
            throws IOException, InterruptedException {
        final Text rendered = new Text(value.toString());
        context.write(key, rendered);
    }
}
5.reduce类
package test.mr.multiinputs2;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Pass-through reducer: writes every value received for a key back out
 * under that same key, one output record per value.
 */
public class MultiInputsRedu extends Reducer<Text, Text, Text, Text> {

    /**
     * @param key     the grouping key
     * @param values  all values collected for {@code key}
     * @param context the sink receiving each {@code (key, value)} pair
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        final java.util.Iterator<Text> cursor = values.iterator();
        while (cursor.hasNext()) {
            context.write(key, cursor.next());
        }
    }
}
6.Job类
/*
* 要求自定义实现InputFormat,输出key,value格式数据
*/
/**
 * Job driver: wires two input paths, each with its own custom input format
 * and mapper, into one job through {@code MultipleInputs}.
 *
 * <p>Command line: {@code -i1 <input dir> -i2 <input dir> -o <output dir>}.
 */
public class MultiInputsMain extends Configuration implements Tool {

    private String input1 = null; // first input path
    private String input2 = null; // second input path
    private String output = null; // output path

    /** Configuration handed over via {@link #setConf(Configuration)}. */
    private Configuration toolConf;

    /**
     * Stores the configuration so that {@link #run(String[])} uses it.
     * Previously this was a no-op and the supplied configuration was dropped.
     */
    @Override
    public void setConf(Configuration conf) {
        this.toolConf = conf;
    }

    /**
     * Returns the stored configuration, creating a default one on first use
     * if none was supplied.
     */
    @Override
    public Configuration getConf() {
        if (toolConf == null) {
            toolConf = new Configuration();
        }
        return toolConf;
    }

    /**
     * Parses the arguments, configures the job and runs it to completion.
     *
     * @param args command-line arguments ({@code -i1}, {@code -i2}, {@code -o})
     * @return 0 if the job succeeded, 1 if it failed
     */
    @Override
    public int run(String[] args) throws Exception {
        setArgs(args);
        checkParam(); // validate the parsed parameters

        // Use the configuration supplied to this tool rather than a fresh one.
        Job job = new Job(getConf());
        job.setJarByClass(MultiInputsMain.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MultiInputsRedu.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Register each input path with its own input format and mapper.
        MultipleInputs.addInputPath(job, new Path(input1),
                FirstInputFormat.class, FirstMap.class);
        MultipleInputs.addInputPath(job, new Path(input2),
                SecondInputFormat.class, SecondMap.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate job failure instead of always reporting success.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Prints a message plus usage and exits with -1 if any required path is missing. */
    private void checkParam() {
        if (isBlank(input1)) {
            System.out.println("no input phone-data path");
            printUsage();
            System.exit(-1);
        }
        if (isBlank(input2)) {
            System.out.println("no input user-data path");
            printUsage();
            System.exit(-1);
        }
        if (isBlank(output)) {
            System.out.println("no output path");
            printUsage();
            System.exit(-1);
        }
    }

    /** Returns true when the string is null or contains only whitespace. */
    private static boolean isBlank(String s) {
        return s == null || "".equals(s.trim());
    }

    /** Prints the usage text to standard error. */
    private void printUsage() {
        System.err.println("Usage:");
        System.err.println("-i1 input \t phone data path.");
        System.err.println("-i2 input \t user data path.");
        System.err.println("-o output \t output data path.");
    }

    /**
     * Assigns the path fields from the command line.
     *
     * <p>A flag given as the last argument, with no value after it, is ignored
     * so that {@link #checkParam()} reports the missing path instead of this
     * method failing with an {@link ArrayIndexOutOfBoundsException}.
     */
    private void setArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String flag = args[i];
            boolean hasValue = i + 1 < args.length;
            if ("-i1".equals(flag) && hasValue) {
                input1 = args[++i];
            } else if ("-i2".equals(flag) && hasValue) {
                input2 = args[++i];
            } else if ("-o".equals(flag) && hasValue) {
                output = args[++i];
            }
        }
    }

    /** Runs the tool through {@code ToolRunner} and exits with its return code. */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int exitCode = ToolRunner.run(conf, new MultiInputsMain(), args);
        System.exit(exitCode);
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38882.html