Logical Plans & Expressions

_The source code discussed in this chapter can be found in the logical-plan module of the KQuery project. _ A logical plan represents a relation (a set of tuples) with a known schema. Each logical plan can have zero or more logical plans as inputs. It is convenient for a logical plan to expose its child plans so that a visitor pattern can be used to walk through the plan.

interface LogicalPlan {
  fun schema(): Schema
  fun children(): List<LogicalPlan>
}

Printing Logical Plans

It is important to be able to print logical plans in human-readable form to help with debugging. Logical plans are typically printed as a hierarchical structure with child nodes indented.

We can implement a simple recursive helper function to format a logical plan.

fun format(plan: LogicalPlan, indent: Int = 0): String {
  val b = StringBuilder()
  0.rangeTo(indent).forEach { b.append("\t") }
  b.append(plan.toString()).append("\n")
  plan.children().forEach { b.append(format(it, indent+1)) }
  return b.toString()
}

Here is an example of a logical plan formatted using this method.

Projection: #id, #first_name, #last_name, #state, #salary
  Filter: #state = 'CO'
    Scan: employee.csv; projection=None

Serialization

It is sometimes desirable to be able to serialize query plans so that they can easily be transferred to another process. It is good practice to add serialization early on as a precaution against accidentally referencing data structures that cannot be serialized (such as file handles or database connections).

One approach would be to use the implementation languages' default mechanism for serializing data structures to/from a format such as JSON. In Java, the Jackson library could be used. Kotlin has the kotlinx.serialization library, and Rust has a serde crate, for example.

Another option would be to define a language-agnostic serialization format using Avro, Thrift, or Protocol Buffers and then write code to translate between this format and the language-specific implementation.

Since publishing the first edition of this book, a new standard named "substrait" has emerged, with the goal of providing cross-language serialization for relational algebra. I am excited about this project and predict that it will become the de-facto standard for representing query plans and open up many integration possibilities. For example, it would be possible to use a mature Java-based query planner such as Apache Calcite, serialize the plan in Substrait format, and then execute the plan in a query engine implemented in a lower-level language, such as C++ or Rust. For more information, visit https://substrait.io/.

Logical Expressions

One of the fundamental building blocks of a query plan is the concept of an expression that can be evaluated against data at runtime.

Here are some examples of expressions that are typically supported in query engines.

ExpressionExamples
Literal Value"hello", 12.34
Column Referenceuser_id, first_name, last_name
Math Expressionsalary * state_tax
Comparison Expressionx >= y
Boolean Expressionbirthday = today() AND age >= 21
Aggregate ExpressionMIN(salary), MAX(salary), SUM(salary), AVG(salary), COUNT(*)
Scalar FunctionCONCAT(first_name, " ", last_name)
Aliased Expressionsalary * 0.02 AS pay_increase

Of course, all of these expressions can be combined to form deeply nested expression trees. Expression evaluation is a textbook case of recursive programming.

When we are planning queries, we will need to know some basic metadata about the output of an expression. Specifically, we need to have a name for the expression so that other expressions can reference it and we need to know the data type of the values that the expression will produce when evaluated so that we can validate that the query plan is valid. For example, if we have an expression a + b then it can only be valid if both a and b are numeric types.

Also note that the data type of an expression can be dependent on the input data. For example, a column reference will have the data type of the column it is referencing, but a comparison expression always returns a Boolean value.

interface LogicalExpr {
  fun toField(input: LogicalPlan): Field
}

Column Expressions

The Column expression simply represents a reference to a named column. The metadata for this expression is derived by finding the named column in the input and returning that column's metadata. Note that the term "column" here refers to a column produced by the input logical plan and could represent a column in a data source, or it could represent the result of an expression being evaluated against other inputs.

class Column(val name: String): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return input.schema().fields.find { it.name == name } ?:
      throw SQLException("No column named '$name'")
  }

  override fun toString(): String {
    return "#$name"
  }

}

Literal Expressions

We need the ability to represent literal values as expressions so that we can write expressions such as salary * 0.05.

