Distributed Query Execution

The previous chapter on Parallel Query Execution covered fundamental concepts such as partitioning, which we will build on in this chapter.

To simplify somewhat, the goal of distributed query execution is to create a physical query plan that defines how work is distributed to a number of "executors" in a cluster. Distributed query plans typically contain new operators that describe how data is exchanged between executors at various points during query execution.

Let's revisit the example SQL query from that chapter and look at its distributed planning implications.

SELECT passenger_count,
       MAX(max_fare)
FROM tripdata
GROUP BY passenger_count

We can execute this query in parallel on all partitions of data, with each executor in the cluster processing a subset of those partitions. However, we then need to combine the resulting aggregated data onto a single node and apply the final aggregation so that we get a single result set without duplicate grouping keys (passenger_count in this case). Here is one possible logical query plan for representing this. Note the new Exchange operator, which represents the exchange of data between executors. The physical plan for the exchange could be implemented by writing intermediate results to shared storage, or perhaps by streaming data directly to other executors.

HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
  Exchange:
    HashAggregate: groupBy=[passenger_count], aggr=[MAX(max_fare)]
      Scan: tripdata.parquet

The new Exchange operator is fundamentally different from the other operators we have discussed so far because we can't just build a tree of operators on a single node and start executing it. The query now requires coordination across executors, which means that we need to build a scheduler.

Distributed Query Scheduling

At a high level, the concept of a distributed query scheduler is not complex. The scheduler needs to examine the whole query and break it down into stages that can be executed in isolation (usually in parallel across the executors) and then schedule these stages for execution based on the available resources in the cluster. Once a query stage completes, any dependent query stages can be scheduled. This process repeats until all query stages have been executed.
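
To make this concrete, here is a minimal sketch of such a scheduling loop in Rust. The QueryStage struct and execute_stage function are hypothetical and stand in for whatever types a real scheduler such as Ballista's would use; a real implementation would also run stages asynchronously across executors rather than sequentially.

struct QueryStage {
    id: usize,
    /// Stages that must complete before this stage can start.
    dependencies: Vec<usize>,
}

/// Repeatedly schedule stages whose dependencies have completed.
/// Assumes the stage graph is acyclic, so progress is always possible.
fn schedule(stages: &[QueryStage]) {
    let mut completed = std::collections::HashSet::new();
    while completed.len() < stages.len() {
        // Find stages that have not run yet and whose dependencies
        // have all completed.
        let runnable: Vec<&QueryStage> = stages
            .iter()
            .filter(|s| !completed.contains(&s.id))
            .filter(|s| s.dependencies.iter().all(|d| completed.contains(d)))
            .collect();
        for stage in runnable {
            // Hypothetical: executes the stage's partitions in parallel
            // across the available executors and blocks until done.
            execute_stage(stage);
            completed.insert(stage.id);
        }
    }
}

fn execute_stage(stage: &QueryStage) {
    println!("executing query stage {}", stage.id);
}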

The scheduler could also be responsible for managing the compute resources in the cluster so that extra executors can be started on demand to handle the query load.

In the remainder of this chapter, we will discuss the following topics, referring to Ballista and the design that is being implemented in that project.

  • Managing compute resources in the cluster
  • Serializing query plans and exchanging them with executors
  • Exchanging intermediate results between executors
  • Optimizing distributed queries

Managing Compute Resources

A logical place to start is to decide how executors can be deployed and managed in a networked environment. There are many resource scheduling and orchestration tools available today, including YARN, Mesos, and Kubernetes. There are also projects that aim to provide an abstraction over these resource schedulers, such as Apache YuniKorn.

Ballista uses Kubernetes for the following reasons:

  • All the major cloud providers have first-class support for Kubernetes. Amazon has the Elastic Kubernetes Service (EKS), Microsoft has the Azure Kubernetes Service (AKS), and Google has the Google Kubernetes Engine (GKE).
  • There is huge momentum behind Kubernetes, and although there is a steep learning curve and some rough edges, I am confident that it will become easier over time.
  • Projects such as MicroK8s and Minikube make it easy to create a Kubernetes cluster on a local development machine.

Kubernetes is an open-source framework for managing containerized workloads and services using a declarative configuration. This means that in order to make something happen in Kubernetes, we just need to declare the desired state in a YAML file and submit it to the Kubernetes API. Kubernetes will then perform the necessary actions to achieve the desired state.

Kubernetes uses the term "job" for containers that are intended to start, perform some processing, and then terminate. Containers that are intended to stay running until requested to stop are deployed as long-running "pods", typically managed by a higher-level resource such as a Deployment or StatefulSet.

Here is an example YAML file for running a job.

apiVersion: batch/v1
kind: Job
metadata:
  name: parallel-aggregate
spec:
  template:
    spec:
      containers:
        - name: parallel-aggregate
          image: ballistacompute/parallel-aggregate-rs
      # Job pods must specify a restart policy of Never or OnFailure
      restartPolicy: Never

This YAML file can be submitted to a Kubernetes cluster using the kubectl command-line utility.

kubectl apply -f parallel-aggregate-rs.yaml

Here is an example YAML file for creating a cluster of 12 Ballista Rust executors. The Kubernetes "StatefulSet" concept is used here. The request is for 12 replicas, and each instance will be assigned a unique ordinal within that range.

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ballista
spec:
  serviceName: "ballista"
  replicas: 12
  selector:
    matchLabels:
      app: ballista
  template:
    metadata:
      labels:
        app: ballista
        ballista-cluster: ballista
    spec:
      containers:
      - name: ballista
        image: ballistacompute/rust:0.2.0-alpha-2

Kubernetes makes service discovery easy. Each container running in a Kubernetes cluster can inspect environment variables to find the host and port for other services running in the same cluster. Kubernetes can also be configured to create DNS entries for services.
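
For example, an executor could locate the "ballista" service defined above by reading the environment variables that Kubernetes injects into each container. This is a minimal sketch; the exact variable names depend on the service name, following the {SERVICE}_SERVICE_HOST and {SERVICE}_SERVICE_PORT convention.

use std::env;

/// Discover the host and port of the "ballista" service using the
/// environment variables that Kubernetes injects into each container.
fn discover_ballista() -> Option<String> {
    let host = env::var("BALLISTA_SERVICE_HOST").ok()?;
    let port = env::var("BALLISTA_SERVICE_PORT").ok()?;
    Some(format!("{}:{}", host, port))
}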

Serializing a Query Plan

The query scheduler needs to send fragments of the overall query plan to executors for execution. As we saw in earlier chapters, logical and physical query plans are hierarchical data structures defining the steps needed to execute a query. To refresh our memories, here is an example of a logical query plan.

Projection: #id, #first_name, #last_name, #state, #salary
    Filter: #state = 'CO'
        Scan: employee; projection=None

There are a number of options for serializing a query plan so that it can be passed between processes. Many query engines use the programming language's native serialization support, which is a suitable choice if there is no requirement to exchange query plans between different programming languages, and it is usually the simplest mechanism to implement.

However, there are advantages in using a serialization format that is programming language-agnostic. Ballista uses Google's Protocol Buffers format (typically abbreviated as "protobuf") to define query plans.

Here is a subset of the Ballista protocol buffer definition of a query plan.

The full source code can be found at proto/ballista.proto in the Ballista GitHub repository.

message LogicalPlanNode {
  LogicalPlanNode input = 1;
  FileNode file = 10;
  ProjectionNode projection = 20;
  SelectionNode selection = 21;
  LimitNode limit = 22;
  AggregateNode aggregate = 23;
}

message FileNode {
  string filename = 1;
  Schema schema = 2;
  repeated string projection = 3;
}

message ProjectionNode {
  repeated LogicalExprNode expr = 1;
}

message SelectionNode {
  LogicalExprNode expr = 2;
}

message AggregateNode {
  repeated LogicalExprNode group_expr = 1;
  repeated LogicalExprNode aggr_expr = 2;
}

message LimitNode {
  uint32 limit = 1;
}

The protobuf project provides tools for generating language-specific source code for serializing and deserializing data.
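
For example, if the Rust code is generated with the prost crate, serializing a plan to bytes and reconstructing it on the receiving executor might look like the following sketch. The ballista_proto module name is illustrative and stands in for wherever the generated code lives.

use prost::Message;

// Hypothetical module containing the structs that prost-build
// generates from ballista.proto.
use ballista_proto::{FileNode, LogicalPlanNode};

fn roundtrip() -> Result<(), prost::DecodeError> {
    let plan = LogicalPlanNode {
        file: Some(FileNode {
            filename: "tripdata.parquet".to_string(),
            ..Default::default()
        }),
        ..Default::default()
    };
    // Serialize the plan to bytes to send over the network.
    let bytes = plan.encode_to_vec();
    // Deserialize on the receiving side and verify the round trip.
    let decoded = LogicalPlanNode::decode(bytes.as_slice())?;
    assert_eq!(plan, decoded);
    Ok(())
}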

Serializing Data

Data must also be serialized as it is streamed between clients and executors, and between the executors themselves.

Apache Arrow provides an IPC (Inter-Process Communication) format for exchanging data between processes. Because of the standardized memory layout provided by Arrow, the raw bytes can be transferred directly between memory and an input/output device (disk, network, etc.) without the overhead typically associated with serialization. This is effectively a zero-copy operation because the data does not have to be transformed from its in-memory format into a separate serialization format.

However, the metadata about the data, such as the schema (column names and data types), does need to be encoded, and Arrow uses Google FlatBuffers for this. The metadata is typically serialized once per result set or per batch, so the overhead is small.

Another advantage of using Arrow is that it provides very efficient exchange of data between different programming languages.

Apache Arrow IPC defines the data encoding format but not the mechanism for exchanging it. Arrow IPC could be used to transfer data from a JVM language to C or Rust via JNI, for example.
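
As a concrete example, here is a sketch of a round trip through the Arrow IPC stream format using the Rust arrow crate (API details vary slightly between arrow versions). In a real query engine the buffer would be a network connection or file rather than an in-memory Vec.

use std::io::Cursor;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "passenger_count",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 4]))],
    )?;

    // Write the batch to an in-memory buffer in the IPC stream format.
    let mut buffer = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buffer, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // Read the batch back from the raw bytes.
    let reader = StreamReader::try_new(Cursor::new(buffer), None)?;
    for batch in reader {
        println!("read batch with {} rows", batch?.num_rows());
    }
    Ok(())
}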

Choosing a Protocol

Now that we have chosen serialization formats for query plans and data, the next question is how to exchange this data between distributed processes.

Apache Arrow provides a Flight protocol, which is intended for this exact purpose. Flight is a general-purpose client-server framework to simplify high-performance transport of large datasets over network interfaces.

The Arrow Flight libraries provide a development framework for implementing a service that can send and receive data streams. A Flight server supports several basic kinds of requests:

  • Handshake: a simple request to determine whether the client is authorized and, in some cases, to establish an implementation-defined session token to use for future requests
  • ListFlights: return a list of available data streams
  • GetSchema: return the schema for a data stream
  • GetFlightInfo: return an “access plan” for a dataset of interest, possibly requiring consuming multiple data streams. This request can accept custom serialized commands containing, for example, your specific application parameters.
  • DoGet: send a data stream to a client
  • DoPut: receive a data stream from a client
  • DoAction: perform an implementation-specific action and return any results, i.e. a generalized function call
  • ListActions: return a list of available action types

For example, the GetFlightInfo method could be used to compile a query plan and return the necessary information for receiving the results, followed by calls to DoGet on each executor to start receiving the results of the query.
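
Here is a sketch of that pattern from the client's point of view, using the Rust arrow-flight crate. The scheduler address and the use of the serialized plan as the descriptor command are assumptions, and decoding the IPC bytes into record batches is omitted.

use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::FlightDescriptor;

async fn run_query(plan_bytes: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
    // Ask the scheduler to plan the query and tell us where the
    // result streams will be available.
    let mut scheduler = FlightServiceClient::connect("http://scheduler:50051").await?;
    let descriptor = FlightDescriptor::new_cmd(plan_bytes);
    let info = scheduler.get_flight_info(descriptor).await?.into_inner();

    // Call DoGet on each executor to fetch the stream it produced.
    for endpoint in info.endpoint {
        let location = &endpoint.location[0];
        let mut executor = FlightServiceClient::connect(location.uri.clone()).await?;
        let ticket = endpoint.ticket.clone().ok_or("endpoint is missing a ticket")?;
        let mut stream = executor.do_get(ticket).await?.into_inner();
        while let Some(flight_data) = stream.message().await? {
            // Decode the Arrow IPC bytes into record batches here.
            println!("received {} bytes", flight_data.data_body.len());
        }
    }
    Ok(())
}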

Streaming

It is important that the results of a query can be made available as soon as possible and be streamed to the next process that needs to operate on that data; otherwise, there would be unacceptable latency, as each operation would have to wait for the previous operation to complete.

However, some operations require all the input data to be received before any output can be produced. A sort operation is a good example: it isn't possible to completely sort a dataset until the whole dataset has been received. The problem can be alleviated by increasing the number of partitions so that a large number of partitions are sorted in parallel, and then the sorted batches can be combined efficiently using a merge operator.
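
Here is a minimal sketch of that merge step, assuming each partition has already been sorted. A binary heap tracks the smallest unconsumed value from each partition, so the merged output is produced in order without re-sorting.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge pre-sorted partitions into a single sorted output.
fn merge_sorted(partitions: &[Vec<i64>]) -> Vec<i64> {
    // Each heap entry is (value, partition index, offset within partition),
    // wrapped in Reverse to turn the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (p, partition) in partitions.iter().enumerate() {
        if let Some(&value) = partition.first() {
            heap.push(Reverse((value, p, 0)));
        }
    }
    let mut output = Vec::new();
    while let Some(Reverse((value, p, i))) = heap.pop() {
        output.push(value);
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    output
}

For example, merge_sorted(&[vec![1, 3], vec![2, 4]]) returns [1, 2, 3, 4].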

Custom Code

It is often necessary to run custom code as part of a distributed query or computation. For a single-language query engine, it is often possible to use the language's built-in serialization mechanism to transmit this code over the network at query execution time, which is very convenient during development. Another approach is to publish compiled code to a repository so that it can be downloaded into the cluster at runtime. For JVM-based systems, a Maven repository could be used. A more general-purpose approach is to package all runtime dependencies into a Docker image.

The query plan needs to provide the necessary information to load the user code at runtime. For JVM-based systems, this could be a classpath and a class name. For C-based systems, this could be the path to a shared object. In either case, the user code will need to implement some known API.
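
For example, loading user code from a shared object in Rust might look like this sketch, which assumes the libloading crate. The my_udf symbol name and its signature are illustrative and would be part of the known API mentioned above.

use libloading::{Library, Symbol};

fn call_udf(path: &str, input: i64) -> Result<i64, Box<dyn std::error::Error>> {
    unsafe {
        // Load the shared object specified in the query plan.
        let library = Library::new(path)?;
        // Look up a function that the user code is known to export.
        let udf: Symbol<unsafe extern "C" fn(i64) -> i64> = library.get(b"my_udf")?;
        Ok(udf(input))
    }
}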

Distributed Query Optimizations

Distributed query execution has a lot of overhead compared to parallel query execution on a single host and should only be used when there is a benefit in doing so. I recommend reading the paper "Scalability! But at what COST?" for some interesting perspectives on this topic.

Also, there are many ways to distribute the same query, so how do we know which one to use?

One answer is to build a mechanism for determining the cost of executing a particular query plan, then generate some subset of all possible query plans for a given query and determine which one is most efficient.

There are many factors involved in computing the cost of an operation, and different resources have different costs and limitations.

  • Memory: We are typically concerned with availability of memory rather than performance. Processing data in memory is orders of magnitude faster than reading and writing to disk.
  • CPU: For workloads that are parallelizable, more CPU cores means better throughput.
  • GPU: Some operations are orders of magnitude faster on GPUs compared to CPUs.
  • Disk: Disks have finite read and write speeds and cloud vendors typically limit the number of I/O operations per second (IOPS). Different types of disk have different performance characteristics (spinning disk vs SSD vs NVMe).
  • Network: Distributed query execution involves streaming data between nodes. There is a throughput limitation imposed by the networking infrastructure.
  • Distributed Storage: It is very common for source data to be stored in a distributed file system (HDFS) or object store (Amazon S3, Azure Blob Storage) and there is a cost in transferring data between distributed storage and local file systems.
  • Data Size: The size of the data matters. When performing a join between two tables and data needs to be transferred over the network, it is better to transfer the smaller of the two tables. If one of the tables can fit in memory, then a more efficient join operation can be used.
  • Monetary Cost: If a query can be computed 10% faster at 3x the cost, is it worth it? That is a question best answered by the user, of course. Monetary costs are typically controlled by limiting the amount of compute resources available.

Query costs can be computed upfront using an algorithm if enough information is known about the data ahead of time, such as how large the data is, the cardinality of the partitioning and join keys used in the query, the number of partitions, and so on. This all depends on certain statistics being available for the dataset being queried.

Another approach is to start executing the query and have each operator adapt based on the input data it receives. Apache Spark 3.0.0 introduced an Adaptive Query Execution feature that does just this.

Join Reordering

At planning time, the query optimizer can sometimes determine the optimal join order based on available statistics. However, once we start executing queries, we often have much more accurate statistics available and can take the opportunity to further tune the query. For example, if the query contains a hash join, then we want to use the smaller side of the join as the build side.
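
Here is a minimal sketch of that decision based on runtime statistics. The Statistics struct is hypothetical and stands in for whatever metrics the engine collects while executing the query.

/// Statistics gathered for one input of a join, either estimated at
/// planning time or measured while the query runs.
struct Statistics {
    num_rows: usize,
    total_byte_size: usize,
}

/// Choose the build side of a hash join: the smaller input is loaded
/// into the in-memory hash table and the larger input is streamed
/// past it as the probe side.
fn build_on_left(left: &Statistics, right: &Statistics) -> bool {
    if left.total_byte_size != right.total_byte_size {
        left.total_byte_size < right.total_byte_size
    } else {
        left.num_rows <= right.num_rows
    }
}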

Caching Intermediate Results

It is very common for the same query to be executed multiple times per day with different parameters, while some steps of the execution plan remain the same. It can be beneficial to persist intermediate results from query execution to avoid repeating the same operations. It can also be beneficial to cache data on fast local storage, such as NVMe drives, removing the need to repeatedly fetch it from distributed storage. Dremio's Data Lake Engine and Snowflake's Cloud Data Platform are good examples of sophisticated query engines that leverage these techniques.

Of course, this level of sophistication introduces new challenges, such as taking data locality into account when planning where queries should run.

Materialized Views

Another optimization technique, especially for data that is constantly changing, is to build materialized views that can be updated incrementally as new data arrives, avoiding the cost of rebuilding the view from scratch. Materialize is a good example of this concept.

Recovering from Distributed Query Failures

There are a lot of moving parts involved in distributed query execution, and there is a high likelihood that something will fail during query execution, so it is important that there is some kind of recovery mechanism.

The simplest solution is to wait for the rest of the query to complete or fail and then retry the whole query, but this is obviously expensive and frustrating for the user.

One of the benefits of Kubernetes is that it will restart failed pods automatically, but there is still a need for the query scheduler to monitor the executors so that it knows when to execute the next phase of a query.
