Skip to content

Spark内核设计的艺术: 第10章 Spark API #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
shilinlee opened this issue Aug 30, 2019 · 0 comments
Open

Spark内核设计的艺术: 第10章 Spark API #21

shilinlee opened this issue Aug 30, 2019 · 0 comments
Labels
Spark Apache Spark 大数据 整个大数据体系

Comments

@shilinlee
Copy link
Owner

shilinlee commented Aug 30, 2019

10.1 基本概念

  1. DataType 简介

    DataType是 Spark SQL的所有数据类型的基本类型, Spark SQL的所有数据类型都继承自 DataType, DataType的继承体系如图所示。

    image

    从图中可以看到, Spark SQL中定义的数据类型与Java的基本数据类型大部分都是一致的。由于 Spark SQL不是本书要讲解的内容,所以读者在这里只需要了解这些内容
    即可。

  2. Metadata简介
    Metadata用来保存 StructField的元数据信息,其本质是底层的 Map[string, Any] Metadata可以对Boolean、Long、 Double、 String、 Metadata、Array[ Boolean]、 Array[Long]、Array[Double]、 Array[String]和Array [Metadata]等类型的元数据进行存储或读取。 Metadata属于 Spark SQL中的内容,这里不多介绍。

  3. StructType 与 StructField
    样例类 StructType与样例类 Structfield共同构建起数据源的数据结构。StructField中共定义了4个属性:字段名称(name)、数据类型( data Type)、是否允许为null( nullable)、元数据( metadata)。 StructField的定义如下:

  case class StructField(
      name: String,
      dataType: DataType,
      nullable: Boolean = true,
      metadata: Metadata = Metadata.empty)

10.2 数据源 DataSource

从 Spark1.3.0开始, Spark推出了 Data Frame的API,与此同时 Data Source也被引入到Spark中。 Spark将文本文件、CSV文件、JSON文件等一系列格式的输入数据都作为数据源。特质 DataSourceRegister是对所有数据源的抽象, DataSourceRegister的所有具体实现都被注册到了 DataSource中。 DataSource将负责对不同类型数据源的查找、创建不同类型数据源的信息、解析不同数据源的关系等。

10.2.1 DataSourceRegister 详解

DataSourceRegister是数据源的抽象,所有数据源都应该实现它。 DataSourceRegister的定义非常简单,代码如下。

trait DataSourceRegister {
	def shortName(): String
}

shortName方法意在获取数据源提供者使用的格式或格式的别名Spark中实现了大量的数据源提供者,如图所示。

image

这里以 TextFileFormat实现的 shortName方法为例。

override def shorName(): String = "text"

10.2.2 DataSource详解

DataSource表示在Spark SQL中可插拔的数据源。

10.3 检查点的实现

检查点是很多分布式系统为了容灾容错引入的机制,其实质是将系统运行期的内存数据结构和状态持久化到磁盘上,在需要时通过对这些持久化数据的读取,重新构造出之前的运行期状态。 Spark使用检查点主要是为了将RDD的执行状态保留下来,在重新执行时就不用重新计算,而直接从检查点读取。 CheckpointRDD是对检查点数据进行操作的RDD,例如,读写检查点数据。 RDDCheckpointData表示RDD检查点的数据。 Spark的检查点离不开 CheckpointRDD和 RDDCheckpointData的支持,本节将对它们的代码实现进行分析。

10.3.1 CheckpointRDD 的实现

CheckpointRDD 是用来存储体系中回复检查点的数据。

/**
 * An RDD that recovers checkpointed data from storage.
 */
private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
  extends RDD[T](sc, Nil) {

  // CheckpointRDD should not be checkpointed again
  override def doCheckpoint(): Unit = { }
  override def checkpoint(): Unit = { }
  override def localCheckpoint(): this.type = this

  // Note: There is a bug in MiMa that complains about `AbstractMethodProblem`s in the
  // base [[org.apache.spark.rdd.RDD]] class if we do not override the following methods.
  // scalastyle:off
  protected override def getPartitions: Array[Partition] = ???
  override def compute(p: Partition, tc: TaskContext): Iterator[T] = ???
  // scalastyle:on

}
  • compute 实际上是从恢复点恢复数据

10.3.2 RDDCheckpointData 的实现

RDDCheckpointData 用来保存和检查点相关的信息。每个RDDCheckpointData 实例都与一个RDD实例相关联。RDDCheckpointData中一共3个属性:

  • rdd
  • cpState: 检查点状态
  • cpRDD: 保存检查点数据的RDD,即CheckpointRDD的实现类。

10.3.3 ReliableRDDCheckpointData 的实现

本节以 RDDCheckpointData 的子类 ReliableRDDCheckpointData为例,来看看RDDCheckpointData的具体实现。

ReliableRDDCheckpointData除继承了 RDDCheckpointData 的属性外,还有自身的一个属性 cpDir。 cpDir是保存ReliableRDDCheckpointData所关联的RDD数据的检查点目录,是通过调用 ReliablerDDCheckpointData的伴生对象的 checkpointPath方法生成的。

