Query Execution

The source code discussed in this chapter can be found in the execution module of the KQuery project.

We have built all the pieces: data sources, logical plans, physical plans, a query planner, and an optimizer. This chapter ties them together into a working query engine.

The Execution Context

The ExecutionContext is the entry point for running queries. It manages registered tables and coordinates the execution pipeline:

class ExecutionContext(val settings: Map<String, String>) {

    private val tables = mutableMapOf<String, DataFrame>()

    fun sql(sql: String): DataFrame {
        val tokens = SqlTokenizer(sql).tokenize()
        val ast = SqlParser(tokens).parse() as SqlSelect
        return SqlPlanner().createDataFrame(ast, tables)
    }

    fun csv(filename: String): DataFrame {
        return DataFrameImpl(Scan(filename, CsvDataSource(filename, ...), listOf()))
    }

    fun register(tablename: String, df: DataFrame) {
        tables[tablename] = df
    }

    fun registerCsv(tablename: String, filename: String) {
        // convenience method used in the examples later in this chapter
        register(tablename, csv(filename))
    }

    fun execute(plan: LogicalPlan): Sequence<RecordBatch> {
        val optimizedPlan = Optimizer().optimize(plan)
        val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)
        return physicalPlan.execute()
    }
}

Users interact with the context to:

  1. Register data sources as named tables
  2. Build queries using SQL or the DataFrame API
  3. Execute queries and consume results

The Execution Pipeline

When you execute a query, it flows through several stages:

SQL String
    ↓ tokenize
Tokens
    ↓ parse
SQL AST
    ↓ plan
Logical Plan
    ↓ optimize
Optimized Logical Plan
    ↓ physical planning
Physical Plan
    ↓ execute
Sequence<RecordBatch>

For DataFrame queries, the pipeline starts at the logical plan stage since the DataFrame API builds logical plans directly.
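
The reason this works is that a DataFrame is just a thin wrapper around the logical plan built so far. Here is a minimal sketch, assuming the DataFrameImpl class and the Selection logical plan node from earlier chapters (the method signatures are illustrative):

class DataFrameImpl(private val plan: LogicalPlan) : DataFrame {

    // Builder methods wrap the current plan in a new logical operator; nothing runs yet
    override fun filter(expr: LogicalExpr): DataFrame =
        DataFrameImpl(Selection(plan, expr))

    // Execution only happens when this plan is handed to ExecutionContext.execute()
    override fun logicalPlan(): LogicalPlan = plan
}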

Stage 1: Parsing (SQL only)

SQL text becomes tokens, then a syntax tree:

val tokens = SqlTokenizer(sql).tokenize()
val ast = SqlParser(tokens).parse()

Stage 2: Logical Planning

The SQL AST (or DataFrame) becomes a logical plan:

val logicalPlan = SqlPlanner().createDataFrame(ast, tables).logicalPlan()

Stage 3: Optimization

The optimizer transforms the logical plan:

val optimizedPlan = Optimizer().optimize(logicalPlan)

Stage 4: Physical Planning

The query planner creates an executable physical plan:

val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)

Stage 5: Execution

The physical plan executes, producing record batches:

val results: Sequence<RecordBatch> = physicalPlan.execute()
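
As a sketch of inspecting the output (assuming the RecordBatch class described in earlier chapters, with a rowCount() method), each batch holds a chunk of rows in columnar form:

results.forEachIndexed { index, batch ->
    // Each batch is a columnar chunk of the result set
    println("batch $index: ${batch.rowCount()} rows")
}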

Running a Query

Here is a complete example:

val ctx = ExecutionContext(mapOf())

// Register a CSV file as a table
ctx.registerCsv("employees", "/data/employees.csv")

// Execute a SQL query
val df = ctx.sql("""
    SELECT department, AVG(salary) as avg_salary
    FROM employees
    WHERE state = 'CO'
    GROUP BY department
""")

// Execute and print results
ctx.execute(df).forEach { batch ->
    println(batch.toCSV())
}

Or using the DataFrame API:

val ctx = ExecutionContext(mapOf())

val results = ctx.csv("/data/employees.csv")
    .filter(col("state") eq lit("CO"))
    .aggregate(
        listOf(col("department")),
        listOf(avg(col("salary")))
    )

ctx.execute(results).forEach { batch ->
    println(batch.toCSV())
}

Both approaches produce the same physical plan and results.

Lazy Evaluation

Notice that building a DataFrame does not execute anything. The DataFrame just holds a logical plan. Execution happens only when you call execute() and consume the resulting sequence.

This lazy evaluation has benefits:

  • The optimizer sees the complete query before execution
  • Errors in the plan are caught before processing starts
  • Resources are not allocated until needed
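
To make this concrete, here is a sketch of where the work actually happens (the file path and column names are illustrative):

val df = ctx.csv("/data/employees.csv")
    .filter(col("state") eq lit("CO"))   // builds a logical plan; the file is not read yet

val batches = ctx.execute(df)            // still lazy: returns a Sequence<RecordBatch>
val first = batches.first()              // the CSV is read and the filter runs only here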

Consuming Results

The execute() method returns Sequence<RecordBatch>. You can process results in several ways:

Iterate batches:

ctx.execute(df).forEach { batch ->
    // Process each batch
}

Collect all results:

val allBatches = ctx.execute(df).toList()

Take only what you need:

val firstBatch = ctx.execute(df).first()

Because Sequence is lazy, taking only the first batch avoids computing subsequent batches. This matters for queries with LIMIT.
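
Similarly, Kotlin's take limits how many batches are produced, so upstream operators do no more work than necessary:

// Process at most two batches; later batches are never computed
ctx.execute(df).take(2).forEach { batch ->
    println(batch.toCSV())
}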

Example: NYC Taxi Data

Let us run a real query against the NYC Taxi dataset, a common benchmark with millions of rows.

val ctx = ExecutionContext(mapOf())
ctx.registerCsv("tripdata", "/data/yellow_tripdata_2019-01.csv")

val start = System.currentTimeMillis()

val df = ctx.sql("""
    SELECT passenger_count, MAX(fare_amount)
    FROM tripdata
    GROUP BY passenger_count
""")

ctx.execute(df).forEach { batch ->
    println(batch.toCSV())
}

println("Query took ${System.currentTimeMillis() - start} ms")

Output:

passenger_count,MAX
1,623259.86
2,492.5
3,350.0
4,500.0
5,760.0
6,262.5
7,78.0
8,87.0
9,92.0
0,36090.3

Query took 6740 ms

The Impact of Optimization

To see how much the optimizer helps, we can bypass it:

// With optimization (normal path)
val optimizedPlan = Optimizer().optimize(df.logicalPlan())
val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)
// Query took 6740 ms

// Without optimization
val physicalPlan = QueryPlanner().createPhysicalPlan(df.logicalPlan())
// Query took 36090 ms

The unoptimized query takes about five times longer. The difference comes from projection push-down: the optimized plan reads only the columns it needs (passenger_count, fare_amount), while the unoptimized plan reads all 17 columns from the CSV file.

For wider tables or more selective filters, the optimization impact would be even greater.
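
One way to see the difference, assuming the format() pretty-printing helper from the chapter on logical plans, is to print both plans and compare the Scan node:

// The optimized plan's Scan should carry a projection containing only
// passenger_count and fare_amount; the unoptimized Scan has none.
println(format(df.logicalPlan()))
println(format(Optimizer().optimize(df.logicalPlan())))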

Comparison with Apache Spark

For reference, here is the same query in Apache Spark:

val spark = SparkSession.builder()
    .master("local[1]")  // Single thread for fair comparison
    .getOrCreate()

val tripdata = spark.read
    .option("header", "true")
    .schema(schema)
    .csv("/data/yellow_tripdata_2019-01.csv")

tripdata.createOrReplaceTempView("tripdata")

val df = spark.sql("""
    SELECT passenger_count, MAX(fare_amount)
    FROM tripdata
    GROUP BY passenger_count
""")

df.show()
// Query took 14418 ms

KQuery’s performance is competitive for this query. Spark has more overhead for small-to-medium datasets but scales better to very large datasets through its distributed execution capabilities.

Error Handling

Errors can occur at any stage:

  • Parsing: Syntax errors in SQL
  • Planning: Unknown table or column names, type mismatches
  • Execution: Runtime errors like division by zero, file not found

KQuery currently throws exceptions for errors. A production system would provide structured error types with source locations and helpful messages.
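
Because execution is lazy, runtime errors surface only when the result sequence is consumed. Here is a minimal sketch of handling failures at the boundary; the exception types shown are not a defined API, so a broad catch is used:

try {
    // hypothetical query against a table that was never registered
    val df = ctx.sql("SELECT salary FROM employee")
    ctx.execute(df).forEach { batch -> println(batch.toCSV()) }
} catch (e: Exception) {
    println("Query failed: ${e.message}")
}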

What We Have Built

At this point, we have a working query engine that can:

  • Read CSV files
  • Execute SQL queries
  • Execute DataFrame queries
  • Optimize query plans
  • Process data in batches

The remaining chapters cover more advanced topics: parallel execution within a single machine, and distributed execution across a cluster.
