Core, Spark: Rewrite data files with high delete ratio #11825

Draft · wants to merge 1 commit into main
@@ -22,12 +22,14 @@
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;

public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileScanTask, DataFile> {
@@ -84,13 +86,30 @@ private boolean shouldRewrite(List<FileScanTask> group) {
return enoughInputFiles(group)
|| enoughContent(group)
|| tooMuchContent(group)
|| anyTaskHasTooManyDeletes(group);
|| anyTaskHasTooManyDeletes(group)
|| anyTaskHasTooHighDeleteRatio(group);
}

private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
return group.stream().anyMatch(this::tooManyDeletes);
}

private boolean anyTaskHasTooHighDeleteRatio(List<FileScanTask> group) {
return group.stream().anyMatch(this::tooHighDeleteRatio);
}

private boolean tooHighDeleteRatio(FileScanTask task) {
if (ContentFileUtil.containsSingleDV(task.deletes())) {
DeleteFile file = Iterables.getOnlyElement(task.deletes());
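// clamp to the data file's record count, since the DV may report more positions than the file has records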
double deletedRecords = (double) Math.min(file.recordCount(), task.file().recordCount());
double deleteRatio = deletedRecords / task.file().recordCount();
Comment on lines +104 to +105
Contributor:

[question] Wondering if adjusting deletedRecords by taking a min is the right thing for SplitScanTask, since recordCount is the number of records in the data file.

My understanding was that the delete ratio can always be:

deleteFile.recordCount() / task.file().recordCount()

IIUC, the following is the math for this:

long[] splitOffsets = splitOffsets(file);
long splitOffset = splitOffsets != null ? splitOffsets[0] : 0L;
// meaning this file scan task contains only scannedFileFraction of the file
double scannedFileFraction = ((double) length) / (file.fileSizeInBytes() - splitOffset);
// number of deletes attributed to this task, assuming deletes are uniformly distributed across the split
double deletedRecords = scannedFileFraction * deleteFile.recordCount();
double totalRecordsWithoutDeletes = scannedFileFraction * file.recordCount();
double deleteRatio = deletedRecords / totalRecordsWithoutDeletes;
// deleteRatio = (scannedFileFraction * deleteFile.recordCount()) / (scannedFileFraction * file.recordCount())
// deleteRatio = deleteFile.recordCount() / file.recordCount()
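
A quick sanity check of that cancellation (a standalone sketch with made-up numbers, not code from this PR): a 100-record data file carrying a DV with 40 deleted positions produces the same ratio whether a task covers half of the file or all of it.

public class DeleteRatioCheck {
  public static void main(String[] args) {
    // hypothetical numbers: 100 records in the data file, 40 positions deleted via a DV
    double fileRecords = 100.0;
    double dvRecords = 40.0;
    for (double scannedFileFraction : new double[] {0.5, 1.0}) {
      // deletes and records attributed to this (split) task under a uniform distribution
      double deletedRecords = scannedFileFraction * dvRecords;
      double taskRecords = scannedFileFraction * fileRecords;
      double deleteRatio = deletedRecords / taskRecords; // the fraction cancels out
      System.out.printf("fraction=%.1f -> deleteRatio=%.2f%n", scannedFileFraction, deleteRatio);
    }
  }
}

Both iterations print 0.40, i.e. deleteFile.recordCount() / file.recordCount().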

Contributor (author):

I was planning on revisiting the calculation after the holiday break (sorry, I forgot to put the PR up as a draft).

Contributor:

No worries, happy holidays @nastra!

// TODO: make this configurable
return deleteRatio >= 0.3;
}

return false;
}

@Override
protected long defaultTargetFileSize() {
return PropertyUtil.propertyAsLong(
Expand Down
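
The TODO in tooHighDeleteRatio flags the hard-coded 0.3 cutoff. Below is a minimal sketch of how such a threshold could be parsed from the rewriter options map; the option name, default, and helper class are illustrative assumptions, not part of this PR.

import java.util.Map;

// Hypothetical helper, not part of this PR: parse a delete-ratio threshold from rewriter options.
class DeleteRatioThreshold {
  static final String OPTION = "delete-ratio-threshold"; // assumed option key
  static final double DEFAULT = 0.3;

  static double parse(Map<String, String> options) {
    String value = options.get(OPTION);
    double threshold = value != null ? Double.parseDouble(value) : DEFAULT;
    if (threshold <= 0 || threshold > 1) {
      throw new IllegalArgumentException("'" + OPTION + "' must be in (0, 1], was " + threshold);
    }
    return threshold;
  }
}

Since SizeBasedFileRewriter subclasses already receive their options through init(options), the parsed value could presumably be captured there and compared against in tooHighDeleteRatio instead of the literal 0.3.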
@@ -313,6 +313,135 @@ public void testBinPackAfterPartitionChange() {
shouldHaveFiles(table, 20);
}

@TestTemplate
public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTable();
int numDataFiles = 5;
// 100 / 5 = 20 records per data file
writeRecords(numDataFiles, 100);
// delete more positions than records in each data file (the delete ratio is clamped to 100%)
int numPositionsToDelete = 1000;
table.refresh();
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
assertThat(dataFiles).hasSize(numDataFiles);

RowDelta rowDelta = table.newRowDelta();
for (DataFile dataFile : dataFiles) {
writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete)
.forEach(rowDelta::addDeletes);
}

rowDelta.commit();

Set<DeleteFile> deleteFiles = TestHelpers.deleteFiles(table);
assertThat(deleteFiles).hasSize(numDataFiles);

// there are 5 data files, each with a delete ratio clamped to 100%, so all data files should be
// rewritten. Set MIN_INPUT_FILES to be greater than the number of data files so that compaction
// is only triggered by the delete ratio threshold (>= 30%)
RewriteDataFiles.Result result =
SparkActions.get(spark)
.rewriteDataFiles(table)
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10")
.execute();

assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).isEmpty();

Set<DeleteFile> newDeleteFiles = TestHelpers.deleteFiles(table);
assertThat(newDeleteFiles).isEmpty();
}

@TestTemplate
public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTable();
int numDataFiles = 5;
// 100 / 5 = 20 records per data file
writeRecords(numDataFiles, 100);
// delete 40% of records for each data file
int numPositionsToDelete = 8;
table.refresh();
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
assertThat(dataFiles).hasSize(numDataFiles);

RowDelta rowDelta = table.newRowDelta();
for (DataFile dataFile : dataFiles) {
writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete)
.forEach(rowDelta::addDeletes);
}

rowDelta.commit();

Set<DeleteFile> deleteFiles = TestHelpers.deleteFiles(table);
assertThat(deleteFiles).hasSize(numDataFiles);

// there are 5 data files with a delete ratio of 40% each, so all data files should be
// rewritten. Set MIN_INPUT_FILES to be greater than the number of data files so that compaction
// is only triggered by the delete ratio threshold (>= 30%)
RewriteDataFiles.Result result =
SparkActions.get(spark)
.rewriteDataFiles(table)
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10")
.execute();

assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).hasSize(1);

Set<DeleteFile> newDeleteFiles = TestHelpers.deleteFiles(table);
assertThat(newDeleteFiles).isEmpty();
}

@TestTemplate
public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTable();
int numDataFiles = 5;
// 100 / 5 = 20 records per data file
writeRecords(numDataFiles, 100);
// delete 25% of records for each data file
int numPositionsToDelete = 5;
table.refresh();
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
assertThat(dataFiles).hasSize(numDataFiles);

RowDelta rowDelta = table.newRowDelta();
for (DataFile dataFile : dataFiles) {
writeDV(table, dataFile.partition(), dataFile.location(), numPositionsToDelete)
.forEach(rowDelta::addDeletes);
}

rowDelta.commit();

Set<DeleteFile> deleteFiles = TestHelpers.deleteFiles(table);
assertThat(deleteFiles).hasSize(numDataFiles);

// there are 5 data files with a delete ratio of 25% each, so no data files should be
// rewritten. Set MIN_INPUT_FILES to be greater than the number of data files so that compaction
// is only triggered by the delete ratio threshold (>= 30%)
RewriteDataFiles.Result result =
SparkActions.get(spark)
.rewriteDataFiles(table)
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "10")
.execute();

assertThat(result.rewrittenDataFilesCount()).isEqualTo(0);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).hasSameSizeAs(dataFiles);

Set<DeleteFile> newDeleteFiles = TestHelpers.deleteFiles(table);
assertThat(newDeleteFiles).hasSameSizeAs(deleteFiles);
}

@TestTemplate
public void testBinPackWithDeletes() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
Expand Down