Skip to content

Commit

Permalink
Detect cascading actions in RDS source (#5168)
Browse files Browse the repository at this point in the history
* Detect cascades

Signed-off-by: Hai Yan <oeyh@amazon.com>

* Address comments

Signed-off-by: Hai Yan <oeyh@amazon.com>

---------

Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh authored Nov 11, 2024
1 parent f99a08f commit 85f1718
Show file tree
Hide file tree
Showing 20 changed files with 931 additions and 21 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
implementation 'com.zendesk:mysql-binlog-connector-java:0.29.2'
implementation 'com.mysql:mysql-connector-j:8.4.0'

compileOnly 'org.projectlombok:lombok:1.18.20'
annotationProcessor 'org.projectlombok:lombok:1.18.20'

testImplementation project(path: ':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(path: ':data-prepper-test-event')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition;

import java.util.function.Function;

Expand All @@ -34,6 +35,8 @@ public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem
return new DataFilePartition(partitionStoreItem);
case StreamPartition.PARTITION_TYPE:
return new StreamPartition(partitionStoreItem);
case ResyncPartition.PARTITION_TYPE:
return new ResyncPartition(partitionStoreItem);
default:
// Unable to acquire other partitions.
return new GlobalState(partitionStoreItem);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;

import java.util.Optional;

public class ResyncPartition extends EnhancedSourcePartition<ResyncProgressState> {

public static final String PARTITION_TYPE = "RESYNC";

private final String database;
private final String table;
private final long timestamp;
private final ResyncProgressState state;

public ResyncPartition(String database, String table, long timestamp, ResyncProgressState state) {
this.database = database;
this.table = table;
this.timestamp = timestamp;
this.state = state;
}

public ResyncPartition(SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
database = keySplits[0];
table = keySplits[1];
timestamp = Long.parseLong(keySplits[2]);
state = convertStringToPartitionProgressState(ResyncProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());
}

@Override
public String getPartitionType() {
return PARTITION_TYPE;
}

@Override
public String getPartitionKey() {
return database + "|" + table + "|" + timestamp;
}

@Override
public Optional<ResyncProgressState> getProgressState() {
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Setter
@Getter
public class ResyncProgressState {
@JsonProperty("foreignKeyName")
private String foreignKeyName;

@JsonProperty("updatedValue")
private Object updatedValue;

@JsonProperty("primaryKeys")
private List<String> primaryKeys;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;

public class StreamProgressState {

Expand All @@ -16,6 +19,9 @@ public class StreamProgressState {
@JsonProperty("waitForExport")
private boolean waitForExport = false;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}
Expand All @@ -31,4 +37,12 @@ public boolean shouldWaitForExport() {
public void setWaitForExport(boolean waitForExport) {
this.waitForExport = waitForExport;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
import static org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata.DOT_DELIMITER;

public class DataFileLoader implements Runnable {

Expand Down Expand Up @@ -116,7 +117,7 @@ public void run() {

DataFileProgressState progressState = dataFilePartition.getProgressState().get();

final String fullTableName = progressState.getSourceDatabase() + "." + progressState.getSourceTable();
final String fullTableName = progressState.getSourceDatabase() + DOT_DELIMITER + progressState.getSourceTable();
final List<String> primaryKeys = progressState.getPrimaryKeyMap().getOrDefault(fullTableName, List.of());

final long snapshotTime = progressState.getSnapshotTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) {
final StreamProgressState progressState = new StreamProgressState();
progressState.setWaitForExport(sourceConfig.isExportEnabled());
getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
progressState.setForeignKeyRelations(schemaManager.getForeignKeyRelations(sourceConfig.getTableNames()));
StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
sourceCoordinator.createPartition(streamPartition);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

import java.sql.DatabaseMetaData;
import java.util.Set;

public enum ForeignKeyAction {
CASCADE,
NO_ACTION,
RESTRICT,
SET_DEFAULT,
SET_NULL,
UNKNOWN;

private static final Set<ForeignKeyAction> CASCADING_ACTIONS = Set.of(CASCADE, SET_DEFAULT, SET_NULL);
/**
* Returns the corresponding ForeignKeyAction for the given metadata action value.
*
* @param action the metadata action value
* @return the corresponding ForeignKeyAction
*/
public static ForeignKeyAction getActionFromMetadata(short action) {
switch (action) {
case DatabaseMetaData.importedKeyCascade:
return CASCADE;
case DatabaseMetaData.importedKeySetNull:
return SET_NULL;
case DatabaseMetaData.importedKeySetDefault:
return SET_DEFAULT;
case DatabaseMetaData.importedKeyRestrict:
return RESTRICT;
case DatabaseMetaData.importedKeyNoAction:
return NO_ACTION;
default:
return UNKNOWN;
}
}

/**
* Checks if the foreign key action is one of the cascading actions (CASCADE, SET_DEFAULT, SET_NULL)
* that will result in changes to the foreign key value when referenced key in parent table changes.
*
* @param foreignKeyAction the foreign key action
* @return true if the foreign key action is a cascade action, false otherwise
*/
public static boolean isCascadingAction(ForeignKeyAction foreignKeyAction) {
if (foreignKeyAction == null) {
return false;
}
return CASCADING_ACTIONS.contains(foreignKeyAction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class ForeignKeyRelation {
// TODO: add java docs
@JsonProperty("database_name")
private final String databaseName;

@JsonProperty("parent_table_name")
private final String parentTableName;

@JsonProperty("referenced_key_name")
private final String referencedKeyName;

@JsonProperty("child_table_name")
private final String childTableName;

@JsonProperty("foreign_key_name")
private final String foreignKeyName;

@JsonProperty("foreign_key_default_value")
@Builder.Default
private Object foreignKeyDefaultValue = null;

@JsonProperty("update_action")
private final ForeignKeyAction updateAction;

@JsonProperty("delete_action")
private final ForeignKeyAction deleteAction;

@JsonCreator
public ForeignKeyRelation(@JsonProperty("database_name") String databaseName,
@JsonProperty("parent_table_name") String parentTableName,
@JsonProperty("referenced_key_name") String referencedKeyName,
@JsonProperty("child_table_name") String childTableName,
@JsonProperty("foreign_key_name") String foreignKeyName,
@JsonProperty("foreign_key_default_value") Object foreignKeyDefaultValue,
@JsonProperty("update_action") ForeignKeyAction updateAction,
@JsonProperty("delete_action") ForeignKeyAction deleteAction) {
this.databaseName = databaseName;
this.parentTableName = parentTableName;
this.referencedKeyName = referencedKeyName;
this.childTableName = childTableName;
this.foreignKeyName = foreignKeyName;
this.foreignKeyDefaultValue = foreignKeyDefaultValue;
this.updateAction = updateAction;
this.deleteAction = deleteAction;
}

/**
* Checks either update action or delete action is one of the cascading actions (CASCADE, SET_DEFAULT, SET_NULL).
*
* @param foreignKeyRelation The foreign key relation.
* @return True if the foreign key relation contains a cascade action, false otherwise.
*/
public static boolean containsCascadingAction(ForeignKeyRelation foreignKeyRelation) {
return ForeignKeyAction.isCascadingAction(foreignKeyRelation.getUpdateAction()) ||
ForeignKeyAction.isCascadingAction(foreignKeyRelation.getDeleteAction());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A data model for a parent table in a foreign key relationship
*/
@Getter
@Builder
public class ParentTable {
private final String databaseName;
private final String tableName;
/**
* Column name to a list of ForeignKeyRelation in which the column is referenced
*/
private final Map<String, List<ForeignKeyRelation>> referencedColumnMap;

@Getter(AccessLevel.NONE)
@Builder.Default
private Map<String, List<ForeignKeyRelation>> columnsWithCascadingUpdate = null;

@Getter(AccessLevel.NONE)
@Builder.Default
private Map<String, List<ForeignKeyRelation>> columnsWithCascadingDelete = null;

/**
* Returns a map of column name to a list of ForeignKeyRelation in which the column is referenced and the update action is cascading.
* @return a map of column name to a list of ForeignKeyRelation
*/
public Map<String, List<ForeignKeyRelation>> getColumnsWithCascadingUpdate() {
if (columnsWithCascadingUpdate != null) {
return columnsWithCascadingUpdate;
}

columnsWithCascadingUpdate = new HashMap<>();
for (String column : referencedColumnMap.keySet()) {
for (ForeignKeyRelation foreignKeyRelation : referencedColumnMap.get(column)) {
if (ForeignKeyAction.isCascadingAction(foreignKeyRelation.getUpdateAction())) {
if (!columnsWithCascadingUpdate.containsKey(column)) {
columnsWithCascadingUpdate.put(column, new ArrayList<>());
}
columnsWithCascadingUpdate.get(column).add(foreignKeyRelation);
}
}
}
return columnsWithCascadingUpdate;
}

/**
* Returns a map of column name to a list of ForeignKeyRelation in which the column is referenced and the delete action is cascading.
* @return a map of column name to a list of ForeignKeyRelation
*/
public Map<String, List<ForeignKeyRelation>> getColumnsWithCascadingDelete() {
if (columnsWithCascadingDelete != null) {
return columnsWithCascadingDelete;
}

columnsWithCascadingDelete = new HashMap<>();
for (String column : referencedColumnMap.keySet()) {
for (ForeignKeyRelation foreignKeyRelation : referencedColumnMap.get(column)) {
if (ForeignKeyAction.isCascadingAction(foreignKeyRelation.getDeleteAction())) {
if (!columnsWithCascadingDelete.containsKey(column)) {
columnsWithCascadingDelete.put(column, new ArrayList<>());
}
columnsWithCascadingDelete.get(column).add(foreignKeyRelation);
}
}
}
return columnsWithCascadingDelete;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Map;

public class TableMetadata {
public static final String DOT_DELIMITER = ".";

private String databaseName;
private String tableName;
private List<String> columnNames;
Expand Down Expand Up @@ -40,7 +42,7 @@ public String getTableName() {
}

public String getFullTableName() {
return databaseName + "." + tableName;
return databaseName + DOT_DELIMITER + tableName;
}

public List<String> getColumnNames() {
Expand Down
Loading

0 comments on commit 85f1718

Please sign in to comment.