Physical Plans and Expressions
The source code discussed in this chapter can be found in the physical-plan module of the KQuery project.
Logical plans describe what computation to perform. Physical plans describe how to perform it. This chapter covers the physical plan layer, where abstract operations become executable code.
Why Separate Physical from Logical?
A logical plan says "aggregate this data by department." A physical plan specifies which algorithm to use. There might be several valid choices:
- Hash Aggregate: Build a hash map keyed by grouping columns, update accumulators as rows arrive. Works well for unsorted data with moderate cardinality.
- Sort Aggregate: Requires data sorted by grouping columns, but uses less memory since it only tracks one group at a time.
Similarly for joins:
- Hash Join: Build a hash table from one side, probe with the other. Fast for equi-joins.
- Sort-Merge Join: Sort both sides, merge them. Good when data is already sorted.
- Nested Loop Join: For each row on the left, scan the entire right side. Simple but slow; necessary for non-equi joins.
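The difference between the join strategies is easier to see in code. Below is a minimal sketch of a hash join and a nested loop join over in-memory rows. The `Row` class, `hashJoin`, and `nestedLoopJoin` are hypothetical simplifications, since KQuery's operators work on record batches. The hash join handles only equality on a key. The nested loop join accepts any predicate, but it compares every pair of rows.

```kotlin
// Hypothetical row type for illustration; real operators process record batches.
data class Row(val values: List<Any?>)

/** Hash join: index the right input by key, then probe it with each left row. */
fun hashJoin(left: List<Row>, right: List<Row>, leftKey: Int, rightKey: Int): List<Row> {
    // Build phase: group right rows by join key (SQL NULL keys never match, so skip them)
    val table = HashMap<Any, MutableList<Row>>()
    for (r in right) {
        val key = r.values[rightKey] ?: continue
        table.getOrPut(key) { mutableListOf() }.add(r)
    }
    // Probe phase: each left row only visits right rows with an equal key
    val out = mutableListOf<Row>()
    for (l in left) {
        val key = l.values[leftKey] ?: continue
        val matches = table[key] ?: continue
        for (r in matches) out.add(Row(l.values + r.values))
    }
    return out
}

/** Nested loop join: evaluate an arbitrary predicate for every pair of rows. */
fun nestedLoopJoin(left: List<Row>, right: List<Row>, predicate: (Row, Row) -> Boolean): List<Row> {
    val out = mutableListOf<Row>()
    for (l in left) {
        for (r in right) {
            if (predicate(l, r)) out.add(Row(l.values + r.values))
        }
    }
    return out
}
```

The nested loop version is the only one of the two that can handle a condition such as `l.dept < r.id`, because a hash table can only answer "which rows have exactly this key?"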
The logical plan does not care which algorithm runs. The query planner chooses based on data characteristics, available indexes, and cost estimates. Keeping logical and physical separate enables this flexibility.
Physical plans might also vary by execution environment:
- Single-threaded vs parallel execution
- CPU vs GPU computation
- Local vs distributed processing
The PhysicalPlan Interface
Physical plans produce data. The interface reflects this:
/**
 * A physical plan represents an executable piece of code that will produce
 * data.
 */
interface PhysicalPlan {
    fun schema(): Schema
    /** Execute a physical plan and produce a series of record batches. */
    fun execute(): Sequence<RecordBatch>
    /**
     * Returns the children (inputs) of this physical plan. This method is used to
     * enable use of the visitor pattern to walk a query tree.
     */
    fun children(): List<PhysicalPlan>
    ...
}
The key method is execute(), which returns a sequence of record batches. This is the pull-based execution model: the caller pulls batches as needed rather than having batches pushed to it. Kotlin's Sequence is lazy, so computation happens only when batches are consumed.
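Here is a small sketch of this laziness. It uses a hypothetical `CountingSource` and a simplified `Batch` class in place of KQuery's data sources and `RecordBatch`. The counter shows two things. Mapping over the sequence does no work until a consumer pulls. Pulling only the first batch computes only one batch.

```kotlin
// Simplified stand-in for a record batch: just a list of values.
data class Batch(val rows: List<Int>)

// Hypothetical source that records how many batches it has produced so far.
class CountingSource(private val batchCount: Int) {
    var produced = 0
        private set

    fun execute(): Sequence<Batch> = sequence {
        for (i in 0 until batchCount) {
            produced++ // runs only when a consumer asks for the next batch
            yield(Batch(listOf(i * 10, i * 10 + 1)))
        }
    }
}
```

Building `src.execute().map { ... }` leaves `produced` at 0. Calling `.first()` on the result leaves it at 1, even though the source could produce 100 batches.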
Physical Expressions
Logical expressions reference columns by name. Physical expressions reference columns by index, so evaluating an expression never has to search the schema for a column name. Each name is resolved to an index once, before execution starts.
/** Physical representation of an expression. */
interface Expression {
    /**
     * Evaluate the expression against an input record batch and produce a column
     * of data as output
     */
    fun evaluate(input: RecordBatch): ColumnVector
}
A physical expression takes a record batch and produces a column vector. The output has one value per row in the input batch.
Column Expression
The simplest expression retrieves a column from the input:
/** Reference column in a batch by index */
class ColumnExpression(val i: Int) : Expression {
    override fun evaluate(input: RecordBatch): ColumnVector {
        return input.field(i)
    }
    override fun toString(): String {
        return "#$i"
    }
}
No computation, just a lookup by index.
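The sketch below shows the name-to-index step that makes this lookup possible. It assumes hypothetical `SimpleSchema` and `SimpleBatch` types and a hypothetical `resolveColumn` helper. In a query engine, this step would run once, when a logical plan is translated into a physical plan.

```kotlin
// Hypothetical simplified types: a schema is a list of column names,
// and a batch stores one column (a list of values) per schema field.
data class SimpleSchema(val names: List<String>)
class SimpleBatch(val schema: SimpleSchema, val columns: List<List<Any?>>)

// Physical column reference: the index is fixed before execution starts.
class ColumnRef(val index: Int) {
    fun evaluate(input: SimpleBatch): List<Any?> = input.columns[index]
    override fun toString() = "#$index"
}

// Planning step (hypothetical helper): resolve a name to an index exactly once.
fun resolveColumn(schema: SimpleSchema, name: String): ColumnRef {
    val i = schema.names.indexOf(name)
    require(i >= 0) { "No column named '$name'" }
    return ColumnRef(i)
}
```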
Literal Expression
Literal values produce a column where every row has the same value. Rather than allocating storage for identical values, we use a LiteralValueVector (introduced in the Type System chapter) that returns the same value for any index. This optimization matters because expressions like salary * 1.1 would otherwise allocate a column of 1.1 values just to multiply element-wise.
class LiteralDoubleExpression(val value: Double) : Expression {
    override fun evaluate(input: RecordBatch): ColumnVector {
        return LiteralValueVector(ArrowTypes.DoubleType, value, input.rowCount())
    }
}
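To show why a literal vector needs no storage, here is a sketch built on a hypothetical `ColumnVec` interface instead of KQuery's Arrow-backed `ColumnVector`. `ListColumn`, `LiteralColumn`, and `multiply` are illustrative names. `LiteralColumn` keeps a single value and a row count, so `salary * 1.1` never materializes a column of 1.1 values.

```kotlin
// Simplified stand-in for a column vector interface.
interface ColumnVec {
    fun getValue(i: Int): Any?
    fun size(): Int
}

// Column backed by real storage, one value per row.
class ListColumn(private val values: List<Any?>) : ColumnVec {
    override fun getValue(i: Int) = values[i]
    override fun size() = values.size
}

// Literal column: one stored value reported for every row, nothing allocated per row.
class LiteralColumn(private val value: Any?, private val rowCount: Int) : ColumnVec {
    override fun getValue(i: Int): Any? {
        if (i < 0 || i >= rowCount) throw IndexOutOfBoundsException("index $i, size $rowCount")
        return value
    }
    override fun size() = rowCount
}

// Element-wise multiplication of two Double columns, e.g. salary * 1.1
fun multiply(l: ColumnVec, r: ColumnVec): ColumnVec {
    require(l.size() == r.size()) { "column sizes differ" }
    return ListColumn((0 until l.size()).map { (l.getValue(it) as Double) * (r.getValue(it) as Double) })
}
```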
Binary Expressions
Binary expressions evaluate two sub-expressions and combine them. A base class handles the common logic:
/**
 * For binary expressions we need to evaluate the left and right input
 * expressions and then evaluate the specific binary operator against those
 * input values, so we can use this base class to simplify the implementation
 * for each operator.
 */
abstract class BinaryExpression(val l: Expression, val r: Expression) :
    Expression {
    override fun evaluate(input: RecordBatch): ColumnVector {
        val ll = l.evaluate(input)
        val rr = r.evaluate(input)
        assert(ll.size() == rr.size())
        ...
        return evaluate(ll, rr)
    }
    ...
    abstract fun evaluate(l: ColumnVector, r: ColumnVector): ColumnVector
}
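Comparison expressions produce boolean results. EqExpression extends a BooleanExpression base class (not shown here) and implements a per-value evaluate method that compares a single pair of values of the given Arrow type: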
class EqExpression(l: Expression, r: Expression) : BooleanExpression(l, r) {
    override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType): Boolean {
        return when (arrowType) {
            ...
            ArrowTypes.Int32Type -> (l as Int) == (r as Int)
            ArrowTypes.Int64Type -> (l as Long) == (r as Long)
            ...
            ArrowTypes.DoubleType -> (l as Double) == (r as Double)
            ArrowTypes.StringType -> toString(l) == toString(r)
            ...
            else ->
                throw IllegalStateException(
                    "Unsupported data type in comparison expression: $arrowType")
        }
    }
}
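Math expressions produce numeric results. AddExpression extends a MathExpression base class (not shown here); its per-value evaluate method returns null if either input is null: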
class AddExpression(l: Expression, r: Expression) : MathExpression(l, r) {
    override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType): Any? {
        if (l == null || r == null) return null
        return when (arrowType) {
            ...
            ArrowTypes.Int32Type -> (l as Int) + (r as Int)
            ArrowTypes.Int64Type -> (l as Long) + (r as Long)
            ...
            ArrowTypes.DoubleType -> (l as Double) + (r as Double)
            else ->
                throw IllegalStateException(
                    "Unsupported data type in math expression: $arrowType")
        }
    }
    ...
}
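The base classes that call these per-value methods are not shown above. The sketch below shows one way such a base class can evaluate both inputs and then apply a per-value operator row by row. It assumes a simplified column type (a Kotlin list), and it dispatches on each value's runtime type rather than on an `ArrowType`. `RowWiseBinaryExpr`, `AddExpr`, `EqExpr`, and `ColumnIdx` are hypothetical names.

```kotlin
// Simplified column type for illustration: a list of nullable values.
typealias Col = List<Any?>

// Simplified expression interface: a batch is a list of columns.
interface Expr {
    fun evaluate(batch: List<Col>): Col
}

class ColumnIdx(private val i: Int) : Expr {
    override fun evaluate(batch: List<Col>): Col = batch[i]
}

// Shared driver: evaluate both sides, then call the per-value operator for each row.
abstract class RowWiseBinaryExpr(private val l: Expr, private val r: Expr) : Expr {
    override fun evaluate(batch: List<Col>): Col {
        val ll = l.evaluate(batch)
        val rr = r.evaluate(batch)
        require(ll.size == rr.size) { "input columns differ in length" }
        return ll.indices.map { evaluate(ll[it], rr[it]) }
    }
    abstract fun evaluate(l: Any?, r: Any?): Any?
}

// Math operator: a null on either side yields null.
class AddExpr(l: Expr, r: Expr) : RowWiseBinaryExpr(l, r) {
    override fun evaluate(l: Any?, r: Any?): Any? {
        if (l == null || r == null) return null
        return when (l) {
            is Int -> l + (r as Int)
            is Long -> l + (r as Long)
            is Double -> l + (r as Double)
            else -> throw IllegalStateException("Unsupported type: ${l::class}")
        }
    }
}

// Comparison operator: produces Boolean values, or null when either side is null.
class EqExpr(l: Expr, r: Expr) : RowWiseBinaryExpr(l, r) {
    override fun evaluate(l: Any?, r: Any?): Any? =
        if (l == null || r == null) null else l == r
}
```

`EqExpr` returns null for null inputs, following SQL's rule that a comparison with NULL is unknown. This is a choice made for the sketch, not necessarily what `EqExpression` above does.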
Aggregate Expressions
Aggregate expressions work differently. Rather than producing one output value per input row, they reduce many rows to one value. This requires accumulators that maintain state across batches:
interface AggregateExpression {
    fun inputExpression(): Expression
    fun createAccumulator(): Accumulator
}
interface Accumulator {
    fun accumulate(value: Any?)
    fun finalValue(): Any?
}
Each aggregate type creates its own accumulator:
class MaxExpression(private val expr: Expression) : AggregateExpression {
    override fun inputExpression(): Expression {
        return expr
    }
    override fun createAccumulator(): Accumulator {
        return MaxAccumulator()
    }
    override fun toString(): String {
        return "MAX($expr)"
    }
}
class MaxAccumulator : Accumulator {
    var value: Any? = null
    override fun accumulate(value: Any?) {
        if (value != null) {
            if (this.value == null) {
                this.value = value
            } else {
                val isMax =
                    when (value) {
                        ...
                        is Int -> value > this.value as Int
                        is Long -> value > this.value as Long
                        ...
                        is Double -> value > this.value as Double
                        ...
                        else ->
                            throw UnsupportedOperationException(
                                "MAX is not implemented for data type: ${value.javaClass.name}")
                    }
                if (isMax) {
                    this.value = value
                }
            }
        }
    }
    override fun finalValue(): Any? {
        return value
    }
}
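Other aggregates follow the same pattern. Here is a sketch of SUM and COUNT accumulators. `SumAccumulator` and `CountAccumulator` are hypothetical, and the `Accumulator` interface is redeclared so the block compiles on its own. Because the accumulator persists between calls, feeding it values from several batches produces a single result covering all of them.

```kotlin
// Same shape as the Accumulator interface above, redeclared for a self-contained sketch.
interface Accumulator {
    fun accumulate(value: Any?)
    fun finalValue(): Any?
}

// Hypothetical SUM over Long or Double inputs; nulls are skipped, as in MaxAccumulator.
class SumAccumulator : Accumulator {
    private var sum: Any? = null
    override fun accumulate(value: Any?) {
        if (value == null) return
        sum = when (value) {
            is Long -> ((sum as Long?) ?: 0L) + value
            is Double -> ((sum as Double?) ?: 0.0) + value
            else -> throw UnsupportedOperationException(
                "SUM is not implemented for data type: ${value.javaClass.name}")
        }
    }
    // Null if no non-null value was seen, matching SQL SUM over an empty input.
    override fun finalValue(): Any? = sum
}

// COUNT(expr): counts non-null values and returns 0 (not null) for empty input.
class CountAccumulator : Accumulator {
    private var count = 0L
    override fun accumulate(value: Any?) {
        if (value != null) count++
    }
    override fun finalValue(): Any? = count
}
```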
Physical Plans
With expressions defined, we can implement the physical plan operators.
Scan
Scan reads from a data source. It is the simplest operator, delegating entirely to the data source:
/** Scan a data source with optional push-down projection. */
class ScanExec(val ds: DataSource, val projection: List<String>) :
    PhysicalPlan {
    override fun schema(): Schema {
        return ds.schema().select(projection)
    }
    override fun children(): List<PhysicalPlan> {
        return listOf()
    }
    override fun execute(): Sequence<RecordBatch> {
        return ds.scan(projection)
    }
    ...
}
The projection list tells the data source which columns to read. For columnar formats like Parquet, this avoids reading unnecessary data.
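The sketch below shows a projected scan in its simplest form. It uses a hypothetical in-memory columnar source in which each column is stored separately by name, so the scan only touches the columns it is asked for. Real formats like Parquet go further and skip the unread column data on disk; this sketch models only the column selection and the batching.

```kotlin
// Hypothetical in-memory columnar source; batches are lists of columns.
class InMemoryColumnarSource(
    private val columns: Map<String, List<Any?>>,
    private val batchSize: Int
) {
    // Records which columns a scan actually read, for illustration.
    val columnsRead = mutableSetOf<String>()

    fun scan(projection: List<String>): Sequence<List<List<Any?>>> = sequence {
        // Look up only the projected columns; others are never touched
        val selected = projection.map { name ->
            columnsRead.add(name)
            columns[name] ?: throw IllegalArgumentException("No column named '$name'")
        }
        val rowCount = selected.firstOrNull()?.size ?: 0
        // Emit the projected columns in slices of at most batchSize rows
        for (start in 0 until rowCount step batchSize) {
            val end = minOf(start + batchSize, rowCount)
            yield(selected.map { it.subList(start, end) })
        }
    }
}
```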
Projection
Projection evaluates expressions to produce new columns:
/** Execute a projection. */
class ProjectionExec(
    val input: PhysicalPlan,
    val schema: Schema,
    val expr: List<Expression>
) : PhysicalPlan {
    override fun schema(): Schema {
        return schema
    }
    override fun children(): List<PhysicalPlan> {
        return listOf(input)
    }
    override fun execute(): Sequence<RecordBatch> {
        return input.execute().map { batch ->
            val columns = expr.map { it.evaluate(batch) }
            RecordBatch(schema, columns)
        }
    }
    ...
}
For each input batch, evaluate each expression to produce output columns. When an expression is just a column reference, the output column is the same object as the input column; no data is copied.
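The zero-copy behavior can be checked with reference identity. In this sketch, the simplified `Column`, `PhysExpr`, `ColRef`, and `MulLiteral` types and the `project` function are hypothetical. A column reference returns the input object itself, while a computed expression has to allocate a new column.

```kotlin
// Simplified stand-ins: a batch is a list of column objects.
class Column(val values: DoubleArray)

interface PhysExpr {
    fun evaluate(batch: List<Column>): Column
}

// Column reference: hands back the input column object, so nothing is copied.
class ColRef(val i: Int) : PhysExpr {
    override fun evaluate(batch: List<Column>) = batch[i]
}

// Computed expression: allocates a new column to hold its results.
class MulLiteral(val i: Int, val factor: Double) : PhysExpr {
    override fun evaluate(batch: List<Column>) =
        Column(DoubleArray(batch[i].values.size) { batch[i].values[it] * factor })
}

// Projection over one batch: evaluate each expression to get the output columns.
fun project(batch: List<Column>, exprs: List<PhysExpr>): List<Column> =
    exprs.map { it.evaluate(batch) }
```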
Selection (Filter)
Selection keeps rows where a predicate is true:
/** Execute a selection. */
class SelectionExec(val input: PhysicalPlan, val expr: Expression) :
    PhysicalPlan {
    override fun schema(): Schema {
        return input.schema()
    }
    override fun children(): List<PhysicalPlan> {
        return listOf(input)
    }
    override fun execute(): Sequence<RecordBatch> {
        val input = input.execute()
        return input.map { batch ->
            try {
                val result =
                    (expr.evaluate(batch) as ArrowFieldVector).field as BitVector
                val columnCount = batch.schema.fields.size
                val filteredFields =
                    (0 until columnCount).map { filter(batch.field(it), result) }
                val fields = filteredFields.map { ArrowFieldVector(it) }
                RecordBatch(batch.schema, fields)
            } finally {
                batch.close()
            }
        }
    }
    private fun filter(v: ColumnVector, selection: BitVector): FieldVector {
        // Count selected rows first to allocate correct capacity
        var count = 0
        (0 until selection.valueCount).forEach {
            if (selection.get(it) == 1) {
                count++
            }
        }
        // Create vector of the same type as input
        val filteredVector = FieldVectorFactory.create(v.getType(), count)
        val builder = ArrowVectorBuilder(filteredVector)
        var index = 0
        (0 until selection.valueCount).forEach {
            if (selection.get(it) == 1) {
                builder.set(index, v.getValue(it))
                index++
            }
        }
        filteredVector.valueCount = count
        return filteredVector
    }
}
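The predicate expression produces a bit vector with one bit per row. SelectionExec then copies the values whose bit is set into new vectors of the same type. It counts the selected rows first, so each output vector is allocated at exactly the right size. Production systems also special-case batches where every row or no row matches, which avoids the per-row copy.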
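Here is a sketch of those two fast paths. The hypothetical `filterBatch` function represents columns as lists and uses a `BooleanArray` in place of Arrow's `BitVector`. When every row matches, it returns the input columns unchanged. When no row matches, it returns empty columns without examining any values. Only the mixed case copies values.

```kotlin
// Simplified sketch: columns are lists, and the predicate result is a BooleanArray
// (one entry per row) standing in for an Arrow BitVector.
fun filterBatch(columns: List<List<Any?>>, selection: BooleanArray): List<List<Any?>> {
    val selected = selection.count { it }
    return when (selected) {
        // Every row matches: reuse the input columns, no copy
        selection.size -> columns
        // No row matches: empty columns, no per-row work
        0 -> columns.map { emptyList<Any?>() }
        // Mixed: copy only the rows whose selection flag is set
        else -> columns.map { col -> col.filterIndexed { i, _ -> selection[i] } }
    }
}
```

An engine could also skip emitting the empty batch entirely instead of returning empty columns.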
Hash Aggregate
Hash aggregation groups rows by key and computes aggregates. It processes all input before producing output:
class HashAggregateExec(
    val input: PhysicalPlan,
    val groupExpr: List<Expression>,
    val aggregateExpr: List<AggregateExpression>,
    val schema: Schema
) : PhysicalPlan {
    override fun schema(): Schema {
        return schema
    }
    override fun children(): List<PhysicalPlan> {
        return listOf(input)
    }
    ...
    override fun execute(): Sequence<RecordBatch> {
        val map = HashMap<List<Any?>, List<Accumulator>>()
        // Process all input batches
        input.execute().forEach { batch ->
            ...
            val groupKeys = groupExpr.map { it.evaluate(batch) }
            val aggrInputs =
                aggregateExpr.map { it.inputExpression().evaluate(batch) }
            // For each row, update accumulators
            (0 until batch.rowCount()).forEach { rowIndex ->
                ...
                val accumulators =
                    map.getOrPut(rowKey) {
                        aggregateExpr.map { it.createAccumulator() }
                    }
                accumulators.withIndex().forEach { accum ->
                    val value = aggrInputs[accum.index].getValue(rowIndex)
                    accum.value.accumulate(value)
                }
            }
            ...
        }
        // Build output batch from accumulated results
        ...
    }
}
The hash map keys are lists of grouping column values. Each entry holds accumulators for that group. After processing all input, we iterate the map to build the output batch.
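For a complete picture of the algorithm, here is a self-contained sketch of hash aggregation over simplified batches, where each batch is a list of columns. This is not KQuery's code, and the elided parts above may differ. `Acc`, `MaxAcc`, `CountAcc`, and `hashAggregate` are hypothetical. Grouping and aggregate inputs are plain column indices instead of expressions. A `LinkedHashMap` replaces `HashMap` so that groups come out in first-seen order.

```kotlin
interface Acc {
    fun accumulate(value: Any?)
    fun finalValue(): Any?
}

// MAX over any mutually comparable values; nulls are ignored.
class MaxAcc : Acc {
    private var max: Any? = null
    @Suppress("UNCHECKED_CAST")
    override fun accumulate(value: Any?) {
        if (value == null) return
        val current = max
        if (current == null || (value as Comparable<Any>) > current) max = value
    }
    override fun finalValue() = max
}

// COUNT of non-null values.
class CountAcc : Acc {
    private var n = 0L
    override fun accumulate(value: Any?) {
        if (value != null) n++
    }
    override fun finalValue(): Any? = n
}

fun hashAggregate(
    batches: Sequence<List<List<Any?>>>,
    groupCols: List<Int>,
    aggCols: List<Int>,
    newAccumulators: () -> List<Acc>
): List<List<Any?>> {
    val map = LinkedHashMap<List<Any?>, List<Acc>>()
    for (batch in batches) {
        val rowCount = batch.firstOrNull()?.size ?: 0
        for (row in 0 until rowCount) {
            // The grouping key is the list of this row's grouping values
            val key = groupCols.map { batch[it][row] }
            val accs = map.getOrPut(key) { newAccumulators() }
            aggCols.forEachIndexed { i, col -> accs[i].accumulate(batch[col][row]) }
        }
    }
    // One output row per group: grouping values followed by aggregate results
    return map.map { (key, accs) -> key + accs.map { it.finalValue() } }
}
```

Grouping rows by department across two batches, and computing MAX and COUNT of a salary column, yields one row per department. Each department's state lives in the map until the last batch has been consumed.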
This is the "hash" aggregate because we use a hash map. For sorted data, a "sort" aggregate would be more efficient since we could emit results as soon as the grouping key changes, without storing all groups in memory.
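The sort aggregate can be sketched as a streaming function. The hypothetical `sortAggregateSum` takes `(key, value)` pairs that are already sorted by key and computes a SUM per key. It holds only the current group's state, and it emits a result the moment the key changes. The final group is flushed after the input ends.

```kotlin
// Hypothetical sort aggregate: input must already be sorted by key.
fun sortAggregateSum(sortedRows: Sequence<Pair<String, Long>>): Sequence<Pair<String, Long>> = sequence {
    var currentKey: String? = null
    var sum = 0L
    for ((key, value) in sortedRows) {
        if (key != currentKey) {
            // Key changed: the previous group is complete and can be emitted now
            currentKey?.let { yield(it to sum) }
            currentKey = key
            sum = 0L
        }
        sum += value
    }
    // Flush the final group
    currentKey?.let { yield(it to sum) }
}
```

Because the result is itself a lazy `Sequence`, a consumer can receive the first group's total before the rest of the input has been read.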
Execution Model
KQuery uses pull-based execution: the root operator calls execute() on its children, which call their children, and so on. Data flows up as batches are requested.
The alternative is push-based execution, where operators push batches to their parents. Both models work; the choice affects how backpressure and parallelism are handled.
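For contrast with the `Sequence`-based pull model, here is a minimal push-based sketch. `PushOperator`, `CollectSink`, `PushFilter`, and `runSource` are all hypothetical names. Control flow is inverted: the source drives the loop and calls each parent directly. Early termination, such as for a LIMIT, would therefore need an explicit signal back to the source, which this sketch omits.

```kotlin
// Hypothetical push-based operator: receives batches from its child and pushes
// results to its parent, instead of being pulled by it.
interface PushOperator<T> {
    fun push(batch: T)
    fun finish()
}

// Terminal operator that collects everything pushed to it.
class CollectSink<T> : PushOperator<T> {
    val batches = mutableListOf<T>()
    override fun push(batch: T) {
        batches.add(batch)
    }
    override fun finish() {}
}

// Filter that forwards matching values to the next operator.
class PushFilter(
    private val predicate: (Int) -> Boolean,
    private val next: PushOperator<List<Int>>
) : PushOperator<List<Int>> {
    override fun push(batch: List<Int>) {
        val out = batch.filter(predicate)
        if (out.isNotEmpty()) next.push(out) // the parent is called directly
    }
    override fun finish() = next.finish()
}

// The source drives execution by pushing every batch it produces.
fun runSource(batches: List<List<Int>>, root: PushOperator<List<Int>>) {
    for (b in batches) root.push(b)
    root.finish()
}
```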
Returning Sequence<RecordBatch> enables lazy evaluation. If the root only needs the first batch (for a LIMIT 1 query), we avoid computing subsequent batches.
Next Steps
Physical plans are executable, but we still need something to create them from logical plans. The next chapter covers the query planner that performs this translation.
This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work
Copyright © 2020-2025 Andy Grove. All rights reserved.