Here is an example expression for literal strings.

class LiteralString(val str: String): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return Field(str, ArrowTypes.StringType)
  }

  override fun toString(): String {
    return "'$str'"
  }

}

Here is an example expression for literal longs.

class LiteralLong(val n: Long): LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
      return Field(n.toString(), ArrowTypes.Int64Type)
  }

  override fun toString(): String {
      return n.toString()
  }

}

Binary Expressions

Binary expressions are simply expressions that take two inputs. There are three categories of binary expressions that we will implement, and those are comparison expressions, Boolean expressions, and math expressions. Because the string representation is the same for all of these, we can use a common base class that provides the toString method. The variables "l" and "r" refer to the left and right inputs.

abstract class BinaryExpr(
    val name: String,
    val op: String,
    val l: LogicalExpr,
    val r: LogicalExpr) : LogicalExpr {

  override fun toString(): String {
    return "$l $op $r"
  }
}

Comparison expressions such as = or < compare two values of the same data type and return a Boolean value. We also need to implement Boolean operators AND and OR which also take two arguments and produce a Boolean result, so we can use a common base class for these as well.

abstract class BooleanBinaryExpr(
    name: String,
    op: String,
    l: LogicalExpr,
    r: LogicalExpr) : BinaryExpr(name, op, l, r) {

  override fun toField(input: LogicalPlan): Field {
      return Field(name, ArrowTypes.BooleanType)
  }

}

This base class provides a concise way to implement the concrete comparison expressions.

Comparison Expressions

