> 文章列表 > 【Python学习笔记】4. Python大数据编程入门

【Python学习笔记】4. Python大数据编程入门

【Python学习笔记】4. Python大数据编程入门

4. Python大数据编程入门

    • 4.1 Python操作MySQL
    • 4.2 Spark与PySpark
      • 4.2.1 PySpark基础
      • 4.2.2 数据输入
        • 4.2.2.1 Python数据容器转换为RDD对象
        • 4.2.2.2 读取文本文件得到RDD对象
      • 4.2.3 数据计算
        • 4.2.3.1 map算子
        • 4.2.3.2 flatMap算子
        • 4.2.3.3 reduceByKey算子
        • 4.2.3.4 案例:单词计数
        • 4.2.3.5 filter算子
        • 4.2.3.6 distinct算子
        • 4.2.3.7 sortBy算子
        • 4.2.3.8 案例:货物售卖统计
      • 4.2.4 数据输出
        • 4.2.4.1 collect算子
        • 4.2.4.2 reduce算子
        • 4.2.4.3 take算子
        • 4.2.4.4 count算子
        • 4.2.4.5 输出到文件(saveAsTextFile算子)
      • 4.2.5 案例:搜索引擎日志分析

4.1 Python操作MySQL

首先需要安装pymysql外部包

from pymysql import Connection# 构建到MySQL数据库的连接
connection = Connection(host="localhost",port=3306,user="root",password="123456"
)# 获取游标
cunsor = connection.cursor()
# 选择数据库
connection.select_db("test")
# 执行sql
cunsor.execute("select * from people")
# 获取查询结果
results : tuple = cunsor.fetchall()
for result in results:print(result)
# 关闭连接
connection.close()

【Python学习笔记】4. Python大数据编程入门

​ 在插入数据时,需要手动提交事务

connection.commit()

​ 或者在Connection构造方法中传参

autocommit=True

4.2 Spark与PySpark

4.2.1 PySpark基础

  • Spark是Apache基金会旗下的顶级开源项目,用于对海量数据进行大规模分布式计算。

  • PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发

  • PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。

  • 国内代理镜像网站(清华大学源)
    pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

  • PySpark的编程,主要分为三个步骤

    【Python学习笔记】4. Python大数据编程入门

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类 SparkContext 的类对象

SparkContext类对象,是PySpark编程中一切功能的入口。

from pyspark import SparkConf, SparkContext# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")# 基于SparkConf对象创建SparkContext对象
sc = SparkContext(conf=conf)# 打印PySpark的运行版本
print(sc.version)# 停止SparkContext运行(停止PySpark程序)
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.2 数据输入

  • PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

  • RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

  • RDD对象称之为分布式弹性数据集,是PySpark中数据计算的载体,它可以:

    • 提供数据存储
    • 提供数据计算的各类方法
    • 数据计算的方法,返回值依旧是RDD(RDD迭代计算

4.2.2.1 Python数据容器转换为RDD对象

from pyspark import SparkConf, SparkContext# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 通过SparkContext的parallelize成员方法,将Python数据容器转换为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})# 如果要查看RDD中的内容,需要调用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())# 停止SparkContext运行(停止PySpark程序)
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.2.2 读取文本文件得到RDD对象

from pyspark import SparkConf, SparkContext# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 通过SparkContext的textFile成员方法,读取文本文件得到RDD对象
rdd = sc.textFile("D:\\\\MyStudy\\\\MyCode\\\\PycharmProjects\\\\py_mysql\\\\test.txt")
print(rdd.collect())# 停止SparkContext运行(停止PySpark程序)
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3 数据计算

4.2.3.1 map算子

  • map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘以十,链式调用
rdd2 = rdd.map(lambda data: data * 10).map(lambda data: data + 5)
print(rdd2.collect())# 停止SparkContext运行(停止PySpark程序)
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.2 flatMap算子

  • 对rdd执行map操作,然后进行解除嵌套操作。
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"# 创建SparkConf对象和SparkContext对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["this is", "python 310", "flatmap", "rdd spark"])# 通过map方法将全部数据都乘以十,链式调用
rdd2 = rdd.flatMap(lambda data: data.split(" "))
print(rdd2.collect())# 停止SparkContext运行(停止PySpark程序)
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.3 reduceByKey算子

  • 针对KV型RDD,自动按照key分组**,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize([('boy', 99), ('boy', 88), ('boy', 77), ('girl', 99), ('girl', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.4 案例:单词计数

提前设置好一个等待读取的txt文档:

spark python java zhang rdd python rdd
python java cpp c cpp spark pyspark zhang
rdd c python py pyspark python pp rdd
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 读取数据文件
rdd = sc.textFile("D:\\\\MyStudy\\\\MyCode\\\\PycharmProjects\\\\py_mysql\\\\test.txt")# 取出全部单词
word_rdd = rdd.flatMap(lambda data: data.split(" "))# 将单词转换为二元元组,单词为key,value设置为1
word_one_rdd = word_rdd.map(lambda word: (word, 1))# 分组求和
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)# 打印结果
print(result_rdd.collect())sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.5 filter算子

  • 接受一个处理函数,可用lambda快速编写

  • 函数对RDD数据逐个处理,得到True的保留至返回值的RDD中

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 取出列表中的奇数
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd2.collect())sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.6 distinct算子

  • 去重,返回新RDD对象
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 1, 2, 6, 7, 3])
rdd2 = rdd.distinct()print(rdd2.collect())sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.7 sortBy算子

  • 对RDD数据进行排序,基于指定的排序依据
  • 对单词统计案例中的结果进行排序
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.textFile("D:\\\\MyStudy\\\\MyCode\\\\PycharmProjects\\\\py_mysql\\\\test.txt")
word_rdd = rdd.flatMap(lambda data: data.split(" "))
word_one_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)result_rdd_sortBy = result_rdd.sortBy(lambda t: t[1], ascending=False, numPartitions=1)print(result_rdd_sortBy.collect())sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.3.8 案例:货物售卖统计

