diff --git a/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature b/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature index 1943c85076..540b7e3d73 100644 --- a/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature +++ b/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature @@ -195,3 +195,153 @@ Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data tr Then Open and capture logs Then Verify the pipeline status is "Succeeded" Then Verify the partition table is created with partitioned on field "bqPartitionFieldTime" + + @BQ_EXISTING_SOURCE_DATATYPE_TEST @BQ_EXISTING_SINK_DATATYPE_TEST @EXISTING_BQ_CONNECTION + Scenario: Validate user is able to read the records from BigQuery source(existing table),source table here has more columns than BigQuery sink(existing table) with update button schema with use connection functionality + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Click on the Browse button inside plugin properties + Then Select connection data row with name: "dataset" + Then Select connection data row with name: "bqSourceTable" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Verify input plugin property: "table" contains value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Click on the Browse button inside plugin properties + Then Click SELECT button inside connection data row with name: "dataset" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bgInsertDatatypeFile" + + @BQ_INSERT_SOURCE_TEST @BQ_UPDATE_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario:Validate successful records transfer from BigQuery to BigQuery with Advanced Operations Update without updating the destination table schema with use connection functionality + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Click on the Browse button inside plugin properties + Then Select connection data row with name: "dataset" + Then Select connection data row with name: "bqSourceTable" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Verify input plugin property: "table" contains value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Click on the Browse button inside plugin properties + Then Click SELECT button inside connection data row with name: "dataset" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "update" + Then Click on the Add Button of the property: "relationTableKey" with value: + | TableKey | + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table + + @BQ_INSERT_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario:Validate successful records transfer from BigQuery to BigQuery with Advanced operations Upsert without updating the destination table schema with use connection functionality + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Click on the Browse button inside plugin properties + Then Select connection data row with name: "dataset" + Then Select connection data row with name: "bqSourceTable" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Verify input plugin property: "table" contains value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Click on the Browse button inside plugin properties + Then Click SELECT button inside connection data row with name: "dataset" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "upsert" + Then Click on the Add Button of the property: "relationTableKey" with value: + | TableKey | + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table diff --git a/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature b/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature index 83e4d5e99d..6e78173267 100644 --- a/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature +++ b/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature @@ -262,3 +262,95 @@ Feature: BigQuery source - Verification of BigQuery to BigQuery successful data Then Open and capture logs Then Verify the pipeline status is "Succeeded" Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table + + @BQ_EXISTING_SOURCE_TEST @BQ_EXISTING_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Validate user is able to read data from BigQuery source(existing table) and store them in BigQuery sink(existing table) with use connection functionality + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Click on the Browse button inside plugin properties + Then Select connection data row with name: "dataset" + Then Select connection data row with name: "bqSourceTable" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Verify input plugin property: "table" contains value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Click on the Browse button inside plugin properties + Then Click SELECT button inside connection data row with name: "dataset" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqExpectedFile" + + @BQ_EXISTING_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Validate user is able to read data from BigQuery source(existing table) without clicking on the validate button of BigQuery source and store them in BigQuery sink(new table) with use connection functionality + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Click on the Browse button inside plugin properties + Then Select connection data row with name: "dataset" + Then Select connection data row with name: "bqSourceTable" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Verify input plugin property: "table" contains value: "bqSourceTable" + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Click on the Browse button inside plugin properties + Then Click SELECT button inside connection data row with name: "dataset" + Then Wait till connection data loading completes with a timeout of 60 seconds + Then Verify input plugin property: "dataset" contains value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java new file mode 100644 index 0000000000..e30a3b9c60 --- /dev/null +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidationExistingTables.java @@ -0,0 +1,116 @@ +package io.cdap.plugin.bigquery.stepsdesign; + +import com.esotericsoftware.minlog.Log; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cucumber.core.logging.Logger; +import io.cucumber.core.logging.LoggerFactory; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +/** + * BigQuery Plugin Existing Table validation. + */ +public class BQValidationExistingTables { + + private static final Logger LOG = LoggerFactory.getLogger(BQValidationExistingTables.class); + private static final Gson gson = new Gson(); + + /** + * Validates the actual data in a BigQuery table against the expected data in a JSON file. + * @param table The name of the BigQuery table to retrieve data from. + * @param fileName The name of the JSON file containing the expected data. + * @return True if the actual data matches the expected data, false otherwise. + */ + public static boolean validateActualDataToExpectedData(String table, String fileName) throws IOException, + InterruptedException, URISyntaxException { + Map bigQueryMap = new HashMap<>(); + Map fileMap = new HashMap<>(); + Path bqExpectedFilePath = Paths.get(BQValidationExistingTables.class.getResource("/" + fileName).toURI()); + + getBigQueryTableData(table, bigQueryMap); + getFileData(bqExpectedFilePath.toString(), fileMap); + boolean isMatched = bigQueryMap.equals(fileMap); + return isMatched; + } + + /** + * Reads a JSON file line by line and populates a map with JSON objects using a specified ID key. + *@param fileName The path to the JSON file to be read. + * @param fileMap A map where the extracted JSON objects will be stored with their ID values as keys. + */ + + public static void getFileData(String fileName, Map fileMap) { + try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { + String line; + while ((line = br.readLine()) != null) { + JsonObject json = gson.fromJson(line, JsonObject.class); + String idKey = getIdKey(json); + if (idKey != null) { + JsonElement idElement = json.get(idKey); + if (idElement.isJsonPrimitive()) { + String idValue = idElement.getAsString(); + fileMap.put(idValue, json); + } + } else { + Log.error("ID key not found"); + } + } + } catch (IOException e) { + Log.error("Error reading the file: " + e.getMessage()); + } + } + + private static void getBigQueryTableData(String targetTable, Map bigQueryMap) + throws IOException, InterruptedException { + String dataset = PluginPropertyUtils.pluginProp("dataset"); + String projectId = PluginPropertyUtils.pluginProp("projectId"); + String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + targetTable + "` AS t"; + TableResult result = BigQueryClient.getQueryResult(selectQuery); + + for (FieldValueList row : result.iterateAll()) { + JsonObject json = gson.fromJson(row.get(0).getStringValue(), JsonObject.class); + String idKey = getIdKey(json); // Get the actual ID key from the JSON object + if (idKey != null) { + JsonElement idElement = json.get(idKey); + if (idElement.isJsonPrimitive()) { + String id = idElement.getAsString(); + bigQueryMap.put(id, json); + } else { + Log.error("Data Mismatched"); + } + } + } + } + + /** + * Retrieves the key for the ID element in the provided JSON object. + * + * @param json The JSON object to search for the ID key. + */ + private static String getIdKey(JsonObject json) { + if (json.has("ID")) { + return "ID"; + } else if (json.has("Name")) { + return "Name"; + } else if (json.has("Price")) { + return "Price"; + } else if (json.has("Customer_Exists")) { + return "Customer_Exists"; + } else { + return null; + } + } + } diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuerySource.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuerySource.java index 056aecd335..5b69faf108 100644 --- a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuerySource.java +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuerySource.java @@ -32,6 +32,7 @@ import org.junit.Assert; import java.io.IOException; +import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; @@ -160,4 +161,13 @@ public void enterTheBigQuerySourcePropertyForViewMaterializationDataset(String v public void enterBigQuerySourcePropertyTableNameAsView() { CdfBigQueryPropertiesActions.enterBigQueryTable(TestSetupHooks.bqSourceView); } + + @Then("Validate the data transferred from BigQuery to BigQuery with actual And expected file for: {string}") + public void validateTheDataFromBQToBQWithActualAndExpectedFileFor(String expectedFile) throws IOException, + InterruptedException, URISyntaxException { + boolean recordsMatched = BQValidationExistingTables.validateActualDataToExpectedData( + PluginPropertyUtils.pluginProp("bqTargetTable"), + PluginPropertyUtils.pluginProp(expectedFile)); + Assert.assertTrue("Value of records in actual and expected file is equal", recordsMatched); + } } diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 233f154b5b..8c5b1e385f 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -178,8 +178,7 @@ public static void createBucketWithAvroTestFile() throws IOException, URISyntaxE @After(order = 1, value = "@GCS_CSV_TEST or @GCS_TSV_TEST or @GCS_BLOB_TEST " + "or @GCS_DELIMITED_TEST or @GCS_TEXT_TEST or @GCS_OUTPUT_FIELD_TEST or @GCS_DATATYPE_1_TEST or " + "@GCS_DATATYPE_2_TEST or @GCS_READ_RECURSIVE_TEST or @GCS_DELETE_WILDCARD_TEST or @GCS_CSV_RANGE_TEST or" + - " @GCS_PARQUET_TEST or @GCS_AVRO_TEST or @GCS_DATATYPE_TEST or @GCS_AVRO_FILE or @GCS_CSV or " + - "@GCS_MULTIPLE_FILES_TEST or @GCS_MULTIPLE_FILES_REGEX_TEST") + " @GCS_PARQUET_TEST or @GCS_AVRO_TEST or @GCS_DATATYPE_TEST or @GCS_AVRO_FILE") public static void deleteSourceBucketWithFile() { deleteGCSBucket(gcsSourceBucketName); PluginPropertyUtils.removePluginProp("gcsSourceBucketName"); @@ -270,7 +269,8 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce } @After(order = 1, value = "@BQ_SOURCE_TEST or @BQ_PARTITIONED_SOURCE_TEST or @BQ_SOURCE_DATATYPE_TEST or " + - "@BQ_INSERT_SOURCE_TEST or @BQ_UPDATE_SINK_TEST") + "@BQ_INSERT_SOURCE_TEST or @BQ_UPDATE_SINK_TEST or @BQ_EXISTING_SOURCE_TEST or @BQ_EXISTING_SINK_TEST or " + + "@BQ_EXISTING_SOURCE_DATATYPE_TEST or @BQ_EXISTING_SINK_DATATYPE_TEST") public static void deleteTempSourceBQTable() throws IOException, InterruptedException { BigQueryClient.dropBqQuery(bqSourceTable); PluginPropertyUtils.removePluginProp("bqSourceTable"); @@ -937,8 +937,97 @@ public static void createBucketWithMultipleTestFiles() throws IOException, URISy } @Before(order = 1, value = "@GCS_MULTIPLE_FILES_REGEX_TEST") - public static void createBucketWithMultipleTestFilesWithRegex() throws IOException, URISyntaxException { + public static void createBucketWithMultipleTestFilesWithRegex () throws IOException, URISyntaxException { gcsSourceBucketName = createGCSBucketWithMultipleFiles(PluginPropertyUtils.pluginProp( "gcsMultipleFilesFilterRegexPath")); + PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); + } + + @Before(order = 1, value = "@BQ_EXISTING_SOURCE_TEST") + public static void createSourceBQExistingTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-" , "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(1, 'Raja Sharma', 200.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_EXISTING_SINK_TEST") + public static void createSinkBQExistingTable() throws IOException, InterruptedException { + + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64,Name STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + } + + @Before(order = 1, value = "@BQ_EXISTING_SOURCE_DATATYPE_TEST") + public static void createSourceBQExistingDatatypeTable () throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL, transaction_date DATE," + + "business_ratio NUMERIC, updated_on TIMESTAMP ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists,transaction_date," + + "business_ratio,updated_on)" + + "VALUES" + "(1, 'Raja Sharma', 200.0, true," + + "'2021-01-28'," + "0.0904809091," + + "'2018-03-10 04:50:01 UTC' " + + ") "); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_EXISTING_SINK_DATATYPE_TEST") + public static void createSinkBQExistingDatatypeTable() throws IOException, InterruptedException { + + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64,Name STRING," + + "Price FLOAT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100.0, true)"); + + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + + PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); } } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index eda2da060f..8fb47c51c0 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -205,6 +205,8 @@ bqInvalidRefName=invalidRef&^*&&* bqDatatypeChange1=[{"key":"Id","value":"long"},{"key":"Value","value":"long"}] bqDataTypeTestFileSchema1=[{"key":"Id","value":"long"},{"key":"Value","value":"long"},\ {"key":"UID","value":"string"}] +bqExpectedFile=testdata/BigQuery/BQExistingTableFile +bgInsertDatatypeFile=testdata/BigQuery/BQInsertDatatypeFile ## BIGQUERY-PLUGIN-PROPERTIES-END ## PUBSUBSINK-PLUGIN-PROPERTIES-START diff --git a/src/e2e-test/resources/testdata/BigQuery/BQExistingTableFile b/src/e2e-test/resources/testdata/BigQuery/BQExistingTableFile new file mode 100644 index 0000000000..b99eed37aa --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQExistingTableFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":1,"Name":"Raja Sharma","Price":200.0} +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQInsertDatatypeFile b/src/e2e-test/resources/testdata/BigQuery/BQInsertDatatypeFile new file mode 100644 index 0000000000..68909f85e2 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQInsertDatatypeFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0,"business_ratio":null,"transaction_date":null,"updated_on":null} +{"Customer_Exists":true,"ID":1,"Name":"Raja Sharma","Price":200.0,"business_ratio":0.090480909,"transaction_date":"2021-01-28","updated_on":"2018-03-10T04:50:01Z"} \ No newline at end of file