SparkStreaming的大体流程图
SparkStreaming分周期将数据封装在RDD中,底层抽象使用Dstream。本质还是对RDD中数据的处理。
SparkStreaming基本数据源案例 1、文件流案例SparkStreaming支持各种文件系统中的文件流,如:HDFS、本地文件系统
创建文件流的方式:
读取HDFS上面的文件:
streamingContext.fileStream[KeyClass, ValueClass,InputFormatClass](dataDirectory)
读取简单的文本文件:
streamingContext.textFileStream(dataDirectory)
这里我们分析windows本地文件系统中的文件数据,进行词频统计。
package cn.streaming.localfilestreamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}object TextFileStream { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TextStream") // 创建流上下文环境 val ssc = new StreamingContext(conf, Seconds(3)) val path = "d:/input" // 创建文本输入流 val texts: DStream[String] = ssc.textFileStream(path) val word : DStream[String] = texts.flatMap(words => words.split(" ")) val wo: DStream[(String, Int)] = word.map(x => (x, 1)) val res: DStream[(String, Int)] = wo.reduceByKey((x, y) => x + y) res.print() // 开始启动 ssc.start() ssc.awaitTermination() }}
2、socket流案例注意:这里我们需要触发保存事件才能被统计
我们可以在别的目录创建好文件,然后复制到目标文件夹。
或者创建文件,然后另存为其他文件,这都可以让程序读取文件流。
我们可以监听某个端口上的数据:
val conf = new SparkConf().setMaster("local[*]").setAppName("streaming");val ssc = new StreamingContext(conf, Seconds(3));val socketstreaming = ssc.socketTextStream("localhost", 8888);
这里是监视本地的8888端口的数据,滑动窗口设置为3s
这里由于在windows环境下测试,我们需要先安装netcat环境。
netcat下载地址:netcat 1.11 for Win32/Win64 (eternallybored.org)
解压文件夹。
将nc.exe复制到C:WindowsSystem32目录下,然可即可在cmd命令行执行nc指令
案例程序为:
package cn.streaming.localfilestreamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object SocketsStream { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TextStream") // 创建流上下文环境 val ssc = new StreamingContext(conf, Seconds(3)) val texts: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888) val word : DStream[String] = texts.flatMap(words => words.split(" ")) val wo: DStream[(String, Int)] = word.map(x => (x, 1)) val res: DStream[(String, Int)] = wo.reduceByKey((x, y) => x + y) res.print() // 开始启动 ssc.start() ssc.awaitTermination() }}
在cmd启动服务端:
启动spark流程序:
在窗口发送数据:
可以看出,数据在周期内被统计汇总了。
3、RDD队列流案例SparkStreaming可以接受队列里面的数据,将队列里的RDD分批次处理。
package cn.streaming.word.demoimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject queueStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("queuestreaming") val ssc = new StreamingContext(conf, Seconds(3)) // 创建队列 val rddqueue = new mutable.Queue[RDD[Int]]() // 创建队列流 val queuestream = ssc.queueStream(rddqueue) val mapstreaming = queuestream.map(x => (x % 10, 1)) val reducestream = mapstreaming.reduceByKey((a, b) => a + b) reducestream.print() // 启动流处理 ssc.start() // 向队列里面添加RDD for(i <- 1 to 30){ rddqueue.synchronized{ rddqueue += ssc.sparkContext.makeRDD(1 to 1000,10) } Thread.sleep(1000) } ssc.awaitTermination() }}
以上就是简单数据源汇总。