欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink(58):Flink之FlinkCDC(上)

时间:2023-07-23

目录

0、相关文章链接

1、CDC简介

1.1、什么是CDC

1.2、CDC的种类

2、基于DataStream方式的FlinkCDC应用

2.1、导入依赖

2.2、编写代码

2.2.1、主类-从业务库中获取数据并写入到kafka中

2.2.2、自定义反序列化器

2.2.3、各方法参数详解

3. FlinkSQL方式的应用


0、相关文章链接

Flink文章汇总

1、CDC简介

1.1、什么是CDC

        CDC 是 Change Data Capture(变更数据获取) 的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2、CDC的种类

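CDC 主要分为基于查询和基于 Binlog 两种方式,两者的主要区别如下:基于查询的 CDC(如 Sqoop、Kafka JDBC Source)以批处理方式周期性查询数据库,无法保证捕获到每一次数据变化,延迟较高,并且查询会增加数据库压力;基于 Binlog 的 CDC(如 Canal、Maxwell、Debezium)以流的方式读取数据库日志,可以捕获所有数据变化,延迟低,且基本不增加数据库压力。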

2、基于DataStream方式的FlinkCDC应用

2.1、导入依赖

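<!-- NOTE: the XML tags were lost when this page was extracted. Property names that are
     referenced via ${...} below are certain; the first five property names
     (encodings, compiler levels, flink.binary.version) are reconstructed guesses. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.binary.version>1.12</flink.binary.version>
    <flink.version>1.12.0</flink.version>
    <flink.cdc.version>1.2.0</flink.cdc.version>
    <scala.binary.version>2.12</scala.binary.version>
    <hadoop.version>3.1.3</hadoop.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.21</slf4j.version>
    <mysql.version>5.1.49</mysql.version>
    <fastjson.version>1.2.75</fastjson.version>
    <commons.beanutils.version>1.9.4</commons.beanutils.version>
    <guava.version>29.0-jre</guava.version>
    <okhttp.version>3.6.0</okhttp.version>
    <springboot.version>2.4.1</springboot.version>
    <HikariCP.version>2.6.1</HikariCP.version>
    <commons.lang3.version>3.10</commons.lang3.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>${commons.beanutils.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>${okhttp.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
        <version>${springboot.version}</version>
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>${HikariCP.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>${commons.lang3.version}</version>
    </dependency>
</dependencies>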

2.2、编写代码

2.2.1、主类-从业务库中获取数据并写入到kafka中

package com.ouyang.gmall.realtime.app.ods;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;import com.ouyang.gmall.realtime.app.function.CustomerDeserialization;import com.ouyang.gmall.realtime.utils.FlinkUtil;import com.ouyang.gmall.realtime.utils.ModelUtil;import com.ouyang.gmall.realtime.utils.MyKafkaUtil;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class GmallCDC { public static Logger logger = LoggerFactory.getLogger(GmallCDC.class); public static final String ODS_base_DB_TOPIC_NAME = ModelUtil.getConfigValue("kafka.topic.ods.base.db"); public static void main(String[] args) throws Exception { String applicationName = "gmall-cdc"; long interval = 5000L; // 1.获取执行环境,并配置checkpoint StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true); // 2.通过FlinkCDC构建SourceFunction并读取数据 DebeziumSourceFunction sourceFunction = MySQLSource.builder() .hostname(ModelUtil.getConfigValue("mysql.hostname")) .port(Integer.parseInt(ModelUtil.getConfigValue("mysql.port"))) .username(ModelUtil.getConfigValue("mysql.username")) .password(ModelUtil.getConfigValue("mysql.password")) .databaseList(ModelUtil.getConfigValue("mysql.database.gmall")) .deserializer(new CustomerDeserialization()) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource streamSource = env.addSource(sourceFunction); //3.对数据进行日志打印,并将数据输出到Kafka中 streamSource .map(new MapFunction() { @Override public String map(String value) throws Exception { logger.warn(value); return value; } }) 
.addSink(MyKafkaUtil.getKafkaProducerExactlyonce(ODS_base_DB_TOPIC_NAME)); //4.启动任务 env.execute(applicationName); }}

2.2.2、自定义反序列化器

package com.ouyang.gmall.realtime.app.function;import com.alibaba.fastjson.JSONObject;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;public class CustomerDeserialization implements DebeziumDeserializationSchema { @Override public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception { //1.创建JSON对象用于存储最终数据 JSonObject result = new JSonObject(); //2.获取库名&表名 String topic = sourceRecord.topic(); String[] fields = topic.split("\."); String database = fields[1]; String tableName = fields[2]; //3.获取 "before"数据 和 "after"数据 Struct value = (Struct) sourceRecord.value(); // 3.1、"before"数据 Struct before = value.getStruct("before"); JSonObject beforeJson = new JSonObject(); if (before != null) { Schema beforeSchema = before.schema(); for (Field field : beforeSchema.fields()) { Object beforevalue = before.get(field); beforeJson.put(field.name(), beforevalue); } } // 3.2、"after"数据 Struct after = value.getStruct("after"); JSonObject afterJson = new JSonObject(); if (after != null) { Schema afterSchema = after.schema(); for (Field field : afterSchema.fields()) { Object afterValue = after.get(field); afterJson.put(field.name(), afterValue); } } //4.获取timestamp long ts = (long) value.get("ts_ms"); //5.获取操作类型 CREATE UPDATe DELETE,并转换为小写,其中create转换为insert,方便后续写入 Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toLowerCase(); if ("create".equals(type)) { type = "insert"; } //6.将字段写入JSON对象 result.put("database", database); result.put("tableName", tableName); result.put("before", beforeJson); result.put("after", 
afterJson); result.put("type", type); result.put("ts", ts); //7.输出数据 collector.collect(result.toJSonString()); } @Override public TypeInformation getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; }}

2.2.3、各方法参数详解

package com.ouyang.gmall.realtime.app.ods;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Test { public static void main(String[] args) throws Exception { //1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序 //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK env.enableCheckpointing(5000L); //2.2 指定 CK 的一致性语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //2.3 设置任务关闭的时候保留最后一次 CK 数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //2.4 指定从 CK 自动重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L)); //2.5 设置状态后端 env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC")); //2.6 设置访问 HDFS 的用户名 System.setProperty("HADOOP_USER_NAME", "atguigu"); //3.创建 Flink-MySQL-CDC 的 Source //initial (default): Performs an initial snapshot on the monitored database tables uponfirst startup, and continue to read the latest binlog. //latest-offset: Never to perform snapshot on the monitored database tables upon firststartup, just read from the end of the binlog which means only have the changes since theconnector was started. 
//timestamp: Never to perform snapshot on the monitored database tables upon firststartup, and directly read binlog from the specified timestamp、The consumer will traverse thebinlog from the beginning and ignore change events whose timestamp is smaller than thespecified timestamp. //specific-offset: Never to perform snapshot on the monitored database tables uponfirst startup, and directly read binlog from the specified offset. DebeziumSourceFunction mysqlSource = MySQLSource.builder() .hostname("hadoop102") .port(3306) .username("root") .password("000000") .databaseList("gmall-flink") //可选配置项,如果不指定该参数,则会 读取上一个配置下的所有表的数据, 注意:指定的时候需要使用"db.table"的方式 .tableList("gmall-flink.z_user_info") .startupOptions(StartupOptions.initial()) .deserializer(new StringDebeziumDeserializationSchema()) .build(); //4.使用 CDC Source 从 MySQL 读取数据 DataStreamSource mysqlDS = env.addSource(mysqlSource); //5.打印数据 mysqlDS.print(); //6.执行任务 env.execute(); } }

3. FlinkSQL方式的应用

package com.ouyang.gmall.cdc.app;import com.ouyang.gmall.common.utils.FlinkUtil;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class GmallCDCWithSQL { public static void main(String[] args) throws Exception { String applicationName = "gmall-cdc-sql"; long interval = 5000L; // 1.获取执行环境,并配置checkpoint StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //2.DDL方式建表 tableEnv.executeSql("CREATE TABLE mysql_binlog ( " + " id STRING NOT NULL, " + " tm_name STRING, " + " logo_url STRING " + ") WITH ( " + " 'connector' = 'mysql-cdc', " + " 'hostname' = 'bigdata1', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = '123456', " + " 'database-name' = 'gmall', " + " 'table-name' = 'base_trademark' " + ")"); //3.查询数据 Table table = tableEnv.sqlQuery("select * from mysql_binlog"); //4.将动态表转换为流 DataStream> retractStream = tableEnv.toRetractStream(table, Row.class); retractStream.print(); //5.启动任务 env.execute(applicationName); }}


注:此博客根据某马2020年贺岁视频改编而来 -> B站网址

注:其他相关文章链接由此进 -> Flink文章汇总

注:此博文为介绍FlinkCDC的详细使用,如果需要了解FlinkCDC2.x的新特性可以查看 Flink(59):Flink之FlinkCDC(下) 博文 


Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。