Spark学习笔记(一) 搭建Spark

1 Spark概述

1.1 spark or haddop

MR框架主要应用于数据的一次性计算:存存储介质中读取数据,然后进行过处理后,再存储到文件中。IO读取多,效率低。

1)Spark是基于MR框架的,但是优化了其中的计算过程,使用内存来代替计算结果。减少了磁盘IO,因此快。(MR多作业之间会多次磁盘的IO,因此慢)

2)Spark基于Scala语言开发的,更适合迭代计算和数据挖掘计算

3) Spark中计算模型非常丰富(MR中只有两个计算模型:mapper和reducer);spark的计算模型有:map,filter,groupby,sortby。

工作中:是Spark中和Yarn联合使用:资源用的是Yarn,计算用的是spark。

1638156481825

1.2 Spark核心模块

1.1 Spark 核心模块

1638176912041

1)Spark Core

Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的

2) Spark SQL

Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。

  1. Spark Streaming

Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。

4) Spark MLlib

MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。

5)Spark GraphX

GraphX是Spark面向图计算提供的框架与算法库。

2 Spark快速上手

在大数据早期的课程中我们已经学习了MapReduce框架的原理及基本使用,并了解了其底层数据处理的实现方式。接下来,就让咱们走进Spark的世界,了解一下它是如何带领我们完成数据处理的。

2.1 创建Maven项目

2.1.1 增加Scala插件

Spark由Scala语言开发的,所以本课件接下来的开发所使用的语言也为Scala,咱们当前使用的Spark版本为3.0.0,默认采用的Scala编译版本为2.12,所以后续开发时。我们依然采用这个版本。开发前请保证IDEA开发工具中含有Scala开发插件。

1638176561427

2.1.2 增加依赖关系

修改Maven项目中的POM文件,增加Spark框架的依赖关系。本课件基于Spark3.0版本,使用时请注意对应版本。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- 该插件用于将Scala代码编译成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- 声明绑定到maven的compile阶段 -->
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.1.3 WordCount

为了能直观地感受Spark框架的效果,接下来我们实现一个大数据学科中最常见的教学案例WordCount

// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 创建Spark上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 读取文件数据
val fileRDD: RDD[String] = sc.textFile("input/word.txt")

// 将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )

// 转换数据结构 word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))

// 将转换结构后的数据按照相同的单词进行分组聚合
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)

// 将数据聚合结果采集到内存中
val word2Count: Array[(String, Int)] = word2CountRDD.collect()

// 打印结果
word2Count.foreach(println)

//关闭Spark连接
sc.stop()

执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项目的resources目录中创建log4j.properties文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d&#123;yy/MM/dd HH:mm:ss&#125; %p %c&#123;1&#125;: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

2.1.4 异常处理

如果本机操作系统是Windows,在程序中使用了Hadoop相关的东西,比如写入文件到HDFS,则会遇到如下异常:

1638176731823

出现这个问题的原因,并不是程序的错误,而是windows系统用到了hadoop相关的服务,解决办法是通过配置关联到windows的系统依赖就可以了

1638176805506

在IDEA中配置Run Configuration,添加HADOOP_HOME变量

1638176855314

1638176879056

3 Spark运行环境

Spark作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行, 在国内工作中主流的环境为Yarn,不过逐渐容器式环境也慢慢流行起来。接下来,我们就分别看看不同环境下Spark的运行。

1638176475372

3.1 Local模式

