Apache Flink:特性、概念、组件栈、架构及原理分析

2020/01/28 Knowledge

参考来源

目录

Apache Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个 Flink 运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型:流处理一般需要支持低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有 MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有 Samza、Storm。

Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个 Flink 运行时(Flink Runtime),分别提供了流处理和批处理 API,而这两种 API 也是实现上层面向流处理、批处理类型应用框架的基础。

基本特性

流处理特性

  • 支持高吞吐、低延迟、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算的 Exactly-once 语义
  • 支持高度灵活的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作
  • 支持具有 Backpressure 功能的持续流模型
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持 Batch on Streaming 处理和 Streaming 处理
  • Flink 在 JVM 内部实现了自己的内存管理
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果有必要进行缓存

API 分类

  • 对 Streaming 数据类应用,提供 DataStream API
  • 对批处理类应用,提供 DataSet API(支持 Java/Scala)

Libraries 支持

  • 支持机器学习(FlinkML)
  • 支持图分析(Gelly)
  • 支持关系数据处理(Table)
  • 支持复杂事件处理(CEP)

整合支持

  • 支持 Flink on YARN
  • 支持 HDFS
  • 支持来自 Kafka 的输入数据
  • 支持 Apache HBase
  • 支持 Hadoop程序
  • 支持 Tachyon
  • 支持 ElasticSearch
  • 支持 RabbitMQ
  • 支持 Apache Storm
  • 支持 S3
  • 支持 XtreemFS

基本概念

编程模型和数据流

用户实现的 Flink 程序是由 Stream 和 Transformation 这两个基本构建块组成,其中 Stream 是一个中间结果数据,而 Transformation 是一个操作,它对一个或多个输入 Stream 进行计算处理,输出一个或多个结果 Stream。当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow。一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成,它类似于一个 DAG 图(有向无环图),在启动的时候从一个或多个 Source Operator 开始,经过多个 Transformation Operator,结束于一个或多个 Sink Operator。

所有的操作算子都是 Operator。

编程模型

上图中,FlinkKafkaConsumer 是一个 Source Operator,map、keyBy、timeWindow、apply 是 Transformation Operator,RollingSink 是一个 Sink Operator。

平行数据流

在 Flink 中,程序天生是并行和分布式的:一个 Stream 可以被分成多个 Stream 分区(Stream Partitions),一个 Operator 可以被分成多个 Operator Subtask,每一个 Operator Subtask 是在不同的线程中独立执行的。一个 Operator 的并行度,等于 Operator Subtask 的个数,一个 Stream 的并行度总是等于生成它的 Operator 的并行度。 有关 Parallel Dataflow 的实例,如下图所示:

并行

上图 Streaming Dataflow 的并行视图中,展现了在两个 Operator 之间的 Stream 的两种模式:

  • One-to-one 模式

    比如从 Source[1] 到 map()[1],它保持了 Source 的分区特性(Partitioning)和分区内元素处理的有序性,也就是说 map()[1] 的 Subtask 看到数据流中记录的顺序,与 Source[1] 中看到的记录顺序是一致的。

  • Redistributing 模式

    这种模式改变了输入数据流的分区,比如从 map()[1]、map()[2] 到 keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的 Subtask 向下游的多个不同的 Subtask 发送数据,改变了数据流的分区,这与实际应用所选择的 Operator 有关系。

    另外,Source Operator 对应 2 个 Subtask,所以并行度为 2,而 Sink Operator 的 Subtask 只有 1 个,故而并行度为 1。

Task & Operator Chain

在 Flink 分布式执行环境中,会将多个 Operator Subtask 串起来组成一个 Operator Chain,实际上就是一个执行链,每个执行链会在 TaskManager 上一个独立的线程中执行。

并行

上图中上半部分表示的是一个 Operator Chain,多个 Operator 通过 Stream 连接,而每个 Operator 在运行时对应一个 Task;图中下半部分是上半部分的一个并行版本,也就是对每一个 Task 都并行化为多个 Subtask。(在不同机器上运行)

Time & Window

Flink 支持基于时间窗口操作,也支持基于数据的窗口操作,如下图所示:

windows