10.4 RDD 的再次分析

笔者早在7.2.2节就对RDD的实现进行了分析,但当时只介绍了与调度系统相关的API。RDD还提供了很多其他类型的API,包括对RDD进行转换的API、对RDD进行计算(动作)的API及RDD检查点相关的API。转换API里的计算是延迟的,也就是说调用转换API不会向 Spark集群提交Job,更不会执行转换计算。只有调用了动作API,才会提交Job并触发对转换计算的执行。由于RDD提供的AP非常多,本书不可能一一展示。由于在10.8节将要介绍 word count的例子,因此本节主要挑选 word count例子中使用到的API进行分析。

10.4.1 转换 API

转换(transform)是指对现有RDD执行某个函数后转换为新的RDD的过程。转换前的RDD与转换后的RDD之间具有依赖和血缘关系。RDD的多次转换将创建出多个RDD这些RDD构成了一张单向依赖的图,也就是DAG。下面挑选10.8节的 word count例子所涉及的转换AP进行介绍。

  1. mapPartitions
    mapPartitions方法用于将RDD转换为 MapPartitionsRDD。
  2. mapPartitionsWithIndex

用于创建爱你一个将与分区索引先关的函数应用到RDD的每个分区的MapPartitionsRDD。mapPartitionsWithIndex与 mapAritions相似,区别在于多接收分区索引的参数

  1. mapPartitionsWithIndexInternal

    用于创建一个将函数应用到RDD的每个分区的MapPartitionsRDD。此方法是私有的,只有Spark SQL内部可使用。

  2. flatMap

  3. map

  4. toJavaRDD

    用于将RDD自己转换成JavaRDD。

10.4.2 动作 API

由于转换API都是预先编织好,但是不会执行的,所以Spak需要一些API来触发对转换的执行。动作API触发对数据的转换后,将接收到一些结果数据,动作AP因此还具备对这些数据进行收集、遍历、叠加的功能。下面挑选10.8节的 word count例子使用的动作 API-collect进行介绍。此外再介绍 foreach和 reduce两个动作API。

  1. collect
    collect方法将调用 Spark Context的 runJob方法提交基于RDD的所有分区上的作业,并返回数组形式的结果。

  2. foreach

  3. reduce

    reduce 方法按照指定的函数对RDD中的元素进行叠加操作。

10.4.3 检查点API的实现分析

RDD中提供了很多与检查点相关的API,通过对这些AP的使用,Spark应用程序才能够启用、保存及使用检查点,提高应用程序的容灾和容错能力。下面进行介绍。

  1. 检查点的启用
    用户提交的 Spark作业必须主动调用RDD的 checkpoint方法,才会启动检查点功能。给 Spark Context指定 checkpointDir是启用检查点机制的前提。可以使用 Spark Context的 setcheckpointDir方法设置checkpointDir。如果没有指定 RDDCheckpointData,那么创建 ReliableRDDCheckpointData
  2. 检查点的保存
    RDD的 doCheckpoint方法用于将RDD的数据保存到检查点。由于此方法是私有的,只能在RDD内部使用。
  3. 检查点的使用

10.4.4 迭代计算

在8.3.3节和8.5.4节分析 ShuffleMapTask和 ResultTask的 runtask方法时已经看到,Task的执行离不开对RDD的iterator方法的调用。RDD的 Iterator方法是迭代计算的人口。

10.5 数据集合 Dataset

Dataset是特定领域对象的强类型集合,可通过功能或关系操作进行并行转换。当Dataset的泛型类型是Row时, Dataset还可以作为 DataFrame。 DataFrame的类型定义如下。

type DataFrame Dataset [Row]

有了行的数据集合, Dataframe看起来更像是关系数据库中的表。 Data Frame是专门为了数据科学应用设计,支持从KB到PB级的数据量。 Spark支持从文本文件、CSV文件、Oracle脚本文件、JSON文件及所有支持JDBC的数据源(例如, MySQL和Hive)转换为Dataframe。

Dataset I中提供的API非常丰富,本书不可能逐一进行源码分析。

  1. ofRows

    ofRows是Dataset的伴生对象中提供的方法,用于将逻辑执行计划LogicalPlan转换为泛型是Row的Dataset(即DataFrame)。

  2. 多种多样的select

    Dataset提供了多个重载的 select方法,以实现类似于SQL中的 SELECT语句的选择功能。 Dataset虽然提供了多个 select操作的API,但这些API最终都将转换为统一的select方法。

  3. rdd

    rdd是 Dataset的属性之一,由于被关键字lazy修饰,因此在需要rdd的值时才会进行“懒”执行。

10.6 DataFrameReader

DataFrameReader用于通过外部数据源的格式(如text、cvs等)和数据结构加载Dataset, DataFrameReader还提供了非常丰富的操作DataFrame的API。有了DataReader,我们就可以将各种格式的数据源转换为Dataset或Dataframe,进而以面向关系型数据的方式操纵各种格式的数据。

