你好,游客 登录
背景:
阅读新闻

Spark Sql系统入门1:什么是spark sql及包含哪些组件

[日期:2017-10-31] 来源:aboutyun  作者:pig2 [字体: ]

1.为什么会产生spark sql
2.sparkSQL包含哪些内容?
3.spark运行框架是什么?
4.spark sql包含哪些组件?
5.sparkSQL有哪两个分支?





1.为什么会产生spark sql

随着Spark的发展,其中sparkSQL作为Spark生态的一员继续发展,而不再受限于hive,只是兼容hive;而hive on spark是一个hive的发展计划,该计划将spark作为hive的底层引擎之一,也就是说,hive将不再受限于一个引擎,可以采用map- reduce、Tez、spark等引擎。
  Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划 从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题:
第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;
第二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支(至于为何相关修改没有合并到Hive主线,我也不太清楚)。
  此外,除了兼容HQL、加速现有Hive数据的查询分析以外,Spark SQL还支持直接对原生RDD对象进行关系查询。同时,除了HQL以外,Spark SQL还内建了一个精简的SQL parser,以及一套Scala DSL。也就是说,如果只是使用Spark SQL内建的SQL方言或Scala DSL对原生RDD对象进行关系查询,用户在开发Spark应用时完全不需要依赖Hive的任何东西。


总结:
从这里说明spark sql是为了实现hive兼容计划,也就是说hive可以使用spark引擎,也就是说我们通过spark具体来说通过spark sql来操作hive。
2.为什么sparkSQL的性能得到提升
这个简单了解即可,
1.内存列存���(In-Memory Columnar Storage)
2.字节码生成技术
3.scala代码优化
3.sparkSQL组成
sparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:
1.core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
2.catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
3.hive对hive数据的处理
4.hive-ThriftServer提供CLI和JDBC/ODBC接口
在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是catalyst的一个设计图:



其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:
1.sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;
2. Analyzer,主要完成绑定工作,将不同来源的Unresolved LogicalPlan和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolved LogicalPlan;
3. optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;
4. Planner将LogicalPlan转换成PhysicalPlan;
5. CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划
这些组件的基本实现方法:
6. 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。
7. Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的属性而转换成resolved LogicalPlan;
8. optimizer使用Optimization Rules,对resolved LogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimized LogicalPlan;
9. Planner使用Planning Strategies,对optimized LogicalPlan进行转换,转换成可以执行的物理计划。


4.sparkSQL组件之解析
sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);
这里介绍sqlContext的关键的概念和组件。

概念:
o LogicalPlan
组件:
o SqlParser
o Analyzer
o Optimizer
o Planner

LogicalPlan

在sparkSQL的运行架构中,LogicalPlan贯穿了大部分的过程,其中catalyst中的SqlParser、Analyzer、Optimizer都要对LogicalPlan进行操作。LogicalPlan的定义如下:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
self: Product =>
case class Statistics( sizeInBytes: BigInt )
lazy val statistics: Statistics = {
if (children.size == 0) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.") }
Statistics(
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product) }
/**
* Returns the set of attributes that this node takes as * input from its children. */
lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
/**
* Returns true if this expression and all its children have been resolved to a specific schema
* and false if it is still contains any unresolved placeholders. Implementations of LogicalPlan
* can override this (e.g.
* [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
* should return `false`).
*/
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
/**
* Returns true if all its children of this query plan have been resolved.
*/
 
