
Spark on Hive: Connecting to Hive Remotely from Spark and Operating on Hive Tables

Contents

1. Start the Hadoop cluster

2. Start MySQL

3. Start the Hive remote connection services

4. Copy hive-site.xml and the Hive jars into Spark's conf and jars directories

5. Restart spark-shell

6. Query a table in the database

7. Connecting to Hive from IDEA

8. Reading Hive databases from Spark

9. Operating on a specific Hive table from Spark

10. Saving the processed data back to Hive (three ways)

11. Connecting Spark to MySQL and operating on MySQL tables


When Hive is operated on through Spark, the queries run on Spark's in-memory engine instead of MapReduce, so they complete much faster.

1. Start the Hadoop cluster

start-dfs.sh
start-yarn.sh

2. Start MySQL

mysql -uroot -p   # log in to confirm that the MySQL service is up

3. Start the Hive remote connection services

nohup hive --service hiveserver2 &   # HiveServer2 for JDBC/Beeline clients
nohup hive --service metastore &     # metastore service (thrift://lxm147:9083, see hive-site.xml below)

4. Copy hive-site.xml and the Hive jars into Spark's conf and jars directories

[root@lxm147 conf]# pwd
/opt/soft/spark312/conf
[root@lxm147 conf]# cp /opt/soft/hive312/conf/hive-site.xml /opt/soft/spark312/conf/
[root@lxm147 jars]# pwd
/opt/soft/spark312/jars
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-beeline-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-cli-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-exec-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-jdbc-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-metastore-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/mysql-connector-java-8.0.29.jar ./

The contents of hive-site.xml are as follows:

The following property must be present, otherwise Spark cannot connect to the metastore:

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://lxm147:9083</value>
</property>

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property><name>hive.metastore.warehouse.dir</name><value>/opt/soft/hive312/warehouse</value><description></description>
</property>
<property><name>hive.metastore.db.type</name><value>mysql</value><description>使用连接的数据库</description>
</property>
<property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://192.168.180.141:3306/hive147?createDatabaseIfNotExist=true</value><description>Mysql的url</description>
</property>
<property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.cj.jdbc.Driver</value><description></description>
</property>
<property><name>javax.jdo.option.ConnectionUserName</name><value>root</value><description></description>
</property>
<property><name>javax.jdo.option.ConnectionPassword</name><value>root</value><description></description>
</property>
<property><name>hive.metastore.schema.verification</name><value>false</value><description>关闭schema验证</description>
</property>
<property><name>hive.cli.print.current.db</name><value>true</value><description>提示当前数据库名</description>
</property>
<property><name>hive.cli.print.header</name><value>true</value><description>查询输出时带列名一起输出</description>
</property>
<property><name>hive.server2.active.passive.ha.enable</name><value>true</value>
</property><property>
<name>hive.metastore.local</name>
<value>false</value>
<description>controls whether to connect to remove metastore server or open a new metastore server in Hive Client JVM</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://lxm147:9083</value>
</property><property><name>hive.zookeeper.quorum</name><value>192.168.180.147</value>
</property>
<property><name>hbase.zookeeper.quorum</name><value>192.168.180.147</value>
</property>
<property><name>hive.aux.jars.path</name><value>file:///opt/soft/hive312/lib/hive-hbase-handler-3.1.2.jar,file:///opt/soft/hive312/lib/zookeeper-3.4.6.jar,file:///opt/soft/hive312/lib/hbase-client-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5-tests.jar,file:///opt/soft/hive312/lib/hbase-server-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-protocol-2.3.5.jar,file:///opt/soft/hive312/lib/htrace-core-3.2.0-incubating.jar</value>
</property></configuration>

5. Restart spark-shell

[root@lxm147 jars]# spark-shell
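
If Hive support is wired up correctly, the session's catalog implementation should report hive. A quick sanity check inside spark-shell (a minimal sketch; the exact REPL output may differ on your machine):

scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = hive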

6. Query a table in the database

scala> spark.table("db_hive2.login_events").show
2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.metastore.local does not exist
2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist
2023-04-04 13:33:35,139 WARN conf.HiveConf: HiveConf of name hive.metastore.db.type does not exist
+-------+-------------------+                                                   
|user_id|     login_datetime|
+-------+-------------------+
|    100|2021-12-01 19:00:00|
|    100|2021-12-01 19:30:00|
|    100|2021-12-02 21:01:00|
|    100|2021-12-03 11:01:00|
|    101|2021-12-01 19:05:00|
|    101|2021-12-01 21:05:00|
|    101|2021-12-03 21:05:00|
|    101|2021-12-05 15:05:00|
|    101|2021-12-06 19:05:00|
|    102|2021-12-01 19:55:00|
|    102|2021-12-01 21:05:00|
|    102|2021-12-02 21:57:00|
|    102|2021-12-03 19:10:00|
|    104|2021-12-04 21:57:00|
|    104|2021-12-02 22:57:00|
|    105|2021-12-01 10:01:00|
+-------+-------------------+
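
The same data can also be queried through Spark SQL instead of spark.table; the call below is equivalent and is shown only as a usage sketch:

scala> spark.sql("select * from db_hive2.login_events").show()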

7. Connecting to Hive from IDEA

import org.apache.spark.sql.SparkSession

object SparkGive {
  def main(args: Array[String]): Unit = {
    // Create the session, pointing it at the remote Hive metastore
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkhive")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.180.147:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Close the session
    spark.close()
  }
}
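
For enableHiveSupport to work in an IDEA project, the spark-hive module (in addition to spark-sql) must be on the classpath, plus the MySQL driver for the metastore. A minimal sketch of the sbt dependencies; the versions are assumptions, so align them with your cluster:

// build.sbt (versions are assumptions; match them to your environment)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "3.1.2",
  "org.apache.spark" %% "spark-hive" % "3.1.2",
  "mysql" % "mysql-connector-java" % "8.0.29"
)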

8. Reading Hive databases from Spark

// List the databases visible through the metastore
spark.sql("show databases").show()
/*
+---------+
|namespace|
+---------+
|  atguigu|
|  bigdata|
| db_hive2|
|  default|
|   lalian|
|     mydb|
| shopping|
+---------+
*/
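
The same information is available through the catalog API; a small sketch of an equivalent call:

// List databases via the catalog API instead of SQL
spark.catalog.listDatabases().show(false)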

9. Operating on a specific Hive table from Spark

// Read a specific table from a specific database
// (requires import org.apache.spark.sql.{DataFrame, Dataset, Row})
val ecd = spark.table("shopping.ext_customer_details")
ecd.show()
/*
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|          credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|          credit_no|
|          1|   Spencer|  Raffeorty|sraffeorty0@dropb...|  Male|    9274 Lyons Court|  China|    Khmer|Safety Technician...|                 jcb|   3589373385487669|
|          2|    Cherye|     Poynor|      cpoynor1@51.la|Female|1377 Anzinger Avenue|  China|    Czech|      Research Nurse|        instapayment|   6376594861844533|
|          3|   Natasha|  Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane|  China|  Yiddish|Budget/Accounting...|                visa|   4041591905616356|
|          4|   Huntley|     Seally|  hseally3@prlog.org|  Male|    694 Del Sol Lane|  China| Albanian|Environmental Spe...|               laser| 677118310740263477|
|          5|     Druci|       Coad|    dcoad4@weibo.com|Female|         16 Debs Way|  China|   Hebrew|             Teacher|                 jcb|   3537287259845047|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
*/

import spark.implicits._
import org.apache.spark.sql.functions._

// Filter out the first row (the source file's header, which was loaded as data)
val cuDF: Dataset[Row] = ecd.filter($"customer_id" =!= "customer_id")
cuDF.show()
// The output is the same table as above, minus the duplicated header row.

// Count how many customers each country has
val userDF: DataFrame = cuDF.groupBy("country").agg(count("customer_id").as("userNum"))
userDF.printSchema()
/*
root
 |-- country: string (nullable = true)
 |-- userNum: long (nullable = false)
*/
userDF.show()
/*
+-------------+-------+
|      country|userNum|
+-------------+-------+
|        China|    426|
|United States|     51|
|       Canada|     23|
+-------------+-------+
*/
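
The same aggregation can also be written in SQL by registering the filtered DataFrame as a temporary view; a minimal sketch (the view name is arbitrary):

// Register a temp view and run the equivalent SQL aggregation
cuDF.createOrReplaceTempView("customers")
spark.sql("select country, count(customer_id) as userNum from customers group by country").show()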

10. Saving the processed data back to Hive (three ways)

// Save the computed result as a Hive table
//    userDF.write.saveAsTable("shopping.userDF")
//    userDF.write.mode("append").saveAsTable("shopping.userDF")
// The SaveMode enum can be used as well
userDF.write.mode(SaveMode.Append).saveAsTable("shopping.userDF")
// The processed data could also be written to MySQL and displayed with a BI tool
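
If the target table already exists in Hive with a matching schema, insertInto is another option; a sketch under that assumption:

// Assumes shopping.userDF already exists with columns (country, userNum)
userDF.write.mode(SaveMode.Append).insertInto("shopping.userDF")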

11. Connecting Spark to MySQL and operating on MySQL tables

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.util.Properties

object DataFrameMysql {
  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("dataframemysql")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Connect Spark to MySQL; note that the database name is part of the URL
    val url = "jdbc:mysql://192.168.180.141:3306/mysql50"
    val user = "root"
    val pwd = "database password" // replace with the real MySQL password
    val driver = "com.mysql.cj.jdbc.Driver"

    val properties = new Properties()
    properties.setProperty("user", user)
    properties.setProperty("password", pwd)
    properties.setProperty("driver", driver)

    // Read the tables under the database and convert them to DataFrames
    val studentDF: DataFrame = spark.read.jdbc(url, "student", properties)
    val scDF: DataFrame = spark.read.jdbc(url, "sc", properties)
    val courseDF: DataFrame = spark.read.jdbc(url, "course", properties)
    val teacherDF: DataFrame = spark.read.jdbc(url, "teacher", properties)

    studentDF.show()
    scDF.show()
    courseDF.show()
    teacherDF.show()

    // Store scDF back to MySQL as a new table
    scDF.write.mode(SaveMode.Overwrite).jdbc(url, "sc2", properties)

    spark.close()
  }
}
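
The same read can also be expressed with the DataFrameReader option API instead of a Properties object; a minimal sketch assuming the same url, user, pwd, and driver values defined above:

// Equivalent read of the student table using format("jdbc") options
val studentDF2: DataFrame = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "student")
  .option("user", user)
  .option("password", pwd)
  .option("driver", driver)
  .load()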