上图中,基于时间的窗口操作,在每个相同的时间间隔对 Stream 中的记录进行处理,通常各个时间间隔内的窗口操作处理的记录数不固定;而基于数据驱动的窗口操作,可以在 Stream 中选择固定数量的记录作为一个窗口,对该窗口中的记录进行处理。 有关窗口操作的不同类型,可以分为如下几种:倾斜窗口(Tumbling Windows,记录没有重叠)、滑动窗口(Slide Windows,记录有重叠)、会话窗口(Session Windows),具体可以查阅相关资料。 在处理 Stream 中的记录时,记录中通常会包含各种典型的时间字段,Flink 支持多种时间的处理,如下图所示:

Time

上图描述了在基于 Flink 的流处理系统中,各种不同的时间所处的位置和含义,其中:

  • Event Time 表示事件创建时间
  • Ingestion Time 表示事件进入到 Flink Dataflow 的时间
  • Processing Time 表示某个 Operator 对事件进行处理事的本地系统时间(是在 TaskManager 节点上)。

这里,谈一下基于 Event Time 进行处理的问题,通常根据 Event Time 会给整个 Streaming 应用带来一定的延迟性,因为在一个基于事件的处理系统中,进入系统的事件可能会基于 Event Time 而发生乱序现象,比如事件来源于外部的多个系统,为了增强事件处理吞吐量会将输入的多个 Stream 进行自然分区,每个 Stream 分区内部有序,但是要保证全局有序必须同时兼顾多个 Stream 分区的处理,设置一定的时间窗口进行暂存数据,当多个 Stream 分区基于 Event Time 排列对齐后才能进行延迟处理。所以,设置的暂存数据记录的时间窗口越长,处理性能越差,甚至严重影响 Stream 处理的实时性。

watermark

    val watermark = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 10000L//最大允许的乱序时间是10s

      var a : Watermark = null

      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {
        a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        a
      }

      override def extractTimestamp(t: (String,Long), l: Long): Long = {
        val timestamp = t._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        println("timestamp:" + t._1 +","+ t._2 + "|" +format.format(t._2) +","+  currentMaxTimestamp + "|"+ format.format(currentMaxTimestamp) + ","+ a.toString)
        timestamp
      }
    })

上面代码 watermark 是收到时间减 10 秒。watermark 到达的时间代表窗口开始处理的最后时间。例如窗口大小是 3 秒,到的时间是 10:15 分,那开始处理的窗口就是 10:02 分到 10:05 分。

Watermark

在下文中的例子中,我们有一个带有时间戳的事件流,但是由于某种原因它们并不是按顺序到达的。图中的数字代表事件发生的时间戳。第一个到达的事件发生在时间 4,然后它后面跟着的是发生在更早时间(时间 2)的事件,以此类推:

  1. 数据有延迟:

    数据流中的第一个元素的时间是 4,但是我们不能直接将它作为排序后数据流的第一个元素并输出它。因为数据是乱序到达的,也许有一个更早发生的数据还没有到达。事实上,我们能预见一些这个流的未来,也就是我们的排序算子至少要等到 2 这条数据的到达再输出结果。

  2. watermark 的作用,定义了何时不再等待更早的数据:

    如果我们做错了,我们可能会永远等待下去。首先,我们的应用程序从看到时间 4 的数据,然后看到时间 2 的数据。是否会有一个比时间 2 更早的数据到达呢?也许会,也许不会。我们可以一直等下去,但可能永远看不到 1 。

    我们需要的是某种策略,它定义了对于任何带时间戳的事件流,何时停止等待更早数据的到来。

    Flink 中的事件时间处理依赖于一种特殊的带时间戳的元素,成为 watermark,它们会由数据源或是 watermark 生成器插入数据流中。具有时间戳 t 的 watermark 可以被理解为断言了所有时间戳小于或等于 t 的事件都(在某种合理的概率上)已经到达了。

    Watermark是一个非常重要概念,我将尽力给你一个简短的概述。如果你有兴趣了解更多信息,你可以从Google中观看这个演讲,还可以从dataArtisans那里阅读此博客。 Watermark本质上是一个时间戳。当Flink中的算子(operator)接收到Watermark时,它明白它不会再看到比该时间戳更早的消息。因此Watermark也可以被认为是告诉Flink在EventTime中多远的一种方式。

    Watermark 看作是告诉 Flink 一个消息可能延迟多少的方式。如果我们将 Watermark 设置为当前系统时间。因此,期望消息没有任何的延迟。现在我们将 Watermark 设置为当前时间减去5秒,这就告诉 Flink 我们期望消息最多延迟 5 秒钟,这是因为每个窗口仅在 Watermark 通过时被评估。由于我们的 Watermark 是当前时间减去 5 秒,所以第一个窗口,例如:[5s-15s] 将会在第 20 秒被评估。类似地,窗口 [10s-20s] 将会在第 25 秒进行评估,依此类推。