/** Equality (`=`) comparison */
class Eq(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("eq", "=", l, r)

/** Inequality (`!=`) comparison */
class Neq(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("neq", "!=", l, r)

/** Greater than (`>`) comparison */
class Gt(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("gt", ">", l, r)

/** Greater than or equals (`>=`) comparison */
class GtEq(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("gteq", ">=", l, r)

/** Less than (`<`) comparison */
class Lt(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("lt", "<", l, r)

/** Less than or equals (`<=`) comparison */
class LtEq(l: LogicalExpr, r: LogicalExpr)
	: BooleanBinaryExpr("lteq", "<=", l, r)

Boolean Expressions

The base class also provides a concise way to implement the concrete Boolean logic expressions.

/** Logical AND */
class And(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("and", "AND", l, r)

/** Logical OR */
class Or(l: LogicalExpr, r: LogicalExpr)
    : BooleanBinaryExpr("or", "OR", l, r)

Math Expressions

Math expressions are another specialization of a binary expression. Math expressions typically operate on values of the same data type and produce a result of the same data type.

abstract class MathExpr(
    name: String,
    op: String,
    l: LogicalExpr,
    r: LogicalExpr) : BinaryExpr(name, op, l, r) {

  override fun toField(input: LogicalPlan): Field {
      return Field("mult", l.toField(input).dataType)
  }

}

class Add(l: LogicalExpr, r: LogicalExpr) : MathExpr("add", "+", l, r)
class Subtract(l: LogicalExpr, r: LogicalExpr) : MathExpr("subtract", "-", l, r)
class Multiply(l: LogicalExpr, r: LogicalExpr) : MathExpr("mult", "*", l, r)
class Divide(l: LogicalExpr, r: LogicalExpr) : MathExpr("div", "/", l, r)
class Modulus(l: LogicalExpr, r: LogicalExpr) : MathExpr("mod", "%", l, r)

Aggregate Expressions

Aggregate expressions perform an aggregate function such as MIN, MAX, COUNT, SUM, or AVG on an input expression.

abstract class AggregateExpr(
    val name: String,
    val expr: LogicalExpr) : LogicalExpr {

  override fun toField(input: LogicalPlan): Field {
    return Field(name, expr.toField(input).dataType)
  }

  override fun toString(): String {
    return "$name($expr)"
  }
}

For aggregate expressions where the aggregated data type is the same as the input type, we can simply extend this base class.

class Sum(input: LogicalExpr) : AggregateExpr("SUM", input)
class Min(input: LogicalExpr) : AggregateExpr("MIN", input)
class Max(input: LogicalExpr) : AggregateExpr("MAX", input)
class Avg(input: LogicalExpr) : AggregateExpr("AVG", input)

For aggregate expressions where the data type is not dependent on the input type, we need to override the toField method. For example, the "COUNT" aggregate expression always produces an integer regardless of the data type of the values being counted.

class Count(input: LogicalExpr) : AggregateExpr("COUNT", input) {

  override fun toField(input: LogicalPlan): Field {
    return Field("COUNT", ArrowTypes.Int32Type)
  }

  override fun toString(): String {
    return "COUNT($expr)"
  }
}

Logical Plans

With the logical expressions in place, we can now implement the logical plans for the various transformations that the query engine will support.

Scan

The Scan logical plan represents fetching data from a DataSource with an optional projection. Scan is the only logical plan in our query engine that does not have another logical plan as an input. It is a leaf node in the query tree.

class Scan(
    val path: String,
    val dataSource: DataSource,
    val projection: List<String>): LogicalPlan {

  val schema = deriveSchema()

  override fun schema(): Schema {
    return schema
  }

  private fun deriveSchema() : Schema {
    val schema = dataSource.schema()
    if (projection.isEmpty()) {
      return schema
    } else {
      return schema.select(projection)
    }
  }

  override fun children(): List<LogicalPlan> {
    return listOf()
  }

  override fun toString(): String {
    return if (projection.isEmpty()) {
      "Scan: $path; projection=None"
    } else {
      "Scan: $path; projection=$projection"
    }
  }

}

Projection

The Projection logical plan applies a projection to its input. A projection is a list of expressions to be evaluated against the input data. Sometimes this is as simple as a list of columns, such as SELECT a, b, c FROM foo, but it could also include any other type of expression that is supported. A more complex example would be SELECT (CAST(a AS float) * 3.141592)) AS my_float FROM foo.

class Projection(
    val input: LogicalPlan,
    val expr: List<LogicalExpr>): LogicalPlan {

  override fun schema(): Schema {
    return Schema(expr.map { it.toField(input) })
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Projection: ${ expr.map {
        it.toString() }.joinToString(", ")
    }"
  }
}

Selection (also known as Filter)

The selection logical plan applies a filter expression to determine which rows should be selected (included) in its output. This is represented by the WHERE clause in SQL. A simple example would be SELECT * FROM foo WHERE a > 5. The filter expression needs to evaluate to a Boolean result.

class Selection(
    val input: LogicalPlan,
    val expr: Expr): LogicalPlan {

  override fun schema(): Schema {
    // selection does not change the schema of the input
    return input.schema()
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Filter: $expr"
  }
}

Aggregate

The Aggregate logical plan is more complex than Projection, Selection, or Scan and calculates aggregates of underlying data such as calculating minimum, maximum, averages, and sums of data. Aggregates are often grouped by other columns (or expressions). A simple example would be SELECT job_title, AVG(salary) FROM employee GROUP BY job_title.

class Aggregate(
    val input: LogicalPlan,
    val groupExpr: List<LogicalExpr>,
    val aggregateExpr: List<AggregateExpr>) : LogicalPlan {

  override fun schema(): Schema {
    return Schema(groupExpr.map { it.toField(input) } +
         		  aggregateExpr.map { it.toField(input) })
  }

  override fun children(): List<LogicalPlan> {
    return listOf(input)
  }

  override fun toString(): String {
    return "Aggregate: groupExpr=$groupExpr, aggregateExpr=$aggregateExpr"
  }
}

Note that in this implementation, the output of the aggregate plan is organized with grouping columns followed by aggregate expressions. It will often be necessary to wrap the aggregate logical plan in a projection so that columns are returned in the order requested in the original query.

This book is also available for purchase in ePub, MOBI, and PDF format from https://leanpub.com/how-query-engines-work

Copyright © 2020-2023 Andy Grove. All rights reserved.