大数据开发工程师-第十五周 Spark 3.x版本扩展内容

大数据开发工程师-第十五周 Spark 3.x版本扩展内容

快速上手使用Spark3

Spark 3.0.0版本介绍

1
2
3
Spark3.0.0版本是Spark 3.x系列的第一个正式版本,他于2020-6-10日正式发布。
Spark 3.x版本中重点是对Spark SQL模块中的功能进行了优化。其中46%的优化都集中在Spark SQL上。
通过官方基准(TPC-DS 30TB )测试,Spark 3.0的性能大约是Spark 2.4 的两倍。

Spark 3.x的使用

1
2
3
4
5
6
7
8
9
10
11
Spark3.x的核心代码和Spark 2.x没有什么区别,目前我们在常规使用中暂时还没有发现什么不兼容的情况。
主要的区别就是Spark3.x版本默认支持Scala 2.12版本了,之前的Spark 2.x版本最高只能支持到scala 2.11版本。
因为Spark 3.x主要是对SparkSQL模块进行了性能优化,针对架构层面和API层面的内容并没有大幅度的修改。

下面我们的流程是先基于Spark 3.x版本开发一套代码
然后在已有的大数据集群中集成Spark 3.x环境

最后向YARN中同时提交Spark 2.x和Spark 3.x的代码。
也就是说在YARN集群中,支持同时运行Spark 2.x和Spark 3.x的代码。

在企业中也会真正遇到这种场景,假设我们之前的代码都是基于Spark 2.x版本开发的,现在想要使用Spark3.x中的新特性,但是之前的代码也不能推倒重做,重做的话成本太高,并且谁也不能保证新开发的代码不出问题,所以最稳妥的方案就是之前的代码保持不动,新的需求基于Spark 3.x版本进行开发即可。

基于Spark 3.x版本开发代码

1
2
3
4
5
6
7
8
9
10
首先基于Spark 3.x版本开发一套WordCount的代码
创建项目db_spark3
在项目中引入scala 2.12.11
在pom.xml中添加spark 3.x相关的依赖。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
代码直接参考之前的写法即可。
在这里使用scala代码进行开发,创建包:com.imooc.scala.rdd

package com.imooc.scala.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
* 基于Spark3.x版本的WordCount案例
* Created by xuwei
*/
object WordCountScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("WordCountScala")
.setMaster("local")
val sc = new SparkContext(conf)

var path = "D:\\hello.txt"
if(args.length==1){
path = args(0)
}
val linesRDD = sc.textFile(path)
val wordsRDD = linesRDD.flatMap(_.split(" "))
val pairRDD = wordsRDD.map((_,1))
val wordCountRDD = pairRDD.reduceByKey(_ + _)
wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))
sc.stop()

}
}

在已有的大数据集群中集成Spark 3.x环境

1
2
3
4
5
6
在Spark ON YARN架构下,之前默认是使用的Spark2.x环境,现在想要在已有的大数据集群中集成Spark 3.x环境,操作是非常简单的。
只需要在客户端节点安装部署Spark 3.x的环境,修改一些基础配置,这样集群就可以兼容spark 2.x和spark 3.x这两大版本了。

其实简单来说,想要在集群中同时兼容spark 2.x和spark 3.x,只需要在客户端中同时安装这两个版本的Spark即可。

提前下载好spark 3.x版本的安装包,目前最新版本是spark 3.2.1版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
1.将spark-3.2.1-bin-hadoop3.2.tgz压缩包上传到bigdata04的/data/soft目录中。
2.解压。
3.重命名spark-env.sh.template

[root@bigdata04 soft]# cd spark-3.2.1-bin-hadoop3.2/conf/
[root@bigdata04 conf]# mv spark-env.sh.template spark-env.sh

4.修改spark-env.sh
在文件末尾增加下面这些内容,主要是指定JAVA_HOME和Hadoop的配置文件目录。
[root@bigdata04 conf]# vi spark-env.sh
....
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_CONF_DIR=/data/soft/hadoop-3.2.0/etc/hadoop

向YARN中同时提交Spark 2.x和Spark 3.x的代码

编译打包

1
2
3
4
5
首先向YARN集群中提交Spark 3.x的代码
修改pom.xml文件中依赖的配置,将依赖设置为provided。

1.编译打包。
修改前面我们开发的代码,将.setMaster("local")注释掉,修改后的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.imooc.scala.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
* 基于Spark3.x版本的WordCount案例
* Created by xuwei
*/
object WordCountScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("WordCountScala")
//.setMaster("local")
val sc = new SparkContext(conf)

var path = "D:\\hello.txt"
if(args.length==1){
path = args(0)
}
val linesRDD = sc.textFile(path)
val wordsRDD = linesRDD.flatMap(_.split(" "))
val pairRDD = wordsRDD.map((_,1))
val wordCountRDD = pairRDD.reduceByKey(_ + _)
wordCountRDD.foreach(wordCount=&gt;println(wordCount._1+"--"+wordCount._2))
sc.stop()

}
}

1
2
3
4
增加编译打包配置。

注意:这里面的scala编译插件里面的scala版本需要修改为2.12。咱们之前在spark
2.x版本中使用的是scala 2.11版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打包插件 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
2.提交任务。
将编译好的任务jar包上传到bigdata04中的spark-3.2.1-bin-hadoop3.2目录中。
开发提交任务脚本,内容如下:

