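Contents
0. Related Article Links
1. CDC Overview
1.1 What Is CDC
1.2 Types of CDC
1.3 Flink-CDC
2. Using FlinkCDC with the DataStream API
2.1 Importing Dependencies
2.2 Writing the Code
2.2.1 Main Class: Reading Data from the Business Database and Writing It to Kafka
2.2.2 Custom Deserializer
2.2.3 Method Parameters Explained
3. Using FlinkCDC with FlinkSQL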
0. Related Article Links
Flink Article Index
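1. CDC Overview
1.1 What Is CDC
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely and in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.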
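As a rough illustration of this idea, the sketch below models change events as plain Java objects and appends them to an in-memory list that stands in for the message broker. The names `ChangeLogSketch`, `ChangeEvent`, and `Op` are hypothetical and do not come from any CDC library.

```java
import java.util.ArrayList;
import java.util.List;

public class ChangeLogSketch {

    /** The three kinds of row-level change that CDC captures. */
    enum Op { INSERT, UPDATE, DELETE }

    /** One captured change: the operation plus the row image before and after it. */
    static final class ChangeEvent {
        final Op op;
        final String before; // null for INSERT
        final String after;  // null for DELETE

        ChangeEvent(Op op, String before, String after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }

        @Override
        public String toString() {
            return op + "(" + before + " -> " + after + ")";
        }
    }

    /** Stand-in for a message broker topic: an append-only, ordered list. */
    private final List<ChangeEvent> log = new ArrayList<>();

    /** Records a change at the end of the log, preserving arrival order. */
    void capture(Op op, String before, String after) {
        log.add(new ChangeEvent(op, before, after));
    }

    List<ChangeEvent> events() {
        return log;
    }

    /** Replays the log in order and returns the final image of the (single) row. */
    String replay() {
        String row = null;
        for (ChangeEvent e : log) {
            row = e.after; // INSERT and UPDATE carry the new image, DELETE carries null
        }
        return row;
    }

    public static void main(String[] args) {
        ChangeLogSketch sketch = new ChangeLogSketch();
        sketch.capture(Op.INSERT, null, "id=1,name=a");
        sketch.capture(Op.UPDATE, "id=1,name=a", "id=1,name=b");
        sketch.capture(Op.DELETE, "id=1,name=b", null);
        System.out.println(sketch.events());
        System.out.println("final row: " + sketch.replay());
    }
}
```

Because the events are kept in order, a consumer that replays them from the start arrives at the same final state as the source table.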
1.2 Types of CDC
CDC implementations fall into two main categories: query-based and Binlog-based. This section looks at the main differences between the two.
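One practical difference is that a query-based approach only sees the table state at the moment of each poll, so changes that happen between two polls can be lost, while a log-based approach sees every change in order. The sketch below simulates both against an in-memory "table"; no real database or binlog is involved, and all names (`CdcModesSketch`, `poll`, `readLog`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CdcModesSketch {

    private final Map<Integer, String> table = new HashMap<>(); // current table state
    private final List<String> binlog = new ArrayList<>();      // every change, in order

    /** Inserts or updates a row and appends the change to the simulated binlog. */
    void upsert(int id, String value) {
        String op = table.containsKey(id) ? "UPDATE" : "INSERT";
        table.put(id, value);
        binlog.add(op + " " + id + "=" + value);
    }

    /** Deletes a row and appends the change to the simulated binlog. */
    void delete(int id) {
        table.remove(id);
        binlog.add("DELETE " + id);
    }

    /** Query-based capture: a periodic query returns only the rows that exist right now. */
    Map<Integer, String> poll() {
        return new HashMap<>(table);
    }

    /** Log-based capture: returns all change records from the given log position onward. */
    List<String> readLog(int fromPosition) {
        return new ArrayList<>(binlog.subList(fromPosition, binlog.size()));
    }

    public static void main(String[] args) {
        CdcModesSketch db = new CdcModesSketch();
        db.upsert(1, "a");
        db.upsert(1, "b"); // intermediate value "a" is overwritten before any poll
        db.upsert(2, "x");
        db.delete(2);      // row 2 is gone before any poll

        System.out.println("query-based sees: " + db.poll());
        System.out.println("log-based sees:   " + db.readLog(0));
    }
}
```

In this run the poll reports only the surviving row, while the log still contains the overwritten value and the deleted row.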
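1.3 Flink-CDC
The Flink community developed the flink-cdc-connectors component, a source component that reads both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL.
FlinkCDC open-source repository: https://github.com/ververica/flink-cdc-connectors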
2. Using FlinkCDC with the DataStream API
2.1 Importing Dependencies
2.2 Writing the Code
2.2.1 Main Class: Reading Data from the Business Database and Writing It to Kafka
package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ouyang.gmall.realtime.app.function.CustomerDeserialization;
import com.ouyang.gmall.realtime.utils.FlinkUtil;
import com.ouyang.gmall.realtime.utils.ModelUtil;
import com.ouyang.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GmallCDC {

    public static Logger logger = LoggerFactory.getLogger(GmallCDC.class);
    public static final String ODS_base_DB_TOPIC_NAME = ModelUtil.getConfigValue("kafka.topic.ods.base.db");

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc";
        long interval = 5000L;

        // 1. Get the execution environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);

        // 2. Build the SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction
2.2.2 Custom Deserializer
package com.ouyang.gmall.realtime.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomerDeserialization implements DebeziumDeserializationSchema
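A Debezium `SourceRecord` for MySQL carries a topic of the form `<server-name>.<database>.<table>`, and a custom deserializer of this kind typically splits that topic and flattens the `before` and `after` row images into a single JSON string. The following dependency-free sketch shows only that mapping, using plain maps in place of Kafka Connect `Struct` objects. The method names, the output field names (`database`, `tableName`, `before`, `after`, `type`), and the renaming of `create` to `insert` are assumptions made for illustration, not the author's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class DeserializerSketch {

    /** Renders a flat map as a JSON object. Values are written as strings, without escaping. */
    static String mapToJson(Map<String, String> row) {
        if (row == null) {
            return "{}"; // e.g. no "before" image for an insert
        }
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : row.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    /**
     * Builds one JSON string per change event.
     *
     * @param topic     record topic, assumed to look like "server.database.table"
     * @param operation operation name such as "CREATE", "UPDATE" or "DELETE"
     */
    static String toJson(String topic, String operation,
                         Map<String, String> before, Map<String, String> after) {
        String[] parts = topic.split("\\.");
        String database = parts[1];
        String table = parts[2];

        // Assumption: expose "create" as the more familiar "insert"
        String type = operation.toLowerCase(Locale.ROOT);
        if ("create".equals(type)) {
            type = "insert";
        }

        return "{\"database\":\"" + database + "\""
                + ",\"tableName\":\"" + table + "\""
                + ",\"before\":" + mapToJson(before)
                + ",\"after\":" + mapToJson(after)
                + ",\"type\":\"" + type + "\"}";
    }

    public static void main(String[] args) {
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "1");
        after.put("tm_name", "x");
        System.out.println(toJson("mysql_binlog_source.gmall.base_trademark", "CREATE", null, after));
    }
}
```

A real implementation would read these values from the `SourceRecord` inside `deserialize(...)` and emit the string through the `Collector`; a JSON library such as fastjson would also take care of escaping.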
2.2.3 Method Parameters Explained
package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static void main(String[] args) throws Exception {

        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Flink-CDC stores the binlog read position as state in checkpoints (CK). To resume
        //    from where the job stopped, start the program from a checkpoint or savepoint.
        // 2.1 Enable checkpointing, with one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Set the strategy for restarting automatically from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        // 2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 3. Create the Flink-MySQL-CDC source
        // initial (default): Performs an initial snapshot of the monitored database tables upon
        //   first startup, then continues to read the latest binlog.
        // latest-offset: Never performs a snapshot of the monitored database tables upon first
        //   startup; reads only from the end of the binlog, so it only sees changes made since
        //   the connector was started.
        // timestamp: Never performs a snapshot of the monitored database tables upon first
        //   startup; reads the binlog directly from the specified timestamp. The consumer
        //   traverses the binlog from the beginning and ignores change events whose timestamp
        //   is smaller than the specified timestamp.
        // specific-offset: Never performs a snapshot of the monitored database tables upon
        //   first startup; reads the binlog directly from the specified offset.
        DebeziumSourceFunction
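The four startup modes described in the comments differ in whether a snapshot is taken and where binlog reading begins. The sketch below restates that as a plain Java enum and shows the event filtering described for the `timestamp` mode. `StartupModeSketch`, `StartupMode`, `takesSnapshot`, and `filterFromTimestamp` are hypothetical names for illustration only; they are not the connector's `StartupOptions` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartupModeSketch {

    /** The four startup modes and whether each performs an initial snapshot. */
    enum StartupMode {
        INITIAL(true),          // snapshot first, then continue with the latest binlog
        LATEST_OFFSET(false),   // no snapshot, start at the end of the binlog
        TIMESTAMP(false),       // no snapshot, start from a given timestamp
        SPECIFIC_OFFSET(false); // no snapshot, start from a given binlog offset

        private final boolean snapshot;

        StartupMode(boolean snapshot) {
            this.snapshot = snapshot;
        }

        boolean takesSnapshot() {
            return snapshot;
        }
    }

    /**
     * Mimics the timestamp mode: walk all events from the beginning and drop those whose
     * timestamp is smaller than the requested startup timestamp.
     */
    static List<Long> filterFromTimestamp(List<Long> eventTimestamps, long startupTimestamp) {
        List<Long> kept = new ArrayList<>();
        for (long ts : eventTimestamps) {
            if (ts >= startupTimestamp) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        for (StartupMode mode : StartupMode.values()) {
            System.out.println(mode + " takes snapshot: " + mode.takesSnapshot());
        }
        System.out.println(filterFromTimestamp(Arrays.asList(100L, 200L, 300L), 200L));
    }
}
```

Only `initial` combines full historical data with ongoing changes; the other three modes trade the snapshot for a faster start at a chosen binlog position.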
3. Using FlinkCDC with FlinkSQL
package com.ouyang.gmall.cdc.app;

import com.ouyang.gmall.common.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class GmallCDCWithSQL {

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc-sql";
        long interval = 5000L;

        // 1. Get the execution environment and configure checkpointing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the table with DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'bigdata1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        // 3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        // 4. Convert the dynamic table into a stream
        DataStream
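Step 4 converts the dynamic table into a stream. Judging only by the `Tuple2` and `Row` imports, this is presumably a retract stream (an assumption, since the call itself is not shown). In a retract stream each element carries a Boolean flag: `true` marks an add message and `false` marks a retract message, so an update appears as a retraction of the old row followed by an addition of the new one. The sketch below imitates that encoding with plain Java types; `RetractStreamSketch` and `toRetractMessages` are hypothetical names, and `Map.Entry<Boolean, String>` stands in for `Tuple2<Boolean, Row>`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {

    /**
     * Encodes one change as retract-stream messages.
     * The Boolean key is true for an add message and false for a retract message.
     */
    static List<Map.Entry<Boolean, String>> toRetractMessages(String op, String before, String after) {
        List<Map.Entry<Boolean, String>> out = new ArrayList<>();
        switch (op) {
            case "insert":
                out.add(new SimpleEntry<>(true, after));
                break;
            case "update":
                out.add(new SimpleEntry<>(false, before)); // retract the old row
                out.add(new SimpleEntry<>(true, after));   // add the new row
                break;
            case "delete":
                out.add(new SimpleEntry<>(false, before));
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toRetractMessages("insert", null, "1,old"));
        System.out.println(toRetractMessages("update", "1,old", "1,new"));
        System.out.println(toRetractMessages("delete", "1,new", null));
    }
}
```

A downstream consumer that applies the messages in order, removing a row on `false` and adding it on `true`, ends up with the current contents of the table.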
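Note: This post is adapted from a 2020 New Year special video by a certain training provider (某马) -> Bilibili link
Note: Links to other related articles can be found here -> Flink Article Index
Note: This post covers the detailed usage of FlinkCDC. For the new features of FlinkCDC 2.x, see the post Flink (59): FlinkCDC (Part 2).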