
Offline Data Processing, Task 3: Metric Calculation


Contents

Creating the tables in MySQL

Tasks

        This exercise continues from the data produced by the data cleaning task.

Creating the tables in MySQL

        1. Create the shtd_result database

create database shtd_result;
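Optionally, to reduce the chance of Chinese text being stored as question marks (see the note under task 1 below), the database can be created with an explicit UTF-8 character set. This is only a variant of the statement above, not something the task requires:

create database shtd_result default character set utf8mb4;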

        2. Create the provinceeverymonth table

CREATE TABLE `provinceeverymonth` (
  `provinceid`       int(20)     COMMENT 'province table primary key',
  `provincename`     varchar(20) COMMENT 'province name',
  `regionid`         int(20)     COMMENT 'region table primary key',
  `regionname`       varchar(20) COMMENT 'region name',
  `totalconsumption` double      COMMENT 'order amount',
  `totalorder`       int(20)     COMMENT 'total number of orders',
  `year`             int(20)     COMMENT 'year the order was placed',
  `month`            int(20)     COMMENT 'month the order was placed'
);

        3. Create the provinceavgcmp table

CREATE TABLE `provinceavgcmp` (
  `provinceid`                int(20)     COMMENT 'province table primary key',
  `provincename`              varchar(20) COMMENT 'province name',
  `provinceavgconsumption`    double      COMMENT 'average order amount of this province',
  `allprovinceavgconsumption` double      COMMENT 'average order amount across all provinces',
  `comparison`                varchar(20) COMMENT 'comparison result'
);

        4. Create the usercontinueorder table

CREATE TABLE `usercontinueorder` (
  `userid`           int(20)     COMMENT 'customer primary key',
  `username`         varchar(20) COMMENT 'customer name',
  `day`              varchar(20) COMMENT 'day',
  `totalconsumption` double      COMMENT 'total order amount',
  `totalorder`       int(20)     COMMENT 'total number of orders'
);

Tasks

Write the Spark job code in Scala and compute the metrics below.

Note: for the metric calculations, ignore the value of the order_status field in the order info table and treat all orders as valid; when computing the order amount or total order amount, use only the final_total_amount field. Also note that every dwd dimension table should be read from its latest partition.
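For example, reading only the latest partition of a dimension table can be expressed as below. This is a sketch that assumes the partition column is etl_date, as in the queries that follow (the code in this post simply hard-codes the latest partition value):

select *
from dwd.dim_base_province
where etl_date = (select max(etl_date) from dwd.dim_base_province);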

1. Using the dwd layer tables, count the number of orders and the total order amount for each province, each region, and each month, and write the results into the provinceeverymonth table of the shtd_result MySQL database. Then, in the MySQL command line on Linux, sort by total order count, total order amount, and province primary key, all in descending order, and query the top 5 rows.

        If character data written to MySQL turns into question marks, append the following parameters to the JDBC connection:

useUnicode=yes&characterEncoding=utf8

        Add them wherever the database connection is configured, usually the JDBC URL (in my case the url entry in application.yml), or append them directly to the JDBC URL passed when writing, e.g. jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8.

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")
  val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

  // MySQL connection properties
  val pro: Properties = new Properties()
  pro.put("user", "root")
  pro.put("password", "p@ssw0rd")
  pro.put("driver", "com.mysql.jdbc.Driver")

  // Register the MySQL table as a temp view so the written result can be checked afterwards
  spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "provinceeverymonth", pro)
    .createOrReplaceTempView("mysql_table")

  // Order count and total order amount per province, region and month
  val df: DataFrame = spark.sql(
    """
      |select
      |  cast(t1.id as int) provinceid,
      |  cast(t1.name as string) provincename,
      |  cast(t2.id as int) regionid,
      |  cast(t2.region_name as string) regionname,
      |  cast(SUM(t3.final_total_amount) as double) totalconsumption,
      |  cast(COUNT(t4.order_id) as int) totalorder,
      |  cast(YEAR(t3.create_time) as int) year,
      |  cast(MONTH(t3.create_time) as int) month
      |from dwd.dim_base_province t1
      |  left join dwd.dim_base_region t2
      |    on t1.region_id = t2.id and t2.etl_date = '20230403'
      |  left join dwd.fact_order_info t3
      |    on t1.id = t3.province_id and t3.etl_date = '20230401'
      |  left join dwd.fact_order_detail t4
      |    on t1.id = t4.sku_id and t4.etl_date = '20230401'
      |where t1.etl_date = '20230403'
      |group by t1.id, t1.name, t2.id, t2.region_name, year, month
      |""".stripMargin)

  // Write the result to MySQL (the utf8 URL parameters keep Chinese text from becoming '?')
  df.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8", "provinceeverymonth", pro)

  // Check the written data: top 5 by order count, order amount and province id, all descending
  spark.sql("select * from mysql_table order by totalorder desc, totalconsumption desc, provinceid desc limit 5").show()

  spark.close()
}
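For the check in the Linux MySQL command line that the task asks for, a query along these lines against the table just written returns the top 5 rows:

use shtd_result;
select * from provinceeverymonth
order by totalorder desc, totalconsumption desc, provinceid desc
limit 5;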

2. Using the dwd layer tables, compute each province's average order amount for April 2020 and compare it with the average order amount across all provinces, recorded as "高" (higher), "低" (lower), or "相同" (same), and write the result into the provinceavgcmp table of the shtd_result MySQL database. Then, in the MySQL command line on Linux, sort by province primary key and the province's average order amount, both in descending order, and query the top 5 rows.

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")
  val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

  // MySQL connection properties
  val pro: Properties = new Properties()
  pro.put("user", "root")
  pro.put("password", "p@ssw0rd")
  pro.put("driver", "com.mysql.jdbc.Driver")

  // Register the MySQL table as a temp view so the written result can be checked afterwards
  spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "provinceavgcmp", pro)
    .createOrReplaceTempView("mysql_table")

  // Each province's average order amount in April 2020 compared with the overall average
  val df: DataFrame = spark.sql(
    """
      |select
      |  cast(t1.id as int) provinceid,
      |  cast(t1.name as string) provincename,
      |  cast(t2.pavg as double) provinceavgconsumption,
      |  cast(t2.allpavg as double) allprovinceavgconsumption,
      |  CASE
      |    WHEN t2.pavg > t2.allpavg THEN '高'
      |    WHEN t2.pavg < t2.allpavg THEN '低'
      |    ELSE '相同'
      |  END comparison
      |from dwd.dim_base_province t1
      |join (
      |  select
      |    tmp.province_id id,
      |    AVG(tmp.final_total_amount) pavg,
      |    (
      |      select AVG(final_total_amount)
      |      from dwd.fact_order_info
      |      where YEAR(create_time) = 2020 and MONTH(create_time) = 4
      |    ) allpavg
      |  from dwd.fact_order_info tmp
      |  where YEAR(tmp.create_time) = 2020 and MONTH(tmp.create_time) = 4
      |  group by tmp.province_id
      |) t2
      |on t1.id = t2.id
      |""".stripMargin)

  // Write the result to MySQL (the utf8 URL parameters keep Chinese text from becoming '?')
  df.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8", "provinceavgcmp", pro)

  // Check the written data: top 5 by province id and province average order amount, descending
  spark.sql("select * from mysql_table order by provinceid desc, provinceavgconsumption desc limit 5").show()

  spark.close()
}
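The corresponding check in the Linux MySQL command line can look like this:

use shtd_result;
select * from provinceavgcmp
order by provinceid desc, provinceavgconsumption desc
limit 5;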

3. Using the dwd layer tables, find users who placed orders on two consecutive days with an increasing order amount, and write the result into the usercontinueorder table of the shtd_result MySQL database. Then, in the MySQL command line on Linux, sort by total order count, total order amount, and customer primary key, all in descending order, and query the top 5 rows.

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sum")
  val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

  // MySQL connection properties
  val pro: Properties = new Properties()
  pro.put("user", "root")
  pro.put("password", "p@ssw0rd")
  pro.put("driver", "com.mysql.jdbc.Driver")

  // Register the MySQL table as a temp view so the written result can be checked afterwards
  spark.read.jdbc("jdbc:mysql://master:3306/shtd_result", "usercontinueorder", pro)
    .createOrReplaceTempView("mysql_table")

  // Users whose orders on two consecutive days show an increasing order amount
  val df: DataFrame = spark.sql(
    """
      |select
      |  T1.user_id userid,
      |  T1.consignee username,
      |  CONCAT(T1.day, '-', T2.day) day,
      |  SUM(T1.final_total_amount + T2.final_total_amount) totalconsumption,
      |  COUNT(T1.order_id) totalorder
      |from (
      |  select
      |    t1.user_id,
      |    t1.consignee,
      |    date_sub(t1.create_time, 1) day,
      |    SUM(t1.final_total_amount) final_total_amount,
      |    COUNT(t2.order_id) order_id
      |  from dwd.fact_order_info t1
      |  left join dwd.fact_order_detail t2
      |    on t1.user_id = t2.id
      |  group by t1.user_id, t1.consignee, date_sub(t1.create_time, 1)
      |) T1
      |left join (
      |  select
      |    t1.user_id,
      |    t1.consignee,
      |    date_sub(t1.create_time, 2) day,
      |    SUM(t1.final_total_amount) final_total_amount,
      |    COUNT(t2.order_id) order_id
      |  from dwd.fact_order_info t1
      |  left join dwd.fact_order_detail t2
      |    on t1.user_id = t2.id
      |  group by t1.user_id, t1.consignee, date_sub(t1.create_time, 2)
      |) T2
      |  on T1.user_id = T2.user_id and T1.final_total_amount > T2.final_total_amount
      |group by T1.user_id, T1.consignee, CONCAT(T1.day, '-', T2.day)
      |""".stripMargin)

  // Write the result to MySQL (the utf8 URL parameters keep Chinese text from becoming '?')
  df.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://master:3306/shtd_result?useUnicode=yes&characterEncoding=utf8", "usercontinueorder", pro)

  // Check the written data: top 5 by order count, order amount and user id, all descending
  spark.sql("select * from mysql_table order by totalorder desc, totalconsumption desc, userid desc limit 5").show()

  spark.close()
}
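And the check in the Linux MySQL command line can look like this:

use shtd_result;
select * from usercontinueorder
order by totalorder desc, totalconsumption desc, userid desc
limit 5;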