欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

sparkstreaming和sparksql的整合

时间:2023-04-16
软件环境

使用idea编程

新建一个maven项目,添加scala语言环境,并且在resources中添加log4J.properties

pom.xml依赖是

 <?xml version="1.0" encoding="UTF-8"?>     4.0.0 ​    org.example    sz2103-sparkstreaming    1.0-SNAPSHOT                        org.apache.spark            spark-streaming_2.11            2.2.3                            org.apache.spark            spark-streaming-kafka-0-10_2.11            2.2.3                            redis.clients            jedis            3.0.0                            org.apache.spark            spark-sql_2.11            2.2.3             ​ 

sparkstreaming基本代码

 package com.qf.day03 ​ import java.util.logging.{Level, Logger} ​ import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Durations, StreamingContext} ​ ​ object sparkstreamigAndSsql {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local[*]").setAppName("ss")     val ssc = new StreamingContext(conf,Durations.seconds(10))      val dStream:ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01",10086)    //主机号 端口               dStream.print()     ssc.start()     ssc.awaitTermination() ​  } } ​

在idea运行这个代码之前,要开启主机号对应的虚拟机,然后敲下 nc -l 10086

整合升级

 package com.qf.day03 ​ import java.util.logging.{Level, Logger} ​ import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataframe, SparkSession} import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Durations, StreamingContext} ​ object sparkstreamigAndSsql {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local[*]").setAppName("ss")     val ssc = new StreamingContext(conf,Durations.seconds(10))     val sparkSession:SparkSession = SparkSession.builder().config(conf).getOrCreate()     import sparkSession.implicits._      val dStream:ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01",10086)     dStream.window(Durations.minutes(1),Durations.seconds(20)).foreachRDD(rdd=>{       //将rdd转成四列形式的rdd      val rdd1: RDD[(String,String,String,Int)] = rdd.map(line=>{        val arr:Array[String] =line.split(" ")        (arr(0),arr(1),arr(2),arr(3).toInt)      })       //rdd-》 DF       val df:Dataframe = rdd1.toDF("time","product_id","product_name","num")       //构建表       df.createOrReplaceTempView("product_sale_info")       val sql =         """           |select *           |from           |   (select t1.*,dense_rank() over(order by total desc) rk           |   from           |     (           |     select product_id,product_name,sum(num) total           |     from product_sale_info           |     group by product_id,product_name) t1           |   ) t2           | where rk < 4           |""".stripMargin       sparkSession.sql(sql).show()    }) ​     ssc.start()     ssc.awaitTermination() ​  } } ​

在执行linux运行nc -l 10086,

运行上面编写的api

然后在nc指令后面继续敲

 8:00 1001 毛衣 10 8:00 1002 毛衣 1 8:00 1003 毛衣 10 8:00 1004 毛衣 10 8:00 1005 毛衣 10

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。