Spark学习笔记(二) 架构解析和RDD编程

1 Spark运行架构

1.1 运行架构

Spark框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。

如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务。

1638183145356

1.2 核心组件

由上图可以看出,对于Spark框架有两个核心组件:

1.2.1 Driver

Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业执行时主要负责:

Ø 将用户程序转化为作业(job)

Ø 在Executor之间调度任务(task)

Ø 跟踪Executor的执行情况

Ø 通过UI展示查询运行情况

实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。

1.2.2 Executor

Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。

Executor有两个核心功能:

Ø 负责运行组成Spark应用的任务,并将结果返回给驱动器进程

Ø 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

1.2.3 Master & Worker

Spark集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master和Worker,这里的Master是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM, 而Worker呢,也是进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM。

1.2.4 ApplicationMaster

Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。

说的简单点就是,ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。

1.3 核心概念

1.3.1 Executor与Core(核)

Spark Executor是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量。

应用程序相关启动参数如下:

名称说明
–num-executors配置Executor的数量
–executor-memory配置每个Executor的内存大小
–executor-cores配置每个Executor的虚拟CPU core数量

1.3.2 并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。

1.3.3 有向无环图(DAG)

1638183179425

大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是Hadoop所承载的MapReduce,它将计算分为两个阶段,分别为 Map阶段 和 Reduce阶段。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。

这里所谓的有向无环图,并不是真正意义的图形,而是由Spark程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

1.4 提交流程

所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过Spark客户端提交给Spark运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将Spark引用部署到Yarn环境中会更多一些,所以本课程中的提交流程是基于Yarn环境的。

1638183211849

Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式主要区别在于:Driver程序的运行节点位置。

1.2.1 Yarn Client模式

Client模式将用于监控和调度的Driver模块在客户端执行,而不是在Yarn中,所以一般用于测试。

Ø Driver在任务提交的本地机器上运行

Ø Driver启动后会和ResourceManager通讯申请启动ApplicationMaster

Ø ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存

Ø ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程

Ø Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数

Ø 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

1.2.2 Yarn Cluster模式

Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行。一般应用于实际生产环境。

Ø 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,

Ø 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。

Ø Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程

Ø Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,

Ø 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

2 Spark核心编程

Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

Ø RDD : 弹性分布式数据集

Ø 累加器:分布式共享只写变量

Ø 广播变量:分布式共享只读变量

接下来我们一起看看这三大数据结构是如何在数据处理中使用的。

2.1 RDD

2.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。(所谓的RDD,其实就是要给数据结构,类似于链表中的Node,RDD中有适合并行计算的分区操作;RDD中封装了最小的计算单元,目的是更适合重复使用;Spark主要就是通过组合RDD的操作完成业务需求。)

那Spark 怎么组合RDD?

RDD的扩展功能采用的也是装饰者设计模式;RDD中的collect方法类似于IO中的read方法。RDD不存储任何数据,只封装逻辑。

1638181914698

Ø 弹性

l 存储的弹性:内存与磁盘的自动切换;

l 容错的弹性:数据丢失可以自动恢复;

l 计算的弹性:计算出错重试机制;

l 分片的弹性:可根据需要重新分片。

Ø 分布式:数据存储在大数据集群不同节点上

Ø 数据集:RDD封装了计算逻辑,并不保存数据

Ø 数据抽象:RDD是一个抽象类,需要子类具体实现

Ø 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑

Ø 可分区、并行计算

2.1.2 核心属性

img

1638182223714

1)分区列表

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

2) 分区计算函数

Spark在计算时,是使用分区函数对每一个分区进行计算

3)RDD之间的依赖关系

RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系

4) 分区器(可选)

当数据为KV类型数据时,可以通过设定分区器自定义数据的分区

5)首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

2.1.3 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合

Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

RDD是Spark框架中用于数据处理的核心模型,接下来我们看看,在Yarn环境中,RDD的工作原理:

  1. 启动Yarn集群环境

1638182323787

  1. Spark通过申请资源创建调度节点和计算节点

1638182335328

  1. Spark框架根据需求将计算逻辑根据分区划分成不同的任务

1638182347091

  1. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

1638182359929

