
Writing data to Hudi from Flink and reading it with Hive

Date: 2023-05-12

Flink Hive on Hudi: creating the Hive table manually

Documentation: https://www.yuque.com/docs/share/879349ce-7de4-4284-9126-9c2a3c93a91d?#%20%E3%80%8AHive%20On%20Hudi%E3%80%8B

Put the Hudi jar (hudi-hadoop-mr-bundle-0.10.0.jar) into the /data/app/hive/auxlib directory, or set the corresponding property in hive-site.xml:

Set hive.default.aux.jars.path (or hive.aux.jars.path) in hive-site.xml, for example:

<property>
  <name>hive.default.aux.jars.path</name>
  <value>file:///mypath/hudi-hadoop-mr-bundle-0.9.0xxx.jar,file:///mypath/hudi-hive-sync-bundle-0.9.0xx.jar</value>
</property>

Create the corresponding Flink sink table and the Hive mapping tables:

// In Flink, create a MERGE_ON_READ table. For this table type you have to create the Hive mapping table yourself.
String sinkDDL = "CREATE TABLE hudi_datagen_mor ( " +
        "id INT PRIMARY KEY NOT ENFORCED, " +
        "userid INT, " +
        "amt BIGINT, " +
        "proctime TIMESTAMP(3) " +
        ") WITH ( " +
        "'connector' = 'hudi', " +
        "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', " +
        "'table.type' = 'MERGE_ON_READ', " +
        "'write.bucket_assign.tasks' = '1', " +
        "'write.tasks' = '1', " +
        "'compaction.tasks' = '1' " +
        ")";

-- In Hive, manually create an external table for the MERGE_ON_READ data.
-- Option 1: INPUTFORMAT is org.apache.hudi.hadoop.HoodieParquetInputFormat.
-- This only reads the Parquet base files, so rows that were just updated or deleted are not visible yet.
CREATE EXTERNAL TABLE `hudi_datagen_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `userid` bigint,
  `amt` bigint,
  `proctime` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor';

-- Option 2: INPUTFORMAT is org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.
-- This also reads freshly written data: the column-based Parquet base files and the row-based
-- Avro log files are merged before the result is returned to the user.
CREATE EXTERNAL TABLE `hudi_datagen_mor_2`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` int,
  `userid` int,
  `amt` bigint,
  `proctime` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor'
TBLPROPERTIES (
  'bucketing_version'='2',
  'transient_lastDdlTime'='1642573236');
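The article only shows the DDL strings. The sketch below is one way the surrounding Flink job might look: a minimal, hedged example in which the datagen source table, the class name and the checkpoint interval are assumptions for illustration, while the hudi_datagen_mor sink mirrors the DDL above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiMorSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hudi commits data on Flink checkpoints, so checkpointing must be enabled for writes to become visible.
        env.enableCheckpointing(10_000L);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical datagen source, used only to have rows to write.
        tableEnv.executeSql(
                "CREATE TABLE datagen_source ( " +
                "  id INT, " +
                "  userid INT, " +
                "  amt BIGINT, " +
                "  proctime AS PROCTIME() " +
                ") WITH ( " +
                "  'connector' = 'datagen', " +
                "  'rows-per-second' = '5', " +
                "  'fields.id.min' = '1', " +
                "  'fields.id.max' = '1000' " +
                ")");

        // Condensed version of the sinkDDL shown above.
        // NOT ENFORCED is required by Flink for primary key constraints.
        String sinkDDL = "CREATE TABLE hudi_datagen_mor ( "
                + "id INT PRIMARY KEY NOT ENFORCED, userid INT, amt BIGINT, proctime TIMESTAMP(3) "
                + ") WITH ( "
                + "'connector' = 'hudi', "
                + "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', "
                + "'table.type' = 'MERGE_ON_READ', "
                + "'write.bucket_assign.tasks' = '1', 'write.tasks' = '1', 'compaction.tasks' = '1' )";
        tableEnv.executeSql(sinkDDL);

        // Continuously upsert the generated rows into the Hudi MOR table; await() blocks until the job ends.
        tableEnv.executeSql(
                "INSERT INTO hudi_datagen_mor "
                + "SELECT id, userid, amt, CAST(proctime AS TIMESTAMP(3)) FROM datagen_source").await();
    }
}

Because Hudi commits on checkpoints, nothing appears under the HDFS path (and therefore in Hive) until the first checkpoint completes.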

If the table is partitioned, the partitions also need to be registered:

-- Add the partition
alter table hudi_datagen_mor add if not exists partition(`partition`='20210414')
  location 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor/20210414';

-- Query the data in that partition
select * from hudi_datagen_mor where `partition`='20210414';
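The same statements can also be issued programmatically. Below is a minimal sketch using the Hive JDBC driver; the HiveServer2 URL and the credentials are assumptions and must match your own cluster, only the ALTER TABLE and SELECT statements come from the snippet above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class AddHudiPartition {
    public static void main(String[] args) throws Exception {
        // Requires hive-jdbc on the classpath; assumed HiveServer2 address and user.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:hive2://hadoop001:10000/default", "root", "");
             Statement stmt = conn.createStatement()) {

            // Register the partition directory written by Flink.
            stmt.execute("alter table hudi_datagen_mor add if not exists "
                    + "partition(`partition`='20210414') "
                    + "location 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor/20210414'");

            // Read back a few rows from that partition.
            try (ResultSet rs = stmt.executeQuery(
                    "select id, userid, amt from hudi_datagen_mor where `partition`='20210414' limit 10")) {
                while (rs.next()) {
                    System.out.printf("id=%d userid=%d amt=%d%n",
                            rs.getInt(1), rs.getInt(2), rs.getLong(3));
                }
            }
        }
    }
}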

Flink Hive on Hudi: creating the Hive table automatically

Documentation: https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#

Start the Hive services

# Start different Hive services depending on hive_sync.mode
# 1. hms mode: only the metastore is needed
nohup hive --service metastore &
# 2. jdbc mode (the default): both the metastore and HiveServer2 are needed
nohup hive --service metastore &
nohup hive --service hiveserver2 &

Flink DDL that creates the Hive table automatically

String sinkDDL = "CREATE TABLE hudi_datagen_mor ( " +
        "id INT PRIMARY KEY NOT ENFORCED, " +
        "userid INT, " +
        "amt BIGINT, " +
        "proctime TIMESTAMP(3) " +
        ") WITH ( " +
        "'connector' = 'hudi', " +
        "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', " +
        "'table.type' = 'MERGE_ON_READ', " +
        "'write.bucket_assign.tasks' = '1', " +
        "'write.tasks' = '1', " +
        "'compaction.tasks' = '1', " +
        // enable automatic table creation in Hive (Hive sync)
        "'hive_sync.enable' = 'true', " +
        // how to connect to Hive; the default is jdbc
        "'hive_sync.mode' = 'hms', " +
        "'hive_sync.table' = 'hudi_datagen_mor3', " +
        "'hive_sync.db' = 'ods', " +
        "'hive_sync.metastore.uris' = 'thrift://hadoop001:9083', " +
        // whether to enable streaming read
        "'read.streaming.enabled' = 'true', " +
        "'read.streaming.check-interval' = '4'" +
        ")";

-- jdbc mode (choose either hms or jdbc, not both)
'hive_sync.metastore.uris' = 'thrift://hadoop001:9083',  -- required, metastore address
'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop001:10000',   -- required, HiveServer2 address
'hive_sync.table' = 'hudi_datagen_mor3',                 -- required, name of the table created in Hive
'hive_sync.db' = 'ods',                                  -- required, name of the database created in Hive
'hive_sync.username' = 'root',                           -- required, HMS username
'hive_sync.password' = '123456'                          -- required, HMS password

-- With 'table.type' = 'MERGE_ON_READ', two tables are created automatically in Hive:
--   the _rt table reads the data in real time after each checkpoint;
--   the _ro table reads only the not-yet-compacted data, i.e. the contents of the Parquet base files.
-- With 'table.type' = 'COPY_ON_WRITE', a single table (hudi_datagen_cow_ro) is created.
CREATE EXTERNAL TABLE `ods.hudi_datagen_mor3_rt`(
  `_hoodie_commit_time` string COMMENT '',
  `_hoodie_commit_seqno` string COMMENT '',
  `_hoodie_record_key` string COMMENT '',
  `_hoodie_partition_path` string COMMENT '',
  `_hoodie_file_name` string COMMENT '',
  `id` int COMMENT '',
  `userid` int COMMENT '',
  `amt` bigint COMMENT '',
  `proctime` bigint COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3')
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
TBLPROPERTIES (
  'last_commit_time_sync'='20220119153209172',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
  'transient_lastDdlTime'='1642576441');

CREATE EXTERNAL TABLE `ods.hudi_datagen_mor3_ro`(
  `_hoodie_commit_time` string COMMENT '',
  `_hoodie_commit_seqno` string COMMENT '',
  `_hoodie_record_key` string COMMENT '',
  `_hoodie_partition_path` string COMMENT '',
  `_hoodie_file_name` string COMMENT '',
  `id` int COMMENT '',
  `userid` int COMMENT '',
  `amt` bigint COMMENT '',
  `proctime` bigint COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='true',
  'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3')
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor3'
TBLPROPERTIES (
  'last_commit_time_sync'='20220119153209172',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
  'transient_lastDdlTime'='1642576441');

-- hudi_datagen_cow_ro (created for COPY_ON_WRITE tables)
CREATE EXTERNAL TABLE `hudi_datagen_cow_ro`(
  `_hoodie_commit_time` string COMMENT '',
  `_hoodie_commit_seqno` string COMMENT '',
  `_hoodie_record_key` string COMMENT '',
  `_hoodie_partition_path` string COMMENT '',
  `_hoodie_file_name` string COMMENT '',
  `id` int COMMENT '',
  `userid` int COMMENT '',
  `amt` bigint COMMENT '',
  `proctime` bigint COMMENT '')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='true',
  'path'='hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor')
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor'
TBLPROPERTIES (
  'last_commit_time_sync'='20220217144235326',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"userid","type":"integer","nullable":true,"metadata":{}},{"name":"amt","type":"long","nullable":true,"metadata":{}},{"name":"proctime","type":"timestamp","nullable":true,"metadata":{}}]}',
  'transient_lastDdlTime'='1645080237');
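Once the sync has run, Hive can query ods.hudi_datagen_mor3_rt (the merged, near-real-time view) and ods.hudi_datagen_mor3_ro (the read-optimized view) like ordinary tables. On the Flink side, the 'read.streaming.enabled' option set above also allows the table to be consumed as a continuous stream. The sketch below is a minimal, assumed example of such a streaming read; the class name is hypothetical and the table options simply mirror the sink DDL in this section.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiStreamingRead {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Hudi table for reading; path and options mirror the sink DDL above.
        tableEnv.executeSql(
                "CREATE TABLE hudi_datagen_mor ( "
                + "id INT PRIMARY KEY NOT ENFORCED, userid INT, amt BIGINT, proctime TIMESTAMP(3) "
                + ") WITH ( "
                + "'connector' = 'hudi', "
                + "'path' = 'hdfs://hadoop001:9000/flink-hudi/hudi_datagen_mor', "
                + "'table.type' = 'MERGE_ON_READ', "
                + "'read.streaming.enabled' = 'true', "
                + "'read.streaming.check-interval' = '4' )");

        // Print new commits as they arrive; the check interval above is 4 seconds.
        tableEnv.executeSql("SELECT * FROM hudi_datagen_mor").print();
    }
}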
