Parallel Query Execution

A single-threaded query engine leaves most of a modern computer idle. My desktop has 24 CPU cores, but a single-threaded query uses only one of them, wasting 96% of available compute power. Parallel query execution changes this by spreading work across multiple cores.

The goal is straightforward: if a query takes 60 seconds on one core, running it across 12 cores should take closer to 5 seconds. We rarely achieve perfect linear speedup due to coordination overhead and uneven data distribution, but even partial parallelism delivers substantial improvements.

This chapter covers parallel execution on a single machine using multiple threads or coroutines. The next chapter extends these ideas to distributed execution across multiple machines, which introduces network coordination and data exchange between nodes.

Why Parallelism Helps

Query engines spend their time on three activities: reading data from storage, computing results, and writing output. Each of these can benefit from parallelism.

For I/O-bound queries that spend most of their time reading data, parallelism helps because modern storage systems (SSDs, NVMe drives) deliver higher throughput when several read requests are in flight at once than when reads are issued one at a time. The operating system and storage controller can optimize the order of reads, and multiple threads keep the I/O pipeline busy.

For CPU-bound queries that spend their time on computation (aggregations, joins, complex expressions), parallelism directly multiplies throughput. If twelve cores each process their share of the data, the total time approaches one-twelfth of the single-threaded time.

In practice, most queries are a mix of both, and parallelism helps in both cases.

Data Parallelism

The form of parallelism we will explore is called data parallelism: running the same computation on different subsets of data simultaneously. If we have 100 million rows to process, we split them into chunks and process each chunk on a different thread.

This contrasts with pipeline parallelism, where different operators in the query run simultaneously on different stages of the data. Pipeline parallelism is harder to implement and offers less benefit for most query workloads, so most query engines focus on data parallelism.

Data parallelism requires the input data to be partitioned, meaning split into independent chunks that can be processed separately. The natural partitioning depends on how data is stored.
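
As a minimal sketch of the idea, independent of KQuery, the following function splits a list into chunks and sums each chunk on a pool thread. The name parallelSum and its parameters are illustrative assumptions, and it uses Java's ExecutorService rather than coroutines so that it runs with only the standard library.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: sums `values` by splitting them into chunks of
// `chunkSize` rows and summing the chunks on a pool of `threads` threads.
fun parallelSum(values: List<Long>, chunkSize: Int, threads: Int): Long {
    require(chunkSize > 0 && threads > 0)
    val pool = Executors.newFixedThreadPool(threads)
    try {
        // The same computation (a sum) is applied to different subsets of the data.
        val tasks = values.chunked(chunkSize).map { chunk ->
            Callable { chunk.sum() }
        }
        // invokeAll blocks until every chunk has been processed.
        return pool.invokeAll(tasks).fold(0L) { total, future -> total + future.get() }
    } finally {
        pool.shutdown()
    }
}
```

Calling parallelSum((1L..100L).toList(), 7, 4) sums the numbers 1 to 100 in chunks of seven rows on four threads. The chunks are independent, so no locking is needed until the partial sums are added together.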

A Practical Example

The NYC taxi data set provides a convenient test case for parallel execution. The data is already partitioned by month, with one CSV file per month, giving us twelve partitions for a year of data. The most straightforward approach to parallel query execution is to use one thread per partition, execute the same query in parallel across all partitions, and then combine the results.

The source code for this example can be found at jvm/examples/src/main/kotlin/ParallelQuery.kt in the KQuery GitHub repository.

We will run an aggregate query across all twelve months in parallel using Kotlin coroutines. First, here is the single-threaded function for querying one partition:

fun executeQuery(path: String, month: Int, sql: String): List<RecordBatch> {
  // Zero-pad the month so that 1 becomes "01", matching the file names
  val monthStr = String.format("%02d", month)
  val filename = "$path/yellow_tripdata_2019-$monthStr.csv"
  // Each call gets its own context, so partitions share no state
  val ctx = ExecutionContext()
  ctx.registerCsv("tripdata", filename)
  val df = ctx.sql(sql)
  return ctx.execute(df).toList()
}

With this helper function, we can run the query in parallel across all twelve partitions:

val start = System.currentTimeMillis()
// Launch one coroutine per month; each returns that partition's batches
val deferred = (1..12).map { month ->
  GlobalScope.async {

    val sql = "SELECT passenger_count, " +
        "MAX(CAST(fare_amount AS double)) AS max_fare " +
        "FROM tripdata " +
        "GROUP BY passenger_count"

    // Time each partition separately, without shadowing the outer `start`
    val queryStart = System.currentTimeMillis()
    val result = executeQuery(path, month, sql)
    val queryDuration = System.currentTimeMillis() - queryStart
    println("Query against month $month took $queryDuration ms")
    result
  }
}
// Wait for every partition and flatten the batches into a single list
val results: List<RecordBatch> = runBlocking {
  deferred.flatMap { it.await() }
}
val duration = System.currentTimeMillis() - start
println("Collected ${results.size} batches in $duration ms")

Running on a desktop with 24 cores produces output like this:

Query against month 8 took 17074 ms
Query against month 9 took 18976 ms
Query against month 7 took 20010 ms
Query against month 2 took 21417 ms
Query against month 11 took 21521 ms
Query against month 12 took 22082 ms
Query against month 6 took 23669 ms
Query against month 1 took 23735 ms
Query against month 10 took 23739 ms
Query against month 3 took 24048 ms
Query against month 5 took 24103 ms
Query against month 4 took 25439 ms
Collected 12 batches in 25505 ms

The total duration (25.5 seconds) is roughly the same as the slowest individual query (25.4 seconds for April). All twelve queries ran concurrently, so the overall time was determined by the slowest partition rather than the sum of all partitions. Running the same queries one after another would have taken up to roughly 266 seconds (the sum of the individual query times above).

However, we now have a problem: the result is a list of twelve batches, each containing partial aggregates. There will be a result for passenger_count=1 from each of the twelve partitions, when we want a single combined result.

Combining Results

How we combine results from parallel execution depends on the type of query.

For projection and filter queries, results can simply be concatenated. If each partition produces filtered rows, the final result is just all those rows together, similar to SQL’s UNION ALL. No further processing is needed.

Aggregate queries require a two-phase approach that is often described using “map-reduce” terminology. The “map” phase runs the aggregate on each partition independently. The “reduce” phase combines those partial results into a final answer.

The combine step uses the same aggregate function for MIN, MAX, and SUM. To find the minimum across all partitions, we take the minimum of each partition’s minimum. The same logic applies to maximum and sum.

COUNT is different. We do not want the count of the counts. We want the sum of the counts. If partition A counted 1000 rows and partition B counted 2000 rows, the total count is 3000, not 2.

AVG is trickier still. The average of averages is not the correct overall average unless all partitions have the same number of rows. The correct approach is to compute the sum and count separately, then divide at the end. Some query engines rewrite AVG(x) into SUM(x) / COUNT(x) during planning specifically to handle parallel aggregation correctly.
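
The combine rules above can be sketched as a small merge function. This is a simplified illustration rather than KQuery's implementation: the PartialAgg class and the function names are assumptions made for this example, and it handles a single ungrouped column of doubles.

```kotlin
// Hypothetical partial aggregate state produced by one partition.
data class PartialAgg(val min: Double, val max: Double, val sum: Double, val count: Long)

// "Map" phase: aggregate a single partition independently.
fun aggregatePartition(values: List<Double>): PartialAgg {
    require(values.isNotEmpty()) { "this sketch does not handle empty partitions" }
    return PartialAgg(
        min = values.minOf { it },
        max = values.maxOf { it },
        sum = values.sum(),
        count = values.size.toLong()
    )
}

// "Reduce" phase: merge two partial states into one.
fun merge(a: PartialAgg, b: PartialAgg) = PartialAgg(
    min = minOf(a.min, b.min),   // MIN of the MINs
    max = maxOf(a.max, b.max),   // MAX of the MAXes
    sum = a.sum + b.sum,         // SUM of the SUMs
    count = a.count + b.count    // COUNT is the SUM of the counts
)

// AVG is derived only at the very end, from the combined sum and count.
fun average(agg: PartialAgg): Double = agg.sum / agg.count
```

For partitions containing [1.0, 2.0, 3.0] and [10.0], the merged state has a sum of 16.0 and a count of 4, so the average is 4.0. Averaging the two partition averages (2.0 and 10.0) would give 6.0, which is wrong because the partitions differ in size.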

For our taxi data example, we run a secondary aggregation on the partial results:

val sql = "SELECT passenger_count, " +
        "MAX(max_fare) " +
        "FROM tripdata " +
        "GROUP BY passenger_count"

val ctx = ExecutionContext()
ctx.registerDataSource("tripdata", InMemoryDataSource(results.first().schema, results))
val df = ctx.sql(sql)
ctx.execute(df).forEach { println(it) }

This produces the final result set:

1,671123.14
2,1196.35
3,350.0
4,500.0
5,760.0
6,262.5
7,80.52
8,89.0
9,97.5
0,90000.0

Partitioning Strategies

The “one thread per file” strategy worked well in our example because we had twelve files and at least twelve cores. But this approach does not generalise well. What if we have thousands of small files? Starting a thread per file would create excessive overhead. What if we have one massive file? A single thread would process it while the others sit idle.

A better approach is to separate the concept of partitions (logical units of data) from workers (threads or processes). The query planner can then assign multiple partitions to each worker, or split large partitions across multiple workers, to balance the load.
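
One simple way to decouple partitions from workers is a fixed-size thread pool with a shared queue of partitions. The sketch below assumes a hypothetical helper named processPartitions and uses Java's ExecutorService. It covers assigning many partitions to few workers, but not splitting a large partition.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Runs `task` once per partition on a fixed pool of `workers` threads.
// Partitions wait in the pool's queue and each idle worker takes the next
// one, so the number of partitions no longer dictates the number of threads.
fun <P, R> processPartitions(partitions: List<P>, workers: Int, task: (P) -> R): List<R> {
    require(workers > 0)
    val pool = Executors.newFixedThreadPool(workers)
    try {
        val jobs = partitions.map { partition -> Callable { task(partition) } }
        // Results are returned in partition order, regardless of completion order.
        return pool.invokeAll(jobs).map { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

With this shape, a thousand small files can be processed by four workers with processPartitions(files, 4) { ... }. A worker that finishes a small partition simply picks up the next one, which also smooths out moderate differences in partition size.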

File-Based Partitioning

The simplest form of partitioning uses files as partition boundaries. Each file becomes one partition. This works well when files are roughly equal in size and the number of files is appropriate for the available parallelism.

Row Group Partitioning

Some file formats have natural internal partitions. Apache Parquet files consist of multiple “row groups”, each containing a batch of columnar data (typically 128MB or so). A query planner can inspect the available Parquet files, enumerate all row groups across all files, and schedule reading these row groups across a fixed pool of worker threads.

This provides finer-grained parallelism than file-based partitioning. A single large Parquet file with ten row groups can be processed by ten workers, while ten small files might be processed by fewer workers to avoid overhead.

Splitting Unstructured Files

CSV and other text formats lack internal structure, making them harder to partition. We can inspect the file size and divide it into equal chunks, but record boundaries do not align with arbitrary byte offsets. A record might span two chunks.

The solution is to adjust chunk boundaries to record boundaries. After calculating the byte offset for a chunk boundary, we scan forward to find the next record delimiter (typically a newline, though this gets complicated with quoted fields that contain newlines). Each worker then knows the exact byte range of complete records it should process.
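
The boundary adjustment can be sketched as follows. To stay self-contained, this example works on an in-memory byte array rather than a file, and it assumes that a newline always ends a record, so it does not handle quoted fields containing newlines. The function name splitOnRecordBoundaries is an assumption made for this example.

```kotlin
// Splits `data` into at most `chunks` byte ranges that each start at the
// beginning of a record and end just after a record delimiter.
// Assumption: '\n' terminates records and never appears inside a field.
fun splitOnRecordBoundaries(data: ByteArray, chunks: Int): List<IntRange> {
    require(chunks > 0)
    val newline = '\n'.code.toByte()
    val ranges = mutableListOf<IntRange>()
    var start = 0
    for (i in 1..chunks) {
        if (start >= data.size) break
        // Naive boundary: an equal share of the bytes (exclusive end offset).
        var end = if (i == chunks) data.size else (data.size.toLong() * i / chunks).toInt()
        // Scan forward until the chunk ends just past a newline (or at end of data).
        while (end < data.size && (end == 0 || data[end - 1] != newline)) end++
        // Skip boundaries already consumed by a previous, extended chunk.
        if (end > start) {
            ranges.add(start until end)
            start = end
        }
    }
    return ranges
}
```

For the 15-byte input "a,1\nbb,2\nccc,3\n" split into two chunks, the naive boundary at byte 7 falls inside the second record, so the first chunk is extended to cover bytes 0 to 8 and the second chunk covers bytes 9 to 14.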

This complexity is one reason data engineering pipelines often convert CSV to Parquet early on. Parquet’s structured format makes subsequent parallel processing much simpler.

Partition Pruning

When data is organised into partitions based on column values, the query planner can skip entire partitions that cannot contain matching rows. This optimisation is called partition pruning.

A common convention is to use directory names containing key-value pairs to indicate partition contents:

/mnt/nyctaxi/csv/year=2019/month=1/tripdata.csv
/mnt/nyctaxi/csv/year=2019/month=2/tripdata.csv
...
/mnt/nyctaxi/csv/year=2019/month=12/tripdata.csv

Given this structure, a query filtering on WHERE year = 2019 AND month = 3 can read only the partition for March 2019, skipping the other eleven months entirely. This is a form of predicate push-down applied at the partition level.

The query planner examines filter predicates, identifies which ones reference partition keys, and eliminates partitions that cannot satisfy those predicates. For range queries like WHERE month >= 6, the planner would include partitions 6 through 12 and exclude partitions 1 through 5.
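
A minimal sketch of this pruning step is shown below. It assumes integer-valued partition keys and represents each filter as a Kotlin lambda keyed by column name. A real planner would work on expression trees instead, and the function names here are hypothetical.

```kotlin
// Extracts key=value pairs from directory names, for example
// "/data/year=2019/month=3/tripdata.csv" gives {year=2019, month=3}.
// Assumption: all partition values are integers.
fun partitionKeys(path: String): Map<String, Int> =
    path.split('/')
        .filter { '=' in it }
        .associate { segment ->
            val (key, value) = segment.split('=', limit = 2)
            key to value.toInt()
        }

// Keeps only the paths whose partition keys satisfy every predicate.
// A predicate on a column that a path does not define cannot prune that path.
fun prunePartitions(
    paths: List<String>,
    predicates: Map<String, (Int) -> Boolean>
): List<String> =
    paths.filter { path ->
        val keys = partitionKeys(path)
        predicates.all { (column, test) -> keys[column]?.let(test) ?: true }
    }
```

Applied to the twelve paths above, the predicate map mapOf("month" to { m: Int -> m >= 6 }) keeps the seven partitions for June through December without opening any file.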

Partition pruning is particularly valuable for time-series data, where queries typically focus on recent time periods. A well-partitioned dataset can reduce I/O by orders of magnitude compared to scanning everything.

Parallel Joins

Joins present a different challenge for parallel execution than aggregates. With aggregates, we can process partitions independently and combine results at the end. Joins require matching rows from two different tables, and matching rows might be in different partitions.

Broadcast Join

When one side of a join is small enough to fit in memory, the simplest parallel strategy is the broadcast join. We load the small table entirely into memory on each worker, then each worker joins its partition of the large table against this shared copy.

For example, joining a 1-billion-row orders table with a 10,000-row products table: each worker loads all 10,000 products into memory, then processes its assigned partitions of the orders table, looking up product details as it goes. No coordination between workers is needed during execution because every worker has all the data it needs.

The broadcast join works well when the small table truly is small. If it grows too large, the memory overhead of replicating it to every worker becomes prohibitive.
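
Here is a minimal sketch of a broadcast inner join on a single machine. The Order and Product classes and the broadcastJoin function are assumptions made for this example. Because all workers are threads in one process, they can share a single read-only hash table instead of each holding a separate copy.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

data class Product(val id: Int, val name: String)
data class Order(val orderId: Int, val productId: Int)

// Inner join of partitioned orders against a small products table.
fun broadcastJoin(
    orderPartitions: List<List<Order>>,
    products: List<Product>,
    workers: Int
): List<Pair<Order, Product>> {
    // Build the hash table once from the small side. It is never modified
    // afterwards, so worker threads can probe it without locking.
    val productsById: Map<Int, Product> = products.associateBy { it.id }
    val pool = Executors.newFixedThreadPool(workers)
    try {
        val jobs = orderPartitions.map { partition ->
            Callable {
                // Probe: keep only orders that have a matching product.
                partition.mapNotNull { order ->
                    productsById[order.productId]?.let { product -> order to product }
                }
            }
        }
        return pool.invokeAll(jobs).flatMap { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

Each worker touches only its own partitions of the large table plus the shared lookup table, which is why no coordination is needed until the results are concatenated.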

Partitioned Hash Join

When both sides of a join are large, we need a different approach: the partitioned hash join (also called parallel hash join or shuffle hash join).

The key insight is that rows can only join if their join keys match. If we partition both tables by the join key using the same partitioning scheme, then rows that might join will end up in corresponding partitions. We can then perform independent hash joins on each pair of partitions.

Consider joining orders and customers on customer_id. We partition both tables by hashing customer_id into, say, 16 buckets. All orders for customer 12345 end up in the same bucket (perhaps bucket 7), and all details for customer 12345 also end up in bucket 7. Workers can then join bucket 7 of orders with bucket 7 of customers, completely independently of what happens in other buckets.

The process has two phases:

  1. Partition phase: Read both inputs and write each row to an appropriate partition based on the hash of its join key. This redistributes the data.

  2. Join phase: For each pair of partitions, perform a standard hash join. One side builds a hash table, the other side probes it.

The partition phase is the expensive part. It requires reading all data, computing hashes, and writing to temporary storage (either memory or disk). For distributed execution across multiple machines, this phase involves network transfer, which we will discuss in the next chapter.
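
The two phases can be sketched in memory as follows. The function names are hypothetical, and for brevity the bucket pairs are joined one after another. Since each pair is independent, a parallel version could hand each pair to a different worker.

```kotlin
// Partition phase: route each row to a bucket chosen by hashing its join key.
fun <T, K : Any> partitionByKey(rows: List<T>, buckets: Int, key: (T) -> K): List<List<T>> {
    val out = List(buckets) { mutableListOf<T>() }
    // floorMod keeps the bucket index non-negative for negative hash codes.
    for (row in rows) out[Math.floorMod(key(row).hashCode(), buckets)].add(row)
    return out
}

// Join phase for one pair of buckets: build a hash table, then probe it.
fun <L, R, K : Any> hashJoin(
    build: List<L>, probe: List<R>, buildKey: (L) -> K, probeKey: (R) -> K
): List<Pair<L, R>> {
    val table: Map<K, List<L>> = build.groupBy(buildKey)
    return probe.flatMap { r -> table[probeKey(r)].orEmpty().map { l -> l to r } }
}

// Partition both inputs with the same scheme, then join matching buckets.
fun <L, R, K : Any> partitionedHashJoin(
    left: List<L>, right: List<R>, buckets: Int, leftKey: (L) -> K, rightKey: (R) -> K
): List<Pair<L, R>> {
    val leftParts = partitionByKey(left, buckets, leftKey)
    val rightParts = partitionByKey(right, buckets, rightKey)
    return (0 until buckets).flatMap { b ->
        hashJoin(leftParts[b], rightParts[b], leftKey, rightKey)
    }
}
```

Both inputs must use the same hash function and bucket count. Otherwise rows with equal keys could land in different buckets and the join would silently miss matches.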

Repartitioning and Exchange

The partitioned hash join illustrates a general concept: sometimes we need to reorganise data during query execution. Data arrives partitioned one way (perhaps by file), but we need it partitioned a different way (by join key, or into a single partition for final aggregation).

This reorganisation is called repartitioning or shuffling. The operator that performs it is often called an exchange operator.

An exchange operator reads its input partitions and writes to output partitions based on some partitioning scheme:

  • Hash partitioning: Rows are assigned to partitions based on the hash of one or more columns. This is used for joins and some aggregates.

  • Round-robin partitioning: Rows are distributed evenly across partitions without regard to content. This is useful for load balancing when the specific partition does not matter.

  • Single partition: All rows go to one partition. This is used for final aggregation or sorting when we need a single combined result.
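
The three schemes above can be modelled as functions that map a row to an output partition number. The sketch below is an in-memory illustration with hypothetical names (Partitioner, exchange, and so on). It ignores the queues or files that a real exchange operator would use to move data between threads.

```kotlin
// A partitioner maps a row, and its position in the input, to an output partition.
typealias Partitioner<T> = (index: Int, row: T) -> Int

// Hash partitioning: rows with equal keys always land in the same partition.
fun <T> hashPartitioner(n: Int, key: (T) -> Any?): Partitioner<T> =
    { _, row -> Math.floorMod(key(row).hashCode(), n) }

// Round-robin partitioning: spread rows evenly, ignoring their content.
fun <T> roundRobinPartitioner(n: Int): Partitioner<T> = { index, _ -> index % n }

// Single partition: every row goes to partition 0.
fun <T> singlePartitioner(): Partitioner<T> = { _, _ -> 0 }

// Minimal in-memory "exchange": reads all input partitions and rewrites the
// rows into `n` output partitions chosen by the partitioner.
fun <T> exchange(input: List<List<T>>, n: Int, partitioner: Partitioner<T>): List<List<T>> {
    val output = List(n) { mutableListOf<T>() }
    input.flatten().forEachIndexed { i, row -> output[partitioner(i, row)].add(row) }
    return output
}
```

For example, exchange(listOf(listOf(1, 2, 3), listOf(4, 5, 6)), 3, roundRobinPartitioner<Int>(3)) turns two input partitions into three output partitions of two rows each.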

For parallel execution on a single machine, the exchange operator might use shared memory queues or temporary files to pass data between threads. For distributed execution, it uses network transfer. The logical concept is the same; only the physical mechanism differs.

Understanding exchange operators is important because they represent the points in a query plan where parallelism changes. We will explore this further in the next chapter on distributed execution.

Limits of Parallelism

Not every query benefits equally from parallelism. Several factors limit how much speedup we can achieve.

Amdahl’s Law: If part of a computation must run sequentially, that sequential portion limits overall speedup. A query where 90% of the work can be parallelised achieves at most 10x speedup, no matter how many cores we throw at it, because the remaining 10% still takes the same amount of time.
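
The bound can be computed directly from the standard formula, speedup = 1 / ((1 - p) + p / n), where p is the fraction of work that can be parallelised and n is the number of cores. The function name below is just for illustration.

```kotlin
// Amdahl's law: the maximum speedup on `cores` cores when only
// `parallelFraction` of the work (between 0.0 and 1.0) can run in parallel.
fun amdahlSpeedup(parallelFraction: Double, cores: Int): Double {
    require(parallelFraction in 0.0..1.0 && cores > 0)
    val serialFraction = 1.0 - parallelFraction
    return 1.0 / (serialFraction + parallelFraction / cores)
}
```

With 90% of the work parallelised, amdahlSpeedup(0.9, 12) is about 5.7 and amdahlSpeedup(0.9, 24) is about 7.3, so doubling the core count from 12 to 24 adds far less than double the speed. As the core count grows, the result approaches but never reaches 10.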

Coordination overhead: Spawning threads, distributing work, and collecting results all have costs. For small datasets, this overhead can exceed the time saved by parallelism. There is a minimum dataset size below which single-threaded execution is actually faster.

Memory pressure: Parallel execution multiplies memory usage. If each of 12 workers builds a hash table for a join, we need 12 times the memory of a single worker. When memory runs short, workers spill to disk, which is dramatically slower.

Uneven partitions: If some partitions are larger than others, fast workers finish early and sit idle while slow workers complete their larger partitions. The overall time is determined by the slowest worker. Good partitioning schemes try to distribute work evenly, but this is not always achievable.

I/O bandwidth: Parallelism helps CPU-bound work more than I/O-bound work. If a query is bottlenecked on disk or network throughput, adding more CPU cores does not help once we saturate the available bandwidth.

Despite these limitations, parallel execution provides substantial benefits for most analytical queries on modern hardware. The key is understanding when it helps and when the overhead is not worthwhile.

This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work

Copyright © 2020-2025 Andy Grove. All rights reserved.