[HUDI-8518] Fix RLI and Secondary index with custom payload or merge mode (#12525)

- Ensure deletes via custom payload or merger get synced to RLI (HUDI-8741)
- Fix secondary index updates for event_time and custom merge modes (HUDI-8518)
- Ensure secondary index updates are done in a distributed manner without collecting
  any record keys or maps on the driver (HUDI-8740); see the sketch below
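
A rough, standalone sketch of the pattern behind the last bullet, written against plain Spark Java APIs rather than Hudi's internal types (the SecondaryIndexEntry class, the sample keys, and the merge rule are illustrative assumptions, not code from this commit): delete markers and newly read index entries are keyed by record key and merged per key on the executors, so no record-key map is ever collected on the driver.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not part of the Hudi codebase.
public class SecondaryIndexUpdateSketch {

  // Stand-in for a secondary index entry (recordKey -> secondaryKey, plus a delete flag).
  static class SecondaryIndexEntry implements Serializable {
    final String recordKey;
    final String secondaryKey;
    final boolean deleted;

    SecondaryIndexEntry(String recordKey, String secondaryKey, boolean deleted) {
      this.recordKey = recordKey;
      this.secondaryKey = secondaryKey;
      this.deleted = deleted;
    }

    @Override
    public String toString() {
      return recordKey + " -> " + secondaryKey + (deleted ? " (delete)" : "");
    }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("si-update-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Delete markers for record keys whose rows were updated or deleted by the commit.
      List<SecondaryIndexEntry> deleteMarkers = Arrays.asList(
          new SecondaryIndexEntry("key1", "oldVal1", true),
          new SecondaryIndexEntry("key2", "oldVal2", true));
      // Entries re-read from the written file slices for keys that still exist after the commit.
      List<SecondaryIndexEntry> liveEntries = Arrays.asList(
          new SecondaryIndexEntry("key1", "newVal1", false));

      int parallelism = 2;
      JavaPairRDD<String, SecondaryIndexEntry> merged = jsc.parallelize(deleteMarkers, parallelism)
          .union(jsc.parallelize(liveEntries, parallelism))
          // Key by record key so the merge happens per key on the executors,
          // rather than collecting a recordKey -> secondaryKey map on the driver.
          .mapToPair(entry -> new Tuple2<>(entry.recordKey, entry))
          // When both a delete marker and a live entry exist for a key, the live entry wins.
          .reduceByKey((a, b) -> a.deleted ? b : a);

      // key1 keeps its new entry; key2 is left with only a delete marker.
      merged.values().collect().forEach(entry -> System.out.println(entry));
    }
  }
}

In the commit itself this work is delegated to SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords, which derives the index entries from the commit's write stats (see the first file below).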
codope authored Jan 9, 2025
1 parent 3040aa5 commit ef9ab24
Showing 23 changed files with 1,259 additions and 792 deletions.
@@ -115,15 +115,15 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromBaseFiles;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.readSecondaryKeysFromFileSlices;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.convertWriteStatsToSecondaryIndexRecords;
import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;

/**
* Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
@@ -575,10 +575,6 @@ protected abstract HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<

protected abstract EngineType getEngineType();

public abstract HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,
Map<String, String> recordKeySecondaryKeyMap,
HoodieIndexDefinition indexDefinition);

private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartition(String indexName, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName);
@@ -1078,6 +1074,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood

/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
@@ -1089,7 +1086,7 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
dataWriteConfig.getMetadataConfig(),
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding());
dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
@@ -1110,7 +1107,7 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, dataWriteConfig.getMetadataConfig(),
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding());
enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getWritesFileIdEncoding(), getEngineType());
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), records.union(additionalUpdates));
updateExpressionIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
@@ -1185,47 +1182,19 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
});
}

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
// Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
// the secondary index partition for each of these keys.
List<String> keysToRemove = HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
dataMetaClient, instantTime);

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) {
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());
// Return early if there are no write stats.
if (allWriteStats.isEmpty() || WriteOperationType.isCompactionOrClustering(commitMetadata.getOperationType())) {
return engineContext.emptyHoodieData();
}
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexPartition);
// Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
// This is obtained by scanning the entire secondary index partition in the metadata table
// This could be an expensive operation for a large commit (updating/deleting millions of rows)
Map<String, String> recordKeySecondaryKeyMap = metadata.getSecondaryKeys(keysToRemove, indexDefinition.getIndexName());
HoodieData<HoodieRecord> deletedRecords = getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap, indexDefinition);
// first deduce parallelism to avoid too few tasks for large number of records.
long totalWriteBytesForSecondaryIndex = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream)
.mapToLong(HoodieWriteStat::getTotalWriteBytes)
.sum();
// approximate task partition size of 100MB
// (TODO: make this configurable)
long targetPartitionSize = 100 * 1024 * 1024;
int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize);

return readSecondaryKeysFromBaseFiles(
engineContext,
partitionFilePairs,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
getEngineType(),
indexDefinition)
.union(deletedRecords)
.mapToPair(i -> Pair.of(i.getRecordKey(), i))
.reduceByKey((value1, value2) -> {
if (((HoodieMetadataPayload) value1.getData()).isDeleted()) {
return value2;
} else {
return value1;
}
}, parallelism)
.values();
// Load file system view for only the affected partitions on the driver.
// By loading on the driver one time, we avoid loading the same metadata multiple times on the executors.
HoodieMetadataFileSystemView fsView = getMetadataView();
fsView.loadPartitions(new ArrayList<>(commitMetadata.getWritePartitionPaths()));
return convertWriteStatsToSecondaryIndexRecords(allWriteStats, instantTime, indexDefinition, dataWriteConfig.getMetadataConfig(), fsView, dataMetaClient, engineContext, getEngineType());
}
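
The removed block above sized the read parallelism from the commit's total write bytes against an approximate 100 MB target per task. A small worked illustration of that ceiling-division idiom (the helper name and byte counts below are made up for the example):

// Worked illustration of the parallelism sizing in the removed block above.
static int deriveParallelism(long totalWriteBytes) {
  long targetPartitionSize = 100L * 1024 * 1024;  // approximate 100 MB per task
  // (a + b - 1) / b rounds up in integer arithmetic; Math.max keeps the result >= 1.
  return (int) Math.max(1, (totalWriteBytes + targetPartitionSize - 1) / targetPartitionSize);
}
// deriveParallelism(2_684_354_560L) == 26   (about 2.5 GB of write bytes -> 26 tasks)
// deriveParallelism(10L * 1024 * 1024) == 1 (small commits still get at least one task)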

/**
@@ -24,6 +24,8 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -93,6 +95,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieListPairData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.eager(data);
@@ -208,9 +208,4 @@ protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaCli
protected EngineType getEngineType() {
return EngineType.FLINK;
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) {
throw new HoodieNotSupportedException("Flink metadata table does not support secondary index yet.");
}
}
@@ -23,6 +23,8 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -76,6 +78,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieListPairData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.eager(data);
@@ -138,9 +138,4 @@ protected HoodieData<HoodieRecord> getExpressionIndexRecords(List<Pair<String, P
protected EngineType getEngineType() {
return EngineType.JAVA;
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) {
throw new HoodieNotSupportedException("Java metadata table writer does not support secondary index yet.");
}
}
@@ -22,6 +22,7 @@
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
@@ -33,13 +34,15 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
@@ -108,6 +111,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieJavaRDD.of(javaSparkContext.emptyRDD());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieJavaPairRDD.of(JavaPairRDD.fromJavaRDD(javaSparkContext.emptyRDD()));
}

public boolean supportsFileGroupReader() {
return true;
}
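
The Spark engine context backs the new emptyHoodiePairData() with an empty JavaPairRDD, mirroring emptyHoodieData() above. A standalone sketch of why an empty pair RDD makes a convenient early-return value (plain Spark APIs only; the class name, keys, and values are illustrative): it is a no-op under union and reduceByKey, so a caller that finds nothing to index can return it without forcing null checks downstream.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Illustrative sketch only; not part of the Hudi codebase.
public class EmptyPairDataSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("empty-pair-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Same construction the Spark engine context uses for its empty pair data.
      JavaPairRDD<String, Long> empty = JavaPairRDD.fromJavaRDD(jsc.emptyRDD());

      JavaPairRDD<String, Long> updates = jsc.parallelizePairs(
          Arrays.asList(new Tuple2<>("key1", 1L), new Tuple2<>("key1", 2L)));

      // The empty pair RDD contributes nothing under union/reduceByKey, so an
      // early-exit branch can return it and downstream code stays unchanged.
      long distinctKeys = empty.union(updates).reduceByKey(Long::sum).count();  // 1
      System.out.println(distinctKeys);
    }
  }
}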
@@ -19,7 +19,6 @@
package org.apache.hudi.metadata;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.HoodieSparkIndexClient;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -43,6 +42,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
@@ -63,7 +63,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -238,22 +237,6 @@ protected EngineType getEngineType() {
return EngineType.SPARK;
}

@Override
public HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext, Map<String, String> recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition) {
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
if (recordKeySecondaryKeyMap.isEmpty()) {
return sparkEngineContext.emptyHoodieData();
}

List<HoodieRecord> deletedRecords = new ArrayList<>();
recordKeySecondaryKeyMap.forEach((key, value) -> {
HoodieRecord<HoodieMetadataPayload> siRecord = HoodieMetadataPayload.createSecondaryIndexRecord(key, value, indexDefinition.getIndexName(), true);
deletedRecords.add(siRecord);
});

return HoodieJavaRDD.of(deletedRecords, sparkEngineContext, 1);
}

@Override
protected void updateColumnsToIndexWithColStats(List<String> columnsToIndex) {
new HoodieSparkIndexClient(dataWriteConfig, engineContext).createOrUpdateColumnStatsIndexDefinition(dataMetaClient, columnsToIndex);