[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# vi spark3WordCountJob.sh
/data/soft/spark-3.2.1-bin-hadoop3.2/bin/spark-submit \
--class com.imooc.scala.rdd.WordCountScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark3-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://bigdata01:9000/test/hello.txt
1
2
3
注意:在这里需要指定spark-submit的全路径,要不然容易冲突,因为这个客户端节点上有两个版本的spark,他们都有spark-submit脚本。

如果感觉指定全路径比较麻烦,其实还有一种比较好的解决方案,就是将spark 3.2.1版本中的spark-submit脚本的名字改为spark-submit-3 ,然后再把spark 3.2.1的bin目录指定到PATH环境变量中,这样就可以在任意目录下使用spark-submit-3这个脚本提交spark 3.x版本的代码了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
修改spark 3.2.1中的spark-submit脚本名称。
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# cd bin
[root@bigdata04 bin]# mv spark-submit spark-submit-3

配置环境变量,主要是在PATH中增加/data/soft/spark-3.2.1-bin-hadoop3.2/bin这个路径。
[root@bigdata04 bin]# vi /etc/profile
...
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$SPARK_HOME/bin:$HIVE_HO
ME/bin:/data/soft/spark-3.2.1-bin-hadoop3.2/bin:$PATH
[root@bigdata04 bin]# source /etc/profile

重新修改任务脚本,内容如下。
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# vi spark3WordCountJob.sh
spark-submit-3 \
--class com.imooc.scala.rdd.WordCountScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark3-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://bigdata01:9000/test/hello.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
执行任务脚本:
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# sh -x spark3WordCountJob.sh
结果发现任务执行报错了,报错信息如下:

Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction2$mcIII$sp
at com.imooc.scala.rdd.WordCountScala$.main(WordCountScala.scala:23)
at com.imooc.scala.rdd.WordCountScala.main(WordCountScala.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction2$mcIII$sp
... 14 more
Caused by: java.lang.ClassNotFoundException: scala.runtime.java8.JFunction2$mcIII$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more
1
通过查看Caused by后面的报错信息可以大致看出来是和scala有关的,如果不能指定定位到具体问题,可以尝试把这个核心错误信息(scala.runtime.java8.JFunction2$mcIII$sp)拿到网上查一下
1
2
3
4
5
6
7
通过查询公开资料大致可以知道是因为编译环境的scala版本和执行环境的scala版本不一样导致的。
我们这个spark 3.x的代码里面使用的是scala版本是2.12的,那么大概率是在运行的时候集群里面使用的是2.11版本的。

集群中这个2.11版本的scala是从哪里传过去的呢?
可以来看一下spark的提交日志:
2027-09-16 11:53:04,585 INFO yarn.Client: Uploading resource file:/tmp/spark-e79be783-7483-4ddf-acea-b448dfc87a77/__spark_libs__6722356865143855145.zip -> hdfs://bigdata01:9000/user/root/.sparkStaging/application_1821060972317_0003/__spark_libs__6722356865143855145.zip
2027-09-16 11:53:08,066 INFO yarn.Client: Uploading resource file:/tmp/spark-e79be783-7483-4ddf-acea-b448dfc87a77/__spark_conf__4721759241490386269.zip -> hdfs://bigdata01:9000/user/root/.sparkStaging/application_1821060972317_0003/__spark_conf__.zip
1
2
3
4
5
6
7
这两个zip压缩包是Spark客户端在提交任务的时候,从本地的spark安装包中得到的。
其中__spark_libs__6722356865143855145.zip压缩包中存储的就是spark安装目录下jars目录中的所有jar包。

重新提交这个任务,获取__spark_libs这个zip包。

注意:任务执行结果,就获取不到这个zip包了,只能在任务运行期间才可以获取到,所以在操作的时候手速一定要快。
[root@bigdata04 ~]# hdfs dfs -get hdfs://bigdata01:9000/user/root/.sparkStaging/application_1821060972317_0004/__spark_libs__3047651213740252943.zip .
1
把下载下来的这个zip包传到windows中,解压查看一下

image-20240405152236031

1
2
3
4
5
6
7
8
9
10
11
12
13
此时发现这里面的scala依赖确实是2.11版本的。
我们提交的代码是基于scala 2.12编译的,版本差异比较大的时候代码执行确实会报错。

但是spark 3.2.1这个安装包中的scala版本肯定也是2.12的,我们来确认一下。
查询spark 3.2.1的jars目录中和scala相关的jar包:

[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# ll jars/scala-*
-rw-r--r--. 1 501 supergroup 112235 Jan 21 2022 jars/scala-collection-compat_2.12-2.1.1.jar
-rw-r--r--. 1 501 supergroup 10978529 Jan 21 2022 jars/scala-compiler-2.12.15.jar
-rw-r--r--. 1 501 supergroup 5443542 Jan 21 2022 jars/scala-library-2.12.15.jar
-rw-r--r--. 1 501 supergroup 222980 Jan 21 2022 jars/scala-parser-combinators_2.12-1.1.2.jar
-rw-r--r--. 1 501 supergroup 3678167 Jan 21 2022 jars/scala-reflect-2.12.15.jar
-rw-r--r--. 1 501 supergroup 556575 Jan 21 2022 jars/scala-xml_2.12-1.2.0.jar
1
2
3
4
5
6
7
8
9
10
此时发现这里面scala的jar包都是2.12版本的,和我们代码编译时使用的大版本是一致的,只要都是2.12版本就行,后面的第3位小版本号不用太过于关注,一般都是兼容的。

分析到这其实我们就可以大致猜到应该是在提交任务的时候,客户端节点在获取这些依赖的时候,还是找的spark 2.4.3版本。
可以到spark 2.4.3中确认一下scala的版本是不是和前面的一样:
[root@bigdata04 spark-2.4.3-bin-hadoop2.7]# ll jars/scala*
-rw-r--r--. 1 xuwei supergroup 15612191 May 1 2019 jars/scala-compiler-2.11.12.jar
-rw-r--r--. 1 xuwei supergroup 5749423 May 1 2019 jars/scala-library-2.11.12.jar
-rw-r--r--. 1 xuwei supergroup 471925 May 1 2019 jars/scala-parser-combinators_2.11-1.1.0.jar
-rw-r--r--. 1 xuwei supergroup 4623075 May 1 2019 jars/scala-reflect-2.11.12.jar
-rw-r--r--. 1 xuwei supergroup 671138 May 1 2019 jars/scala-xml_2.11-1.0.5.jar
1
2
3
4
5
6
7
8
9
此时发现这里面的scala版本就是2.11.12,和任务运行期间生成的那个zip包中的依赖版本是一致的。

此时客户端在提交任务查找依赖的时候找到了spark-2.4.3版本,大概率是因为我们在/etc/profile中配置了SPARK_HOME,并且指向了spark-2.4.3版本的安装目录。

想要解决这个问题,可以去修改/etc/profile中的SPARK_HOME,让它指向spark 3.2.1版本,但是这样的话,后期提交spark 2.x版本的任务也会出现类似的问题。

还有一种办法是在spark 3.x版本的任务提交脚本中临时定义一下SPARK_HOME,让它指向spark 3.2.1版本。但是这样后期每个任务提交脚本中都需要指定,也比较麻烦。

终极解决方案是在spark-submit-3这个脚本中定义一下SPARK_HOME,让它指向spark 3.2.1版本,这样就完美解决了。
1
2
3
4
[root@bigdata04 bin]# vi spark-submit-3 
...
export SPARK_HOME=/data/soft/spark-3.2.1-bin-hadoop3.2
...
1
2
3
重新提交任务。

这个时候任务就可以正常成功执行了。

historyserver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
到这其实大家还有一个疑问,现在是有2个版本的spark,那对应的historyserver服务是不是也需要启动2个?
不是的,historyserver服务只需要启动一个即可。

但是需要在spark 3.2.1的配置文件中修改historyserver相关的配置。
下面我们来演示一下:
在spark 3.2.1中修改。
首先修改spark-defaults.conf
在文件末尾增加下面的配置

[root@bigdata04 conf]# mv spark-defaults.conf.template spark-defaults.conf
[root@bigdata04 conf]# vi spark-defaults.conf
......
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=http://bigdata04:18080
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后修改spark-env.sh
在文件末尾增加下面的配置

[root@bigdata04 conf]# vi spark-env.sh
......
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.histo
ry.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs"

接下来需要先确认Hadoop和Spark2.x中的historyserver服务是否启动。
启动Hadoop集群的historyserver服务。

在bigdata01上执行。
[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver

在bigdata01上执行。

在bigdata01上执行。
1
2
3
4
5
6
7
8
9
10
11
12
启动spark的historyserver服务,在bigdata04上执行。

注意:在这里直接使用spark-2.4.3中的historyserver服务即可。
不需要在spark-3.2.1中再额外启动historyserver服务了。

[root@bigdata04 spark-2.4.3-bin-hadoop2.7]# sbin/start-history-server.sh

接下来重新提交spark 3.x版本的代码。
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# sh -x spark3WordCountJob.sh

任务执行结束之后,到YARN上查看一下任务的执行信息。
从图中是可以看到目前的spark任务界面是3.2.1这个版本的。

image-20240405153536212

1
2
3
4
5
6
接下来提交一个spark 2.x版本的代码。
执行之前开发的wordCountJob.sh。
[root@bigdata04 sparkjars]# sh -x wordCountJob.sh

任务执行结束之后,到YARN上查看一下任务的执行信息。
从图中是可以看到目前的spark任务界面是2.4.3这个版本的。

image-20240405153558508

1
到这里我们可以支持在同一个YARN集群中同时执行spark 2.x和spark 3.x版本的代码,并且对应的spark 任务界面也都是正常的。

Spark 3.x版本中新特性的原理及应用

Spark 1.x~3.x的演变历史

1
2
3
4
5
6
7
8
9
10
11
12
13
14
在Spark 1.x中,主要是通过Rule这个执行框架,把一批规则应用在一个执行计划上进而得到一个新的执行计划。

在Spark 2.x中增加了基于计算的Cost模型,该Cost模型是为了让SparkSQL获得更好的优化效果,进而获得高效的执行计划,在应用基于 Cost模型优化的时候,需要对数据进行统计。
但是Cost模型在Spark中表现的并不好,主要是由于下面这几点原因:

一次性计算。
因为Spark的主要计算场景就是ETL,对数据只需要计算一次,但是它收集数据的成本是比较昂贵的,所以在最初生成任务执行计划的时候会缺少真实数据的统计信息,统计信息的缺失会导致基于Cost的优化基本不可能完成。

存储和计算的分离。
Spark不存储数据,用户可以通过不同的方式操作数据,如果统计信息出现错误,无法保证基于Cost优化的正确性,甚至优化后的结果可能会更差。

多种环境的部署。
在不同环境中Cost的模型是多样的。无法使用一套通用的Cost模型,而且针对Spark中的UDF功能,用户需要根据自己的需要任意添加。这种情况下也无法实现基于Cost的优化。
基于以上原因导致很多时候难以计算Cost模型,进而导致无法获取有效的优化计划。
1
2
3
因此,spark 3.0在Cost基础之上增加了Runtime,Runtime可以收集任务在运行期间的统计信息,实现动态优化任务的执行计划。

也就是说任务在最开始的时候先生成一个初始的执行计划,随着任务的执行,根据runtime收集到的运行期间的统计信息,可以对初始的执行计划进行动态优化修改,生成新的执行计划去执行。

Spark 3.x新特性

1
2
3
4
5
6
7
8
9
10
11
12
下面来看一下Spark 3.x版本中具有代表的新特性

首先到官网上来看一下:
进入下载界面,

点击Archive,进入到Spark的更新历史界面。

找到Spark 3.0.0 released,点进去,可以看到这个版本都更新了哪些东西:

这里面对Spark 3.0.0 新版本进行了介绍。

下面这里面列出来了Spark 3.0版本中的核心新特性:

image-20240406225207119

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
总结下来大致包括下面这几个:

-自适应查询执行(Adaptive Query Execution),可以简称为AQE
-动态分区裁剪(Dynamic Partition Pruning),可以简称为DPP
-加速器感知调度(Accelerator-aware Scheduling)
-Catalog 插件 API(Catalog plugin API)
-支持 Hadoop 3.x / Java 11 / Scala 2.12
-更好的 ANSI SQL 兼容性(Better ANSI SQL compatibility)
-Pandas API 的重大改进(Redesigned pandas UDF API with type hints)
-用于流计算的新UI(Structured Streaming UI)

这个Pandas API 的重大改进是针对PySpark的。
这个用于流计算的新UI是针对Spark中的流计算的。
这两块内容对我们而言意义不大,后面不再重点分析。

下面我们首先开看一下自适应查询执行这个新特性。

自适应查询执行

1
2
3
4
5
6
自适应查询执行:可以简称为AQE。它是对Spark执行计划的优化,它可以基于任务运行时统计的数据指标动态修改Spark 的执行计划。

Spark 原有的执行计划是静态生成的,一旦代码编译好,即使后续发现执行计划可优化,也无法改变了。而自适应查询执行功能是在执行查询计划的同时,基于精确的运行时统计信息,对执行计划进行优化,进而提升性能。

通俗一点来说可以这样理解:
一个复杂的Spark任务在运行期间会产生多个State。假设有State0、Stage1和Stage2。

image-20240406225608571

1
2
3
4
5
6
7
8
9
10
11
12
13
每个Stage内部都会有一系列的执行逻辑。
当Stage0执行结束之后,会产生一个中间结果,其实就是RDD,此时这个RDD中的数据量是可以准确获取到的。自适应查询机制就是根据这里的数据统计信息来决定是否对后面Stage1中的执行计划进行优化。
当Stage1执行结束后,再根据它产生的RDD数据来决定是否对后面Stage2中的执行计划进行优化
后面如果还有Stage,以此类推。

自适应查询执行主要带来了下面这3点优化功能:

-自适应调整Shuffle分区数量。
-动态调整 Join 策略。
-动态优化倾斜的 Join。

下面我们来具体分析一下
首先看第一个:自适应调整Shuffle分区数量
自适应调整Shuffle分区数量
1
2
3
4
5
6
7
8
9
10
11
Spark在处理海量数据的时候,其中的Shuffle过程是比较消耗资源的,也比较影响性能,因为它需要在网络中传输数据。

shuffle 中的一个关键属性是:分区的数量。

分区的最佳数量取决于数据自身大小,但是数据大小可能在不同的阶段、不同的查询之间有很大的差异,这使得这个数字很难精准调优。

如果分区数量太多,每个分区的数据就很小,读取小的数据块会导致IO效率降低,并且也会产生过多的task, 这样会给Spark任务带来更多负担。

如果分区数量太少,那么每个分区处理的数据可能非常大,处理这些大分区的数据可能需要将数据溢写到磁盘(例如:排序或聚合操作),这样也会降低计算效率。

想要解决这个问题,就需要给Shuffle设置合适的分区数量,如果手工设置,基本上是无法达到最优效率的。想要达到最优效率,就需要依赖于我们这里所说的自适应调整Shuffle分区数量这个策略了。
1
2
3
4
5
6
7
8
那么这个自适应调整Shuffle分区数量的底层策略是怎么实现的呢?
Spark初始会设置一个较大的Shuffle分区个数,这个数值默认是200,后续在运行时会根据动态统计到的数据信息,将小的分区合并,也就是慢慢减少分区数量。

假设我们运行select flag,max(num) from t1 group by flag这个SQL语句,表t1中的数据比较少。

现在我们把初始的Shuffle 分区数量设置为5,所以在 Shuffle 过程中数据会产生5 个分区。如果没有开启自适应调整Shuffle分区数量这个策略,Spark会启动5个Recuce任务来完成最后的聚合。但是这里面有3个非常小的分区,为每个分区分别启动一个单独的任务会浪费资源,并且也无法提高执行效率。因为这3个非常小的分区对应的任务很快就执行完了,另外2个比较大的分区对应的任务需要执行很长时间,资源没有被充分利用到。

看下面这个图:

image-20240406230258263

1
2
开启自适应调整Shuffle分区数量之后,Spark 会将这3个数据量比较小的分区合并为1个分区,让1个reduce任务处理,这个时候最终的聚合操作只需要启动3个reduce任务就可以了。
看下面这个图:

image-20240406230503624

1
2
3
4
5
6
关于自适应调整Shuffle分区数量这个机制的核心参数主要包括下面这几个:

核心参数 默认值 解释
spark.sql.adaptive.enabled true 是否开启AQE机制
spark.sql.adaptive.coalescePartitions.enabled true 是否开启AQE中的自适应调整Shuffle分区数量机制
spark.sql.adaptive.advisoryPartitionSizeInBytes 67108864b (64M) 建议的Shuffle分区大小
1
2
3
4
5
spark.sql.adaptive.enabled:这个参数是控制整个自适应查询执行机制是否开启的,也就是控制AQE机制的。默认值是true,表示默认是开启的。

spark.sql.adaptive.coalescePartitions.enabled:这个参数才是真正控制自适应调整Shuffle分区数量这个机制是否开启的。默认值是true,表示默认也是开启的。注意:想要开启这个功能,第1个参数肯定也要设置为true。因为自适应调整Shuffle分区数量这个机制是AQE机制中的一个子功能。

spark.sql.adaptive.advisoryPartitionSizeInBytes:这个参数是控制shuffle中分区大小的,默认是64M。理论上来说,这个参数越大,shuffle中最终产生的分区数量就越少,但是也不能太大,太大的话的产生的分区数量就太少了,会导致产生的任务数量也变少,最终会影响执行效率。
1
2
这些参数在官网文档中也是可以找到的,我们来看一下:
https://spark.apache.org/docs/3.2.1/sql-performance-tuning.html#adaptive-query-execution

image-20240406230926974

image-20240406231220014

image-20240406231436359

1
2
3
4
5
6
7
8
在这里面还有一个参数大家可以了解一下:

spark.sql.adaptive.coalescePartitions.parallelismFirst:这个参数默认值是true,表示要优先保证任务的并行度,也就意味要保证Shuffle分区数量不能太少,太少的话并行执行的任务就少了。
spark.sql.adaptive.coalescePartitions.parallelismFirst这个参数是从Spark 3.2.0版本开始才有的。

当这个参数为true时表示要优先保证任务的并行度,所以Spark在合并shuffle分区的时候会自动忽略spark.sql.adaptive.advisoryPartitionSizeInBytes这个参数的值(64M),只依据spark.sql.adaptive.coalescePartitions.minPartitionSize这个参数的值(1M),这样可以最大化保证并行度。这样主要是为了在启用自适应调整Shuffle分区数量的时候避免性能下降。官方建议将此配置设置为false,并使用spark.sql.adaptive.advisoryPartitionSizeInBytes这个参数。

注意:这个参数经过实际测试发现目前并没有生效,估计是因为文档和源码没有同步导致的,这种问题在开源项目中也是存在的。
1
2
3
4
5
6
7
8
9
10
11
12
13
接下来我们通过一个具体的案例来演示一下。
在src/main/java下创建包:com.imooc.java.sql
将课程中提供的GenerateJSONData类拷贝过来。
执行类中的generateCoalescePartitionData函数,生成测试数据文件。

接下来开发一个SparkSQL程序来验证自适应调整Shuffle分区数量这个特性:
在这里需要在项目中引入SparkSQL的依赖。

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
</dependency>
1
2
3
开发代码:AQECoalescePartitionsScala
在src/main/scala下创建包:com.imooc.scala.sql
为了对比试验,首先关闭自适应调整Shuffle分区数量的策略。
scala
spark sql shuffle默认分区数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.imooc.scala.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

/**
* 自适应调整Shuffle分区数量
* Created by xuwei
*/
object AQECoalescePartitionsScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQECoalescePartitionsScala")
.config(conf)
//禁用AQE机制
.config("spark.sql.adaptive.enabled","false")
//禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
.config("spark.sql.adaptive.coalescePartitions.enabled","false")
.getOrCreate()

//读取Json格式数据,获取DataFrame
val jsonDf = sparkSession.read.json("D:\\spark_json_1.dat")
//创建临时表
jsonDf.createOrReplaceTempView("t1")
//执行SQL语句
//注意:这个SQL中需要有可以产生shuffle的语句,否则无法验证效果。
//在这里使用group by语句实现shuffle效果,并且还要注意尽量在group by后面多指定几个字段,否则shuffle阶段传输的数据量比较小,效果不明显。
val sql =
"""
|select
| id,uid,lat,lnt,hots,title,status,
| topicId,end_time,watch_num,share_num,
| replay_url,replay_num,start_time,timestamp,area
|from t1
|group by id,uid,lat,lnt,hots,title,status,
| topicId,end_time,watch_num,share_num,
| replay_url,replay_num,start_time,timestamp,area
|""".stripMargin
sparkSession.sql(sql).write.format("json").save("hdfs://bigdata01:9000/aqe/coalesce_partition_"+System.currentTimeMillis())


//让程序一直运行,这样本地的Spark任务界面就可以一直查看
while (true){
;
}
}
}
1
2
3
4
5
6
7
8
9
我们可以把前面提到的那几个核心参数在代码中验证一下,因为不同的Spark版本中这些参数可能会有一些差异,通过下面这几行代码可以打印出来spark中这些参数的默认值。

注意:这些代码需要在获取到sparksession变量之后执行,因为这些参数是控制sparksql的

println("======================================")
println(sparkSession.conf.get("spark.sql.adaptive.enabled"))
println(sparkSession.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))
println(sparkSession.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes"))
println("======================================")
1
2
3
4
5
6
7
8
9
10
11
12
13
为了方便一些,我们就直接在本地idea中运行代码。
在运行代码之前先在hdfs中创建aqe目录
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -mkdir /aqe

运行程序,首先到控制台查看日志,验证参数的情况。
这样可以真正确认当前AQE机制的实际设置情况。

日志中打印的内容如下:
======================================
false
false
67108864b
======================================
1
2
3
4
5
6
然后到任务界面中查看任务执行情况:
到Stage中进行查看。

前面2个Stage在运行时都是产生了13个task,是因为读取的数据是1655M,按照128M切分,正好是13个task。

最上面的那个Stage是根据Shuffle分区之后产生的任务数量,默认情况下SparkSql中的shuffle分区数量是200,每个分区会产生一个task,所以这里最终产生了200个task。其实这个Shuffle过程中产生的数据量并没有多少,一共只有87.9M。最终这个Stage执行了1.1分钟。

image-20240406234414424

1
可以点进去看一下,每个task大致处理450KB的数据。

image-20240406234516406

1
2
3
4
这个时候其实拆分出这么多的分区只会降低计算效率,因为数据量太小了。
但是在Spark 3.0之前,Spark无法提前预知中间某一个Shuffle的数据量,考虑到Spark是处理海量数据的,所以给SparkSQL的Shuffle分区数量设置了一个比较大的默认值200。

这些数据指标也可以在SQL界面中查看。

image-20240406234610354

image-20240406234715421

1
2
3
4
5
6
这两个地方都可以查看:
在SQL界面中查看的其实是这个SQL任务的执行流程,或者说是执行计划。
在Stage界面查看的是SQL转换为RDD之后执行的具体情况。

针对这个没有开启自适应调整Shuffle分区数量的代码,或者是Spark3.0版本之前的代码,如果想要进行优化,我们可以考虑手工指定shuffle分区数量。
修改代码,将Shuffle分区数量设置为10。
手工指定shuffle分区数量
1
2
3
4
5
6
7
8
9
10
11
12
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQECoalescePartitionsScala")
.config(conf)
//手工指定Shuffle分区数量,默认是200
.config("spark.sql.shuffle.partitions","10")
//禁用AQE机制
.config("spark.sql.adaptive.enabled","false")
//禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
.config("spark.sql.adaptive.coalescePartitions.enabled","false")
.getOrCreate()
1
执行代码,到任务界面中查看效果。

image-20240406235113633

1
2
3
4
5
6
此时发现在Stage中产生了10个Task,因为我们在代码中设置了Shuffle分区数量是10,所以产生了10个Task,这个是正确的。
分区变小之后,整个Stage的执行时间也减少了很多,现在只需要执行28秒了,还是有很大性能提升的。

但是通过手工调整有2个问题:
-手工设置的Shuffle分区数量不一定是最优的。
-针对每个SparkSQL任务都依赖手工调整会非常麻烦。
AQE&自适应调整shuffle分区
1
2
3
4
所以从Spark3.0开始,引入了AQE机制,可以实现自适应调整Shuffle分区数量。
修改代码,验证一下效果。

注意:默认情况下AQE机制是开启的,自适应调整Shuffle分区数量功能也是开启的。将之前添加的配置注释掉即可。
1
2
3
4
5
6
7
8
9
10
11
12
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQECoalescePartitionsScala")
.config(conf)
//手工指定Shuffle分区数量,默认是200
//.config("spark.sql.shuffle.partitions","10")
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.coalescePartitions.enabled","false")
.getOrCreate()
1
执行代码,到任务界面中查看效果

image-20240406235430879

1
此时发现Stage中的task数量是2,这个就表示经过自适应调整Shuffle分区数量之后,将分区数量调整成了2个。最终产生2个Task。整个Stage的执行耗时为37秒

image-20240406235532593

1
2
3
其中一个task处理61M的数据,另外一个task处理26M的数据。

也可以到SQL模块中查看相关指标:

image-20240406235600500

image-20240406235640060

1
2
3
4
5
6
7
大家在这里可能会有一个问题:

为什么会自动调整成2个分区呢?为什么不调整成3个分区呢?

自适应调整Shuffle分区数量机制在自动调整分区数量的时候有什么依据吗?
是有依据的,在自动调整分区数量的时候,会根据总的Shuffle数据量,参考一个分区大小的阈值来切分分区,其实就是spark.sql.adaptive.advisoryPartitionSizeInBytes参数。
这个参数默认是64M,所以会大致参考64M进行切分,最终切分出来一个是61M,一个是26M。
调整一下默认的Shuffle分区大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
接下来我们调整一下默认的Shuffle分区大小,改为30 M,看一下此时任务会产生多少个Shuffle分区。
修改后的代码如下:

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQECoalescePartitionsScala")
.config(conf)
//手工指定Shuffle分区数量,默认是200
//.config("spark.sql.shuffle.partitions","10")
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用自适应调整Shuffle分区数量(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.coalescePartitions.enabled","false")
//设置建议的分区大小
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes","31457280b")
.getOrCreate()
1
执行程序,到任务界面查看效果:

image-20240406235942739

1
2
此时发现在Stage中产生了4个Task,耗时为29秒。
接下来看一下每个task处理的大致数据量:

image-20240407000042956

1
2
3
4
5
6
7
8
9
最终产生了3个28M的分区,1个2M的分区,说明我们修改的spark.sql.adaptive.advisoryPartitionSizeInBytes这个参数是生效了。
其实这个默认参数也没必要修改,针对海量数据的场景,按照64M切分也是合理的。

通过前面的演示,可以验证自适应调整Shuffle分区数量这个机制的实际效果,最终通过自适应调整Shuffle分区数量机制提高了SparkSQL任务的计算效率。

最后要注意:这个功能验证完毕之到HDFS中把生成的结果数据删一下,释放磁盘空间。
删除的时候忽略回收站。

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /aqe/coalesce_partition_*

动态调整Join策略

1
2
3
4
5
6
7
Spark中支持多种Join 策略,其中 BroadcastHashJoin的性能通常是最好的,但是前提是参加Join的其中一张表的数据能够存入内存。
基于这个原因,当Spark评估参加Join的表的数据量小于广播大小的阈值时,会将Join策略调整为BroadcastHashJoin。广播大小的阈值默认是10M。
但是,很多情况都可能导致这种大小的评估出错。例如:Join的时候SQL语句中存在过滤器。

开启了自适应查询执行机制之后,可以在运行时根据最精确的数据指标重新规划Join策略,实现动态调整Join策略。

看下面这个图:

image-20240408101036084

1
2
3
4
5
6
从这个图里面可以看到,对表t2进行过滤之后的数据大小比预估值小得多,并且小到足以进行广播,因此在重新优化之后,之前静态生成的SortMergeJoin策略就会被转换为BroadcastHashJoin策略了。

针对动态调整Join策略的核心参数主要包括下面这1个:

核心参数 默认值 解释
spark.sql.adaptive.autoBroadcastJoinThreshold none 设置允许广播的表的最大值。设置为-1表示禁用。如果未设置会参考spark.sql.autoBroadcastJoinThreshold参数的值(10M)。
1
spark.sql.adaptive.autoBroadcastJoinThreshold:这个参数没有默认值,通过这个参数可以控制允许广播的表的最大值。当两个表进行join的时候,如果一个表比较小,可以通过广播机制广播出去,这样就可以把本来是reduce端的join,改为map端的join,提高join效率。如果把这个参数的值设置为-1,表示禁用自动广播策略。如果我们没有给这个参数设置值,则默认会使用spark.sql.autoBroadcastJoinThreshold参数的值,这个参数的值默认是10M。那也就是说当一个表中的数据小于10M的时候在这里支持将这个表广播出去。
1
2
3
4
5
6
7
下面我们通过一个具体案例演示一下:

首先执行GenerateJSONData类中的generateJoinData函数,生成测试数据文件。

为了对比实验,在这里我们先开发一个未开启动态调整Join策略的程序。
因为针对动态调整Join策略这个功能没有单独提供参数进行控制,只要开启了自适应查询执行,当满足条件的时候,动态调整Join策略就会触发了
核心代码如下:
scala
关闭自适应查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.imooc.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 动态调整Join策略
* Created by xuwei
*/
object AQEJoinScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQEJoinScala")
.config(conf)
.config("spark.sql.adaptive.enabled","false")
.getOrCreate()


//读取Json格式数据,获取DataFrame
val jsonDf1 = sparkSession.read.json("D:\\spark_json_t1.dat")
val jsonDf2 = sparkSession.read.json("D:\\spark_json_t2.dat")
//创建临时表
jsonDf1.createOrReplaceTempView("t1")
jsonDf2.createOrReplaceTempView("t2")
//执行SQL语句
val sql =
"""
|select
| t1.id,
| t2.name,
| t1.score
|from t1 join t2
| on t1.id = t2.id
| and t2.city like 'bj'
|""".stripMargin
sparkSession.sql(sql).write.format("json").save("hdfs://bigdata01:9000/aqe/join_"+System.currentTimeMillis())


//让程序一直运行,这样本地的Spark任务界面就可以一直查看
while (true){
;
}
}
}
1
2
查看任务界面信息:
到SQL界面中查看具体的执行情况

image-20240408102100437

1
从下面这个图里面可以看出来,此时JOIN的时候使用的是SortMergeJoin策略。

image-20240408102140304

1
2
3
这个策略是任务一开始的时候就指定好的,因为前期任务无法真正知道过滤后的t2表有多大,原始的t2表有146M,原始的t1表有90M,所以不满足广播策略的条件。

在这里其实可以看到,真正过滤后的数据只有139KB,所以t2这个表过滤后的大小是小于广播阈值的,正常情况下在Join的时候是应该广播出去的,这样才可以提高计算效率的。但是由于此时没有开启自适应查询执行策略,任务运行期间无法根据数据的实际大小修改执行计划。
开启动态join
1
2
3
4
5
6
7
8
接下来修改代码,开启动态调整Join策略,其实也就是开启自适应查询执行机制。

val sparkSession = SparkSession
.builder()
.appName("AQEJoinScala")
.config(conf)
//.config("spark.sql.adaptive.enabled","false")
.getOrCreate()
1
2
3
执行代码,查看任务界面效果:

注意:任务在刚开始运行的时候我们来查看这个执行计划,可以发现默认使用的还是SortMergeJoin策略,如下图所示:

image-20240408103801570

1
2
3
因为针对t2表,通过like过滤之后剩下的真实数据只有139KB,这个数据大小是小于10M的,可以广播出去提高Join的效率。

这些数据大小在stage界面中也可以看到

image-20240408104055219

1
针对这两种情况的执行效果进行对比,如下图所示,开启了自适应查询执行机制之后,会在运行期间获取到t2表过滤后的真实数据,从而修改之前的执行策略。

image-20240408104140311

1
2
3
4
5
6
7
8
9
10
11
注意:当我们把spark.sql.adaptive.autoBroadcastJoinThreshold参数设置为-1的时候,可以禁用sparksql中的自动广播机制,就算开启了自适应查询执行机制,也无法转换为BroadcastHashJoin策略。
修改代码:

val sparkSession = SparkSession
.builder()
.appName("AQEJoinScala")
.config(conf)
//.config("spark.sql.adaptive.enabled","false")
//禁用自动广播机制
.config("spark.sql.adaptive.autoBroadcastJoinThreshold","-1")
.getOrCreate()
1
执行代码,查看任务界面效果:

image-20240408104224462

1
2
3
4
5
6
7
此时发现就算是开启了自适应查询执行机制,依然还是使用的SortMergeJoin策略。

最后要注意:这个功能验证完毕之到HDFS中把生成的结果数据删一下,这个数据量还是比较占用磁盘空间的。

删除的时候忽略回收站。

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /aqe/join_*

动态优化倾斜的 Join

1
2
3
4
5
6
7
在进行Join操作的时候,如果数据在多个分区之间分布不均匀,很容易产生数据倾斜,如果数据倾斜比较严重会显著降低计算性能。

动态优化倾斜的 Join这个机制会从Shuffle文件统计信息中自动检测到这种倾斜,然后它会将倾斜的分区做进一步切分,切分成更小的子分区,这些子分区会连接到对应的分区进行关联。

在Spark 3.0版本之前,如果在join的时候遇到了严重的数据倾斜,是需要我们自己对数据进行切分处理,提高计算效率的。现在有了动态优化倾斜的 Join这个机制之后就很方便了。

假设有两个表 t1和t2,其中表t1中的P0分区里面的数据量明显大于其他分区,默认的执行情况是这样的,看这个图:

image-20240408104631447

1
2
t1表中p0分区的数据比p1\p2\p3这几个分区的数据大很多,可以认为t1表中的数据出现了倾斜。
当t1和t2表中p1、p2、p3这几个分区在join的时候基本上是不会出现数据倾斜的,因为这些分区的数据相对适中。但是P0分区在进行join的时候就会出现数据倾斜了,这样会导致join的时间过长。
1
动态优化倾斜的 Join机制会把P0分区切分成两个子分区P0-1和P0-2,并将每个子分区关联到表t2的对应分区P0,看这个图:

image-20240408104816899

1
2
3
4
5
6
t2表中的P0分区会复制出来两份相同的数据,和t1表中切分出来的P0分区的数据进行Join关联。
这样相当于就把t1表中倾斜的分区拆分打散了,最终在join的时候就不会产生数据倾斜了。

如果没有这个优化,将有4个任务运行Join操作,其中P0分区对应的任务将消耗很长时间。优化之后,会有5个任务运行Join操作,每个任务消耗的时间大致相同,这样就可以获得最优的执行性能了。

针对动态优化倾斜的 Join策略的核心参数主要包括下面这3个:
1
2
3
4
核心参数	默认值	解释
spark.sql.adaptive.skewJoin.enabled true 是否开启AQE机制中的动态优化倾斜的Join机制
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5 数据倾斜判断因子,必须同时满足(数据倾斜判断阈值)
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 268435456b (256M) 数据倾斜判断阈值,必须同时满足(数据倾斜判断因子)
1
2
3
spark.sql.adaptive.skewJoin.enabled:默认值是true,表示默认开启AQE机制中的动态优化倾斜的Join机制。
spark.sql.adaptive.skewJoin.skewedPartitionFactor:默认值是5,这个参数属于判断分区数据倾斜的一个因子。
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:默认值是256M,这个参数也属于判断分区数据倾斜的一个因子。
1
2
3
4
5
如果Shuffle中的一个分区的大小大于skewedPartitionFactor这个因子乘以Shuffle分区中位数的值,并且这个分区也大于skewedPartitionThresholdInBytes这个参数的值,则认为这个分区是倾斜的。

理想情况下,skewedPartitionThresholdInBytes参数的值应该大于advisoryPartitionSizeInBytes参数的值。因为后期在切分这个倾斜的分区时会依据advisoryPartitionSizeInBytes参数的值进行切分,如果skewedPartitionThresholdInBytes参数的值小于advisoryPartitionSizeInBytes的值,那就无法切分了。

通过下面这个图,可以更加清晰的理解如何判断数据倾斜:

image-20240408105237305

1
2
如果分区A中的数据大小 大于skewedPartitionFactor * 分区大小的中位数。
并且分区A中的数据大小 也大于 skewedPartitionThresholdInBytes参数的值,则分区A就是一个倾斜的分区了,那也就意味着这个任务中的数据出现了数据倾斜,这样才会触发动态优化倾斜的Join功能。
scala
关闭动态优化倾斜的Join
1
2
3
4
5
下面我们来根据一个案例具体分析一下:
首先执行GenerateJSONData类中的generateSkewJoin函数,生成测试数据文件。

为了进行对比试验,我们先开发一个未开启动态优化倾斜的Join功能的程序
核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.imooc.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 动态优化倾斜的Join
* Created by xuwei
*/
object AQESkewJoinScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQESkewJoinScala")
.config(conf)
//禁用AQE机制
.config("spark.sql.adaptive.enabled","false")
//禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
.config("spark.sql.adaptive.skewJoin.enabled","false")
.getOrCreate()


//读取Json格式数据,获取DataFrame
val jsonDf1 = sparkSession.read.json("D:\\spark_json_skew_t1.dat")
val jsonDf2 = sparkSession.read.json("D:\\spark_json_skew_t2.dat")
//创建临时表
jsonDf1.createOrReplaceTempView("t1")
jsonDf2.createOrReplaceTempView("t2")
//执行SQL语句
val sql =
"""
|select
| t1.id,
| t2.name,
| t1.score
|from t1 join t2
| on t1.id = t2.id
|""".stripMargin
sparkSession.sql(sql).write.format("json").save("hdfs://bigdata01:9000/aqe/skew_join_"+System.currentTimeMillis())

while (true){
;
}
}
}
1
2
运行程序,到任务界面中查看运行效果。
在Stage界面中看到Spark SQL 中的Join功能在运行的时候产生了200个任务,因为默认Shuffle分区数量是200,对应产生200个任务没有问题。

