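Advanced Spark: Working with Complex and Nested JSON Data Structures
Contents
I. Basic configuration and imports
II. Creating a JSON schema without any nesting
III. Converting JSON data to a Dataset
IV. Converting JSON data to a DataFrame
V. Extracting values from JSON with get_json_object (DataFrame)
VI. Extracting values from JSON with from_json() (Dataset)
VII. Converting a Dataset to JSON with to_json()
VIII. Using selectExpr()
IX. Reading and writing files
1. Writing the processed JSON results to a file
2. Reading the Parquet file
I previously wrote an article analyzing log data, which was fairly well structured; this time the data to analyze is in JSON format.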
I. Basic configuration and imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/* ETL data cleaning */
// Case class for one input record: an id plus the raw JSON string
case class DeviceData(id: Int, device: String)

object JsonStu {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("retentionDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    // The code from the sections below runs here

    spark.stop() // also stops the underlying SparkContext
  }
}
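II. Creating a JSON schema without any nesting
Chaining add() calls, as done here, produces the same schema as passing an Array of StructField to StructType. The JSON timestamp field holds epoch seconds; declaring it as TimestampType lets from_json() convert it into a timestamp.
val jsonSchema: StructType = new StructType()
  .add("battery_level", LongType)
  .add("c02_level", LongType)
  .add("cca3", StringType)
  .add("cn", StringType)
  .add("device_id", LongType)
  .add("device_type", StringType)
  .add("signal", LongType)
  .add("ip", StringType)
  .add("temp", LongType)
  .add("timestamp", TimestampType)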
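The statement that chained add() calls and an Array of StructField yield the same schema is easy to check. The sketch below is a minimal illustration, assuming only spark-sql on the classpath; the object name SchemaEquivalence is hypothetical. It relies on StructField defaulting to nullable = true with empty metadata, which is also what add(name, dataType) uses.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper object comparing two ways of building the same schema
object SchemaEquivalence {
  // Chained add() calls, as in the article
  val viaAdd: StructType = new StructType()
    .add("battery_level", LongType)
    .add("c02_level", LongType)
    .add("cca3", StringType)
    .add("cn", StringType)
    .add("device_id", LongType)
    .add("device_type", StringType)
    .add("signal", LongType)
    .add("ip", StringType)
    .add("temp", LongType)
    .add("timestamp", TimestampType)

  // The same fields passed as an Array of StructField (nullable defaults to true)
  val viaArray: StructType = StructType(Array(
    StructField("battery_level", LongType),
    StructField("c02_level", LongType),
    StructField("cca3", StringType),
    StructField("cn", StringType),
    StructField("device_id", LongType),
    StructField("device_type", StringType),
    StructField("signal", LongType),
    StructField("ip", StringType),
    StructField("temp", LongType),
    StructField("timestamp", TimestampType)
  ))

  def main(args: Array[String]): Unit = {
    // StructType equality compares the field arrays element by element
    println(viaAdd == viaArray)
    println(viaArray.treeString)
  }
}
```

Running main should print true, followed by the schema tree.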
III. Converting JSON data to a Dataset
val eventsDS: Dataset[DeviceData] = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }""")
).toDF("id", "device").as[DeviceData]

eventsDS.printSchema()
eventsDS.show(false)
Dataset output:
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)

+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |device |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0 |{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }|
|1 |{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 } |
|2 |{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 } |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
IV. Converting JSON data to a DataFrame
val eventsFromJSONDF: DataFrame = Seq(
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }""")
).toDF("id", "device")

eventsFromJSONDF.printSchema()
eventsFromJSONDF.show(false)
DataFrame output:
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)

+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |device |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0 |{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }|
|1 |{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 } |
|2 |{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 } |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
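Instead of writing the schema by hand, Spark can also infer one by parsing the JSON strings directly. A minimal sketch, assuming Spark 2.2 or later (where DataFrameReader.json accepts a Dataset[String]); the object name InferJsonSchema and the trimmed sample records are illustrative. Inference types timestamp as a plain long, which is why the hand-written schema declares it as TimestampType instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: let Spark infer the schema of raw JSON strings
object InferJsonSchema {
  def infer(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Trimmed copies of the article's sample records
    val raw = Seq(
      """{"device_id": 0, "device_type": "sensor-ipad", "cca3": "USA", "temp": 25, "timestamp": 1475600496}""",
      """{"device_id": 1, "device_type": "sensor-igauge", "cca3": "NOR", "temp": 30, "timestamp": 1475600498}"""
    ).toDS()
    // json(Dataset[String]) scans the strings and infers one schema covering all of them
    spark.read.json(raw)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("inferSchemaDemo").getOrCreate()
    val df = infer(spark)
    df.printSchema() // integer values such as temp and timestamp are inferred as long
    df.show(false)
    spark.stop()
  }
}
```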
V. Extracting values from JSON with get_json_object (DataFrame)
val jsDF: DataFrame = eventsFromJSONDF.select(
  $"id", // the first column, kept as-is
  get_json_object($"device", "$.device_type").as("device_type"), // extract values from the JSON in the second column
  get_json_object($"device", "$.sensor-ipad").as("sensor-ipad"), // there is no "sensor-ipad" key, so this column is null
  get_json_object($"device", "$.ip").as("ip"),
  get_json_object($"device", "$.cca3").as("cca3")
)
jsDF.printSchema()
jsDF.show(false)
Output:
root
 |-- id: integer (nullable = false)
 |-- device_type: string (nullable = true)
 |-- sensor-ipad: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- cca3: string (nullable = true)

+---+-------------+-----------+-------------+----+
|id |device_type |sensor-ipad|ip |cca3|
+---+-------------+-----------+-------------+----+
|0 |sensor-ipad |null |68.161.225.1 |USA |
|1 |sensor-igauge|null |213.161.254.1|NOR |
|2 |sensor-ipad |null |88.36.5.1 |ITA |
+---+-------------+-----------+-------------+----+
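When several keys are needed, json_tuple() is an alternative to repeated get_json_object() calls: it parses each string once and emits one column per requested key, with null for keys that are absent (the same behavior seen for sensor-ipad above). A minimal sketch with trimmed sample records; the object name JsonTupleDemo and the alias missing are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper showing json_tuple as a multi-key alternative to get_json_object
object JsonTupleDemo {
  def extract(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Trimmed copies of the article's sample records
    val events = Seq(
      (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA"}"""),
      (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR"}""")
    ).toDF("id", "device")

    // json_tuple returns one column per key; as(Seq(...)) names all of them at once.
    // "sensor-ipad" is a value, not a key, so the "missing" column is null.
    events.select(
      $"id",
      json_tuple($"device", "device_type", "ip", "cca3", "sensor-ipad")
        .as(Seq("device_type", "ip", "cca3", "missing"))
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jsonTupleDemo").getOrCreate()
    extract(spark).show(false)
    spark.stop()
  }
}
```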
VI. Extracting values from JSON with from_json() (Dataset)
Unlike get_json_object, from_json() uses a schema to extract the individual columns. Calling from_json() inside a Dataset select() parses a JSON string according to the given schema; the fields of the resulting struct can then be selected as DataFrame columns.
We can also treat all the attributes and values in the JSON as a single devices entity: a specific value is accessed with devices.attribute, and all of them at once with the * wildcard.
Reading directly from the Dataset:
val devicesDF: DataFrame = eventsDS
  .select($"id", from_json($"device", jsonSchema) as "devices")
  .filter($"devices.temp" > 25) // filter directly on a field of the parsed JSON
  .select($"devices.*")         // use * to expand every JSON field into its own column
devicesDF.show(false)

+-------------+---------+----+------+---------+-------------+------+-------------+----+-------------------+
|battery_level|c02_level|cca3|cn    |device_id|device_type  |signal|ip           |temp|timestamp          |
+-------------+---------+----+------+---------+-------------+------+-------------+----+-------------------+
|6            |1413     |NOR |Norway|1        |sensor-igauge|18    |213.161.254.1|30  |2016-10-05 01:01:38|
+-------------+---------+----+------+---------+-------------+------+-------------+----+-------------------+

val df1: Dataset[Row] = eventsDS
  .select($"id", from_json($"device", jsonSchema) as "json")
  .filter($"json.temp" < 30)
  .select($"json.battery_level", $"json.ip", $"json.timestamp")
df1.show(false)

+-------------+------------+-------------------+
|battery_level|ip          |timestamp          |
+-------------+------------+-------------------+
|8            |68.161.225.1|2016-10-05 01:01:36|
|5            |88.36.5.1   |2016-10-05 01:01:40|
+-------------+------------+-------------------+

The timestamp column shows the epoch seconds from the JSON rendered in the session time zone (UTC+8 here; for example, 1475600498 is 2016-10-04 17:01:38 UTC).
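The sample records above are flat, but the same from_json() pattern carries over to nested JSON: nested objects become struct fields reachable with a longer dot path, and arrays can be flattened with explode(). A minimal sketch; the nested record (location, readings) and the object name NestedFromJson are made up for illustration and are not part of the article's data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical example of parsing a nested JSON record with from_json
object NestedFromJson {
  // Schema with a nested object and an array (illustrative only)
  val nestedSchema: StructType = new StructType()
    .add("device_id", LongType)
    .add("location", new StructType().add("lat", DoubleType).add("lon", DoubleType))
    .add("readings", ArrayType(LongType))

  def flatten(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val raw = Seq(
      """{"device_id": 7, "location": {"lat": 59.91, "lon": 10.75}, "readings": [18, 20, 22]}"""
    ).toDF("raw")

    raw
      .select(from_json($"raw", nestedSchema).as("d"))
      .select(
        $"d.device_id".as("device_id"),
        $"d.location.lat".as("lat"),         // dot path into the nested struct
        explode($"d.readings").as("reading") // one output row per array element
      )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("nestedJsonDemo").getOrCreate()
    flatten(spark).show(false)
    spark.stop()
  }
}
```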
VII. Converting a Dataset to JSON with to_json()
val stringJsonDF: DataFrame = eventsDS.select(to_json(struct($"*"))).toDF("devices")
stringJsonDF.show(false)
// device is still a plain string, so the quotes inside it are escaped in the output
/*
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|devices |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}|
|{"id":1,"device":"{\"device_id\": 1, \"device_type\": \"sensor-igauge\", \"ip\": \"213.161.254.1\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 30, \"signal\": 18, \"battery_level\": 6, \"c02_level\": 1413, \"timestamp\" :1475600498 }"} |
|{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"} |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
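Because device is still a raw string in eventsDS, to_json(struct($"*")) simply wraps it as a string value. If the goal is clean, non-escaped JSON, one option is to parse the string with from_json() first and serialize only the fields you need. A minimal sketch; the object name CleanToJson and the reduced schema (device_type, temp) are illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical helper: parse first, then serialize, so the output JSON is not double-encoded
object CleanToJson {
  // Only the fields to keep; from_json ignores the other keys in the string
  val schema: StructType = new StructType()
    .add("device_type", StringType)
    .add("temp", LongType)

  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val events = Seq(
      (0, """{"device_id": 0, "device_type": "sensor-ipad", "temp": 25}"""),
      (1, """{"device_id": 1, "device_type": "sensor-igauge", "temp": 30}""")
    ).toDF("id", "device")

    events
      .select($"id", from_json($"device", schema).as("d"))
      // Explicit aliases fix the key names used in the generated JSON
      .select(to_json(struct($"id", $"d.device_type".as("device_type"), $"d.temp".as("temp"))).as("value"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cleanToJsonDemo").getOrCreate()
    build(spark).show(false)
    spark.stop()
  }
}
```

The first row comes out as {"id":0,"device_type":"sensor-ipad","temp":25}; naming the column value also makes the result directly usable by the Kafka sink.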
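Saving the data to Kafka (note the extra dependency):
// The Kafka sink expects a column named "value", so rename "devices" first
stringJsonDF.toDF("value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "iot-devices")
  .save()

// Dependency: the version must match your Spark and Scala versions;
// writing to Kafka with write.format("kafka") requires Spark 2.2.0 or later
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0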
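The Kafka sink reads the payload from a column named value (string or binary) and, optionally, the message key from a column named key. A minimal sketch that prepares both columns from the raw events; the object and method names are hypothetical, and write() assumes a reachable broker plus the spark-sql-kafka-0-10 package on the classpath, so it is left commented out in main.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper for shaping rows into Kafka records
object KafkaRecords {
  // key = device id as a string, value = the raw JSON string
  def toKafkaRecords(events: DataFrame): DataFrame =
    events.selectExpr("CAST(id AS STRING) AS key", "device AS value")

  // Batch write to Kafka (Spark 2.2+); needs a running broker
  def write(records: DataFrame, servers: String, topic: String): Unit =
    records.write
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("topic", topic)
      .save()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kafkaRecordsDemo").getOrCreate()
    import spark.implicits._
    val events = Seq((0, """{"device_id": 0, "temp": 25}""")).toDF("id", "device")
    val records = toKafkaRecords(events)
    records.show(false)
    // write(records, "localhost:9092", "iot-devices") // enable once a broker is available
    spark.stop()
  }
}
```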
VIII. Using selectExpr()
1. selectExpr() takes SQL expression strings instead of Column objects. A common use is casting, for example CAST(device AS STRING). Here device already holds the JSON as a string, so the output is unchanged; the same idiom is what turns the binary key and value columns read from Kafka into readable JSON strings.
val stringsDF: DataFrame = eventsDS.selectExpr("CAST(id AS INT)", "CAST(device AS STRING)")
stringsDF.show(false)
/*
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |device |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0 |{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }|
|1 |{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 } |
|2 |{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 } |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
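The CAST idiom matters most when the column is not already a string. A minimal sketch that simulates a Kafka-style record whose value arrives as bytes (BinaryType), casts it to a string with selectExpr(), and then reads a field with get_json_object(); the object name CastBinaryValue and the sample record are hypothetical, and no broker is involved.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper: decode a binary JSON payload with CAST, then extract a field
object CastBinaryValue {
  def decode(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Array[Byte] is encoded as a BinaryType column, like Kafka's value column
    val records = Seq(
      (0, """{"device_type": "sensor-ipad", "temp": 25}""".getBytes("UTF-8"))
    ).toDF("id", "value")

    records
      .selectExpr("id", "CAST(value AS STRING) AS device") // bytes -> UTF-8 string
      .select($"id", get_json_object($"device", "$.temp").as("temp"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("castBinaryDemo").getOrCreate()
    decode(spark).show(false)
    spark.stop()
  }
}
```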
2. Another use of selectExpr() is to pass expressions as arguments and turn their results into named columns.
// DataFrame API
devicesDF.selectExpr("c02_level", "round(c02_level/temp) as a1").orderBy("a1").show(false)

// Spark SQL
devicesDF.createOrReplaceTempView("table")
spark.sql(
  """
    |select c02_level,
    |       round(c02_level/temp) as a1
    |from table
    |""".stripMargin).show(false)
/*
+---------+----+
|c02_level|a1  |
+---------+----+
|1413     |47.0|
+---------+----+
*/
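The same computation can also be written with the Column API, or with expr(), which accepts the same SQL expression strings as selectExpr(). A minimal sketch; the one-row DataFrame reuses the values of the filtered Norway row above, and the object name RatioColumn and the extra column a2 are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper comparing Column API and SQL-string versions of one expression
object RatioColumn {
  def ratios(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Only the two columns the expression needs
    val devices = Seq((1413L, 30L)).toDF("c02_level", "temp")

    devices
      .select(
        $"c02_level",
        round($"c02_level" / $"temp").as("a1"),  // Column API; "/" yields a double
        expr("round(c02_level / temp)").as("a2") // the same expression as a SQL string
      )
      .orderBy("a1")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ratioDemo").getOrCreate()
    ratios(spark).show(false)
    spark.stop()
  }
}
```

Both a1 and a2 evaluate to 47.0 for this row (1413 / 30 = 47.1, rounded to 0 decimal places).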
IX. Reading and writing files
1. Writing the processed JSON results to a file
stringJsonDF.write.mode("overwrite").format("parquet").save("output")
2. Reading the Parquet file
val parquetDF: DataFrame = spark.read.parquet("output")
parquetDF.show(false)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|devices |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":0,"device":"{\"device_id\": 0, \"device_type\": \"sensor-ipad\", \"ip\": \"68.161.225.1\", \"cca3\": \"USA\", \"cn\": \"United States\", \"temp\": 25, \"signal\": 23, \"battery_level\": 8, \"c02_level\": 917, \"timestamp\" :1475600496 }"}|
|{"id":1,"device":"{\"device_id\": 1, \"device_type\": \"sensor-igauge\", \"ip\": \"213.161.254.1\", \"cca3\": \"NOR\", \"cn\": \"Norway\", \"temp\": 30, \"signal\": 18, \"battery_level\": 6, \"c02_level\": 1413, \"timestamp\" :1475600498 }"} |
|{"id":2,"device":"{\"device_id\": 2, \"device_type\": \"sensor-ipad\", \"ip\": \"88.36.5.1\", \"cca3\": \"ITA\", \"cn\": \"Italy\", \"temp\": 18, \"signal\": 25, \"battery_level\": 5, \"c02_level\": 1372, \"timestamp\" :1475600500 }"} |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
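Writing the to_json() string column stores the data as a single opaque text column in Parquet. An alternative is to write the parsed columns, so the Parquet file carries a real schema that later reads can filter on directly. A minimal sketch; the object name ParquetRoundTrip, the reduced schema, and the output path are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical helper: write typed columns to Parquet and read them back
object ParquetRoundTrip {
  val schema: StructType = new StructType()
    .add("device_id", LongType)
    .add("device_type", StringType)
    .add("temp", LongType)

  def roundTrip(spark: SparkSession, path: String): DataFrame = {
    import spark.implicits._
    val events = Seq(
      (0, """{"device_id": 0, "device_type": "sensor-ipad", "temp": 25}"""),
      (1, """{"device_id": 1, "device_type": "sensor-igauge", "temp": 30}""")
    ).toDF("id", "device")

    // Parse, expand every field into its own column, then write the typed columns
    events
      .select(from_json($"device", schema).as("d"))
      .select($"d.*")
      .write.mode("overwrite").parquet(path)

    // Reading back restores the column names and types stored in the Parquet files
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parquetDemo").getOrCreate()
    val df = roundTrip(spark, "output_parsed")
    df.printSchema()
    df.filter("temp > 25").show(false)
    spark.stop()
  }
}
```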