欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

flinksql实时查询hudi的数据

时间:2023-07-22
1.版本 组件版本hudi10.0flink13.52、场景:

在flink 中新建一张表(t1)插入数据, 然后同时用过另外一张表进行查询(t2)

场景如图

3、t1 建表

CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( 'connector' = 'hudi', 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/hudi-t1', 'write.tasks' = '1', 'compaction.tasks' = '1', 'table.type' = 'MERGE_ON_READ');

3.1 插入数据

INSERT INTO t1 VALUES('id1','Danny',28,TIMESTAMP '1970-01-01 00:00:01','par1');insert into t1 values ('id9','Danny',18,TIMESTAMP'1970-01-01 00:00:01','par9');INSERT INTO t1 VALUES('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

4.另外开一个窗口创建表t2

CREATE TABLE t2(uuid VARCHAR(20), name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ('connector' = 'hudi','path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/hudi-t1','table.type' = 'MERGE_ON_READ','read.tasks' = '1', 'read.streaming.enabled' = 'true', -- this option enable the streaming read'read.streaming.start-commit' = '20210316134557', -- specifies the start commit instant time'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.);

5、在t1 表插入数据 在t2表中会实时增加

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。