离线数据处理 任务三:指标计算
目录
在mysql中建数据表
任务
接着数据清洗后的数据继续练习
在mysql中建数据表
1、创建shtd_result数据库
create database shtd_result;
2、创建provinceeverymonth表
CREATE TABLE `provinceeverymonth` (`provinceid` int(20) COMMENT '省份表主键',`provincename` varchar(20) COMMENT '省份名称',`regionid` int(20) COMMENT '地区表主键',`regionname` varchar(20) COMMENT '地区名称',`totalconsumption` double COMMENT '订单金额',`totalorder` int(20) COMMENT '订单总数',`year` int(20) COMMENT '订单产生的年',`month` int(20) COMMENT '订单产生的月'
);
3、创建 provinceavgcmp表
CREATE TABLE `provinceavgcmp` (`provinceid` int(20) COMMENT '省份表主键',`provincename` varchar(20) COMMENT '省份名称',`provinceavgconsumption` double COMMENT '该省平均订单金额',`allprovinceavgconsumption` double COMMENT '所有省平均订单金额',`comparison` varchar(20) COMMENT '比较结果'
);
4、创建usercontinueorder表
CREATE TABLE `usercontinueorder` (`userid` int(20) COMMENT '客户主键',`username` varchar(20) COMMENT '客户名称',`day` varchar(20) COMMENT '日',`totalconsumption` double COMMENT '订单总金额',`totalorder` int(20) COMMENT '订单总数'
);
任务
使用Scala编写spark工程代码,并计算相关指标。
注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表取最新的分区。
1、根据dwd层表统计每个省份、每个地区、每个月下单的数量和下单的总金额,存入MySQL数据库shtd_result的provinceeverymonth表中,然后在Linux的MySQL命令行中根据订单总数、订单总金额、省份表主键均为降序排序,查询出前5条
如果存入mysql中字符类型数据变成了问号
useUnicode=yes&characterEncoding=utf8
则更改配置数据库的部分,一般是url,我的是application.yml里的配置url里或者在指定存储路径后加上该配置。
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()//mysql对象val pro: Properties = new Properties()pro.put("user","root")pro.put("password","p@ssw0rd")pro.put("driver","com.mysql.jdbc.Driver")//mysql连接val mTable: Unit = spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "provinceeverymonth", pro).createOrReplaceTempView("mysql_table")val df: DataFrame = spark.sql("""|select|cast(t1.id as int) provinceid,|cast(t1.name as string) provincename,|cast(t2.id as int) regionid,|cast(t2.region_name as string) regionname,|cast(SUM(t3.final_total_amount) as double) totalconsumption,|cast(COUNT(t4.order_id)as int) totalorder ,|cast(YEAR(t2.create_time) as int) year,|cast(MONTH(t2.create_time) as int) month|from dwd.dim_base_province t1| left join dwd.dim_base_region t2| on t1.region_id = t2.id and t2.etl_date = '20230403'| left join dwd.fact_order_info t3| on t1.id = t3.province_id and t3.etl_date = '20230401'| left join dwd.fact_order_detail t4| on t1.id = t4.sku_id and t4.etl_date = '20230401'|where t1.etl_date = '20230403'|group by|t1.id,t1.name,t2.id,t2.region_name,year,month|""".stripMargin)//写入mysqldf.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8","provinceeverymonth",pro)//查看写入的数据spark.sql("select * from mysql_table order by totalorder desc,totalconsumption desc,provinceid desc limit 5").showspark.close()}
2、请根据dwd层表计算出2020年4月每个省份的平均订单金额和所有省份平均订单金额相比较结果(“高/低/相同”),存入MySQL数据库shtd_result的provinceavgcmp表中,然后在Linux的MySQL命令行中根据省份表主键、该省平均订单金额均为降序排序,查询出前5条
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()//mysql对象val pro: Properties = new Properties()pro.put("user","root")pro.put("password","p@ssw0rd")pro.put("driver","com.mysql.jdbc.Driver")//mysql连接val mTable: Unit = spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "provinceavgcmp", pro).createOrReplaceTempView("mysql_table")val df: DataFrame = spark.sql("""|select| cast(t1.id as int) provinceid,| cast(t1.name as string) provincename,| cast(t2.pavg as double) provinceavgconsumption,| cast(t2.allpavg as double) allprovinceavgconsumption,| CASE| WHEN t2.pavg > t2.allpavg THEN '高'| WHEN t2.pavg < t2.allpavg THEN '低'| ELSE '相同'| END comparison|from dwd.dim_base_province t1|join (| select| tmp.province_id id,| AVG(tmp.final_total_amount) pavg,| (| select AVG(final_total_amount)| from dwd.fact_order_info| where YEAR(create_time) = 2020 and MONTH(create_time) = 4| ) allpavg| from dwd.fact_order_info tmp| where YEAR(tmp.create_time) = 2020 and MONTH(tmp.create_time) = 4| group by tmp.province_id|) t2|on t1.id = t2.id|""".stripMargin)//写入mysqldf.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8","provinceavgcmp",pro)//查看写入的数据spark.sql("select * from mysql_table order by provinceid desc,provinceavgconsumption desc limit 5").show()spark.close()}
3、根据dwd层表统计在两天内连续下单并且下单金额保持增长的用户,存入MySQL数据库shtd_result的usercontinueorder表中,然后在Linux的MySQL命令行中根据订单总数、订单总金额、客户主键均为降序排序,查询出前5条
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()//mysql对象val pro: Properties = new Properties()pro.put("user","root")pro.put("password","p@ssw0rd")pro.put("driver","com.mysql.jdbc.Driver")//mysql连接val mTable: Unit = spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "usercontinueorder", pro).createOrReplaceTempView("mysql_table")val df: DataFrame = spark.sql("""|select|T1.user_id userid,|T1.consignee username,|CONCAT(T1.day, '-', T2.day) day,|SUM(T1.final_total_amount + T2.final_total_amount) totalconsumption,|COUNT(T1.order_id) totalorder|from|(select|t1.user_id,|t1.consignee,|date_sub(t1.create_time, 1) day,|SUM(t1.final_total_amount) final_total_amount,|COUNT(t2.order_id) order_id|from dwd.fact_order_info t1|left join dwd.fact_order_detail t2|on t1.user_id = t2.id|group by t1.user_id,t1.consignee,date_sub(t1.create_time, 1)|) T1 left join|(select|t1.user_id,|t1.consignee,|date_sub(t1.create_time, 2) day,|SUM(t1.final_total_amount) final_total_amount,|COUNT(t2.order_id) order_id|from dwd.fact_order_info t1|left join dwd.fact_order_detail t2|on t1.user_id = t2.id|group by t1.user_id,t1.consignee,date_sub(t1.create_time, 2)|) T2|on T1.user_id = T2.user_id and T1.final_total_amount > T2.final_total_amount|group by T1.user_id, T1.consignee, CONCAT(T1.day, '-', T2.day)|""".stripMargin)df//写入mysqldf.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8","usercontinueorder",pro)//查看写入的数据spark.sql("select * from mysql_table order by totalorder desc,totalconsumption desc,userid desc limit 5").show()spark.close()}