需求:把下述内容写入txt文件,使用Spark读取文件进行计算

  • 各个城市销售额排名,从大到小

  • 全部城市,有哪些商品类别在售卖

  • 北京市有哪些商品类别在售卖

{"id":1,"timestamp":"2019-05-08T01:03.00Z","category":"平板电脑","areaName":"北京","money":"1450"}|{"id":2,"timestamp":"2019-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}|{"id":3,"timestamp":"2019-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":"2019-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}|{"id":5,"timestamp":"2019-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}|{"id":6,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"杭州","money":"1550"}
{"id":7,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"北京","money":"5611"}|{"id":8,"timestamp":"2019-05-08T03:01.00Z","category":"家电","areaName":"北京","money":"4410"}|{"id":9,"timestamp":"2019-05-08T01:03.00Z","category":"家具","areaName":"郑州","money":"1120"}
{"id":10,"timestamp":"2019-05-08T01:01.00Z","category":"家具","areaName":"北京","money":"6661"}|{"id":11,"timestamp":"2019-05-08T05:03.00Z","category":"家具","areaName":"杭州","money":"1230"}|{"id":12,"timestamp":"2019-05-08T01:01.00Z","category":"书籍","areaName":"北京","money":"5550"}
{"id":13,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"5550"}|{"id":14,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"1261"}|{"id":15,"timestamp":"2019-05-08T03:03.00Z","category":"电脑","areaName":"杭州","money":"6660"}
{"id":16,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"天津","money":"6660"}|{"id":17,"timestamp":"2019-05-08T01:03.00Z","category":"书籍","areaName":"北京","money":"9000"}|{"id":18,"timestamp":"2019-05-08T05:01.00Z","category":"书籍","areaName":"北京","money":"1230"}
{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}
{"id":27,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"5600"}|{"id":28,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"8000"}|{"id":29,"timestamp":"2019-05-08T02:03.00Z","category":"服饰","areaName":"杭州","money":"7000"}
from pyspark import SparkConf, SparkContext
import os
import jsonos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)# 公共操作
# 打开文件并获取每个json字符串
file_rdd = sc.textFile("D:\\\\MyStudy\\\\MyCode\\\\PycharmProjects\\\\py_mysql\\\\data.txt")
json_str_rdd = file_rdd.flatMap(lambda data: data.split("|"))
# 通过json类的类方法转化为字典对象
dict_rdd = json_str_rdd.map(lambda data: json.loads(data))# TODO 需求1:城市销售额排名
# 清洗数据,保留字典中的有用信息
clear_data_rdd = dict_rdd.map(lambda data: (data['areaName'], int(data['money'])))
# 按照地名,聚合数据,并进行排序
aggregation_rdd = clear_data_rdd.reduceByKey(lambda a, b: a + b)
aggregation_sort_rdd = aggregation_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
# 打印结果
print(f"城市销售额排名为:{aggregation_sort_rdd.collect()}")# TODO 需求2:全部城市中有哪些商品在售卖
# 清洗数据,保留字典中的有用信息
clear_data_rdd = dict_rdd.map(lambda data: (data['category']))
# 去重
category_rdd = clear_data_rdd.distinct()
# 打印结果
print(f"所有正在售卖的商品有:{category_rdd.collect()}")# TODO 需求3:北京有哪些商品在售卖
# 过滤数据,保留满足地名为北京的json数据
bj_rdd = dict_rdd.filter(lambda data: data['areaName'] == '北京')
# 清洗数据,保留字典中的有用信息
clear_data_rdd = bj_rdd.map(lambda data: (data['category']))
# 去重
category_rdd = clear_data_rdd.distinct()
# 打印结果
print(f"北京正在售卖的商品有:{category_rdd.collect()}")# 公共操作
sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.4 数据输出

  • collect:将RDD内容转换为list

  • reduce:对RDD内容进行自定义聚合

  • take:取出RDD的前N个元素组成list

  • count:统计RDD元素个数

