Spark 架构与原理

2020/01/25 Knowledge

参考来源

目录

Spark 提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求官方资料介绍 Spark 可以将 Hadoop 集群中的应用在内存中的运行速度提升 100 倍,甚至能够将应用在磁盘上的运行速度提升 10 倍。

基本概念

  • Application:

    Application 的概念和 Hadoop MapReduce 中的类似,都是用户编写的 Spark 应用程序,其中包含了一个 Driver 功能的代码和分布在集群中多个节点上运行的 Executor 代码。

  • Driver:

    使用 Driver 这一概念的分布式框架很多,比如 Hive 等。 Spark 中的 Driver 即运行 Application 的 main() 函数并创建 SparkContext,创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境。在 Spark 中由 SparkContext 负责与 ClusterManager 通信,进行资源的申请、任务的分配和监控等。当 Executor 部分运行完毕后,Driver 同时负责将 SparkContext 关闭。通常用 SparkContext 代表 Driver。

  • Executor:

    某个 Application 运行在 Worker 节点上的一个进程,该进程负责运行某些 Task,并且负责将数据存在内存或者磁盘上,每个 Application 都有各自独立的一批 Executor。 在 Spark on Yarn 模式下其进程名称为 CoarseGrainedExecutor Backend,类似于 Hadoop MapReduce 中的 YarnChild。一个 CoarseGrainedExecutor Backend 进程有且仅有一个 executor 对象,它负责将 Task 包装成 taskRunner,并从线程池抽取出一个空闲线程运行 Task。这样,每个 CoarseGrainedExecutor Backend 能并行运行 Task 的数量就取决于分配给它的 CPU 的个数了。

  • Cluster Manager:

    指的是在集群上获取资源的外部服务,目前有三种类型:

    • Standalone :Spark 原生的资源管理,由 Master 负责资源的分配,可以在亚马逊的 EC2 上运行。
    • Apache Meso:与 Hadoop MapReduce 兼容性良好的一种资源调度框架。
    • Hadoop Yarn:主要是指的 Yarn 中的 ResourceManager。
  • Worker:

    集群中任何可以运行 Application 代码的节点,类似于 Yarn 中的 NodeManager 节点。在 Standalone 模式中指的就是通过 slave 文件配置的 Worker节点,在 Spark on Yarn 模式中指的就是 NodeManager 节点。

  • RDD(resillient distributed dataset):

    Spark 的基本计算单元,可以通过一系列算子进行操作(主要有 Transformation 和Action操作)。同时,RDD 是 Spark 最核心的东西,它表示已被分区、被序列化的、不可变的、有容错机制的,并且能够被并行操作的数据集合。其存储级别可以是内存,也可以是磁盘,可通过 spark.storage.StorageLevel 属性配置。

  • Job:

    用户提交的作业。一个 Job 可能由一到多个 Task 组成。

    spark 中的数据都是抽象为 RDD 的,它支持两种类型的算子操作:Transformation 和 Action(还有一种是 controller 算子,主要是 cache 和 persist)。Transformation 算子的代码不会真正被执行。只有当我们的程序里面遇到一个 action 算子的时候,代码才会真正的被执行。

    Transformation 算子主要包括:map、mapPartitions、flatMap、filter、union、groupByKey、repartition、cache 等。

    Action 算子主要包括:reduce、collect、show、count、foreach、saveAsTextFile 等。

    当在程序中遇到一个 action 算子的时候,就会提交一个 job,执行前面的一系列操作。因此平时要注意,如果声明了数据需要 cache 或者 persist,但在 action 操作前释放掉的话,该数据实际上并没有被缓存。

    通常一个任务(application)会有多个 job,job 之间是按照串行的方式执行的。一个 job 执行完成后,才会起下一个 job。

  • Stage:

    每个Job会拆分很多组 Task,作为一个 TaskSet,其名称为 Stage。Stage 的划分或调度由下面的 DAGScheduler 负责。Stage 有非最终的 Stage(即 Shuffle Map Stage)和最终的 Stage(即Result Stage)两种。Stage 的边界就是发生 Shuffle 的地方。

    一个 job 通常包含一个或多个 stage。各个 stage 之间按照顺序执行。上面已经说过,一个 job 会有多个算子操作。这些算子都是将一个父 RDD 转换成子 RDD。如果一个父 RDD 的数据只进入到一个子 RDD,比如 map、union 等操作,称之为 narrow dependency(窄依赖)。否则,就会形成 wide dependency(宽依赖),一般也成为 shuffle 依赖,比如 groupByKey 等操作。

    job 中 stage 的划分就是根据 shuffle(宽)依赖进行的。shuffle 依赖是两个 stage 的分界点。shuffle 操作一般都是任务中最耗时耗资源的部分。因为数据可能存放在 HDFS 不同的节点上,下一个 stage 的执行首先要去拉取上一个 stage 的数据(shuffle read 操作),保存在自己的节点上,就会增加网络通信和 IO。Shuffle 操作其实是一个比较复杂的过程,这里暂且不表。

    Stage 由于是宽依赖时划分,这样一个 stage 内的操作都可以确保资源可以不需要从别的机器中获取。避免了不必要的通信。

  • Task:

    被送到某个 Executor 上的工作单元,和 Hadoop MapReduce 中的 MapTask 和 ReduceTask 概念一样,是运行 Application 的基本单元。多个 Task 组成一个 Stage,而Task 的调度和管理等由下面的 TaskScheduler 负责。一个 rdd 有多少个 partition,就会有多少个 task。

  • Partition:

    数据分区。即一个 RDD 的数据可以划分为多少个分区。一个分区在一个 Executor 中运行。

  • NarrowDependency:

    窄依赖。即子 RDD 依赖于父 RDD 中固定的 Partition。一对一依赖

  • ShuffleDependency:

    shuffle 依赖,也称为宽依赖。即子 RDD 对父 RDD 中的所有 Partition 都有依赖

  • DAG(Directed Acycle graph):

    根据 Job 构建基于 Stage 的 DAG,有向无环图。用于反映各 RDD 之间的依赖关系。

  • 共享变量 :

    在 Spark Application 运行时,可能需要一些变量,提供给 Task 或 Driver 等使用。 Spark 提供了两种共享变量,一种是可以缓存到各个节点的广播变量,另一种是只支持加法操作,可以实现求和的累加变量

  • DAGScheduler:

    根据 Job 构建基于 Stage 的 DAG,并提交 Stage 给 TaskScheduler。其划分 Stage 的依据是 RDD 之间的依赖关系。

  • TaskScheduler:

    将 Taskset 提交给 Worker(集群)运行,每个 Executor 运行什么 Task 就是在此处分配的。