基本架构

Flink 系统的架构与 Spark 类似,是一个基于 Master-Slave 风格的架构,如下图所示:

基本架构

Flink 集群启动时,会启动一个 JobManager 进程、至少一个 TaskManager 进程。在 Local 模式下,会在同一个JVM内部启动一个 JobManager 进程和 TaskManager 进程。当 Flink 程序提交后,会创建一个 Client 来进行预处理,并转换为一个并行数据流,这是对应着一个 Flink Job,从而可以被 JobManager 和 TaskManager 执行。在实现上,Flink 基于 Actor 实现了 JobManager 和 TaskManager,所以 JobManager 与 TaskManager 之间的信息交换,都是通过事件的方式来进行处理。

如上图所示,Flink 系统主要包含如下3个主要的进程:

  • JobManager

    JobManager 是 Flink 系统的协调者,它负责接收 Flink Job,调度组成 Job 的多个 Task 的执行。同时,JobManager 还负责收集 Job 的状态信息,并管理 Flink 集群中从节点 TaskManager。JobManager 所负责的各项管理功能。

  • TaskManager

    TaskManager 也是一个 Actor,它是实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task。每个 TaskManager 负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向 JobManager 汇报。TaskManager端可以分成两个阶段:

    • 注册阶段

      TaskManager 会向 JobManager 注册,发送 RegisterTaskManager 消息,等待 JobManager 返回 AcknowledgeRegistration,然后 TaskManager 就可以进行初始化过程。

    • 可操作阶段

      该阶段 TaskManager 可以接收并处理与 Task 有关的消息,如 SubmitTask、CancelTask、FailTask。如果 TaskManager 无法连接到 JobManager,这时 TaskManager 就失去了与 JobManager 的联系,会自动进入“注册阶段”,只有完成注册才能继续处理 Task 相关的消息。

  • Client

    当用户提交一个 Flink 程序时,会首先创建一个 Client,该 Client 首先会对用户提交的 Flink 程序进行预处理,并提交到 Flink 集群中处理,所以 Client 需要从用户提交的 Flink 程序配置中获取 JobManager 的地址,并建立到 JobManager 的连接,将 Flink Job 提交给 JobManager。Client 会将用户提交的 Flink 程序组装一个 JobGraph, 并且是以 JobGraph 的形式提交的。一个 JobGraph 是一个 Flink Dataflow,它由多个 JobVertex 组成的 DAG。其中,一个 JobGraph 包含了一个 Flink 程序的如下信息:JobID、Job 名称、配置信息、一组 JobVertex 等。

Slot

Flink 集群是由 JobManager(JM)、TaskManager(TM)两大组件组成的,每个 JM/TM 都是运行在一个独立的 JVM 进程中。JM 相当于 Master,是集群的管理节点,TM 相当于 Worker,是集群的工作节点,每个 TM 最少持有 1 个 Slot,Slot 是 Flink 执行 Job 时的最小资源分配单位,在 Slot 中运行着具体的 Task 任务。

对 TM 而言:它占用着一定数量的 CPU 和 Memory 资源,在不考虑 Slot Sharing(下文详述)的情况下,一个 Slot 内运行着一个 SubTask(Task 实现 Runable,SubTask 是一个执行 Task 的具体实例),所以官方建议 taskmanager.numberOfTaskSlots 配置的 Slot 数量和 CPU 相等或成比例。

对 Job 而言:一个 Job 所需的 Slot 数量大于等于 Operator 配置的最大 Parallelism 数(因为每个 subtask 都需要一个 slot),在保持所有 Operator 的 slotSharingGroup 一致的前提下 Job 所需的 Slot 数量与 Job 中 Operator 配置的最大 Parallelism 相等。

基本架构

还是这张架构图,图中有两个 TM,各自有 3 个 Slot,2 个 Slot 内有 Task 在执行,1 个 Slot 空闲。若这两个 TM 在不同 Container 或容器上,则其占用的资源是互相隔离的。在 TM 内多个 Slot 间是各自拥有 1/3 TM 的 Memory,共享 TM 的 CPU、网络(Tcp:ZK、 Akka、Netty 服务等)、心跳信息、Flink 结构化的数据集等。

Task Slot 内部结构图

Slot内部

