Contents:
·Maxwell principles
·Maxwell installation and deployment
·Maxwell start/stop script
·Using Maxwell to send incremental data to Kafka
·From Kafka to HDFS
·Full synchronization of historical data
Maxwell principles
Official site: http://maxwells-daemon.io/
·Maxwell reads the MySQL binary log (binlog) in real time, extracts the change data from it, and sends the changes to Kafka as JSON. Maxwell disguises itself as a slave and follows the MySQL master-slave replication protocol to pull data from the master.
·MySQL master-slave replication
·MySQL binlog formats:
 ·statement-based: logs SQL statements
 ·row-based: logs changed rows
 ·mixed: defaults to statement-based, automatically switching to row-based for statements that could cause data inconsistency
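For reference, the records Maxwell produces look roughly like the sketch below. The consumer invocation assumes Kafka's bin directory and the static topic from config.properties; the field values are illustrative, but the database/table/type/ts/data layout follows Maxwell's documented JSON format:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
# an INSERT on gmall.cart_info appears roughly as (values illustrative):
# {"database":"gmall","table":"cart_info","type":"insert","ts":1634742956,"xid":1098,"commit":true,"data":{"id":1,"sku_id":"5","sku_num":"2"}}
# update records additionally carry an "old" object holding the previous values of the changed columns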
Maxwell installation and deployment
Download: https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz (Note: Maxwell 1.30.0 and later no longer support JDK 1.8.)
·Modify the MySQL configuration (/etc/my.cnf) and add:

# server id
server-id = 1
# enable the binlog; this value becomes the binlog file name prefix
log-bin=mysql-bin
# binlog format; Maxwell requires row
binlog_format=row
# database(s) to log in the binlog; adjust to your setup
binlog-do-db=gmall

·Maxwell needs to store some of its runtime data in MySQL, including the binlog position it has synced up to (Maxwell supports resuming from a checkpoint), so a database and user must be created for it in MySQL.
·Create the maxwell database and user
·Create the database:

CREATE DATABASE maxwell;

·If MySQL has not been initialized, lower the password policy first:

set global validate_password_policy=0;
set global validate_password_length=4;

·Create the maxwell user and grant it the necessary privileges:

CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

·Configure Maxwell's config.properties:

# where Maxwell sends its data; options: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# target Kafka cluster
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# target Kafka topic; can be static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
kafka_topic=maxwell
# MySQL connection settings
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

·Start and stop Maxwell:

/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
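A minimal sketch of the remaining install steps under the paths these notes use (the download location and the MySQL service name are assumptions for your environment):

# unpack Maxwell into /opt/module and rename it to match the paths above
tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/
mv /opt/module/maxwell-1.29.2 /opt/module/maxwell
# restart MySQL so the new binlog settings in /etc/my.cnf take effect (service name may differ)
sudo systemctl restart mysqld
# confirm that row-format binlog is enabled
mysql -uroot -p -e "show variables like 'binlog_format';"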
Maxwell start/stop script

#!/bin/bash
if [[ $# -lt 1 ]]; then
    echo "Please Input Args"
    exit
fi

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    # grep -v grep: filter out the grep process itself; wc -l: count the matching lines
    result=`ps -ef | grep maxwell | grep -v grep | wc -l`
    return $result
}

start_maxwell(){
    status_maxwell
    # $?: the return status of the previous command
    if [[ $? -lt 1 ]]; then
        echo "Starting Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell is already running; do not start it again"
    fi
}

stop_maxwell(){
    status_maxwell
    if [[ $? -eq 0 ]]; then
        echo "Maxwell is not running"
    else
        # xargs passes the preceding output as arguments to kill
        ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    fi
}

case $1 in
"start" )
    start_maxwell
    ;;
"stop" )
    stop_maxwell
    ;;
"restart" )
    stop_maxwell
    start_maxwell
    ;;
esac
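Usage, assuming the script is saved somewhere on PATH (e.g. ~/bin) and made executable. The file name must not itself contain "maxwell" (e.g. use mxw.sh), otherwise status_maxwell's ps | grep check would match the script's own process:

chmod +x mxw.sh
mxw.sh start      # starts Maxwell only if it is not already running
mxw.sh stop       # kills the Maxwell process if it is running
mxw.sh restart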
Using Maxwell to send incremental data to Kafka
·Modify Maxwell's config.properties
·Have one Kafka topic auto-created per table, named after the table:

kafka_topic=%{table}

·Table filter (include to keep, exclude to drop):

filter=include:gmall.cart_info,include:gmall.comment_info,include:gmall.coupon_use

·Adjust the partition and replica counts for auto-created topics (Kafka broker settings):

num.partitions=3
default.replication.factor=3
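A quick check sketch: with kafka_topic=%{table}, a change to gmall.cart_info should land in a topic named cart_info (the consumer path is an assumption):

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic cart_info
# then modify a row in gmall.cart_info and watch for a record with "type":"insert"/"update"/"delete"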
From Kafka to HDFS
·Write the agent's .conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source. The Kafka source automatically adds topic and timestamp headers; the timestamp is the time the event entered Flume.
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = cart_info,comment_info,coupon_use,favor_info,order_detail_activity,order_detail_coupon,order_detail,order_info,order_refund_info,order_status_log,payment_info,refund_payment,user_info
a1.sources.r1.kafka.consumer.group.id = group
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.collect.interceptor.db.TimestampInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = hdfs
# The Kafka source has two parameters, setTopicHeader (default true) and topicHeader (default "topic"): the event headers carry a key/value pair whose key is "topic" and whose value is the Kafka topic, so the HDFS sink can use the %{topic} placeholder to get the topic directly.
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Write raw (unserialized) output files, with compression
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Use a channel which buffers events in files
a1.channels.c1.type = file
# If a file channel has been used before, change the directories below
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

·Interceptor (remember to distribute the jar to the Flume lib directory on every node). Because the Kafka source's timestamp header is the Flume-entry time, the interceptor overwrites it with the ts carried in the Maxwell record, so each event is written to the HDFS directory of the day the change actually happened:

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject js = JSONObject.parseObject(body);
        String ts = js.getString("ts");
        // Maxwell's ts is in seconds; the HDFS sink expects a millisecond "timestamp" header
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", ts + "000");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
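A sketch of running the agent and verifying the output (the job file name and Flume home are assumptions):

# start the agent defined above (saved here as kafka_to_hdfs_db.conf)
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &
# gzip files should appear under the per-topic, per-day directories
hadoop fs -ls /origin_data/gmall/db/cart_info_inc/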
Full synchronization of historical data
·To fully sync historical data, run:

bin/maxwell-bootstrap --database <database> --table <table> --config config.properties

·Notes on the bootstrap data format:
1) The first record (type bootstrap-start) and the last record (type bootstrap-complete) mark the start and end of the bootstrap and contain no data; only the bootstrap-insert records in between carry data.
2) All records produced by one bootstrap share the same ts, namely the time the bootstrap started.
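For example, a one-off full load of gmall.user_info (the table name is illustrative; bootstrap records go through the same producer, so they land in the same Kafka topic as the incremental data):

bin/maxwell-bootstrap --database gmall --table user_info --config config.properties
# records arrive in the shape described above (sketch; all ts values identical):
# {"database":"gmall","table":"user_info","type":"bootstrap-start","ts":1635350400,"data":{}}
# {"database":"gmall","table":"user_info","type":"bootstrap-insert","ts":1635350400,"data":{...}}
# ...
# {"database":"gmall","table":"user_info","type":"bootstrap-complete","ts":1635350400,"data":{}}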