小谈:今天虽然有一些感冒,但是现在感觉实力爆棚,一股想要学习的冲动
昨天晚上睡觉的时候,当时的阅读量有19300多吧,然后写了一篇比较摆烂的博客,当时说如果阅读量破两万,就一定好好写博客,谁知道半夜醒来的时候就已经19900,早上醒来就已经突破两万了,这不明显系统在安排我,让我好好写博客,今天就努一努,不写好一篇博客不睡觉。
稍微有一些感冒,早上力不从心,头晕然后就没有学习钻被窝里面睡了会觉。和我的那个她聊了聊天,玩了一会跳棋。
TopN案例时间戳 省份 城市 用户 广告
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
求每个省份,排名前三的广告以及点击量。
先来看一下图解
总之先取出相应的字段,对这些字段进行格式转换。之后使用各个算子进行操作
下面来看代码吧,根据代码一一讲解
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) //时间戳 省份 城市 用户 广告 val value = sparkContext.textFile("date/agent.log") //首先取出来省份和广告 //map转换 (省份-广告,1) val value1 = value.map( line => { val strings = line.split(" ") (strings(1) + " - " +strings(4),1 ) } ) //根据key进行累加,最后结果就是(省份-广告,总数) val value2 = value1.reduceByKey(_ + _) //(省份-广告 ,总数) =>(省份,(广告,总数)) val value3 = value2.map { case (province, clickCount) => { val strings = province.split("-") (strings(0), (strings(1), clickCount)) } } //对省份进行分组,分组后的数据格式 //(省份,((广告,总数),(广告,总数),(广告,总数))) val value4 = value3.groupByKey() //对后面的广告根据总数进行排序,取降序 //mapValues 对Value进行操作 将Value根据SortWith自定义排序 value4.mapValues( itr => { itr.toList.sortWith( (left,right)=>{ left._2 > right._2 } ).take(3) } ).collect().foreach(println(_))
求TopN,关键就是在于要将格式进行转化,其实算子的操作都很简单,重要的点是能否转换成自己想要的格式,如果没有思路,可以先在纸上写一些思路,有了思路就可以很好的进行编码了。Reduce
聚合RDD中的所有元素,先聚合分区内的元素,再聚合分区间的元素
val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)
两个分区,分区0: 1 2 分区1:3 4
聚合的时候,先将分区内的元素聚合
分区0: 1 +2 =3 分区1: 3 + 4 = 7
来看一下代码
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.reduce(_ + _) println(i)
可以看到的是,在转换算子的时候,都会有RDD显示出来,但是动作算子并没有显示RDD,
可以看到,转换算子并不存储数据,在Spark初了解博客里面就说过,转换算子并不存储数据,只是存储操作,当数据到节点之后,数据就会根据记录的操作对数据进行转换。
行动算子之后,就可以从节点将数据返回到Driver端。
Collect在驱动程序中,以数组Array形式返回数据集的所有元素。
来看一下Collect动作算子
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
可以看到 iter.toArray。将元素转换成数组的形式
foreach
遍历RDD中的每一个元素,并依次调用里面的函数
举一个例子
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.foreach(println(_)) println(i)
上面这个例子中,就会遍历RDD中的每一个元素,因为设置的f函数就是打印操作
Count
返回RDD中元素个数
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.count() println(i)
看一下图解
first
返回RDD中的第一个元素
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.first() println(i)
返回第一个元素,就是1
take返回一个由RDD的前n个元素组成的数组
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.take(2) println(i)
返回的是数组类型,单独打印的话,打印的就是数组的地址。
takeOrdered返回该RDD排序后的前n个元素组成的数组
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.takeOrdered(2) println(i)
怎么知道返回的是排序后的数组呢,看一下源码
.toArray.sorted(ord),像转换成数组,然后排序。排完序后取前n个值
aggregate将每个分区内的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值进行操作。
先看图解,根据图解来讲
4个数据分了8个区
val value = sparkContext.makeRDD(List(1, 2, 3, 4),8) val i1 = value.aggregate(10)(_+ _, _ + _)
在分区内:对每一个值都和设定的初始值10,进行相加。
分区间:在分区间计算的时候,将每个分区间的值进行相加,之后再与初始值进行相加。
countByKey统计每种key的个数
看一下图解
返回的是Map[K,Long]类型
key key的总数
总结:更新到这里就不写了,要去歇息一会。
如果我是一个打工仔,那么明天就要上班拉,虽然现在还不是打工仔,已经快是了。
这两天更新的内容,干货不是特别多,到明天会更新分区器的相关内容。
明天就要开始卷啦,颤抖吧,学生仔!!!