> 文章列表 > spark第八章:Pyspark

spark第八章:Pyspark

spark第八章:Pyspark

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码
spark第四章:SparkSQL基本操作
spark第五章:SparkSQL实例
spark第六章:SparkStreaming基本操作
spark第七章:SparkStreaming实例
spark第八章:Pyspark


文章目录

  • 系列文章目录
  • 前言
  • 一、环境准备
    • 1.安装Python3
    • 2.安装Pyspark
    • 3.测试环境
    • 4.提交py任务
  • 二、编写代码
  • 总结

前言

之前我们用scala完成了spark内容的学习,现在我们用Pyspark来进行spark集群操作.


一、环境准备

1.安装Python3

用Pyspark编写文件,自然要用Python的环境,centos7中可以用以下命令安装.

yum install python3

spark第八章:Pyspark
pyspark建议使用Python3.7及以上版本,但是centos7的默认源里边只有3.6,不过做最基本的练习还是够了.

2.安装Pyspark

用pip命令安装.

pip install pyspark==3.2.3

建议版本和自己的spark版本一致

3.测试环境

我这里用的是本地环境测试,没有打开全部集群.
spark第八章:Pyspark
spark第八章:Pyspark
这里就可以写代码了,但是咱们就不再这里写了.

4.提交py任务

这里我们用官方给的案例进行测试,如果没有问题,我们就可以开始学习了.

./bin/spark-submit examples/src/main/python/pi.py 10

spark第八章:Pyspark

二、编写代码

1.安装库

由于咱们在本地编写和测试文件,所以本地也需要Pyspark,但是不需要spark环境.

pip install pyspark==3.2.3

spark第八章:Pyspark
一个全新的虚拟环境

2.core操作

01_WC.py

from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 1.创建sparkContext对象conf = SparkConf().setMaster("local[*]").setAppName("WC")sc = SparkContext(conf=conf)# 2.读取文本数据rdd_init = sc.textFile("/home/atguigu/bigdatas/datas/wc.txt")# 3.数据切割flat_map = rdd_init.flatMap(lambda line: line.split(" "))# 4.数据格式转化rdd_map = flat_map.map(lambda word: (word, 1))# 5数据分组求和res = rdd_map.reduceByKey(lambda a,b:a+b)# 6.打印输出print(res.collect())# 7.关闭输出sc.stop()

spark第八章:Pyspark
从HDFS中读取文件.

from pyspark import SparkContext, SparkConf
import os# 模拟集群用户
os.environ["HADOOP_USER_NAME"] = "atguigu"if __name__ == '__main__':# 1.创建sparkContext对象conf = SparkConf().setMaster("local[*]").setAppName("WC")sc = SparkContext(conf=conf)# 2.读取文本数据rdd_init = sc.textFile("hdfs://192.168.10.102/wc.txt")# 3.数据切割flat_map = rdd_init.flatMap(lambda line: line.split(" "))# 4.数据格式转化rdd_map = flat_map.map(lambda word: (word, 1))# 5数据分组求和res = rdd_map.reduceByKey(lambda a, b: a + b)# 6.输出到HDFSres.saveAsTextFile("hdfs://192.168.10.102/output1")# 7.关闭输出sc.stop()

spark第八章:Pyspark

3.SQL操作

01_WC.py

from pyspark import SparkContext, SparkConf
import osfrom pyspark.sql import SparkSessionos.environ["HADOOP_USER_NAME"] = "atguigu"if __name__ == '__main__':# SparkSQL对象创建spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()df = spark.read.format("text").load("datas/wc.txt")df.createTempView("t1")spark.sql("""select * from t1""").show()

spark第八章:Pyspark
成功打印表,可以使用.

02_udf.py
自定义函数

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as Fos.environ["HADOOP_USER_NAME"] = "atguigu"if __name__ == '__main__':spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()df_init = spark.createDataFrame([(1, "张三", "北京"), (2, "李四", "上海"), (3, "王五", "深圳"), ],schema="id integer,name String,address String")df_init.createTempView("t1")# 自定义函数def strJoin(str):return "name is " + str# 注册自定义函数strJoin_2 = spark.udf.register("strJoin_1", strJoin, StringType())# 使用函数# 方式1# SQL使用# spark.sql("""#     select id,strJoin_1(name),address from t1# """).show()# DSL使用# df_init.select(df_init["id"],strJoin_2(df_init["name"]),df_init["address"]).show()# 方式2strJoin_3 = F.udf(strJoin, StringType())# 简写@F.udf(returnType=StringType())def strJoin_udf(str):return "name is " + str# DSL使用# df_init.select(df_init["id"],strJoin_3(df_init["name"]),df_init["address"]).show()df_init.select(df_init["id"],strJoin_udf(df_init["name"]),df_init["address"]).show()

spark第八章:Pyspark
02_udf1.py

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as Fos.environ["HADOOP_USER_NAME"] = "atguigu"if __name__ == '__main__':spark = SparkSession.builder.master("local[*]").appName("WC").getOrCreate()df_init = spark.createDataFrame([(1, "张三 北京"), (2, "李四 上海"), (3, "王五 深圳"), ],schema="id integer,nameANDaddress String")df_init.createTempView("t1")def str_split_udf(nameANDaddress: str):arr = nameANDaddress.split(" ")return [arr[0], arr[1]]schema = StructType().add("nn", StringType()).add("ar", StringType())str_split_udf_D = spark.udf.register("str_split_udf", str_split_udf, returnType=schema)# spark.sql("select id,str_split_udf(nameANDaddress).nn ,str_split_udf(nameANDaddress).ar from t1").show()df_init.select("id",str_split_udf_D("nameANDaddress")["nn"]).show()

spark第八章:Pyspark
连接hive
这里要先修改一下hive-site.xml文件
spark第八章:Pyspark
增加远程连接接口,如果集群需要连接hive,将这个文件扔到spark的conf目录.
03_hive.py


from pyspark.sql import SparkSessionif __name__ == '__main__':spark = SparkSession.builder.appName("hive")\\.config("hive.metastore.uris", "thrift://192.168.10.102:9083")\\.config("spark.sql.warehouse.dir", "hdfs://192.168.10.102:8020/usr/hive/warehouse")\\.enableHiveSupport()\\.getOrCreate()spark.sql("show databases").show()

然后启动hive,运行程序.
spark第八章:Pyspark

4.Streaming操作

01_WC.py

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContextif __name__ == '__main__':# 1.创建sparkContext对象conf = SparkConf().setMaster("local[*]").setAppName("WC")sc = SparkContext(conf=conf)ssc = StreamingContext(sc, 5)lines = ssc.socketTextStream("127.0.0.1", 9999)result = lines.flatMap(lambda line: line.split(" ")) \\.map(lambda word: (word, 1)) \\.reduceByKey(lambda a, b: a + b)result.pprint()ssc.start()ssc.awaitTermination()

spark第八章:Pyspark

总结

由于咱们之前用scala做过很多案例,所以这利用Python就不写了,只完成最基本的操作即可.