
Common Flink Operators

Date: 2023-06-22
Operators
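The single-stream operators at the top of the list below (map, flatMap, filter, and keyBy followed by reduce) can be pictured with ordinary java.util.stream code. The sketch is a hypothetical in-memory model that does not use Flink at all; `BasicOperatorsModel`, `words`, `rollingReduce` and `wordCounts` are made-up names. The one behavior it tries to reproduce faithfully is that reduce on a KeyedStream is a rolling reduce: every incoming element emits the updated running value for its key, not one final result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model; no Flink classes are used.
final class BasicOperatorsModel {

    // flatMap -> filter -> map, expressed with java.util.stream.
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // flatMap: one line, zero or more words
                .filter(word -> !word.isEmpty())                  // filter: keep non-empty tokens only
                .map(String::toLowerCase)                         // map: exactly one output per input
                .collect(Collectors.toList());
    }

    // keyBy + reduce: one running value per key; every incoming element
    // emits the updated value for its key (a rolling reduce).
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keySelector,
                                        BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T value : input) {
            out.add(state.merge(keySelector.apply(value), value, reducer));
        }
        return out;
    }

    // Word count: map each word to (word, 1), key by the word, sum the counts.
    static List<Map.Entry<String, Integer>> wordCounts(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = words(lines).stream()
                .map(word -> Map.entry(word, 1))
                .collect(Collectors.toList());
        return rollingReduce(pairs, Map.Entry::getKey,
                (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()));
    }

    public static void main(String[] args) {
        // Prints [to=1, be=1, or=1, not=1, to=2, be=2]
        System.out.println(wordCounts(List.of("To be", "or not to be")));
    }
}
```

In an actual Flink job the same pipeline would be built from the DataStream operators listed below, and the interleaving of results for different keys would depend on parallelism rather than on list order.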

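map
DataStream → DataStream
Transforms each element into exactly one output element.

flatMap
DataStream → DataStream
Transforms each element into zero, one, or more output elements.

filter
DataStream → DataStream
Keeps only the elements for which the predicate returns true.

keyBy
DataStream → KeyedStream
Partitions the stream by key; all elements with the same key go to the same partition.

reduce
KeyedStream/WindowedStream/AllWindowedStream → DataStream
Used after keyBy or after window/windowAll. Repeatedly combines two elements into one of the same type.

window
KeyedStream → WindowedStream
Used after keyBy; groups the elements of each key into windows.

windowAll
DataStream → AllWindowedStream
Used on a non-keyed stream (not after keyBy). This operator always runs with parallelism 1.

apply
WindowedStream/AllWindowedStream → DataStream
Applies a window function to each window as a whole.

union
DataStream* → DataStream
Merges two or more streams of the same type into one stream.

join
DataStream,DataStream → DataStream
Pairs elements of the two streams whose keys are equal and that fall into the same window. Matching pairs are emitted; elements without a match produce no output.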

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});
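The windowed join described above can be modeled in plain Java to see which pairs survive. This sketch is hypothetical and does not use Flink; `WindowJoinModel`, `Event`, `windowStart` and `join` are illustrative names. It assumes non-negative millisecond timestamps and a window offset of zero, and it ignores watermarks and late data. Each element is assigned to the tumbling window containing its timestamp (start = ts - ts % size), and every left/right pair sharing both a key and a window is emitted, which is the inner-join behavior described in the text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a tumbling-window inner join; no Flink classes are used.
final class WindowJoinModel {

    // An element with a join key, an event timestamp in ms, and a payload.
    record Event(String key, long ts, String payload) {}

    // Start of the tumbling window that contains ts, for non-negative ts and offset 0.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Emits every (left, right) pair with equal keys in the same window;
    // elements without such a partner produce no output.
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean sameWindow = windowStart(l.ts(), sizeMs) == windowStart(r.ts(), sizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.payload() + "+" + r.payload());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(
                new Event("a", 1000, "L1"), new Event("a", 4000, "L2"), new Event("b", 500, "L3"));
        List<Event> right = List.of(new Event("a", 2500, "R1"), new Event("b", 3500, "R2"));
        // With 3-second windows, L3 and R2 share key "b" but fall into different windows.
        System.out.println(join(left, right, 3000)); // [L1+R1]
    }
}
```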

Interval Join
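KeyedStream,KeyedStream → DataStream
Joins elements of two keyed streams that have the same key and whose timestamps satisfy leftTs + lowerBound <= rightTs <= leftTs + upperBound. Both bounds are inclusive by default.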

// This joins the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional: make the upper bound exclusive
    .lowerBoundExclusive() // optional: make the lower bound exclusive
    .process(new ProcessJoinFunction() {...});
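The condition in the comment above (key1 == key2 && leftTs - 2 < rightTs < leftTs + 2) can be checked directly. The following hypothetical plain-Java sketch is not Flink code; `IntervalJoinModel`, `Event`, `inInterval` and `join` are illustrative names. It evaluates the condition for one left element and shows how the two exclusive flags change the result compared with the default inclusive bounds. A real interval join also relies on event-time watermarks to decide when buffered elements can be discarded; the sketch leaves that out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of interval-join matching; no Flink classes are used.
final class IntervalJoinModel {

    record Event(String key, long ts) {}

    // Does rightTs fall into [leftTs + lower, leftTs + upper]?
    // Each bound is inclusive unless marked exclusive.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        long lo = leftTs + lower;
        long hi = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > lo : rightTs >= lo;
        boolean belowUpper = upperExclusive ? rightTs < hi : rightTs <= hi;
        return aboveLower && belowUpper;
    }

    // Emits "leftTs->rightTs" for every pair with equal keys whose
    // timestamps satisfy the interval condition.
    static List<String> join(List<Event> left, List<Event> right, long lower, long upper,
                             boolean lowerExclusive, boolean upperExclusive) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key().equals(r.key())
                        && inInterval(l.ts(), r.ts(), lower, upper, lowerExclusive, upperExclusive)) {
                    out.add(l.ts() + "->" + r.ts());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("k", 10));
        List<Event> right = List.of(new Event("k", 8), new Event("k", 9), new Event("k", 11),
                new Event("k", 12), new Event("other", 10));
        // between(-2, 2) with both bounds exclusive: 8 < rightTs < 12
        System.out.println(join(left, right, -2, 2, true, true));   // [10->9, 10->11]
        // default inclusive bounds: 8 <= rightTs <= 12
        System.out.println(join(left, right, -2, 2, false, false)); // [10->8, 10->9, 10->11, 10->12]
    }
}
```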

CoGroup
DataStream,DataStream → DataStream
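Groups the elements of the two streams by key within each window. Elements with matching keys are emitted together as one group; elements without a match are still emitted, in a group whose other side is empty. The point is the grouping: unlike join, no element is dropped.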

dataStream.coGroup(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
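To make the difference from join concrete, here is a hypothetical plain-Java model of a windowed coGroup. It uses no Flink classes; `CoGroupModel`, `Event`, `Group` and `coGroup` are illustrative names. It assumes a fixed-size tumbling window with offset zero, non-negative millisecond timestamps, and no watermarks or late data. It builds one group per (window, key) seen on either side and emits it even when one side is empty, which is exactly the case a join would drop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java model of a tumbling-window coGroup; no Flink classes are used.
final class CoGroupModel {

    record Event(String key, long ts, String payload) {}

    // All payloads of one key in one window, from each side; either list may be empty.
    static final class Group {
        final List<String> left = new ArrayList<>();
        final List<String> right = new ArrayList<>();

        @Override
        public String toString() {
            return left + "|" + right;
        }
    }

    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Finds or creates the group for the element's (window, key).
    private static Group groupFor(TreeMap<Long, TreeMap<String, Group>> groups, Event e, long sizeMs) {
        return groups
                .computeIfAbsent(windowStart(e.ts(), sizeMs), w -> new TreeMap<>())
                .computeIfAbsent(e.key(), k -> new Group());
    }

    // One output line per (window, key) seen on either side, ordered by window, then key.
    static List<String> coGroup(List<Event> left, List<Event> right, long sizeMs) {
        TreeMap<Long, TreeMap<String, Group>> groups = new TreeMap<>();
        for (Event e : left) {
            groupFor(groups, e, sizeMs).left.add(e.payload());
        }
        for (Event e : right) {
            groupFor(groups, e, sizeMs).right.add(e.payload());
        }
        List<String> out = new ArrayList<>();
        groups.forEach((start, byKey) ->
                byKey.forEach((key, group) -> out.add(start + " " + key + " " + group)));
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(
                new Event("a", 1000, "L1"), new Event("a", 2000, "L2"), new Event("b", 500, "L3"));
        List<Event> right = List.of(new Event("a", 2500, "R1"), new Event("c", 1500, "R2"));
        // Prints "0 a [L1, L2]|[R1]", "0 b [L3]|[]" and "0 c []|[R2]", one per line.
        coGroup(left, right, 3000).forEach(System.out::println);
    }
}
```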

Connect
DataStream,DataStream → ConnectedStreams
"Connects" two data streams while preserving their types, which may differ. Connected streams can share state between the two inputs.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
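Shared state is the main reason to connect two streams instead of merging them with union. The following hypothetical plain-Java sketch is not Flink code, and its class and method names are made up. It models one connected operator whose first input carries Integer values and whose second input carries String lines: the Integer input sets a minimum word length, and the String input reads that shared value to decide which words to emit. In a real job the threshold would normally be kept in Flink-managed state, and Flink does not guarantee the relative order in which elements of the two inputs arrive; a plain field and a fixed call order stand in for both here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one connected operator with two differently typed inputs.
final class ConnectSharedStateModel {

    // State visible to both inputs: the minimum word length to emit.
    private int minLength = 0;

    // First input (Integer): updates the shared threshold and emits nothing.
    void onFirst(Integer newMinLength, List<String> out) {
        minLength = newMinLength;
    }

    // Second input (String): splits the line and emits the words that meet
    // the threshold most recently set through the first input.
    void onSecond(String line, List<String> out) {
        for (String word : line.split(" ")) {
            if (word.length() >= minLength) {
                out.add(word);
            }
        }
    }

    public static void main(String[] args) {
        ConnectSharedStateModel op = new ConnectSharedStateModel();
        List<String> out = new ArrayList<>();
        op.onSecond("a bb ccc", out); // threshold 0: every word passes
        op.onFirst(3, out);           // the first input changes the shared state
        op.onSecond("a bb ccc", out); // now only "ccc" passes
        System.out.println(out);      // [a, bb, ccc, ccc]
    }
}
```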

CoMap, CoFlatMap
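ConnectedStreams → DataStream
map and flatMap for ConnectedStreams: map1/flatMap1 are called for elements of the first stream, map2/flatMap2 for elements of the second.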

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    // called for each element of the first (Integer) stream
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    // called for each element of the second (String) stream
    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
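The dispatch rule behind CoMapFunction and CoFlatMapFunction (elements of the first stream go to map1/flatMap1, elements of the second to map2/flatMap2) can be mirrored in plain Java. In this hypothetical sketch, which is not Flink code, `Either` stands in for one element of a ConnectedStreams<Integer, String> and a List stands in for Flink's Collector; the functions passed in the usage are the same ones as in the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical plain-Java model of CoMap / CoFlatMap dispatch; no Flink classes are used.
final class CoFunctionModel {

    // One element of a connected stream: exactly one side is set,
    // and each side keeps its own type (A for input 1, B for input 2).
    record Either<A, B>(A first, B second) {
        static <X, Y> Either<X, Y> ofFirst(X value) { return new Either<>(value, null); }
        static <X, Y> Either<X, Y> ofSecond(Y value) { return new Either<>(null, value); }
    }

    // CoMap: map1 handles input-1 elements, map2 input-2 elements; one output each.
    static <A, B, R> List<R> coMap(List<Either<A, B>> in, Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (Either<A, B> e : in) {
            out.add(e.first() != null ? map1.apply(e.first()) : map2.apply(e.second()));
        }
        return out;
    }

    // CoFlatMap: same dispatch, but each call may add zero or more results
    // to the output list, which plays the role of Flink's Collector.
    static <A, B, R> List<R> coFlatMap(List<Either<A, B>> in,
                                       BiConsumer<A, List<R>> flatMap1,
                                       BiConsumer<B, List<R>> flatMap2) {
        List<R> out = new ArrayList<>();
        for (Either<A, B> e : in) {
            if (e.first() != null) {
                flatMap1.accept(e.first(), out);
            } else {
                flatMap2.accept(e.second(), out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Either<Integer, String>> in = new ArrayList<>();
        in.add(Either.ofFirst(7));
        in.add(Either.ofSecond("hello flink"));

        List<Boolean> flags = coMap(in, value -> true, value -> false);
        List<String> words = coFlatMap(in,
                (value, sink) -> sink.add(value.toString()),
                (value, sink) -> {
                    for (String word : value.split(" ")) {
                        sink.add(word);
                    }
                });
        System.out.println(flags); // [true, false]
        System.out.println(words); // [7, hello, flink]
    }
}
```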

Iterate
DataStream → IterativeStream → ConnectedStream
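Creates a feedback loop: the stream is split in two, with one part fed back into the iteration again and again while the other part is emitted downstream as normal output.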

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
// values > 0 are sent back to the head of the iteration
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// values <= 0 leave the loop as regular output
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
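The loop above can be simulated in plain Java to see where each element ends up. In this hypothetical sketch (not Flink code; `IterateModel` and `iterate` are illustrative names), the iteration body is passed in as a function because the map step is left open in the example above. Results greater than 0 go back to the head of the loop; the others leave as output. The sketch assumes the body eventually drives every value to 0 or below, otherwise it never terminates. A FIFO queue fixes one processing order here, whereas Flink gives no such ordering guarantee across the feedback edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical plain-Java model of an iteration with a feedback edge; no Flink classes are used.
final class IterateModel {

    static List<Long> iterate(List<Long> initial, LongUnaryOperator body) {
        Deque<Long> pending = new ArrayDeque<>(initial); // elements waiting at the iteration head
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long value = body.applyAsLong(pending.poll()); // the iteration body (map step)
            if (value > 0) {
                pending.add(value);                        // feedback: back to the head
            } else {
                output.add(value);                         // output: leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical body: subtract 2 on every pass through the loop.
        System.out.println(iterate(List.of(5L, 1L, -3L), v -> v - 2)); // [-1, -5, -1]
    }
}
```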
