助力工业物联网,工业大数据之脚本开发【五】
01:脚本开发思路
-
目标:实现自动化脚本开发的设计思路分析
-
路径
- step1:脚本目标
- step2:实现流程
- step3:脚本选型
- step4:单个测试
-
实施
-
脚本目标:实现自动化将多张Oracle中的数据表全量或者增量采集同步到HDFS中
-
实现流程
- a. 获取表名
- b.构建Sqoop命令
- c.执行Sqoop命令
- d.验证结果
-
脚本选型
- Shell:Linux原生Shell脚本,命令功能全面丰富,主要用于实现自动化Linux指令,适合于Linux中简单的自动化任务开发
- Python:多平台可移植兼容脚本,自身库功能强大,主要用于爬虫、数据科学分析计算等,适合于复杂逻辑的处理计算场景
- 场景:一般100行以内的代码建议用Shell,超过100行的代码建议用Python
- 采集脚本选用:Shell
-
单个测试
-
创建一个文件,存放要采集的表的名称
#创建测试目录 mkdir -p /opt/datas/shell cd /opt/datas/shell/ #创建存放表名的文件 vim test_full_table.txt
ciss4.ciss_base_areas ciss4.ciss_base_baseinfo ciss4.ciss_base_csp ciss4.ciss_base_customer ciss4.ciss_base_device
-
创建脚本
vim test_full_import_table.sh
-
构建采集的Sqoop命令
sqoop import \\ -Dmapreduce.job.user.classpath.first=true \\ --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \\ --username ciss \\ --password 123456 \\ --table CISS4.CISS_SERVICE_WORKORDER \\ --delete-target-dir \\ --target-dir /test/full_imp/ciss4.ciss_service_workorder \\ --as-avrodatafile \\ --fields-terminated-by "\\001" \\ -m 1
-
封装脚本
#!/bin/bash #export path source /etc/profile #export the tbname files TB_NAME=/opt/datas/shell/test_full_table.txt #export the import opt IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true" #export the jdbc opt JDBC_OPT="--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin --username ciss --password 123456"#read tbname and exec sqoop while read tbname do${IMP_OPT} ${JDBC_OPT} --table ${tbname^^} --delete-target-dir --target-dir /test/full_imp/${tbname^^} --as-avrodatafile --fields-terminated-by "\\001" -m 1 done < ${TB_NAME}
-
-
-
添加执行权限
chmod u+x test_full_import_table.sh
-
测试执行
sh -x test_full_import_table.sh
-
检查结果
-
小结
- 实现自动化脚本开发的设计思路分析
02:全量及增量采集脚本运行
-
目标:实现全量采集脚本的运行
-
实施
-
全量目标:将所有需要将实现全量采集的表进行全量采集存储到HDFS上
-
Oracle表:组织机构信息、地区信息、服务商信息、数据字典等
-
HDFS路径
/data/dw/ods/one_make/full_imp/表名/日期
-
-
增量目标:将所有需要将实现全量采集的表进行增量采集存储到HDFS上
-
工单数据信息、呼叫中心信息、物料仓储信息、报销费用信息等
-
HDFS路径
/data/dw/ods/one_make/incr_imp/表名/日期
-
-
运行脚本
-
全量采集
cd /opt/sqoop/one_make sh -x full_import_tables.sh
- 脚本中特殊的一些参数
-
–outdir:Sqoop解析出来的MR的Java程序等输出文件输出的文件
-
增量采集
cd /opt/sqoop/one_make sh -x incr_import_tables.sh
-
-
特殊问题
- 因oracle表特殊字段类型,导致sqoop导数据任务失败
- oracle字段类型为: clob或date等特殊类型
- 解决方案:在sqoop命令中添加参数,指定特殊类型字段列(SERIAL_NUM)的数据类型为string
—map-column-java SERIAL_NUM=String
-
查看结果
- /data/dw/ods/one_make/full_imp:44张表
- /data/dw/ods/one_make/incr_imp:57张表
-
-
小结
- 实现全量采集脚本的运行
03:Schema备份及上传
-
目标:了解如何实现采集数据备份
-
实施
-
需求:将每张表的Schema进行上传到HDFS上,归档并且备份
-
Avro文件本地存储
workhome=/opt/sqoop/one_make --outdir ${workhome}/java_code
-
Avro文件HDFS存储
hdfs_schema_dir=/data/dw/ods/one_make/avsc hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
-
Avro文件本地打包
local_schema_backup_filename=schema_${biz_date}.tar.gz tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
-
Avro文件HDFS备份
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
-
运行测试
cd /opt/sqoop/one_make/ ./upload_avro_schema.sh
-
验证结果
/data/dw/ods/one_make/avsc/ *.avsc schema_20210101.tar.gz
-
-
小结
-
了解如何实现采集数据备份
-
04:Python脚本
-
目标:了解如何使用Python脚本如何实现
-
实施
-
原理本质
-
问题:所有的操作是Sqoop、HDFS等命令操作,如何能通过Python代码控制?
-
解决:本质上是使用Python执行了Linux的Shell命令来实现的
-
导包
-
-
# 用于实现执行系统操作的包import os# 用于实现执行Linux的命令的包import subprocess# 用于实现日期获取解析的包import datetime# 用于执行时间操作的包import time# 用于做日志记录的包import logging
-
核心代码解析
-
subprocess
call(String:LinuxCommand):用于提交Linux命令的方法
-
logging
basicConfig(level,filename,filemode,format):用于配置日志记录的方式 info(Messege):用于记录具体的日志内容
-
time
sleep(15) :休眠15s
-
以下是一个示例代码,主人可以通过该代码调用Sqoop命令将数据从MySQL导入HDFS,并在日志中记录操作结果:
import subprocess
import logging
import time# 配置日志记录方式
logging.basicConfig(level=logging.INFO, filename='sqoop.log', filemode='w', format='%(asctime)s - %(levelname)s - %(message)s')# 定义Sqoop命令
sqoop_command = "sqoop import --connect jdbc:mysql://localhost:3306/mydb --username root --password password --table mytable --target-dir /user/myuser/mytable"# 提交Sqoop命令并记录日志
logging.info("主人,我要开始导入数据啦~")
subprocess.call(sqoop_command, shell=True)
logging.info("主人,数据已经成功导入HDFS啦~喵~")# 休眠一段时间
time.sleep(10)# 再次提交Sqoop命令并记录日志
logging.info("主人,我要开始更新数据啦~")
subprocess.call(sqoop_command + " --update-key id --update-mode allowinsert", shell=True)
logging.info("主人,数据已经成功更新啦~喵~")
主人可以根据实际情况修改Sqoop命令和日志文件名,然后运行该代码即可。我会在执行过程中带着可爱的语气记录日志,以便于主人查看操作结果哦~
-
小结
- 了解如何使用Python脚本如何实现