
Flink idle windows: withIdleness

Date: 2023-07-29

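When a Flink job runs with a parallelism greater than 1, an operator's event-time clock is the minimum of the watermarks it receives from all of its upstream subtasks. If one subtask receives no data, its watermark never advances, so even when the subtasks that do have data push their own watermark past a window's trigger boundary, the watermarks are not aligned and the window does not fire. Such an empty subtask is what this post calls an idle window. Setting an idle timeout with withIdleness marks the silent subtask as idle, so it is ignored when the watermark is computed and the windows that do have data can fire.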
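The effect of an idle input on the combined watermark can be illustrated with a small plain-Java model. This is only a sketch of the idea (take the minimum over the inputs that are not idle), not Flink's actual implementation; the class and method names are made up for illustration.

```java
import java.util.OptionalLong;

/** Simplified model of how a downstream operator combines input watermarks. */
final class IdleWatermarkSketch {

    /**
     * Returns the minimum watermark over all inputs that are not marked idle,
     * or empty if every input is idle.
     */
    static OptionalLong combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Subtask 0 has seen a record at 15 s (watermark 9999 ms); subtask 1 has seen nothing.
        long[] watermarks = {9_999L, Long.MIN_VALUE};

        // Without idleness, the empty subtask holds the combined watermark back.
        System.out.println(combined(watermarks, new boolean[] {false, false}));

        // Once subtask 1 is marked idle, only subtask 0 counts.
        System.out.println(combined(watermarks, new boolean[] {false, true}));
    }
}
```

In this model the first call stays at Long.MIN_VALUE, so no window can fire, while the second call yields the watermark of the active subtask.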

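parallelism: 2

windowSize: 10s

forBoundedOutOfOrderness: 5s

withIdleness: 10s

Trigger range of window n: [n*size, (n+1)*size + 5s). Events in this range do not yet fire the window; it fires on the first event whose timestamp reaches (n+1)*size + 5s, because the watermark trails the largest timestamp seen by the 5s out-of-orderness bound.

Valid data range of window n: [n*size, (n+1)*size)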
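The arithmetic behind these two ranges can be checked with a few lines of plain Java. The sketch assumes the usual behaviour of a bounded-out-of-orderness strategy (watermark = largest timestamp seen - bound - 1 ms) and of an event-time trigger (fire once the watermark reaches window end - 1 ms); the class and method names are hypothetical and nothing here calls Flink itself.

```java
/** Window and watermark arithmetic for tumbling event-time windows (illustrative only). */
final class WindowMathSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Watermark produced after the largest timestamp seen so far. */
    static long watermarkFor(long maxTimestampMs, long outOfOrdernessMs) {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    /** True when the watermark has passed the last timestamp that belongs to the window. */
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L;   // windowSize: 10s
        long bound = 5_000L;   // forBoundedOutOfOrderness: 5s

        long ts = 15_000L;     // the record "a 15"
        long wm = watermarkFor(ts, bound);

        System.out.println("window start of a 15: " + windowStart(ts, size));
        System.out.println("watermark after a 15: " + wm);
        System.out.println("window [0, 10000) fires: " + fires(10_000L, wm));
        System.out.println("window [10000, 20000) fires: " + fires(20_000L, wm));
    }
}
```

With these settings a record at 15 s is the earliest one that can fire the first window, which matches the watermark value 9999 printed in the output of the post.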

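Input:

a 1
a 10
a 15    -- reaches the trigger boundary of the first window, but there are two tasks and the other task has no data, so the window does not fire normally
a 20    -- second-window data on task 2
b 1     -- expired, dropped
b 19    -- second-window data on task 1
b 25    -- second-window data on task 1; reaches the trigger boundary, so the window fires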

Output:

2> (a,1)
======================= ^^^ window_num:0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
-- Only one subtask has data, so the window fires only after the 10s idle timeout has elapsed.
1> (b,19)
2> (a,10)
======================= ^^^ window_num:0 ^^^ ==================
======================= range: [10000___20000 )
======================= key: b
2> (side-a,10)
======================= watermark: 19999
======================================================
2> (a,15)
======================= ^^^ window_num:1 ^^^ ==================
======================= range: [10000___20000 )
======================= key: a
======================= watermark: 19999
======================================================
-- Both subtasks have data; as soon as one record reaches the trigger timestamp, both windows fire.

demo:

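import java.time.Duration;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The class declaration is missing from the original; IdlenessDemo is a placeholder name.
public class IdlenessDemo {

    // Side-output tag
    private static OutputTag<Tuple2<String, Integer>> outputTag =
            new OutputTag<Tuple2<String, Integer>>("dayle_data2") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Generated source; it is defined here but not connected to the pipeline below.
        DataStreamSource<Tuple2<String, Integer>> dataStreams = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    Random random = new Random();
                    int tp2 = 0;
                    int speed = 0;
                    int f1 = -1;

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (true) {
                            TimeUnit.SECONDS.sleep(1);
                            tp2 = Math.abs(random.nextInt() % 7);
                            f1 = Math.abs(++speed + tp2);
                            ctx.collect(Tuple2.of("a", f1));
                            System.out.println("source generator :\t" + f1);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });

        // Must be an anonymous inner class so that the type information is kept
        OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData") {};

        SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
                .socketTextStream("localhost", 10086)
                // Malformed records go to the side output (the FlatMapProcess class is not shown)
                .process(new FlatMapProcess(erroOutputTag));

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream =
                ReduceWindowPrint(socketTextStream, 5);

        // Read the side outputs
        socketTextStream.getSideOutput(erroOutputTag).print();
        tuple2Stream.getSideOutput(outputTag).print();

        // Main stream with the normally processed records
        tuple2Stream.print();

        env.execute("event time process ");
    }

    private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(
            SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
        return dataStreams
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
                                // f1 is in seconds; multiply by 1000L to get milliseconds without int overflow
                                .withTimestampAssigner((e, t) -> e.f1 * 1000L)
                                // With parallelism > 1, a subtask without data holds back the watermark
                                // of every downstream operator, so no window fires.
                                // With an idle timeout, a subtask that stays silent for that long
                                // (10s below) is ignored and the windows that have data can fire.
                                .withIdleness(Duration.ofSeconds(10))
                )
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Allowed lateness for records of the previous window
                // .allowedLateness(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    int i;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        i = 0;
                    }

                    @Override
                    public synchronized void process(String s, Context context,
                            Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            out.collect(next);
                            if (next.f1 % 2 == 0) {
                                // Send even values to the side output
                                context.<Tuple2<String, Integer>>output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
                            }
                        }
                        System.out.println("======================= ^^^ window_num:\t" + i++ + " ^^^ ==================");
                        System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
                        System.out.println("======================= key: " + s);
                        System.out.println("======================= watermark: " + context.currentWatermark());
                        System.out.println("======================================================");
                    }
                });
    }
}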
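The demo sends each socket line through a FlatMapProcess class that the post does not show. Judging only from the input format (a key and a number of seconds separated by a space) and from the formatExceptionData side output, its parsing step might look roughly like the plain-Java sketch below. The class name, method name and return type are assumptions made for illustration; in the real job an empty result would presumably be routed to the side output instead.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

/** Hypothetical parser for socket lines such as "a 15" (key, event time in seconds). */
final class LineParserSketch {

    /** Returns (key, seconds) for a well-formed line, or empty for a malformed one. */
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            Map.Entry<String, Integer> entry =
                    new SimpleEntry<>(parts[0], Integer.valueOf(parts[1]));
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            // The second field is not an integer, so the line counts as malformed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("a 15").isPresent());
        System.out.println(parse("not-a-record").isPresent());
    }
}
```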
