一、项目分点
1.1 集群规模1.2 框架结构,画出来1.3 框架
1.3.1 第一个Flume
1.3.1.1 碰到的问题 1.3.2 kafka
1.3.2.1 框架介绍1.3.2.2 碰到的问题1.3.2.3 优化 1.3.3 第二个flume
1.3.3.1 框架1.3.3.2 遇到的问题 1.3.4 hdfs
1.3.4.1 遇到的问题 1.3.5 业务数据
1.3.5.1 组成1.3.5.2 碰到的问题 1.3.6 hive
1.3.6.1 组成1.3.6.2 使用1.3.6.3 碰到的问题1.3.6.4 常规操作 1.4 数仓
1.4.1 ODS层1.4.2 DWD层
1.4.2.1 建模理论1.4.2.2 维度表1.4.2.3 事实表1.4.2.4 还做了其他3件事情 1.4.3 DWS层1.4.4 DWT层1.4.5 ADS层 1.5 实时项目
1.5.1 框架部分
1.5.1.1 Canal 1.5.2 实时指标1.5.3 实现方式
个人介绍
一、项目分点 1.1 集群规模(12台物理机:128G内存,8T机械硬盘,2T固态硬盘,20核40线程,戴尔4万多一台)
1.2 框架结构,画出来 日志部分:
app前端埋点 -> 日志服务器 -> 落盘日志 -> flume -> kafka -> flume -> hdfs -> hive -> mysql
业务数据部分
Java后台采集数据 -> 日志服务器 -> mysql -> hdfs -> hive -> mysql
组件:source 、 channel 、 sink 、三个器 、碰到的问题
①source
我们使用的是taildirsource,这个是apache 1.7版本才有,选择这个source的原因是taildirsource可以实时监控多个文件且有断点续传功能
②channel
Channel一共有三种:filechannel、memorychannel和kafkachannel
fileChannel是基于磁盘,io多,性能差,但是可靠性高
Memorychannel基于内存,性能高,但是可靠性相对低,存在丢失数据的风险
Kafkachannel是基于磁盘,可靠性高,性能还优于memorychannel + kafkasink
我们是将数据采集到kafka,所以我们使用了kafkachannel
③sink
kafkachannel可以直接将数据发送到kafka,所以我们没有使用sink。
④拦截器
我们使用了etl拦截器,过滤掉不完整的josn数据
同时还使用了分类拦截器,我们的日志分为了5类数据,启动、页面、动作、曝光和错误数据,我通过给event的header加上对应的标签,后面配合多路复用的选择器,指定不同类型的数据去到不同的topic中。
我们定义拦截器的步骤:
①自定义一个类,实现interceptor,实现4个抽象方法:分别是:初始化、关闭资源、单个event和多个event方法,
②创建一个内部类实现builder类,实现两个抽象方法。
③最后打包 -> 上传到flume 的lib包下 -> 在配置文件中添加拦截器,写上全类名$bulid类
⑤选择器:
一共有两种选择器,一种是replicating,默认的选择器,每一个通道发送一份数据,另外一种是multiplexing,多路复用,根据event的header指定数据去到哪个通道,我们选择的多路复用选择器,将数据发送到kafka不同topic中。
⑥监控器
我们还使用到ganglia监控器,对flume的运行状况进行监控,主要是监控flume的put和take事务,当尝试提交的次数远远大于成功提交的次数以后,我们对flume进行优化,通过配置flume-env文件,增大flume的内存,默认是2G,我们调整到4G
同时在双十一、618等大型活动,flume采集通道也是抗不住这么大的数据量,我们通过临时购买阿里云的日志服务器,然后将flume部署到日志服务器上。
我们遇到过flume挂掉的情况,我们当时的分析是:
Source -> channel有put事务,channel 到sink有take事务,所以这个两个环节是没有问题的,后端是kafka,那么是暂时无法获取数据,也没有关系。
采集数据的前端,使用的是taildirsource,它的特性是先拉取数据,再维护offset,所以不会丢失数据,可能存在重复数据,针对重复数据,我们开会分讨论是:
可以通过增加事务的方式实现不重复数据,但我们评估这样做性能非常低,我们在hive的dwd层,可以通过groupby进行去重,也可以使用sparkSQL或者redis进行去重,所以在这里我们就没有进行处理。
Kafka我们从三个方面向您介绍,框架、碰到的问题和优化
1.3.2.1 框架介绍 ①kafka有4个组件:生产者 、消费者 、 brokers 和 zk
Zk保存着消费者的offset和brokers的ids等信息
②数据量:日常每天80-100g,大概平均的数据生产速度1M/s,不过到了峰值时间,晚上7-9点时间段,最高达到了20MB/S,大概有2W人同时在线
③kafka的台数,通过kafak自带的压测工具,测试生产峰值速度,我们当时选择了3台kafka。
④分区的数量
分区数量 = 期望的吞吐量 / min(生产峰值速度,消费最大的速度)
我们当时设置的是5个分区
⑤存储大小
Kafka数据默认保存7天,我们调整到3天,因为正常情况下,我们是可以跑完当天的数据,当出现异常时,第二天可以重跑一次,所以我们保留3天是足够的。我们给kakfa硬盘的大小为:每天的数据量 *副本数 * 保留的天数 / buffer(0.7) ,大概是0.8T,我们预留了1T
我们还遇到了kafka挂了,数据重复、数据丢失、数据挤压问题
①对于kafka挂了,kafka收不到消息,后端flume无法获取数据,没有什么问题,前面的flume,数据会快速挤压在channel中,最多就是后面挤压满了,但是往前是日志服务器,保留了30天的数据,所以就没有什么关系,直接重启就可以了
②对于数据丢失,这个重要看ack的配置,ack有0,1,-1三种,0表示leader一收到数据就回复ack,可能丢失数据,企业中已经不使用,1表示leader落盘以后回复ack,可靠性相对可靠,-1表示所有的副本都落盘以后再回复ack,可靠性高,性能相对较慢,我们传输的是日志数据,所以采用了ack=1的方式,
③对于数据重复,可以通过事务 + ack =-1 + 幂等性来解决这个问题,幂等性通过(生产者id,分区号,序列号)对一个分区内的数据进行标识,可以保证分区内的数据不会重复,当生产者重启时,生产者的id号会发生改变,这样同一条数据就可能会被重复发送到kafka,通过事务将pid和事务id进行绑定,解决了这个问题,不过我们通过会议讨论,这样会严重影响性能,所以这里我们就不做处理,等hive的dwd层进行去重。
④同时我们还遇到了数据挤压的问题,我们做了两个优化:
一是:增加分区,同时增加下一级消费者的cpu核数
二是:通过修改batchsize参数,提高消费者每次拉取的数据量,默认是1000条,我们把它调整到2000,极端情况下我们也调整过到3000条/s
我们通过修改参数对kafka进行优化:
①将数据保存时间由默认的7天修改为3天
②副本数由默认1个修改为2个,增加可靠性
③增加副本拷贝过程中leader和follower之间的通信时间,避免因为网络问题,不断的拷贝同一副本
④对producer发送的数据采用是压缩
⑤我们还调整了kafka的内存大小,由1g调整到6g,提高性能
我从三个方面介绍:
框架 、 遇到的问题、优化
①source:我们是采集kafka的数据,所以使用kafka source
②channel:我们选择了memory channel,传输普通的日志,如果出现宕机,最多是100evnet个数据发生丢失,我们评估能接受
③sink:数据写到hdfs上,使用了hdfs sink
①刚开始我们把数据写到hdfs时,遇到了有很多小文件问题,通过flume的官网,发现有三个参数可以减少小文件的情况,分别是文件滚动大小、时间和event的个数,我将文件滚动的大小设置为128M,时间设置为2H,就是每128m生成一个新文件,如果超过2h没有达到128M,也是生成一个文件,event设置为0,就是不启用,因为我们不知道每一个event的数据大小,如果使用的话,会造成hdfs上的数据量大小不一。
②还遇到了头一天的数据发送到了第二天的文件夹中。通过查询GitHub的资料,发现是kafkasource会获取系统时间并添加到event的header中,而我们是依据这个header中时间戳,将数据发送到指定的文件夹中。
我们解决方式是:通过自定义拦截器,获取event的body中时间戳,并把时间戳添加到timestamp属性中,这样就实现了当天的数据能进入当天的文件中。
③同时我们还遇到了hadoop宕机了,通过调小从kafka中拉取数据的batchsize大小来调整往hdfs写数据的速度,解决了这个问题。
Log数据写到hdfs的时候,发现还有一些小文件,由于在namenode中,一个文件占用的内存大小是固定150字节,那namenode的资源利用率就低了,并且在后面计算任务的时候,每一个小文件会创建一个maptask,对集群资源也是浪费。
对于小文件,我们采用了三种方式
①har归档,将多个小文件压缩在一起,类似Windows下的zip压缩,对于namenode来说就是一个整体,同时采用har协议,可以将里面的文件一个一个取出来。
②后面我们采用combinerInputFormat,将多个小文件捏合在一起进行统一切片,减少maptask的数量
③同时开启了jvm重用,提高处理的效率,因为对于小文件来说,执行的时间比开关jvm的时间还短,这个时候我们就多个任务开关一次,不过开启jvm重启是一个双刃剑,在没有小文件的场景下,不要开启这个功能。
这样我们的log数据就去到了hdfs中。每天一个文件夹,每个文件中存储了当天日志数据。
①配置
同时我们还有业务数据,通过sqoop将业务数据导入到hdfs层,sqoop的配置比较简单,主要配置的参数有mysql连接的url、用户、密码以及数据写到hdfs的路径、如果文件存在则删除。
②同步策略
我们将数据导入到hdfs时,根据数据量和数据的变化情况,使用不同的同步策略,主要是考虑在满足业务需求的情况下减少数据重复。
同步规则是(画图说明)
a、如果表数据比较少,采用全量同步策略,导入时过滤条件是1=1
b、如果表数据量大,但是数据只有新增数没有变化,采用新增同步策略,导入时过滤条件是创建时间等于今天
c、如果表数据量大,数据有新增也有变化,就采用新增及变化同步策略,导入时过滤条件是创建时间或操作时间等于今天
d、还有一种是特殊表,默认数据不会变化,如地区和省份表,只导入一次。
我们是每天的凌晨12点半左右开始导表,大概需要40分钟,遇到数据量比较大的时候,如双十一、618等大型活动,大概需要1个小时左右。
1.3.5.2 碰到的问题 在导入的时候也碰到了一些问题
①hive中null值存储是N,但是mysql中是null,数据不一样,我们通过配置–null-string,–null-no-string参数进行数据转换
最终log数据和业务数据都存储到了hdfs中,之后通过load的方式,将数据加载到hive里面,像hive这个框架也有很多技术点,从三个方面介绍这个框架
1.3.6.1 组成 Hive有元数据、4个器、计算引擎和存储
元素据
元数据默认是存储在derby数据库,但由于这个数据库仅支持单客户端访问,所以后面我们就把元数据存储在mysql中。
4个器
这边4个器是解析器、编译器、优化器和执行器
计算引擎
计算引擎有mr、tez和spark。
Mr引擎是基于磁盘,任务内部要落盘,io大,性能差,我们一般用来执行周、月、季度等长周期任务。
Tez引擎,是基于内存,对内存的要求高,计算的数据量如果很大,会出现oom的情况,所以我们一般用来执行一些临时任务
Spark引擎,是基于内存,中间也会有落盘,我们一般用来执行当天的任务。
存储
数据最终是存储在hdfs中。
内部表和外部表
hive数仓中,我们用到了内部表和外部表,两者的最大区别是:删除内部表元数据和原始数据都会被删除,而删除外部表,只会删除元数据不会删除原始数据,我自己使用的一些临时表采用内部表,其他的表基本是外部表,用来防止因误操作将原始数据删除了。
4个by
当然,还使用了4个by,分别是order by 、 sort by 、distribute by和cluster by。Order by 很少使用,因为是全局排序,很容易出现oom,sort by 和distribute by 一般是配合使用,分区内排序,当分区字段和排序字段相同时,可以使用cluster by 代替,不过用的比较少。
系统函数
在计算指标时,我们使用了各种函数,如系统函数,用next_day处理周指标,用get_json_object对log数据进行解析,还使用了开窗函数,rank 和over函数,计算topN指标
在数仓的过程中,也遇到了很多问题。
问题1:大表 和 大表问题2:小表和大表问题3:单个key数据倾斜问题4:多个key数据倾斜
在数仓计算的过程中,遇到了数据倾斜的问题,当时我们发现有一个reducetask卡在99%,而其他的任务都执行完成了,第一反应可能是数据倾斜了,然后对数据进行group by 求count,发现null的数据很多,然后我们采取使用随机数将null值打散,然后对计算结果数据进行转换,去掉随机数,再进行一次聚合。这个问题解决了,
后来我们还开启了负载均衡的功能。
在hive使用的过程中,做了一些常规优化
一是参数优化:
①开启mapjoin、开启map端的combiner和使用压缩
遇到小文件时
①开启了merge功能:就是将任务结束时产生的多个小于16m的小文件捏合成一个256M的大文件
②使用combinerhiveinputformat;将多个文件捏合在一起,减少maptask的数量
③开启jvm重用
二是业务优化:
①创建分区表:避免全局扫描
②先过滤再计算
③列式存储:提高数据查询的速度
④合理设置reduce的个数:避免资源的浪费
⑤根据需求更换对应的计算引擎
这就是hive当中的一些事情。
1.4 数仓 之后我们基于hive,搭建了一个离线数仓,我们的数仓分为5层,ods、dwd、dws、dwt和ads层。
首先,我们通过ezdml工具分析mysql表之间的关系。
从三个方面聊一聊ods层
①表的字段:
Ods层表的字段:
Log数据,创建一个新表,表只有一个字段,line,log一条日志就是一条数据
业务数据,字段和mysql的字段保持一致。
②表的维护
获取hdfs中当天的数据,直接load进去
③在ods层做的3件事情
①创建分区表,每天一个分区,每个分区的数据和数据导入hdfs的策略保持一致
②数据从hdfs原封不动到ods层,对数据进行备份
③采用lzo压缩,减少磁盘空间,压缩后数据从100g大概压缩到10g左右
对于日志数据,我是使用get_json_object对日志数据进行解析,将数据解析为:启动、页面、动作、曝光、错误数据
对于业务数据,从4个方面阐述
Dwd层采用了维度建模。标准的4步:
①选择业务过程
②声明粒度
③确定维度
④确定事实
因为我们是中小型公司,所以我们把后台的50多张表全部拿过来,所有的业务我们都要了,声明粒度,我们是声明最小的粒度,一行信息代表一次下单,一次支付、一次收藏,并没有对它做任何的聚合操作,确定维度,后面通过画矩形图方式确定和事实相关的维度,最后是确定事实,就是确定事实表的度量值,有次数、件数、金额、个数。以订单为例,最关心的就是下单数量和下单金额。
业务数据我们分为维度表和事实表。
对于维度表来说,根据维度退化理论创建,我们当时定了6个维度,用户、地区、时间、商品、活动和优惠券维度,从这6个维度进行指标统计,我从三个方面介绍维度表:
1、维度表分类
根据数据的特点,将维度表分为全量表、特殊表和拉链表三种。
全量表:数据量不大的维度表,采用全量表,如商品维度,活动维度等
特殊表:数据默认不会变化,如地区和时间维度表
拉链表:数据会发生变化,但是大部分是不变的表,采用拉链表。如用户表维度表。
2、表的字段
维度表的字段:从ods层找到这个维度相关的表,字段全部取过来,如和商品维度相关的有6张表:sku、spu、一、二、三级表、品类表。
3、表的维护
讲一下拉链表构建过程,我们是将用户表做成了拉链表
创建拉链表步骤是【画图说明】:
第一步:初始化用户表:取ods层用户表中数据,增加起始时间和结束时间字段,将起始时间设置为当天导表的时间,结束时间设置为很大的值,我们设置为9999-01-01,我们暂时称为旧表
第二步:处理新数据:后续导表时,取出ods层当天分区的数据,增加起始时间和结束时间,起始时间修改为今天,结束时间修改为9999-01-01,形成一张新表
第三步:修改旧表数据:初始表 left join ods_user_info表第二天的数据,当ods_user_info的id不为null且初始表的end_date为9999-01-01的数据,将end_date改为昨天,其余字段全部使用旧表的数据
第四步:合并数据,旧表和新表进行unoin all
也从3个方面阐述
1、表的分类
事实表都是分区表,根据数据特点,我们将事实表分为三种:
a、事务型事实表:只有新增数据没有变化数据建事务型事实表,每个分区保存当天新增的数据,如支付事实表、退款事实表
b、周期性快照事实表:对于我们只关心当天最后的数据,不关心中间变化过程的表,每个分区保存当天所有的数据,如收藏表、加购物车表
c、累计型快照事实表:对于表中的一行数据一次性写不完的表,每个分区保存当天新增的数据,当天变化的数据,去到原来分区进行数据修改,如优惠券领用表
2、表的字段
表的字段包含三个部分:维度外键、度量值和一些冗余字段,获取字段的步骤:
第一步:画矩阵图,找到和事实表相关的维度,如和订单事实表相关的维度有:时间、地区、用户、活动
第二步:从ods层找到和这个事实相关的表
第三步:取步骤2中表所有的度量值,以及冗余字段和维度外键作为事实表的字段。
3、表的维护
维护表数据的方式也比较简单,简单说一下稍微复杂一点的累积型事实表中订单表的维护思路:
画图说明
相关表:订单状态表、活动订单表、订单表。
第一步:订单状态表:按订单进行分组,然后使用str_map + concat_ws + collect_set,将多行数据转换为一行map数据,然后和订单表join,再和订单活动进行left join。–新表
第二步:从dwd层取出旧表中分区等于变化数据的创建时间的数据–旧表
第三步:旧表与新表进行full join,新表有数据,就使用新表数据,否则使用旧表数据
第四步:最后采用动态分区的方式对原有的分区数据进行覆盖写。
①数据清洗:过滤重复数据和关键字段空值数据,脏数据一般控制在万分之一,如果遇到了脏数据过多,会和前端沟通。
②采用lzo压缩,减少磁盘空间
③采用了列式存储:提高查询的速度
之后来到dws层,这里是宽表,是站在维度的角度看事实的度量值,统计当天的一些指标,如会员主题,统计用户当天下单次数、下单金额、支付次数、支付金额等
我们分为5个主题,设备主题、会员主题、商品主题、活动主题和地区主题。
①设备主题:
统计设备id活跃次数
②会员主题:
user_id登入次数加入购物车次数下单次数下单金额支付金额支付次数
③商品主题表
商品id被下单次数被下单件数被下单金额被支付次数被支付件数被支付金额被退款次数被退款件数被退款金额被加购物车次数被收藏次数好评数默认评价数差评数
④活动主题表
活动id开始时间结束时间创建时间曝光次数下单次数下单金额支付次数支付金额
⑤地区主题表
地区id地区名称省份名称省份id活跃设备数下单次数下单金额支付次数支付金额 1.4.4 DWT层
之后来到dwt层,也是站在维度的角度看事实,这次是看事实的开始时间、结束时间、度量值的累积值和一段时间内的累积值。也是5个主题
①设备主题
设备id首次活跃时间末次活跃时间当天活跃次数累积活跃次数
②会员主题
user_id首次登入时间末次登入时间累积登入天数最近30天登入天数首次下单时间末次下单时间累积下单次数累积下单金额最近30天下单次数最近30天下单金额首次支付时间末次支付时间累积支付金额累积支付次数最近30天支付次数最近30天的支付金额
③ 商品主题
sku_idSpu_id最近30天被下单次数最近30天被下单件数最近30天被下单金额累积下单次数累积下单件数累积下单金额最近30天被支付次数最近30天被支付件数最近30天被支付金额累积支付次数累积支付件数累积支付金额最近30天被退款次数最近30天被退款件数最近30天被退款金额累积退款次数累积退款件数累积退款金额最近30天被加购物车次数累积被加购物的次数最近30天被加收藏的次数累积被加收藏的次数最近30天好评数最近30天中评数最近30天差评数最近30天默认的评价数累积好评数累积中评数累积差评数累积默认评价数
④活动主题
活动编号活动开始时间活动结束时间活动创建时间当日被曝光次数累积曝光次数当日下单金额当日下单次数累积下单金额累积下单次数当日支付次数当日支付金额累积支付次数累积支付金额
⑤地区主题
省份id省份名称地区id地区名称当天活跃设备最近30天活跃设备当天下单次数当天下单金额最近30天下单次数最近30天下单金额当天支付次数当天支付金额最近30天支付次数最近30天支付金额 1.4.5 ADS层
指标分析,大概100多个指标。
①设备相关指标
1.日活、周活和月活
(从dwt层获取最后的登入时间在今天、这周、这个月)
2.每日新增设备(首次登入时间是今天)
3.统计1日、2日和3日留存率
从dwt层获取数据。
第一步:统计当天所有的活跃用户
第二步:统计昨天的1日留存率,求出昨天的新用户但是今天上线的用户/昨天的新用户
第三步:统计前天的2日留存率,求出前天的新用户但是今天上线的用户/前天的新用户
4.沉默用户:只在安装当天启动过,且启动时间是在7天前
(统计首次活跃时间 = 最后末次活跃时间,且最后活跃时间在7天前的用户)
5.本周回流用户数:上周未活跃,本周活跃的设备,且不是本周新增设备
第一步:获取本周活跃的用户且不是本周新增的用户
第二步:获取上周的活跃的用户
第三步:第一步获取的用户减少第二步获取的用户就是本周回流的用户
6.流失用户:最近7天未活跃的设备(获取最近活跃时间小于7天的用户)
7.最近连续三周活跃的用户数
第一步: 从dws层获取前一周、前两周以及当前周的所有活跃的用户
第二步: 然后进行内连接,能连接上的,则说明这连续的3周都活跃了,最后按照用户进行分组去重后求count。
8.最近7天连续3天活跃
第一步:从dws层获取最近7天的数据,对数据按照用户分组进行开窗,按照活跃时间进行降序排序
第二步:使用活跃时间减去排名,获取一列
第三步:按照用户和第三步的列进行分组,求count(*) >= 3的用户
第四步:分组求和
第五步:求7天内连续3天的活跃用户
②会员主题
1.会员主题分析:
1、总付费会员数:指付费的会员数量
2、会员活跃率 = 今天会员活跃数量 / 总的会员数量
3、会员付费率 = 今天会员付费人数 / 总的会员数量
4、会员新鲜度 = 今天新增会员数量 / 今天活跃的会员数量
2、漏斗分析;浏览页面 -> 进入详情的页面 -> 加入购物车 -> 下单 -> 支付
③商品主题
1.商品个数排名
2.商品销量排名
3.商品收藏排名
4.商品加入购物车排名
5.商品近30天退款率排名
6.商品差评率
④营销主题
1.下单数目统计:单日下单笔数、金额和用户数
2.支付信息统计:单日支付笔数、金额、人数、商品数和下单到支付的平均时长
3.品牌复购率
⑤地区主题
1.地区主题信息:当天活跃设备数,下单次数、金额,支付次数和金额。
留转G复活
留存率计算:会算
转换率的计算:
GMV计算:会计算
复购率:会计算
日活:会计算
高消费的用户->统计高消费用户的前10个商品品类,然后推送对应的优惠力度
生日标签->提前一周触发优惠券的发放,进行引流
优惠券偏好->统计优惠券出现的次数的排名,确定哪一类优惠券用户比较喜欢,然后这个类活动可以经常做。
==导出 ==
后面我们通过sqoop将计算的结果导入到mysql中,在导出的过程也遇到了数据不准确的问题,因为sqoop底层是4个map,有可能出现一半成功,一般失败,这样在查询结果的时候,和实际有偏差,我们通过增加–stage-table参数,先将数据导入到一张临时表,之后通过事务的方式导入到mysql中,这样就要么成功要么失败。
可视化展示
之后我们的数据直接对接superset,使用superset做可视化,免费开源的,用起来效果非常棒。
即席查询:
同时数仓当中还采用了各种即席查询,像presto,还装了kylin,kylin主要用于多维分析,主要用于dwd层进行分析,presto主要是针对ads层快速出指标,产品经理让我统计ads层的某一个指标,一般我用presto可以直接得出指标,因为它是完全基于内存的,速度比较快。
调度
最后我们使用azkaban作为全流程调度,每天晚上凌晨30分开始统一调度,业务数据使用sqoop将mysql数据导入hdfs,日志数据通过flume-kafka-flume,导入到hdfs,然后将hdfs的数据load到hive中。我们指标有100多个,搞活动的时候能达到200多个,数据量还是比较大的,凌晨开始跑,如果跑挂了,我们还配置了邮件报警,电话报警,我们继承了第三方工具,onealert来打电话。
以上是我做的离线数仓项目。
1.5 实时项目 下面介绍一下我的实时项目。
分4个部分讲述我的实时项目:
1.实时项目的框架
2.具体的指标及实现的方式
3.遇到的问题
4.优化
log数据:flume -> kafka -> sparkStreaming,日志数据通过flume采集到kafka两个topic中,start_topic和event_topic,然后SparkStreaming根据需求来读取数据。
业务数据:MySQL -> Kafka -> SparkStreaming。使用cannl实时监控mysql的变化的数据,并对数据进行解析,根据解析结果数据,发送到kafka不同的topic中,然后SparkStreaming来读取数据。
简单介绍一下cannl
我使用cannl实时监控mysql中表变化的数据,并发送到kafka中。
1.实现原理的原理是:
①canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
② mysql master收到dump请求,开始推送 binary log 给 slave(也就是 canal)
③ canal 解析 binary log 对象
Mysql Binarylog一共有三级别:
①statement:binlog 会记录每次执行的写操作的语句,数据量少,但是对于一些随机数函数和自定义函数,slave在重新执行语句时,有可能会出现执行返回结果和主不同
②row:binlog 会记录每次操作后每行记录的变化,保证数据的绝对安全,但是数据量大,因为一条sql语句可能改变多条数据。
③mixed:statement的升级版,对于可能导致数据不同的语句,采用row方式,其他的语句采用statement方式。
由于我是使用cannl监控数据的变化,采用的是row级别。 1.5.2 实时指标
每日订单量实时统计一小时内日活实时统计一 每日日活实时统计小时内订单数实时统计一小时内交易额实时统计一小时内广告点击实时统计一小时内区域订单数统计一小时内区域订单额统计一小时内各品类销售top3商品统计用户购买明细灵活分析(根据区域,性别,品类等) 1.5.3 实现方式
前提:
1.使用cannel采集mysql中指定数库下所有的表,然后对数据进行解析,不同表的数据写到不同topic中。
2.手动维护offset,创建ssc时,从mysql读取offset,如果offset有数据,则从获取的offset位置开始读取数据,否则从头earlist开始读数据,数据操作完成以后,并通过ssc获取offset,并维护到mysql中。
每日订单量和每小时实时统计
定义:采集周期为3s,使用sparkStreaming采集数据以后发送到hbase中
实现:
第一步:采集数据
第二步:将数据封装成一个样例类,在样例类内部添加日期和小时两个字段,并根据数据创建时间进行转换
第三步:将数据写入hbase
购物券风险预警分析:同一个设备id中,更换3个以上的用户领取优惠券
定义:每间隔6秒计算5分钟内领取优惠券的数量>3,且没有做其他任何操作的用户,同一设备id一分钟内预警一次。处理逻辑:在spark中只是获取数据,并进行初步的处理,然后将数据写入hbase中。
实现逻辑:窗口:5min,步长为6s
第一步,从kafka中获取event_topic数据
第二步:格式转换成:mid,数据,并按照mid进行分组
第三步:转换处理,对value进行判断
A、EventList集合,用来同一设备5分钟内所有事件id
B、ItemSet集合,存放优惠券与商品对应的数据
C、UidSet集合,存放领取过优惠券的用户信息
设置一个flag,一旦用户有点击商品的行为,该条数据就结束
①将数据写到EventList集合中
对事件id进行匹配
如果等于coupon,将领取优惠券的useid加到UidSet,同时将event的item加入到itemSet中
如果等于click,flag=false则break
等于其他,不做任何处理。
对同一个用户来说,如果所有的value所有的数据遍历完成后,
将数据写出
(!isClickItem && uidSet.size() >= 3, alertInfo(mid, uidSet, itemSet, eventList, System.currentTimeMillis()))}
②过滤数据:保留没有点击过商品的用户
③转换,只要value
④将数据写入到中,在es中进行查询
用户购买明细分析:
定义:将用户和订单表和订单详情表三个表的数据进行关联起来,一个订单对应多条订单详情表
实现逻辑:订单表和订单详情表使用双流join,借用redis进行缓存,用户表的数据去mysql中查询以后添加进来
具体逻辑:
①手动维护offset:使用cannl监控mysql数据库,不同的表发送到不同的topic中,从mysql中获取两个topic的offset,然后创建两个流。
②双流join。
订单数据和订单详情数据维护到redis的数据格式:
订单数据:
Key:order_info:订单id
Value:一条订单数据
订单详情数据:
Key:order_detail:order_id:sku_id
Value:一条订单详情数据
将两条流的数据进行数据转换成kv形式,key为订单id
A、双流fulljoin
B、(order_id,(订单数据,订单详情数据))
C、Mappartitions:
第一步:获取redis的连接
第二步:对数据进行模式匹配
(order_id,(some,some))
将订单信息缓存到redis中
将订单数据和订单详情数据进行合并,创建一个集合,用来接收合并后的数据,将合并后的数据加到Arraylist集合中,从通过订单id从redis中获取订单详情数据的数据,可能有多条
遍历获取的数据,
将数据封装成样例类,将获取的订单详情数据从redis中删除,将订单信息和数据进行合并,并添加到集合中
最后返回集合。
(order_id,(none,some))=>订单数据没有,有订单详情数据
第一步:从redis中获取订单缓存的数据,判断是否为空,如果不为空,先将数据进行封装成样例类,则进行合并,然后返回,如果为空,则将订单详情数据进行缓存。
③合并用户数据
读取mysql用户表的数据,并数据进行转换(user_id,user),
然后将第二步获取的数据也转换,(user_id,saledetail)
然后进行内连接,并进行格式转换,连接在一起,最后将数据写到hbase中。
3.Sparkstreaming实现精准一次性消费
1.从mysql中获取offset,将数据封装成map(new TopicPartition,offset)
2.根据获取的offset,如果获取的值为空,则从头消费,earlist,如果不为空,则从获取的offset位置开始消费数据
3.数据经过一些处理完成以后,通过kafka流获取offsetRanges,并遍历,最后将数据维护到mysql中。
4.去重
实时统计日活,使用redis进行去重,获取数据以后,将数据写到Redis中,redis中的数据类型是:dau+日期,value是set集合,存储mid,根据添加数据后的返回值,如果返回1,则添加成功,这条数据就要,如果是返回0,则这条数据过滤。将去重以后的数据写到Phoenix中,然后天和小时的日活都有了。
5.Oom,数据倾斜问题
Executor内存:
主要是shuffle阶段:read shuffle 和write shuffle,在shuffle算子的地方会这种情况。
情况1:shuffle阶段的reduce端的缓冲区过大,导致生产的大量的对象导致oom,调小一些缓冲区的大小
情况2:数据倾斜,单个key的数据量太多,使用随机数打散,进行二次聚合
情况3:内存就是不够大,增大内存
情况4:join的时候,采用广播变量的方式,避免shuffle
情况5:增加reduce的并行度
Driver内存:
情况1:当数据从executor使用collect拉取到drive端时,driver的内存不够用,可以增加内存
情况2:在driver创建大集合,导致数据内存不足,可以考虑在executor端加载
数据倾斜出现在连个地方,一是shuffle阶段,二是map阶段,数据量太大导致。
6.Join两个大表优化
a先过滤,看数据量大小,可以考虑广播
b使用reducebykey、mappartition、增加reduce的数量
7.常规的一些优化手段
Mappartition优化和mysql的连接
foreachRDD优化和redis的连接
使用mappartition代替map…
8.sparkstreaming写到hdfs有小文件怎么办
方案1:Kafka -> sparkStreaming -> kafka -> flume -> hdfs
方案2:可以使用结构化流实现向hdfs文件中追加数据
方案3:扩大采集周期 + 使用coalesce
分析完之后,数据我们是灌倒hbase和es中,hebase一般我们存储的是明细数据,es一般是监控数据,异常数据,因为可以直接通过kibana展示在大屏上,hbase存储的是明细数据,可以通过Phoenix,对外暴露接口,让web项目自己进行查询,主要是运营人员通过他读取数据,这个过程中我们还用到redis,用它去重,如果数据量大,用redis去重
离线指标:
留转G复活,topn、热门商品、退款率、日活、周活、月活、日新增流量、
1.流量类指标相关:
Uv(独立访问客户)和pv(页面访问数),
页面跳转率
新增用户数量(日、周、月)
留存率(统计1/2/3日留存率)
7天内连续登录3天的下单、退款、支付
页面平均访问的时长
2.交易相关:按地区划分GMV和下单量(当天、近30天)
3.活动推广相关:活动曝光次数、当天下单、支付次数金额以及累计的下单次数和支付次数,用来判断一个活动的推广情况。
4.商品类相关:topn、哪个商品买的最好、复购率、退款率、评论
5.购物车相关:加购次数、
6.下单相关:笔数、金额、用户数(当天和累积30天)
7.支付相关:笔数、金额、用户数(当天和累积30天)
1、交易:终极目标
GMV 和订单量(GMV:订单金额)
指标的作用:用来判断交易结果的好坏
统计方式:从dws层获取
select '2020-06-14', sum(order_count),//订单数量 sum(order_amount),//订单金额 sum(if(order_count>0,1,0))//下单用户量from dws_user_action_daycountwhere dt='2020-06-14';
转化率(转化率 = 引入订单量 / 流量)指标的作用:漏斗分析,统计浏览页面 -> 进入详情的页面 -> 加入购物车 -> 下单 -> 支付,
步骤
with tmp_action as ( select '2020-06-25' dt, sum(if(cart_count > 0,1,0)) cart_count,--加入购物车的人数 sum(if(order_count > 0,1,0)) order_count , --下单人数 sum(if(payment_count > 0,1,0)) payment_count --支付人数 from dws_user_action_daycount where dt ='2020-06-25'),tmp_page as (select '2020-06-25' dt , --统计日期 sum(if(array_contains(pages,'home'),1,0)) home_count, --浏览首页人数 sum(if(array_contains(pages,'good_detail'),1,0)) good_detail_count, --浏览商品详情页人数 sum(if(array_contains(pages,'good_detail'),1,0)) * 100 /sum(if(array_contains(pages,'home'),1,0)) home2good_detail_convert_ratio --首页到商品详情转化率from ( select mid_id, --对用户进行分组,过滤出今天进入首页和详情页的用户,获取当天用户的页面行为,去重后放到一个集合中 -- 那么一行数据有如下2种情况 -- 用户 page_id -- 243 ["good_detail","home"] -- 63 ["home"] collect_set(page_id) pages from dwd_page_log where dt = '2020-06-25' and page_id in ('home','good_detail') group by mid_id )tmp)insert into ads_user_action_convert_dayselect '2020-06-25' dt , --统计日期 home_count, --浏览首页人数 good_detail_count, --浏览商品详情页人数 home2good_detail_convert_ratio ,--首页到商品详情转化率 cart_count,--加入购物车的人数 cart_count *100/good_detail_count good_detail2cart_convert_ratio,--商品详情页到加入购物车转化率 order_count , --下单人数 order_count *100/cart_count cart2order_convert_ratio ,--加入购物车到下单转化率 payment_count, --支付人数 payment_count * 100 / order_count order2payment_convert_ratio --下单到支付的转化率from tmp_action join tmp_page on tmp_action.dt = tmp_page.dt
客单价(客单价 = GMV / 引入订单量)
它描述了每个订单的平均成交金额,具有比较强的品类特征,比如奢侈品类的客单价,天然是比消费品的客单价高的。同时,如果进行了拼单满减等运营策略,也能够刺激用户一单购买更多的商品,进而提升客单价。
UV 价值(UV 价值 = GMV / 流量)
它描述的是每个 UV 产出的平均金额,也能侧面看出流量的质量、流量与业务的匹配程度。试想一个页面,如果它的 UV 价值高,那么也就代表给它引入更多同类的流量,它就能创造更大的 GMV。因此 UV 价值也是一个很重要的指标,和转化率一起综合看,可以用来评估到底哪个业务 / 页面值得投入更多的流量。
思考:UV 价值和客单价有什么不同?1)影响因素不同:UV 价值更受流量质量的影响;而客单价更受卖的货的影响;2)使用场景不同:UV 价值可以用来评估页面 / 模块的创造价值的潜力;客单价可以用来比较品类和商品特征,但一个页面客单价高,并不代表它创造价值的能力强,只能得出这个页面的品类更趋近于是卖高价格品类的。
2、流量:决定成败
UV & PV(页面浏览人数、页面访问次数)
UV 描述了访问的人数,是一个很重要的数据指标,它的多少往往决定了最终GMV的高低。UV 源自各种途径,例如站外广告、站内的资源位分配、用户主动回访流量、社交裂变活动的分享引流等。
PV 描述了访问的次数,例如用户一天访问了这个页面3次,这时候会计算为 3 PV 和 1 UV。也就是说,PV 比 UV 多了某段时间内用户多次访问的信息。若要看页面的流量量级,无论看 UV 还是 PV 都是可以的。
人均浏览次数(人均浏览次数 = 页面访问次数 / 页面浏览人数)
这个指标描述了某段时间内,每个用户平均浏览页面的次数。不同的场景会有不同的值,需要根据具体的场景来判断高低。有些情况会出现 PV 高出 UV 很多的场景,如存在需要用户多次回访的玩法、有分时段运营的策略(e.g、一天三次红包雨)等等,需要具体场景具体分析。
3、行为:寻根溯源
点击率(点击率 = 模块点击人数 / 页面浏览人数)
用户对此模块的点击人数,在所有进入页面的流量中的百分比。可以看作用户对于模块的需求强烈程度的评判指标之一。与页面流量和页面 GMV的关系类似,模块的点击率与模块的产出是强相关的(如下图,横轴是各模块)。
点击率的影响因素有:1)模块在页面中的位置:若放得越高,则越可能被更多的用户看见,那么点击率高的可能性,就比放置位置低的模块要来得更高。毕竟页面越往下,看到的用户就更少了。2)模块本身的吸引程度:比如模块本身是个优惠券集合楼层,就比没有利益点的普通模块更吸引人、更容易获得更多点击。此外,模块的样式设计、主题表述的清晰与否、主题对用户的吸引力和潜在用户群大小,这些都会影响到模块的吸引力。
曝光点击率(曝光点击率 = 模块点击人数 / 模块曝光人数)用户对此模块的点击人数,在所有看到此模块的流量中的百分比。与点击率的公式对比可发现,点击率的分母是所有进入页面的流量,但用户的浏览行为永远是浏览得越深,流量越少的。这也就导致位置越深的模块算点击率就越吃亏,因为相当一部分流量压根就没有看到这个模块,也被算进分母里了。而曝光点击率,就是一个排除了页面位置对模块的影响后,可以用来相对公平地去比较各模块的吸引力的数据指标。
思考:什么场景用点击率,什么场景用曝光点击率呢?1)当想要单纯评估楼层对用户的吸引力时,可以看曝光点击率;2)当想要综合评估楼层的整体效果与贡献时,看点击率,毕竟它与楼层 GMV 相关性更高;3)曝光需要特殊埋点,且可能会影响页面性能,因此很多时候我们没有办法取到曝光数据,也只能看点击率了。
曝光点击率的使用注意:首屏内的楼层的曝光点击率,数据可能不准确。首屏的曝光UV是最大的,里面包含了各种异常情况,例如一进页面就跳出,也算作曝光。因此导致首屏的曝光点击率往往会偏小(如下图所示),无法与其他楼层比较。若想比较首屏情况,建议与点击率一起综合来看。
曝光率(曝光率 = 模块曝光人数 / 页面浏览人数)
这个数据可以看出用户在页面上的浏览深度如何,有百分之多少的用户看到了哪一屏。从这个数据中,我们可以发现一些关键的节点。例如,若我们的业务主推是在第二~三屏的位置,但最终发现曝光率在第二屏便暴跌,这便是存在问题的,说不定我们需要把主推内容再往上提一些,或者需要去排查首屏是否有会令用户立即跳转和跳出的内容……这便是曝光率这个数据指标,可以带来的分析价值。
停留时长这个数据指标很好理解,是描述用户在页面上平均停留多少秒。
思考:曝光率下跌曲线越慢 / 浏览深度越深 / 停留时长越长,就代表我们的页面做得越好吗?
曝光率和停留时长的影响因素比较一致,因此可以合在一起解释。曝光率的下降曲线、停留时长的长与短,影响因素有这些:
1)人的生理极限:人不是机器,根据研究,“人不受干扰地执行单一操作的时长为 6s ~ 30s ”[注1],超过这一常数,用户就会走神。可想而知,用户在单一页面上停留的时间是有上限的,不因页面放置入的内容多少而变化。一个反例,是通过利益点来吸引用户在页面上浏览得更深,这不但与生理极限相悖,也把用户自然的浏览行为和目标,硬生生变成了为了追寻更多利益点而进行类似完成任务的操作。除了用利益点交换一个好看的数据以外,这样的做法似乎没能带来更多的产出。
2)页面定位及内容:在双11主会场中,用户的行为模式趋近找优惠和找目标品类,那么他可能不会在这里浏览太多屏数、也不会停留太久——这个时候影响曝光率和停留时长的,就是他有多快能找到感兴趣的优惠,因此,并不能说浏览深度越深、停留时长越长就越好;在 BI(千人千面)商品瀑布流中,用户的行为是闲逛和挑选,这时候他更可能浏览更多的屏数、停留更长时间——因此浏览的商品越多,可以说是对最终效益最好的。
3)异常情况:例如加载异常、页面崩溃的场景,就会导致停留时长异常低、二屏后曝光异常低。
综上,我们应该根据具体的场景、通过数次历史数据的对比,去设定和校正目标曝光率、目标停留时长。平日看这两个数据,可以当做一个监测异常的数据,在正常范围内的波动不需要过度解读,一旦发现特别异常的情况,再进行具体的分析。
自己提出的指标:对营销、对运行都非常有价值的指标
指标1:寻找潜在的vip用户
1.准备三个具体的指标。比较难,又有对运营,营销又非常有价值的。帮助他们做了什么事,讲讲怎么做的
寻找潜在VIP:
1.上一周连续3天登录,且上周内下过一单的
先过滤取出上周内下过一单,又是非vip的人。(从订单明细表)
再根据他们每日的最早启动时间,用rank窗口函数进行排序。那么排序的这个字段就应该是以1为公差的等差数列(left join 用户活跃表日)
然后再用date-sub去将启动日期与rank计算,得到了日期差值,根据这个日期差值进行分组,计算这个差有几个。
就是我们所需要的用户。
找出来之后,给她短信,后台消息推送优惠券。减税,免邮,享受会员价等活动。
2.过去一个月内下单商品大于8件,且下单次数大于2
使用用户订单详情表:取出过去一个月的非vip用户购买详情。
计算每个用户的下单商品数,下单次数>2 (group by userID,sum(购买件数),count(distinct 订单号)》2)
推送消息,给免费vip活动体验
这部分的用户在接下来的三个月时间里,真正转换成vip的有35%的人,所以这个指标还挺有意义的
商品季度/半年复购率(购买过这个商品两次以上的用户数/这个季度购买这种商品的总人数):
3.用户购买明细表。
把上个季度的用户购买详情表过滤出来。group by 用户id 商品id分组,求出用户对于某个商品下单的总次数。
然后用sum if(判断订单单数>2),订单单数>1的人数,求比率,
然后对比率根据品类排名,求每个品类中 比率排名前十的。用row_number<11.分区取品类,排序取复购率。
这些商品,是我们的重要维系的商品,要及时补货。然后复购率高说明,受用户喜欢,可以推荐,给用户发送小样,尝试,增大转化率。
4.品牌复购率:
差不多,把具体商品,改成品牌id。各类商品下的品牌复购率(每月来算)
5.每周各品类热门商品销量前三(取每周各热门品类,然后取用户行为宽表的几个字段,热门品类,用户id,商品id。然后用热门品类过滤。得到属于热门品类的数据,再根据热门品类,商品id,去聚合。去前三。)
6.各区域热门商品销量前五:取用户行为宽表,然后得到里面的数据,可以转化成样例类的rdd。然后根据区域分组,然后求商品销量,前五的。
7.各品类中销量前三的品牌
8.购物车各品类占比:以品牌为key,数量为value。从购物车宽表中获取数据。然后根据品牌分类,求总数。(说明大家想买的东西,便于后期铺货。
数据健康问题:
物流信息:有的客户物流信息上显示收到货了,但是快递可能没有送到他手里,然后过程中有丢失的情况。那么我们的物流计算时长,如果单纯按照物流信息来就会出现偏差,所以我们物流到货时间都是以用户,确认收货为准。也不会差很大。
用户的隐私信息,电话号码:我们使用自己的一套脱敏技术,将每个电话号码的4-11位,加1,然后4-7位与8-11位顺序调换。后期我们需要用到他们的隐私信息,电话进行,营销,发送消息是,就把他转换过来。
数据倾斜问题:
1.用时间维度表去join过去一整年的用户购买明细表,查看,用户集中购买的月份和季节。分析用户的行为。之前不是默认的。(默认开启mapJoin嘛)
2.小表join大表的问题。后面这个优化了,但是小表不能超过512M.我们数据量没那么大,应该是可以的。
比如说算品类销售排名的时候,group by 品类,求销售总量是,某一品类像面膜,可能销售量特别大,占60%多,那么有一个任务就会执行特别久。半天出不来。设置推测执行也差不多,就应该是数据倾斜导致的问题
Map端部分聚合
这里需要修改的参数为:
hive.map.aggr=true(用于设定是否在 map 端进行聚合,默认值为真) hive.groupby.mapaggr.checkinterval=100000(用于设定 map 端进行聚合操作的条目数)
有数据倾斜时进行负载均衡
此处需要设定 hive.groupby.skewindata,当选项设定为 true 是,生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。