Query Optimizations
The source code discussed in this chapter can be found in the optimizer module of the KQuery project.
A query engine that executes plans exactly as written will produce correct results but may be slow. Users writing SQL or DataFrame queries naturally express what they want, not how to compute it efficiently. The optimizer transforms logical plans into equivalent but faster plans.
Why Optimize?
Consider a query that joins two tables and then filters:
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.id
WHERE e.state = 'CO'
Executing this literally means: join all employees with all departments, then filter to Colorado. If there are 100,000 employees and only 5,000 in Colorado, we do 95,000 unnecessary join lookups.
An optimizer recognizes that the filter on state only touches the employees table and can be applied before the join:
Before optimization:
Filter: state = 'CO'
Join: e.dept_id = d.id
Scan: employees
Scan: departments
After optimization:
Join: e.dept_id = d.id
Filter: state = 'CO'
Scan: employees
Scan: departments
Now we join only 5,000 employees instead of 100,000. This produces the same result but is much faster.
Rule-Based Optimization
KQuery uses rule-based optimization: a set of transformation rules that each improve the plan in some way. Rules are applied in sequence, each taking a logical plan and returning a (hopefully better) logical plan.
interface OptimizerRule {
fun optimize(plan: LogicalPlan): LogicalPlan
}
class Optimizer {
fun optimize(plan: LogicalPlan): LogicalPlan {
var result = plan
result = ProjectionPushDownRule().optimize(result)
// Additional rules would be applied here
return result
}
}
Rules work by walking the plan tree and rebuilding it with modifications. This functional approach (build a new tree rather than mutate the old one) is simpler and less error-prone.
Projection Push-Down
Projection push-down reduces memory usage by reading only the columns that the query actually uses. If a table has 50 columns but the query only references 3, we should read only those 3.
The rule works by:
- Walking the plan top-down, collecting column names referenced in each operator
- When reaching a Scan, replacing it with a Scan that projects only the needed columns
First, we need a helper to extract column references from expressions:
fun extractColumns(expr: LogicalExpr, input: LogicalPlan, accum: MutableSet<String>) {
when (expr) {
is Column -> accum.add(expr.name)
is ColumnIndex -> accum.add(input.schema().fields[expr.i].name)
is BinaryExpr -> {
extractColumns(expr.l, input, accum)
extractColumns(expr.r, input, accum)
}
is Alias -> extractColumns(expr.expr, input, accum)
is CastExpr -> extractColumns(expr.expr, input, accum)
is LiteralString, is LiteralLong, is LiteralDouble -> { }
else -> throw IllegalStateException("Unsupported: $expr")
}
}
Then the rule itself:
class ProjectionPushDownRule : OptimizerRule {
override fun optimize(plan: LogicalPlan): LogicalPlan {
return pushDown(plan, mutableSetOf())
}
private fun pushDown(plan: LogicalPlan, columnNames: MutableSet<String>): LogicalPlan {
return when (plan) {
is Projection -> {
extractColumns(plan.expr, plan.input, columnNames)
val input = pushDown(plan.input, columnNames)
Projection(input, plan.expr)
}
is Selection -> {
extractColumns(plan.expr, plan.input, columnNames)
val input = pushDown(plan.input, columnNames)
Selection(input, plan.expr)
}
is Aggregate -> {
extractColumns(plan.groupExpr, plan.input, columnNames)
extractColumns(plan.aggregateExpr.map { it.expr }, plan.input, columnNames)
val input = pushDown(plan.input, columnNames)
Aggregate(input, plan.groupExpr, plan.aggregateExpr)
}
is Scan -> {
val validFields = plan.dataSource.schema().fields.map { it.name }.toSet()
val projection = validFields.filter { columnNames.contains(it) }.sorted()
Scan(plan.path, plan.dataSource, projection)
}
else -> throw IllegalStateException("Unsupported: $plan")
}
}
}
Given this plan:
Projection: #id, #first_name, #last_name
Filter: #state = 'CO'
Scan: employee; projection=None
The optimizer produces:
Projection: #id, #first_name, #last_name
Filter: #state = 'CO'
Scan: employee; projection=[first_name, id, last_name, state]
The Scan now reads only four columns instead of all columns in the table. For columnar formats like Parquet, this dramatically reduces I/O.
Predicate Push-Down
Predicate push-down moves filters closer to the data source, reducing the number of rows processed by later operators.
Consider:
Projection: #dept_name, #first_name, #last_name
Filter: #state = 'CO'
Join: #employee.dept_id = #dept.id
Scan: employee
Scan: dept
The filter references only employee columns, so it can move below the join:
Projection: #dept_name, #first_name, #last_name
Join: #employee.dept_id = #dept.id
Filter: #state = 'CO'
Scan: employee
Scan: dept
Now the join processes fewer rows. This optimization becomes more important with larger tables and more selective predicates.
The implementation must analyze which tables each predicate references and only push predicates that reference a single table below joins. Predicates referencing both sides of a join cannot be pushed below it.
KQuery does not currently implement predicate push-down, but the pattern is similar to projection push-down: walk the tree, identify opportunities, rebuild with filters moved down.
Eliminate Common Subexpressions
When the same expression appears multiple times, we can compute it once and reuse the result:
SELECT sum(price * qty) AS total_price,
sum(price * qty * tax_rate) AS total_tax
FROM sales
The expression price * qty appears in both aggregates. Rather than compute it twice per row, we can add an intermediate projection:
Original:
Aggregate: sum(#price * #qty), sum(#price * #qty * #tax_rate)
Scan: sales
Optimized:
Aggregate: sum(#subtotal), sum(#subtotal * #tax_rate)
Projection: #price * #qty AS subtotal, #tax_rate
Scan: sales
This trades one multiplication per row (in the projection) against two multiplications per row (in the original aggregates). For large datasets, this adds up.
KQuery does not implement this optimization, but the approach involves:
- Finding expressions that appear multiple times
- Creating a projection that computes them once with generated names
- Rewriting later operators to reference the computed columns
Cost-Based Optimization
Rule-based optimization applies transformations unconditionally. Cost-based optimization estimates the cost of different plans and chooses the cheapest.
Consider join ordering. For three tables A, B, C:
- (A JOIN B) JOIN C
- (A JOIN C) JOIN B
- (B JOIN C) JOIN A
All produce the same result, but performance varies dramatically based on table sizes and join selectivity. If A has 1 million rows, B has 100 rows, and C has 10,000 rows, joining B and C first (100 × 10,000 = 1 million intermediate rows at most) then joining A is likely faster than starting with A.
Cost-based optimizers need statistics:
- Table row counts
- Column cardinality (number of distinct values)
- Value distributions (histograms)
- Min/max values per column
With statistics, the optimizer can estimate:
- How many rows a filter will produce (selectivity)
- How many rows a join will produce
- Memory requirements for hash tables
The optimizer generates candidate plans, estimates cost for each, and picks the cheapest.
The Statistics Challenge
Cost-based optimization sounds great but has practical challenges:
Gathering statistics is expensive. Scanning terabytes of data to build histograms takes time. For ad-hoc queries, this overhead may exceed the optimization benefit.
Statistics become stale. As data changes, statistics drift from reality. Stale statistics lead to bad plans.
Estimation errors compound. Each estimate has error. In a complex plan with many operators, errors multiply, potentially leading to catastrophically bad plans.
Some formats provide partial statistics. Parquet and ORC files include min/max values and row counts per column chunk. This helps but is not enough for accurate cardinality estimation.
KQuery uses only rule-based optimization. Production systems like Spark, Presto, and traditional databases invest heavily in cost-based optimization, but it remains an area of active research and engineering.
Other Optimizations
Query engines implement many other optimizations:
Constant folding: Evaluate constant expressions at planning time. WHERE date > '2024-01-01' AND 1 = 1 becomes WHERE date > '2024-01-01'.
Dead column elimination: Remove columns from intermediate results when they are not needed downstream.
Join reordering: Choose the order of joins to minimize intermediate result sizes.
Limit push-down: Push LIMIT operators down to reduce work. If we only need 10 rows, stop early.
Partition pruning: Skip reading partitions that cannot contain matching data based on partition keys.
The right set of optimizations depends on the workload and data characteristics. Simple rule-based optimizations provide significant benefits with minimal complexity.
This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work
Copyright © 2020-2025 Andy Grove. All rights reserved.