欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkStreaming业务逻辑处理的一些高级算子

时间:2023-05-02
1、reduceByKey

  reduceByKey 是按key进行计算,操作的数据是每个批次内的数据(一个采集周期),不能跨批次计算。如果需要实现对历史数据的跨批次统计累加,则需要使用updateStateByKey算子或者mapWithState算子。

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamingWordCountScala { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、初始化程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(3)) //二、获取数据流,就是数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.244.130", 1234) //三、数据处理 //val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val wordResult: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) //四、数据输出查看 wordResult.print() //五、启动任务 ssc.start() //启动 ssc.awaitTermination() //线程等待,等待处理下一批次任务 ssc.stop() //关闭 }}

2、updateStateByKey

 updateStateByKey 算子是统计历史所有的数据,实现累加

有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordCount)。 针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。重点:首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加

注意:

reduceByKey 是无状态操作,即操作的数据都是每个批次内的数据(一个采集周期)updateStateByKey 是状态操作,即操作从启动到当前的所有采集周期内的数据(跨批次操作)

以WordCount计算为例:

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、初始化程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(3)) //为了实现对历史数据的累加,需要设置检查点目录 ssc.checkpoint("D:\Java Project\DATA\UpdateStateByKeyDemo_checkpoint") //二、读取数据流,就是数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999) //三、数据处理 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val wordResult: DStream[(String, Int)] = wordAndOne.updateStateByKey((values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum //将目前新进来的批次的所有value值相加 val lastCount = state.getOrElse(0) //取出之前累加统计的历史状态值 Some(currentCount + lastCount) //目前值的和加上历史值,完成状态的更新 }) //四、数据输出 wordResult.print() //五、启动任务 ssc.start() ssc.awaitTermination() //线程等待,等待处理下一批次任务 ssc.stop() }}

3、mapWithState

  mapWithState:也是用于全局统计key的状态,但是它如果没有数据输入,在没有设置全局输出的情况下,默认不会返回之前的key的状态,类似于增量的感觉。

注意:mapWithState算子比updateStateByKey效率更高,因为:

updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(在生产环境中建议使用这个)。

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.RDDimport org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}object MapWithStateDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、初始化程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) //val sc: SparkContext = new SparkContext(conf) val ssc: StreamingContext = new StreamingContext(conf, Seconds(3)) //设置检查点目录 ssc.checkpoint("D:\Java Project\DATA\MapStateByKeyDemo_checkpoint") //二、读取数据流,也就是数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9999) //三、数据处理 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) //可以设置初始值 val initialRDD: RDD[(String, Long)] = ssc.sparkContext.parallelize(List(("flink", 100L), ("spark", 50L))) val stateSpec: StateSpec[String, Int, Long, (String, Long)] = StateSpec.function((currentBatchTime: Time, key: String, value: Option[Int], state: State[Long]) => { val sum = value.getOrElse(0).toLong + state.getOption().getOrElse(0L) val output = (key, sum) //更新状态值,如果你的数据没有超时的话 if (!state.isTimingOut()) { state.update(sum) } Some(output) //返回值,要求返回的是key-value类型的 }).initialState(initialRDD) //设置初始值 .numPartitions(2).timeout(Seconds(10)) //timeout:超时。当一个key超过Seconds(10)这个时间没有接收到新数据的时候,这个key以及对应的状态会被移除掉,也就是重新统计。 val result: MapWithStateDStream[String, Int, Long, (String, Long)] = wordAndOne.mapWithState(stateSpec) //四、数据输出 //result.print() //打印出来发生变化的数据 result.stateSnapshots().print() //打印出来的是全量的数据 //五、启动任务 ssc.start() ssc.awaitTermination() ssc.stop() }}

4、transform算子实现黑名单过滤

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.broadcast.Broadcastimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}object TransformDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、设置程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(3)) //二、获取数据流,即数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9993) //三、数据处理 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) //具体的黑名单操作 定义黑名单的规则:$ ? ! 过滤掉 //定义黑名单 首先要获取到黑名单,企业中可以从Mysql,Redis里面去获取。 val filterRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("$", "?", "!")).map((_, true)) //优化:给过滤的规则数据通过广播变量广播出去 val filterBroadCast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(filterRDD.collect()) //实现过滤 val filterResult: DStream[(String, Int)] = wordAndOne.transform(rdd => { val filterRDD2: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(filterBroadCast.value) val result: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD2) val joinResult: RDD[(String, (Int, Option[Boolean]))] = result.filter(tuple => { tuple._2._2.isEmpty //过滤出来我们想要的数据 }) //在Scala里面最后一行就是方法的返回值 joinResult.map(tuple => (tuple._1, tuple._2._1)) //将黑名单字符替换成 * 号 }) //实现累加 val finalResult: DStream[(String, Int)] = filterResult.reduceByKey(_ + _) //四、数据输出 finalResult.print() //打印出来发生变化的数据 //五、启动任务 ssc.start() ssc.awaitTermination() ssc.stop() }}

5、Window操作——reduceByKeyAndWindow算子

  reduceByKeyAndWindow 窗口函数允许你在一个滑动的窗口中进行计算。

所有这些窗口操作都需要两个参数 windowLength(窗口大小,即窗口的持续时间) 和 slideInterval(滑动间隔,即执行窗口操作的间隔);比如说我们现在要每隔2秒,统计前4秒内每一个单词出现的次数,这个时候就需要用这个窗口函数了;请注意:窗口大小和滑动间隔必须是间隔的整数倍。

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}object WindowDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) //一、初始化程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(2)) //二、获取数据流,即数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9995) //三、数据处理 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(6), Seconds(4)) //四、数据输出 result.print() //五、启动任务 ssc.start() ssc.awaitTermination() ssc.stop() }}

6、SparkStreaming和SparkSQL整合

  SparkStreaming和SparkSQL整合之后,就非常的方便,可以使用SQL的方式操作相应的数据,很方便。

package com.sparkscala.streamingimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.sql.{Dataframe, SparkSession}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamAndSQLDemo { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.WARN) //一、设置程序入口 val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(3)) //二、获取数据流,即数据源 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop2", 9996) //三、数据处理 //这里必须先转换成DStream才能进行下面的转换 toDF 操作 val words: DStream[String] = lines.flatMap(_.split(" ")) //获取到一个一个的单词 words.foreachRDD(rdd => { val spark: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ //隐式转换 val wordDataframe: Dataframe = rdd.toDF("word") //注册一个临时视图 wordDataframe.createOrReplaceTempView("words") //数据输出 spark.sql("select word, count(*) as totalCount from words group by word").show() }) //五、启动任务 ssc.start() ssc.awaitTermination() ssc.stop() }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。