Skip to content

Commit

Permalink
Merge pull request #516 from cloudsufi/oracle-date-issue-dev
Browse files Browse the repository at this point in the history
[PLUGIN-1812] Added fix for date datatype in oracle sink
  • Loading branch information
vikasrathee-cs authored Oct 23, 2024
2 parents f0347b9 + ca328a2 commit acd47fa
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 9 deletions.
51 changes: 51 additions & 0 deletions oracle-plugin/src/e2e-test/features/sink/OracleRunTime.feature
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,54 @@ Feature: Oracle - Verify data transfer from BigQuery source to Oracle sink
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table with case


@BQ_SOURCE_TEST_DATE @ORACLE_DATE_TABLE
Scenario: To verify data is getting transferred from BigQuery source to Oracle sink successfully when schema is having date and timestamp fields
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "Oracle" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Oracle" to establish connection
Then Navigate to the properties page of plugin: "BigQuery"
Then Replace input plugin property: "project" with value: "projectId"
Then Enter input plugin property: "datasetProject" with value: "projectId"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Verify the Output Schema matches the Expected Schema: "outputDatatypesDateTimeSchema"
Then Validate "BigQuery" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Oracle"
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields
Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Enter input plugin property: "referenceName" with value: "sourceRef"
Then Replace input plugin property: "database" with value: "databaseName"
Then Replace input plugin property: "tableName" with value: "targetTable"
Then Replace input plugin property: "dbSchemaName" with value: "schema"
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
Then Enter input plugin property: "referenceName" with value: "targetRef"
Then Select radio button plugin property: "connectionType" with value: "service"
Then Select radio button plugin property: "role" with value: "normal"
Then Validate "Oracle" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Verify the preview of pipeline is "success"
Then Click on preview data for Oracle sink
Then Close the preview data
Then Deploy the pipeline
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Verify the pipeline status is "Succeeded"
Then Validate records transferred to target table with record counts of BigQuery table
Then Validate the values of records transferred to target Oracle table is equal to the values from source BigQuery table
48 changes: 41 additions & 7 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/BQValidation.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.List;
Expand All @@ -44,6 +49,13 @@