Task Slot 的内部结构图,Slot 内运行着具体的 Task,它是在线程中执行的 Runable 对象(每个虚线框代表一个线程),这些 Task 实例在源码中对应的类是 org.apache.flink.runtime.taskmanager.Task。每个 Task 都是由一组 Operators Chaining 在一起的工作集合,Flink Job 的执行过程可看作一张 DAG 图,Task 是 DAG 图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个 Job 的 Execution Graph。

进程线程

TaskManager 运行的是进程,每个 Slot 运行的是线程。

内部原理

容错机制

Flink 基于 Checkpoint 机制实现容错,它的原理是不断地生成分布式 Streaming 数据流 Snapshot。在流处理失败时,通过这些 Snapshot 可以恢复数据流处理。理解 Flink 的容错机制,首先需要了解一下 Barrier 这个概念:

Stream Barrier 是 Flink 分布式 Snapshotting 中的核心元素,它会作为数据流的记录被同等看待,被插入到数据流中,将数据流中记录的进行分组,并沿着数据流的方向向前推进。每个 Barrier 会携带一个 Snapshot ID,属于该 Snapshot 的记录会被推向该 Barrier 的前方。因为 Barrier 非常轻量,所以并不会中断数据流。带有 Barrier 的数据流,如下图所示:

Barriers

基于上图,我们通过如下要点来说明:

  • 出现一个 Barrier,在该 Barrier 之前出现的记录都属于该 Barrier 对应的 Snapshot,在该 Barrier 之后出现的记录属于下一个 Snapshot
  • 来自不同 Snapshot 多个 Barrier 可能同时出现在数据流中,也就是说同一个时刻可能并发生成多个 Snapshot
  • 当一个中间(Intermediate)Operator 接收到一个 Barrier 后,它会发送 Barrier 到属于该 Barrier 的 Snapshot 的数据流中,等到 Sink Operator 接收到该 Barrier 后会向 Checkpoint Coordinator 确认该 Snapshot,直到所有的 Sink Operator 都确认了该 Snapshot,才被认为完成了该 Snapshot

这里还需要强调的是,Snapshot 并不仅仅是对数据流做了一个状态的 Checkpoint,它也包含了一个 Operator 内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。也就是说,如果一个 Operator 包含任何形式的状态,这种状态必须是 Snapshot 的一部分。

Operator 的状态包含两种:一种是系统状态,一个 Operator 进行计算处理的时候需要对数据进行缓冲,所以数据缓冲区的状态是与 Operator 相关联的,以窗口操作的缓冲区为例,Flink 系统会收集或聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成;另一种是用户自定义状态(状态可以通过转换函数进行创建和修改),它可以是函数中的 Java 对象这样的简单变量,也可以是与函数相关的 Key/Value 状态。

对于具有轻微状态的 Streaming 应用,会生成非常轻量的 Snapshot 而且非常频繁,但并不会影响数据流处理性能。Streaming 应用的状态会被存储到一个可配置的存储系统中,例如 HDFS。在一个 Checkpoint 执行过程中,存储的状态信息及其交互过程,如下图所示:

Checkpoint流程

在 Checkpoint 过程中,还有一个比较重要的操作——Stream Aligning。当 Operator 接收到多个输入的数据流时,需要在 Snapshot Barrier 中对数据流进行排列对齐,如下图所示:

对齐

具体排列过程如下:

  1. Operator从一个 incoming Stream接收到 Snapshot Barrier n,然后暂停处理,直到其它的 incoming Stream的 Barrier n(否则属于 2 个 Snapshot 的记录就混在一起了)到达该 Operator
  2. 接收到 Barrier n 的 Stream 被临时搁置,来自这些 Stream 的记录不会被处理,而是被放在一个 Buffer 中
  3. 一旦最后一个 Stream 接收到 Barrier n,Operator 会 emit 所有暂存在 Buffer 中的记录,然后向 Checkpoint Coordinator 发送 Snapshot n
  4. 继续处理来自多个 Stream 的记录

基于 Stream Aligning 操作能够实现 Exactly Once 语义,但是也会给流处理应用带来延迟,因为为了排列对齐 Barrier,会暂时缓存一部分 Stream 的记录到 Buffer 中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐 Barrier 的一个 Stream 为处理 Buffer 中缓存记录的时刻点。在 Flink 中,提供了一个开关,选择是否使用 Stream Aligning,如果关掉则 Exactly Once 会变成 At least once。

调度机制

