
Analysis and Fix of a DataSourceScanExec NullPointerException Caused by Spark DPP

Date: 2023-04-22
Background

This article is based on Spark 3.1.2 running in YARN mode.
While recently debugging Spark SQL, I hit a NullPointerException, as shown below:

Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57)
	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162)
	...
	at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.proxyExpressions(SubExprEvaluationRuntime.scala:89)
	at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.<init>(predicates.scala:53)
	at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:92)
	at org.apache.spark.sql.catalyst.expressions.Predicate$.createInterpretedObject(predicates.scala:85)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:56)
	at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:101)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2(basicPhysicalOperators.scala:246)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2$adapted(basicPhysicalOperators.scala:245)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	...
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)

Analysis

At first sight, this error is baffling.
Let's break it down: it involves Spark's DPP (Dynamic Partition Pruning) and the codegen machinery. The minimal SQL that reproduces the error (sanitized) is:

create table test_a_pt(col1 int, col2 int, pt string) USING parquet PARTITIONED BY (pt);

insert into table test_a_pt values
(1,2,'20220101'),(3,4,'20220101'),(1,2,'20220101'),
(3,4,'20220101'),(1,2,'20220101'),(3,4,'20220101');

drop table if exists test_b;
create table test_b as
select 1 as `搜索demo_uv`, 2 as `搜索demo_gmv`, 'gogo' as scenes, '2021-03-04' as date1;

drop table if exists gg_gg;
create table gg_gg as
SELECT a.pt, a.scenes
FROM (
    SELECT '20220101' as pt, 'comeon' AS scenes
    FROM test_b
    where scenes='gogo' and exists(array(date1), x -> x == '2021-03-04')
    UNION ALL
    SELECT pt, 'comeon'
    FROM (
        SELECT pt, COUNT(distinct col2) AS buy_tab_uv
        FROM test_a_pt
        where pt='20220101'
        GROUP BY pt
    )
) a
JOIN (
    SELECT pt, COUNT(distinct col2) AS buy_tab_uv
    FROM test_a_pt
    where pt='20220101'
    GROUP BY pt
) b
ON a.pt = b.pt;

Note that the exists higher-order function used here extends CodegenFallback.

After running the SQL, we can obtain the corresponding physical plan.



This SQL produces a DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) expression (this is important for the analysis below). As for why this expression is generated, you can trace the logical and physical plan generation logs; I will not go into that here.
Let's analyze the stack trace directly.
First, Task.run tells us the error occurred on the executor side.
Next comes FilterExec, whose code is as follows:

protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val predicate = Predicate.create(condition, child.output)
    predicate.initialize(0)
    iter.filter { row =>
      val r = predicate.eval(row)
      if (r) numOutputRows += 1
      r
    }
  }
}

Here, Predicate.create builds a BasePredicate for the given expression; that object evaluates every input row to a boolean.
Under the normal flow, createCodeGeneratedObject is tried first, and only if it throws is createInterpretedObject invoked as a fallback.
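
As a reminder of how that fallback works, here is a minimal self-contained sketch of the try-codegen-then-interpret pattern (illustrative only; this is not Spark's actual CodeGeneratorWithInterpretedFallback source, which has additional branches controlled by configuration):

import scala.util.control.NonFatal

// Minimal sketch of the "try codegen, fall back to interpreter" pattern.
abstract class CodegenWithFallback[IN, OUT] {
  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT

  def createObject(in: IN): OUT =
    try {
      createCodeGeneratedObject(in)   // step 1: try the code-generated path
    } catch {
      case NonFatal(_) =>
        // step 2: any non-fatal error (such as the NPE above) triggers the
        // "falling back to interpreter mode" warning and the interpreted path
        createInterpretedObject(in)
    }
}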
A careful reader will notice that the error stack contains only Predicate$.createInterpretedObject. So where did the first step, code generation, go?
Since expression code generation here happens on the executor side, we can check the executor logs, where we find the following:

22/02/26 19:36:19 WARN Predicate: Expr codegen error and falling back to interpreter mode
java.lang.NullPointerException
	at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57)
	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:635)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:162)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:387)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)

The line WARN Predicate: Expr codegen error and falling back to interpreter mode tells us that code generation for this SQL failed; why it failed will be analyzed in a later post.
For now, let's follow the error stack and go straight to the InterpretedPredicate class:

case class InterpretedPredicate(expression: Expression) extends BasePredicate {
  private[this] val subExprEliminationEnabled = SQLConf.get.subexpressionEliminationEnabled
  private[this] lazy val runtime =
    new SubExprEvaluationRuntime(SQLConf.get.subexpressionEliminationCacheMaxEntries)
  private[this] val expr = if (subExprEliminationEnabled) {
    runtime.proxyExpressions(Seq(expression)).head
  } else {
    expression
  }

Note the subExprEliminationEnabled flag: it enables common-subexpression elimination, which avoids recomputing identical subexpressions and is on by default. This flag turns out to be the crux of the issue.
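
As a rough intuition for what this flag buys, here is a plain-Scala analogy (not Spark internals; expensive, predicateNaive and predicateWithCse are made-up names):

object CseAnalogy {
  def expensive(x: Int): Int = { Thread.sleep(10); x * x }

  // Flag off: the shared subexpression is evaluated twice for every row.
  def predicateNaive(x: Int): Boolean =
    expensive(x) > 10 && expensive(x) < 100

  // Flag on: the shared subexpression is evaluated once and the result reused,
  // which is the kind of saving SubExprEvaluationRuntime provides for interpreted predicates.
  def predicateWithCse(x: Int): Boolean = {
    val e = expensive(x)
    e > 10 && e < 100
  }
}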
Because the flag is enabled, we follow the proxyExpressions path, which calls expressions.foreach(equivalentExpressions.addExprTree(_)):

def addExprTree(
    expr: Expression,
    addFunc: Expression => Boolean = addExpr): Unit = {
  val skip = expr.isInstanceOf[LeafExpression] ||
    // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
    // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
    expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
    // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor,
    // can cause error like NPE.
    (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)

  if (!skip && !addFunc(expr)) {
    childrenToRecurse(expr).foreach(addExprTree(_, addFunc))
    commonChildrenToRecurse(expr).filter(_.nonEmpty).foreach(addCommonExprs(_, addFunc))
  }
}

The skip check includes the condition expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null, which clearly targets the executor side, because a TaskContext instance only exists there.
Remember the DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) expression mentioned earlier (whose broadcastValues involves a FileSourceScanExec)? This is exactly where it matters.
From the error stack it is obvious that skip evaluated to false; that is why canonicalization frames such as FileSourceScanExec.doCanonicalize show up and the NPE is thrown.
Look closely at expr.isInstanceOf[PlanExpression[_]]: it cannot match a DynamicPruningExpression, because only the wrapped InSubqueryExec is a PlanExpression subclass that satisfies the condition.
So if we change expr.isInstanceOf[PlanExpression[_]] to expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined,
the problem goes away.
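
To make the type relationship concrete, here is a self-contained toy model in plain Scala (stand-in classes, not Spark's real Expression hierarchy) showing why the original check misses the DPP wrapper while the patched check catches it:

object DppSkipDemo extends App {
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search, like TreeNode.find: checks the node itself, then its children.
    def find(f: Expr => Boolean): Option[Expr] =
      if (f(this)) Some(this) else children.flatMap(_.find(f)).headOption
  }
  trait PlanExpr extends Expr                                   // plays the role of PlanExpression (e.g. InSubqueryExec)
  case object InSubquery extends PlanExpr { val children: Seq[Expr] = Seq.empty }
  case class DynamicPruning(child: Expr) extends Expr {         // plays the role of DynamicPruningExpression
    val children: Seq[Expr] = Seq(child)
  }

  val dpp: Expr = DynamicPruning(InSubquery)
  println(dpp.isInstanceOf[PlanExpr])                           // false -> the original check does NOT skip the subtree
  println(dpp.find(_.isInstanceOf[PlanExpr]).isDefined)         // true  -> the patched check detects the nested PlanExpression
}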

Let's keep following the stack (the intermediate calls can be read straight off the trace) down to line 57 of DataSourceScanExec:

trait DataSourceScanExec extends LeafExecNode {
  def relation: BaseRelation
  def tableIdentifier: Option[TableIdentifier]

  protected val nodeNamePrefix: String = ""

  override val nodeName: String = {
    s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
  }

  // Metadata that describes more details of this scan.
  protected def metadata: Map[String, String]

  protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength
  ...

The failing line is protected val maxMetadataValueLength = sqlContext.sessionState.conf.maxMetadataStringLength.
Here, sqlContext is defined in SparkPlan as @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull,
and SparkSession.getActiveSession looks like this:

def getActiveSession: Option[SparkSession] = {
  if (TaskContext.get != null) {
    // Return None when running on executors.
    None
  } else {
    Option(activeThreadSession.get)
  }
}

In other words, on the executor side getActiveSession returns None, so sqlContext is null and dereferencing it throws the NPE.
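
A minimal plain-Scala illustration of this last step (no Spark required; the values are stand-ins for getActiveSession and sqlContext):

object NullSqlContextDemo extends App {
  val activeSession: Option[String] = None          // getActiveSession returns None on executors
  val sqlContext: String = activeSession.orNull     // Option.orNull turns the None into null

  // Dereferencing the null, just like `sqlContext.sessionState` in DataSourceScanExec,
  // throws java.lang.NullPointerException.
  println(sqlContext.length)
}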
With that, the whole chain of events is clear.

Solutions

1. Disable DPP (Dynamic Partition Pruning):
   set spark.sql.optimizer.dynamicPartitionPruning.enabled=false
2. Disable common-subexpression elimination:
   set spark.sql.subexpressionElimination.enabled=false
3. Patch the code so that DPP expressions are skipped directly (the addExprTree change above); there is a corresponding JIRA for this.

The first two flags can also be set per session, as in the sketch below.
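
A sketch of applying the first two workarounds from a Scala session (assuming spark is the active SparkSession, as in spark-shell; setting either one alone is enough):

// Apply before running the offending query; equivalent to the SQL SET commands above.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")   // option 1: disable DPP
spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")            // option 2: disable subexpression elimination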

BTW, this analysis applies only to Spark 3.1.2; the relevant code was refactored in Spark 3.2 and later. For the community discussion, see SPARK-23731, SPARK-35742 and SPARK-35798.

Solutions 1, 2 and 3 progressively narrow the scope of impact; which one to pick is up to you.
