欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Spark之RDD转换算子

时间:2023-06-21

目录

一.value类型

1.map()

 2.mapPartitions()

3.mapPartitionsWithIndex()

 4.flatMap()扁平化

 5.glom()

 6.groupBy()

 7.filter()

8.sample()

9、distinct()

10.coalesce()

11.repartition()

 12.sortBy()

13、pipe()

二、双value 

1.intersection()

2.union()

3.subtract()

 4.zip()

 三、KEY-VALUE类型

1.partitionBy()

2.自定义分区 

3.reduceByKey()

4.groupByKey()

5.aggregateByKey()

6.foldByKey()

7.combineByKey()

8.sortByKey()

 9.mapValues()

10.join() 

 11.cogroup()

 12.求top3


一.value类型

1.map()

映射:新RDD每一个元素都是由原来RDD中每一个元素依次应用函数f得到的

val value: RDD[Int] = sc.makeRDD(1 to 4, 2)val rdd1: RDD[Int] = value.map(_*2)

 2.mapPartitions()

        以分区为单位执行Map

val rdd2: RDD[Int] = value.mapPartitions(list=> { println("计算一个数字") //运行两遍 因为两个分区 list.map(i => { println("数字") //运行四遍 i*2 }) })rdd2.collect().foreach(println)

注:在空间资源较大时使用,一个分区数据处理完之后,原RDD分区中的数据才释放,可能导致OOM

3.mapPartitionsWithIndex()

        带分区号映射,创建一个RDD,使每个元素跟所在分区号形成一个元组,组成一个新的RDD

val rdd4: RDD[(Int, Int)] = value.mapPartitionsWithIndex((num,list)=> list.map(i=>(num,i)))val rdd3: RDD[(Int, Int)] = value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))rdd3.collect().foreach(println)

 4.flatMap()扁平化

        将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。但在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

// 判断分区 // flatMap不改变分区的情况 保持原分区 val rdd3: RDD[(Int, Int)] = value1.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd3.collect().foreach(println) // 对应 (长字符串 ,次数) => (单词,次数),(单词,次数) val tupleRDD: RDD[(String, Int)] = sc.makeRDD(List(("hello world", 100), ("hello scala", 200))) val value3: RDD[(String, Int)] = tupleRDD.flatMap(tuple => { val strings: Array[String] = tuple._1.split(" ") strings.map(word => (word, tuple._2)) }) value3.collect().foreach(println) tupleRDD.flatMap(tuple => { tuple._1.split(" ") .map(word => (word,tuple._2)) }) // 偏函数写法 val value4: RDD[(String, Int)] = tupleRDD.flatMap(tuple => tuple match { case (line, count) => line.split(" ").map(word => (word, count)) }) value4.collect().foreach(println) val value5: RDD[(String, Int)] = tupleRDD.flatMap{ case (line, count) => line.split(" ").map(word => (word, count))}

 5.glom()

        将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)val value: RDD[Array[Int]] = listRDD.glom()val arraydemo: RDD[Int] = value.map(Array => (Array.max))arraydemo.collect().foreach(println) // 从一个sql脚本文件中 提取对应的sqlval lineRDD: RDD[String] = sc.textFile("input/1.sql",1) // 直接使用spark的算子读取sql脚本文件 里面的内容是打散为一行一行val value2: RDD[Array[String]] = lineRDD.glom() //mkString 数组转化为字符串value2.map(array => array.mkString).collect().foreach(println)

 6.groupBy()

        将相同K对应的值放入一个迭代器