def childrenResolved: Boolean = !children.exists(!_.resolved)
/**
* Optionally resolves the given string to a [[NamedExpression]] using the input from all child * nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/ def resolveChildren(name: String): Option[NamedExpression] = resolve(name, children.flatMap(_.output))
/**
* Optionally resolves the given string to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/ def resolve(name: String): Option[NamedExpression] = resolve(name, output)
/**
Performs attribute resolution given a name and a sequence of possible attributes.
*
/ protected def resolve(name: String, input: Seq[Attribute]):
Option[NamedExpression] = {
val parts = name.split("\\.")
val options = input.flatMap { option =>
val remainingParts =
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1)
else parts
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil } options.distinct match {
case Seq((a, Nil)) => Some(a)
// One match, no nested fields, use it.
// One match, but we also need to extract the requested nested field.
case Seq((a, nestedFields)) => a.dataType match { case StructType(fields) => Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)()) case _ => None
// Don't know how to resolve these field references } case Seq() => None
// No matches.
case ambiguousReferences => throw new TreeNodeException( this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
}
}
}



在LogicalPlan里维护者一套统计数据和属性数据,也提供了解析方法。同时延伸了三种类型的LogicalPlan:
1 LeafNode:对应于trees.LeafNode的LogicalPlan
2 UnaryNode:对应于trees.UnaryNode的LogicalPlan
3 BinaryNode:对应于trees.BinaryNode的LogicalPlan
而对于SQL语句解析时,会调用和SQL匹配的操作方法来进行解析;这些操作分四大类,最终生成LeafNode、UnaryNode、BinaryNode中的一种:
1 basicOperators:一些数据基本操作,如Ioin、Union、Filter、Project、Sort
2 commands:一些命令操作,如SetCommand、CacheCommand
3 partitioning:一些分区操作,如RedistributeData
4 ScriptTransformation:对脚本的处理,如ScriptTransformation
5 LogicalPlan类的总体架构如下所示





SqlParser

SqlParser的功能就是将SQL语句解析成Unresolved LogicalPlan。现阶段的SqlParser语法解析功能比较简单,支持的语法比较有限。其解析过程中有两个关键组件和一个关键函数:
1词法读入器SqlLexical,其作用就是将输入的SQL语句进行扫描、去空、去注释、校验、分词等动作。
2 SQL语法表达式query,其作用定义SQL语法表达式,同时也定义了SQL语法表达式的具体实现,即将不同的表达式生成不同sparkSQL的Unresolved LogicalPlan。
3 函数phrase(),上面个两个组件通过调用phrase(query)(new lexical.Scanner(input)),完成对SQL语句的解析;在解析过程中,SqlLexical一边读入,一边解析,如果碰上生成符合 SQL语法的表达式时,就调用相应SQL语法表达式的具体实现函数,将SQL语句解析成Unresolved LogicalPlan。
下面看看sparkSQL的整个解析过程和相关组件:


解析过程

首先,在sqlContext中使用下面代码调用catalyst.SqlParser:

[Scala] 纯文本查看 复制代码
1
2
3
/*源自 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)



然后,直接在SqlParser的apply方法中对输入的SQL语句进行解析,解析功能的核心代码就是:
phrase(query)(new lexical.Scanner(input))

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
class SqlParser extends StandardTokenParsers with PackratParsers {
def apply(input: String): LogicalPlan = {
if (input.trim.toLowerCase.startsWith("set")) {
//set设置项的处理
...... }
else {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
} } }
......



可以看得出来,该语句就是调用phrase()函数,使用SQL语法表达式query,对词法读入器lexical读入的SQL语句进行解析,其中词法读 入器lexical通过重写语句:override val lexical = new SqlLexical(reservedWords) 调用扩展了功能的SqlLexical。其定义:

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
this.getClass .
getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword].str)
override val lexical = new SqlLexical(reservedWords)


为了加深对SQL语句解析过程的理解,让我们看看下面这个简单数字表达式解析过程来说明:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import scala.util.parsing.combinator.PackratParsers
import scala.util.parsing.combinator.syntactical._
object mylexical extends StandardTokenParsers with PackratParsers {
//定义分割符 lexical.delimiters ++= List(".", ";", "+", "-", "*")
//定义表达式,支持加,减,乘
lazy val expr: PackratParser[Int] = plus | minus | multi
//加法表示式的实现
lazy val plus: PackratParser[Int] = num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt}
//减法表达式的实现 lazy val minus: PackratParser[Int] = num ~ "-" ~ num ^^ { case n1 ~ "-" ~ n2 => n1.toInt - n2.toInt}
//乘法表达式的实现 lazy val multi: PackratParser[Int] = num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt} lazy val num = numericLit def parse(input: String) = {
//定义词法读入器myread,并将扫描头放置在input的首位
val myread = new PackratReader(new lexical.Scanner(input))
print("处理表达式 " + input)
phrase(expr)(myread)
match { case Success(result, _) => println(" Success!");
println(result); Some(result) case n => println(n);
println("Err!"); None
}
}
def main(args: Array[String]) {
 
val prg = "6 * 3" :: "24-/*aaa*/4" :: "a+5" :: "21/3" :: Nil
prg.map(parse)
}
}

运行结果:

[Bash shell] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
处理表达式 6 * 3 Success! //lexical对空格进行了处理,得到6*3
 18 //6*3符合乘法表达式,调用n1.toInt * n2.toInt,得到结果并返回
处理表达式 24-/*aaa*/4 Success!
//lexical对注释进行了处理,得到20-4 20
//20-4符合减法表达式,调用n1.toInt - n2.toInt,得到结果并返回
处理表达式 a+5[1.1] failure: number expected
//lexical在解析到a,发现不是整数型,故报错误位置和内容
a+5
^
Err!
处理表达式 21/3[1.3] failure: ``*'' expected but ErrorToken(illegal character)
found
//lexical在解析到/,发现不是分割符,故报错误位置和内容
21/3
^
Err!


在运行的时候,首先对表达式 6 * 3 进行解析,词法读入器myread将扫描头置于6的位置;当phrase()函数使用定义好的数字表达式expr处理6 * 3的时候,6 * 3每读入一个词法,就和expr进行匹配,如读入6*和expr进行匹配,先匹配表达式plus,*和+匹配不上;就继续匹配表达式minus,*和-匹 配不上;就继续匹配表达式multi,这次匹配上了,等读入3的时候,因为3是num类型,就调用调用n1.toInt * n2.toInt进行计算。

注意,这里的expr、plus、minus、multi、num都是表达式,|、~、^^是复合因子,表达式和复合因子可以组成一个新的表达式,如 plus(num ~ "+" ~ num ^^ { case n1 ~ "+" ~ n2 => n1.toInt + n2.toInt})就是一个由num、+、num、函数构成的复合表达式;而expr(plus | minus | multi)是由plus、minus、multi构成的复合表达式;复合因子的含义定义在类scala/util/parsing /combinator/Parsers.scala,下面是几个常用的复合因子:


1 p | q p失败则q,返回第一个成功的结果
2 p ^^ f 如果p成功,将函数f应用到p的结果上
3 p ^? f 如果p成功,如果函数f可以应用到p的结果上的话,就将p的结果用f进行转换

针对上面的6 * 3使用的是multi表达式(num ~ "*" ~ num ^^ { case n1 ~ "*" ~ n2 => n1.toInt * n2.toInt}),其含义就是:num后跟*再跟num,如果满足就将使用函数n1.toInt * n2.toInt。
到这里为止,大家应该明白整个解析过程了吧,。SqlParser的原理和这个表达式解析器使用了一样的原理,只不过是定义的SQL语法表达式query 复杂一些,使用的词法读入器更丰富一些而已。下面分别介绍一下相关组件SqlParser、SqlLexical、query。

SqlParser

首先,看看SqlParser的UML图:




其次,看看SqlParser的定义,SqlParser继承自类StandardTokenParsers和特质PackratParsers:
其中,PackratParsers:
1 扩展了scala.util.parsing.combinator.Parsers所提供的parser,做了内存化处理;
2 Packrat解析器实现了回溯解析和递归下降解析,具有无限先行和线性分析时的优势。同时,也支持左递归词法解析。
3 从Parsers中继承出来的class或trait都可以使用PackratParsers,如:object MyGrammar extends StandardTokenParsers with PackratParsers;
4 PackratParsers将分析结果进行缓存,因此,PackratsParsers需要PackratReader(内存化处理的Reader)作 为输入,程序员可以手工创建PackratReader,如production(new PackratReader(new lexical.Scanner(input))),更多的细节参见scala库中/scala/util/parsing/combinator /PackratParsers.scala文件。
StandardTokenParsers是最终继承自Parsers
5 增加了词法的处理能力(Parsers是字符处理),在StdTokenParsers中定义了四种基本词法:
o keyword tokens
o numeric literal tokens
o string literal tokens
o identifier tokens
6 定义了一个词法读入器lexical,可以进行词法读入
SqlParser在进行解析SQL语句的时候是调用了PackratParsers中phrase():

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
/*源自 scala/util/parsing/combinator/PackratParsers.scala */
/**
* A parser generator delimiting whole phrases (i.e. programs).
*
* Overridden to make sure any input passed to the argument parser
* is wrapped in a `PackratReader`.
*/
override def phrase[T](p: Parser[T]) = {
val q = super.phrase(p)
new PackratParser[T] {
def apply(in: Input) = in match {
case in: PackratReader[_] => q(in)
case in => q(new PackratReader(in))
}
}
}


