欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Spark追妻系列

时间:2023-07-21

今天虽然有一些感冒,但是现在感觉实力爆棚,一股想要学习的冲动

小谈:

        昨天晚上睡觉的时候,当时的阅读量有19300多吧,然后写了一篇比较摆烂的博客,当时说如果阅读量破两万,就一定好好写博客,谁知道半夜醒来的时候就已经19900,早上醒来就已经突破两万了,这不明显系统在安排我,让我好好写博客,今天就努一努,不写好一篇博客不睡觉。

        稍微有一些感冒,早上力不从心,头晕然后就没有学习钻被窝里面睡了会觉。和我的那个她聊了聊天,玩了一会跳棋。

TopN案例

        时间戳 省份 城市 用户 广告

        1516609143867 6 7 64 16
        1516609143869 9 4 75 18
        1516609143869 1 7 87 12
        1516609143869 2 8 92 9
        1516609143869 6 7 84 24
        1516609143869 1 8 95 5
        1516609143869 8 1 90 29

        求每个省份,排名前三的广告以及点击量。

        先来看一下图解

 

        总之先取出相应的字段,对这些字段进行格式转换。之后使用各个算子进行操作

        下面来看代码吧,根据代码一一讲解

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) //时间戳 省份 城市 用户 广告 val value = sparkContext.textFile("date/agent.log") //首先取出来省份和广告 //map转换 (省份-广告,1) val value1 = value.map( line => { val strings = line.split(" ") (strings(1) + " - " +strings(4),1 ) } ) //根据key进行累加,最后结果就是(省份-广告,总数) val value2 = value1.reduceByKey(_ + _) //(省份-广告 ,总数) =>(省份,(广告,总数)) val value3 = value2.map { case (province, clickCount) => { val strings = province.split("-") (strings(0), (strings(1), clickCount)) } } //对省份进行分组,分组后的数据格式 //(省份,((广告,总数),(广告,总数),(广告,总数))) val value4 = value3.groupByKey() //对后面的广告根据总数进行排序,取降序 //mapValues 对Value进行操作 将Value根据SortWith自定义排序 value4.mapValues( itr => { itr.toList.sortWith( (left,right)=>{ left._2 > right._2 } ).take(3) } ).collect().foreach(println(_))

        求TopN,关键就是在于要将格式进行转化,其实算子的操作都很简单,重要的点是能否转换成自己想要的格式,如果没有思路,可以先在纸上写一些思路,有了思路就可以很好的进行编码了。Reduce

        聚合RDD中的所有元素,先聚合分区内的元素,再聚合分区间的元素

        val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)

        两个分区,分区0: 1 2 分区1:3 4

        聚合的时候,先将分区内的元素聚合

        分区0: 1 +2 =3 分区1: 3 + 4 = 7

        来看一下代码

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.reduce(_ + _) println(i)

        可以看到的是,在转换算子的时候,都会有RDD显示出来,但是动作算子并没有显示RDD,

        可以看到,转换算子并不存储数据,在Spark初了解博客里面就说过,转换算子并不存储数据,只是存储操作,当数据到节点之后,数据就会根据记录的操作对数据进行转换。

        行动算子之后,就可以从节点将数据返回到Driver端。

Collect

        在驱动程序中,以数组Array形式返回数据集的所有元素。

        来看一下Collect动作算子

     

   def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }

        可以看到 iter.toArray。将元素转换成数组的形式

 

foreach

        遍历RDD中的每一个元素,并依次调用里面的函数

        举一个例子

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.foreach(println(_)) println(i)

        上面这个例子中,就会遍历RDD中的每一个元素,因为设置的f函数就是打印操作

 

Count

        返回RDD中元素个数

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.count() println(i)

看一下图解

 

first

        返回RDD中的第一个元素

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.first() println(i)

        返回第一个元素,就是1

take

        返回一个由RDD的前n个元素组成的数组

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.take(2) println(i)

        返回的是数组类型,单独打印的话,打印的就是数组的地址。

takeOrdered

        返回该RDD排序后的前n个元素组成的数组

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") val sparkContext = new SparkContext(wordCount) val value = sparkContext.makeRDD(List(1, 2, 3, 4)) val i = value.takeOrdered(2) println(i)

怎么知道返回的是排序后的数组呢,看一下源码

        .toArray.sorted(ord),像转换成数组,然后排序。排完序后取前n个值

aggregate

        将每个分区内的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值进行操作。

先看图解,根据图解来讲

        4个数据分了8个区

val value = sparkContext.makeRDD(List(1, 2, 3, 4),8) val i1 = value.aggregate(10)(_+ _, _ + _)

        在分区内:对每一个值都和设定的初始值10,进行相加。

        分区间:在分区间计算的时候,将每个分区间的值进行相加,之后再与初始值进行相加。

countByKey

        统计每种key的个数

        看一下图解

 

        返回的是Map[K,Long]类型

        key key的总数

总结:

        更新到这里就不写了,要去歇息一会。

        如果我是一个打工仔,那么明天就要上班拉,虽然现在还不是打工仔,已经快是了。

        这两天更新的内容,干货不是特别多,到明天会更新分区器的相关内容。

        明天就要开始卷啦,颤抖吧,学生仔!!!

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。