val sc: SparkContext = new SparkContext(conf) val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val value: RDD[(Int, Iterable[Int])] = listRDD.groupBy(i => i%2) value.collect().foreach(println) //(0,CompactBuffer(2, 4)) //(1,CompactBuffer(1, 3))//groupby实现wordcount val text: RDD[String] = sc.textFile("input/1.txt") val fmRDD: RDD[String] = text.flatMap(_.split(" ")) val value: RDD[(String, Iterable[String])] = fmRDD.groupBy(s => s) value.collect().foreach(println) val value1: RDD[(String, Int)] = value.mapValues(list => list.size) value1.collect().foreach(println) val value2: RDD[(String, Int)] = value.map({ case (word, list) => (word, list.size) }) value2.collect().foreach(println)

 7.filter()

        接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val value: RDD[Int] = listRDD.filter(i => i%2 ==0) value.collect().foreach(println) // 过滤 // 保持分区不变

8.sample()

        采样,从大量的数据中采样

val listRDD: RDD[Int] = sc.makeRDD(1 to 10) // 抽取数据不放回(伯努利算法) // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。 // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要 // 第一个参数:抽取的数据是否放回,false:不放回 // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取; // 第三个参数:随机数种子 // 随机算法相同,种子相同,那么随机数就相同 // 不输入参数,种子取的当前时间的纳秒值,所以随机结果就不相同了 val value1: RDD[Int] = listRDD.sample(false,0.5,10) value1.collect().foreach(println) // 抽取数据放回(泊松算法) // 第一个参数:抽取的数据是否放回,true:放回;false:不放回 // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数 // 第三个参数:随机数种子 val value2: RDD[Int] = listRDD.sample(true,2.3,100) value2.collect().foreach(println)

9、distinct()

        去重,将去重后的数据放入新的RDD中

 

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,4,2,3,1,6),2)val value: RDD[Int] = listRDD.distinct()//distinct会存在shuffle过程

10.coalesce()

        合并分区,缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

// 缩减分区 // 多对一的关系 不走shuffle val coalrdd: RDD[Int] = listRDD.coalesce(2) val rdd3: RDD[(Int, Int)] = coalrdd.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd3.collect().foreach(println) println("===============") // 扩大分区 // 必须走shuffle 不然没有意义 val listRDD1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val coalrdd2: RDD[Int] = listRDD1.coalesce(5,true) val rdd4: RDD[(Int, Int)] = coalrdd2.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd4.collect().foreach(println)

11.repartition()

        重新分区,该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

val sc: SparkContext = new SparkContext(conf) val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val value: RDD[Int] = listRDD.repartition(4) val rdd3: RDD[(Int, Int)] = value.mapPartitionsWithIndex((num,list)=>list.map((num,_)))

 12.sortBy()

        排序,该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。

val listRDD: RDD[Int] = sc.makeRDD(List(1,6,5,4,2,3,4,9),2) val value: RDD[Int] = listRDD.sortBy(i => i) value.collect().foreach(println) println("==================") // spark的排序能够实现全局有序 // 保证0号分区的数据都大于等于1号分区的数据 // sortBy需要走shuffle val value2: RDD[Int] = listRDD.sortBy(i => i,false) val rdd3: RDD[(Int, Int)] = value2.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd3.collect().foreach(println)// (0,9)// (0,6)// (0,5)// (1,4)// (1,4)// (1,3)// (1,2)// (1,1)

13、pipe()

        调用脚本,管道,针对每个分区,都调用一次shell脚本,返回输出的RDD

二、双value 

1.intersection()

        求交集

2.union()

         求并集

