Skip to content

Commit

Permalink
Persistence Component: Unitemp Delta: Always Derive Optimization Filters + Bitemp Delta: Fix Temp Table Dropping (finos#3146)
Browse files (browse the repository at this point in the history)
  • Loading branch information
kumuwu authored Oct 16, 2024
1 parent c6f70c6 commit 7fe40bf
Show file tree
Hide file tree
Showing 20 changed files with 104 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@ public interface OptimizationFilterAbstract
@Value.Parameter(order = 0)
String fieldName();

@Value.Parameter(order = 1)
String lowerBoundPattern();
@Value.Default
default String lowerBoundPattern()
{
return "{" + fieldName() + "_legend_persistence_lower}";
}

@Value.Parameter(order = 2)
String upperBoundPattern();
@Value.Default
default String upperBoundPattern()
{
return "{" + fieldName() + "_legend_persistence_upper}";
}

@Value.Default
default boolean includesNullValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.DiffBinaryValueOperator;
import org.finos.legend.engine.persistence.components.util.Capability;
import org.finos.legend.engine.persistence.components.util.LogicalPlanUtils;
import org.finos.legend.engine.persistence.components.util.TableNameGenUtils;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -67,7 +68,6 @@
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_UPDATED;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_TERMINATED;
import static org.finos.legend.engine.persistence.components.common.StatisticName.ROWS_INSERTED;
import static org.finos.legend.engine.persistence.components.util.LogicalPlanUtils.UNDERSCORE;

class BitemporalDeltaPlanner extends BitemporalPlanner
{
Expand Down Expand Up @@ -110,7 +110,7 @@ class BitemporalDeltaPlanner extends BitemporalPlanner

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime && ingestMode().filterExistingRecords())
{
this.stagingDataset = getStagingDatasetWithoutDuplicates(datasets);
this.stagingDataset = getStagingDatasetWithoutDuplicates(datasets, options().ingestRunId());
this.stagingDatasetWithoutDuplicates = Optional.of(this.stagingDataset);
}
else
Expand Down Expand Up @@ -173,22 +173,23 @@ class BitemporalDeltaPlanner extends BitemporalPlanner

if (ingestMode().validityMilestoning().validityDerivation() instanceof SourceSpecifiesFromDateTime)
{
this.tempDataset = LogicalPlanUtils.getTempDataset(datasets);
this.tempDataset = LogicalPlanUtils.getTempDataset(datasets, options().ingestRunId());
if (deleteIndicatorField.isPresent())
{
this.tempDatasetWithDeleteIndicator = LogicalPlanUtils.getTempDatasetWithDeleteIndicator(datasets, deleteIndicatorField.get());
this.tempDatasetWithDeleteIndicator = LogicalPlanUtils.getTempDatasetWithDeleteIndicator(datasets, deleteIndicatorField.get(), options().ingestRunId());
}
}
}

private Dataset getStagingDatasetWithoutDuplicates(Datasets datasets)
private Dataset getStagingDatasetWithoutDuplicates(Datasets datasets, String ingestRunId)
{
String tableName = stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
String stagingTableName = stagingDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
String tempDatasetName = TableNameGenUtils.generateTableName(stagingTableName, STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME, ingestRunId);
return datasets.stagingDatasetWithoutDuplicates().orElse(DatasetDefinition.builder()
.schema(stagingDataset().schema())
.database(stagingDataset().datasetReference().database())
.group(stagingDataset().datasetReference().group())
.name(tableName + UNDERSCORE + STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME)
.name(tempDatasetName)
.alias(STAGE_DATASET_WITHOUT_DUPLICATES_BASE_NAME)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,20 @@ public static StagedFilesFieldValue getStagedFilesFieldValueWithType(Dataset dat
.build();
}

public static Dataset getTempDataset(Datasets datasets)
public static Dataset getTempDataset(Datasets datasets, String ingestRunId)
{
String mainDatasetName = datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
String tempDatasetName = TableNameGenUtils.generateTableName(mainDatasetName, TEMP_DATASET_BASE_NAME, ingestRunId);
return datasets.tempDataset().orElse(DatasetDefinition.builder()
.schema(datasets.mainDataset().schema())
.database(datasets.mainDataset().datasetReference().database())
.group(datasets.mainDataset().datasetReference().group())
.name(mainDatasetName + UNDERSCORE + TEMP_DATASET_BASE_NAME)
.name(tempDatasetName)
.alias(TEMP_DATASET_BASE_NAME)
.build());
}

public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, String deleteIndicatorField)
public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, String deleteIndicatorField, String ingestRunId)
{
if (datasets.tempDatasetWithDeleteIndicator().isPresent())
{
Expand All @@ -462,14 +463,15 @@ public static Dataset getTempDatasetWithDeleteIndicator(Datasets datasets, Strin
else
{
String mainDatasetName = datasets.mainDataset().datasetReference().name().orElseThrow((IllegalStateException::new));
String tempDatasetName = TableNameGenUtils.generateTableName(mainDatasetName, TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME, ingestRunId);
Field deleteIndicator = Field.builder().name(deleteIndicatorField).type(FieldType.of(DataType.BOOLEAN, Optional.empty(), Optional.empty())).build();
List<Field> mainFieldsPlusDeleteIndicator = new ArrayList<>(datasets.mainDataset().schema().fields());
mainFieldsPlusDeleteIndicator.add(deleteIndicator);
return DatasetDefinition.builder()
.schema(datasets.mainDataset().schema().withFields(mainFieldsPlusDeleteIndicator))
.database(datasets.mainDataset().datasetReference().database())
.group(datasets.mainDataset().datasetReference().group())
.name(mainDatasetName + UNDERSCORE + TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)
.name(tempDatasetName)
.alias(TEMP_DATASET_WITH_DELETE_INDICATOR_BASE_NAME)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndNoDataSplits(Generator
@Override
public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplits(List<GeneratorResult> operations, List<DataSplitRange> dataSplitRanges)
{
String tempName = operations.get(0).preActionsSql().get(2).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempWithDeleteIndicatorName = operations.get(0).preActionsSql().get(3).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempName = "\"mydb\".\"main_legend_persistence_temp_lp_yosulf\"";
String tempWithDeleteIndicatorName = "\"mydb\".\"main_legend_persistence_tempWithDeleteIndicator_lp_yosulf\"";

String expectedBitemporalFromOnlyDefaultTempTableCreateQuery = "CREATE TABLE IF NOT EXISTS " + tempName +
"(\"id\" INTEGER NOT NULL," +
Expand Down Expand Up @@ -786,9 +786,9 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndNoDataSplitsFilterDupl
@Override
public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplitsFilterDuplicates(List<GeneratorResult> operations, List<DataSplitRange> dataSplitRanges)
{
String tempName = operations.get(0).preActionsSql().get(2).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempWithDeleteIndicatorName = operations.get(0).preActionsSql().get(3).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String stageWithoutDuplicatesName = operations.get(0).preActionsSql().get(4).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempName = "\"mydb\".\"main_legend_persistence_temp_lp_yosulf\"";
String tempWithDeleteIndicatorName = "\"mydb\".\"main_legend_persistence_tempWithDeleteIndicator_lp_yosulf\"";
String stageWithoutDuplicatesName = "\"mydb\".\"staging_legend_persistence_stageWithoutDuplicates_lp_yosulf\"";

String expectedBitemporalFromOnlyDefaultTempTableCreateQuery = "CREATE TABLE IF NOT EXISTS " + tempName +
"(\"id\" INTEGER NOT NULL," +
Expand Down Expand Up @@ -950,9 +950,9 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplitsFilterDu

Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), operations.get(0).metadataIngestSql().get(0));

Assertions.assertEquals(getDropTempTableQuery("\"mydb\".\"main_legend_persistence_temp\""), operations.get(0).postCleanupSql().get(0));
Assertions.assertEquals(getDropTempTableQuery("\"mydb\".\"main_legend_persistence_tempWithDeleteIndicator\""), operations.get(0).postCleanupSql().get(1));
Assertions.assertEquals(getDropTempTableQuery("\"mydb\".\"staging_legend_persistence_stageWithoutDuplicates\""), operations.get(0).postCleanupSql().get(2));
Assertions.assertEquals(getDropTempTableQuery(tempName), operations.get(0).postCleanupSql().get(0));
Assertions.assertEquals(getDropTempTableQuery(tempWithDeleteIndicatorName), operations.get(0).postCleanupSql().get(1));
Assertions.assertEquals(getDropTempTableQuery(stageWithoutDuplicatesName), operations.get(0).postCleanupSql().get(2));

Assertions.assertEquals(2, operations.size());
String incomingRecordCount = "SELECT COUNT(*) as \"incomingRecordCount\" FROM \"mydb\".\"staging\" as stage WHERE (stage.\"data_split\" >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.\"data_split\" <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public void verifyUnitemporalDeltaNoDeleteIndWithOptimizationFilters(GeneratorRe

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " +
"SET sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND ((sink.\"id\" >= '{ID_LOWER_BOUND}') AND (sink.\"id\" <= '{ID_UPPER_BOUND}')) AND " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND ((sink.\"id\" >= '{id_legend_persistence_lower}') AND (sink.\"id\" <= '{id_legend_persistence_upper}')) AND " +
"(EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage " +
"WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND " +
"(sink.\"digest\" <> stage.\"digest\")))";
Expand All @@ -564,7 +564,7 @@ public void verifyUnitemporalDeltaNoDeleteIndWithOptimizationFilters(GeneratorRe
"WHERE NOT (EXISTS " +
"(SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(sink.\"digest\" = stage.\"digest\") AND ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) " +
"AND ((sink.\"id\" >= '{ID_LOWER_BOUND}') AND (sink.\"id\" <= '{ID_UPPER_BOUND}')))))";
"AND ((sink.\"id\" >= '{id_legend_persistence_lower}') AND (sink.\"id\" <= '{id_legend_persistence_upper}')))))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));
Expand All @@ -591,7 +591,7 @@ public void verifyUnitemporalDeltaNoDeleteIndWithOptimizationFiltersIncludesNull

String expectedMilestoneQuery = "UPDATE \"mydb\".\"main\" as sink " +
"SET sink.\"batch_id_out\" = (SELECT COALESCE(MAX(batch_metadata.\"table_batch_id\"),0)+1 FROM batch_metadata as batch_metadata WHERE UPPER(batch_metadata.\"table_name\") = 'MAIN')-1 " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND (((sink.\"id\" >= '{ID_LOWER_BOUND}') AND (sink.\"id\" <= '{ID_UPPER_BOUND}')) OR (sink.\"id\" IS NULL)) AND " +
"WHERE (sink.\"batch_id_out\" = 999999999) AND (((sink.\"id\" >= '{id_legend_persistence_lower}') AND (sink.\"id\" <= '{id_legend_persistence_upper}')) OR (sink.\"id\" IS NULL)) AND " +
"(EXISTS (SELECT * FROM \"mydb\".\"staging\" as stage " +
"WHERE ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) AND " +
"(sink.\"digest\" <> stage.\"digest\")))";
Expand All @@ -605,7 +605,7 @@ public void verifyUnitemporalDeltaNoDeleteIndWithOptimizationFiltersIncludesNull
"WHERE NOT (EXISTS " +
"(SELECT * FROM \"mydb\".\"main\" as sink WHERE (sink.\"batch_id_out\" = 999999999) AND " +
"(sink.\"digest\" = stage.\"digest\") AND ((sink.\"id\" = stage.\"id\") AND (sink.\"name\" = stage.\"name\")) " +
"AND (((sink.\"id\" >= '{ID_LOWER_BOUND}') AND (sink.\"id\" <= '{ID_UPPER_BOUND}')) OR (sink.\"id\" IS NULL)))))";
"AND (((sink.\"id\" >= '{id_legend_persistence_lower}') AND (sink.\"id\" <= '{id_legend_persistence_upper}')) OR (sink.\"id\" IS NULL)))))";

Assertions.assertEquals(AnsiTestArtifacts.expectedMainTableBatchIdBasedCreateQuery, preActionsSql.get(0));
Assertions.assertEquals(getExpectedMetadataTableCreateQuery(), preActionsSql.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndNoDataSplits(Generator
@Override
public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplits(List<GeneratorResult> operations, List<DataSplitRange> dataSplitRanges)
{
String tempName = operations.get(0).preActionsSql().get(2).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempWithDeleteIndicatorName = operations.get(0).preActionsSql().get(3).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempName = "`mydb`.`main_legend_persistence_temp_lp_yosulf`";
String tempWithDeleteIndicatorName = "`mydb`.`main_legend_persistence_tempWithDeleteIndicator_lp_yosulf`";

String expectedBitemporalFromOnlyDefaultTempTableCreateQuery = "CREATE TABLE IF NOT EXISTS " + tempName +
"(`id` INT64 NOT NULL," +
Expand Down Expand Up @@ -766,9 +766,9 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndNoDataSplitsFilterDupl
@Override
public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplitsFilterDuplicates(List<GeneratorResult> operations, List<DataSplitRange> dataSplitRanges)
{
String tempName = operations.get(0).preActionsSql().get(2).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempWithDeleteIndicatorName = operations.get(0).preActionsSql().get(3).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String stageWithoutDuplicatesName = operations.get(0).preActionsSql().get(4).split("CREATE TABLE IF NOT EXISTS ")[1].split("\\(")[0];
String tempName = "`mydb`.`main_legend_persistence_temp_lp_yosulf`";
String tempWithDeleteIndicatorName = "`mydb`.`main_legend_persistence_tempWithDeleteIndicator_lp_yosulf`";
String stageWithoutDuplicatesName = "`mydb`.`staging_legend_persistence_stageWithoutDuplicates_lp_yosulf`";

String expectedBitemporalFromOnlyDefaultTempTableCreateQuery = "CREATE TABLE IF NOT EXISTS " + tempName +
"(`id` INT64 NOT NULL," +
Expand Down Expand Up @@ -930,9 +930,9 @@ public void verifyBitemporalDeltaBatchIdBasedWithDeleteIndWithDataSplitsFilterDu

Assertions.assertEquals(getExpectedMetadataTableIngestQuery(), operations.get(0).metadataIngestSql().get(0));

Assertions.assertEquals(getDropTempTableQuery("`mydb`.`main_legend_persistence_temp`"), operations.get(0).postCleanupSql().get(0));
Assertions.assertEquals(getDropTempTableQuery("`mydb`.`main_legend_persistence_tempWithDeleteIndicator`"), operations.get(0).postCleanupSql().get(1));
Assertions.assertEquals(getDropTempTableQuery("`mydb`.`staging_legend_persistence_stageWithoutDuplicates`"), operations.get(0).postCleanupSql().get(2));
Assertions.assertEquals(getDropTempTableQuery(tempName), operations.get(0).postCleanupSql().get(0));
Assertions.assertEquals(getDropTempTableQuery(tempWithDeleteIndicatorName), operations.get(0).postCleanupSql().get(1));
Assertions.assertEquals(getDropTempTableQuery(stageWithoutDuplicatesName), operations.get(0).postCleanupSql().get(2));

Assertions.assertEquals(2, operations.size());
String incomingRecordCount = "SELECT COUNT(*) as `incomingRecordCount` FROM `mydb`.`staging` as stage WHERE (stage.`data_split` >= '{DATA_SPLIT_LOWER_BOUND_PLACEHOLDER}') AND (stage.`data_split` <= '{DATA_SPLIT_UPPER_BOUND_PLACEHOLDER}')";
Expand Down
Loading

0 comments on commit 7fe40bf

Please sign in to comment.