从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算,接下来我们就一起看看Spark框架中RDD是具体是如何进行数据处理的。

2.1.4 基础编程

2.1.4.1 RDD创建

在Spark中创建RDD的创建方式可以分为四种

1) 从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD(推荐使用)

val sparkConf =
    new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
    List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
    List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
  1. 从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD(textFile函数)包括:本地的文件系统,所有Hadoop支持的数据集,比如HDFS、HBase等。。

val sparkConf =
    new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("data/input.txt")
fileRDD.collect().foreach(println)
sparkContext.stop()

3) 从其他RDD创建

主要是通过一个RDD运算完后,再产生新的RDD。详情请参考后续章节

4) 直接创建RDD(new)

使用new的方式直接构造RDD,一般由Spark框架自身使用。

2.1.4.2 RDD并行度与分区

默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

val sparkConf =
    new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
    sparkContext.makeRDD(
        List(1,2,3,4),
        4)
val fileRDD: RDD[String] =
    sparkContext.textFile(
        "input",
        2)
fileRDD.collect().foreach(println)
sparkContext.stop()

l 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

l 读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {

    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    ...

    for (FileStatus file: files) {

        ...

    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          ...

  }
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

2.1.4.3 RDD转换算子

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

l Value类型

1) map

Ø 函数签名

def map[U: ClassTag](f: T => U): RDD[U]

Ø 函数说明

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))

val dataRDD1: RDD[Int] = dataRDD.map(

num => {

​ num * 2

}

)

val dataRDD2: RDD[String] = dataRDD1.map(

num => {

​ “” + num

}

)

v 小功能:从服务器日志数据apache.log中获取用户请求URL资源路径

2) mapPartitions

Ø 函数签名

def mapPartitions[U: ClassTag](

f: Iterator[T] => Iterator[U],

preservesPartitioning: Boolean = false): RDD[U]

Ø 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

val dataRDD1: RDD[Int] = dataRDD.mapPartitions(

datas => {

​ datas.filter(_==2)

}

)

v 小功能:获取每个数据分区的最大值

img 思考一个问题:map和mapPartitions的区别?

Ø 数据处理角度

Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。

Ø 功能的角度

Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

Ø 性能的角度

Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。

完成比完美更重要

3) mapPartitionsWithIndex

Ø 函数签名

def mapPartitionsWithIndex[U: ClassTag](

f: (Int, Iterator[T]) => Iterator[U],

preservesPartitioning: Boolean = false): RDD[U]

Ø 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

val dataRDD1 = dataRDD.mapPartitionsWithIndex(

(index, datas) => {

​ datas.map(index, _)

}

)

v 小功能:获取第二个数据分区的数据

4) flatMap

Ø 函数签名

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Ø 函数说明

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

val dataRDD = sparkContext.makeRDD(List(

List(1,2),List(3,4)

),1)

val dataRDD1 = dataRDD.flatMap(

list => list

)

v 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作

5) glom

Ø 函数签名

def glom(): RDD[Array[T]]

Ø 函数说明

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4

),1)

val dataRDD1:RDD[Array[Int]] = dataRDD.glom()

v 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

6) groupBy

Ø 函数签名

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

Ø 函数说明

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)

val dataRDD1 = dataRDD.groupBy(

_%2

)

v 小功能:将List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

v 小功能:从服务器日志数据apache.log中获取每个时间段访问量。

v 小功能:WordCount。

7) filter

Ø 函数签名

def filter(f: T => Boolean): RDD[T]

Ø 函数说明

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4

),1)

val dataRDD1 = dataRDD.filter(_%2 == 0)

v 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径

8) sample

Ø 函数签名

def sample(

withReplacement: Boolean,

fraction: Double,

seed: Long = Utils.random.nextLong): RDD[T]

Ø 函数说明

根据指定的规则从数据集中抽取数据

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4

),1)

// 抽取数据不放回(伯努利算法)

// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。

// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要

// 第一个参数:抽取的数据是否放回,false:不放回

// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;

// 第三个参数:随机数种子

val dataRDD1 = dataRDD.sample(false, 0.5)

// 抽取数据放回(泊松算法)

// 第一个参数:抽取的数据是否放回,true:放回;false:不放回

// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数

// 第三个参数:随机数种子

val dataRDD2 = dataRDD.sample(true, 2)

img思考一个问题:有啥用,抽奖吗?

9) distinct

Ø 函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Ø 函数说明

将数据集中重复的数据去重

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4,1,2

),1)

val dataRDD1 = dataRDD.distinct()

val dataRDD2 = dataRDD.distinct(2)

img思考一个问题:如果不用该算子,你有什么办法实现数据去重?

10) coalesce

Ø 函数签名

def coalesce(numPartitions: Int, shuffle: Boolean = false,

​ partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

​ (implicit ord: Ordering[T] = null)

: RDD[T]

Ø 函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4,1,2

),6)

val dataRDD1 = dataRDD.coalesce(2)

img思考一个问题:我想要扩大分区,怎么办?

11) repartition

Ø 函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Ø 函数说明

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4,1,2

),2)

val dataRDD1 = dataRDD.repartition(4)

思考一个问题:coalesce和repartition区别?

12) sortBy

Ø 函数签名

def sortBy[K](

f: (T) => K,

ascending: Boolean = true,

numPartitions: Int = this.partitions.length)

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

Ø 函数说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程

val dataRDD = sparkContext.makeRDD(List(

1,2,3,4,1,2

),2)

val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)

l 双Value类型

13) intersection

Ø 函数签名

def intersection(other: RDD[T]): RDD[T]

Ø 函数说明

对源RDD和参数RDD求交集后返回一个新的RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))

val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))

val dataRDD = dataRDD1.intersection(dataRDD2)

img思考一个问题:如果两个RDD数据类型不一致怎么办?

14) union

Ø 函数签名

def union(other: RDD[T]): RDD[T]

Ø 函数说明

对源RDD和参数RDD求并集后返回一个新的RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))

val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))

val dataRDD = dataRDD1.union(dataRDD2)

img思考一个问题:如果两个RDD数据类型不一致怎么办?

15) subtract

Ø 函数签名

def subtract(other: RDD[T]): RDD[T]

Ø 函数说明

以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))

val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))

val dataRDD = dataRDD1.subtract(dataRDD2)

思考一个问题:如果两个RDD数据类型不一致怎么办?

16) zip

Ø 函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

Ø 函数说明

将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))

val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))

val dataRDD = dataRDD1.zip(dataRDD2)

img思考一个问题:如果两个RDD数据类型不一致怎么办?

img思考一个问题:如果两个RDD数据分区不一致怎么办?

img思考一个问题:如果两个RDD分区数据数量不一致怎么办?

l Key - Value类型

17) partitionBy

Ø 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

Ø 函数说明

将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

val rdd: RDD[(Int, String)] =

sc.makeRDD(Array((1,”aaa”),(2,”bbb”),(3,”ccc”)),3)

import org.apache.spark.HashPartitioner

val rdd2: RDD[(Int, String)] =

rdd.partitionBy(new HashPartitioner(2))

img思考一个问题:如果重分区的分区器和当前RDD的分区器一样怎么办?

img思考一个问题:Spark还有其他分区器吗?

img思考一个问题:如果想按照自己的方法进行数据分区怎么办?

img思考一个问题:哪那么多问题?

18) reduceByKey

Ø 函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

Ø 函数说明

可以将数据按照相同的Key对Value进行聚合

val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val dataRDD2 = dataRDD1.reduceByKey(+)

val dataRDD3 = dataRDD1.reduceByKey(+, 2)

v 小功能:WordCount

19) groupByKey

Ø 函数签名

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

Ø 函数说明

将数据源的数据根据key对value进行分组

val dataRDD1 =

sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val dataRDD2 = dataRDD1.groupByKey()

val dataRDD3 = dataRDD1.groupByKey(2)

val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

考一个问题:reduceByKey和groupByKey的区别?

从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。

从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

v 小功能:WordCount

20) aggregateByKey

Ø 函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,

combOp: (U, U) => U): RDD[(K, U)]

Ø 函数说明

将数据根据不同的规则进行分区内计算和分区间计算

val dataRDD1 =

sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val dataRDD2 =

