> 文章列表 > Spark SQL 数据的加载和保存

Spark SQL 数据的加载和保存

Spark SQL 数据的加载和保存

目录

 

通用的加载和保存方式

1.1 加载数据

 1.2保存数据

1.3 Parquet

1. 加载数据

2.保存数据

1.4 JSON

1.导入隐式转换

2.加载 JSON 文件

3.创建临时表

4.数据查询

1.5 CSV


 

通用的加载和保存方式

SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的
API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式
为 parquet

1.1 加载数据

spark.read.load 是加载数据的通用方法

如果读取不同格式的数据,可以对不同的数据格式进行设定

scala> spark.read.format("…")[.option("…")].load("…")

  • format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
  • load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径。
  • option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
  • 我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.`文件路径`

 
1.2保存数据

df.write.save 是保存数据的通用方法

scala>df.write.
csv jdbc json orc parquet textFile… …

如果保存不同格式的数据,可以对不同的数据格式进行设定


format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和
"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。
有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

 
SaveMode 是一个枚举类,其中的常量包括:

1.3 Parquet

Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式
存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default,可修改默认数据源格式。

 Apache Parquet是一种常见的列式存储文件格式,常用于Pig, Spark, Hive等大数据组件中,其后缀是.parquet。

核心特点有:

  1. 跨平台
  2. 可被各种文件系统识别的格式
  3. 按列存储数据
  4. 存储元数据

1. 加载数据

scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.show

2.保存数据

scala> var df = spark.read.json("/opt/module/data/input/people.json")
//保存为 parquet 格式
scala> df.write.mode("append").save("/opt/module/data/output")

1.4 JSON

Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以
通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。
式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]

1.导入隐式转换

import spark.implicits._

2.加载 JSON 文件

val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)

3.创建临时表

peopleDF.createOrReplaceTempView("people")

4.数据查询

val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13
AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+

1.5 CSV

Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为
数据列
spark.read.format("csv").option("sep", ";").option("inferSchema",
"true").option("header", "true").load("data/user.csv")