The Metastore is a metadata store for PuffinDB tables. It extends the lakehouse's catalog with metadata that cannot be stored in the latter.
The Metastore manages partition-level column statistics that are physically stored on the Object Store (e.g. Amazon S3). These statistics are critical for helping the distributed query planner perform certain optimizations, and for allowing user interfaces to rapidly display quantitatively-rich previews of tables before querying them (e.g. STOIC Table Editor). The Metastore was designed to meet the following requirements:
- Support all major table formats (Iceberg, Delta, Hudi)
- Support tables with fairly large numbers of columns (hundreds or even thousands)
- Support user-defined summary statistics
- Optimize partial lookups for subsets of columns
- Optimize partial lookups for top frequencies
- Optimize partial lookups for histogram subsets
- Keep metadata files as small as possible
- Reduce development costs as much as possible
In order to fulfill these requirements, statistics are captured across three files per partition:
- A Parquet file for summary statistics (minimum, maximum, mean, etc.), with one column per summary statistic and one row per column
- A Parquet file for the frequencies of discrete columns, with columns for column name, value, and frequency, and one row per column·value pair
- A Parquet file for the histograms of numerical columns, with one column per column and one row per bin (1,000 to 10,000 bins)
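As a rough illustration of these layouts, the sketch below writes the three files with PyArrow. The file names, the exact set of summary statistics, and all sample values are assumptions made for this sketch only.

```python
# Illustrative construction of the three per-partition statistics files.
import pyarrow as pa
import pyarrow.parquet as pq

# Summary statistics: one column per statistic, one row per table column.
summary = pa.table({
    "column":  ["country", "price", "quantity"],
    "minimum": [None, 0.99, 1.0],
    "maximum": [None, 999.0, 64.0],
    "mean":    [None, 84.2, 7.3],
})
pq.write_table(summary, "summary.parquet")

# Frequencies of discrete columns: one row per column·value pair,
# ordered by column and decreasing frequency.
frequencies = pa.table({
    "column":    ["country", "country", "country"],
    "value":     ["US", "FR", "JP"],
    "frequency": [51423, 20211, 18090],
})
pq.write_table(frequencies, "frequencies.parquet")

# Histograms of numerical columns: one column per table column, one row per bin.
histograms = pa.table({
    "price":    [312, 2981, 1204],
    "quantity": [8710, 1022, 305],
})
pq.write_table(histograms, "histograms.parquet")
```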
This approach would offer the following benefits over the proposed Iceberg Puffin file format:
- No need to develop and maintain a new parser | serializer
- Low-latency lookup of statistics for specific columns
- Much smaller file size
- Total independence from the Iceberg table format, and therefore compatibility with the Delta Lake and Hudi table formats
This approach would offer the following benefits over a key-value store like DynamoDB or Redis:
- Lower cost
- Higher scalability (when serving large numbers of concurrent requests)
- Higher throughput (when using up to one serverless function per partition)
The summary statistics file uses the following table format:
- One column per summary statistic (minimum, maximum, mean, etc.)
- One row per column of the related table
- Ordered as columns are ordered in the related table
This table format is optimized for storing both simple and complex summary statistics (e.g. percentiles).
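A partial lookup for a subset of columns only needs to read the relevant rows and statistic columns. A minimal sketch, assuming PyArrow and the illustrative summary file from above:

```python
# Partial lookup of summary statistics for a subset of table columns:
# project only the needed statistics and filter rows by column name.
import pyarrow.parquet as pq

stats = pq.read_table(
    "summary.parquet",                                   # illustrative file name
    columns=["column", "minimum", "maximum"],            # only the statistics we need
    filters=[("column", "in", ["price", "quantity"])],   # only the requested table columns
)
```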
The frequencies file uses the following table format:
- Columns for column name, value, and frequency
- One row per column·value pair
- Ordered by column (as columns are ordered in the related table) and decreasing frequency
This table format is optimized for columns with large numbers of distinct values.
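Because rows are ordered by column and decreasing frequency, a top-k lookup only needs the first rows matching a given column, and row-group statistics on the sorted column-name field let the reader skip unrelated row groups. A minimal sketch, assuming PyArrow and the illustrative frequencies file from above:

```python
# Top-frequency lookup for one categorical column: the first k matching rows
# are the k most frequent values, since rows are sorted by decreasing frequency.
import pyarrow.parquet as pq

top_10 = pq.read_table(
    "frequencies.parquet",
    filters=[("column", "==", "country")],
).slice(0, 10)
```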
The histograms file uses the following table format:
- One column per column of the related table
- One row per bin (1,000 to 10,000 bins)
- Ordered by increasing bin minimum value
This table format is optimized for storing high-resolution histograms.
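A lookup of histograms for a subset of numerical columns maps directly onto Parquet column projection. A minimal sketch, assuming PyArrow and the illustrative histograms file from above:

```python
# Histogram lookup for a subset of numerical columns: plain column projection,
# so only the requested histograms are fetched from object storage.
import pyarrow.parquet as pq

histograms = pq.read_table("histograms.parquet", columns=["price", "quantity"])
```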
Note: the Lance file format could be used as an alternative to Parquet for 100× faster random access.
Table-level column statistics are managed in a similar fashion and are generated by reducing partition-level column statistics.
Note: as an option, the table-level frequencies.parquet file could include the frequency distribution of every value across every partition. This would dramatically accelerate the identification of partitions that contain certain values for a categorical column. Of course, this would also increase the size of this file, but Parquet makes it easy to retrieve only certain columns, so it should not affect the lookup performance of total frequencies (frequencies of values across all partitions). It would also make it more expensive to update this file whenever updates are made to a particular partition, but this overhead might be small in relation to the benefits offered by the consolidation of these frequency distributions. If this feature is implemented, frequency distributions should be stored in two complementary ways: vectors for dense distributions, and hashes for sparse distributions.
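A minimal sketch of these two complementary encodings for a single value's per-partition distribution; the partition count, density threshold, and function name are assumptions, not part of the specification:

```python
# Encode one value's per-partition frequency distribution either as a dense
# vector indexed by partition id, or as a sparse {partition: count} map.
NUM_PARTITIONS = 1024
DENSITY_THRESHOLD = 0.25  # assumed cut-off between the two encodings

def encode_distribution(counts_by_partition: dict[int, int]):
    """Return ("dense", vector) or ("sparse", mapping) for a single value."""
    density = len(counts_by_partition) / NUM_PARTITIONS
    if density >= DENSITY_THRESHOLD:
        vector = [0] * NUM_PARTITIONS
        for partition, count in counts_by_partition.items():
            vector[partition] = count
        return "dense", vector
    return "sparse", counts_by_partition
```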
The computation of frequencies is implemented using three complementary algorithms:
- Low count of distinct values
- Low count of duplicate values
- All other cases
The following process is implemented to decide which algorithm to use:
- Partition-level frequencies are first computed by the serverless functions.
- From there, counts of distinct and duplicate values are sent to the Monostore, which then picks the right algorithm.
- Algorithm #1 if the sum of counts of distinct values is lower than a threshold defined by the Monostore's size (1M for a small one).
- Algorithm #2 if the sum of counts of duplicate values is such that all duplicate values could be handled by a single serverless function.
- Algorithm #3 otherwise.
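A minimal sketch of this selection rule; the duplicate-count threshold is an assumption standing in for whatever a single serverless function can actually handle:

```python
# Pick the frequency-computation algorithm from the reduced counts
# sent to the Monostore by the serverless functions.
DISTINCT_THRESHOLD = 1_000_000        # e.g. 1M for a small Monostore
DUPLICATES_PER_FUNCTION = 5_000_000   # assumed single-function capacity

def pick_algorithm(total_distinct: int, total_duplicates: int) -> int:
    if total_distinct < DISTINCT_THRESHOLD:
        return 1  # reduce all frequencies on the Monostore's Redis instance
    if total_duplicates <= DUPLICATES_PER_FUNCTION:
        return 2  # Bloom-filter candidate duplicates, confirm with the functions
    return 3      # bin values uniformly and scatter across functions
```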
Algorithm #1 (low count of distinct values) proceeds as follows:
- All serverless functions stream their frequencies to the Monostore.
- All frequencies are reduced using the Redis instance running on the Monostore.
- The Monostore now holds an exhaustive set of frequencies for all values.
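A minimal sketch of the reduction step on the Monostore, assuming redis-py; the endpoint and key-naming scheme are illustrative:

```python
# Sum one partition's (value, count) pairs into a column-level Redis hash.
import redis

r = redis.Redis(host="monostore.internal", port=6379)  # assumed endpoint

def reduce_partition_frequencies(column: str, partition_frequencies: dict[str, int]) -> None:
    pipe = r.pipeline(transaction=False)
    for value, count in partition_frequencies.items():
        pipe.hincrby(f"freq:{column}", value, count)
    pipe.execute()
```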
Algorithm #2 (low count of duplicate values) proceeds as follows:
- All serverless functions stream their frequencies to the Monostore.
- All values are passed through a Bloom filter running on the Monostore.
- All possible duplicates are consolidated on a Redis instance running on the Monostore.
- Once all values have been filtered, all possible duplicates are broadcasted to the serverless functions, which confirm actual duplicates.
- Actual duplicates are sent by the serverless functions to the Monostore.
- The frequencies of duplicates are reduced using the Redis instance running on the Monostore.
- Distinct values are omitted from the frequency distribution to save space, unless an exhaustive list of values is required.
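A minimal sketch of the Monostore-side filtering step; the Bloom filter below is a toy stand-in for whatever implementation would actually be used, and values it flags must still be confirmed by the serverless functions against their own partitions:

```python
# Collect candidate duplicates from the values streamed by the functions.
import hashlib

class BloomFilter:
    def __init__(self, size_bits: int = 1 << 24, num_hashes: int = 3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, value: str):
        for seed in range(self.num_hashes):
            digest = hashlib.blake2b(value.encode(), salt=seed.to_bytes(8, "little")).digest()
            yield int.from_bytes(digest[:8], "little") % self.size

    def add_and_check(self, value: str) -> bool:
        """Insert a value; return True if it may have been seen before."""
        maybe_seen = True
        for pos in self._positions(value):
            byte, bit = divmod(pos, 8)
            if not (self.bits[byte] >> bit) & 1:
                maybe_seen = False
                self.bits[byte] |= 1 << bit
        return maybe_seen

bloom = BloomFilter()
possible_duplicates = set()  # would be consolidated on the Monostore's Redis instance

def filter_streamed_values(values_from_one_function):
    for value in values_from_one_function:
        if bloom.add_and_check(value):
            possible_duplicates.add(value)
```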
Algorithm #3 covers all other cases. It assumes that the unique values of a categorical column whose frequencies must be computed can be uniformly binned locally, in a fully distributed manner. This is true for dense ordinals and randomly-generated identifiers (e.g. UUID). This assumption is reasonable, as categorical columns that have neither a low count of distinct values nor a low count of duplicate values are likely to be relationship columns, which are usually encoded using ordinals or randomly-generated identifiers. One case for which this assumption might not hold is when data has been filtered, resulting in a non-uniform distribution of unique values. In such a case, the algorithm outlined below will still work, albeit with sub-optimal performance.
- If values are encoded using ordinals, partition-level minimum and maximum values are sent to the Monostore and reduced there.
- Column-level minimum and maximum values are then broadcasted by the Monostore to the serverless functions.
- Partition-level frequencies are binned by value by the serverless functions, with as many bins as there are serverless functions.
- Bin-level frequencies are scattered across all serverless functions, using NAT hole punching.
- Bin-level frequencies are reduced within individual serverless functions.
- Bin-level frequencies are streamed to the Monostore.
- The Monostore now holds an exhaustive set of frequencies for all values.
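A minimal sketch of the local binning step, assuming ordinal values that can be binned uniformly between the broadcast column-level minimum and maximum:

```python
# Split one partition's value frequencies into one bucket per serverless function.
def bin_partition_frequencies(frequencies: dict[int, int],
                              column_min: int, column_max: int,
                              num_functions: int) -> list[dict[int, int]]:
    span = max(column_max - column_min + 1, 1)
    buckets: list[dict[int, int]] = [{} for _ in range(num_functions)]
    for value, count in frequencies.items():
        target = min((value - column_min) * num_functions // span, num_functions - 1)
        buckets[target][value] = count
    return buckets

# Each bucket is then scattered to its target function (NAT hole punching),
# reduced there, and streamed to the Monostore.
```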
While quantiles can be accurately approximated with the most recent versions of the t-digest algorithm, exact quantiles are required for certain applications. In such cases, ranks must be computed for column values through distributed sorting. The following algorithm implements distributed sorting in a fixed number of steps, without requiring a large amount of memory in a centralized location.
- Partition-level columns are first sorted by the serverless functions.
- Then, their minimums and maximums are reduced by the Monostore to compute minimum and maximum values for the entire column.
- From there, each serverless function computes a histogram of its partition-level column, with as many bins as there are functions.
- These partition-level histograms are then streamed from the serverless functions to the Monostore.
- Histograms are then reduced with bin-wise summation of counts by the Monostore.
- From there, a uniform set of value ranges with equal numbers of values is generated, with as many ranges as there are functions.
- This set is then broadcasted by the Monostore to the serverless functions.
- From there, each serverless function scatters its sorted partition-level column values across all other serverless functions.
- This scattering is done according to value ranges, using NAT hole punching for direct function-to-function communication.
- From there, each serverless function sorts its values locally and sends its last value and its rank to the next function.
- This last value·rank exchange is then used to offset local ranks in order to produce global ranks.
- From there, accurate quantiles can be computed through simple reduction to the Monostore.
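A minimal sketch of the range-generation step, which turns the reduced histogram into one contiguous, roughly equal-count bin range per function; representing ranges as bin indices is an assumption of this sketch:

```python
# Given the reduced histogram (one count per bin over the full column range),
# return (first_bin, last_bin) index pairs, one per serverless function,
# so that each range holds roughly the same number of values.
def equal_count_ranges(bin_counts: list[int], num_functions: int) -> list[tuple[int, int]]:
    total = sum(bin_counts)
    target = total / num_functions
    ranges: list[tuple[int, int]] = []
    start, running = 0, 0
    for i, count in enumerate(bin_counts):
        running += count
        if len(ranges) < num_functions - 1 and running >= target * (len(ranges) + 1):
            ranges.append((start, i))
            start = i + 1
    ranges.append((start, len(bin_counts) - 1))
    return ranges
```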
Note: computed ranks could be stored in a ranks.parquet file that would accelerate rank updates following random data updates:
- Columns for column name, value, and rank
- One row per column·value pair
- Ordered by column (as columns are ordered in the related table) and increasing rank
This table format is optimized for columns with large numbers of distinct values and potentially-large numbers of duplicate values.
In most instances, the lookup of statistics for a given partition should take less than 100 ms, and this lookup can be parallelized across 10,000 serverless functions or more. If partitions are 50 MB in size compressed (500 MB uncompressed), 10,000 serverless functions could look up column statistics for 500 GB of compressed data (5 TB uncompressed) in 100 ms. Moving to partitions that are 1 GB in size compressed (10 GB uncompressed) would let 10,000 serverless functions look up the same column statistics for 10 TB of compressed data (100 TB uncompressed) within the same 100 ms. This suggests that larger partitions would be preferable (for the Metastore at least).
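The same arithmetic, made explicit; the 10× compression ratio is taken from the 50 MB compressed / 500 MB uncompressed figures above:

```python
# Scan capacity per 100 ms lookup, as a function of partition size,
# for 10,000 serverless functions working in parallel.
functions = 10_000
for compressed_mb in (50, 1_000):  # 50 MB vs 1 GB partitions
    compressed_tb = functions * compressed_mb / 1_000_000
    print(f"{compressed_mb} MB partitions: {compressed_tb:g} TB compressed, "
          f"{compressed_tb * 10:g} TB uncompressed per 100 ms lookup")
```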
The Metastore provides an API for querying the statistics of table columns. This API is used by the distributed query planner to create a plan for a particular query defined with filtering predicates, and for which the pre-computed column statistics are not sufficient. This API is itself implemented by the distributed query engine, in a naturally recursive manner.