欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Maxwell:实时监控MySQL数据库的数据变更操作,并以JSON格式发送给Kafka

时间:2023-04-19
文章目录

Maxwell原理Maxwell安装部署Maxwell启停脚本使用Maxwell将增量同步的数据导入kafka从Kafka上传到hdfs历史数据全量同步

官网地址:http://maxwells-daemon.io/


字段解释database变更数据所属的数据库table表更数据所属的表*type*数据变更类型*ts*数据变更发生的时间xid事务idcommit事务提交标志,可用于重新组装事务*data*对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据*old*对于update类型,表示修改之前的数据,只包含变更字段Maxwell原理

·实时读取mysql数据库中的二进制日志(binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka;maxwell将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据·MySQL主从复制·maxsql的binlog模式·statement based:基于语句·Row-based:基于行·mixed:混合模式,默认statement based,可能导致数据不一致的自动切换Row-based

Maxwell安装部署

地址:https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz(注:Maxwell-1.30.0及以上版本不再支持JDK1.8。)·修改mysql配置---/etc/my.conf·增加配置#数据库idserver-id = 1#启动binlog,该参数的值会作为binlog的文件名log-bin=mysql-bin#binlog类型,maxwell要求为row类型binlog_format=row#启用binlog的数据库,需根据实际情况作出修改binlog-do-db=gmall·Maxwell需要在MySQL中存储其运行过程中的所需的一些数据,包括binlog同步的断点位置(Maxwell支持断点续传)等等,故需要在MySQL为Maxwell创建数据库及用户。·创建maxwell数据库以及用户·创建数据库·CREATE DATAbase maxwell;·如果没有初始化mysql,需要调整策略级别 ·set global validate_password_policy=0; ·set global validate_password_length=4; ·创建maxwell用户并赋予其必要权限 · CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'; · GRANT ALL ON maxwell.* TO 'maxwell'@'%'; · GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';·配置maxwell的config.properties#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redisproducer=kafka#目标Kafka集群地址kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}kafka_topic=maxwell#MySQL相关配置host=hadoop102user=maxwellpassword=maxwelljdbc_options=useSSL=false&serverTimezone=Asia/Shanghai·启停maxwell·/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon· ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

Maxwell启停脚本

#!/bin/bashif [[ $# -lt 1 ]]; thenecho "Pleace Input Args"exitfiMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){#grep -v grep:过滤掉包含grep的进程性 ; wc -l:wc统计文件的行数:lineresult=`ps -ef |grep maxwell |grep -v grep |wc -l`return $result}start_maxwell(){status_maxwell# $?:能调上一个命令的返回结果if [[ $? -lt 1 ]]; thenecho "启动maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell已经启动,请勿再次启动"fi}stop_maxwell(){status_maxwellif [[ $? -eq 0 ]]; thenecho "Maxwell未启动"else#xargs之前的结果作为参数传给后面ps -ef | grep maxwell |grep -v grep | awk '{print $2}'| xargs kill -9fi}case $1 in"start" )start_maxwell;;"stop" )stop_maxwell;;"restart" )stop_maxwellstart_maxwell;;esac

使用Maxwell将增量同步的数据导入kafka

·修改maxwell的config.properties·Kafka的topic以表名来自动创建·kafka_topic=%{table}·表过滤(include包含、exclude不包含)·filter= include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use·修改kafka自动创建的topic的分区数与副本数量·num.partitions=3·default.replication.factor=3

从Kafka上传到hdfs

·编写agent的.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source | kafka source 头信息中会自带topic和时间戳,时间为进入flume的时间a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_infoa1.sources.r1.kafka.consumer.group.id = groupa1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = com.collect.interceptor.db.TimestampInterceptor$Builder# Describe the sinka1.sinks.k1.type = hdfs###kafka source中有两个参数 setTopicHeader:默认为true topicHeader:默认为topic;表示头信息中有一对kv值,key为topic、value为kafka的topic,因此在hdfs sink中可以使用占位符%{topic}来直接获取topica1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%da1.sinks.k1.hdfs.rollInterval = 10a1.sinks.k1.hdfs.rollSize = 134217728a1.sinks.k1.hdfs.rollCount = 0##控制输出文件时原生文件,不序列化同时可以压缩a1.sinks.k1.hdfs.fileType = CompressedStreama1.sinks.k1.hdfs.codeC = gzip# Use a channel which buffers events in filea1.channels.c1.type = file#如果之前使用过file channel,需要将下面behavior进行修改a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1·拦截器(记得分发到集群lib下)public class TimestampInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { String body = new String(event.getBody()); JSonObject js = JSONObject.parseObject(body); String ts = js.getString("ts"); Map headers = event.getHeaders(); headers.put("timestamp",ts + "000"); return event; } @Override public List intercept(List events) { for (Event event : events) { intercept(event); } return events; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new TimestampInterceptor(); } @Override public void configure(Context context) { } }}

历史数据全量同步

·对历史数据进行全量同步·bin/maxwell-bootstrap --database 数据库名 --table 表名 --config config.properties·bootstrap数据格式注意事项:1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。