欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkDataFrameAPI踩坑记

时间:2023-06-22

1、当join 条件使用 df("col_1") === df2("col_2"), 由于spark sql 执行计划的原因,有可能会导致结果出现一异常

解决方案: 使用sql 或者 重命名为一样的字段名字

@Test def joinTest() = { spark.conf.set("spark.sql.crossJoin.enabled", true) spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) spark.conf.set("spark.sql.orc.filterPushdown", false) val source = Seq( ("11111111", Seq("22222222", "33333333")), ("22222222", Seq("44444444", "33333333")), ("33333333", Seq("44444444", "22222222")), ("44444444", Seq( "22222222")) ) val rawDF = spark.createDataframe(source).toDF("esn", "esn_list") rawDF.createOrReplaceTempView("raw_tb") rawDF.printSchema() rawDF.show(false) val tmpDF = rawDF.select( col("esn") , explode(col("esn_list")).as("group_esn") ) tmpDF.createOrReplaceTempView("tmp_db") tmpDF.printSchema() tmpDF.show(false) val rawTmpDF = rawDF.selectExpr("esn as group_esn", "esn_list") //val rawTmpDF = rawDF.selectExpr("esn as esn2", "esn_list") // val testDF = tmpDF.join(rawDF, rawDF("esn") === tmpDF("group_esn"), "inner") // 结果为空 val testDF = tmpDF.join(rawTmpDF, rawTmpDF("group_esn") === tmpDF("group_esn"), "inner")// val testDF = tmpDF.join(rawTmpDF, rawTmpDF("esn2") === tmpDF("group_esn"), "inner") testDF.printSchema() testDF.show(false) testDF.explain(true)// sql 正常没有问题// val sql =// """// |select *// |from raw_tb t1// |inner join tmp_db t2// |on t1.esn = t2.group_esn// |// |""".stripMargin//// val frame = spark.sql(sql)//// frame.printSchema()// frame.show(false)// frame.explain(true) }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。