4.2.4.1 collect算子

  • 将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
print(type(rdd.collect()))sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.4.2 reduce算子

  • 对RDD数据集按照你传入的逻辑进行聚合
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])
num = rdd.reduce(lambda a, b: a + b)
print(num)sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.4.3 take算子

  • 取RDD的前N个元素,组合成list返回
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.take(3)
print(rdd2)sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.4.4 count算子

  • 计算RDD有多少条数据,返回值是一个数字
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.count()
print(rdd2)sc.stop()

【Python学习笔记】4. Python大数据编程入门

4.2.4.5 输出到文件(saveAsTextFile算子)

  • 将RDD的数据写入文本文件中

  • 支持本地写出,hdfs等文件系统

  • 调用保存文件的算子,需要配置Hadoop依赖

    • 下载Hadoop安装包

      • http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
    • 解压到电脑任意位置

    • 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’

    • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内

      • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
    • 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内

      • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
  • 修改rdd分区为1个

    • case1:SparkConf对象设置属性全局并行度为1:

      conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
      conf.set("spark.default.parallelism", "1")
      sc = SparkContext(conf=conf)
      
    • case2:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

      rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
      # 或者
      rdd2 = sc.parallelize([1, 2, 3, 4, 5], 1)rdd1.saveAsTextFile("rdd1")
      rdd2.saveAsTextFile("rdd2")
      

【Python学习笔记】4. Python大数据编程入门

4.2.5 案例:搜索引擎日志分析

  • 读取文件转换成RDD,并完成:
    • 打印输出:热门搜索时间段(小时精度)Top3
    • 打印输出:热门搜索词Top3
    • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
    • 将数据转换为JSON格式,写出为文件
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:\\\\MyStudy\\\\Environment\\\\Python\\\\Python310\\\\python.exe"
os.environ['HADOOP_HOME'] = "D:\\\\MyStudy\\\\Environment\\\\Hadoop\\\\spark-3.3.1-bin-hadoop3"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)# 读取文件转化为RDD
file_rdd = sc.textFile("D:\\\\MyStudy\\\\MyCode\\\\PycharmProjects\\\\py_mysql\\\\search_log.txt")# TODO 需求1:热门搜索时间段(小时精度)Top3
# 将rdd返回的list中的每个元素按\\t划分,列表嵌套
# 每行(内层列表)第0个元素即为时间,如:00:00:00,且前两位为小时
# 将小时组装为元组,并按照小时进行聚合
# 按照出现总次数进行降序排序,取前三位即为TOP3
result1 = file_rdd.map(lambda x: x.split("\\t")). \\map(lambda x: x[0][:2]). \\map(lambda x: (x + "点", 1)). \\reduceByKey(lambda a, b: a + b). \\sortBy(lambda x: x[1], ascending=False, numPartitions=1). \\take(3)print(f"热门搜索时间段(小时精度)Top3:{result1}")# TODO 需求2:热门搜索词Top3
result2 = file_rdd.map(lambda x: (x.split("\\t")[2], 1)). \\reduceByKey(lambda a, b: a + b). \\sortBy(lambda x: x[1], ascending=False, numPartitions=1). \\take(3)print(f"热门搜索词Top3:{result2}")# TODO 需求3:统计黑马程序员关键字在哪个时段被搜索最多
result3 = file_rdd.map(lambda x: x.split("\\t")). \\filter(lambda x: x[2] == "黑马程序员"). \\map(lambda x: (x[0][:2], 1)). \\reduceByKey(lambda a, b: a + b). \\sortBy(lambda x: x[1], ascending=False, numPartitions=1). \\take(1)print(f"统计黑马程序员关键字在哪个时段被搜索最多:{result3}")# TODO 需求4:将数据转换为JSON格式,写出为文件
file_rdd.map(lambda x: x.split("\\t")). \\map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}). \\saveAsTextFile("output_json")print("文件已写出")sc.stop()

【Python学习笔记】4. Python大数据编程入门