From 23c9479d66850d96472543a64c3939b45d26073e Mon Sep 17 00:00:00 2001 From: priyabhatnagar Date: Fri, 28 Jul 2023 21:04:49 +0530 Subject: [PATCH] BQ e2e updated tests --- .../sink/BigQueryToBigQuerySink.feature | 197 ++++++++++++++++++ .../source/BigQueryToBigQuery.feature | 45 +++- .../bigquery/source/BigQueryToGCS.feature | 1 + .../bigquery/stepsdesign/BQValidation.java | 190 +++++++++++++++++ .../plugin/bigquery/stepsdesign/BigQuery.java | 47 +++++ .../common/stepsdesign/TestSetupHooks.java | 59 +++++- .../pluginDataCyAttributes.properties | 2 + .../resources/pluginParameters.properties | 7 +- 8 files changed, 537 insertions(+), 11 deletions(-) create mode 100644 src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature create mode 100644 src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidation.java create mode 100644 src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java diff --git a/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature b/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature new file mode 100644 index 0000000000..1943c85076 --- /dev/null +++ b/src/e2e-test/features/bigquery/sink/BigQueryToBigQuerySink.feature @@ -0,0 +1,197 @@ +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. 
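+# Note: the scenarios in this file can be run selectively through Cucumber tag
+# expressions. A minimal sketch, assuming a standard cucumber-jvm runner is wired
+# into this repository's e2e test profile (the exact invocation may differ):
+#   mvn verify -Dcucumber.filter.tags="@BigQuery_Sink and @BQ_SINK_TEST"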
+
+@BigQuery_Sink
+Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer
+
+  @BQ_SOURCE_DATATYPE_TEST @BQ_SINK_TEST
+  Scenario:Validate successful records transfer from BigQuery to BigQuery with partition type TIME, partition field and require partition filter set to true
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "BigQuery" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    And Enter input plugin property: "referenceName" with value: "Reference"
+    And Replace input plugin property: "project" with value: "projectId"
+    And Enter input plugin property: "datasetProject" with value: "projectId"
+    And Replace input plugin property: "dataset" with value: "dataset"
+    Then Override Service account details if set in environment variables
+    And Enter input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "BigQuery2"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqTargetTable"
+    Then Click plugin property: "truncateTable"
+    Then Click plugin property: "updateTableSchema"
+    Then Enter BigQuery sink property partition field "bqPartitionFieldTime"
+    Then Validate "BigQuery" plugin properties
+    Then Close the BigQuery properties
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Wait till pipeline preview is in running state
+    Then Open and capture pipeline preview logs
+    Then Verify the preview run status of pipeline in the logs is "succeeded"
+    Then Close the pipeline logs
+    Then Close the preview
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Verify the partition table is created with partitioned on field "bqPartitionFieldTime"
+
+  @BQ_INSERT_SOURCE_TEST @BQ_UPDATE_SINK_TEST
+  Scenario:Validate successful records transfer from BigQuery to BigQuery with Advanced Operations Update for table key
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "BigQuery" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    And Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    And Replace input plugin property: "datasetProject" with value: "datasetprojectId"
+    And Replace input plugin property: "referenceName" with value: "reference"
+    And Replace input plugin property: "dataset" with value: "dataset"
+    And Replace input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "BigQuery2"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqTargetTable"
+    And Select radio button plugin property: "operation" with value: "update"
+    Then Click plugin property: "updateTableSchema"
+    Then Click on the Add Button of the property: "relationTableKey" with value:
+      | TableKey |
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Wait till pipeline preview is in running state
+    Then Open and capture pipeline preview logs
+    Then Verify the preview run status of pipeline in the logs is "succeeded"
+    Then Close the pipeline logs
+    Then Close the preview
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Close the pipeline logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table
+
+  @BQ_INSERT_SOURCE_TEST @BQ_SINK_TEST
+  Scenario:Validate successful records transfer from BigQuery to BigQuery with Advanced Operations Upsert
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "BigQuery" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    And Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    And Replace input plugin property: "datasetProject" with value: "datasetprojectId"
+    And Replace input plugin property: "referenceName" with value: "reference"
+    And Replace input plugin property: "dataset" with value: "dataset"
+    And Replace input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "BigQuery2"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqTargetTable"
+    And Select radio button plugin property: "operation" with value: "upsert"
+    Then Click plugin property: "updateTableSchema"
+    Then Click on the Add Button of the property: "relationTableKey" with value:
+      | TableKey |
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Wait till pipeline preview is in running state
+    Then Open and capture pipeline preview logs
+    Then Verify the preview run status of pipeline in the logs is "succeeded"
+    Then Close the pipeline logs
+    Then Close the preview
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Close the pipeline logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table
+
+  @BQ_SOURCE_DATATYPE_TEST @BQ_SINK_TEST
+  Scenario:Validate successful records transfer from BigQuery to BigQuery with clustering order functionality
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "BigQuery" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    And Enter input plugin property: "referenceName" with value: "Reference"
+    And Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
+    And Replace input plugin property: "dataset" with value: "dataset"
+    And Replace input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "BigQuery2"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Override Service account details if set in environment variables
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqTargetTable"
+    Then Enter BigQuery sink property partition field "bqPartitionFieldTime"
+    Then Click on the Add Button of the property: "clusteringOrder" with value:
+      | clusterValue |
+    Then Validate "BigQuery" plugin properties
+    Then Close the BigQuery properties
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Wait till pipeline preview is in running state
+    Then Open and capture pipeline preview logs
+    Then Verify the preview run status of pipeline in the logs is "succeeded"
+    Then Close the pipeline logs
+    Then Close the preview
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Verify the partition table is created with partitioned on field "bqPartitionFieldTime"
diff --git a/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature b/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature
index 3817a8fc3a..f3d4347b24 100644
--- a/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature
+++ b/src/e2e-test/features/bigquery/source/BigQueryToBigQuery.feature
@@ -218,4 +218,47 @@ Feature: BigQuery source - Verification of BigQuery to BigQuery successful data
     Then Wait till pipeline is in running state
     Then Open and capture logs
     Then Verify the pipeline status is "Succeeded"
-    Then Validate records transferred to target table is equal to number of records from source table
+    Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table
+
+
+  @BQ_SOURCE_TEST @BQ_SOURCE_VIEW_TEST @BQ_SINK_TEST
+  Scenario:Validate successful records transfer from BigQuery to BigQuery by enabling querying views
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "BigQuery" from the plugins list as: "Source"
+    When Expand Plugin group in the LHS plugins list: "Sink"
+    When Select plugin: "BigQuery" from the plugins list as: "Sink"
+    Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection
+    Then Navigate to the properties page of plugin: "BigQuery"
+    And Enter input plugin property: "referenceName" with value: "Reference"
+    And Replace input plugin property: "project" with value: "projectId"
+    And Enter input plugin property: "datasetProject" with value: "datasetprojectId"
+    And Replace input plugin property: "dataset" with value: "dataset"
+    Then Override Service account details if set in environment variables
+    And Enter input plugin property: "table" with value: "bqSourceTable"
+    Then Click on the Get Schema button
+    Then Validate "BigQuery" plugin properties
+    And Close the Plugin Properties page
+    Then Navigate to the properties page of plugin: "BigQuery2"
+    Then Replace input plugin property: "project" with value: "projectId"
+    Then Enter input plugin property: "datasetProject" with value: "projectId"
+    Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
+    Then Enter input plugin property: "dataset" with value: "dataset"
+    Then Enter input plugin property: "table" with value: "bqTargetTable"
+    Then Click plugin property: "truncateTable"
+    Then Click plugin property: "updateTableSchema"
+    Then Validate "BigQuery" plugin properties
+    Then Close the BigQuery properties
+    Then Save the pipeline
+    Then Preview and run the pipeline
+    Then Wait till pipeline preview is in running state
+    Then Open and capture pipeline preview logs
+    Then Verify the preview run status of pipeline in the logs is "succeeded"
+    Then Close the pipeline logs
+    Then Close the preview
+    Then Deploy the pipeline
+    Then Run the Pipeline in Runtime
+    Then Wait till pipeline is in running state
+    Then Open and capture logs
+    Then Verify the pipeline status is "Succeeded"
+    Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table
diff --git a/src/e2e-test/features/bigquery/source/BigQueryToGCS.feature b/src/e2e-test/features/bigquery/source/BigQueryToGCS.feature
index 2c0ee90ce5..e65f6511d4 100644
--- a/src/e2e-test/features/bigquery/source/BigQueryToGCS.feature
+++ b/src/e2e-test/features/bigquery/source/BigQueryToGCS.feature
@@ -75,6 +75,7 @@ Feature: BigQuery source - Verification of BigQuery to GCS successful data trans
     Then Open and capture logs
     Then Verify the pipeline status is "Succeeded"
     Then Verify data is transferred to target GCS bucket
+    Then Validate the values of records transferred to GCS bucket is equal to the values from source BigQuery table
 
   @BQ_SOURCE_TEST @BQ_SOURCE_VIEW_TEST @GCS_SINK_TEST
   Scenario:Validate successful records transfer from BigQuery to GCS by enable querying views
diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidation.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidation.java
new file mode 100644
index 0000000000..d9aac2c40c
--- /dev/null
+++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BQValidation.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.plugin.bigquery.stepsdesign;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableResult;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.cdap.e2e.utils.BigQueryClient;
+import io.cdap.e2e.utils.PluginPropertyUtils;
+import org.apache.spark.sql.types.Decimal;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * BigQuery Plugin validation.
+ */
+public class BQValidation {
+
+  /**
+   * Fetches all rows of the given BigQuery table as JSON objects, sorted on a best-effort ID key.
+   */
+  private static List<JsonObject> getBigQueryTableData(String table)
+    throws IOException, InterruptedException {
+    List<JsonObject> bigQueryRows = new ArrayList<>();
+    String projectId = PluginPropertyUtils.pluginProp("projectId");
+    String dataset = PluginPropertyUtils.pluginProp("dataset");
+    String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t";
+    TableResult result = BigQueryClient.getQueryResult(selectQuery);
+    result.iterateAll().forEach(value -> {
+      String json = value.get(0).getStringValue();
+      JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject();
+      bigQueryRows.add(jsonObject);
+    });
+    if (bigQueryRows.isEmpty()) {
+      return bigQueryRows;
+    }
+    String sortingIdKey = getIdKey(bigQueryRows.get(0));
+    // Sort the list based on the determined sorting key
+    if (sortingIdKey != null) {
+      bigQueryRows.sort(Comparator.comparing(jsonData -> jsonData.get(sortingIdKey).getAsString()));
+    }
+    return bigQueryRows;
+  }
+
+  /**
+   * Retrieves the key for the ID element in the provided JSON object.
+   *
+   * @param json The JSON object to search for the ID key.
+   * @return the name of the ID field, or null if none of the known ID fields is present.
+   */
+  private static String getIdKey(JsonObject json) {
+    if (json.has("UID")) {
+      return "UID";
+    } else if (json.has("transaction_uid")) {
+      return "transaction_uid";
+    } else {
+      return null;
+    }
+  }
+
+  public static boolean validateSourceBQToTargetBQRecord(String sourceTable, String targetTable)
+    throws IOException, InterruptedException {
+    List<JsonObject> bigQuerySourceResponse = getBigQueryTableData(sourceTable);
+    List<JsonObject> bigQueryTargetResponse = getBigQueryTableData(targetTable);
+
+    // Compare the data from the source and target BigQuery tables
+    return compareSourceDataWithTargetData(bigQuerySourceResponse, bigQueryTargetResponse, targetTable);
+  }
+
+  public static boolean compareSourceDataWithTargetData(List<JsonObject> sourceData, List<JsonObject> targetData,
+                                                        String tableName) {
+    if (targetData == null) {
+      Assert.fail("targetData is null");
+      return false;
+    }
+
+    if (sourceData.size() != targetData.size()) {
+      Assert.fail("Number of rows in source table is not equal to the number of rows in target table");
+      return false;
+    }
+
+    com.google.cloud.bigquery.BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
+    String projectId = PluginPropertyUtils.pluginProp("projectId");
+    String dataset = PluginPropertyUtils.pluginProp("dataset");
+    // Build the table reference
+    TableId tableRef = TableId.of(projectId, dataset, tableName);
+    // Get the table schema
+    Schema schema = bigQuery.getTable(tableRef).getDefinition().getSchema();
+
+    for (int rowIndex = 0; rowIndex < sourceData.size(); rowIndex++) {
+      JsonObject sourceObject = sourceData.get(rowIndex);
+      JsonObject targetObject = targetData.get(rowIndex);
+
+      for (Field field : schema.getFields()) {
+        String columnName = field.getName();
+        // getStandardType() yields StandardSQLTypeName values (INT64, FLOAT64, ...),
+        // never the legacy names INTEGER or FLOAT.
+        String columnTypeName = field.getType().getStandardType().toString();
+
+        if (!sourceObject.has(columnName) || !targetObject.has(columnName)) {
+          Assert.fail("Column not found in source or target data: " + columnName);
+          return false;
+        }
+
+        switch (columnTypeName) {
+          case "TIMESTAMP":
+            Timestamp timestampSource = Timestamp.parseTimestamp(sourceObject.get(columnName).getAsString());
+            Timestamp timestampTarget = Timestamp.parseTimestamp(targetObject.get(columnName).getAsString());
+            Assert.assertEquals("Different values found for column : " + columnName, timestampSource, timestampTarget);
+            break;
+          case "INT64":
+            long longSource = sourceObject.get(columnName).getAsLong();
+            long longTarget = targetObject.get(columnName).getAsLong();
+            Assert.assertEquals("Different values found for column : " + columnName, longSource, longTarget);
+            break;
+          case "BOOL":
+            Boolean booleanSource = Boolean.valueOf(sourceObject.get(columnName).getAsString());
+            Boolean booleanTarget = Boolean.valueOf(targetObject.get(columnName).getAsString());
+            Assert.assertEquals("Different values found for column : " + columnName, booleanSource, booleanTarget);
+            break;
+          case "STRING":
+            String source = sourceObject.get(columnName).getAsString();
+            String target = targetObject.get(columnName).getAsString();
+            Assert.assertEquals("Different values found for column : " + columnName, source, target);
+            break;
+          case "DATE":
+            java.sql.Date dateSource = java.sql.Date.valueOf(sourceObject.get(columnName).getAsString());
+            java.sql.Date dateTarget = java.sql.Date.valueOf(targetObject.get(columnName).getAsString());
+            Assert.assertEquals("Different values found for column : " + columnName, dateSource, dateTarget);
+            break;
+          case "DATETIME":
+            DateTime sourceDatetime = DateTime.parse(sourceObject.get(columnName).getAsString());
+            DateTime targetDateTime = DateTime.parse(targetObject.get(columnName).getAsString());
+            Assert.assertEquals("Different values found for column : " + columnName, sourceDatetime, targetDateTime);
+            break;
+          case "NUMERIC":
+            Decimal decimalSource = Decimal.fromDecimal(sourceObject.get(columnName).getAsBigDecimal());
+            Decimal decimalTarget = Decimal.fromDecimal(targetObject.get(columnName).getAsBigDecimal());
+            Assert.assertEquals("Different values found for column : " + columnName, decimalSource, decimalTarget);
+            break;
+          case "BIGNUMERIC":
+            BigDecimal bigDecimalSource = sourceObject.get(columnName).getAsBigDecimal();
+            BigDecimal bigDecimalTarget = targetObject.get(columnName).getAsBigDecimal();
+            Assert.assertEquals("Different values found for column : " + columnName, bigDecimalSource,
+                                bigDecimalTarget);
+            break;
+          case "FLOAT64":
+            Double sourceFloat = sourceObject.get(columnName).getAsDouble();
+            Double targetFloat = targetObject.get(columnName).getAsDouble();
+            Assert.assertEquals("Different values found for column : " + columnName, sourceFloat, targetFloat);
+            break;
+          case "TIME":
+            Time sourceTime = Time.valueOf(sourceObject.get(columnName).getAsString());
+            Time targetTime = Time.valueOf(targetObject.get(columnName).getAsString());
+            Assert.assertEquals("Different values found for column : " + columnName, sourceTime, targetTime);
+            break;
+          default:
+            String sourceValue = sourceObject.get(columnName).toString();
+            String targetValue = targetObject.get(columnName).toString();
+            Assert.assertEquals("Different values found for column : " + columnName, sourceValue, targetValue);
+            break;
+        }
+      }
+    }
+    return true;
+  }
+}
diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java
new file mode 100644
index 0000000000..215886662d
--- /dev/null
+++ b/src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/BigQuery.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.bigquery.stepsdesign;
+
+import io.cdap.e2e.pages.actions.CdfPipelineRunAction;
+import io.cdap.e2e.utils.BigQueryClient;
+import io.cdap.e2e.utils.PluginPropertyUtils;
+import io.cdap.plugin.common.stepsdesign.TestSetupHooks;
+import io.cucumber.java.en.Then;
+import org.junit.Assert;
+import stepsdesign.BeforeActions;
+
+import java.io.IOException;
+
+/**
+ * BigQuery Plugin validation common step design.
+ */
+public class BigQuery {
+
+  @Then("Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table")
+  public void validateTheValuesOfRecordsTransferredToBQsinkIsEqualToTheValuesFromSourceBigQueryTable()
+    throws InterruptedException, IOException {
+    // The record count shown on the source plugin must match the source table count.
+    int sourceBQRecordsCount = BigQueryClient.countBqQuery(PluginPropertyUtils.pluginProp("bqSourceTable"));
+    BeforeActions.scenario.write("No of Records from source BigQuery table: " + sourceBQRecordsCount);
+    Assert.assertEquals("Out records should match with BigQuery source records count",
+                        CdfPipelineRunAction.getCountDisplayedOnSourcePluginAsRecordsOut(), sourceBQRecordsCount);
+
+    boolean recordsMatched = BQValidation.validateSourceBQToTargetBQRecord(
+      TestSetupHooks.bqSourceTable, TestSetupHooks.bqTargetTable);
+    Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " +
+                        "of the records in the source table", recordsMatched);
+  }
+}
diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
index 1fc959fd3a..002f713655 100644
--- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
+++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java
@@ -67,6 +67,7 @@ public class TestSetupHooks {
   public static String spannerTargetDatabase = StringUtils.EMPTY;
   public static String spannerTargetTable = StringUtils.EMPTY;
   public static boolean firstSpannerTestFlag = true;
+  public static String datasetName = PluginPropertyUtils.pluginProp("dataset");
 
   @Before(order = 1)
   public static void overrideServiceAccountFilePathIfProvided() {
@@ -255,7 +256,7 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce
       records.append(" (").append(index).append(", ").append((int) (Math.random() * 1000 + 1)).append(", '")
         .append(UUID.randomUUID()).append("'), ");
     }
-    BigQueryClient.getSoleQueryResult("create table `test_automation." + bqSourceTable + "` as " +
+    BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` as " +
                                         "SELECT * FROM UNNEST([ " +
                                         " STRUCT(1 AS Id, " + ((int) (Math.random() * 1000 + 1)) + " as Value, " +
                                         "'" + UUID.randomUUID() + "' as UID), " +
@@ -267,7 +268,8 @@ public static void createTempSourceBQTable() throws IOException, InterruptedExce
     BeforeActions.scenario.write("BQ source Table " + bqSourceTable + " created successfully");
   }
 
-  @After(order = 1, value = "@BQ_SOURCE_TEST or @BQ_PARTITIONED_SOURCE_TEST or @BQ_SOURCE_DATATYPE_TEST")
+  @After(order = 1, value = "@BQ_SOURCE_TEST or @BQ_PARTITIONED_SOURCE_TEST or @BQ_SOURCE_DATATYPE_TEST or " +
+    "@BQ_INSERT_SOURCE_TEST or @BQ_UPDATE_SINK_TEST")
   public static void deleteTempSourceBQTable() throws IOException, InterruptedException {
     BigQueryClient.dropBqQuery(bqSourceTable);
     PluginPropertyUtils.removePluginProp("bqSourceTable");
@@ -285,11 +287,11 @@ public static void deleteTempSourceBQTable() throws IOException, InterruptedExce
   @Before(order = 1, value = "@BQ_PARTITIONED_SOURCE_TEST")
   public static void createTempPartitionedSourceBQTable() throws IOException, InterruptedException {
     bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
-    BigQueryClient.getSoleQueryResult("create table `test_automation." + bqSourceTable + "` " +
+    BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
                                         "(transaction_id INT64, transaction_uid STRING, transaction_date DATE ) " +
                                         "PARTITION BY _PARTITIONDATE");
     try {
-      BigQueryClient.getSoleQueryResult("INSERT INTO `test_automation." + bqSourceTable + "` " +
+      BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
                                           "(transaction_id, transaction_uid, transaction_date) " +
                                           "SELECT ROW_NUMBER() OVER(ORDER BY GENERATE_UUID()), GENERATE_UUID(), date " +
                                           "FROM UNNEST(GENERATE_DATE_ARRAY('2022-01-01', current_date())) AS date");
@@ -763,12 +765,11 @@ public static void setInsertQueryBackWithTableDetailsPlaceholder() {
   @Before(order = 1, value = "@BQ_SOURCE_BQ_EXECUTE_TEST")
   public static void createBQTableForBQExecuteTest() throws IOException, InterruptedException {
     String bqSourceBQExecuteTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
-    BigQueryClient.getSoleQueryResult("create table `" + PluginPropertyUtils.pluginProp("dataset") + "."
-                                        + bqSourceBQExecuteTable + "` as " +
+    BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceBQExecuteTable + "` as " +
                                         "SELECT * FROM UNNEST([ " +
                                         " STRUCT(1 AS Id, '" + PluginPropertyUtils.pluginProp("projectId") + "' as ProjectId, " +
-      "'" + PluginPropertyUtils.pluginProp("dataset") + "' as Dataset)" + "])");
+      "'" + datasetName + "' as Dataset)" + "])");
     PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceBQExecuteTable);
     BeforeActions.scenario.write("BQ source Table " + bqSourceBQExecuteTable + " " +
                                    "for @BQ_SOURCE_BQ_EXECUTE_TEST created successfully");
@@ -877,4 +878,46 @@ public static void replaceTableDetailsInUpdateQuery() {
   public static void setUpdateQueryBackWithTableDetailsPlaceholder() {
     setQueryBackWithTableDetailsPlaceholder("bqExecuteDMLUpdate");
   }
+
+  @Before(order = 1, value = "@BQ_INSERT_SOURCE_TEST")
+  public static void createSourceBQInsertTable() throws IOException, InterruptedException {
+    bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_");
+    BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " +
+                                        "(PersonID INT64, LastName STRING, FirstName STRING)");
+    try {
+      BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " +
+                                          "(PersonID, LastName, FirstName) " +
+                                          "VALUES (5, 'Rani', 'Raja')");
+    } catch (NoSuchElementException e) {
+      // Insert query does not return any record.
+      // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException.
+    }
+    PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable);
+    BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully");
+  }
+
+  @Before(order = 1, value = "@BQ_UPDATE_SINK_TEST")
+  public static void createTargetBQUpdateTable() throws IOException, InterruptedException {
+    bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_");
+    PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable);
+    BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable);
+    BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " +
+                                        "(PersonID INT64, LastName STRING, FirstName STRING)");
+    try {
+      BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " +
+                                          "(PersonID, LastName, FirstName) " +
+                                          "VALUES (5, 'Kumar', 'Rajan')");
+    } catch (NoSuchElementException e) {
+      // Insert query does not return any record.
+      // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException.
+    }
+    BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully");
+  }
 }
diff --git a/src/e2e-test/resources/pluginDataCyAttributes.properties b/src/e2e-test/resources/pluginDataCyAttributes.properties
index 8d418d0b3f..2f321c0333 100644
--- a/src/e2e-test/resources/pluginDataCyAttributes.properties
+++ b/src/e2e-test/resources/pluginDataCyAttributes.properties
@@ -2,6 +2,8 @@ projectId=project
 datasetProjectId=datasetProject
 referenceName=referenceName
 table=table
+tableKey=relationTableKey
+clusterOrder=clusteringOrder
 dataset=dataset
 skipHeader=switch-skipHeader
 path=path
diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties
index 40ee747e62..0e680fd5b2 100644
--- a/src/e2e-test/resources/pluginParameters.properties
+++ b/src/e2e-test/resources/pluginParameters.properties
@@ -1,9 +1,9 @@
 projectId=cdf-athena
-dataset=test_automation
+datasetprojectId=cdf-athena
+dataset=bq_automation
 serviceAccountType=filePath
 serviceAccount=auto-detect
 csvFormat=csv
-
 ## GCS-PLUGIN-PROPERTIES-START
 gcsMandatoryProperties=referenceName, path, format
 gcsInvalidBucketName=invalid?BucketName
@@ -158,6 +158,9 @@ bqFuturePartitionStartDate=2099-05-01
 bqFuturePartitionEndDate=2099-10-02
 bqTruncateTableTrue=True
 bqUpdateTableSchemaTrue=True
+clusterValue=transaction_date
+TableKey=PersonID
+bqSourceTable=dummy
 bqCreateTableQueryFile=testdata/BigQuery/BigQueryCreateTableQuery.txt
 bqInsertDataQueryFile=testdata/BigQuery/BigQueryInsertDataQuery.txt
 bqCreateViewQueryFile=testdata/BigQuery/BigQueryCreateViewQuery.txt
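
Note: the row-level validation added in BQValidation reduces to "serialize each row to
JSON, order rows by a stable key, then compare pairwise". Below is a minimal standalone
sketch of the same idea, assuming only the google-cloud-bigquery client and Gson 2.8.6+.
The project ("my-project"), dataset ("my_dataset"), table names, and the "UID" sort key
are placeholders, not values from this patch, and TO_JSON_STRING is used instead of the
patch's TO_JSON so the result comes back as a plain STRING column:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class ToJsonCompareSketch {

      // Read every row of the table as a JsonObject, sorted on sortKey so that two
      // tables holding the same data compare equal row-by-row.
      static List<JsonObject> fetchRows(BigQuery bq, String table, String sortKey)
          throws InterruptedException {
        String sql = "SELECT TO_JSON_STRING(t) FROM `my-project.my_dataset." + table + "` AS t";
        List<JsonObject> rows = new ArrayList<>();
        bq.query(QueryJobConfiguration.newBuilder(sql).build())
          .iterateAll()
          .forEach(row -> rows.add(JsonParser.parseString(row.get(0).getStringValue()).getAsJsonObject()));
        rows.sort(Comparator.comparing(r -> r.get(sortKey).getAsString()));
        return rows;
      }

      public static void main(String[] args) throws InterruptedException {
        BigQuery bq = BigQueryOptions.getDefaultInstance().getService();
        List<JsonObject> source = fetchRows(bq, "source_table", "UID");
        List<JsonObject> target = fetchRows(bq, "target_table", "UID");
        // JsonObject.equals is a deep comparison, so equal lists mean identical table data.
        System.out.println("rows match: " + source.equals(target));
      }
    }

This trades the per-type switch in BQValidation for JSON string equality, which is
stricter about formatting but needs no schema lookup.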