image-20240408105702437

1
但是仔细查看这200个任务,发现其中有一个任务处理的数据量非常大,在153M左右。其他的任务处理的数据量很小,都在1.5M左右。从这里可以看出来这个Join操作在执行的时候出现了数据倾斜。

image-20240408105821547

开启动态优化倾斜的Join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
针对这个任务而言,想要提高计算效率,需要把这个倾斜的分区中的数据进行拆分,拆分成多个子任务去执行,这样就可以了。
这个问题正好是动态优化倾斜的Join这个功能可以解决的。
那我们就开启这个功能,验证一下效果:
修改代码:

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQESkewJoinScala")
.config(conf)
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.skewJoin.enabled","false")
//注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,避免影响结果
.config("spark.sql.adaptive.coalescePartitions.enabled","false")
.getOrCreate()
1
2
重新执行程序,到任务界面中查看效果:
在Stage界面中发现还是200个任务,任务数量没有增加,也就意味着动态优化倾斜的Join没有真正生效。

image-20240408110209988

1
2
进入这个Stage查看详细信息:
发现之前这个153M的分区的数据依然存在,没有被分开处理。

image-20240408110237741

1
2
为什么动态优化倾斜的 Join这个功能没有生效呢?
大家回头再看一下这个图表中的这2个参数:

image-20240408110336963

1
2
如果我们开启了动态优化倾斜的 Join这个功能,并且确实有数据倾斜了,但是没有真正触发执行,那说明这个倾斜的数据没有满足这2个参数的指标,
咱们前面也通过这个图进行解释了:

image-20240408110406751

1
2
3
4
5
6
只有这2个条件都满足了,才认为数据倾斜了。

首先看一下第一个条件
skewedPartitionFactor 等于 5。
分区大小的中位数可以到SQL任务的执行流程中查看,这个分区大小的中位数是一个预估值。
到任务界面中,点击SQL模块查看:

image-20240408110506705

1
2
3
4
5
6
7
8
9
10
从图中可以看出来分区大小的中位数就是719字节。
最终 5 * 719 B =3595 B(约等于0.003M)。
目前这个倾斜的分区是153M,大于0.003M,第一个条件满足。其实按照这个条件的话,目前所有分区都是满足的。

接下来看第二个条件
skewedPartitionThresholdInBytes这个参数默认是256M,目前我们这个任务中倾斜的分区大小是153M,不大于256M,所以第二个条件不满足。

最终Spark任务不认为这个分区是倾斜的,所以没有触发动态优化倾斜的Join策略的执行。

那接下来我们来修改一下代码,让这个153M的分区同时满足这两个条件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val sparkSession = SparkSession
.builder()
.appName("AQESkewJoinScala")
.config(conf)
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.skewJoin.enabled","false")
//注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,避免影响结果
.config("spark.sql.adaptive.coalescePartitions.enabled","false")
//某个分区 > skewedPartitionFactor * 中位数
.config("spark.sql.adaptive.skewJoin.skewedPartitionFactor","5")
//某个分区 > 100M
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","100mb")
//如果这里指定的分区大小超过了任务中倾斜的分区的大小,这样就无法触发动态优化倾斜的Join这个功能了
//建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M)
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes","64mb")
.getOrCreate()
1
2
重新执行程序,到任务界面中查看效果:
此时到Stage界面中发现产生了202个任务。

