Skip to content

Commit

Permalink
Iceberg ITs (#219)
Browse files Browse the repository at this point in the history
* Iceberg Data generation + IT source tests

* add test for snapshot expiry

* more changes

* more fixes

* pr feedback

* more changes

* fix closeable in tests

* removing changes no longer needed

* more changes

* cleanups

* more changes

* more changes

* more changes

* fix tests

* more changes
  • Loading branch information
vamshigv authored Nov 14, 2023
1 parent 9eddd58 commit 252b4f4
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 54 deletions.
10 changes: 10 additions & 0 deletions core/src/test/java/io/onetable/GenericTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public interface GenericTable<T, Q> extends AutoCloseable {

String getBasePath();

default String getDataPath() {
return getBasePath();
}

String getOrderByColumn();

void close();
Expand All @@ -72,6 +76,9 @@ static GenericTable getInstance(
case DELTA:
return TestSparkDeltaTable.forStandardSchemaAndPartitioning(
tableName, tempDir, sparkSession, isPartitioned ? "level" : null);
case ICEBERG:
return TestIcebergTable.forStandardSchemaAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
Expand All @@ -91,6 +98,9 @@ static GenericTable getInstanceWithAdditionalColumns(
case DELTA:
return TestSparkDeltaTable.forSchemaWithAdditionalColumnsAndPartitioning(
tableName, tempDir, sparkSession, isPartitioned ? "level" : null);
case ICEBERG:
return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
default:
throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
}
Expand Down
75 changes: 42 additions & 33 deletions core/src/test/java/io/onetable/ITOneTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -63,6 +62,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

Expand All @@ -77,6 +77,7 @@
import io.onetable.hudi.HudiSourceClientProvider;
import io.onetable.hudi.HudiSourceConfig;
import io.onetable.hudi.HudiTestUtil;
import io.onetable.iceberg.IcebergSourceClientProvider;
import io.onetable.model.storage.TableFormat;
import io.onetable.model.sync.SyncMode;

Expand All @@ -87,8 +88,6 @@ public class ITOneTableClient {

private static JavaSparkContext jsc;
private static SparkSession sparkSession;
private SourceClientProvider<HoodieInstant> hudiSourceClientProvider;
private SourceClientProvider<Long> deltaSourceClientProvider;

@BeforeAll
public static void setupOnce() {
Expand All @@ -102,14 +101,6 @@ public static void setupOnce() {
jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
}

@BeforeEach
public void setup() {
hudiSourceClientProvider = new HudiSourceClientProvider();
hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
deltaSourceClientProvider = new DeltaSourceClientProvider();
deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
}

@AfterAll
public static void teardown() {
if (jsc != null) {
Expand All @@ -126,7 +117,8 @@ private static Stream<Arguments> testCasesWithPartitioningAndSyncModes() {

private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPartitioning() {
List<Arguments> arguments = new ArrayList<>();
for (TableFormat sourceTableFormat : Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) {
for (TableFormat sourceTableFormat :
Arrays.asList(TableFormat.HUDI, TableFormat.DELTA, TableFormat.ICEBERG)) {
for (SyncMode syncMode : SyncMode.values()) {
for (boolean isPartitioned : new boolean[] {true, false}) {
arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
Expand All @@ -142,9 +134,18 @@ private static Stream<Arguments> testCasesWithSyncModes() {

private SourceClientProvider<?> getSourceClientProvider(TableFormat sourceTableFormat) {
if (sourceTableFormat == TableFormat.HUDI) {
SourceClientProvider<HoodieInstant> hudiSourceClientProvider = new HudiSourceClientProvider();
hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
return hudiSourceClientProvider;
} else if (sourceTableFormat == TableFormat.DELTA) {
SourceClientProvider<Long> deltaSourceClientProvider = new DeltaSourceClientProvider();
deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
return deltaSourceClientProvider;
} else if (sourceTableFormat == TableFormat.ICEBERG) {
SourceClientProvider<Snapshot> icebergSourceClientProvider =
new IcebergSourceClientProvider();
icebergSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
return icebergSourceClientProvider;
} else {
throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
}
Expand Down Expand Up @@ -183,6 +184,7 @@ public void testVariousOperations(
.tableName(tableName)
.targetTableFormats(targetTableFormats)
.tableBasePath(table.getBasePath())
.tableDataPath(table.getDataPath())
.hudiSourceConfig(
HudiSourceConfig.builder()
.partitionFieldSpecConfig(oneTablePartitionConfig)
Expand Down Expand Up @@ -215,6 +217,7 @@ public void testVariousOperations(
.tableName(tableName)
.targetTableFormats(targetTableFormats)
.tableBasePath(tableWithUpdatedSchema.getBasePath())
.tableDataPath(tableWithUpdatedSchema.getDataPath())
.hudiSourceConfig(
HudiSourceConfig.builder()
.partitionFieldSpecConfig(oneTablePartitionConfig)
Expand Down Expand Up @@ -254,6 +257,7 @@ public void testVariousOperations(
public void testConcurrentInsertWritesInSource(
SyncMode syncMode, PartitionConfig partitionConfig) {
String tableName = getTableName();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
List<TableFormat> targetTableFormats = getOtherFormats(TableFormat.HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
Expand All @@ -279,11 +283,11 @@ public void testConcurrentInsertWritesInSource(
.syncMode(syncMode)
.build();
OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);

checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);
table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 100);
}
}
Expand All @@ -293,7 +297,7 @@ public void testConcurrentInsertWritesInSource(
public void testConcurrentInsertsAndTableServiceWrites(
SyncMode syncMode, PartitionConfig partitionConfig) {
HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;

SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
List<TableFormat> targetTableFormats = getOtherFormats(TableFormat.HUDI);
String tableName = getTableName();
try (TestSparkHudiTable table =
Expand All @@ -313,15 +317,15 @@ public void testConcurrentInsertsAndTableServiceWrites(
.syncMode(syncMode)
.build();
OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);

table.deleteRecords(insertedRecords1.subList(0, 20), true);
// At this point table should have 30 records but only after compaction.
String scheduledCompactionInstant = table.onlyScheduleCompaction();

table.insertRecords(50, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
Map<String, String> sourceHudiOptions =
Collections.singletonMap("hoodie.datasource.query.type", "read_optimized");
// Because compaction is not completed yet and read optimized query, there are 100 records.
Expand All @@ -334,7 +338,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
100);

table.insertRecords(50, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
// Because compaction is not completed yet and read optimized query, there are 150 records.
checkDatasetEquivalence(
TableFormat.HUDI,
Expand All @@ -345,15 +349,15 @@ public void testConcurrentInsertsAndTableServiceWrites(
150);

table.completeScheduledCompaction(scheduledCompactionInstant);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 130);
}
}

@ParameterizedTest
@EnumSource(
value = TableFormat.class,
names = {"HUDI", "DELTA"})
names = {"HUDI", "DELTA", "ICEBERG"})
public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exception {
String tableName = getTableName();
try (GenericTable table =
Expand All @@ -365,6 +369,7 @@ public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exceptio
.tableName(tableName)
.targetTableFormats(targetTableFormats)
.tableBasePath(table.getBasePath())
.tableDataPath(table.getDataPath())
.syncMode(SyncMode.INCREMENTAL)
.build();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(sourceTableFormat);
Expand Down Expand Up @@ -462,6 +467,7 @@ public void testPartitionedData(
String hudiPartitionConfig,
String filter) {
String tableName = getTableName();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, hudiPartitionConfig, HoodieTableType.COPY_ON_WRITE)) {
Expand All @@ -478,10 +484,10 @@ public void testPartitionedData(
.build();
table.insertRecords(100, true);
OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
// Do a second sync to force the test to read back the metadata it wrote earlier
table.insertRecords(100, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);

checkDatasetEquivalenceWithFilter(TableFormat.HUDI, table, targetTableFormats, filter);
}
Expand All @@ -491,6 +497,7 @@ public void testPartitionedData(
@EnumSource(value = SyncMode.class)
public void testSyncWithSingleFormat(SyncMode syncMode) {
String tableName = getTableName();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
Expand All @@ -513,18 +520,18 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
.build();

OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 100);
oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 100);

table.insertRecords(100, true);
oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);
oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 200);
}
Expand All @@ -533,6 +540,7 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
@Test
public void testOutOfSyncIncrementalSyncs() {
String tableName = getTableName();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
Expand All @@ -555,13 +563,13 @@ public void testOutOfSyncIncrementalSyncs() {
table.insertRecords(50, true);
OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
// sync iceberg only
oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
oneTableClient.sync(singleTableConfig, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 50);
// insert more records
table.insertRecords(50, true);
// iceberg will be an incremental sync and delta will need to bootstrap with snapshot sync
oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
oneTableClient.sync(dualTableConfig, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100);

Expand All @@ -570,14 +578,14 @@ public void testOutOfSyncIncrementalSyncs() {
// insert more records
table.insertRecords(50, true);
// incremental sync for two commits for iceberg only
oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
oneTableClient.sync(singleTableConfig, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);

// insert more records
table.insertRecords(50, true);
// incremental sync for one commit for iceberg and three commits for delta
oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
oneTableClient.sync(dualTableConfig, sourceClientProvider);
checkDatasetEquivalence(
TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 250);
}
Expand All @@ -586,6 +594,7 @@ public void testOutOfSyncIncrementalSyncs() {
@Test
public void testMetadataRetention() {
String tableName = getTableName();
SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
Expand All @@ -599,7 +608,7 @@ public void testMetadataRetention() {
.build();
OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
table.insertRecords(10, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
// later we will ensure we can still read the source table at this instant to ensure that
// neither target cleaned up the underlying parquet files in the table
Instant instantAfterFirstCommit = Instant.now();
Expand All @@ -608,7 +617,7 @@ public void testMetadataRetention() {
.forEach(
unused -> {
table.insertRecords(10, true);
oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
oneTableClient.sync(perTableConfig, sourceClientProvider);
});
// ensure that hudi rows can still be read and underlying files were not removed
List<Row> rows =
Expand Down Expand Up @@ -729,7 +738,7 @@ private void checkDatasetEquivalence(
.read()
.options(finalTargetOptions)
.format(targetFormat.name().toLowerCase())
.load(sourceTable.getBasePath())
.load(sourceTable.getDataPath())
.orderBy(sourceTable.getOrderByColumn())
.filter(filterCondition);
}));
Expand Down
Loading

0 comments on commit 252b4f4

Please sign in to comment.