所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,之前在IDEA中运行代码的环境我们称之为开发环境,不太一样。(Local=本机提供资源+spark提供计算

3.1.1 解压缩文件

将spark-3.0.0-bin-hadoop3.2.tgz文件上传到Linux并解压缩,放置在指定位置,路径中不要包含中文或空格,课件后续如果涉及到解压缩操作,不再强调。

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module 
mv spark-3.0.0-bin-hadoop3.2 spark-local

3.1.2 启动Local环境

  1. 进入解压缩后的路径,执行如下指令
bin/spark-shell

1638173742050

  1. 启动成功后,可以输入网址进行Web UI监控页面访问

http://虚拟机地址:4040

1638176427950

3.1.3 命令行工具

在解压缩文件夹下的data目录中,添加word.txt文件。在命令行工具中执行如下代码指令(和IDEA中代码简化版一致)

sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

1638176440182

3.1.4 退出本地模式

按键Ctrl+C或输入Scala指令

:quit

3.1.5 提交应用

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  1. –class表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
  2. –master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量;不知道可以用*
  3. spark-examples_2.12-3.0.0.jar 运行的应用类所在的jar包,实际使用时,可以设定为咱们自己打的jar包
  4. 数字10表示程序的入口参数,用于设定当前应用的任务数量

3.2 Standalone模式

local本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。(Standalone=spark提供资源+spark提供计算

1638174301463

集群规划:

Linux1Linux2Linux3
SparkWorker MasterWorkerWorker

3.2.1 解压缩文件

将spark-3.0.0-bin-hadoop3.2.tgz文件上传到Linux并解压缩在指定位置

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module 
mv spark-3.0.0-bin-hadoop3.2 spark-standalone

3.2.2 修改配置文件

  1. 进入解压缩后路径的conf目录,修改slaves.template文件名为slaves
mv slaves.template slaves 修改slaves文件,添加worker节点
linux1
linux2
linux3
  1. 修改spark-env.sh.template文件名为spark-env.sh
mv spark-env.sh.template spark-env.sh

修改spark-env.sh文件,添加JAVA_HOME环境变量和集群对应的master节点。指定master机器,和集群间spark通信的端口。

export JAVA_HOME=/opt/module/jdk1.8.0_144
SPARK_MASTER_HOST=linux1
SPARK_MASTER_PORT=7077

注意:7077端口,相当于hadoop3内部通信的8020端口,此处的端口需要确认自己的Hadoop配置

  1. 分发spark-standalone目录
xsync spark-standalone

3.2.3 启动集群

  1. 执行脚本命令:
sbin/start-all.sh

1638174700872

  1. 查看三台服务器运行进程
xcall jps
================linux1================
3330 Jps
3238 Worker
3163 Master
================linux2================
2966 Jps
2908 Worker
================linux3================
2978 Worker
3036 Jps
  1. 查看Master资源监控Web UI界面: http://linux1:8080

1638174744996

3.2.4 提交应用

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  1. –class表示要执行程序的主
  2. –master spark://linux1:7077 独立部署模式,连接到Spark集群
  3. spark-examples_2.12-3.0.0.jar 运行类所在的jar包
  4. 数字10表示程序的入口参数,用于设定当前应用的任务数量

执行任务时,会产生多个Java进程

1638175491608

执行任务时,默认采用服务器集群节点的总核数,每个节点内存1024M。

1638175504305

3.2.5 提交参数说明

在提交应用中,一般会同时一些提交参数

bin/spark-submit \
--class <main-class>
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]
参数解释可选值举例
–classSpark程序中包含主函数的类
–masterSpark程序运行的模式(环境)*模式:local[]、spark://linux1:7077、 Yarn**
–executor-memory 1G指定每个executor可用内存为1G符合集群内存配置即可,具体情况具体分析。
–total-executor-cores 2指定所有executor使用的cpu核数为2个
–executor-cores指定每个executor使用的cpu核数
application-jar打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
application-arguments传给main()方法的参数

3.2.6 配置历史服务

由于spark-shell停止掉后,集群监控linux1:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。**(历史信息存到了HDFS中)**

  1. 修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
  1. 修改spark-default.conf文件,配置日志存储路径
spark.eventLog.enabled     true
spark.eventLog.dir        hdfs://linux1:8020/directory

注意:需要启动hadoop集群,HDFS上的directory目录需要提前存在。

sbin/start-dfs.sh
hadoop fs -mkdir /directory
  1. 修改spark-env.sh文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory 
-Dspark.history.retainedApplications=30"

l 参数1含义:WEB UI访问的端口号为18080
l 参数2含义:指定历史服务器日志存储路径
l 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  1. 分发配置文件
xsync conf 
  1. 重新启动集群和历史服务
sbin/start-all.sh

sbin/start-history-server.sh
  1. 重新执行任务
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  1. 查看历史服务:http://linux1:18080

1638175185282

3.2.7 配置高可用(HA)

所谓的高可用是因为当前集群中的Master节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个Master节点,一旦处于活动状态的Master发生故障时,由备用Master提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper设置。

集群规划:

Linux1Linux2Linux3
SparkMaster Zookeeper WorkerMaster Zookeeper WorkerZookeeper Worker
  1. 停止集群
sbin/stop-all.sh 
  1. 启动Zookeeper
xstart zk 
  1. 修改spark-env.sh文件添加如下配置

注释如下内容:

#SPARK_MASTER_HOST=linux1

#SPARK_MASTER_PORT=7077

添加如下内容:

#Master监控页面默认访问端口为8080,但是可能会和Zookeeper冲突,所以改成8989,也可以自定义,访问UI监控页面时请注意

SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=linux1,linux2,linux3 
-Dspark.deploy.zookeeper.dir=/spark"
  1. 分发配置文件
xsync conf/ 
  1. 启动集群
sbin/start-all.sh 

1638175383495

  1. 启动linux2的单独Master节点,此时linux2节点Master状态处于备用状态
[root@linux2 spark-standalone]# sbin/start-master.sh 

1638175397500

  1. 提交应用到高可用集群
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077,linux2:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
  1. 停止linux1的Master资源监控进程

1638175436542

  1. 查看linux2的Master 资源监控Web UI,稍等一段时间后,linux2节点的Master状态提升为活动状态

1638175451854

3.3 Yarn模式

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。

1638175863107

3.3.1 解压缩文件

将spark-3.0.0-bin-hadoop3.2.tgz文件上传到linux并解压缩,放置在指定位置。

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module 
mv spark-3.0.0-bin-hadoop3.2 spark-yarn

3.3.2 修改配置文件

  1. 修改hadoop配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
     <name>yarn.nodemanager.pmem-check-enabled</name>
     <value>false</value>
</property>

<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>
  1. 修改conf/spark-env.sh,添加JAVA_HOME和YARN_CONF_DIR配置
mv spark-env.sh.template spark-env.sh

查找yarn的位置

export JAVA_HOME=/opt/module/jdk1.8.0_144
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop

3.3.3 启动HDFS以及YARN集群

瞅啥呢,自己启动去!

3.3.4 提交应用

master指定为yarn,部署模式为集群模式。

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

1638175901609

1638175915913

查看 http://linux2:8088 页面,点击History,查看历史页面

1638175935231

3.3.5 配置历史服务器

  1. 修改spark-defaults.conf.template文件名为spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
  1. 修改spark-default.conf文件,配置日志存储路径
spark.eventLog.enabled     true
spark.eventLog.dir        hdfs://linux1:8020/directory

注意:需要启动hadoop集群,HDFS上的目录需要提前存在。

[root@linux1 hadoop]# sbin/start-dfs.sh
[root@linux1 hadoop]# hadoop fs -mkdir /directory
  1. 修改spark-env.sh文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory 
-Dspark.history.retainedApplications=30"

l 参数1含义:WEB UI访问的端口号为18080

l 参数2含义:指定历史服务器日志存储路径

l 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。

  1. 修改spark-defaults.conf
spark.yarn.historyServer.address=linux1:18080
spark.history.ui.port=18080
  1. 启动历史服务
sbin/start-history-server.sh 
  1. 重新提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

1638176022879

1638176037995

  1. Web页面查看日志:http://linux2:8088

1638176059473

1638176071216

3.4 K8S & Mesos模式

Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核,在Twitter得到广泛使用,管理着Twitter超过30,0000台服务器上的应用部署,但是在国内,依然使用着传统的Hadoop大数据框架,所以国内使用Mesos框架的并不多,但是原理其实都差不多,这里我们就不做过多讲解了。

1638176091921

容器化部署是目前业界很流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Spark也在最近的版本中支持了k8s部署模式。这里我们也不做过多的讲解。给个链接大家自己感受一下:https://spark.apache.org/docs/latest/running-on-kubernetes.html

1638176106974

3.5 Windows模式

在同学们自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程,并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度,Spark非常暖心地提供了可以在windows系统下启动本地集群的方式,这样,在不使用虚拟机的情况下,也能学习Spark的基本使用!

在后续的教学中,为了能够给同学们更加流畅的教学效果和教学体验,我们一般情况下都会采用windows系统的集群来学习Spark。

3.5.1 解压缩文件

将文件spark-3.0.0-bin-hadoop3.2.tgz解压缩到无中文无空格的路径中

3.5.2 启动本地环境

  1. 执行解压缩文件路径下bin目录中的spark-shell.cmd文件,启动Spark本地环境

1638176202265

  1. 在bin目录中创建input目录,并添加word.txt文件, 在命令行中输入脚本代码

1638176219502

3.5.3 命令行提交应用

在DOS命令行窗口中执行提交指令

spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10

3.6 部署模式对比

模式Spark安装机器数需启动的进程所属者应用场景
Local1Spark测试
Standalone3Master及WorkerSpark单独部署
Yarn1Yarn及HDFSHadoop混合部署

3.7 端口号

  • Spark查看当前Spark-shell运行任务情况端口号:4040(计算)

  • Spark Master内部通信服务端口号:7077

  • Standalone模式下,Spark Master Web端口号:8080(资源)

  • Spark历史服务器端口号:18080

  • Hadoop YARN任务运行情况查看端口号:8088

文章目录
  1. 1 Spark概述
    1. 1.1 spark or haddop
    2. 1.2 Spark核心模块
    3. 1.1 Spark 核心模块
  2. 2 Spark快速上手
    1. 2.1 创建Maven项目
      1. 2.1.1 增加Scala插件
      2. 2.1.2 增加依赖关系
      3. 2.1.3 WordCount
      4. 2.1.4 异常处理
  3. 3 Spark运行环境
    1. 3.1 Local模式
      1. 3.1.1 解压缩文件
      2. 3.1.2 启动Local环境
      3. 3.1.3 命令行工具
      4. 3.1.4 退出本地模式
      5. 3.1.5 提交应用
    2. 3.2 Standalone模式
      1. 3.2.1 解压缩文件
      2. 3.2.2 修改配置文件
      3. 3.2.3 启动集群
      4. 3.2.4 提交应用
      5. 3.2.5 提交参数说明
      6. 3.2.6 配置历史服务
      7. 3.2.7 配置高可用(HA)
    3. 3.3 Yarn模式
      1. 3.3.1 解压缩文件
      2. 3.3.2 修改配置文件
      3. 3.3.3 启动HDFS以及YARN集群
      4. 3.3.4 提交应用
      5. 3.3.5 配置历史服务器
    4. 3.4 K8S & Mesos模式
    5. 3.5 Windows模式
      1. 3.5.1 解压缩文件
      2. 3.5.2 启动本地环境
      3. 3.5.3 命令行提交应用
    6. 3.6 部署模式对比
    7. 3.7 端口号