[espaço reservado]
Um planejador de consulta é um componente de um sistema de gerenciamento de banco de dados (DBMS) responsável por gerar um plano para executar uma consulta de banco de dados. O plano de consulta especifica as etapas que o DBMS tomará para recuperar os dados solicitados pela consulta. O objetivo do planejador de consultas é gerar um plano o mais eficiente possível, o que significa que ele retornará os dados ao usuário o mais rápido possível.
Os planejadores de consultas são peças complexas de software e podem ser difíceis de entender. Este guia para a implementação de um planejador de consultas baseado em custos fornecerá uma visão geral passo a passo do processo, como implementar seu próprio planejador de consulta baseado em custos, enquanto ainda cobre os conceitos básicos de planejador de consulta.
Escrito por IA, editado por Human
Este guia está escrito para:
Metas:
Gráfico TD
Usuário ((usuário))
Analisador [Adirador de consulta]
Planejador [Consulta Planner]
Executor [Processador de consulta]
Usuário -Consulta de texto -> analisador
analisador -ast -> planejador
Planejador -Plano Físico -> Executor
A arquitetura básica de um mecanismo de consulta é consistida nesses componentes:
Normalmente, os planejadores de consulta são divididos em 2 tipos:
O planejador heurístico é o planejador de consultas que usou regras predefinidas para gerar plano de consulta.
O planejador baseado em custos é o planejador de consultas que, com base no custo de gerar consulta, tenta encontrar o plano ideal com base no custo da consulta de entrada.
Embora o planejador heurístico geralmente encontre o melhor plano, aplique regras de transformação se souber que o plano transformado é melhor, o planejador baseado em custos encontra o melhor plano por enumerar planos equivalentes e tentar encontrar o melhor plano entre eles.
No planejador de consultas baseado em custos, geralmente é composto de fases:
Na fase de enumerações do plano, o planejador enumerará os possíveis planos equivalentes.
Depois disso, na fase de otimização de consultas, o planejador procurará o melhor plano da lista de planos enumerados. O melhor plano é o plano com o menor custo, que o modelo de custo (ou função de custo) é definido.
Como o natural do plano lógico é ter uma estrutura semelhante a uma árvore, para que você possa pensar que a otimização/pesquisa é realmente um problema de pesquisa de árvores. E há muitos algoritmos de busca de árvores aqui:
Notas: Em teoria, é possível usar qualquer tipo de algoritmo de busca de árvores. No entanto, em prático, não é viável, pois o tempo de pesquisa aumenta quando nosso algoritmo de pesquisa é complexo
Notas: As condições de terminação de pesquisa geralmente são:
Planejador de consultas do vulcão (ou gerador de otimizador de vulcão) é um planejador de consulta baseado em custos
O Planner Volcano usa a abordagem de programação dinâmica para encontrar o melhor plano de consulta da lista de planos enumerados.
Detalhes: https://ieeexplore.ieee.org/document/344061 (estou com preguiça de explicar o artigo aqui)
Aqui está uma ótima explicação: https://15721.courses.cs.cmu.edu/spring2017/slides/15-optimizer2.pdf#page=9
Nosso planejador de consultas, é um planejador de consultas baseado em custos, seguindo a idéia básica do planejador de consultas do vulcão que nosso planejador será consistido em duas fases principais:
Gráfico LR
AST ((AST))
Logical_Plan [Plano]
explorado_plans ["`
Plano #1
...
Plano #n
`"]
implementação_plan ["Plano #x (melhor plano)"]
AST -converta para plano lógico -> Logical_Plan
Logical_Plan -Fase de exploração -> Explored_plans
Explored_plans -Fase de otimização -> Implementation_Plan
LinkStyle 1,2 Cor: Orange, AVC: laranja, largura de derrame: 5px
Plano lógico é aestrutura de dados que mantém a etapa de abstração da transformação necessária para executar a consulta.
Aqui está um exemplo de um plano lógico:
Gráfico TD
1 ["Projeto TBL1.ID, Tbl1.Field1, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"];
2 ["junção"];
3 ["Scan TBL1"];
4 ["Join"];
5 ["Scan TBL2"];
6 ["Scan TBL3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Embora o Plano Lógico mantenha apenas a abstração, o Plano Físico é aestrutura de dados que mantém os detalhes da implementação. Cada plano lógico terá vários planos físicos. Por exemplo, uma junção lógica pode ter muitos planos físicos, como hash se juntando, junção de mesclagem, junção de transmissão etc.
Grupo equivalente é um grupo de expressões equivalentes (que para cada expressão, seu plano lógico é logicamente equivalente)
por exemplo
Gráfico TD
Grupo de subgrafias#8
EXPR#8 ["Scan Tbl2 (Field1, Field2, ID)"]
fim
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafes#11
Expr#11 ["JONE"]
fim
EXPR#11 -> Grupo#7
EXPR#11 -> Grupo nº 10
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#7
Expr#7 ["Scan TBL1 (ID, Field1)"]
fim
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#10
Expr#10 ["Join"]
fim
EXPR#10 -> Grupo#8
EXPR#10 -> Grupo#9
Grupo de subgrafias#9
Expr#9 ["Scan TBL3 (ID, Field2)"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
EXPR#12 ["Project tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
EXPR#6 ["Project tbl1.id, tbl1.field1, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#12 -> Grupo#11
EXPR#6 -> Grupo#5
Aqui podemos ver que Group#6 está tendo 2 expressões equivalentes, que representam a mesma consulta (uma está fazendo a digitalização da tabela e depois o projeto, um está empurrando a projeção para o nó de digitalização).
A regra de transformação é a regra de se transformar de um plano lógico para outro plano lógico equivalente lógico
Por exemplo, o plano:
Gráfico TD
1 ["Projeto TBL1.ID, Tbl1.Field1, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"];
2 ["junção"];
3 ["Scan TBL1"];
4 ["Join"];
5 ["Scan TBL2"];
6 ["Scan TBL3"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
Quando aplicar a transformação de projeção, é transformada em:
Gráfico TD
1 ["Projeto *. *"];
2 ["junção"];
3 ["Scan TBL1 (ID, Field1)"];
4 ["Join"];
5 ["Scan Tbl2 (Field1, Field2)"];
6 ["Scan TBL3 (ID, Field2, Field2)"];
1 -> 2;
2 -> 3;
2 -> 4;
4 -> 5;
4 -> 6;
A regra de transformação pode ser afetada por características/propriedades lógicas, como esquema de tabela, estatísticas de dados, etc.
A regra de implementação é a regra de devolver os planos físicos, dado o plano lógico.
A regra de implementação pode ser afetada por características/propriedades físicas, como layout de dados (classificadas ou não), etc.
Na fase de exploração, o planejador aplicará regras de transformação, gerando planos lógicos equivalentes
Por exemplo, o plano:
Gráfico TD
1326583549 ["Projeto TBL1.ID, Tbl1.Field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"];
-425111028 ["junção"];
-349388609 ["Scan TBL1"];
1343755644 ["junção"];
-1043437086 ["Scan TBL2"];
-1402686787 ["Scan Tbl3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Após a aplicação de regras de transformação, resultando no gráfico a seguir:
Gráfico TD
Grupo de subgrafias#8
EXPR#8 ["Scan Tbl2 (ID, Field1, Field2)"]
fim
Grupo de subgrafes#11
Expr#11 ["JONE"]
EXPR#14 ["Join"]
fim
EXPR#11 -> Grupo#7
EXPR#11 -> Grupo nº 10
EXPR#14 -> Grupo#8
EXPR#14 -> Grupo#12
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
Expr#16 ["Join"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
EXPR#16 -> Grupo#2
EXPR#16 -> Grupo#13
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#13
Expr#15 ["Join"]
fim
EXPR#15 -> Grupo#1
EXPR#15 -> Grupo#3
Grupo de subgrafias#7
Expr#7 ["Scan TBL1 (ID, Field1)"]
fim
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#10
Expr#10 ["Join"]
fim
EXPR#10 -> Grupo#8
EXPR#10 -> Grupo#9
Grupo de subgrafias#9
Expr#9 ["Scan TBL3 (ID, Field2)"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#12
Expr#13 ["Join"]
fim
EXPR#13 -> Grupo#7
EXPR#13 -> Grupo#9
Grupo de subgrafias#6
EXPR#12 ["Projeto TBL1.id, Tbl1.Field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"]
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#12 -> Grupo#11
EXPR#6 -> Grupo#5
Aqui podemos ver que a regra de pushdown de projeção e a regra de reordenação de junta -se são aplicadas.
A fase de otimização é atravessar a árvore expandida na fase de exploração, para encontrar o melhor plano para nossa consulta.
Isso "na verdade" é a otimização de pesquisa de árvores, para que você possa usar qualquer algoritmo de pesquisa de árvore que possa imaginar (mas precisa garantir que esteja correto).
Aqui está o exemplo do plano físico gerado após a fase de otimização:
Gráfico TD
Grupo#6 ["
Grupo #6
Selecionado: Projeto Tbl1.id, Tbl1.field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2
Operador: ProjectOperator
Custo: Custo (CPU = 641400,00, MEM = 1020400012.00, tempo = 1000000,00)
"]
Grupo#6 -> Grupo#11
Grupo#11 ["
Grupo #11
Selecionado: junte -se
Operador: HashjoinOperator
Custo: Custo (CPU = 641400,00, MEM = 1020400012.00, tempo = 1000000,00)
"]
Grupo#11 -> Grupo#7
Grupo#11 -> Grupo nº 10
Grupo#7 ["
Grupo #7
Selecionado: Scan TBL1 (ID, Field1)
Operador: NormalSCanoperator
Custo: Custo (CPU = 400,00, MEM = 400000,00, tempo = 1000,00)
"]
Grupo#10 ["
Grupo #10
Selecionado: junte -se
Operador: MergeJoinOperator
Traços: classificados
Custo: Custo (CPU = 640000,00, MEM = 20000012.00, tempo = 1100000,00)
"]
Grupo#10 -> Grupo#8
Grupo#10 -> Grupo#9
Grupo#8 ["
Grupo #8
Selecionado: Scan Tbl2 (ID, Field1, Field2)
Operador: NormalSCanoperator
Traços: classificados
Custo: Custo (CPU = 600000,00, MEM = 12,00, tempo = 1000000,00)
"]
Grupo#9 ["
Grupo #9
Selecionado: Scan TBL3 (ID, Field2)
Operador: NormalSCanoperator
Traços: classificados
Custo: Custo (CPU = 40000,00, MEM = 20000000.00, tempo = 100000,00)
"]
O plano gerado mostrou o plano lógico selecionado, o custo estimado e o operador físico
Nosso planejador fará uma pesquisa de exaustão para encontrar o melhor plano
Como o código do planejador é grande, então não vou escrever um guia passo a passo, mas explicarei todas as partes do código em vez disso
Aqui vamos definir uma linguagem de consulta que usou minuciosamente este tutorial
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idA linguagem de consulta que implementaremos é uma linguagem do tipo SQL. No entanto, por uma questão de simplicidade, restringiremos sua funcionalidade e sintaxe.
O idioma aparece na forma de
SELECT tbl . field , [...]
FROM tbl JOIN [...] Ele apoiará apenas para SELECT e JOIN , também o campo em instrução SELECT deve ser totalmente qualificada (na forma de table.field ), todas as outras funcionalidades não serão suportadas
Primeiro, temos que definir o AST para o nosso idioma. AST (ou árvore de sintaxe abstrata) é uma árvore usada para representar a estrutura sintática de um texto.
Como nosso idioma é tão simples, podemos definir a estrutura AST em várias linhas de códigos:
sealed trait Identifier
case class TableID ( id : String ) extends Identifier
case class FieldID ( table : TableID , id : String ) extends Identifier
sealed trait Statement
case class Table ( table : TableID ) extends Statement
case class Join ( left : Statement , right : Statement , on : Seq [( FieldID , FieldID )]) extends Statement
case class Select ( fields : Seq [ FieldID ], from : Statement ) extends Statement
Por exemplo, uma consulta
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idpode ser representado como
Select (
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ),
FieldID ( TableID ( " tbl1 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " id " ),
FieldID ( TableID ( " tbl2 " ), " field1 " ),
FieldID ( TableID ( " tbl2 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " id " ),
FieldID ( TableID ( " tbl3 " ), " field2 " ),
FieldID ( TableID ( " tbl3 " ), " field2 " )
),
Join (
Table ( TableID ( " tbl1 " )),
Join (
Table ( TableID ( " tbl2 " )),
Table ( TableID ( " tbl3 " )),
Seq (
FieldID ( TableID ( " tbl2 " ), " id " ) -> FieldID ( TableID ( " tbl3 " ), " id " )
)
),
Seq (
FieldID ( TableID ( " tbl1 " ), " id " ) -> FieldID ( TableID ( " tbl2 " ), " id " )
)
)
)Depois de definir a estrutura AST, teremos que escrever o analisador de consulta, que é usado para converter a consulta de texto em forma de AST.
Como este guia está usando o Scala para implementação, escolheremos os combinadores de scala-parser para criar nosso analisador de consulta.
Classe de analisador de consulta:
object QueryParser extends ParserWithCtx [ QueryExecutionContext , Statement ] with RegexParsers {
override def parse ( in : String )( implicit ctx : QueryExecutionContext ) : Either [ Throwable , Statement ] = {
Try (parseAll(statement, in) match {
case Success (result, _) => Right (result)
case NoSuccess (msg, _) => Left ( new Exception (msg))
}) match {
case util. Failure (ex) => Left (ex)
case util. Success (value) => value
}
}
private def select : Parser [ Select ] = ??? // we will implement it in later section
private def statement : Parser [ Statement ] = select
}
Em seguida, defina algumas regras de análise:
// common
private def str : Parser [ String ] = """ [a-zA-Z0-9_]+ """ .r
private def fqdnStr : Parser [ String ] = """ [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ """ .r
// identifier
private def tableId : Parser [ TableID ] = str ^^ (s => TableID (s))
private def fieldId : Parser [ FieldID ] = fqdnStr ^^ { s =>
val identifiers = s.split( '.' )
if (identifiers.length != 2 ) {
throw new Exception ( " should never happen " )
} else {
val table = identifiers.head
val field = identifiers( 1 )
FieldID ( TableID (table), field)
}
} Aqui estão duas regras, que são usadas para analisar os identificadores: TableID e FieldID .
ID da tabela (ou nome da tabela) geralmente contém apenas caracteres, números e sublinhados ( _ ), por isso usaremos um regex simples [a-zA-Z0-9_]+ para identificar o nome da tabela.
Por outro lado, o ID de campo (para qualificador de campo) em nosso idioma é de nome de campo totalmente qualificado. Normalmente, ele está na forma de table.field , e o nome do campo também geralmente contém apenas caracteres, números e sublinhados, por isso usaremos o regex [a-zA-Z0-9_]+.[a-zA-Z0-9_]+ para analisar o nome do campo.
Depois de definir as regras para analisar os identificadores, agora podemos definir regras para analisar a declaração de consulta:
// statement
private def table : Parser [ Table ] = tableId ^^ (t => Table (t))
private def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) " A regra table é uma regra simples, apenas cria o nó Table usando o TableID analisado da regra tableId .
A subQuery , é a regra para analisar a sub-quadra. No SQL, podemos escrever uma consulta que se parecia assim:
SELECT a
FROM ( SELECT b FROM c) d O SELECT b FROM c é a sub-quadra na instrução acima. Aqui, em nossa linguagem de consulta simples, indicaremos que uma declaração é uma sub-quadra se for fechada por um par de parênteses ( () ). Como nosso idioma só possui uma declaração selecionada, podemos escrever a regra de Parse como seguinte:
def subQuery : Parser [ Statement ] = " ( " ~> select <~ " ) "Agora vamos definir as regras de Parse para a instrução SELECT:
private def fromSource : Parser [ Statement ] = table ||| subQuery
private def select : Parser [ Select ] =
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep(
" JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)
) ^^ {
case _ ~ fields ~ _ ~ src ~ joins =>
val p = if (joins.nonEmpty) {
def chain ( left : Statement , right : Seq [( Statement , Seq [( FieldID , FieldID )])]) : Join = {
if (right.isEmpty) {
throw new Exception ( " should never happen " )
} else if (right.length == 1 ) {
val next = right.head
Join (left, next._1, next._2)
} else {
val next = right.head
Join (left, chain(next._1, right.tail), next._2)
}
}
val temp = joins.map { join =>
val statement = join._1._1._2
val joinOn = join._2.map(on => on._1._1 -> on._2)
statement -> joinOn
}
chain(src, temp)
} else {
src
}
Select (fields, p)
}No SQL, podemos usar um sub-quadro como fonte de junção. Por exemplo:
SELECT * . *
FROM tbl1
JOIN ( SELECT * . * FROM tbl2)
JOIN tbl3Portanto, nosso analisador também implementará regras para analisar a sub-quadra na parte de junção da declaração, é por isso que temos a regra de Parse:
" SELECT " ~ rep1sep(fieldId, " , " ) ~ " FROM " ~ fromSource ~ rep( " JOIN " ~ fromSource ~ " ON " ~ rep1(fieldId ~ " = " ~ fieldId)Veja Queryparser.scala para implementação completa
Veja Queryparserspec.scala
Depois de gerar o AST a partir da consulta de texto, podemos convertê -la diretamente no plano lógico
Primeiro, vamos definir a interface para o nosso plano lógico:
sealed trait LogicalPlan {
def children () : Seq [ LogicalPlan ]
}
children é a lista de um plano lógico infantil. Por exemplo:
Gráfico TD
1326583549 ["Projeto TBL1.ID, Tbl1.Field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"];
-425111028 ["junção"];
-349388609 ["Scan TBL1"];
1343755644 ["junção"];
-1043437086 ["Scan TBL2"];
-1402686787 ["Scan Tbl3"];
1326583549 -> -425111028;
-425111028 -> -349388609;
-425111028 -> 1343755644;
1343755644 -> -1043437086;
1343755644 -> -1402686787;
Os nós filhos do nó PROJECT é o primeiro nó JOIN . O primeiro nó JOIN tem 2 filhos, que são o segundo nó JOIN e SCAN tbl1 . Breve, ...
Como nossa linguagem de consulta é simples, precisamos apenas de três tipos de nó lógico:
case class Scan ( table : ql. TableID , projection : Seq [ String ]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq .empty
}
case class Project ( fields : Seq [ql. FieldID ], child : LogicalPlan ) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (child)
}
case class Join ( left : LogicalPlan , right : LogicalPlan , on : Seq [(ql. FieldID , ql. FieldID )]) extends LogicalPlan {
override def children () : Seq [ LogicalPlan ] = Seq (left, right)
}
Em seguida, podemos escrever a função para converter o AST em plano lógico:
def toPlan ( node : ql. Statement ) : LogicalPlan = {
node match {
case ql. Table (table) => Scan (table, Seq .empty)
case ql. Join (left, right, on) => Join (toPlan(left), toPlan(right), on)
case ql. Select (fields, from) => Project (fields, toPlan(from))
}
}Consulte LogicalPlan.scala para implementação completa
Podemos definir classes para o grupo como seguinte:
case class Group (
id : Long ,
equivalents : mutable. HashSet [ GroupExpression ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
var implementation : Option [ GroupImplementation ] = None
}
case class GroupExpression (
id : Long ,
plan : LogicalPlan ,
children : mutable. MutableList [ Group ]
) {
val explorationMark : ExplorationMark = new ExplorationMark
val appliedTransformations : mutable. HashSet [ TransformationRule ] = mutable. HashSet ()
}
Group é o conjunto de planos que são logicamente equivalentes.
Cada GroupExpression representa um nó do plano lógico. Como definimos um nó de plano lógico terá uma lista de nós filhos (na seção anterior), e a GroupExpression representa um nó de plano lógico, e o Group representa um conjunto de planos equivalentes, de modo que os filhos da GroupExpression são uma lista de Group
por exemplo
Gráfico TD
Grupo de subgrafias#8
EXPR#8
fim
Grupo de subgrafias#2
Expr#2
fim
Grupo de subgrafes#11
EXPR#11
fim
EXPR#11 -> Grupo#7
EXPR#11 -> Grupo nº 10
Grupo de subgrafias#5
EXPR#5
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
EXPR#4
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#7
EXPR#7
fim
Grupo de subgrafias#1
Expr#1
fim
Grupo de subgrafias#10
EXPR#10
fim
EXPR#10 -> Grupo#8
EXPR#10 -> Grupo#9
Grupo de subgrafias#9
EXPR#9
fim
Grupo de subgrafias#3
EXPR#3
fim
Grupo de subgrafias#6
EXPR#12
EXPR#6
fim
EXPR#12 -> Grupo#11
EXPR#6 -> Grupo#5
Como podemos ver aqui, o Group#6 tem 2 expressões equivalentes: Expr#12 e Expr#6 , e os filhos do Expr#12 é Group#11
Notas: Implementaremos várias transformações redondas na fase de exploração; portanto, para cada Group e GroupExpression , temos uma indicação ExplorationMark do status de exploração.
class ExplorationMark {
private var bits : Long = 0
def get : Long = bits
def isExplored ( round : Int ) : Boolean = BitUtils .getBit(bits, round)
def markExplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = true )
def markUnexplored ( round : Int ) : Unit = bits = BitUtils .setBit(bits, round, on = false )
}
ExplorationMark é apenas uma classe de wrapper Bitset, marcará I-Th como 1 se a rodada i-és é explorada, marque como 0 caso contrário.
ExplorationMark também pode ser usada para visualizar a transformação exata, consulte a visualização para obter mais detalhes
O memorando é um monte de ajudantes para ajudar a construir os grupos equivalentes. O memorando consiste em vários hashmap para cache a expressão do grupo e do grupo, também fornecem métodos para registrar um novo grupo ou expressão de grupo.
class Memo (
groupIdGenerator : Generator [ Long ] = new LongGenerator ,
groupExpressionIdGenerator : Generator [ Long ] = new LongGenerator
) {
val groups : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]()
val parents : mutable. HashMap [ Long , Group ] = mutable. HashMap [ Long , Group ]() // lookup group from group expression ID
val groupExpressions : mutable. HashMap [ LogicalPlan , GroupExpression ] = mutable. HashMap [ LogicalPlan , GroupExpression ]()
def getOrCreateGroupExpression ( plan : LogicalPlan ) : GroupExpression = {
val children = plan.children()
val childGroups = children.map(child => getOrCreateGroup(child))
groupExpressions.get(plan) match {
case Some (found) => found
case None =>
val id = groupExpressionIdGenerator.generate()
val children = mutable. MutableList () ++ childGroups
val expression = GroupExpression (
id = id,
plan = plan,
children = children
)
groupExpressions += plan -> expression
expression
}
}
def getOrCreateGroup ( plan : LogicalPlan ) : Group = {
val exprGroup = getOrCreateGroupExpression(plan)
val group = parents.get(exprGroup.id) match {
case Some (group) =>
group.equivalents += exprGroup
group
case None =>
val id = groupIdGenerator.generate()
val equivalents = mutable. HashSet () + exprGroup
val group = Group (
id = id,
equivalents = equivalents
)
groups.put(id, group)
group
}
parents += exprGroup.id -> group
group
}
}
Veja Memo.scala para implementação completa
O primeiro passo dentro do planejador é a inicialização
Gráfico LR
Consulta ((consulta))
AST ((AST))
root_plan ((rootplan))
root_group ((rootgroup))
Query -"queryparser.parse (consulta)" -> ast
AST -"LogicalPlan.toplan (AST)" -> root_plan
root_plan -"memor.getorCreateGroup (rootPlan)" -> root_group
Primeiro, a consulta será analisada no AST. Em seguida, convertido em plano lógico, chamado de root plan , inicialize o grupo do root plan , chamado root group .
def initialize ( query : Statement )( implicit ctx : VolcanoPlannerContext ) : Unit = {
ctx.query = query
ctx.rootPlan = LogicalPlan .toPlan(ctx.query)
ctx.rootGroup = ctx.memo.getOrCreateGroup(ctx.rootPlan)
// assuming this is first the exploration round,
// by marking the initialRound(0) as explored,
// it will be easier to visualize the different between rounds (added nodes, add connections)
ctx.memo.groups.values.foreach(_.explorationMark.markExplored(initialRound))
ctx.memo.groupExpressions.values.foreach(_.explorationMark.markExplored(initialRound))
}Veja vulcanoplanner.scala para obter mais detalhes
Por exemplo, a consulta:
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idApós a inicialização, os grupos serão parecidos com o seguinte:
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#6 -> Grupo#5
Aqui você pode ver que todo grupo tem exatamente uma expressão equivalente
Após a inicialização, agora é a fase de exploração, que explorará todos os planos equivalentes possíveis.
O método de exploração é bastante simples:
Antes de mergulhar no código de exploração, vamos falar sobre a regra de transformação primeiro.
A regra de transformação é uma regra usada para transformar um plano lógico em outro plano lógico equivalente se corresponder à condição da regra.
Aqui está a interface da regra de transformação:
trait TransformationRule {
def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean
def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression
}
Como o Plano Lógico é umaestrutura de datrastrutura semelhante a uma árvore, a implementação das regras de transformação match é a correspondência de padrões na árvore.
Por exemplo, aqui está a match usada para corresponder ao nó do projeto, enquanto também verifica se é descendentes que contêm apenas a união e a digitalização:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Este plano é "comparado":
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan"]
fim
Grupo de subgrafias#6
Expr#6 ["Projeto"]
fim
EXPR#6 -> Grupo#5
Embora este plano não seja:
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#3
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["Scan"]
fim
Grupo de subgrafias#7
Expr#7 ["Projeto"]
fim
EXPR#7 -> Grupo#6
Grupo de subgrafias#1
Expr#1 ["Scan"]
fim
Grupo de subgrafias#3
Expr#3 ["Projeto"]
fim
EXPR#3 -> Grupo#2
Grupo de subgrafias#6
Expr#6 ["JONE"]
fim
EXPR#6 -> Grupo#1
EXPR#6 -> Grupo#5
Como dissemos antes, o método de exploração é:
E aqui está o código de exploração (bastante simples, hein):
private def exploreGroup (
group : Group ,
rules : Seq [ TransformationRule ],
round : Int
)( implicit ctx : VolcanoPlannerContext ) : Unit = {
while ( ! group.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
// explore all child groups
group.equivalents.foreach { equivalent =>
if ( ! equivalent.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
equivalent.children.foreach { child =>
exploreGroup(child, rules, round)
if (equivalent.explorationMark.isExplored(round) && child.explorationMark.isExplored(round)) {
equivalent.explorationMark.markExplored(round)
} else {
equivalent.explorationMark.markUnexplored(round)
}
}
}
// fire transformation rules to explore all the possible transformations
rules.foreach { rule =>
if ( ! equivalent.appliedTransformations.contains(rule) && rule.`match`(equivalent)) {
val transformed = rule.transform(equivalent)
if ( ! group.equivalents.contains(transformed)) {
group.equivalents += transformed
transformed.explorationMark.markUnexplored(round)
group.explorationMark.markUnexplored(round)
}
}
}
if (group.explorationMark.isExplored(round) && equivalent.explorationMark.isExplored(round)) {
group.explorationMark.markExplored(round)
} else {
group.explorationMark.markUnexplored(round)
}
}
}
}Veja vulcanoplanner.scala para obter mais detalhes
Agora é hora de implementar algumas regras de transformação
A projeção Pushdown é uma regra de transformação simples, usada para empurrar a projeção para baixo para a camada de armazenamento.
Por exemplo, a consulta
SELECT field1, field2
from tbltem o plano
Gráfico LR
Projeto [Projeto Field1, Field2]
Digitalizar [Scan TBL]
Projeto -> Scan
Com esse plano, ao executar, as linhas da camada de armazenamento (em Scan) serão totalmente buscadas e, em seguida, os campos desnecessários serão descartados (projeto). Os dados desnecessários ainda precisam passar do nó de digitalização para o nó do projeto, para que haja alguns esforços desperdiçados aqui.
Podemos melhorar, basta dizer à camada de armazenamento apenas buscar os campos necessários. Agora o plano será transformado em:
Gráfico LR
Projeto [Projeto Field1, Field2]
Digitalizar ["Scan TBL (Field1, Field2)"]
Projeto -> Scan
Vamos entrar no código:
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case Project (_, child) => check(child)
case _ => false
}
}
// check if the tree only contains SCAN and JOIN nodes
private def check ( node : LogicalPlan ) : Boolean = {
node match {
case Scan (_, _) => true
case Join (left, right, _) => check(left) && check(right)
case _ => false
}
}Nossa regra de pushdown de projeção aqui corresponderá ao plano quando for o nó do projeto, e todos os seus descendentes estão digitalizados e ingressarem apenas no nó.
Notas: Na verdade, a verdadeira correspondência de projeção é mais complexa, mas, por uma questão de simplicidade, a regra de correspondência aqui é apenas um nó do projeto com a digitalização e junte -se aos descendentes
E aqui está o código de transformação:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Project ]
val pushDownProjection = mutable. ListBuffer [ FieldID ]()
extractProjections(plan, pushDownProjection)
val newPlan = Project (plan.fields, pushDown(pushDownProjection.distinct, plan.child))
ctx.memo.getOrCreateGroupExpression(newPlan)
}
private def extractProjections ( node : LogicalPlan , buffer : mutable. ListBuffer [ FieldID ]) : Unit = {
node match {
case Scan (_, _) => () : Unit
case Project (fields, parent) =>
buffer ++= fields
extractProjections(parent, buffer)
case Join (left, right, on) =>
buffer ++= on.map(_._1) ++ on.map(_._2)
extractProjections(left, buffer)
extractProjections(right, buffer)
}
}
private def pushDown ( pushDownProjection : Seq [ FieldID ], node : LogicalPlan ) : LogicalPlan = {
node match {
case Scan (table, tableProjection) =>
val filteredPushDownProjection = pushDownProjection.filter(_.table == table).map(_.id)
val updatedProjection =
if (filteredPushDownProjection.contains( " * " ) || filteredPushDownProjection.contains( " *.* " )) {
Seq .empty
} else {
(tableProjection ++ filteredPushDownProjection).distinct
}
Scan (table, updatedProjection)
case Join (left, right, on) => Join (pushDown(pushDownProjection, left), pushDown(pushDownProjection, right), on)
case _ => throw new Exception ( " should never happen " )
}
}O código de transformação primeiro encontrará todas as projeções do nó do projeto raiz e depois as empurrará para todos os nós de digitalização.
Visualizando nossa regra, por exemplo, o plano
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#6 -> Grupo#5
Após a aplicação da transformação de pushdown de projeção, resultará em um novo plano equivalente com as projeções é empurrado para as operações de varredura (o novo plano é a árvore com nós de borda laranja).
Gráfico TD
Grupo de subgrafias#8
EXPR#8 ["Scan Tbl2 (ID, Field1, Field2)"]
fim
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafes#11
Expr#11 ["JONE"]
fim
EXPR#11 -> Grupo#7
EXPR#11 -> Grupo nº 10
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#7
Expr#7 ["Scan TBL1 (ID, Field1)"]
fim
Grupo de subgrafias#10
Expr#10 ["Join"]
fim
EXPR#10 -> Grupo#8
EXPR#10 -> Grupo#9
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#9
Expr#9 ["Scan TBL3 (ID, Field2)"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
EXPR#12 ["Projeto TBL1.id, Tbl1.Field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"]
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#12 -> Grupo#11
EXPR#6 -> Grupo#5
Estilo Expr#12 stroke largura: 4px, golpe: laranja
Estilo Expr#8 Width: 4px, golpe: laranja
Estilo Expr#10 Width: 4px, golpe: laranja
Estilo Expr#9 Width: 4px, golpe: laranja
Estilo Expr#11 Width: 4px, golpe: laranja
Estilo Expr#7 Width: 4px, golpe: laranja
linkstyle 0 stroke width: 4px, golpe: laranja
LinkStyle 1 Stroke-Width: 4px, AVC: laranja
LinkStyle 6 stroke largura: 4px, golpe: laranja
LinkStyle 7 Width: 4px, AVC: laranja
LinkStyle 8 Width: 4px, AVC: laranja
Veja projectionPushdown.scala para implementação completa
A Junst Reordes também é uma das transformações mais reconhecidas no mundo do planejador de consultas. Nosso planejador também implementará uma regra de transformação de reordenação.
Já que a ingresso em reordenação no mundo real não é uma peça fácil de implementar. Portanto, implementaremos uma versão simples e roubada da regra de reordenação de junção aqui.
Primeiro, a match da regra:
// check if the tree only contains SCAN and JOIN nodes, and also extract all SCAN nodes and JOIN conditions
private def checkAndExtract (
node : LogicalPlan ,
buffer : mutable. ListBuffer [ Scan ],
joinCondBuffer : mutable. ListBuffer [(ql. FieldID , ql. FieldID )]
) : Boolean = {
node match {
case node @ Scan (_, _) =>
buffer += node
true
case Join (left, right, on) =>
joinCondBuffer ++= on
checkAndExtract(left, buffer, joinCondBuffer) && checkAndExtract(right, buffer, joinCondBuffer)
case _ => false
}
}
private def buildInterchangeableJoinCond ( conditions : Seq [(ql. FieldID , ql. FieldID )]) : Seq [ Seq [ql. FieldID ]] = {
val buffer = mutable. ListBuffer [mutable. Set [ql. FieldID ]]()
conditions.foreach { cond =>
val set = buffer.find { set =>
set.contains(cond._1) || set.contains(cond._2)
} match {
case Some (set) => set
case None =>
val set = mutable. Set [ql. FieldID ]()
buffer += set
set
}
set += cond._1
set += cond._2
}
buffer.map(_.toSeq)
}
override def `match` ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Boolean = {
val plan = expression.plan
plan match {
case node @ Join (_, _, _) =>
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
if (checkAndExtract(node, buffer, joinCondBuffer)) {
// only match if the join is 3 tables join
if (buffer.size == 3 ) {
var check = true
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
interChangeableCond.foreach { c =>
check &= c.size == 3
}
check
} else {
false
}
} else {
false
}
case _ => false
}
} Nossa regra só será correspondida, se correspondermos à junção de três vias (o número de tabela envolvida deve ser 3, e a condição de junção deve ser de 3 vias, como tbl1.field1 = tbl2.field2 = tbl3.field3 )
Por exemplo,
tbl1
JOIN tbl2 ON tbl1 . field1 = tbl2 . field2
JOIN tbl3 ON tbl1 . field1 = tbl3 . field3 A declaração de junção aqui será "correspondente", pois é de três vias (é a junção entre tbl1 , tbl2 , tbl3 , e a condição é tbl1.field1 = tbl2.field2 = tbl3.field3 )
Em seguida, é o código de transformação:
override def transform ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : GroupExpression = {
val plan = expression.plan. asInstanceOf [ Join ]
val buffer = mutable. ListBuffer [ Scan ]()
val joinCondBuffer = mutable. ListBuffer [(ql. FieldID , ql. FieldID )]()
checkAndExtract(plan, buffer, joinCondBuffer)
val interChangeableCond = buildInterchangeableJoinCond(joinCondBuffer)
//
val scans = buffer.toList
implicit val ord : Ordering [ Scan ] = new Ordering [ Scan ] {
override def compare ( x : Scan , y : Scan ) : Int = {
val xStats = ctx.statsProvider.tableStats(x.table.id)
val yStats = ctx.statsProvider.tableStats(y.table.id)
xStats.estimatedTableSize.compareTo(yStats.estimatedTableSize)
}
}
def getJoinCond ( left : Scan , right : Scan ) : Seq [(ql. FieldID , ql. FieldID )] = {
val leftFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == left.table)
}
val rightFields = interChangeableCond.flatMap { c =>
c.filter(p => p.table == right.table)
}
if (leftFields.length != rightFields.length) {
throw new Exception ( s " leftFields.length( ${leftFields.length} ) != rightFields.length( ${rightFields.length} ) " )
} else {
leftFields zip rightFields
}
}
val sorted = scans.sorted
val newPlan = Join (
sorted( 0 ),
Join (
sorted( 1 ),
sorted( 2 ),
getJoinCond(sorted( 1 ), sorted( 2 ))
),
getJoinCond(sorted( 0 ), sorted( 1 ))
)
ctx.memo.getOrCreateGroupExpression(newPlan)
}O código de transformação aqui reordenará as tabelas pelo tamanho estimado.
Por exemplo, se tivermos 3 Tabelas A, B, C com tamanho estimado de 300b, 100b, 200b e uma declaração de junção A JOIN B JOIN C B JOIN C JOIN A
Notas: Você pode notar neste código, usamos estatísticas de tabela, para fornecer uma dica para transformar o plano. Na prática, o planejador pode usar todos os tipos de estatísticas para ajudar sua transformação, como tamanho da tabela, tamanho da linha, contagem de nulo, histograma, etc.
Visualizando nossa regra, por exemplo, o plano
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#6 -> Grupo#5
Após a transformação de reordenação de ingresso, resultando em
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
Expr#8 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
EXPR#8 -> Grupo#2
EXPR#8 -> Grupo#7
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#7
Expr#7 ["JONE"]
fim
EXPR#7 -> Grupo#1
EXPR#7 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#6 -> Grupo#5
Estilo Expr#8 Width: 4px, golpe: laranja
Estilo Expr#7 Width: 4px, golpe: laranja
LinkStyle 2 stroke largura: 4px, golpe: laranja
LinkStyle 6 stroke largura: 4px, golpe: laranja
LinkStyle 3 Width: 4px, AVC: laranja
LinkStyle 7 Width: 4px, AVC: laranja
Podemos ver que tbl2 JOIN tbl1 JOIN tbl3 é criada a partir tbl1 JOIN tbl2 JOIN tbl3 é gerado pela transformação (os nós recém -adicionados e bordas são indicados por linhas laranja)
Consulte X3TableJoinReordyBysize.scala para implementação completa
Agora podemos colocar nossas regras de transformação em um só lugar
private val transformationRules : Seq [ Seq [ TransformationRule ]] = Seq (
Seq ( new ProjectionPushDown ),
Seq ( new X3TableJoinReorderBySize )
)E executá -los para explorar os grupos equivalentes
for (r <- transformationRules.indices) {
exploreGroup(ctx.rootGroup, transformationRules(r), r + 1 )
}Por exemplo, o plano
Gráfico TD
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#6
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#6 -> Grupo#5
Depois de ser explorado, resultará neste gráfico
Gráfico TD
Grupo de subgrafias#8
EXPR#8 ["Scan Tbl2 (ID, Field1, Field2)"]
fim
Grupo de subgrafes#11
Expr#11 ["JONE"]
EXPR#14 ["Join"]
fim
EXPR#11 -> Grupo#7
EXPR#11 -> Grupo nº 10
EXPR#14 -> Grupo#8
EXPR#14 -> Grupo#12
Grupo de subgrafias#2
Expr#2 ["Scan TBL2"]
fim
Grupo de subgrafias#5
Expr#5 ["JONE"]
Expr#16 ["Join"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
EXPR#16 -> Grupo#2
EXPR#16 -> Grupo#13
Grupo de subgrafias#4
Expr#4 ["JONE"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#13
Expr#15 ["Join"]
fim
EXPR#15 -> Grupo#1
EXPR#15 -> Grupo#3
Grupo de subgrafias#7
Expr#7 ["Scan TBL1 (ID, Field1)"]
fim
Grupo de subgrafias#1
Expr#1 ["Scan TBL1"]
fim
Grupo de subgrafias#10
Expr#10 ["Join"]
fim
EXPR#10 -> Grupo#8
EXPR#10 -> Grupo#9
Grupo de subgrafias#9
Expr#9 ["Scan TBL3 (ID, Field2)"]
fim
Grupo de subgrafias#3
Expr#3 ["Scan Tbl3"]
fim
Grupo de subgrafias#12
Expr#13 ["Join"]
fim
EXPR#13 -> Grupo#7
EXPR#13 -> Grupo#9
Grupo de subgrafias#6
EXPR#12 ["Projeto TBL1.id, Tbl1.Field1, Tbl2.id, Tbl2.Field1, Tbl2.Field2, Tbl3.Id, Tbl3.Field2, Tbl3.Field2"]
Expr#6 ["Project tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
EXPR#12 -> Grupo#11
EXPR#6 -> Grupo#5
Estilo Expr#12 stroke largura: 4px, golpe: laranja
Estilo Expr#8 Width: 4px, golpe: laranja
Estilo Expr#10 Width: 4px, golpe: laranja
Estilo Expr#13 Width: 4px, golpe: laranja
Estilo Expr#14 Width: 4px, golpe: laranja
Estilo Expr#11 Width: 4px, golpe: laranja
Estilo Expr#9 Width: 4px, golpe: laranja
Estilo Expr#15 STAKE-LIMA: 4PX, AVC: laranja
Estilo Expr#7 Width: 4px, golpe: laranja
Estilo Expr#16 Width: 4px, golpe: laranja
linkstyle 0 stroke width: 4px, golpe: laranja
LinkStyle 15 Witth Width: 4px, AVC: laranja
LinkStyle 12 stroke largura: 4px, golpe: laranja
LinkStyle 1 Stroke-Width: 4px, AVC: laranja
LinkStyle 16 Stroke Width: 4px, AVC: laranja
LinkStyle 13 Width: 4px, AVC: laranja
LinkStyle 2 stroke largura: 4px, golpe: laranja
LinkStyle 6 stroke largura: 4px, golpe: laranja
LinkStyle 3 Width: 4px, AVC: laranja
LinkStyle 10 Width: 4px, AVC: laranja
LinkStyle 7 Width: 4px, AVC: laranja
LinkStyle 14 largura de derrame: 4px, golpe: laranja
Linkstyle 11 largura de derrame: 4px, golpe: laranja
Veja vulcanoplanner.scala para obter mais detalhes
Após a fase de exploração, agora temos uma árvore totalmente expandida contendo todos os planos possíveis, agora é a fase de otimização.
Nesta fase, encontraremos o melhor plano para o nosso grupo raiz. O processo de otimização é descrito como seguinte:
Aqui está um exemplo
Gráfico TD
Grupo de subgrafias#2 ["Grupo#2 (custo = 1)"]
Expr#2 ["expr#2 (custo = 1)"]
fim
Grupo de subgrafias#5 ["Grupo#5 (custo = 3)"]
Expr#5 ["expr#5 (custo = max (3,2) = 3"]
fim
EXPR#5 -> Grupo#1
EXPR#5 -> Grupo#4
Grupo de subgrafias#4 ["Grupo#4 (custo = 2)"]
Expr#4 ["expr#4 (custo = max (1,2) = 2)"]
Expr#7 ["expr#7 (custo = 1+2 = 3)"]
fim
EXPR#4 -> Grupo#2
EXPR#4 -> Grupo#3
Grupo de subgrafias#1 ["Grupo nº 1 (custo = 3)"]
Expr#1 ["expr#1 (custo = 3)"]
fim
Grupo de subgrafes#3 ["Grupo#3 (custo = 2)"]
Expr#3 ["expr#3 (custo = 2)"]
fim
Grupo de subgrafes#6 ["Grupo#6 (custo = 4.5)"]
EXPR#6 ["EXPR#6 (custo = 3*1.5 = 4.5)"]
fim
EXPR#6 -> Grupo#5
Grupo de subgrafes#8 ["Grupo#8 (custo = 1)"]
Expr#8 ["expr#8 (custo = 1)"]
fim
Grupo de subgrafias#9 ["Grupo#9 (custo = 2)"]
Expr#9 ["expr#9 (custo = 2)"]
fim
EXPR#7 -> Grupo#8
EXPR#7 -> Grupo#9
Por exemplo, o custo Expr#4 é calculado pelos custos do grupo filho ( Group#2 e Group#3 ) usando a função max . Outro exemplo, é o Group#4 , seu custo é calculado calculando o valor mínimo entre os custos de suas expressões equivalentes.
Como o objetivo da fase de otimização é produzir o melhor plano físico, dadas as expressões de grupo exploradas. Podemos definir o plano físico como seguinte:
sealed trait PhysicalPlan {
def operator () : Operator
def children () : Seq [ PhysicalPlan ]
def cost () : Cost
def estimations () : Estimations
def traits () : Set [ String ]
}
O operator é o operador físico, usado para executar o plano, abordaremos -o na seção posterior. Então children são a lista de nós do plano infantil, eles estão acostumados a participar do processo de cálculo de custos. O terceiro atributo é cost , cost é um objeto que mantém informações de custo (como custo da CPU, custo da memória, custo de IO etc.). estimations são as estatísticas estimadas da propriedade que mantêm estatísticas sobre o plano (como contagem de linhas, tamanho da linha etc.), também está participando do cálculo de custos. Finalmente, traits são um conjunto de características físicas, que afetam a regra de implementação para afetar o processo de geração do plano físico.
Em seguida, podemos implementar as classes de nós físicos:
case class Scan (
operator : Operator ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq .empty // scan do not receive any child
}
case class Project (
operator : Operator ,
child : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (child)
}
case class Join (
operator : Operator ,
leftChild : PhysicalPlan ,
rightChild : PhysicalPlan ,
cost : Cost ,
estimations : Estimations ,
traits : Set [ String ] = Set .empty
) extends PhysicalPlan {
override def children () : Seq [ PhysicalPlan ] = Seq (leftChild, rightChild)
}
Veja físicoPlan.scala para implementação completa
A primeira coisa na fase de otimização, ou seja, temos que implementar as regras de implementação. A regra de implementação é a regra de converter do plano lógico em planos físicos sem executá -los.
Como não estamos executando diretamente o plano físico no planejador, devolveremos o construtor de planos físicos, também é mais fácil personalizar a função de custo para cada nó.
Aqui está a interface da regra de implementação:
trait PhysicalPlanBuilder {
def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ]
}
trait ImplementationRule {
def physicalPlanBuilders ( expression : GroupExpression )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ]
}
Aqui, o PhysicalPlanBuilder é a interface usada para construir o plano físico, dados os planos físicos da criança.
Por exemplo, a junção lógica tem 2 implementações físicas são hash se juntando e junção de mesclagem
Gráfico TD
Criança#1 ["Child#1"]
Criança#2 ["Criança#2"]
Criança#3 ["Child#3"]
Criança#4 ["Child#4"]
hash_join ["` hash junção
custo = f (custo (criança nº 1), custo (criança nº 2))
`"]
Merge_join ["` Mescamação de junção
cost=g(cost(child#3),cost(child#4))
`"]
hash_join --> child#1
hash_join --> child#2
merge_join --> child#3
merge_join --> child#4
the HASH JOIN cost is using function f() to calculate cost, and MERGE JOIN is using function g() to calculate cost, both are using its children as function parameters. So it's easier to code if we're returning just the phyiscal plan builder from the implementation rule instead of the physical plan.
As we've said before, the optimization process is described as following:
And here is the code:
private def implementGroup ( group : Group , combinedRule : ImplementationRule )(
implicit ctx : VolcanoPlannerContext
) : GroupImplementation = {
group.implementation match {
case Some (implementation) => implementation
case None =>
var bestImplementation = Option .empty[ GroupImplementation ]
group.equivalents.foreach { equivalent =>
val physicalPlanBuilders = combinedRule.physicalPlanBuilders(equivalent)
val childPhysicalPlans = equivalent.children.map { child =>
val childImplementation = implementGroup(child, combinedRule)
child.implementation = Option (childImplementation)
childImplementation.physicalPlan
}
// calculate the implementation, and update the best cost for group
physicalPlanBuilders.flatMap(_.build(childPhysicalPlans)).foreach { physicalPlan =>
val cost = physicalPlan.cost()
bestImplementation match {
case Some (currentBest) =>
if (ctx.costModel.isBetter(currentBest.cost, cost)) {
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
case None =>
bestImplementation = Option (
GroupImplementation (
physicalPlan = physicalPlan,
cost = cost,
selectedEquivalentExpression = equivalent
)
)
}
}
}
bestImplementation.get
}
}This code is an exhaustive search code, which is using recursive function to traverse all nodes. At each node (group), the function is called once to get its best plan while also calculate the optimal cost of that group.
Finally, the best plan for our query is the best plan of the root group
val implementationRules = new ImplementationRule {
override def physicalPlanBuilders (
expression : GroupExpression
)( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
expression.plan match {
case node @ Scan (_, _) => implement. Scan (node)
case node @ Project (_, _) => implement. Project (node)
case node @ Join (_, _, _) => implement. Join (node)
}
}
}
ctx.rootGroup.implementation = Option (implementGroup(ctx.rootGroup, implementationRules))See VolcanoPlanner.scala for full implementation
Here is an example of the plan after optimization, it's shown the selected logical node, the selected physical operator, and the estimated cost
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Next, we will implement some implementation rules.
The first, easiest one is the implementation rule of logical PROJECT
object Project {
def apply ( node : logicalplan. Project )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
Seq (
new ProjectionImpl (node.fields)
)
}
}
class ProjectionImpl ( projection : Seq [ql. FieldID ]) extends PhysicalPlanBuilder {
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val child = children.head
val selfCost = Cost (
estimatedCpuCost = 0 ,
estimatedMemoryCost = 0 ,
estimatedTimeCost = 0
) // assuming the cost of projection is 0
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + child.cost().estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + child.cost().estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + child.cost().estimatedTimeCost
)
val estimations = Estimations (
estimatedLoopIterations = child.estimations().estimatedLoopIterations,
estimatedRowSize = child.estimations().estimatedRowSize // just guessing the value
)
Some (
Project (
operator = ProjectOperator (projection, child.operator()),
child = child,
cost = cost,
estimations = estimations,
traits = child.traits()
)
)
}
}
The implementation rule for logical PROJECT, is returning one physical plan builder ProjectionImpl . ProjectionImpl cost calculation is simple, it just inherits the cost from the child node (because the projection is actually not doing any intensive operation). Beside that, it also updates the estimation (in this code, estimation is also inherit from the child node)
See Project.scala for full implementation
Writing implementation rule for logical JOIN is way harder than PROJECTION.
One first reason is, a logical JOIN has many physical implementation, such as HASH JOIN, MERGE JOIN, BROADCAST JOIN, etc.
The second reason is, estimating cost for physical JOIN is also hard, because it depends on lots of factors such as row count, row size, data histogram, indexes, data layout, etc.
So, to keep everything simple in this guide, I will only implement 2 physical JOIN: HASH JOIN and MERGE JOIN. The cost estimation functions are fictional (just to show how it works, I'm not trying to correct it). And in the MERGE JOIN, all data is assuming to be sorted by join key.
Here is the code:
object Join {
def apply ( node : logicalplan. Join )( implicit ctx : VolcanoPlannerContext ) : Seq [ PhysicalPlanBuilder ] = {
val leftFields = node.on.map(_._1).map(f => s " ${f.table.id} . ${f.id} " )
val rightFields = node.on.map(_._2).map(f => s " ${f.table.id} . ${f.id} " )
Seq (
new HashJoinImpl (leftFields, rightFields),
new MergeJoinImpl (leftFields, rightFields)
)
}
}
The HASH JOIN:
class HashJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
private def viewSize ( plan : PhysicalPlan ) : Long = {
plan.estimations().estimatedLoopIterations * plan.estimations().estimatedRowSize
}
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
// reorder the child nodes, the left child is the child with smaller view size (smaller than the right child if we're store all of them in memory)
val (leftChild, rightChild) = if (viewSize(children( 0 )) < viewSize(children( 1 ))) {
(children( 0 ), children( 1 ))
} else {
(children( 1 ), children( 0 ))
}
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = HashJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = Set .empty // don't inherit trait from children since we're hash join
)
)
}
}
We can see that the cost function of HASH JOIN is composed of its children costs and estimations
val selfCost = Cost (
estimatedCpuCost = leftChild.estimations().estimatedLoopIterations, // cost to hash all record from the smaller view
estimatedMemoryCost = viewSize(leftChild), // hash the smaller view, we need to hold the hash table in memory
estimatedTimeCost = rightChild.estimations().estimatedLoopIterations
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)Next, the MERGE JOIN:
class MergeJoinImpl ( leftFields : Seq [ String ], rightFields : Seq [ String ]) extends PhysicalPlanBuilder {
// noinspection ZeroIndexToHead,DuplicatedCode
override def build ( children : Seq [ PhysicalPlan ]) : Option [ PhysicalPlan ] = {
val (leftChild, rightChild) = (children( 0 ), children( 1 ))
if (leftChild.traits().contains( " SORTED " ) && rightChild.traits().contains( " SORTED " )) {
val estimatedTotalRowCount =
leftChild.estimations().estimatedLoopIterations +
rightChild.estimations().estimatedLoopIterations
val estimatedLoopIterations = Math .max(
leftChild.estimations().estimatedLoopIterations,
rightChild.estimations().estimatedLoopIterations
) // just guessing the value
val estimatedOutRowSize = leftChild.estimations().estimatedRowSize + rightChild.estimations().estimatedRowSize
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)
Some (
Join (
operator = MergeJoinOperator (
leftChild.operator(),
rightChild.operator(),
leftFields,
rightFields
),
leftChild = leftChild,
rightChild = rightChild,
cost = cost,
estimations = estimations,
traits = leftChild.traits() ++ rightChild.traits()
)
)
} else {
None
}
}
}
Same with HASH JOIN, MERGE JOIN also uses its children costs and estimations to calculate its cost, but with different formulla:
val selfCost = Cost (
estimatedCpuCost = 0 , // no additional cpu cost, just scan from child iterator
estimatedMemoryCost = 0 , // no additional memory cost
estimatedTimeCost = estimatedTotalRowCount
)
val childCosts = Cost (
estimatedCpuCost = leftChild.cost().estimatedCpuCost + rightChild.cost().estimatedCpuCost,
estimatedMemoryCost = leftChild.cost().estimatedMemoryCost + rightChild.cost().estimatedMemoryCost,
estimatedTimeCost = 0
)
val estimations = Estimations (
estimatedLoopIterations = estimatedLoopIterations,
estimatedRowSize = estimatedOutRowSize
)
val cost = Cost (
estimatedCpuCost = selfCost.estimatedCpuCost + childCosts.estimatedCpuCost,
estimatedMemoryCost = selfCost.estimatedMemoryCost + childCosts.estimatedMemoryCost,
estimatedTimeCost = selfCost.estimatedTimeCost + childCosts.estimatedTimeCost
)See HashJoinImpl.scala and MergeJoinImpl.scala for full implementation
You can see other rules and physical plan builders here:
Now, after done implementing the implementation rules, now we can find our best plan. Let's start over from the user query
SELECT tbl1 . id ,
tbl1 . field1 ,
tbl2 . id ,
tbl2 . field1 ,
tbl2 . field2 ,
tbl3 . id ,
tbl3 . field2 ,
tbl3 . field2
FROM tbl1
JOIN tbl2 ON tbl1 . id = tbl2 . id
JOIN tbl3 ON tbl2 . id = tbl3 . idwill be converted to the logical plan
graph TD
1326583549["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"];
-425111028["JOIN"];
-349388609["SCAN tbl1"];
1343755644["JOIN"];
-1043437086["SCAN tbl2"];
-1402686787["SCAN tbl3"];
1326583549 --> -425111028;
-425111028 --> -349388609;
-425111028 --> 1343755644;
1343755644 --> -1043437086;
1343755644 --> -1402686787;
After exploration phase, it will generate lots of equivalent plans
graph TD
subgraph Group#8
Expr#8["SCAN tbl2 (id, field1, field2)"]
fim
subgraph Group#11
Expr#11["JOIN"]
Expr#14["JOIN"]
fim
Expr#11 --> Group#7
Expr#11 --> Group#10
Expr#14 --> Group#8
Expr#14 --> Group#12
subgraph Group#2
Expr#2["SCAN tbl2"]
fim
subgraph Group#5
Expr#5["JOIN"]
Expr#16["JOIN"]
fim
Expr#5 --> Group#1
Expr#5 --> Group#4
Expr#16 --> Group#2
Expr#16 --> Group#13
subgraph Group#4
Expr#4["JOIN"]
fim
Expr#4 --> Group#2
Expr#4 --> Group#3
subgraph Group#13
Expr#15["JOIN"]
fim
Expr#15 --> Group#1
Expr#15 --> Group#3
subgraph Group#7
Expr#7["SCAN tbl1 (id, field1)"]
fim
subgraph Group#1
Expr#1["SCAN tbl1"]
fim
subgraph Group#10
Expr#10["JOIN"]
fim
Expr#10 --> Group#8
Expr#10 --> Group#9
subgraph Group#9
Expr#9["SCAN tbl3 (id, field2)"]
fim
subgraph Group#3
Expr#3["SCAN tbl3"]
fim
subgraph Group#12
Expr#13["JOIN"]
fim
Expr#13 --> Group#7
Expr#13 --> Group#9
subgraph Group#6
Expr#12["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
Expr#6["PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2"]
fim
Expr#12 --> Group#11
Expr#6 --> Group#5
style Expr#12 stroke-width: 4px, stroke: orange
style Expr#8 stroke-width: 4px, stroke: orange
style Expr#10 stroke-width: 4px, stroke: orange
style Expr#13 stroke-width: 4px, stroke: orange
style Expr#14 stroke-width: 4px, stroke: orange
style Expr#11 stroke-width: 4px, stroke: orange
style Expr#9 stroke-width: 4px, stroke: orange
style Expr#15 stroke-width: 4px, stroke: orange
style Expr#7 stroke-width: 4px, stroke: orange
style Expr#16 stroke-width: 4px, stroke: orange
linkStyle 0 stroke-width: 4px, stroke: orange
linkStyle 15 stroke-width: 4px, stroke: orange
linkStyle 12 stroke-width: 4px, stroke: orange
linkStyle 1 stroke-width: 4px, stroke: orange
linkStyle 16 stroke-width: 4px, stroke: orange
linkStyle 13 stroke-width: 4px, stroke: orange
linkStyle 2 stroke-width: 4px, stroke: orange
linkStyle 6 stroke-width: 4px, stroke: orange
linkStyle 3 stroke-width: 4px, stroke: orange
linkStyle 10 stroke-width: 4px, stroke: orange
linkStyle 7 stroke-width: 4px, stroke: orange
linkStyle 14 stroke-width: 4px, stroke: orange
linkStyle 11 stroke-width: 4px, stroke: orange
And the at optimization phase, a final best plan is chose
graph TD
Group#6["
Group #6
Selected: PROJECT tbl1.id, tbl1.field1, tbl2.id, tbl2.field1, tbl2.field2, tbl3.id, tbl3.field2, tbl3.field2
Operator: ProjectOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#6 --> Group#11
Group#11["
Group #11
Selected: JOIN
Operator: HashJoinOperator
Cost: Cost(cpu=641400.00, mem=1020400012.00, time=1000000.00)
"]
Group#11 --> Group#7
Group#11 --> Group#10
Group#7["
Group #7
Selected: SCAN tbl1 (id, field1)
Operator: NormalScanOperator
Cost: Cost(cpu=400.00, mem=400000.00, time=1000.00)
"]
Group#10["
Group #10
Selected: JOIN
Operator: MergeJoinOperator
Traits: SORTED
Cost: Cost(cpu=640000.00, mem=20000012.00, time=1100000.00)
"]
Group#10 --> Group#8
Group#10 --> Group#9
Group#8["
Group #8
Selected: SCAN tbl2 (id, field1, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=600000.00, mem=12.00, time=1000000.00)
"]
Group#9["
Group #9
Selected: SCAN tbl3 (id, field2)
Operator: NormalScanOperator
Traits: SORTED
Cost: Cost(cpu=40000.00, mem=20000000.00, time=100000.00)
"]
Now we've done building a functional query planner which can optimize the query from user, but our query plan could not run by itself. So it's the reason why now we will implement the query processor to test out our query plan.
Basically the query process receive input from the query planner, and execute them
graph LR
plan(("Physical Plan"))
storage[("Storage Layer")]
processor["Query Processor"]
plan -- execute --> processor
storage -- fetch --> processor
Volcano/iterator model is the query processing model that is widely used in many DBMS. It is a pipeline architecture, which means that the data is processed in stages, with each stage passing the output of the previous stage to the next stage.
Each stage in the pipeline is represented by an operator. Operators are functions that perform a specific operation on the data, such as selecting rows, filtering rows, or aggregating rows.
Usually, operator can be formed directly from the query plan. For example, the query
SELECT field_1
FROM tbl
WHERE field = 1will have the plan
graph TD
project["PROJECT: field_1"]
scan["SCAN: tbl"]
filter["FILTER: field = 1"]
project --> scan
filter --> project
will create a chain of operators like this:
scan = {
next() // fetch next row from table "tbl"
}
project = {
next() = {
next_row = scan.next() // fetch next row from scan operator
projected = next_row["field_1"]
return projected
}
}
filter = {
next() = {
next_row = {}
do {
next_row = project.next() // fetch next row from project operator
} while (next_row["field"] != 1)
return next_row
}
}
results = []
while (row = filter.next()) {
results.append(row)
}
notes : this pseudo code did not handle for end of result stream
The basic interface of an operator is described as following:
trait Operator {
def next () : Option [ Seq [ Any ]]
}
See Operator.scala for full implementation of all operators
Let's define a query
SELECT emp . id ,
emp . code ,
dept . dept_name ,
emp_info . name ,
emp_info . origin
FROM emp
JOIN dept ON emp . id = dept . emp_id
JOIN emp_info ON dept . emp_id = emp_info . idwith some data and stats
val table1 : Datasource = Datasource (
table = " emp " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" code " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id
),
rows = Seq (
Seq ( " 1 " , " Emp A " ),
Seq ( " 2 " , " Emp B " ),
Seq ( " 3 " , " Emp C " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " code " -> 32 )
)
)
val table2 : Datasource = Datasource (
table = " dept " ,
catalog = TableCatalog (
Seq (
" emp_id " -> classOf [ String ],
" dept_name " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by emp_id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " Dept 1 " ),
Seq ( " 1 " , " Dept 2 " ),
Seq ( " 2 " , " Dept 3 " ),
Seq ( " 3 " , " Dept 3 " )
),
stats = TableStats (
estimatedRowCount = 4 ,
avgColumnSize = Map ( " emp_id " -> 10 , " dept_name " -> 255 )
)
)
val table3 : Datasource = Datasource (
table = " emp_info " ,
catalog = TableCatalog (
Seq (
" id " -> classOf [ String ],
" name " -> classOf [ String ],
" origin " -> classOf [ String ]
),
metadata = Map ( " sorted " -> " true " ) // assumes rows are already sorted by id (this is just a fake trait to demonstrate how trait works)
),
rows = Seq (
Seq ( " 1 " , " AAAAA " , " Country A " ),
Seq ( " 2 " , " BBBBB " , " Country A " ),
Seq ( " 3 " , " CCCCC " , " Country B " )
),
stats = TableStats (
estimatedRowCount = 3 ,
avgColumnSize = Map ( " id " -> 10 , " name " -> 255 , " origin " -> 255 )
)
)The cost model is optimized for CPU
val costModel : CostModel = ( currentCost : Cost , newCost : Cost ) => {
currentCost.estimatedCpuCost > newCost.estimatedCpuCost
}Now, executing the query by running this code:
val planner = new VolcanoPlanner
QueryParser .parse(query) match {
case Left (err) => throw err
case Right (parsed) =>
val operator = planner.getPlan(parsed)
val result = Utils .execute(operator)
// print result
println(result._1.mkString( " , " ))
result._2.foreach(row => println(row.mkString( " , " )))
}it will print:
emp.id,emp.code,dept.dept_name,emp_info.name,emp_info.origin
1,Emp A,Dept 1,AAAAA,Country A
1,Emp A,Dept 2,AAAAA,Country A
2,Emp B,Dept 3,BBBBB,Country A
3,Emp C,Dept 3,CCCCC,Country B
Voila, We've done building a fully functional query planner and query engine :). You can start writing one for your own, good luck
See Demo.scala for full demo code
Thanks for reading this, this guide is quite long, and not fully correct, but I've tried my best to write it as understandably as possible ?