在大数据的应用场景中,数据的处理往往是在分布式环境上进行的,在这种情况下,数据关联的计算往往还要考虑网络分发这个环节
在分布式环境中,Spark支持两类数据分发模式,一类是shuffle,shuffle通过中间文件来完成Map阶段与Reduce阶段的数据交换;另一类是广播变量,广播变量在Driver端创建,并由Driver分发到各个Excutors,因此从数据分发的角度看,数据关联又能分为Shuffle Join和Broadcase Join
eg:
import org.apache.spark.sql.Dataframeimport spark.implicits._val seq = Seq((1,"li",22,"Male"),(2,"shi",24,"Female"),(3,"ming",26,"Female"))val employees:Dataframe = seq.toDF("id","name","age","gender")val seq2 = Seq((1,20000),(2,30000),(3,40000),(4,50000))val salaries:Dataframe = seq2.toDF("id","salary")
一、Shuffle Join
以上面的员工表与薪资表为例,如果对employees和salaries按照id列做关联,那么对于id字段相同的员工数据和薪资数据,我们要保证它们坐落在同样的Executors进程中,SparkSql才能利用HJ、SMJ、NLJ,以Excutos(进程)为粒度并行地完成数据关联在没有开发者干预的情况下,Spark默认使用Shuffle Join来完成分布式环境下的数据关联Spark Sql之所以在默认情况下一律采用Shuffle Join,原因在于Shuffle
Join的万金油属性,在任何情况下,不论数据的体量大小、不管内存是否足够,Shuffle Join都能完成数据的关联计算
二、Broadcase Join
Spark不仅可以在普通变量上创建广播变量,在分布式数据集(RDD、Dataframe)上也可以创建广播变量,因此,对于参与关联的两张表,我们可以把较小的封装成广播变量
import org.apache.spark.sql.functions.broadcast//创建员工表的广播变量val bcEmployees = broadcast(employees)val joinDF:Dataframe = salaries.join(bcEmployees,salaries("id") === employees("id"),"inner")
在Broadcast Join的执行过程中,Spark
Sql首先从各个Executors收集employees表的数据分片,然后在Driver端创建广播变量bcEmployees
携带者employees表数据分片的广播变量被分发到各个executors,体量较大的薪资表只要保持不动,就可以轻松关联到与它保持一致的员工表数据了
尽管广播变量的创建与分发同样需要消耗网络带宽,但相比Shuffle
Join中两张表的全网分发,仅仅通过分发体量较小的数据来完成数据关联,Spark Sql的执行性能显然要高得多
三、Spark Sql支持的Join策略
不论是Shuffle Join,还是Broadcast Join,一旦数据分发完毕,理论上可以采用HJ、NLJ、SMJ这三种实现机制的任意一种,因此,两种分发模式与三种实现机制,组合起来,总共有6种分布式Join策略
如图所示,Spark Sql支持其中五种(除却红字)
HJ的执行效率不低于SMJ,为什么在等值关联中优先取Shuffle SMJ?
在Shuffle的实现机制中,map阶段会对数据做排序,而这正好符合SMJ机制,对于已经排序好的两张表,SMJ的复杂度为O(M+N),与HJ的O(M+N)不相上下;再者SMJ的稳定性远胜HJ,在内存受限的情况下,SMJ可以充分利用磁盘完关联计算。因此,优先考虑Shuffle
Join