Query Execution
The source code discussed in this chapter can be found in the execution module of the KQuery project.
We have built all the pieces: data sources, logical plans, physical plans, a query planner, and an optimizer. This chapter ties them together into a working query engine.
The Execution Context
The ExecutionContext is the entry point for running queries. It manages registered tables and coordinates the execution pipeline:
class ExecutionContext(val settings: Map<String, String>) {

    private val tables = mutableMapOf<String, DataFrame>()

    /** Parse a SQL query and return a DataFrame representing its logical plan. */
    fun sql(sql: String): DataFrame {
        val tokens = SqlTokenizer(sql).tokenize()
        val ast = SqlParser(tokens).parse() as SqlSelect
        return SqlPlanner().createDataFrame(ast, tables)
    }

    /** Create a DataFrame that scans a CSV file. */
    fun csv(filename: String): DataFrame {
        return DataFrameImpl(Scan(filename, CsvDataSource(filename, ...), listOf()))
    }

    /** Register a DataFrame as a named table so SQL queries can reference it. */
    fun register(tablename: String, df: DataFrame) {
        tables[tablename] = df
    }

    /** Convenience: register a CSV file as a named table. */
    fun registerCsv(tablename: String, filename: String) = register(tablename, csv(filename))

    /** Execute the logical plan behind a DataFrame. */
    fun execute(df: DataFrame): Sequence<RecordBatch> = execute(df.logicalPlan())

    /** Optimize a logical plan, translate it to a physical plan, and execute it. */
    fun execute(plan: LogicalPlan): Sequence<RecordBatch> {
        val optimizedPlan = Optimizer().optimize(plan)
        val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)
        return physicalPlan.execute()
    }
}
Users interact with the context to:
- Register data sources as named tables
- Build queries using SQL or the DataFrame API
- Execute queries and consume results
The Execution Pipeline
When you execute a query, it flows through several stages:
SQL String
↓ tokenize
Tokens
↓ parse
SQL AST
↓ plan
Logical Plan
↓ optimize
Optimized Logical Plan
↓ physical planning
Physical Plan
↓ execute
Sequence<RecordBatch>
For DataFrame queries, the pipeline starts at the logical plan stage since the DataFrame API builds logical plans directly.
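As a quick recap of why that is, the DataFrame implementation is a thin wrapper around a logical plan: each method wraps the current plan in one more logical operator and returns a new, immutable DataFrame. The sketch below follows the earlier DataFrame chapter; the method signatures in the KQuery source may differ slightly.
// Sketch of the DataFrame wrapper: every call adds one logical operator
// around the current plan and returns a new DataFrame.
class DataFrameImpl(private val plan: LogicalPlan) : DataFrame {

    override fun project(expr: List<LogicalExpr>): DataFrame =
        DataFrameImpl(Projection(plan, expr))

    override fun filter(expr: LogicalExpr): DataFrame =
        DataFrameImpl(Selection(plan, expr))

    override fun aggregate(
        groupBy: List<LogicalExpr>,
        aggExpr: List<AggregateExpr>
    ): DataFrame = DataFrameImpl(Aggregate(plan, groupBy, aggExpr))

    override fun schema(): Schema = plan.schema()

    override fun logicalPlan(): LogicalPlan = plan
}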
Stage 1: Parsing (SQL only)
SQL text becomes tokens, then a syntax tree:
val tokens = SqlTokenizer(sql).tokenize()
val ast = SqlParser(tokens).parse()
Stage 2: Logical Planning
The SQL AST (or DataFrame) becomes a logical plan:
val logicalPlan = SqlPlanner().createDataFrame(ast, tables).logicalPlan()
Stage 3: Optimization
The optimizer transforms the logical plan:
val optimizedPlan = Optimizer().optimize(logicalPlan)
Stage 4: Physical Planning
The query planner creates an executable physical plan:
val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)
Stage 5: Execution
The physical plan executes, producing record batches:
val results: Sequence<RecordBatch> = physicalPlan.execute()
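Each operator's execute() pulls batches from its children and transforms them, so the whole plan runs as one lazy pipeline. The projection operator below is a simplified sketch based on the interfaces described in earlier chapters; the KQuery source may differ in detail.
// Sketch: a projection operator evaluates its expressions against each batch
// produced by its child. Sequence.map() is lazy, so a batch is projected only
// when a downstream consumer asks for it.
// Assumes Expression.evaluate(batch) returns a column vector, as described in
// the physical expressions chapter.
class ProjectionExec(
    val input: PhysicalPlan,
    val projectionSchema: Schema,
    val expr: List<Expression>
) : PhysicalPlan {

    override fun schema(): Schema = projectionSchema

    override fun children(): List<PhysicalPlan> = listOf(input)

    override fun execute(): Sequence<RecordBatch> {
        return input.execute().map { batch ->
            RecordBatch(projectionSchema, expr.map { it.evaluate(batch) })
        }
    }
}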
Running a Query
Here is a complete example:
val ctx = ExecutionContext(mapOf())
// Register a CSV file as a table
ctx.registerCsv("employees", "/data/employees.csv")
// Execute a SQL query
val df = ctx.sql("""
    SELECT department, AVG(salary) as avg_salary
    FROM employees
    WHERE state = 'CO'
    GROUP BY department
    """)

// Execute and print results
ctx.execute(df).forEach { batch ->
    println(batch.toCSV())
}
Or using the DataFrame API:
val ctx = ExecutionContext(mapOf())

val results = ctx.csv("/data/employees.csv")
    .filter(col("state") eq lit("CO"))
    .aggregate(
        listOf(col("department")),
        listOf(avg(col("salary")))
    )

ctx.execute(results).forEach { batch ->
    println(batch.toCSV())
}
Both approaches produce the same physical plan and results.
Lazy Evaluation
Notice that building a DataFrame does not execute anything. The DataFrame just holds a logical plan. Execution happens only when you call execute() and consume the resulting sequence.
This lazy evaluation has benefits:
- The optimizer sees the complete query before execution
- Errors in the plan are caught before processing starts
- Resources are not allocated until needed
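A minimal illustration of where the work actually happens, using only the API shown earlier in this chapter:
val ctx = ExecutionContext(mapOf())

// Building the query only constructs a logical plan; no data is read yet
// (beyond possibly reading the CSV header to infer a schema).
val df = ctx.csv("/data/employees.csv")
    .filter(col("state") eq lit("CO"))

// Planning, optimizing, and creating the physical plan still reads no data,
// because the physical plan returns a lazy Sequence.
val results = ctx.execute(df)

// Work happens here, one batch at a time, as the sequence is consumed.
results.forEach { batch ->
    println(batch.toCSV())
}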
Consuming Results
The execute() method returns Sequence<RecordBatch>. You can process results in several ways:
Iterate batches:
ctx.execute(df).forEach { batch ->
    // Process each batch
}
Collect all results:
val allBatches = ctx.execute(df).toList()
Take only what you need:
val firstBatch = ctx.execute(df).first()
Because Sequence is lazy, taking only the first batch avoids computing subsequent batches. This matters for queries with LIMIT.
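KQuery as built in this book does not include a limit operator, but the lazy Sequence makes one easy to sketch. The LimitExec class below is hypothetical and assumes the PhysicalPlan interface and a RecordBatch.rowCount() method as described in earlier chapters:
// Hypothetical operator (not part of the KQuery source): a LIMIT can be
// implemented by truncating the lazy sequence, so upstream operators never
// produce more batches than the consumer needs.
class LimitExec(val input: PhysicalPlan, val limit: Int) : PhysicalPlan {

    override fun schema(): Schema = input.schema()

    override fun children(): List<PhysicalPlan> = listOf(input)

    override fun execute(): Sequence<RecordBatch> = sequence {
        var remaining = limit
        for (batch in input.execute()) {
            if (remaining <= 0) break
            // For simplicity this sketch yields whole batches; a real
            // implementation would slice the final batch to exactly
            // `remaining` rows.
            yield(batch)
            remaining -= batch.rowCount()
        }
    }
}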
Example: NYC Taxi Data
Let us run a real query against the NYC Taxi dataset, a common benchmark with millions of rows.
val ctx = ExecutionContext(mapOf())
ctx.registerCsv("tripdata", "/data/yellow_tripdata_2019-01.csv")
val start = System.currentTimeMillis()
val df = ctx.sql("""
    SELECT passenger_count, MAX(fare_amount)
    FROM tripdata
    GROUP BY passenger_count
    """)

ctx.execute(df).forEach { batch ->
    println(batch.toCSV())
}

println("Query took ${System.currentTimeMillis() - start} ms")
Output:
passenger_count,MAX
1,623259.86
2,492.5
3,350.0
4,500.0
5,760.0
6,262.5
7,78.0
8,87.0
9,92.0
0,36090.3
Query took 6740 ms
The Impact of Optimization
To see how much the optimizer helps, we can bypass it:
// With optimization (the normal path)
val optimizedPlan = Optimizer().optimize(df.logicalPlan())
val physicalPlan = QueryPlanner().createPhysicalPlan(optimizedPlan)
// Query took 6740 ms

// Without optimization: build the physical plan directly from the
// unoptimized logical plan
val unoptimizedPhysicalPlan = QueryPlanner().createPhysicalPlan(df.logicalPlan())
// Query took 36090 ms
The unoptimized query takes about five times longer. The difference comes from projection push-down: the optimized plan reads only the columns it needs (passenger_count, fare_amount), while the unoptimized plan reads all 17 columns from the CSV file.
For wider tables or more selective filters, the optimization impact would be even greater.
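To see the push-down directly, you can print the logical plan before and after optimization. The recursive pretty-printer below is a sketch; it assumes LogicalPlan exposes children() and a readable toString(), as in the earlier chapters.
// Sketch: indent each plan node under its parent so the tree structure and
// the Scan node's projected columns are visible.
fun format(plan: LogicalPlan, indent: Int = 0): String {
    val b = StringBuilder()
    repeat(indent) { b.append("\t") }
    b.append(plan.toString()).append("\n")
    plan.children().forEach { b.append(format(it, indent + 1)) }
    return b.toString()
}

println(format(df.logicalPlan()))
println(format(Optimizer().optimize(df.logicalPlan())))
// In the optimized output, the Scan node lists only the columns the query
// needs (passenger_count and fare_amount); in the unoptimized output it
// lists no projection and therefore reads every column.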
Comparison with Apache Spark
For reference, here is the same query in Apache Spark:
val spark = SparkSession.builder()
    .master("local[1]") // Single thread for fair comparison
    .getOrCreate()

val tripdata = spark.read
    .option("header", "true")
    .schema(schema)
    .csv("/data/yellow_tripdata_2019-01.csv")

tripdata.createOrReplaceTempView("tripdata")

val df = spark.sql("""
    SELECT passenger_count, MAX(fare_amount)
    FROM tripdata
    GROUP BY passenger_count
    """)

df.show()
// Query took 14418 ms
KQuery’s performance is competitive for this query. Spark has more overhead for small-to-medium datasets but scales better to very large datasets through its distributed execution capabilities.
Error Handling
Errors can occur at any stage:
- Parsing: Syntax errors in SQL
- Planning: Unknown table or column names, type mismatches
- Execution: Runtime errors like division by zero, file not found
KQuery currently throws exceptions for errors. A production system would provide structured error types with source locations and helpful messages.
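Until then, callers can rely on ordinary exception handling around the pipeline. The example below catches everything in one place; the specific exception types thrown at each stage are not part of a defined API.
val ctx = ExecutionContext(mapOf())
ctx.registerCsv("employees", "/data/employees.csv")

try {
    // Any of parsing, planning, or execution can throw here.
    val df = ctx.sql("SELECT department, AVG(salary) FROM employees GROUP BY department")
    ctx.execute(df).forEach { batch -> println(batch.toCSV()) }
} catch (e: Exception) {
    // e.g. invalid SQL, an unknown column, or a missing CSV file.
    println("Query failed: ${e.message}")
}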
What We Have Built
At this point, we have a working query engine that can:
- Read CSV files
- Execute SQL queries
- Execute DataFrame queries
- Optimize query plans
- Process data in batches
The remaining chapters cover more advanced topics: parallel execution within a single machine, and distributed execution across a cluster.