Using Flink ProcessWindowFunction and WindowFunction

Date: 2023-06-17

Error 1:

overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
    .aggregate(new CountAgg(), new MyWindowFunction() )

1. Aggregating with the aggregate() operator

// Original code that triggers the compile error above.
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = { acc + 1L }
  override def getResult(acc: Long): Long = { acc }
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, w: TimeWindow, iterable: lang.Iterable[Long], collector: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = w.getEnd
    val count = iterable.iterator.next()
    collector.collect(ItemViewCount(itemId, windowEnd, count))
  }
}

// DataStream processing logic: key by item, open a window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
  .filter(_.behavior == "pv")
  .keyBy("itemId")
  .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
  .aggregate(new CountAgg(), new MyWindowFunction())
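As background for why the window function reads only iterator.next(): when aggregate() is given both an AggregateFunction and a window function, the window function receives the pre-aggregated result rather than the raw events. The sketch below drives CountAgg by hand, outside any Flink job, to show what that single value is. The two-field UserBehavior case class and the CountAggSketch object are assumptions made for illustration; the article does not show the real UserBehavior definition.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical record type, reduced to the fields the article uses.
case class UserBehavior(itemId: Long, behavior: String)

// Same counting logic as the article's CountAgg.
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = acc + 1L
  override def getResult(acc: Long): Long = acc
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

object CountAggSketch {
  def main(args: Array[String]): Unit = {
    val agg = new CountAgg()
    val events = Seq(
      UserBehavior(1L, "pv"),
      UserBehavior(1L, "pv"),
      UserBehavior(1L, "pv")
    )
    // Fold every event into the accumulator, as Flink does per key and window.
    val acc = events.foldLeft(agg.createAccumulator())((a, e) => agg.add(e, a))
    // The window function would see exactly this one value.
    println(agg.getResult(acc)) // prints 3
  }
}
```

Because only one Long per key and window reaches the window function, its job is reduced to attaching the key and the window end to that count.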

2. Finding the cause

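The error message points to a type mismatch. The KeyedStream[T, K] returned by keyBy() has key type K = org.apache.flink.api.java.tuple.Tuple, whereas MyWindowFunction declares its key type parameter as Long. The cause is the way keyBy() is called. In Flink 1.14.3, the Scala DataStream[T] includes the following three keyBy() overloads: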

@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(fields: Int*): KeyedStream[T, JavaTuple]

@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]

def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

The code above calls def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple], so the key type K is org.apache.flink.api.java.tuple.Tuple rather than Long.

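Here I switched keyBy() to the def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] overload. The two field-based overloads are already marked @deprecated, and this form is also my personal preference. The revised processing logic is: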

// DataStream processing logic: key by item, open a window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
  .filter(_.behavior == "pv")
  .keyBy(_.itemId)
  .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
  .aggregate(new CountAgg(), new MyWindowFunction())
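The difference in key type between the two keyBy() styles can be made visible with explicit type annotations. This is a minimal sketch, assuming a two-field UserBehavior case class and a KeyTypeSketch object (both hypothetical, not from the original project) and the Flink 1.14 Scala API on the classpath. It only builds the two keyed streams and never executes a job.

```scala
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.streaming.api.scala._

// Hypothetical record type, reduced to the fields the article uses.
case class UserBehavior(itemId: Long, behavior: String)

object KeyTypeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[UserBehavior] =
      env.fromElements(UserBehavior(1L, "pv"), UserBehavior(2L, "pv"))

    // Deprecated field-name overload: the key type is the Java Tuple,
    // so a window function for this stream must declare KEY = JavaTuple.
    val byName: KeyedStream[UserBehavior, JavaTuple] =
      dataStream.keyBy("itemId")

    // Function overload: the key type is the function's result type,
    // so a window function for this stream can declare KEY = Long.
    val bySelector: KeyedStream[UserBehavior, Long] =
      dataStream.keyBy(_.itemId)
  }
}
```

Swapping the two annotations should make the compiler reject both assignments, which is the same mismatch that the aggregate() error reports in a less readable form.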

Error 2:

overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
    .aggregate(new CountAgg(), new MyWindowFunction() )

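After the keyBy() change, the compiler still reports the error above, although the expected key type is now Long. After checking the code repeatedly, I found that the WindowFunction extended by MyWindowFunction had been imported from the wrong package. The IDE offers two candidates for the import: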

org.apache.flink.streaming.api.scala.function.WindowFunction
org.apache.flink.streaming.api.functions.windowing.WindowFunction

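The correct import is the first one: org.apache.flink.streaming.api.scala.function.WindowFunction. The second path is the Java API interface, whose apply() takes a java.lang.Iterable. The Scala aggregate() overloads listed in the error message accept only the Scala API trait, so a class that implements the Java interface matches none of them.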

With the correct import in place, rewrite the override def apply() method of MyWindowFunction. Its input parameter is now a Scala Iterable:

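// Scala API trait, not org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = window.getEnd
    // CountAgg has already pre-aggregated, so input holds a single count.
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId, windowEnd, count))
  }
}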

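Summary:

1. keyBy() usage: the overload you call determines the key type K of the resulting KeyedStream[T, K]. See the DataStream[T] source for details.

2. WindowFunction import path: in Scala code, import

        org.apache.flink.streaming.api.scala.function.WindowFunction

Note: in Flink 1.14.3, ProcessWindowFunction is recommended over WindowFunction. Its process() method receives a Context, from which the window is read: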

class MyProcessWindowFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long, context: Context, elements: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = context.window.getEnd
    val count = elements.iterator.next()
    out.collect(ItemViewCount(itemId, windowEnd, count))
  }
}
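Putting the pieces together, the following is a minimal end-to-end sketch of the corrected job, assuming the Flink 1.14 Scala API. The case class fields, the sample events, the one-second watermark bound, and the HotItemsSketch object are assumptions made for illustration; the original project's UserBehavior, ItemViewCount, and data source are not shown in the article.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical record types; timestamp is assumed to be epoch milliseconds.
case class UserBehavior(itemId: Long, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = acc + 1L
  override def getResult(acc: Long): Long = acc
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class MyProcessWindowFunction
    extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long, context: Context, elements: Iterable[Long],
                       out: Collector[ItemViewCount]): Unit =
    // elements holds the single pre-aggregated count from CountAgg.
    out.collect(ItemViewCount(key, context.window.getEnd, elements.iterator.next()))
}

object HotItemsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Event time is taken from the record; the 1 s out-of-orderness bound is arbitrary.
    val strategy = WatermarkStrategy
      .forBoundedOutOfOrderness[UserBehavior](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[UserBehavior] {
        override def extractTimestamp(e: UserBehavior, recordTs: Long): Long = e.timestamp
      })

    env
      .fromElements(
        UserBehavior(1L, "pv", 1000L),
        UserBehavior(1L, "pv", 1500L),
        UserBehavior(2L, "buy", 1600L),
        UserBehavior(2L, "pv", 2200L)
      )
      .assignTimestampsAndWatermarks(strategy)
      .filter(_.behavior == "pv")
      .keyBy(_.itemId) // key type is Long, matching MyProcessWindowFunction
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .aggregate(new CountAgg(), new MyProcessWindowFunction())
      .print()

    env.execute("hot-items-sketch")
  }
}
```

The key type produced by keyBy(_.itemId) and the KEY type parameter of MyProcessWindowFunction are both Long, and the function comes from the scala.function package, so the aggregate() call resolves to the ProcessWindowFunction overload.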
