设计思路
首先明确DWS层的定位:
轻度聚合,DWS层要应对很多实时查询,大而全的明细查询的压力将会很大
将更多的实时数据以主题的方式组合起来方便管理,同时也减少了维度查询的次数
其实设计一张DWS层的表就两件事:维度和度量(事实数据)
维度包括分析中的一些重要字段:渠道,地区,版本,新老用户进行聚合
访问主题宽表计算
需求分析和思路
接受各个明细数据,转为数据流
把数据流合并在一起,成为相同格式对象的数据流
合并的数据流进行聚合(聚合的时间窗口决定了数据的时效性)
将聚合数据写入数据库中
功能实现
封装VisitorStatsApp,读取Kafka各个数据流
//TODO 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //TODO 2.读取Kafka数据创建流 String groupId = "visitor_stats_app_210325"; String uniqueVisitSourceTopic = "dwm_unique_visit"; String userJumpDetailSourceTopic = "dwm_user_jump_detail"; String pageViewSourceTopic = "dwd_page_log"; DataStreamSource uvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(uniqueVisitSourceTopic, groupId)); DataStreamSource ujDS = env.addSource(MyKafkaUtil.getKafkaConsumer(userJumpDetailSourceTopic, groupId)); DataStreamSource pvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(pageViewSourceTopic, groupId)); //TODO 3.将每个流处理成相同的数据类型 //3.1 处理UV数据 SingleOutputStreamOperator visitorStatsWithUvDS = uvDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); //提取公共字段 JSONObject common = jsonObject.getJSONObject("common"); return new VisitorStats("", "", common.getString("vc"), common.getString("ch"), common.getString("ar"), common.getString("is_new"), 1L, 0L, 0L, 0L, 0L, jsonObject.getLong("ts")); }); //3.2 处理UJ数据 SingleOutputStreamOperator visitorStatsWithUjDS = ujDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); //提取公共字段 JSONObject common = jsonObject.getJSONObject("common"); return new VisitorStats("", "", common.getString("vc"), common.getString("ch"), common.getString("ar"), common.getString("is_new"), 0L, 0L, 0L, 1L, 0L, jsonObject.getLong("ts")); }); //3.3 处理PV数据 SingleOutputStreamOperator visitorStatsWithPvDS = pvDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); //获取公共字段 JSONObject common = jsonObject.getJSONObject("common"); //获取页面信息 JSONObject page = jsonObject.getJSONObject("page"); String last_page_id = page.getString("last_page_id"); long sv = 0L; if (last_page_id == null || last_page_id.length() <= 0) { sv = 1L; } return new VisitorStats("", "", common.getString("vc"), common.getString("ch"), common.getString("ar"), common.getString("is_new"), 0L, 1L, sv, 0L, page.getLong("during_time"), jsonObject.getLong("ts")); }); //TODO 4.Union几个流 DataStream unionDS = visitorStatsWithUvDS.union( visitorStatsWithUjDS, visitorStatsWithPvDS); //TODO 5.提取时间戳生成WaterMark SingleOutputStreamOperator visitorStatsWithWMDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofSeconds(11)) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(VisitorStats element, long recordTimestamp) { return element.getTs(); } })); //TODO 6.按照维度信息分组 KeyedStream> keyedStream = visitorStatsWithWMDS.keyBy(new KeySelector>() { @Override public scala.Tuple4 getKey(VisitorStats value) throws Exception { return new scala.Tuple4( value.getAr(), value.getCh(), value.getIs_new(), value.getVc()); } }); //TODO 7.开窗聚合 10s的滚动窗口 WindowedStream, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10))); SingleOutputStreamOperator result = windowedStream.reduce(new ReduceFunction() { @Override public VisitorStats reduce(VisitorStats value1, VisitorStats value2) throws Exception { value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct()); value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct()); value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct()); value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct()); value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum()); return value1; } }, new WindowFunction, TimeWindow>() { @Override public void apply(Tuple4 stringStringStringStringTuple4, TimeWindow window, Iterable input, Collector out) throws Exception { long start = window.getStart(); long end = window.getEnd(); VisitorStats visitorStats = input.iterator().next(); //补充窗口信息 visitorStats.setStt(DateTimeUtil.toYMDhms(new Date(start))); visitorStats.setEdt(DateTimeUtil.toYMDhms(new Date(end))); out.collect(visitorStats); } }); //TODO 8.将数据写入ClickHouse result.print(">>>>>>>>>>>"); result.addSink(ClickHouseUtil.getSink("insert into visitor_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?)")); //TODO 9.启动任务 env.execute("VisitorStatsApp"); }
商品主题宽表计算
与访客dws的宽表类似,也是把多个事实表明细数据汇总起来组成宽表
需求分析和思路
从Kafka主题中获取数据流
把Json字符串数据流转为统一数据对象数据流
把统一的数据结构流合并为一个流
设定事件事件与水位线
分组,开窗,聚合
关联维度补充数据
写入ClickHouse
功能实现
封装商品统计实体类ProductStats
========略
创建ProductStatsApp,从Kafka主题中获取数据流
//TODO 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //TODO 2.读取Kafka 7个主题的 数据创建流 String groupId = "product_stats_app"; String pageViewSourceTopic = "dwd_page_log"; String orderWideSourceTopic = "dwm_order_wide"; String paymentWideSourceTopic = "dwm_payment_wide"; String cartInfoSourceTopic = "dwd_cart_info"; String favorInfoSourceTopic = "dwd_favor_info"; String refundInfoSourceTopic = "dwd_order_refund_info"; String commentInfoSourceTopic = "dwd_comment_info"; DataStreamSource pvDS = env.addSource(MyKafkaUtil.getKafkaConsumer(pageViewSourceTopic, groupId)); DataStreamSource favorDS = env.addSource(MyKafkaUtil.getKafkaConsumer(favorInfoSourceTopic, groupId)); DataStreamSource cartDS = env.addSource(MyKafkaUtil.getKafkaConsumer(cartInfoSourceTopic, groupId)); DataStreamSource orderDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId)); DataStreamSource payDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentWideSourceTopic, groupId)); DataStreamSource refundDS = env.addSource(MyKafkaUtil.getKafkaConsumer(refundInfoSourceTopic, groupId)); DataStreamSource commentDS = env.addSource(MyKafkaUtil.getKafkaConsumer(commentInfoSourceTopic, groupId)); //TODO 3.将7个流统一数据格式 SingleOutputStreamOperator productStatsWithClickAndDisplayDS = pvDS.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { //将数据转换为JSON对象 JSONObject jsonObject = JSON.parseObject(value); //取出page信息 JSONObject page = jsonObject.getJSONObject("page"); String pageId = page.getString("page_id"); Long ts = jsonObject.getLong("ts"); if ("good_detail".equals(pageId) && "sku_id".equals(page.getString("item_type"))) { out.collect(ProductStats.builder() .sku_id(page.getLong("item")) .click_ct(1L) .ts(ts) .build()); } //尝试取出曝光数据 JSONArray displays = jsonObject.getJSONArray("displays"); if (displays != null && displays.size() > 0) { for (int i = 0; i < displays.size(); i++) { //取出单条曝光数据 JSONObject display = displays.getJSONObject(i); if ("sku_id".equals(display.getString("item_type"))) { out.collect(ProductStats.builder() .sku_id(display.getLong("item")) .display_ct(1L) .ts(ts) .build()); } } } } }); SingleOutputStreamOperator productStatsWithFavorDS = favorDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); return ProductStats.builder() .sku_id(jsonObject.getLong("sku_id")) .favor_ct(1L) .ts(DateTimeUtil.toTs(jsonObject.getString("create_time"))) .build(); }); SingleOutputStreamOperator productStatsWithCartDS = cartDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); return ProductStats.builder() .sku_id(jsonObject.getLong("sku_id")) .cart_ct(1L) .ts(DateTimeUtil.toTs(jsonObject.getString("create_time"))) .build(); }); SingleOutputStreamOperator productStatsWithOrderDS = orderDS.map(line -> { OrderWide orderWide = JSON.parseObject(line, OrderWide.class); HashSet orderIds = new HashSet<>(); orderIds.add(orderWide.getOrder_id()); return ProductStats.builder() .sku_id(orderWide.getSku_id()) .order_sku_num(orderWide.getSku_num()) .order_amount(orderWide.getSplit_total_amount()) .orderIdSet(orderIds) .ts(DateTimeUtil.toTs(orderWide.getCreate_time())) .build(); }); SingleOutputStreamOperator productStatsWithPaymentDS = payDS.map(line -> { PaymentWide paymentWide = JSON.parseObject(line, PaymentWide.class); HashSet orderIds = new HashSet<>(); orderIds.add(paymentWide.getOrder_id()); return ProductStats.builder() .sku_id(paymentWide.getSku_id()) .payment_amount(paymentWide.getSplit_total_amount()) .paidOrderIdSet(orderIds) .ts(DateTimeUtil.toTs(paymentWide.getPayment_create_time())) .build(); }); SingleOutputStreamOperator productStatsWithRefundDS = refundDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); HashSet orderIds = new HashSet<>(); orderIds.add(jsonObject.getLong("order_id")); return ProductStats.builder() .sku_id(jsonObject.getLong("sku_id")) .refund_amount(jsonObject.getBigDecimal("refund_amount")) .refundOrderIdSet(orderIds) .ts(DateTimeUtil.toTs(jsonObject.getString("create_time"))) .build(); }); SingleOutputStreamOperator productStatsWithCommentDS = commentDS.map(line -> { JSONObject jsonObject = JSON.parseObject(line); String appraise = jsonObject.getString("appraise"); long goodCt = 0L; if (GmallConstant.APPRAISE_GOOD.equals(appraise)) { goodCt = 1L; } return ProductStats.builder() .sku_id(jsonObject.getLong("sku_id")) .comment_ct(1L) .good_comment_ct(goodCt) .ts(DateTimeUtil.toTs(jsonObject.getString("create_time"))) .build(); }); //TODO 4.Union 7个流 DataStream unionDS = productStatsWithClickAndDisplayDS.union( productStatsWithFavorDS, productStatsWithCartDS, productStatsWithOrderDS, productStatsWithPaymentDS, productStatsWithRefundDS, productStatsWithCommentDS); //TODO 5.提取时间戳生成WaterMark SingleOutputStreamOperator productStatsWithWMDS = unionDS.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(ProductStats element, long recordTimestamp) { return element.getTs(); } })); //TODO 6.分组、开窗、聚合 按照sku_id分组,10秒的滚动窗口,结合增量聚合(累加值)和全量聚合(提取窗口信息) SingleOutputStreamOperator reduceDS = productStatsWithWMDS.keyBy(ProductStats::getSku_id) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction() { @Override public ProductStats reduce(ProductStats stats1, ProductStats stats2) throws Exception { stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct()); stats1.setClick_ct(stats1.getClick_ct() + stats2.getClick_ct()); stats1.setCart_ct(stats1.getCart_ct() + stats2.getCart_ct()); stats1.setFavor_ct(stats1.getFavor_ct() + stats2.getFavor_ct()); stats1.setOrder_amount(stats1.getOrder_amount().add(stats2.getOrder_amount())); stats1.getOrderIdSet().addAll(stats2.getOrderIdSet()); //stats1.setOrder_ct(stats1.getOrderIdSet().size() + 0L); stats1.setOrder_sku_num(stats1.getOrder_sku_num() + stats2.getOrder_sku_num()); stats1.setPayment_amount(stats1.getPayment_amount().add(stats2.getPayment_amount())); stats1.getRefundOrderIdSet().addAll(stats2.getRefundOrderIdSet()); //stats1.setRefund_order_ct(stats1.getRefundOrderIdSet().size() + 0L); stats1.setRefund_amount(stats1.getRefund_amount().add(stats2.getRefund_amount())); stats1.getPaidOrderIdSet().addAll(stats2.getPaidOrderIdSet()); //stats1.setPaid_order_ct(stats1.getPaidOrderIdSet().size() + 0L); stats1.setComment_ct(stats1.getComment_ct() + stats2.getComment_ct()); stats1.setGood_comment_ct(stats1.getGood_comment_ct() + stats2.getGood_comment_ct()); return stats1; } }, new WindowFunction() { @Override public void apply(Long aLong, TimeWindow window, Iterable input, Collector out) throws Exception { //取出数据 ProductStats productStats = input.iterator().next(); //设置窗口时间 productStats.setStt(DateTimeUtil.toYMDhms(new Date(window.getStart()))); productStats.setEdt(DateTimeUtil.toYMDhms(new Date(window.getEnd()))); //设置订单数量 productStats.setOrder_ct((long) productStats.getOrderIdSet().size()); productStats.setPaid_order_ct((long) productStats.getPaidOrderIdSet().size()); productStats.setRefund_order_ct((long) productStats.getRefundOrderIdSet().size()); //将数据写出 out.collect(productStats); } }); //TODO 7.关联维度信息 //7.1 关联SKU维度 SingleOutputStreamOperator productStatsWithSkuDS = AsyncDataStream.unorderedWait(reduceDS, new DimAsyncFunction("DIM_SKU_INFO") { @Override public String getKey(ProductStats productStats) { return productStats.getSku_id().toString(); } @Override public void join(ProductStats productStats, JSONObject dimInfo) throws ParseException { productStats.setSku_name(dimInfo.getString("SKU_NAME")); productStats.setSku_price(dimInfo.getBigDecimal("PRICE")); productStats.setSpu_id(dimInfo.getLong("SPU_ID")); productStats.setTm_id(dimInfo.getLong("TM_ID")); productStats.setCategory3_id(dimInfo.getLong("CATEGORY3_ID")); } }, 60, TimeUnit.SECONDS); //7.2 关联SPU维度 SingleOutputStreamOperator productStatsWithSpuDS = AsyncDataStream.unorderedWait(productStatsWithSkuDS, new DimAsyncFunction("DIM_SPU_INFO") { @Override public void join(ProductStats productStats, JSONObject jsonObject) throws ParseException { productStats.setSpu_name(jsonObject.getString("SPU_NAME")); } @Override public String getKey(ProductStats productStats) { return String.valueOf(productStats.getSpu_id()); } }, 60, TimeUnit.SECONDS); //7.3 关联Category维度 SingleOutputStreamOperator productStatsWithCategory3DS = AsyncDataStream.unorderedWait(productStatsWithSpuDS, new DimAsyncFunction("DIM_base_CATEGORY3") { @Override public void join(ProductStats productStats, JSONObject jsonObject) throws ParseException { productStats.setCategory3_name(jsonObject.getString("NAME")); } @Override public String getKey(ProductStats productStats) { return String.valueOf(productStats.getCategory3_id()); } }, 60, TimeUnit.SECONDS); //7.4 关联TM维度 SingleOutputStreamOperator productStatsWithTmDS = AsyncDataStream.unorderedWait(productStatsWithCategory3DS, new DimAsyncFunction("DIM_base_TRADEMARK") { @Override public void join(ProductStats productStats, JSONObject jsonObject) throws ParseException { productStats.setTm_name(jsonObject.getString("TM_NAME")); } @Override public String getKey(ProductStats productStats) { return String.valueOf(productStats.getTm_id()); } }, 60, TimeUnit.SECONDS); //TODO 8.将数据写入ClickHouse productStatsWithTmDS.print(); productStatsWithTmDS.addSink(ClickHouseUtil.getSink("insert into table product_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")); //TODO 9.启动任务 env.execute("ProductStatsApp"); }
地区主题表
使用FlinkSQL
地区主题主要反映各个地区的销售情况,从业务逻辑上地区主题比起商品主题简单
需求分析与思路
定义Table流环境
把数据源定义为动态表
通过SQL查询出结果表
把结果表转换为数据流
把数据流写入目标数据库
如果是Flink 官方支持的数据库,也可以直接把目标数据表定义为动态表,用 insert into 写入。由于ClickHouse目前官方没有支持的jdbc连接器(目前支持Mysql、PostgreSQL、Derby)。也可以制作自定义 sink,实现官方不支持的连接器。但是比较繁琐。
功能实现
//TODO 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //TODO 2.使用DDL创建表 提取时间戳生成WaterMark String groupId = "province_stats"; String orderWideTopic = "dwm_order_wide"; tableEnv.executeSql("CREATE TABLE order_wide ( " + " `province_id` BIGINT, " + " `province_name` STRING, " + " `province_area_code` STRING, " + " `province_iso_code` STRING, " + " `province_3166_2_code` STRING, " + " `order_id` BIGINT, " + " `split_total_amount` DECIMAL, " + " `create_time` STRING, " + " `rt` as TO_TIMESTAMP(create_time), " + " WATERMARK FOR rt AS rt - INTERVAL '1' SECOND ) with(" + MyKafkaUtil.getKafkaDDL(orderWideTopic, groupId) + ")"); //TODO 3.查询数据 分组、开窗、聚合 Table table = tableEnv.sqlQuery("select " + " DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " + " DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " + " province_id, " + " province_name, " + " province_area_code, " + " province_iso_code, " + " province_3166_2_code, " + " count(distinct order_id) order_count, " + " sum(split_total_amount) order_amount, " + " UNIX_TIMESTAMP()*1000 ts " + "from " + " order_wide " + "group by " + " province_id, " + " province_name, " + " province_area_code, " + " province_iso_code, " + " province_3166_2_code, " + " TUMBLE(rt, INTERVAL '10' SECOND)"); //TODO 4.将动态表转换为流 DataStream provinceStatsDataStream = tableEnv.toAppendStream(table, ProvinceStats.class); //TODO 5.打印数据并写入ClickHouse provinceStatsDataStream.print(); provinceStatsDataStream.addSink(ClickHouseUtil.getSink("insert into province_stats_2021 values(?,?,?,?,?,?,?,?,?,?)")); //TODO 6.启动任务 env.execute("ProvinceStatsSqlApp"); }
关键词主题表
需求分析和思路
关键词主题主要是可视化大屏上字符云的实现效果,用于感性的让大屏观看者感知目前的用户都更关心的那些商品和关键词,关键词的展示也是一种维度聚合的结果,根据聚合的大小来决定关键词的大小,关键词的第一重要来源的就是用户在搜索栏的搜索,另外就是从以商品为主题的统计中获取关键词
功能实现
使用到一个分词器 lk
//TODO 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //1.1 设置CK&状态后端 //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck")); //env.enableCheckpointing(5000L); //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //env.getCheckpointConfig().setCheckpointTimeout(10000L); //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); //env.setRestartStrategy(RestartStrategies.fixedDelayRestart()); //TODO 2.使用DDL方式读取Kafka数据创建表 String groupId = "keyword_stats_app"; String pageViewSourceTopic = "dwd_page_log"; tableEnv.executeSql("create table page_view( " + " `common` Map, " + " `page` Map, " + " `ts` BIGINT, " + " `rt` as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), " + " WATERMARK FOR rt AS rt - INTERVAL '1' SECOND " + ") with (" + MyKafkaUtil.getKafkaDDL(pageViewSourceTopic, groupId) + ")"); //TODO 3.过滤数据 上一跳页面为"search" and 搜索词 is not null Table fullWordTable = tableEnv.sqlQuery("" + "select " + " page['item'] full_word, " + " rt " + "from " + " page_view " + "where " + " page['last_page_id']='search' and page['item'] is not null"); //TODO 4.注册UDTF,进行分词处理 tableEnv.createTemporarySystemFunction("split_words", SplitFunction.class); Table wordTable = tableEnv.sqlQuery("" + "SELECt " + " word, " + " rt " + "FROM " + " " + fullWordTable + ", LATERAL TABLE(split_words(full_word))"); //TODO 5.分组、开窗、聚合 Table resultTable = tableEnv.sqlQuery("" + "select " + " 'search' source, " + " DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " + " DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " + " word keyword, " + " count(*) ct, " + " UNIX_TIMESTAMP()*1000 ts " + "from " + wordTable + " " + "group by " + " word, " + " TUMBLE(rt, INTERVAL '10' SECOND)"); //TODO 6.将动态表转换为流 DataStream keywordStatsDataStream = tableEnv.toAppendStream(resultTable, KeywordStats.class); //TODO 7.将数据打印并写入ClickHouse keywordStatsDataStream.print(); keywordStatsDataStream.addSink(ClickHouseUtil.getSink("insert into keyword_stats_2021(keyword,ct,source,stt,edt,ts) values(?,?,?,?,?,?)")); //TODO 8.启动任务 env.execute("KeywordStatsApp"); }
DWS层总结
DWS层主要是基于DWD和DWM层的数据进行轻度聚合统计掌握union操作多流的合并掌握窗口聚合操作掌握对clickhouse数据库的写入操作掌握FlinkSQL实现业务掌握分词器的使用掌握在FlinkSQL中自定义函数的使用