public class BQValidation {

private static final List<SimpleDateFormat> TIMESTAMP_DATE_FORMATS = Arrays.asList(
new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss"),
new SimpleDateFormat("yyyy-MM-dd"));
private static final List<DateTimeFormatter> TIMESTAMP_TZ_DATE_FORMATS = Arrays.asList(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX"),
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"));

/**
* Extracts entire data from source and target tables.
*
Expand Down Expand Up @@ -173,21 +185,43 @@ public static boolean compareResultSetAndJsonData(ResultSet rsSource, List<JsonO

case Types.TIMESTAMP:
Timestamp sourceTS = rsSource.getTimestamp(columnName);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss");
String targetT = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date dateParsed = dateFormat.parse(targetT);
Date dateParsed = null;
for (SimpleDateFormat dateTimeFormatter : TIMESTAMP_DATE_FORMATS) {
try {
dateParsed = dateTimeFormatter.parse(targetT);
break;
} catch (ParseException exception) {
// do nothing
}
}
Timestamp targetTs = new java.sql.Timestamp(dateParsed.getTime());
result = String.valueOf(sourceTS).equals(String.valueOf(targetTs));
result = sourceTS.equals(targetTs);
Assert.assertTrue("Different values found for column : %s", result);
break;

case OracleSourceSchemaReader.TIMESTAMP_TZ:
Timestamp sourceTZ = rsSource.getTimestamp(columnName);
SimpleDateFormat dateValue = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
String targetTS = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
Date date = dateValue.parse(targetTS);
Timestamp targetTZ = new Timestamp(date.getTime());
Assert.assertTrue("Different columns found for Timestamp", sourceTZ.equals(targetTZ));
ZonedDateTime targetDate = null;
for (DateTimeFormatter dateTimeFormatter : TIMESTAMP_TZ_DATE_FORMATS) {
try {
targetDate = ZonedDateTime.parse(targetTS, dateTimeFormatter);
break;
} catch (DateTimeParseException exception) {
// do nothing
}
}
Assert.assertTrue("Different columns found for Timestamp",
sourceTZ.toLocalDateTime().equals(targetDate.toLocalDateTime()));
break;

case OracleSourceSchemaReader.TIMESTAMP_LTZ:
Timestamp sourceLTZ = rsSource.getTimestamp(columnName);
String targetLTZ = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Assert.assertTrue("Different columns found for Timestamp",
sourceLTZ.toLocalDateTime().equals(LocalDateTime.parse(targetLTZ, formatter)));
break;

case OracleSourceSchemaReader.BINARY_FLOAT:
Expand Down
10 changes: 10 additions & 0 deletions oracle-plugin/src/e2e-test/java/io.cdap.plugin/OracleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,14 @@ public static void deleteTable(String schema, String table)
}
}
}

public static void createTargetDateTable(String targetTable, String schema) throws SQLException,
ClassNotFoundException {
try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) {
String createTargetTableQuery = "CREATE TABLE " + schema + "." + targetTable +
"(ID varchar2(100),DATE_COL DATE,TIMESTAMP_TZ_COL TIMESTAMP WITH TIME ZONE,TIMESTAMP_LTZ_COL " +
"TIMESTAMP WITH LOCAL TIME ZONE,INTERVAL_YM_COL INTERVAL YEAR TO MONTH,DATE_TYPE DATE)";
statement.executeUpdate(createTargetTableQuery);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,34 @@ public static void deleteTempSourceBQTableSmallCase() throws IOException, Interr
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void createTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
createSourceBQTableWithQueries(PluginPropertyUtils.pluginProp("CreateBQTableQueryFileDate"),
PluginPropertyUtils.pluginProp("InsertBQDataQueryFileDate"));
}

@After(order = 1, value = "@BQ_SOURCE_TEST_DATE")
public static void deleteTempSourceBQTableWithDateColumns() throws IOException, InterruptedException {
String bqSourceTable = PluginPropertyUtils.pluginProp("bqSourceTable");
BigQueryClient.dropBqQuery(bqSourceTable);
BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " deleted successfully");
PluginPropertyUtils.removePluginProp("bqSourceTable");
}

@Before(order = 2, value = "@ORACLE_DATE_TABLE")
public static void createOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.createTargetDateTable(PluginPropertyUtils.pluginProp("targetTable"),
PluginPropertyUtils.pluginProp("schema"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " created successfully");
}

@After(order = 2, value = "@ORACLE_DATE_TABLE")
public static void dropOracleTargetDateTable() throws SQLException, ClassNotFoundException {
OracleClient.deleteTable(PluginPropertyUtils.pluginProp("schema"),
PluginPropertyUtils.pluginProp("targetTable"));
BeforeActions.scenario.write("Oracle Target Table - " + PluginPropertyUtils.pluginProp("targetTable")
+ " deleted successfully");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ InsertBQDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
#bq queries file path for Small Case Schema
CreateBQTableQueryFileSmallCase=testdata/BigQuery/BigQueryCreateTableQuerySmallCase.txt
InsertBQDataQueryFileSmallCase=testdata/BigQuery/BigQueryInsertDataQuerySmallCase.txt
#bq queries file path for various Date time type Fields
CreateBQTableQueryFileDate=testdata/BigQuery/CreateBQTableQueryFileDate.txt
InsertBQDataQueryFileDate=testdata/BigQuery/InsertBQDataQueryFileDate.txt

#ORACLE Datatypes
bigQueryColumns=(COL23 FLOAT(4), COL28 TIMESTAMP, COL29 TIMESTAMP(9), COL30 TIMESTAMP WITH TIME ZONE, \
Expand All @@ -125,3 +128,6 @@ outputDatatypesSchema1=[{"key":"COL23","value":"double"},{"key":"COL28","value":
{"key":"COL29","value":"datetime"},{"key":"COL30","value":"timestamp"},{"key":"COL31","value":"string"},\
{"key":"COL32","value":"string"},{"key":"COL33","value":"datetime"},{"key":"COL34","value":"float"},\
{"key":"COL35","value":"double"}]
outputDatatypesDateTimeSchema=[{"key":"ID","value":"string"},{"key":"DATE_COL","value":"datetime"},\
{"key":"TIMESTAMP_TZ_COL","value":"timestamp"},{"key":"TIMESTAMP_LTZ_COL","value":"datetime"},\
{"key":"INTERVAL_YM_COL","value":"string"},{"key":"DATE_TYPE","value":"date"}]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TABLE `DATASET.TABLE_NAME` (ID STRING NOT NULL,DATE_COL DATETIME, TIMESTAMP_TZ_COL TIMESTAMP, TIMESTAMP_LTZ_COL DATETIME, INTERVAL_YM_COL STRING,DATE_TYPE DATE);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO `DATASET.TABLE_NAME` (id, date_col, timestamp_tz_col, timestamp_ltz_col, interval_ym_col,date_type) VALUES('2', '2024-10-11', '2024-10-11 14:30:00.123456+00:00', '2024-10-11 14:30:00.123456','2-6','2024-10-11');
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
String timestampString = Timestamp.valueOf(localDateTime).toString();
Object timestampWithTimeZone = createOracleTimestamp(stmt.getConnection(), timestampString);
stmt.setObject(sqlIndex, timestampWithTimeZone);
} else if (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) {
// Deprecated: Handle the case when the Timestamp is mapped to CDAP Timestamp type
} else {
// Handle the case when the Timestamp is mapped to CDAP Timestamp type or CDAP Date type.
super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
}
} else {
Expand Down

0 comments on commit acd47fa

Please sign in to comment.