文章目录
1.Flink概述
Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink 是做 Batch 计算的,但是在 2014 年,StratoSphere 里面的核心成员孵化出 Flink,同年将 Flink 捐赠 Apache,并在后来成为 Apache 的顶级大数据项目,同时 Flink 计算的主流方向被定位为 Streaming,即用流式计算来做所有大数据的计算,这就是 Flink 技术诞生的背景。
Flink 官网:https://flink.apache.org/
核心理念:Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架
Apache Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同 一个 Flink 运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们它们所提供的 SLA 是完全不相同的:流处理一般需要支持低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有 MapReduce、Tez、Crunch、 Spark,实现流处理的开源方案有 Samza、Storm。
Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基 于同一个 Flink 运行时(Flink Runtime),分别提供了流处理和批处理 API,而这两种 API 也是 实现上层面向流处理、批处理类型应用框架的基础。
总的来说,现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供 的 SLA(Service-Level-Aggreement,服务等级协议)是完全不相同的:
-
流处理一般需要支持低延迟、Exactly-once 保证,
-
批处理需要支持高吞吐、高效处理。
Flink 从另一个视角看待流处理和批处理,将二者统一起来:
-
Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;
-
批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
2.Flink的特性
2.1Flink核心特性
-
支持高吞吐、低延迟、高性能的流处理
-
支持带有事件时间的窗口(Window)操作
-
支持有状态计算的 Exactly-once 语义
-
支持高度灵活的窗口(time/count/session)Window 操作,以及 data-driven 驱动
-
支持具有 BackPressure 功能的持续流模型
-
支持基于轻量级分布式快照(Snapshot)实现的容错
-
一个运行时同时支持 Batch on Streaming 处理和Streaming 处理
-
Flink 在 JVM 内部实现了自己的内存管理
-
支持迭代计算
-
支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果进行缓存
2.2Flink特点
-
Streaming-first:流处理引擎
-
Fault-tolerant:容错,可靠性,checkpoint
-
Scalable:可扩展性,1000 节点以上
-
Performance:性能,高吞吐量,低延迟
2.3Flink关键特性
-
低延时:提供 ms 级时延的处理能力
-
Exactly Once:提供异步快照机制,保证所有数据真正处理一次
-
HA:JobManager 支持主备模式,保证无单点故障
-
水平扩展能力:TaskManager 支持手动水平扩展
2.4Hadoop兼容性
Flink 能够支持 YARN,能够从 HDFS 和 HBase 中获取数据
能够使用所有的 Hadoop 的格式化输入和输出
能够使用 Hadoop 原有的 Mappers 和 Reducers,并且能与 Flink 的操作混合使用
能够更快的运行 Hadoop 作业
3.Flink的优势
流场景使用案例 | 正确性保证 | API 分层体系 |
---|---|---|
1、数据驱动的应用 2、批流数据分析 3、数据通道和 ETL | 1、Exactly-once 状态一致性保 证 2、事件时间处理 3、复杂的 late date 处理 | 1、统一 SQL 支持 Stream 和 Batch 数据处理 2、DataStream API & DataSet API 3、ProcessFunction (Time & State) |
Operational Focus | 适用于各种应用场景 | 高性能 |
1、部署灵活 2、高可用配置 3、Savepoints | 1、架构可扩展 2、超大 state 支持 3、增量 checkpointing | 1、低延时 2、高吞吐 3、内存计算 |
-
All streaming use cases:所有的流式用例
-
Guaranteed correctness:保证数据的准确性
-
Layered APIs:分层的API
-
Operational Focus:重点放在流处理的操作方面
-
Scales to any use case:可扩展至任何用例
-
Excellent Performance:完美的表现
4.Flink核心四大基石
Apache Flink 之所以能越来越受欢迎,我们认为离不开它最重要的四个基石:
Checkpoint、State、Time、Window
5.Flink应用场景
-
Event-driven Applications:事件驱动应用
-
Data Analytics Applications:数据分析应用
-
Data Pipeline Applications:数据挖掘应用
Flink 最适合的应用场景是低延时的数据处理场景:高并发处理数据,实验毫秒级,且兼具可靠性。
典型应用场景有互联网金融业务、点击流日志处理、舆情监控。
-
优化电子商务的实时搜索结果:阿里巴巴的所有基础设施团队使用 flink 实时更新产品细节和库存信息,为用户提供更高的关联性。
-
针对数据分析团队提供实时流处理服务:king 通过 flink-powered 数据分析平台提供实 时数据分析,从游戏数据中大幅缩短了观察时间
-
网络/传感器检测和错误检测:Bouygues 电信公司,是法国最大的电信供应商之一, 使用 flink 监控其有线和无线网络,实现快速故障响应。
-
商业智能分析 ETL:Zalando 使用 flink 转换数据以便于加载到数据仓库,将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据。
Flink 的典型功能:
-
Flink 是一个纯流式系统,吞吐量实际测试可达 100K EPS。而不像某些框架是用 mini batch 的模式来达到所谓的流式处理的;
-
面对不同的用户数据格式,我们必须支持多种数据源,这一点上 Flink 内置的对多种数 据源的支持(CSV,Kafka,Hbase,Text,Socket 数据等)也为用户数据的接入提供了便利;
-
Flink 强大的窗口机制(包括翻转窗口,滑动窗口,session 窗口,全窗口以及允许用户 自定义窗口)可以满足复杂的业务逻辑,使得用户可以编写复杂的业务规则;
-
Flink 内置的 RocksDB 数据存储格式使其数据处速度快且资源消耗少,在 Checkpoint 上 起到了至关重要的作用;
-
Flink 对算子(operator)的高可控性,使得用户可以灵活添加删除或更改算子行为。 这一点对于动态部署有着至关重要的意义。
Flink 非常适合于:
-
多种数据源(有时不可靠):当数据是由数以百万计的不同用户或设备产生的,它是安全的假设数据会按照事件产生的顺序到达,和在上游数据失败的情况下,一些事件可能会比他们晚几个小时,迟到的数据也需要计算,这样的结果是准确的。
-
应用程序状态管理:当程序变得更加的复杂,比简单的过滤或者增强的数据结构,这个时候管理这些应用的状态将会变得比较难(例如:计数器,过去数据的窗口,状态机,内置数据库)。Flink 提供了工具,这些状态是有效的,容错的,和可控的,所以你不需要 自己构建这些功能。
-
数据的快速处理:有一个焦点在实时或近实时用例场景中,从数据生成的那个时刻, 数据就应该是可达的。在必要的时候,Flink 完全有能力满足这些延迟。
-
海量数据处理:这些程序需要分布在很多节点运行来支持所需的规模。Flink 可以在大 型的集群中无缝运行,就像是在一个小集群一样。
6.Flink执行引擎解析/架构
6.1Flink集群架构
我们可以了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给 JobManager,JobManager 分发任务给 TaskManager 去执行,然后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构图去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。 然而有一个最重要的区别就 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,可能是很多级,并且在 TaskManager 内 部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。
6.2JobManagers, TaskManagers, Clients
Flink 是一个分布式的主从架构,既集群运行时是由主节点和从节点组成。Flink 的分布式执行包括两个重要的进程:Master 和 Worker。执行 Flink 程序时,多个进程参与执行,即作业 管理器(JobManager),任务管理器(TaskManager)和作业客户端(JobClient)。
Flink程序需要提交给 Job Client。然后,JobClient 将作业提交给 JobManager。JobManager 负责协调资源分配和作业执行。它首先要做的是分配所需的资源。资源分配完成后,任务将提交给相应的 TaskManager。在接收任务时,TaskManager 启动一个线程以开始执行。执行到 位时,TaskManager 会继续向 JobManager 报告状态更改。可以有各种状态,例如开始执行, 正在进行或已完成。作业执行完成后,结果将发送回客户端(JobClient)。
要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群任务启动后架构图:
-
Program Code:编写的 Flink 应用程序代码
-
Job Client:JobClient 不是 Flink 程序执行的内部部分,但它是任务执行的起点。JobClient 负责接受用户的程序代码,然后创建数据流,将数据流提交给 JobManager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户
-
JobManager:主进程(也称为作业管理器)协调和管理程序的执行。它的主要职责包括安排任务,管理 checkpoint,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多 master,但要保证一个是 leader, 其他是 standby;JobManager 包含 ActorSystem、Scheduler、CheckPoint三个重要的组件
-
TaskManager:从 JobManager 处接收需要部署的 Task。TaskManager 是在 JVM 中的一个或多个线程中执行任务的工作节点。任务执行的并行性由每个 TaskManager 上可用的任务槽决定。每个任务代表分配给任务槽的一组资源。例如,如果 TaskManager 有四个插槽,那么 它将为每个插槽分配 25%的内存。可以在任务槽中运行一个或多个线程。同一插槽中的线程共享相同的 JVM。同一 JVM 中的任务共享 TCP 连接和心跳消息。TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。 默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
-
TaskSlot:任务槽,类似于 YARN 当中的 Container,用于资源的封装。但是在 Flink 中, taskSlot 只负责封装内存的资源,不包含 CPU 的资源。每一个 TaskManager 中会包含 3 个 TaskSlot,所以每一个 TaskManager 中最多能并发执行的任务是可控的,最多 3 个。TaskSlot 有独占的内存资源,在一个 TaskManager 中可以运行不同的任务。
-
Task:TsakSlot 当中的 Task 就是任务执行的具体单元。
6.3Task Slots and Resources
每个 worker (TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个子任务 task。为了控制 worker 接受多少 task,worker 具有所谓的 task slot(至少一个)。
每个 task slot 表示 TaskManager 资源的一个固定子集。例如,一个有三个 slots 的 TaskManager 会将其 1/3 的托管内存分配给每个插槽。对资源进行插槽管理意味着子任务不会与来自其他作业的子任务争夺托管内存,而是拥有一定数量的预留托管内存。注意,这里没有发生 CPU 隔离;当前插槽只分隔任务的托管内存。
通过调整任务槽的数量,用户可以定义子任务如何彼此隔离。每个 TaskManager 有一个插槽意味着每个任务组运行在单独的 JVM 中(例如,可以在单独的容器中启动 JVM)。拥有多个插槽意味着更多的子任务共享同一个 JVM。相同 JVM 中的任务共享 TCP 连接(通过多路复用) 和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。
默认情况下,Flink 允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相 同的作业。结果是一个槽可以容纳作业的整个管道。允许这个插槽共享有两个主要好处:
-
Flink 集群需要的任务插槽与作业中使用的最高并行度一样多。不需要计算一个程序总共包 含多少任务(具有不同的并行度)。
-
更容易得到更好的资源利用。如果没有插槽共享,非密集型 source/map()子任务将阻塞与资 源密集型窗口子任务一样多的资源。使用插槽共享,将我们示例中的基本并行度从 2 提高到 6,可以充分利用插槽资源,同时确保繁重的子任务在任务管理器中得到公平分配。
6.4Tasks and Operator Chains
对于分布式执行,将操作符子任务一起链接到任务中。每个任务由一个线程执行。将操作符链接到任务中是一种有用的优化:它减少了线程到线程切换和缓冲的开销,增加了总体吞吐量,同时降低了延迟,可以对链接行为进行配置。下图中的示例数据流使用 5 个子任务执行,因此使用 5 个并行线程。
6.5Flink组织架构
Flink 具有分层架构,其中每个组件都是特定层的一部分。每个层都建立在其他层之上,以实现清晰的抽象。Flink 旨在在本地,YARN 群集或云上运行。Runtime 是 Flink 的核心数据处理引擎,它通过 JobGraph 形式的 API 接收程序,在执行 JobGraph 时,Flink 提供了多种候选部署方案(如 local,remote,YARN 等)。JobGraph 即为一个一般化的并行数据流图(data flow), 它拥有任意数量的 Task 来接收和产生 data stream。
DataStream 和 DataSet API 是程序员可用于定义 Job 的接口。编译程序时,这些 API 会生成 JobGraphs。编译后,DataSet API 允许优化器生成最佳执行计划,而 DataStream API 使用流 构建来实现高效的执行计划。DataStream API 和 DataSet API 都会使用单独编译的处理方式生 成 JobGraph。DataSet API 使用 optimizer 来决定针对程序的优化方法,而 DataStream API 则 使用 stream builder 来完成该任务。
然后根据部署模型将优化的 JobGraph 提交给执行程序。可以选择本地,远程或 YARN 部署模式。如果已经运行了 Hadoop 集群,那么最好使用 YARN 部署模式。
Flink附随了一些产生DataSet或DataStream API程序的的类库和API:处理逻辑表查询的Table, 机器学习的 FlinkML,图像处理的 Gelly,复杂事件处理的 CEP。
7.Flink容错 State 和 Checkpoint
7.1状态解释
在批处理过程中,数据是划分为块分片去完成的,然后每一个 Task 去处理一个分片。当 分片执行完成后,把输出聚合起来就是最终的结果。在这个过程当中,对于 state 的需求还是比较小的。在流计算过程中,对 State 有非常高的要求,因为在流系统中输入是一个无限制的流, 会持续运行从不间断。在这个过程当中,就需要将状态数据很好的管理起来。Flink 的失败恢复依赖于“检查点机制+可部分重发的数据源”。检查点机制:检查点定期触发,产生快照,快照中记录了:(1)当前检查点开始时数据源(例如 Kafka)中消息的 offset (2)记录了所有有状态的 operator 当前的状态信息(例如 sum 中的数值)。可部分重发的数据源:Flink 选择最近完成的检查点 K。然后系统重放整个分布式的数据 流,然后给予每个 operator 他们在检查点 k 快照中的状态。数据源被设置为从位置 Sk 开始 重新读取流。例如在 Apache Kafka 中,那意味着告诉消费者从偏移量 Sk 开始重新消费。Flink 中有两种基本类型的 State,即 Keyed State 和 Operator State。State 可以被记录,在失败的情况下数据还可以恢复。state 一般指一个具体的 task/operator 的状态【state 数据默认保存在 JVM 的 堆内存中】
7.2State详解
我们写的 wordcount 的程序没有包含状态管理。如果一个 task 在运行中挂掉了,那 么它在内存当中的状态数据都会丢失。所有的数据都需要重新计算。从容错和消息处理的语义上,Flink 引入了 State 和 CheckPoint
首先区分两个概念:
-
State 一般是指一个具体的 task/operator 的状态(state 数据默认保存在 java 的堆内存中)
-
checkpoint 则表示了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 的 task/operator 的状态。可以理解成 checkpoint 是把所有 state 数据持久化存储了。
State 可以被记录,这样可以在失败的情况下进行恢复
Flink 中有两种 State
-
KeyedState
-
OperatorState
KeyedState 和 OperatorState 都有两种形式存在
-
原始状态 RawState
原始状态,是由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[]来读写内容,对其内部数据结构一无所知
-
托管状态 ManagedState
托管状态是由 Flink 框架管理的状态
通常,在 DataStream 上的状态,推荐使用托管的状态,当实现一个用户自定义的 Operator 的时候,会使用到原始状态
7.3CheckPoint
checkpoint【可以理解为 checkpoint 是把 state 数据持久化存储了】,则表示 了一个 Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 task/operator 的状态
CheckPoint 是 Flink 容错的主要机制。它不断为分布式数据流和 executor 状态拍摄快照。它的思想来自 Chandy-Lamport 算法,但已根据 Flink 的定制要求进行了修改。
Flink 基于 Chandy-Lamport 算法实现了一个分布式的一致性快照,从而提供了一致性的语义。 Chandy-Lamport 算法实际上在 1985 年的时候已经被提出来,但并没有被很广泛的应用,而 Flink 则把这个算法发扬光大了。Spark 最近在实现 Continue streaming,Continue streaming 的目的是为了 降低它处理的延时,其也需要提供这种一致性的语义,最终采用 Chandy-Lamport 这个算法,说明 Chandy-Lamport 算法在业界得到了一定的肯定。
每个快照状态都会报告给 Flink 作业管理器(JobManager)的检查点协调器。在制作快照时, Flink 处理记录对齐,以避免因任何故障而重新处理相同的记录。这种对齐通常需要几毫秒。 但是对于某些要求高的应用程序,即使毫秒级的延迟也是不可接受的,我们可以选择在单个记录处理中选择低延迟。默认情况下,Flink 只处理每个记录一次。如果任何应用程序需要低延迟并且至少在一次交付就可以,我们可以关闭该触发器。这将跳过对齐并将改善延迟。
容错: checkpoint 是很重要的机制,因为 Flink 的检查点是通过分布式快照实现的,所以这里对快照和检查点不进行区分。
分布式数据流的轻量级异步快照
分布式有状态流处理支持在云中部署和执行大规模连续计算,同时针对低延迟和高吞吐量。这种模式最基本的挑战之一是在潜在的失败下提供处理保证。现有方法依赖于可用于故障恢复的周期性全局状态快照。这些方法有两个主要缺点。首先,它们经常会停止影响摄取的整 体计算。其次,他们急切地坚持传输中的所有记录以及操作状态,这导致比所需更快的快照。 在这项工作中,提出了异步屏障快照(ABS),这是一种适用于现代数据流执行引擎的轻量级算法,可最大限度地减少空间需求。ABS 仅保留非循环执行拓扑上的运算符状态,同时保持循环数据流的最小记录日志。我们在 Apache Flink 上实现了 ABS,这是一个支持有状态流处理的分布式分析引擎。我们的评估表明,我们的算法对执行没有太大的影响,保持线性可扩展性并且在频繁的快照中表现良好。
7.4Barriers Flink
分布式快照的核心概念之一是 barriers。这些 barriers 被注入数据流并与记录一起作为 数据流的一部分向下流动。barriers 永远不会超过记录,数据流严格有序。barriers 将数据流中的记录分为进入当前快照的记录和进入下一个快照的记录。每个 barriers 都带有快照的 ID, 并且 barriers 之前的记录都进入了该快照。barriers 不会中断流的流动,非常轻量级。来自不同快照的多个 barriers 可以同时在流中出现,这意味着可以同时发生各种快照。
流 barriers 是 Flink 快照的核心要素。它们被摄取到数据流中而不会影响流量。barriers 永远不会超过记录。他们将记录集合分为快照。每个 barriers 都带有唯一的 ID。下图显示了如何 将 barriers 注入到快照的数据流中:
基于上图:
-
出现一个 Barrier,在该 Barrier 之前出现的记录都属于该 Barrier 对应的 Snapshot,在该 Barrier 之后出现的记录属于下一个 Snapshot
-
来自不同 Snapshot 多个 Barrier 可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个 Snapshot
-
当一个中间(Intermediate)Operator 接收到一个 Barrier 后,它会发送 Barrier 到属于该 Barrier 的 Snapshot 的数据流中,等到 Sink Operator 接收到该 Barrier 后会向 Checkpoint Coordinator 确认该 Snapshot,直到所有的 Sink Operator 都确认了该 Snapshot,才被认为 完成了该 Snapshot
-
一旦操作算子从一个输入流接收到快照 barriers n,它就不能处理来自该流的任何记录, 直到它从其他输入接收到 barriers n 为止。否则,它会搞混属于快照 n 的记录和属于快照 n + 1 的记录。
-
barriers n 所属的流暂时会被搁置。从这些流接收的记录不会被处理,而是放入输入缓冲区。
-
一旦从最后一个流接收到 barriers n,操作算子就会发出所有挂起的向后传送的记录, 然后自己发出快照 n 的 barriers。
-
之后,它恢复处理来自所有输入流的记录,在处理来自流的记录之前优先处理来自输 入缓冲区的记录
Barrier 分为两类:
BarrierBuffer 通过阻塞已接收到 barrier 的 input channel 并缓存被阻塞的 channel 中后续流入的数据流,直到所有的 barrier 都接收到或者不满足特定的检查点的条件后,才会释放这些被阻塞的 channel,这个机制被称之为–aligning(对齐)。正是这种机制来实现 EXACTLY_ONCE 的一致性(它将检查点中的数据精准得隔离开)。
而 BarrierTrack 的实现就要简单地多,它仅仅是对数据流中的 barrier 进行跟踪,但是数据流 中的元素 buffer 是直接放行的。这种情况会导致同一个检查点中可能会预先混入后续检查 点的元素,从而只能提供 AT_LEAST_ONCE 的一致性。
Snapshot 并不仅仅是对数据流做了一个状态的 Checkpoint,它也包含了一个 Operator 内部所 持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。
barrier 作用:它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进 行分组,并沿着数据流的方向向前推进
具体排列过程如下:
-
Operator 从一个 incoming Stream 接收到 Snapshot Barrier n,然后暂停处理,直到其它的 incoming Stream 的 Barrier n(否则属于 2 个 Snapshot 的记录就混在一起了)到达该 Operator 接收到 Barrier n 的 Stream被临时搁置,来自这些 Stream 的记录不会被处理,而 是被放在一个 Buffer 中。
-
一旦最后一个 Stream 接收到 Barrier n,Operator 会 emit 所有暂存在 Buffer 中的记录, 然后向 Checkpoint Coordinator 发送 Snapshot n,继续处理来自多个 Stream 的记录
-
基于 Stream Aligning 操作能够实现 Exactly Once 语义,但是也会给流处理应用带来延迟,因为为了排列对齐 Barrier,会暂时缓存一部分 Stream 的记录到 Buffer 中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐 Barrier 的一个 Stream 为处理 Buffer 中缓存记录的时刻点。在 Flink 中,提供了一个开关,选择是否使用 Stream Aligning, 如果关掉则 Exactly Once 会变成 At least once。
8.Flink Time
8.1Time 解读
Event Time 也就是事件发生的时间,事件的发生时间。这里举个例子,我们产生日志的时间,这个应该清楚的,日志的时间戳就是发生时间。 在 Flink 的流式处理中,绝大部分的业务都会使用 EventTime,一般只在 EventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。
Ingestion Time 也就是提取时间,也就是进入 Flink 计算程序的时间,这个时候数据已经发送给窗口,也就是发送给窗口的时间,也就是程序处理计算的时间。
Processing Time 也就是处理时间,我们看到了这个已经进入 Flink 程序,也就是我们读取数据源时间,也就是日志到达 Flink 的时间,但是这个时间是本地机器的时间。
详细解释:
-
事件时间:事件时间是每个事件在其生产设备上发生的时间。
此时间通常在进入 Flink 之前嵌入记录中,并且可以从每个记录中提取该事件时间戳。在 事件时间,时间的进展取决于数据,而不是任何时钟。事件时间程序必须指定如何生成 事件时间水位线,这是表示事件时间进度的机制。在一个理想的情况下,事件时间处理 将产生完全一致和确定的结果,无论事件何时到达或其排序。但是,除非事件已知按顺 序到达(按时间戳),否则事件时间处理会在等待无序事件时产生一些延迟。由于只能等 待一段有限的时间,因此限制了确定性事件时间应用程序的可能性。假设所有数据都已 到达,事件时间操作将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时 也会产生正确且一致的结果。例如,每小时事件时间窗口将包含带有落入该小时的事件 时间戳的所有记录,无论它们到达的顺序如何,或者何时处理它们。注意,有时当事件 时间程序实时处理实时数据时,它们将使用一些处理时间操作,以确保它们及时进行。
-
提取时间:提取时间是事件进入 Flink 的时间。
在源 operator 处,每个记录将源的当前时间作为时间戳,并且基于时间的操作(如时间 窗口)引用该时间戳。提取时间在概念上位于事件时间和处理时间之间。与处理时间相 比,它稍早一些,但可以提供更可预测的结果。因为提取时间使用稳定的时间戳(在源 处分配一次),所以对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个 窗口 operate 可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。与事 件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生 成水位线。在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水印 生成功能。
-
处理时间:处理时间是指执行相应操作的机器的系统时间。
当流程序在处理时间运行时,所有基于时间的操作(如时间窗口)将使用相应 operator 的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整个小时之间到达特定 operator 的所有记录。例如,如果应用程序在上午 9:15 开始运行,则第一个每小时处理时间窗口将包括在上午 9:15 到上午 10:00 之间处理的事件,下一个窗口将包括在上午 10:00 到 11:00 之间处理的事件。处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确 定性,因为它容易受到记录到达系统的速度(例如从消息队列)到记录在系统内的 operator 之间流动的速度的影响,和停电(调度或其他)。
8.2Watermark
流处理从事件产生是先流经 source,再到 operator,中间是有一个过程和时间的, 虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 window 的运行,我们 不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个 特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark。 Flink Watermark 不容易理解,而且和 window 混合,就容易造成混乱了,Watermark 直接翻 译为“水位线”。首先白话 Watermark 的作用,首先我们都知道流式数据,但是流并不是那么顺畅的,比如网络卡等原因,造成数据流乱序,这时候该如何解决,于是产生了 Watermarks。
Watermark 是一种衡量 EventTime 进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的 Watermark。
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。
Watermark 本质上是一个带有时间戳的特殊 event,当 Flink 中的运算符 Operator 接收到 Watermark 时,它明白(假设)它不会看到比该时间戳更早的消息。数据流中的 Watermark用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime – t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。
下图显示了带有(逻辑)时间戳和内联水印的事件流。在本例中,事件是按顺序排列的(相对于它们的时间戳),这意味着水印只是流中的周期性标记。
Watermark 对于无序流是至关重要的,如下所示,其中事件不按时间戳排序。通常,Watermark 是一种声明,通过流中的该点,到达某个时间戳的所有事件都应该到达。一旦 Watermark 到达 operator,operator 就可以将其内部事件时间提前到 Watermark 的值。
针对进入窗口的每条数据,计算当前所有达到窗口的数据的最大 eventTime,将这个 eventTime 和延迟时间(Watermark)做减法,差值如果大于某一个窗口的的结束时间,那么该窗口就进行算子操作
有序流的 Watermarker 如下图所示:(Watermark 设置为 0)
乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)
当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime – 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。 由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s5s,窗口2是6s10s, 那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时 的 Watermark 恰好触发窗口 2。
8.3并发 Watermark Watermark
是在源函数处生成的,或直接在源函数之后生成的。源函数的每个并行子任务通常独立生成其 Watermark。这些 Watermark 定义了特定并行源处的事件时间。当 Watermark 通过流程序时,它们会提前到达 Operator 的事件时间。当一个 Operator 提前它的事件时间时,它为它的后续操作符在下游生成一个新的 Watermark。一些 Operator 消耗多个输入流; 例如,一个 union,或者跟随 keyBy(…)或 partition(…)函数的 Operator。这样的 Operator 当前事件时间是其输入流的事件时间的最小值。由于其输入流更新其事件时间,因此 Operator 也是如此。
Flink使用 Watermark 的四个认识
在下面的示例中,我们有一个带时间戳的事件流,这些事件在某种程度上不按顺序到达。 显示的数字是指示实际发生这些事件的时间戳。到达的第一个事件发生在时间 4,然后是之前发生的事件,时间 2,依此类推:
注意,这是事件时间(Event Time)处理的示例,这意味着时间戳反映事件发生的时间,而不是事件被处理的时间。事件时处理是一个抽象,它使得创建流式应用程序成为可能,无论它们是在处理实时数据还是在重新处理历史数据,这些应用程序的行为都是一致的。
现在假设我们正在尝试创建一个流分类器。这意味着应用程序在流到达时处理每个事件,并 emit 包含相同事件但按其时间戳排序的新流。
-
我们的流分类器看到的第一个元素是 4,但我们不能立即将它作为排序流的第一个元素释放。 它可能已经故障,而早期的事件可能还会到来。 实际上,我们可以从这个流的未来中获得一些内容,我们可以看到我们的流分类器应该至少等到 2 到达之后再产生任何结果。有些缓冲,有些延迟是必要的。
-
如果出错了(故障),可能会永远等待。 首先,应用程序从第 4 个时间开始看到一个事件,然后从第 2 个时间开始看到一个事件。从早于 2 的时间开始的事件是否会到达?答案是不确定的。我们可以永远等待,永远不会看到 1。 最终,我们必须做决定,并将 2 作为排序流的开始。
-
我们需要的是某种策略,它定义了对于任何给定的时间戳事件何时停止等待早期事件的到来。这正是 Watermarks 的作用 – 它们定义何时停止等待早期事件。Flink 中的事件时间处理取决于特殊的带时间戳的元素,称为 watermarks,由数据源或 watermarks 生成器插入到流中。 具有时间戳 t 的 watermarks 可以被理解为断言(assertion )所有具有时间戳<t 的事件已经(具有合理的概率)已经到达。什么时候我们的流分类器应该停止等待,
-
我们可以设想不同的策略来决定如何生成 watermarks。 我们知道每个事件都会在延迟一段时间后到达并且这些延迟会有所不同,因此有些事件会比其他事件延迟更多。一种简单的方法是假设这些延迟受到一些最大延迟的限制。Flink 将此策略称为有界无序 watermarks。很容易想象出更复杂的 watermarks 方法,但对于许多应用来说,固定延迟效果还不错。如果要构建像流分类器这样的应用程序,Flink 的 ProcessFunction 是正确的构建块。它提供对事件时间(event-time)计时器的访问(即, 基于 watermarks 到达而触发的回调),并具有用于管理缓冲事件所需状态的挂钩,直到轮到它们被发送到下游。
9.Flink 内部原理
9.1容错机制
Flink 基于 Checkpoint 机制实现容错,它的原理是不断地生成分布式 Streaming 数据流 Snapshot。在流处理失败时,通过这些 Snapshot 可以恢复数据流处理。理解 Flink 的容错机 制,首先需要了解一下 Barrier 这个概念:
Stream Barrier 是 Flink 分布式 Snapshotting 中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个 Barrier 会携带一个 Snapshot ID,属于该 Snapshot 的记录会被推向该 Barrier 的前方。因为 Barrier 非常轻量,所以并不会中断数据流。带有 Barrier 的数据流,如下图所示:
说明:
-
出现一个 Barrier,在该 Barrier 之前出现的记录都属于该 Barrier 对应的 Snapshot,在该 Barrier 之后出现的记录属于下一个 Snapshot
-
来自不同 Snapshot 多个 Barrier 可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个 Snapshot
-
当一个中间(Intermediate)Operator 接收到一个 Barrier 后,它会发送 Barrier 到属于该 Barrier 的 Snapshot 的数据流中,等到 Sink Operator 接收到该 Barrier 后会向 Checkpoint Coordinator 确认该 Snapshot,直到所有的 Sink Operator 都确认了该 Snapshot,才被认为完 成了该 Snapshot
这里还需要强调的是,Snapshot 并不仅仅是对数据流做了一个状态的 Checkpoint,它也包含了一个 Operator 内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。也就是说,如果一个 Operator 包含任何形式的状态,这种状态必须是 Snapshot 的一部分。
Operator 的状态包含两种:一种是系统状态,一个 Operator 进行计算处理的时候需要对数据进行缓冲,所以数据缓冲区的状态是与 Operator 相关联的,以窗口操作的缓冲区为例, Flink 系统会收集或聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成;另一种是用户自定义状态(状态可以通过转换函数进行创建和修改),它可以是函数中的 Java 对象这样的简单变量,也可以是与函数相关的 Key/Value 状态。
对于具有轻微状态的 Streaming 应用,会生成非常轻量的 Snapshot 而且非常频繁,但并不会 影响数据流处理性能。Streaming应用的状态会被存储到一个可配置的存储系统中,例如HDFS。 在一个 Checkpoint 执行过程中,存储的状态信息及其交互过程,如下图所示:
在 Checkpoint 过程中,还有一个比较重要的操作–Stream Aligning。当 Operator 接收到多个 输入的数据流时,需要在 Snapshot Barrier 中对数据流进行排列对齐,如下图所示:
具体排列过程如下:
-
Operator 从一个 incoming Stream 接收到 Snapshot Barrier n,然后暂停处理,直到其它的 incoming Stream 的 Barrier n(否则属于 2 个 Snapshot 的记录就混在一起了)到达该 Operator
-
接收到 Barrier n 的 Stream 被临时搁置,来自这些 Stream 的记录不会被处理,而是被放在一个 Buffer 中
-
一旦最后一个 Stream 接收到 Barrier n,Operator 会 emit 所有暂存在 Buffer 中的记录,然 后向 Checkpoint Coordinator 发送 Snapshot n
-
继续处理来自多个 Stream 的记录
基于 Stream Aligning 操作能够实现 Exactly Once 语义,但是也会给流处理应用带来延迟,因为为了排列对齐 Barrier,会暂时缓存一部分 Stream 的记录到 Buffer 中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐 Barrier 的一个 Stream 为处理 Buffer 中缓存记录的时刻点。在 Flink 中,提供了一个开关,选择是否使用 Stream Aligning,如果关掉则 Exactly Once 会变成 At least once。
9.2调度机制
在 JobManager 端,会接收到 Client 提交的 JobGraph 形式的 Flink Job,JobManager 会将一个 JobGraph 转换映射为一个 ExecutionGraph,如下图所示:
通过上图可以看出:
JobGraph 是一个 Job 的用户逻辑视图表示,将一个用户要对数据流进行的处理表示为单个 DAG 图(对应于 JobGraph),DAG 图由顶点(JobVertex)和中间结果集(IntermediateDataSet)组成,其中 JobVertex 表示了对数据流进行的转换操作,比如 map、flatMap、filter、keyBy 等操作,而 IntermediateDataSet 是由上游的 JobVertex 所生成,同时作为下游的 JobVertex 的输入。
而 ExecutionGraph 是 JobGraph 的并行表示,也就是实际 JobManager 调度一个 Job 在 TaskManager 上运行的逻辑视图,它也是一个 DAG 图,是由 ExecutionJobVertex、 IntermediateResult(或 IntermediateResultPartition)组成,ExecutionJobVertex 实际对应于 JobGraph 图中的 JobVertex,只不过在 ExecutionJobVertex 内部是一种并行表示,由多个并行 的 ExecutionVertex 所组成。另外,这里还有一个重要的概念,就是 Execution,它是一个 ExecutionVertex 的一次运行 Attempt,也就是说,一个 ExecutionVertex 可能对应多个运行状态的 Execution,比如,一个 ExecutionVertex 运行产生了一个失败的 Execution,然后还会创建一个新的 Execution 来运行,这时就对应这个 2 次运行 Attempt。每个 Execution 通过 ExecutionAttemptID 来唯一标识,在 TaskManager 和 JobManager 之间进行 Task 状态的交换 都是通过 ExecutionAttemptID 来实现的。
下面看一下,在物理上进行调度,基于资源的分配与使用的一个例子,来自官网,如下图所示:
说明如下:
-
左上子图:有 2 个 TaskManager,每个 TaskManager 有 3 个 Task Slot。一个 Flink Job,逻辑上包含了 1 个 data source、1 个 MapFunction、1 个 ReduceFunction,对应一个 JobGraph
-
左下子图:用户提交的 Flink Job 对各个 Operator 进行的配置——data source 的并行度设 置为 4,MapFunction 的并行度也为4,ReduceFunction 的并行度为 3,在 JobManager 端对 应于 ExecutionGraph
-
右上子图:TaskManager 1 上,有 2 个并行的 ExecutionVertex 组成的 DAG 图,它们各占 用一个 Task Slot
-
右下子图:TaskManager 2 上,也有 2 个并行的 ExecutionVertex 组成的 DAG 图,它们也 各占用一个 Task Slot
-
在 2 个 TaskManager 上运行的 4 个 Execution 是并行执行的
9.3迭代机制
机器学习和图计算应用,都会使用到迭代计算,Flink 通过在迭代 Operator 中定义 Step 函数来实现迭代算法,这种迭代算法包括 Iterate 和 Delta Iterate 两种类型,在实现上它们反复地在当前迭代状态上调用 Step 函数,直到满足给定的条件才会停止迭代。下面,对 Iterate 和 Delta Iterate 两种类型的迭代算法原理进行说明:Iterate
Iterate Operator 是一种简单的迭代形式:每一轮迭代,Step 函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为 Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果,具体执行流程如下图 所示:
Step 函数在每一轮迭代中都会被执行,它可以是由 map、reduce、join 等 Operator 组成的数据流。下面通过官网给出的一个例子来说明 Iterate Operator,非常简单直观,如下图所示:
上面迭代过程中,输入数据为 1 到 5 的数字,Step 函数就是一个简单的 map 函数,会对每个输入的数字进行加 1 处理,而 Next Partial Solution 对应于经过 map 函数处理后的结果, 比如第一轮迭代,对输入的数字 1 加 1 后结果为 2,对输入的数字 2 加 1 后结果为 3,直到 对输入数字 5 加 1 后结果为变为 6,这些新生成结果数字 2~6 会作为第二轮迭代的输入。迭代终止条件为进行 10 轮迭代,则最终的结果为 11~15。
Delta Iterate Operator 实现了增量迭代,它的实现原理如下图所示:
基于 Delta Iterate Operator 实现增量迭代,它有 2 个输入,其中一个是初始 Workset,表示输入待处理的增量 Stream 数据,另一个是初始 Solution Set,它是经过 Stream 方向上 Operator 处理过的结果。第一轮迭代会将 Step 函数作用在初始 Workset 上,得到的计算结果 Workset 作为下一轮迭代的输入,同时还要增量更新初始 Solution Set。如果反复迭代知道满足迭代 终止条件,最后会根据 Solution Set 的结果,输出最终迭代结果。
比如,我们现在已知一个 Solution 集合中保存的是,已有的商品分类大类中购买量最多的商 品,而 Workset 输入的是来自线上实时交易中最新达成购买的商品的人数,经过计算会生成新的商品分类大类中商品购买量最多的结果,如果某些大类中商品购买量突然增长,它需要更新 Solution Set 中的结果(原来购买量最多的商品,经过增量迭代计算,可能已经不是最多),最后会输出最终商品分类大类中购买量最多的商品结果集合。
9.4BackPressure 监控
Backpressure 在流式计算系统中会比较受到关注,因为在一个 Stream 上进行处理的多个 Operator 之间,它们处理速度和方式可能非常不同,所以就存在上游 Operator 如果处理速度过快,下游 Operator 处可能机会堆积 Stream 记录,严重会造成处理延迟或下游 Operator 负载过重而崩溃(有些系统可能会丢失数据)。因此,对下游 Operator 处理速度跟不上的情 况,如果下游 Operator 能够将自己处理状态传播给上游 Operator,使得上游 Operator 处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。 Flink Web 界面上提供了对运行 Job 的 Backpressure 行为的监控,它通过使用 Sampling 线程对正在运行的 Task 进行堆栈跟踪采样来实现,具体实现方式如下图所示:
JobManager 会反复调用一个 Job 的 Task 运行所在线程的 Thread.getStackTrace(),默认情况下, JobManager 会每间隔 50ms 触发对一个 Job 的每个 Task 依次进行 100 次堆栈跟踪调用,根据调用调用结果来确定 Backpressure,Flink 是通过计算得到一个比值(Radio)来确定当前运行 Job 的 Backpressure 状态。在 Web 界面上可以看到这个 Radio 值,它表示在一个内部方法调用中阻塞(Stuck)的堆栈跟踪次数,例如,radio=0.01,表示 100 次中仅有 1 次方法调用阻塞。Flink 目前定义了如下 Backpressure 状态:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
另外,Flink 还提供了 3 个参数来配置 Backpressure 监控行为:
参数名称 | 默认值 | 说明 |
---|---|---|
jobmanager.web.backpressure.refresh-interval | 60000 | 默认 1 分钟,表示采样统 计结果刷新时间间隔 |
jobmanager.web.backpressure.num-samples | 100 | 评估 Backpressure 状态, 所使用的堆栈跟踪调用次 数 |
jobmanager.web.backpressure.delay-between-samples | 50 | 默认 50 毫秒,表示对一个 Job 的每个 Task 依次调用 的时间间隔 |
通过上面个定义的 Backpressure 状态,以及调整相应的参数,可以确定当前运行的 Job 的状态是否正常,并且保证不影响 JobManager 提供服务
今天的文章一文教你弄懂Flink核心功能和原理分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/29931.html