FlinkCDC-Hudi:Mysql数据实时入湖全攻略一:初试风云
FlinkCDC-Hudi:Mysql数据实时入湖全攻略二:Hudi与Spark整合时所遇异常与解决方案
FlinkCDC-Hudi:Mysql数据实时入湖全攻略三:探索实现FlinkCDC mysql 主从库同步高可用
FlinkCDC-Hudi:Mysql数据实时入湖全攻略四:两种FlinkSql kafka connector的特征与应用
到目前为止,我们掌握了FlinkCDC分别同步数据到Hudi和Kafka的方法。
在生产应用中,会存在同时同步入湖和入kafka的需求。一般会怎么实现呢?
mysql> create table mysql_test_1(id bigint primary key not enforced,data String,create_time Timestamp(3),) with ('connector'='mysql-cdc','hostname'='mysqlhost','port'='3306','server-id'='5800-5804','username'='user','password'='user_password','server-time-zone'='Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'='flink_cdc','table-name'='test_1');
2.2 hudi ddlcreate table hudi_test_1(id bigint,data String,create_time Timestamp(3),PRIMARY KEY (`id`) NOT ENFORCED)with('connector'='hudi','path'='hdfs://hdfs-namespace/tmp/flink/cdcdata/hudi_test_1','hoodie.datasource.write.recordkey.field'='id','hoodie.parquet.max.file.size'='268435456','write.precombine.field'='create_time','write.tasks'='1','write.bucket_assign.tasks'='1','write.task.max.size'='1024','write.rate.limit'='30000','table.type'='MERGE_ON_READ','compaction.tasks'='1','compaction.async.enabled'='true','compaction.delta_commits'='1','compaction.max_memory'='500','changelog.enabled'='true','read.streaming.enabled'='true','read.streaming.check.interval'='3','hive_sync.enable'='true','hive_sync.mode'='hms','hive_sync.metastore.uris'='thrift://hiveserver2:9083','hive_sync.db'='test','hive_sync.table'='hudi_test_1','hive_sync.username'='flinkcdc','hive_sync.support_timestamp'='true');
2.3 kafka ddlcreate table upsert_kafka_test_1( id bigint, data String, create_time Timestamp(3), PRIMARY KEY (`id`) NOT ENFORCED ) with ( 'connector'='upsert-kafka', 'topic'='test1', 'properties.bootstrap.servers' = 'broker:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'key.format'='json', 'value.format'='json' );
二、需求实现面对这个需求,我们可以使用数据pipeline,先将数据从mysql同步到kafka,再从kafka读取数据同步到hudi。
Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1;Flink SQL> insert into hudi_test_1 select * from upsert_kafka_test_1 ;
另一种实现是,从Mysql同步后,分别输出到kafka 和hudi。
Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1;Flink SQL> insert into hudi_test_1 select * from mysql_test_1 ;
在Flink Sql中,正常语境下,一条insert语句将触发一个flink 作业,当我们以上述方法执行sql时会触发2个flink作业。如果想节省运行资源,可以把两个作业合并成一个。同一个作业进行多输出时,可能会互相影响进度,触发反压。
三、 statement set语法
在Flink sql中,将多个insert作业合并为一个作业,需要用到statement set语法。先用"begin statement set;"启动语句集,然后执行任意条insert语句,最后以"end;"结束以触发作业执行。
以前述一个实现为例,使用statement set实现如下:
Flink SQL> begin statement set; --开启语句集Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1;Flink SQL> insert into hudi_test_1 select * from mysql_test_1 ;Flink SQL> end; --结束语句集,执行作业提交
至此,我们可以根据需求结合statement set语句可以轻松实现同一作业多个输出的需求。