Big Data Engineer - Week 15: SparkSQL Integration with Hive

SparkSQL Integration with Hive

Integrating SparkSQL with Hive essentially means operating on Hive tables directly from SparkSQL.

Note: when Hive tables are operated on through SparkSQL, the underlying computation no longer goes through the MapReduce engine; it runs on the Spark engine instead. SparkSQL reads Hive's metadata as well as the data stored on HDFS, and the computation is ultimately performed by Spark.
This way you can take advantage of the Spark engine for better computation efficiency, and you no longer need to create temporary tables in SparkSQL every time, which skips the tedious table-creation step.
One of the more annoying parts of using SparkSQL used to be creating tables; now tables that already exist in Hive can be used directly in SparkSQL, which is much more convenient.

There are two common ways to use this integration:
Integrating Hive in the SparkSQL command line
This approach makes it easy to debug interactively in the SparkSQL command line and is mainly used during the debugging stage.

Integrating Hive in SparkSQL code
This approach operates on Hive tables directly through SparkSQL in code; routine scheduled jobs generally use this approach.

Let's demonstrate both approaches in detail.

Integrating Hive in the SparkSQL Command Line

Starting from the Spark 3.2.1 client node that was installed and configured earlier, modify the SparkSQL-related settings:
1: No change is needed here in my case, because Spark 3 is what I installed originally; Spark 2 is only my secondary version.

2: Adjust the log4j configuration file; otherwise the SparkSQL command line prints a lot of useless INFO- and WARN-level log messages, which is quite annoying.
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# cd conf/
[root@bigdata04 conf]# mv log4j.properties.template log4j.properties
[root@bigdata04 conf]# vi log4j.properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=ERROR
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false

Simply change the log levels in this file to ERROR wherever possible.
3: Keep only the core settings from the hive-site.xml file and place it in Spark 3.2.1's conf directory.
Only the following settings need to be kept in hive-site.xml (see the sketch below):
Adjust these settings according to the actual configuration of your own local Hive installation.
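For reference, here is a minimal hive-site.xml sketch. It assumes the Hive metastore metadata lives in a MySQL database and that Spark connects to it directly over JDBC (which matches copying the MySQL driver into Spark's jars directory in the next step); the host, port, database name, user, and password below are placeholders, so replace every value with what your own Hive installation actually uses.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- JDBC connection to the MySQL database that stores the Hive metastore.
         Host, port, database name, user, and password are placeholders. -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://bigdata01:3306/hive?serverTimezone=Asia/Shanghai</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>your_password</value>
    </property>
    <!-- Hive warehouse directory on HDFS; the log output later in this section shows this key is set. -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>
    <!-- CLI display settings; these two are echoed when spark-sql starts in the session below. -->
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>
</configuration>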
4: Copy the MySQL driver jar into Spark 3.2.1's jars directory.
[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# cd jars/
[root@bigdata04 jars]# ll mysql-connector-java-8.0.16.jar
-rw-r--r--. 1 root root 2293144 Sep 26 15:16 mysql-connector-java-8.0.16.jar
5: Run the spark-sql script and operate on the tables that already exist in Hive.
Note: the Hadoop cluster needs to be started beforehand.

[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# bin/spark-sql-3 --master yarn
hive.cli.print.current.db true
hive.cli.print.header true
Spark master: yarn, Application Id: application_1821941866439_0009
spark-sql> show tables;
t1
t2
...
Time taken: 0.147 seconds, Fetched 21 row(s)
spark-sql> create table s1(name string,age int);
Time taken: 2.024 seconds
spark-sql> insert into s1(name,age) values('zs',10);
Time taken: 5.97 seconds
spark-sql> select * from s1;
zs 10
Time taken: 5.183 seconds, Fetched 1 row(s)
Note: at this point SparkSQL is using Hive's metadata, so tables created in SparkSQL are stored in Hive and can also be used from Hive.

First check Hive's metadata; the information for the table we just created is visible there.

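If you want to confirm this in the metastore database itself, a quick query like the one below works. This is only an illustrative sketch: it assumes the metastore is stored in a MySQL database named hive, so adjust the database name to your own environment.

-- Run in the MySQL instance that hosts the Hive metastore (database name "hive" is an assumption)
SELECT TBL_ID, DB_ID, TBL_NAME, TBL_TYPE
FROM hive.TBLS
WHERE TBL_NAME = 's1';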

Then verify in the Hive command line that the table can be operated on:
hive (default)> show tables;
s1
....
hive (default)> select * from s1;
OK
s1.name s1.age
zs 10

Check the table's schema and where its data is stored:
hive (default)> show create table s1;
OK
createtab_stmt
CREATE TABLE `s1`(
`name` string,
`age` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://bigdata01:9000/user/hive/warehouse/s1'
TBLPROPERTIES (
'spark.sql.create.version'='3.2.1',
'spark.sql.sources.schema'='{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}}]}',
'transient_lastDdlTime'='1822029523')
Time taken: 0.177 seconds, Fetched: 15 row(s)

LOCATION shows that the table's storage directory is under Hive's default warehouse directory.
In TBLPROPERTIES you can see spark.sql.create.version, which indicates that this table was created through SparkSQL.
This shows that a table created through SparkSQL is no different from a table created in Hive.
Finally, if you want to know which parameters can be passed to the spark-sql-3 script, just add the --help option:

[root@bigdata04 spark-3.2.1-bin-hadoop3.2]# bin/spark-sql-3 --help
Usage: ./bin/spark-sql [options] [cli option]

Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).
--archives ARCHIVES Comma-separated list of archives to be extracted into the
working directory of each executor.

--conf, -c PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.

--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.

--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).

--proxy-user NAME User to impersonate when submitting the application.
This argument does not work with --principal / --keytab.

--help, -h Show this help message and exit.
--verbose, -v Print additional debug output.
--version, Print the version of current Spark.

Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).

Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.

Spark standalone, Mesos or K8s with cluster deploy mode only:
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.

Spark standalone, Mesos and Kubernetes only:
--total-executor-cores NUM Total cores for all executors.

Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in
YARN and K8S modes, or all available cores on the worker
in standalone mode).

Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of
executors will be at least NUM.
--principal PRINCIPAL Principal to be used to login to KDC.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above.

Spark on YARN only:
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").

CLI options:
-d,--define <key=value> Variable substitution to apply to Hive
commands. e.g. -d A=B or --define A=B
--database <databasename> Specify the database to use
-e <quoted-query-string> SQL from command line
-f <filename> SQL from files
-H,--help Print help information
--hiveconf <property=value> Use value for given property
--hivevar <key=value> Variable substitution to apply to Hive
commands. e.g. --hivevar A=B
-i <filename> Initialization SQL file
-S,--silent Silent mode in interactive shell
-v,--verbose Verbose mode (echo executed SQL to the
console)

Integrating Hive in SparkSQL Code

To operate on Hive through SparkSQL code, you first need to add the additional spark-hive related dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>

Then add the hive-site.xml configuration file to the project's resources directory.
If the cluster uses HDFS HA, the HDFS-related configuration files also need to be included.


The content of hive-site.xml is the same as what was used earlier for the SparkSQL command line; it only needs to contain the core settings.



Create a class named SparkSQLReadHive in the com.imooc.scala.sql package:

package com.imooc.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Operating on Hive through SparkSQL in code
 * Created by xuwei
 */
object SparkSQLReadHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    // Get a SparkSession in order to use SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLReadHive")
      .config(conf)
      // Enable Hive support: connecting to the Hive MetaStore, Hive SerDes, and Hive UDFs
      .enableHiveSupport()
      .getOrCreate()

    // Run the SQL query
    sparkSession.sql("select * from student_score").show()

    sparkSession.stop()
  }
}
When the code runs, it loads the hive-site.xml configuration from the resources directory.
22/04/12 15:45:42 INFO SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir.
22/04/12 15:45:43 INFO SharedState: Warehouse path is 'file:/user/hive/warehouse'.
22/04/12 15:45:44 INFO HiveUtils: Initializing HiveMetastoreConnection version 2.3.9 using Spark classes.
22/04/12 15:45:44 INFO HiveConf: Found configuration file file:/D:/IdeaProjects/db_spark3/target/classes/hive-site.xml
First, note the spark.sql.warehouse.dir parameter in the log: it points to Hive's data storage directory.
In Hive this directory is specified by hive.metastore.warehouse.dir, and SparkSQL used to rely on that parameter as well, but starting with Spark SQL 2.0 the official recommendation is to use spark.sql.warehouse.dir.
hive.metastore.warehouse.dir is still supported for compatibility: if only hive.metastore.warehouse.dir is set, its value is copied into spark.sql.warehouse.dir, as the log above shows.
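If you prefer not to rely on that fallback, spark.sql.warehouse.dir can also be set explicitly when building the SparkSession. The sketch below is only an example: the object name is made up, and the HDFS path reuses the warehouse location seen in the table's LOCATION earlier, so adjust it to your own cluster.

import org.apache.spark.sql.SparkSession

object WarehouseDirExample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession
      .builder()
      .appName("WarehouseDirExample")
      .master("local")
      // Point Spark SQL at the warehouse directory explicitly.
      // The path reuses hdfs://bigdata01:9000/user/hive/warehouse from earlier; adjust as needed.
      .config("spark.sql.warehouse.dir", "hdfs://bigdata01:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    sparkSession.sql("show tables").show()
    sparkSession.stop()
  }
}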

Finally, you can see the result of the SparkSQL program:

+---+----+-------+-----+
| id|name| sub|score|
+---+----+-------+-----+
| 1| zs1|chinese| 80|
| 2| zs1| math| 90|
| 3| zs1|english| 89|
| 4| zs2|chinese| 60|
| 5| zs2| math| 75|
| 6| zs2|english| 80|
| 7| zs3|chinese| 79|
| 8| zs3| math| 83|
| 9| zs3|english| 72|
| 10| zs4|chinese| 90|
| 11| zs4| math| 76|
| 12| zs4|english| 80|
| 13| zs5|chinese| 98|
| 14| zs5| math| 80|
| 15| zs5|english| 70|
+---+----+-------+-----+
This is how you query data in Hive through SparkSQL.

The code can also be refactored as follows, which keeps things more concise when you write multiple SQL statements later:

package com.imooc.scala.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Operating on Hive through SparkSQL in code
 * Created by xuwei
 */
object SparkSQLReadHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    // Get a SparkSession in order to use SparkSQL
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQLReadHive")
      .config(conf)
      // Enable Hive support: connecting to the Hive MetaStore, Hive SerDes, and Hive UDFs
      .enableHiveSupport()
      .getOrCreate()

    // Run SQL queries
    //sparkSession.sql("select * from student_score").show()

    // Import the sql method so that multiple queries can be written more concisely
    import sparkSession.sql
    sql("select * from student_score").show()

    sparkSession.stop()
  }
}

Several Ways for SparkSQL to Write to Hive Tables