1、快速入门
a)
val conf = new SparkConf().setAppName("xxx").setMaster("xxx")
val sc = new SparkContext(conf)
val spark = SparkSession.builder().appName("xxx").getOrCreate()
b)
val、var、def的区别
val:定义的变量不能再被赋值,修饰的对象引用不可改变、状态可变。
var:定义的变量可以再被赋值,修饰的对象引用可以改变。
def:定义的成员变量,如果不具备setter和getter方法,那么在子类中通过override重写时报错,具体如下
abstract class Person{
def id: Int //定义一个方法
}
class Student extends Person{
override var id = 9527 //错误,因为父类中没有setter方法
}
改正如下:
abstract class Person{
var id: Int //var变量会自动生成getter和setter方法
}
class Student extends Person{
override var id = 9527 //错误,因为父类中没有setter方法
}
c)简单例子
val textFile = spark.read.textFile("xxx") | val textFile = spark.read.format("text")[.option(k,v)...].load("xxx") | val textFile = spark.SparkContext.textFile("xxx")
PS:可以传入文件夹的形式来读取所有文件格式的文件,也可以使用通配符*来匹配。load加载在不指定format的情况下默认加载的是parquet文件。
textFile.count() //统计DS中的元素数量
textFile.first() //获取DS中的第一个元素值
textFile.filter(line => line.contains("xxx")).count() //筛选出包含xxx值的元素的数量
textFile.map(line => line.length) //将每一行的内容换成长度值
textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b) //查询一行中单词数最多的值
textFile.map(line => line.split(" ").size).reduce((a,b) => Math.max(a,b)) //需要导入java.lang.Math
val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() //根据空格分割单词后按Key分组求每一组的元素总数,groupByKey方法中的identity不能省略
textFile.map(line => line.split(" ")).groupByKey(identity).count() //无法根据空格分隔符分割单词,还是按每一行作为key来进行分组,因为map转换操作是一对一的
for(item <- wordCounts.collect()){println(item._1+"-"+item._2)} //使用for或者foreach遍历分组后的数据
wordCounts.foreach(line => println(line._1+"-"+line._2))
d)
缓存RDD数据集到内存:RDD调用persist()或者cache()方法
释放RDD数据集内存:RDD调用unpersist()或者uncache()方法
e)
双引号中获取scala成员变量值:$、println(s"$xxx")
2、RDD编程指南
a)概述
每个Spark程序都包含一个Driver(运行应用程序中的main方法并在集群上执行各种并行操作)和一个executor(执行Task任务)
b)RDD(弹性分布式数据集)定义
分布式、不可变的、可分区的数据集。RDD上一个分区对应一个Task任务,并且RDD出现故障后,会自动根据Lineage(血统)来自动恢复。
c)共享变量:broadcast(广播变量)、accumulator(累加器)
默认情况下,若是在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝一个副本到每一个task中。此时每一个task只能操作本身的那份副本。若是多个task想要共享某个变量
那么这种方式是做不到的。为此,Spark提供了两种共享变量:broadcast和accumulator,两类共享变量都是由SparkContext对象创建的(sc.broadcast(共享变量v)、sc.longAccumulator(共享变量v))
broadcast将使用到的共享变量v为每一个节点拷贝一份该变量的副本而不是节点下每个用到该变量的tash都拷贝一份副本,这样可以更大的优化性能,减小网络传输以及内存消耗。当节点下的task用到
该变量的副本时,直接到节点的executor中获取,如果不存在,再去Driver中获取并保留一个副本在当前节点以便下次使用。broadcast是只读的,每一个节点都可以通过广播变量的value属性来获取
广播变量里的值。
而accumulator累加器则是让多个task共同操作一份变量,只提供了累加(add())操作。只有Driver程序能够通过value属性获取到累加器的值。
PS:要释放广播变量赋值到executor程序的资源,可以调用广播变量的unpersist()方法,如果之后再次使用到广播变量,则会重新广播。要永久释放广播变量使用的资源,请使用destory()方法,之后
不能再使用广播变量。这些方法默认情况下不会阻塞,要阻塞直到资源被释放,在调用它们的时候指定blocking=true参数。如unpersist(true)或者destory(true)
d)RDD的创建方式
1、通过外部存储文件系统(如HDFS等)创建:spark.read.textFile或者spark.read.format().load()或者sparkContext.textFile
2、通过scala集合以并行化的方式创建:sparkContext.parallelize(Array | Seq)
3、通过现有RDD通过transformation创建
PS:sparkContext.textFile()读取文件时每一行的数据是:key->相对于文本的偏移量、value->每一行的内容,如果想要将每一个文本文件的名称作为key,文本文件内容作为value的话,可以使用
sparkContext.wholeTextFiles()读取包含多个小文件的目录,该方法的第二个可选参数是空值分区数的。
e)RDD操作
1、RDD支持的两种类型算子操作:transformation(转换)和action(动作)
PS:Spark中所有的转换都是惰性的,也就是说,它不会立即计算结果,只是记录各个RDD间的转换关系来构建DAG图。只有当执行action操作时,才会真正的计算结果,action操作会根据DAG图来
将Job依据RDD的依赖类型类进行Stage的划分,宽依赖之前的转换为一个Stage,宽依赖之后的转换为另一个Stage,每个Stage又会被划分为多个Task,最后不同的Task在不同的节点上并行执行来计算
最后结果。一个Job中,Stage的个数等于宽依赖的个数+1,一个Stage的最后一个RDD会有多个分区,即包含多个Task,也就是说最后一个RDD的分区个数就是这一Stage中对应的Task个数。
2、两种类型算子的区别
a、transformation算子:返回的是新的RDD,常用的有:map、mapValues、filter、flatMap、mapPartitions、uoin、join、distinct、xxxByKey
b、action算子:返回一定不是RDD,常用的有:count、collect、collectAsMap、first、reduce、fold、aggregate、saveAsTextFile。
PS:默认情况下,每个转换后的RDD在每次运行操作时都会重新计算该RDD,为了RDD的重用性,可以将频繁使用到的RDD缓存至内存中,使用persist()或者cache()方法将RDD缓存至内存中,如果需要从
内存中释放RDD,可以调用unpersist()或者uncache()方法来将RDD从内存中释放。
PS:会导致Shuffle操作的transformation算子:xxxByKey(除了countByKey)、Join(join、leftOuterJoin、rightOuterJoin、fullOuterJoin、cogroup)、distinct、intersection、subtract
partitionBy、repartition。
f)将函数传递给Spark来处理数据集中的每一个元素
1、匿名函数语法的形式:
val func1 = (str:String) => str.length()
textFile.map(func1).foreach(line => println(s"this length is : $line")) //map算子将每一行作为参数传递给func1匿名函数,func1返回每一行的长度值。
2、全局单例对象中的静态方法:
object MyFunction{
def func1(str:String):Long = {
str.length()
}
}
textFile.map(MyFunction.func1).foreach(line => println(s"this length is : $line"))
g)闭包(以foreach举例)
val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)
//Wrong: Don't do this
rdd.foreach(x => counter += x)
println("Counter value is :"+counter) //结果还是0
上述代码的行为是未定义的,并且不同模式下运行情况不同。为了执行作业,Spark将RDD操作的处理分解为tasks,每个task由Executor执行。在执行之前,Spark会计算task的闭包。闭包是Executor在RDD上进行计算的时候必须可见的那些变量和方法(在这种情况下是foreach())。闭包会被序列化并发送给每个Executor。
发送给每个Executor的闭包中的变量是副本,因此,当foreach函数内引用计数器时,它不再是driver节点上的计数器。driver节点的内存中仍有一个计数器,但该变量是Executor不可见的!执行者只能看到序列化闭包的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用了序列化闭包内的值。
h)输出RDD的元素
常见的用法是使用rdd.foreach(println)或者rdd.map(println)来输出RDD的元素。在一台机器上的时候,这种方法将会正确的输出所有RDD的元素。但是在cluster模式下的时候,由Executor执行输出
写入的是Executor的stdout,而不是driver上的那个stdout,所以driver得stdout不会显示这些。要在driver上输出所有的元素,可以使用collect方法首先将RDD数据带到driver节点:rdd.collect().for
each(println)。但这可能导致driver程序内存不足,因为collect()会将整个RDD数据提取到Driver端;如果只需要打印RDD的一些元素,可以使用take()方法:rdd.take(100).foreach(println)
i)键值对RDD
1、常见的仅适用于键值对的RDD是Shuffle操作,在Scala中,这些操作包含在Tuple2(元组,最简单的元组创建方式(a,b))对象的RDD上自动可用。sortByKey()方法按字母顺序进行排序。
2、sortBy和sortByKey函数(注意:只适用于RDD,不适用于DS和DF)
a)sortBy是对标准的RDD[K]进行排序,RDD全局有序,如根据RDD的第一个元素进行降序排序rdd.sortBy(_._1,false),升序则为true,那么第二个参数可以省略,又或第一个元素相同则按第二个
元素排序,rdd.sortBy(e => (e._1,e._2)),注意,在集群模式下,需要先调用collect将元素拉取到Driver中,否则遍历的时候还是无序的。如下例子
val datas = sc.parallelize(Array(("cc",12),("bb",32),("cc",22),("aa",18),("bb",16),("dd",16),("ee",54),("cc",1),("ff",13),("gg",32),("bb",4)))
datas.sortBy(_._1,false).collect().foreach(line => println((line._1,line._2))) //按第一个元素降序排序并输出元素,升序排序sortBy方法第二个参数可以省略,第三个参数指定排序后
RDD的分区个数,默认是跟排序之前的分区个数相等。
datas.sortBy(e => (e._1,e._2),false).collect().foreach(line => println(line._1,line._2)) //先按第一个元素降序排序,再按第二个元素降序排序,升序及分区数同上。
b)sortByKey是作用于key-value形式的RDD[K,V],并对Key进行排序。同样的,在集群模式下需要先使用collect将元素拉取到driver中进行遍历才是有序的。接受两个参数,第一个指定是升序还
是降序,第二个指定的是排序后的分区数。默认值是升序、分区数跟排序之前的一致。
3、RDD、DS、DF之间的转换
RDD->DS、DF:rdd.toDS、rdd.toDF
DS->RDD、DF:ds.rdd、ds.toDF
DF->RDD、DS:df.rdd、df.as[type]
PS:在对键值对RDD操作中使用自定义操作对象作为键时,必须确保自定义对象中实现equals()方法和hashCode()方法。
PS:使用split按分隔符分割行时返回的是数组,访问数组使用索引的方式,如arr(0)、arr(1)。
j)常用的action算子
ruduce、collect、count、first、take、takeSample、takeOrdered、saveAsSequeneceFile、saveAsTextFile、saveAsObjectFile、countByKey、foreach。
k)Shuffle
Shuffle操作会对涉及到RDD的所有分区,由此Shuffle操作伴随的便是磁盘I/O、数据序列化、网络I/O。以reduceByKey算子为例来解释Shuffle过程。
在Spark中,数据通常不会跨分区分布在特定的位置上,当执行RDD算子操作时,比如reduceByKey转换算子,单个任务在单个分区上执行,为了组织单个reduceByKey,reduce任务需要执行all-to-all操作,也就是它必须从所有分区中读取数据以便找到所有该键对应的所有值,然后将它们跨分区组合到当前分组下来计算某个键的最终结果,这就成为Shuffle。
l)RDD持久化
Spark最重要的功能之一就是跨操作将数据集持久化到内存中,可以通过调用RDD的persist()或者cache()方法将RDD持久化到内存中。如果RDD的任何分区丢失,会根据Lineage(血统)重新计算改分区。
使用unpersist()或者uncache()方法将RDD从缓存中删除。
1、持久化策略
a)MEMORY_ONLY:默认缓存策略,只缓存在内存中,如果内存不够,则某些分区将不会被缓存,并且会在每次需要时重新计算。
b)MEMORY_ONLY_SER:将RDD以序列化的方式缓存在内存中。
c)MEMORY_AND_DISK:将RDD缓存在内存中,当内存不足时,将根据LRU(最近最少使用算法)将RDD持久化到磁盘上。
d)MEMORY_AND_DISK_SER:类似于MEMEOY_ONLY_SER。
e)DISK_ONLY:将RDD持久化到磁盘上,此方式如果用到RDD时,会导致磁盘I/O。
3、Spark SQL、DF(数据帧)、DS(数据集)
1、起点
Spark中所有功能的入口点是SparkSession类:var spark = SparkSession.builder().appName("xxx")[.confing("k","v")...].getOrCreate()
2、创建DF和DS的方式(测试每种创建的方式是否可行?)
1、DF
a)通过SparkSession实例的read或者load读取/加载cvs、json、jdbc、parquet文件的形式或者load文本文件创建。
b)通过SparkSession实例的createDataframe(DS,Schema)动态SCHEMA的方式。如Schema = StructType(List(StructFiled("col1",StringType,true),StructFiled("age",IntType,true)))其中true标识改字段值可为null。DS元素必须是json、cvx或者使用Row()、类对象
c)由RDD或者DS转换而来。DS(DS元素必须是json、cvs或者使用Row()、类对象).toDF("col1","col2"...)静态SCHEMA的方式创建。
d)通过Seq的toDF()创建得来,toDF()方法可以指定DF的schema字段列表,Array不能通过.toDF()来转换为DF。
2、DS
a)通过SparkSession实例的createDataset(Array | Seq)的方式创建。
b)通过SparkSession实例的read读取文本文件的方式创建。
c)由RDD或者DF转换而来。
3、DF常用操作
a)select():查询全部或指定字段的值。
b)filter():查询符合过滤条件的记录,可以跟select()结合使用。
c)show():打印记录。
d)groupBy():按指定字段分组。
e)count():统计DF中总记录数,不能单独跟show()结合使用,必须结合groupBy()才能跟show()一起使用。
4、以编程的方式在DF上运行SQL查询
调用df的create[OrReplace]TempView("tbl_name")或者create[OrReplace]WholeTempView("tbl_name")将DF作为一个局部或全局的临时视图,即可在DF上调用sql()方法执行SQL查询,如下
df.createOrReplaceTempView("person")
df.sql("select * from person where age > 10").show() //如果是全局临时视图,需要在表名前面加上global_temp.前缀,如global_temp.person
5、show()方法
a)在没有任何参数的情况下默认显示前20条记录。如show()
b)根据指定的参数显示特定的记录数。如show(maxNum)
c)每个字段的值是否最多显示20个字符,默认为true。如show(true)
d)同时指定需要显示的记录数以及每个字段是否最多只显示20个字符。show(maxNum,true|false)
6、df.withColumn()
在现有的DF上新增一列,如果DF中已经存在该列,则会替换,如:df.withColumn("colName","value")
7、查询DF列的方式
df.select():处理列或表达式,可以是*、$"colname"、colname、df.col("colname")、df("colname")、col("colname")、column("colname")、expr("colname"),列名字符串colname不能跟其他函数方式同时使用。如df.select("colname",$"colname").show()会报错。
df.selectExpr():处理字符串表达式,和在select()中使用expr()效果一样,方法内只能是列名字符串,col()、column()、expr()等都不能使用。可以指定别名,如df.selectExpr("col as alias").show()
spark.sql():需要事先创建临时视图,df.createOrReplaceTempView("tbl_name"),spark.sql("select col_name_list from tbl_name where ...").show()
8、使用字段别名的三种方式
select()方法中:如df.select(col("colname").as("alias")).show(),除了列名字符串方式不能通过as()来指定别名之外,其他函数方式都可以,如df.select("col".as("alais")).show()报错。
selectExpr()方法中:如df.selectExpr("colname as alaisname").show(),只能是字符串表达式的形式,不能是函数方式。
spark.sql()方法中:spark.sql("select colname as alais from tbl_name where ...").show()
9、字段类型转换,以IntegerType为例
select()方法:限制条件跟上面的一致,不能使用列名字符串,只能使用函数表达式。
a)df.select(col("colname").cast(IntegerType)).show()
b)df.select(col("colname").cast("int")).show()
c)df.select(col("colname").cast("integer")).show()
selectExpr()方法:限制条件跟上面的一样,只能是字符串表达式。以int类型为例
a)df.selectExpr("cast(colname as int) colname").show()
b)df.selectExpr("INT(colname)").show()
sql()方法:
a)spark.sql("select INT(colname) as alais from tbl_name where ...").show()
b)spark.sql("select cast(colname as int) colname from tbl_name where ...").show() //不能通过as来指定别名,如果在转换类型的时候需要指定别名,那么必须使用第一种方式。
10、创建一个空的DF
val emptyDF = spark.emptyDataframe
11、删除DF中的列(不会真正删除DF中的列,只是返回一个不包含删除列的新的DF数据集)
val df1 = df.drop("colname"...) | df.drop(col("colname")...) | df.drop(column("colname")...) | df.drop($"colname"...) | df.drop(expr("colname")...) | df.drop(df.col("colname")...) | df.drop(df("colname")...)
12、创建Schema的方式
1、val schema = new SchemaType().add("schemaName",Type,true|false).add("schemaName",Type,true|false)...
2、val schema = SchemaType(SchemaField("schemaName",Type,true|false),SchemaField("schemaName",Type,true|false)...)