2 changes: 1 addition & 1 deletion .gitignore
@@ -45,4 +45,4 @@ my_config.yaml
my_config_catalog.yaml

# REST generated models
spec/generated
spec/generated
@@ -57,7 +57,7 @@ public InternalDataFile convertAddActionToInternalDataFile(
boolean includeColumnStats,
DeltaPartitionExtractor partitionExtractor,
DeltaStatsExtractor fileStatsExtractor) {
FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields);
FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, deltaSnapshot, fields);
List<ColumnStat> columnStats =
includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList();
long recordCount = fileStats.getNumRecords();
@@ -84,6 +84,9 @@ private InternalSchema toInternalSchema(
int openParenIndex = typeName.indexOf("(");
String trimmedTypeName = openParenIndex > 0 ? typeName.substring(0, openParenIndex) : typeName;
switch (trimmedTypeName) {
case "short":
Comment (Contributor): Can you update the unit tests to cover this case?

Reply: Yeah added

type = InternalType.INT;
break;
case "integer":
type = InternalType.INT;
break;
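Responding to the unit-test request in the comment above, a minimal sketch of what such a test could look like. This is only a sketch: the test class name, the getInstance() accessor, the public toInternalSchema(StructType) entry point, and the field accessors are assumptions rather than taken from this PR; only the "short" -> InternalType.INT mapping comes from the diff.

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import org.junit.jupiter.api.Test;

import org.apache.xtable.delta.DeltaSchemaExtractor;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;

class TestDeltaShortTypeMapping {

  @Test
  void shortColumnMapsToInternalIntType() {
    // Spark reports ShortType's typeName as "short", which should hit the new switch case.
    StructType sparkSchema = new StructType().add("short_col", DataTypes.ShortType);
    InternalSchema internalSchema =
        DeltaSchemaExtractor.getInstance().toInternalSchema(sparkSchema);
    assertEquals(
        InternalType.INT,
        internalSchema.getFields().get(0).getSchema().getDataType());
  }
}
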
@@ -40,7 +40,11 @@
import lombok.extern.log4j.Log4j2;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;

import com.fasterxml.jackson.annotation.JsonAnySetter;
@@ -57,6 +61,8 @@
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.parquet.ParquetMetadataExtractor;
import org.apache.xtable.parquet.ParquetStatsExtractor;

/**
* DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging
@@ -186,13 +192,64 @@ private void insertValueAtPath(Map<String, Object> jsonObject, String[] pathPart
}
}

/**
* Extracts column statistics for a Delta Lake data file. This method first attempts to read
* statistics from the Delta checkpoint (fast path). If checkpoint statistics are NULL or empty,
* it falls back to reading statistics directly from the Parquet file footer (slow path).
*
* <p>Delta Lake can store statistics in two locations:
*
* <ul>
* <li><b>Checkpoint files (JSON format):</b> Preferred and faster, but may be NULL if
* 'delta.checkpoint.writeStatsAsJson' is false or stats collection was disabled
* <li><b>Parquet file footers:</b> Fallback option that requires opening each data file
* individually, which is more expensive but ensures statistics are always available
* </ul>
*
* <p>Performance Considerations: When checkpoint statistics are NULL for many files, the fallback
* to Parquet footers can significantly slow down conversion. For large tables with thousands of
* files, consider enabling Delta checkpoint statistics via: {@code ALTER TABLE table_name SET
* TBLPROPERTIES ('delta.checkpoint.writeStatsAsJson' = 'true')}
*
* @param addFile the Delta AddFile action containing file metadata
* @param snapshot the Delta snapshot providing table context and base path
* @param fields the schema fields for which to extract statistics
* @return FileStats containing column statistics and record count
*/
public FileStats getColumnStatsForFile(
AddFile addFile, Snapshot snapshot, List<InternalField> fields) {
// Attempt to read statistics from Delta checkpoint (fast path)
String statsString = addFile.stats();

if (StringUtils.isNotEmpty(statsString)) {
log.debug("Reading stats from checkpoint for file: {}", addFile.path());
return parseStatsFromJson(statsString, fields);
}

// Checkpoint statistics are NULL or empty - fall back to Parquet footer (slow path)
log.debug(
"Stats not found in Delta checkpoint for file: {}, falling back to Parquet footer read",
addFile.path());
return readStatsFromParquetFooter(addFile, snapshot, fields);
}
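
For reference, the stats string read on the fast path is the per-file JSON carried by the Delta log/checkpoint AddFile entry; it typically has the following shape (values are illustrative only):

{"numRecords": 3, "minValues": {"id": 1, "name": "alice"}, "maxValues": {"id": 3, "name": "carol"}, "nullCount": {"id": 0, "name": 0}}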

/**
* Legacy method for backward compatibility. Use getColumnStatsForFile(AddFile, Snapshot, List)
* instead.
*/
public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fields) {
if (StringUtils.isEmpty(addFile.stats())) {
String statsString = addFile.stats();
if (StringUtils.isEmpty(statsString)) {
return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build();
}
return parseStatsFromJson(statsString, fields);
}

/** Parses stats from JSON string and converts to FileStats. */
private FileStats parseStatsFromJson(String statsString, List<InternalField> fields) {
// TODO: Additional work needed to track maps & arrays.
try {
DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class);
DeltaStats deltaStats = MAPPER.readValue(statsString, DeltaStats.class);
collectUnsupportedStats(deltaStats.getAdditionalStats());

Map<String, Object> fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues());
@@ -229,6 +286,114 @@ public FileStats getColumnStatsForFile(AddFile addFile, List<InternalField> fiel
}
}

/**
* Reads column statistics directly from a Parquet file footer. This method is used as a fallback
* when Delta checkpoint statistics are NULL or unavailable.
*
* <p>This operation is expensive as it requires:
*
* <ul>
* <li>Opening each Parquet file individually (I/O overhead)
* <li>Reading the file footer metadata
* <li>Parsing column chunk metadata for all columns
* <li>Converting Parquet statistics to internal format
* </ul>
*
* <p>For cloud storage (S3, GCS, ADLS), this can add significant latency due to network overhead.
* The method performs several safety checks to prevent errors:
*
* <ul>
* <li>Filters out statistics with NULL min/max ranges (prevents NullPointerException)
* <li>Skips DECIMAL and complex types (prevents ClassCastException)
* <li>Validates Binary-to-primitive type conversions
* </ul>
*
* <p>Record Count: The record count is read from Parquet row group metadata, which is always
* reliable regardless of column statistics availability.
*
* @param addFile the Delta AddFile action containing the file path
* @param snapshot the Delta snapshot providing table base path
* @param fields the schema fields for which to extract statistics
* @return FileStats with extracted statistics, or empty stats if reading fails
*/
private FileStats readStatsFromParquetFooter(
Comment (Contributor): Let's also make sure there is a test case to cover this flow. It can be a basic one, the detailed coverage for translating from parquet stats to our internal stats representation will be covered in #748

AddFile addFile, Snapshot snapshot, List<InternalField> fields) {
try {
// Construct absolute path to the Parquet data file
// Handle both absolute paths and relative paths from table base
String tableBasePath = snapshot.deltaLog().dataPath().toString();
String filePath = addFile.path();
String fullPath =
filePath.startsWith(tableBasePath) ? filePath : tableBasePath + "/" + filePath;

// Read Parquet file footer metadata using Hadoop FileSystem API
Configuration conf = new Configuration();
Path parquetPath = new Path(fullPath);

ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, parquetPath);
List<ColumnStat> parquetStats = ParquetStatsExtractor.getColumnStatsForaFile(footer);

// Extract record count from Parquet row groups metadata
// This is always reliable and doesn't depend on column statistics
long numRecords = footer.getBlocks().stream().mapToLong(block -> block.getRowCount()).sum();

// Build lookup map for efficient field matching by path
Map<String, ColumnStat> pathToStat =
parquetStats.stream()
.collect(
Collectors.toMap(
stat -> stat.getField().getPath(),
Function.identity(),
(stat1, stat2) -> stat1)); // Keep first occurrence on collision

// Map Parquet stats to requested Delta schema fields
// Filter out statistics with NULL ranges to prevent downstream NullPointerException
List<ColumnStat> mappedStats =
fields.stream()
.filter(field -> pathToStat.containsKey(field.getPath()))
.map(
field -> {
ColumnStat parquetStat = pathToStat.get(field.getPath());
// Rebuild ColumnStat with correct Delta field reference
// while preserving Parquet statistics values
return ColumnStat.builder()
.field(field)
.numValues(parquetStat.getNumValues())
.numNulls(parquetStat.getNumNulls())
.totalSize(parquetStat.getTotalSize())
.range(parquetStat.getRange())
.build();
})
.filter(
stat ->
stat.getRange() != null
&& stat.getRange().getMinValue() != null
&& stat.getRange().getMaxValue() != null)
.collect(Collectors.toList());

log.debug(
"Successfully extracted {} column stats from Parquet footer for file: {}",
mappedStats.size(),
addFile.path());

return FileStats.builder().columnStats(mappedStats).numRecords(numRecords).build();

} catch (Exception e) {
// Log warning but continue conversion - the file will be added without statistics
// This is preferable to failing the entire conversion
log.warn(
"Failed to read stats from Parquet footer for file {}: {}. "
+ "File will be included without column statistics.",
addFile.path(),
e.getMessage());
Comment (Contributor) on lines +385 to +388:

Suggested change:
- "Failed to read stats from Parquet footer for file {}: {}. "
- + "File will be included without column statistics.",
- addFile.path(),
- e.getMessage());
+ "Failed to read stats from Parquet footer for file {}. "
+ + "File will be included without column statistics.",
+ addFile.path(),
+ e);

This will log out the full exception stacktrace to provide more details on the failure which makes it easier to debug.

// Return empty statistics but note that record count is also 0
// Delta AddFile doesn't contain record count, so we cannot preserve it here
// The file will still be added to target table with 0 record count in metadata
return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build();
}
}
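
Relating to the reviewer's request above for a basic test covering this fallback flow, one possible shape for such a test is sketched below. The fixtures and the addFileWithoutStats/buildFields helpers are hypothetical and not part of this PR; only the getColumnStatsForFile(AddFile, Snapshot, List) entry point comes from the diff.

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;

import org.junit.jupiter.api.Test;

import org.apache.xtable.delta.DeltaStatsExtractor;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.stat.FileStats;

class TestDeltaStatsExtractorFooterFallback {

  // Hypothetical fixtures assumed to be provided by test setup.
  private SparkSession sparkSession;
  private String tablePath;
  private final DeltaStatsExtractor statsExtractor = DeltaStatsExtractor.getInstance();

  @Test
  void fallsBackToParquetFooterWhenCheckpointStatsAreMissing() {
    Snapshot snapshot = DeltaLog.forTable(sparkSession, tablePath).snapshot();
    // addFileWithoutStats: hypothetical helper yielding an AddFile whose stats()
    // is null/empty while the underlying Parquet data file exists on disk.
    AddFile addFile = addFileWithoutStats(snapshot);
    // buildFields: hypothetical helper building InternalFields from the table schema.
    List<InternalField> fields = buildFields(snapshot.schema());

    FileStats stats = statsExtractor.getColumnStatsForFile(addFile, snapshot, fields);

    // The record count should come from the Parquet row-group metadata.
    assertTrue(stats.getNumRecords() > 0);
    assertFalse(stats.getColumnStats().isEmpty());
  }
}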

private void collectUnsupportedStats(Map<String, Object> additionalStats) {
if (additionalStats == null || additionalStats.isEmpty()) {
return;
@@ -251,10 +416,18 @@ private void collectUnsupportedStats(Map<String, Object> additionalStats) {
*/
private Map<String, Object> flattenStatMap(Map<String, Object> statMap) {
Map<String, Object> result = new HashMap<>();
// Return empty map if input is null
if (statMap == null) {
return result;
}
Queue<StatField> statFieldQueue = new ArrayDeque<>();
statFieldQueue.add(StatField.of("", statMap));
while (!statFieldQueue.isEmpty()) {
StatField statField = statFieldQueue.poll();
// Skip if values map is null (can happen with malformed or partial stats)
if (statField.getValues() == null) {
continue;
}
String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + ".";
statField
.getValues()
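As a small illustration of the maps being guarded here, the flattening turns nested Delta stat maps into dotted paths that line up with InternalField.getPath(); a sketch with illustrative values:

// Illustrative input, e.g. deltaStats.getMinValues() for a table with a nested struct:
Map<String, Object> nestedMinValues = new HashMap<>();
nestedMinValues.put("id", 1);
nestedMinValues.put("address", Collections.singletonMap("zip", "94105"));
// flattenStatMap(nestedMinValues) is then expected to produce:
//   {"id" = 1, "address.zip" = "94105"}
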
@@ -67,13 +67,16 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List<ColumnStat> fie
valueCounts.put(fieldId, columnStats.getNumValues());
nullValueCounts.put(fieldId, columnStats.getNumNulls());
Type fieldType = icebergField.type();
if (columnStats.getRange().getMinValue() != null) {
// Add min/max bounds if available (they're optional in Iceberg Metrics)
// Native Iceberg includes columns even without bounds - they just have null bounds
Range range = columnStats.getRange();
if (range != null && range.getMinValue() != null) {
lowerBounds.put(
fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue()));
fieldId, Conversions.toByteBuffer(fieldType, range.getMinValue()));
}
if (columnStats.getRange().getMaxValue() != null) {
if (range != null && range.getMaxValue() != null) {
upperBounds.put(
fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMaxValue()));
fieldId, Conversions.toByteBuffer(fieldType, range.getMaxValue()));
}
});
return new Metrics(
@@ -136,7 +136,8 @@ public void init(TargetTable targetTable, Configuration configuration) {
IcebergPartitionSpecSync.getInstance(),
IcebergDataFileUpdatesSync.of(
IcebergColumnStatsConverter.getInstance(),
IcebergPartitionValueConverter.getInstance()),
IcebergPartitionValueConverter.getInstance(),
configuration),
IcebergTableManager.of(configuration));
}

@@ -226,6 +227,7 @@ public void syncFilesForSnapshot(List<PartitionFileGroup> partitionedDataFiles)
@Override
public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) {
dataFileUpdatesExtractor.applyDiff(
table,
transaction,
internalFilesDiff,
transaction.table().schema(),
Expand Down