由于Spark仅仅是一种计算框架,不负责数据的存储和管理,因此,通常都会将Spark和Hadoop进行统一部署,由Hadoop中的HDFS、HBase等组件负责数据的存储管理,Spark负责数据计算。
安装Spark集群前,需要安装Hadoop环境
软件 | 版本 |
---|---|
Linux系统 | CentOS7.9版本 |
Hadoop | 3.3.4版本 |
JDK | 1.8版本 (jdk8u231) |
Spark | 3.3.2版本 |
/opt
目录执行命令:tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /usr/local
查看解压之后的spark目录
vim /etc/profile
export SPARK_HOME=/usr/local/spark-3.3.2-bin-hadoop3
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
source /etc/profile
,让环境配置生效run-example SparkPi 2
(其中参数2是指两个并行度)Pi is roughly 3.1412357061785308
执行spark-shell
命令,启动Scala版的Spark-Shell
注意:Spark 3.3.2使用的Scala版本其实是2.12.15
利用print函数输出了一条信息
计算1 + 2 + 3 + …… + 100
输出字符直角三角形
打印九九表
执行:quit
命令,退出Spark Shell交互式环境
pyspark
命令启动Python版的Spark-Shellyum -y install python3
pyspark
/home
目录下创建test.txt
文件在pyspark命令行,执行命令:lines = sc.textFile('test.txt')
创建出来后,RDD 支持两种类型的操作: 转化操作(transformation) 和行动操作(action)。转化操作会由一个RDD 生成一个新的RDD。另一方面,行动操作会对RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中。
sparkLines = lines.filter(lambda line: 'spark' in line)
sparkLines.first()
惰性
计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域是很有道理的。比如,看看例2
和例3
,我们以一个文本文件定义了数据,然后把其中包含spark的行筛选出来。如果Spark 在我们运行lines = sc.textFile(...)
时就把文件中所有的行都读取并存储起来,就会消耗很多存储空间,而我们马上就要筛选掉其中的很多数据。相反, 一旦Spark 了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。事实上,在行动操作first()
中,Spark 只需要扫描文件直到找到第一个匹配的行为止,而不需要读取整个文件。spark
的行,执行命令:sparkLines.collect()
client
和cluster
,默认是client
。可以在向Spark集群提交应用程序时使用--deploy-mode
参数指定提交方式。当提交方式为client时,运行架构如下图所示
集群的主节点称为Master节点,在集群启动时会在主节点启动一个名为Master的守护进程,类似YARN集群的ResourceManager;从节点称为Worker节点,在集群启动时会在各个从节点上启动一个名为Worker的守护进程,类似YARN集群的NodeManager。
Spark
在执行应用程序的过程中会启动Driver
和Executor
两种JVM进程。
Driver
为主控进程,负责执行应用程序的main()方法,创建SparkContext
对象(负责与Spark
集群进行交互),提交Spark作业,并将作业转化为Task
(一个作业由多个Task
任务组成),然后在各个Executor
进程间对Task进行调度和监控。通常用SparkContext
代表Driver
。在上图的架构中,Spark
会在客户端启动一个名为SparkSubmit
的进程,Driver
程序则运行于该进程。
Executor
为应用程序运行在Worker节点上的一个进程,由Worker进程启动,负责执行具体的Task,并存储数据在内存或磁盘上。每个应用程序都有各自独立的一个或多个Executor进程。在Spark Standalone模式和Spark on YARN模式中,Executor进程的名称为CoarseGrainedExecutorBackend
,类似运行MapReduce
程序所产生的YarnChild
进程,并且同时与Worker
、Driver
都有通信。
Standalone cluster
提交方式提交应用程序后,客户端仍然会产生一个名为SparkSubmit
的进程,但是该进程会在应用程序提交给集群之后就立即退出。当应用程序运行时,Master
会在集群中选择一个Worker
进程启动一个名为DriverWrapper
的子进程,该子进程即为Driver
进程,所起的作用相当于YARN
集群的ApplicationMaster
角色,类似MapReduce
程序运行时所产生的MRAppMaster
进程。节点 | 角色 |
---|---|
master | Master |
slave1 | Worker |
slave2 | Worker |
/opt
目录,查看上传的spark安装包tar -zxvf spark-3.3.2-bin-hadoop3.tgz -C /usr/local
vim /etc/profile
export SPARK_HOME=/usr/local/spark-3.3.2-bin-hadoop3
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
存盘退出后,执行命令:source /etc/profile
,让配置生效
查看spark安装目录(bin
、sbin
和conf
三个目录很重要)
cp spark-env.sh.template spark-env.sh
与vim spark-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_231
export SPARK_MASTER_HOST=master
export SPARK_MASTER_PORT=7077
JAVA_HOME
:指定JAVA_HOME的路径。若集群中每个节点在/etc/profile文件中都配置了JAVA_HOME,则该选项可以省略,Spark集群启动时会自动读取。为了防止出错,建议此处将该选项配置上。SPARK_MASTER_HOST
:指定集群主节点(master)的主机名,此处为master
。SPARK_MASTER_PORT
:指定Master节点的访问端口,默认为7077
。source spark-env.sh
,让配置生效vim slaves
,添加两个从节点主机名scp -r $SPARK_HOME root@slave1:$SPARK_HOME
在master虚拟机上,执行命令:scp /etc/profile root@slave1:/etc/profile
在slave1虚拟机上,执行命令:source /etc/profile
,让环境配置生效
source spark-env.sh
scp -r $SPARK_HOME root@slave2:$SPARK_HOME
在master虚拟机上,执行命令:scp /etc/profile root@slave2:/etc/profile
在slave2虚拟机上,执行命令:source /etc/profile
,让环境配置生效
source spark-env.sh
start-dfs.sh
执行命令:start-all.sh
查看start-all.sh
的源码启动Master与Worker的命令
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh
# Start Worker
s"${SPARK_HOME}/sbin"/start-slaves.sh
可以看到,当执行start-all.sh命令时,会分别执行start-master.sh命令启动Master,执行start-slaves.sh命令启动Worker。
注意,若spark-evn.sh
中配置了SPARK_MASTER_HOST
属性,则必须在该属性指定的主机上启动Spark
集群,否则会启动不成功;若没有配置SPARK_MASTER_HOST
属性,则可以在任意节点上启动Spark
集群,当前执行启动命令的节点即为Master
节点。
启动完毕后,分别在各节点执行jps
命令,查看启动的进程。若在master节点存在Master进程,slave1节点存在Worker进程,slave2节点存在Worker进程,则说明集群启动成功。
查看master节点进程
查看slave1节点进程
查看slave2节点进程
http://master:8080
spark-shell --master spark://master:7077
vim test.txt
val rdd = sc.textFile("hdfs://master:9000/park/test.txt")
rdd.collect
val wordcount = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false)
与wordcount.collect.foreach(println)
spark-submit
,使用该工具可以将编写好的Spark应用程序提交到Spark集群。$ bin/spark-submit [options] [app options]
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
./examples/jars/spark-examples_2.11-2.1.1.jar
查看运行结果
上述命令中的–master参数指定了Master节点的连接地址。该参数根据不同的Spark集群模式,其取值也有所不同,常用取值如下表所示。
取值 | 描述 |
---|---|
spark://host:port | Standalone模式下的Master节点的连接地址,默认端口为7077 |
yarn | 连接到YARN集群。若YARN中没有指定ResourceManager的启动地址,则需要在ResourceManager所在的节点上进行应用程序的提交,否则将因找不到ResourceManager而提交失败 |
local | 运行本地模式,使用1个CPU核心 |
local [N] | 运行本地模式,使用N个CPU核心。例如,local[2]表示使用两个CPU核心运行程序 |
local[*] | 运行本地模式,尽可能使用最多的CPU核心 |
cluster
(Driver进程运行在集群的工作节点中),执行命令如下:bin/spark-submit \
--master spark://master:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
--driver-memory 512m \
--executor-memory 1g \
--executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar
http://master:8080
Worder
超链接stdout
超链接stop-all.sh