1、流程分析
前面已经将日志数据(ods_base_log)及业务数据(ods_base_db_m)发送到kafka,作为ods层,接下来要做的就是通过flink消费kafka 的ods数据,进行简单的处理作为dwd层,然后再写回到kafka。
每层职能
分层 | 数据描述 | 计算工具 | 存储介质 | ODS原始数据,日志和业务日志服务器,maxwellkafkaDWD根据数据对象为单位进行分流,比如订单、页面访问等等。flinkkafkaDWM对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。flinkkafkaDIM维度数据flinkhbaseDWS根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。flinkclickhouseADS把 Clickhouse 中的数据根据可视化需要进行筛选聚合。clickhouse,sql可视化展示
目前进行到的阶段
2、环境搭建
https://github.com/zhangbaohpu/gmall-flink-parent
环境
jdk-1.8flink-1.12scala-2.12hadoop-2.7.7
在项目中新建maven模块gmall-realtime,没有父工程,pom文件如下
<?xml version="1.0" encoding="UTF-8"?> 4.0.0 com.zhangbao.gmall gmall-realtime 1.0-SNAPSHOT 1.8 8 8 1.12.0 2.12 2.7.7 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.version} ${flink.version} org.apache.flink flink-clients_${scala.version} ${flink.version} org.apache.flink flink-cep_${scala.version} ${flink.version} org.apache.flink flink-json ${flink.version} com.alibaba fastjson 1.2.68 org.apache.hadoop hadoop-client ${hadoop.version} org.slf4j slf4j-api 1.7.25 org.slf4j slf4j-log4j12 1.7.25 org.apache.logging.log4j log4j-to-slf4j 2.14.0 org.apache.maven.plugins maven-assembly-plugin 3.0.0 jar-with-dependencies make-assembly package single
项目结构
log4j.properties
log4j.rootLogger=warn,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.target=System.outlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
接下来的准备:
我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日志 DWD 层。