image-20240408110751652

1
2
3
点击查看这个Stage的详细信息。
相当于把这个153M的分区,大致按照64M,切分成了3份。61M、60M和34M(注意:并不是完全按照64M来切分,会大致按照64M来切分)。
之前是1个任务处理这153M的数据,现在是有3个任务一起来处理这153M的数据,这样就解决了数据倾斜的问题,提高了任务的计算效率。

image-20240408110841215

1
当然了,如果感觉这样拆分还是有点大,则可以对应的调整advisoryPartitionSizeInBytes参数的值,这个参数的值越小,拆分出来的分区就越多,但是太多了也不好,那样就是很多小任务了。
advisoryPartitionSizeInBytes大于skewedPartitionThresholdInBytes
1
2
3
4
注意:如果我在这里把advisoryPartitionSizeInBytes参数的值设置的比实际倾斜的分区还要大,那么此时会出现什么样的效果呢?

我们来演示一下看看。
修改代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQESkewJoinScala")
.config(conf)
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.skewJoin.enabled","false")
//注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,避免影响结果
.config("spark.sql.adaptive.coalescePartitions.enabled","false")
//某个分区 > skewedPartitionFactor * 中位数
.config("spark.sql.adaptive.skewJoin.skewedPartitionFactor","5")
//某个分区 > 100M
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","100mb")
//如果这里指定的分区大小超过了任务中倾斜的分区的大小,这样就无法触发动态优化倾斜的Join这个功能了
//建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M)
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes","160mb")
.getOrCreate()
1
2
重新执行程序,到任务界面中查看效果:
此时发现Stage中只有200个任务,说明倾斜的分区没有被切分。