dataRDD1.aggregateByKey(0)(+,+)

v 取出每个分区内相同key的最大值然后分区间相加

// TODO : 取出每个分区内相同key的最大值然后分区间相加

// aggregateByKey算子是函数柯里化,存在两个参数列表

// 1. 第一个参数列表中的参数表示初始值

// 2. 第二个参数列表中含有两个参数

// 2.1 第一个参数表示分区内的计算规则

// 2.2 第二个参数表示分区间的计算规则

val rdd =

sc.makeRDD(List(

​ (“a”,1),(“a”,2),(“c”,3),

​ (“b”,4),(“c”,5),(“c”,6)

),2)

// 0:(“a”,1),(“a”,2),(“c”,3) => (a,10)(c,10)

// => (a,10)(b,10)(c,20)

// 1:(“b”,4),(“c”,5),(“c”,6) => (b,10)(c,10)

val resultRDD =

rdd.aggregateByKey(10)(

​ (x, y) => math.max(x,y),

​ (x, y) => x + y

)

resultRDD.collect().foreach(println)

img思考一个问题:分区内计算规则和分区间计算规则相同怎么办?(WordCount)

21) foldByKey

Ø 函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

Ø 函数说明

当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey

val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val dataRDD2 = dataRDD1.foldByKey(0)(+)

22) combineByKey

Ø 函数签名

def combineByKey[C](

createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C): RDD[(K, C)]

Ø 函数说明

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

小练习:将数据List((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98))求每个key的平均值

val list: List[(String, Int)] = List((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98))

val input: RDD[(String, Int)] = sc.makeRDD(list, 2)

val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(

(_, 1),

(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),

(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

)

img思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

reduceByKey: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同

foldByKey: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同

aggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

23) sortByKey

Ø 函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

: RDD[(K, V)]

Ø 函数说明

在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的

val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)

val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

v 小功能:设置key为自定义类User

24) join

Ø 函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

Ø 函数说明

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, “a”), (2, “b”), (3, “c”)))

val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))

rdd.join(rdd1).collect().foreach(println)

img思考一个问题:如果key存在不相等呢?

25) leftOuterJoin

Ø 函数签名

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

Ø 函数说明

类似于SQL语句的左外连接

val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val dataRDD2 = sparkContext.makeRDD(List((“a”,1),(“b”,2),(“c”,3)))

val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

26) cogroup

Ø 函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

Ø 函数说明

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

val dataRDD1 = sparkContext.makeRDD(List((“a”,1),(“a”,2),(“c”,3)))

val dataRDD2 = sparkContext.makeRDD(List((“a”,1),(“c”,2),(“c”,3)))

val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =

dataRDD1.cogroup(dataRDD2)

2.1.4.4 案例实操

  1. 数据准备

agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

  1. 需求描述

统计出每一个省份每个广告被点击数量排行的Top3

  1. 需求分析

  2. 功能实现

2.1.4.5 RDD行动算子

1) reduce

Ø 函数签名

def reduce(f: (T, T) => T): T

Ø 函数说明

聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 聚合数据

val reduceResult: Int = rdd.reduce(+)

2) collect

Ø 函数签名

def collect(): Array[T]

Ø 函数说明

在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 收集数据到Driver

rdd.collect().foreach(println)

3) count

Ø 函数签名

def count(): Long

Ø 函数说明

返回RDD中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 返回RDD中元素的个数

val countResult: Long = rdd.count()

4) first

Ø 函数签名

def first(): T

Ø 函数说明

返回RDD中的第一个元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 返回RDD中元素的个数

val firstResult: Int = rdd.first()

println(firstResult)

5) take

Ø 函数签名

def take(num: Int): Array[T]

Ø 函数说明

返回一个由RDD的前n个元素组成的数组

vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 返回RDD中元素的个数

val takeResult: Array[Int] = rdd.take(2)

println(takeResult.mkString(“,”))

6) takeOrdered

Ø 函数签名

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Ø 函数说明

返回该RDD排序后的前n个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))

// 返回RDD中元素的个数

val result: Array[Int] = rdd.takeOrdered(2)

7) aggregate

Ø 函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Ø 函数说明

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)

