欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Spark序列化

时间:2023-04-30
目录

spark序列化方式SparkSql与序列化在spark中使用kryo使用kryo但不注册类kryo与java占用内存对比Encoders.kryo vs Encoders.javaSerialization参考 spark序列化方式

分布式的程序存在着网络传输,无论是数据还是程序本身的序列化都是必不可少的。spark自身提供两种序列化方式:

java序列化:这是spark默认的序列化方式,使用java的ObjectOutputStream框架,只要是实现了java.io.Serializable接口的类都可以,这种方式虽然通用但是性能差,占用的空间也比较大kryo序列化:相比于java序列化,kryo更高效且序列化结果紧凑,占用空间小,但是不能序列化所有数据类型,且为更好的性能,需要在程序中注册需要序列化的类

kryo不作为默认的序列化方式,是因为需要显式注册自定义的类型,自spark2.0后,对于一些简单类型的rdd(AllScalaRegistrar默认注册了一些常用的基本类型)在shuffling时内部默认使用kryo作序列化

SparkSql与序列化

SparkSql并不使用kryo或java序列化,Dataset使用的是Encoder将jvm对象转换为二进制(《spark数据格式UnsafeRow》),类似于序列化过程,但是Encoder是动态生成代码,并使用标准的InternalRow格式,使得spark可以直接基于字节上做很多操作(不需要反序列化过程),比如filtering,sorting和hashing;Encoder比kryo和java序列化更轻量级,因为它不用额外保存类的描述信息。

在spark中使用kryo

kryo使用比较麻烦,但为了更好的性能和使用更少的内存,还是建议使用kryo序列化。

初始化sparkcontext时指定使用kryo序列化向kryo注册自定义类(registerKryoClasses->org.apache.spark.serializer.KryoSerializer.classesToRegister)

val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrationRequired", "true") .registerKryoClasses(Array(classOf[com.test.spark.KryoTest.Person], classOf[Array[com.test.spark.KryoTest.Person]], classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()case class Person(val name: String, val age: Long)val array = (1 to 100000).map(v => Person("person" + v, v)).toSeqval rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)rdd.countrdd.take(10).foreach(p => println(p.age, p.name))

上面的示例执行完后在sparkui上看到使用内存为1641.3 KB

使用kryo但不注册类

val conf=new SparkConf().setAppName("kryo-test").setMaster("local[*]") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val spark=SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()case class Person(val name: String, val age: Long)val array = (1 to 100000).map(v => Person("person" + v, v)).toSeqval rdd = spark.sparkContext.parallelize(array, 5).persist(StorageLevel.MEMORY_ONLY_SER)rdd.countrdd.take(10).foreach(p => println(p.age, p.name))

上面的示例执行完后在sparkui上看到使用内存为4.8 MB,是注册类时1641.3 KB的差不多3倍,不仅如此,如果在Stage界面观察Task Deserialization Time和Result Serialization Time两项指标,也可以看出注册了类的话耗时更少。

当序列化Person实例对象时,如果不注册Person类,那么会写入Person类的完全限定类名,如果注册了,则会使用一个int类型的ID(1-2字节)代替完全限定类名,显然注册类更加高效且节省空间

这个ID是通过com.esotericsoftware.kryo.Kryo#nextRegisterID自增生成的,与类唯一对应,在反序列化时通过ID反向找到类然后实例化。所以在spark这种分布式的程序中,Person类在所有executor中都必需对应着相同的ID值,这是如何保证的?是因为在所有的executor中代码相同所以注册类的顺序一致,还是在driver中把所有类统一注册然后广播到各个executor中?

kryo与java占用内存对比

将上面代码改为使用java序列化方式,最终得到2.7MB

序列化方式占用内存kryo且注册类1641.3 KBkryo不注册类4.8 MBjava2.7 MB

可以看到kryo在不注册类的情况时,rdd缓存占用的内存比使用java时还要多

Encoders.kryo vs Encoders.javaSerialization

在Dataset中如果想使用kryo序列化,可以通过工厂类org.apache.spark.sql.Encoders生成一个使用kryo序列化/java序列化的Encoder,但是创建的Dataset并不是一个标准的数据集,因为得到的数据集中有唯一一列"value",而这个列的值则是整行记录的二进制数据

val df_java_ser = spark.createDataset(array)(Encoders.javaSerialization(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)df_java_ser.countdf_java_ser.showval df_kryo_ser = spark.createDataset(array)(Encoders.kryo(classOf[Person])).persist(StorageLevel.MEMORY_ONLY_SER)df_kryo_ser.countdf_kryo_ser.show(3)//show的结果如下+--------------------+| value|+--------------------+|[01 00 63 6F 6D 2...||[01 00 63 6F 6D 2...||[01 00 63 6F 6D 2...|+--------------------+

上面分别使用kryo和java序列化时分别占用内在为:5.2M和16.8M

在Dataset两种方式最后大小都比上面rdd的多,猜测这是因为行数据序列化为二进制后被包装成UnsafeRow,所以需要很多额外的空间。
但是在同样没注册到Person类的情况下,RDD是java优于kryo,Dataset是kryo优于java,这是什么原因,求留言告知!!!

撇开Dataset直接对Person对象进行序列化(一样不注册Person类):

val bf_kryo = new org.apache.spark.serializer.KryoSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))val bf_java = new org.apache.spark.serializer.JavaSerializer(new SparkConf()).newInstance().serialize(Person("abcd" , 1))

kryo使用了42字节,java使用了103字节,既然单个对象的序列化结果都是kryo优于java,为什么在RDD中却相反,why?

参考

https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7abhttps://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab

https://spark.apache.org/docs/latest/tuning.html#data-serializationhttps://spark.apache.org/docs/latest/tuning.html#data-serialization

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。