image-20240408111830818

1
2
3
这个时候实际上动态优化倾斜的Join这个策略是被触发了,只是因为对这个倾斜的分区进行切分的时候出现了问题。
倾斜的数据是153M,但是我设置的建议分区大小是160M,此时倾斜的数据还不到160M,所以就无法切分了。
所以这个参数不要设置太大,建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M)。
1
2
3
最后,我们把这个自适应调整Shuffle分区数量这个策略也打开,因为我们这里面其实是有很多小任务的,所以打开这个策略之后,是可以合并一些分区的,这样是可以提高效率的。
之前没有打开是为了不让他影响产生的任务数量,便于我们分析。
修改代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("AQESkewJoinScala")
.config(conf)
//禁用AQE机制
//.config("spark.sql.adaptive.enabled","false")
//禁用动态优化倾斜的Join(其实只要禁用了AQE机制,这个参数就不用设置了)
//.config("spark.sql.adaptive.skewJoin.enabled","false")
//注意:在验证动态优化倾斜的Join这个功能的时候,最好先把自适应调整Shuffle分区数量这个功能禁用,避免影响结果
//.config("spark.sql.adaptive.coalescePartitions.enabled","false")
//某个分区 > skewedPartitionFactor * 中位数
.config("spark.sql.adaptive.skewJoin.skewedPartitionFactor","5")
//某个分区 > 100M
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","100mb")
//如果这里指定的分区大小超过了任务中倾斜的分区的大小,这样就无法触发动态优化倾斜的Join这个功能了
//建议这里设置的分区大小最大也不能超过skewedPartitionThresholdInBytes的值(100M)
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes","64mb")
.getOrCreate()
1
2
重新执行程序,到任务界面中查看效果:
此时发现Stage中只有9个任务,说明分区确实是被合并了。