// 将该RDD所有元素相加得到结果

//val result: Int = rdd.aggregate(0)(_ + _, _ + _)

val result: Int = rdd.aggregate(10)(_ + _, _ + _)

8) fold

Ø 函数签名

def fold(zeroValue: T)(op: (T, T) => T): T

Ø 函数说明

折叠操作,aggregate的简化版操作

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))

val foldResult: Int = rdd.fold(0)(+)

9) countByKey

Ø 函数签名

def countByKey(): Map[K, Long]

Ø 函数说明

统计每种key的个数

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, “a”), (1, “a”), (1, “a”), (2, “b”), (3, “c”), (3, “c”)))

// 统计每种key的个数

val result: collection.Map[Int, Long] = rdd.countByKey()

10) save相关算子

Ø 函数签名

def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit

def saveAsSequenceFile(

path: String,

codec: Option[Class[_ <: CompressionCodec]] = None): Unit

Ø 函数说明

将数据保存到不同格式的文件中

// 保存成Text文件

rdd.saveAsTextFile(“output”)

// 序列化成对象保存到文件

rdd.saveAsObjectFile(“output1”)

// 保存成Sequencefile文件

rdd.map((_,1)).saveAsSequenceFile(“output2”)

11) foreach

Ø 函数签名

def foreach(f: T => Unit): Unit = withScope {

val cleanF = sc.clean(f)

sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))

}

Ø 函数说明

分布式遍历RDD中的每一个元素,调用指定函数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 收集后打印

rdd.map(num=>num).collect().foreach(println)

println(“****“)

// 分布式打印

rdd.foreach(println)

2.1.4.6 RDD序列化

  1. 闭包检查

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变

  1. 序列化方法和属性

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行,看如下代码:

object serializable02_function {

def main(args: Array[String]): Unit = {

​ //1.创建SparkConf并设置App名称

​ val conf: SparkConf = new SparkConf().setAppName(“SparkCoreTest”).setMaster(“local[*]”)

​ //2.创建SparkContext,该对象是提交Spark App的入口

​ val sc: SparkContext = new SparkContext(conf)

​ //3.创建一个RDD

​ val rdd: RDD[String] = sc.makeRDD(Array(“hello world”, “hello spark”, “hive”, “atguigu”))

​ //3.1创建一个Search对象

​ val search = new Search(“hello”)

​ //3.2 函数传递,打印:ERROR Task not serializable

​ search.getMatch1(rdd).collect().foreach(println)

​ //3.3 属性传递,打印:ERROR Task not serializable

​ search.getMatch2(rdd).collect().foreach(println)

​ //4.关闭连接

​ sc.stop()

}

}

class Search(query:String) extends Serializable {

def isMatch(s: String): Boolean = {

​ s.contains(query)

}

// 函数序列化案例

def getMatch1 (rdd: RDD[String]): RDD[String] = {

​ //rdd.filter(this.isMatch)

​ rdd.filter(isMatch)

}

// 属性序列化案例

def getMatch2(rdd: RDD[String]): RDD[String] = {

​ //rdd.filter(x => x.contains(this.query))

​ rdd.filter(x => x.contains(query))

​ //val q = query

​ //rdd.filter(x => x.contains(q))

}

}

  1. Kryo序列化框架

参考地址: https://github.com/EsotericSoftware/kryo

Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

注意:即使使用Kryo序列化,也要继承Serializable接口。

object serializable_Kryo {

def main(args: Array[String]): Unit = {

​ val conf: SparkConf = new SparkConf()

​ .setAppName(“SerDemo”)

​ .setMaster(“local[*]”)

​ // 替换默认的序列化机制

​ .set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)

​ // 注册需要使用 kryo 序列化的自定义类

​ .registerKryoClasses(Array(classOf[Searcher]))

​ val sc = new SparkContext(conf)

​ val rdd: RDD[String] = sc.makeRDD(Array(“hello world”, “hello atguigu”, “atguigu”, “hahah”), 2)

​ val searcher = new Searcher(“hello”)

​ val result: RDD[String] = searcher.getMatchedRDD1(rdd)

​ result.collect.foreach(println)

}

}

