1. 实时热门商品统计
需求：每隔 5 分钟，输出最近一小时内点击量最多的前 N 个商品。
package userbehavior35

// Requirement: every 5 minutes, output the N items with the most clicks ("pv") in the last hour.

import java.lang
import java.sql.Timestamp
import java.time.Duration

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/** One parsed line of the user-behavior CSV.
  *
  * @param userId     user identifier (CSV column 0)
  * @param itemId     item identifier (CSV column 1)
  * @param categoryId item category identifier (CSV column 2)
  * @param behavior   behavior type; only "pv" records are counted by [[HotItem]] (CSV column 3)
  * @param timestamp  event time; presumably epoch seconds, since the watermark assigner
  *                   multiplies it by 1000 to get milliseconds (CSV column 4)
  */
case class UserBehavior(userId: Long, itemId: Long, categoryId: Long, behavior: String, timestamp: Long)

/** Hot-items job: counts "pv" events per item in 1-hour event-time windows that slide every
  * 5 minutes, then prints the top items of each window.
  */
object HotItem {

  /** Input file used when no path is given as the first command-line argument. */
  private val DefaultInputPath: String =
    "D:\\study\\Code\\UserBehavior\\src\\main\\resources\\UserBehavior.csv"

  /** Number of items reported for each window. */
  private val TopN: Int = 3

  /** Entry point.
    *
    * @param args optional; `args(0)` overrides the input CSV path
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
    env.setParallelism(1)

    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)

    val dataStream: DataStream[UserBehavior] = env
      .readTextFile(inputPath)
      .map(line => parseLine(line))
      .assignTimestampsAndWatermarks(
        // The explicit type argument is required: without it Scala infers
        // WatermarkStrategy[Nothing], and the UserBehavior assigner below does not type-check.
        WatermarkStrategy
          .forBoundedOutOfOrderness[UserBehavior](Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[UserBehavior] {
            override def extractTimestamp(element: UserBehavior, recordTimestamp: Long): Long =
              element.timestamp * 1000L
          }))

    // Per-item click count for every (item, window) pair.
    val aggDataStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
      .aggregate(new CountAgg(), new WindowResultFunction())

    // Group all item counts that belong to the same window, then rank them.
    aggDataStream
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(TopN))
      .print()

    env.execute("job")
  }

  /** Parses a CSV line of the form `userId,itemId,categoryId,behavior,timestamp`.
    * Throws (NumberFormatException / ArrayIndexOutOfBoundsException) on malformed lines,
    * exactly as the original inline parser did.
    */
  private def parseLine(line: String): UserBehavior = {
    val fields = line.split(",").map(_.trim)
    UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toLong, fields(3), fields(4).toLong)
  }
}

/** Incremental counter: the accumulator is the number of elements seen in the key/window. */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  // Start counting from zero.
  override def createAccumulator(): Long = 0L

  // Every element contributes one click.
  override def add(in: UserBehavior, acc: Long): Long = acc + 1

  // The accumulator already is the result.
  override def getResult(acc: Long): Long = acc

  // Combines two partial counts.
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

/** Count of "pv" events for one item in the window ending at `windowEnd` (epoch millis). */
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

/** Attaches the item id (the key) and the window end to the pre-aggregated count. */
class WindowResultFunction() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    // Used after CountAgg in `aggregate`, so `input` holds exactly one pre-aggregated count.
    val count = input.iterator.next()
    out.collect(ItemViewCount(key, window.getEnd, count))
  }
}

/** Buffers every [[ItemViewCount]] of one window (key = windowEnd) and, once the watermark
  * passes the window end, emits a formatted ranking of the `size` most-viewed items.
  *
  * @param size number of items to report per window
  */
class TopNHotItems(size: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Per-key (per-window) buffer of item counts, held until the timer fires.
  var listState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    listState = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("listState", classOf[ItemViewCount]))
  }

  override def processElement(
      i: ItemViewCount,
      context: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
      collector: Collector[String]): Unit = {
    listState.add(i)
    // Fire 1 ms after the window end, once the watermark guarantees all counts of this window
    // have arrived. Flink deduplicates identical timers per key, so one fire per window.
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // Rank all buffered counts, highest first, and keep the top `size`.
    val topItems: List[ItemViewCount] =
      listState.get().asScala.toList.sortBy(_.count)(Ordering.Long.reverse).take(size)

    // The window is complete: drop its buffered counts so keyed state does not grow forever.
    listState.clear()

    // Format the ranking as one printable string.
    val result = new StringBuilder
    result.append("=============================")
    // timestamp - 1 == windowEnd, because the timer was registered at windowEnd + 1.
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    topItems.zipWithIndex.foreach { case (item, idx) =>
      result.append("No.").append(idx + 1).append(":")
        .append("商品ID=").append(item.itemId)
        .append("浏览量=").append(item.count).append("\n")
    }
    result.append("=========================")
    // No Thread.sleep here: blocking inside onTimer stalls the whole operator thread.
    out.collect(result.toString())
  }
}