Spark Log Analysis Project
Table of Contents
I. Log Data Cleaning
(1) Requirements Overview: Data Cleaning
(2) Code Implementation
1. Environment setup
2. Create Row objects
3. Create the schema
4. Create the DataFrame
5. Drop duplicate data
6. Parse the url separately and convert to Row objects
7. Create the schema again
8. Create the DataFrame again
9. Create a JdbcUtils utility class to connect to MySQL
10. Write the results to MySQL
II. User Retention Analysis
(1) Requirements Overview
(2) Code Implementation: Next-Day Retention Rate
1. Environment setup
2. Add the table name and a helper function to JdbcUtils
3. In Retention, read the data cleaned in Part I from MySQL
4. Find all registered users
5. Find all signed-in users
6. Join the registration and sign-in tables
7. Process the timestamps
8. Filter users where registration date + 86,400,000 ms = sign-in date: count users who registered the previous day and signed in the next day
9. Count users registered per day
10. Join the two tables above
11. Compute the next-day retention rate
12. Store the result in MySQL
III. Active User Analysis
(1) Requirements Overview
(2) Code Implementation
1. Read the cleaned log data and filter out active users
2. Convert the Dataset to two-element tuples
3. Aggregate by day to get the distinct active-user count
4. Add the active-user table name to JdbcUtils
5. Store the result in MySQL
6. Package the jar and run it on the VM
Use Spark to complete the following log analysis project requirements:
I. Log Data Cleaning
(1) Requirements Overview: Data Cleaning
1. Read the log file and convert it to an RDD[Row] (see the field layout after this list)
- Split each line on tabs
- Filter out rows with fewer than 8 fields
2. Clean the data
- Deduplicate on the first and second columns
- Filter out rows whose status code is not 200
- Filter out rows with an empty event_time
- Split the url on "&" and "="
3. Save the data
- Write the data to a MySQL table
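For reference, each raw line of test.log carries eight tab-separated fields in this order (the same order as the schema defined in step 3 below):

event_time <TAB> url <TAB> method <TAB> status <TAB> sip <TAB> user_uip <TAB> action_prepend <TAB> action_client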
(2) Code Implementation
1. Environment setup
import org.apache.spark.rdd.RDD
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object EtlDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("etldemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // ... the code in steps 2-10 below goes here ...

    spark.close()
    sc.stop()
  }
}
2. Create Row objects
// TODO Load the log file, split on tabs, keep rows with at least 8 fields, and wrap each row in a Row object
val rowRDD: RDD[Row] = sc.textFile("in/test.log")
  .map(x => x.split("\\t"))
  .filter(x => x.length >= 8)
  .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
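One subtlety worth noting (an observation, not part of the original code): Java's split drops trailing empty fields by default, so a line whose last columns are empty falls below the 8-field threshold and is discarded. If such rows should be kept, a hypothetical variant would pass an explicit negative limit:

  .map(x => x.split("\\t", -1)) // -1 keeps trailing empty fields instead of dropping them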
3. Create the schema
// TODO Create the schema
val schema: StructType = StructType(Array(
  StructField("event_time", StringType),
  StructField("url", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("user_uip", StringType),
  StructField("action_prepend", StringType),
  StructField("action_client", StringType)))
4. Create the DataFrame
// TODO Convert the Row RDD with the defined columns into a DataFrame
val logDF: DataFrame = spark.createDataFrame(rowRDD, schema)
logDF.printSchema()
logDF.show(3, truncate = false)
+-------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+---------------+--------+--------------+----------------------------------+
|event_time |url |method|status|sip |user_uip|action_prepend|action_client |
+-------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+---------------+--------+--------------+----------------------------------+
|2018-09-04T20:27:31+08:00|http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157|GET |200 |192.168.168.64 |- |- |Apache-HttpClient/4.1.2 (java 1.5)|
|2018-09-04T20:27:31+08:00|http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106 |GET |200 |192.168.168.63 |- |- |Apache-HttpClient/4.1.2 (java 1.5)|
|2018-09-04T20:27:31+08:00|http://datacenter.bdqn.cn/logs/learn?ifEquipment=005&userSID=sodwly%40163.com&actionValue=23710&actionBegin=1536150451762&actionName=StartLearn&userUIP=192.168.168.14&actionType=3&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.26+Safari%2F537.36+Core%2F1.63.6726.400+QQBrowser%2F10.2.2265.400&actionEnd=1536150451766&userUID=719786&actionTest=0 |GET |200 |192.168.168.124|- |- |Apache-HttpClient/4.1.2 (java 1.5)|
+-------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+---------------+--------+--------------+----------------------------------+
5. Drop duplicate data
val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
  .filter(x => x(3) == "200") // 3 is the positional index of the status column
  .filter(x => StringUtils.isNotEmpty(x(0).toString)) // drop rows with an empty event_time
filterLogs.show(3, false) // the result after deduplication:
+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_time |url |method|status|sip |user_uip |action_prepend |action_client |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-04T20:27:43+08:00|http://datacenter.kgc.cn/logs/user?userUID=17988434&userSID=ar2bfjun52keidu11v5bibi4p0&userUIP=114.252.24.227&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.26+Safari%2F537.36+Core%2F1.63.5773.400+QQBrowser%2F10.2.2059.400&actionBegin=1536150463&actionEnd=1536150463&actionType=3&actionPrepend=http%3A%2F%2Fwww.kgc.cn%2Fmy&actionTest=1&ifEquipment=web&actionName=PlayOnlineTime&pic=31853&pre=33&playtime=60|GET |200 |61.240.144.25 |114.252.24.227|http://www.kgc.cn/my|Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.5773.400 QQBrowser/10.2.2059.400|
|2018-09-04T20:27:51+08:00|http://datacenter.bdqn.cn/logs/user?actionBegin=1536150471528&actionClient=CourseWorksBirds%2F2.0.8+%28iPhone%3B+iOS+11.4.1%3B+Scale%2F3.00%29&actionEnd=1536150471919&actionName=RegisteredExam&actionTest=0&actionType=11&actionValue=83320&clientType=008&examType=001&ifEquipment=mobile&userSID=973ECBB62CBFE9750CD4F59AAC1B104C.kgc-tiku-node2.kgc-tiku-node2&userUID=83320&userUIP=223.104.63.96 |GET |200 |192.168.168.63|- |- |Apache-HttpClient/4.1.2 (java 1.5) |
|2018-09-04T20:27:59+08:00|http://datacenter.bdqn.cn/logs/user?userUID=0&actionClient=Android&actionBegin=&actionEnd=&actionType=3&actionName=MinimizeAPP&actionValue=&actionPrepend=&actionTest=1&ifEquipment=mobile&projectName=ykt |GET |200 |183.136.133.44|1.196.181.58 |- |Mozilla/5.0 (Linux; U; Android 7.0; zh-cn; FRD-AL10 Build/HUAWEIFRD-AL10) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
6. Parse the url separately and convert to Row objects
val full_logs_rdd: RDD[Row] = filterLogs.map(line => {
  val str: String = line.getAs[String]("url")
  val paramsArray: Array[String] = str.split("\\?") // there is data on both sides of the "?"
  var paramsMap: Map[String, String] = Map() // empty by default, so urls without a "?" yield empty values
  if (paramsArray.length == 2) {
    val tuples: Array[(String, String)] = paramsArray(1).split("&")
      .map(x => x.split("=")) // "name=zhang" => [name, zhang]
      .filter(x => x.length == 2)
      .map(x => (x(0), x(1))) // (name, zhang)
    paramsMap = tuples.toMap // convert to a Map for key-value lookup
  }
  (line.getAs[String]("event_time"),
    paramsMap.getOrElse[String]("userUID", ""),
    paramsMap.getOrElse[String]("userSID", ""),
    paramsMap.getOrElse[String]("actionBegin", ""),
    paramsMap.getOrElse[String]("actionEnd", ""),
    paramsMap.getOrElse[String]("actionType", ""),
    paramsMap.getOrElse[String]("actionName", ""),
    paramsMap.getOrElse[String]("actionValue", ""),
    paramsMap.getOrElse[String]("actionTest", ""),
    paramsMap.getOrElse[String]("ifEquipment", ""), // in real work, carry every attribute through
    line.getAs[String]("method"),
    line.getAs[String]("status"),
    line.getAs[String]("sip"),
    line.getAs[String]("user_uip"),
    line.getAs[String]("action_prepend"),
    line.getAs[String]("action_client"))
}).toDF().rdd
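To make the split chain concrete, here is a minimal, self-contained sketch of the same parsing steps applied to a single query string (hypothetical values; not part of the original code):

val url = "http://datacenter.bdqn.cn/logs/user?userUID=272090&actionName=startEval&userSID="
val parts: Array[String] = url.split("\\?") // [path, query string]
val params: Map[String, String] =
  if (parts.length == 2)
    parts(1).split("&")      // ["userUID=272090", "actionName=startEval", "userSID="]
      .map(_.split("="))     // "userSID=" splits to a one-element array ...
      .filter(_.length == 2) // ... and is dropped here
      .map(kv => (kv(0), kv(1)))
      .toMap
  else Map()
params.getOrElse("userUID", "") // "272090"
params.getOrElse("userSID", "") // "" — the key was filtered out, so the default applies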
7. Create the schema again
val full_logs_schema: StructType = StructType(Array(
  StructField("event_time", StringType),
  StructField("userUID", StringType),
  StructField("userSID", StringType),
  StructField("actionBegin", StringType),
  StructField("actionEnd", StringType),
  StructField("actionType", StringType),
  StructField("actionName", StringType),
  StructField("actionValue", StringType),
  StructField("actionTest", StringType),
  StructField("ifEquipment", StringType),
  StructField("method", StringType),
  StructField("status", StringType),
  StructField("sip", StringType),
  StructField("user_uip", StringType),
  StructField("action_prepend", StringType),
  StructField("action_client", StringType)))
8. Create the DataFrame again
val full_logDF: DataFrame = spark.createDataFrame(full_logs_rdd, full_logs_schema)
full_logDF.printSchema()
full_logDF.show(3, truncate = false) // truncate = true would hide part of any overlong value; false shows it in full
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_time |userUID |userSID |actionBegin |actionEnd |actionType|actionName |actionValue|actionTest|ifEquipment|method|status|sip |user_uip |action_prepend |action_client |
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|2018-09-04T20:27:43+08:00|17988434|ar2bfjun52keidu11v5bibi4p0 |1536150463 |1536150463 |3 |PlayOnlineTime| |1 |web |GET |200 |61.240.144.25 |114.252.24.227|http://www.kgc.cn/my|Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.5773.400 QQBrowser/10.2.2059.400|
|2018-09-04T20:27:51+08:00|83320 |973ECBB62CBFE9750CD4F59AAC1B104C.kgc-tiku-node2.kgc-tiku-node2|1536150471528|1536150471919|11 |RegisteredExam|83320 |0 |mobile |GET |200 |192.168.168.63|- |- |Apache-HttpClient/4.1.2 (java 1.5) |
|2018-09-04T20:27:59+08:00|0 | | | |3 |MinimizeAPP | |1 |mobile |GET |200 |183.136.133.44|1.196.181.58 |- |Mozilla/5.0 (Linux; U; Android 7.0; zh-cn; FRD-AL10 Build/HUAWEIFRD-AL10) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
9. Create a JdbcUtils utility class to connect to MySQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

object JdbcUtils {
  val url = "jdbc:mysql://localhost:3306/logdemo?createDatabaseIfNotExist=true"
  val user = "root"
  val password = "root"
  val driver = "com.mysql.cj.jdbc.Driver"

  // table names: one for each of the two result tables
  val table_access_logs: String = "access_logs"
  val table_full_access_logs: String = "full_access_logs"

  val properties = new Properties()
  properties.setProperty("user", JdbcUtils.user)
  properties.setProperty("password", JdbcUtils.password)
  properties.setProperty("driver", JdbcUtils.driver)

  def dateFrameToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).jdbc(JdbcUtils.url, table, properties)
    } else {
      df.write.mode(SaveMode.Overwrite).jdbc(JdbcUtils.url, table, properties)
    }
  }
}
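Note the op switch: op = 0 appends to the target table, while any other value (including the default of 1) overwrites it. Every call below passes 1, so each run replaces the table contents.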
10. Write the results to MySQL
(1) The log data after the initial cleaning
JdbcUtils.dateFrameToMysql(filterLogs, JdbcUtils.table_access_logs, 1)
(2) The data after the url parsing
JdbcUtils.dateFrameToMysql(full_logDF, JdbcUtils.table_full_access_logs, 1)
II. User Retention Analysis
(1) Requirements Overview
1. Compute the next-day user retention rate (a worked example follows this list)
- Find the total number of users newly registered on a given day, n
- Intersect that day's new user IDs with the next day's sign-in user IDs to get m, the number of new users who signed in the next day (the next-day retention count)
- Retention rate = m / n × 100%
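With the numbers this walkthrough produces below: n = 381 users registered on 2018-09-04, m = 355 of them signed in again on 2018-09-05, so the next-day retention rate is 355 / 381 × 100% ≈ 93.18%.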
2. Compute the next-week user retention rate
(2) Code Implementation: Next-Day Retention Rate
1. Environment setup
import utils.JdbcUtils
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import java.text.SimpleDateFormat

/* Compute the next-day user retention rate */
object Retention {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("retentionDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // ... the code in steps 3-12 below goes here ...

    spark.close()
    sc.stop()
  }
}
2. Add the table name and a helper function to JdbcUtils
val table_retention: String = "retention" // next-day user retention rate

// TODO Read data from MySQL
def getDataFrameByTableName(spark: SparkSession, table: String): DataFrame = {
  val frame: DataFrame = spark.read.jdbc(JdbcUtils.url, table, JdbcUtils.properties)
  frame
}
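As an aside (a sketch, not part of the original code), spark.read.jdbc also accepts a parenthesized subquery with an alias in place of a table name, which lets MySQL do the filtering instead of loading the whole table into Spark:

// hypothetical variant: push a filter down to MySQL
val registeredOnly: DataFrame = spark.read.jdbc(JdbcUtils.url,
  "(SELECT userUID, event_time FROM full_access_logs WHERE actionName = 'Registered') t",
  JdbcUtils.properties)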
3. In Retention, read the data cleaned in Part I from MySQL
val logs: DataFrame = JdbcUtils.getDataFrameByTableName(spark, JdbcUtils.table_full_access_logs)
logs.printSchema()
logs.show(3, truncate = false)
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|event_time |userUID |userSID |actionBegin |actionEnd |actionType|actionName |actionValue|actionTest|ifEquipment|method|status|sip |user_uip |action_prepend |action_client |
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|2018-09-04T20:27:34+08:00|248467 |5486EFD9C759F7B078B633D70ADAD6BB.kgc-tiku-node1.kgc-tiku-node1|1536150454841|1536150454887|3 |answerQuestion|248467 |0 |web |GET |200 |192.168.168.71|- |- |Apache-HttpClient/4.1.2 (java 1.5) |
|2018-09-04T20:27:34+08:00|18191750|ascc1fjkotojgev1odnbcfjv63 |1536150454 |1536150454 |3 |PlayOnlineTime| |1 |web |GET |200 |112.25.60.21 |183.214.52.234|http://www.kgc.cn/course/27977.shtml|Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299|
|2018-09-04T20:27:57+08:00|228001 |7A2529C59592B32FE8B35F9396531EFB.kgc-tiku-node2.kgc-tiku-node2|1536150476622|1536150476999|11 |RegisteredExam|228001 |0 |mobile |GET |200 |192.168.168.64|- |- |Apache-HttpClient/4.1.2 (java 1.5) |
+-------------------------+--------+--------------------------------------------------------------+-------------+-------------+----------+--------------+-----------+----------+-----------+------+------+--------------+--------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
4. Find all registered users
// TODO Filter the logs whose actionName is Registered, and rename event_time to register_time
// TODO Keep the registered user IDs and their registration times
val registered: DataFrame = logs.filter($"actionName" === "Registered") // newly registered users
  .withColumnRenamed("event_time", "register_time") // rename the column to the registration time
  .select("userUID", "register_time") // keep only the needed columns
registered.show(3, truncate = false) // the registration table:
+-------+-------------------------+
|userUID|register_time            |
+-------+-------------------------+
|848343 |2018-09-04T20:28:29+08:00|
|127677 |2018-09-04T20:28:13+08:00|
|831533 |2018-09-04T20:29:49+08:00|
+-------+-------------------------+
5. Find all signed-in users
// TODO Filter the logs whose actionName is Signin — the daily sign-in users
val signin: DataFrame = logs.filter($"actionName" === "Signin")
  .withColumnRenamed("event_time", "signin_time")
  .select("userUID", "signin_time")
signin.printSchema()
signin.show(3, false) // the sign-in table:
/* sign-in user info
+-------+-------------------------+
|userUID|signin_time              |
+-------+-------------------------+
|81680  |2018-09-05T20:27:35+08:00|
|268372 |2018-09-05T20:29:34+08:00|
|853998 |2018-09-05T20:29:42+08:00|
+-------+-------------------------+
*/
6. Join the registration and sign-in tables
// TODO Left-join the two tables above to get the full picture for every registered user
val joined: DataFrame = registered.join(signin, Seq("userUID"), "left") // Seq can take several column names
joined.printSchema()
joined.show(3, false)
// registration time and sign-in time side by side:
+--------+-------------------------+-------------------------+
|userUID |register_time            |signin_time              |
+--------+-------------------------+-------------------------+
|147754  |2018-09-04T20:34:45+08:00|2018-09-05T20:34:45+08:00|
|18040025|2018-09-04T20:32:35+08:00|2018-09-05T20:32:35+08:00|
|200002  |2018-09-04T20:28:58+08:00|2018-09-05T20:28:58+08:00|
+--------+-------------------------+-------------------------+
// TODO Other ways to write the join:
// registered.join(signin, $"userUID" === $"userUID2", "left")
// the plain $ syntax cannot disambiguate two columns with the same name
// val joined2: DataFrame = registered.as("r1").join(signin.as("s1"), $"r1.userUID" === $"s1.userUID")
// joined2.show(3, false)
// this style of join keeps a redundant key column:
/*
+--------+-------------------------+--------+-------------------------+
|userUID |register_time            |userUID |signin_time              |
+--------+-------------------------+--------+-------------------------+
|147754  |2018-09-04T20:34:45+08:00|147754  |2018-09-05T20:34:45+08:00|
|18040025|2018-09-04T20:32:35+08:00|18040025|2018-09-05T20:32:35+08:00|
|200002  |2018-09-04T20:28:58+08:00|200002  |2018-09-05T20:28:58+08:00|
+--------+-------------------------+--------+-------------------------+
*/
7. Process the timestamps
Note: as the joined table shows, if a user's registration date plus 24 hours equals the sign-in date, the user was retained on the second day.
// TODO Process the timestamps
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
// register a UDF: first the function name, then the function body
val mydefDataformat: UserDefinedFunction = spark.udf.register("mydefDataformat", (event_time: String) => {
  if (StringUtils.isEmpty(event_time))
    0 // a null time means the user registered but never signed in
  else
    dateFormat.parse(event_time).getTime // parse the date prefix of the string and return epoch milliseconds
})
// add a day-truncated column for each of the two time columns in the joined table
val joined2: DataFrame = joined.withColumn("register_date", mydefDataformat($"register_time"))
  .withColumn("signin_date", mydefDataformat($"signin_time"))
joined2.printSchema()
joined2.show(3, false)
+--------+-------------------------+-------------------------+-------------+-------------+
|userUID |register_time            |signin_time              |register_date|signin_date  |
+--------+-------------------------+-------------------------+-------------+-------------+
|147754  |2018-09-04T20:34:45+08:00|2018-09-05T20:34:45+08:00|1535990400000|1536076800000|
|18040025|2018-09-04T20:32:35+08:00|2018-09-05T20:32:35+08:00|1535990400000|1536076800000|
|200002  |2018-09-04T20:28:58+08:00|2018-09-05T20:28:58+08:00|1535990400000|1536076800000|
+--------+-------------------------+-------------------------+-------------+-------------+
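As a side note, the same day-level truncation could be done with Spark's built-in functions instead of a UDF — a sketch under the same column names, not part of the original code. Unlike the UDF, it yields null rather than 0 when a user never signed in:

// hypothetical alternative: take the "yyyy-MM-dd" prefix of the ISO string,
// parse it to epoch seconds at local midnight, then scale to milliseconds
val joinedAlt: DataFrame = joined
  .withColumn("register_date", unix_timestamp(substring($"register_time", 1, 10), "yyyy-MM-dd") * 1000)
  .withColumn("signin_date", unix_timestamp(substring($"signin_time", 1, 10), "yyyy-MM-dd") * 1000)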
8. Filter users where registration date + 86,400,000 ms = sign-in date: count users who registered the previous day and signed in the next day
Date arithmetic is easiest on timestamps (millisecond values):
24 h × 60 min × 60 s × 1000 ms = 86,400,000 ms
// TODO Count users who registered the previous day and signed in the next day
// TODO Filter users whose registration date + 86,400,000 ms equals the sign-in date
val siginNumDF: DataFrame = joined2.filter($"register_date" + 86400000 === $"signin_date")
  .groupBy("register_date") // TODO count the users who registered on day one and signed in on day two
  .agg(countDistinct($"userUID").as("signinNum")) // deduplicate users
siginNumDF.printSchema()
siginNumDF.show(truncate = false)
// users who registered the previous day and signed in the next day;
// only one day in the source data qualifies, so there is a single row
+-------------+---------+
|register_date|signinNum|
+-------------+---------+
|1535990400000|355      |
+-------------+---------+
9. Count users registered per day
// TODO Count the number of users who registered each day
val registerNumDF: DataFrame = joined2.groupBy($"register_date")
  .agg(countDistinct("userUID").as("registerNum"))
// registerNumDF.printSchema()
// registerNumDF.show(3, false)
+-------------+-----------+
|register_date|registerNum|
+-------------+-----------+
|1535990400000|381        |
+-------------+-----------+
10. Join the two tables above
// TODO Join the two tables above
val joinRegisAndSignDF: DataFrame = siginNumDF.join(registerNumDF, Seq("register_date"))
joinRegisAndSignDF.printSchema()
joinRegisAndSignDF.show(3, truncate = false)
+-------------+---------+-----------+
|register_date|signinNum|registerNum|
+-------------+---------+-----------+
|1535990400000|355      |381        |
+-------------+---------+-----------+
11. Compute the next-day retention rate
// TODO Divide the two counts (of the users who registered, how many signed in the next day)
val resultRetention: DataFrame = joinRegisAndSignDF.select($"register_date",
  concat(round(($"signinNum" / $"registerNum") * 100, 2), lit("%")).as("percent"))
resultRetention.show(3, truncate = false)
+-------------+-------+
|register_date|percent|
+-------------+-------+
|1535990400000|93.18% |
+-------------+-------+
12. Store the result in MySQL
// TODO Store the result in MySQL
JdbcUtils.dateFrameToMysql(resultRetention, JdbcUtils.table_retention)
The retention table in MySQL now holds the result.
III. Active User Analysis
(1) Requirements Overview
- Read the database and count the active users per day
- Counting rule: only users who watched a course or bought a course count as active
- Deduplicate by user ID
(2) Code Implementation
1. Read the cleaned log data and filter out active users
// TODO Read the cleaned log data table
val logs: DataFrame = JdbcUtils.getDataFrameByTableName(spark, JdbcUtils.table_full_access_logs)
// TODO Filter out the active users — those who bought a course or started learning
// note that the result here is a Dataset
val ds: Dataset[Row] = logs.filter($"actionName" === "BuyCourse" || $"actionName" === "StartLearn")
ds.printSchema()
ds.show(3, truncate = false)
+-------------------------+-------+--------------------------------+-------------+-------------+----------+----------+-----------+----------+-----------+------+------+---------------+--------+--------------+----------------------------------+
|event_time               |userUID|userSID                         |actionBegin  |actionEnd    |actionType|actionName|actionValue|actionTest|ifEquipment|method|status|sip            |user_uip|action_prepend|action_client                     |
+-------------------------+-------+--------------------------------+-------------+-------------+----------+----------+-----------+----------+-----------+------+------+---------------+--------+--------------+----------------------------------+
|2018-09-04T20:28:32+08:00|751169 |0f99c62babab4acea2d67c9952ff9885|1536150512974|1536150512978|3         |StartLearn|23935      |0         |005        |GET   |200   |192.168.168.148|-       |-             |Apache-HttpClient/4.1.2 (java 1.5)|
|2018-09-04T20:30:50+08:00|836603 |2c2878a3ca034d19aea54165ce8a9995|1536150650812|1536150650816|3         |StartLearn|23630      |0         |005        |GET   |200   |192.168.168.124|-       |-             |Apache-HttpClient/4.1.2 (java 1.5)|
|2018-09-04T20:32:07+08:00|232157 |zhangruixuebjaccp%40126.com     |1536150727508|1536150727512|3         |StartLearn|23600      |0         |005        |GET   |200   |192.168.168.148|-       |-             |Apache-HttpClient/4.1.2 (java 1.5)|
+-------------------------+-------+--------------------------------+-------------+-------------+----------+----------+-----------+----------+-----------+------+------+---------------+--------+--------------+----------------------------------+
2. Convert the Dataset to two-element tuples
// TODO Dataset => two-element tuples of (userSID, date)
val ds2: Dataset[(String, String)] = ds.map(x => (
  x.getAs[String]("userSID"),
  x.getAs[String]("event_time").substring(0, 10)))
/*
ds2.printSchema()
ds2.show(3, false)
*/
+--------------------------------+----------+
|_1                              |_2        |
+--------------------------------+----------+
|0f99c62babab4acea2d67c9952ff9885|2018-09-04|
|2c2878a3ca034d19aea54165ce8a9995|2018-09-04|
|zhangruixuebjaccp%40126.com     |2018-09-04|
+--------------------------------+----------+
3. Aggregate by day to get the distinct active-user count
// TODO Aggregate by day and count the distinct active users
val frame: DataFrame = ds2.withColumnRenamed("_2", "date")
  .withColumnRenamed("_1", "userid")
  .groupBy($"date")
  .agg(countDistinct("userid").as("activeNum"))
frame.printSchema()
frame.show(3, truncate = false)
+----------+---------+
|date      |activeNum|
+----------+---------+
|2018-09-04|275      |
|2018-09-05|255      |
+----------+---------+
4. Add the active-user table name to JdbcUtils
val table_dayactive: String = "day_active" // daily active users
5. Store the result in MySQL
// TODO Store the result in MySQL
JdbcUtils.dateFrameToMysql(frame, JdbcUtils.table_dayactive, 1)
println("done")
6. Package the jar and run it on the VM
[root@lxm147 opt]# spark-submit \
--master local \
--executor-memory 2G --total-executor-cores 1 \
--class nj.zb.kb21.etl0407.Active \
./sparkstu-2.1.0-SNAPSHOT.jar
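For reference, the same jar could also be submitted to a YARN cluster instead of local mode — a sketch in which the --master and --deploy-mode settings are assumptions, everything else unchanged:

spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 2G \
--class nj.zb.kb21.etl0407.Active \
./sparkstu-2.1.0-SNAPSHOT.jar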
The day_active table in the MySQL database now holds the result.