case class Searcher(val query: String) {

def isMatch(s: String) = {

​ s.contains(query)

}

def getMatchedRDD1(rdd: RDD[String]) = {

​ rdd.filter(isMatch)

}

def getMatchedRDD2(rdd: RDD[String]) = {

​ val q = query

​ rdd.filter(_.contains(q))

}

}

2.1.4.7 RDD依赖关系

  1. RDD 血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

val fileRDD: RDD[String] = sc.textFile(“input/1.txt”)

println(fileRDD.toDebugString)

println(“———————-“)

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(“ “))

println(wordRDD.toDebugString)

println(“———————-“)

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))

println(mapRDD.toDebugString)

println(“———————-“)

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(+)

println(resultRDD.toDebugString)

resultRDD.collect()

  1. RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻RDD之间的关系

val sc: SparkContext = new SparkContext(conf)

val fileRDD: RDD[String] = sc.textFile(“input/1.txt”)

println(fileRDD.dependencies)

println(“———————-“)

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(“ “))

println(wordRDD.dependencies)

println(“———————-“)

val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))

println(mapRDD.dependencies)

println(“———————-“)

val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(+)

println(resultRDD.dependencies)

resultRDD.collect()

  1. RDD 窄依赖

窄依赖表示每一个父(上游)RDD的Partition最多被子(下游)RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependencyT

  1. RDD 宽依赖

宽依赖表示同一个父(上游)RDD的Partition被多个子(下游)RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为多生。

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

@transient private val rdd: RDD[ <: Product2[K, V]],

val partitioner: Partitioner,

val serializer: Serializer = SparkEnv.get.serializer,

val keyOrdering: Option[Ordering[K]] = None,

val aggregator: Option[Aggregator[K, V, C]] = None,

val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]]

  1. RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

img img img

  1. RDD 阶段划分源码

try {

// New stage creation may throw an exception if, for example, jobs are run on a

// HadoopRDD whose underlying HDFS files have been deleted.

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

} catch {

case e: Exception =>

logWarning(“Creating new stage failed due to exception - job: “ + jobId, e)

listener.jobFailed(e)

return

}

……

private def createResultStage(

rdd: RDD[_],

func: (TaskContext, Iterator[_]) => _,

partitions: Array[Int],

jobId: Int,

callSite: CallSite): ResultStage = {

val parents = getOrCreateParentStages(rdd, jobId)

val id = nextStageId.getAndIncrement()

val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)

stageIdToStage(id) = stage

updateJobIdStageIdMaps(jobId, stage)

stage

}

……

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {

getShuffleDependencies(rdd).map { shuffleDep =>

getOrCreateShuffleMapStage(shuffleDep, firstJobId)

}.toList

}

……

private[scheduler] def getShuffleDependencies(

rdd: RDD[]): HashSet[ShuffleDependency[, _, _]] = {

val parents = new HashSet[ShuffleDependency[_, _, _]]

val visited = new HashSet[RDD[_]]

val waitingForVisit = new Stack[RDD[_]]

waitingForVisit.push(rdd)

while (waitingForVisit.nonEmpty) {

val toVisit = waitingForVisit.pop()

if (!visited(toVisit)) {

visited += toVisit

toVisit.dependencies.foreach {

case shuffleDep: ShuffleDependency[_, _, _] =>

​ parents += shuffleDep

case dependency =>

​ waitingForVisit.push(dependency.rdd)

}

}

}

parents

}

  1. RDD 任务划分

RDD任务切分中间分为:Application、Job、Stage和Task

l Application:初始化一个SparkContext即生成一个Application;

l Job:一个Action算子就会生成一个Job;

l Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;

l Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

img

  1. RDD 任务划分源码

val tasks: Seq[Task[_]] = try {

stage match {

case stage: ShuffleMapStage =>

partitionsToCompute.map { id =>

​ val locs = taskIdToLocations(id)

​ val part = stage.rdd.partitions(id)

​ new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,

​ taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),

​ Option(sc.applicationId), sc.applicationAttemptId)

}

case stage: ResultStage =>

