
Canal: real-time sync of MySQL data to Kafka

Date: 2023-05-01
Table of Contents

1. Environment preparation: zk, kafka, mysql-master
   1.1 Enable binlog on MySQL
   1.2 Start zk and kafka
2. Install and configure Canal
3. Testing

Official introduction: https://github.com/alibaba/canal/wiki/Introduction

1. Environment preparation: zk, kafka, mysql-master

Role                        IP
mysql-master                192.168.56.1 (windows)
zk, kafka, canal1, canal2   192.168.56.7 (centos7)

1.1 Enable binlog on MySQL

[root@c7 kafka_2.11-1.1.0-packs]# mysql -uroot -proot -h192.168.56.1
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 50
Server version: 5.7.34-log MySQL Community Server (GPL)

Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)

mysql> show variables like 'server_id';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| server_id     | 1     |
+---------------+-------+
1 row in set, 1 warning (0.01 sec)

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000007 | 1817     |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.01 sec)
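If binlog_format is not already ROW, it can be enabled on the master. A minimal my.cnf sketch (the values mirror what the session above reports; the log file basename is an assumption, and mysqld must be restarted after the change):

```ini
[mysqld]
# canal parses row-level binlog events, so ROW format is required
binlog_format = ROW
log-bin = mysql-bin
# must be unique across the master and every canal instance posing as a replica
server-id = 1
```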

1.2 Start zk and kafka

[root@c7 zookeeper-3.5.5]# cd conf/
[root@c7 conf]# cat zoo.cfg |grep -v ^#
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/
clientPort=2181
admin.serverPort=8881
[root@c7 conf]# cd ..
[root@c7 zookeeper-3.5.5]# sh bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 3108.

[root@c7 kafka_2.11-1.1.0-packs]# cat config/server.properties |grep -v ^# |grep -v ^$
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/common/kafka_2.11-1.1.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@c7 kafka_2.11-1.1.0-packs]# sh bin/kafka-server-start.sh -daemon config/server.properties
[root@c7 kafka_2.11-1.1.0-packs]# tail logs/server.log
[2022-02-14 14:56:44,444] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,444] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,444] INFO Client environment:user.dir=/root/kafka_2.11-1.1.0-packs (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,446] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@6c64cb25 (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,883] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-14 14:56:44,883] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,935] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,943] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100002e9f38000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,945] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-14 14:56:46,492] INFO Cluster ID = Laod0bPIRiCmex0wK3QRjw (kafka.server.KafkaServer)

2. Install and configure Canal

For canal2, simply change the corresponding ports in canal.properties and it can be started the same way.
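A second instance can be unpacked into its own directory (the test session below uses ~/canal2) with only the port lines changed. A sketch of the difference — 22222 is the value that later shows up under /otter/canal/cluster in zk, while the other two port values here are assumptions (any free ports work):

```properties
# canal2/conf/canal.properties -- everything else identical to canal1
canal.register.ip = 192.168.56.7
canal.port = 22222               # canal1 uses 11111
canal.metrics.pull.port = 22223  # assumed; any free port
canal.admin.port = 22220         # assumed; any free port
```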

[root@c7 ~]# ll -h canal.deployer-1.1.5.tar.gz
-rw-r--r-- 1 root root 58M Feb 14 11:07 canal.deployer-1.1.5.tar.gz
[root@c7 ~]# mkdir canal
[root@c7 ~]# tar -xf canal.deployer-1.1.5.tar.gz -C canal
[root@c7 ~]# ll canal
total 4
drwxr-xr-x 2 root root   93 Feb 14 14:37 bin
drwxr-xr-x 5 root root  151 Feb 14 14:44 conf
drwxr-xr-x 2 root root 4096 Feb 14 11:08 lib
drwxrwxrwx 4 root root   34 Feb 14 11:18 logs
drwxrwxrwx 2 root root  177 Apr 19  2021 plugin
[root@c7 ~]# cd canal/conf
[root@c7 conf]# vim canal.properties
#################################################
#########      common argument       #########
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip = 192.168.56.7
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers = 192.168.56.7:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
...
#################################################
#########      destinations        #########
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
...
#################################################
#########         Kafka           #########
#################################################
kafka.bootstrap.servers = 192.168.56.7:9092
...
[root@c7 conf]# vim example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.56.1:3306
canal.instance.master.journal.name=mysql-bin.000007
canal.instance.master.position=1817
...
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
...
# mq config (the topic must be created in advance)
canal.mq.topic=canal_test
...

3. Testing

Known issue: during an active/standby failover, Canal was observed to send some duplicate records to Kafka.
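Canal only guarantees at-least-once delivery, so the practical fix is an idempotent consumer. A minimal dedup sketch (the class and field list are illustrative, not part of Canal): key each flat message on its binlog-derived fields, since a replayed copy after failover carries the same event data but may arrive with a different batch id and processing timestamp:

```python
import json

# Fields derived purely from the binlog event; "ts" (processing time) and
# "id" (batch id) can differ when a failed-over canal re-sends the event.
STABLE_FIELDS = ("database", "table", "type", "es", "data", "old", "sql")

def event_key(raw: str) -> str:
    """Build a dedup key from the binlog-derived fields of a canal flat message."""
    msg = json.loads(raw)
    return json.dumps([msg.get(f) for f in STABLE_FIELDS], sort_keys=True)

class Deduplicator:
    """Remembers the last `capacity` keys, enough to absorb a failover replay window."""
    def __init__(self, capacity: int = 10000):
        self.capacity = capacity
        self.seen = {}  # key -> None; dict preserves insertion order for eviction

    def is_duplicate(self, raw: str) -> bool:
        key = event_key(raw)
        if key in self.seen:
            return True
        self.seen[key] = None
        if len(self.seen) > self.capacity:
            self.seen.pop(next(iter(self.seen)))  # evict the oldest key
        return False
```

A few thousand remembered keys are usually enough to cover a replay window; for true exactly-once effects, a unique key or upsert on the target database side is the more robust design.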

[root@c7 ~]# sh canal/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
[root@c7 ~]# sh canal2/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh

# check the records stored in zk
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, otter, controller_epoch, clickhouse, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /otter
[canal]
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal
[cluster, destinations]
[zk: localhost:2181(CONNECTED) 3] ls /otter/canal/cluster
[192.168.56.7:11111, 192.168.56.7:22222]
[zk: localhost:2181(CONNECTED) 4] ls /otter/canal/destinations
[example]
[zk: localhost:2181(CONNECTED) 5] ls /otter/canal/destinations/example
[running, cluster]
[zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/running
[]
[zk: localhost:2181(CONNECTED) 7] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.56.7:11111"}
cZxid = 0xa53
ctime = Mon Feb 14 14:19:08 CST 2022
mZxid = 0xa53
mtime = Mon Feb 14 14:19:08 CST 2022
pZxid = 0xa53
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002e9f380008
dataLength = 46
numChildren = 0
[zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
[192.168.56.7:11111, 192.168.56.7:22222]

# create a database and table, then insert data
mysql> create database t1;
Query OK, 1 row affected (0.11 sec)
mysql> use t1;
Database changed
mysql> create table per(id int);
Query OK, 0 rows affected (0.25 sec)
mysql> insert into per values(6),(7);
Query OK, 2 rows affected (0.06 sec)
Records: 2  Duplicates: 0  Warnings: 0
mysql> insert into per values(8);
Query OK, 1 row affected (0.16 sec)

# consume the kafka topic
[root@c7 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canal_test
{"data":null,"database":"t1","es":1644819646000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database t1","sqlType":null,"table":"","ts":1644819647070,"type":"QUERY"}
{"data":null,"database":"t1","es":1644819657000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table per(id int)","sqlType":null,"table":"per","ts":1644819658225,"type":"CREATE"}
{"data":[{"id":"6"},{"id":"7"}],"database":"t1","es":1644819670000,"id":6,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644819670986,"type":"INSERT"}
{"data":[{"id":"8"}],"database":"t1","es":1644820545000,"id":7,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644820546063,"type":"INSERT"}
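The flat messages produced on the topic are plain JSON and easy to unpack downstream. A small sketch against the fields visible in the consumer output above (the commented-out consumer loop assumes the kafka-python package; the topic and broker address are the ones configured earlier):

```python
import json

def extract_rows(raw: str):
    """Return (database.table, event type, rows) for a canal flat message.

    DDL events (isDdl=true) carry no row data, so rows is empty for them.
    """
    msg = json.loads(raw)
    rows = msg["data"] or []  # "data" is null for DDL / QUERY events
    return f'{msg["database"]}.{msg["table"]}', msg["type"], rows

# Reading the topic itself needs a consumer, e.g. (kafka-python, assumed installed):
# from kafka import KafkaConsumer
# for record in KafkaConsumer("canal_test", bootstrap_servers="192.168.56.7:9092"):
#     print(extract_rows(record.value.decode("utf-8")))
```

Note that column values arrive as strings ("id":"6" even though the column is int); the "mysqlType"/"sqlType" fields are there to let the consumer cast them back.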
