欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink-基于eventtime的时间窗口学习

时间:2023-04-22

最近在学习flink的窗口的使用,在网上找了一些材料来学习,记录下学习过程中的收获。

要实现的功能:利用netcat生成流数据,并利用flink的窗口进行wordcount。

工具:mac的命令行、idea

步骤1:打开命令行窗口,输入nc -l 777,吊起输入数据的窗口。

数据例子:

1614909800,p

1614909801,p

1614909802,p

1614909790,k

1614909791,k

1614909805,a

1614909806,a

1614909805,a

1614909810,a

1614909805,a

1614909815,a

步骤2:运行flink程序

package demo;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.time.Duration;public class eventtimetest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource inputStream = env.socketTextStream("localhost", 7777);//localhost等于127.0.0.1,不过localhost是域名,127.0.0.1是IP地址 SerializableTimestampAssigner timestampAssigner = new SerializableTimestampAssigner() { @Override public long extractTimestamp(String element, long recordTimestamp) { String[] fields = element.split(","); Long aLong = new Long(fields[0]); return aLong * 1000L; } }; //forBoundedOutOfOrderness,设置watermark的延迟 SingleOutputStreamOperator watermarkStream = inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner(timestampAssigner)); watermarkStream.flatMap(new FlatMapFunction>() { @Override public void flatMap(String value, Collector> out) throws Exception { String[] fields = value.split(","); out.collect(new Tuple2<>(fields[1], 1)); } }).keyBy(data -> data.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print(); env.execute("run watermark wc"); }}

注意:

1.extractTimestamp函数返回的是13位的timestamp,例如1614909815*1000=1614909815000,这是ms的timestamp,所以函数return along*1000.2.在nc的输入数据格式如下:1614909815,java3.要先开启nc,再执行代码。nc -l 7777 然后输入数据。4.forboundeoutoforderness设置的时常是10s,例如数据在第15秒才第一次触发计算,计算窗口是第一个5秒窗口的数据以及 窗口的第一秒序号比第15s少了15s以上的窗口也计算。5.窗口是固定的,以5秒的窗口为例子:第0 1 2 3 4秒是一个窗口,第二个窗口是第5 6 7 8 9。

结果:

主要用到的参考文章:Flink1.12 使用WatermarkStrategy生成时间戳

原文链接:https://blog.csdn.net/RonieWhite/article/details/114386907

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。