Query Planner

The source code discussed in this chapter can be found in the query-planner module of the KQuery project.

We now have logical plans that describe what to compute and physical plans that describe how to compute it. The query planner bridges these: it takes a logical plan and produces a physical plan that can be executed.

What the Query Planner Does

The query planner walks the logical plan tree and creates a corresponding physical plan tree. For each logical operator, it creates the appropriate physical operator. For each logical expression, it creates the corresponding physical expression.

Some translations are straightforward. A logical Scan becomes a physical ScanExec. A logical Add expression becomes a physical AddExpression.

Other translations involve choices. A logical Aggregate could become a HashAggregateExec or a SortAggregateExec depending on whether the input is sorted. A logical Join could become a hash join, sort-merge join, or nested loop join. These decisions affect performance significantly.
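
Here is a sketch of how the aggregate choice could be made with a fixed rule. This is not KQuery code, and AggregateStrategy and chooseAggregateStrategy are hypothetical names. The rule picks a sort-based aggregate only when the input's known sort order already places rows with equal grouping keys next to each other. That holds when the grouping columns, in any order, form a prefix of the sort order:

```kotlin
// Hypothetical: the two physical aggregate strategies a planner could choose between.
enum class AggregateStrategy { HASH, SORT }

/**
 * Returns SORT only when the grouping columns (in any order) form a prefix of the
 * input's sort order, so rows with equal keys are adjacent. Otherwise returns HASH.
 */
fun chooseAggregateStrategy(inputSortOrder: List<String>, groupBy: List<String>): AggregateStrategy {
    val keys = groupBy.toSet()
    if (keys.isEmpty() || inputSortOrder.size < keys.size) return AggregateStrategy.HASH
    val prefix = inputSortOrder.take(keys.size).toSet()
    return if (prefix == keys) AggregateStrategy.SORT else AggregateStrategy.HASH
}
```

A cost-based planner would go further and also consider whether adding an explicit sort followed by a sort aggregate is cheaper than hashing.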

KQuery’s query planner is simple: it makes fixed choices (always hash aggregate, for example). Production query planners use cost-based optimization to estimate which physical plan will be fastest.

The QueryPlanner Class

The planner has two main methods: one for plans, one for expressions.

class QueryPlanner {

    fun createPhysicalPlan(plan: LogicalPlan): PhysicalPlan {
        return when (plan) {
            is Scan -> ...
            is Selection -> ...
            is Projection -> ...
            is Aggregate -> ...
            else -> throw IllegalStateException("Unknown plan: $plan")
        }
    }

    fun createPhysicalExpr(expr: LogicalExpr, input: LogicalPlan): Expression {
        return when (expr) {
            is Column -> ...
            is LiteralLong -> ...
            is LiteralDouble -> ...
            is LiteralString -> ...
            is BinaryExpr -> ...
            is Alias -> ...
            else -> throw IllegalStateException("Unknown expression: $expr")
        }
    }
}

Both methods use pattern matching to dispatch on the type. Both are recursive: translating a Projection requires translating its input plan, and translating a BinaryExpr requires translating its child expressions.
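
The recursion is easiest to see in a toy version. The sketch below defines its own minimal types (LExpr, PExpr and their subclasses are hypothetical stand-ins, not KQuery classes) and evaluates one row at a time for brevity. Because the logical types form a sealed hierarchy, the Kotlin compiler can verify that the when is exhaustive, so no else branch is needed:

```kotlin
// Hypothetical logical expressions: columns are referenced by name.
sealed class LExpr
data class Col(val name: String) : LExpr()
data class Lit(val value: Long) : LExpr()
data class Plus(val l: LExpr, val r: LExpr) : LExpr()

// Hypothetical physical expressions: columns are referenced by index.
sealed class PExpr {
    abstract fun eval(row: List<Long>): Long
}
data class ColIdx(val index: Int) : PExpr() {
    override fun eval(row: List<Long>) = row[index]
}
data class LitVal(val value: Long) : PExpr() {
    override fun eval(row: List<Long>) = value
}
data class PlusExpr(val l: PExpr, val r: PExpr) : PExpr() {
    override fun eval(row: List<Long>) = l.eval(row) + r.eval(row)
}

// Recursive translation: child expressions are translated before their parent is built.
fun toPhysical(expr: LExpr, schema: List<String>): PExpr = when (expr) {
    is Col -> {
        val i = schema.indexOf(expr.name)
        require(i >= 0) { "No column named '${expr.name}'" }
        ColIdx(i)
    }
    is Lit -> LitVal(expr.value)
    is Plus -> PlusExpr(toPhysical(expr.l, schema), toPhysical(expr.r, schema))
}
```

For example, translating Plus(Col("b"), Lit(10)) against the schema ["a", "b"] yields PlusExpr(ColIdx(1), LitVal(10)).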

Translating Expressions

Column References

Logical expressions reference columns by name. Physical expressions use column indices for efficiency. The planner performs this lookup:

is Column -> {
    val i = input.schema().fields.indexOfFirst { it.name == expr.name }
    if (i == -1) {
        throw SQLException("No column named '${expr.name}'")
    }
    ColumnExpression(i)
}

If the column does not exist in the input schema, we throw an error. This should not happen if the logical plan was validated, but the check provides a safety net.
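
To see why resolving the index once at planning time pays off, consider a hypothetical columnar batch represented as a list of columns. In the sketch below, Field, Batch, resolveColumn and IndexedColumn are illustrative names, not KQuery's. Once the index is resolved, evaluating a column reference becomes a single positional lookup, with no string comparisons per batch. The sketch also shows a stricter variant of the lookup that rejects ambiguous names instead of silently taking the first match, as indexOfFirst does:

```kotlin
// Hypothetical schema field and columnar batch (one list of values per column).
data class Field(val name: String, val type: String)
class Batch(val schema: List<Field>, val columns: List<List<Any?>>)

// Resolve a column name to its index once, at planning time.
// Stricter than indexOfFirst: duplicate names are reported as ambiguous.
fun resolveColumn(schema: List<Field>, name: String): Int {
    val matches = schema.indices.filter { schema[it].name == name }
    return when (matches.size) {
        0 -> throw IllegalArgumentException("No column named '$name'")
        1 -> matches[0]
        else -> throw IllegalArgumentException("Ambiguous column name '$name'")
    }
}

// Physical column reference: evaluation is a positional lookup, no name matching.
class IndexedColumn(private val index: Int) {
    fun evaluate(batch: Batch): List<Any?> = batch.columns[index]
}
```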

Literals

Literal translations are trivial since we just copy the value:

is LiteralLong -> LiteralLongExpression(expr.n)
is LiteralDouble -> LiteralDoubleExpression(expr.n)
is LiteralString -> LiteralStringExpression(expr.str)
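
In a columnar engine, a literal still has to behave like a column when it is evaluated against a batch. One way to do this without allocating an array is a read-only list that reports the batch's row count and returns the same value at every position. LiteralColumn and LiteralLongColumnExpr below are hypothetical names used only for illustration:

```kotlin
// A read-only "column" that repeats one value for every row without
// allocating storage proportional to the row count.
class LiteralColumn<T>(private val value: T, override val size: Int) : AbstractList<T>() {
    override fun get(index: Int): T {
        if (index < 0 || index >= size) throw IndexOutOfBoundsException("Index $index, size $size")
        return value
    }
}

// Hypothetical physical literal: its output length matches the input batch.
class LiteralLongColumnExpr(private val n: Long) {
    fun evaluate(rowCount: Int): List<Long> = LiteralColumn(n, rowCount)
}
```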

Binary Expressions

Binary expressions require recursively translating both operands, then creating the appropriate physical operator:

is BinaryExpr -> {
    val l = createPhysicalExpr(expr.l, input)
    val r = createPhysicalExpr(expr.r, input)
    when (expr) {
        // Comparison
        is Eq -> EqExpression(l, r)
        is Neq -> NeqExpression(l, r)
        is Gt -> GtExpression(l, r)
        is GtEq -> GtEqExpression(l, r)
        is Lt -> LtExpression(l, r)
        is LtEq -> LtEqExpression(l, r)

        // Boolean
        is And -> AndExpression(l, r)
        is Or -> OrExpression(l, r)

        // Math
        is Add -> AddExpression(l, r)
        is Subtract -> SubtractExpression(l, r)
        is Multiply -> MultiplyExpression(l, r)
        is Divide -> DivideExpression(l, r)

        else -> throw IllegalStateException("Unsupported: $expr")
    }
}
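
The physical side can share most of the work between operators. The sketch below assumes the same kind of hypothetical list-of-columns batch in place of Arrow vectors. A base class evaluates both children against the batch, checks that they have the same length, and applies a per-row operator. All names are hypothetical, and only two operators are shown:

```kotlin
// Hypothetical physical expression over a batch represented as a list of columns.
interface PhysicalExpr {
    fun evaluate(batch: List<List<Any?>>): List<Any?>
}

class ColumnRef(private val index: Int) : PhysicalExpr {
    override fun evaluate(batch: List<List<Any?>>) = batch[index]
}

// Shared logic: evaluate both operands, then combine them row by row.
abstract class BinaryPhysicalExpr(private val l: PhysicalExpr, private val r: PhysicalExpr) : PhysicalExpr {
    override fun evaluate(batch: List<List<Any?>>): List<Any?> {
        val lv = l.evaluate(batch)
        val rv = r.evaluate(batch)
        require(lv.size == rv.size) { "Operand sizes differ: ${lv.size} vs ${rv.size}" }
        return lv.indices.map { i -> apply(lv[i], rv[i]) }
    }

    abstract fun apply(a: Any?, b: Any?): Any?
}

// Only non-null Long operands are handled; a real engine dispatches on the data type.
class AddLongs(l: PhysicalExpr, r: PhysicalExpr) : BinaryPhysicalExpr(l, r) {
    override fun apply(a: Any?, b: Any?): Any? = (a as Long) + (b as Long)
}

class GtLongs(l: PhysicalExpr, r: PhysicalExpr) : BinaryPhysicalExpr(l, r) {
    override fun apply(a: Any?, b: Any?): Any? = (a as Long) > (b as Long)
}
```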

Aliases

Aliases are interesting: they have no physical representation. An alias just gives a name to an expression for use in planning. At execution time, we evaluate the underlying expression:

is Alias -> {
    // Aliases only affect naming during planning, not execution
    createPhysicalExpr(expr.expr, input)
}
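
This small sketch uses hypothetical types, not KQuery's, to show both halves of this behavior. During planning, the alias determines the output field name. During translation, the alias is simply unwrapped:

```kotlin
// Hypothetical logical expressions, just enough to show aliasing.
sealed class Expr
data class ColumnRefExpr(val name: String) : Expr()
data class AliasExpr(val expr: Expr, val alias: String) : Expr()

// Hypothetical physical expression: a resolved column index.
data class PhysicalColumn(val index: Int)

// Planning-time naming: the alias, if present, becomes the output field name.
fun outputName(expr: Expr): String = when (expr) {
    is ColumnRefExpr -> expr.name
    is AliasExpr -> expr.alias
}

// Translation: an alias does nothing at runtime, so it is unwrapped.
fun translate(expr: Expr, schema: List<String>): PhysicalColumn = when (expr) {
    is ColumnRefExpr -> {
        val i = schema.indexOf(expr.name)
        require(i >= 0) { "No column named '${expr.name}'" }
        PhysicalColumn(i)
    }
    is AliasExpr -> translate(expr.expr, schema)
}
```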

Translating Plans

Scan

Scan is the simplest translation. We pass the data source and projection through:

is Scan -> {
    ScanExec(plan.dataSource, plan.projection)
}
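
Passing the projection through allows the data source to skip columns the query never uses. The sketch below uses a hypothetical in-memory data source. It assumes, as the [] in the worked example later in this chapter suggests, that an empty projection means "all columns". InMemoryDataSource and SimpleScanExec are illustrative names:

```kotlin
// Hypothetical in-memory data source holding named columns (insertion-ordered).
class InMemoryDataSource(private val columns: Map<String, List<Any?>>) {
    // Returns only the requested columns, in the requested order.
    // An empty projection is treated as "read every column".
    fun scan(projection: List<String>): List<List<Any?>> {
        val names = if (projection.isEmpty()) columns.keys.toList() else projection
        return names.map { name ->
            columns[name] ?: throw IllegalArgumentException("No column named '$name'")
        }
    }
}

// Hypothetical physical scan: holds the source and projection, reads on execute().
class SimpleScanExec(private val source: InMemoryDataSource, private val projection: List<String>) {
    fun execute(): List<List<Any?>> = source.scan(projection)
}
```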

Selection (Filter)

Selection translates the input plan and the filter expression:

is Selection -> {
    val input = createPhysicalPlan(plan.input)
    val filterExpr = createPhysicalExpr(plan.expr, plan.input)
    SelectionExec(input, filterExpr)
}

Note that createPhysicalExpr receives plan.input (the logical input), not input (the physical input). We need the logical schema to resolve column names to indices.
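
Once the predicate has been resolved to column indices, executing the filter is mechanical. In this minimal sketch, a batch is a hypothetical list of columns, and the predicate is a function that returns one Boolean per row. It stands in for a physical expression such as EqExpression. FilterExec is an illustrative name:

```kotlin
// A batch is a list of columns; every column has the same number of rows.
typealias ColumnBatch = List<List<Any?>>

// Hypothetical filter operator: keeps rows where the predicate evaluates to true.
class FilterExec(private val predicate: (ColumnBatch) -> List<Boolean>) {
    fun execute(batch: ColumnBatch): ColumnBatch {
        val keep = predicate(batch)
        val rowCount = batch.firstOrNull()?.size ?: 0
        require(keep.size == rowCount) { "Predicate produced ${keep.size} values for $rowCount rows" }
        // Apply the same boolean mask to every column so rows stay aligned.
        return batch.map { column -> column.filterIndexed { i, _ -> keep[i] } }
    }
}
```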

Projection

Projection translates the input and each projection expression:

is Projection -> {
    val input = createPhysicalPlan(plan.input)
    val projectionExpr = plan.expr.map { createPhysicalExpr(it, plan.input) }
    val projectionSchema = Schema(plan.expr.map { it.toField(plan.input) })
    ProjectionExec(input, projectionSchema, projectionExpr)
}

We derive the output schema from the logical expressions since they know their output types.
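
At execution time, the projection evaluates each expression against every input batch and pairs the resulting columns with the schema computed during planning. In this sketch, the expressions are hypothetical functions from a batch to a column, and the output schema is reduced to a list of names. ProjectExec is an illustrative name:

```kotlin
typealias Columns = List<List<Any?>>

// Hypothetical projection operator: one output column per expression.
class ProjectExec(
    private val outputNames: List<String>,
    private val exprs: List<(Columns) -> List<Any?>>
) {
    init {
        require(outputNames.size == exprs.size) { "Schema and expression counts differ" }
    }

    // Returns each output field name paired with its evaluated column.
    fun execute(batch: Columns): List<Pair<String, List<Any?>>> =
        outputNames.zip(exprs.map { it(batch) })
}
```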

Aggregate

Aggregate translation involves grouping expressions and aggregate functions:

is Aggregate -> {
    val input = createPhysicalPlan(plan.input)
    val groupExpr = plan.groupExpr.map { createPhysicalExpr(it, plan.input) }
    val aggregateExpr = plan.aggregateExpr.map {
        when (it) {
            is Max -> MaxExpression(createPhysicalExpr(it.expr, plan.input))
            is Min -> MinExpression(createPhysicalExpr(it.expr, plan.input))
            is Sum -> SumExpression(createPhysicalExpr(it.expr, plan.input))
            is Avg -> AvgExpression(createPhysicalExpr(it.expr, plan.input))
            is Count -> CountExpression(createPhysicalExpr(it.expr, plan.input))
            else -> throw IllegalStateException("Unsupported: $it")
        }
    }
    HashAggregateExec(input, groupExpr, aggregateExpr, plan.schema())
}

Notice that we always create HashAggregateExec. A more sophisticated planner might choose SortAggregateExec when the input is already sorted by the grouping columns.
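
The work HashAggregateExec performs at runtime can be sketched in a few lines. It keeps a hash map from each distinct grouping key to one accumulator per aggregate expression, feeds every row into the matching accumulators, and then emits one row per key. The Accumulator interface and the row-based input are assumptions made for illustration, and only AVG and MAX are shown:

```kotlin
// Hypothetical accumulator: consumes values for one group, then produces a result.
interface Accumulator {
    fun accumulate(value: Any?)
    fun finalValue(): Any?
}

class AvgAccumulator : Accumulator {
    private var sum = 0.0
    private var count = 0L
    override fun accumulate(value: Any?) {
        if (value != null) { sum += (value as Number).toDouble(); count++ }
    }
    // AVG over an empty or all-null group is null, as in SQL.
    override fun finalValue(): Any? = if (count == 0L) null else sum / count
}

class MaxAccumulator : Accumulator {
    private var best: Double? = null
    override fun accumulate(value: Any?) {
        if (value != null) {
            val v = (value as Number).toDouble()
            best = best?.let { maxOf(it, v) } ?: v
        }
    }
    override fun finalValue(): Any? = best
}

/**
 * Hash aggregation over rows. Each aggregate is a pair of
 * (input column index, accumulator factory).
 */
fun hashAggregate(
    rows: List<List<Any?>>,
    groupBy: List<Int>,
    aggregates: List<Pair<Int, () -> Accumulator>>
): List<List<Any?>> {
    // LinkedHashMap keeps groups in first-seen order so the output is deterministic.
    val groups = LinkedHashMap<List<Any?>, List<Accumulator>>()
    for (row in rows) {
        val key = groupBy.map { row[it] }
        val accs = groups.getOrPut(key) { aggregates.map { it.second() } }
        aggregates.forEachIndexed { i, (col, _) -> accs[i].accumulate(row[col]) }
    }
    // Each output row is the grouping key followed by the final aggregate values.
    return groups.map { (key, accs) -> key + accs.map { it.finalValue() } }
}
```

A sort-based aggregate would need only one set of accumulators at a time, finishing each group when the key changes. That is why it becomes attractive when the input is already sorted.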

A Complete Example

Consider this query:

SELECT department, AVG(salary)
FROM employees
WHERE state = 'CO'
GROUP BY department

The logical plan:

Aggregate: groupBy=[#department], aggr=[AVG(#salary)]
    Selection: #state = 'CO'
        Scan: employees

The query planner walks this top-down:

  1. Aggregate: Create physical plan for input, translate expressions
    • Recurse to translate Selection
  2. Selection: Create physical plan for input, translate filter expression
    • Recurse to translate Scan
  3. Scan: Create ScanExec directly

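Building the physical plans bottom-up (the column indices assume that department, state, and salary are at positions 0, 2, and 3 in the employees schema):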

  1. ScanExec(employeesDataSource, [])
  2. SelectionExec(scanExec, EqExpression(ColumnExpression(2), LiteralStringExpression("CO")))
  3. HashAggregateExec(selectionExec, [ColumnExpression(0)], [AvgExpression(ColumnExpression(3))], schema)

The result is an executable physical plan.
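
To trace the walkthrough end to end, the sketch below runs the same three steps over a few made-up rows. The schema (department, name, state, salary) is invented so that its indices match the ones above. Each operator is modeled as a plain function over rows rather than as KQuery's ScanExec, SelectionExec, and HashAggregateExec:

```kotlin
// Made-up rows with the assumed schema (department, name, state, salary).
val employees: List<List<Any?>> = listOf(
    listOf("eng", "Ann", "CO", 120.0),
    listOf("eng", "Bob", "CO", 100.0),
    listOf("ops", "Cat", "CA", 90.0),
    listOf("ops", "Dan", "CO", 70.0)
)

// Stands in for ScanExec(employeesDataSource, []): every column is returned.
fun scan(): Sequence<List<Any?>> = employees.asSequence()

// Stands in for SelectionExec(..., EqExpression(ColumnExpression(2), LiteralStringExpression("CO"))).
fun selection(input: Sequence<List<Any?>>): Sequence<List<Any?>> =
    input.filter { row -> row[2] == "CO" }

// Stands in for HashAggregateExec(..., [ColumnExpression(0)], [AvgExpression(ColumnExpression(3))], schema).
fun hashAggregateAvg(input: Sequence<List<Any?>>): List<List<Any?>> {
    val sums = LinkedHashMap<Any?, DoubleArray>() // department -> [sum, count]
    for (row in input) {
        val acc = sums.getOrPut(row[0]) { DoubleArray(2) }
        acc[0] += row[3] as Double
        acc[1] += 1.0
    }
    return sums.map { (dept, acc) -> listOf(dept, acc[0] / acc[1]) }
}

// The nesting mirrors the physical plan tree: aggregate(selection(scan)).
fun runExample(): List<List<Any?>> = hashAggregateAvg(selection(scan()))
```

With these rows, the CA employee is filtered out, and the result is one row per department with the average salary of its Colorado employees.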

Where Optimization Fits

The query planner shown here does direct translation. Each logical operator becomes exactly one physical operator.

In practice, optimization happens between logical planning and physical planning:

  1. Parse SQL to logical plan
  2. Optimize the logical plan (reorder joins, push down predicates, etc.)
  3. Translate optimized logical plan to physical plan

Alternatively, some systems perform cost-based optimization on physical plans:

  1. Parse SQL to logical plan
  2. Generate multiple candidate physical plans
  3. Estimate cost of each
  4. Choose the cheapest
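
The candidate-and-cost loop can be sketched for the join choice mentioned earlier. The cost formulas here are deliberately crude, made-up estimates counted in "rows touched" that ignore memory and I/O, and all names are hypothetical. Real cost models use statistics and are far more detailed:

```kotlin
import kotlin.math.log2

// Hypothetical physical join candidates.
enum class JoinAlgorithm { NESTED_LOOP, HASH, SORT_MERGE }

data class Candidate(val algorithm: JoinAlgorithm, val estimatedCost: Double)

/**
 * Crude illustrative costs:
 * - nested loop compares every left row with every right row
 * - hash join reads both inputs and inserts the smaller one into a hash table
 * - sort-merge reads both inputs, plus n log n per input unless already sorted
 */
fun candidates(leftRows: Long, rightRows: Long, inputsSorted: Boolean): List<Candidate> {
    fun sortCost(n: Long) = if (inputsSorted || n < 2) 0.0 else n * log2(n.toDouble())
    return listOf(
        Candidate(JoinAlgorithm.NESTED_LOOP, leftRows.toDouble() * rightRows),
        Candidate(JoinAlgorithm.HASH, (leftRows + rightRows + minOf(leftRows, rightRows)).toDouble()),
        Candidate(JoinAlgorithm.SORT_MERGE, sortCost(leftRows) + sortCost(rightRows) + leftRows + rightRows)
    )
}

// Choose the candidate with the lowest estimated cost.
fun cheapest(options: List<Candidate>): Candidate =
    checkNotNull(options.minByOrNull { it.estimatedCost }) { "No candidate plans" }
```

Even this toy model reproduces the familiar pattern: nested loops for tiny inputs, hash joins for large unsorted inputs, and sort-merge when both inputs already arrive sorted.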

KQuery uses the first approach, with a simple optimizer that is covered in a later chapter, after the chapters on Joins and Subqueries. The planner here assumes it receives an already-optimized logical plan.

Error Handling

The query planner should catch errors that slip past logical plan validation:

  • Unknown column names
  • Unsupported expression types
  • Type mismatches

In KQuery, these throw exceptions. Production systems would produce structured error messages with source locations.
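
As a rough sketch of what "structured" could mean, an error can carry the position of the offending token so the message can point to it. PlanningException and its fields are hypothetical. The sketch assumes the parser records a 1-based line and column for each identifier:

```kotlin
// Hypothetical planner error that remembers where in the SQL text it originated.
class PlanningException(
    message: String,
    val line: Int,   // 1-based line of the offending token
    val column: Int  // 1-based column of the offending token
) : Exception(message) {

    // Render the message, the offending source line, and a caret under the token.
    fun render(sql: String): String {
        val sourceLine = sql.lines().getOrElse(line - 1) { "" }
        val caret = " ".repeat(maxOf(column - 1, 0)) + "^"
        return "Error at line $line, column $column: $message\n$sourceLine\n$caret"
    }
}
```

For a misspelled column such as salery in SELECT department, AVG(salery), the caret would appear under the first letter of the unknown name.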

This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work

Copyright © 2020-2025 Andy Grove. All rights reserved.