partitionsToCompute.map { id =>

​ val p: Int = stage.partitions(id)

​ val part = stage.rdd.partitions(p)

​ val locs = taskIdToLocations(id)

​ new ResultTask(stage.id, stage.latestInfo.attemptId,

​ taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,

​ Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)

}

}

……

val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

……

override def findMissingPartitions(): Seq[Int] = {

mapOutputTrackerMaster

.findMissingPartitions(shuffleDep.shuffleId)

.getOrElse(0 until numPartitions)

}

2.1.4.8 RDD持久化

  1. RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

// cache操作会增加血缘关系,不改变原有的血缘关系

println(wordToOneRdd.toDebugString)

// 数据缓存。

wordToOneRdd.cache()

// 可以更改存储级别

//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

object StorageLevel {

val NONE = new StorageLevel(false, false, false, false)

val DISK_ONLY = new StorageLevel(true, false, false, false)

val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

img

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

  1. RDD CheckPoint检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

// 设置检查点路径

sc.setCheckpointDir(“./checkpoint1”)

// 创建一个RDD,读取指定位置文件:hello atguigu atguigu

val lineRdd: RDD[String] = sc.textFile(“input/1.txt”)

// 业务逻辑

val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(“ “))

val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {

word => {

​ (word, System.currentTimeMillis())

}

}

// 增加缓存,避免再重新跑一个job做checkpoint

wordToOneRdd.cache()

// 数据检查点:针对wordToOneRdd做检查点计算

wordToOneRdd.checkpoint()

// 触发执行逻辑

wordToOneRdd.collect().foreach(println)

  1. 缓存和检查点区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

2.1.4.9 RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

Ø 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

Ø 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

  1. Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余

class HashPartitioner(partitions: Int) extends Partitioner {

require(partitions >= 0, s”Number of partitions ($partitions) cannot be negative.”)

def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {

case null => 0

case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)

}

override def equals(other: Any): Boolean = other match {

case h: HashPartitioner =>

h.numPartitions == numPartitions

case _ =>

false

}

override def hashCode: Int = numPartitions

}

  1. Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](

partitions: Int,

rdd: RDD[_ <: Product2[K, V]],

private var ascending: Boolean = true)

extends Partitioner {

// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.

require(partitions >= 0, s”Number of partitions cannot be negative but found $partitions.”)

private var ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions

private var rangeBounds: Array[K] = {

}

def numPartitions: Int = rangeBounds.length + 1

private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

def getPartition(key: Any): Int = {

val k = key.asInstanceOf[K]

var partition = 0

if (rangeBounds.length <= 128) {

// If we have less than 128 partitions naive search

​ while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {

​ partition += 1

}

} else {

// Determine which binary search method to use only once.

partition = binarySearch(rangeBounds, k)

// binarySearch either returns the match location or -[insertion point]-1

if (partition < 0) {

​ partition = -partition-1

}

if (partition > rangeBounds.length) {

​ partition = rangeBounds.length

}

}

if (ascending) {

partition

} else {

rangeBounds.length - partition

}

}

override def equals(other: Any): Boolean = other match {

}

override def hashCode(): Int = {

}

@throws(classOf[IOException])

private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {

}

@throws(classOf[IOException])

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {

}

}

2.1.4.10 RDD文件读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:text文件、csv文件、sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

Ø text文件

// 读取输入文件

val inputRDD: RDD[String] = sc.textFile(“input/1.txt”)

// 保存数据

inputRDD.saveAsTextFile(“output”)

Ø sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFilekeyClass, valueClass

// 保存数据为SequenceFile

dataRDD.saveAsSequenceFile(“output”)

// 读取SequenceFile文件

sc.sequenceFileInt,Int.collect().foreach(println)

Ø object对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFileT: ClassTag函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

// 保存数据

dataRDD.saveAsObjectFile(“output”)

// 读取数据

sc.objectFileInt.collect().foreach(println)

2.2 累加器

2.2.1 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

2.2.2 基础编程

2.2.2.1 系统累加器

val rdd = sc.makeRDD(List(1,2,3,4,5))
// 声明累加器
var sum = sc.longAccumulator("sum");
rdd.foreach(
  num => &#123;
    // 使用累加器
    sum.add(num)
  &#125;
)
// 获取累加器的值
println("sum = " + sum.value)

