Programming in IDEA
Create a new Maven project, add Scala language support, and add a log4j.properties file under resources.
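The contents of that file are not shown in the original; a minimal sketch of such a log4j.properties, assuming its only job here is to quiet Spark's INFO-level console noise (the chosen levels are an assumption):

# Assumed minimal config: keep Spark's console output down to warnings
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n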
The pom.xml dependencies are:
<?xml version="1.0" encoding="UTF-8"?>
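Only the XML declaration survives above; a minimal sketch of the dependency block such a project needs, assuming a Spark 2.x build for Scala 2.11 (the version numbers are assumptions, match them to your cluster):

<dependencies>
    <!-- versions below are assumptions; align them with your cluster -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
    </dependency>
</dependencies>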
package com.qf.day03

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object sparkstreamigAndSsql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ss")
    // one micro-batch every 10 seconds
    val ssc = new StreamingContext(conf, Durations.seconds(10))
    // hostname and port of the netcat server
    val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)
    dStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Before running this code in IDEA, start the virtual machine that the hostname refers to and run nc -l 10086 on it (depending on the netcat build you may need nc -lk 10086 so the listener keeps accepting reconnects).
Upgraded version, integrating Spark Streaming with Spark SQL:

package com.qf.day03

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object sparkstreamigAndSsql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ss")
    val ssc = new StreamingContext(conf, Durations.seconds(10))
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import sparkSession.implicits._

    val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 10086)

    // window length 1 minute, sliding interval 20 seconds
    dStream.window(Durations.minutes(1), Durations.seconds(20)).foreachRDD(rdd => {
      // parse each line into a four-column tuple
      val rdd1: RDD[(String, String, String, Int)] = rdd.map(line => {
        val arr: Array[String] = line.split(" ")
        (arr(0), arr(1), arr(2), arr(3).toInt)
      })
      // RDD -> DataFrame
      val df: DataFrame = rdd1.toDF("time", "product_id", "product_name", "num")
      // register a temporary view so the window can be queried with SQL
      df.createOrReplaceTempView("product_sale_info")
      val sql =
        """
          |select *
          |from
          |  (select t1.*, dense_rank() over(order by total desc) rk
          |   from
          |     (select product_id, product_name, sum(num) total
          |      from product_sale_info
          |      group by product_id, product_name) t1
          |  ) t2
          |where rk < 4
          |""".stripMargin
      sparkSession.sql(sql).show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
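A note on the design: window(Durations.minutes(1), Durations.seconds(20)) means that every 20 seconds the job recomputes over the last minute of received lines. foreachRDD is the bridge between the streaming and SQL worlds: each windowed RDD is converted to a DataFrame, registered as a temp view, and queried with dense_rank; the filter rk < 4 keeps the three highest sales totals (ties share a rank, so more than three rows can appear).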
On the Linux machine, start nc -l 10086, then run the program written above, and then keep typing input lines into the nc session:
8:00 1001 毛衣 10
8:00 1002 毛衣 1
8:00 1003 毛衣 10
8:00 1004 毛衣 10
8:00 1005 毛衣 10
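With this sample input (毛衣, "sweater", is just an opaque product-name string to the job), each product_id forms its own group, so the totals are 10, 1, 10, 10, 10. dense_rank assigns rank 1 to the four products with total 10 and rank 2 to product 1002, so rk < 4 keeps all five rows in this small example; with more varied data only the top three rank values would survive. Row order within a rank is not deterministic.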