MapReduce对输入多文件的处理2自定义FileInputFormat类

MapReduce对输入多文件的处理2自定义FileInputFormat类多种自定义文件格式的文件输入处理MultipleInputs可以让MR支持多种输入格式比如我们有两种文件格式,那么我们就要有两套RecordClass,RecordReader和InputFormatInputFormat(extendsFileInputFormat)—>RecordReader(extendsRecordReader)—>RecordClass(imp

多种自定义文件格式的文件输入处理

MultipleInputs可以让MR支持多种输入格式
比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat
InputFormat(extends FileInputFormat)—>RecordReader(extends RecordReader)—>RecordClass(implements Writable)
MultipleInpts需要不同的InputFormat,一种InputFormat使用一种RecordReader来读取文件并返回一种Record格式的值
这就是这三个典型的关系,也是map过程中涉及的三个步骤的工具和产物

数据准备
a文件
1t80
2t90
3t100
4t50
5t73

b文件
1tlilit3
2txiaomingt3
3tfeifeit3
4tzhangsant3
5tlisit3

t表示分隔符

设计思路

将t前面的Text表示给map将要输入的key
t后面的作为给map要输入的value

要求自定义实现InputFormat,输出key,value格式数据。以产生Map的输入的数据(key,value)

 

!!!三个文件步骤!!!

InputFormat(extends FileInputFormat)—>RecordReader(extends RecordReader)—>RecordClass(implements Writable)

本例是对两个文件操作

1.两个RecordClass类(实现Writable接口)

package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 对map输入的value的预处理
 * 对原始数据的预加工
 */
/*
 * 第一张表数据
 */
public class FirstClass implements Writable {
	private String value;

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

	public FirstClass() {
		super();
		// TODO Auto-generated constructor stub
	}

	public FirstClass(String value) {
		super();
		this.value = value;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.value);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.value = in.readUTF();
	}

	@Override
	public String toString() {
		return "FirstClasst" + value;
	}
}
package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 对map输入的value的预处理
 * 对原始数据的预加工
 */
/*
 * 第二张表数据
 */
public class SecondClass implements Writable {
	private String username;
	private int classNo;

	public SecondClass() {
		super();
	}

	public SecondClass(String username, int classNo) {
		super();
		this.username = username;
		this.classNo = classNo;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public int getClassNo() {
		return classNo;
	}

	public void setClassNo(int classNo) {
		this.classNo = classNo;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(username);
		out.writeInt(classNo);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.username = in.readUTF();
		this.classNo = in.readInt();
	}

	@Override
	public String toString() {
		return "SecondClasst" + username + "t" + classNo;
	}

}

2.两个自定义RecordReader类(继承RecordReader类)

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FirstRecordReader extends RecordReader<Text, FirstClass> {

	// 定义一个真正读取split中文件的读取器
	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private FirstClass value = null;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		close();
		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		// 没有读取到东西
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			value = null;
			return false;
		}
		Text val = lineRecordReader.getCurrentValue();
		String line = val.toString();
		String[] str = line.split("t");
		key = new Text(str[0]);
		value = new FirstClass(str[1].trim()); // 实现对原始数据的预分割
		return true;
	}

	// 读取key的当前值
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	// 读取value的当前值
	@Override
	public FirstClass getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		value = null;
	}

}

 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SecondRecordReader extends RecordReader<Text, SecondClass> {
	// 定义一个真正读取split中文件的读取器
	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private SecondClass value = null;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		close();
		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			value = null;
			return false;
		}
		Text val = lineRecordReader.getCurrentValue();
		String line = val.toString();
		String str[] = line.split("t");
		key = new Text(str[0]);
		value = new SecondClass(str[1], Integer.parseInt(str[2]));
		return true;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public SecondClass getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		value = null;
	}

}

3.自定义两个FileInputFormat类(继承FileInputFormat类)

package test.mr.multiinputs2;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FirstInputFormat extends FileInputFormat<Text, FirstClass> {

	@Override
	public RecordReader<Text, FirstClass> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new FirstRecordReader();
	}
}

 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SecondInputFormat extends FileInputFormat<Text, SecondClass> {

	@Override
	public RecordReader<Text, SecondClass> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new SecondRecordReader();
	}

}

4.两个Map类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
	@Override
	protected void map(Text key, FirstClass value,
			Mapper<Text, FirstClass, Text, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(key, new Text(value.toString()));
	}
}

 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
	@Override
	protected void map(Text key, SecondClass value,
			Mapper<Text, SecondClass, Text, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(key, new Text(value.toString()));
	}
}

5.reduce类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiInputsRedu extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		for (Text val : values) {
			context.write(key, val);
		}
	}
}

6.Job类

/*
 * 要求自定义实现InputFormat,输出key,value格式数据
 */
public class MultiInputsMain extends Configuration implements Tool {
	private String input1 = null; // 定义的多个输入文件
	private String input2 = null;
	private String output = null;

	@Override
	public void setConf(Configuration conf) {

	}

	@Override
	public Configuration getConf() {
		return new Configuration();
	}

	@Override
	public int run(String[] args) throws Exception {
		setArgs(args);
		checkParam();// 对参数进行检测

		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(MultiInputsMain.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MultiInputsRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// MultipleInputs类添加文件路径
		// 添加上自定义的fileInputFormat(分别是FirstInputFormat和SecondInputFormat)格式
		MultipleInputs.addInputPath(job, new Path(input1),
				FirstInputFormat.class, FirstMap.class);
		MultipleInputs.addInputPath(job, new Path(input2),
				SecondInputFormat.class, SecondMap.class);

		FileOutputFormat.setOutputPath(job, new Path(output));
		job.waitForCompletion(true);
		return 0;
	}

	private void checkParam() {
		if (input1 == null || "".equals(input1.trim())) {
			System.out.println("no input phone-data path");
			userMaunel();
			System.exit(-1);
		}
		if (input2 == null || "".equals(input2.trim())) {
			System.out.println("no input user-data path");
			userMaunel();
			System.exit(-1);
		}
		if (output == null || "".equals(output.trim())) {
			System.out.println("no output path");
			userMaunel();
			System.exit(-1);
		}
	}

	// 用户手册
	private void userMaunel() {
		System.err.println("Usage:");
		System.err.println("-i1 input \t phone data path.");
		System.err.println("-i2 input \t user data path.");
		System.err.println("-o output \t output data path.");
	}

	// 对属性进行赋值
	// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) 
	private void setArgs(String[] args) {
		for (int i = 0; i < args.length; i++) {
			if ("-i1".equals(args[i])) {
				input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
			} else if ("-i2".equals(args[i])) {
				input2 = args[++i];
			} else if ("-o".equals(args[i])) {
				output = args[++i];
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		ToolRunner.run(conf, new MultiInputsMain(), args); // 调用run方法
	}
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/38882.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注