使用场景

Spark 适用场景:

  1. Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小。
  2. 由于 RDD 的特性,Spark 不适用那种异步细粒度更新状态的应用,例如 web 服务的存储或者是增量的 web 爬虫和索引。就是对于那种增量修改的应用模型不适合。
  3. 数据量不是特别大,但是要求近实时统计分析需求

Spark 不适用的场景:

  1. 内存不够的场景,在内存不足的情况下,Spark会下放到磁盘,会降低应有的性能。
  2. 有高实时性要求的流式计算业务,例如实时性要求毫秒级。
  3. 由于 RDD 设计上的只读特点,所以Spark对于待分析数据频繁变动的情景很难做(并不是不可以),比如搜索,假设你的数据集在频繁变化(不停增删改),而且又需要结果具有很强的一致性(不一致时间窗口很小),那么就不合适了。
  4. 流线长或文件流量非常大的数据集不适合。你会发现你的内存不够用,集群压力大时一旦一个 task 失败会导致他前面一条线所有的前置任务全部重跑,然后恶性循环会导致更多的 task 失败,整个 sparkapp 效率极低。就不如 MapReduce。

架构及生态

Spark生态系统如下图所示:

Spark架构

  • Spark Core:包含 Spark 的基本功能;尤其是定义 RDD 的 API、操作以及这两者上的动作。其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的。
  • Spark SQL:提供通过 Apache Hive 的 SQL 变体 Hive 查询语言(HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作。
  • Spark Streaming:对实时数据流进行处理和控制。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据
  • MLlib:一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
  • GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

Spark架构的组成图如下:

Spark架构组成

  • Cluster Manager:在 standalone 模式中即为 Master 主节点,控制整个集群,监控 worker。在 YARN 模式中为资源管理器
  • Worker节点:从节点,负责控制计算节点,启动 Executor 或者 Driver。
    • Driver:运行 Application 的 main() 函数
    • Executor:执行器,是为某个 Application 运行在 worker node 上的一个进程

Spark 编程模型

Spark 应用程序从编写到提交、执行、输出的整个过程如图所示,图中描述的步骤如下:

  1. 用户使用 SparkContext 提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写 Driver application 程序。此外 SQLContext、HiveContext 及 StreamingContext 对 SparkContext 进行封装,并提供了 SQL、Hive 及流式计算相关的 API。
  2. 使用 SparkContext 提交的用户应用程序,首先会使用 BlockManager(BlockManager 是一个嵌入在 spark 中的 key-value 型分布式存储系统)和 BroadcastManager 将任务的配置进行广播。然后由 DAGScheduler 将任务转换为 RDD 并组织成 DAG,DAG 还将被划分为不同的 Stage。最后由 TaskScheduler 借助 ActorSystem 将任务提交给集群管理器(Cluster Manager)。
  3. 集群管理器(ClusterManager)给任务分配资源,即将具体任务分配到 Worker 上,Worker 创建 Executor 来处理任务的运行。Standalone、YARN、Mesos、EC2 等都可以作为 Spark 的集群管理器。

Spark编程模型

spark 计算模型

RDD 可以看做是对各种数据计算模型的统一抽象,Spark 的计算过程主要是 RDD 的迭代计算过程。RDD 的迭代计算过程非常类似于管道。分区数量取决于 partition 数量的设定,每个分区的数据只会在一个 Task 中计算。所有分区可以在多个机器节点的 Executor 上并行执行。

spark计算模型

集群架构设计

集群架构设计

整个集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。 Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。 Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 executors。 Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。Application 就是用户自己写的 Spark 程序。

spark 运行流程与特点:

运行特点

  1. 构建 Spark Application 的运行环境,启动 SparkContext
  2. SparkContext 向资源管理器(可以是Standalone,Mesos,Yarn)申请运行 Executor 资源,并启动 StandaloneExecutorbackend
  3. Executor 向 SparkContext 申请 Task
  4. SparkContext 将应用程序分发给 Executor
  5. SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage,将 Taskset(每个 stage 中包含一个 Taskset)发送给 Task Scheduler,最后由 Task Scheduler 将 Task 发送给Executor运行
  6. Task 在 Executor 上运行,运行完释放所有资源

特点:

  1. 每个 Application 获取专属的 executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Task。这种 Application 隔离机制是有优势的,无论是从调度角度看(每个 Driver 调度他自己的任务),还是从运行角度看(来自不同 Application 的 Task 运行在不同 JVM中),当然这样意味着 Spark Application 不能跨应用程序共享数据,除非将数据写入外部存储系统
  2. Spark 与资源管理器无关,只要能够获取 executor 进程,并能保持相互通信就可以了
  3. 提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack 里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换
  4. Task 采用了数据本地性和推测执行的优化机制

RDD 运行流程

RDD 在 Spark 中运行大概分为以下三步:

  1. 创建 RDD 对象
  2. DAGScheduler 模块介入运算,计算 RDD 之间的依赖关系,RDD 之间的依赖关系就形成了 DAG
  3. 每一个 Job 被分为多个 Stage。划分 Stage 的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个 Stage,避免多个 Stage 之间的消息传递开销(也就是遇到宽依赖不能确定上下游 RDD 间的分区一一对应,就划分 stage)

流程1

流程2

  • 创建 RDD 上面的例子除去最后一个 collect 是个 action,不会创建 RDD 之外,前面四个转换都会创建出新的 RDD 。因此第一步就是创建好所有 RDD(内部的五项信息)
  • 创建执行计划 Spark 会尽可能地管道化,并基于是否要重新组织数据来划分 阶段(stage),例如本例中的 groupBy() 转换就会将整个执行计划划分成两阶段执行。
  • 最终会产生一个 DAG(directed acyclic graph ,有向无环图)作为逻辑执行计划

stage

  1. 调度任务:将各阶段划分成不同的 任务(task),每个任务都是数据和计算的合体。在进行下一阶段前,当前阶段的所有任务都要执行完成。因为下一阶段的第一个转换一定是重新组织数据的,所以必须等当前阶段所有结果数据都计算出来了才能继续

面试题

面试题

Search

    Table of Contents