
SparkStreaming WordCount: Introductory Example

Date: 2023-05-05

```scala
package com.lqs.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Qs01_WordCount {

  def main(args: Array[String]): Unit = {

    //TODO 1. Initialize the Spark Streaming configuration
    //local[*]: the socket receiver permanently occupies one thread, so local mode needs at least 2 threads
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //TODO 2. Initialize the StreamingContext
    //Seconds(3): set the micro-batch interval to 3 seconds
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //Create a DStream from a socket; each record read in is one line of text
    val lineDStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("bdc112", 8888)

    //Split each line into individual words
    val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))

    //Map each word to a tuple (word, 1)
    val wordToOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))

    //Sum the counts of identical words within each batch (nothing is carried over between batches)
    val wordToSumDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)

    //Print the first elements of every batch to the console
    wordToSumDStream.print()

    //TODO 3. Start the StreamingContext
    streamingContext.start()

    //TODO 4. Block the main thread so that it does not exit
    streamingContext.awaitTermination()
  }
}
```
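To see what each 3-second batch computes, the flatMap, map, reduceByKey chain can be mimicked on an ordinary Scala collection without starting Spark. This is a minimal sketch, assuming one batch can be modeled as a Seq[String] of the lines received during that interval; BatchWordCountSketch and countWords are hypothetical names, and groupBy followed by a sum stands in for the shuffle that reduceByKey performs.

```scala
// Spark-free sketch of what one micro-batch of Qs01_WordCount computes.
// Assumption: a batch is modeled as a plain Seq[String] of received lines;
// BatchWordCountSketch and countWords are hypothetical, illustration-only names.
object BatchWordCountSketch {

  // Mirrors flatMap(_.split(" ")) -> map((_, 1)) -> reduceByKey(_ + _)
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split every line into words
      .map((_, 1))             // pair each word with the count 1
      .groupBy(_._1)           // group the pairs by word (stand-in for the shuffle)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // add up the 1s per word

  def main(args: Array[String]): Unit = {
    // The line that made up the 1645009203000 ms batch in the netcat session
    val batch = Seq("lqs Hello Woed")
    countWords(batch).foreach { case (word, n) => println(s"($word,$n)") }
  }
}
```

Feeding it the line lqs Hello Woed gives every word a count of 1, the same contents as the 1645009203000 ms batch; the printing order may differ, just as it can in Spark.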

Start the program and send data to it with netcat, listening on port 8888 of host bdc112:

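```
[lqs@bdc112 datas]$ nc -lk 8888
lqs lqs Hello Word h^HH^H^H^H
lqs Hello Woed
lqs
```

The ^H sequences are backspace keystrokes that the terminal passed to nc as literal backspace characters instead of erasing the typed letters, so they were sent to Spark as part of the last word on that line.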
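If netcat is not available (for example on Windows), a small JVM program can take its place. The sketch below is a hypothetical stand-in, not part of the original example: LineServerSketch listens on port 8888, waits for the socketTextStream receiver to connect, and writes UTF-8 text with one record per '\n'-terminated line, which is the format socketTextStream expects. Unlike nc -lk it serves only a single connection, and the three lines and the 3-second pause between them are assumptions chosen to mirror the session above. Run it on the host the program connects to (bdc112 here), or change the hostname passed to socketTextStream.

```scala
import java.io.{OutputStream, OutputStreamWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for `nc -lk 8888`; serves one client, then exits.
object LineServerSketch {

  // Writes each line as UTF-8 followed by '\n', flushing after every line
  // so the receiver sees it immediately, like pressing Enter in nc.
  def writeLines(out: OutputStream, lines: Seq[String], delayMs: Long = 0L): Unit = {
    val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
    lines.foreach { line =>
      writer.write(line + "\n")
      writer.flush()
      if (delayMs > 0) Thread.sleep(delayMs) // spread lines across batches
    }
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888) // same port the example passes to socketTextStream
    try {
      val client = server.accept() // blocks until the Spark receiver connects
      try writeLines(client.getOutputStream, Seq("lqs lqs Hello Word", "lqs Hello Woed", "lqs"), delayMs = 3000L)
      finally client.close()
    } finally server.close()
  }
}
```

Keeping writeLines separate from the socket handling lets the framing be checked against an in-memory stream without opening a port.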

Output:

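```
-------------------------------------------
Time: 1645009179000 ms
-------------------------------------------

-------------------------------------------
Time: 1645009182000 ms
-------------------------------------------

-------------------------------------------
Time: 1645009185000 ms
-------------------------------------------
,1)
(Hello,1)
(Word,1)
(lqs,2)

-------------------------------------------
Time: 1645009188000 ms
-------------------------------------------

-------------------------------------------
Time: 1645009203000 ms
-------------------------------------------
(Hello,1)
(Woed,1)
(lqs,1)

-------------------------------------------
Time: 1645009206000 ms
-------------------------------------------
(lqs,1)
```

In the 1645009185000 ms batch, the ,1) entry is the count for the word typed with backspaces (h^HH^H^H^H): when the console prints that tuple, the backspace characters move the cursor back to the start of the line, so ,1) overwrites the (H in front of it. Because reduceByKey works on each 3-second batch separately, lqs is reported again as (lqs,1) in the later batches rather than as a running total.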
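The ,1) entry can be reproduced offline. This sketch assumes the ^H sequences in the netcat session are backspace characters (written \b in a Scala string literal); BackspaceKeySketch, counts, and visible are hypothetical names. It shows that the backspace-laden token survives split(" ") as a key of its own, so Spark counts it like any other word and only the console rendering hides it.

```scala
// Sketch reproducing the ",1)" line: the last word contains literal backspace
// characters (\b, displayed as ^H by the terminal), so it is a distinct key.
// BackspaceKeySketch, counts and visible are hypothetical, illustration-only names.
object BackspaceKeySketch {

  // First line typed into nc, with each ^H written as the escape \b
  val typedLine: String = "lqs lqs Hello Word h\bH\b\b\b"

  // Same word counting as the streaming job, applied to a single line
  def counts(line: String): Map[String, Int] =
    line.split(" ").toSeq.groupBy(identity).map { case (word, ws) => (word, ws.size) }

  // Makes backspaces visible the way the terminal echoed them (\b -> ^H)
  def visible(s: String): String = s.replace("\b", "^H")

  def main(args: Array[String]): Unit =
    counts(typedLine).foreach { case (word, n) => println(s"(${visible(word)},$n)") }
}
```

Printing the raw tuple instead of the visible form reproduces the overwritten ,1) line in a terminal that honors backspace.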
