mysql-cdc支撑正则表达式的库名表名来匹配多个库多个表来获取分库分表情况下的mysql数据。只需要在创建flink源表时在数据库和表名上使用正则匹配即可。
建表语句:
DROp TABLE IF EXISTS `2person`;CREATE TABLE `2person` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(20) NOT NULL DEFAULT '', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=2003 DEFAULT CHARSET=utf8;-- ------------------------------ Records of 1person-- ----------------------------INSERT INTO `2person` VALUES ('2001', '2001name');INSERT INTO `2person` VALUES ('2002', '2name');DROP TABLE IF EXISTS `3person`;CREATE TABLE `3person` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(20) NOT NULL DEFAULT '', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;-- ------------------------------ Records of 1person-- ----------------------------INSERT INTO `3person` VALUES ('3001', '3001name');INSERT INTO `3person` VALUES ('3002', '3name');CREATE TABLE `person_sum` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(20) NOT NULL DEFAULT '', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;
java调用sql(也可以直接在flinksql客户端执行其中的sql):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.TableResult;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class Mysql2MysqlRemote { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String sourceDDL = "CREATE TABLE mysql_binlog (n" + " id Int,n" + " name STRING,n" + " primary key (id) not enforcedn" + ") WITH (n" + " 'connector' = 'mysql-cdc',n" + " 'hostname' = '192.168.128.1',n" + " 'port' = '3306',n" + " 'username' = 'root',n" + " 'password' = '123456',n" + " 'database-name' = 'db_[0-9]?',n" + " 'table-name' = '[0-9]?persion[0-9]?'n" +// ", 'scan.startup.mode' = 'latest-offset'n" + ")"; String sinkDDL = "CREATE TABLE test_cdc (" + " id Int," + " name STRING," + " primary key (id) not enforced" + ") WITH (" + " 'connector' = 'jdbc'," + " 'driver' = 'com.mysql.cj.jdbc.Driver'," + " 'url' = 'jdbc:mysql://192.168.128.1:3306/db0?serverTimezone=UTC&useSSL=false'," + " 'username' = 'root'," + " 'password' = '123456'," + " 'table-name' = 'person_sum'" + ")"; // 简单的聚合处理 String transformDmlSQL = "insert into test_cdc select * from mysql_binlog"; System.out.println(sourceDDL); System.out.println(sinkDDL); System.out.println(transformDmlSQL); TableResult tableResult = tableEnv.executeSql(sourceDDL); TableResult sinkResult = tableEnv.executeSql(sinkDDL); TableResult result = tableEnv.executeSql(transformDmlSQL); // 等待flink-cdc完成快照 result.print(); env.execute("Mysql2MysqlRemote"); }}
pom.xml :
<?xml version="1.0" encoding="UTF-8"?>
执行后,flink会启动任务将存量数据同步到目标表,并且如果增量修改数据也会被同步过去,可以修改源表数据后再查看目标表中的数据库是否变化。
其他问题:
如果各表中的主键有相同的可以通过拼接数据库名和表名来组成联合主键。
在源表建表语句中中增加
database_name STRING metaDATA VIRTUAL,
table_name STRING metaDATA VIRTUAL,
在目标表建表语句中增加
database_name STRING,
table_name STRING,
并设置联合主键
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
如果分库不在一个机器上,可以使用union all来解决。
参考:
基于 Flink CDC 同步 MySQL 分库分表构建实时数据湖 — Flink CDC documentation
Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖 - 尚码园