The query planner is the component of a database management system (DBMS) that is responsible for generating the plan used to execute a database query. A query plan specifies the steps the DBMS will take to retrieve the data required by the query. The goal of the query planner is to produce the most efficient plan possible, meaning the one that returns the data to the user as quickly as possible.
Query planners are complex pieces of software, and they are hard to understand. This guide to implementing a cost-based query planner gives you a step-by-step overview of the process and shows how to implement your own cost-based query planner, while still covering the planner's fundamental concepts.
Written by AI, edited by a human
How this guide was written:
Goals:
graph TD
user((User))
parser[Query Parser]
planner[Query Planner]
executor[Query Processor]
user -- text query --> parser
parser -- AST --> planner
planner -- physical plan --> executor
The basic architecture of a query engine consists of the components shown above.
Generally, query planners fall into two types:
A heuristic planner generates query plans by applying predefined rules.
A cost-based planner estimates the cost of candidate plans and tries to find the best plan for the input query based on those costs.
While a heuristic planner finds a better plan by applying a transformation rule whenever it knows the transformed plan is better, a cost-based planner finds the best plan by enumerating equivalent plans and searching for the best one among them.
A cost-based query planner usually consists of two phases:
In the plan enumeration phase, the planner enumerates the possible equivalent plans.
Then, in the query optimization phase, the planner searches the list of enumerated plans for the best plan, i.e. the plan with the lowest cost, where cost is defined by a cost model (or cost function), sketched below.
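To make "cost" concrete early on: the planner built later in this guide represents a cost as a few estimated resource numbers and lets a cost model compare two costs. The sketch below mirrors the field and method names used in the later sections; treat it as an illustration, not the exact repository code.
// Cost of a (sub)plan; the field names follow the later sections of this guide.
case class Cost(estimatedCpuCost: Double, estimatedMemoryCost: Double, estimatedTimeCost: Double)
// A cost model only has to answer one question: is the new cost better than the current one?
trait CostModel {
  def isBetter(currentCost: Cost, newCost: Cost): Boolean
}
// Example: a cost model that optimizes for CPU only (the same idea is used in the demo section).
val cpuCostModel: CostModel = (currentCost: Cost, newCost: Cost) =>
  currentCost.estimatedCpuCost > newCost.estimatedCpuCost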
Because a logical plan naturally has a tree-like structure, the optimization/search step is essentially a tree-search problem, and there are many tree-search algorithms available:
Note: in theory, any kind of tree-search algorithm can be used. In practice, however, that is not always feasible, because the search time grows quickly as the search space gets more complex.
Note: the search termination condition is usually one of:
The Volcano query planner (or Volcano Optimizer Generator) is a cost-based query planner.
The Volcano planner uses a dynamic programming approach to find the best query plan from the list of enumerated plans.
Details: https://ieeexplore.ieee.org/document/344061 (I'm too lazy to explain the paper here)
Here is a good explanation: https://15721.courses.cs.cmu.edu/spring2017/slides/15-optimizer2.pdf#page=9
Our query planner is a cost-based planner that follows the basic ideas of the Volcano query planner. It consists of two main phases:
graph LR
AST((AST))
logical_plan[Logical Plan]
explored_plans["`
Plan #1
...
Plan #N
`"]
optimized_plan["Plan #X (best plan)"]
AST -- convert to logical plan --> logical_plan
logical_plan -- exploration phase --> explored_plans
explored_plans -- optimization phase --> optimized_plan
linkStyle 1,2 color: orange, stroke: orange, stroke-width: 5px
A logical plan is a data structure that holds an abstract description of the transformation steps needed to execute the query.
Here is an example of a logical plan:
graph TD
1["PROJECT tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
2["JOIN"];
3["SCAN tbl1"];
4["JOIN"];
5["SCAN tbl2"];
6["SCAN tbl3"];
1 --> 2;
2 --> 3;
2 --> 4;
4 --> 5;
4 --> 6;
While a logical plan only holds the abstraction, a physical plan is a data structure that holds the implementation details. Each logical plan can have multiple physical plans; for example, a logical JOIN can map to many physical plans such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
An equivalence group is a set of equivalent expressions (the logical plans of all the expressions in a group are logically equivalent).
For example:
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (field1, field2, id)"]
end
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#11
Expr#11["JOIN"]
end
Expr#11 --> Group#7
Expr#11 --> Group#10
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
end
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#10
Expr#10["JOIN"]
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#12 --> Group#11
Expr#6 --> Group#5
Here we can see that Group#6 has 2 equivalent expressions, and both represent the same query (one scans the tables and then projects; the other pushes the projection down into the SCAN nodes).
A transformation rule is a rule that transforms a logical plan into another, logically equivalent, logical plan.
For example, the plan:
graph TD
1["PROJECT tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
2["JOIN"];
3["SCAN tbl1"];
4["JOIN"];
5["SCAN tbl2"];
6["SCAN tbl3"];
1 --> 2;
2 --> 3;
2 --> 4;
4 --> 5;
4 --> 6;
When the projection push-down transformation is applied, the plan becomes:
graph TD
1["PROJECT *.*"];
2["JOIN"];
3["SCAN tbl1 (id, field1)"];
4["JOIN"];
5["SCAN tbl2 (field1, field2)"];
6["SCAN tbl3 (id, field2, field2)"];
1 --> 2;
2 --> 3;
2 --> 4;
4 --> 5;
4 --> 6;
Transformation rules can be affected by logical traits/properties (such as the table schema, data statistics, etc.).
An implementation rule is a rule that returns a physical plan for a given logical plan.
Implementation rules can be affected by physical traits/properties (such as the data layout, e.g. whether the data is sorted).
In the exploration phase, the planner applies the transformation rules to generate equivalent logical plans.
For example, the plan:
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After the transformation rules are applied, we get the following graph:
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
end
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
end
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
end
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
end
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#10
Expr#10["JOIN"]
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#12
Expr#13["JOIN"]
end
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#12 --> Group#11
Expr#6 --> Group#5
Here we can see that a projection push-down rule and a join reordering rule have been applied.
The optimization phase traverses the tree expanded in the exploration phase to find the best plan for our query.
This is literally a tree-search optimization, so you can use any tree-search algorithm you can think of (but you have to make sure it is correct).
Here is an example of the physical plan generated after the optimization phase:
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
The generated plan shows the selected logical plan, the estimated costs, and the physical operators.
Our planner performs an exhaustive search to find the best plan.
Since the planner's code is quite large, I will not write the guide step by step, but I will explain each part of the code.
Here we will define the query language that is used throughout this tutorial.
SELECT emp.id,
  emp.code,
  dept.dept_name,
  emp_info.name,
  emp_info.origin
FROM emp
JOIN dept ON emp.id = dept.emp_id
JOIN emp_info ON dept.emp_id = emp_info.id
The query language we are going to implement is a SQL-like language. However, for simplicity, we will limit its features and syntax.
The language has the form:
SELECT tbl.field, [...]
FROM tbl JOIN [...]
It only supports SELECT and JOIN, and the fields in the SELECT statement must be fully qualified (in the form table.field); all other features are not supported.
First, we have to define the AST for our language. An AST (abstract syntax tree) is a tree representation of the syntactic structure of a text.
Since our language is so simple, we can define the AST structure in a few lines of code:
sealed trait Identifier
case class TableID(id: String) extends Identifier
case class FieldID(table: TableID, id: String) extends Identifier
sealed trait Statement
case class Table(table: TableID) extends Statement
case class Join(left: Statement, right: Statement, on: Seq[(FieldID, FieldID)]) extends Statement
case class Select(fields: Seq[FieldID], from: Statement) extends Statement
For example, the query
SELECT tbl1.id,
  tbl1.field1,
  tbl2.id,
  tbl2.field1,
  tbl2.field2,
  tbl3.id,
  tbl3.field2,
  tbl3.field2
FROM tbl1
JOIN tbl2 ON tbl1.id = tbl2.id
JOIN tbl3 ON tbl2.id = tbl3.id
can be represented as
Select(
  Seq(
    FieldID(TableID("tbl1"), "id"),
    FieldID(TableID("tbl1"), "field1"),
    FieldID(TableID("tbl2"), "id"),
    FieldID(TableID("tbl2"), "field1"),
    FieldID(TableID("tbl2"), "field2"),
    FieldID(TableID("tbl3"), "id"),
    FieldID(TableID("tbl3"), "field2"),
    FieldID(TableID("tbl3"), "field2")
  ),
  Join(
    Table(TableID("tbl1")),
    Join(
      Table(TableID("tbl2")),
      Table(TableID("tbl3")),
      Seq(
        FieldID(TableID("tbl2"), "id") -> FieldID(TableID("tbl3"), "id")
      )
    ),
    Seq(
      FieldID(TableID("tbl1"), "id") -> FieldID(TableID("tbl2"), "id")
    )
  )
)
After defining the AST structure, we have to write the query parser, which converts a text query into its AST form.
Since this guide is implemented in Scala, we will use scala-parser-combinators to create our query parser.
The query parser class:
object QueryParser extends ParserWithCtx [ QueryExecutionContext , Statement ] with RegexParsers {
override def parse ( in : String )( implicit ctx : QueryExecutionContext ) : Either [ Throwable , Statement ] = {
Try (parseAll(statement, in) match {
case Success (result, _) => Right (result)
case NoSuccess (msg, _) => Left ( new Exception (msg))
}) match {
case util. Failure (ex) => Left (ex)
case util. Success (value) => value
}
}
private def select : Parser [ Select ] = ??? // we will implement it in later section
private def statement : Parser [ Statement ] = select
}
Then we define some parse rules:
// common
private def str: Parser[String] = """[a-zA-Z0-9_]+""".r
private def fqdnStr: Parser[String] = """[a-zA-Z0-9_]+.[a-zA-Z0-9_]+""".r
// identifier
private def tableId: Parser[TableID] = str ^^ (s => TableID(s))
private def fieldId: Parser[FieldID] = fqdnStr ^^ { s =>
  val identifiers = s.split('.')
  if (identifiers.length != 2) {
    throw new Exception("should never happen")
  } else {
    val table = identifiers.head
    val field = identifiers(1)
    FieldID(TableID(table), field)
  }
}
These are the two rules used to parse identifiers: TableID and FieldID.
A table ID (or table name) usually contains only letters, digits and underscores (_), so we use a simple regex [a-zA-Z0-9_]+ to recognize table names.
A field ID in our language, on the other hand, is a fully qualified field name. It usually has the form table.field, and field names also contain only letters, digits and underscores, so we use the regex [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ to parse field names.
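To see how these two rules behave in isolation, here is a small, self-contained sketch using scala-parser-combinators. It redefines the identifier types locally, so it is independent of the guide's QueryParser; the escaped dot in the regex is a strictness choice of this sketch.
import scala.util.parsing.combinator.RegexParsers

object IdentifierParserDemo extends RegexParsers {
  case class TableID(id: String)
  case class FieldID(table: TableID, id: String)

  private def str: Parser[String] = """[a-zA-Z0-9_]+""".r
  private def fqdnStr: Parser[String] = """[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+""".r

  def tableId: Parser[TableID] = str ^^ (s => TableID(s))
  def fieldId: Parser[FieldID] = fqdnStr ^^ { s =>
    val identifiers = s.split('.')
    FieldID(TableID(identifiers.head), identifiers(1))
  }

  def main(args: Array[String]): Unit = {
    println(parseAll(fieldId, "tbl1.id")) // [1.8] parsed: FieldID(TableID(tbl1),id)
    println(parseAll(tableId, "tbl1"))    // [1.5] parsed: TableID(tbl1)
    println(parseAll(fieldId, "tbl1"))    // failure: a field must be fully qualified
  }
}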
With the identifier rules defined, we can now define the rules to parse query statements:
// statement
private def table: Parser[Table] = tableId ^^ (t => Table(t))
private def subQuery: Parser[Statement] = "(" ~> select <~ ")"
The table rule is simple: it just creates a Table node from the TableID parsed by the tableId rule.
subQuery is the rule for parsing subqueries. In SQL, we can write a query that looks like this:
SELECT a
FROM (SELECT b FROM c) d
SELECT b FROM c is the subquery in the statement above. In our simple query language, a statement is considered a subquery if it is wrapped in a pair of parentheses (()). Since our language only has SELECT statements, we can write the parse rule as follows:
def subQuery: Parser[Statement] = "(" ~> select <~ ")"
Now we will define the parse rule for the SELECT statement:
private def fromSource: Parser[Statement] = table ||| subQuery
private def select: Parser[Select] =
  "SELECT" ~ rep1sep(fieldId, ",") ~ "FROM" ~ fromSource ~ rep(
    "JOIN" ~ fromSource ~ "ON" ~ rep1(fieldId ~ "=" ~ fieldId)
  ) ^^ {
    case _ ~ fields ~ _ ~ src ~ joins =>
      val p = if (joins.nonEmpty) {
        def chain(left: Statement, right: Seq[(Statement, Seq[(FieldID, FieldID)])]): Join = {
          if (right.isEmpty) {
            throw new Exception("should never happen")
          } else if (right.length == 1) {
            val next = right.head
            Join(left, next._1, next._2)
          } else {
            val next = right.head
            Join(left, chain(next._1, right.tail), next._2)
          }
        }
        val temp = joins.map { join =>
          val statement = join._1._1._2
          val joinOn = join._2.map(on => on._1._1 -> on._2)
          statement -> joinOn
        }
        chain(src, temp)
      } else {
        src
      }
      Select(fields, p)
  }
In SQL, we can use a subquery as a join source. For example:
SELECT *.*
FROM tbl1
JOIN (SELECT *.* FROM tbl2)
JOIN tbl3
Therefore, our parser also implements a rule to parse subqueries in the JOIN part of a statement, which is why we have the parse rule:
"SELECT" ~ rep1sep(fieldId, ",") ~ "FROM" ~ fromSource ~ rep("JOIN" ~ fromSource ~ "ON" ~ rep1(fieldId ~ "=" ~ fieldId))
See QueryParser.scala for the full implementation.
See QueryParserSpec.scala.
After generating the AST from the text query, we can convert it directly into a logical plan.
First, let's define the interface of a logical plan:
sealed trait LogicalPlan {
def children () : Seq [ LogicalPlan ]
}
children is the list of child logical plans. For example:
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
The child of the PROJECT node is the first JOIN node. The first JOIN node has two children: the second JOIN node and the SCAN tbl1 node. And so on...
Since our query language is simple, we only need three kinds of logical plan nodes:
case class Scan ( table : ql. TableID , projection : Seq [ String ]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq .empty
}
case class Project ( fields : Seq [ql. FieldID ], child : LogicalPlan ) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (child)
}
case class Join ( left : LogicalPlan , right : LogicalPlan , on : Seq [(ql. FieldID , ql. FieldID )]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (left, right)
}
Then we can write the function that converts an AST into a logical plan:
def toPlan ( node : ql. Statement ) : LogicalPlan = {
node match {
case ql. Table (table) => Scan (table, Seq .empty)
case ql. Join (left, right, on) => Join (toPlan(left), toPlan(right), on)
case ql. Select (fields, from) => Project (fields, toPlan(from))
}
}
See LogicalPlan.scala for the full implementation.
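A quick usage sketch: converting a small AST into a logical plan. This assumes the AST classes live in a package referred to as ql and that toPlan is exposed on the LogicalPlan companion object, as in the code above.
val ast = ql.Select(
  fields = Seq(ql.FieldID(ql.TableID("tbl1"), "id")),
  from = ql.Join(
    ql.Table(ql.TableID("tbl1")),
    ql.Table(ql.TableID("tbl2")),
    on = Seq(ql.FieldID(ql.TableID("tbl1"), "id") -> ql.FieldID(ql.TableID("tbl2"), "id"))
  )
)
val plan = LogicalPlan.toPlan(ast)
// Project(List(FieldID(TableID(tbl1),id)),
//         Join(Scan(TableID(tbl1),List()), Scan(TableID(tbl2),List()), ...))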
We can define the classes for groups as follows:
case class Group (
id : Long ,
equivalents : mutable. HashSet [ GroupExpression ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
var implementation : Option [ GroupImplementation ] = None
}
case class GroupExpression (
id : Long ,
plan : LogicalPlan ,
children : mutable. MutableList [ Group ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
val appliedTransformations : mutable. HashSet [ TransformationRule ] = mutable. HashSet ()
}
A Group is a set of logically equivalent plans.
Each GroupExpression represents a single logical plan node. Since a logical plan node has a list of child nodes (as defined in the previous section), while a Group represents a set of equivalent plans, the children of a GroupExpression are a list of Groups.
For example:
graph TD
subgraph Group#8
Expr#8
end
subgraph Group#2
Expr#2
end
subgraph Group#11
Expr#11
end
Expr#11 --> Group#7
Expr#11 --> Group#10
subgraph Group#5
Expr#5
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#7
Expr#7
end
subgraph Group#1
Expr#1
end
subgraph Group#10
Expr#10
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9
end
subgraph Group#3
Expr#3
end
subgraph Group#6
Expr#12
Expr#6
end
Expr#12 --> Group#11
Expr#6 --> Group#5
As we can see here, Group#6 has two equivalent expressions, Expr#12 and Expr#6, and the child of Expr#12 is Group#11.
Note: we will run multiple rounds of transformations in the exploration phase, so every Group and GroupExpression has an ExplorationMark that flags its exploration state.
class ExplorationMark {
private var bits : Long = 0
def get : Long = bits
def isExplored ( round : Int ) : Boolean = BitUtils .getBit(bits, round)
def markExplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = true )
def markUnexplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = false )
}
ExplorationMark is just a bitset wrapper class: the i-th bit is set to 1 if the i-th round has been explored, and to 0 otherwise.
ExplorationMark can also be used to visualize exactly which transformations happened; see the visualization section for more details.
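The guide does not show BitUtils; a minimal sketch of the two bit helpers ExplorationMark relies on could look like this (an assumption about the helper, not the repository's actual code):
object BitUtils {
  // True if the bit at position `index` is set in `bits`.
  def getBit(bits: Long, index: Int): Boolean = ((bits >> index) & 1L) == 1L

  // Returns `bits` with the bit at position `index` switched to `on`.
  def setBit(bits: Long, index: Int, on: Boolean): Long =
    if (on) bits | (1L << index) else bits & ~(1L << index)
}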
The Memo is a helper that builds the equivalence groups. It consists of several hash maps that cache groups and group expressions, and it provides methods to register new groups and group expressions.
class Memo (
groupIdGenerator : Generator [ Long ] = new LongGenerator ,
groupExpressionIdGenerator : Generator [ Long ] = new LongGenerator
) {
val groups : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]()
val parents : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]() // lookup group from group expression ID
val groupExpressions : mutable. HashMap [ LogicalPlan , GroupExpression ] = mutable. HashMap [ LogicalPlan , GroupExpression ]()
def getOrCreateGroupExpression ( plan : LogicalPlan ) : GroupExpression = {
val children = plan.children()
val childGroups = children.map(child => getOrCreateGroup(child))
groupExpressions.get(plan) match {
case Some (found) => found
case None =>
val id = groupExpressionIdGenerator.generate()
val children = mutable. MutableList () ++ childGroups
val expression = GroupExpression (
id = id,
plan = plan,
children = children
)
groupExpressions += plan -> expression
expression
}
}
def getOrCreateGroup ( plan : LogicalPlan ) : Group = {
val exprGroup = getOrCreateGroupExpression(plan)
val group = parents.get(exprGroup.id) match {
case Some (group) =>
group.equivalents += exprGroup
group
case None =>
val id = groupIdGenerator.generate()
val equivalents = mutable. HashSet () + exprGroup
val group = Group (
id = id,
equivalents = equivalents
)
groups.put(id, group)
group
}
parents += exprGroup.id -> group
group
}
}
See the Memo source for the full implementation.
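A small usage sketch of the Memo, assuming the logical plan nodes defined earlier; the printed group count is what the code above would produce for this tiny plan.
val memo = new Memo()
val plan = Project(
  fields = Seq(ql.FieldID(ql.TableID("tbl1"), "id")),
  child = Join(
    Scan(ql.TableID("tbl1"), Seq.empty),
    Scan(ql.TableID("tbl2"), Seq.empty),
    on = Seq(ql.FieldID(ql.TableID("tbl1"), "id") -> ql.FieldID(ql.TableID("tbl2"), "id"))
  )
)
val rootGroup = memo.getOrCreateGroup(plan)
// One group per distinct logical node: Project, Join, Scan(tbl1), Scan(tbl2)
println(memo.groups.size)           // 4
println(rootGroup.equivalents.size) // 1 (only the Project expression so far)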
The first step inside the planner is initialization.
graph LR
query((Query))
AST((AST))
root_plan((Root Plan))
root_group((Root Group))
query -- "QueryParser.parse(query)" --> AST
AST -- "LogicalPlan.toPlan(ast)" --> root_plan
root_plan -- "memo.getOrCreateGroup(rootPlan)" --> root_group
First, the query is parsed into an AST. The AST is then converted into a logical plan called the root plan, and the group initialized from the root plan is called the root group.
def initialize ( query : Statement )( implicit ctx : VolcanoPlannerContext ) : Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan .toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is the first exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the difference between rounds (added nodes, added connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}
See VolcanoPlanner.scala for more details.
For example, the query:
SELECT tbl1.id,
  tbl1.field1,
  tbl2.id,
  tbl2.field1,
  tbl2.field2,
  tbl3.id,
  tbl3.field2,
  tbl3.field2
FROM tbl1
JOIN tbl2 ON tbl1.id = tbl2.id
JOIN tbl3 ON tbl2.id = tbl3.id
After initialization, the groups will look like this:
graph TD
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#6 --> Group#5
Here you can see that each group has exactly one equivalent expression.
After initialization comes the exploration phase, which explores all the possible equivalent plans.
The exploration approach is quite simple:
Before diving into the exploration code, let's talk about transformation rules first.
A transformation rule is a rule that transforms a logical plan into another equivalent logical plan, if the plan matches the rule's condition.
Here is the interface of a transformation rule:
trait TransformationRule {
def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean
def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression
}
Since a logical plan is a tree-like data structure, the match implementation of a transformation rule is pattern matching over a tree.
For example, here is a match that matches a PROJECT node and also checks that its descendants contain only JOIN and SCAN nodes:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}
This plan "matches":
graph TD
subgraph Group#2
Expr#2["SCAN"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1
Expr#1["SCAN"]
end
subgraph Group#3
Expr#3["SCAN"]
end
subgraph Group#6
Expr#6["PROJECT"]
end
Expr#6 --> Group#5
while this plan does not:
graph TD
subgraph Group#2
Expr#2["SCAN"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#3
Expr#5 --> Group#4
subgraph Group#4
Expr#4["SCAN"]
end
subgraph Group#7
Expr#7["PROJECT"]
end
Expr#7 --> Group#6
subgraph Group#1
Expr#1["SCAN"]
end
subgraph Group#3
Expr#3["PROJECT"]
end
Expr#3 --> Group#2
subgraph Group#6
Expr#6["JOIN"]
end
Expr#6 --> Group#1
Expr#6 --> Group#5
As we said before, the exploration approach is:
Here is the exploration code (pretty simple, yes):
private def exploreGroup (
group : Group ,
rules : Seq [ TransformationRule ],
round : Int
)( implicit ctx : VolcanoPlannerContext ) : Unit = {
while ( ! group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if ( ! equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if ( ! equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if ( ! group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}
See VolcanoPlanner.scala for more details.
Now it's time to implement some transformation rules.
Projection push-down is a simple transformation rule that pushes projections down to the storage layer.
For example, the query
SELECT field1, field2
FROM tbl
has the plan
graph LR
project[PROJECT field1, field2]
scan[SCAN tbl]
project --> scan
With this plan, at execution time the rows are fully fetched from the storage layer (under SCAN), and the unnecessary fields are only dropped afterwards (at PROJECT). The unnecessary data still has to be transferred from the SCAN node to the PROJECT node, so there is some wasted effort here.
We can do better simply by telling the storage layer to fetch only the necessary fields. The plan is then transformed into:
graph LR
project[PROJECT field1, field2]
scan["SCAN tbl (field1, field2)"]
project --> scan
Let's go through the code:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}
Our projection push-down rule matches a plan when the plan node is a PROJECT and all of its descendants are SCAN and JOIN nodes only.
Note: in reality, projection push-down matching is much more complex, but for simplicity the match rule here is just a PROJECT node whose descendants are SCAN and JOIN nodes.
Here is the transformation code:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Project ]
val pushDownProjection = mutable. ListBuffer [ FieldID ]()
extractProjections(plan, pushDownProjection)
val newPlan = Project (plan.fields, pushDown(pushDownProjection.distinct, plan.child))
ctx.memo.getOrCreateGroupExpression(newPlan)
}
private def extractProjections ( node : LogicalPlan , buffer : mutable. ListBuffer [ FieldID ]) : Unit = {
node match {
case Scan (_, _) => () : Unit
case Project (fields, parent) =>
buffer ++= fields
extractProjections(parent, buffer)
case Join (left, right, on) =>
buffer ++= on.map(_._1) ++ on.map(_._2)
extractProjections(left, buffer)
extractProjections(right, buffer)
}
}
private def pushDown ( pushDownProjection : Seq [ FieldID ], node : LogicalPlan ) : LogicalPlan = {
node match {
case Scan (table, tableProjection) =>
val filteredPushDownProjection = pushDownProjection.filter(_.table == table).map(_.id)
val updatedProjection =
if (filteredPushDownProjection.contains("*") || filteredPushDownProjection.contains("*.*")) {
Seq .empty
} else {
(tableProjection ++ filteredPushDownProjection).distinct
}
Scan (table, updatedProjection)
case Join (left, right, on) => Join (pushDown(pushDownProjection, left), pushDown(pushDownProjection, right), on)
case _ => throw new Exception("should never happen")
}
}
The transformation code first collects all the projections starting from the root PROJECT node, and then pushes them down to all the SCAN nodes below it.
For example, our rule matches a plan like:
graph TD
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#6 --> Group#5
Applying the projection push-down transformation results in a new equivalent plan with the projections pushed down into the SCAN operations (the new plan is the tree with the orange-bordered nodes).
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
end
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#11
Expr#11["JOIN"]
end
Expr#11 --> Group#7
Expr#11 --> Group#10
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
end
subgraph Group#10
Expr#10["JOIN"]
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 8 stroke-width: 4px, stroke: orange
See the repository for the full implementation.
Join reordering is also one of the best-known transformations in the query planner world. Our planner will implement a join reordering rule as well.
Join reordering in the real world is not easy to implement, so we will implement a simplified, mini version of a join reorder rule here.
First, the rule's match:
// check if the tree only contains SCAN and JOIN nodes, and also extract all SCAN nodes and JOIN conditions
private def checkAndExtract (
node : LogicalPlan ,
buffer : mutable. ListBuffer [ Scan ],
joinCondBuffer : mutable. ListBuffer [(ql. FieldID , ql. FieldID )]
) : Boolean = {
node match {
case node @ Scan (_, _) =>
buffer += node
true
case Join (left, right, on) =>
joinCondBuffer ++= on
checkAndExtract(left, buffer, joinCondBuffer) && checkAndExtract(right, buffer, joinCondBuffer)
case _ => false
}
}
private def buildInterchangeableJoinCond ( conditions : Seq [(ql. FieldID , ql. FieldID )]) : Seq [ Seq [ql. FieldID ]] = {
val buffer = mutable. ListBuffer [mutable. Set [ql. FieldID ]]()
conditions.foreach { cond =>
val set = buffer.find { set =>
set.contains(cond._1) || set.contains(cond._2)
} match {
case Some (set) => set
case None =>
val set = mutable. Set [ql. FieldID ]()
buffer += set
set
}
set += cond._1
set += cond._2
}
buffer.map(_.toSeq)
}
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case node @ Join (_, _, _) =>
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
if (checkAndExtract(node, buffer, joinCondBuffer)) {
// only match if the join is 3 tables join
if (buffer.size == 3 ) {
var check = true
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
interChangeableCond.foreach { c =>
check &= c.size == 3
}
check
} else {
false
}
} else {
false
}
case _ => false
}
}
Our rule only matches a 3-way join: the number of involved tables must be 3, and the join condition must be 3-way, e.g. tbl1.field1 = tbl2.field2 = tbl3.field3.
For example,
tbl1
JOIN tbl2 ON tbl1.field1 = tbl2.field2
JOIN tbl3 ON tbl1.field1 = tbl3.field3
The JOIN statement here "matches" because it is a 3-way join (it is a join between tbl1, tbl2 and tbl3, and the condition is tbl1.field1 = tbl2.field2 = tbl3.field3).
Next, the transform code:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Join ]
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
checkAndExtract(plan, buffer, joinCondBuffer)
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
//
val scans = buffer.toList
implicit val ord : Ordering [ Scan ] = new Ordering [ Scan ] {
override def compare ( x : Scan , y : Scan ) : Int = {
val xStats = ctx.statsProvider.tableStats(x.table.id)
val yStats = ctx.statsProvider.tableStats(y.table.id)
xStats.estimatedTableSize.compareTo(yStats.estimatedTableSize)
}
}
def getJoinCond ( left : Scan , right : Scan ) : Seq [(ql. FieldID , ql. FieldID )] = {
val leftFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == left.table)
}
val rightFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == right.table)
}
if (leftFields.length != rightFields.length) {
throw new Exception(s"leftFields.length(${leftFields.length}) != rightFields.length(${rightFields.length})")
} else {
leftFields zip rightFields
}
}
val sorted = scans.sorted
val newPlan = Join (
sorted( 0 ),
Join (
sorted( 1 ),
sorted( 2 ),
getJoinCond(sorted( 1 ), sorted( 2 ))
),
getJoinCond(sorted( 0 ), sorted( 1 ))
)
ctx.memo.getOrCreateGroupExpression(newPlan)
}
The transform code here reorders the tables by their estimated sizes.
For example, if we have three tables A, B, C with estimated sizes of 300B, 100B and 200B, and a join statement A JOIN B JOIN C, it will be transformed into B JOIN C JOIN A.
Note: as you may notice in this code, we make use of table statistics to decide how to transform the plan. In practice, a planner can use all kinds of statistics to assist its transformations, such as table size, row size, null counts, histograms, etc.
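The rule above relies on a statistics provider that is not shown in this guide. Its shape can be inferred from how the code uses it (ctx.statsProvider.tableStats(...).estimatedTableSize) and from the TableStats values in the demo section; the sketch below is that inference, not the repository's exact definition.
case class TableStats(
  estimatedRowCount: Long,
  avgColumnSize: Map[String, Long]
) {
  // A crude size estimate: number of rows times the average row width.
  // The real repository may compute this differently.
  def estimatedTableSize: Long = estimatedRowCount * avgColumnSize.values.sum
}

trait StatsProvider {
  // Look up the statistics of a table by its name.
  def tableStats(table: String): TableStats
}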
For example, our rule matches a plan like:
graph TD
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#6 --> Group#5
After applying the join reordering transformation, the result is:
graph TD
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
Expr#8["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#8 --> Group#2
Expr#8 --> Group#7
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#7
Expr#7["JOIN"]
end
Expr#7 --> Group#1
Expr#7 --> Group#3
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#6 --> Group#5
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
We can see that tbl2 JOIN tbl1 JOIN tbl3 was generated from tbl1 JOIN tbl2 JOIN tbl3 by the transformation (the newly added nodes and edges are drawn in orange).
See X3TableJoinReorderBySize.scala for the full implementation.
Now we can put the transformation rules in one place
private val transformationRules: Seq[Seq[TransformationRule]] = Seq(
  Seq(new ProjectionPushDown),
  Seq(new X3TableJoinReorderBySize)
)
and run them to explore the equivalence groups
for (r <- transformationRules.indices) {
  exploreGroup(ctx.rootGroup, transformationRules(r), r + 1)
}
For example, the plan
graph TD
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#6
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#6 --> Group#5
After exploration, it results in this graph:
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
end
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
end
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
end
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
end
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#10
Expr#10["JOIN"]
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#12
Expr#13["JOIN"]
end
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
See VolcanoPlanner.scala for more details.
After the exploration phase, we now have a fully expanded tree that contains all the possible plans, and it is time for the optimization phase.
In this phase, we will find the best plan for the root group. The optimization process is as follows:
Here is an example:
graph TD
subgraph Group#2["Group#2 (cost=1)"]
Expr#2["Expr#2 (cost=1)"]
end
subgraph Group#5["Group#5 (cost=3)"]
Expr#5["Expr#5 (cost=max(3,2)=3)"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
subgraph Group#4["Group#4 (cost=2)"]
Expr#4["Expr#4 (cost=max(1,2)=2)"]
Expr#7["Expr#7 (cost=1+2=3)"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#1["Group#1 (cost=3)"]
Expr#1["Expr#1 (cost=3)"]
end
subgraph Group#3["Group#3 (cost=2)"]
Expr#3["Expr#3 (cost=2)"]
end
subgraph Group#6["Group#6 (cost=4.5)"]
Expr#6["Expr#6 (cost=3*1.5=4.5)"]
end
Expr#6 --> Group#5
subgraph Group#8["Group#8 (cost=1)"]
Expr#8["Expr#8 (cost=1)"]
end
subgraph Group#9["Group#9 (cost=2)"]
Expr#9["Expr#9 (cost=2)"]
end
Expr#7 --> Group#8
Expr#7 --> Group#9
For example, using a max function, the cost of Expr#4 is calculated from the costs of its child groups (Group#2 and Group#3). Another example is Group#4, whose cost is calculated as the minimum of the costs of its equivalent expressions.
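As a toy illustration of those two rules (expression cost from child groups, group cost as the cheapest equivalent), here is the idea in a few lines of Scala, using a single number instead of the multi-dimensional Cost used later:
// Cost of one expression: its own cost combined with the costs of its child groups.
// The combining function depends on the operator (the example above uses max for the joins
// and a multiplier for the projection); plain addition is used here for simplicity.
def expressionCost(selfCost: Double, childGroupCosts: Seq[Double]): Double =
  selfCost + childGroupCosts.sum

// Cost of a group: the cost of its cheapest equivalent expression.
def groupCost(equivalentExpressionCosts: Seq[Double]): Double =
  equivalentExpressionCosts.min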
Since the goal of the optimization phase is to produce the best physical plan from the explored group expressions, we can define a physical plan as follows:
sealed trait PhysicalPlan {
def operator () : Operator
def children () : Seq [ PhysicalPlan ]
def cost () : Cost
def estimations () : Estimations
def traits () : Set [ String ]
}
operator is the physical operator used to execute the plan; we will cover it in a later section. children is the list of child plan nodes, and they take part in the cost calculation. The third property is cost, an object holding cost information (such as CPU cost, memory cost, IO cost, etc.). estimations holds the estimated statistics of the plan (such as row count, row size, etc.), which also take part in the cost calculation. Finally, traits is a set of physical traits, which affect the implementation rules and hence the physical plan generation process.
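Two helper types are referenced around PhysicalPlan without being defined in the sections shown here: Estimations and GroupImplementation. The field names below follow how the later code uses them (a sketch; the repository definitions may differ in detail):
case class Estimations(
  estimatedLoopIterations: Long, // roughly: how many rows the operator is expected to iterate/emit
  estimatedRowSize: Long         // estimated size of one output row
)

// The winning physical plan chosen for a group during optimization.
case class GroupImplementation(
  physicalPlan: PhysicalPlan,
  cost: Cost,
  selectedEquivalentExpression: GroupExpression
)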
Next, we can implement the physical plan node classes:
case class Scan (
operator : Operator ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq .empty // scan do not receive any child
}
case class Project (
operator : Operator ,
child : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (child)
}
case class Join (
operator : Operator ,
leftChild : PhysicalPlan ,
rightChild : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (leftChild, rightChild)
}
See the physical plan source for the full implementation.
The first thing to do in the optimization phase is to implement the implementation rules. An implementation rule is a rule that converts a logical plan into a physical plan, without executing it.
Since we are not executing the physical plan directly inside the planner, we return a physical plan builder instead; this also makes it easier to customize the cost function of each node.
Here is the interface of an implementation rule:
trait PhysicalPlanBuilder {
def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ]
}
trait ImplementationRule {
def physicalPlanBuilders ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ]
}
Here, PhysicalPlanBuilder is the interface used to build a physical plan, given the physical plans of its children.
For example, a logical JOIN has two physical implementations, HASH JOIN and MERGE JOIN:
graph TD
child#1["child#1"]
child#2["child#2"]
child#3["child#3"]
child#4["child#4"]
hash_join["`HASH JOIN
cost = f(cost(child#1), cost(child#2))
`"]
merge_join["`MERGE JOIN
cost = g(cost(child#3), cost(child#4))
`"]
hash_join --> child#1
hash_join --> child#2
merge_join --> child#3
merge_join --> child#4
The HASH JOIN cost is calculated using a function f(), and the MERGE JOIN cost using a function g(); both take their children as function parameters. So it's easier to code if we return just the physical plan builder from the implementation rule instead of the physical plan.
As we've said before, the optimization process is described as follows:
And here is the code:
private def implementGroup ( group : Group , combinedRule : ImplementationRule )(
implicit ctx : VolcanoPlannerContext
) : GroupImplementation = {
group.implementation match {
case Some (implementation) => implementation
case None =>
var bestImplementation = Option .empty[ GroupImplementation ]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option (childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some (currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}
This code performs an exhaustive search, using a recursive function to traverse all nodes. At each node (group), the function is called once to get the group's best plan while also computing the optimal cost of that group.
Finally, the best plan for our query is the best plan of the root group
val implementationRules = new ImplementationRule {
override def physicalPlanBuilders (
expression : GroupExpression
)( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
expression.plan match {
case node @ Scan (_, _) => implement. Scan (node)
case node @ Project (_, _) => implement. Project (node)
case node @ Join (_, _, _) => implement. Join (node)
}
}
}
ctx.rootGroup.implementation = Option(implementGroup(ctx.rootGroup, implementationRules))
See VolcanoPlanner.scala for the full implementation.
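Once the root group has its implementation, the winning physical plan (and the operator tree to execute it) can simply be read back from it; a small sketch using the names from the code above:
val best: GroupImplementation = ctx.rootGroup.implementation.get
val bestPhysicalPlan: PhysicalPlan = best.physicalPlan
val rootOperator: Operator = bestPhysicalPlan.operator()
println(best.cost) // e.g. Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)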
Here is an example of the plan after optimization; it shows the selected logical node, the selected physical operator, and the estimated cost:
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Next, we will implement some implementation rules.
The first and easiest one is the implementation rule for the logical PROJECT:
object Project {
def apply ( node : logicalplan. Project )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
Seq (
new ProjectionImpl (node.fields)
)
}
}
class ProjectionImpl ( projection : Seq [ql. FieldID ]) extends PhysicalPlanBuilder {
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val child = children.head
val selfCost = Cost (
estimatedCpuCost = 0 ,
estimatedMemoryCost = 0 ,
estimatedTimeCost = 0
) // assuming the cost of projection is 0
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + child.cost().estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + child.cost().estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + child.cost().estimatedTimeCost
)
val estimations = Estimations (
estimatedLoopIterations = child.estimations().estimatedLoopIterations,
estimatedRowSize = child.estimations().estimatedRowSize // just guessing the value
)
Some (
Project (
operator = ProjectOperator (projection, child.operator()),
child = child,
cost = cost,
estimations = estimations,
traits = child.traits()
)
)
}
}
The implementation rule for logical PROJECT returns one physical plan builder, ProjectionImpl. The ProjectionImpl cost calculation is simple: it just inherits the cost from the child node (because the projection is not doing any intensive work). Besides that, it also updates the estimations (in this code, the estimations are also inherited from the child node).
See Project.scala for full implementation
Writing the implementation rule for logical JOIN is much harder than for PROJECT.
The first reason is that a logical JOIN has many physical implementations, such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
The second reason is that estimating the cost of a physical JOIN is also hard, because it depends on many factors such as row count, row size, data histograms, indexes, data layout, etc.
So, to keep everything simple in this guide, I will only implement two physical JOINs: HASH JOIN and MERGE JOIN. The cost estimation functions are fictional (they are just there to show how it works; I'm not trying to make them correct). And for the MERGE JOIN, all data is assumed to be sorted by the join key.
Here is the code:
object Join {
def apply ( node : logicalplan. Join )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
val leftFields = node.on.map(_._1).map(f => s"${f.table.id}.${f.id}")
val rightFields = node.on.map(_._2).map(f => s"${f.table.id}.${f.id}")
Seq (
new HashJoinImpl (leftFields, rightFields),
new MergeJoinImpl (leftFields, rightFields)
)
}
}
The HASH JOIN:
class HashJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
private def viewSize ( plan : PhysicalPlan ) : Long = {
plan.estimations().estimatedLoopIterations * plan.estimations().estimatedRowSize
}
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
// reorder the child nodes, the left child is the child with smaller view size (smaller than the right child if we're store all of them in memory)
val (leftChild, rightChild) = if (viewSize(children( 0 )) < viewSize(children( 1 ))) {
(children( 0 ), children( 1 ))
} else {
(children( 1 ), children( 0 ))
}
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = HashJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = Set .empty // don't inherit trait from children since we're hash join
)
)
}
}
We can see that the cost function of HASH JOIN is composed from its children's costs and estimations:
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Next, the MERGE JOIN:
class MergeJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val (leftChild, rightChild) = (children( 0 ), children( 1 ))
if (leftChild.traits().contains("SORTED") && rightChild.traits().contains("SORTED")) {
val estimatedTotalRowCount =
leftChild.estimations().estimatedLoopIterations +
rightChild.estimations().estimatedLoopIterations
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = MergeJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = leftChild.traits() ++ rightChild.traits()
)
)
} else {
None
}
}
}
As with HASH JOIN, MERGE JOIN also uses its children's costs and estimations to calculate its cost, but with a different formula:
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
See HashJoinImpl.scala and MergeJoinImpl.scala for the full implementation.
You can see other rules and physical plan builders here:
Now that we are done implementing the implementation rules, we can find our best plan. Let's start over from the user query
SELECT tbl1.id,
  tbl1.field1,
  tbl2.id,
  tbl2.field1,
  tbl2.field2,
  tbl3.id,
  tbl3.field2,
  tbl3.field2
FROM tbl1
JOIN tbl2 ON tbl1.id = tbl2.id
JOIN tbl3 ON tbl2.id = tbl3.id
will be converted to the logical plan
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After the exploration phase, it will generate lots of equivalent plans
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
end
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
end
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
end
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
end
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
end
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
end
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
end
subgraph Group#1
Expr#1["SCAN tbl1"]
end
subgraph Group#10
Expr#10["JOIN"]
end
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
end
subgraph Group#3
Expr#3["SCAN tbl3"]
end
subgraph Group#12
Expr#13["JOIN"]
end
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
end
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
And in the optimization phase, a final best plan is chosen
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Now we've finished building a functional query planner that can optimize the user's query, but the query plan cannot run by itself. That is why we will now implement a query processor to try out our query plan.
Basically, the query processor receives its input from the query planner and executes it:
graph LR
plan(("Physical Plan"))
storage[("Storage Layer")]
processor["Query Processor"]
plan -- execute --> processor
storage -- fetch --> processor
Volcano/iterator model is the query processing model that is widely used in many DBMS. It is a pipeline architecture, which means that the data is processed in stages, with each stage passing the output of the previous stage to the next stage.
Each stage in the pipeline is represented by an operator. Operators are functions that perform a specific operation on the data, such as selecting rows, filtering rows, or aggregating rows.
Usually, operators can be formed directly from the query plan. For example, the query
SELECT field_1
FROM tbl
WHERE field = 1
will have the plan
graph TD
project["PROJECT: field_1"]
scan["SCAN: tbl"]
filter["FILTER: field = 1"]
project --> scan
filter --> project
will create a chain of operators like this:
scan = {
next() // fetch next row from table "tbl"
}
project = {
next() = {
next_row = scan.next() // fetch next row from scan operator
projected = next_row["field_1"]
return projected
}
}
filter = {
next() = {
next_row = {}
do {
next_row = project.next() // fetch next row from project operator
} while (next_row["field"] != 1)
return next_row
}
}
results = []
while (row = filter.next()) {
results.append(row)
}
Note: this pseudocode does not handle the end of the result stream.
The basic interface of an operator is described as following:
trait Operator {
def next () : Option [ Seq [ Any ]]
}
See Operator.scala for full implementation of all operators
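Before looking at the real operators, here is a minimal sketch of the iterator model in Scala, using the Operator trait defined above. These two operators are illustrative only; they are not the repository's implementations.
// Scans an in-memory table, one row per next() call.
class InMemoryScanOperator(rows: Seq[Seq[Any]]) extends Operator {
  private val it = rows.iterator
  override def next(): Option[Seq[Any]] = if (it.hasNext) Some(it.next()) else None
}

// Keeps only the selected column positions of each row pulled from the child operator.
class SimpleProjectOperator(child: Operator, indices: Seq[Int]) extends Operator {
  override def next(): Option[Seq[Any]] = child.next().map(row => indices.map(row))
}

// Usage: PROJECT(column 0) on top of SCAN.
val scan    = new InMemoryScanOperator(Seq(Seq("1", "Emp A"), Seq("2", "Emp B")))
val project = new SimpleProjectOperator(scan, Seq(0))
Iterator.continually(project.next()).takeWhile(_.isDefined).flatten.foreach(println)
// List(1)
// List(2)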
Let's define a query
SELECT emp.id,
  emp.code,
  dept.dept_name,
  emp_info.name,
  emp_info.origin
FROM emp
JOIN dept ON emp.id = dept.emp_id
JOIN emp_info ON dept.emp_id = emp_info.id
with some data and stats:
val table1: Datasource = Datasource(
  table = "emp",
  catalog = TableCatalog(
    Seq(
      "id" -> classOf[String],
      "code" -> classOf[String]
    ),
    metadata = Map("sorted" -> "true") // assumes rows are already sorted by id
  ),
  rows = Seq(
    Seq("1", "Emp A"),
    Seq("2", "Emp B"),
    Seq("3", "Emp C")
  ),
  stats = TableStats(
    estimatedRowCount = 3,
    avgColumnSize = Map("id" -> 10, "code" -> 32)
  )
)
val table2: Datasource = Datasource(
  table = "dept",
  catalog = TableCatalog(
    Seq(
      "emp_id" -> classOf[String],
      "dept_name" -> classOf[String]
    ),
    metadata = Map("sorted" -> "true") // assumes rows are already sorted by emp_id (this is just a fake trait to demonstrate how trait works)
  ),
  rows = Seq(
    Seq("1", "Dept 1"),
    Seq("1", "Dept 2"),
    Seq("2", "Dept 3"),
    Seq("3", "Dept 3")
  ),
  stats = TableStats(
    estimatedRowCount = 4,
    avgColumnSize = Map("emp_id" -> 10, "dept_name" -> 255)
  )
)
val table3: Datasource = Datasource(
  table = "emp_info",
  catalog = TableCatalog(
    Seq(
      "id" -> classOf[String],
      "name" -> classOf[String],
      "origin" -> classOf[String]
    ),
    metadata = Map("sorted" -> "true") // assumes rows are already sorted by id (this is just a fake trait to demonstrate how trait works)
  ),
  rows = Seq(
    Seq("1", "AAAAA", "Country A"),
    Seq("2", "BBBBB", "Country A"),
    Seq("3", "CCCCC", "Country B")
  ),
  stats = TableStats(
    estimatedRowCount = 3,
    avgColumnSize = Map("id" -> 10, "name" -> 255, "origin" -> 255)
  )
)
The cost model is optimized for CPU:
val costModel: CostModel = (currentCost: Cost, newCost: Cost) => {
  currentCost.estimatedCpuCost > newCost.estimatedCpuCost
}
Now, execute the query by running this code:
val planner = new VolcanoPlanner
QueryParser.parse(query) match {
  case Left(err) => throw err
  case Right(parsed) =>
    val operator = planner.getPlan(parsed)
    val result = Utils.execute(operator)
    // print result
    println(result._1.mkString(","))
    result._2.foreach(row => println(row.mkString(",")))
}
It will print:
emp.id,emp.code,dept.dept_name,emp_info.name,emp_info.origin
1,Emp A,Dept 1,AAAAA,Country A
1,Emp A,Dept 2,AAAAA,Country A
2,Emp B,Dept 3,BBBBB,Country A
3,Emp C,Dept 3,CCCCC,Country B
Voilà, we've finished building a fully functional query planner and query engine :). You can start writing one of your own now, good luck!
See Demo.scala for the full demo code.
Thanks for reading! This guide is quite long, and not fully correct, but I've tried my best to write it as understandably as possible.