By Julien Le Dem

In 2018, I wanted to describe how the components of databases, distributed or not, were being commoditized as individual parts that anyone could recombine into use-case specific engines. Given your own constraints, you can leverage those components to build a query engine that solves your problem much faster than building everything from the ground up. I liked the idea of calling it “the Deconstructed Database” and gave a few talks about it.

Since then, it’s been incredible to see the adoption of key components like Parquet, Arrow, Iceberg, Calcite and OpenLineage. They provide an interoperability layer that enables using data for many purposes without creating silos or duplication. As compute and storage can be efficiently decoupled, a common storage layer enables a vibrant ecosystem of on-demand tools specialized for specific use cases, without vendor lock-in.

Recently there’s been more talk around the same idea of building composable data systems. Arrow and Iceberg have grown exponentially in popularity and some of the aspirational ideas in my talk in 2018 are now well established. We do live in the future I was hoping for then.

In this post, I want to explain in more detail what those components are and, more importantly, the contracts that keep them decoupled and composable.

The constraint space

First, we need to define the constraint space. The trade-offs you pick are going to impact the design of the system and how those components get rearranged. One universal constraint is cost, but beyond this there are other reasons why a distributed system will not scale to an infinitely large number of machines. Here are a few of the parameters to keep in mind:

Volume

One interesting trade-off is whether the data fits on one machine, on disk or in memory. This also depends on the relative cost of a single large machine versus several small ones, and on the reliability, latency and performance costs incurred by distributed processing.

Today, you see machines with many more cores and larger memory compared to ten years ago. The ratio of read times from disk compared to memory or network has also changed over time. This significantly affects the patterns of processing data and what approach is faster.

As a consequence, the design of a newer single-machine system like DuckDB is significantly different from that of a distributed one like Presto, which was created almost ten years earlier.

Latency

Another trade-off is how soon you want to be able to query data after it is created.

OLAP engines typically assume higher latency is acceptable and bulk import data into a columnar format for performance purposes. This requires buffering rows to reach a big enough batch size for efficient columnar access.

Supporting lower latency so that data becomes visible very quickly to queries requires more expensive storage that’s more optimized for OLTP. A common compromise is to combine multiple tiers of storage. The data can start in row-oriented storage and be asynchronously batch converted to columnar. The query engine knows how to combine data from both tiers. This solves latency needs at the cost of increased complexity. Sometimes predictable latency is more important than the lowest possible latency as it enables better caching strategies.
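
As a toy illustration of that compromise (not tied to any particular engine, and using pyarrow purely as an example), a hot row-oriented buffer can hold freshly ingested rows while older data lives in a columnar tier, with both combined at scan time:

import pyarrow as pa

# Hypothetical two-tier layout: a row-oriented "hot" buffer for fresh rows
# and a columnar "cold" tier for data already converted in batches.
hot_buffer = [{"country": "FR", "status": "active"}]
cold_tier = pa.table({"country": ["US", "DE"], "status": ["active", "inactive"]})

def scan_all():
    # Convert the hot buffer to Arrow on the fly and concatenate it with the
    # cold tier so queries see a single logical table spanning both tiers.
    hot = pa.Table.from_pylist(hot_buffer, schema=cold_tier.schema)
    return pa.concat_tables([hot, cold_tier])

print(scan_all().num_rows)  # 3 rows, regardless of which tier they live in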

Precision

Depending on whether exact results are required, you can build faster and less expensive systems.

For example, if latency matters and recency is more important than completeness, you can accept losing some data.

Using sampled data or approximate data structures speeds up queries in exchange for a controlled error bound.

Sampling decreases the data volume, which can provide significant speedups if it lets the dataset fit on a single machine or move to a faster storage tier, instead of incurring the overhead of distributed processing or of larger but slower storage tiers.
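
A back-of-the-envelope sketch of that idea (plain Python, with made-up numbers): estimate a count from a 1% uniform sample and scale it back up, instead of scanning every row.

import random

# One million synthetic rows, roughly 30% of them "active".
population = [{"status": "active" if random.random() < 0.3 else "inactive"}
              for _ in range(1_000_000)]

sample = random.sample(population, k=10_000)                   # 1% uniform sample
estimate = sum(r["status"] == "active" for r in sample) * 100  # scale the sample count back up
print(estimate)  # close to the true count of ~300,000, computed from 1% of the rows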

Approximate data structures like HyperLogLog or Count-Min Sketch have the nice property of being associative and can be computed much more economically than an exact distinct count or top-k.

For example, you cannot simply add two partial distinct counts computed in parallel on two subsets of your dataset: all distinct values have to be kept in the working set to be able to combine them. In contrast, two HyperLogLog sketches built on two subsets can simply be merged together, with a much smaller memory footprint.
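
A minimal sketch of that mergeability, assuming the third-party datasketch package (one possible implementation among many):

from datasketch import HyperLogLog

# Two sketches built independently on two subsets of the data.
left, right = HyperLogLog(), HyperLogLog()
for user_id in ("a", "b", "c"):
    left.update(user_id.encode())
for user_id in ("c", "d"):
    right.update(user_id.encode())

# Sketches are associative: merging them approximates the distinct count of the
# union, without keeping every distinct value in memory.
left.merge(right)
print(round(left.count()))  # approximately 4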

Both sampling and approximate data structures have well-understood trade-offs between storage/memory usage and error bounds on the resulting data. This enables making sound decisions in the presence of results that might be approximate but with a bounded error.

The components and their abstractions

The deconstructed database breaks down into a few layers, from the query language to data storage.

The deconstructed database overview

I grouped the very first steps of the query execution lifecycle into a “frontend” box. Before any data is read or written, we need to parse the query and figure out the best plan to execute it.

Once the plan is established, the query will get executed. During this step, one critical aspect is retrieving data efficiently from the storage layer. This is one key area where a standardized contract enables interchangeable components without sacrificing performance.

Query parser

There are two main abstractions people use to express data transformations.

The first is the relational model and SQL.

SELECT country, count(*) AS c
FROM user
WHERE status = 'active'
GROUP BY country
ORDER BY country DESC

The second is the dataframe API, shown here with PySpark.

spark.read.parquet("/table")
     .filter("status = 'active'")
     .groupBy("country")
     .count()
     .sort("country", ascending=False)
     .show()

Independently of how you express the transformation, it is turned into a logical plan describing how the inputs are transformed into the outputs.
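
With the dataframe example above, you can ask Spark to print the plans it derives from the pipeline (a minimal sketch, assuming a running SparkSession named spark and a Parquet dataset at the hypothetical path "/table"):

(spark.read.parquet("/table")
      .filter("status = 'active'")
      .groupBy("country")
      .count()
      .sort("country", ascending=False)
      .explain(True))  # prints the parsed, analyzed and optimized logical plans, then the physical plan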

Query optimizer

At a high level, the query optimizer turns the logical plan into a physical plan for the execution layer to actually process and materialize the resulting data in the most efficient way.

To understand the query plan, the optimizer needs access to a catalog to determine which tables exist and what columns they contain.

The catalog will also contain metadata that will inform the planner on the most efficient way to process that data.

The optimizer will first rewrite the logical plan to push down limits, projections, filters and aggregations to the data sources as much as possible to reduce the amount of data that needs to be read and decoded.

Whether the data is stored sorted or partitioned will also affect how efficiently the plan can be executed. As an example, at the physical layer, the optimizer will pick the fastest join order and implementation depending on which tables are big or small and the distribution of the key space.
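
To make this concrete, here is a small sketch using DuckDB (one engine among many) to look at the physical plan its optimizer produces for the query from the previous section, against a stand-in users table created on the fly:

import duckdb

con = duckdb.connect()
# A tiny stand-in table so the query can be planned and executed locally.
con.sql("CREATE TABLE users (country VARCHAR, status VARCHAR)")
con.sql("INSERT INTO users VALUES ('FR', 'active'), ('US', 'inactive')")
con.sql("""
    EXPLAIN
    SELECT country, count(*) AS c
    FROM users
    WHERE status = 'active'
    GROUP BY country
    ORDER BY country DESC
""").show()  # prints the physical plan: scan, filter, aggregate and sort operators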

Calcite is a query optimizer framework that provides that frontend layer and can generate a physical plan for an execution layer to execute. It is used in Apache Hive, among other projects.

Substrait is a standardized plan representation that can be used to facilitate interoperability between various DSLs, the optimizer, the execution and storage layers.

Execution

The execution layer follows the physical plan, reads the data from one or more data source implementations and either returns a result set or writes the result back to the storage layer.

Apache Arrow has emerged as a standard representation for fast in-memory vectorized execution and zero-copy data exchange between nodes.

Projects like Velox and DataFusion make it easy to implement state-of-the-art vectorized execution.

DataFusion has recently been used for things as diverse as time series engines, a stream processing engine and a vectorized implementation of Spark. These examples illustrate the power of composability.

Arrow also enables performant function libraries that integrate well into a vectorized engine while remaining completely portable, independently of the language they are written in.
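
As a small illustration of Arrow-native processing, here is a sketch using pyarrow’s compute kernels (which wrap the Arrow C++ compute functions) to filter, group and sort columnar data entirely in memory:

import pyarrow as pa
import pyarrow.compute as pc

users = pa.table({
    "country": ["FR", "FR", "US", "DE"],
    "status":  ["active", "active", "inactive", "active"],
})

# Vectorized filter, group-by and sort on columnar, in-memory Arrow data.
active = users.filter(pc.equal(users["status"], "active"))
per_country = active.group_by("country").aggregate([("country", "count")])
print(per_country.sort_by([("country", "descending")]))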

Storage layer / Data sources

Retrieving data in the Arrow format from data sources ensures that it is already in an efficient representation for execution and won’t require additional preparation. The query optimizer will have provided push downs to reduce the quantity of data read.

These push downs include:

  • Projection: What subset of the columns is being read. A columnar storage layer can retrieve them efficiently, independently of the other columns.
  • Predicate: What filter is applied to the retrieved data. The storage layer can use the statistics it stores to skip data that won’t match the filter without decoding it.
  • Limit: The maximum number of rows to be returned.
  • Aggregation: If all we’re doing is counting rows or adding values, we do not need to materialize the data in memory before evaluating the aggregate.
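
A minimal sketch of the first two push downs with pyarrow’s Parquet reader (a local users.parquet file is created just for the example): passing the projection and the predicate to the reader means only the needed columns are decoded, and row groups whose statistics cannot match the filter are skipped.

import pyarrow as pa
import pyarrow.parquet as pq

pq.write_table(pa.table({
    "country": ["FR", "US", "DE"],
    "status":  ["active", "inactive", "active"],
}), "users.parquet")

active = pq.read_table(
    "users.parquet",
    columns=["country"],                  # projection push down
    filters=[("status", "=", "active")],  # predicate push down, checked against row group statistics
)
print(active)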

A combination of a standard representation of push downs (for example, as a Substrait plan) and a standard representation of returned data in Arrow enables very loosely coupled storage layers that can be combined with various execution layers. If you need a low-latency storage layer, you can create an in-memory service that returns Arrow. One example of this is InfluxDB 3.0’s ingester, which uses DataFusion and Arrow. If you need cheaper storage with bulk load capabilities, you can use Iceberg to store Parquet files in blob storage. If you need multiple storage tiers, you can make your optimizer aware of them so that the execution layer can combine data from the same logical table split across two storage tiers.
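
For instance, a hypothetical low-latency tier could be sketched as an Arrow Flight service that keeps recent data in memory and streams it back as Arrow record batches (names like InMemoryTier are illustrative, not an existing project):

import pyarrow as pa
import pyarrow.flight as flight

class InMemoryTier(flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        # Freshly ingested data, kept in memory and keyed by table name.
        self._tables = {"users": pa.table({"country": ["FR"], "status": ["active"]})}

    def do_get(self, context, ticket):
        # Stream the requested table back as Arrow record batches.
        return flight.RecordBatchStream(self._tables[ticket.ticket.decode()])

# An execution engine could then fetch the data with no conversion step, e.g.:
#   client = flight.connect("grpc://localhost:8815")
#   table = client.do_get(flight.Ticket(b"users")).read_all()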

In a future post I’ll explain how Iceberg is used to implement a very popular storage layer, the data lake. I’ll show how it works to efficiently retrieve data from blob storage leveraging Parquet.

Operational Lineage

Governance, compliance and data reliability have become must-haves for data systems. We need to externally expose metadata about how data was derived from its upstream sources, understand performance characteristics, and facilitate change management and troubleshooting.

OpenLineage is the standard that allows collecting this information across the data ecosystem.

Typically, this integration is done by inspecting the logical and physical plan as well as the query profile resulting from the query execution.
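
The collected metadata takes the form of run events tying a job run to its input and output datasets. A simplified, illustrative event (field values are placeholders; real events also carry facets with plan and profile details) might look like this:

run_event = {
    "eventType": "COMPLETE",                      # emitted when the run finishes
    "eventTime": "2024-01-01T00:00:00Z",
    "producer": "https://example.com/my-engine",  # hypothetical integration emitting the event
    "run": {"runId": "3f5e8f6c-0000-4000-8000-000000000000"},
    "job": {"namespace": "analytics", "name": "active_users_by_country"},
    "inputs": [{"namespace": "warehouse", "name": "user"}],
    "outputs": [{"namespace": "warehouse", "name": "active_users_by_country"}],
}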

Bringing it all together

A query optimizer like Calcite is the real conductor of this orchestra. It understands the capabilities and performance characteristics of the execution layer, it breaks down the plan into operations that can be pushed down to various storage layers to minimize IO and data materialization, and it generates a physical plan to bring it all together and generate a result. Meanwhile, Arrow is the key representation for exchanging and transforming data efficiently between those layers and producing an output, whether in the form of a new table or as a result set to the user.

Along the way, various storage components can be swapped out interchangeably or combined and vectorized execution engines like Velox or DataFusion can be used to efficiently process data. A popular storage layer is the data lake for which Iceberg and Parquet provide an open implementation.

Thank you Andrew Lamb and Ross Turk for the feedback.