[Platzhalter]
Ein Abfrageplaner ist eine Komponente eines Datenbankverwaltungssystems (DBMS), das für die Generierung eines Plans zur Ausführung einer Datenbankabfrage verantwortlich ist. Der Abfrageplan gibt die Schritte an, die das DBMS ausführen wird, um die von der Abfrage angeforderten Daten abzurufen. Das Ziel des Abfragsplaners ist es, einen Plan zu generieren, der so effizient wie möglich ist, was bedeutet, dass die Daten den Benutzer so schnell wie möglich an den Benutzer zurückgibt.
Queryplaner sind komplexe Software -Teile und können schwer zu verstehen sein. In diesem Handbuch zur Implementierung eines kostenbasierten Abfragedanachters können Sie einen Schritt-für-Schritt-Überblick über den Prozess erhalten, wie Sie Ihren eigenen kostenbasierten Abfrageplaner implementieren und dennoch die grundlegenden Konzepte des Abfrageplaners abdecken.
Geschrieben von AI, herausgegeben von Mensch
Dieser Leitfaden ist geschrieben für:
Ziele:
Graph TD
Benutzer ((Benutzer))
Parser [Abfrage Parser]
Planer [Queryplaner]
Executor [Abfrageprozessor]
Benutzer -Textabfrage -> Parser
Parser -AST -> Planer
Planer -physischer Plan -> Vollstrecker
Die grundlegende Architektur eines Abfragemotors besteht aus diesen Komponenten:
Normalerweise sind Queryplaner in 2 Typen unterteilt:
Heuristischer Planer ist der Queryplaner, der vordefinierte Regeln verwendet hat, um Abfragedarsteller zu generieren.
Kostenbasierter Planer ist der Queryplaner, der auf den Kosten für die Erstellung von Abfragen basiert und versucht, den optimalen Plan basierend auf den Kosten der Eingabebestand zu ermitteln.
Während der heuristische Planer normalerweise den besten Plan durch Anwenden von Transformationsregeln findet, wenn er weiß, dass der transformierte Plan besser ist, findet der kostenbasierte Planer den besten Plan, indem er äquivalente Pläne aufzählte und versucht, den besten Plan unter ihnen zu finden.
In kostenbasierten Queryplaner besteht es normalerweise aus Phasen:
In der Phase der Planaufzählungen wird der Planer die möglichen äquivalenten Pläne aufzählen.
Danach sucht der Planer in der Phase der Abfrageoptimierung nach dem besten Plan aus der Liste der aufgezählten Pläne. Der beste Plan ist der Plan mit den niedrigsten Kosten, die das Kostenmodell (oder die Kostenfunktion) definiert ist.
Weil der natürliche logische Plan darin besteht, eine Baumstruktur zu haben, sodass Sie der Meinung sind, dass die Optimierung/Suche tatsächlich ein Problem der Baumsuche ist. Und hier draußen gibt es viele Baumsuchalgorithmen:
Anmerkungen: Theoretisch ist es möglich, jede Art von Baumsuchalgorithmus zu verwenden. Praktisch ist es jedoch nicht machbar, da die Suchzeit erhöht wird, wenn unser Suchalgorithmus komplex ist
Anmerkungen: Die Suchabschlusspräminierungsbedingungen sind normalerweise:
Vulcano Query Planer (oder Vulcano Optimizer Generator) ist ein kostenbasierter Queryplaner
Der Volcano Planer verwendet einen dynamischen Programmieransatz, um den besten Abfrageplan aus der Liste der aufgezählten Pläne zu finden.
Details: https://ieeexplore.ieee.org/document/344061 (Ich bin zu faul, um das Papier hier zu erklären)
Hier ist eine großartige Erklärung: https://15721.courses.cs.cmu.edu/spring2017/slides/15-optimizer2.pdf#page=9
Unser Queryplaner ist ein kostenbasierter Abfrageplaner, der der Grundidee des Vulkananfragsplaners unseres Planers aus 2 Hauptphasen bestehen wird:
Graph LR
AST ((AST))
logical_plan [Plan]
erforscht_plans ["`
Plan Nr. 1
...
Plan #N
`"]
Implementation_Plan ["Plan #x (Bester Plan)"]
AST -Konvertieren Sie in logischen Plan -> logical_plan
LOGICAL_PLAN -Exploration Phase -> Explored_plans
Explored_plans -Optimierungsphase -> Implementierung_Plan
LinkStyle 1,2 Farbe: Orange, Schlaganfall: Orange, Schlaganfall: 5px
Der logische Plan ist die Datenbetrag, die die Abstraktion des Transformationsschritts enthält, der zur Ausführung der Abfrage erforderlich ist.
Hier ist ein Beispiel für einen logischen Plan:
Graph TD
1 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
2 ["Join"];
3 ["Scan TBL1"];
4 ["Join"];
5 ["Scan TBL2"];
6 ["Scan TBL3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Während der logische Plan nur die Abstraktion enthält, ist der physische Plan die Datenstruktur, die die Implementierungsdetails enthält. Jeder logische Plan hat mehrere physische Pläne. Zum Beispiel hat eine logische Verbindung viele physische Pläne wie Hash, Merge Join, Broadcast -Join usw.
Die äquivalente Gruppe ist eine Gruppe äquivalenter Ausdrücke (die für jeden Ausdruck logisch äquivalent ist)
z.B
Graph TD
Subgraph -Gruppe Nr. 8
Expr. 8 ["Scan TBL2 (Feld1, Feld2, ID)"]]
Ende
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph -Gruppe Nr. 11
Expr#11 ["Join"]
Ende
Expr#11 -> Gruppe Nr. 7
Expr#11 -> Gruppe#10
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 7
Expr. 7 ["Scan TBL1 (ID, Feld1)"]
Ende
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe#10
Expr#10 ["Join"]
Ende
Expr#10 -> Gruppe#8
Expr#10 -> Gruppe 9
Subgraph -Gruppe Nr. 9
Expr#9 ["Scan TBL3 (ID, Feld2)"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
Expr#12 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#12 -> Gruppe Nr. 11
Expr#6 -> Gruppe 5
Hier sehen wir, dass Group#6 2 äquivalente Ausdrücke aufweist, die beide dieselbe Abfrage darstellen (eine scan aus der Tabelle und dann projiziert, einer drückt die Projektion auf den Scan -Knoten nach unten).
Transformationsregel ist die Regel, um von einem logischen Plan zu einem anderen logischen äquivalenten logischen Plan zu transformieren
Zum Beispiel der Plan:
Graph TD
1 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
2 ["Join"];
3 ["Scan TBL1"];
4 ["Join"];
5 ["Scan TBL2"];
6 ["Scan TBL3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Wenn die Projektions -Pushdown -Transformation angewendet wird, wird er transformiert in:
Graph TD
1 ["Projekt *. *"];
2 ["Join"];
3 ["scannen tbl1 (id, field1)"];
4 ["Join"];
5 ["Scan TBL2 (Field1, Feld2)"];
6 ["Scan TBL3 (ID, Feld2, Feld2)"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Die Transformationsregel kann durch logische Merkmale/Eigenschaften wie Tabellenschema, Datenstatistik usw. beeinflusst werden.
Die Implementierungsregel ist die Regel, um die physikalischen Pläne für logische Plane zurückzugeben.
Die Implementierungsregel kann durch physikalische Merkmale/Eigenschaften wie Datenlayout (sortiert oder nicht) usw. beeinflusst werden.
In der Explorationsphase wird der Planer Transformationsregeln anwendet und gleichwertige logische Pläne generiert
Zum Beispiel der Plan:
Graph TD
1326583549 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.field2, Tbl3field2"];
-425111028 ["Join"];
-349388609 ["Scan TBL1"];
1343755644 ["Join"];
-1043437086 ["Scan TBL2"];
-1402686787 ["Scan TBL3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Nach Anwendung von Transformationsregeln, was zu der folgenden Grafik führt:
Graph TD
Subgraph -Gruppe Nr. 8
Expr. 8 ["Scan TBL2 (ID, Feld1, Feld2)"]]
Ende
Subgraph -Gruppe Nr. 11
Expr#11 ["Join"]
Expr#14 ["Join"]
Ende
Expr#11 -> Gruppe Nr. 7
Expr#11 -> Gruppe#10
Expr#14 -> Gruppe#8
Expr#14 -> Gruppe#12
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Expr#16 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Expr#16 -> Gruppe 2
Expr#16 -> Gruppe Nr. 13
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 13
Expr#15 ["Join"]
Ende
Expr#15 -> Gruppe Nr. 1
Expr#15 -> Gruppe 3
Subgraph -Gruppe Nr. 7
Expr. 7 ["Scan TBL1 (ID, Feld1)"]
Ende
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe#10
Expr#10 ["Join"]
Ende
Expr#10 -> Gruppe#8
Expr#10 -> Gruppe 9
Subgraph -Gruppe Nr. 9
Expr#9 ["Scan TBL3 (ID, Feld2)"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph -Gruppe#12
Expr#13 ["Join"]
Ende
Expr#13 -> Gruppe Nr. 7
Expr#13 -> Gruppe 9
Subgraph Group#6
EXPR#12 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#12 -> Gruppe Nr. 11
Expr#6 -> Gruppe 5
Hier können wir sehen, dass die Regelung der Projektion Pushdown -Regel und der Verbindungsreformanlage angewendet wird.
Die Optimierungsphase besteht darin, den erweiterten Baum in der Explorationsphase zu durchqueren, um den besten Plan für unsere Anfrage zu finden.
Diese "eigentlich" ist die Optimierung der Baumsuche, sodass Sie jeden Baumsuchalgorithmus verwenden können, den Sie sich vorstellen können (aber Sie müssen sicherstellen, dass es richtig ist).
Hier ist das Beispiel des generierten physischen Planes nach der Optimierungsphase:
Graph TD
Gruppe 6 ["
Gruppe 6
Ausgewählt: Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2
Betreiber: Projektoperator
Kosten: Kosten (CPU = 641400.00, MEM = 1020400012.00, Zeit = 1000000.00)
"]]
Gruppe 6 -> Gruppe Nr. 11
Gruppe Nr. 11 ["
Gruppe Nr. 11
Ausgewählt: Join
Operator: Hashjoinoperator
Kosten: Kosten (CPU = 641400.00, MEM = 1020400012.00, Zeit = 1000000.00)
"]]
Gruppe Nr. 11 -> Gruppe Nr. 7
Gruppe Nr. 11 -> Gruppe#10
Gruppe 7 ["
Gruppe 7
Ausgewählt: Scan TBL1 (ID, Feld1)
Operator: Normalcanoperator
Kosten: Kosten (CPU = 400,00, MEM = 400000,00, Zeit = 1000,00)
"]]
Gruppe 10 ["
Gruppe 10
Ausgewählt: Join
Bediener: Mergejoinoperator
Merkmale: sortiert
Kosten: Kosten (CPU = 640000.00, MEM = 20000012.00, Zeit = 1100000,00)
"]]
Gruppe 10 -> Gruppe 8
Gruppe 10 -> Gruppe 9
Gruppe 8 ["
Gruppe 8
Ausgewählt: Scan TBL2 (ID, Feld1, Feld2)
Operator: Normalcanoperator
Merkmale: sortiert
Kosten: Kosten (CPU = 600000,00, MEM = 12,00, Zeit = 1000000.00)
"]]
Gruppe 9 ["
Gruppe 9
Ausgewählt: Scan TBL3 (ID, Feld2)
Operator: Normalcanoperator
Merkmale: sortiert
Kosten: Kosten (CPU = 40000.00, MEM = 20000000.00, Zeit = 100000,00)
"]]
Der generierte Plan hat den ausgewählten logischen Plan, die geschätzten Kosten und den physischen Bediener gezeigt
Unser Planer wird die Erschöpfungssuche durchführen, um den besten Plan zu finden
Da der Code des Planers groß ist, werde ich nicht Schritt-für-Schritt-Anleitung schreiben, aber ich werde stattdessen jedes Stück des Code erläutern
Hier definieren wir eine Abfragesprache, die dieses Tutorial gründlich verwendet hat
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idDie Abfragesprache, die wir implementieren werden, ist eine SQL-ähnliche Sprache. Aus Einfachheit halber werden wir jedoch seine Funktionalität und Syntax einschränken.
Die Sprache erschien in Form von
SELECT tbl . field , [...]
FROM tbl JOIN [...] Es unterstützt nur SELECT and JOIN , auch das Feld in der Auswahlanweisung muss vollständig qualifiziert sein (in Form von table.field ), alle anderen Funktionen werden nicht unterstützt
Zunächst müssen wir den AST für unsere Sprache definieren. AST (oder abstrakter Syntaxbaum) ist ein Baum, der zur Darstellung der syntaktischen Struktur eines Textes verwendet wird.
Da unsere Sprache so einfach ist, können wir die AST -Struktur in mehreren Codes einfach definieren:
sealed trait Identifier
case class TableID ( id : String ) extends Identifier
case class FieldID ( table : TableID , id : String ) extends Identifier
sealed trait Statement
case class Table ( table : TableID ) extends Statement
case class Join ( left : Statement , right : Statement , on : Seq [( FieldID , FieldID )]) extends Statement
case class Select ( fields : Seq [ FieldID ], from : Statement ) extends Statement
Zum Beispiel eine Frage
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idkann als dargestellt werden als
Select (
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ),
FieldID ( TableID ( " tbl1 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " id " ),
FieldID ( TableID ( " tbl2 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " id " ),
FieldID ( TableID ( " tbl3 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " field2 " )
),
Join (
Table ( TableID ( " tbl1 " )),
Join (
Table ( TableID ( " tbl2 " )),
Table ( TableID ( " tbl3 " )),
Seq (
FieldID ( TableID ( " tbl2 " ), " id " ) -> FieldID ( TableID ( " tbl3 " ), " id " )
)
),
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ) -> FieldID ( TableID ( " tbl2 " ), " id " )
)
)
)Nachdem wir die AST -Struktur definiert haben, müssen wir den Abfrage -Parser schreiben, mit dem die Textabfrage in AST -Form umgewandelt wird.
Da diese Anleitung Scala zur Implementierung verwendet, werden wir Scalaa-Parser-Kombinatoren auswählen, um unseren Abfrage-Parser zu erstellen.
Abfrage -Parser -Klasse:
object QueryParser extends ParserWithCtx [ QueryExecutionContext , Statement ] with RegexParsers {
override def parse ( in : String )( implicit ctx : QueryExecutionContext ) : Either [ Throwable , Statement ] = {
Try (parseAll(statement, in) match {
case Success (result, _) => Right (result)
case NoSuccess (msg, _) => Left ( new Exception (msg))
}) match {
case util. Failure (ex) => Left (ex)
case util. Success (value) => value
}
}
private def select : Parser [ Select ] = ??? // we will implement it in later section
private def statement : Parser [ Statement ] = select
}
Definieren Sie dann einige Parse -Regeln:
// common
private def str : Parser [ String ] = """ [a-zA-Z0-9_]+ """ .r
private def fqdnStr : Parser [ String ] = """ [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ """ .r
// identifier
private def tableId : Parser [ TableID ] = str ^^ (s => TableID (s))
private def fieldId : Parser [ FieldID ] = fqdnStr ^^ { s =>
val identifiers = s.split( '.' )
if (identifiers.length != 2 ) {
throw new Exception ( " should never happen " )
} else {
val table = identifiers.head
val field = identifiers( 1 )
FieldID ( TableID (table), field)
}
} Hier sind zwei Regeln, mit denen die Kennungen analysiert werden: TableID und FieldID .
Die Tabellen-ID (oder der Tabellenname) enthält normalerweise nur Zeichen, Zahlen und Unterstriche ( _ ), sodass wir einen einfachen Regex [a-zA-Z0-9_]+ verwenden, um den Tabellennamen zu identifizieren.
Andererseits ist die Feld-ID (für Feldqualifikationsmerkmale) in unserer Sprache vollständig qualifiziertes Feldname. Normalerweise befindet es sich in Form von table.field und Feldname enthält normalerweise auch nur Zeichen, Zahlen und Unterstriche, sodass wir den Regex [a-zA-Z0-9_]+.[a-zA-Z0-9_]+
Nachdem wir die Regeln für das Parsen der Kennungen definiert haben, können wir jetzt Regeln definieren, um die Anweisung der Abfrage zu analysieren:
// statement
private def table : Parser [ Table ] = tableId ^^ (t => Table (t))
private def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) " Die table ist eine einfache Regel, die nur Table erstellt wird, indem sie die analysierte TableID aus tableId -Regel verwendet.
Die subQuery ist die Regel, um den Unterbild zu analysieren. In SQL können wir eine Abfrage schreiben, die so aussieht:
SELECT a
FROM ( SELECT b FROM c) d Der SELECT b FROM c ist die Unterauszahlung in der obigen Anweisung. Hier, in unserer einfachen Abfragesprache, geben wir an, dass eine Aussage ein Unterbild ist, wenn sie von einem Paar Klammern ( () ) eingeschlossen ist. Da unsere Sprache nur ausgewählte Erklärung hat, können wir die Parse -Regel wie folgt schreiben:
def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) "Jetzt werden wir die Parse -Regeln für die Auswahlanweisung definieren:
private def fromSource : Parser [ Statement ] = table ||| subQuery
private def select : Parser [ Select ] =
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep(
" JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)
) ^^ {
case _ ~ fields ~ _ ~ src ~ joins =>
val p = if (joins.nonEmpty) {
def chain ( left : Statement , right : Seq [( Statement , Seq [( FieldID , FieldID )])]) : Join = {
if (right.isEmpty) {
throw new Exception ( " should never happen " )
} else if (right.length == 1 ) {
val next = right.head
Join (left, next._1, next._2)
} else {
val next = right.head
Join (left, chain(next._1, right.tail), next._2)
}
}
val temp = joins.map { join =>
val statement = join._1._1._2
val joinOn = join._2.map(on => on._1._1 -> on._2)
statement -> joinOn
}
chain(src, temp)
} else {
src
}
Select (fields, p)
}In SQL können wir einen Unterbild als Join-Quelle verwenden. Zum Beispiel:
SELECT * . *
FROM tbl1
JOIN ( SELECT * . * FROM tbl2)
JOIN tbl3Daher wird unser Parser auch Regeln implementieren, um den Unterbild im Join-Teil der Erklärung zu analysieren. Deshalb haben wir die Parse-Regel:
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep( " JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)In QueryParser.scala finden Sie die vollständige Implementierung
Siehe Queryparserspec.scala
Nachdem wir das AST aus der Textabfrage generieren, können wir es direkt in den logischen Plan konvertieren
Lassen Sie uns zunächst die Schnittstelle für unseren logischen Plan definieren:
sealed trait LogicalPlan {
def children () : Seq [ LogicalPlan ]
}
children sind die Liste des logischen Kinderplanes. Zum Beispiel:
Graph TD
1326583549 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.field2, Tbl3field2"];
-425111028 ["Join"];
-349388609 ["Scan TBL1"];
1343755644 ["Join"];
-1043437086 ["Scan TBL2"];
-1402686787 ["Scan TBL3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Die untergeordneten Knoten des PROJECT sind der erste JOIN -Knoten. Der erste JOIN -Knoten hat 2 Kinder, die der zweite JOIN -Knoten und SCAN tbl1 -Knoten sind. Bald, ...
Da unsere Abfragesprache einfach ist, benötigen wir nur 3 Arten logischer Knoten:
case class Scan ( table : ql. TableID , projection : Seq [ String ]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq .empty
}
case class Project ( fields : Seq [ql. FieldID ], child : LogicalPlan ) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (child)
}
case class Join ( left : LogicalPlan , right : LogicalPlan , on : Seq [(ql. FieldID , ql. FieldID )]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (left, right)
}
Dann können wir die Funktion schreiben, um den AST in einen logischen Plan umzuwandeln:
def toPlan ( node : ql. Statement ) : LogicalPlan = {
node match {
case ql. Table (table) => Scan (table, Seq .empty)
case ql. Join (left, right, on) => Join (toPlan(left), toPlan(right), on)
case ql. Select (fields, from) => Project (fields, toPlan(from))
}
}In logicalPlan.scala finden Sie die vollständige Implementierung
Wir können Klassen für Gruppen wie folgt definieren:
case class Group (
id : Long ,
equivalents : mutable. HashSet [ GroupExpression ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
var implementation : Option [ GroupImplementation ] = None
}
case class GroupExpression (
id : Long ,
plan : LogicalPlan ,
children : mutable. MutableList [ Group ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
val appliedTransformations : mutable. HashSet [ TransformationRule ] = mutable. HashSet ()
}
Group ist die Reihe von Plänen, die logisch äquivalent sind.
Jede GroupExpression repräsentiert einen logischen Planknoten. Da wir GroupExpression logischen Group GroupExpression Group
z.B
Graph TD
Subgraph -Gruppe Nr. 8
Expr#8
Ende
Subgraph -Gruppe Nr. 2
Expr#2
Ende
Subgraph -Gruppe Nr. 11
Expr#11
Ende
Expr#11 -> Gruppe Nr. 7
Expr#11 -> Gruppe#10
Subgraph Group#5
Expr#5
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 7
Expr#7
Ende
Subgraph -Gruppe Nr. 1
Expr#1
Ende
Subgraph -Gruppe#10
Expr#10
Ende
Expr#10 -> Gruppe#8
Expr#10 -> Gruppe 9
Subgraph -Gruppe Nr. 9
Expr#9
Ende
Subgraph -Gruppe Nr. 3
Expr#3
Ende
Subgraph Group#6
Expr#12
Expr#6
Ende
Expr#12 -> Gruppe Nr. 11
Expr#6 -> Gruppe 5
Wie wir hier sehen können, hat die Group#6 2 äquivalente Ausdrücke: Expr#12 und Expr#6 , und die Kinder von Expr#12 sind Group#11
Anmerkungen: Wir werden in der Explorationsphase mehrere runde Transformationen implementieren. Für jede Group und GroupExpression haben wir einen ExplorationMark -Anzeichen für den Explorationsstatus.
class ExplorationMark {
private var bits : Long = 0
def get : Long = bits
def isExplored ( round : Int ) : Boolean = BitUtils .getBit(bits, round)
def markExplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = true )
def markUnexplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = false )
}
ExplorationMark ist nur eine Bitset-Wrapper-Klasse. Es markiert I-Th Bit als 1, wenn I-Th-Runde untersucht wird, markieren Sie als 0 ansonsten.
ExplorationMark kann auch zur Visualisierung der genauen Transformation verwendet werden. Weitere Informationen finden Sie in der Visualisierung
Memo ist eine Reihe von Helfern, um die äquivalenten Gruppen zu konstruieren. Memo is besteht aus mehreren HashMap, um die Gruppen- und Gruppenexpression zwischenzuspeichern und auch Methoden zur Registrierung neuer Gruppen- oder Gruppenexpression bereitzustellen.
class Memo (
groupIdGenerator : Generator [ Long ] = new LongGenerator ,
groupExpressionIdGenerator : Generator [ Long ] = new LongGenerator
) {
val groups : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]()
val parents : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]() // lookup group from group expression ID
val groupExpressions : mutable. HashMap [ LogicalPlan , GroupExpression ] = mutable. HashMap [ LogicalPlan , GroupExpression ]()
def getOrCreateGroupExpression ( plan : LogicalPlan ) : GroupExpression = {
val children = plan.children()
val childGroups = children.map(child => getOrCreateGroup(child))
groupExpressions.get(plan) match {
case Some (found) => found
case None =>
val id = groupExpressionIdGenerator.generate()
val children = mutable. MutableList () ++ childGroups
val expression = GroupExpression (
id = id,
plan = plan,
children = children
)
groupExpressions += plan -> expression
expression
}
}
def getOrCreateGroup ( plan : LogicalPlan ) : Group = {
val exprGroup = getOrCreateGroupExpression(plan)
val group = parents.get(exprGroup.id) match {
case Some (group) =>
group.equivalents += exprGroup
group
case None =>
val id = groupIdGenerator.generate()
val equivalents = mutable. HashSet () + exprGroup
val group = Group (
id = id,
equivalents = equivalents
)
groups.put(id, group)
group
}
parents += exprGroup.id -> group
group
}
}
In Memo.scala finden Sie die vollständige Implementierung
Der erste Schritt im Planer ist die Initialisierung
Graph LR
Abfrage ((Abfrage))
AST ((AST))
root_plan ((RootPlan))
root_group ((Rootgroup))
Abfrage -"QueryParser.Parse (Abfrage)" -> ast
AST -"LOGICALPLAN.TOPLAN (AST)" -> root_plan
root_plan -"memo.getorCreateGroup (rootPlan)" -> root_group
Erstens wird die Anfrage in AST analysiert. Dann konvertiert in logischen Plan, als root plan bezeichnet, und initialisieren Sie dann die Gruppe aus root plan als root group .
def initialize ( query : Statement )( implicit ctx : VolcanoPlannerContext ) : Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan .toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is first the exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the different between rounds (added nodes, add connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}Weitere Informationen finden Sie unter vulcanoplanner.scala
Zum Beispiel die Abfrage:
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idNach der Initialisierung werden die Gruppen so aussehen:
Graph TD
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#6 -> Gruppe 5
Hier können Sie sehen, dass jede Gruppe genau einen äquivalenten Ausdruck hat
Nach der Initialisierung befindet sich nun die Explorationsphase, in der alle möglichen gleichwertigen Pläne untersucht werden.
Die Erkundungsmethode ist recht einfach:
Bevor wir uns mit dem Erkundungscode eintauchen, sprechen wir zuerst über die Transformationsregel.
Transformationsregel ist eine Regel, mit der ein logischer Plan in einen anderen äquivalenten logischen Plan umgewandelt wird, wenn er mit der Regelbedingung übereinstimmt.
Hier ist die Schnittstelle der Transformationsregel:
trait TransformationRule {
def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean
def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression
}
Da der logische Plan eine Baum-ähnliche Datenbetrag ist, ist die match der Transformationsregeln die Übereinstimmung mit der Übereinstimmung auf dem Baum.
Hier ist beispielsweise die match , mit der der Projektknoten übereinstimmt und gleichzeitig prüft, ob Nachkommen nur Join und Scan enthalten:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Dieser Plan ist "Matched":
Graph TD
Subgraph -Gruppe Nr. 2
Expr. 2 ["Scan"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan"]
Ende
Subgraph Group#6
Expr#6 ["Projekt"]
Ende
Expr#6 -> Gruppe 5
Während dieser Plan nicht ist:
Graph TD
Subgraph -Gruppe Nr. 2
Expr. 2 ["Scan"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe Nr. 3
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Scan"]
Ende
Subgraph -Gruppe Nr. 7
Expr#7 ["Projekt"]
Ende
Expr#7 -> Gruppe 6
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Projekt"]
Ende
Expr#3 -> Gruppe 2
Subgraph Group#6
Expr#6 ["Join"]
Ende
Expr#6 -> Gruppe Nr. 1
Expr#6 -> Gruppe 5
Wie wir bereits gesagt haben, lautet die Erkundungsmethode:
Und hier ist Explorationscode (ganz einfach, huh):
private def exploreGroup (
group : Group ,
rules : Seq [ TransformationRule ],
round : Int
)( implicit ctx : VolcanoPlannerContext ) : Unit = {
while ( ! group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if ( ! equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if ( ! equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if ( ! group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}Weitere Informationen finden Sie unter vulcanoplanner.scala
Jetzt ist es Zeit, einige Transformationsregeln zu implementieren
Der Projection Pushdown ist eine einfache Transformationsregel, mit der die Projektion auf die Speicherschicht gedrückt wird.
Zum Beispiel die Abfrage
SELECT field1, field2
from tblhat den Plan
Graph LR
Projekt [Projektfeld1, Feld2]
Scan [Scan TBL] scannen]
Projekt -> Scan
Bei diesem Plan werden bei der Ausführung Zeilen aus der Speicherschicht (unter Scan) vollständig abgerufen und dann unnötige Felder fallen (Projekt). Die unnötigen Daten müssen sich immer noch vom Scan -Knoten zum Projektknoten wechseln, sodass hier einige verschwendete Anstrengungen vorhanden sind.
Wir können es besser machen, indem wir einfach die Speicherschicht nur sagen, nur die erforderlichen Felder abzurufen. Jetzt wird der Plan zu:
Graph LR
Projekt [Projektfeld1, Feld2]
Scan ["Scan TBL (Field1, Field2)"]
Projekt -> Scan
Gehen wir in den Code:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Unsere Projektions -Pushdown -Regel hier wird mit dem Plan übereinstimmen, wenn es sich um den Projektknoten handelt, und alle Nachkommen werden nur Scan und Verbinden des Knotens.
Anmerkungen: Tatsächlich ist die reale Projection -Pushdown -Match komplexer, aber der Einfachheit halber ist die Match -Regel hier nur der Projektknoten mit Scan und Join -Nachkommen
Und hier ist der Transformationscode:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Project ]
val pushDownProjection = mutable. ListBuffer [ FieldID ]()
extractProjections(plan, pushDownProjection)
val newPlan = Project (plan.fields, pushDown(pushDownProjection.distinct, plan.child))
ctx.memo.getOrCreateGroupExpression(newPlan)
}
private def extractProjections ( node : LogicalPlan , buffer : mutable. ListBuffer [ FieldID ]) : Unit = {
node match {
case Scan (_, _) => () : Unit
case Project (fields, parent) =>
buffer ++= fields
extractProjections(parent, buffer)
case Join (left, right, on) =>
buffer ++= on.map(_._1) ++ on.map(_._2)
extractProjections(left, buffer)
extractProjections(right, buffer)
}
}
private def pushDown ( pushDownProjection : Seq [ FieldID ], node : LogicalPlan ) : LogicalPlan = {
node match {
case Scan (table, tableProjection) =>
val filteredPushDownProjection = pushDownProjection.filter(_.table == table).map(_.id)
val updatedProjection =
if (filteredPushDownProjection.contains( " * " ) || filteredPushDownProjection.contains( " *.* " )) {
Seq .empty
} else {
(tableProjection ++ filteredPushDownProjection).distinct
}
Scan (table, updatedProjection)
case Join (left, right, on) => Join (pushDown(pushDownProjection, left), pushDown(pushDownProjection, right), on)
case _ => throw new Exception ( " should never happen " )
}
}Der Transformationscode findet zuerst alle Projektionen aus dem Root -Projektknoten und drückt sie dann auf alle Scanknoten darunter.
Visualisieren Sie unsere Regel beispielsweise den Plan
Graph TD
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#6 -> Gruppe 5
Nach der Anwendung der Projection -Pushdown -Transformation führt zu einem neuen äquivalenten Plan mit den Projektionen, die auf die Scanoperationen gedrückt werden (der neue Plan ist der Baum mit orangefarbenen Grenzknoten).
Graph TD
Subgraph -Gruppe Nr. 8
Expr. 8 ["Scan TBL2 (ID, Feld1, Feld2)"]]
Ende
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph -Gruppe Nr. 11
Expr#11 ["Join"]
Ende
Expr#11 -> Gruppe Nr. 7
Expr#11 -> Gruppe#10
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 7
Expr. 7 ["Scan TBL1 (ID, Feld1)"]
Ende
Subgraph -Gruppe#10
Expr#10 ["Join"]
Ende
Expr#10 -> Gruppe#8
Expr#10 -> Gruppe 9
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 9
Expr#9 ["Scan TBL3 (ID, Feld2)"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#12 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#12 -> Gruppe Nr. 11
Expr#6 -> Gruppe 5
Style Expr#12 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#8 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#10 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#9 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#11 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#7 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 0 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 1 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 6 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 7 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 8 Hub-Width: 4px, Schlaganfall: Orange
In ProjectionPushdown.scala finden Sie die vollständige Implementierung
Die Zusammenordnung ist auch eine der bekanntesten Transformationen in der Welt des Queryplaners. Unser Planer wird auch eine Reformationsregel implementieren.
Da der Beitritt zu Real World in Real World kein leicht zu implementieren ist. Wir werden hier eine einfache, Abzocke-Version der Join-Reordnungsregel implementieren.
Erstens die match :
// check if the tree only contains SCAN and JOIN nodes, and also extract all SCAN nodes and JOIN conditions
private def checkAndExtract (
node : LogicalPlan ,
buffer : mutable. ListBuffer [ Scan ],
joinCondBuffer : mutable. ListBuffer [(ql. FieldID , ql. FieldID )]
) : Boolean = {
node match {
case node @ Scan (_, _) =>
buffer += node
true
case Join (left, right, on) =>
joinCondBuffer ++= on
checkAndExtract(left, buffer, joinCondBuffer) && checkAndExtract(right, buffer, joinCondBuffer)
case _ => false
}
}
private def buildInterchangeableJoinCond ( conditions : Seq [(ql. FieldID , ql. FieldID )]) : Seq [ Seq [ql. FieldID ]] = {
val buffer = mutable. ListBuffer [mutable. Set [ql. FieldID ]]()
conditions.foreach { cond =>
val set = buffer.find { set =>
set.contains(cond._1) || set.contains(cond._2)
} match {
case Some (set) => set
case None =>
val set = mutable. Set [ql. FieldID ]()
buffer += set
set
}
set += cond._1
set += cond._2
}
buffer.map(_.toSeq)
}
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case node @ Join (_, _, _) =>
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
if (checkAndExtract(node, buffer, joinCondBuffer)) {
// only match if the join is 3 tables join
if (buffer.size == 3 ) {
var check = true
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
interChangeableCond.foreach { c =>
check &= c.size == 3
}
check
} else {
false
}
} else {
false
}
case _ => false
}
} Unsere Regel wird nur abgestimmt, wenn wir mit der 3-Wege-Verbindung übereinstimmen (die Anzahl der beteiligten Tabellen muss 3 sein, und die Join-Bedingung muss 3-Wege sein, z tbl1.field1 = tbl2.field2 = tbl3.field3
Zum Beispiel,
tbl1
JOIN tbl2 ON tbl1 . field1 = tbl2 . field2
JOIN tbl3 ON tbl1 . field1 = tbl3 . field3 Die Join-Erklärung hier wird "Match", da es 3-Wege-Join ist (es ist die Verbindung zwischen tbl1 , tbl2 , tbl3 und der Zustand ist tbl1.field1 = tbl2.field2 = tbl3.field3 )
Als nächstes ist der Transformationscode:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Join ]
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
checkAndExtract(plan, buffer, joinCondBuffer)
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
//
val scans = buffer.toList
implicit val ord : Ordering [ Scan ] = new Ordering [ Scan ] {
override def compare ( x : Scan , y : Scan ) : Int = {
val xStats = ctx.statsProvider.tableStats(x.table.id)
val yStats = ctx.statsProvider.tableStats(y.table.id)
xStats.estimatedTableSize.compareTo(yStats.estimatedTableSize)
}
}
def getJoinCond ( left : Scan , right : Scan ) : Seq [(ql. FieldID , ql. FieldID )] = {
val leftFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == left.table)
}
val rightFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == right.table)
}
if (leftFields.length != rightFields.length) {
throw new Exception ( s " leftFields.length( ${leftFields.length} ) != rightFields.length( ${rightFields.length} ) " )
} else {
leftFields zip rightFields
}
}
val sorted = scans.sorted
val newPlan = Join (
sorted( 0 ),
Join (
sorted( 1 ),
sorted( 2 ),
getJoinCond(sorted( 1 ), sorted( 2 ))
),
getJoinCond(sorted( 0 ), sorted( 1 ))
)
ctx.memo.getOrCreateGroupExpression(newPlan)
}Der Transformationscode hier wird die Tabellen nach seiner geschätzten Größe neu ordnen.
Wenn wir beispielsweise 3 Tabellen A, B, C mit einer geschätzten Größe von 300B, 100B, 200b und einer Join -Anweisung A JOIN B JOIN C haben, wird es in B JOIN C JOIN A umgewandelt.
Hinweise: Sie können in diesem Code feststellen, dass wir Tabellenstatistiken verwendet haben, um einen Hinweis zur Transformation des Plans zu geben. In der Praktikum kann der Planer alle Arten von Statistiken verwenden, um seine Transformation wie Tabellengröße, Zeilengröße, Nullzahl, Histogramm usw. zu unterstützen.
Visualisieren Sie unsere Regel beispielsweise den Plan
Graph TD
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#6 -> Gruppe 5
Nach der Transformation von Join -Neubestehen führt dazu
Graph TD
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Expr#8 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Expr#8 -> Gruppe 2
Expr#8 -> Gruppe Nr. 7
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 7
Expr#7 ["Join"]
Ende
Expr#7 -> Gruppe Nr. 1
Expr#7 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#6 -> Gruppe 5
Style Expr#8 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#7 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 2 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 6 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 3 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 7 Hub-Width: 4px, Schlaganfall: Orange
Wir können sehen, dass tbl2 JOIN tbl1 JOIN tbl3 aus tbl1 JOIN tbl2 JOIN tbl3 erstellt wird (die neu hinzugefügten Knoten und Kanten werden durch orangefarbene Linien angezeigt)
Siehe x3TablejoinreorderBySize.scala für die vollständige Implementierung
Jetzt können wir unsere Transformationsregeln an einem Ort setzen
private val transformationRules : Seq [ Seq [ TransformationRule ]] = Seq (
Seq ( new ProjectionPushDown ),
Seq ( new X3TableJoinReorderBySize )
)Und führen Sie sie aus, um die äquivalenten Gruppen zu erkunden
for (r <- transformationRules.indices) {
exploreGroup(ctx.rootGroup, transformationRules(r), r + 1 )
}Zum Beispiel der Plan
Graph TD
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph Group#6
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#6 -> Gruppe 5
Nach der Erforschung führt zu diesem Diagramm
Graph TD
Subgraph -Gruppe Nr. 8
Expr. 8 ["Scan TBL2 (ID, Feld1, Feld2)"]]
Ende
Subgraph -Gruppe Nr. 11
Expr#11 ["Join"]
Expr#14 ["Join"]
Ende
Expr#11 -> Gruppe Nr. 7
Expr#11 -> Gruppe#10
Expr#14 -> Gruppe#8
Expr#14 -> Gruppe#12
Subgraph -Gruppe Nr. 2
Expr#2 ["Scan TBL2"]
Ende
Subgraph Group#5
Expr#5 ["Join"]
Expr#16 ["Join"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Expr#16 -> Gruppe 2
Expr#16 -> Gruppe Nr. 13
Subgraph -Gruppe Nr. 4
Expr#4 ["Join"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph -Gruppe Nr. 13
Expr#15 ["Join"]
Ende
Expr#15 -> Gruppe Nr. 1
Expr#15 -> Gruppe 3
Subgraph -Gruppe Nr. 7
Expr. 7 ["Scan TBL1 (ID, Feld1)"]
Ende
Subgraph -Gruppe Nr. 1
Expr. 1 ["Scan TBL1"]
Ende
Subgraph -Gruppe#10
Expr#10 ["Join"]
Ende
Expr#10 -> Gruppe#8
Expr#10 -> Gruppe 9
Subgraph -Gruppe Nr. 9
Expr#9 ["Scan TBL3 (ID, Feld2)"]
Ende
Subgraph -Gruppe Nr. 3
Expr#3 ["Scan TBL3"]
Ende
Subgraph -Gruppe#12
Expr#13 ["Join"]
Ende
Expr#13 -> Gruppe Nr. 7
Expr#13 -> Gruppe 9
Subgraph Group#6
EXPR#12 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
EXPR#6 ["Projekt TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Ende
Expr#12 -> Gruppe Nr. 11
Expr#6 -> Gruppe 5
Style Expr#12 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#8 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#10 Hub-Width: 4px, Schlaganfall: Orange
Stilex#13 Schlaganfall: 4px, Schlaganfall: Orange
Stilex#14 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#11 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#9 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#15 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#7 Hub-Width: 4px, Schlaganfall: Orange
Style Expr#16 Schlaganfall: 4px, Schlaganfall: Orange
LinkStyle 0 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 15 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 12 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 1 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 16 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 13 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 2 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 6 Strichbreite: 4px, Schlaganfall: Orange
LinkStyle 3 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 10 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 7 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 14 Hub-Width: 4px, Schlaganfall: Orange
LinkStyle 11 Hub-Width: 4px, Schlaganfall: Orange
Weitere Informationen finden Sie unter vulcanoplanner.scala
Nach der Explorationsphase haben wir jetzt einen vollständig erweiterten Baum mit allen möglichen Plänen, jetzt die Optimierungsphase.
In dieser Phase finden wir den besten Plan für unsere Wurzelgruppe. Der Optimierungsprozess wird als folgt beschrieben:
Hier ist ein Beispiel
Graph TD
Subgraph Group#2 ["Gruppe 2 (Kosten = 1)"]
Expr#2 ["Expr#2 (cost = 1)"]
Ende
Subgraph Group#5 ["Gruppe 5 (Kosten = 3)"]
Expr#5 ["expr#5 (cost = max (3,2) = 3"]
Ende
Expr#5 -> Gruppe 1
Expr#5 -> Gruppe 4
Subgraph Group#4 ["Gruppe 4 (Kosten = 2)"]
Expr#4 ["expr#4 (cost = max (1,2) = 2)"]
Expr#7 ["expr#7 (cost = 1+2 = 3)"]
Ende
Expr#4 -> Gruppe 2
Expr#4 -> Gruppe Nr. 3
Subgraph Group Nr. 1 ["Gruppe 1 (Kosten = 3)"]
Expr#1 ["Expr#1 (cost = 3)"]
Ende
Subgraph Group#3 ["Gruppe Nr. 3 (Kosten = 2)"]
Expr#3 ["Expr#3 (Kosten = 2)"]
Ende
Subgraph Group#6 ["Gruppe 6 (Kosten = 4,5)"]
Expr#6 ["Expr#6 (Kosten = 3*1,5 = 4,5)"]
Ende
Expr#6 -> Gruppe 5
Subgraph Group#8 ["Gruppe 8 (Kosten = 1)"]
Expr#8 ["expr#8 (cost = 1)"]
Ende
Subgraph Group Nr. 9 ["Gruppe 9 (Kosten = 2)"]
Expr#9 ["Expr#9 (cost = 2)"]
Ende
Expr#7 -> Gruppe#8
Expr#7 -> Gruppe Nr. 9
Beispielsweise werden die Kosten der Expr#4 mit den Kosten für Kindergruppen ( Group#2 und Group#3 ) unter Verwendung max -Funktion berechnet. Ein weiteres Beispiel ist die Group#4 , ihre Kosten werden berechnet, indem der min -Wert zwischen den Kosten ihrer äquivalenten Ausdrücke berechnet wird.
Da das Ziel der Optimierungsphase darin besteht, angesichts der untersuchten Gruppenausdrücke den besten physischen Plan zu erstellen. Wir können den physischen Plan wie folgt definieren:
sealed trait PhysicalPlan {
def operator () : Operator
def children () : Seq [ PhysicalPlan ]
def cost () : Cost
def estimations () : Estimations
def traits () : Set [ String ]
}
Der operator ist der physische Bediener, der früher den Plan ausgeführt hat, und wir werden ihn im späteren Abschnitt abdecken. Dann ist children die Liste der Kinderplanknoten, die es gewohnt sind, am Prozess der Kostenberechnung teilzunehmen. Das dritte Attribut sind cost , cost sind ein Objekt, das die Kosteninformationen zur Haltung von Kosten halten (z. B. CPU -Kosten, Speicherkosten, IO -Kosten usw.). estimations sind die geschätzte Statistiken über den Plan (z. B. Zeilenzahl, Reihengröße usw.), die auch an der Kostenberechnung beteiligt ist. Schließlich sind traits eine Reihe von physischen Merkmalen, die sich auf die Implementierungsregel auswirken, um den Prozess der physischen Planerzeugung zu beeinflussen.
Als nächstes können wir die Klassen der physischen Knoten implementieren:
case class Scan (
operator : Operator ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq .empty // scan do not receive any child
}
case class Project (
operator : Operator ,
child : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (child)
}
case class Join (
operator : Operator ,
leftChild : PhysicalPlan ,
rightChild : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (leftChild, rightChild)
}
Siehe Physicalplan.scala für die vollständige Implementierung
Das erste, was in der Optimierungsphase, das heißt, wir müssen die Implementierungsregeln implementieren. Die Implementierungsregel ist die Regel, um vom logischen Plan in physische Pläne zu konvertieren, ohne sie auszuführen.
Da wir den physischen Plan im Planer nicht direkt ausführen, werden wir stattdessen den physischen Planbuilder zurückgeben. Außerdem ist es einfacher, die Kostenfunktion für jeden Knoten anzupassen.
Hier ist die Schnittstelle der Implementierungsregel:
trait PhysicalPlanBuilder {
def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ]
}
trait ImplementationRule {
def physicalPlanBuilders ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ]
}
Hier ist der PhysicalPlanBuilder die Schnittstelle, mit der der physische Plan angesichts der körperlichen Pläne für Kinder verwendet wird.
Zum Beispiel hat der logische Join zwei physikalische Implementierungen, die Join und Merge Join sind
Graph TD
Kind#1 ["Kind#1"]
Kind#2 ["Kind#2"]
Kind#3 ["Kind#3"]
Kind#4 ["Kind#4"]
Hash_Join ["` Hash Join
Kosten = F (Kosten (Kind#1), Kosten (Kind#2))
`"]
merge_join ["` Merge Join
Kosten = g (Kosten (Kind#3), Kosten (Kind#4))
`"]
hash_join --> child#1
hash_join --> child#2
merge_join --> child#3
merge_join --> child#4
the HASH JOIN cost is using function f() to calculate cost, and MERGE JOIN is using function g() to calculate cost, both are using its children as function parameters. So it's easier to code if we're returning just the phyiscal plan builder from the implementation rule instead of the physical plan.
As we've said before, the optimization process is described as following:
And here is the code:
private def implementGroup ( group : Group , combinedRule : ImplementationRule )(
implicit ctx : VolcanoPlannerContext
) : GroupImplementation = {
group.implementation match {
case Some (implementation) => implementation
case None =>
var bestImplementation = Option .empty[ GroupImplementation ]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option (childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some (currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}This code is an exhaustive search code, which is using recursive function to traverse all nodes. At each node (group), the function is called once to get its best plan while also calculate the optimal cost of that group.
Finally, the best plan for our query is the best plan of the root group
val implementationRules = new ImplementationRule {
override def physicalPlanBuilders (
expression : GroupExpression
)( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
expression.plan match {
case node @ Scan (_, _) => implement. Scan (node)
case node @ Project (_, _) => implement. Project (node)
case node @ Join (_, _, _) => implement. Join (node)
}
}
}
ctx.rootGroup.implementation = Option (implementGroup(ctx.rootGroup, implementationRules))See VolcanoPlanner.scala for full implementation
Here is an example of the plan after optimization, it's shown the selected logical node, the selected physical operator, and the estimated cost
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Next, we will implement some implementation rules.
The first, easiest one is the implementation rule of logical PROJECT
object Project {
def apply ( node : logicalplan. Project )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
Seq (
new ProjectionImpl (node.fields)
)
}
}
class ProjectionImpl ( projection : Seq [ql. FieldID ]) extends PhysicalPlanBuilder {
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val child = children.head
val selfCost = Cost (
estimatedCpuCost = 0 ,
estimatedMemoryCost = 0 ,
estimatedTimeCost = 0
) // assuming the cost of projection is 0
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + child.cost().estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + child.cost().estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + child.cost().estimatedTimeCost
)
val estimations = Estimations (
estimatedLoopIterations = child.estimations().estimatedLoopIterations,
estimatedRowSize = child.estimations().estimatedRowSize // just guessing the value
)
Some (
Project (
operator = ProjectOperator (projection, child.operator()),
child = child,
cost = cost,
estimations = estimations,
traits = child.traits()
)
)
}
}
The implementation rule for logical PROJECT, is returning one physical plan builder ProjectionImpl . ProjectionImpl cost calculation is simple, it just inherits the cost from the child node (because the projection is actually not doing any intensive operation). Beside that, it also updates the estimation (in this code, estimation is also inherit from the child node)
See Project.scala for full implementation
Writing implementation rule for logical JOIN is way harder than PROJECTION.
One first reason is, a logical JOIN has many physical implementation, such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
The second reason is, estimating cost for physical JOIN is also hard, because it depends on lots of factors such as row count, row size, data histogram, indexes, data layout, etc.
So, to keep everything simple in this guide, I will only implement 2 physical JOIN: HASH JOIN and MERGE JOIN. The cost estimation functions are fictional (just to show how it works, I'm not trying to correct it). And in the MERGE JOIN, all data is assuming to be sorted by join key.
Here is the code:
object Join {
def apply ( node : logicalplan. Join )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
val leftFields = node.on.map(_._1).map(f => s " ${f.table.id} . ${f.id} " )
val rightFields = node.on.map(_._2).map(f => s " ${f.table.id} . ${f.id} " )
Seq (
new HashJoinImpl (leftFields, rightFields),
new MergeJoinImpl (leftFields, rightFields)
)
}
}
The HASH JOIN:
class HashJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
private def viewSize ( plan : PhysicalPlan ) : Long = {
plan.estimations().estimatedLoopIterations * plan.estimations().estimatedRowSize
}
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
// reorder the child nodes, the left child is the child with smaller view size (smaller than the right child if we're store all of them in memory)
val (leftChild, rightChild) = if (viewSize(children( 0 )) < viewSize(children( 1 ))) {
(children( 0 ), children( 1 ))
} else {
(children( 1 ), children( 0 ))
}
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = HashJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = Set .empty // don't inherit trait from children since we're hash join
)
)
}
}
We can see that the cost function of HASH JOIN is composed of its children costs and estimations
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)Next, the MERGE JOIN:
class MergeJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val (leftChild, rightChild) = (children( 0 ), children( 1 ))
if (leftChild.traits().contains( " SORTED " ) && rightChild.traits().contains( " SORTED " )) {
val estimatedTotalRowCount =
leftChild.estimations().estimatedLoopIterations +
rightChild.estimations().estimatedLoopIterations
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = MergeJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = leftChild.traits() ++ rightChild.traits()
)
)
} else {
None
}
}
}
Same with HASH JOIN, MERGE JOIN also uses its children costs and estimations to calculate its cost, but with different formulla:
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)See HashJoinImpl.scala and MergeJoinImpl.scala for full implementation
You can see other rules and physical plan builders here:
Now, after done implementing the implementation rules, now we can find our best plan. Let's start over from the user query
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idwill be converted to the logical plan
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After exploration phase, it will generate lots of equivalent plans
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
Ende
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
Ende
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
Ende
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
Ende
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
Ende
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
Ende
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
Ende
subgraph Group#1
Expr#1["SCAN tbl1"]
Ende
subgraph Group#10
Expr#10["JOIN"]
Ende
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
Ende
subgraph Group#3
Expr#3["SCAN tbl3"]
Ende
subgraph Group#12
Expr#13["JOIN"]
Ende
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Ende
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
And the at optimization phase, a final best plan is chose
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Now we've done building a functional query planner which can optimize the query from user, but our query plan could not run by itself. So it's the reason why now we will implement the query processor to test out our query plan.
Basically the query process receive input from the query planner, and execute them
graph LR
plan(("Physical Plan"))
storage[("Storage Layer")]
processor["Query Processor"]
plan -- execute --> processor
storage -- fetch --> processor
Volcano/iterator model is the query processing model that is widely used in many DBMS. It is a pipeline architecture, which means that the data is processed in stages, with each stage passing the output of the previous stage to the next stage.
Each stage in the pipeline is represented by an operator. Operators are functions that perform a specific operation on the data, such as selecting rows, filtering rows, or aggregating rows.
Usually, operator can be formed directly from the query plan. For example, the query
SELECT field_1
FROM tbl
WHERE field = 1will have the plan
graph TD
project["PROJECT: field_1"]
scan["SCAN: tbl"]
filter["FILTER: field = 1"]
project --> scan
filter --> project
will create a chain of operators like this:
scan = {
next() // fetch next row from table "tbl"
}
project = {
next() = {
next_row = scan.next() // fetch next row from scan operator
projected = next_row["field_1"]
return projected
}
}
filter = {
next() = {
next_row = {}
do {
next_row = project.next() // fetch next row from project operator
} while (next_row["field"] != 1)
return next_row
}
}
results = []
while (row = filter.next()) {
results.append(row)
}
notes : this pseudo code did not handle for end of result stream
The basic interface of an operator is described as following:
trait Operator {
def next () : Option [ Seq [ Any ]]
}
See Operator.scala for full implementation of all operators
Let's define a query
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idwith some data and stats
val table1 : Datasource = Datasource (
table = " emp " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" code " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id
),
rows = Seq (
Seq ( " 1 " , " Emp A " ),
Seq ( " 2 " , " Emp B " ),
Seq ( " 3 " , " Emp C " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " code " -> 32 )
)
)
val table2 : Datasource = Datasource (
table = " dept " ,
catalog = TableCatalog (
Seq (
" emp_id " -> classOf [ String ],
" dept_name " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by emp_id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " Dept 1 " ),
Seq ( " 1 " , " Dept 2 " ),
Seq ( " 2 " , " Dept 3 " ),
Seq ( " 3 " , " Dept 3 " )
),
stats = TableStats (
estimatedRowCount = 4 ,
avgColumnSize = Map ( " emp_id " -> 10 , " dept_name " -> 255 )
)
)
val table3 : Datasource = Datasource (
table = " emp_info " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" name " -> classOf [ String ],
" origin " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " AAAAA " , " Country A " ),
Seq ( " 2 " , " BBBBB " , " Country A " ),
Seq ( " 3 " , " CCCCC " , " Country B " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " name " -> 255 , " origin " -> 255 )
)
)The cost model is optimized for CPU
val costModel : CostModel = ( currentCost : Cost , newCost : Cost ) => {
currentCost.estimatedCpuCost > newCost.estimatedCpuCost
}Now, executing the query by running this code:
val planner = new VolcanoPlanner
QueryParser .parse(query) match {
case Left (err) => throw err
case Right (parsed) =>
val operator = planner.getPlan(parsed)
val result = Utils .execute(operator)
// print result
println(result._1.mkString( " , " ))
result._2.foreach(row => println(row.mkString( " , " )))
}it will print:
emp.id,emp.code,dept.dept_name,emp_info.name,emp_info.origin
1,Emp A,Dept 1,AAAAA,Country A
1,Emp A,Dept 2,AAAAA,Country A
2,Emp B,Dept 3,BBBBB,Country A
3,Emp C,Dept 3,CCCCC,Country B
Voila, We've done building a fully functional query planner and query engine :). You can start writing one for your own, good luck
See Demo.scala for full demo code
Thanks for reading this, this guide is quite long, and not fully correct, but I've tried my best to write it as understandably as possible ?