文章目录
一、定义
1.MR是一个分布式运算程序的编程框架
,是用户开发“基于Hadoop的数据分析应用”的核心框架。
2.MR的核心功能
是将用户编写的业务逻辑代码和自带默认组件合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
。
二、优缺点
☺优点:
易于编程
: 简单的实现一些接口就可以完成分布式程序 —- 与简单的串行程序一样。
*良好的扩展性
: 可以通过增加机器提高运行。
*高容错性
:可以将故障机器上的计算任务转移至另一个节点上运行,完全有Hadoop内部自动完成。
*适合PB级以上海量数据的离线处理
☹ 缺点:
不擅长实时计算
:无法像mysql一样,在毫秒或者秒级内返回结果。
*不擅长流式计算
:流式计算的输入数据是动态的,而MR的数据数据集是静态的,不能动态变化。
*不擅长DAG有向图的计算
:多个应用程序之间存在依赖关系,后一个应用程序的输入为前一个的输出。此情况下,MR可以做,但是会造成大量的磁盘IO,性能低下。
三、MR核心编程思想 — 案例WordCount
-
MR运算程序分为两个部分:Map 、 Reduce。
-
Map数据的划分:按照块级划分,默认每128M为一块。
-
每一块数据集对应启动一个MapTask,实行完全并行计算,互不干扰。
◑ Task处理时,读取数据并
按行处理
,按空格切分
◑ 形成(K,V)
键值对的形式
◑将所有的键值对按照单词首字母,分成两个分区溢写到磁盘
-
Reduce阶段的并发ReduceTask,完全互不相犯。但是
数据来源于上一个阶段所有MapTask并发实例的输出
。 -
MR编程模型
只能包含一个M端和一个R端
,若业务逻辑复杂,可采用多个MR程序串行运行。
四、MR进程
一个完整的MR程序在分布式运行时有三类实例进程:
■ MrAppMaster
:负责成个程序的过程调度及状态协调 (准备一些资源 ~ 内存~ CPU~ 开启后续进程等等,job级别)
■ MapTask
: 负责Map阶段的整个数据处理流程 — 分
■ ReduceTask
: 负责Reduce阶段的整个数据处理流程 — 合
五、MR编程规范
三部分:Map 、 Reduce 、 Driver
-
Map阶段
– 用户自定义的Mapper
要继承自己的父类
–Mapper的输入、输出数据都是KV对的形式
,且类型可自定义
– Mapper中的业务逻辑写在map()
方法中
– map()方法(MapTask进程)对每一个<K,V>只调用一次 — 对输入行级数据只调用一次,处理后输出 -
Reduce阶段
– 用户自定义的Reducer
要继承自己的父类
–Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对
(Reducer的数入,就是Mapper的输出)
– Reducer中的业务逻辑写在Reducer()
方法中
– ReduceTask进程对每一组相同 K 的 <K,V> 组调用一次reduce()方法 -
Driver阶段
– 相当于Yarn集群的客户端,用于提交我们整个程序到Yarn集群
,提交的是封装了MR程序相关运行参数的Job对象
六、wordCount案例
① 创建工程
▲创建Maven项目
▲pom.xml文件添加依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
</dependencies>
▲配置日志文件
在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
② 代码实现
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/** * Mapper 阶段 * KEYIN 输入数据的key类型 * VALUEIN 输入数据的value类型 * KEYOUT 输出数据的key类型 * VALUEOUT 输出数据的value类型 */
public class wordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
// 创建对象
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1.获取一行数据
// atguigu atguigu
String line = value.toString();
// 2.切分
String[] words = line.split(" ");
// 3.循环写出
for (String word:words){
// 设置键 atguigu
k.set(word);
// 设置词频为 1 , 也可以在上面创建对象时默认为1
v.set(1);
// 生成键值对 (atguigu,1)
context.write(k,v);
}
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/** * Reducer 阶段 * KEYIN ,VALUEIN Reducer阶段输入(Mapper阶段输出)数据的类型 * KEYOUT 最终输出数据的key类型 * VALUEOUT 最终输出数据的value类型 */
public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
IntWritable v = new IntWritable();
@Override
// Iterable<IntWritable> values 对key的value值进行迭代实现词频统计
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// atguigu,1
// atguigu,1
// 1.累加求和
int sum = 0;
for (IntWritable value:values){
// value是IntWritable类型数据,通过get转为int型,才好计算
sum += value.get();
}
// 2.写出结果
v.set(sum);
context.write(key,v);
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class wordCountDriver {
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = null;
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.设置jar存储位置
job.setJarByClass(wordCountDriver.class);
// 3.关联map、reduce类
job.setMapperClass(wordCountMapper.class);
job.setReducerClass(wordCountReducer.class);
// 4.设置Mapper阶段输出数据的key、value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置Reducer阶段输出数据的key、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入、出路径
// FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\dataset\\"));
// FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\output\\"));
// 打jar包
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 7.提交job
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
③ 本地运行
- 设置输入输出路径为本地路径
// 6.设置输入、出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\dataset\\"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\output\\"));
- 运行结果
④ 集群运行
▲环境配置 — 打jar包Maven依赖
<plugin>
<artifactId>maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>第一章_MR概述.wordCountDriver</mainClass> // 这里要对应自己的主类路径进行修改
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
▲生成jar包
▲具体执行
- 上传文件到hdfs,以及通过Xshell上传jar包至虚拟机~
- 输出文件不需要创建,在执行jar包时会自动创建,如不删掉,会报文件夹已存在的异常。所以在上面删除了output文件夹~
- 执行jar包运行命令
hadoop jar jar包名 主类 输入文件路径 输出文件路径
▲查看结果
<!-- 这里是以后会用的一些jar的总配置,仅供参考!!! -->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zyx</groupId>
<artifactId>MR</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- 测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<!-- jdbc -->
<!-- <dependency>-->
<!-- <groupId>mysql</groupId>-->
<!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- <version>5.1.40</version>-->
<!-- </dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- 分词器 -->
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
<!-- fastJson 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArguments>
<extdirs>libs</extdirs>
<!-- rt包没有打到项目中去 -->
<verbose />
<!-- C:/Program Files/Java/jdk1.8.0_201 是我本地安装的jdk家目录,rt.jar等jar 我在 jdk家目录下的 /jre/lib/ 目录中有发现存在,你们需要注意确认自己的实际情况,Windows分隔符英文分号,linux分隔符英文冒号 -->
<bootclasspath>G:/Projects/jdk1.8.0_202/jre/lib/rt.jar;G:/Projects/jdk1.8.0_202/jre/lib/jce.jar;G:/Projects/jdk1.8.0_202/jre/lib/jsse.jar</bootclasspath>
</compilerArguments>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <version>2.3.2</version>-->
<!-- <configuration>-->
<!-- <source>1.8</source>-->
<!-- <target>1.8</target>-->
<!-- </configuration>-->
<!-- </plugin>-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>通信数据处理.DriverTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly-->
<executions>
<execution>
<id>make-assemble</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
今天的文章【MapReduce】 MR初识分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/68254.html