> 文章列表 > 助力工业物联网,工业大数据之脚本开发【五】

助力工业物联网,工业大数据之脚本开发【五】

助力工业物联网,工业大数据之脚本开发【五】

01:脚本开发思路

  • 目标:实现自动化脚本开发的设计思路分析

  • 路径

    • step1:脚本目标
    • step2:实现流程
    • step3:脚本选型
    • step4:单个测试
  • 实施

    • 脚本目标:实现自动化将多张Oracle中的数据表全量或者增量采集同步到HDFS中

    • 实现流程

      • a. 获取表名
      • b.构建Sqoop命令
      • c.执行Sqoop命令
      • d.验证结果
    • 脚本选型

      • Shell:Linux原生Shell脚本,命令功能全面丰富,主要用于实现自动化Linux指令,适合于Linux中简单的自动化任务开发
      • Python:多平台可移植兼容脚本,自身库功能强大,主要用于爬虫、数据科学分析计算等,适合于复杂逻辑的处理计算场景
      • 场景:一般100行以内的代码建议用Shell,超过100行的代码建议用Python
      • 采集脚本选用:Shell
    • 单个测试

      • 创建一个文件,存放要采集的表的名称

        #创建测试目录
        mkdir -p /opt/datas/shell
        cd /opt/datas/shell/
        #创建存放表名的文件
        vim test_full_table.txt
        
        ciss4.ciss_base_areas
        ciss4.ciss_base_baseinfo
        ciss4.ciss_base_csp
        ciss4.ciss_base_customer
        ciss4.ciss_base_device
        
      • 创建脚本

        vim test_full_import_table.sh
        
      • 构建采集的Sqoop命令

        sqoop import \\
        -Dmapreduce.job.user.classpath.first=true \\
        --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \\
        --username ciss \\
        --password 123456 \\
        --table CISS4.CISS_SERVICE_WORKORDER \\
        --delete-target-dir \\
        --target-dir /test/full_imp/ciss4.ciss_service_workorder \\
        --as-avrodatafile \\
        --fields-terminated-by "\\001" \\
        -m 1
        
      • 封装脚本

        #!/bin/bash
        #export path
        source /etc/profile
        #export the tbname files
        TB_NAME=/opt/datas/shell/test_full_table.txt
        #export the import opt
        IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true"
        #export the jdbc opt
        JDBC_OPT="--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin --username ciss --password 123456"#read tbname and exec sqoop
        while read tbname
        do${IMP_OPT} ${JDBC_OPT} --table ${tbname^^} --delete-target-dir --target-dir /test/full_imp/${tbname^^} --as-avrodatafile --fields-terminated-by "\\001" -m 1
        done < ${TB_NAME}
        
  • 添加执行权限

    chmod u+x test_full_import_table.sh
  • 测试执行

    sh -x test_full_import_table.sh
    
  • 检查结果

    助力工业物联网,工业大数据之脚本开发【五】

  • 小结

    • 实现自动化脚本开发的设计思路分析

02:全量及增量采集脚本运行

  • 目标:实现全量采集脚本的运行

  • 实施

    • 全量目标:将所有需要将实现全量采集的表进行全量采集存储到HDFS上

      • Oracle表:组织机构信息、地区信息、服务商信息、数据字典等

      • HDFS路径

        /data/dw/ods/one_make/full_imp/表名/日期
    • 增量目标:将所有需要将实现全量采集的表进行增量采集存储到HDFS上

      • 工单数据信息、呼叫中心信息、物料仓储信息、报销费用信息等

      • HDFS路径

        /data/dw/ods/one_make/incr_imp/表名/日期
        
    • 运行脚本

      • 全量采集

        cd /opt/sqoop/one_make
        sh -x full_import_tables.sh 
        
        • 脚本中特殊的一些参数
      • –outdir:Sqoop解析出来的MR的Java程序等输出文件输出的文件

      • 增量采集

        cd /opt/sqoop/one_make
        sh -x incr_import_tables.sh 
        
    • 特殊问题

      • 因oracle表特殊字段类型,导致sqoop导数据任务失败
      • oracle字段类型为: clob或date等特殊类型
      • 解决方案:在sqoop命令中添加参数,指定特殊类型字段列(SERIAL_NUM)的数据类型为string
        • —map-column-java SERIAL_NUM=String
    • 查看结果

      • /data/dw/ods/one_make/full_imp:44张表
      • /data/dw/ods/one_make/incr_imp:57张表
  • 小结

    • 实现全量采集脚本的运行

03:Schema备份及上传

  • 目标:了解如何实现采集数据备份

  • 实施

    • 需求:将每张表的Schema进行上传到HDFS上,归档并且备份

    • Avro文件本地存储

      workhome=/opt/sqoop/one_make
      --outdir ${workhome}/java_code
      
      • Avro文件HDFS存储

        hdfs_schema_dir=/data/dw/ods/one_make/avsc
        hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
        
      • Avro文件本地打包

        local_schema_backup_filename=schema_${biz_date}.tar.gz
        tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
        
      • Avro文件HDFS备份

        hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
        hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
        
      • 运行测试

        cd /opt/sqoop/one_make/
        ./upload_avro_schema.sh 
        
      • 验证结果

        /data/dw/ods/one_make/avsc/
        *.avsc
        schema_20210101.tar.gz
        
    • 小结

    • 了解如何实现采集数据备份

04:Python脚本

  • 目标:了解如何使用Python脚本如何实现

  • 实施

    • 原理本质

      • 问题:所有的操作是Sqoop、HDFS等命令操作,如何能通过Python代码控制?

      • 解决:本质上是使用Python执行了Linux的Shell命令来实现的

      • 导包

      # 用于实现执行系统操作的包import os# 用于实现执行Linux的命令的包import subprocess# 用于实现日期获取解析的包import datetime# 用于执行时间操作的包import time# 用于做日志记录的包import logging
  • 核心代码解析

    • subprocess

      call(String:LinuxCommand):用于提交Linux命令的方法
      
    • logging

      basicConfig(level,filename,filemode,format):用于配置日志记录的方式
      info(Messege):用于记录具体的日志内容
      
    • time

      sleep(15) :休眠15s
      

以下是一个示例代码,主人可以通过该代码调用Sqoop命令将数据从MySQL导入HDFS,并在日志中记录操作结果:

import subprocess
import logging
import time# 配置日志记录方式
logging.basicConfig(level=logging.INFO, filename='sqoop.log', filemode='w', format='%(asctime)s - %(levelname)s - %(message)s')# 定义Sqoop命令
sqoop_command = "sqoop import --connect jdbc:mysql://localhost:3306/mydb --username root --password password --table mytable --target-dir /user/myuser/mytable"# 提交Sqoop命令并记录日志
logging.info("主人,我要开始导入数据啦~")
subprocess.call(sqoop_command, shell=True)
logging.info("主人,数据已经成功导入HDFS啦~喵~")# 休眠一段时间
time.sleep(10)# 再次提交Sqoop命令并记录日志
logging.info("主人,我要开始更新数据啦~")
subprocess.call(sqoop_command + " --update-key id --update-mode allowinsert", shell=True)
logging.info("主人,数据已经成功更新啦~喵~")

主人可以根据实际情况修改Sqoop命令和日志文件名,然后运行该代码即可。我会在执行过程中带着可爱的语气记录日志,以便于主人查看操作结果哦~

  • 小结

    • 了解如何使用Python脚本如何实现