本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见大数据技术体系
目录
Spark SQL 工作流程源码解析(一)总览(基于 Spark 3.3.0)
Spark SQL 工作流程源码解析(二)parsing 阶段(基于 Spark 3.3.0)
Spark SQL 工作流程源码解析(三)analysis 阶段(基于 Spark 3.3.0)
Spark SQL 工作流程源码解析(四)optimization 阶段(基于 Spark 3.3.0)
Spark SQL 工作流程源码解析(五)planning 阶段(基于 Spark 3.3.0)
关联
一篇文章了解 Spark 3.x 的 Catalog 体系
Spark 3.x 版本的 Table Catalog API 是怎样的?
Spark DataSource API v2 版本有哪些改进?v1 版本和 v2 版本有什么区别?
我们要做什么?
通过前面的学习,我们很容易就发现:
parsing 阶段要做的事情就是将SQL语句转化成 AST,它实际上只和 SQL语句 有关,也就是说,相同的SQL语句,最终生成的 AST 应该都是相同的。
而 analysis 阶段要做的就是对生成的 AST 进行进一步的解析,毕竟 之前生成的 AST 只是 SQL 语句转化来的,而在不同的环境下 SQL 语句能达成的效果也是不尽相同的,此时我们要做的就是因地制宜。
怎么因地制宜呢?
首先要明白我们有了哪些东西?我们才能根据我们手头上的东西来通盘考虑。
我们有了什么?
经过 parsing 阶段之后,我们得到了一个 AST:
此外,我们之前有了一套配置环境:
val spark = SparkSession.builder.master("local[*]").appName("SparkSQLExample").getOrCreate()
我们还准备了数据源和 schema
数据源来自这里:
val df = spark.read.json(DATA_PATH)
schema 信息来自这里:
df.createTempView("t_user")
我们看看上面的代码能带给我们什么关键信息吧:
我们知道了一个临时视图:t_user我们从 JSON 文件中推测到了字段名称和字段类型:上面的字段类型都是来自 Apache Spark 的内部类型。
看看源码实现吧 找到入口
def sql(sqlText: String): Dataframe = withActive { val tracker = new QueryPlanningTracker val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) { // parsing 阶段 sessionState.sqlParser.parsePlan(sqlText) } Dataset.ofRows(self, plan, tracker) }
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker) : Dataframe = sparkSession.withActive { val qe = new QueryExecution(sparkSession, logicalPlan, tracker) qe.assertAnalyzed() new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)) }
// analyzed 延迟初始化 def assertAnalyzed(): Unit = analyzed
lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) { // 这里就是`analysis`阶段的入口啦 sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker) }
Analyzer.executeAndCheckdef executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { // 已解析就直接返回当前逻辑计划 if (plan.analyzed) return plan // 这里利用了一个 `ThreadLocal[Int]` 类型避免解析器递归调用自己 AnalysisHelper.markInAnalyzer { // 执行 val analyzed = executeAndTrack(plan, tracker) try { // 执行完成后校验解析结果 checkAnalysis(analyzed) analyzed } catch { // 构造解析异常并抛出 case e: AnalysisException => val ae = e.copy(plan = Option(analyzed)) ae.setStackTrace(e.getStackTrace) throw ae } } }
可以看到analysis阶段的核心逻辑就 2 步:
执行校验我们先来看看执行
执行def executeAndTrack(plan: TreeType, tracker: QueryPlanningTracker): TreeType = { // 设置一下追踪器,关于查询计划追踪器请参考第一讲 QueryPlanningTracker.withTracker(tracker) { execute(plan) } }
def execute(plan: TreeType): TreeType = { var curPlan = plan // 这个是用于统计一些运行信息的,比如花了多少时间,跑了规则批等等 val queryExecutionMetrics = RuleExecutor.queryExecutionMeter // 当规则或者规则批被应用后,日志记录一下逻辑计划的改变 val planChangeLogger = new PlanChangeLogger[TreeType]() // 还是那个查询计划追踪器,参见第一讲 val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get // 执行前的度量信息 val beforeMetrics = RuleExecutor.getCurrentMetrics() // 这个是用来检查逻辑计划的完整性的,主要是通过校验命名表达式(Namedexpression)的 ID (exprId)是否唯一不重复 if (!isPlanIntegral(plan, plan)) { throw QueryExecutionErrors.structuralIntegrityOfInputPlanIsBrokenInClassError( this.getClass.getName.stripSuffix("$")) } batches.foreach { batch => // 规则批从哪一个逻辑计划开始 val batchStartPlan = curPlan // 迭代次数 var iteration = 1 // 记录当前的逻辑计划 var lastPlan = curPlan // 表示是否继续循环的标识 var continue = true // 运行到固定点(或者执行策略中规定的最大迭代次数) while (continue) { curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => // 规则运行的开始时间 val startTime = System.nanoTime() // 对逻辑计划应用规则 val result = rule(plan) // 规则的运行时间 val runTime = System.nanoTime() - startTime // 不等说明规则起了作用,是有效的 val effective = !result.fastEquals(plan) if (effective) { //记录一下有效的规则数量queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName) // 记录一下有效的运行时间queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime) // 日志打印一下规则是怎么变的 planChangeLogger.logRule(rule.ruleName, plan, result) } // 记录一下执行时间 queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) // 记录一下跑了多少个规则 queryExecutionMetrics.incNumExecution(rule.ruleName) // 使用查询计划追踪器记录一下一些和时间相关的信息 tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective)) // 跑完每个规则后都要检查一下逻辑计划的完整性 if (effective && !isPlanIntegral(plan, result)) { throw QueryExecutionErrors.structuralIntegrityIsBrokenAfterApplyingRuleError( rule.ruleName, batch.name) } result } // 迭代次数加 1 iteration += 1 if (iteration > batch.strategy.maxIterations) { // 只会日志打印那些运行次数大于 1 次的规则 if (iteration != 2) { val endingMsg = if (batch.strategy.maxIterationsSetting == null) { "." } else { s", please set '${batch.strategy.maxIterationsSetting}' to a larger value." } val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" + s"$endingMsg" if (Utils.isTesting || batch.strategy.errorOnExceed) { throw new RuntimeException(message) } else { logWarning(message) } } // 检查单次规则批的幂等性 if (batch.strategy == once && Utils.isTesting && !excludedOnceBatches.contains(batch.name)) { checkBatchIdempotence(batch, curPlan) } continue = false } // 逻辑计划没变,此时就需要退出循环了 if (curPlan.fastEquals(lastPlan)) { logTrace( s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") continue = false } lastPlan = curPlan } // 日志打印一下逻辑计划从开始到现在是怎么变的 planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan) } // 日志打印一些度量信息 planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics) curPlan }
流程图如下所示:
整个流程看起来也比较简单,最核心的逻辑实际上就下面的一行代码:
val result = rule(plan)
这里代表的是对当前的逻辑计划应用特定的规则得到另一个逻辑计划。
看到这里,其实大家都明白了,执行相比来说并不重要,底层起到核心作用的是规则批(Batch)和规则(Rule)。
规则批(Batch)和规则(Rule)到底是什么呢?
规则批(Batch)
源码中的batches代表的就是规则批序列
protected def batches: Seq[Batch]
规则批(Batch)由规则名称、执行策略和规则列表组成。
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
执行策略
abstract class Strategy { def maxIterations: Int def errorOnExceed: Boolean = false def maxIterationsSetting: String = null }
执行策略分为 2 种类型:fixedPoint 和 Once
fixedPoint
// 如果在maxIterations中无法解决该计划,analyzer将抛出异常以通知用户增加SQLConf.ANALYZER_MAX_ITERATIONS 的值。 protected def fixedPoint = FixedPoint( conf.analyzerMaxIterations, erroronExceed = true, maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
SQLConf.ANALYZER_MAX_ITERATIONS 代表的是配置项spark.sql.analyzer.maxIterations
case class FixedPoint( override val maxIterations: Int, override val errorOnExceed: Boolean = false, override val maxIterationsSetting: String = null) extends Strategy
FixedPoint 代表运行到定点或最大迭代次数的策略,以先到者为准。
FixedPoint(1) 的规则批应该只运行一次。
once
case object once extends Strategy { val maxIterations = 1 }
once 代表是一种只运行一次且幂等的策略。
规则(Rule)
abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging { // 规则的整型 ID,用来修剪不必要的树遍历 protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName) val ruleName: String = { val className = getClass.getName if (className endsWith "$") className.dropRight(1) else className } def apply(plan: TreeType): TreeType}
其中 TreeType 代表的是 TreeNode 的任意一个子类,而 TreeNode 则是 Spark SQL 中所有树型结构的父类,像之前源码中的逻辑计划(LogicalPlan)就是它的一个子类。
规则批到底有哪些呢?
我们可以从 Analyzer.scala 文件中找到答案
override def batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, OptimizeUpdateFields, CTESubstitution, WindowsSubstitution, EliminateUnions, SubstituteUnresolvedOrdinals), Batch("Disable Hints", Once, new ResolveHints.DisableHints), Batch("Hints", fixedPoint, ResolveHints.ResolveJoinStrategyHints, ResolveHints.ResolveCoalesceHints), Batch("Simple Sanity Check", Once, LookupFunctions), Batch("Keep Legacy Outputs", Once, KeepLegacyOutputs), Batch("Resolution", fixedPoint, ResolveTablevaluedFunctions(v1SessionCatalog) :: ResolveNamespace(catalogManager) :: new ResolveCatalogs(catalogManager) :: ResolveUserSpecifiedColumns :: ResolveInsertInto :: ResolveRelations :: ResolvePartitionSpec :: ResolveFieldNameAndPosition :: AddmetadataColumns :: DeduplicateRelations :: ResolveReferences :: ResolveexpressionsWithNamePlaceholders :: ResolveDeserializer :: ResolveNewInstance :: ResolveUpCast :: ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: ResolveAliases :: ResolveSubquery :: ResolveSubqueryColumnAliases :: ResolveWindowOrder :: ResolveWindowframe :: ResolveNaturalAndUsingJoin :: ResolveOutputRelation :: ExtractWindowexpressions :: GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: SessionWindowing :: ResolveInlineTables :: ResolveLambdaVariables :: ResolveTimeZone :: ResolveRandomSeed :: ResolveBinaryArithmetic :: ResolveUnion :: typeCoercionRules ++ Seq(ResolveWithCTE) ++ extendedResolutionRules : _*), Batch("Remove TempResolvedColumn", Once, RemoveTempResolvedColumn), Batch("Apply Char Padding", Once, ApplyCharTypePadding), Batch("Post-Hoc Resolution", Once, Seq(ResolveCommandsWithIfExists) ++ postHocResolutionRules: _*), Batch("Remove Unresolved Hints", Once, new ResolveHints.RemoveAllHints), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF, ResolveEncodersInUDF), Batch("UpdateNullability", Once, UpdateAttributeNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases), Batch("HandleAnalysisOnlyCommand", Once, HandleAnalysisOnlyCommand) )
因为底层跑的实际上就是一个个的规则(Rule),我们就以规则(Rule)为粒度,画出了下面的表格。
解析顺序
上表中的解析是按照顺序来执行的。
解析顺序如下图所示:
typeCoercionRules
上面的规则中有一组规则比较特别,还记得我们在第二讲(Spark SQL 工作流程源码解析(二)parsing 阶段(基于 Spark 3.3.0))提到的 Hive SQL ⇒ Spark SQL ⇒ ANSI SQL 的演变吗? 这组规则就和它有关。
ANSI SQL当 spark.sql.ansi.enabled 设置为 true 的时候,Spark 就会倾向 ANSI 的方式去处理 SQL。
当 spark.sql.ansi.enabled 设置为 false 的时候,Spark 就会倾向 HIVE 的方式去处理 SQL。
回到最初的例子
由于篇幅的限制,我们不可能做到详解每一种规则具体是怎么实现的。
所以,我们就回到最初的例子,搞懂它的解析相信可以做到触类旁通。
我们的例子中有 2 个规则起到了核心作用:
ResolveRelationsResolveReferencesResolveRelations
ResolveRelations 规则使用 catalog 中的具体关系替换未解析的关系(这里的关系指的表和视图)。
那么它是怎么实现的呢?
其实就 3 个步骤,具体的流程图如下:
下面,我们跟读源码来理解上面的流程图:
apply我们看下 apply 函数的实现,前面提到过,这是规则(Rule)中最核心的函数。
def apply(plan: LogicalPlan) : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
先来看看 resolveOperatorsUpWithPruning 是什么?
递归应用偏函数def resolveOperatorsUpWithPruning(cond: TreePatternBits => Boolean, ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan]) : LogicalPlan = { // 当前逻辑计划未被解析 并且 可以递归处理其子树 并且 对于id为ruleId的规则,此树节点及其子树没有被标记为无效。 if (!analyzed && cond.apply(self) && !isRuleIneffective(ruleId)) { // 防止嵌套调用,这里使用了一个 ThreadLocal[Int] 来记录调用的深度 AnalysisHelper.allowInvokingTransformsInAnalyzer { // 返回当前节点的副本,递归应用于其所有子节点 // 每一个规则的输入都是其子节点应用规则后的输出 val afterRuleonChildren = mapChildren(_.resolveOperatorsUpWithPruning(cond, ruleId)(rule)) // 如果前后逻辑计划未发生改变 val afterRule = if (self fastEquals afterRuleOnChildren) { CurrentOrigin.withOrigin(origin) { // 应用规则到逻辑计划 rule.applyOrElse(self, identity[LogicalPlan]) } } else { CurrentOrigin.withOrigin(origin) { // 应用规则到处理后的逻辑计划 rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan]) } } if (self eq afterRule) { // 标记规则(带有id ruleId)对此树节点及其子树无效。 self.markRuleAsIneffective(ruleId) self } else { // 复制节点标签 afterRule.copyTagsFrom(self) afterRule } } } else { self } }
可以看到这部分的核心在于偏函数 rule,我们再来看看它的实现。
由于 parsing 阶段完成后,我们的逻辑计划是这样的:
'Project [*]+- 'UnresolvedRelation [t_user], [], false
很明显就可以看出来,我们对应偏函数的分支是下面这个:
case u: UnresolvedRelation => lookupRelation(u).map(resolveViews).getOrElse(u)
查找关系我们需要从 Catalog 中查找对应的关系(表和视图)。
private def lookupRelation( u: UnresolvedRelation, timeTravelSpec: Option[TimeTravelSpec] = None): Option[LogicalPlan] = { // 查找临时视图 lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse { // 如果找不到的话,我们将尝试从关系缓存中查找 // 如果我们在视图中解析数据库对象(关系、函数等),我们可能需要当视图被创建后使用当前的 catalog 和 namespace 展开单个或多部分标识符。 expandIdentifier(u.multipartIdentifier) match { case CatalogAndIdentifier(catalog, ident) => val key = catalog.name +: ident.namespace :+ ident.name // 从关系缓存中查找 AnalysisContext.get.relationCache.get(key).map(_.transform { case multi: MultiInstanceRelation => val newRelation = multi.newInstance() newRelation.copyTagsFrom(multi) newRelation }).orElse { // 查不到手动加载并创建关系 val table = CatalogV2Util.loadTable(catalog, ident, timeTravelSpec) val loaded = createRelation(catalog, ident, table, u.options, u.isStreaming) // 更新缓存 loaded.foreach(AnalysisContext.get.relationCache.update(key, _)) loaded } case _ => None } } } }
怎么查找临时视图的呢?
private def lookupTempView( identifier: Seq[String], isStreaming: Boolean = false, isTimeTravel: Boolean = false): Option[LogicalPlan] = { // 我们正在解析一个视图并且当视图被创建的时候这个视图名称不是临时视图,我们就在这里早点返回 None if (isResolvingView && !isReferredTempViewName(identifier)) return None val tmpView = identifier match { // 我们的例子中 identifier 就一个:t_user // 通过 SessionCatalog 来查找临时视图 case Seq(part1) => v1SessionCatalog.lookupTempView(part1) case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2) case _ => None } // 找完了还得校验下 tmpView.foreach { v => if (isStreaming && !v.isStreaming) { throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted) } if (isTimeTravel) { val target = if (v.isStreaming) "streams" else "views" throw QueryCompilationErrors.timeTravelUnsupportedError(target) } } tmpView }
SessionCatalog 中是如何查找到对应临时视图的元数据信息呢?
对 SessionCatalog 困惑的同学先看看这个博客——一篇文章了解 Spark 3.x 的 Catalog 体系
def lookupTempView(table: String): Option[SubqueryAlias] = { val formattedTable = formatTableName(table) getTempView(formattedTable).map { view => SubqueryAlias(formattedTable, view) } }
先格式化一下表的名称
protected[this] def formatTableName(name: String): String = { if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) }
获取到对应的元数据信息,在转换成相应的逻辑计划
def getTempView(name: String): Option[View] = synchronized { getRawTempView(name).map(getTempViewPlan) }
def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized { tempViews.get(formatTableName(name)) }
哦豁~ 可以看出查找临时视图的逻辑就是到 SessionCatalog 的 Map 缓存中取出对应的临时视图,我们看看这个 Map 缓存:
protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]
我们现在已经很明确,想要找的临时视图的元数据信息就来自这个 HashMap 缓存。
那么,问题是:数据啥时候加进去的?
数据啥时候加进去的?很明显来自我们例子中下面的代码:
df.createTempView("t_user")
这一步就创建了我们需要的临时视图,我们看下它是怎么实现的~
@throws[AnalysisException] def createTempView(viewName: String): Unit = withPlan { createTempViewCommand(viewName, replace = false, global = false) }
private def createTempViewCommand( viewName: String, replace: Boolean, global: Boolean): CreateViewCommand = { val viewType = if (global) GlobalTempView else LocalTempView val tableIdentifier = try { // 看了第二讲的同学相信对这个不会陌生,和 parsing 阶段的逻辑基本都是类似的 sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName) } catch { case _: ParseException => throw QueryCompilationErrors.invalidViewNameError(viewName) } // 解析完视图后,构建一个创建视图的命令方便后面来执行 CreateViewCommand( name = tableIdentifier, userSpecifiedColumns = Nil, comment = None, properties = Map.empty, originalText = None, plan = logicalPlan, allowExisting = false, replace = replace, viewType = viewType, isAnalyzed = true) }
具体的执行在哪里呢?
@inline private def withPlan(logicalPlan: LogicalPlan): Dataframe = { Dataset.ofRows(sparkSession, logicalPlan) }
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataframe = sparkSession.withActive { val qe = sparkSession.sessionState.executePlan(logicalPlan) qe.assertAnalyzed() new Dataset[Row](qe, RowEncoder(qe.analyzed.schema)) }
我们看到了啥?这不就是本讲源码解析的开头吗?
我们仿佛又回到了原点。
只不过,我们这次针对的是创建视图的命令,而本讲分析的是那个 SQL 语句。
所以,接下来和上面的逻辑不都差不多嘛,本质上还是应用一个个的规则!
那么核心区别在哪里呢?
我们既然针对的是创建视图的命令,有命令就必然要执行,所以,关键点在
CreateViewCommand 的 run 方法里面:
override def run(sparkSession: SparkSession): Seq[Row] = { if (!isAnalyzed) { throw QueryCompilationErrors.logicalPlanForViewNotAnalyzedError() } // 这是一个已经完成 analysis 阶段的计划 val analyzedPlan = plan // 如果用户指定的列不为空并且和 analysis 阶段的输出列不等就抛出异常 if (userSpecifiedColumns.nonEmpty && userSpecifiedColumns.length != analyzedPlan.output.length) { throw QueryCompilationErrors.createViewNumColumnsMismatchUserSpecifiedColumnLengthError( analyzedPlan.output.length, userSpecifiedColumns.length) } // 获取到 SessionCatalog val catalog = sparkSession.sessionState.catalog // 当创建一个永久视图时,不允许引用临时的对象 // 这里应当在 qe.assertAnalyzed() 之后被调用,也就是说,子节点都被解析过了 verifyTemporaryObjectsNotExists(isTemporary, name, analyzedPlan, referredTempFunctions) verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, name) // 临时视图,我们的例子就是这个类型 if (viewType == LocalTempView) { val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) val tableDefinition = createTemporaryViewRelation( name, sparkSession, replace, catalog.getRawTempView, originalText, analyzedPlan, aliasedPlan, referredTempFunctions) // 调用 SessionCatalog 创建临时视图的方法 catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) // 全局临时视图 } else if (viewType == GlobalTempView) { val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATAbase) val viewIdent = TableIdentifier(name.table, Option(db)) val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) val tableDefinition = createTemporaryViewRelation( viewIdent, sparkSession, replace, catalog.getRawGlobalTempView, originalText, analyzedPlan, aliasedPlan, referredTempFunctions) catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace) // 如果是永久视图的话, SessionCatalog 的缓存中有这个视图名称 } else if (catalog.tableExists(name)) { val tablemetadata = catalog.getTablemetadata(name) if (allowExisting) { // 如果遇到这种类型的 SQL: `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`、当目标视图不存在的时候就什么也不做 } else if (tablemetadata.tableType != CatalogTableType.VIEW) { throw QueryCompilationErrors.tableIsNotViewError(name) } else if (replace) { // 检测到循环的视图引用 CREATE OR REPLACE VIEW. val viewIdent = tablemetadata.identifier checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) // 在替换掉一个已经存在的视图的时候要把缓存给干掉 logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.") CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) // 处理这种类型的 SQL: `CREATE OR REPLACE VIEW v0 AS SELECT ...` // 老的视图里面的信息我们什么都不管,把它直接干掉再创建一个新的 catalog.dropTable(viewIdent, ignoreIfNotExists = false, purge = false) catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) } else { // 处理这种类型的 SQL: `CREATE VIEW v0 AS SELECT ...`、 // 当目标视图已经存在的时候就抛出异常 QueryCompilationErrors.viewAlreadyExistsError(name) } } else { // 如果不存在就创建视图 catalog.createTable(prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false) } Seq.empty[Row] }
SessionCatalog 里面创建临时视图的方法是怎样的?
def createTempView( name: String, viewDefinition: TemporaryViewRelation, overrideIfExists: Boolean): Unit = synchronized { val table = formatTableName(name) if (tempViews.contains(table) && !overrideIfExists) { throw new TempTableAlreadyExistsException(name) } tempViews.put(table, viewDefinition) }
这里的tempViews 就是我们前面讲到的那个 HashMap,说明数据的来源找到了~
现在又一个问题来啦,数据长啥样子呢?
我们先把这个问题搁置,看看上面 3 步骤中的最后一个步骤:解析视图
解析视图private def resolveViews(plan: LogicalPlan): LogicalPlan = plan match { // 视图的子级应该是解析自“desc.viewText”的一个逻辑计划,变量`viewText`应该被定义,否则我们在生成视图算子时会抛出一个错误。 case view @ View(desc, isTempView, child) if !child.resolved => // 解析子节点中所有的 UnresolvedRelations 和视图 val newChild = AnalysisContext.withAnalysisContext(desc) { // 嵌套视图的深度 val nestedViewDepth = AnalysisContext.get.nestedViewDepth // 最大允许的深度 val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth if (nestedViewDepth > maxNestedViewDepth) { throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( desc.identifier, maxNestedViewDepth, view) } SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) { // 执行子节点 executeSameContext(child) } } // 由于在AnalysisContext之外,未解析的运算符在视图内部,可能解决不正确。 checkAnalysis(newChild) view.copy(child = newChild) // 封装表名称的别名子查询对象 case p @ SubqueryAlias(_, view: View) => // 副本递归调用 p.copy(child = resolveViews(view)) case _ => plan }
这里面本质上还是个递归调用嵌套视图解析的过程,没啥值得多描述的地方。
应用规则后经过了上面的一系列步骤,规则 ResolveRelations 应用后,生成的逻辑计划长这样:
'Project [*]+- SubqueryAlias t_user +- View (`t_user`, [addr#7,age#8L,name#9,sex#10]) +- Relation [addr#7,age#8L,name#9,sex#10] json
这里我们再回到上面的小问题:数据长啥样子呢?
数据长啥样子呢?我们在创建视图的命令执行前得到的是一个 LogicalRelation 类型的逻辑计划,所以关键在于这个计划是怎么打印成上面那样的?
LogicalRelation.simpleStringoverride def simpleString(maxFields: Int): String = { s"Relation ${catalogTable.map(_.identifier.unquotedString).getOrElse("")}" + s"[${truncatedString(output, ",", maxFields)}] $relation" }
AttributeReference.toStringoverride def toString: String = s"$name#${exprId.id}$typeSuffix$delaySuffix"
结合这两段代码实际上我们明白了一个困惑,上面的 #7,#8 实际上就是代表的表达式 ID啊,这个表达式就是命名表达式(Namedexpression),我们在博客中实际上多次提到它,忘记的同学不妨回看下前面的内容。
ResolveReferencesResolveReferences 规则的作用是将 UnresolvedAttribute 替换为逻辑计划节点子节点的具体 AttributeReference。
那么,它是怎么实现的呢?
还是老样子,先看 apply 方法:
applydef apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( AlwaysProcess.fn, ruleId) { // 此查询计划的所有子项并没有都被解析 case p: LogicalPlan if !p.childrenResolved => p // 等待规则 `DeduplicateRelations` 先解析冲突的属性 case p: LogicalPlan if hasConflictingAttrs(p) => p // 如果投影中包含 * 号,则去扩展它。 case p: Project if containsStar(p.projectList) => p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
Project [*]我们的例子中包含的 Project 投影对象里面很明显有个 * 号。
这里值得注意的是经过了ResolveRelations规则后,我们得到的逻辑计划实际上是个树状结果,其中只有 Project 节点(也就是根节点)才会走到下面的分支(我们调用规则实际上就是递归调用的过程,先解决下面的子节点,再解决上面的父节点)。
private def buildExpandedProjectList( exprs: Seq[Namedexpression], child: LogicalPlan): Seq[Namedexpression] = { exprs.flatMap { // 使用 Dataframe/Dataset 的 API: testData2.groupBy($"a", $"b").agg($"*") case s: Star => expand(s, child) // 使用 SQL 的 API 但是没有运行规则 ResolveAlias: SELECT * FROM testData2 group by a, b case UnresolvedAlias(s: Star, _) => expand(s, child) // 如果 exprs 是一个列表类型有多个元素,并且其中包含 * 号 case o if containsStar(o :: Nil) => expandStarexpression(o, child) :: Nil case o => o :: Nil }.map(_.asInstanceOf[Namedexpression]) }
通过 Project [*] 我们知道传入的 exprs 实际上就是这个 *,而在第二讲中,我们知道了 * 的类型是UnresolvedStar,它是Star类型的子类,所以上面的调用中很明显走的是第一个分支。
private def expand(s: Star, plan: LogicalPlan): Seq[Namedexpression] = { // 这个是用来捕获闭包内部抛出的 AnalysisExceptions 的,同时在异常上会附带一些 origin 信息 withPosition(s) { try { // 调用 UnresolvedStar 的 expand 方法 s.expand(plan, resolver) } catch { case e: AnalysisException => AnalysisContext.get.outerPlan.map { // 只有 Project 和 Aggregate 可以有 star 表达式 case u @ (_: Project | _: Aggregate) => Try(s.expand(u.children.head, resolver)) match { case Success(expanded) => expanded.map(wrapOuterReference) case Failure(_) => throw e } // 不要使用外部计划来解析 star 表达式 // 因为 star 的使用无效 case _ => throw e }.getOrElse { throw e } } } }
*我们再看看 UnresolvedStar 的 expand 方法
override def expand( input: LogicalPlan, resolver: Resolver): Seq[Namedexpression] = { // 如果没有指明任何表,使用所有的非隐藏属性 if (target.isEmpty) return input.output // 如果指明了一个表,隐藏的属性也得使用 val hiddenOutput = input.metadataOutput.filter(_.supportsQualifiedStar) val expandedAttributes = (hiddenOutput ++ input.output).filter( matchedQualifier(_, target.get, resolver)) if (expandedAttributes.nonEmpty) return expandedAttributes // 尝试将其解析为结构体类型扩展。 val attribute = input.resolve(target.get, resolver) if (attribute.isDefined) { // 如果目标可以解析成子节点的一个属性,那么它一定是个结构体,需要扩展 attribute.get.dataType match { case s: StructType => s.zipWithIndex.map { case (f, i) => val extract = GetStructField(attribute.get, i) Alias(extract, f.name)() } case _ => throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get) } } else { val from = input.inputSet.map(_.name).mkString(", ") val targetString = target.get.mkString(".") throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError( targetString, from) } }
由于我们只是一个*,不是类似 a.* 这样的,故 target 就是空的,我们直接在第一步就返回了,即返回 input.output,这里就来问题了:input 是什么呢?
input通过之前 AnalysisHelper.resolveOperatorsUpWithPruning (即之前递归应用偏函数一节) 的源码解析就可以看出来:
每一个规则的输入就是其子节点应用规则后的输出
故,input 就是 Project [*] 的子节点应用规则后的输出,也就是
+- SubqueryAlias t_user +- View (`t_user`, [addr#7,age#8L,name#9,sex#10]) +- Relation [addr#7,age#8L,name#9,sex#10] json
SubqueryAlias 及其子节点应用规则后未发生变化。
故,input.output 即 SubqueryAlias.output
SubqueryAlias.outputoverride def output: Seq[Attribute] = { // 将 标识符 拼接起来 val qualifierList = identifier.qualifier :+ alias child.output.map(_.withQualifier(qualifierList)) }
SubqueryAlias.output 所做的事情就是将包括子节点的标识符拼接起来。
最终解析后的逻辑计划长啥样子呢?
Project [addr#8, age#9L, name#10, sex#11]+- SubqueryAlias t_user +- View (`t_user`, [addr#8,age#9L,name#10,sex#11]) +- Relation [addr#8,age#9L,name#10,sex#11] json
照例,我们还是画一个表格将上面的输出和具体的源码对应起来:
我们在第一讲中实际上已经提前给出了 analysis 阶段的最终输出了:
== Analyzed Logical Plan ==addr: array
为啥和上面的逻辑计划打印不太一样呢?
我们可以从 QueryExecution.writePlans 中找到答案
append("n== Analyzed Logical Plan ==n") try { if (analyzed.output.nonEmpty) { append( truncatedString( analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields) ) append("n") } QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
可以看到,在打印生成的逻辑计划之前,还会先打印属性名称和类型。
小结本讲是对 Spark SQL 工作流程中 analysis 阶段的源码解析。
上来我们就明确了自己的目标——我们要对 parsing 阶段生成的 AST 进行进一步的解析。
确定目标后,我们先盘算了手头已有的东西——一个临时视图:t_user 以及从 JSON 文件中推测到了字段名称和字段类型。
然后我们开始主流程的源码解析,从入口开始,讲到了执行和校验。
我们这时接触到了规则批(Batch)这个概念,并且我们梳理了所有的规则批。
由于篇幅的限制,我们不可能做到详解每一种规则具体是怎么实现的。
所以,我们就回到了我们最初的例子,通过分析这个简单直接的例子能够帮助我们触类旁通。
我们详细解析了ResolveRelations 和 ResolveReferences 这两个规则是如何一步步的作用于我们的逻辑计划,最终完成 analysis 阶段的解析的。
至此,analysis 阶段的基本流程相信大家已经一览无余了。
麻烦看到这里的同学帮忙三连支持一波,万分感谢~