在 JobManager 端,会接收到 Client 提交的 JobGraph 形式的 Flink Job,JobManager 会将一个 JobGraph 转换映射为一个 ExecutionGraph,如下图所示:

调度示例

通过上图可以看出:

JobGraph 是一个 Job 的用户逻辑视图表示,将一个用户要对数据流进行的处理表示为单个 DAG 图(对应于JobGraph),DAG 图由顶点(JobVertex)和中间结果集(IntermediateDataSet)组成,其中 JobVertex 表示了对数据流进行的转换操作,比如 map、flatMap、filter、keyBy等操作,而 IntermediateDataSet 是由上游的 JobVertex 所生成,同时作为下游的 JobVertex 的输入。

而 ExecutionGraph 是 JobGraph 的并行表示,也就是实际 JobManager 调度一个 Job 在 TaskManager 上运行的逻辑视图,它也是一个 DAG 图,是由 ExecutionJobVertex、IntermediateResult(或 IntermediateResultPartition)组成,ExecutionJobVertex 实际对应于 JobGraph 图中的 JobVertex,只不过在 ExecutionJobVertex 内部是一种并行表示,由多个并行的 ExecutionVertex 所组成。另外,这里还有一个重要的概念,就是 Execution,它是一个 ExecutionVertex 的一次运行 Attempt,也就是说,一个 ExecutionVertex 可能对应多个运行状态的 Execution,比如,一个 ExecutionVertex 运行产生了一个失败的 Execution,然后还会创建一个新的 Execution 来运行,这时就对应这个2次运行 Attempt。每个 Execution 通过 ExecutionAttemptID 来唯一标识,在 TaskManager 和 JobManager 之间进行 Task 状态的交换都是通过 ExecutionAttemptID 来实现的。

下面看一下,在物理上进行调度,基于资源的分配与使用的一个例子,来自官网,如下图所示:

示例

  • 左上子图:有 2 个 TaskManager,每个 TaskManager 有 3 个 Task Slot
  • 左下子图:一个 Flink Job,逻辑上包含了 1 个 data source、1 个 MapFunction、1 个 ReduceFunction,对应一个 JobGraph
  • 左下子图:用户提交的 Flink Job 对各个 Operator 进行的配置——data source 的并行度设置为 4,MapFunction 的并行度也为 4,ReduceFunction 的并行度为 3,在 JobManager 端对应于 ExecutionGraph
  • 右上子图:TaskManager 1 上,有 2 个并行的 ExecutionVertex 组成的 DAG 图,它们各占用一个 Task Slot
  • 右下子图:TaskManager 2 上,也有 2 个并行的 ExecutionVertex 组成的 DAG 图,它们也各占用一个 Task Slot
  • 在 2 个 TaskManager 上运行的 4 个 Execution 是并行执行的

可以看出在每个 slot 内也是一个链。

常见题

  1. Flink计算单位是什么?

  2. Flink时间类型有那些,他们有什么区别?

    Event Time,Processing Time(计算机系统时间),Ingestion Time(进入 Flink 时间,source 算子获取数据的时间)

  3. Flink窗口类型有哪些,你们目前用的什么窗口?

    Tumbling window(固定时间窗口),Sliding Windows(滑动窗口),Session window(一个session window关闭通常是由于一段时间没有收到元素。),Global window(相同keyed的元素分配到一个窗口)

  4. Flink的状态你们有没有用过,用的什么类型的状态?

    Keyed State:该类状态是基于 KeyedStream 上的状态,这个状态是根据特定的 key 绑定的,对 keyedStream 流上的每一个 key,都对应着一个 state。stream.keyBy(…).window().apply(new KeyedStateRichFunction())

    Operator State:该类State与key无关,整个operator对应一个state,该类State没有Keyed Key支持的数据结构多,仅支持ListState。举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

  5. Flink如何处理延迟数据?

    waterMark,waterMark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接收到尽可能多的乱序数据。

  6. Flink中managed state和raw state区别?

    管理状态(Managed State)表示在Flink运行时约束的数据结构,比如内部的哈希表或者RocksDB。例如:ValueState, ListState。Flink在运行时对状态进行编码,并将其写入检查点(checkpoint)。

    原始状态(Raw State)是状态操作符保存在自己的数据结构中。当触发检查点时,它们只将字节序列写入检查点。Flink不知道状态的数据结构,只看到原始字节。

  7. Flink的keystate有什么不足,优点是什么,缺点是什么?
  8. Flink的watermark有哪几种?

Search

    Table of Contents