在解析过程中,一般会定义多个表达式,如上面例子中的plus | minus | multi,一旦前一个表达式不能解析的话,就会调用下一个表达式进行解析:

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
/*源自 scala/util/parsing/combinator/Parsers.scala */
def append[U >: T](p0: => Parser[U]): Parser[U] = { lazy val p = p0
// lazy argument
Parser{ in => this(in) append p(in)
}
}


表达式解析正确后,具体的实现函数是在PackratParsers中完成:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/*源自 scala/util/parsing/combinator/PackratParsers.scala */
def memo[T](p: super.Parser[T]): PackratParser[T] = {
new PackratParser[T] {
def apply(in: Input) = {
val inMem = in.asInstanceOf[PackratReader[Elem]]
//look in the global cache if in a recursion
val m = recall(p, inMem) m match {
//nothing has been done due to recall
case None =>
val base = LR(Failure("Base Failure",in), p, None)
inMem.lrStack = base::inMem.lrStack
//cache base result
inMem.updateCacheAndGet(p,MemoEntry(Left(base)))
//parse the input
val tempRes = p(in)
//the base variable has passed equality tests with the cache
inMem.lrStack = inMem.lrStack.tail
//check whether base has changed, if yes, we will have a
head base.head match {
case None => /*simple result*/ inMem.updateCacheAndGet(p,MemoEntry(Right(tempRes))) tempRes
case s@Some(_) =>
/*non simple result*/
base.seed = tempRes
//the base variable has passed equality tests with the cache
val res = lrAnswer(p, inMem, base)
res
}
case Some(mEntry) => {
//entry found in cache
mEntry match {
case MemoEntry(Left(recDetect)) => {
setupLR(p, inMem, recDetect)
//all setupLR does is change the heads of the recursions, so the seed will stay the same
recDetect match {case LR(seed, _, _) =>
seed.asInstanceOf[ParseResult[T]]} }
case MemoEntry(Right(res: ParseResult[_])) =>
res.asInstanceOf[ParseResult[T]]
}
}
}
}
}
}



StandardTokenParsers增加了词法处理能力,SqlParers定义了大量的关键字,重写了词法读入器,将这些关键字应用于词法读入器。

SqlLexical
词法读入器SqlLexical扩展了StdLexical的功能,首先增加了大量的关键字:

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
/*源自 src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala */
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
......
protected val SUBSTR = Keyword("SUBSTR")
protected val SUBSTRING = Keyword("SUBSTRING")


其次丰富了分隔符、词法处理、空格注释处理:





最后看看SQL语法表达式query。

query

SQL语法表达式支持3种操作:select、insert、cache




而这些操作还有具体的定义,如select,这里开始定义了具体的函数,将SQL语句转换成构成Unresolved LogicalPlan的一些Node:



Analyzer

Analyzer的功能就是对来自SqlParser的Unresolved LogicalPlan中的UnresolvedAttribute项和UnresolvedRelation项,对照catalog和 FunctionRegistry生成Analyzed LogicalPlan。Analyzer定义了5大类14小类的rule:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala */
val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once, NewRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once, (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
ResolveSortReferences ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
typeCoercionRules :_*),
Batch("Check Analysis", Once, CheckResolution),
Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) )



MultiInstanceRelations
o NewRelationInstances
1 CaseInsensitiveAttributeReferences
o LowercaseAttributeReferences
2 Resolution
o ResolveReferences
o ResolveRelations
o ResolveSortReferences
o NewRelationInstances
o ImplicitGenerate
o StarExpansion
o ResolveFunctions
o GlobalAggregates
o UnresolvedHavingClauseAttributes
o typeCoercionRules
3Check Analysis
o CheckResolution
4 AnalysisOperators
o EliminateAnalysisOperators
这些rule都是使用transform对Unresolved LogicalPlan进行操作,其中typeCoercionRules是对HiveQL语义进行处理,在其下面又定义了多个 rule:PropagateTypes、ConvertNaNs、WidenTypes、PromoteStrings、 BooleanComparisons、BooleanCasts、StringToIntegralCasts、 FunctionArgumentConversion、CaseWhenCoercion、Division,同样了这些rule也是使用 transform对Unresolved LogicalPlan进行操作。这些rule操作后,使得LogicalPlan的信息变得丰满和易懂。下面拿其中的两个rule来简单介绍一下:
比如rule之ResolveReferences,最终调用LogicalPlan的resolveChildren对列名给一名字和序号,如name#67之列的,这样保持列的唯一性:




又比如rule之StarExpansion,其作用就是将Select * Fom tbl中的*展开,赋予列名:






Optimizer

Optimizer的功能就是将来自Analyzer的Analyzed LogicalPlan进行多种rule优化,生成Optimized LogicalPlan。Optimizer定义了3大类12个小类的优化rule:

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
/*源自
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Combine Limits", FixedPoint(100), CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100), NullPropagation, ConstantFolding, LikeSimplification, BooleanSimplification, SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100), CombineFilters, PushPredicateThroughProject, PushPredicateThroughJoin, ColumnPruning) :: Nil
}


1Combine Limits 合并Limit
o CombineLimits:将两个相邻的limit合为一个
2ConstantFolding 常量叠加
o NullPropagation 空格处理
o ConstantFolding:常量叠加
o LikeSimplification:like表达式简化
o BooleanSimplification:布尔表达式简化
o SimplifyFilters:Filter简化
o SimplifyCasts:Cast简化
o SimplifyCaseConversionExpressions:CASE大小写转化表达式简化
3 Filter Pushdown Filter下推
o CombineFilters Filter合并
o PushPredicateThroughProject 通过Project谓词下推
o PushPredicateThroughJoin 通过Join谓词下推
o ColumnPruning 列剪枝
这些优化rule都是使用transform对LogicalPlan进行操作,如合并、删除冗余、简化、剪枝等,是整个LogicalPlan变得更简洁更高效。
比如将两个相邻的limit进行合并,可以使用CombineLimits。象sql("select * from (select * from src limit 5)a limit 3 ") 这样一个SQL语句,会将limit 5和limit 3进行合并,只剩一个一个limit 3。

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
7
/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
Limit(If(LessThan(ne, le), ne, le), grandChild)
}
}



又比如Null值的处理,可以使用NullPropagation处理。象sql("select count(null) from src where key is not null")这样一个SQL语句会转换成sql("select count(0) from src where key is not null")来处理��

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
/*源自 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala */
object NullPropagation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsUp {
case e @ Count(Literal(null, _)) => Cast(Literal(0L), e.dataType)
case e @ Sum(Literal(c, _)) if c == 0 => Cast(Literal(0L), e.dataType)
case e @ Average(Literal(c, _)) if c == 0 => Literal(0.0, e.dataType) case e @ IsNull(c) if !c.nullable => Literal(false, BooleanType)
case e @ IsNotNull(c) if !c.nullable => Literal(true, BooleanType)
case e @ GetItem(Literal(null, _), _) => Literal(null, e.dataType)
case e @ GetItem(_, Literal(null, _)) => Literal(null, e.dataType)
case e @ GetField(Literal(null, _), _) => Literal(null, e.dataType)
case e @ EqualNullSafe(Literal(null, _), r) => IsNull(r)
case e @ EqualNullSafe(l, Literal(null, _)) => IsNull(l) ......
}
}
}



对于具体的优化方法可以使用下一章所介绍的hive/console调试方法进行调试,用户可以使用自定义的优化函数,也可以使用sparkSQL提供的 优化函数。使用前先定义一个要优化查询,然后查看一下该查询的Analyzed LogicalPlan,再使用优化函数去优化,将生成的Optimized LogicalPlan和Analyzed LogicalPlan进行比较,就可以看到优化的效果。



5.spark sql运行架构

对于sql语句,spark与传统数据库解析是差不多的。将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

如下面是传统数据库sql语句对应的tree



sparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个 Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

Tree介绍

Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
1. Logical Plans、Expressions、Physical Operators都可以使用Tree表示
2. Tree的具体操作是通过TreeNode来实现的
o sparkSQL定义了catalyst.trees的日志,通过这个日志可以形象的表示出树的结构
o TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)进行操作
o 有了TreeNode,通过Tree中各个TreeNode之间的关系,可以对Tree进行遍历操作,如使用transformDown、 transformUp将Rule应用到给定的树段,然后用结果替代旧的树段;也可以使用transformChildrenDown、 transformChildrenUp对一个给定的节点进行操作,通过迭代将Rule应用到该节点以及子节点。