image-20240408112153242

1
2
点进去查看详细信息:
发现之前倾斜的那个153M的分区也确实被拆分开了。

image-20240408112219199

1
2
3
4
5
6
这就是自适应调整Shuffle分区数量和动态优化倾斜的Join功能结合在一起的效果。

最后要注意:这个功能验证完毕之到HDFS中把生成的结果数据删一下
删除的时候忽略回收站。

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /aqe/skew_join_*

总结

1
2
3
4
通过减少查询优化对静态统计的依赖,AQE 解决了 Spark 基于成本优化的最大难题之一:统计信息收集开销和估计精度之间的平衡。
为了获得最佳的估计精度和规划结果,通常需要维护详细的、最新的统计信息,其中一些统计信息的收集成本很高,比如列直方图,它可以用于提高选择性和基数估计或检测数据倾斜。AQE 在很大程度上消除了对此类统计数据的需要,以及对手动调优工作的需要。除此之外,AQE 还使SQL查询优化对于任意 UDF 和不可预测的数据集更改(例如数据大小的突然增加或减少、频繁的和随机的数据倾斜等)更有弹性。

此时我们不再需要提前知道任务中的数据。随着查询的运行,AQE 将计算出数据并改进查询计划,提高查询性能以获得更快的分析效果,这就是AQE的优点。

动态分区裁剪

1
2
3
4
动态分区裁剪是什么意思呢?
当我们针对多个表进行 Join 的时候,动态分区裁剪功能 会基于运行时(runtime)推断出来的信息,当on后面的查询条件满足一定要求后就会自动对表中的数据进行裁剪(过滤),减少Join时参与的数据量,进而提高效率。

动态分区裁剪的具体执行流程是这样的,下面通过图表的形式进行分析:

image-20240409124500069

1
2
3
4
5
6
当我们在执行SELECT t1.id, t2.key FROM t1 JOIN t2 ON t1.key = t2.key AND t2.id <2这个Spark SQL 语句的时候。

首先来看左边的这个图:它表示这个Spark SQL语句在没有使用动态分区裁剪情况下的执行情况。
t1这个表可以认为是一个包含了多个分区的事实表。t2这个表可以认为是一个数据量比较小的维度表。
在Join的时候SQL解析引擎会先对t2中的过滤条件做谓词下推优化,也就是说在Join之前,先执行t2中的过滤条件,减少Join时t2的数据量。
但是t1表的数据是不变的,相当于通过scan进行了全表扫描,根据全表扫描的结果和t2过滤后的结果进行Join关联。
1
2
3
4
5
6
如果在Join关联的时候能够提前对t1表中的数据也进行过滤,这样是可以极大提高Join效率的。
但是由于之前版本的Spark无法动态计算代价,所以会导致对t1表全表扫描,最终会扫描出大量的无效数据和t2表进行Join关联。

接着来看右边这个图:它表示Spark SQL语句在执行的时候使用了动态分区裁剪功能,这里在扫描t1表中的数据的时候会根据t2表过滤后的数据信息,对表t1中的数据进行过滤(对应的底层实现就是t1.key IN (SELECT t2.key FROM t2 WHERE t2.id < 2 )),过滤的时候会将t1表中不满足条件的数据过滤掉,只获取需要的那部分数据。这个过程就是动态分区裁剪。

通俗一点来说,动态分区裁剪的核心思想是想办法提前跳过查询结果中不需要的数据,减少Join时的数据量,提高效率。
1
2
3
4
5
6
7
8
9
10
11
12
13
注意:这个动态分区裁剪操作默认是开启的,但是触发动态分区裁剪是需要一些条件的:

-需要裁剪的表必须是分区表,并且分区字段必须在Join中的on条件里面。

-Join类型必须是Inner Join、Left Semi Join 、Left Outer Join或者Right Outer Join。
针对Inner Join:Join操作左边的表、右边的表可以都是分区表,或者只有某一个表是分区表,至少要有一个表是分区表,这样才能支持裁剪。
针对Left Semi Join:需要保证Join操作左边的表是分区表,这样才能支持裁剪。
针对Left Outer Join:需要保证Join操作右边的表是分区表,这样才能支持裁剪。
针对Right Outer Join:需要保证Join操作左边的表是分区表,这样才能支持裁剪。

-另一张表里面需要至少存在一个过滤条件,例如前面案例中的t2.id<2这个过滤条件,如果两个表在join的时候没有指定过滤条件,那就肯定不会触发动态分区裁剪了。

-满足了前面3点的要求,也不一定会触发动态分区裁剪,此时还需要在运行时统计这个动态分区裁剪操作的代价,如果做了动态分区裁剪,最终对性能也没有多少提高,那还不如不做。针对我们这里所说的这个案例,如果针对t1表进行过滤后的数据量还是很大,那其实就没有必要进行动态分区裁剪优化了。
1
2
3
4
5
针对动态分区裁剪主要包括1个核心参数,就是开启这个功能的参数。

