
Shell/Python Scripts for Full/Incremental Import of Oracle Data into Hive with Sqoop

Date: 2023-05-03

Shell Scripts

Full Import

#!/usr/bin/env bash
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/full_imp
workhome=/opt/sqoop/one_make
full_imp_tables=${workhome}/full_import_tables.txt
mkdir ${workhome}/log
orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# load hadoop/sqoop env
source /etc/profile
while read p; do
    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    cur_time=`date "+%F %T"`
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_full_imp.log
    sleep 30
done < ${full_imp_tables}

# If MR Uber mode is enabled, -Dmapreduce.job.user.classpath.first=true must be set to avoid class conflicts
# --outdir: directory for the Java code and other output files that Sqoop generates for its MR job
# ${p^^}: converts the table name to uppercase
# cur_time=`date "+%F %T"`: captures the current timestamp
# sleep 30: throttles job launches to avoid running out of memory
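The loop reads one table name per line from full_import_tables.txt. A minimal sketch of that file, with hypothetical table names, might look like this:

ciss_base_areas
ciss_base_device
ciss_service_order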

Incremental Import

#!/usr/bin/env bash
# Be very careful when writing this script, especially the SQL condition:
# a stray space inside the condition will make the command fail
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/incr_imp
workhome=/opt/sqoop/one_make
incr_imp_tables=${workhome}/incr_import_tables.txt
orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456
mkdir ${workhome}/log
sqoop_condition_params="--where '${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')"
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# load hadoop/sqoop env
source /etc/profile
while read p; do
    # clean old directory in HDFS
    hdfs dfs -rm -r ${dw_parent_dir}/${p}/${biz_date}
    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    cur_time=`date "+%F %T"`
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} ${sqoop_condition_params} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_incr_imp.log
    sleep 30
done < ${incr_imp_tables}
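After variable expansion, each iteration launches a command roughly like the sketch below (the table name ciss_service_order is hypothetical). When typing the command by hand, the condition needs an extra layer of double quotes so the single quotes reach Oracle as string-literal delimiters:

sqoop import -Dmapreduce.job.user.classpath.first=true \
    --outdir /opt/sqoop/one_make/java_code --as-avrodatafile \
    --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
    --username ciss --password 123456 \
    --target-dir /data/dw/ods/one_make/incr_imp/ciss_service_order/20210101 \
    --table CISS_SERVICE_ORDER \
    --where "'2021-01-01'=to_char(CREATE_TIME,'yyyy-mm-dd')" \
    -m 1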

Execution

cd /opt/sqoop/one_make

Before running, update biz_date and biz_fmt_date in the script to the target business date, then execute:

sh -x full_import_tables.sh
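Editing the script before every run is easy to get wrong. One possible refinement (a sketch, not part of the original scripts) is to accept the business date as a positional argument and derive biz_fmt_date from it:

#!/usr/bin/env bash
# usage: sh full_import_tables.sh 20210101
biz_date=${1:-20210101}                      # fall back to a default when no argument is given
biz_fmt_date=$(date -d "${biz_date}" "+%F")  # 20210101 -> 2021-01-01 (requires GNU date)
echo "importing business date ${biz_date} (${biz_fmt_date})"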

Python Scripts

Full Import

#!/usr/bin/env python
# @Time : 2021/7/14 15:34
# @desc :
__coding__ = "utf-8"
__author__ = "itcast"

import os
import subprocess
import datetime
import time
import logging

biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/full_imp'
workhome = '/opt/sqoop/one_make'
full_imp_tables = workhome + '/full_import_tables.txt'
# create the log directory if it does not exist yet
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')
orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'
sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)
# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')
# read the table list file
fr = open(full_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # parallel execution import, equivalent to the shell version:
    # ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    # sqoopimportCommand = f'''{sqoop_import_params} {sqoop_jdbc_params} --target-dir {dw_parent_dir}/{tblName}/{biz_date} --table {tblName.upper()} -m 1 &'''
    sqoopimportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper())
    subprocess.call(sqoopimportCommand, shell=True)
    # cur_time=`date "+%F %T"`
    # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    logging.basicConfig(level=logging.INFO,  # minimum log level to record
                        filename='%s/log/%s_full_imp.log' % (workhome, biz_fmt_date),
                        # file mode: 'w' rewrites the log file on every run; 'a' (the default) appends
                        filemode='a',
                        # log record format
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    # logging.info(cur_time + ' : ' + sqoopimportCommand)
    logging.info(sqoopimportCommand)
    time.sleep(15)
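Because each sqoop command ends with &, the imports run in the background and the script does not wait for them. To spot-check one table after the jobs finish, one can list its target directory in HDFS (paths taken from the script above; the table name is hypothetical):

hdfs dfs -ls /data/dw/ods/one_make/full_imp/ciss_base_areas/20210101
# total size of the imported Avro files, as a quick sanity check
hdfs dfs -du -s -h /data/dw/ods/one_make/full_imp/ciss_base_areas/20210101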

Incremental Import

#!/usr/bin/env python
# @Time : 2021/7/20 15:19
# @desc :
__coding__ = "utf-8"
__author__ = "itcast"

import os
import subprocess
import datetime
import time
import logging

biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/incr_imp'
workhome = '/opt/sqoop/one_make'
incr_imp_tables = workhome + '/incr_import_tables.txt'
# create the log directory if it does not exist yet
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')
orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'
sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)
# incremental condition, mirroring the shell version's --where parameter
sqoop_condition_params = '--where "\'%s\'=to_char(CREATE_TIME,\'yyyy-mm-dd\')"' % biz_fmt_date
# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')
# read the table list file
fr = open(incr_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # clean old directory in HDFS
    hdfs_command = 'hdfs dfs -rm -r %s/%s/%s' % (dw_parent_dir, tblName, biz_date)
    subprocess.call(hdfs_command, shell=True)
    # parallel execution import, equivalent to the shell version:
    # ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    sqoopimportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper(), sqoop_condition_params)
    subprocess.call(sqoopimportCommand, shell=True)
    # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    logging.basicConfig(level=logging.INFO,
                        filename='%s/log/%s_incr_imp.log' % (workhome, biz_fmt_date),
                        filemode='a',
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    # logging.info(cur_time + ' : ' + sqoopimportCommand)
    logging.info(sqoopimportCommand)
    time.sleep(15)
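Before scheduling either script, it can help to verify the JDBC connection and the incremental WHERE expression directly with sqoop eval (same connection settings as above; the table name is hypothetical):

sqoop eval \
    --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
    --username ciss --password 123456 \
    --query "SELECT COUNT(1) FROM CISS_SERVICE_ORDER WHERE '2021-01-01' = to_char(CREATE_TIME, 'yyyy-mm-dd')"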
