欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SqoopMysqlHive同步

时间:2023-06-22

统一文件存储格式Parquet,使用Impala进行计算

全量同步:

sqoop import --username 'root' --password '000' --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --table a --hcatalog-database default --hcatalog-table a --hcatalog-partition-keys dt --hcatalog-partition-values '2022-02-08' --fields-terminated-by '01' --lines-terminated-by 'n' --hive-drop-import-delims --null-string '\N' --null-non-string '\N' --split-by id -m 10

sqoop import --username root --password '000' --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --query "select id ,name, age, ads, birthday, hobday, gzads , source from student where ads='北京市丰台区南苑机场警备东路6号一区' and $ConDITIONS limit 100 " --hcatalog-database default --hcatalog-table student --hcatalog-partition-keys dt --hcatalog-partition-values '2022-02-08' --fields-terminated-by '001' --lines-terminated-by 'n' --hive-drop-import-delims --null-string '\N' --null-non-string '\N' --split-by id --m 3

增量同步:同时需要对历史修改的记录进行同步,且自动获取last-value最新时间,这里创建sqoop job 然后通过执行sqoop job 后可以自动保存增量数据里最新的last-value值就不用手动去指定了,但是sqoop 在自动保存最新的last-value值的时候要比原表中最后的last-value值要大一点,这个不会出现增量导入数据不一致的问题所以不用担心,并且由于hcatalog的lastmodified模式中merge-key不生效…也就导致了无法将Mysql里历史数据中修改过的记录和Hive中的历史数据进行合并,最后会导致Hive里数据重复,数据和MYSQL数据不一致的问题,所以在进行增量同步的时候使用传统方式,但是由于Hive中的表是Parquet格式,在同步的时候无法将MYSQL表里timestamp类型的数据转换成对应的类型,Sqoop在转换Parquet格式文件时会自动把Mysql里timestamp类型的数据转换成Int类型,这样就导致最后格式不对入库失败,所以需要通过map-column-java crt_date=String 来表示Sqoop在解析该字段的时候会把该字段自动转化成String来解析,通过map-column-hive crt_date=String指定Hive中日期字段的类型认为是String,这样才能解析成功,如果不加这2个参数,Mysql里如果有timestamp类型的数据最后会解析失败.

sqoop job --create ajob -- import --username 'root' --password '000' --table a --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --map-column-java crt_date=String --map-column-hive crt_date=String --incremental lastmodified --check-column crt_date --last-value "2022-02-01 13:35:41" --merge-key id --target-dir /user/hive/warehouse/a/dt=2022-02-08 --as-parquetfile --split-by id -m 3sqoop job --create ajob -- import --username 'root' --password '000' --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --table a --hive-database default --hive-table a --map-column-java crt_date=String --map-column-hive crt_date=String --fields-terminated-by '01' --incremental lastmodified --check-column crt_date --last-value "2022-02-07 13:35:41" --merge-key id --target-dir /user/hive/warehouse/a/dt=2022-02-08 --as-parquetfile -m 1sqoop import --username 'root' --password '000' --query "select id,name,crt_date from a where $CONDITIONS" --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --map-column-java crt_date=String --map-column-hive crt_date=String --incremental lastmodified --check-column crt_date --last-value "2022-02-09 17:42:19" --merge-key id --target-dir /user/hive/warehouse/a/dt=2022-02-08 --as-parquetfile -m 1

这两种方式都可以,last-value随便指定一个就行了,跑完一次sqoop job以后 sqoop会自动保存last-value的值

sqoop job --exec ajobsqoop job --delete ajobsqoop job --show ajobsqoop job --list a job

最后基于Parquet格式导出Mysql

sqoop export --username root --password '000' --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" --table student_hive --hcatalog-database default --hcatalog-table student --hcatalog-partition-keys dt --hcatalog-partition-values '2022-02-08' --num-mappers 3

需要在impala里做的相关操作

alter table a add partition(dt='2022-02-08');refresh a partition(dt='2022-02-08');alter table a drop partition(dt='2022-02-08')

这里使用定时监听启动Sqoop启动指令,启动指令是由外部程序发送到MYSQL里的字符串,3分钟监听一次,如果查询到了该字符串就启动Sqoop,Sqoop运行完成成,将MYSQL里的启动指令字符串删除等待程序发送的下一次指令.

#!/bin/shhostname=localhostport=3306username=rootpassword=000dbname=demotbname=sync_commandselect_sql_tb1="select * from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"tb1_result=$(/opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${select_sql_tb1}")delete_sql_tb1="delete from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"if [ ! -n "$tb1_result" ]; then echo "没有获取到tb1表同步指令" >> /opt/tb1.logelse echo "成功获取tb1表同步指令开始同步....." >> /opt/tb1.log echo $tb1_result /opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${delete_sql_tb1}" echo "已成功同步完成tb1表删除同步记录等待下次同步命令...." >>/opt/tb1.log fi

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。