2.2.2.2 自定义累加器

// 自定义累加器
// 1. 继承AccumulatorV2,并设定泛型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]&#123;

var map : mutable.Map[String, Long] = mutable.Map()

// 累加器是否为初始状态
override def isZero: Boolean = &#123;
  map.isEmpty
&#125;

// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = &#123;
  new WordCountAccumulator
&#125;

// 重置累加器
override def reset(): Unit = &#123;
  map.clear()
&#125;

// 向累加器中增加数据 (In)
override def add(word: String): Unit = &#123;
    // 查询map中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加1
    // 如果没有相同的单词,那么在map中增加这个单词
    map(word) = map.getOrElse(word, 0L) + 1L
&#125;

// 合并累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = &#123;

  val map1 = map
  val map2 = other.value

  // 两个Map的合并
  map = map1.foldLeft(map2)(
    ( innerMap, kv ) => &#123;
      innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
      innerMap
    &#125;
  )
&#125;

// 返回累加器的结果 (Out)
override def value: mutable.Map[String, Long] = map
&#125;

2.3 广播变量

2.3.1 实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

2.3.2 基础编程

val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)

val resultRDD: RDD[(String, (Int, Int))] = rdd1.map &#123;
  case (key, num) => &#123;
    var num2 = 0
    // 使用广播变量
    for ((k, v) <- broadcast.value) &#123;
      if (k == key) &#123;
        num2 = v
      &#125;
    &#125;
    (key, (num, num2))
  &#125;
&#125;
文章目录
  1. 1 Spark运行架构
    1. 1.1 运行架构
    2. 1.2 核心组件
      1. 1.2.1 Driver
      2. 1.2.2 Executor
      3. 1.2.3 Master & Worker
      4. 1.2.4 ApplicationMaster
    3. 1.3 核心概念
      1. 1.3.1 Executor与Core(核)
      2. 1.3.2 并行度(Parallelism)
      3. 1.3.3 有向无环图(DAG)
    4. 1.4 提交流程
      1. 1.2.1 Yarn Client模式
      2. 1.2.2 Yarn Cluster模式
  2. 2 Spark核心编程
    1. 2.1 RDD
      1. 2.1.1 什么是RDD
      2. 2.1.2 核心属性
      3. 2.1.3 执行原理
      4. 2.1.4 基础编程
        1. 2.1.4.1 RDD创建
        2. 2.1.4.2 RDD并行度与分区
        3. 2.1.4.3 RDD转换算子
          1. 1) map
          2. 2) mapPartitions
          3. 3) mapPartitionsWithIndex
          4. 4) flatMap
          5. 5) glom
          6. 6) groupBy
          7. 7) filter
          8. 8) sample
          9. 9) distinct
          10. 10) coalesce
          11. 11) repartition
          12. 12) sortBy
          13. 13) intersection
          14. 14) union
          15. 15) subtract
          16. 16) zip
          17. 17) partitionBy
          18. 18) reduceByKey
          19. 19) groupByKey
          20. 20) aggregateByKey
          21. 21) foldByKey
          22. 22) combineByKey
          23. 23) sortByKey
          24. 24) join
          25. 25) leftOuterJoin
          26. 26) cogroup
        4. 2.1.4.4 案例实操
        5. 2.1.4.5 RDD行动算子
          1. 1) reduce
          2. 2) collect
          3. 3) count
          4. 4) first
          5. 5) take
          6. 6) takeOrdered
          7. 7) aggregate
          8. 8) fold
          9. 9) countByKey
          10. 10) save相关算子
          11. 11) foreach
        6. 2.1.4.6 RDD序列化
        7. 2.1.4.7 RDD依赖关系
        8. 2.1.4.8 RDD持久化
        9. 2.1.4.9 RDD分区器
        10. 2.1.4.10 RDD文件读取与保存
    2. 2.2 累加器
      1. 2.2.1 实现原理
      2. 2.2.2 基础编程
        1. 2.2.2.1 系统累加器
        2. 2.2.2.2 自定义累加器
    3. 2.3 广播变量
      1. 2.3.1 实现原理
      2. 2.3.2 基础编程