3.TreeNode可以细分成三种类型的Node:
o UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
o BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
o LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand



Rule介绍
1. Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
2. Rule在sparkSQL的Analyzer、Optimizer、SparkPlan等各个组件中都有应用到
3. Rule是一个抽象类,具体的Rule实现是通过RuleExecutor完成
4. Rule通过定义batch和batchs,可以简便的、模块化地对Tree进行transform操作
5. Rule通过定义Once和FixedPoint,可以对Tree进行一次操作或多次操作(如对某些Tree进行多次迭代操作的时候,达到FixedPoint次数迭代或达到前���两次的�����结构没变化才停止操作,具体参看RuleExecutor.apply)


拿个简单的例子,在处理由解析器(SqlParse)生成的LogicPlan Tree的时候,在Analyzer中就定义了多种Rules应用到LogicPlan Tree上。
应用示意图:


Analyzer中使用的Rules,定义了batches,由多个batch构成,如MultiInstanceRelations、 Resolution、Check Analysis、AnalysisOperators等构成;每个batch又有不同的rule构成,如Resolution由 ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等构成;每个rule又有自己相对应的处理函数,可以具体参看Analyzer中的 ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances函数;同时要注意的是,不同的rule应用次数是不同的:如 CaseInsensitiveAttributeReferences这个batch中rule只应用了一次(Once),而Resolution这个 batch中的rule应用了多次(fixedPoint = FixedPoint(100),也就是说最多应用100次,除非前后迭代结果一致退出)。



在整个sql语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在sparkSQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。
知道了sparkSQL的各个过程的基本处理方式,下面来看看sparkSQL的运行过程。sparkSQL有两个分支,sqlContext和 hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);hiveContext现在支持sql语法解析器和 hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。关于sqlContext和hiveContext的具体看下面

SQLContext成员:

Catalog 
一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类。



SqlParser
Parse 传入的sql来对语法分词,构建语法树,返回一个logical plan



Analyzer
  logical plan的语法分析器


Optimizer
logical Plan的优化器



LogicalPlan
逻辑计划,由catalyst的TreeNode组成,可以看到有3种语法树



SparkPlanner
包含不同策略的优化策略来优化物理执行计划


QueryExecution
sql执行的环境上下文


就是这些对象组成了Spark SQL的运行时,看起来很酷,有静态的metadata存储,有分析器、优化器、逻辑计划、物理计划、执行运行时。
那这些对象是怎么相互协作来执行sql语句的呢?


sqlContext的运行过程
sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:


话不多说,先上图,这个图我用一个在线作图工具process on话的:


核心组件都是绿色的方框,每一步流程的结果都是蓝色的框框,调用的方法是橙色的框框。

先概括一下,大致的执行流程是:
Parse SQL -> Analyze Logical Plan -> Optimize Logical Plan -> Generate Physical Plan -> Prepareed Spark Plan -> Execute SQL -> Generate RDD

更具体的执行流程:
     sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD

详细分析:

sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText)) //parseSql(sqlText)对sql语句进行语法解析
}
else
{
sys.error(s"Unsupported SQL dialect: $dialect")
}
 
}



sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成Unresolved LogicalPlan。

[Scala] 纯文本查看 复制代码
1
2
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */protected[sql] val parser = new catalyst.SqlParser
protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)



类SchemaRDD继承自SchemaRDDLike

[Scala] 纯文本查看 复制代码
1
2
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala */
class SchemaRDD( @transient val sqlContext: SQLContext, @transient val baseLogicalPlan: LogicalPlan) extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike



SchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行 catalyst.SqlParser解析后生成Unresolved LogicalPlan,这里的baseLogicalPlan就是指Unresolved LogicalPlan。

[Scala] 纯文本查看 复制代码
1
2
3
4
5
/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala */private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient val baseLogicalPlan: LogicalPlan
private[sql] def baseSchemaRDD: SchemaRDD
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)



sqlContext.executePlan做了什么呢?它调用了QueryExecution类

[Scala] 纯文本查看 复制代码
1
2
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }



QueryExecution类的定义:

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala */protected abstract class QueryExecution {
def logical: LogicalPlan
//对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
//对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
lazy val optimizedPlan = optimizer(analyzed) // 将optimized LogicalPlan转换成PhysicalPlan
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
}
// PhysicalPlan执行前的准备工作,生成可执行的物理计划
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
//执行可执行物理计划
lazy val toRdd: RDD[Row] = executedPlan.execute()
......
}



