
Spark IPMapping Solution


A naive approach: from each record's identifier fields (uid, imei, imsi, mac, androidid, uuid, and so on), pick one by priority and use it as that record's unique user identifier. This has a serious flaw: on day one the user is logged in, so uid is chosen; on day two the same user is not logged in, so imei is chosen. Are the two records recognized as the same person? They are not (see the sketch below).
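To make the flaw concrete, here is a minimal sketch of this priority-based selection. The field names, priority order and sample values are made up for illustration; they are not taken from the article's data.

object NaivePick {
  // Sketch only: a record carries whichever identifiers happened to be captured.
  case class LogRecord(uid: String, imei: String, imsi: String, mac: String)

  // Take the first non-blank identifier in priority order: uid > imei > imsi > mac.
  def pickId(r: LogRecord): String =
    Seq(r.uid, r.imei, r.imsi, r.mac)
      .find(s => s != null && s.trim.nonEmpty)
      .getOrElse("unknown")

  def main(args: Array[String]): Unit = {
    val day1 = LogRecord("u_1001", "865432021234567", null, null) // logged in: uid present
    val day2 = LogRecord(null,     "865432021234567", null, null) // not logged in: uid missing
    println(pickId(day1)) // u_1001
    println(pickId(day2)) // 865432021234567 -> the same person ends up under a different key
  }
}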
 

In real-world log data, users reach the product through all kinds of devices and entry points, and one person often owns several devices and uses several front ends. As a result, for the same person, the data collected in different time periods carries identifiers that differ in both number and kind.
Phones and tablets:
        Android, iOS and Windows Phone handsets
        many different Android versions
        many different iOS versions

Common user/device identifiers (there is no simple rule for picking one of them as the unique identifier):
        mac: network interface MAC address
        imei: device serial number (the "network access license" number)
        imsi: SIM card serial number
        androidid: Android system ID
        openuuid: an ID generated by the app itself; it changes whenever the app is uninstalled and reinstalled
        idfa: advertising tracking identifier
        deviceid: a logical ID defined by the developers of the app's logging/tracking code
Among this jumble of IDs, working out which ones belong to the same device is very hard to do with plain WHERE conditions: two records may share no column value directly and yet belong to the same person through a chain of other records, as the sketch below illustrates.
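A tiny illustration (with made-up identifier values) of why transitive linking is needed: records a and c share nothing directly, but both overlap with record b, so all three belong to one device.

object TransitiveLinkDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical records: each one only captured some of the identifiers.
    val a = Map("imei" -> "865432021234567")
    val b = Map("imei" -> "865432021234567", "idfa" -> "AEBE52E7-03EE-455A")
    val c = Map("idfa" -> "AEBE52E7-03EE-455A", "mac" -> "02:00:4C:4F:4F:50")

    // a and c share no value, so no single-column WHERE a.x = c.x condition links them,
    // yet a overlaps b and b overlaps c: grouping all three needs transitive (graph) logic.
    println(a.values.toSet.intersect(c.values.toSet).isEmpty)  // true
    println(a.values.toSet.intersect(b.values.toSet).nonEmpty) // true
    println(b.values.toSet.intersect(c.values.toSet).nonEmpty) // true
  }
}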

Solution 1: Redis (a minimal sketch follows the steps below)
        Extract all identifier fields from the log data.
        Look each extracted ID up in Redis to see whether it is already known.
        If none of the IDs exists yet, generate a new guid.
        If one exists, reuse the existing guid.
        Problem: some records may belong to the same person, but if the records collected in different periods share no identifier at the time they arrive, that person ends up tagged with two different guids.
        Workaround: periodically clean up and merge the ID-mapping entries in Redis. Merging, however, has its own risk: two different people may end up identified as the same person.
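A minimal sketch of this Redis flow, assuming a Redis instance on localhost and using the Jedis client that the pom below already declares; the key prefix idmp: and the guid format are illustrative choices, not part of the original design.

import java.util.UUID
import redis.clients.jedis.Jedis

object RedisIdMappingSketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379) // assumed local Redis instance

    // All identifiers extracted from one log record (blank fields already dropped).
    val ids = Seq("13866558899", "zhangsan", "wx_hq")

    // 1. Look every id up in Redis and reuse the first guid that is already registered.
    val existing = ids.flatMap(id => Option(jedis.get("idmp:" + id))).headOption

    // 2. If none of the ids is known yet, mint a brand-new guid.
    val guid = existing.getOrElse(UUID.randomUUID().toString)

    // 3. Register (or refresh) the id -> guid mapping for every identifier in the record.
    ids.foreach(id => jedis.set("idmp:" + id, guid))

    println(s"record tagged with guid $guid")
    jedis.close()
  }
}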

The process of implementing IPMapping with graph computation:
        From the current day's data, turn every identifier, and the links between identifiers that appear in the same record, into a vertex set and an edge set.
        Turn the previous day's ids -> guid mapping into a vertex set and an edge set as well.
        Merge the two vertex sets and the two edge sets into one graph.
        Run the connected-components (maximal connected subgraph) algorithm on that graph.
        From the result graph, read off which identifiers fall into the same group and generate one unique identifier (guid) per group.
        Compare the guids generated above against the previous day's ids -> guid table: if a person already has a guid, keep the original guid (a sketch of this step follows below).
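The demo later in this article only processes a single day, so the last step is not shown there. Below is a minimal sketch of how the previous day's guids could be carried over, assuming that mapping is available as an RDD of (idHash, guid) pairs; the function and variable names are illustrative.

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumed inputs (names are illustrative):
//   graph2         : the graph after connectedComponents(); vertices are (idHash, componentId)
//   lastDayMapping : previous day's (idHash, guid) pairs loaded from yesterday's output
object CarryOverGuids {
  def carryOverGuids(graph2: Graph[VertexId, String],
                     lastDayMapping: RDD[(VertexId, String)]): RDD[(VertexId, String)] = {

    val today: RDD[(VertexId, VertexId)] = graph2.vertices // (idHash, componentId)

    // For every component, look for any guid the previous day already assigned to one of
    // its members; if several are found, keep the smallest so the choice is deterministic.
    val componentToOldGuid: RDD[(VertexId, String)] = today
      .join(lastDayMapping)                 // (idHash, (componentId, oldGuid))
      .map { case (_, (comp, guid)) => (comp, guid) }
      .reduceByKey((a, b) => if (a <= b) a else b)

    // Components with no previous guid get a new one; the demo below simply uses the
    // component id itself as the guid, and this sketch does the same.
    today.map { case (idHash, comp) => (comp, idHash) }
      .leftOuterJoin(componentToOldGuid)    // (comp, (idHash, Option[oldGuid]))
      .map { case (comp, (idHash, oldGuid)) => (idHash, oldGuid.getOrElse(comp.toString)) }
  }
}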

Sample data (fields: phone number, nickname, WeChat account, and a numeric value). Note how the rows are linked: rows 1 and 2 share wx_hq, rows 1 and 3 share zhangsan, rows 4 and 5 share 13912344678, and rows 5 and 6 share wx_wk, so rows 1-3 form one person and rows 4-6 another:

13866558899,zhangsan,wx_hq,2000
13877559988,sange,wx_hq,3000
,zhangsan,wx_lhq,5000
13912344678,lisi,wx_lxlt,12000
13912344678,lisi-1,wx_wk,3500
13912664678,lisi-2,wx_wk,5600

Add to pom.xml:

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.11</artifactId>
    <version>2.2.0</version>
  </dependency>

Complete pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>SparkIPMappingDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>scala-demo-project</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency>
    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.11.0.2</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency>
      <groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.2.0</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
    <dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc_2.11</artifactId><version>3.1.0</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc-config -->
    <dependency><groupId>org.scalikejdbc</groupId><artifactId>scalikejdbc-config_2.11</artifactId><version>3.1.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.69</version></dependency>
    <dependency><groupId>ch.hsr</groupId><artifactId>geohash</artifactId><version>1.3.0</version></dependency>
    <dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.11</artifactId><version>2.2.0</version></dependency>
    <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version></dependency>
    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.3</version></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api-scala_2.11</artifactId><version>11.0</version></dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>cn.kgc.kafak.demo.ThreadProducer</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Spark IPMappingDemo

import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object SparkDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Load the raw data
    val ds: Dataset[String] = spark.read.textFile("file:///F:/JavaTest/SparkIPMappingDemo/data/graphx/input")

    // Build the vertex RDD
    val vertices: RDD[(Long, String)] = ds.rdd.flatMap(line => {
      val fields = line.split(",")
      // In Spark's graph API, a vertex is represented as a tuple: (unique vertex id as Long, vertex data)
      for (ele <- fields if StringUtils.isNotBlank(ele)) yield (ele.hashCode.toLong, ele)
      /*
      Array(
        (fields(0).hashCode.toLong, fields(0)),
        (fields(1).hashCode.toLong, fields(1)),
        (fields(2).hashCode.toLong, fields(2)))
      */
    })

    // Build the edge RDD
    // GraphX describes an edge as: Edge(source vertex id, destination vertex id, edge data)
    val edges: RDD[Edge[String]] = ds.rdd.flatMap(line => {
      val fields = line.split(",")
      /*
      val lst = new ListBuffer[Edge[String]]()
      for (i <- 0 to fields.length - 2) {
        val edge1 = Edge(fields(i).hashCode.toLong, fields(i + 1).hashCode.toLong, "")
        lst += edge1
      }
      lst
      */
      for (i <- 0 to fields.length - 2 if StringUtils.isNotBlank(fields(i))) yield Edge(fields(i).hashCode.toLong, fields(i + 1).hashCode.toLong, "")
    })
/*    edges.toDF().show()
//    +-----------+-----------+----+
//    |      srcId|      dstId|attr|
//    +-----------+-----------+----+
//    |  149291990|-1432604556|    |
//    |-1432604556|  113568551|    |
//    |  113568551|    1537214|    |
//    |-1515330570|  109203646|    |
//    |  109203646|  113568551|    |
//    |  113568551|    1567005|    |
//    |-1432604556| -774338537|    |
//    | -774338537|    1626587|    |
//    |-1095629956|    3322003|    |
//    |    3322003| 1765324466|    |
//    | 1765324466|   46789743|    |
//    |-1095629956|-1102520969|    |
//    |-1102520969|  113569010|    |
//    |  113569010|    1571810|    |
//    |-1007895461|-1102520968|    |
//    |-1102520968|  113569010|    |
//    |  113569010|    1632353|    |
//    +-----------+-----------+----+ */

    // Build a graph from the vertex set and the edge set
    val graph = Graph(vertices, edges)

    // Run the graph algorithm: connected components
    val graph2 = graph.connectedComponents()

    // The vertex set of the result graph holds the grouping we want
    val vertices2: VertexRDD[VertexId] = graph2.vertices
    // (vertex id 0, vertex data 0)
    // (vertex id 1, vertex data 0)
    // (vertex id 4, vertex data 4)
    // (vertex id 5, vertex data 4)
    /* each group takes the smallest vertex id as its value:
     * (-1102520968,-1102520969)
     * (149291990,-1515330570)
     * (1567005,-1515330570)
     * (-1095629956,-1102520969)
     * (113569010,-1102520969)
     * (-1515330570,-1515330570)
     * (-774338537,-1515330570)
     * (113568551,-1515330570)
     * (1765324466,-1102520969)
     * (1571810,-1102520969)
     * (-1432604556,-1515330570)
     * (3322003,-1102520969)
     * (46789743,-1102520969)
     * (-1102520969,-1102520969)
     * (1632353,-1102520969)
     * (1626587,-1515330570)
     * (109203646,-1515330570)
     * (1537214,-1515330570)
     * (-1007895461,-1102520969)
     */
    //vertices2.take(30).foreach(println)

    // Collect the mapping RDD obtained above to the driver ...
    val idmpMap = vertices2.collectAsMap()
    // ... and broadcast it as a variable
    val bc = spark.sparkContext.broadcast(idmpMap)

    // Use the mapping to tag the original records
    val res = ds.map(line => {
      val bc_map = bc.value
      // first non-blank field of the record
      val name = line.split(",").filter(StringUtils.isNotBlank(_))(0)
      val gid = bc_map.get(name.hashCode.toLong).get
      gid + "," + line
    })
    res.show(10, false)
    /* +-------------------------------------------+
     * |value                                      |
     * +-------------------------------------------+
     * |-1515330570,13866558899,zhangsan,wx_hq,2000|
     * |-1515330570,13877559988,sange,wx_hq,3000   |
     * |-1515330570,,zhangsan,wx_lhq,5000          |
     * |-1102520969,13912344678,lisi,wx_lxlt,12000 |
     * |-1102520969,13912344678,lisi-1,wx_wk,3500  |
     * |-1102520969,13912664678,lisi-2,wx_wk,5600  |
     * +-------------------------------------------+ */
    spark.close()
  }
}

Error encountered: ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Copy the four files core-site.xml, hdfs-site.xml, mapred-site.xml and yarn-site.xml from the Hadoop server into src\main\resources of the IDEA project.

 

Note: if the local Windows machine has no hosts entries for the cluster's hostnames, either change the addresses inside those config files from hostnames to IP addresses, or add the corresponding hostname-to-IP entries to the local hosts file.
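For example, entries in C:\Windows\System32\drivers\etc\hosts might look like the following (the hostnames and IP addresses here are placeholders; use the values of your own cluster):

192.168.1.101   node01
192.168.1.102   node02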

After that, the job runs.