1、当 join 条件写成 df("col_1") === df2("col_2") 的形式，且 df2 与 df 来自同一血缘（例如 df2 由 df 派生，相当于自连接）时，由于 Spark SQL 执行计划中列引用的解析方式，两侧的列可能无法被正确区分，从而导致结果异常（例如下例中结果为空）。
解决方案：改用 SQL（为两张表分别起别名）进行 join；或者先用 selectExpr 对一侧的连接字段重命名（例如重命名为与另一侧一样的字段名 group_esn），再进行 join。
/**
 * Reproduces a Spark join pitfall and checks the documented workarounds.
 *
 * `tmpDF` is derived from `rawDF` (one row per element of `esn_list`). Joining it back
 * against `rawDF` with `rawDF("esn") === tmpDF("group_esn")` was observed to return an
 * empty result. Two workarounds are verified here:
 *  - renaming the right-hand join key via `selectExpr` before joining, and
 *  - expressing the same join in SQL with table aliases.
 * Both must return one row per exploded `group_esn` that matches an existing `esn`.
 */
@Test
def joinTest(): Unit = {
  // Session settings used when the issue was reproduced. They are restored in `finally`
  // so they do not leak into other tests that share this SparkSession.
  val confOverrides = Seq(
    "spark.sql.crossJoin.enabled" -> "true",
    "spark.sql.autoBroadcastJoinThreshold" -> "-1",
    "spark.sql.orc.filterPushdown" -> "false"
  )
  val previousConf = confOverrides.map { case (key, _) => key -> spark.conf.getOption(key) }
  confOverrides.foreach { case (key, value) => spark.conf.set(key, value) }

  try {
    val source = Seq(
      ("11111111", Seq("22222222", "33333333")),
      ("22222222", Seq("44444444", "33333333")),
      ("33333333", Seq("44444444", "22222222")),
      ("44444444", Seq("22222222"))
    )

    val rawDF = spark.createDataFrame(source).toDF("esn", "esn_list")
    rawDF.createOrReplaceTempView("raw_tb")
    rawDF.printSchema()
    rawDF.show(false)

    // One row per (esn, element of esn_list).
    val tmpDF = rawDF.select(col("esn"), explode(col("esn_list")).as("group_esn"))
    tmpDF.createOrReplaceTempView("tmp_db")
    tmpDF.printSchema()
    tmpDF.show(false)

    // Problematic form (returned an empty result): tmpDF is derived from rawDF, so the
    // column references on the two sides are not reliably told apart.
    // val testDF = tmpDF.join(rawDF, rawDF("esn") === tmpDF("group_esn"), "inner")

    // Workaround: rename the right-hand join key first. Renaming to a different name
    // also works: rawDF.selectExpr("esn as esn2", "esn_list") joined on
    // rawTmpDF("esn2") === tmpDF("group_esn").
    val rawTmpDF = rawDF.selectExpr("esn as group_esn", "esn_list")
    val testDF = tmpDF.join(rawTmpDF, rawTmpDF("group_esn") === tmpDF("group_esn"), "inner")
    testDF.printSchema()
    testDF.show(false)
    testDF.explain(true)

    // esn values in `source` are unique, so every exploded group_esn that is a known esn
    // matches exactly one row on the other side.
    val knownEsns = source.map(_._1).toSet
    val expectedRows = source.flatMap(_._2).count(knownEsns.contains)
    assert(
      testDF.count() == expectedRows,
      s"DataFrame join (renamed key) returned ${testDF.count()} rows, expected $expectedRows")

    // The equivalent SQL join was observed to work correctly; verify it agrees.
    val sqlDF = spark.sql(
      """
        |select *
        |from raw_tb t1
        |inner join tmp_db t2
        |on t1.esn = t2.group_esn
        |""".stripMargin)
    sqlDF.printSchema()
    sqlDF.show(false)
    sqlDF.explain(true)
    assert(
      sqlDF.count() == expectedRows,
      s"SQL join returned ${sqlDF.count()} rows, expected $expectedRows")
  } finally {
    previousConf.foreach {
      case (key, Some(value)) => spark.conf.set(key, value)
      case (key, None)        => spark.conf.unset(key)
    }
  }
}