
Flink Side Outputs

Date: 2023-04-18
Side output example

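import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);

        // An OutputTag needs its element type: either pass a TypeInformation,
        // or create an anonymous subclass so the generic type is retained, e.g.
        // OutputTag<Integer> oddTag = new OutputTag<Integer>("odd-data") {};
        OutputTag<Integer> oddTag = new OutputTag<>("odd-data", Types.INT);
        OutputTag<Integer> evenTag = new OutputTag<>("even-data", Types.INT);
        OutputTag<String> strTag = new OutputTag<>("str-data", Types.STRING);

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                try {
                    int num = Integer.parseInt(line);
                    if (num % 2 == 0) {
                        ctx.output(evenTag, num); // even number
                    } else {
                        ctx.output(oddTag, num); // odd number
                    }
                } catch (NumberFormatException e) {
                    ctx.output(strTag, line); // not an integer
                }
            }
        });

        DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
        DataStream<String> strStream = mainStream.getSideOutput(strTag);
        evenStream.print("even stream");
        strStream.print("string stream");
        // Nothing is emitted through `out`, so the main stream stays empty.
        mainStream.print("main stream");
        env.execute();
    }
}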
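The routing rule inside processElement can be tried without a Flink cluster. The following is only a minimal plain-Java sketch of that rule: the class name RoutingSketch and the method route are hypothetical names introduced for illustration, and the returned strings simply mirror the tag ids used in the example ("even-data", "odd-data", "str-data").

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of the routing rule in processElement.
// RoutingSketch and route are hypothetical names used only for illustration.
class RoutingSketch {

    // Returns the id of the tag a line would be sent to.
    static String route(String line) {
        try {
            int num = Integer.parseInt(line);
            // Negative odd numbers give -1 here, so compare against 0 only.
            return (num % 2 == 0) ? "even-data" : "odd-data";
        } catch (NumberFormatException e) {
            // Anything that is not a plain integer is treated as a string.
            return "str-data";
        }
    }

    public static void main(String[] args) {
        // Group a few sample lines by the tag they would be routed to.
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (String line : new String[] {"1", "2", "abc", "-3", "10"}) {
            outputs.computeIfAbsent(route(line), k -> new ArrayList<>()).add(line);
        }
        System.out.println(outputs);
    }
}
```

Note that Integer.parseInt does not trim whitespace, so a line such as " 4" ends up in the string output, just as it would in the Flink job.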

Using a side output to capture late data from a window

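import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

public class GetWindowLateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 s; on failure restart up to 5 times, 5 s apart
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));

        // Input lines look like: timestampMillis,word,count
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);

        // Take the event time from the first field; no out-of-orderness is tolerated
        SingleOutputStreamOperator<String> linesWithWatermark = lines.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                return Long.parseLong(element.split(",")[0]);
                            }
                        }));

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount =
                linesWithWatermark.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        if (line.startsWith("error")) {
                            throw new RuntimeException("bad record");
                        }
                        String[] fields = line.split(",");
                        return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
                    }
                });

        // Key by word
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(tp -> tp.f0);

        // Tag for late records; the anonymous subclass keeps the generic type
        OutputTag<Tuple2<String, Integer>> lateDataTag =
                new OutputTag<Tuple2<String, Integer>>("late-data") {};

        // 5-second tumbling event-time windows; late records go to the side output
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(lateDataTag);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowedStream.sum(1);

        // Read the late records back from the window operator's result
        DataStream<Tuple2<String, Integer>> lateDataStream = result.getSideOutput(lateDataTag);

        result.print();
        lateDataStream.print("late data");
        env.execute();
    }
}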
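To see which records would land in the late-data output, it can help to walk through the lateness rule by hand. The sketch below is plain Java with no Flink dependency, and it rests on stated assumptions rather than on Flink's actual classes: the watermark is taken to advance immediately after every record (Flink emits watermarks periodically, so real timing can differ), there is a single input partition, allowed lateness is 0, and a record counts as late when the last timestamp of its window is at or below the current watermark. LateDataSketch, windowStart, currentWatermark and isLate are hypothetical names.

```java
// Plain-Java sketch (no Flink dependency) of the lateness rule for
// 5-second tumbling event-time windows with zero out-of-orderness.
// All names here are hypothetical and used only for illustration.
class LateDataSketch {
    static final long WINDOW_SIZE_MS = 5_000L;   // mirrors Time.seconds(5)
    static final long OUT_OF_ORDERNESS_MS = 0L;  // mirrors Duration.ofSeconds(0)

    // Largest event timestamp seen so far; starts low enough that nothing is late at first.
    private long maxTimestampSeen = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;

    // Start of the tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS);
    }

    // Watermark derived from the largest timestamp seen so far.
    long currentWatermark() {
        return maxTimestampSeen - OUT_OF_ORDERNESS_MS - 1;
    }

    // Checks the record against the watermark built from earlier records,
    // then lets the record itself advance the watermark.
    boolean isLate(long ts) {
        long windowMaxTimestamp = windowStart(ts) + WINDOW_SIZE_MS - 1;
        boolean late = windowMaxTimestamp <= currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        return late;
    }

    public static void main(String[] args) {
        LateDataSketch sketch = new LateDataSketch();
        // Event timestamps as they would appear in the first field of each input line.
        for (long ts : new long[] {1000L, 4999L, 5000L, 3000L, 6000L}) {
            System.out.println(ts + " late=" + sketch.isLate(ts));
        }
    }
}
```

In this sequence only the record with timestamp 3000 is reported as late: by the time it arrives, the record with timestamp 5000 has pushed the watermark to 4999, which closes the window [0, 5000).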
