
Flink custom periodic watermark: fixing the last window not firing because no further data arrives

Date: 2023-04-20

1. Custom periodic watermark

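```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// MyEvent is the application's own event type (not shown); getCreationTime()
// is assumed to return the event time in epoch milliseconds.
public class MyWaterMark implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 3000; // 3 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Return the watermark as current (wall-clock) time minus the maximum time lag.
        // Because it does not depend on incoming events, it keeps advancing even when
        // no data arrives, so the last window can still fire.
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
```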
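Why this helps: Flink fires an event-time window [start, end) once the watermark reaches end - 1. A watermark computed from the largest event timestamp seen so far stops moving when input stops, so the final window waits indefinitely; the wall-clock watermark above keeps moving. The plain-Java simulation below (no Flink dependency) compares the two strategies. The class name WatermarkSim, the pretend clock value and the sample timestamps are hypothetical; the firing rule is meant to mirror Flink's event-time trigger (fire when the window's max timestamp is at or below the watermark). The trade-off of the wall-clock approach is that any event whose timestamp lags the machine clock by more than the 3 s allowance is treated as late.

```java
import java.util.List;

// Hypothetical plain-Java simulation (no Flink dependency) of when a tumbling
// event-time window fires under two watermark strategies.
public class WatermarkSim {
    static final long WINDOW_SIZE = 5_000; // 5 s tumbling windows, as in the job
    static final long MAX_LAG = 3_000;     // same 3 s lag as MyWaterMark

    // Start of the tumbling window containing ts (zero offset, non-negative ts).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE);
    }

    // A window [start, start + size) fires once watermark >= start + size - 1.
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE - 1;
    }

    // Event-driven watermark: largest event timestamp seen minus the lag.
    // It only moves when new events arrive.
    static long eventDrivenWatermark(List<Long> eventTimes) {
        long max = Long.MIN_VALUE + MAX_LAG; // avoids underflow when no events
        for (long t : eventTimes) {
            max = Math.max(max, t);
        }
        return max - MAX_LAG;
    }

    // Wall-clock watermark, as in MyWaterMark: current time minus the lag.
    // It moves with time even if no events arrive.
    static long wallClockWatermark(long nowMillis) {
        return nowMillis - MAX_LAG;
    }

    public static void main(String[] args) {
        // Events at 1 s, 2 s and 7 s; nothing else ever arrives.
        List<Long> events = List.of(1_000L, 2_000L, 7_000L);
        long lastWindow = windowStart(7_000L);         // window [5000, 10000)

        long wmEvent = eventDrivenWatermark(events);   // 7000 - 3000 = 4000
        long wmClock = wallClockWatermark(20_000L);    // pretend clock at 20 s: 17000

        System.out.println("event-driven fires last window: " + fires(lastWindow, wmEvent)); // false
        System.out.println("wall-clock fires last window:   " + fires(lastWindow, wmClock)); // true
    }
}
```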

2. Using the custom watermark

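```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// MyFilterNullOrWhitespace, MyFlatMap and MyWindow are the author's own functions (not shown);
// MyFlatMap is assumed to emit the MyEvent type that MyWaterMark expects.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Parallelism 1 to make testing easier
env.setParallelism(1)
//env.getConfig.setAutoWatermarkInterval(9000)
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Source: local socket
val text: DataStream[String] = env.socketTextStream("localhost", 9000)
val lateText = new OutputTag[(String, String, Long, Long)]("late_data")
val value = text
  .filter(new MyFilterNullOrWhitespace)
  .flatMap(new MyFlatMap)
  // Use the custom watermark
  .assignTimestampsAndWatermarks(new MyWaterMark())
  .map(x => (x.name, x.datetime, x.timestamp, 1L))
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sideOutputLateData(lateText)
  //.sum(2)
  .apply(new MyWindow)
  //.window(TumblingEventTimeWindows.of(Time.seconds(3)))
  //.apply(new MyWindow)
value.getSideOutput(lateText).map(x => "late data|name:" + x._1 + "|datetime:" + x._2).print()
value.print()
env.execute("watermark test")
```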
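The job above assigns each element to a 5-second tumbling window and sends elements whose window the watermark has already closed to the late_data side output. The plain-Java sketch below reproduces those two rules outside Flink, under stated assumptions: the window start uses the same arithmetic as Flink's tumbling-window assignment with zero offset, and an element is treated as late when its window's last timestamp (end - 1) plus the allowed lateness (zero here, since the job never calls allowedLateness) is at or below the current watermark. The class name LateDataSim and the sample values are hypothetical.

```java
// Hypothetical plain-Java sketch (no Flink dependency) of tumbling-window
// assignment and the late-data check behind sideOutputLateData.
public class LateDataSim {
    static final long SIZE = 5_000;         // Time.seconds(5)
    static final long ALLOWED_LATENESS = 0; // the job never sets allowed lateness

    // Start of the tumbling window that contains ts (offset 0); floorMod keeps
    // the result correct for negative timestamps too.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, SIZE);
    }

    // Last timestamp that belongs to the window: end - 1.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + SIZE - 1;
    }

    // True if the element would be routed to the late_data side output.
    static boolean isLate(long ts, long currentWatermark) {
        return windowMaxTimestamp(ts) + ALLOWED_LATENESS <= currentWatermark;
    }

    public static void main(String[] args) {
        long watermark = 12_000; // e.g. a wall clock of 15 s minus the 3 s lag
        long[] timestamps = {4_000, 9_999, 10_000, 14_500};
        for (long ts : timestamps) {
            System.out.printf("ts=%d window=[%d, %d) late=%b%n",
                    ts, windowStart(ts), windowStart(ts) + SIZE, isLate(ts, watermark));
        }
    }
}
```

With a watermark of 12000, the first two timestamps fall into windows that have already closed and would be counted as late, while the last two still belong to the open window [10000, 15000).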

Reference: "flink Periodic Watermarks 自定义周期性水印" (custom periodic watermarks), by 我是属车的, cnblogs (博客园).
