欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkStreaming基本数据源

时间:2023-07-16
SparkStreaming基本数据源

SparkStreaming的大体流程图

SparkStreaming分周期将数据封装在RDD中,底层抽象使用Dstream。本质还是对RDD中数据的处理。

SparkStreaming基本数据源案例 1、文件流案例

SparkStreaming支持各种文件系统中的文件流,如:HDFS、本地文件系统

创建文件流的方式:

读取HDFS上面的文件:

streamingContext.fileStream[KeyClass, ValueClass,InputFormatClass](dataDirectory)

读取简单的文本文件:

streamingContext.textFileStream(dataDirectory)

这里我们分析windows本地文件系统中的文件数据,进行词频统计。

package cn.streaming.localfilestreamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.DStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}object TextFileStream { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TextStream") // 创建流上下文环境 val ssc = new StreamingContext(conf, Seconds(3)) val path = "d:/input" // 创建文本输入流 val texts: DStream[String] = ssc.textFileStream(path) val word : DStream[String] = texts.flatMap(words => words.split(" ")) val wo: DStream[(String, Int)] = word.map(x => (x, 1)) val res: DStream[(String, Int)] = wo.reduceByKey((x, y) => x + y) res.print() // 开始启动 ssc.start() ssc.awaitTermination() }}

注意:这里我们需要触发保存事件才能被统计

我们可以在别的目录创建好文件,然后复制到目标文件夹。

或者创建文件,然后另存为其他文件,这都可以让程序读取文件流。

2、socket流案例

我们可以监听某个端口上的数据:

val conf = new SparkConf().setMaster("local[*]").setAppName("streaming");val ssc = new StreamingContext(conf, Seconds(3));val socketstreaming = ssc.socketTextStream("localhost", 8888);

这里是监视本地的8888端口的数据,滑动窗口设置为3s

这里由于在windows环境下测试,我们需要先安装netcat环境。

netcat下载地址:netcat 1.11 for Win32/Win64 (eternallybored.org)

解压文件夹。

将nc.exe复制到C:WindowsSystem32目录下,然可即可在cmd命令行执行nc指令

案例程序为:

package cn.streaming.localfilestreamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object SocketsStream { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TextStream") // 创建流上下文环境 val ssc = new StreamingContext(conf, Seconds(3)) val texts: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888) val word : DStream[String] = texts.flatMap(words => words.split(" ")) val wo: DStream[(String, Int)] = word.map(x => (x, 1)) val res: DStream[(String, Int)] = wo.reduceByKey((x, y) => x + y) res.print() // 开始启动 ssc.start() ssc.awaitTermination() }}

在cmd启动服务端:

启动spark流程序:

在窗口发送数据:

可以看出,数据在周期内被统计汇总了。

3、RDD队列流案例

SparkStreaming可以接受队列里面的数据,将队列里的RDD分批次处理。

package cn.streaming.word.demoimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject queueStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("queuestreaming") val ssc = new StreamingContext(conf, Seconds(3)) // 创建队列 val rddqueue = new mutable.Queue[RDD[Int]]() // 创建队列流 val queuestream = ssc.queueStream(rddqueue) val mapstreaming = queuestream.map(x => (x % 10, 1)) val reducestream = mapstreaming.reduceByKey((a, b) => a + b) reducestream.print() // 启动流处理 ssc.start() // 向队列里面添加RDD for(i <- 1 to 30){ rddqueue.synchronized{ rddqueue += ssc.sparkContext.makeRDD(1 to 1000,10) } Thread.sleep(1000) } ssc.awaitTermination() }}

以上就是简单数据源汇总。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。