Physical Plans and Expressions

The source code discussed in this chapter can be found in the physical-plan module of the KQuery project.

Logical plans describe what computation to perform. Physical plans describe how to perform it. This chapter covers the physical plan layer, where abstract operations become executable code.

Why Separate Physical from Logical?

A logical plan says “aggregate this data by department.” A physical plan specifies which algorithm to use. There might be several valid choices:

  • Hash Aggregate: Build a hash map keyed by grouping columns, update accumulators as rows arrive. Works well for unsorted data with moderate cardinality.
  • Sort Aggregate: Requires data sorted by grouping columns, but uses less memory since it only tracks one group at a time.

Similarly for joins:

  • Hash Join: Build a hash table from one side, probe with the other. Fast for equi-joins.
  • Sort-Merge Join: Sort both sides, merge them. Good when data is already sorted.
  • Nested Loop Join: For each row on the left, scan the entire right side. Simple but slow; necessary for non-equi joins.
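To make the contrast concrete, here is a minimal in-memory sketch of a hash equi-join. It is not KQuery code. Rows are plain Kotlin lists, join keys are given by column index, and the names Row and hashJoin are hypothetical. The build side is loaded into a hash map once, so each probe row costs one lookup instead of a scan of the whole other side, as in a nested loop join.

```kotlin
// Hypothetical row type for illustration: a list of column values.
typealias Row = List<Any?>

// Inner equi-join: build a hash table on `build`, then probe it with `probe`.
fun hashJoin(build: List<Row>, buildKey: Int, probe: List<Row>, probeKey: Int): List<Row> {
    // Build phase: group build-side rows by their join key.
    val table = HashMap<Any, MutableList<Row>>()
    for (row in build) {
        val key = row[buildKey] ?: continue // SQL equality never matches NULL keys
        table.getOrPut(key) { mutableListOf() }.add(row)
    }
    // Probe phase: one hash lookup per probe-side row.
    val output = mutableListOf<Row>()
    for (row in probe) {
        val key = row[probeKey] ?: continue
        val matches = table[key] ?: continue
        for (match in matches) {
            output.add(match + row) // build-side columns followed by probe-side columns
        }
    }
    return output
}
```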

The logical plan does not care which algorithm runs. The query planner chooses based on data characteristics, available indexes, and cost estimates. Keeping logical and physical separate enables this flexibility.

Physical plans might also vary by execution environment:

  • Single-threaded vs parallel execution
  • CPU vs GPU computation
  • Local vs distributed processing

The PhysicalPlan Interface

Physical plans produce data. The interface reflects this:

interface PhysicalPlan {

  fun schema(): Schema

  /* Execute a physical plan and produce a series of record batches. */
  fun execute(): Sequence<RecordBatch>

  /*
   * Returns the children (inputs) of this physical plan. This method is used
   * to enable use of the visitor pattern to walk a query tree.
   */
  fun children(): List<PhysicalPlan>
}

The key method is execute(), which returns a sequence of record batches. This is the pull-based execution model: the caller pulls batches as needed rather than having batches pushed to it. Kotlin’s Sequence is lazy, so computation happens only when batches are consumed.
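The laziness can be seen with a toy pipeline. This sketch uses hypothetical Batch, CountingScan, and doubleAll names in place of KQuery's RecordBatch and operators, and counts how many batches the source produces when the consumer asks only for the first one.

```kotlin
// Hypothetical stand-in for a record batch: just a list of values.
data class Batch(val values: List<Int>)

// A toy scan operator that records how many batches it has produced.
class CountingScan(private val batches: List<Batch>) {
    var produced = 0
        private set

    fun execute(): Sequence<Batch> = sequence {
        for (b in batches) {
            produced++ // runs only when a consumer pulls the next batch
            yield(b)
        }
    }
}

// A toy projection that doubles each value; Sequence.map is also lazy.
fun doubleAll(input: Sequence<Batch>): Sequence<Batch> =
    input.map { batch -> Batch(batch.values.map { it * 2 }) }
```

Composing the operators does no work; calling first() on the result pulls exactly one batch through both of them.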

Physical Expressions

Logical expressions reference columns by name. Physical expressions reference columns by index for efficiency. At execution time, we do not want to search for column names.

interface Expression {
  fun evaluate(input: RecordBatch): ColumnVector
}

A physical expression takes a record batch and produces a column vector. The output has one value per row in the input batch.
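Because physical expressions use indexes, the name-to-index lookup can happen once, when the physical plan is built, rather than once per batch. A minimal sketch of that binding step, with hypothetical Schema, LogicalColumn, PhysicalColumn, and bind names (KQuery's own planner is the subject of the next chapter):

```kotlin
// Hypothetical minimal schema: an ordered list of column names.
class Schema(val names: List<String>) {
    // Resolve a column name to its position; done once at planning time.
    fun indexOf(name: String): Int {
        val i = names.indexOf(name)
        require(i >= 0) { "No column named '$name'" }
        return i
    }
}

// Logical side: refers to a column by name.
data class LogicalColumn(val name: String)

// Physical side: refers to a column by index.
data class PhysicalColumn(val index: Int)

// Translate once, so per-batch evaluation never searches column names.
fun bind(col: LogicalColumn, schema: Schema): PhysicalColumn =
    PhysicalColumn(schema.indexOf(col.name))
```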

Column Expression

The simplest expression retrieves a column from the input:

class ColumnExpression(val i: Int) : Expression {

  override fun evaluate(input: RecordBatch): ColumnVector {
    return input.field(i)
  }
}

No computation, just a lookup by index.

Literal Expression

Literal values produce a column where every row has the same value. Rather than allocating storage for identical values, we use a LiteralValueVector that returns the same value for any index:

class LiteralValueVector(
    val arrowType: ArrowType,
    val value: Any?,
    val size: Int
) : ColumnVector {

  override fun getType(): ArrowType = arrowType

  override fun getValue(i: Int): Any? {
    if (i < 0 || i >= size) {
      throw IndexOutOfBoundsException()
    }
    return value
  }

  override fun size(): Int = size
}

This optimization matters because expressions like salary * 1.1 would otherwise allocate a column of 1.1 values just to multiply element-wise.

class LiteralDoubleExpression(val value: Double) : Expression {
  override fun evaluate(input: RecordBatch): ColumnVector {
    return LiteralValueVector(ArrowTypes.DoubleType, value, input.rowCount())
  }
}
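The following self-contained sketch shows the idea behind salary * 1.1. It assumes a simplified ColumnVector interface without Arrow types, plus hypothetical ListVector, LiteralVector, and multiply names. The literal column stores a single Double no matter how many rows it reports.

```kotlin
// Minimal stand-in for KQuery's ColumnVector (no Arrow types here).
interface ColumnVector {
    fun getValue(i: Int): Any?
    fun size(): Int
}

// Column backed by real storage, one slot per row.
class ListVector(private val values: List<Any?>) : ColumnVector {
    override fun getValue(i: Int): Any? = values[i]
    override fun size(): Int = values.size
}

// Same idea as LiteralValueVector: one stored value, reported for every row.
class LiteralVector(private val value: Any?, private val size: Int) : ColumnVector {
    override fun getValue(i: Int): Any? {
        if (i < 0 || i >= size) throw IndexOutOfBoundsException("$i")
        return value
    }
    override fun size(): Int = size
}

// Element-wise multiply of two Double columns; a null on either side yields null.
fun multiply(l: ColumnVector, r: ColumnVector): ColumnVector {
    require(l.size() == r.size()) { "Column sizes differ" }
    val out = (0 until l.size()).map { i ->
        val a = l.getValue(i) as Double?
        val b = r.getValue(i) as Double?
        if (a == null || b == null) null else a * b
    }
    return ListVector(out)
}
```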

Binary Expressions

Binary expressions evaluate two sub-expressions and combine them. A base class handles the common logic:

abstract class BinaryExpression(val l: Expression, val r: Expression) : Expression {

  override fun evaluate(input: RecordBatch): ColumnVector {
    val ll = l.evaluate(input)
    val rr = r.evaluate(input)
    assert(ll.size() == rr.size())
    return evaluate(ll, rr)
  }

  abstract fun evaluate(l: ColumnVector, r: ColumnVector): ColumnVector
}

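Comparison expressions produce boolean results. EqExpression extends a BooleanExpression base class (not shown) that calls the per-value evaluate method below for each row and collects the results into a boolean column: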

class EqExpression(l: Expression, r: Expression) : BooleanExpression(l, r) {

  override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType): Boolean {
    return when (arrowType) {
      ArrowTypes.Int32Type -> (l as Int) == (r as Int)
      ArrowTypes.Int64Type -> (l as Long) == (r as Long)
      ArrowTypes.DoubleType -> (l as Double) == (r as Double)
      // toString() is a helper defined elsewhere (not shown) that normalizes string values
      ArrowTypes.StringType -> toString(l) == toString(r)
      else -> throw IllegalStateException("Unsupported type: $arrowType")
    }
  }
}

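Math expressions produce numeric results. AddExpression extends a MathExpression base class (not shown) that applies the per-value evaluate method row by row and builds a numeric output column: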

class AddExpression(l: Expression, r: Expression) : MathExpression(l, r) {

  override fun evaluate(l: Any?, r: Any?, arrowType: ArrowType): Any? {
    return when (arrowType) {
      ArrowTypes.Int32Type -> (l as Int) + (r as Int)
      ArrowTypes.Int64Type -> (l as Long) + (r as Long)
      ArrowTypes.DoubleType -> (l as Double) + (r as Double)
      else -> throw IllegalStateException("Unsupported type: $arrowType")
    }
  }
}
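The base classes that turn a per-value evaluate(l, r, type) into a whole-column operation are not shown in this section. The following self-contained sketch suggests how such bases can work. It substitutes a DataType enum and a list-backed Column for Arrow types, does not handle nulls, and all class names (BooleanExpr, MathExpr, Eq, Add) are hypothetical.

```kotlin
// Simplified stand-ins: a type tag instead of ArrowType, list-backed columns.
enum class DataType { INT32, INT64, DOUBLE }

class Column(val type: DataType, val values: List<Any?>) {
    fun size() = values.size
}

// Base for comparisons: applies a per-row predicate and returns one Boolean per row.
abstract class BooleanExpr {
    fun compare(l: Column, r: Column): List<Boolean> {
        require(l.size() == r.size()) { "Column sizes differ" }
        require(l.type == r.type) { "Column types differ: ${l.type} vs ${r.type}" }
        return (0 until l.size()).map { evaluate(l.values[it], r.values[it], l.type) }
    }
    abstract fun evaluate(l: Any?, r: Any?, type: DataType): Boolean
}

// Base for arithmetic: applies a per-row function; the output keeps the input type.
abstract class MathExpr {
    fun compute(l: Column, r: Column): Column {
        require(l.size() == r.size()) { "Column sizes differ" }
        require(l.type == r.type) { "Column types differ: ${l.type} vs ${r.type}" }
        return Column(l.type, (0 until l.size()).map { evaluate(l.values[it], r.values[it], l.type) })
    }
    abstract fun evaluate(l: Any?, r: Any?, type: DataType): Any?
}

class Eq : BooleanExpr() {
    override fun evaluate(l: Any?, r: Any?, type: DataType): Boolean = when (type) {
        DataType.INT32 -> (l as Int) == (r as Int)
        DataType.INT64 -> (l as Long) == (r as Long)
        DataType.DOUBLE -> (l as Double) == (r as Double)
    }
}

class Add : MathExpr() {
    override fun evaluate(l: Any?, r: Any?, type: DataType): Any? = when (type) {
        DataType.INT32 -> (l as Int) + (r as Int)
        DataType.INT64 -> (l as Long) + (r as Long)
        DataType.DOUBLE -> (l as Double) + (r as Double)
    }
}
```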

Aggregate Expressions

Aggregate expressions work differently. Rather than producing one output value per input row, they reduce many rows to one value per group (or to a single value when there is no grouping). This requires accumulators that maintain state across batches:

interface AggregateExpression {
  fun inputExpression(): Expression
  fun createAccumulator(): Accumulator
}

interface Accumulator {
  fun accumulate(value: Any?)
  fun finalValue(): Any?
}

Each aggregate type creates its own accumulator:

class MaxExpression(private val expr: Expression) : AggregateExpression {

  override fun inputExpression(): Expression = expr

  override fun createAccumulator(): Accumulator = MaxAccumulator()
}

class MaxAccumulator : Accumulator {
  var value: Any? = null

  override fun accumulate(value: Any?) {
    if (value != null) {
      if (this.value == null) {
        this.value = value
      } else {
        val isMax = when (value) {
          is Int -> value > this.value as Int
          is Long -> value > this.value as Long
          is Double -> value > this.value as Double
          else -> throw UnsupportedOperationException("MAX not supported for: ${value.javaClass}")
        }
        if (isMax) {
          this.value = value
        }
      }
    }
  }

  override fun finalValue(): Any? = value
}
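Other aggregates follow the same pattern. As a sketch, SUM and COUNT accumulators might look like this. The class names are hypothetical, and this SUM keeps the input's numeric type (so an Int sum can overflow), which is a simplification.

```kotlin
// Same shape as the Accumulator interface above.
interface Accumulator {
    fun accumulate(value: Any?)
    fun finalValue(): Any?
}

// SUM: skips nulls; returns null if no non-null value was seen (SQL semantics).
class SumAccumulator : Accumulator {
    private var sum: Any? = null

    override fun accumulate(value: Any?) {
        if (value == null) return
        sum = when (val current = sum) {
            null -> value
            is Int -> current + (value as Int)
            is Long -> current + (value as Long)
            is Double -> current + (value as Double)
            else -> throw UnsupportedOperationException("SUM not supported for: ${value.javaClass}")
        }
    }

    override fun finalValue(): Any? = sum
}

// COUNT(expr): counts non-null values only.
class CountAccumulator : Accumulator {
    private var count = 0L

    override fun accumulate(value: Any?) {
        if (value != null) count++
    }

    override fun finalValue(): Any? = count
}
```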

Physical Plans

With expressions defined, we can implement the physical plan operators.

Scan

Scan reads from a data source. It is the simplest operator, delegating entirely to the data source:

class ScanExec(val ds: DataSource, val projection: List<String>) : PhysicalPlan {

  override fun schema(): Schema = ds.schema().select(projection)

  override fun children(): List<PhysicalPlan> = listOf()

  override fun execute(): Sequence<RecordBatch> = ds.scan(projection)
}

The projection list tells the data source which columns to read. For columnar formats like Parquet, this avoids reading unnecessary data.
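As an illustration of pushing the projection into the source, here is a hypothetical in-memory columnar source (InMemorySource is not a KQuery class) whose scan touches only the requested columns and slices them into batches. Each batch is modeled as a map from column name to values.

```kotlin
// Hypothetical in-memory, column-oriented data source.
class InMemorySource(private val columns: Map<String, List<Any?>>, private val batchSize: Int) {
    init {
        require(batchSize > 0) { "batchSize must be positive" }
    }

    // Only the projected columns are touched, mirroring how a columnar
    // reader can skip unrequested columns entirely.
    fun scan(projection: List<String>): Sequence<Map<String, List<Any?>>> {
        val selected = projection.map { name ->
            name to (columns[name] ?: throw IllegalArgumentException("No column '$name'"))
        }
        val rowCount = selected.firstOrNull()?.second?.size ?: 0
        return (0 until rowCount step batchSize).asSequence().map { start ->
            val end = minOf(start + batchSize, rowCount)
            selected.associate { (name, values) -> name to values.subList(start, end) }
        }
    }
}
```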

Projection

Projection evaluates expressions to produce new columns:

class ProjectionExec(
    val input: PhysicalPlan,
    val schema: Schema,
    val expr: List<Expression>
) : PhysicalPlan {

  override fun schema(): Schema = schema

  override fun children(): List<PhysicalPlan> = listOf(input)

  override fun execute(): Sequence<RecordBatch> {
    return input.execute().map { batch ->
      val columns = expr.map { it.evaluate(batch) }
      RecordBatch(schema, columns)
    }
  }
}

For each input batch, evaluate each expression to produce output columns. When an expression is just a column reference, the output column is the same object as the input column; no data is copied.

Selection (Filter)

Selection keeps rows where a predicate is true:

class SelectionExec(
    val input: PhysicalPlan,
    val expr: Expression
) : PhysicalPlan {

  override fun schema(): Schema = input.schema()

  override fun children(): List<PhysicalPlan> = listOf(input)

  override fun execute(): Sequence<RecordBatch> {
    return input.execute().map { batch ->
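      // The predicate returns a ColumnVector wrapping an Arrow BitVector; unwrap it
      val result = (expr.evaluate(batch) as ArrowFieldVector).field as BitVector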
      val filteredFields = batch.schema.fields.indices.map { i ->
        filter(batch.field(i), result)
      }
      RecordBatch(batch.schema, filteredFields)
    }
  }

  private fun filter(v: ColumnVector, selection: BitVector): ColumnVector {
    // Count selected rows
    var count = 0
    (0 until selection.valueCount).forEach {
      if (selection.get(it) == 1) count++
    }

    // Build filtered vector
    val filtered = FieldVectorFactory.create(v.getType(), count)
    var index = 0
    (0 until selection.valueCount).forEach {
      if (selection.get(it) == 1) {
        filtered.set(index++, v.getValue(it))
      }
    }
    return filtered
  }
}

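The predicate expression produces a bit vector (one bit per row). We then copy values where the bit is set. The filter makes two passes over the selection: one to count the selected rows so the output vector can be sized exactly, and one to copy them. This is a straightforward implementation; production systems optimize cases where all or no rows match.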
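A sketch of those optimizations on a simplified, list-backed column. Column and filterColumn are hypothetical, and a BooleanArray stands in for Arrow's BitVector. Counting the selected rows first lets the function return the input unchanged when every row matches, return an empty column when none do, and otherwise size the output exactly:

```kotlin
// Hypothetical list-backed column; stands in for KQuery's ColumnVector.
class Column(val values: List<Any?>) {
    fun size() = values.size
}

// Keep the rows whose selection flag is true.
fun filterColumn(column: Column, selection: BooleanArray): Column {
    require(column.size() == selection.size) { "Mask length must match column length" }
    val selected = selection.count { it } // first pass: count matches
    return when (selected) {
        selection.size -> column            // every row matches: reuse the input, no copy
        0 -> Column(emptyList())            // no rows match
        else -> {
            val out = ArrayList<Any?>(selected) // exact capacity from the count
            for (i in selection.indices) {      // second pass: copy matches
                if (selection[i]) out.add(column.values[i])
            }
            Column(out)
        }
    }
}
```

In an operator, the count would ideally be computed once per batch and shared by all columns rather than recomputed for each column.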

Hash Aggregate

Hash aggregation groups rows by key and computes aggregates. It processes all input before producing output:

class HashAggregateExec(
    val input: PhysicalPlan,
    val groupExpr: List<Expression>,
    val aggregateExpr: List<AggregateExpression>,
    val schema: Schema
) : PhysicalPlan {

  override fun schema(): Schema = schema

  override fun children(): List<PhysicalPlan> = listOf(input)

  override fun execute(): Sequence<RecordBatch> {
    val map = HashMap<List<Any?>, List<Accumulator>>()

    // Process all input batches
    input.execute().forEach { batch ->
      val groupKeys = groupExpr.map { it.evaluate(batch) }
      val aggrInputs = aggregateExpr.map { it.inputExpression().evaluate(batch) }

      // For each row, update accumulators
      (0 until batch.rowCount()).forEach { row ->
        val key = groupKeys.map { it.getValue(row) }

        val accumulators = map.getOrPut(key) {
          aggregateExpr.map { it.createAccumulator() }
        }

        accumulators.forEachIndexed { i, acc ->
          acc.accumulate(aggrInputs[i].getValue(row))
        }
      }
    }

    // Build output batch from accumulated results
    val root = VectorSchemaRoot.create(schema.toArrow(), RootAllocator(Long.MAX_VALUE))
    root.allocateNew()

    // ArrowVectorBuilder is KQuery's helper for writing untyped values into Arrow vectors
    val builders = root.fieldVectors.map { ArrowVectorBuilder(it) }

    map.entries.forEachIndexed { rowIndex, entry ->
      val (groupKey, accumulators) = entry

      groupExpr.indices.forEach { i ->
        builders[i].set(rowIndex, groupKey[i])
      }
      aggregateExpr.indices.forEach { i ->
        builders[groupExpr.size + i].set(rowIndex, accumulators[i].finalValue())
      }
    }

    // Set the row count after populating, so every vector reports the right value count
    root.rowCount = map.size

    return sequenceOf(RecordBatch(schema, root.fieldVectors.map { ArrowFieldVector(it) }))
  }
}

The hash map keys are lists of grouping column values. Each entry holds accumulators for that group. After processing all input, we iterate the map to build the output batch.

This is the “hash” aggregate because we use a hash map. For sorted data, a “sort” aggregate would be more efficient since we could emit results as soon as the grouping key changes, without storing all groups in memory.
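The sort aggregate is not implemented in this chapter's code. The following hypothetical sketch shows the idea for a single SUM over input already sorted by key, using a made-up Row type. Only the current group's state is held, and each group is emitted as soon as the key changes. If the input is not sorted, the same key can appear in several output rows.

```kotlin
// Hypothetical input row: a grouping key and a value to aggregate.
data class Row(val key: String, val value: Long)

// Streaming sort aggregate computing SUM(value) per key.
// Assumes `rows` arrive sorted by key.
fun sortAggregateSum(rows: Sequence<Row>): Sequence<Pair<String, Long>> = sequence {
    var currentKey: String? = null
    var sum = 0L
    for (row in rows) {
        if (row.key != currentKey) {
            // Key changed: the previous group is complete, so emit it now.
            if (currentKey != null) yield(currentKey to sum)
            currentKey = row.key
            sum = 0L
        }
        sum += row.value
    }
    // Emit the final group, if there was any input.
    if (currentKey != null) yield(currentKey to sum)
}
```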

Execution Model

KQuery uses pull-based execution: the root operator calls execute() on its children, which call their children, and so on. Data flows up as batches are requested.

The alternative is push-based execution, where operators push batches to their parents. Both models work; the choice affects how backpressure and parallelism are handled.
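A minimal sketch of the push-based alternative, using a hypothetical BatchConsumer interface rather than anything in KQuery. Here the source's loop drives execution and each operator calls its parent directly. In the pull model, by contrast, the consumer decides when the next batch is computed.

```kotlin
// Hypothetical push-based operator interface: producers call consume() on their parent.
interface BatchConsumer<T> {
    fun consume(batch: T)
    fun finish()
}

// A filter that pushes only matching values downstream.
class PushFilter(
    private val predicate: (Int) -> Boolean,
    private val downstream: BatchConsumer<List<Int>>
) : BatchConsumer<List<Int>> {
    override fun consume(batch: List<Int>) {
        val kept = batch.filter(predicate)
        if (kept.isNotEmpty()) downstream.consume(kept)
    }

    override fun finish() = downstream.finish()
}

// Terminal sink that collects everything it receives.
class CollectSink : BatchConsumer<List<Int>> {
    val received = mutableListOf<List<Int>>()
    var finished = false
        private set

    override fun consume(batch: List<Int>) {
        received.add(batch)
    }

    override fun finish() {
        finished = true
    }
}

// The source drives execution: it pushes every batch, then signals completion.
fun runSource(batches: List<List<Int>>, sink: BatchConsumer<List<Int>>) {
    batches.forEach { sink.consume(it) }
    sink.finish()
}
```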

Returning Sequence<RecordBatch> enables lazy evaluation. If the root only needs the first batch (for a LIMIT 1 query), we avoid computing subsequent batches.
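A LIMIT operator can take advantage of this by stopping as soon as it has emitted enough rows. A sketch over a hypothetical Batch type (a list of Ints) rather than KQuery's RecordBatch; the function name limit is also an assumption:

```kotlin
// Hypothetical batch type: a list of rows (here just Ints).
typealias Batch = List<Int>

// LIMIT over a lazy stream of batches: stops pulling once `n` rows are emitted.
fun limit(input: Sequence<Batch>, n: Int): Sequence<Batch> {
    require(n >= 0) { "n must be non-negative" }
    return sequence {
        var remaining = n
        if (remaining == 0) return@sequence
        for (batch in input) {
            // Truncate the batch that crosses the limit.
            val chunk = if (batch.size <= remaining) batch else batch.take(remaining)
            yield(chunk)
            remaining -= chunk.size
            // Break before the loop asks the input for another batch.
            if (remaining == 0) break
        }
    }
}
```

With a limit of 3 and input batches of 2 rows each, the operator pulls only two batches from its input and truncates the second.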

Next Steps

Physical plans are executable, but we still need something to create them from logical plans. The next chapter covers the query planner that performs this translation.

This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work

Copyright © 2020-2025 Andy Grove. All rights reserved.