sqlContext总的一个过程如下图所示:
1. SQL语句经过SqlParse解析成Unresolved LogicalPlan;
2. 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolved LogicalPlan;
3. 使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan;
4. 使用SparkPlan将LogicalPlan转换成PhysicalPlan;
5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6. 使用execute()执行可执行物理计划;
7. 生成SchemaRDD。







总结:
  通过分析SQLContext我们知道了Spark SQL都包含了哪些组 件,SqlParser,Parser,Analyzer,Optimizer,LogicalPlan,SparkPlanner(包含 Physical Plan),QueryExecution.
  通过调试代码,知道了Spark SQL的执行流程:
sql or hql -> sql parser(parse)生成 unresolved logical plan -> analyzer(analysis)生成analyzed logical plan  -> optimizer(optimize)optimized logical plan -> spark planner(use strategies to plan)生成physical plan -> 采用不同Strategies生成spark plan -> spark plan(prepare) prepared spark plan -> call toRDD(execute()函数调用) 执行sql生成RDD

hiveContext的运行过程


在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对 这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函 数和变量��可以使用和sqlContext一样的函数��变��。
从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:

[Bash shell] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override def sql(sqlText: String): SchemaRDD = {
// 使用spark.sql.dialect定义采用的语法解析器
if (dialect == "sql") { super.sql(sqlText)
//如果使用sql解析器,则使用sqlContext的sql方法
}
else if (dialect == "hiveql") {
//如果使用和hiveql解析器,则使用HiveQl.parseSql
new SchemaRDD(this, HiveQl.parseSql(sqlText))
}
else {
sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
}
}



hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect == "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了 new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。

[Scala] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
 
/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan =
{ try
  {
    if (条件) {
 
//非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
.....
}
else
{
val tree = getAst(sql)
if (nativeCommands contains tree.getText)
{
NativeCommand(sql)
}
else {
nodeToPlan(tree) match {
case NativePlaceholder => NativeCommand(sql)
case other => other } } } }
catch { //异常处理
...... }
 
}



因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:
1 首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache table、add jar等,将这些语句转换成command类型的LogicalPlan;
2如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转 换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成 LogicalPlan。

[Scala] 纯文本查看 复制代码
1
2
3
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala */
/** * Returns the AST for the given SQL string. */
def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))



和sqlContext一样,类SchemaRDD继承自SchemaRDDLike ,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了 executePlan()函数:

[Scala] 纯文本查看 复制代码
1
2
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan }



并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:

[AppleScript] 纯文本查看 复制代码
1
2
3
4
5
6
7
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
protected[sql] abstract class QueryExecution extends super.QueryExecution {
// TODO: Create mixin for the analyzer instead of overriding things here.
override lazy val optimizedPlan = optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))
override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
......
}



所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。
hiveContext的catalog,是指向 Hive Metastore:

[Scala] 纯文本查看 复制代码
1
2
3
4
5
6
7
8
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* A catalyst metadata catalog that points to the Hive Metastore. */
@transient override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
 
override def lookupRelation( databaseName: Option[String], tableName: String, alias: Option[String] = None): LogicalPlan = {
LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
}
}



hiveContext的analyzer,使用了新的catalog和functionRegistry:

[Scala] 纯文本查看 复制代码
1
2
3
4
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false)


hiveContext的planner,使用新定义的hivePlanner:

[Scala] 纯文本查看 复制代码
1
2
/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala */
@transient override protected[sql] val planner =



所以hiveContext总的一个过程如下图所示:
1. SQL语句经过HiveQl.parseSql解析成Unresolved LogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
2. 使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolved LogicalPlan;
3. 使用optimizer对resolved LogicalPlan进行优化,生成optimized LogicalPlan,优化前使用了 ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) 进行预处理;
4. 使用hivePlanner将LogicalPlan转换成PhysicalPlan;
5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
6. 使用execute()执行可执行物理计划;
7. 执行后,使用map(_.copy)将结果导入SchemaRDD。
hiveContxt还有很多针对hive的特性,更细节的内容参看源码。




下一篇:spark sql 入门2----spark sql精简总结

 

 

收藏 推荐 打印 | 阅读:
相关新闻       spark sql  spark运行框架