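Spark on Hive: Connecting Spark to a Remote Hive and Operating on Hive Tables
Contents
1. Start the Hadoop cluster
2. Start MySQL
3. Start the Hive remote services
4. Copy Hive's hive-site.xml and JARs into Spark's conf and jars directories
5. Restart spark-shell
6. Query a table in the database
7. Connect to Hive from IDEA
8. Read the Hive databases from Spark
9. Operate on a specific Hive table from Spark
10. Save Spark-processed data to Hive: three approaches
11. Connect Spark to MySQL and operate on MySQL tables
When Hive tables are operated on from Spark, queries run on Spark's in-memory engine instead of MapReduce (MR), which is generally faster.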
1. Start the Hadoop cluster
start-dfs.sh
start-yarn.sh
2. Start MySQL
mysql -uroot -p
3. Start the Hive remote services
nohup hive --service hiveserver2 &
nohup hive --service metastore &
4. Copy Hive's hive-site.xml and JARs into Spark's conf and jars directories
[root@lxm147 conf]# pwd
/opt/soft/spark312/conf
[root@lxm147 conf]# cp /opt/soft/hive312/conf/hive-site.xml /opt/soft/spark312/conf/
[root@lxm147 jars]# pwd
/opt/soft/spark312/jars
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-beeline-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-cli-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-exec-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-jdbc-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/hive-metastore-3.1.2.jar ./
[root@lxm147 jars]# cp /opt/soft/hive312/lib/mysql-connector-java-8.0.29.jar ./
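The following property must be present in hive-site.xml; without it, Spark cannot connect to the metastore:
<property>
<name>hive.metastore.uris</name>
<value>thrift://lxm147:9083</value>
</property>
The complete hive-site.xml is as follows: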
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property><name>hive.metastore.warehouse.dir</name><value>/opt/soft/hive312/warehouse</value><description></description>
</property>
<property><name>hive.metastore.db.type</name><value>mysql</value><description>Database type used by the metastore</description>
</property>
<property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://192.168.180.141:3306/hive147?createDatabaseIfNotExist=true</value><description>MySQL URL</description>
</property>
<property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.cj.jdbc.Driver</value><description></description>
</property>
<property><name>javax.jdo.option.ConnectionUserName</name><value>root</value><description></description>
</property>
<property><name>javax.jdo.option.ConnectionPassword</name><value>root</value><description></description>
</property>
<property><name>hive.metastore.schema.verification</name><value>false</value><description>Disable schema verification</description>
</property>
<property><name>hive.cli.print.current.db</name><value>true</value><description>Show the current database name in the prompt</description>
</property>
<property><name>hive.cli.print.header</name><value>true</value><description>Print column names together with query output</description>
</property>
<property><name>hive.server2.active.passive.ha.enable</name><value>true</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
<description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://lxm147:9083</value>
</property>
<property><name>hive.zookeeper.quorum</name><value>192.168.180.147</value>
</property>
<property><name>hbase.zookeeper.quorum</name><value>192.168.180.147</value>
</property>
<property><name>hive.aux.jars.path</name><value>file:///opt/soft/hive312/lib/hive-hbase-handler-3.1.2.jar,file:///opt/soft/hive312/lib/zookeeper-3.4.6.jar,file:///opt/soft/hive312/lib/hbase-client-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5-tests.jar,file:///opt/soft/hive312/lib/hbase-server-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-common-2.3.5.jar,file:///opt/soft/hive312/lib/hbase-protocol-2.3.5.jar,file:///opt/soft/hive312/lib/htrace-core-3.2.0-incubating.jar</value>
</property>
</configuration>
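Because a missing hive.metastore.uris entry is the most common reason the connection fails, it can help to check the copied file before starting Spark. The following is a minimal sketch, not part of the original setup: it parses a Hadoop-style configuration XML with the JDK's DOM parser and looks up one property. The object name HiveSiteCheck and the helper propertyValue are hypothetical names introduced only for illustration.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

// Hypothetical helper (not from the original post): looks up a property
// in a Hadoop-style <configuration> XML document.
object HiveSiteCheck {

  /** Returns the <value> of the <property> whose <name> equals `name`, if any. */
  def propertyValue(xml: String, name: String): Option[String] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength)
      .map(i => props.item(i).asInstanceOf[Element])
      .find(p => childText(p, "name").contains(name))
      .flatMap(p => childText(p, "value"))
  }

  /** Text of the first child element with the given tag, trimmed. */
  private def childText(parent: Element, tag: String): Option[String] = {
    val nodes = parent.getElementsByTagName(tag)
    if (nodes.getLength > 0) Some(nodes.item(0).getTextContent.trim) else None
  }

  def main(args: Array[String]): Unit = {
    // Inline sample mirroring the required property shown above
    val sample =
      """<configuration>
        |<property>
        |<name>hive.metastore.uris</name>
        |<value>thrift://lxm147:9083</value>
        |</property>
        |</configuration>""".stripMargin
    println(propertyValue(sample, "hive.metastore.uris"))
  }
}
```

To check the real file, read /opt/soft/spark312/conf/hive-site.xml into a string and pass it to propertyValue; a result of None means the property still has to be added.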
5. Restart spark-shell
[root@lxm147 jars]# spark-shell
6. Query a table in the database
scala> spark.table("db_hive2.login_events").show
2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.metastore.local does not exist
2023-04-04 13:33:35,138 WARN conf.HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist
2023-04-04 13:33:35,139 WARN conf.HiveConf: HiveConf of name hive.metastore.db.type does not exist
+-------+-------------------+
|user_id|     login_datetime|
+-------+-------------------+
|    100|2021-12-01 19:00:00|
|    100|2021-12-01 19:30:00|
|    100|2021-12-02 21:01:00|
|    100|2021-12-03 11:01:00|
|    101|2021-12-01 19:05:00|
|    101|2021-12-01 21:05:00|
|    101|2021-12-03 21:05:00|
|    101|2021-12-05 15:05:00|
|    101|2021-12-06 19:05:00|
|    102|2021-12-01 19:55:00|
|    102|2021-12-01 21:05:00|
|    102|2021-12-02 21:57:00|
|    102|2021-12-03 19:10:00|
|    104|2021-12-04 21:57:00|
|    104|2021-12-02 22:57:00|
|    105|2021-12-01 10:01:00|
+-------+-------------------+
7. Connect to Hive from IDEA
import org.apache.spark.sql.SparkSession

object SparkGive {
  def main(args: Array[String]): Unit = {
    // Create the session; enableHiveSupport() makes Spark use the Hive metastore
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkhive")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.180.147:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Close the session
    spark.close()
  }
}
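If the session fails to start from IDEA, one quick thing to rule out is whether the metastore port can be reached from the development machine at all. The sketch below is an assumption-laden illustration rather than part of the original project: it only opens a TCP connection to the host and port taken from the hive.metastore.uris value, and does not speak the Thrift protocol. MetastoreProbe and reachable are hypothetical names.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper (not from the original post): plain TCP reachability check.
object MetastoreProbe {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def reachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Host and port from the thrift:// URI configured in the session builder
    println(reachable("192.168.180.147", 9083))
  }
}
```

A result of false usually points to the metastore service not running or a firewall blocking the port; a result of true only shows that something is listening there.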
8. Read the Hive databases from Spark
// List the databases
spark.sql("show databases").show()
/*
+---------+
|namespace|
+---------+
|  atguigu|
|  bigdata|
| db_hive2|
|  default|
|   lalian|
|     mydb|
| shopping|
+---------+
*/
9. Operate on a specific Hive table from Spark
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import spark.implicits._

// Read a specific table from a specific database
val ecd = spark.table("shopping.ext_customer_details")
ecd.show()
/*
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|          credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|          credit_no|
|          1|   Spencer|  Raffeorty|sraffeorty0@dropb...|  Male|    9274 Lyons Court|  China|    Khmer|Safety Technician...|                 jcb|   3589373385487669|
|          2|    Cherye|     Poynor|      cpoynor1@51.la|Female|1377 Anzinger Avenue|  China|    Czech|      Research Nurse|        instapayment|   6376594861844533|
|          3|   Natasha|  Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane|  China|  Yiddish|Budget/Accounting...|                visa|   4041591905616356|
|          4|   Huntley|     Seally|  hseally3@prlog.org|  Male|    694 Del Sol Lane|  China| Albanian|Environmental Spe...|               laser| 677118310740263477|
|          5|     Druci|       Coad|    dcoad4@weibo.com|Female|         16 Debs Way|  China|   Hebrew|             Teacher|                 jcb|   3537287259845047|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
*/

// Filter out the first row, which is the file header loaded as data
val cuDF: Dataset[Row] = ecd.filter($"customer_id" =!= "customer_id")
cuDF.show()
/*
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|customer_id|first_name|  last_name|               email|gender|             address|country| language|                 job|         credit_type|          credit_no|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
|          1|   Spencer|  Raffeorty|sraffeorty0@dropb...|  Male|    9274 Lyons Court|  China|    Khmer|Safety Technician...|                 jcb|   3589373385487669|
|          2|    Cherye|     Poynor|      cpoynor1@51.la|Female|1377 Anzinger Avenue|  China|    Czech|      Research Nurse|        instapayment|   6376594861844533|
|          3|   Natasha|  Abendroth|nabendroth2@scrib...|Female| 2913 Evergreen Lane|  China|  Yiddish|Budget/Accounting...|                visa|   4041591905616356|
|          4|   Huntley|     Seally|  hseally3@prlog.org|  Male|    694 Del Sol Lane|  China| Albanian|Environmental Spe...|               laser| 677118310740263477|
|          5|     Druci|       Coad|    dcoad4@weibo.com|Female|         16 Debs Way|  China|   Hebrew|             Teacher|                 jcb|   3537287259845047|
+-----------+----------+-----------+--------------------+------+--------------------+-------+---------+--------------------+--------------------+-------------------+
*/

// Count the number of customers in each country
val userDF: DataFrame = cuDF.groupBy("country").agg(count("customer_id").as("userNum"))
userDF.printSchema()
/*
root
 |-- country: string (nullable = true)
 |-- userNum: long (nullable = false)
*/
userDF.show()
/*
+-------------+-------+
|      country|userNum|
+-------------+-------+
|        China|    426|
|United States|     51|
|       Canada|     23|
+-------------+-------+
*/
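The filter-then-aggregate logic above can be tried without a Hive connection. The following sketch assumes only a local Spark installation and uses a handful of made-up rows in place of shopping.ext_customer_details; the object CountByCountrySketch and the function usersPerCountry are hypothetical names, not part of the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

// Hypothetical, self-contained illustration using in-memory data instead of Hive.
object CountByCountrySketch {

  /** Drops rows that repeat the header, then counts customers per country. */
  def usersPerCountry(customers: DataFrame): DataFrame =
    customers
      .filter(customers("customer_id") =!= "customer_id")
      .groupBy("country")
      .agg(count("customer_id").as("userNum"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("count-by-country-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows; the first one imitates a header line loaded as data
    val customers = Seq(
      ("customer_id", "country"),
      ("1", "China"),
      ("2", "China"),
      ("3", "Canada")
    ).toDF("customer_id", "country")

    usersPerCountry(customers).show()
    spark.stop()
  }
}
```

Keeping the transformation in a function that takes and returns a DataFrame makes it easy to run against either the sample data or the real Hive table.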
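10. Save Spark-processed data to Hive: three approaches
import org.apache.spark.sql.SaveMode

// Save the computed result; each of the three calls below writes userDF to a Hive table
// userDF.write.saveAsTable("shopping.userDF")
// userDF.write.mode("append").saveAsTable("shopping.userDF")
// The SaveMode enum can be used instead of the string
userDF.write.mode(SaveMode.Append).saveAsTable("shopping.userDF")
// The Spark-processed data can also be written to MySQL and displayed with a BI tool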
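The three calls differ mainly in what happens when the target table already exists: with no mode set, Spark uses SaveMode.ErrorIfExists and the write fails, while Append adds rows to the existing table. The sketch below illustrates this under stated assumptions: it runs without Hive support, so the table lives in Spark's local catalog (by default under ./spark-warehouse), and the table name user_num_demo and the names SaveModeSketch and writeTwice are made up for the example.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.util.Try

// Hypothetical illustration of save modes against a local, non-Hive catalog.
object SaveModeSketch {

  /** Writes df with Overwrite, then again with Append, and returns the row count. */
  def writeTwice(spark: SparkSession, df: DataFrame, table: String): Long = {
    df.write.mode(SaveMode.Overwrite).saveAsTable(table) // create or replace the table
    df.write.mode(SaveMode.Append).saveAsTable(table)    // add the same rows again
    spark.table(table).count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("savemode-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up data shaped like userDF
    val df = Seq(("China", 426L), ("Canada", 23L)).toDF("country", "userNum")

    println(writeTwice(spark, df, "user_num_demo")) // two rows written twice

    // The default mode is ErrorIfExists, so writing to an existing table fails
    println(Try(df.write.saveAsTable("user_num_demo")).isFailure)

    spark.sql("drop table if exists user_num_demo")
    spark.stop()
  }
}
```

For a table that is rebuilt on every run, SaveMode.Overwrite avoids accumulating duplicate rows; Append suits incremental loads.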
11. Connect Spark to MySQL and operate on MySQL tables
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

object DataFrameMysql {
  def main(args: Array[String]): Unit = {
    // Create the session
    val spark: SparkSession = SparkSession.builder()
      .appName("dataframemysql")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Connect Spark to the MySQL database; the database name must be given in the URL
    val url = "jdbc:mysql://192.168.180.141:3306/mysql50"
    val user = "root"
    val pwd = "<database password>"
    val driver = "com.mysql.cj.jdbc.Driver"
    val properties = new Properties()
    properties.setProperty("user", user)
    properties.setProperty("password", pwd)
    properties.setProperty("driver", driver)

    // Read tables from the database and convert them to DataFrames
    val studentDF: DataFrame = spark.read.jdbc(url, "student", properties)
    val scDF: DataFrame = spark.read.jdbc(url, "sc", properties)
    val courseDF: DataFrame = spark.read.jdbc(url, "course", properties)
    val teacherDF: DataFrame = spark.read.jdbc(url, "teacher", properties)

    studentDF.show()
    scDF.show()
    courseDF.show()
    teacherDF.show()

    // Store scDF as a new table
    scDF.write.mode(SaveMode.Overwrite).jdbc(url, "sc2", properties)

    spark.close()
  }
}
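Hard-coding the database password in source is convenient for a demo but easy to leak. As one possible alternative, the sketch below builds the same java.util.Properties object from environment variables. The variable names MYSQL_USER and MYSQL_PASSWORD, and the names JdbcProps and fromEnv, are assumptions made for this example and do not come from the original code.

```scala
import java.util.Properties

// Hypothetical helper (not from the original post): JDBC properties from the environment.
object JdbcProps {

  /** Builds JDBC connection properties, taking credentials from the given environment map. */
  def fromEnv(env: Map[String, String],
              driver: String = "com.mysql.cj.jdbc.Driver"): Properties = {
    val props = new Properties()
    // Fall back to "root", the user name used in the example above
    props.setProperty("user", env.getOrElse("MYSQL_USER", "root"))
    // Only set a password when one is provided
    env.get("MYSQL_PASSWORD").foreach(p => props.setProperty("password", p))
    props.setProperty("driver", driver)
    props
  }

  def main(args: Array[String]): Unit = {
    val props = fromEnv(sys.env)
    println(props.getProperty("user"))
  }
}
```

The resulting object can be passed wherever properties is used above, for example spark.read.jdbc(url, "student", JdbcProps.fromEnv(sys.env)).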