[プレースホルダー]
クエリプランナーは、データベースクエリを実行するための計画を生成する責任があるデータベース管理システム(DBMS)のコンポーネントです。クエリ計画は、クエリで要求されたデータを取得するためにDBMSが取る手順を指定します。クエリプランナーの目標は、可能な限り効率的な計画を生成することです。つまり、データをできるだけ早くユーザーに返すことを意味します。
クエリプランナーは複雑なソフトウェアであり、理解するのが難しい場合があります。コストベースのクエリプランナーを実装するためのこのガイドは、プロセスの段階的な概要、独自のコストベースのクエリプランナーを実装する方法を提供し、クエリプランナーの基本概念をカバーします。
AIによって書かれた、Humanが編集
このガイドは次のように書かれています:
目標:
グラフTD
ユーザー((ユーザー))
パーサー[クエリパーサー]
プランナー[クエリプランナー]
executor [クエリプロセッサ]
ユーザー - テキストクエリ - >パーサー
パーサー-AST->プランナー
プランナー - 物理計画 - >執行者
クエリエンジンの基本アーキテクチャは、これらのコンポーネントで構成されています。
通常、クエリプランナーは2つのタイプに分割されます。
ヒューリスティックプランナーは、事前定義されたルールを使用してクエリプランを生成するクエリプランナーです。
コストベースのプランナーは、クエリを生成するためのコストに基づいて、入力クエリのコストに基づいて最適なプランを見つけようとするクエリプランナーです。
ヒューリスティックプランナーは通常、変革された計画がより良いことを知っている場合、Transformルールを適用することで最適な計画を見つけますが、コストベースのプランナーは、同等の計画を列挙して最適な計画を見つけ、それらの中で最適な計画を見つけようとします。
コストベースのクエリプランナーでは、通常はフェーズで構成されています。
プランの列挙フェーズでは、プランナーは可能な同等の計画を列挙します。
その後、クエリ最適化フェーズでは、プランナーは列挙されたプランのリストから最適なプランを検索します。最良の計画は、コストが最も低い計画であり、コストモデル(またはコスト関数)が定義されています。
論理計画の自然は、木のような構造を持っているため、最適化/検索は実際にはツリー検索の問題であると考えることができます。そして、ここにはツリーサーチアルゴリズムがたくさんあります。
注:理論的には、あらゆる種類のツリー検索アルゴリズムを使用することが可能です。ただし、実際には、検索アルゴリズムが複雑なときに検索時間が増加するため、実用的ではありません
注:検索終了条件は通常、次のとおりです。
火山クエリプランナー(または火山オプティマイザージェネレーター)は、コストベースのクエリプランナーです
Volcano Plannerは、動的プログラミングアプローチを使用して、列挙された計画のリストから最高のクエリプランを見つけます。
詳細:https://ieeexplore.ieee.org/document/344061(ここで論文を説明するにはあまりにも面倒です)
素晴らしい説明です:https://15721.courses.cs.cmu.edu/spring2017/slides/15-optimizer2.pdf#page=9
私たちのクエリプランナーは、火山のクエリプランナーの基本的なアイデアに従って、コストベースのクエリプランナーです。プランナーは、2つの主要なフェーズで構成されます。
グラフlr
AST((AST))
logical_plan [plan]
Explored_plans ["`
プラン#1
...
計画#N
`"]
information_plan ["plan#x(best plan)"]]
AST-論理計画に変換 - > logical_plan
logical_plan-探索フェーズ - > explored_plans
Explored_plans-最適化フェーズ - >実装_plan
LinkStyle 1,2色:オレンジ、ストローク:オレンジ、ストローク幅:5px
論理計画は、クエリを実行するために必要な変換ステップの抽象化を保持するデータストラクチャです。
ここに論理的な計画の例があります。
グラフTD
1 ["Project TBL1.ID、TBL1.FIELD1、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"];
2 ["Join"];
3 ["スキャンTBL1"];
4 ["join"];
5 ["スキャンTBL2"];
6 ["スキャンTBL3"];
1-> 2;
2-> 3;
2-> 4;
4-> 5;
4-> 6;
論理計画は抽象化のみを保持していますが、物理計画は実装の詳細を保持しているデータストラクチャです。各論理計画には、複数の物理的計画があります。たとえば、論理的な結合には、Hash Join、Merge Join、Broadcast Joinなどなど、多くの物理的計画があります。
同等のグループは、同等の式のグループです(それぞれの表現について、その論理計画は論理的に同等です)
例えば
グラフTD
サブグラフグループ#8
expr#8 ["スキャンtbl2(field1、field2、id)"]]
終わり
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#11
expr#11 ["join"]
終わり
Expr#11->グループ#7
Expr#11->グループ#10
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#7
expr#7 ["スキャンtbl1(id、field1)"]]
終わり
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#10
expr#10 ["join"]
終わり
expr#10->グループ#8
expr#10->グループ#9
サブグラフグループ#9
expr#9 ["スキャンtbl3(id、field2)"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#12 ["Project TBL1.ID、TBL1.FIELD1、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]
終わり
expr#12->グループ#11
Expr#6->グループ#5
ここではGroup#6が2つの同等の式を持っていることがわかります。どちらも同じクエリを表しています(1つはテーブルからスキャンしてプロジェクトを行い、1つはプロジェクションを下に押し下げてスキャンノードに押し下げています)。
変換ルールは、ある論理計画から別の論理的な等価論的計画に変換するルールです
たとえば、計画:
グラフTD
1 ["Project TBL1.ID、TBL1.FIELD1、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"];
2 ["Join"];
3 ["スキャンTBL1"];
4 ["join"];
5 ["スキャンTBL2"];
6 ["スキャンTBL3"];
1-> 2;
2-> 3;
2-> 4;
4-> 5;
4-> 6;
投影プッシュダウン変換を適用すると、次のように変換されます。
グラフTD
1 ["project *。 *"];
2 ["Join"];
3 ["スキャンTBL1(id、field1)"];
4 ["join"];
5 ["スキャンTBL2(Field1、Field2)"];
6 ["スキャンTBL3(id、field2、field2)"];
1-> 2;
2-> 3;
2-> 4;
4-> 5;
4-> 6;
変換ルールは、テーブルスキーマ、データ統計などの論理的特性/プロパティによって影響を与える可能性があります。
実装規則は、論理計画を考慮して物理的な計画を返すルールです。
実装ルールは、データレイアウト(ソートまたはいずれか)などの物理的特性/プロパティによって影響を与える可能性があります。
調査段階では、プランナーは変換ルールを適用し、同等の論理計画を生成します
たとえば、計画:
グラフTD
1326583549 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"];
-425111028 ["join"];
-349388609 ["Scan tbl1"];
1343755644 ["join"];
-1043437086 ["Scan tbl2"];
-1402686787 ["Scan TBL3"];
1326583549-> -425111028;
-425111028-> -349388609;
-425111028-> 1343755644;
1343755644-> -1043437086;
1343755644-> -1402686787;
変換ルールを適用した後、次のグラフが得られます。
グラフTD
サブグラフグループ#8
expr#8 ["スキャンtbl2(id、field1、field2)"]]
終わり
サブグラフグループ#11
expr#11 ["join"]
expr#14 ["join"]
終わり
Expr#11->グループ#7
Expr#11->グループ#10
Expr#14->グループ#8
Expr#14->グループ#12
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
expr#16 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
Expr#16->グループ#2
Expr#16->グループ#13
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#13
expr#15 ["join"]
終わり
expr#15->グループ#1
Expr#15->グループ#3
サブグラフグループ#7
expr#7 ["スキャンtbl1(id、field1)"]]
終わり
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#10
expr#10 ["join"]
終わり
expr#10->グループ#8
expr#10->グループ#9
サブグラフグループ#9
expr#9 ["スキャンtbl3(id、field2)"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#12
expr#13 ["join"]
終わり
Expr#13->グループ#7
Expr#13->グループ#9
サブグラフグループ#6
Expr#12 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
expr#12->グループ#11
Expr#6->グループ#5
ここでは、プロジェクションプッシュダウンルールと結合Reorderルールが適用されていることがわかります。
最適化段階は、探査段階で拡張されたツリーを通過して、クエリに最適な計画を見つけることです。
この「実際に」はツリー検索の最適化であるため、想像できるツリー検索アルゴリズムを使用できます(ただし、正しいことを確認する必要があります)。
最適化フェーズ後の生成された物理計画の例は次のとおりです。
グラフTD
グループ#6 ["
グループ#6
選択:Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2
オペレーター:Projecoperator
コスト:コスト(cpu = 641400.00、mem = 1020400012.00、time = 1000000.00)
「]
グループ#6->グループ#11
グループ#11 ["
グループ#11
選択:参加
オペレーター:HashjoInoperator
コスト:コスト(cpu = 641400.00、mem = 1020400012.00、time = 1000000.00)
「]
グループ#11->グループ#7
グループ#11->グループ#10
グループ#7 ["
グループ#7
選択:スキャンTBL1(id、field1)
オペレーター:NormalScanoperator
コスト:コスト(cpu = 400.00、mem = 400000.00、time = 1000.00)
「]
グループ#10 ["
グループ#10
選択:参加
オペレーター:MergejoInoperator
特性:ソート
コスト:コスト(cpu = 640000.00、mem = 20000012.00、time = 1100000.00)
「]
グループ#10->グループ#8
グループ#10->グループ#9
グループ#8 ["
グループ#8
選択:スキャンTBL2(id、field1、field2)
オペレーター:NormalScanoperator
特性:ソート
コスト:コスト(cpu = 600000.00、mem = 12.00、time = 1000000.00)
「]
グループ#9 ["
グループ#9
選択:スキャンTBL3(ID、Field2)
オペレーター:NormalScanoperator
特性:ソート
コスト:コスト(cpu = 40000.00、mem = 20000000.00、time = 100000.00)
「]
生成された計画は、選択された論理計画、推定コスト、および物理演算子を示しています
私たちのプランナーは、最良の計画を見つけるために疲労検索を実行します
プランナーのコードは大きいので、ステップバイステップガイドを書きませんが、代わりにコードのすべての部分を説明します
ここでは、このチュートリアルを徹底的に使用したクエリ言語を定義します
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . id実装するクエリ言語は、SQLのような言語です。ただし、簡単にするために、その機能と構文を制限します。
言語は形で表示されます
SELECT tbl . field , [...]
FROM tbl JOIN [...] SELECTとJOINのみをサポートします。また、Selectステートメントのフィールドは完全に資格を取得する必要があります( table.fieldの形式)、他のすべての機能はサポートされません
まず、言語のASTを定義する必要があります。 AST(または抽象的構文ツリー)は、テキストの構文構造を表すために使用されるツリーです。
私たちの言語は非常に単純なので、いくつかのコードでAST構造を定義することができます。
sealed trait Identifier
case class TableID ( id : String ) extends Identifier
case class FieldID ( table : TableID , id : String ) extends Identifier
sealed trait Statement
case class Table ( table : TableID ) extends Statement
case class Join ( left : Statement , right : Statement , on : Seq [( FieldID , FieldID )]) extends Statement
case class Select ( fields : Seq [ FieldID ], from : Statement ) extends Statement
たとえば、クエリ
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idとして表すことができます
Select (
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ),
FieldID ( TableID ( " tbl1 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " id " ),
FieldID ( TableID ( " tbl2 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " id " ),
FieldID ( TableID ( " tbl3 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " field2 " )
),
Join (
Table ( TableID ( " tbl1 " )),
Join (
Table ( TableID ( " tbl2 " )),
Table ( TableID ( " tbl3 " )),
Seq (
FieldID ( TableID ( " tbl2 " ), " id " ) -> FieldID ( TableID ( " tbl3 " ), " id " )
)
),
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ) -> FieldID ( TableID ( " tbl2 " ), " id " )
)
)
)AST構造を定義した後、テキストクエリをASTフォームに変換するために使用されるクエリパーサーを作成する必要があります。
このガイドは実装にScalaを使用しているため、Scala-Parser-combinatorsを選択してクエリパーサーを作成します。
クエリパーサークラス:
object QueryParser extends ParserWithCtx [ QueryExecutionContext , Statement ] with RegexParsers {
override def parse ( in : String )( implicit ctx : QueryExecutionContext ) : Either [ Throwable , Statement ] = {
Try (parseAll(statement, in) match {
case Success (result, _) => Right (result)
case NoSuccess (msg, _) => Left ( new Exception (msg))
}) match {
case util. Failure (ex) => Left (ex)
case util. Success (value) => value
}
}
private def select : Parser [ Select ] = ??? // we will implement it in later section
private def statement : Parser [ Statement ] = select
}
次に、いくつかの解析ルールを定義します。
// common
private def str : Parser [ String ] = """ [a-zA-Z0-9_]+ """ .r
private def fqdnStr : Parser [ String ] = """ [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ """ .r
// identifier
private def tableId : Parser [ TableID ] = str ^^ (s => TableID (s))
private def fieldId : Parser [ FieldID ] = fqdnStr ^^ { s =>
val identifiers = s.split( '.' )
if (identifiers.length != 2 ) {
throw new Exception ( " should never happen " )
} else {
val table = identifiers.head
val field = identifiers( 1 )
FieldID ( TableID (table), field)
}
}識別子を解析するために使用される2つのルールを以下に示します: TableIDとFieldID 。
通常、テーブルID(またはテーブル名)には文字、数字、アンダースコアのみが含まれているため( _ )、単純なRegex [a-zA-Z0-9_]+を使用してテーブル名を識別します。
一方、私たちの言語のフィールドID(フィールド予選の場合)は、完全に資格のあるフィールド名です。通常、 table.fieldの形式であり、フィールド名には通常、文字、数字、アンダースコアのみが含まれているため、regex [a-zA-Z0-9_]+.[a-zA-Z0-9_]+
識別子を解析するためのルールを定義した後、クエリステートメントを解析するルールを定義できるようになりました。
// statement
private def table : Parser [ Table ] = tableId ^^ (t => Table (t))
private def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) " tableルールは単純なルールであり、 tableIdルールから解析されたTableIDを使用してTableノードを作成するだけです。
subQuery 、サブクエリを解析するためのルールです。 SQLでは、次のようなクエリを書くことができます。
SELECT a
FROM ( SELECT b FROM c) d SELECT b FROM cは、上記のステートメントのサブクエリです。ここでは、単純なクエリ言語では、括弧( () )のペアで囲まれている場合、ステートメントがサブクエリであることを示します。私たちの言語には選択されたステートメントのみがあるため、次のように解析ルールを書くことができます。
def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) "次に、選択されたステートメントの解析ルールを定義します。
private def fromSource : Parser [ Statement ] = table ||| subQuery
private def select : Parser [ Select ] =
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep(
" JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)
) ^^ {
case _ ~ fields ~ _ ~ src ~ joins =>
val p = if (joins.nonEmpty) {
def chain ( left : Statement , right : Seq [( Statement , Seq [( FieldID , FieldID )])]) : Join = {
if (right.isEmpty) {
throw new Exception ( " should never happen " )
} else if (right.length == 1 ) {
val next = right.head
Join (left, next._1, next._2)
} else {
val next = right.head
Join (left, chain(next._1, right.tail), next._2)
}
}
val temp = joins.map { join =>
val statement = join._1._1._2
val joinOn = join._2.map(on => on._1._1 -> on._2)
statement -> joinOn
}
chain(src, temp)
} else {
src
}
Select (fields, p)
}SQLでは、サブクエリを結合ソースとして使用できます。例えば:
SELECT * . *
FROM tbl1
JOIN ( SELECT * . * FROM tbl2)
JOIN tbl3したがって、パーサーは、声明の結合部分でサブクエリを解析するためのルールを実装します。だからこそ、次のことがあります。
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep( " JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)完全な実装については、queryparser.scalaを参照してください
queryparserspec.scalaを参照してください
テキストクエリからASTを生成した後、それを直接論理計画に変換できます
まず、論理計画のインターフェイスを定義しましょう。
sealed trait LogicalPlan {
def children () : Seq [ LogicalPlan ]
}
children 、子どもの論理計画のリストです。例えば:
グラフTD
1326583549 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"];
-425111028 ["join"];
-349388609 ["Scan tbl1"];
1343755644 ["join"];
-1043437086 ["Scan tbl2"];
-1402686787 ["Scan TBL3"];
1326583549-> -425111028;
-425111028-> -349388609;
-425111028-> 1343755644;
1343755644-> -1043437086;
1343755644-> -1402686787;
PROJECTノードの子ノードは、最初のJOINノードです。最初のJOINノードには2人の子供がいます。これは2番目のJOINノードとSCAN tbl1ノードです。すぐ、 ...
クエリ言語は簡単なので、3種類の論理ノードのみが必要です。
case class Scan ( table : ql. TableID , projection : Seq [ String ]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq .empty
}
case class Project ( fields : Seq [ql. FieldID ], child : LogicalPlan ) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (child)
}
case class Join ( left : LogicalPlan , right : LogicalPlan , on : Seq [(ql. FieldID , ql. FieldID )]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (left, right)
}
次に、関数を書き込み、ASTを論理計画に変換できます。
def toPlan ( node : ql. Statement ) : LogicalPlan = {
node match {
case ql. Table (table) => Scan (table, Seq .empty)
case ql. Join (left, right, on) => Join (toPlan(left), toPlan(right), on)
case ql. Select (fields, from) => Project (fields, toPlan(from))
}
}完全な実装については、logicalplan.scalaを参照してください
グループのクラスを次のように定義できます。
case class Group (
id : Long ,
equivalents : mutable. HashSet [ GroupExpression ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
var implementation : Option [ GroupImplementation ] = None
}
case class GroupExpression (
id : Long ,
plan : LogicalPlan ,
children : mutable. MutableList [ Group ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
val appliedTransformations : mutable. HashSet [ TransformationRule ] = mutable. HashSet ()
}
Group 、論理的に同等の計画のセットです。
各GroupExpression 、論理計画ノードを表します。論理計画ノードには(前のセクション)子ノードのリストが表示され、 GroupExpression論理プランノードを表し、 Group同等の計画のセットを表すため、 GroupExpressionの子供はGroupのリストです。
例えば
グラフTD
サブグラフグループ#8
expr#8
終わり
サブグラフグループ#2
expr#2
終わり
サブグラフグループ#11
expr#11
終わり
Expr#11->グループ#7
Expr#11->グループ#10
サブグラフグループ#5
expr#5
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#7
expr#7
終わり
サブグラフグループ#1
expr#1
終わり
サブグラフグループ#10
expr#10
終わり
expr#10->グループ#8
expr#10->グループ#9
サブグラフグループ#9
expr#9
終わり
サブグラフグループ#3
expr#3
終わり
サブグラフグループ#6
expr#12
expr#6
終わり
expr#12->グループ#11
Expr#6->グループ#5
ここで見ることができるように、 Group#6は2つの同等の式があります: Expr#12とExpr#6 、およびExpr#12の子供はGroup#11です
注:探査段階で複数のラウンド変換を実装するため、各GroupとGroupExpressionについて、探査ステータスを示すExplorationMarkが表示されます。
class ExplorationMark {
private var bits : Long = 0
def get : Long = bits
def isExplored ( round : Int ) : Boolean = BitUtils .getBit(bits, round)
def markExplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = true )
def markUnexplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = false )
}
ExplorationMark単なるBitset Lapperクラスであり、I-Th Roundが調査されている場合、I-Thビットは1にマークされ、それ以外の場合は0としてマークされます。
ExplorationMark 、正確な変換を視覚化するためにも使用できます。詳細については、視覚化を参照してください
メモは、同等のグループの構築を支援する多くのヘルパーです。メモは、グループとグループの表現をキャッシュするためのいくつかのハッシュマップで構成され、新しいグループまたはグループの表現を登録する方法も提供します。
class Memo (
groupIdGenerator : Generator [ Long ] = new LongGenerator ,
groupExpressionIdGenerator : Generator [ Long ] = new LongGenerator
) {
val groups : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]()
val parents : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]() // lookup group from group expression ID
val groupExpressions : mutable. HashMap [ LogicalPlan , GroupExpression ] = mutable. HashMap [ LogicalPlan , GroupExpression ]()
def getOrCreateGroupExpression ( plan : LogicalPlan ) : GroupExpression = {
val children = plan.children()
val childGroups = children.map(child => getOrCreateGroup(child))
groupExpressions.get(plan) match {
case Some (found) => found
case None =>
val id = groupExpressionIdGenerator.generate()
val children = mutable. MutableList () ++ childGroups
val expression = GroupExpression (
id = id,
plan = plan,
children = children
)
groupExpressions += plan -> expression
expression
}
}
def getOrCreateGroup ( plan : LogicalPlan ) : Group = {
val exprGroup = getOrCreateGroupExpression(plan)
val group = parents.get(exprGroup.id) match {
case Some (group) =>
group.equivalents += exprGroup
group
case None =>
val id = groupIdGenerator.generate()
val equivalents = mutable. HashSet () + exprGroup
val group = Group (
id = id,
equivalents = equivalents
)
groups.put(id, group)
group
}
parents += exprGroup.id -> group
group
}
}
完全な実装については、Memo.scalaを参照してください
プランナー内の最初のステップは、初期化です
グラフlr
クエリ((クエリ))
AST((AST))
root_plan((rootplan))
root_group((rootgroup))
クエリ - "queryparser.parse(query)" - > ast
AST-「logicalplan.toplan(ast)」 - > root_plan
root_plan- "memo.getorcreategroup(rootplan)" - > root_group
まず、クエリはASTに解析されます。次に、 root planと呼ばれる論理計画に変換され、 root groupと呼ばれるroot planからグループを初期化します。
def initialize ( query : Statement )( implicit ctx : VolcanoPlannerContext ) : Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan .toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is first the exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the different between rounds (added nodes, add connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}詳細については、volcanoplanner.scalaを参照してください
たとえば、クエリ:
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . id初期化後、グループは次のようになります。
グラフTD
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
Expr#6->グループ#5
ここであなたはそれを見ることができます、すべてのグループにはちょうど1つの同等の式があります
初期化後、今が探求段階であり、可能なすべての同等の計画を探求します。
探索方法は非常に簡単です:
探索コードに飛び込む前に、最初に変換ルールについて話しましょう。
変換ルールは、ルール条件と一致している場合、論理計画を別の同等の論理計画に変換するために使用されるルールです。
これが変換ルールのインターフェースです:
trait TransformationRule {
def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean
def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression
}
論理計画は木のようなデータストラクチャであるため、変換ルールのmatch実装はツリーのパターンマッチングです。
たとえば、プロジェクトノードを一致させるために使用されるmatchは、結合とスキャンのみを含む子孫であるかどうかを確認します。
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}この計画は「一致」されています:
グラフTD
サブグラフグループ#2
expr#2 ["スキャン"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1
expr#1 ["スキャン"]
終わり
サブグラフグループ#3
expr#3 ["スキャン"]
終わり
サブグラフグループ#6
expr#6 ["project"]
終わり
Expr#6->グループ#5
この計画はそうではありませんが、
グラフTD
サブグラフグループ#2
expr#2 ["スキャン"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#3
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["スキャン"]
終わり
サブグラフグループ#7
expr#7 ["Project"]
終わり
expr#7->グループ#6
サブグラフグループ#1
expr#1 ["スキャン"]
終わり
サブグラフグループ#3
expr#3 ["Project"]
終わり
expr#3->グループ#2
サブグラフグループ#6
expr#6 ["join"]
終わり
Expr#6->グループ#1
Expr#6->グループ#5
前にも言ったように、探索方法は次のとおりです。
そして、ここに探索コードがあります(非常に単純です、huh):
private def exploreGroup (
group : Group ,
rules : Seq [ TransformationRule ],
round : Int
)( implicit ctx : VolcanoPlannerContext ) : Unit = {
while ( ! group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if ( ! equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if ( ! equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if ( ! group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}詳細については、volcanoplanner.scalaを参照してください
今度は、いくつかの変換ルールを実装する時が来ました
投影プッシュダウンは、プロジェクションをストレージ層まで押し下げるために使用される単純な変換ルールです。
たとえば、クエリ
SELECT field1, field2
from tbl計画があります
グラフlr
Project [ProjectField1、Field2]
スキャン[スキャンTBL]
プロジェクト - >スキャン
この計画では、実行すると、ストレージレイヤー(スキャン下)からの行が完全にフェッチされ、不要なフィールドがドロップされます(プロジェクト)。不要なデータはまだスキャンノードからプロジェクトノードに移動する必要があるため、ここにはいくつかの無駄な努力があります。
単に必要なフィールドを取得するだけでストレージレイヤーを伝えるだけで、より良くすることができます。これで、計画は次のように変わります。
グラフlr
Project [ProjectField1、Field2]
スキャン["スキャンTBL(Field1、Field2)"]]
プロジェクト - >スキャン
コードに行きましょう:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}ここでの予測プッシュダウンルールは、プロジェクトノードである場合に計画と一致し、その子孫はすべてスキャンしてノードのみに結合します。
注:実際、実際の投影プッシュダウンマッチはより複雑ですが、簡単にするためには、ここでのマッチルールは、スキャンを備えたプロジェクトノードです。
そして、これが変換コードです:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Project ]
val pushDownProjection = mutable. ListBuffer [ FieldID ]()
extractProjections(plan, pushDownProjection)
val newPlan = Project (plan.fields, pushDown(pushDownProjection.distinct, plan.child))
ctx.memo.getOrCreateGroupExpression(newPlan)
}
private def extractProjections ( node : LogicalPlan , buffer : mutable. ListBuffer [ FieldID ]) : Unit = {
node match {
case Scan (_, _) => () : Unit
case Project (fields, parent) =>
buffer ++= fields
extractProjections(parent, buffer)
case Join (left, right, on) =>
buffer ++= on.map(_._1) ++ on.map(_._2)
extractProjections(left, buffer)
extractProjections(right, buffer)
}
}
private def pushDown ( pushDownProjection : Seq [ FieldID ], node : LogicalPlan ) : LogicalPlan = {
node match {
case Scan (table, tableProjection) =>
val filteredPushDownProjection = pushDownProjection.filter(_.table == table).map(_.id)
val updatedProjection =
if (filteredPushDownProjection.contains( " * " ) || filteredPushDownProjection.contains( " *.* " )) {
Seq .empty
} else {
(tableProjection ++ filteredPushDownProjection).distinct
}
Scan (table, updatedProjection)
case Join (left, right, on) => Join (pushDown(pushDownProjection, left), pushDown(pushDownProjection, right), on)
case _ => throw new Exception ( " should never happen " )
}
}変換コードは、最初にルートプロジェクトノードからすべての投影を見つけ、次にその下のすべてのスキャンノードに押し下げます。
たとえば、私たちのルールを視覚化します
グラフTD
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
Expr#6->グループ#5
投影プッシュダウン変換を適用した後、投影を伴う新しい同等の計画がスキャン操作に押し下げられます(新しいプランはオレンジ色の境界ノードを備えたツリーです)。
グラフTD
サブグラフグループ#8
expr#8 ["スキャンtbl2(id、field1、field2)"]]
終わり
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#11
expr#11 ["join"]
終わり
Expr#11->グループ#7
Expr#11->グループ#10
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#7
expr#7 ["スキャンtbl1(id、field1)"]]
終わり
サブグラフグループ#10
expr#10 ["join"]
終わり
expr#10->グループ#8
expr#10->グループ#9
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#9
expr#9 ["スキャンtbl3(id、field2)"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#12 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
expr#12->グループ#11
Expr#6->グループ#5
スタイルexpr#12ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#8ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#10ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#9ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#11ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#7ストロークウィッド:4px、ストローク:オレンジ
LinkStyle 0ストローク幅:4PX、ストローク:オレンジ
LinkStyle 1ストローク幅:4PX、ストローク:オレンジ
LinkStyle 6ストローク幅:4PX、ストローク:オレンジ
LinkStyle 7ストローク幅:4PX、ストローク:オレンジ
LinkStyle 8ストローク幅:4px、ストローク:オレンジ
完全な実装については、projectionPushdown.scalaを参照してください
Join Reorderは、クエリプランナーの世界で最も有名な変革の1つでもあります。私たちのプランナーは、再注文変換ルールも実装します。
Reorderに参加することは、実装するのが簡単な作品ではありません。したがって、Join Reorderルールのシンプルでリッピングオフバージョンをここに実装します。
まず、ルールmatch 。
// check if the tree only contains SCAN and JOIN nodes, and also extract all SCAN nodes and JOIN conditions
private def checkAndExtract (
node : LogicalPlan ,
buffer : mutable. ListBuffer [ Scan ],
joinCondBuffer : mutable. ListBuffer [(ql. FieldID , ql. FieldID )]
) : Boolean = {
node match {
case node @ Scan (_, _) =>
buffer += node
true
case Join (left, right, on) =>
joinCondBuffer ++= on
checkAndExtract(left, buffer, joinCondBuffer) && checkAndExtract(right, buffer, joinCondBuffer)
case _ => false
}
}
private def buildInterchangeableJoinCond ( conditions : Seq [(ql. FieldID , ql. FieldID )]) : Seq [ Seq [ql. FieldID ]] = {
val buffer = mutable. ListBuffer [mutable. Set [ql. FieldID ]]()
conditions.foreach { cond =>
val set = buffer.find { set =>
set.contains(cond._1) || set.contains(cond._2)
} match {
case Some (set) => set
case None =>
val set = mutable. Set [ql. FieldID ]()
buffer += set
set
}
set += cond._1
set += cond._2
}
buffer.map(_.toSeq)
}
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case node @ Join (_, _, _) =>
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
if (checkAndExtract(node, buffer, joinCondBuffer)) {
// only match if the join is 3 tables join
if (buffer.size == 3 ) {
var check = true
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
interChangeableCond.foreach { c =>
check &= c.size == 3
}
check
} else {
false
}
} else {
false
}
case _ => false
}
}私たちのルールは、3方向の結合と一致する場合にのみ一致します(関係するテーブルの数は3でなければならず、結合条件はtbl1.field1 = tbl2.field2 = tbl3.field3などの3方向でなければなりません)
例えば、
tbl1
JOIN tbl2 ON tbl1 . field1 = tbl2 . field2
JOIN tbl3 ON tbl1 . field1 = tbl3 . field3 3方向の結合であるため、ここでの結合ステートメントは「一致」されます( tbl1 、 tbl2 、 tbl3の間の結合、条件はtbl1.field1 = tbl2.field2 = tbl3.field3です)
次に、変換コードは次のとおりです。
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Join ]
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
checkAndExtract(plan, buffer, joinCondBuffer)
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
//
val scans = buffer.toList
implicit val ord : Ordering [ Scan ] = new Ordering [ Scan ] {
override def compare ( x : Scan , y : Scan ) : Int = {
val xStats = ctx.statsProvider.tableStats(x.table.id)
val yStats = ctx.statsProvider.tableStats(y.table.id)
xStats.estimatedTableSize.compareTo(yStats.estimatedTableSize)
}
}
def getJoinCond ( left : Scan , right : Scan ) : Seq [(ql. FieldID , ql. FieldID )] = {
val leftFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == left.table)
}
val rightFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == right.table)
}
if (leftFields.length != rightFields.length) {
throw new Exception ( s " leftFields.length( ${leftFields.length} ) != rightFields.length( ${rightFields.length} ) " )
} else {
leftFields zip rightFields
}
}
val sorted = scans.sorted
val newPlan = Join (
sorted( 0 ),
Join (
sorted( 1 ),
sorted( 2 ),
getJoinCond(sorted( 1 ), sorted( 2 ))
),
getJoinCond(sorted( 0 ), sorted( 1 ))
)
ctx.memo.getOrCreateGroupExpression(newPlan)
}ここの変換コードは、推定サイズでテーブルを並べ替えます。
たとえば、推定サイズが300B、100B、200Bの3つの表A、B、Cがあり、JOINステートメントA JOIN B JOIN Cがある場合、 B JOIN C JOIN A
注:このコードでは、テーブル統計を利用して、計画を変革するためのヒントを提供することに気付くかもしれません。実際には、プランナーはあらゆる種類の統計を使用して、テーブルサイズ、行サイズ、ヌルカウント、ヒストグラムなどの変換を支援できます。
たとえば、私たちのルールを視覚化します
グラフTD
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
Expr#6->グループ#5
結合した後、再注文変換、結果として
グラフTD
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
expr#8 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
expr#8->グループ#2
expr#8->グループ#7
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#7
expr#7 ["join"]
終わり
expr#7->グループ#1
Expr#7->グループ#3
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
Expr#6->グループ#5
スタイルexpr#8ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#7ストロークウィッド:4px、ストローク:オレンジ
LinkStyle 2ストローク幅:4PX、ストローク:オレンジ
LinkStyle 6ストローク幅:4PX、ストローク:オレンジ
LinkStyle 3ストローク幅:4PX、ストローク:オレンジ
LinkStyle 7ストローク幅:4PX、ストローク:オレンジ
tbl2 JOIN tbl1 JOIN tbl3 tbl1 JOIN tbl2 JOIN tbl3変換によって生成されます(新しく追加されたノードとエッジはオレンジラインで示されています)
完全な実装については、X3TableJoInReOrderBysize.scalaを参照してください
これで、変換ルールを1か所に置くことができます
private val transformationRules : Seq [ Seq [ TransformationRule ]] = Seq (
Seq ( new ProjectionPushDown ),
Seq ( new X3TableJoinReorderBySize )
)それらを実行して、同等のグループを探索します
for (r <- transformationRules.indices) {
exploreGroup(ctx.rootGroup, transformationRules(r), r + 1 )
}たとえば、計画
グラフTD
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#6
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
Expr#6->グループ#5
探索された後、このグラフになります
グラフTD
サブグラフグループ#8
expr#8 ["スキャンtbl2(id、field1、field2)"]]
終わり
サブグラフグループ#11
expr#11 ["join"]
expr#14 ["join"]
終わり
Expr#11->グループ#7
Expr#11->グループ#10
Expr#14->グループ#8
Expr#14->グループ#12
サブグラフグループ#2
expr#2 ["スキャンtbl2"]
終わり
サブグラフグループ#5
expr#5 ["join"]
expr#16 ["join"]
終わり
expr#5->グループ#1
expr#5->グループ#4
Expr#16->グループ#2
Expr#16->グループ#13
サブグラフグループ#4
expr#4 ["join"]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#13
expr#15 ["join"]
終わり
expr#15->グループ#1
Expr#15->グループ#3
サブグラフグループ#7
expr#7 ["スキャンtbl1(id、field1)"]]
終わり
サブグラフグループ#1
expr#1 ["Scan TBL1"]
終わり
サブグラフグループ#10
expr#10 ["join"]
終わり
expr#10->グループ#8
expr#10->グループ#9
サブグラフグループ#9
expr#9 ["スキャンtbl3(id、field2)"]
終わり
サブグラフグループ#3
expr#3 ["スキャンTBL3"]
終わり
サブグラフグループ#12
expr#13 ["join"]
終わり
Expr#13->グループ#7
Expr#13->グループ#9
サブグラフグループ#6
Expr#12 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
Expr#6 ["Project TBL1.ID、TBL1.FIELD1、TBL2.ID、TBL2.FIELD1、TBL2.FIELD2、TBL3.ID、TBL3.FIELD2、TBL3.FIELD2"]]]]]
終わり
expr#12->グループ#11
Expr#6->グループ#5
スタイルexpr#12ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#8ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#10ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#13ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#14ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#11ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#9ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#15ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#7ストロークウィッド:4px、ストローク:オレンジ
スタイルexpr#16ストロークウィッド:4px、ストローク:オレンジ
LinkStyle 0ストローク幅:4PX、ストローク:オレンジ
LinkStyle 15ストローク幅:4PX、ストローク:オレンジ
LinkStyle 12ストローク幅:4PX、ストローク:オレンジ
LinkStyle 1ストローク幅:4PX、ストローク:オレンジ
LinkStyle 16ストローク幅:4PX、ストローク:オレンジ
LinkStyle 13ストローク幅:4PX、ストローク:オレンジ
LinkStyle 2ストローク幅:4PX、ストローク:オレンジ
LinkStyle 6ストローク幅:4PX、ストローク:オレンジ
LinkStyle 3ストローク幅:4PX、ストローク:オレンジ
LinkStyle 10ストローク幅:4PX、ストローク:オレンジ
LinkStyle 7ストローク幅:4PX、ストローク:オレンジ
LinkStyle 14ストローク幅:4PX、ストローク:オレンジ
LinkStyle 11ストローク幅:4PX、ストローク:オレンジ
詳細については、volcanoplanner.scalaを参照してください
探査段階の後、すべての可能な計画を含む完全に拡張されたツリーがあります。これが最適化段階です。
このフェーズでは、ルートグループに最適な計画があります。最適化プロセスは、次のように説明されています。
これが例です
グラフTD
サブグラフグループ#2 ["グループ#2(コスト= 1)"]]
expr#2 ["expr#2(cost = 1)"]
終わり
サブグラフグループ#5 ["グループ#5(コスト= 3)"]
expr#5 ["expr#5(cost = max(3,2)= 3"]
終わり
expr#5->グループ#1
expr#5->グループ#4
サブグラフグループ#4 ["グループ#4(コスト= 2)"]]
expr#4 ["expr#4(cost = max(1,2)= 2)"]]
expr#7 ["expr#7(cost = 1+2 = 3)"]]
終わり
expr#4->グループ#2
expr#4->グループ#3
サブグラフグループ#1 ["グループ#1(コスト= 3)"]]
expr#1 ["expr#1(cost = 3)"]
終わり
サブグラフグループ#3 ["グループ#3(コスト= 2)"]]
expr#3 ["expr#3(cost = 2)"]
終わり
サブグラフグループ#6 ["グループ#6(コスト= 4.5)"]]
expr#6 ["expr#6(cost = 3*1.5 = 4.5)"]
終わり
Expr#6->グループ#5
サブグラフグループ#8 ["グループ#8(cost = 1)"]]
expr#8 ["expr#8(cost = 1)"]
終わり
サブグラフグループ#9 ["グループ#9(コスト= 2)"]]
expr#9 ["expr#9(cost = 2)"]
終わり
Expr#7->グループ#8
expr#7->グループ#9
たとえば、 Expr#4コストは、 max関数を使用して、子グループコスト( Group#2およびGroup#3 )によって計算されます。別の例は、 Group#4で、そのコストは、同等の式のコスト間の最小値を計算することによって計算されます。
最適化段階の目標は、探索されたグループの表現を考慮して、最高の物理的計画を作成することです。物理計画を次のように定義できます。
sealed trait PhysicalPlan {
def operator () : Operator
def children () : Seq [ PhysicalPlan ]
def cost () : Cost
def estimations () : Estimations
def traits () : Set [ String ]
}
operatorは物理オペレーターであり、計画の実行に使用されていました。後のセクションで説明します。その後、 children児童計画ノードのリストであり、コスト計算のプロセスに参加することに慣れています。 3番目の属性はcostです。 costコスト情報を保持するオブジェクトです(CPUコスト、メモリコスト、IOコストなど)。 estimationsとは、計画に関する推定統計を保持するプロパティ(行数、行サイズなど)であり、コスト計算にも参加しています。最後に、 traits物理的な特性のセットであり、物理計画の生成プロセスに影響を与える実装ルールに影響します。
次に、物理ノードクラスを実装できます。
case class Scan (
operator : Operator ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq .empty // scan do not receive any child
}
case class Project (
operator : Operator ,
child : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (child)
}
case class Join (
operator : Operator ,
leftChild : PhysicalPlan ,
rightChild : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (leftChild, rightChild)
}
完全な実装については、PhysicalPlan.scalaを参照してください
最適化段階で最初に、つまり、実装ルールを実装する必要があります。実装規則は、論理計画から物理的な計画を実行せずに変換するルールです。
プランナーで物理計画を直接実行していないため、代わりに物理プランビルダーを返します。また、各ノードのコスト関数をカスタマイズする方が簡単です。
実装ルールのインターフェイスは次のとおりです。
trait PhysicalPlanBuilder {
def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ]
}
trait ImplementationRule {
def physicalPlanBuilders ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ]
}
ここでは、 PhysicalPlanBuilderビルダーは、子供の身体計画を考慮して、物理計画の構築に使用されるインターフェイスです。
たとえば、論理結合には2つの物理的実装があります。
グラフTD
チャイルド#1 ["Child#1"]
子供#2 ["Child#2"]
子供#3 ["Child#3"]
子供#4 ["child#4"]
hash_join ["` hash join
コスト= f(コスト(子#1)、コスト(子#2))
`"]
merge_join ["`マージJOIN
コスト= g(コスト(子#3)、コスト(子#4))
`"]
hash_join->子#1
hash_join --> child#2
merge_join --> child#3
merge_join --> child#4
the HASH JOIN cost is using function f() to calculate cost, and MERGE JOIN is using function g() to calculate cost, both are using its children as function parameters. So it's easier to code if we're returning just the phyiscal plan builder from the implementation rule instead of the physical plan.
As we've said before, the optimization process is described as following:
And here is the code:
private def implementGroup ( group : Group , combinedRule : ImplementationRule )(
implicit ctx : VolcanoPlannerContext
) : GroupImplementation = {
group.implementation match {
case Some (implementation) => implementation
case None =>
var bestImplementation = Option .empty[ GroupImplementation ]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option (childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some (currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}This code is an exhaustive search code, which is using recursive function to traverse all nodes. At each node (group), the function is called once to get its best plan while also calculate the optimal cost of that group.
Finally, the best plan for our query is the best plan of the root group
val implementationRules = new ImplementationRule {
override def physicalPlanBuilders (
expression : GroupExpression
)( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
expression.plan match {
case node @ Scan (_, _) => implement. Scan (node)
case node @ Project (_, _) => implement. Project (node)
case node @ Join (_, _, _) => implement. Join (node)
}
}
}
ctx.rootGroup.implementation = Option (implementGroup(ctx.rootGroup, implementationRules))See VolcanoPlanner.scala for full implementation
Here is an example of the plan after optimization, it's shown the selected logical node, the selected physical operator, and the estimated cost
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Next, we will implement some implementation rules.
The first, easiest one is the implementation rule of logical PROJECT
object Project {
def apply ( node : logicalplan. Project )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
Seq (
new ProjectionImpl (node.fields)
)
}
}
class ProjectionImpl ( projection : Seq [ql. FieldID ]) extends PhysicalPlanBuilder {
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val child = children.head
val selfCost = Cost (
estimatedCpuCost = 0 ,
estimatedMemoryCost = 0 ,
estimatedTimeCost = 0
) // assuming the cost of projection is 0
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + child.cost().estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + child.cost().estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + child.cost().estimatedTimeCost
)
val estimations = Estimations (
estimatedLoopIterations = child.estimations().estimatedLoopIterations,
estimatedRowSize = child.estimations().estimatedRowSize // just guessing the value
)
Some (
Project (
operator = ProjectOperator (projection, child.operator()),
child = child,
cost = cost,
estimations = estimations,
traits = child.traits()
)
)
}
}
The implementation rule for logical PROJECT, is returning one physical plan builder ProjectionImpl . ProjectionImpl cost calculation is simple, it just inherits the cost from the child node (because the projection is actually not doing any intensive operation). Beside that, it also updates the estimation (in this code, estimation is also inherit from the child node)
See Project.scala for full implementation
Writing implementation rule for logical JOIN is way harder than PROJECTION.
One first reason is, a logical JOIN has many physical implementation, such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
The second reason is, estimating cost for physical JOIN is also hard, because it depends on lots of factors such as row count, row size, data histogram, indexes, data layout, etc.
So, to keep everything simple in this guide, I will only implement 2 physical JOIN: HASH JOIN and MERGE JOIN. The cost estimation functions are fictional (just to show how it works, I'm not trying to correct it). And in the MERGE JOIN, all data is assuming to be sorted by join key.
Here is the code:
object Join {
def apply ( node : logicalplan. Join )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
val leftFields = node.on.map(_._1).map(f => s " ${f.table.id} . ${f.id} " )
val rightFields = node.on.map(_._2).map(f => s " ${f.table.id} . ${f.id} " )
Seq (
new HashJoinImpl (leftFields, rightFields),
new MergeJoinImpl (leftFields, rightFields)
)
}
}
The HASH JOIN:
class HashJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
private def viewSize ( plan : PhysicalPlan ) : Long = {
plan.estimations().estimatedLoopIterations * plan.estimations().estimatedRowSize
}
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
// reorder the child nodes, the left child is the child with smaller view size (smaller than the right child if we're store all of them in memory)
val (leftChild, rightChild) = if (viewSize(children( 0 )) < viewSize(children( 1 ))) {
(children( 0 ), children( 1 ))
} else {
(children( 1 ), children( 0 ))
}
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = HashJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = Set .empty // don't inherit trait from children since we're hash join
)
)
}
}
We can see that the cost function of HASH JOIN is composed of its children costs and estimations
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)Next, the MERGE JOIN:
class MergeJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val (leftChild, rightChild) = (children( 0 ), children( 1 ))
if (leftChild.traits().contains( " SORTED " ) && rightChild.traits().contains( " SORTED " )) {
val estimatedTotalRowCount =
leftChild.estimations().estimatedLoopIterations +
rightChild.estimations().estimatedLoopIterations
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = MergeJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = leftChild.traits() ++ rightChild.traits()
)
)
} else {
None
}
}
}
Same with HASH JOIN, MERGE JOIN also uses its children costs and estimations to calculate its cost, but with different formulla:
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)See HashJoinImpl.scala and MergeJoinImpl.scala for full implementation
You can see other rules and physical plan builders here:
Now, after done implementing the implementation rules, now we can find our best plan. Let's start over from the user query
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idwill be converted to the logical plan
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After exploration phase, it will generate lots of equivalent plans
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
終わり
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
終わり
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
終わり
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
終わり
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
終わり
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
終わり
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
終わり
subgraph Group#1
Expr#1["SCAN tbl1"]
終わり
subgraph Group#10
Expr#10["JOIN"]
終わり
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
終わり
subgraph Group#3
Expr#3["SCAN tbl3"]
終わり
subgraph Group#12
Expr#13["JOIN"]
終わり
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
終わり
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
And the at optimization phase, a final best plan is chose
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Now we've done building a functional query planner which can optimize the query from user, but our query plan could not run by itself. So it's the reason why now we will implement the query processor to test out our query plan.
Basically the query process receive input from the query planner, and execute them
graph LR
plan(("Physical Plan"))
storage[("Storage Layer")]
processor["Query Processor"]
plan -- execute --> processor
storage -- fetch --> processor
Volcano/iterator model is the query processing model that is widely used in many DBMS. It is a pipeline architecture, which means that the data is processed in stages, with each stage passing the output of the previous stage to the next stage.
Each stage in the pipeline is represented by an operator. Operators are functions that perform a specific operation on the data, such as selecting rows, filtering rows, or aggregating rows.
Usually, operator can be formed directly from the query plan.たとえば、クエリ
SELECT field_1
FROM tbl
WHERE field = 1will have the plan
graph TD
project["PROJECT: field_1"]
scan["SCAN: tbl"]
filter["FILTER: field = 1"]
project --> scan
filter --> project
will create a chain of operators like this:
scan = {
next() // fetch next row from table "tbl"
}
project = {
next() = {
next_row = scan.next() // fetch next row from scan operator
projected = next_row["field_1"]
return projected
}
}
filter = {
next() = {
next_row = {}
do {
next_row = project.next() // fetch next row from project operator
} while (next_row["field"] != 1)
return next_row
}
}
results = []
while (row = filter.next()) {
results.append(row)
}
notes : this pseudo code did not handle for end of result stream
The basic interface of an operator is described as following:
trait Operator {
def next () : Option [ Seq [ Any ]]
}
See Operator.scala for full implementation of all operators
Let's define a query
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idwith some data and stats
val table1 : Datasource = Datasource (
table = " emp " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" code " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id
),
rows = Seq (
Seq ( " 1 " , " Emp A " ),
Seq ( " 2 " , " Emp B " ),
Seq ( " 3 " , " Emp C " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " code " -> 32 )
)
)
val table2 : Datasource = Datasource (
table = " dept " ,
catalog = TableCatalog (
Seq (
" emp_id " -> classOf [ String ],
" dept_name " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by emp_id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " Dept 1 " ),
Seq ( " 1 " , " Dept 2 " ),
Seq ( " 2 " , " Dept 3 " ),
Seq ( " 3 " , " Dept 3 " )
),
stats = TableStats (
estimatedRowCount = 4 ,
avgColumnSize = Map ( " emp_id " -> 10 , " dept_name " -> 255 )
)
)
val table3 : Datasource = Datasource (
table = " emp_info " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" name " -> classOf [ String ],
" origin " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " AAAAA " , " Country A " ),
Seq ( " 2 " , " BBBBB " , " Country A " ),
Seq ( " 3 " , " CCCCC " , " Country B " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " name " -> 255 , " origin " -> 255 )
)
)The cost model is optimized for CPU
val costModel : CostModel = ( currentCost : Cost , newCost : Cost ) => {
currentCost.estimatedCpuCost > newCost.estimatedCpuCost
}Now, executing the query by running this code:
val planner = new VolcanoPlanner
QueryParser .parse(query) match {
case Left (err) => throw err
case Right (parsed) =>
val operator = planner.getPlan(parsed)
val result = Utils .execute(operator)
// print result
println(result._1.mkString( " , " ))
result._2.foreach(row => println(row.mkString( " , " )))
}it will print:
emp.id,emp.code,dept.dept_name,emp_info.name,emp_info.origin
1,Emp A,Dept 1,AAAAA,Country A
1,Emp A,Dept 2,AAAAA,Country A
2,Emp B,Dept 3,BBBBB,Country A
3,Emp C,Dept 3,CCCCC,Country B
Voila, We've done building a fully functional query planner and query engine :). You can start writing one for your own, good luck
See Demo.scala for full demo code
Thanks for reading this, this guide is quite long, and not fully correct, but I've tried my best to write it as understandably as possible ?