欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Flink（Scala）热门页面浏览量统计

时间:2023-04-17

1.基于服务器 log 的热门页面浏览量统计
每隔 5 秒，输出最近 10 分钟内访问量最多的前 N 个 URL（示例代码中 N = 5）。

package com.chuangyan.network35

import java.lang
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/** One parsed line of the access log.
  *
  * @param ip        field 0 of the space-separated line
  * @param userId    field 1 of the line
  * @param eventTime field 3 parsed with pattern `dd/MM/yyyy:HH:mm:ss`, as epoch milliseconds
  *                  (interpreted in the JVM default time zone)
  * @param method    field 5 of the line
  * @param url       field 6 of the line
  */
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

/** Hot-URL job: every 5 seconds, print the URLs with the most hits in the last 10 minutes
  * (event time), ranked per window.
  */
object NetworkFlow {

  /** Input used when no path is given as the first program argument.
    * Backslashes must be escaped: `\s`, `\a`, ... are not legal Scala string escapes.
    */
  private val DefaultLogPath =
    "D:\\study\\Code\\UserBehavior\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log"

  /** How many URLs are reported per window. */
  private val TopN = 5

  /** Builds and runs the job.
    *
    * @param args optional; `args(0)` overrides the input log path
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultLogPath)
    val source: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[ApacheLogEvent] = source
      .map { line =>
        val fields = line.split(" ")
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per record.
        val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val eventTime = format.parse(fields(3)).getTime
        ApacheLogEvent(fields(0), fields(1), eventTime, fields(5), fields(6))
      }
      .assignTimestampsAndWatermarks(
        // The explicit type argument is required: without it Scala infers `Nothing` and the
        // ApacheLogEvent timestamp assigner below does not type-check.
        WatermarkStrategy
          .forBoundedOutOfOrderness[ApacheLogEvent](Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[ApacheLogEvent] {
            override def extractTimestamp(element: ApacheLogEvent, recordTimestamp: Long): Long =
              element.eventTime
          })
      )

    // Every five seconds, emit the top-N most visited URLs over the last ten minutes.
    dataStream
      .filter(_.url != "/")
      .keyBy(_.url)
      // Replaces the deprecated `timeWindow(size, slide)`; the job runs on event time.
      .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Regroup the per-URL counts by window so they can be ranked within each window.
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(TopN))
      .print()

    env.execute("url job")
  }
}

/** Incremental pre-aggregation that counts events; the result is the event count. */
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(in: ApacheLogEvent, acc: Long): Long = acc + 1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

/** Hit count of one URL within the window ending at `windowEnd` (epoch milliseconds). */
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

/** Attaches the URL key and the window end to the single pre-aggregated count. */
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[Long],
      out: Collector[UrlViewCount]
  ): Unit = {
    // With a pre-aggregating AggregateFunction the window function receives exactly one value.
    out.collect(UrlViewCount(key, window.getEnd, input.head))
  }
}

/** Ranks the URL counts of one window (the key is `windowEnd`) and emits a printable report.
  *
  * @param size              number of URLs to include in each report
  * @param outputDelayMillis pause before emitting each report, used to make a bounded replay
  *                          look like a live scrolling feed; `0` disables it
  */
class TopNHotUrls(size: Int, outputDelayMillis: Long = 1000L)
    extends KeyedProcessFunction[Long, UrlViewCount, String] {

  var listState: ListState[UrlViewCount] = _

  override def open(parameters: Configuration): Unit = {
    listState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount]("listState", classOf[UrlViewCount])
    )
  }

  override def processElement(
      i: UrlViewCount,
      context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
      collector: Collector[String]
  ): Unit = {
    // Buffer the count until all counts for this window have arrived.
    listState.add(i)
    // Fires once the watermark passes the window end; repeated registrations of the same
    // timestamp for this key collapse into a single timer.
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
      out: Collector[String]
  ): Unit = {
    val viewCounts: lang.Iterable[UrlViewCount] = listState.get()
    val buffer = ListBuffer[UrlViewCount]()
    val it = viewCounts.iterator()
    while (it.hasNext) {
      buffer += it.next()
    }
    // This key (one windowEnd) is finished; drop its state so it does not accumulate forever.
    listState.clear()

    val topUrls = buffer.sortBy(_.count)(Ordering[Long].reverse).take(size)

    // Format the ranking as a String for printing.
    val result = new StringBuilder
    result.append("====================")
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    topUrls.zipWithIndex.foreach { case (current, idx) =>
      result
        .append("No.").append(idx + 1).append(":")
        .append("URL=").append(current.url)
        .append("浏览量=").append(current.count).append("\n")
    }
    result.append("===================")

    // Throttle output to simulate a real-time scrolling result.
    if (outputDelayMillis > 0) Thread.sleep(outputDelayMillis)
    out.collect(result.toString)
  }
}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网，版权归原作者所有，如有冒犯请联系我们，我们将在三个工作日内妥善处理。