3.subtract()

        求差集

 4.zip()

        拉链,该操作可以将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。

        将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val listRDD1: RDD[Int] = sc.makeRDD(List(1,2,3,4),2) // 求交集会打散重新分区 即需要走shuffle // 默认采用交集中较多的分区 val demo01: RDD[Int] = listRDD.intersection(listRDD1) demo01.collect().foreach(println) // 求并集 // 并集不走shuffle // 只是把两个RDD的分区数据拿到一起 分区的个数等于两个RDD分区个数之和 val demo02: RDD[Int] = listRDD.union(listRDD1) val rdd3: RDD[(Int, Int)] = demo02.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd3.collect().foreach(println) // 求差集 // 需要重写分区 走shuffle 可以自己写分区数 println("=======================") val demo03: RDD[Int] = listRDD.subtract(listRDD1) val rdd4: RDD[(Int, Int)] = demo03.mapPartitionsWithIndex((num,list)=>list.map((num,_))) rdd4.collect().foreach(println) // 将相同分区对应位置的元素拉链到一起 成为一个2元组 // zip只能操作两个rdd具有相同的分区个数和元素个数 val demo04: RDD[(Int, Int)] = listRDD.zip(listRDD1) demo04.mapPartitionsWithIndex((num,list) => list.map((num,_))).collect().foreach(println)

 三、KEY-VALUE类型

1.partitionBy()

        根据key值重新分区,如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val value: RDD[(Int, Int)] = listRDD.map((_,1)) //按照key对分区数取模分区 val hp: RDD[(Int, Int)] = value.partitionBy(new HashPartitioner(2)) hp.mapPartitionsWithIndex((num,list)=>list.map((num,_))).collect().foreach(println)

2.自定义分区 

        要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

(1)numPartitions: Int:返回创建出来的分区数。

(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val value: RDD[(Int, Int)] = listRDD.map((_,1)) val hp: RDD[(Int, Int)] = value.partitionBy(new MyPartition) hp.mapPartitionsWithIndex((num,list)=>list.map((num,_))).collect().foreach(println) sc.stop() } //自定义分区 class MyPartition extends Partitioner{ override def numPartitions: Int = 2 // 获取分区号 => 根据元素的key值 判断分给哪个分区 // spark 的分区器只能对key进行分区 override def getPartition(key: Any): Int = { key match { case i:Int => i%2 case _ => 0 }

3.reduceByKey()

        元素按照相同的Key对Value进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) val tuplerdd: RDD[(Int, Int)] = listRDD.map( (_ ,1)) val result1: RDD[(Int, Int)] = tuplerdd.reduceByKey((res,elem) => res - elem) result1.collect().foreach(println) // 验证结果 // 需要进行两次归约 一次分区内 一次分区间 // 分区间的第一个元素取决于分区的编号 编号越小越靠前 val value1: RDD[(String, Int)] = sc.makeRDD( List(("a", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1), ("a", 1),("c",1)), 2) val result2: RDD[(String, Int)] = value1.reduceByKey((res,elem) => res - elem) result2.collect().foreach(println)

4.groupByKey()

        按照key重新分组,对每个key进行操作,但只生成一个seq,并不进行聚合

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,1,2,3,4),2) val tupleRDD: RDD[(Int, Int)] = listRDD.map((_, 1)) val result: RDD[(Int, Iterable[(Int, Int)])] = tupleRDD.groupBy(tuple => tuple._1) result.collect().foreach(println) //(4,CompactBuffer((4,1), (4,1))) //(2,CompactBuffer((2,1), (2,1))) //(1,CompactBuffer((1,1), (1,1))) //(3,CompactBuffer((3,1), (3,1))) val result2: RDD[(Int, Iterable[Int])] = tupleRDD.groupByKey() result2.collect().foreach(println) //(4,CompactBuffer(1, 1)) //(2,CompactBuffer(1, 1)) //(1,CompactBuffer(1, 1)) //(3,CompactBuffer(1, 1))

5.aggregateByKey()

        按照Key处理分区内和分区间逻辑

val value1: RDD[(String, Int)] = sc.makeRDD( List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4) val result1: RDD[(String, Int)] = value1.aggregateByKey(10)(_ + _,_ + _) result1.collect().foreach(println)

6.foldByKey()

        即分区内和分区间相同的aggregateByKey()

val value1: RDD[(String, Int)] = sc.makeRDD( List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4) value1.foldByKey(10)(_ + _).collect().foreach(println) value1.foldByKey(10)((res,elem) => math.max(res,elem)).collect().foreach(println)

