[AMORO-2870] Rewrite the task which contains different eq-delete fields #3175

Open · wants to merge 3 commits into master
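This change, in brief: the constructor used to require every equality delete file in a rewrite input to carry the same equality field ids, throwing an IllegalArgumentException otherwise. CombinedDeleteFilter now groups the equality delete files by their equality-field-id set, builds one delete predicate per group, and ORs the predicates together, with deleteIds becoming the union of the field ids across all groups.

A minimal standalone sketch of that grouping idea, using plain Guava coordinates rather than Amoro's shaded ones; EqDeleteFile is a hypothetical stand-in for Iceberg's DeleteFile:

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class GroupingSketch {
      // Hypothetical stand-in for an Iceberg DeleteFile exposing its equality field ids.
      record EqDeleteFile(String path, List<Integer> equalityFieldIds) {}

      public static void main(String[] args) {
        List<EqDeleteFile> deletes =
            List.of(
                new EqDeleteFile("a.parquet", List.of(1, 2)),
                new EqDeleteFile("b.parquet", List.of(1)),
                new EqDeleteFile("c.parquet", List.of(1, 2)));

        // Group delete files by their equality-field-id set, as the patch does.
        Multimap<Set<Integer>, EqDeleteFile> byIds = ArrayListMultimap.create();
        Set<Integer> deleteIds = new HashSet<>();
        for (EqDeleteFile f : deletes) {
          Set<Integer> ids = Set.copyOf(f.equalityFieldIds());
          byIds.put(ids, f);
          deleteIds.addAll(ids); // union across all groups
        }

        System.out.println(byIds.keySet()); // two groups: [1, 2] and [1]
        System.out.println(deleteIds);      // union: [1, 2]
      }
    }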
CloseablePredicate.java

@@ -20,17 +20,19 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Predicate;

 public class CloseablePredicate<T> implements Predicate<T>, Closeable {

   private final Predicate<T> predicate;

-  private final Closeable closeable;
+  private final List<Closeable> closeable;

-  public CloseablePredicate(Predicate<T> predicate, Closeable closeable) {
+  public CloseablePredicate(Predicate<T> predicate, Closeable... closeable) {
     this.predicate = predicate;
-    this.closeable = closeable;
+    this.closeable = Arrays.asList(closeable);
   }

@@ -40,6 +42,8 @@ public boolean test(T t) {

   @Override
   public void close() throws IOException {
-    closeable.close();
+    for (Closeable c : closeable) {
+      c.close();
+    }
   }
 }
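A usage sketch of the widened constructor, with the class above on the classpath and two hypothetical resources; test(T) delegates to the wrapped predicate:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.function.Predicate;

    public class CloseablePredicateDemo {
      public static void main(String[] args) throws IOException {
        // Two hypothetical resources backing the predicate's lookup state.
        Closeable mapA = () -> System.out.println("closed map A");
        Closeable mapB = () -> System.out.println("closed map B");

        Predicate<String> contains = s -> s.contains("delete");

        // The varargs constructor lets one predicate own several resources;
        // previously it accepted exactly one Closeable.
        CloseablePredicate<String> predicate = new CloseablePredicate<>(contains, mapA, mapB);

        System.out.println(predicate.test("eq-delete")); // true
        predicate.close(); // closes map A, then map B
      }
    }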
CombinedDeleteFilter.java

@@ -23,9 +23,12 @@
 import org.apache.amoro.optimizing.RewriteFilesInput;
 import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
-import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableSet;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimap;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Multimaps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.shade.guava32.com.google.common.hash.BloomFilter;
 import org.apache.amoro.utils.ContentFiles;
 import org.apache.amoro.utils.map.StructLikeBaseMap;

@@ -51,10 +54,12 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.StructProjection;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;

@@ -90,17 +95,23 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {

   private final RewriteFilesInput input;
   private final List<DeleteFile> posDeletes;
-  private final List<DeleteFile> eqDeletes;
+  // There may be multiple equality-field-id sets within one rewrite input
+  private final Multimap<Set<Integer>, DeleteFile> eqDeleteFilesByDeleteIds =
+      Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);

   private Map<String, Roaring64Bitmap> positionMap;

   private final Set<String> positionPathSets;

-  private Set<Integer> deleteIds = new HashSet<>();
+  // The delete ids are the union of the field ids of all equality delete files
+  private final Set<Integer> deleteIds = new HashSet<>();

   private CloseablePredicate<StructForDelete<T>> eqPredicate;

-  private final Schema deleteSchema;
+  // Includes all identifier fields of the equality delete files
+  private final Schema requiredSchema;
+
+  private final Map<Set<Integer>, Schema> deleteSchemaByDeleteIds = new HashMap<>();

   private StructLikeCollections structLikeCollections = StructLikeCollections.DEFAULT;

@@ -115,34 +126,19 @@ protected CombinedDeleteFilter(
     this.dataRecordCnt =
         Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
     ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
-    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
     if (rewriteFilesInput.deleteFiles() != null) {
-      String firstDeleteFilePath = null;
       for (ContentFile<?> delete : rewriteFilesInput.deleteFiles()) {
         switch (delete.content()) {
           case POSITION_DELETES:
             posDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
             break;
           case EQUALITY_DELETES:
-            if (deleteIds.isEmpty()) {
-              deleteIds = ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              firstDeleteFilePath = delete.path().toString();
-            } else {
-              Set<Integer> currentDeleteIds =
-                  ImmutableSet.copyOf(ContentFiles.asDeleteFile(delete).equalityFieldIds());
-              if (!deleteIds.equals(currentDeleteIds)) {
-                throw new IllegalArgumentException(
-                    String.format(
-                        "Equality delete files have different delete fields, first equality field ids:[%s],"
-                            + " current equality field ids:[%s], first delete file path:[%s],"
-                            + " current delete file path: [%s].",
-                        deleteIds,
-                        currentDeleteIds,
-                        firstDeleteFilePath,
-                        delete.path().toString()));
-              }
-            }
-            eqDeleteBuilder.add(ContentFiles.asDeleteFile(delete));
+            DeleteFile deleteFile = ContentFiles.asDeleteFile(delete);
+            Set<Integer> eqFieldIds = Sets.newHashSet(delete.equalityFieldIds());
+            deleteIds.addAll(eqFieldIds);
+            eqDeleteFilesByDeleteIds.put(eqFieldIds, deleteFile);
+            deleteSchemaByDeleteIds.putIfAbsent(
+                eqFieldIds, TypeUtil.select(tableSchema, eqFieldIds));
             break;
           default:
             throw new UnsupportedOperationException(

@@ -155,8 +151,7 @@ protected CombinedDeleteFilter(
             .map(s -> s.path().toString())
             .collect(Collectors.toSet());
     this.posDeletes = posDeleteBuilder.build();
-    this.eqDeletes = eqDeleteBuilder.build();
-    this.deleteSchema = TypeUtil.select(tableSchema, deleteIds);
+    this.requiredSchema = TypeUtil.select(tableSchema, deleteIds);

     if (structLikeCollections != null) {
       this.structLikeCollections = structLikeCollections;
@@ -186,12 +181,20 @@ public boolean isFilterEqDelete() {

   protected abstract AuthenticatedFileIO getFileIO();

+  /**
+   * Get all delete ids of the equality delete files.
+   *
+   * <p>For example, if there are two equality-field-id sets, one [1, 2] and the other [1], the
+   * delete ids will be [1, 2].
+   *
+   * @return delete ids
+   */
   public Set<Integer> deleteIds() {
     return deleteIds;
   }

   public boolean hasPosition() {
-    return posDeletes != null && posDeletes.size() > 0;
+    return posDeletes != null && !posDeletes.isEmpty();
   }

   public void close() {
@@ -232,35 +235,85 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
       return eqPredicate;
     }

-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return record -> false;
     }

-    InternalRecordWrapper internalRecordWrapper =
-        new InternalRecordWrapper(deleteSchema.asStruct());
-
-    BloomFilter<StructLike> bloomFilter = null;
-    if (filterEqDelete) {
-      LOG.debug(
-          "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
-          dataRecordCnt);
-      // one million records cost about 1.71 MB of memory
-      bloomFilter = BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
-      try (CloseableIterable<Record> deletes =
-          CloseableIterable.concat(
-              CloseableIterable.transform(
-                  CloseableIterable.withNoopClose(
-                      Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
-                  s -> openFile(s, deleteSchema)))) {
-        for (Record record : deletes) {
-          StructLike identifier = internalRecordWrapper.copyFor(record);
-          bloomFilter.put(identifier);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    List<Predicate<StructForDelete<T>>> isInDeleteSets = Lists.newArrayList();
+    List<Closeable> structMapCloseable = Lists.newArrayList();
+    BloomFilter<StructLike> bloomFilter = initializeBloomFilter();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Predicate<StructForDelete<T>> predicate =
+          applyEqDeletesForSchema(deleteSchemaEntry, bloomFilter, structMapCloseable);
+      isInDeleteSets.add(predicate);
+    }
+
+    Predicate<StructForDelete<T>> isInDelete =
+        isInDeleteSets.stream().reduce(Predicate::or).orElse(record -> false);
+    this.eqPredicate =
+        new CloseablePredicate<>(isInDelete, structMapCloseable.toArray(new Closeable[0]));
+    return isInDelete;
+  }
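The reduction above is plain java.util.function machinery: a row counts as deleted if any group's predicate matches it, and with no groups nothing matches. A self-contained illustration, with strings standing in for StructForDelete rows:

    import java.util.List;
    import java.util.function.Predicate;

    public class OrReduceSketch {
      public static void main(String[] args) {
        // One predicate per equality-field-id group.
        List<Predicate<String>> perGroup =
            List.of(s -> s.startsWith("del"), s -> s.endsWith("!"));

        // OR the group predicates; with no groups, fall back to always-false.
        Predicate<String> isInDelete =
            perGroup.stream().reduce(Predicate::or).orElse(s -> false);

        System.out.println(isInDelete.test("delete-me")); // true (first group)
        System.out.println(isInDelete.test("keep!"));     // true (second group)
        System.out.println(isInDelete.test("keep"));      // false
      }
    }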

+  private BloomFilter<StructLike> initializeBloomFilter() {
+    if (!filterEqDelete) {
+      return null;
+    }
+
+    LOG.debug(
+        "Enable bloom-filter to filter eq-delete, (rewrite + rewrite pos) data count is {}",
+        dataRecordCnt);
+    // one million records cost about 1.71 MB of memory
+    BloomFilter<StructLike> bloomFilter =
+        BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
+
+    Map<Set<Integer>, InternalRecordWrapper> recordWrappers = Maps.newHashMap();
+    for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
+      Set<Integer> ids = deleteSchemaEntry.getKey();
+      Schema deleteSchema = deleteSchemaEntry.getValue();
+
+      InternalRecordWrapper internalRecordWrapper =
+          new InternalRecordWrapper(deleteSchema.asStruct());
+      recordWrappers.put(ids, internalRecordWrapper);
+    }
+
+    try (CloseableIterable<Record> deletes = readRecords()) {
+      for (Record record : deletes) {
+        recordWrappers.forEach(
+            (ids, internalRecordWrapper) -> {
+              Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
+              StructProjection projection =
+                  StructProjection.create(requiredSchema, deleteSchema).wrap(record);
+              StructLike deletePK = internalRecordWrapper.copyFor(projection);
+              bloomFilter.put(deletePK);
+            });
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return bloomFilter;
+  }
+
+  private CloseableIterable<Record> readRecords() {
+    return CloseableIterable.concat(
+        CloseableIterable.transform(
+            CloseableIterable.withNoopClose(
+                Arrays.stream(input.dataFiles()).collect(Collectors.toList())),
+            s -> openFile(s, requiredSchema)));
+  }
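For reference, a standalone sketch of the Guava bloom-filter sizing used here, with plain Guava coordinates and a string funnel in place of StructLikeFunnel. At a 0.001 false-positive probability Guava allocates about 14.4 bits per expected entry, so one million entries come to roughly 1.7 MB, which matches the comment above:

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    import java.nio.charset.StandardCharsets;

    public class BloomSizingSketch {
      public static void main(String[] args) {
        long expectedInsertions = 1_000_000L;
        // fpp = 0.001 -> -ln(0.001) / (ln 2)^2, about 14.4 bits per entry.
        BloomFilter<CharSequence> filter =
            BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), expectedInsertions, 0.001);

        filter.put("pk-1");
        System.out.println(filter.mightContain("pk-1")); // always true
        System.out.println(filter.mightContain("pk-2")); // false with probability ~0.999
      }
    }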

+  private Predicate<StructForDelete<T>> applyEqDeletesForSchema(
+      Map.Entry<Set<Integer>, Schema> deleteSchemaEntry,
+      BloomFilter<StructLike> bloomFilter,
+      List<Closeable> structMapCloseable) {
+    Set<Integer> ids = deleteSchemaEntry.getKey();
+    Schema deleteSchema = deleteSchemaEntry.getValue();
+    Iterable<DeleteFile> eqDeletes = eqDeleteFilesByDeleteIds.get(ids);
+
+    InternalRecordWrapper internalRecordWrapper =
+        new InternalRecordWrapper(deleteSchema.asStruct());
+
     CloseableIterable<RecordWithLsn> deleteRecords =
         CloseableIterable.transform(
             CloseableIterable.concat(

@@ -294,23 +347,20 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    structMapCloseable.add(structLikeMap);

-    Predicate<StructForDelete<T>> isInDeleteSet =
-        structForDelete -> {
-          StructLike dataPk = internalRecordWrapper.copyFor(structForDelete.getPk());
-          Long dataLSN = structForDelete.getLsn();
-          Long deleteLsn = structLikeMap.get(dataPk);
-          if (deleteLsn == null) {
-            return false;
-          }
-
-          return deleteLsn.compareTo(dataLSN) > 0;
-        };
-
-    CloseablePredicate<StructForDelete<T>> closeablePredicate =
-        new CloseablePredicate<>(isInDeleteSet, structLikeMap);
-    this.eqPredicate = closeablePredicate;
-    return isInDeleteSet;
+    return structForDelete -> {
+      StructProjection deleteProjection =
+          StructProjection.create(requiredSchema, deleteSchema).wrap(structForDelete.getPk());
+      StructLike dataPk = internalRecordWrapper.copyFor(deleteProjection);
+      Long dataLSN = structForDelete.getLsn();
+      Long deleteLsn = structLikeMap.get(dataPk);
+      if (deleteLsn == null) {
+        return false;
+      }
+
+      return deleteLsn.compareTo(dataLSN) > 0;
+    };
   }
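The keep-or-drop rule compares sequence numbers (LSN): a data row is dropped only when a delete row with the same key carries a strictly higher LSN. A self-contained sketch of that rule with hypothetical keys and values:

    import java.util.HashMap;
    import java.util.Map;

    public class LsnRuleSketch {
      public static void main(String[] args) {
        // Delete key -> LSN of the delete record, standing in for the struct-like map.
        Map<String, Long> deleteLsnByPk = new HashMap<>();
        deleteLsnByPk.put("pk-1", 10L);

        System.out.println(isDeleted(deleteLsnByPk, "pk-1", 5L));  // true: delete came later
        System.out.println(isDeleted(deleteLsnByPk, "pk-1", 12L)); // false: row written after the delete
        System.out.println(isDeleted(deleteLsnByPk, "pk-2", 5L));  // false: no delete for this key
      }

      static boolean isDeleted(Map<String, Long> deleteLsnByPk, String pk, long dataLsn) {
        Long deleteLsn = deleteLsnByPk.get(pk);
        return deleteLsn != null && deleteLsn > dataLsn;
      }
    }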

   private CloseableIterable<StructForDelete<T>> applyEqDeletes(

@@ -322,7 +372,7 @@ private CloseableIterable<StructForDelete<T>> applyEqDeletes(
   private CloseableIterable<StructForDelete<T>> eqDeletesBase(
       CloseableIterable<StructForDelete<T>> records, Predicate<StructForDelete<T>> predicate) {
     // Predicate to test whether a row should be visible to the user after applying equality deletes.
-    if (eqDeletes.isEmpty()) {
+    if (eqDeleteFilesByDeleteIds.isEmpty()) {
       return records;
     }