diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
index 00dc4f75ec..fa5e315142 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
@@ -20,17 +20,19 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Predicate;
 
 public class CloseablePredicate<T> implements Predicate<T>, Closeable {
 
   private final Predicate<T> predicate;
-  private final Closeable closeable;
+  private final List<Closeable> closeables;
 
-  public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+  public CloseablePredicate(Predicate<T> predicate, Closeable... closeables) {
     this.predicate = predicate;
-    this.closeable = closeable;
+    this.closeables = Arrays.asList(closeables);
   }
 
   @Override
@@ -40,6 +42,8 @@ public boolean test(T t) {
 
   @Override
   public void close() throws IOException {
-    closeable.close();
+    for (Closeable closeable : closeables) {
+      closeable.close();
+    }
   }
 }
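The varargs constructor lets one predicate own several underlying resources and release them in a single close() call. A minimal usage sketch, assuming two already-open Closeable lookup structures; the method and parameter names here are illustrative, only CloseablePredicate itself comes from the patch:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.function.Predicate;
    import org.apache.amoro.io.CloseablePredicate;

    class CloseablePredicateUsage {
      static boolean isDeleted(
          String key,
          Predicate<String> deletedById,
          Predicate<String> deletedByName,
          Closeable idLookup,
          Closeable nameLookup)
          throws IOException {
        // Combine two per-field predicates and hand over both resources they depend on.
        CloseablePredicate<String> predicate =
            new CloseablePredicate<>(deletedById.or(deletedByName), idLookup, nameLookup);
        try {
          return predicate.test(key);
        } finally {
          // Releases idLookup and nameLookup together.
          predicate.close();
        }
      }
    }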
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index f81fea10b4..4a497997ab 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,9 +23,12 @@
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
 import org.apache.amoro.utils.ContentFiles;
 import org.apache.amoro.utils.map.StructLikeBaseMap;
@@ -51,10 +54,12 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.StructProjection;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {
 
   private final RewriteFilesInput input;
   private final List<DeleteFile> posDeletes;
-  private final List<DeleteFile> eqDeletes;
+  // A rewrite input may contain equality delete files with different equality field id sets
+  private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+      Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
 
   private Map<String, Roaring64Bitmap> positionMap;
 
   private final Set<String> positionPathSets;
 
-  private Set<Integer> deleteIds = new HashSet<>();
+  // The delete ids are the union of the field ids of all equality delete files
+  private final Set<Integer> deleteIds = new HashSet<>();
 
   private CloseablePredicate<StructForDelete<T>> eqPredicate;
 
-  private final Schema deleteSchema;
+  // Includes all identifier fields of the equality delete files
+  private final Schema requiredSchema;
+
+  private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new HashMap<>();
 
   private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;
@@ -115,34 +126,19 @@ protected CombinedDeleteFilter(
     this.dataRecordCnt =
         Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
     ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
-    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
     if (rewriteFilesInput.deleteFiles() != null) {
-      String firstDeleteFilePath = null;
       for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
         switch (delete.content()) {
           case POSITION_DELETES:
             posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
             break;
           case EQUALITY_DELETES:
-            if (deleteIds.isEmpty()) {
-              deleteIds = ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              firstDeleteFilePath = delete.path().toString();
-            } else {
-              Set<Integer> currentDeleteIds =
-                  ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              if (!deleteIds.equals(currentDeleteIds)) {
-                throw new IllegalArgumentException(
-                    String.format(
-                        "Equality delete files have different delete fields, first equality field ids:[%s],"
-                            + " current equality field ids:[%s], first delete file path:[%s], "
-                            + " current delete file path: [%s].",
-                        deleteIds,
-                        currentDeleteIds,
-                        firstDeleteFilePath,
-                        delete.path().toString()));
-              }
-            }
-            eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+            DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+            Set<Integer> eqFieldIds = Sets.newHashSet(delete.equalityFieldIds());
+            deleteIds.addAll(eqFieldIds);
+            eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+            deleteSchemaByDeleteIds.putIfAbsent(
+                eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));
             break;
           default:
             throw new UnsupportedOperationException(
@@ -155,8 +151,7 @@ protected CombinedDeleteFilter(
             .map(s -> s.path().toString())
             .collect(Collectors.toSet());
     this.posDeletes = posDeleteBuilder.build();
-    this.eqDeletes = eqDeleteBuilder.build();
-    this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+    this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);
 
     if (structLikeCollections != null) {
       this.structLikeCollections = structLikeCollections;
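The constructor no longer rejects inputs whose equality delete files disagree on their field ids; it buckets the files by field-id set and keeps the union of all ids for the read schema. A standalone sketch of that grouping, using the same shaded Guava Multimap shape as the patch; the String values stand in for DeleteFile and the method is illustrative:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
    import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
    import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
    import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;

    class EqDeleteGroupingSketch {
      static Multimap<Set<Integer>, String> group(Map<String, Set<Integer>> fieldIdsByPath) {
        Multimap<Set<Integer>, String> filesByIds =
            Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
        Set<Integer> unionIds = new HashSet<>(); // plays the role of deleteIds
        for (Map.Entry<String, Set<Integer>> entry : fieldIdsByPath.entrySet()) {
          unionIds.addAll(entry.getValue());                // union drives the required read schema
          filesByIds.put(entry.getValue(), entry.getKey()); // one bucket per distinct field-id set
        }
        return filesByIds;
      }
    }

Because HashSet compares by content, files written with the same field ids, for example [1, 2], land in the same bucket even though each file carries its own id list instance.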
@@ -186,12 +181,20 @@ public boolean isFilterEqDelete() {
   protected abstract AuthenticatedFileIO getFileIO();
 
+  /**
+   * Returns all delete ids of the equality delete files.
+   *
+   * <p>For example, if there are two equality delete field id sets, [1, 2] and [1], the delete ids
+   * will be [1, 2].
+   *
+   * @return delete ids
+   */
   public Set<Integer> deleteIds() {
     return deleteIds;
   }
 
   public boolean hasPosition() {
-    return posDeletes != null && posDeletes.size() > 0;
+    return posDeletes != null && !posDeletes.isEmpty();
   }
 
   public void close() {
@@ -232,35 +235,85 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
       return eqPredicate;
     }
 
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return record -> false;
     }
 
-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());
+    List<Predicate<StructForDelete<T>>> isInDeleteSets = Lists.newArrayList();
+    List<Closeable> structMapCloseable = Lists.newArrayList();
+    BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Predicate<StructForDelete<T>> predicate =
+          applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter, structMapCloseable);
+      isInDeleteSets.add(predicate);
+    }
 
-    BloomFilter<StructLike> bloomFilter = null;
-    if (filterEqDelete) {
-      LOG.debug(
-          "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
-          dataRecordCnt);
-      // one million data is about 1.71M memory usage
-      bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
-      try (CloseableIterable<Record> deletes =
-          CloseableIterable.concat(
-              CloseableIterable.transform(
-                  CloseableIterable.withNoopClose(
-                      Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
-                  s -> openFile(s, deleteSchema)))) {
-        for (Record record : deletes) {
-          StructLike identifier = internalRecordWrapper.copyFor(record);
-          bloomFilter.put(identifier);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    Predicate<StructForDelete<T>> isInDelete =
+        isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+    this.eqPredicate =
+        new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new Closeable[0]));
+    return isInDelete;
+  }
+
+  private BloomFilter<StructLike> initializeBloomFilter() {
+    if (!filterEqDelete) {
+      return null;
+    }
+
+    LOG.debug(
+        "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
+        dataRecordCnt);
+    // one million entries use about 1.71 MB of memory
+    BloomFilter<StructLike> bloomFilter =
+        BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
+
+    Map<Set<Integer>, InternalRecordWrapper> recordWrappers = Maps.newHashMap();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Set<Integer> ids = deleteSchemaEntry.getKey();
+      Schema deleteSchema = deleteSchemaEntry.getValue();
+
+      InternalRecordWrapper internalRecordWrapper =
+          new InternalRecordWrapper(deleteSchema.asStruct());
+      recordWrappers.put(ids, internalRecordWrapper);
+    }
+
+    try (CloseableIterable<Record> deletes = readRecords()) {
+      for (Record record : deletes) {
+        recordWrappers.forEach(
+            (ids, internalRecordWrapper) -> {
+              Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
+              StructProjection projection =
+                  StructProjection.create(requiredSchema, deleteSchema).wrap(record);
+              StructLike deletePK = internalRecordWrapper.copyFor(projection);
+              bloomFilter.put(deletePK);
+            });
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
+    return bloomFilter;
+  }
+
+  private CloseableIterable<Record> readRecords() {
+    return CloseableIterable.concat(
+        CloseableIterable.transform(
+            CloseableIterable.withNoopClose(
+                Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
+            s -> openFile(s, requiredSchema)));
+  }
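initializeBloomFilter builds one approximate set of the data-side keys so that delete records which cannot possibly hit the rewritten data can be skipped later. A minimal sketch of that prefilter pattern with Guava's BloomFilter; the string keys and funnel are simplified stand-ins for StructLike and StructLikeFunnel, and the class is illustrative:

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
    import org.apache.amoro.shade.guava32.com.google.common.hash.Funnels;

    class BloomPrefilterSketch {
      static long countPossibleHits(List<String> dataKeys, List<String> deleteKeys) {
        // Sized by the number of data records, as in the patch (which uses fpp = 0.001).
        BloomFilter<CharSequence> filter =
            BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), dataKeys.size(), 0.01);
        dataKeys.forEach(filter::put);
        // A negative answer is exact, so misses can be dropped without loading them into the delete map.
        return deleteKeys.stream().filter(filter::mightContain).count();
      }
    }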
+
+  private Predicate<StructForDelete<T>> applyEqDeletesForSchema(
+      Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
+      BloomFilter<StructLike> bloomFilter,
+      List<Closeable> structMapCloseable) {
+    Set<Integer> ids = deleteSchemaEntry.getKey();
+    Schema deleteSchema = deleteSchemaEntry.getValue();
+    Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
+
+    InternalRecordWrapper internalRecordWrapper =
+        new InternalRecordWrapper(deleteSchema.asStruct());
+
     CloseableIterable deleteRecords =
         CloseableIterable.transform(
             CloseableIterable.concat(
@@ -294,23 +347,20 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    structMapCloseable.add(structLikeMap);
+
+    return structForDelete -> {
+      StructProjection deleteProjection =
+          StructProjection.create(requiredSchema, deleteSchema).wrap(structForDelete.getPk());
+      StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+      Long dataLSN = structForDelete.getLsn();
+      Long deleteLsn = structLikeMap.get(dataPk);
+      if (deleteLsn == null) {
+        return false;
+      }
 
-    Predicate<StructForDelete<T>> isInDeleteSet =
-        structForDelete -> {
-          StructLike dataPk = internalRecordWrapper.copyFor(structForDelete.getPk());
-          Long dataLSN = structForDelete.getLsn();
-          Long deleteLsn = structLikeMap.get(dataPk);
-          if (deleteLsn == null) {
-            return false;
-          }
-
-          return deleteLsn.compareTo(dataLSN) > 0;
-        };
-
-    CloseablePredicate<StructForDelete<T>> closeablePredicate =
-        new CloseablePredicate<>(isInDeleteSet, structLikeMap);
-    this.eqPredicate = closeablePredicate;
-    return isInDeleteSet;
+      return deleteLsn.compareTo(dataLSN) > 0;
+    };
   }
 
   private CloseableIterable<StructForDelete<T>> applyEqDeletes(
@@ -322,7 +372,7 @@ private CloseableIterable<StructForDelete<T>> applyEqDeletes(
 
   private CloseableIterable<StructForDelete<T>> eqDeletesBase(
       CloseableIterable<StructForDelete<T>> records, Predicate<StructForDelete<T>> predicate) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return records;
     }
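Both initializeBloomFilter and applyEqDeletesForSchema narrow a row read with the union schema down to one delete file's identifier fields through Iceberg's StructProjection. A small standalone sketch of that projection step, assuming a two-field table schema (id = 1, name = 2); StructProjection, TypeUtil and Types are real Iceberg APIs, the class and method are illustrative:

    import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructProjection;

    class ProjectionSketch {
      static final Schema TABLE_SCHEMA =
          new Schema(
              Types.NestedField.required(1, "id", Types.IntegerType.get()),
              Types.NestedField.required(2, "name", Types.StringType.get()));

      static StructLike projectToDeleteKey(StructLike rowReadWithUnionSchema) {
        // Union of all equality field ids drives the read schema.
        Schema requiredSchema = TypeUtil.select(TABLE_SCHEMA, Sets.newHashSet(1, 2));
        // One particular delete file only declared field 1.
        Schema deleteSchema = TypeUtil.select(TABLE_SCHEMA, Sets.newHashSet(1));
        // The projection exposes only field 1, so the same row can be matched against
        // delete files that were written with different identifier sets.
        return StructProjection.create(requiredSchema, deleteSchema).wrap(rowReadWithUnionSchema);
      }
    }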
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 3e33363c42..01eaf4180b 100644
--- a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -297,4 +297,229 @@ public void readDataEnableFilterEqDelete() throws IOException {
     }
     dataReader.close();
   }
+
+  @Test
+  public void readDataDropAEqField() throws IOException {
+    CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L;
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1, 2));
+    GenericRecord idRecord = GenericRecord.create(idSchema1);
+    List<Record> records = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records.add(idRecord.copy("id", id, "name", "john")));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records,
+            idSchema1);
+
+    // Simulate dropping an identifier field: the second delete file only keeps `id`
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records2.add(idRecord2.copy("id", id)));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+
+    RewriteFilesInput task2 =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task2);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(1, Iterables.size(readRecords));
+    }
+
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(2, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
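The expected counts above follow from OR-combining one predicate per field-id set, exactly what the reworked applyEqDeletes does. A simplified sketch of that composition, with integer keys standing in for StructForDelete rows and the two lambdas standing in for the per-schema delete lookups built in this test:

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;

    class OrCombineSketch {
      // Mirrors applyEqDeletes(): one predicate per field-id set, OR-reduced into a single test.
      static Predicate<Integer> anyDeleteMatches(List<Predicate<Integer>> perSchemaPredicates) {
        return perSchemaPredicates.stream().reduce(Predicate::or).orElse(value -> false);
      }

      public static void main(String[] args) {
        Predicate<Integer> deletedByIdAndName = id -> false;          // the [id, name] file hits nothing here
        Predicate<Integer> deletedById = id -> id >= 2 && id < 100;   // the [id] file hits ids 2..99
        Predicate<Integer> isDeleted =
            anyDeleteMatches(Lists.newArrayList(deletedByIdAndName, deletedById));
        // Rows 2 and 3 are filtered out, row 1 survives, matching the assertions above.
        System.out.println(isDeleted.test(1) + " " + isDeleted.test(2) + " " + isDeleted.test(3));
      }
    }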
+
+  @Test
+  public void readDataReplaceAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write delete records whose identifier field is `name` instead of `id`
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    records2.add(idRecord2.copy("name", "john"));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
+
+  @Test
+  public void readDataAddAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write delete records with a newly added identifier field `name`
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1, 2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(1, 100).forEach(id -> records2.add(idRecord2.copy("id", id, "name", "john")));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
 }