7.combineByKey()

        转换结构后分区内和分区间操作,针对相同Key,将Value合并成一个集合。

val value1: RDD[(String, Int)] = sc.makeRDD( List(("a", 10), ("b", 7), ("a", 11), ("b", 21)), 4) // 对上面的元素进行归约 (单词,("product",21)) val value: RDD[(String, (String, Int))] = value1.combineByKey( i => ("product", i), // 分区内计算 是转换结构后的初始值和分区相同key的元素值进行归约 (res: (String, Int), elem: Int) => (res._1, res._2 * elem), // 分区间计算 将每个分区相同key的res值进行合并 (res: (String, Int), elem: (String, Int)) => (res._1, res._2 * elem._2) ) value.collect().foreach(println) println("==========================") val list: List[(String, Int)] = List( ("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) val listRDD1: RDD[(String, Int)] = sc.makeRDD(list) val result2: RDD[(String, (Int, Int))] = listRDD1.combineByKey( // 将(a,88) => (a,(88,1)) 因为算子已经内部按照key聚合了 所以写的时候只写value i => (i, 1), (res: (Int, Int), elem: Int) => (res._1 + elem, res._2 + 1), // 分区内累加 将相同分区相同key的值合并 (88,1)和91 => (179,2) // 分区间累加 将不同分区相同key的二元组合并在一起 (179,2) 和 (95,1) => (274,3) (res: (Int, Int), elem: (Int, Int)) => (res._1 + elem._1, res._2 + elem._2) ) result2.mapValues({ case (res,elem) => res.toDouble/elem }).collect().foreach(println)

区别:

8.sortByKey()

        按照K进行排序,在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

val value: RDD[(String, Int)] = sc.makeRDD( List(("a", 110), ("b", 27), ("a", 11), ("b", 21)), 2) // 默认使用range分区器(尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。) // 固定使用二元组中的key进行排序 不会使用value value.sortByKey(false).collect().foreach(println) // 使用value排序 // 可以使用sortBy 底层仍为sortbykey value.sortBy(_._2).collect().foreach(println)

 9.mapValues()

       只对value进行修改

val value: RDD[(String, Int)] = sc.makeRDD( List(("a", 110), ("b", 27), ("a", 11), ("b", 21)), 2) value.mapValues(i => i*2).collect().foreach(println)

10.join() 

        在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

val value: RDD[(String, Int)] = sc.makeRDD( List(("a", 110), ("b", 27), ("a", 11), ("b", 21)), 2) val value1: RDD[(String, Int)] = sc.makeRDD( List(("a", 110), ("b", 27), ("a", 11), ("b", 21)), 2) // 将相同key合并 // join走shuffle 使用hash分区器 // 尽量保证join之前key是不重复的 如果有重复 会造成最终结果也是重复的 value.join(value1).collect().foreach(println) //解决重复 value.groupByKey().join(value1).collect().foreach(println)

 11.cogroup()

        在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"))) val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1,4),(2,5),(4,6))) rdd.cogroup(rdd1).collect().foreach(println)

 12.求top3

val listRDD: RDD[String] = sc.textFile("input/agent.log") val result1: RDD[(String, Int)] = listRDD.map({ line => { val strings: Array[String] = line.split(" ") (strings(1) + "-" + strings(2), 1) } }).reduceByKey(_ + _) result1.collect().foreach(println) val result2: RDD[(String, Iterable[(String, Int)])] = result1.map({ case (res, sum) => { val strings01: Array[String] = res.split("-") (strings01(0), (strings01(1), sum)) } }).groupByKey() val result3: RDD[(String, List[(String, Int)])] = result2.mapValues({ datas => { datas.toList.sortWith( (left, right) => { left._2 > right._2 } ).take(3) } })// 简写result3.mapValues(_.toList.sortWith(_._2 > _._2).take(3)) result3.collect().foreach(println)

 

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。