Error 1:
```
overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
  .aggregate(new CountAgg(), new MyWindowFunction() )
```
1. The aggregation code, using the aggregate() operator:
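```scala
import java.lang

import org.apache.flink.api.common.functions.AggregateFunction
// Java API WindowFunction: the wrong import (implied by lang.Iterable below), see error 2
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Pre-aggregation: count the events per key and window
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: UserBehavior, acc: Long): Long = {
    acc + 1L
  }

  override def getResult(acc: Long): Long = {
    acc
  }

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

// Window function: the KEY type parameter is declared as Long here
class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, w: TimeWindow, iterable: lang.Iterable[Long], collector: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = w.getEnd
    val count = iterable.iterator.next()
    collector.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}

// DataStream processing logic: key by, window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
  .filter(_.behavior == "pv")
  .keyBy("itemId") // field-name overload: the key type becomes a Flink Tuple
  .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
  .aggregate(new CountAgg(), new MyWindowFunction() )
```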
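The snippet above relies on two case classes that the post does not show. A minimal sketch of what they might look like, assuming only what the code uses (`itemId`, `behavior`, and the three-argument `ItemViewCount` constructor); the fields `userId`, `categoryId` and `timestamp` are hypothetical additions. Note that the event-time sliding window also assumes timestamps and watermarks were assigned to `dataStream` upstream, which is not shown here.

```scala
// Input record. Only itemId and behavior are used by the aggregation code;
// userId, categoryId and timestamp are hypothetical fields for illustration.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// Output record, matching the call ItemViewCount(itemId, windowEnd, count).
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
```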
2. Finding the cause
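The error says the argument types do not match: after keyBy(), the resulting KeyedStream[T, K] has the key type K = org.apache.flink.api.java.tuple.Tuple, while MyWindowFunction declares its key type as Long. The mismatch comes from the way keyBy() is called. In Flink 1.14.3 the Scala DataStream offers three overloads: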
```scala
@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(fields: Int*): KeyedStream[T, JavaTuple]

@deprecated("use [[DataStream.keyBy(KeySelector)]] instead")
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]

def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
```
The code above uses def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple], so the key type K is org.apache.flink.api.java.tuple.Tuple rather than Long.
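To see the two key types side by side, here is a small sketch that builds both keyed streams without executing a job and inspects `KeyedStream.getKeyType`. Assumptions: Flink 1.14.x Scala API on the classpath; `KeyTypeDemo`, `keyTypes` and the `UserBehavior` fields other than `itemId` are hypothetical. The `keyBy("itemId")` call emits a deprecation warning.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

// Hypothetical input type; only itemId matters for this demo.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

object KeyTypeDemo {
  /** Returns the key type produced by the field-name overload and by the key-selector overload. */
  def keyTypes(): (TypeInformation[_], TypeInformation[_]) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[UserBehavior] =
      env.fromElements(UserBehavior(1L, 100L, 1, "pv", 1000L))

    // Deprecated field-name overload: K is fixed to the Flink Tuple type.
    val byField: KeyedStream[UserBehavior, Tuple] = stream.keyBy("itemId")

    // Key-selector overload: K is inferred from the function's result type (Long).
    val bySelector: KeyedStream[UserBehavior, Long] = stream.keyBy(_.itemId)

    (byField.getKeyType, bySelector.getKeyType)
  }
}
```

The type ascriptions are the point of the sketch: writing `KeyedStream[UserBehavior, Long]` for `byField` would not compile, which is the same mismatch the aggregate() error reports.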
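I switched to def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]: the other two overloads are deprecated, and it also matches my own habits. With a key-selector function, K is inferred from the function's return type, here the Long type of itemId. The updated processing logic: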
```scala
// DataStream processing logic: key by, window, aggregate
val aggStream: DataStream[ItemViewCount] = dataStream
  .filter(_.behavior == "pv")
  .keyBy(_.itemId) // key-selector overload: K = Long
  .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
  .aggregate(new CountAgg(), new MyWindowFunction() )
```
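An alternative the post does not use, sketched here as an assumption: keep the deprecated keyBy("itemId") and instead declare the window function's KEY type as `Tuple`, reading the item id from field 0 of the key. The class name `TupleKeyWindowFunction` is hypothetical, and the sketch uses the Scala `WindowFunction`, whose import is the subject of error 2.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Output record as used in the post.
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

// Hypothetical variant for keyBy("itemId"): the key arrives as a Flink Tuple
// (a Tuple1 for a single field), so the KEY type parameter must be Tuple.
class TupleKeyWindowFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit = {
    // Field 0 of the key tuple holds the itemId value
    val itemId: Long = key.getField[Long](0)
    // The pre-aggregation emits exactly one count per key and window
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId, window.getEnd, count))
  }
}
```

This compiles against `.keyBy("itemId")...aggregate(new CountAgg(), new TupleKeyWindowFunction())`, but it keeps the deprecated overload and an untyped key, so the key-selector version above is the cleaner choice.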
Error 2:
```
overloaded method value aggregate with alternatives:
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$13: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$14: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[V], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$11: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$12: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
  [ACC, V, R](preAggregator: org.apache.flink.api.common.functions.AggregateFunction[org.example.hot.items.UserBehavior,ACC,V], windowFunction: org.apache.flink.streaming.api.scala.function.WindowFunction[V,R,Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[V], implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.example.hot.items.CountAgg, org.example.hot.items.MyWindowFunction)
  .aggregate(new CountAgg(), new MyWindowFunction() )
```
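Even after changing how keyBy() is called, the same error was still reported. After going over the code several times, I found that the WindowFunction that MyWindowFunction extends had been imported from the wrong package. The IDE offers two candidates for the import: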
org.apache.flink.streaming.api.scala.function.WindowFunction (Scala API)
org.apache.flink.streaming.api.functions.windowing.WindowFunction (Java API)
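The correct one is the first: org.apache.flink.streaming.api.scala.function.WindowFunction. The second is the Java API interface, whose apply() takes a java.lang.Iterable (which is why the original override used lang.Iterable); the aggregate() overloads of the Scala KeyedStream window API only accept the Scala WindowFunction, ProcessWindowFunction, or a plain Scala function.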
With the import fixed, rewrite the overridden apply() method of MyWindowFunction so that it takes a Scala Iterable:
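```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MyWindowFunction extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = window.getEnd
    // CountAgg emits exactly one pre-aggregated count per key and window
    val count = input.iterator.next()
    out.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}
```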
Summary:
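1. How keyBy() is called: see the DataStream[T] source for the available overloads. The field-index and field-name overloads key the stream by a Flink Tuple, while the key-selector overload keeps the selector's return type as the key type.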
2. The import path of WindowFunction; the Scala API version is the correct one:
org.apache.flink.streaming.api.scala.function.WindowFunction
Note: in Flink 1.14.3, ProcessWindowFunction is recommended over WindowFunction; it exposes the window (and more) through a Context object:
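```scala
// Scala API ProcessWindowFunction (the Java one lives in org.apache.flink.streaming.api.functions.windowing)
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MyProcessWindowFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long, context: Context, elements: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key
    val windowEnd = context.window.getEnd
    val count = elements.iterator.next()
    out.collect( ItemViewCount(itemId, windowEnd, count) )
  }
}
```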
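As a rough end-to-end check of the ProcessWindowFunction variant, the sketch below runs a tiny bounded job locally and collects the results. Assumptions: Flink 1.14.x with flink-streaming-scala and flink-clients on the classpath; the `UserBehavior` fields beyond `itemId` and `behavior`, the sample events, and the names `HotItemsSketch`/`run` are hypothetical; timestamps are taken from a `timestamp` field via `WatermarkStrategy.forMonotonousTimestamps`, a step the post does not show.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical record types; fields beyond those used in the post are assumptions.
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

// Incremental pre-aggregation: one running count per key and window.
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = acc + 1L
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Receives the single pre-aggregated count and attaches the key and window end.
class MyProcessWindowFunction extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def process(key: Long, context: Context, elements: Iterable[Long],
                       out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, context.window.getEnd, elements.iterator.next()))
}

object HotItemsSketch {
  /** Runs a small bounded job on a local mini cluster and returns all window results. */
  def run(events: Seq[UserBehavior]): List[ItemViewCount] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Event time comes from the (hypothetical) timestamp field, in milliseconds.
    val watermarks = WatermarkStrategy
      .forMonotonousTimestamps[UserBehavior]()
      .withTimestampAssigner(new SerializableTimestampAssigner[UserBehavior] {
        override def extractTimestamp(e: UserBehavior, recordTs: Long): Long = e.timestamp
      })

    val result: DataStream[ItemViewCount] = env
      .fromCollection(events)
      .assignTimestampsAndWatermarks(watermarks)
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .window(SlidingEventTimeWindows.of(Time.seconds(2L), Time.seconds(1L)))
      .aggregate(new CountAgg(), new MyProcessWindowFunction())

    // Execute the job and drain the results before closing the iterator.
    val it = result.javaStream.executeAndCollect()
    try it.asScala.toList finally it.close()
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      UserBehavior(1L, 100L, 1, "pv", 1000L),
      UserBehavior(1L, 100L, 1, "buy", 1200L), // dropped by the pv filter
      UserBehavior(2L, 100L, 1, "pv", 1500L),
      UserBehavior(3L, 100L, 1, "pv", 2500L)
    )
    HotItemsSketch.run(events).foreach(println)
  }
}
```

With these sample events, the pv events at 1000, 1500 and 2500 ms fall into the 2 s windows sliding by 1 s that end at 2000, 3000 and 4000 ms, so item 100 should get counts of 2, 3 and 1 respectively. The arrival order of the results is not something to rely on, so compare them as a set.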