核心参数 默认值 解释
spark.sql.optimizer.dynamicPartitionPruning.enabled true 是否开启动态分区裁剪功能
spark.sql.optimizer.dynamicPartitionPruning.enabled:这个参数默认是true,所以在工作中使用的时候,如果你的Spark SQL语句和数据可以满足这些要求,就会自动触发动态分区裁剪。
scala
关闭动态分区裁剪功能
1
2
3
下面我们来通过一个案例验证一下效果。
为了进行对比试验,我们先开发一个未开启动态分区裁剪功能的程序
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.imooc.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 动态分区裁剪
* Created by xuwei
*/
object DPPScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("DPPScala")
.config(conf)
//关闭动态分区裁剪
.config("spark.sql.optimizer.dynamicPartitionPruning.enabled","false")
.getOrCreate()

import sparkSession.implicits._

/**
* 创建一个表: t1
* 1:表中有1000条数据
* 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
* 3:这个表是一个分区表,分区字段为key
*/
sparkSession.range(1000)
.select($"id", $"id".as("key"))
.write
.partitionBy("key")
.mode("overwrite")
.saveAsTable("t1")

/**
* 创建一个表: t2
* 1:表中有10条数据
* 2:表中有id和key这两个列,这两个列的值是一样的,起始值为0
* 3:这个表是一个普通表
*/
sparkSession.range(10)
.select($"id", $"id".as("key"))
.write
.mode("overwrite")
.saveAsTable("t2")

val sql =
"""
|select
| t1.id,
| t2.key
| from t1 join t2
| on t1.key = t2.key
| and t2.id < 2
""".stripMargin

sparkSession.sql(sql).write.format("json").save("hdfs://bigdata01:9000/dpp/dpp_"+System.currentTimeMillis())

sparkSession.stop()
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
注意:这个程序如果在本地执行需要在windows中配置Hadoop的相关环境,比较麻烦,建议提交到YARN集群上执行。

修改pom.xml文件中的依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
1
2
3
4
5
6
打任务jar包。

上传任务jar包

在提交任务之前先在hdfs中创建dpp目录
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -mkdir /dpp
1
2
3
4
5
6
7
8
9
10
11
12
开发任务提交脚本,并且提交任务。
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# vi spark3DPP.sh
spark-submit-3 \
--class com.imooc.scala.sql.DPPScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
db_spark3-1.0-SNAPSHOT-jar-with-dependencies.jar
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# sh -x spark3DPP.sh

等待任务执行结束后到YARN上查看任务的相关信息。
1
2
3
注意:需要提前开启hadoop和spark的historyserver服务。

查看SQL界面的任务执行流程:

image-20240411084135602

image-20240411084232427

1
2
3
4
在join之前会先对t2中的数据进行过滤,然后和t1表中的数据进行join。
这里的执行时间为8秒。

注意:如果后期忘记了这个任务到底设置了哪些参数,有没有关闭动态分区裁剪功能,可以到这里查看,默认的参数是不显示的,我们在代码中通过config配置的参数都是显示的

image-20240411084358373

开启动态分区裁剪功能
1
2
3
4
5
6
7
8
9
10
11
接下来开启动态分区裁剪功能。
修改代码:

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("DPPScala")
.config(conf)
//关闭动态分区裁剪
//.config("spark.sql.optimizer.dynamicPartitionPruning.enabled","false")
.getOrCreate()
1
2
3
4
5
6
接下来重新打jar包

上传jar包、执行任务。

等待任务执行结束后到YARN上查看任务的执行信息。
查看SQL界面的任务执行流程:

image-20240411084622728

1
在join之前会对t2中的数据进行过滤,然后再根据t2过滤出来的数据对t1表中的数据进行过滤,最后对两个表中过滤后的数据进行join。
1
2
3
4
开启了动态分区裁剪的执行时间为3秒。之前没有开启动态分区裁剪时的时间是8s。所以说还是有很大性能提升的。

从Stage界面中也可以看到性能的提升。
没有开启动态分区裁剪的时候这个stage需要启动32个task。

image-20240411084732809

1
开启了动态分区裁剪的时候这个stage只需要启动2个task。

image-20240411084815556

1
这就是动态分区裁剪功能的效果。

加速器感知调度

1
2
3
为了让Spark能够利用目标平台上的硬件加速器(例如目标平台上的GPU),Spark 3.0版本增强了已有的调度程序,使集群管理器可以感知到加速器,这就是加速器感知调度特性。
通俗一点来说就是可以自动感知集群中的GPU资源,并且支持对GPU资源的调度。
我们普通的程序主要是使用CPU资源,不需要使用GPU资源。针对深度学习算法需要用到GPU资源提高效率。

Catalog 插件 API

1
2
3
4
5
6
7
8
9
10
11
Spark 2.x的时候新推出了DataSourceV2 API,主要是为了用来和外部数据存储进行交互。但是这个API缺少了关键的一个环节:对表的元数据进行操作(例如:创建、修改、删除表)。

在Spark 2.x版本中,SparkSQL是支持通过CTAS (Create Table AS Select)的方式来创建一个表并向该表写入数据,但是这是一个操作,并不是API,所以DataSourceV2中缺少创建目标表的API。

这样在实际操作中可能会存在一些问题,为了实现 CTAS,Spark 会创建、写入或者删除表(写入失败时)。这样的话,当元数据管理不可用或者 Driver程序失败的时候,CTAS删除表的时候可能会失败,这样会导致表无法删除。

Spark3.0新版本中增强了DataSourceV2 API,并引入了新的Catalog API。对于同时实现了Catalog API和DataSourceV2 API的外部数据源,用户可以通过标识符直接操作外部表的数据和元数据。

通俗一点来说就是之前只支持对外部数据源的数据进行操作,现在可以支持操作外部数据源的表结构。

DataSourceAPI 所处的位置是在这里,就是数据源接入环节。

image-20240411085354688

1
不过Catalog API目前仍处于试验阶段,官方不建议在生产环境中使用。

支持Hadoop3.x / Java11 / Scala2.12

1
2
3
4
5
Spark 3.x建议集成在Hadoop3.x版本之上。
Saprk 3.x建议使用Java8版本,这个版本的稳定性是最好的,当然现在他也支持java11版本了。
Spark 3.x是基于Scala2.12版本编译的,在Spark 3.2.x中也可以支持Scala2.13版本。

在这里可以看到:

image-20240411085645106

1
不过建议还是使用scala2.12版本,这个版本目前用的比较多。

更好的 ANSI SQL 兼容性

1
2
3
4
5
6
7
8
PostgreSQL是目前比较先进的开源数据库之一,它支持SQL:2011中的大部分主要特性。SQL:2011中要求的179个功能,PostgreSQL至少符合160个。
Spark社区目前专门开了一个ISSUE(SPARK-27764)来解决Spark SQL和PostgreSQL之间的差异,包括功能特性补齐,Bug修改等。
功能特性补齐包括支持ANSI SQL的一些函数,区分SQL保留关键字以及内置函数等。

因为Spark原来的SQL语法、函数 和ANSI SQL还是存在一些差异的。这个版本将缩小Spark SQL和ANSI SQL之间的差异。

具体Spark 3.x版本中对Spark SQL做了哪些功能补齐,我们可以到ISSUE中看一下:
https://issues.apache.org/jira/browse/SPARK-27764

image-20240411085842558

1
2
目前这个ISSUE处于OPEN打开状态,说明需要完善的功能还没有做完。
这里面包含了很多子任务,可以大致看一下:

image-20240411085906201

1
2
3
4
等这些子任务的状态都变为RESOLVED(解决)状态的时候说明完善工作都完成了。
目前里面的很多功能还处于开发过程中。

想要具体看一下某个子任务做了什么事情,可以点击到里面看一下:

image-20240411085932081

image-20240411085954452

1
翻译为中文:

image-20240411090030185