[marcador de posición]
Un planificador de consultas es un componente de un sistema de gestión de bases de datos (DBMS) que es responsable de generar un plan para ejecutar una consulta de base de datos. El plan de consulta especifica los pasos que tomarán el DBMS para recuperar los datos solicitados por la consulta. El objetivo del planificador de consultas es generar un plan que sea lo más eficiente posible, lo que significa que devolverá los datos al usuario lo más rápido posible.
Los planificadores de consultas son complejas piezas de software, y pueden ser difíciles de entender. Esta guía para implementar un planificador de consultas basado en costos le proporcionará una visión general paso a paso del proceso, cómo implementar su propio planificador de consultas basado en costos, al tiempo que cubre los conceptos básicos del planificador de consultas.
Escrito por AI, editado por humano
Esta guía está escrita para:
Objetivos:
Gráfico TD
usuario ((usuario))
analizador [Parser de consulta]
Planner [Planificador de consultas]
Ejecutor [Procesador de consulta]
Usuario -Consulta de texto -> analizador
analizador -AST -> Planner
Planner -Plan físico -> Ejecutor
La arquitectura básica de un motor de consulta consiste en esos componentes:
Normalmente, los planificadores de consultas se dividen en 2 tipos:
Heuristic Planner es el planificador de consultas que utilizó reglas predefinidas para generar un plan de consulta.
El planificador basado en costos es el planificador de consultas que se basa en el costo de generar consultas, intenta encontrar el plan óptimo basado en el costo de la consulta de entrada.
Si bien el planificador heurístico generalmente encuentra el mejor plan de aplicar reglas de transformación si sabe que el plan transformado es mejor, el planificador basado en costos encuentra el mejor plan por enumerados planes equivalentes e intenta encontrar el mejor plan entre ellos.
En el planificador de consultas basado en costos, generalmente se compone de fases:
En la fase de enumeraciones del plan, el planificador enumerará los posibles planes equivalentes.
Después de eso, en la fase de optimización de consultas, el planificador buscará el mejor plan de la lista de planes enumerados. El mejor plan es el plan que tiene el costo más bajo, que se define el modelo de costo (o función de costo).
Debido a que el plan natural del plan lógico es tener una estructura similar a un árbol, por lo que puede pensar que la optimización/búsqueda es en realidad un problema de búsqueda de árboles. Y hay muchos algoritmos de búsqueda de árboles aquí:
Notas: En teoría, es posible usar cualquier tipo de algoritmo de búsqueda de árboles. Sin embargo, en práctica no es factible ya que el tiempo de búsqueda aumenta cuando nuestro algoritmo de búsqueda es complejo
Notas: Las condiciones de terminación de búsqueda generalmente son:
El planificador de consultas de volcán (o el generador de optimizador de volcán) es un planificador de consultas basado en costos
Volcano Planner utiliza un enfoque de programación dinámica para encontrar el mejor plan de consulta de la lista de planes enumerados.
Detalles: https://ieeExplore.ieee.org/document/344061 (soy demasiado vago para explicar el documento aquí)
Aquí hay una gran explicación: https://15721.courses.cs.cmu.edu/spring2017/slides/15-optimizer2.pdf#page=9
Nuestro planificador de consultas es un planificador de consultas basado en costos, siguiendo la idea básica del planificador de consultas de volcán, nuestro planificador consistirá en 2 fases principales:
Gráfico LR
AST ((AST))
lógico_plan [plan]
explored_plans ["`
Plan #1
...
Plan #N
`"]
implementación_plan ["Plan #x (mejor plan)"]
AST -Convertir al plan lógico -> lógico_plan
Logical_plan -Fase de exploración -> Explored_plans
explored_plans -fase de optimización -> implementación_plan
LinkStyle 1,2 Color: Naranja, trazo: naranja, trazo de ancho: 5px
El plan lógico es la datos de datos que contiene la abstracción del paso de transformación requerido para ejecutar la consulta.
Aquí hay un ejemplo de un plan lógico:
Gráfico TD
1 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
2 ["unirse"];
3 ["escanear tbl1"];
4 ["unirse"];
5 ["escanear tbl2"];
6 ["escanear tbl3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Si bien el plan lógico solo contiene la abstracción, el plan físico es la datos que contiene los detalles de implementación. Cada plan lógico tendrá múltiples planes físicos. Por ejemplo, una unión lógica podría tener muchos planes físicos como hash unirse, fusionar unirse, transmitir unirse, etc.
El grupo equivalente es un grupo de expresiones equivalentes (que para cada expresión, su plan lógico es lógicamente equivalente)
p.ej
Gráfico TD
Subgraph Group#8
Expr#8 ["Escanear TBL2 (Field1, Field2, ID)"]
fin
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Subgraph Group#11
Expr#11 ["unirse"]
fin
Expr#11 -> Grupo#7
Expr#11 -> Grupo#10
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#7
Expr#7 ["Escanear TBL1 (ID, Field1)"]
fin
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Grupo de subgraph#10
Expr#10 ["unirse"]
fin
Expr#10 -> Grupo#8
Expr#10 -> Grupo#9
Subgraph Group#9
Expr#9 ["Escanear TBL3 (ID, Field2)"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#12 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#12 -> Grupo#11
Expr#6 -> Grupo#5
Aquí podemos ver que Group#6 está teniendo 2 expresiones equivalentes, que representan la misma consulta (uno está haciendo escaneo desde la tabla y luego el proyecto, uno está empujando hacia abajo la proyección hacia abajo para escanear el nodo).
La regla de transformación es la regla para transformar de un plan lógico a otro plan lógico equivalente lógico
Por ejemplo, el plan:
Gráfico TD
1 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
2 ["unirse"];
3 ["escanear tbl1"];
4 ["unirse"];
5 ["escanear tbl2"];
6 ["escanear tbl3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Cuando se aplica la transformación de empuje de proyección, se transforma en:
Gráfico TD
1 ["Proyecto *. *"];
2 ["unirse"];
3 ["escanear TBL1 (id, campo1)"];
4 ["unirse"];
5 ["Escanear TBL2 (Field1, Field2)"];
6 ["Escanear TBL3 (ID, Field2, Field2)"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
La regla de transformación puede ser afectada por rasgos/propiedades lógicas, como esquema de tabla, estadísticas de datos, etc.
La regla de implementación es la regla para devolver los planes físicos dado el plan lógico.
La regla de implementación puede ser afectada por rasgos/propiedades físicas, como el diseño de datos (ordenado o no), etc.
En la fase de exploración, el planificador aplicará reglas de transformación, generando planes lógicos equivalentes
Por ejemplo, el plan:
Gráfico TD
1326583549 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
-425111028 ["unirse"];
-349388609 ["escanear tbl1"];
1343755644 ["unirse"];
-1043437086 ["escanear tbl2"];
-1402686787 ["escanear tbl3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Después de aplicar reglas de transformación, dando como resultado el siguiente gráfico:
Gráfico TD
Subgraph Group#8
Expr#8 ["Escanear TBL2 (ID, Field1, Field2)"]
fin
Subgraph Group#11
Expr#11 ["unirse"]
Expr#14 ["unirse"]
fin
Expr#11 -> Grupo#7
Expr#11 -> Grupo#10
Expr#14 -> Grupo#8
Expr#14 -> Grupo#12
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
Expr#16 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Expr#16 -> Grupo#2
Expr#16 -> Grupo#13
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#13
Expr#15 ["unirse"]
fin
Expr#15 -> Grupo#1
Expr#15 -> Grupo#3
Subgraph Group#7
Expr#7 ["Escanear TBL1 (ID, Field1)"]
fin
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Grupo de subgraph#10
Expr#10 ["unirse"]
fin
Expr#10 -> Grupo#8
Expr#10 -> Grupo#9
Subgraph Group#9
Expr#9 ["Escanear TBL3 (ID, Field2)"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#12
Expr#13 ["unirse"]
fin
Expr#13 -> Grupo#7
Expr#13 -> Grupo#9
Grupo de subgraph#6
Expr#12 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#12 -> Grupo#11
Expr#6 -> Grupo#5
Aquí podemos ver que se aplican la regla de empuje de proyección y la regla de reorden de unión.
La fase de optimización es atravesar el árbol expandido en la fase de exploración, para encontrar el mejor plan para nuestra consulta.
Este "en realidad" es la optimización de búsqueda de árboles, por lo que puede usar cualquier algoritmo de búsqueda de árboles que pueda imaginar (pero debe asegurarse de que sea correcto).
Aquí está el ejemplo del plan físico generado después de la fase de optimización:
Gráfico TD
Grupo#6 ["
Grupo #6
Seleccionado: Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2
Operador: ProjectOperator
Costo: Costo (CPU = 641400.00, MEM = 1020400012.00, tiempo = 1000000.00)
"]
Grupo#6 -> Grupo#11
Grupo#11 ["
Grupo #11
Seleccionado: une
Operador: hashjoinoperador
Costo: Costo (CPU = 641400.00, MEM = 1020400012.00, tiempo = 1000000.00)
"]
Grupo#11 -> Grupo#7
Grupo#11 -> Grupo#10
Grupo#7 ["
Grupo #7
Seleccionado: Escanear TBL1 (ID, Field1)
Operador: Normalscanoperator
Costo: Costo (CPU = 400.00, MEM = 400000.00, tiempo = 1000.00)
"]
Grupo#10 ["
Grupo #10
Seleccionado: une
Operador: MergeJoinoperator
Rasgos: ordenado
Costo: Costo (CPU = 640000.00, MEM = 20000012.00, tiempo = 1100000.00)
"]
Grupo#10 -> Grupo#8
Grupo#10 -> Grupo#9
Grupo#8 ["
Grupo #8
Seleccionado: Escanear TBL2 (ID, Field1, Field2)
Operador: Normalscanoperator
Rasgos: ordenado
Costo: Costo (CPU = 600000.00, MEM = 12.00, tiempo = 1000000.00)
"]
Grupo#9 ["
Grupo #9
Seleccionado: Escanear TBL3 (ID, Field2)
Operador: Normalscanoperator
Rasgos: ordenado
Costo: Costo (CPU = 40000.00, MEM = 20000000.00, Tiempo = 100000.00)
"]
El plan generado ha mostrado el plan lógico seleccionado, el costo estimado y el operador físico
Nuestro planificador realizará una búsqueda de agotamiento para encontrar el mejor plan
Dado que el código del planificador es grande, por lo que no escribiré una guía paso a paso, pero explicaré cada pieza del código.
Aquí definiremos un lenguaje de consulta que usara a fondo este tutorial
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idEl lenguaje de consulta que implementaremos es un lenguaje similar a SQL. Sin embargo, en aras de la simplicidad, restringiremos su funcionalidad y sintaxis.
El lenguaje aparece en forma de
SELECT tbl . field , [...]
FROM tbl JOIN [...] Solo admitirá para SELECT y JOIN , también el campo en la instrucción selecta debe estar completamente calificado (en forma de table.field ), todas las demás funcionalidades no serán admitidas
Primero, tenemos que definir el AST para nuestro idioma. AST (o árbol de sintaxis abstracta) es un árbol utilizado para representar la estructura sintáctica de un texto.
Dado que nuestro lenguaje es tan simple, podemos definir la estructura AST en varias línea de códigos:
sealed trait Identifier
case class TableID ( id : String ) extends Identifier
case class FieldID ( table : TableID , id : String ) extends Identifier
sealed trait Statement
case class Table ( table : TableID ) extends Statement
case class Join ( left : Statement , right : Statement , on : Seq [( FieldID , FieldID )]) extends Statement
case class Select ( fields : Seq [ FieldID ], from : Statement ) extends Statement
Por ejemplo, una consulta
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idpuede representarse como
Select (
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ),
FieldID ( TableID ( " tbl1 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " id " ),
FieldID ( TableID ( " tbl2 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " id " ),
FieldID ( TableID ( " tbl3 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " field2 " )
),
Join (
Table ( TableID ( " tbl1 " )),
Join (
Table ( TableID ( " tbl2 " )),
Table ( TableID ( " tbl3 " )),
Seq (
FieldID ( TableID ( " tbl2 " ), " id " ) -> FieldID ( TableID ( " tbl3 " ), " id " )
)
),
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ) -> FieldID ( TableID ( " tbl2 " ), " id " )
)
)
)Después de definir la estructura AST, tendremos que escribir el analizador de consultas, que se utiliza para convertir la consulta de texto en forma AST.
Dado que esta guía está utilizando Scala para la implementación, elegiremos los combinadores de Scala-Parser para crear nuestro analizador de consultas.
Clase de analizador de consultas:
object QueryParser extends ParserWithCtx [ QueryExecutionContext , Statement ] with RegexParsers {
override def parse ( in : String )( implicit ctx : QueryExecutionContext ) : Either [ Throwable , Statement ] = {
Try (parseAll(statement, in) match {
case Success (result, _) => Right (result)
case NoSuccess (msg, _) => Left ( new Exception (msg))
}) match {
case util. Failure (ex) => Left (ex)
case util. Success (value) => value
}
}
private def select : Parser [ Select ] = ??? // we will implement it in later section
private def statement : Parser [ Statement ] = select
}
Luego defina algunas reglas de análisis:
// common
private def str : Parser [ String ] = """ [a-zA-Z0-9_]+ """ .r
private def fqdnStr : Parser [ String ] = """ [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ """ .r
// identifier
private def tableId : Parser [ TableID ] = str ^^ (s => TableID (s))
private def fieldId : Parser [ FieldID ] = fqdnStr ^^ { s =>
val identifiers = s.split( '.' )
if (identifiers.length != 2 ) {
throw new Exception ( " should never happen " )
} else {
val table = identifiers.head
val field = identifiers( 1 )
FieldID ( TableID (table), field)
}
} Aquí hay dos reglas, que se utilizan para analizar los identificadores: TableID y FieldID .
La ID de la tabla (o el nombre de la tabla) generalmente solo contiene caracteres, números y subrayos ( _ ), por lo que usaremos una regex simple [a-zA-Z0-9_]+ para identificar el nombre de la tabla.
Por otro lado, la identificación de campo (para el calificador de campo) en nuestro idioma es el nombre de campo completamente calificado. Normalmente está en forma de table.field , y el nombre de campo también generalmente contiene caracteres, números y subrayos, por lo que usaremos el Regex [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ para analizar el nombre de campo.
Después de definir las reglas para analizar los identificadores, ahora podemos definir reglas para analizar la declaración de consulta:
// statement
private def table : Parser [ Table ] = tableId ^^ (t => Table (t))
private def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) " La regla table es una regla simple, solo crea el nodo Table mediante el uso del TableID analizado de la regla tableId .
La subQuery , es la regla para analizar el subcontrol. En SQL, podemos escribir una consulta que se ve así:
SELECT a
FROM ( SELECT b FROM c) d El SELECT b FROM c es el subcontrol en la declaración anterior. Aquí, en nuestro lenguaje de consulta simple, indicaremos que una declaración es una subconsulta si está encerrada por un par de paréntesis (( () ). Dado que nuestro idioma solo tiene una declaración de selección, podemos escribir la regla de análisis de la siguiente manera:
def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) "Ahora definiremos las reglas de análisis para la declaración de selección:
private def fromSource : Parser [ Statement ] = table ||| subQuery
private def select : Parser [ Select ] =
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep(
" JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)
) ^^ {
case _ ~ fields ~ _ ~ src ~ joins =>
val p = if (joins.nonEmpty) {
def chain ( left : Statement , right : Seq [( Statement , Seq [( FieldID , FieldID )])]) : Join = {
if (right.isEmpty) {
throw new Exception ( " should never happen " )
} else if (right.length == 1 ) {
val next = right.head
Join (left, next._1, next._2)
} else {
val next = right.head
Join (left, chain(next._1, right.tail), next._2)
}
}
val temp = joins.map { join =>
val statement = join._1._1._2
val joinOn = join._2.map(on => on._1._1 -> on._2)
statement -> joinOn
}
chain(src, temp)
} else {
src
}
Select (fields, p)
}En SQL, podemos usar una subcontratación como fuente de unión. Por ejemplo:
SELECT * . *
FROM tbl1
JOIN ( SELECT * . * FROM tbl2)
JOIN tbl3Por lo tanto, nuestro analizador también implementará reglas para analizar el subcreador en la parte de unión de la declaración, por eso tenemos la regla de análisis:
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep( " JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)Ver QueryParser.Scala para la implementación completa
Ver QueryParserspec.Scala
Después de generar el AST a partir de la consulta de texto, podemos convertirlo directamente en el plan lógico
Primero, definamos la interfaz de nuestro plan lógico:
sealed trait LogicalPlan {
def children () : Seq [ LogicalPlan ]
}
children es la lista de plan lógico infantil. Por ejemplo:
Gráfico TD
1326583549 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"];
-425111028 ["unirse"];
-349388609 ["escanear tbl1"];
1343755644 ["unirse"];
-1043437086 ["escanear tbl2"];
-1402686787 ["escanear tbl3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Los nodos infantiles del nodo PROJECT son el primer nodo JOIN . El primer nodo JOIN tiene 2 hijos, que son el segundo nodo JOIN y SCAN tbl1 . Pronto, ...
Dado que nuestro lenguaje de consulta es simple, solo necesitamos 3 tipos de nodo lógico:
case class Scan ( table : ql. TableID , projection : Seq [ String ]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq .empty
}
case class Project ( fields : Seq [ql. FieldID ], child : LogicalPlan ) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (child)
}
case class Join ( left : LogicalPlan , right : LogicalPlan , on : Seq [(ql. FieldID , ql. FieldID )]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (left, right)
}
Entonces podemos escribir la función para convertir el AST en plan lógico:
def toPlan ( node : ql. Statement ) : LogicalPlan = {
node match {
case ql. Table (table) => Scan (table, Seq .empty)
case ql. Join (left, right, on) => Join (toPlan(left), toPlan(right), on)
case ql. Select (fields, from) => Project (fields, toPlan(from))
}
}Consulte LogicalPlan.scala para la implementación completa
Podemos definir clases para el grupo como lo siguiente:
case class Group (
id : Long ,
equivalents : mutable. HashSet [ GroupExpression ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
var implementation : Option [ GroupImplementation ] = None
}
case class GroupExpression (
id : Long ,
plan : LogicalPlan ,
children : mutable. MutableList [ Group ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
val appliedTransformations : mutable. HashSet [ TransformationRule ] = mutable. HashSet ()
}
Group es el conjunto de planes que son lógicamente equivalentes.
Cada GroupExpression representa un nodo de plan lógico. Dado que hemos definido un nodo de plan lógico tendrá una lista de nodos infantiles (en la sección anterior), y la expresión de GroupExpression representa un nodo del plan lógico, y el Group representa un conjunto de planes equivalentes, por lo que los hijos de GroupExpression son una lista de Group
p.ej
Gráfico TD
Subgraph Group#8
Expr#8
fin
Subgraph Group#2
Expr#2
fin
Subgraph Group#11
Expr#11
fin
Expr#11 -> Grupo#7
Expr#11 -> Grupo#10
Grupo de subgraph#5
Expr#5
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#7
Expr#7
fin
Grupo de subgraph#1
Expr#1
fin
Grupo de subgraph#10
Expr#10
fin
Expr#10 -> Grupo#8
Expr#10 -> Grupo#9
Subgraph Group#9
Expr#9
fin
Subgraph Group#3
Expr#3
fin
Grupo de subgraph#6
Expr#12
Expr#6
fin
Expr#12 -> Grupo#11
Expr#6 -> Grupo#5
Como podemos ver aquí, el Group#6 tiene 2 expresiones equivalentes: Expr#12 y Expr#6 , y los hijos de Expr#12 es Group#11
Notas: Implementaremos múltiples transformación de ronda en la fase de exploración, por lo que para cada Group y GroupExpression , tenemos una indicación ExplorationMark del estado de exploración.
class ExplorationMark {
private var bits : Long = 0
def get : Long = bits
def isExplored ( round : Int ) : Boolean = BitUtils .getBit(bits, round)
def markExplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = true )
def markUnexplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = false )
}
ExplorationMark es solo una clase de envoltorio de bitset, marcará el 1 de la 1 si se explora la ronda I-th, marque como 0 de lo contrario.
ExplorationMark también se puede utilizar para visualizar la transformación exacta, consulte la visualización para obtener más detalles
Memo es un grupo de ayudantes para ayudar a construir los grupos equivalentes. Memo consta de varios hashmap para almacenar en caché el grupo y la expresión del grupo, también proporcionan métodos para registrar la nueva expresión de grupo o grupo.
class Memo (
groupIdGenerator : Generator [ Long ] = new LongGenerator ,
groupExpressionIdGenerator : Generator [ Long ] = new LongGenerator
) {
val groups : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]()
val parents : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]() // lookup group from group expression ID
val groupExpressions : mutable. HashMap [ LogicalPlan , GroupExpression ] = mutable. HashMap [ LogicalPlan , GroupExpression ]()
def getOrCreateGroupExpression ( plan : LogicalPlan ) : GroupExpression = {
val children = plan.children()
val childGroups = children.map(child => getOrCreateGroup(child))
groupExpressions.get(plan) match {
case Some (found) => found
case None =>
val id = groupExpressionIdGenerator.generate()
val children = mutable. MutableList () ++ childGroups
val expression = GroupExpression (
id = id,
plan = plan,
children = children
)
groupExpressions += plan -> expression
expression
}
}
def getOrCreateGroup ( plan : LogicalPlan ) : Group = {
val exprGroup = getOrCreateGroupExpression(plan)
val group = parents.get(exprGroup.id) match {
case Some (group) =>
group.equivalents += exprGroup
group
case None =>
val id = groupIdGenerator.generate()
val equivalents = mutable. HashSet () + exprGroup
val group = Group (
id = id,
equivalents = equivalents
)
groups.put(id, group)
group
}
parents += exprGroup.id -> group
group
}
}
Ver memo.scala para la implementación completa
El primer paso dentro del planificador es la inicialización
Gráfico LR
consulta ((consulta))
AST ((AST))
root_plan ((rootplan))
root_group ((rootgroup))
Consulta -"QueryParser.Parse (consulta)" -> AST
AST -"LogicalPlan.toplan (AST)" -> root_plan
root_plan -"Memo.getorcreateGroup (rootplan)" -> root_group
Primero, la consulta se analizará en AST. Luego se convierte al plan lógico, llamado root plan , luego inicialice el grupo desde root plan , llamado root group .
def initialize ( query : Statement )( implicit ctx : VolcanoPlannerContext ) : Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan .toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is first the exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the different between rounds (added nodes, add connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}Ver volcanoplanner.scala para más detalles
Por ejemplo, la consulta:
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idDespués de la inicialización, los grupos se verán así:
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#6 -> Grupo#5
Aquí puedes ver que cada grupo tiene exactamente una expresión equivalente
Después de la inicialización, ahora es la fase de exploración, que explorará todos los planes equivalentes posibles.
El método de exploración es bastante simple:
Antes de sumergirnos en el código de exploración, hablemos primero sobre la regla de transformación.
La regla de transformación es una regla utilizada para transformar un plan lógico en otro plan lógico equivalente si coincide con la condición de la regla.
Aquí está la interfaz de la regla de transformación:
trait TransformationRule {
def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean
def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression
}
Dado que el plan lógico es una datos de datos en forma de árbol, por lo que la implementación de match de reglas de transformación es la coincidencia de patrones en el árbol.
Por ejemplo, aquí está la match que se utiliza para que coincida con el nodo del proyecto, mientras que también verifica si son descendientes que contienen unión y exploración solo:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Este plan está "emparejado":
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear"]
fin
Subgraph Group#3
Expr#3 ["escanear"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto"]
fin
Expr#6 -> Grupo#5
Si bien este plan no es:
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#3
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["escanear"]
fin
Subgraph Group#7
Expr#7 ["Proyecto"]
fin
Expr#7 -> Grupo#6
Grupo de subgraph#1
Expr#1 ["escanear"]
fin
Subgraph Group#3
Expr#3 ["Proyecto"]
fin
Expr#3 -> Grupo#2
Grupo de subgraph#6
Expr#6 ["unirse"]
fin
Expr#6 -> Grupo#1
Expr#6 -> Grupo#5
Como hemos dicho antes, el método de exploración es:
Y aquí está el código de exploración (bastante simple, huh):
private def exploreGroup (
group : Group ,
rules : Seq [ TransformationRule ],
round : Int
)( implicit ctx : VolcanoPlannerContext ) : Unit = {
while ( ! group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if ( ! equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if ( ! equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if ( ! group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}Ver volcanoplanner.scala para más detalles
Ahora es el momento de implementar algunas reglas de transformación
El impulso de proyección es una regla de transformación simple, utilizada para impulsar la proyección a la capa de almacenamiento.
Por ejemplo, la consulta
SELECT field1, field2
from tbltiene el plan
Gráfico LR
Proyecto [Proyecto Field1, Field2]
escanear [escanear TBL]
Proyecto -> escanear
Con este plan, al ejecutar, las filas de la capa de almacenamiento (bajo escaneo) se obtendrán por completo, y luego se eliminarán los campos innecesarios (proyecto). Los datos innecesarios aún tienen que pasar del nodo de escaneo al nodo del proyecto, por lo que hay algunos esfuerzos desperdiciados aquí.
Podemos mejorar simplemente simplemente decirle a la capa de almacenamiento solo obtener los campos necesarios. Ahora el plan se transformará en:
Gráfico LR
Proyecto [Proyecto Field1, Field2]
escanear ["escanear TBL (Field1, Field2)"]
Proyecto -> escanear
Vamos al código:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Nuestra regla de impulso de proyección aquí coincidirá con el plan cuando sea el nodo del proyecto, y todos sus descendientes son solo nodo de escaneo y unión.
Notas: En realidad, la coincidencia de pushdown de proyección real es más compleja, pero en aras de la simplicidad, la regla del partido aquí es solo el nodo de proyecto con descendientes de escaneo y unión
Y aquí está el código de transformación:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Project ]
val pushDownProjection = mutable. ListBuffer [ FieldID ]()
extractProjections(plan, pushDownProjection)
val newPlan = Project (plan.fields, pushDown(pushDownProjection.distinct, plan.child))
ctx.memo.getOrCreateGroupExpression(newPlan)
}
private def extractProjections ( node : LogicalPlan , buffer : mutable. ListBuffer [ FieldID ]) : Unit = {
node match {
case Scan (_, _) => () : Unit
case Project (fields, parent) =>
buffer ++= fields
extractProjections(parent, buffer)
case Join (left, right, on) =>
buffer ++= on.map(_._1) ++ on.map(_._2)
extractProjections(left, buffer)
extractProjections(right, buffer)
}
}
private def pushDown ( pushDownProjection : Seq [ FieldID ], node : LogicalPlan ) : LogicalPlan = {
node match {
case Scan (table, tableProjection) =>
val filteredPushDownProjection = pushDownProjection.filter(_.table == table).map(_.id)
val updatedProjection =
if (filteredPushDownProjection.contains( " * " ) || filteredPushDownProjection.contains( " *.* " )) {
Seq .empty
} else {
(tableProjection ++ filteredPushDownProjection).distinct
}
Scan (table, updatedProjection)
case Join (left, right, on) => Join (pushDown(pushDownProjection, left), pushDown(pushDownProjection, right), on)
case _ => throw new Exception ( " should never happen " )
}
}El código de transformación primero encontrará todas las proyecciones del nodo del proyecto raíz y luego las empujará hacia abajo a todos los nodos de escaneo debajo.
Visualizar nuestra regla, por ejemplo, el plan
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#6 -> Grupo#5
Después de aplicar la transformación de empuje de proyección, dará como resultado un nuevo plan equivalente con las proyecciones se empujan hacia las operaciones de escaneo (el nuevo plan es el árbol con nodos de borde naranja).
Gráfico TD
Subgraph Group#8
Expr#8 ["Escanear TBL2 (ID, Field1, Field2)"]
fin
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Subgraph Group#11
Expr#11 ["unirse"]
fin
Expr#11 -> Grupo#7
Expr#11 -> Grupo#10
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#7
Expr#7 ["Escanear TBL1 (ID, Field1)"]
fin
Grupo de subgraph#10
Expr#10 ["unirse"]
fin
Expr#10 -> Grupo#8
Expr#10 -> Grupo#9
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#9
Expr#9 ["Escanear TBL3 (ID, Field2)"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#12 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#12 -> Grupo#11
Expr#6 -> Grupo#5
Estilo expr#12 Width: 4px, trazo: naranja
Estilo Expr#8 Soteo-ancho: 4px, trazo: naranja
Estilo Expr#10 Width: 4px, Stroke: Orange
Estilo Expr#9 Apertura de trazo: 4px, trazo: naranja
Estilo expr#11 Width de trazo: 4px, trazo: naranja
Estilo Expr#7 Langua del trazo: 4px, trazo: naranja
LinkStyle 0 Stroke-Width: 4px, Stroke: Orange
LinkStyle 1 Width: 4px, Stroke: Orange
LinkStyle 6 Stroke-Width: 4px, Stroke: Orange
LinkStyle 7 Stroke-Width: 4px, Stroke: Orange
LinkStyle 8 Stroke-Width: 4px, Stroke: Orange
Ver ProjectionPushdown.Scala para la implementación completa
Join Reorder también es una de las transformaciones más reconocidas en el mundo del planificador de consultas. Nuestro planificador también implementará una regla de transformación de reorden.
Dado que unirse a Reorder en el mundo real no es una pieza fácil de implementar. Por lo tanto, implementaremos una versión simple y de estafa de la regla de Readorder Join.
Primero, el match de la regla:
// check if the tree only contains SCAN and JOIN nodes, and also extract all SCAN nodes and JOIN conditions
private def checkAndExtract (
node : LogicalPlan ,
buffer : mutable. ListBuffer [ Scan ],
joinCondBuffer : mutable. ListBuffer [(ql. FieldID , ql. FieldID )]
) : Boolean = {
node match {
case node @ Scan (_, _) =>
buffer += node
true
case Join (left, right, on) =>
joinCondBuffer ++= on
checkAndExtract(left, buffer, joinCondBuffer) && checkAndExtract(right, buffer, joinCondBuffer)
case _ => false
}
}
private def buildInterchangeableJoinCond ( conditions : Seq [(ql. FieldID , ql. FieldID )]) : Seq [ Seq [ql. FieldID ]] = {
val buffer = mutable. ListBuffer [mutable. Set [ql. FieldID ]]()
conditions.foreach { cond =>
val set = buffer.find { set =>
set.contains(cond._1) || set.contains(cond._2)
} match {
case Some (set) => set
case None =>
val set = mutable. Set [ql. FieldID ]()
buffer += set
set
}
set += cond._1
set += cond._2
}
buffer.map(_.toSeq)
}
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case node @ Join (_, _, _) =>
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
if (checkAndExtract(node, buffer, joinCondBuffer)) {
// only match if the join is 3 tables join
if (buffer.size == 3 ) {
var check = true
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
interChangeableCond.foreach { c =>
check &= c.size == 3
}
check
} else {
false
}
} else {
false
}
case _ => false
}
} Nuestra regla solo coincidirá, si coincidimos con la unión de 3 vías (el número de tabla involucrada debe ser 3, y la condición de unión debe ser de 3 vías, como tbl1.field1 = tbl2.field2 = tbl3.field3 )
Por ejemplo,
tbl1
JOIN tbl2 ON tbl1 . field1 = tbl2 . field2
JOIN tbl3 ON tbl1 . field1 = tbl3 . field3 La declaración de unión aquí se "combinará" ya que se une a 3 vías (es la unión entre tbl1 , tbl2 , tbl3 , y la condición es tbl1.field1 = tbl2.field2 = tbl3.field3 )
A continuación, es el código de transformación:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Join ]
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
checkAndExtract(plan, buffer, joinCondBuffer)
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
//
val scans = buffer.toList
implicit val ord : Ordering [ Scan ] = new Ordering [ Scan ] {
override def compare ( x : Scan , y : Scan ) : Int = {
val xStats = ctx.statsProvider.tableStats(x.table.id)
val yStats = ctx.statsProvider.tableStats(y.table.id)
xStats.estimatedTableSize.compareTo(yStats.estimatedTableSize)
}
}
def getJoinCond ( left : Scan , right : Scan ) : Seq [(ql. FieldID , ql. FieldID )] = {
val leftFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == left.table)
}
val rightFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == right.table)
}
if (leftFields.length != rightFields.length) {
throw new Exception ( s " leftFields.length( ${leftFields.length} ) != rightFields.length( ${rightFields.length} ) " )
} else {
leftFields zip rightFields
}
}
val sorted = scans.sorted
val newPlan = Join (
sorted( 0 ),
Join (
sorted( 1 ),
sorted( 2 ),
getJoinCond(sorted( 1 ), sorted( 2 ))
),
getJoinCond(sorted( 0 ), sorted( 1 ))
)
ctx.memo.getOrCreateGroupExpression(newPlan)
}El código de transformación aquí reordenará las tablas por su tamaño estimado.
Por ejemplo, si tenemos 3 tablas A, B, C con un tamaño estimado de 300B, 100B, 200B y una declaración de unión A JOIN B JOIN C B JOIN C JOIN A
Notas: Puede notar en este código, hemos utilizado estadísticas de tabla, para proporcionar una pista para transformar el plan. En práctica, el planificador puede usar todo tipo de estadísticas para ayudar a su transformación, como el tamaño de la tabla, el tamaño de la fila, el recuento nulo, el histograma, etc.
Visualizar nuestra regla, por ejemplo, el plan
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#6 -> Grupo#5
Después de unirse a la transformación de reorden, lo que resulta en
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
Expr#8 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Expr#8 -> Grupo#2
Expr#8 -> Grupo#7
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#7
Expr#7 ["unirse"]
fin
Expr#7 -> Grupo#1
Expr#7 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#6 -> Grupo#5
Estilo Expr#8 Soteo-ancho: 4px, trazo: naranja
Estilo Expr#7 Langua del trazo: 4px, trazo: naranja
LinkStyle 2 Stroke-Width: 4px, Stroke: Orange
LinkStyle 6 Stroke-Width: 4px, Stroke: Orange
LinkStyle 3 Stroke-Width: 4px, Stroke: Orange
LinkStyle 7 Stroke-Width: 4px, Stroke: Orange
Podemos ver que tbl2 JOIN tbl1 JOIN tbl3 se crea a partir de tbl1 JOIN tbl2 JOIN tbl3 se genera mediante la transformación (los nodos y los bordes recién agregados están indicados por líneas naranjas)
Ver x3TableJoinRoRordBySize.Scala para la implementación completa
Ahora podemos poner nuestras reglas de transformación en un solo lugar
private val transformationRules : Seq [ Seq [ TransformationRule ]] = Seq (
Seq ( new ProjectionPushDown ),
Seq ( new X3TableJoinReorderBySize )
)Y ejecutarlos para explorar los grupos equivalentes
for (r <- transformationRules.indices) {
exploreGroup(ctx.rootGroup, transformationRules(r), r + 1 )
}Por ejemplo, el plan
Gráfico TD
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#6
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#6 -> Grupo#5
Después de ser explorado, dará como resultado este gráfico
Gráfico TD
Subgraph Group#8
Expr#8 ["Escanear TBL2 (ID, Field1, Field2)"]
fin
Subgraph Group#11
Expr#11 ["unirse"]
Expr#14 ["unirse"]
fin
Expr#11 -> Grupo#7
Expr#11 -> Grupo#10
Expr#14 -> Grupo#8
Expr#14 -> Grupo#12
Subgraph Group#2
Expr#2 ["escanear TBL2"]
fin
Grupo de subgraph#5
Expr#5 ["unirse"]
Expr#16 ["unirse"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Expr#16 -> Grupo#2
Expr#16 -> Grupo#13
Grupo de subgraph#4
Expr#4 ["unirse"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#13
Expr#15 ["unirse"]
fin
Expr#15 -> Grupo#1
Expr#15 -> Grupo#3
Subgraph Group#7
Expr#7 ["Escanear TBL1 (ID, Field1)"]
fin
Grupo de subgraph#1
Expr#1 ["escanear TBL1"]
fin
Grupo de subgraph#10
Expr#10 ["unirse"]
fin
Expr#10 -> Grupo#8
Expr#10 -> Grupo#9
Subgraph Group#9
Expr#9 ["Escanear TBL3 (ID, Field2)"]
fin
Subgraph Group#3
Expr#3 ["escanear TBL3"]
fin
Grupo de subgraph#12
Expr#13 ["unirse"]
fin
Expr#13 -> Grupo#7
Expr#13 -> Grupo#9
Grupo de subgraph#6
Expr#12 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
Expr#6 ["Proyecto TBL1.ID, TBL1.FIELD1, TBL2.ID, TBL2.FIELD1, TBL2.FIELD2, TBL3.ID, TBL3.FIELD2, TBL3.FIELD2"]
fin
Expr#12 -> Grupo#11
Expr#6 -> Grupo#5
Estilo expr#12 Width: 4px, trazo: naranja
Estilo Expr#8 Soteo-ancho: 4px, trazo: naranja
Estilo Expr#10 Width: 4px, Stroke: Orange
Estilo Expr#13 Apertura de trazo: 4px, Stroke: Orange
Estilo Expr#14 Aspecto de trazo: 4px, trazo: naranja
Estilo expr#11 Width de trazo: 4px, trazo: naranja
Estilo Expr#9 Apertura de trazo: 4px, trazo: naranja
Estilo Expr#15 Width: 4px, Stroke: Orange
Estilo Expr#7 Langua del trazo: 4px, trazo: naranja
Estilo Expr#16 Stroke-ancho: 4px, trazo: naranja
LinkStyle 0 Stroke-Width: 4px, Stroke: Orange
LinkStyle 15 Stroke-Width: 4px, Stroke: Orange
LinkStyle 12 Stroke-Width: 4px, Stroke: Orange
LinkStyle 1 Width: 4px, Stroke: Orange
LinkStyle 16 Stroke-Width: 4px, Stroke: Orange
LinkStyle 13 Stroke-Width: 4px, Stroke: Orange
LinkStyle 2 Stroke-Width: 4px, Stroke: Orange
LinkStyle 6 Stroke-Width: 4px, Stroke: Orange
LinkStyle 3 Stroke-Width: 4px, Stroke: Orange
LinkStyle 10 Stroke-Width: 4px, Stroke: Orange
LinkStyle 7 Stroke-Width: 4px, Stroke: Orange
LinkStyle 14 Stroke-Width: 4px, Stroke: Orange
LinkStyle 11 Stroke-Width: 4px, Stroke: Orange
Ver volcanoplanner.scala para más detalles
Después de la fase de exploración, ahora tenemos un árbol completamente expandido que contiene todos los planes posibles, ahora es la fase de optimización.
En esta fase, encontraremos el mejor plan para nuestro grupo raíz. El proceso de optimización se describe como lo siguiente:
Aquí hay un ejemplo
Gráfico TD
Subgraph Group#2 ["Grupo#2 (costo = 1)"]
Expr#2 ["Expr#2 (costo = 1)"]
fin
Subgraph Group#5 ["Grupo#5 (costo = 3)"]
Expr#5 ["Expr#5 (costo = max (3,2) = 3"]
fin
Expr#5 -> Grupo#1
Expr#5 -> Grupo#4
Subgraph Group#4 ["Grupo#4 (costo = 2)"]
Expr#4 ["Expr#4 (costo = max (1,2) = 2)"]
Expr#7 ["Expr#7 (costo = 1+2 = 3)"]
fin
Expr#4 -> Grupo#2
Expr#4 -> Grupo#3
Subgraph Group#1 ["Grupo#1 (costo = 3)"]
Expr#1 ["Expr#1 (costo = 3)"]
fin
Subgraph Group#3 ["Grupo#3 (costo = 2)"]
Expr#3 ["Expr#3 (costo = 2)"]
fin
Subgraph Group#6 ["Grupo#6 (costo = 4.5)"]
Expr#6 ["Expr#6 (costo = 3*1.5 = 4.5)"]
fin
Expr#6 -> Grupo#5
Subgraph Group#8 ["Grupo#8 (costo = 1)"]
Expr#8 ["Expr#8 (costo = 1)"]
fin
Subgraph Group#9 ["Grupo#9 (costo = 2)"]
Expr#9 ["Expr#9 (costo = 2)"]
fin
Expr#7 -> Grupo#8
Expr#7 -> Grupo#9
Por ejemplo, el costo Expr#4 es calculado por los costos de su grupo infantil ( Group#2 y Group#3 ) utilizando la función max . Otro ejemplo, es el Group#4 , su costo se calcula calculando el valor mínimo entre los costos de sus expresiones equivalentes.
Dado que el objetivo de la fase de optimización es producir el mejor plan físico dadas las expresiones grupales exploradas. Podemos definir el plan físico como lo siguiente:
sealed trait PhysicalPlan {
def operator () : Operator
def children () : Seq [ PhysicalPlan ]
def cost () : Cost
def estimations () : Estimations
def traits () : Set [ String ]
}
El operator es el operador físico, que solía ejecutar el plan, lo cubriremos en la sección posterior. Entonces children es la lista de nodos del plan infantil, están acostumbrados a participar en el proceso de cálculo de costos. El tercer atributo es cost , cost es una información de costo de posesión de objetos (como costo de CPU, costo de memoria, costo de IO, etc.). estimations son la propiedad que posee estadísticas estimadas sobre el plan (como el recuento de filas, el tamaño de la fila, etc.), también participa en el cálculo de los costos. Finalmente, traits son un conjunto de rasgos físicos, que afectan la regla de implementación para afectar el proceso de generación del plan físico.
A continuación, podemos implementar las clases de nodo físico:
case class Scan (
operator : Operator ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq .empty // scan do not receive any child
}
case class Project (
operator : Operator ,
child : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (child)
}
case class Join (
operator : Operator ,
leftChild : PhysicalPlan ,
rightChild : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (leftChild, rightChild)
}
Ver PhysicPlan.scala para la implementación completa
Lo primero en la fase de optimización, es decir, tenemos que implementar las reglas de implementación. La regla de implementación es la regla para convertir del plan lógico a los planes físicos sin ejecutarlos.
Como no estamos ejecutando directamente el plan físico en el planificador, por lo que devolveremos el creador de planes físicos, también es más fácil personalizar la función de costo para cada nodo.
Aquí está la interfaz de la regla de implementación:
trait PhysicalPlanBuilder {
def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ]
}
trait ImplementationRule {
def physicalPlanBuilders ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ]
}
Aquí el PhysicalPlanBuilder es la interfaz utilizada para construir el plan físico, dados los planes físicos del niño.
Por ejemplo, la unión lógica tiene 2 implementaciones físicas son hash unirse y fusionar unirse
Gráfico TD
Niño#1 ["Niño#1"]
Niño#2 ["Niño#2"]
Niño#3 ["Niño#3"]
Niño#4 ["Niño#4"]
hash_join ["` Hash unirse
costo = f (costo (niño#1), costo (niño#2))
`"]
Merge_Join ["` fusionar unirse
cost=g(cost(child#3),cost(child#4))
`"]
hash_join --> child#1
hash_join --> child#2
merge_join --> child#3
merge_join --> child#4
the HASH JOIN cost is using function f() to calculate cost, and MERGE JOIN is using function g() to calculate cost, both are using its children as function parameters. So it's easier to code if we're returning just the phyiscal plan builder from the implementation rule instead of the physical plan.
As we've said before, the optimization process is described as following:
And here is the code:
private def implementGroup ( group : Group , combinedRule : ImplementationRule )(
implicit ctx : VolcanoPlannerContext
) : GroupImplementation = {
group.implementation match {
case Some (implementation) => implementation
case None =>
var bestImplementation = Option .empty[ GroupImplementation ]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option (childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some (currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}This code is an exhaustive search code, which is using recursive function to traverse all nodes. At each node (group), the function is called once to get its best plan while also calculate the optimal cost of that group.
Finally, the best plan for our query is the best plan of the root group
val implementationRules = new ImplementationRule {
override def physicalPlanBuilders (
expression : GroupExpression
)( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
expression.plan match {
case node @ Scan (_, _) => implement. Scan (node)
case node @ Project (_, _) => implement. Project (node)
case node @ Join (_, _, _) => implement. Join (node)
}
}
}
ctx.rootGroup.implementation = Option (implementGroup(ctx.rootGroup, implementationRules))See VolcanoPlanner.scala for full implementation
Here is an example of the plan after optimization, it's shown the selected logical node, the selected physical operator, and the estimated cost
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Next, we will implement some implementation rules.
The first, easiest one is the implementation rule of logical PROJECT
object Project {
def apply ( node : logicalplan. Project )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
Seq (
new ProjectionImpl (node.fields)
)
}
}
class ProjectionImpl ( projection : Seq [ql. FieldID ]) extends PhysicalPlanBuilder {
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val child = children.head
val selfCost = Cost (
estimatedCpuCost = 0 ,
estimatedMemoryCost = 0 ,
estimatedTimeCost = 0
) // assuming the cost of projection is 0
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + child.cost().estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + child.cost().estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + child.cost().estimatedTimeCost
)
val estimations = Estimations (
estimatedLoopIterations = child.estimations().estimatedLoopIterations,
estimatedRowSize = child.estimations().estimatedRowSize // just guessing the value
)
Some (
Project (
operator = ProjectOperator (projection, child.operator()),
child = child,
cost = cost,
estimations = estimations,
traits = child.traits()
)
)
}
}
The implementation rule for logical PROJECT, is returning one physical plan builder ProjectionImpl . ProjectionImpl cost calculation is simple, it just inherits the cost from the child node (because the projection is actually not doing any intensive operation). Beside that, it also updates the estimation (in this code, estimation is also inherit from the child node)
See Project.scala for full implementation
Writing implementation rule for logical JOIN is way harder than PROJECTION.
One first reason is, a logical JOIN has many physical implementation, such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
The second reason is, estimating cost for physical JOIN is also hard, because it depends on lots of factors such as row count, row size, data histogram, indexes, data layout, etc.
So, to keep everything simple in this guide, I will only implement 2 physical JOIN: HASH JOIN and MERGE JOIN. The cost estimation functions are fictional (just to show how it works, I'm not trying to correct it). And in the MERGE JOIN, all data is assuming to be sorted by join key.
Here is the code:
object Join {
def apply ( node : logicalplan. Join )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
val leftFields = node.on.map(_._1).map(f => s " ${f.table.id} . ${f.id} " )
val rightFields = node.on.map(_._2).map(f => s " ${f.table.id} . ${f.id} " )
Seq (
new HashJoinImpl (leftFields, rightFields),
new MergeJoinImpl (leftFields, rightFields)
)
}
}
The HASH JOIN:
class HashJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
private def viewSize ( plan : PhysicalPlan ) : Long = {
plan.estimations().estimatedLoopIterations * plan.estimations().estimatedRowSize
}
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
// reorder the child nodes, the left child is the child with smaller view size (smaller than the right child if we're store all of them in memory)
val (leftChild, rightChild) = if (viewSize(children( 0 )) < viewSize(children( 1 ))) {
(children( 0 ), children( 1 ))
} else {
(children( 1 ), children( 0 ))
}
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = HashJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = Set .empty // don't inherit trait from children since we're hash join
)
)
}
}
We can see that the cost function of HASH JOIN is composed of its children costs and estimations
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)Next, the MERGE JOIN:
class MergeJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val (leftChild, rightChild) = (children( 0 ), children( 1 ))
if (leftChild.traits().contains( " SORTED " ) && rightChild.traits().contains( " SORTED " )) {
val estimatedTotalRowCount =
leftChild.estimations().estimatedLoopIterations +
rightChild.estimations().estimatedLoopIterations
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = MergeJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = leftChild.traits() ++ rightChild.traits()
)
)
} else {
None
}
}
}
Same with HASH JOIN, MERGE JOIN also uses its children costs and estimations to calculate its cost, but with different formulla:
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)See HashJoinImpl.scala and MergeJoinImpl.scala for full implementation
You can see other rules and physical plan builders here:
Now, after done implementing the implementation rules, now we can find our best plan. Let's start over from the user query
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idwill be converted to the logical plan
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After exploration phase, it will generate lots of equivalent plans
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
fin
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
fin
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
fin
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
fin
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
fin
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
fin
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
fin
subgraph Group#1
Expr#1["SCAN tbl1"]
fin
subgraph Group#10
Expr#10["JOIN"]
fin
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
fin
subgraph Group#3
Expr#3["SCAN tbl3"]
fin
subgraph Group#12
Expr#13["JOIN"]
fin
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fin
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
And the at optimization phase, a final best plan is chose
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Now we've done building a functional query planner which can optimize the query from user, but our query plan could not run by itself. So it's the reason why now we will implement the query processor to test out our query plan.
Basically the query process receive input from the query planner, and execute them
graph LR
plan(("Physical Plan"))
storage[("Storage Layer")]
processor["Query Processor"]
plan -- execute --> processor
storage -- fetch --> processor
Volcano/iterator model is the query processing model that is widely used in many DBMS. It is a pipeline architecture, which means that the data is processed in stages, with each stage passing the output of the previous stage to the next stage.
Each stage in the pipeline is represented by an operator. Operators are functions that perform a specific operation on the data, such as selecting rows, filtering rows, or aggregating rows.
Usually, operator can be formed directly from the query plan. For example, the query
SELECT field_1
FROM tbl
WHERE field = 1will have the plan
graph TD
project["PROJECT: field_1"]
scan["SCAN: tbl"]
filter["FILTER: field = 1"]
project --> scan
filter --> project
will create a chain of operators like this:
scan = {
next() // fetch next row from table "tbl"
}
project = {
next() = {
next_row = scan.next() // fetch next row from scan operator
projected = next_row["field_1"]
return projected
}
}
filter = {
next() = {
next_row = {}
do {
next_row = project.next() // fetch next row from project operator
} while (next_row["field"] != 1)
return next_row
}
}
results = []
while (row = filter.next()) {
results.append(row)
}
notes : this pseudo code did not handle for end of result stream
The basic interface of an operator is described as following:
trait Operator {
def next () : Option [ Seq [ Any ]]
}
See Operator.scala for full implementation of all operators
Let's define a query
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idwith some data and stats
val table1 : Datasource = Datasource (
table = " emp " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" code " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id
),
rows = Seq (
Seq ( " 1 " , " Emp A " ),
Seq ( " 2 " , " Emp B " ),
Seq ( " 3 " , " Emp C " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " code " -> 32 )
)
)
val table2 : Datasource = Datasource (
table = " dept " ,
catalog = TableCatalog (
Seq (
" emp_id " -> classOf [ String ],
" dept_name " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by emp_id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " Dept 1 " ),
Seq ( " 1 " , " Dept 2 " ),
Seq ( " 2 " , " Dept 3 " ),
Seq ( " 3 " , " Dept 3 " )
),
stats = TableStats (
estimatedRowCount = 4 ,
avgColumnSize = Map ( " emp_id " -> 10 , " dept_name " -> 255 )
)
)
val table3 : Datasource = Datasource (
table = " emp_info " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" name " -> classOf [ String ],
" origin " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " AAAAA " , " Country A " ),
Seq ( " 2 " , " BBBBB " , " Country A " ),
Seq ( " 3 " , " CCCCC " , " Country B " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " name " -> 255 , " origin " -> 255 )
)
)The cost model is optimized for CPU
val costModel : CostModel = ( currentCost : Cost , newCost : Cost ) => {
currentCost.estimatedCpuCost > newCost.estimatedCpuCost
}Now, executing the query by running this code:
val planner = new VolcanoPlanner
QueryParser .parse(query) match {
case Left (err) => throw err
case Right (parsed) =>
val operator = planner.getPlan(parsed)
val result = Utils .execute(operator)
// print result
println(result._1.mkString( " , " ))
result._2.foreach(row => println(row.mkString( " , " )))
}it will print:
emp.id,emp.code,dept.dept_name,emp_info.name,emp_info.origin
1,Emp A,Dept 1,AAAAA,Country A
1,Emp A,Dept 2,AAAAA,Country A
2,Emp B,Dept 3,BBBBB,Country A
3,Emp C,Dept 3,CCCCC,Country B
Voila, We've done building a fully functional query planner and query engine :). You can start writing one for your own, good luck
See Demo.scala for full demo code
Thanks for reading this, this guide is quite long, and not fully correct, but I've tried my best to write it as understandably as possible ?