From fcbd5c5dfa3431f8c8a95fda452154cdde23329e Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Thu, 19 Dec 2024 11:23:47 +0100 Subject: [PATCH 1/3] Core, Spark: Rewrite data files with high delete ratio --- .../actions/SizeBasedDataRewriter.java | 21 ++- .../actions/TestRewriteDataFilesAction.java | 129 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java index e5b5908804e7..ec88ef1fa4e1 100644 --- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java +++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java @@ -22,12 +22,14 @@ import java.util.Map; import java.util.Set; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.PropertyUtil; public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter { @@ -84,13 +86,30 @@ private boolean shouldRewrite(List group) { return enoughInputFiles(group) || enoughContent(group) || tooMuchContent(group) - || anyTaskHasTooManyDeletes(group); + || anyTaskHasTooManyDeletes(group) + || anyTaskHasTooHighDeleteRatio(group); } private boolean anyTaskHasTooManyDeletes(List group) { return group.stream().anyMatch(this::tooManyDeletes); } + private boolean anyTaskHasTooHighDeleteRatio(List group) { + return group.stream().anyMatch(this::tooHighDeleteRatio); + } + + private boolean tooHighDeleteRatio(FileScanTask task) { + if (ContentFileUtil.containsSingleDV(task.deletes())) { + DeleteFile file = Iterables.getOnlyElement(task.deletes()); + double deletedRecords = (double) Math.min(file.recordCount(), task.file().recordCount()); + double deleteRatio = deletedRecords / task.file().recordCount(); + // TODO: make this configurable + return deleteRatio >= 0.3; + } + + return false; + } + @Override protected long defaultTargetFileSize() { return PropertyUtil.propertyAsLong( diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index bd307a468279..0785bf9d355f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -313,6 +313,135 @@ public void testBinPackAfterPartitionChange() { shouldHaveFiles(table, 20); } + @TestTemplate + public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + Table table = createTable(); + int numDataFiles = 5; + // 100 / 5 = 20 records per data file + writeRecords(numDataFiles, 100); + // delete > 100% of records for each data file + int numPositionsToDelete = 1000; + table.refresh(); + List dataFiles = TestHelpers.dataFiles(table); + assertThat(dataFiles).hasSize(numDataFiles); + + RowDelta rowDelta = table.newRowDelta(); + for (DataFile dataFile : dataFiles) { + writeDV(table, 
dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } + + rowDelta.commit(); + + Set deleteFiles = TestHelpers.deleteFiles(table); + assertThat(deleteFiles).hasSize(numDataFiles); + + // there are 5 data files with a delete ratio of > 100% each, so all data files should be + // rewritten. Set MIN_INPUT_FILES > to the number of data files so that compaction is only + // triggered when the delete ratio of >= 30% is hit + RewriteDataFiles.Result result = + SparkActions.get(spark) + .rewriteDataFiles(table) + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .execute(); + + assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); + + table.refresh(); + List newDataFiles = TestHelpers.dataFiles(table); + assertThat(newDataFiles).isEmpty(); + + Set newDeleteFiles = TestHelpers.deleteFiles(table); + assertThat(newDeleteFiles).isEmpty(); + } + + @TestTemplate + public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + Table table = createTable(); + int numDataFiles = 5; + // 100 / 5 = 20 records per data file + writeRecords(numDataFiles, 100); + // delete 40% of records for each data file + int numPositionsToDelete = 8; + table.refresh(); + List dataFiles = TestHelpers.dataFiles(table); + assertThat(dataFiles).hasSize(numDataFiles); + + RowDelta rowDelta = table.newRowDelta(); + for (DataFile dataFile : dataFiles) { + writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } + + rowDelta.commit(); + + Set deleteFiles = TestHelpers.deleteFiles(table); + assertThat(deleteFiles).hasSize(numDataFiles); + + // there are 5 data files with a delete ratio of 40% each, so all data files should be + // rewritten. Set MIN_INPUT_FILES > to the number of data files so that compaction is only + // triggered when the delete ratio of >= 30% is hit + RewriteDataFiles.Result result = + SparkActions.get(spark) + .rewriteDataFiles(table) + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .execute(); + + assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); + + table.refresh(); + List newDataFiles = TestHelpers.dataFiles(table); + assertThat(newDataFiles).hasSize(1); + + Set newDeleteFiles = TestHelpers.deleteFiles(table); + assertThat(newDeleteFiles).isEmpty(); + } + + @TestTemplate + public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + Table table = createTable(); + int numDataFiles = 5; + // 100 / 5 = 20 records per data file + writeRecords(numDataFiles, 100); + // delete 25% of records for each data file + int numPositionsToDelete = 5; + table.refresh(); + List dataFiles = TestHelpers.dataFiles(table); + assertThat(dataFiles).hasSize(numDataFiles); + + RowDelta rowDelta = table.newRowDelta(); + for (DataFile dataFile : dataFiles) { + writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } + + rowDelta.commit(); + + Set deleteFiles = TestHelpers.deleteFiles(table); + assertThat(deleteFiles).hasSize(numDataFiles); + + // there are 5 data files with a delete ratio of 25% each, so data files should not be + // rewritten. 
Set MIN_INPUT_FILES > to the number of data files so that compaction is only + // triggered when the delete ratio of >= 30% is hit + RewriteDataFiles.Result result = + SparkActions.get(spark) + .rewriteDataFiles(table) + .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .execute(); + + assertThat(result.rewrittenDataFilesCount()).isEqualTo(0); + + table.refresh(); + List newDataFiles = TestHelpers.dataFiles(table); + assertThat(newDataFiles).hasSameSizeAs(dataFiles); + + Set newDeleteFiles = TestHelpers.deleteFiles(table); + assertThat(newDeleteFiles).hasSameSizeAs(deleteFiles); + } + @TestTemplate public void testBinPackWithDeletes() throws IOException { assumeThat(formatVersion).isGreaterThanOrEqualTo(2); From 3dfd87e0a776f619b0f328922f549b5677b121fa Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 8 Jan 2025 11:06:47 +0100 Subject: [PATCH 2/3] review feedback --- .../actions/SizeBasedDataRewriter.java | 17 ++-- .../org/apache/iceberg/MockFileScanTask.java | 28 ++++++ .../actions/TestRewriteDataFilesAction.java | 92 ++++++++++++++++--- .../spark/actions/TestSparkFileRewriter.java | 42 +++++++++ 4 files changed, 161 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java index ec88ef1fa4e1..4459e8892b46 100644 --- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java +++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java @@ -21,8 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -70,7 +70,8 @@ public void init(Map options) { @Override protected Iterable filterFiles(Iterable tasks) { - return Iterables.filter(tasks, task -> wronglySized(task) || tooManyDeletes(task)); + return Iterables.filter( + tasks, task -> wronglySized(task) || tooManyDeletes(task) || tooHighDeleteRatio(task)); } private boolean tooManyDeletes(FileScanTask task) { @@ -99,11 +100,15 @@ private boolean anyTaskHasTooHighDeleteRatio(List group) { } private boolean tooHighDeleteRatio(FileScanTask task) { - if (ContentFileUtil.containsSingleDV(task.deletes())) { - DeleteFile file = Iterables.getOnlyElement(task.deletes()); - double deletedRecords = (double) Math.min(file.recordCount(), task.file().recordCount()); + if (null == task.deletes() || task.deletes().isEmpty()) { + return false; + } + + if (ContentFileUtil.containsSingleDV(task.deletes()) + || task.deletes().stream().allMatch(ContentFileUtil::isFileScoped)) { + long sum = task.deletes().stream().mapToLong(ContentFile::recordCount).sum(); + double deletedRecords = (double) Math.min(sum, task.file().recordCount()); double deleteRatio = deletedRecords / task.file().recordCount(); - // TODO: make this configurable return deleteRatio >= 0.3; } diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java index 565433c82cb1..06a9d87da500 100644 --- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java +++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java @@ -73,6 +73,34 @@ public static MockFileScanTask mockTaskWithDeletes(long length, int nDeletes) { return new MockFileScanTask(mockFile, mockDeletes); } + public static 
MockFileScanTask mockTaskWithFileScopedDeleteRecords( + long length, long recordCount, int numDeleteFiles, long deletedRecords) { + DeleteFile[] mockDeletes = new DeleteFile[numDeleteFiles]; + for (int i = 0; i < numDeleteFiles; i++) { + DeleteFile deleteFile = Mockito.mock(DeleteFile.class); + Mockito.when(deleteFile.recordCount()).thenReturn(deletedRecords); + Mockito.when(deleteFile.referencedDataFile()).thenReturn("random data file"); + mockDeletes[i] = deleteFile; + } + + DataFile dataFile = Mockito.mock(DataFile.class); + Mockito.when(dataFile.fileSizeInBytes()).thenReturn(length); + Mockito.when(dataFile.recordCount()).thenReturn(recordCount); + return new MockFileScanTask(dataFile, mockDeletes); + } + + public static MockFileScanTask mockTaskWithDVDeleteRecords( + long length, long recordCount, long deletedRecords) { + DeleteFile deleteFile = Mockito.mock(DeleteFile.class); + Mockito.when(deleteFile.recordCount()).thenReturn(deletedRecords); + Mockito.when(deleteFile.format()).thenReturn(FileFormat.PUFFIN); + + DataFile dataFile = Mockito.mock(DataFile.class); + Mockito.when(dataFile.fileSizeInBytes()).thenReturn(length); + Mockito.when(dataFile.recordCount()).thenReturn(recordCount); + return new MockFileScanTask(dataFile, new DeleteFile[] {deleteFile}); + } + @Override public long length() { return length; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 0785bf9d355f..bdbb8c176812 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -315,7 +315,7 @@ public void testBinPackAfterPartitionChange() { @TestTemplate public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { - assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); Table table = createTable(); int numDataFiles = 5; // 100 / 5 = 20 records per data file @@ -328,14 +328,20 @@ public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { RowDelta rowDelta = table.newRowDelta(); for (DataFile dataFile : dataFiles) { - writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) - .forEach(rowDelta::addDeletes); + if (formatVersion >= 3) { + writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } else { + writePosDeletes(table, dataFile.partition(), dataFile.location(), 4, numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } } rowDelta.commit(); Set deleteFiles = TestHelpers.deleteFiles(table); - assertThat(deleteFiles).hasSize(numDataFiles); + int expectedDataFiles = formatVersion >= 3 ? numDataFiles : numDataFiles * 4; + assertThat(deleteFiles).hasSize(expectedDataFiles); // there are 5 data files with a delete ratio of > 100% each, so all data files should be // rewritten. 
Set MIN_INPUT_FILES > to the number of data files so that compaction is only @@ -344,6 +350,7 @@ public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { SparkActions.get(spark) .rewriteDataFiles(table) .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "0") .execute(); assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); @@ -358,7 +365,7 @@ public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { @TestTemplate public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { - assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); Table table = createTable(); int numDataFiles = 5; // 100 / 5 = 20 records per data file @@ -371,14 +378,20 @@ public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { RowDelta rowDelta = table.newRowDelta(); for (DataFile dataFile : dataFiles) { - writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) - .forEach(rowDelta::addDeletes); + if (formatVersion >= 3) { + writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } else { + writePosDeletes(table, dataFile.partition(), dataFile.location(), 4, numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } } rowDelta.commit(); Set deleteFiles = TestHelpers.deleteFiles(table); - assertThat(deleteFiles).hasSize(numDataFiles); + int expectedDataFiles = formatVersion >= 3 ? numDataFiles : numDataFiles * 4; + assertThat(deleteFiles).hasSize(expectedDataFiles); // there are 5 data files with a delete ratio of 40% each, so all data files should be // rewritten. Set MIN_INPUT_FILES > to the number of data files so that compaction is only @@ -387,6 +400,7 @@ public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { SparkActions.get(spark) .rewriteDataFiles(table) .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "0") .execute(); assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); @@ -401,7 +415,7 @@ public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { @TestTemplate public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception { - assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); Table table = createTable(); int numDataFiles = 5; // 100 / 5 = 20 records per data file @@ -414,14 +428,20 @@ public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception { RowDelta rowDelta = table.newRowDelta(); for (DataFile dataFile : dataFiles) { - writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) - .forEach(rowDelta::addDeletes); + if (formatVersion >= 3) { + writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } else { + writePosDeletes(table, dataFile.partition(), dataFile.location(), 5, numPositionsToDelete) + .forEach(rowDelta::addDeletes); + } } rowDelta.commit(); Set deleteFiles = TestHelpers.deleteFiles(table); - assertThat(deleteFiles).hasSize(numDataFiles); + int expectedDataFiles = formatVersion >= 3 ? numDataFiles : numDataFiles * 5; + assertThat(deleteFiles).hasSize(expectedDataFiles); // there are 5 data files with a delete ratio of 25% each, so data files should not be // rewritten. 
Set MIN_INPUT_FILES > to the number of data files so that compaction is only @@ -430,6 +450,7 @@ public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception { SparkActions.get(spark) .rewriteDataFiles(table) .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10") + .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "0") .execute(); assertThat(result.rewrittenDataFilesCount()).isEqualTo(0); @@ -2322,6 +2343,53 @@ private List writePosDeletes( return results; } + private List writePosDeletes( + Table table, + StructLike partition, + String path, + int outputDeleteFiles, + int totalPositionsToDelete) { + List results = Lists.newArrayList(); + for (int file = 0; file < outputDeleteFiles; file++) { + OutputFile outputFile = + table + .io() + .newOutputFile( + table + .locationProvider() + .newDataLocation( + FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))); + EncryptedOutputFile encryptedOutputFile = + EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY); + + GenericAppenderFactory appenderFactory = + new GenericAppenderFactory(table.schema(), table.spec(), null, null, null); + PositionDeleteWriter posDeleteWriter = + appenderFactory + .set(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full") + .newPosDeleteWriter(encryptedOutputFile, FileFormat.PARQUET, partition); + + PositionDelete posDelete = PositionDelete.create(); + int positionsPerDeleteFile = totalPositionsToDelete / outputDeleteFiles; + + for (int position = file * positionsPerDeleteFile; + position < (file + 1) * positionsPerDeleteFile; + position++) { + posDeleteWriter.write(posDelete.set(path, position, null)); + } + + try { + posDeleteWriter.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + results.add(posDeleteWriter.toDeleteFile()); + } + + return results; + } + private List writeDV( Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { OutputFileFactory fileFactory = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java index e223d2e16411..1f40b6cba918 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java @@ -69,6 +69,8 @@ public void testBinPackDataSelectFiles() { checkDataFileGroupWithEnoughFiles(rewriter); checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); + checkDataFilesWithHighFileScopedDeleteRatio(rewriter); + checkDataFilesWithHighDVDeleteRatio(rewriter); } @Test @@ -81,6 +83,8 @@ public void testSortDataSelectFiles() { checkDataFileGroupWithEnoughFiles(rewriter); checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); + checkDataFilesWithHighFileScopedDeleteRatio(rewriter); + checkDataFilesWithHighDVDeleteRatio(rewriter); } @Test @@ -94,6 +98,8 @@ public void testZOrderDataSelectFiles() { checkDataFileGroupWithEnoughFiles(rewriter); checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); + checkDataFilesWithHighFileScopedDeleteRatio(rewriter); + checkDataFilesWithHighDVDeleteRatio(rewriter); } private void checkDataFileSizeFiltering(SizeBasedDataRewriter rewriter) { @@ -196,6 +202,42 @@ private void checkDataFileGroupWithTooMuchData(SizeBasedDataRewriter rewriter) { assertThat(group).as("Must rewrite big 
file").hasSize(1); } + private void checkDataFilesWithHighFileScopedDeleteRatio(SizeBasedDataRewriter rewriter) { + FileScanTask tooManyDeletesTask = + MockFileScanTask.mockTaskWithFileScopedDeleteRecords(1000L, 100, 1, 30); + FileScanTask optimalTask = + MockFileScanTask.mockTaskWithFileScopedDeleteRecords(1000L, 100, 1, 29); + List tasks = ImmutableList.of(tooManyDeletesTask, optimalTask); + + Map options = + ImmutableMap.of( + SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "0", + SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "10"); + rewriter.init(options); + + Iterable> groups = rewriter.planFileGroups(tasks); + assertThat(groups).as("Must have 1 group").hasSize(1); + List group = Iterables.getOnlyElement(groups); + assertThat(group).as("Must rewrite 1 file").hasSize(1); + } + + private void checkDataFilesWithHighDVDeleteRatio(SizeBasedDataRewriter rewriter) { + FileScanTask tooManyDeletesTask = MockFileScanTask.mockTaskWithDVDeleteRecords(1000L, 100, 30); + FileScanTask optimalTask = MockFileScanTask.mockTaskWithDVDeleteRecords(1000L, 100, 29); + List tasks = ImmutableList.of(tooManyDeletesTask, optimalTask); + + Map options = + ImmutableMap.of( + SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "0", + SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "10"); + rewriter.init(options); + + Iterable> groups = rewriter.planFileGroups(tasks); + assertThat(groups).as("Must have 1 group").hasSize(1); + List group = Iterables.getOnlyElement(groups); + assertThat(group).as("Must rewrite 1 file").hasSize(1); + } + @Test public void testInvalidConstructorUsagesSortData() { Table table = catalog.createTable(TABLE_IDENT, SCHEMA); From 311cdee2de22a149f93681b309e37a04fbb7a0bc Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 22 Jan 2025 09:48:15 +0100 Subject: [PATCH 3/3] review feedback --- .../actions/SizeBasedDataRewriter.java | 26 +++++++++++-------- .../org/apache/iceberg/MockFileScanTask.java | 12 --------- .../spark/actions/TestSparkFileRewriter.java | 20 -------------- 3 files changed, 15 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java index 4459e8892b46..61b90d9fc6e3 100644 --- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java +++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java @@ -47,6 +47,7 @@ public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter options) { @Override protected Iterable filterFiles(Iterable tasks) { - return Iterables.filter( - tasks, task -> wronglySized(task) || tooManyDeletes(task) || tooHighDeleteRatio(task)); + return Iterables.filter(tasks, this::shouldRewrite); + } + + private boolean shouldRewrite(FileScanTask task) { + return wronglySized(task) || tooManyDeletes(task) || tooHighDeleteRatio(task); } private boolean tooManyDeletes(FileScanTask task) { @@ -100,19 +104,19 @@ private boolean anyTaskHasTooHighDeleteRatio(List group) { } private boolean tooHighDeleteRatio(FileScanTask task) { - if (null == task.deletes() || task.deletes().isEmpty()) { + if (task.deletes() == null || task.deletes().isEmpty()) { return false; } - if (ContentFileUtil.containsSingleDV(task.deletes()) - || task.deletes().stream().allMatch(ContentFileUtil::isFileScoped)) { - long sum = task.deletes().stream().mapToLong(ContentFile::recordCount).sum(); - double deletedRecords = (double) Math.min(sum, task.file().recordCount()); - double deleteRatio = deletedRecords / 
task.file().recordCount(); - return deleteRatio >= 0.3; - } + long knownDeletedRecordCount = + task.deletes().stream() + .filter(ContentFileUtil::isFileScoped) + .mapToLong(ContentFile::recordCount) + .sum(); - return false; + double deletedRecords = (double) Math.min(knownDeletedRecordCount, task.file().recordCount()); + double deleteRatio = deletedRecords / task.file().recordCount(); + return deleteRatio >= DELETE_RATIO_THRESHOLD; } @Override diff --git a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java index 06a9d87da500..7717d25ea4aa 100644 --- a/core/src/test/java/org/apache/iceberg/MockFileScanTask.java +++ b/core/src/test/java/org/apache/iceberg/MockFileScanTask.java @@ -89,18 +89,6 @@ public static MockFileScanTask mockTaskWithFileScopedDeleteRecords( return new MockFileScanTask(dataFile, mockDeletes); } - public static MockFileScanTask mockTaskWithDVDeleteRecords( - long length, long recordCount, long deletedRecords) { - DeleteFile deleteFile = Mockito.mock(DeleteFile.class); - Mockito.when(deleteFile.recordCount()).thenReturn(deletedRecords); - Mockito.when(deleteFile.format()).thenReturn(FileFormat.PUFFIN); - - DataFile dataFile = Mockito.mock(DataFile.class); - Mockito.when(dataFile.fileSizeInBytes()).thenReturn(length); - Mockito.when(dataFile.recordCount()).thenReturn(recordCount); - return new MockFileScanTask(dataFile, new DeleteFile[] {deleteFile}); - } - @Override public long length() { return length; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java index 1f40b6cba918..3ffa53c3e6aa 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java @@ -70,7 +70,6 @@ public void testBinPackDataSelectFiles() { checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); checkDataFilesWithHighFileScopedDeleteRatio(rewriter); - checkDataFilesWithHighDVDeleteRatio(rewriter); } @Test @@ -84,7 +83,6 @@ public void testSortDataSelectFiles() { checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); checkDataFilesWithHighFileScopedDeleteRatio(rewriter); - checkDataFilesWithHighDVDeleteRatio(rewriter); } @Test @@ -99,7 +97,6 @@ public void testZOrderDataSelectFiles() { checkDataFileGroupWithEnoughData(rewriter); checkDataFileGroupWithTooMuchData(rewriter); checkDataFilesWithHighFileScopedDeleteRatio(rewriter); - checkDataFilesWithHighDVDeleteRatio(rewriter); } private void checkDataFileSizeFiltering(SizeBasedDataRewriter rewriter) { @@ -221,23 +218,6 @@ private void checkDataFilesWithHighFileScopedDeleteRatio(SizeBasedDataRewriter r assertThat(group).as("Must rewrite 1 file").hasSize(1); } - private void checkDataFilesWithHighDVDeleteRatio(SizeBasedDataRewriter rewriter) { - FileScanTask tooManyDeletesTask = MockFileScanTask.mockTaskWithDVDeleteRecords(1000L, 100, 30); - FileScanTask optimalTask = MockFileScanTask.mockTaskWithDVDeleteRecords(1000L, 100, 29); - List tasks = ImmutableList.of(tooManyDeletesTask, optimalTask); - - Map options = - ImmutableMap.of( - SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "0", - SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "10"); - rewriter.init(options); - - Iterable> groups = rewriter.planFileGroups(tasks); - assertThat(groups).as("Must have 
1 group").hasSize(1); - List group = Iterables.getOnlyElement(groups); - assertThat(group).as("Must rewrite 1 file").hasSize(1); - } - @Test public void testInvalidConstructorUsagesSortData() { Table table = catalog.createTable(TABLE_IDENT, SCHEMA);