From 7051a274e0d4ead65a1e1238a4dead751887ec0c Mon Sep 17 00:00:00 2001
From: Xavier Bai
Date: Thu, 5 Sep 2024 15:05:24 +0800
Subject: [PATCH 1/4] rewrite multiple delete ids
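
Previously the filter required every equality delete file in a rewrite input
to share the same equality field ids and threw an IllegalArgumentException
otherwise. This change groups the delete files by their equality field id
set, builds one predicate per group, and ORs the predicates together; the
overall delete ids become the union of all groups. A rough sketch of the
grouping (simplified from the constructor change below; `deleteFiles` stands
in for the delete files of the rewrite input, and the example field id sets
are illustrative):

    Multimap<Set<Integer>, DeleteFile> byIds =
        Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
    Set<Integer> deleteIds = new HashSet<>();
    for (DeleteFile file : deleteFiles) {
      // e.g. file1 -> {1, 2}, file2 -> {1, 2}, file3 -> {1}
      Set<Integer> ids = Sets.newHashSet(file.equalityFieldIds());
      deleteIds.addAll(ids); // union of all groups: {1, 2}
      byIds.put(ids, file);  // {1, 2} -> [file1, file2], {1} -> [file3]
    }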
---
 .../apache/amoro/io/CloseablePredicate.java   |  12 +-
 .../amoro/io/reader/CombinedDeleteFilter.java | 184 ++++++++------
 .../amoro/io/TestIcebergCombinedReader.java   | 225 ++++++++++++++++++
 3 files changed, 350 insertions(+), 71 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
index 00dc4f75ec..fa5e315142 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/CloseablePredicate.java
@@ -20,17 +20,19 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Predicate;
 
 public class CloseablePredicate<T> implements Predicate<T>, Closeable {
 
   private final Predicate<T> predicate;
-  private final Closeable closeable;
+  private final List<Closeable> closeable;
 
-  public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+  public CloseablePredicate(Predicate<T> predicate, Closeable... closeable) {
     this.predicate = predicate;
-    this.closeable = closeable;
+    this.closeable = Arrays.asList(closeable);
   }
 
   @Override
@@ -40,6 +42,8 @@ public boolean test(T t) {
 
   @Override
   public void close() throws IOException {
-    closeable.close();
+    for (Closeable closeable : closeable) {
+      closeable.close();
+    }
   }
 }
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index f81fea10b4..d73164a545 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -23,7 +23,6 @@
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
@@ -49,12 +48,18 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.StructProjection;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter {
 
   private final RewriteFilesInput input;
   private final List<DeleteFile> posDeletes;
-  private final List<DeleteFile> eqDeletes;
+  // There may be multiple equality delete field id sets within a rewrite input
+  // and the delete ids are the union of all equality delete fields.
+  private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+      Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
 
   private Map<String, Roaring64Bitmap> positionMap;
 
   private final Set<String> positionPathSets;
 
-  private Set<Integer> deleteIds = new HashSet<>();
+  private final Set<Integer> deleteIds = new HashSet<>();
 
   private CloseablePredicate<StructForDelete<Record>> eqPredicate;
 
-  private final Schema deleteSchema;
+  // Includes all identifier fields of the equality delete files
+  private final Schema requiredSchema;
+
+  private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new HashMap<>();
 
   private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;
 
@@ -115,34 +126,19 @@ protected CombinedDeleteFilter(
     this.dataRecordCnt =
         Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
     ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
-    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
     if (rewriteFilesInput.deleteFiles() != null) {
-      String firstDeleteFilePath = null;
       for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
         switch (delete.content()) {
           case POSITION_DELETES:
             posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
             break;
           case EQUALITY_DELETES:
-            if (deleteIds.isEmpty()) {
-              deleteIds = ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              firstDeleteFilePath = delete.path().toString();
-            } else {
-              Set<Integer> currentDeleteIds =
-                  ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              if (!deleteIds.equals(currentDeleteIds)) {
-                throw new IllegalArgumentException(
-                    String.format(
-                        "Equality delete files have different delete fields, first equality field ids:[%s],"
-                            + " current equality field ids:[%s], first delete file path:[%s], "
-                            + " current delete file path: [%s].",
-                        deleteIds,
-                        currentDeleteIds,
-                        firstDeleteFilePath,
-                        delete.path().toString()));
-              }
-            }
-            eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+            DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+            Set<Integer> eqFieldIds = Sets.newHashSet(delete.equalityFieldIds());
+            deleteIds.addAll(eqFieldIds);
+            eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+            deleteSchemaByDeleteIds.putIfAbsent(
+                eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));
             break;
           default:
             throw new UnsupportedOperationException(
@@ -155,8 +151,7 @@ protected CombinedDeleteFilter(
             .map(s -> s.path().toString())
             .collect(Collectors.toSet());
     this.posDeletes = posDeleteBuilder.build();
-    this.eqDeletes = eqDeleteBuilder.build();
-    this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+    this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);
 
     if (structLikeCollections != null) {
       this.structLikeCollections = structLikeCollections;
@@ -186,12 +181,20 @@ public boolean isFilterEqDelete() {
 
   protected abstract AuthenticatedFileIO getFileIO();
 
+  /**
+   * Get all delete ids of the equality delete files.
+   *
+   * <p>For example, if there are two equality delete field id sets, one is [1, 2] and another
+   * is [1], the delete ids will be [1, 2].
+   *
+   * @return delete ids
+   */
   public Set<Integer> deleteIds() {
     return deleteIds;
   }
 
   public boolean hasPosition() {
-    return posDeletes != null && posDeletes.size() > 0;
+    return posDeletes != null && !posDeletes.isEmpty();
   }
 
   public void close() {
@@ -232,35 +235,85 @@ private Predicate<StructForDelete<Record>> applyEqDeletes() {
       return eqPredicate;
     }
 
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return record -> false;
     }
 
-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());
+    List<Predicate<StructForDelete<Record>>> isInDeleteSets = Lists.newArrayList();
+    List<Closeable> structMapCloseable = Lists.newArrayList();
+    BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Predicate<StructForDelete<Record>> predicate =
+          applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter, structMapCloseable);
+      isInDeleteSets.add(predicate);
+    }
 
-    BloomFilter<StructLike> bloomFilter = null;
-    if (filterEqDelete) {
-      LOG.debug(
-          "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
-          dataRecordCnt);
-      // one million data is about 1.71M memory usage
-      bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
-      try (CloseableIterable<Record> deletes =
-          CloseableIterable.concat(
-              CloseableIterable.transform(
-                  CloseableIterable.withNoopClose(
-                      Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
-                  s -> openFile(s, deleteSchema)))) {
-        for (Record record : deletes) {
-          StructLike identifier = internalRecordWrapper.copyFor(record);
-          bloomFilter.put(identifier);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    Predicate<StructForDelete<Record>> isInDelete =
+        isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+    this.eqPredicate =
+        new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new Closeable[0]));
+    return isInDelete;
+  }
+
+  private BloomFilter<StructLike> initializeBloomFilter() {
+    if (!filterEqDelete) {
+      return null;
+    }
+
+    LOG.debug(
+        "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
+        dataRecordCnt);
+    // one million data is about 1.71M memory usage
+    BloomFilter<StructLike> bloomFilter =
+        BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
+
+    Map<Set<Integer>, InternalRecordWrapper> recordWrappers = Maps.newHashMap();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Set<Integer> ids = deleteSchemaEntry.getKey();
+      Schema deleteSchema = deleteSchemaEntry.getValue();
+
+      InternalRecordWrapper internalRecordWrapper =
+          new InternalRecordWrapper(deleteSchema.asStruct());
+      recordWrappers.put(ids, internalRecordWrapper);
+    }
+
+    try (CloseableIterable<Record> deletes = readRecords()) {
+      for (Record record : deletes) {
+        recordWrappers.forEach(
+            (ids, internalRecordWrapper) -> {
+              Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
+              StructProjection projection =
+                  StructProjection.create(requiredSchema, deleteSchema).wrap(record);
+              StructLike deletePK = internalRecordWrapper.copyFor(projection);
+              bloomFilter.put(deletePK);
+            });
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
+    return bloomFilter;
+  }
+
+  private CloseableIterable<Record> readRecords() {
+    return CloseableIterable.concat(
+        CloseableIterable.transform(
+            CloseableIterable.withNoopClose(
+                Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
+            s -> openFile(s, requiredSchema)));
+  }
+
+  private Predicate<StructForDelete<Record>> applyEqDeletesForSchema(
+      Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
+      BloomFilter<StructLike> bloomFilter,
+      List<Closeable> structMapCloseable) {
+    Set<Integer> ids = deleteSchemaEntry.getKey();
+    Schema deleteSchema = deleteSchemaEntry.getValue();
+    Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
+
+    InternalRecordWrapper internalRecordWrapper =
+        new InternalRecordWrapper(deleteSchema.asStruct());
 
     CloseableIterable<Record> deleteRecords =
         CloseableIterable.transform(
            CloseableIterable.concat(
@@ -294,23 +347,20 @@ private Predicate<StructForDelete<Record>> applyEqDeletes() {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    structMapCloseable.add(structLikeMap);
+
+    return structForDelete -> {
+      StructProjection deleteProjection =
+          StructProjection.create(requiredSchema, deleteSchema).wrap(structForDelete.getPk());
+      StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+      Long dataLSN = structForDelete.getLsn();
+      Long deleteLsn = structLikeMap.get(dataPk);
+      if (deleteLsn == null) {
+        return false;
+      }
 
-    Predicate<StructForDelete<Record>> isInDeleteSet =
-        structForDelete -> {
-          StructLike dataPk = internalRecordWrapper.copyFor(structForDelete.getPk());
-          Long dataLSN = structForDelete.getLsn();
-          Long deleteLsn = structLikeMap.get(dataPk);
-          if (deleteLsn == null) {
-            return false;
-          }
-
-          return deleteLsn.compareTo(dataLSN) > 0;
-        };
-
-    CloseablePredicate<StructForDelete<Record>> closeablePredicate =
-        new CloseablePredicate<>(isInDeleteSet, structLikeMap);
-    this.eqPredicate = closeablePredicate;
-    return isInDeleteSet;
+      return deleteLsn.compareTo(dataLSN) > 0;
+    };
   }
 
   private CloseableIterable<StructForDelete<Record>> applyEqDeletes(
@@ -322,7 +372,7 @@ private CloseableIterable<StructForDelete<Record>> eqDeletesBase(
       CloseableIterable<StructForDelete<Record>> records, Predicate<StructForDelete<Record>> predicate) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return records;
     }
 
diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
index 3e33363c42..01eaf4180b 100644
--- a/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
+++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/io/TestIcebergCombinedReader.java
@@ -297,4 +297,229 @@ public void readDataEnableFilterEqDelete() throws IOException {
     }
     dataReader.close();
   }
+
+  @Test
+  public void readDataDropAEqField() throws IOException {
+    CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L;
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1, 2));
+    GenericRecord idRecord = GenericRecord.create(idSchema1);
+    List<Record> records = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records.add(idRecord.copy("id", id, "name", "john")));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records,
+            idSchema1);
+
+    // Assume that an identifier field has been dropped
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records2.add(idRecord2.copy("id", id)));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+
+    RewriteFilesInput task2 =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task2);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(1, Iterables.size(readRecords));
+    }
+
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(2, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
+
+  @Test
+  public void readDataReplaceAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write records whose identifier field is `name` instead
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    records2.add(idRecord2.copy("name", "john"));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
+
+  @Test
+  public void readDataAddAEqField() throws IOException {
+    StructLike partitionData = getPartitionData();
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
+            .format(fileFormat)
+            .build();
+    DataFile dataFile =
+        FileHelpers.writeDataFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            Arrays.asList(
+                MixedDataTestHelpers.createRecord(1, "john", 0, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(2, "lily", 1, "1970-01-01T08:00:00"),
+                MixedDataTestHelpers.createRecord(3, "sam", 2, "1970-01-01T08:00:00")));
+
+    Schema idSchema1 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
+    GenericRecord idRecord1 = GenericRecord.create(idSchema1);
+    List<Record> records1 = new ArrayList<>();
+    IntStream.range(2, 100).forEach(id -> records1.add(idRecord1.copy("id", id)));
+    DeleteFile eqDeleteFile1 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records1,
+            idSchema1);
+
+    // Write delete records after adding a new identifier field `name`
+    Schema idSchema2 = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1, 2));
+    GenericRecord idRecord2 = GenericRecord.create(idSchema2);
+    List<Record> records2 = new ArrayList<>();
+    IntStream.range(1, 100).forEach(id -> records2.add(idRecord2.copy("id", id, "name", "john")));
+    DeleteFile eqDeleteFile2 =
+        FileHelpers.writeDeleteFile(
+            getMixedTable().asUnkeyedTable(),
+            outputFileFactory.newOutputFile(partitionData).encryptingOutputFile(),
+            partitionData,
+            records2,
+            idSchema2);
+    RewriteFilesInput task =
+        new RewriteFilesInput(
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 3L)},
+            new DeleteFile[] {},
+            new DeleteFile[] {
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile1, 4L),
+              MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile2, 5L)
+            },
+            getMixedTable());
+
+    GenericCombinedIcebergDataReader dataReader =
+        new GenericCombinedIcebergDataReader(
+            getMixedTable().io(),
+            getMixedTable().schema(),
+            getMixedTable().spec(),
+            getMixedTable().asUnkeyedTable().encryption(),
+            null,
+            false,
+            IdentityPartitionConverters::convertConstant,
+            false,
+            null,
+            task);
+
+    try (CloseableIterable<Record> readRecords = dataReader.readData()) {
+      Assert.assertEquals(0, Iterables.size(readRecords));
+    }
+    try (CloseableIterable<Record> readRecords = dataReader.readDeletedData()) {
+      Assert.assertEquals(3, Iterables.size(readRecords));
+    }
+
+    dataReader.close();
+  }
 }

From 20f04806acfe03083901c0710f39d994e8d9444a Mon Sep 17 00:00:00 2001
From: Xavier Bai
Date: Thu, 5 Sep 2024 15:57:06 +0800
Subject: [PATCH 2/4] update comments
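
Move the note about the union of delete ids from the multimap field, which
is keyed by per-file equality field id sets, to the `deleteIds` field that
actually holds the union.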
---
 .../java/org/apache/amoro/io/reader/CombinedDeleteFilter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index d73164a545..8eda291a25 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -96,7 +96,6 @@ public abstract class CombinedDeleteFilter {
   private final RewriteFilesInput input;
   private final List<DeleteFile> posDeletes;
   // There may be multiple equality delete field id sets within a rewrite input
-  // and the delete ids are the union of all equality delete fields.
   private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
       Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
 
@@ -104,6 +103,7 @@ public abstract class CombinedDeleteFilter {
 
   private final Set<String> positionPathSets;
 
+  // The delete ids are the union of all equality delete fields
   private final Set<Integer> deleteIds = new HashSet<>();
 
   private CloseablePredicate<StructForDelete<Record>> eqPredicate;

From 3828b5d87391e71017d332d9e42eddfa8d4eba92 Mon Sep 17 00:00:00 2001
From: Xavier Bai
Date: Wed, 25 Sep 2024 14:01:46 +0800
Subject: [PATCH 3/4] replace relocated google guava with shaded guava
---
 .../org/apache/amoro/io/reader/CombinedDeleteFilter.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index 8eda291a25..4a497997ab 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -25,6 +25,10 @@
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
 import org.apache.amoro.utils.ContentFiles;
 import org.apache.amoro.utils.map.StructLikeBaseMap;
@@ -48,10 +52,6 @@
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.StructProjection;

From ca64fb8412ca371d799db8db38f74097620457a6 Mon Sep 17 00:00:00 2001
From: Xavier Bai
Date: Fri, 18 Oct 2024 10:23:54 +0800
Subject: [PATCH 4/4] replace `putIfAbsent` with `computeIfAbsent` to avoid projecting the schema every time
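
`Map.putIfAbsent` evaluates its value argument eagerly, so the previous code
projected the delete schema for every equality delete file even when its
field id set was already mapped. `Map.computeIfAbsent` defers the projection
to a lambda that only runs on a miss. Roughly:

    // before: projects the schema once per delete file, hit or miss
    deleteSchemaByDeleteIds.putIfAbsent(eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));

    // after: projects the schema only for a field id set seen the first time
    deleteSchemaByDeleteIds.computeIfAbsent(eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));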
---
 .../java/org/apache/amoro/io/reader/CombinedDeleteFilter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
index 4a497997ab..830976ea28 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java
@@ -137,8 +137,8 @@ protected CombinedDeleteFilter(
             Set<Integer> eqFieldIds = Sets.newHashSet(delete.equalityFieldIds());
             deleteIds.addAll(eqFieldIds);
             eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
-            deleteSchemaByDeleteIds.putIfAbsent(
-                eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));
+            deleteSchemaByDeleteIds.computeIfAbsent(
+                eqFieldIds, ids -> TypeUtil.select(tableSchema, ids));
             break;
           default:
             throw new UnsupportedOperationException(