DataFrameReader 只有三个属性,分别如下。

  • source:输入数据源的格式。可通过 spark. sql. sources. default属性配置,默认为 parquet

小贴士: Apache Parquet是 Hadoop生态圈中一种新型列式存储格式,它可以兼容 Hadoop生态圈中大多数计算框架( Hadoop、 Spark等),被多种查询引擎支持(Hive、Impala、Drill等),并且是数据处理框架、数据模型和语言无关的。Parquet最初是由Twitter和 Cloudera(由于Impala的缘故)合作开发完成并开源,2015年5月从 Apache的孵化器里华业成为 Apache顶级项目。

  • userSpecifiedSchema:用户指定的 Schema,实际的类型是 StructType
    口 extraOptions:类型为 HashMap[String, String],用于保存额外的选项
  • extraOptions:类型为 HashMap[String, String],用于保存额外的选项Q

DataFrameReader提供了大量的API,本书限于篇幅不可能全部都进行分析。

  1. format
    format方法用于设置输入数据源的格式。

  2. schema

    用于设置用户指定的结构类型(StructType)。

  3. 多种多样的option

    DataFrameReader提供了多个重载的 option方法,用于向 extraCtions中添加选项。无论哪个 option方法,都将转换为对最基本的 option方法的调用

  4. 多种多用的load

    DataFrameReader提供了多个重载的load方法,用于将数据源的数据加载到DataFrame这些load方法实际都会转换为调用基本的load方法来加载数据。

  5. 重载的text

  6. 重载的textFIle

10.7 SparkSession

Spark2.0引入了 SparkSession,为用户提供了一个统一的切入点来使用 Spark的各项功能。 SparkSession还提供了 DataFrame和 Dataset相关的API来编写Spark应用程序。SparkSession降低了学习曲线,使得工程师可以更容易地使用 Spark
SparkSession的属性如下。

  • spark Context: 即 Spark context。
  • sharedState: 在多个 SparkSession之间共享的状态(包括 Spark context、缓存的数
    据、监听器及与外部系统交互的字典信息)。
  • sessionState: SparkSession的状态( SessionState) SessionState中保存着 SparkSession
    指定的状态信息。
  • sqlContext: 即 SQLContext。 SQLContext是 Spark SQL的上下文信息。
  • conf: 类型为 RuntimeConfig,是 Spark运行时的配置接口类。

根据 SparkSession的属性,笔者认为 SparkSession并非是一个新创造的东西,它不过是对 SparkContext、 SQLContext及 Dataframe等的一层封装。 SparkSession的创建离不开构建器。

SparkSession的伴生对象提供了很多属性和方法,以便于我们构造Spaark Session。SparkSession的伴生对象中包含以下属性。

  • sqlListener: 类型定义为 AtomicReference[SQLListener],用于持有 SQLListenerliStener主要用于 SQL UI。
  • active ThreadSession: 类型定义为 InheritableThreadLocal[ SparkSession],用于持有当前线程的激活的 SparkSession。 SparkSession的伴生对象提供了setActiveSession方法、 getActiveSessior方法、 clearActivesession方法分别用于设置、获取、清空activeThreadSessic中持有的 SparkSession。
  • defaultsession: 类型定义为 AtomicReference[SparkSession],用于持有默认的spakSession。 SparkSession的伴生对象提供了 setDefaultsession方法、 getDefaultSession方法、 clearDefaultsession方法分别用于设置、获取、清空defaultsession中持有的SparkSession 。

10.7.1 SparkSession的构建器 Builder

Builder是 SparkSession实例的构建器,对 SparkSession实例的构造都依赖于它。Builder中的 options(类型为 HashMap[String, Stringl)用于缓存构建 SparkConf所需的属性配置。 userSuppliedcontext属性用于持有用户提供的 SparkContext,可以通过 Builder的sparkContext方法来设置。 Builder中提供了很多向 options中增加属性配置的方法,此外还提供了获取或创建 SparkSession的方法。

  1. 多种多样的config

  2. getOrCreate 方法

    getOrCreate 方法用于获取或创建SparkSession。

10.7.2 SparkSession 的API

  1. builder
    SparkSession的伴生对象中提供了 builder方法用于创建 Builder,其实现如下

    def builder (): Builder new Builder
  2. read
    sparkSession的read方法用于创建 DataFrameReader。

  3. baseRelationToDataFrame
    SparkSession的 baseRelationToDataFrame方法用于将 BaseRelation转换为 DataFrame。将 BaseRelation转换为 DataFrame后,就可以用操作关系数据的方式来开发。

10.8 word count 例子

@shilinlee shilinlee added Spark Apache Spark 大数据 整个大数据体系 labels Aug 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Spark Apache Spark 大数据 整个大数据体系
Projects
None yet
Development

No branches or pull requests

1 participant